Last active
October 11, 2020 05:59
-
-
Save cwells/5a99d576800031018530b71385d5d75c to your computer and use it in GitHub Desktop.
Execution of topologically-sorted functions and methods
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/env python | |
import json | |
import time | |
from collections import ChainMap | |
from functools import partial | |
import click | |
from taskgraph import TaskGraph | |
# Registry that the decorators on Job's methods record tasks and their
# dependencies in.
task = TaskGraph()


class Job:
    '''Demo workload whose methods are registered as tasks on ``task``.

    Each task method sleeps for ``delay`` seconds and returns a single-entry
    dict mapping the task's name to the seconds elapsed since the Job was
    created.
    '''

    def __init__(self):
        # Reference point for every elapsed-time measurement.
        self.start_time = time.time()

    def do_work(self, delay):
        '''Sleep ``delay`` seconds; return elapsed seconds rounded to 0.1.
        '''
        time.sleep(delay)
        elapsed = time.time() - self.start_time
        return round(elapsed, 1)

    @task.requires()
    def foo(self, delay):
        elapsed = self.do_work(delay)
        return {"foo": elapsed}

    @task.requires()
    def bar(self, delay):
        elapsed = self.do_work(delay)
        return {"bar": elapsed}

    @task.requires(bar)
    def baz(self, delay):
        elapsed = self.do_work(delay)
        return {"baz": elapsed}

    @task.requires(foo, bar)
    def qux(self, delay):
        elapsed = self.do_work(delay)
        return {"qux": elapsed}

    @task.requires(qux, baz)
    def quz(self, delay):
        elapsed = self.do_work(delay)
        return {"quz": elapsed}

    @task.requires(foo, bar)
    def xyzzy(self, delay):
        elapsed = self.do_work(delay)
        return {"xyzzy": elapsed}
# Command-line entry point: either print the task graph and exit, or run
# every registered task against a fresh Job and print the merged results as
# indented JSON.
@click.command()
@click.option('--parallel', '-p', is_flag=True, help="Run tasks in parallel.")
@click.option('--pool-size', '-z', type=click.IntRange(1, 8), default=4, help="Size of pool for parallel processing.")
@click.option('--delay', '-d', type=click.IntRange(0, 10), default=0, help="Seconds to sleep in functions.")
@click.option('--graph', '-g', is_flag=True, help="Dump graph to stdout and exit (affected by -p flag).")
def main(parallel, pool_size, delay, graph):
    # NOTE: intentionally no docstring here -- click would display it as the
    # command description in --help output.
    if graph:
        print(task.graph(parallel))
        raise SystemExit

    job = Job()

    # Pick the runner; the parallel one gets the pool size bound up front so
    # both runners can be called the same way below.
    if parallel:
        runner = partial(task.run_parallel, pool_size=pool_size)
    else:
        runner = task.run

    # The runner yields one result dict per task; ChainMap folds them into a
    # single mapping before serialisation.
    outputs = runner(job, delay=delay)
    record = dict(ChainMap(*outputs))
    print(json.dumps(record, indent=2))


if __name__ == '__main__':
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
click==7.1.2 | |
toposort==1.5 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/env python | |
from multiprocessing import Pool | |
from toposort import toposort, toposort_flatten | |
class TaskGraph:
    '''Task dependency graph.

    Tasks may be run serially or in parallel.
    Declare tasks using @TaskGraph.requires().

    Tasks are keyed by the decorated function's ``__name__``, so registering
    two functions with the same name makes the later one replace the earlier.
    '''

    def __init__(self):
        # task name -> set of names of the tasks it depends on
        self._graph = {}
        # task name -> the callable registered under that name
        self._tasks = {}

    def requires(self, *deps):
        '''Decorator for declaring a function as a task as well as listing
        other tasks as dependencies.

        ``deps`` are the functions this task depends on; only their
        ``__name__`` is recorded. The decorated function is returned
        unchanged.
        '''
        def wrapper(fn):
            self._graph[fn.__name__] = {dep.__name__ for dep in deps}
            self._tasks[fn.__name__] = fn
            return fn
        return wrapper

    def graph(self, parallel=False):
        '''Return formatted graph representation as string.

        With ``parallel`` false, task names are joined with `` -> `` in
        serial execution order. With ``parallel`` true, each group of
        independent tasks is rendered as ``(a, b)`` and the groups are joined
        with `` -> ``.
        '''
        if parallel:
            # toposort yields sets; sort each one so the output is
            # reproducible and agrees with the order used when running.
            levels = (', '.join(sorted(level)) for level in toposort(self._graph))
            return "(" + ") -> (".join(levels) + ")"
        return ' -> '.join(toposort_flatten(self._graph))

    def run(self, *args, **kwargs):
        '''Run tasks serially.

        Generator: yields each task's return value in dependency order.
        ``args`` and ``kwargs`` are passed to every task.

        Useful when:
        - tasks share mutable state
        - tasks are too small to be worth running in parallel
        '''
        for name in toposort_flatten(self._graph):
            yield self._tasks[name](*args, **kwargs)

    def run_parallel(self, *args, pool_size=4, **kwargs):
        '''Run independent tasks in parallel.

        For example, given dependencies a -> b and c -> d, (a, c) would be run
        in parallel, followed by (b, d) being run in parallel.

        Generator: yields each task's return value. All results of one group
        are collected before the next group is submitted, and results within
        a group are yielded in task-name order. ``pool_size`` is the number
        of worker processes; ``args`` and ``kwargs`` are passed to every task
        through ``Pool.apply_async``, i.e. across a process boundary.
        '''
        # One pool serves every group; creating a pool per group would start
        # and tear down the worker processes over and over.
        with Pool(processes=pool_size) as pool:
            for level in toposort(self._graph):
                # Sorted so the result order is deterministic and matches
                # the serial runner.
                pending = [
                    pool.apply_async(self._tasks[name], args, kwargs)
                    for name in sorted(level)
                ]
                for result in pending:
                    yield result.get()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment