Skip to content

Instantly share code, notes, and snippets.

@grinsted
Last active December 23, 2020 10:06
Show Gist options
  • Save grinsted/5de7aa2ec1c95eafe7b02bc1b83a5bc4 to your computer and use it in GitHub Desktop.
Save grinsted/5de7aa2ec1c95eafe7b02bc1b83a5bc4 to your computer and use it in GitHub Desktop.
Some code to calculate balance fluxes.
import numpy as np
import heapq
import scipy.ndimage
# lebrocq et al.
# https://www.sciencedirect.com/science/article/pii/S0098300406000781?via%3Dihub
def balanceflux(Z, P, n=1):
"""
Accumulate flux as it flows downslope.
Parameters
----------
Z : ndarray
DEM
P : ndarray
Total mass balance per grid cell.
n : float, optional (default=1)
flux routing exponent.
Returns
-------
Q : ndarray
accumulated flux from upstream cells.
Remarks:
* does not accumulate from egdes
* accumulated flux can never be negative.
* beware of units: if P is given as m3/yr/cell, then Q will also be m3/yr/cell
Usage:
Q = balanceflux(DEM, SMB*gridcellarea)
Aslak Grinsted 2020
"""
#accumulate from top - ignore edges.
ix = np.argsort(Z[1:-1, 1:-1].ravel())[::-1]
rows, cols = np.unravel_index(ix, np.array(Z.shape)-2)
rows = rows + 1
cols = cols + 1
#every
Q = P
#indices of neighbors:
dr = np.array((-1, -1, -1, 0, 0, 1, 1, 1))
dc = np.array((-1, 0, 1, -1, 1, -1, 0, 1))
dist = np.sqrt(dc**2 + dr**2)
for i in range(len(rows)):
r, c = rows[i], cols[i]
q = Q[r,c]
if q<0: # do not allow negative flux.
Q[r,c]=0
q=0
if np.isnan(q):
continue
slope = (Z[r,c] - Z[r+dr,c+dc])/dist
slope[slope<0] = 0
weight = slope**n
totweight = np.nansum(weight)
if totweight==0:
continue
weight = weight*q / totweight
Q[r+dr,c+dc] += weight
return Q
#her fra:http://arcgisandpython.blogspot.com/2012/01/python-flood-fill-algorithm.html
#https://github.com/cgmorton/flood-fill/blob/master/floodfill/floodfill.py
def flood_fill(input_array, four_way=False):
"""
Flood fill depressions/sinks in floating point array from edges
Parameters
----------
input_array : ndarray
Input array to be filled
four_way : bool, optional
If True, search 4 immediately adjacent cells
(cross structuring element)
If False, search all 8 adjacent cells
(square structuring element).
The Default is False.
Returns
-------
out : ndarray
Filled array
References
----------
.. [1] Soille and Gratin, 1994. An Efficient Algorithm for Drainage
Networks Extraction on DEMs. Journal of Visual Communication and Image
Representation, 5(2), 181-189
.. [2] Liu et al., 2009. Another Fast and Simple DEM Depression-Filling
Algorithm Based on Priority Queue Structure. Atmopsheric and Oceanic
Science Letters, 2(4) 214-219
"""
# Rename or copy input so that input_array is a local variable?
# input_array = np.copy(input_array)
# Set h_max to a value larger than the array maximum to ensure
# that the while loop will terminate
h_max = np.nanmax(input_array) + 100
# Build mask of cells with data not on the edge of the image
# Use 3x3 square structuring element
# Build Structuring element only using NumPy module
data_mask = np.isfinite(input_array)
inside_mask = scipy.ndimage.binary_erosion(
data_mask,
structure=np.array([[1, 1, 1], [1, 1, 1], [1, 1, 1]]).astype(np.bool))
edge_mask = (data_mask & ~inside_mask)
# Initialize output array as max value test_array except edges
output_array = np.copy(input_array)
output_array[inside_mask] = h_max
# Build priority queue and place edge pixels into priority queue
# Last value is flag to indicate if cell is an edge cell
put = heapq.heappush
get = heapq.heappop
fill_heap = [
(output_array[t_row, t_col], int(t_row), int(t_col), True)
for t_row, t_col in np.transpose(np.where(edge_mask))]
heapq.heapify(fill_heap)
def neighbors(row, col, four_way=False):
"""Return indices of adjacent cells"""
if four_way:
return [
(row - 1, col), (row, col + 1),
(row + 1, col), (row, col - 1)]
else:
return [
(row - 1, col), (row - 1, col + 1),
(row, col + 1), (row + 1, col + 1),
(row + 1, col), (row + 1, col - 1),
(row, col - 1), (row - 1, col - 1)]
# Iterate until priority queue is empty
while True:
try:
h_crt, t_row, t_col, edge_flag = get(fill_heap)
except IndexError:
break
for n_row, n_col in neighbors(t_row, t_col, four_way):
# Skip cell if outside array edges
if edge_flag:
try:
if not inside_mask[n_row, n_col]:
continue
except IndexError:
continue
if output_array[n_row, n_col] == h_max:
output_array[n_row, n_col] = max(
h_crt, input_array[n_row, n_col])
put(fill_heap, (output_array[n_row, n_col], n_row, n_col, False))
return output_array
def masked_smooth(Z,sigma,mask):
"""
Apply a gaussian smoothing to everything inside a mask
Parameters
----------
Z : ndarray
Input array to be filled
sigma: float
standard deviation of gaussian blur
mask : ndarray
mask of Booleans indicating which areas should be smoothed.
Returns
-------
Zs : ndarray
smoothed array
Aslak Grinsted 2020
"""
filt = lambda x: scipy.ndimage.gaussian_filter(x, sigma)
Zs = filt(Z*mask)/filt(mask.astype(float))
Zs[~mask] = Z[~mask]
return Zs
if __name__ == "__main__":
#test code
import pyimgraft
Z=pyimgraft.geoimread(r'c:\users\ag\hugedata\gimpdem_810m.tif',
roi_crs='LL', roi_x=-26.6, roi_y=71.32, buffer = 50e3)
Z.data = Z.data*1.0
S=flood_fill(Z.data)
P= Z*0+1
Q=balanceflux(S,P.data)
P.data=Q
P.plot.imshow()
Zs = Z.copy(deep=True)
Zs.data = masked_smooth(Zs.data,1,Zs.data>1500)
Zs.plot.imshow()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment