Last active
May 20, 2020 17:03
-
-
Save richpsharp/66861b9355202b5d5b4ca13945f3b480 to your computer and use it in GitHub Desktop.
traditional multiprocessing approach
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import multiprocessing | |
import os | |
import sys | |
import json | |
import glob | |
import logging | |
#import taskgraph | |
import pandas as pd | |
from wbnci import biodiversity, carbon | |
def worker(work_queue):
    """Consume and run tasks from ``work_queue`` until a ``'STOP'`` sentinel arrives.

    Every payload other than the sentinel is a 5-tuple
    ``(func, args, kwargs, task_name, target_path_list)``; ``args`` and
    ``kwargs`` may be ``None`` (treated as empty). If ``target_path_list`` is
    non-empty and every path in it already exists, that task is skipped.
    Exceptions raised by ``func`` are printed and do not stop the worker.

    Parameters
    ----------
    work_queue : queue-like
        Object with ``get()``/``put()`` shared by all workers (a
        ``multiprocessing.Queue`` when started from ``execute``).
    """
    while True:
        payload = work_queue.get()
        if payload == 'STOP':
            # Put the sentinel back so the other workers sharing this queue
            # also see it; a single 'STOP' therefore shuts down all of them.
            work_queue.put('STOP')
            return
        func, args, kwargs, task_name, target_path_list = payload
        if target_path_list and all(
                os.path.exists(path) for path in target_path_list):
            print(f"{task_name}'s file list exists, skipping execution")
            # Skip just this task. Returning here would end the worker
            # process and leave the remaining queued tasks unconsumed.
            continue
        try:
            func(*(args or []), **(kwargs or {}))
            print(f"{task_name} run success")
        except Exception as e:
            print(f"EXCEPTION on {task_name}: {str(e)}")
def execute(args):
    """Run the biodiversity and carbon models for the selected countries in parallel.

    Starts a pool of worker processes fed by a bounded
    ``multiprocessing.Queue``. For each selected country it queues the two
    model tasks via ``analyze_country``. It then sends the ``'STOP'``
    sentinel and waits for every worker to exit.

    Parameters
    ----------
    args : dict
        Parsed JSON config. Keys read here:

        * ``working_dir`` -- directory holding ``biodiv_carbon_log.log`` and
          the per-country data packages under ``packages/<country>``.
        * ``country_list_file`` -- CSV with a ``nev_name`` column of countries.
        * ``n_workers`` (optional, default 12) -- number of worker processes.
        * ``country_subset`` (optional) -- list of country names to run.
          Defaults to ``['Denmark', 'Estonia', 'Ecuador']``, the subset this
          script originally hard-coded. Set it to ``null`` in the config to
          run every country in ``country_list_file``.

        The whole dict is also forwarded to ``analyze_country``.
    """
    logfile = os.path.join(args['working_dir'], "biodiv_carbon_log.log")
    logger = logging.getLogger(__name__)
    if logger.hasHandlers():
        logger.handlers.clear()
    file_handler = logging.FileHandler(logfile)
    logger.addHandler(file_handler)
    # The task helpers report ModelSuccess/ModelFail at INFO. Without an
    # explicit level this logger inherits the root default (WARNING), those
    # records are discarded, and the log file stays empty.
    logger.setLevel(logging.INFO)
    # NOTE(review): the logger reaches the workers by pickling, which rebuilds
    # it by name in the child process. The child only has this file handler
    # under the 'fork' start method; under 'spawn' (the macOS default since
    # Python 3.8) it is unconfigured. Confirm which start method is in use.

    n_workers = args.get('n_workers', 12)
    country_subset = args.get(
        'country_subset', ['Denmark', 'Estonia', 'Ecuador'])

    # Bounded queue: put() blocks once 2 * n_workers tasks are waiting.
    work_queue = multiprocessing.Queue(n_workers * 2)
    worker_list = []
    for _ in range(n_workers):
        worker_process = multiprocessing.Process(
            target=worker, args=(work_queue,))
        worker_process.start()
        worker_list.append(worker_process)

    try:
        df = pd.read_csv(args['country_list_file'])
        for country in list(df['nev_name']):
            if country_subset is not None and country not in country_subset:
                continue
            country_dir = os.path.join(args['working_dir'], 'packages', country)
            analyze_country(country_dir, args, work_queue, logger=logger)
    finally:
        # Always send the sentinel, even if reading the country list or
        # queuing a task failed. Otherwise the non-daemon workers block on
        # get() forever and the interpreter waits for them at exit.
        work_queue.put('STOP')
        for worker_process in worker_list:
            worker_process.join()
        logger.removeHandler(file_handler)
        file_handler.close()
def analyze_country(country_dir, args, work_queue, logger=None):
    """Queue the biodiversity and carbon model runs for one country.

    Assumes that ``country_dir`` points to a data package output by
    ``construct_data_packages.py``. Every ``*.tif`` in
    ``{country_dir}/Projected/Scenarios`` is treated as one scenario.

    Parameters
    ----------
    country_dir : str
        Path to the country's data package; its basename is used as the
        country name.
    args : dict
        The config-file arguments. The optional key
        ``biodiversity_core_data_folder`` overrides the biodiversity core
        data location, which otherwise defaults to the original hard-coded
        path.
    work_queue : queue-like
        Queue consumed by ``worker``. Two task tuples are put on it
        (biodiversity, then carbon), each shaped
        ``(func, args, kwargs, task_name, target_path_list)``.
    logger : logging.Logger, optional
        Forwarded to the task helpers to record success or failure.

    Once the queued tasks have run, model results are expected in
    ``{country_dir}/ModelResults``, named ``{scenario}_biodiversity.tif``
    and ``{scenario}_carbon.tif``. These data are ready to be aggregated
    to SDU.
    """
    country_name = os.path.basename(country_dir)
    print(f'running service models for {country_name}')
    output_dir = os.path.join(country_dir, 'ModelResults')
    # exist_ok avoids the check-then-create race of isdir() followed by makedirs().
    os.makedirs(output_dir, exist_ok=True)

    scenario_dir = os.path.join(country_dir, 'Projected', 'Scenarios')
    scenario_names = [
        os.path.splitext(os.path.basename(path))[0]
        for path in glob.glob(os.path.join(scenario_dir, '*.tif'))
    ]

    biodiversity_args = {
        'core_data_folder': args.get(
            'biodiversity_core_data_folder',
            "/Users/peterhawthorne/Projects/WBNCI/data/biodiversity"),
        'country_folder': country_dir,
        'scenario_folder': scenario_dir,
        'output_folder': output_dir,
    }
    carbon_args = {
        'country_folder': country_dir,
        'scenario_folder': scenario_dir,
        'output_folder': output_dir,
    }

    for model_name, task_func, model_args in (
            ('biodiversity', biodiversity_task, biodiversity_args),
            ('carbon', carbon_task, carbon_args)):
        # worker() skips the task when all of these outputs already exist.
        target_file_list = [
            os.path.join(output_dir, f"{scenario}_{model_name}.tif")
            for scenario in scenario_names
        ]
        work_queue.put((
            task_func, [country_name, model_args], {'logger': logger},
            f'{model_name}_{country_dir}', target_file_list))
def biodiversity_task(country_name, args, logger=None):
    """Run ``biodiversity.execute(args)`` without letting a failure propagate.

    Parameters
    ----------
    country_name : str
        Used only in the log line.
    args : dict
        Passed unchanged to ``biodiversity.execute``.
    logger : logging.Logger, optional
        If given, receives ``ModelSuccess,biodiversity,<country>`` or
        ``ModelFail,biodiversity,<country>`` at INFO level.

    Returns
    -------
    bool
        True if the model ran without raising, False otherwise.
    """
    try:
        biodiversity.execute(args)
    except Exception:
        # Keep the comma-separated log line unchanged, but also print the
        # traceback to stderr. Swallowing it left no way to diagnose the
        # failure, and no trace at all when no logger was supplied.
        import traceback
        traceback.print_exc()
        if logger is not None:
            logger.info(f"ModelFail,biodiversity,{country_name}")
        return False
    if logger is not None:
        logger.info(f"ModelSuccess,biodiversity,{country_name}")
    return True
def carbon_task(country_name, args, logger=None):
    """Run ``carbon.execute(args)`` without letting a failure propagate.

    Parameters
    ----------
    country_name : str
        Used only in the log line.
    args : dict
        Passed unchanged to ``carbon.execute``.
    logger : logging.Logger, optional
        If given, receives ``ModelSuccess,carbon,<country>`` or
        ``ModelFail,carbon,<country>`` at INFO level.

    Returns
    -------
    bool
        True if the model ran without raising, False otherwise.
    """
    try:
        carbon.execute(args)
    except Exception:
        # Keep the comma-separated log line unchanged, but also print the
        # traceback to stderr. Swallowing it left no way to diagnose the
        # failure, and no trace at all when no logger was supplied.
        import traceback
        traceback.print_exc()
        if logger is not None:
            logger.info(f"ModelFail,carbon,{country_name}")
        return False
    if logger is not None:
        logger.info(f"ModelSuccess,carbon,{country_name}")
    return True
if __name__ == '__main__':
    # Usage: python <script> [config.json]
    # Exactly one command-line argument selects the config file; any other
    # argument count falls back to the bundled example config.
    config_path = sys.argv[1] if len(sys.argv) == 2 else "config_example.json"
    with open(config_path, 'r') as config_fh:
        config_args = json.load(config_fh)
    execute(config_args)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment