# Skip to content
#
# Instantly share code, notes, and snippets.
#
# @bnaul
# Last active October 13, 2016 23:19
# Show Gist options
#   • Save bnaul/1f0598d8e0b52b27db51e97b9cbe838c to your computer and use it in GitHub Desktop.
import imp
from os.path import expanduser
import dask
import numpy as np
from cesium import featurize, time_series
# `async` has been a reserved keyword since Python 3.7, so the attribute
# spelling of this submodule does not parse there; getattr() reaches the
# same module object without tripping the parser.
# NOTE(review): later dask releases appear to have renamed this module to
# `dask.local` -- confirm against the installed dask version.
#
# Reload the scheduler module (presumably so that a monkey-patch left over
# from an earlier interactive run is discarded -- confirm), then keep a
# handle on its `execute_task` so the wrapper can delegate to it.
imp.reload(getattr(dask, "async"))
execute_task_old = getattr(dask, "async").execute_task
def execute_and_log_task(key, task, data, queue, get_id, raise_on_exception=False):
    """Delegate to the saved ``execute_task_old``, with optional tracing.

    The signature mirrors the call made to ``execute_task_old`` so this
    function can be installed in its place. All arguments are passed through
    unchanged and the wrapped function's return value is returned as-is.

    Parameters
    ----------
    key, task, data, queue, get_id
        Forwarded positionally to ``execute_task_old``; this wrapper does not
        inspect them (the commented-out tracing lines below would print
        ``key``, ``task`` and ``data``).
    raise_on_exception : bool, optional
        Forwarded to ``execute_task_old`` as a keyword argument.
    """
    # Uncomment any of the following lines to trace each executed task:
    # print("key: ", key)
    # print("task: ", task)
    # print("data: ", data)
    # print("code: {} = {}({})".format(key, task[0].__name__,
    #                                  ', '.join([str(t) for t in task[1:]])))

    # Forward the caller's flag rather than a fixed False, so installing this
    # wrapper does not change how the wrapped function is asked to treat
    # exceptions.
    return execute_task_old(key, task, data, queue, get_id,
                            raise_on_exception=raise_on_exception)
if __name__ == '__main__':
    # `async` is a reserved keyword on Python 3.7+, so the submodule is
    # fetched with getattr() instead of attribute syntax.
    # NOTE(review): later dask releases appear to have renamed this module
    # to `dask.local` -- confirm against the installed dask version.
    dask_async = getattr(dask, "async")

    # Install the wrapper in place of the module's `execute_task` so the
    # get_sync() call below goes through it (assuming get_sync looks the
    # function up on the module at call time -- confirm).
    dask_async.execute_task = execute_and_log_task

    # Minimal example graph, kept for reference:
    # def double(x):
    #     return 2 * x
    # dsk = {'x': [1, 2, 3], 'two_x': (map, double, 'x'), 'total': (sum, 'two_x')}
    # value = dask_async.get_sync(dsk, 'total')

    def listcomp(f, x):
        """Apply ``f`` to every element of ``x`` and return the results as a list."""
        return [f(x_i) for x_i in x]

    def featurize_min_max(ts):
        """Compute the 'maximum' and 'minimum' features for one time series."""
        return featurize.featurize_single_ts(
            ts, features_to_use=['maximum', 'minimum'])

    # Task graph: file paths -> loaded time series -> per-series features ->
    # assembled feature set. Tuples are (callable, *argument keys).
    dsk = {
        'uris': [expanduser('~/.local/cesium/ts_data/217801.nc'),
                 expanduser('~/.local/cesium/ts_data/224635.nc')],
        'all_time_series': (listcomp, time_series.from_netcdf, 'uris'),
        'all_features': (listcomp, featurize_min_max, 'all_time_series'),
        'computed_fset': (featurize.assemble_featureset,
                          'all_features', 'all_time_series'),
    }
    value = dask_async.get_sync(dsk, 'computed_fset')
    print(value)
# Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment