Last active
May 21, 2019 19:38
-
-
Save mgxd/9a00cba44ece7c92deccb8f88ae6cdea to your computer and use it in GitHub Desktop.
asyncio_pydra.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import concurrent.futures as cf | |
# from multiprocessing.context import ForkServerContext | |
import time | |
import functools | |
# used to bypass Event Loop error in Jupyter Notebooks | |
import nest_asyncio | |
nest_asyncio.apply() | |
# Dependencies for :class:`Node` | |
################################ | |
# stripped down example | |
# requires Node, Workflow classes | |
class Inputs:
    """Container for a node's single input value.

    BUG FIX: ``Node.__init__`` constructs this as ``Inputs(**ikws)``, but the
    original ``__init__`` accepted no keyword arguments, so any non-empty
    ``ikws`` raised ``TypeError``. Accept ``val`` as an optional keyword;
    the no-argument form behaves exactly as before.
    """

    def __init__(self, val=None):
        # the single input field consumed by Interface.run()
        self.val = val

    def __iter__(self):
        # iterating an Inputs yields its one value
        yield self.val
class Outputs(list):
    """List of output field names; always begins with the default 'out'."""

    def __init__(self, *names):
        super().__init__()
        # every node exposes at least the default 'out' field
        self.append('out')
        # any extra output names follow in the order given
        self.extend(names)
class Interface:
    """Minimal stand-in for a real task interface.

    Holds an inputs object and an output-name -> value mapping, and
    produces results by adding one to ``inputs.val``.
    """

    def __init__(self, inputs, outputs):
        self.inputs = inputs
        # map every declared output name to None until run() fills it
        self.outputs = dict.fromkeys(outputs)
        self._results = {}

    @property
    def results(self):
        """Results accumulated by :meth:`run` (empty before it runs)."""
        return self._results

    def run(self):
        """Simulate some computing time, then store ``val + 1`` under 'out'."""
        # extra delay when val == 3 exercises out-of-order completion
        if self.inputs.val == 3:
            time.sleep(5)
        time.sleep(5)
        self._results['out'] = self.inputs.val + 1
        return self.results
class Node:
    """Single unit of work in a :class:`Workflow`.

    Wraps an :class:`Interface` together with its inputs/outputs, tracks
    the upstream nodes it depends on, and records completion state.
    """

    def __init__(self, name, ikws=None, oargs=None):
        self.name = name
        # default containers are replaced when kwargs/args are supplied
        self.inputs = Inputs(**ikws) if ikws else Inputs()
        self.outputs = Outputs(*oargs) if oargs else Outputs()
        self.interface = Interface(self.inputs, self.outputs)
        self._results = {}
        # upstream nodes that must complete before this one may run
        self._dependencies = []
        self.done = False

    def __repr__(self):
        return self.name

    def _is_runnable(self):
        """True when every upstream dependency has completed."""
        # all() over an empty dependency list is True, matching
        # "no dependencies means runnable immediately"
        return all(dep.done for dep in self._dependencies)

    def run(self):
        """Execute the wrapped interface and mark this node done."""
        print("NODE %s: running" % self.name)
        # check if any inputs are futures - if so, wait!
        # wait til it is available
        # cannot run if missing input!
        self._results = self.interface.run()
        print("NODE %s: completed" % self.name)
        self.done = True
        print(self._results)
        return self._results

    @property
    def results(self):
        """Results produced by the last :meth:`run` call."""
        return self._results
class Workflow:
    """
    Basic DAG with asyncio/concurrent.futures execution by default.

    Currently does not have a sense of topological sorting,
    so nodes should be added in order of execution.

    Example:
        ... wf = Workflow()
        ... a = Node('a')
        ... b = Node('b')
        ... wf.connect(a, b)
        ... wf.run()  # or wf()
    """

    def __init__(self):
        # per-instance adjacency map: node -> set of downstream nodes.
        # BUG FIX: this was a shared, mutable class attribute; it is now
        # instance state only. Also no longer grabs an event loop here --
        # asyncio.get_event_loop() outside a running loop is deprecated,
        # and the loop it returned was NOT the loop asyncio.run() creates
        # later, so futures were scheduled on the wrong loop.
        self.graph = {}

    @property
    def size(self):
        """Number of nodes currently in the graph."""
        return len(self.graph)

    def __call__(self):
        self.run()

    def add(self, node):
        """Add *node* to the workflow with no outgoing edges."""
        self.graph[node] = set()

    def connect(self, n1, n2):
        """Add a directed edge from ``n1`` to ``n2``.

        Raises:
            AttributeError: if the edge already exists.
        """
        # BUG FIX: membership test instead of truthiness -- a node whose
        # edge set is empty is falsy, so the old code re-added it and
        # skipped the duplicate-edge check below
        if n1 not in self.graph:
            self.add(n1)
        if n2 not in self.graph:
            self.add(n2)
        elif n2 in self.graph[n1]:
            # BUG FIX: message is now actually formatted (the old code
            # passed n1/n2 as extra exception args, never interpolated)
            raise AttributeError('%s already connected to %s' % (n1, n2))
        self.graph[n1].add(n2)
        # record the reverse edge so n2 knows what it waits on
        n2._dependencies.append(n1)

    def clear(self):
        """Reset graph"""
        self.graph = {}

    def topsort(self):
        # TODO: topological sort of self.graph; insertion order is the
        # de-facto execution order until then
        pass

    def _set_needed_outputs(self, node):
        """Propagate *node*'s 'out' result into its downstream nodes."""
        for target in self.graph[node]:
            print("Assigning Node %s's input to %d" % (target, node._results['out']))
            # update downstream nodes
            target.inputs.val = node._results['out']

    # Wrapping coroutine which waits for return from process pool.
    async def get_results(self, executor, node):
        """Run *node* in *executor*; return ``(node, results)``."""
        # BUG FIX: use the loop actually running this coroutine. The old
        # code used a loop captured in __init__, which is a different
        # loop from the one asyncio.run() creates for _run_wf.
        loop = asyncio.get_running_loop()
        res = await loop.run_in_executor(executor, node.run)
        return node, res

    # asyncio Task (coroutine in eventloop)
    async def _run_wf(self, executor):
        """
        Schedule runnable nodes until the whole graph has executed.
        """
        print("Workflow contains %d nodes" % self.size, end="\n*******\n")
        remaining = set(self.graph)
        pending = set()
        tasks = set()
        with executor as wf_executor:
            while remaining or pending:
                # queue every node whose dependencies are now satisfied
                for node in [n for n in remaining if n._is_runnable()]:
                    tasks.add(asyncio.create_task(
                        self.get_results(wf_executor, node)
                    ))
                    # remove it once queued
                    remaining.discard(node)
                # BUG FIX: carry over still-pending futures in place.
                # The old ``tasks.union(pending)`` discarded its result.
                tasks.update(pending)
                done, pending = await asyncio.wait(
                    tasks, return_when=asyncio.FIRST_COMPLETED
                )
                for fut in done:
                    tasks.discard(fut)
                    pending.discard(fut)
                    node, val = await fut
                    node.done = True
                    node._results = val
                    # feed results to downstream nodes so they become runnable
                    self._set_needed_outputs(node)

    def run(self, executor=None):
        """Run the workflow to completion (defaults to a local process pool)."""
        if not executor:
            executor = cf.ProcessPoolExecutor()
        asyncio.run(self._run_wf(executor))
def test_linear():
    """
    One linear workflow
    A -> B -> C
    """
    node_a = Node('a')
    node_a.inputs.val = 1
    node_b = Node('b')
    node_c = Node('c')
    workflow = Workflow()
    workflow.connect(node_a, node_b)
    workflow.connect(node_b, node_c)
    workflow.run()
def test_concurrent():
    r"""
    Parallel workflow - [B, C] should execute concurrently
    A -> B --> D
    \-> C
    """
    node_a = Node('a')
    node_a.inputs.val = 1
    node_b = Node('b')
    node_c = Node('c')
    node_d = Node('d')
    workflow = Workflow()
    workflow.connect(node_a, node_b)
    workflow.connect(node_a, node_c)
    workflow.connect(node_b, node_d)
    # Workflow.__call__ delegates to run()
    workflow()
def test_combination():
    r"""
    Half concurrent workflow - [A/B] should execute concurrently,
    then join at C and linearly proceed.
    A
    \
     C --> D
    /
    B
    """
    node_a = Node('a')
    node_a.inputs.val = 3
    node_b = Node('b')
    node_b.inputs.val = 5
    node_c = Node('c')
    node_d = Node('d')
    workflow = Workflow()
    workflow.connect(node_a, node_c)
    workflow.connect(node_b, node_c)
    workflow.connect(node_c, node_d)
    workflow.run()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import concurrent.futures as cf | |
import time | |
from functools import partial | |
class Submitter:
    """Drives Tasks/Workflows on an asyncio loop plus a process pool.

    NOTE(review): the scheduling logic is order-sensitive; code left
    unchanged, comments only.
    """

    def __init__(self):
        # event loop; lazily bound in ``submit``
        self.loop = None
        # worker pool that executes Task.run in subprocesses
        self.pool = cf.ProcessPoolExecutor()
        # unused here -- presumably a planned work queue; verify
        self.remaining = []
        # in-flight asyncio futures awaiting completion
        self.futures = set()

    async def submit_job(self, runnable):
        """Run one Task in the pool, or recurse into a sub-Workflow.

        Returns the ``(runnable, result)`` pair.
        """
        if isinstance(runnable, Workflow):
            # sub-workflows get their own Submitter but share the loop
            newsubmitter = Submitter()
            newsubmitter.loop = self.loop
            res = await newsubmitter.run(runnable)
        else:
            res = await self.loop.run_in_executor(
                self.pool, runnable.run
            )
        return runnable, res

    async def fetch_jobs(self, runnable=None):
        """Await at least one in-flight future and absorb its result.

        ``runnable`` is the enclosing workflow that receives completed
        results via ``assign_next``. NOTE(review): the ``None`` default
        would fail at ``runnable.assign_next`` below -- confirm callers
        always supply it.
        """
        done = []
        try:
            done, pending = await asyncio.wait(
                self.futures, return_when=asyncio.FIRST_COMPLETED
            )
        except ValueError:
            # nothing pending!
            pending = set()
        # preserve pending tasks
        for fut in done:
            task, res = await fut
            print(f"{task.name} completed at {time.time()}")
            self.futures.remove(fut)
            if not isinstance(task, Workflow):
                # a Workflow's outfield is derived from its graph, so
                # only plain Tasks get the result assigned directly
                task.outfield = res
            runnable.assign_next(task, res)
        return runnable

    async def run(self, runnable):
        """Main loop: submit every ready job, then wait for completions."""
        maxloops = 10
        count = 0
        while not runnable.done:
            # jobs whose inputs have arrived and have not yet produced output
            jobs = [j for j in runnable.graph if j.ready]
            for job in jobs:
                print(f"submitting {job.name}")
                job_future = asyncio.create_task(self.submit_job(job))
                self.futures.add(job_future)
            await self.fetch_jobs(runnable)
            count += 1
            if count > maxloops:
                # NOTE(review): debug guard -- drops into pdb when the
                # workflow appears stalled; remove before production use
                breakpoint()
        if runnable.outfield is None:
            print("is none!!")
            # NOTE(review): fallback hack. ``Workflow.outfield`` is a
            # getter-only property, so this assignment would raise
            # AttributeError if ever reached with a Workflow -- verify
            runnable.outfield = 1
        return runnable.outfield

    ###############################################
    # Entrypoints
    ###############################################
    def submit(self, runnable):
        """
        Entrypoint for task/workflow submission.

        Binds the event loop on first use, then blocks until *runnable*
        completes. NOTE(review): the ``submit_existing`` method this
        referred to does not exist in this file -- nested runnables are
        actually submitted through ``submit_job``.
        """
        if not self.loop:
            self.loop = asyncio.get_event_loop()
        print(f"Starting {runnable}")
        self.loop.run_until_complete(self.run(runnable))
        print(f"Completed {runnable} at {time.time()}")
class Task:
    """A unit of work with one input field and one output field."""

    def __init__(self, name, x=None):
        self.name = name
        self.infield = x
        self._outfield = None

    def __repr__(self):
        return self.name

    @property
    def outfield(self):
        """Result of :meth:`run`; None until the task completes."""
        return self._outfield

    @outfield.setter
    def outfield(self, val):
        self._outfield = val

    @property
    def done(self):
        """A task is done once it has produced an output."""
        return self.outfield is not None

    @property
    def ready(self):
        """Runnable when an input has arrived but no output exists yet."""
        return (not self.done) and self.infield is not None

    def run(self, submitter=None):
        """Simulate one second of work; output is input plus one."""
        time.sleep(1)
        self.outfield = self.infield + 1
        return self.outfield
class Workflow(Task):
    """A DAG of Tasks (adjacency dict: task -> list of downstream tasks).

    The workflow's output is normally the output of the last task in the
    graph; an explicit assignment to ``outfield`` overrides that.
    """

    def __init__(self, graph, name, x=None):
        # BUG FIX: call the base initializer so ``_outfield`` exists;
        # the original skipped it, leaving no storage for the setter
        super().__init__(name, x)
        self.graph = graph

    @property
    def done(self):
        """Done only when every task in the graph has completed."""
        return all(task.done for task in self.graph)

    @property
    def outfield(self):
        """Explicit override if one was assigned, otherwise the output
        of the terminal (last-inserted) task in the graph."""
        if self._outfield is not None:
            return self._outfield
        return [*self.graph][-1].outfield

    @outfield.setter
    def outfield(self, val):
        # BUG FIX: the original redefined ``outfield`` as a getter-only
        # property, shadowing Task's setter, so ``self.outfield = 'done'``
        # in run() (and the fallback assignment in Submitter.run) raised
        # AttributeError
        self._outfield = val

    @property
    def ready(self):
        """Ready when the first task in the graph is ready.

        NOTE: only the first task is inspected -- sufficient for the
        linear entry points used by the demo workflows.
        """
        if self.done:
            return False
        for task in self.graph:
            return bool(task.ready)
        return False

    def assign_next(self, task, res):
        """Feed *res* into the infield of every task downstream of *task*."""
        for targ in self.graph[task]:
            if isinstance(targ, Workflow):
                # assign first node of the sub-workflow
                targ = [*targ.graph][0]
            print(f"Assigning {res} to {targ.name} infield")
            targ.infield = res

    def run(self, submitter=None):
        """Execute the workflow via a (possibly freshly created) Submitter."""
        if not submitter:
            submitter = Submitter()
        submitter.submit(self)
        self.outfield = 'done'
        return self.outfield
def wf1():
    """Linear workflow with a set of concurrent nodes and a subworkflow"""
    first = Task('first', 1)
    second = Task('second', 2)
    third = Task('third')
    last = Task('last')
    sub_fourth = Task('sub-fourth')
    sub_fifth = Task('sub-fifth')
    # inner graph wrapped as its own workflow
    subwf = Workflow({sub_fourth: [sub_fifth]}, 'subwf')
    outer_graph = {
        first: [second],
        second: [third],
        third: [subwf],
        subwf: [last],
        last: [],
    }
    return Workflow(outer_graph, 'workflow')
def wf2():
    """Concurrent workflows"""
    start = Task("wf_a1", 1)
    finish = Task("wf_a2")
    wf_a = Workflow({start: [finish], finish: []}, 'wfa')
    wf_b = wf1()
    # two independent workflows with no edges between them
    return Workflow({wf_a: [], wf_b: []}, 'wf')
if __name__ == "__main__":
    # build the nested demo workflow and drive it through a submitter
    workflow = wf2()
    submitter = Submitter()
    submitter.submit(workflow)
satra
commented
May 21, 2019
•
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment