Skip to content

Instantly share code, notes, and snippets.

@richpsharp
Created February 4, 2021 02:13
Show Gist options
  • Save richpsharp/b2bbee8d7d86fbcd1a5fcdc443aa6e84 to your computer and use it in GitHub Desktop.
Save richpsharp/b2bbee8d7d86fbcd1a5fcdc443aa6e84 to your computer and use it in GitHub Desktop.
Script to test many flow accumulation threshold values on a DEM while taking advantage of multiprocessing and avoided reexecution
"""TFA script for Stacie to experiment with TFAs."""
import logging
import os
import pygeoprocessing.routing
import multiprocessing
import sys
import taskgraph
WORKSPACE_DIR = 'workspace'
DEM_RASTER_PATH = r"D:\repositories\floodplain-extraction-utility\sample_data\DEM_Morrosquillo.tif"
FLOW_ACCUM_THRESHOLD_LIST = [100, 1000, 2000, 5000]
logging.basicConfig(
level=logging.DEBUG,
format=(
'%(asctime)s (%(relativeCreated)d) %(processName)s %(levelname)s '
'%(name)s [%(funcName)s:%(lineno)d] %(message)s'),
stream=sys.stdout)
LOGGER = logging.getLogger(__name__)
logging.getLogger('taskgraph').setLevel(logging.INFO)
def main():
"""Entry point."""
os.makedirs(WORKSPACE_DIR, exist_ok=True)
task_graph = taskgraph.TaskGraph(
WORKSPACE_DIR, min(
len(FLOW_ACCUM_THRESHOLD_LIST),
multiprocessing.cpu_count()), 5.0)
filled_dem = os.path.join(WORKSPACE_DIR, 'filled_dem.tif')
fill_dem_task = task_graph.add_task(
func=pygeoprocessing.routing.fill_pits,
args=((DEM_RASTER_PATH, 1), filled_dem),
target_path_list=[filled_dem],
kwargs={'working_dir': WORKSPACE_DIR},
task_name='fill pits')
flow_dir_mfd_path = os.path.join(WORKSPACE_DIR, 'flow_dir_mfd.tif')
flow_dir_task = task_graph.add_task(
func=pygeoprocessing.routing.flow_dir_mfd,
args=((filled_dem, 1), flow_dir_mfd_path),
target_path_list=[flow_dir_mfd_path],
kwargs={'working_dir': WORKSPACE_DIR},
dependent_task_list=[fill_dem_task],
task_name='flow dir')
flow_accum_raster_path = os.path.join(
WORKSPACE_DIR, 'flow_accum_mfd.tif')
flow_accum_task = task_graph.add_task(
func=pygeoprocessing.routing.flow_accumulation_mfd,
args=((flow_dir_mfd_path, 1), flow_accum_raster_path),
target_path_list=[flow_accum_raster_path],
dependent_task_list=[flow_dir_task],
task_name='flow accumulation')
for flow_accum_threshold in FLOW_ACCUM_THRESHOLD_LIST:
stream_raster_path = os.path.join(
WORKSPACE_DIR, f'tfa_{flow_accum_threshold}_stream.tif')
task_graph.add_task(
func=pygeoprocessing.routing.extract_streams_mfd,
args=(
(flow_accum_raster_path, 1), (flow_dir_mfd_path, 1),
flow_accum_threshold, stream_raster_path),
target_path_list=[stream_raster_path],
dependent_task_list=[flow_accum_task],
task_name=(
f'extract streams with flow accum {flow_accum_threshold}'))
task_graph.close()
task_graph.join()
LOGGER.info('all done!')
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment