traditional multiprocessing approach
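
"""Run the biodiversity and carbon models per country with a pool of
multiprocessing workers.

A plain-`multiprocessing` alternative to the commented-out `taskgraph`
version: tasks go onto a shared queue as
`(func, args, kwargs, task_name, target_path_list)` tuples, and a fixed
pool of worker processes drains the queue until a 'STOP' sentinel arrives.
"""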
import multiprocessing
import os
import sys
import json
import glob
import logging
#import taskgraph
import pandas as pd
from wbnci import biodiversity, carbon


def worker(work_queue):
    """Pull (func, args, kwargs, task_name, target_path_list) tuples off
    `work_queue` and execute them until a 'STOP' sentinel is seen."""
    while True:
        payload = work_queue.get()
        if payload == 'STOP':
            # Re-enqueue the sentinel so sibling workers also shut down.
            work_queue.put('STOP')
            return
        func, args, kwargs, task_name, target_path_list = payload
        if target_path_list:
            if all(os.path.exists(p) for p in target_path_list):
                print(f"{task_name}'s file list exists, skipping execution")
                # `continue`, not `return`: skipping one task must not
                # kill the worker process.
                continue
        if not args:
            args = []
        if not kwargs:
            kwargs = {}
        try:
            func(*args, **kwargs)
            print(f"{task_name} run success")
        except Exception as e:
            print(f"EXCEPTION on {task_name}: {str(e)}")


def execute(args):
    logfile = os.path.join(args['working_dir'], "biodiv_carbon_log.log")
    logger = logging.getLogger(__name__)
    if logger.hasHandlers():
        logger.handlers.clear()
    logger.addHandler(logging.FileHandler(logfile))
    # logger.setLevel(logging.INFO)

    # tg = taskgraph.TaskGraph(args['working_dir'], n_workers=12)
    n_workers = 12
    work_queue = multiprocessing.Queue(n_workers * 2)
    worker_list = []
    for _ in range(n_workers):
        worker_list.append(
            multiprocessing.Process(
                target=worker,
                args=(work_queue,)))
        worker_list[-1].start()

    df = pd.read_csv(args['country_list_file'])
    country_list = list(df['nev_name'])
    for c in country_list:
        # if c in ['France', 'Norway', 'Kiribati', 'Fiji', 'Netherlands',
        #         'French Southern and Antarctic Lands', 'French Polynesia']:
        #     continue
        if c not in ['Denmark', 'Estonia', 'Ecuador']:
            continue
        country_dir = os.path.join(args['working_dir'], 'packages', c)
        analyze_country(country_dir, args, work_queue, logger=logger)

    # A single sentinel is enough: each worker re-enqueues it before exiting.
    work_queue.put('STOP')
    # tg.close()
    # tg.join()
    for worker_process in worker_list:
        worker_process.join()
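

# `country_list_file` is expected to be a CSV with (at least) a `nev_name`
# column; a minimal sketch (these rows are hypothetical):
#
#     nev_name
#     Denmark
#     Estonia
#     Ecuador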


def analyze_country(country_dir, args, work_queue, logger=None):
    """Queue the biodiversity and carbon model runs for one country.

    `country_dir` is assumed to point to a data package output by
    `construct_data_packages.py`.

    `args` should be the config file arguments.

    `work_queue` is the multiprocessing queue that handles execution of
    arbitrary functions.

    Once the queued tasks have run, model results will be found in
    `{country_dir}/ModelResults`, with each file named
    `{scenario}_{model}.tif`. These data are ready to be aggregated to SDU.
    """
    country_name = os.path.basename(country_dir)
    print(f'running service models for {country_name}')
    output_dir = os.path.join(country_dir, 'ModelResults')
    if not os.path.isdir(output_dir):
        os.makedirs(output_dir)

    scenario_dir = os.path.join(country_dir, 'Projected', 'Scenarios')
    scenario_files = glob.glob(os.path.join(scenario_dir, '*.tif'))
    scenario_names = [
        os.path.splitext(os.path.basename(sf))[0] for sf in scenario_files]

    biodiversity_args = {
        'core_data_folder': "/Users/peterhawthorne/Projects/WBNCI/data/biodiversity",
        'country_folder': country_dir,
        'scenario_folder': scenario_dir,
        'output_folder': output_dir
    }
    biodiversity_target_file_list = [
        os.path.join(output_dir, f"{scenario}_biodiversity.tif")
        for scenario in scenario_names]

    carbon_args = {
        'country_folder': country_dir,
        'scenario_folder': scenario_dir,
        'output_folder': output_dir
    }
    carbon_target_file_list = [
        os.path.join(output_dir, f"{scenario}_carbon.tif")
        for scenario in scenario_names]

    # tg.add_task(
    #     func=biodiversity_task,
    #     args=[country_name, biodiversity_args],
    #     kwargs={'logger': logger},
    #     task_name=f'biodiversity_{country_dir}',
    #     target_path_list=biodiversity_target_file_list
    # )
    # order of args to queue is:
    # func, args=None, kwargs=None, task_name=None, target_path_list=None
    work_queue.put((
        biodiversity_task, [country_name, biodiversity_args],
        {'logger': logger}, f'biodiversity_{country_dir}',
        biodiversity_target_file_list))
    # tg.add_task(
    #     func=carbon_task,
    #     args=[country_name, carbon_args],
    #     kwargs={'logger': logger},
    #     task_name=f'carbon_{country_dir}',
    #     target_path_list=carbon_target_file_list
    # )
    work_queue.put((
        carbon_task, [country_name, carbon_args], {'logger': logger},
        f'carbon_{country_dir}', carbon_target_file_list))
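

# For orientation, the per-country package layout implied by the paths above
# (file names are examples):
#
#     {country_dir}/
#         Projected/Scenarios/{scenario}.tif        # input scenario rasters
#         ModelResults/{scenario}_biodiversity.tif  # written by biodiversity
#         ModelResults/{scenario}_carbon.tif        # written by carbon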


def biodiversity_task(country_name, args, logger=None):
    """
    Helper function to wrap the model call in a try-except block and to
    optionally log success or failure.
    """
    try:
        biodiversity.execute(args)
    except Exception:
        if logger is not None:
            logger.info(f"ModelFail,biodiversity,{country_name}")
    else:
        if logger is not None:
            logger.info(f"ModelSuccess,biodiversity,{country_name}")


def carbon_task(country_name, args, logger=None):
    """
    Helper function to wrap the model call in a try-except block and to
    optionally log success or failure.
    """
    try:
        carbon.execute(args)
    except Exception:
        if logger is not None:
            logger.info(f"ModelFail,carbon,{country_name}")
    else:
        if logger is not None:
            logger.info(f"ModelSuccess,carbon,{country_name}")


if __name__ == '__main__':
    default_config = "config_example.json"
    if len(sys.argv) == 2:
        config_file = sys.argv[1]
    else:
        config_file = default_config
    with open(config_file, 'r') as f:
        args = json.load(f)
    execute(args)
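
# A minimal sketch of the config file -- only the keys actually read above
# (`working_dir`, `country_list_file`); the values are placeholders:
#
#     {
#         "working_dir": "/path/to/working_dir",
#         "country_list_file": "/path/to/country_list.csv"
#     }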