Created
February 4, 2021 02:13
-
-
Save richpsharp/b2bbee8d7d86fbcd1a5fcdc443aa6e84 to your computer and use it in GitHub Desktop.
Script to test many flow accumulation threshold values on a DEM while taking advantage of multiprocessing and avoided reexecution
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""TFA script for Stacie to experiment with TFAs.""" | |
import logging | |
import os | |
import pygeoprocessing.routing | |
import multiprocessing | |
import sys | |
import taskgraph | |
WORKSPACE_DIR = 'workspace' | |
DEM_RASTER_PATH = r"D:\repositories\floodplain-extraction-utility\sample_data\DEM_Morrosquillo.tif" | |
FLOW_ACCUM_THRESHOLD_LIST = [100, 1000, 2000, 5000] | |
logging.basicConfig( | |
level=logging.DEBUG, | |
format=( | |
'%(asctime)s (%(relativeCreated)d) %(processName)s %(levelname)s ' | |
'%(name)s [%(funcName)s:%(lineno)d] %(message)s'), | |
stream=sys.stdout) | |
LOGGER = logging.getLogger(__name__) | |
logging.getLogger('taskgraph').setLevel(logging.INFO) | |
def main(): | |
"""Entry point.""" | |
os.makedirs(WORKSPACE_DIR, exist_ok=True) | |
task_graph = taskgraph.TaskGraph( | |
WORKSPACE_DIR, min( | |
len(FLOW_ACCUM_THRESHOLD_LIST), | |
multiprocessing.cpu_count()), 5.0) | |
filled_dem = os.path.join(WORKSPACE_DIR, 'filled_dem.tif') | |
fill_dem_task = task_graph.add_task( | |
func=pygeoprocessing.routing.fill_pits, | |
args=((DEM_RASTER_PATH, 1), filled_dem), | |
target_path_list=[filled_dem], | |
kwargs={'working_dir': WORKSPACE_DIR}, | |
task_name='fill pits') | |
flow_dir_mfd_path = os.path.join(WORKSPACE_DIR, 'flow_dir_mfd.tif') | |
flow_dir_task = task_graph.add_task( | |
func=pygeoprocessing.routing.flow_dir_mfd, | |
args=((filled_dem, 1), flow_dir_mfd_path), | |
target_path_list=[flow_dir_mfd_path], | |
kwargs={'working_dir': WORKSPACE_DIR}, | |
dependent_task_list=[fill_dem_task], | |
task_name='flow dir') | |
flow_accum_raster_path = os.path.join( | |
WORKSPACE_DIR, 'flow_accum_mfd.tif') | |
flow_accum_task = task_graph.add_task( | |
func=pygeoprocessing.routing.flow_accumulation_mfd, | |
args=((flow_dir_mfd_path, 1), flow_accum_raster_path), | |
target_path_list=[flow_accum_raster_path], | |
dependent_task_list=[flow_dir_task], | |
task_name='flow accumulation') | |
for flow_accum_threshold in FLOW_ACCUM_THRESHOLD_LIST: | |
stream_raster_path = os.path.join( | |
WORKSPACE_DIR, f'tfa_{flow_accum_threshold}_stream.tif') | |
task_graph.add_task( | |
func=pygeoprocessing.routing.extract_streams_mfd, | |
args=( | |
(flow_accum_raster_path, 1), (flow_dir_mfd_path, 1), | |
flow_accum_threshold, stream_raster_path), | |
target_path_list=[stream_raster_path], | |
dependent_task_list=[flow_accum_task], | |
task_name=( | |
f'extract streams with flow accum {flow_accum_threshold}')) | |
task_graph.close() | |
task_graph.join() | |
LOGGER.info('all done!') | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment