Created
July 14, 2018 13:03
-
-
Save sdressler/3aba8c306471c1d49fcc8fa9cb61c3fc to your computer and use it in GitHub Desktop.
Naive Scheduler
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import asyncio | |
import time | |
class Scheduler:
    """Naive dependency-aware scheduler for asyncio coroutines.

    Args:
        tasks: Mapping of task id to a ``(coroutine_function, argument)``
            pair; the task is executed as ``await coroutine_function(argument)``.
        graph: Mapping of task id to an iterable of the task ids it depends
            on. Task ids missing from the mapping have no dependencies.
        entrypoint: Key of ``graph`` whose dependencies seed the run. The
            tasks that (transitively) follow the seeds are scheduled as the
            run progresses.

    Attributes:
        finished_tasks: Set of task ids that have completed so far.
    """

    def __init__(self, tasks, graph, entrypoint):
        self.tasks = tasks
        self.graph = graph
        self.entrypoint = entrypoint
        self.finished_tasks = set()

    def get_following_tasks(self, node):
        """Return the set of task ids that list *node* as a dependency."""
        return {
            task_id
            for task_id, dependencies in self.graph.items()
            if node in dependencies
        }

    def get_dependencies(self, node):
        """Return the dependencies of *node* (empty when it has none)."""
        return self.graph.get(node, [])

    def task_is_runnable(self, task):
        """Return True when every dependency of *task* has finished."""
        dependencies = set(self.get_dependencies(task))
        return dependencies.issubset(self.finished_tasks)

    def on_task_done(self, task):
        """Record a completed future produced by ``_task_run_wrapper``.

        Raises:
            Exception: Whatever the underlying task raised, re-raised by
                ``task.result()``.
        """
        task_id, result = task.result()
        print(f'Task {task_id} done with {result}')
        self.finished_tasks.add(task_id)

    async def _task_run_wrapper(self, task_id):
        """Run one task and return ``(task_id, result)``."""
        task, arg = self.tasks[task_id]
        return (task_id, await task(arg))

    async def _run_impl(self):
        """Run the seed tasks and everything that follows them.

        Each task is started exactly once, as soon as all of its
        dependencies have finished. The coroutine returns only after every
        started task has completed.

        Raises:
            KeyError: If the entrypoint is not a key of ``graph`` or a
                scheduled task id is missing from ``tasks``.
            RuntimeError: If tasks are still waiting but nothing is running,
                i.e. their dependencies can never finish.
        """
        waiting = []        # discovered but not yet started, in discovery order
        discovered = set()  # guards against starting the same task twice
        running = set()     # futures currently in flight

        def discover(task_ids):
            for task_id in task_ids:
                if task_id not in discovered:
                    discovered.add(task_id)
                    waiting.append(task_id)

        discover(self.graph[self.entrypoint])
        try:
            while waiting or running:
                ready = [t for t in waiting if self.task_is_runnable(t)]
                for task_id in ready:
                    waiting.remove(task_id)
                    running.add(
                        asyncio.ensure_future(self._task_run_wrapper(task_id))
                    )
                    # Followers depend on the task just started, so they
                    # stay in `waiting` at least until it completes.
                    discover(self.get_following_tasks(task_id))

                if not running:
                    raise RuntimeError(
                        f'Tasks {waiting!r} can never run: '
                        'their dependencies will never finish'
                    )

                # Suspend until something completes instead of spinning;
                # this is what lets the started tasks actually make progress.
                done, running = await asyncio.wait(
                    running, return_when=asyncio.FIRST_COMPLETED
                )
                for fut in done:
                    self.on_task_done(fut)
        finally:
            # Only non-empty when leaving through an exception.
            for fut in running:
                fut.cancel()

    def run(self):
        """Run the schedule to completion on a private event loop."""
        loop = asyncio.new_event_loop()
        try:
            loop.run_until_complete(self._run_impl())
        finally:
            loop.close()
async def some_task(foo):
    """Demo coroutine: wait one second, then print a greeting for *foo*.

    The timestamp in the message is taken on entry, before the sleep.
    """
    entrance_time = time.time()
    await asyncio.sleep(delay=1)
    greeting = f'Hello from {foo} @ {entrance_time}'
    print(greeting)
if __name__ == '__main__':
    # Every demo task runs the same coroutine and receives its own id
    # as the argument.
    tasks = {task_id: (some_task, task_id) for task_id in ('a', 'b', 'c', 'd')}

    # Maps a task id to the ids it depends on: 'c' needs 'a' and 'b',
    # and 'd' needs 'c'.
    graph = {'c': ['a', 'b'], 'd': ['c']}

    scheduler = Scheduler(tasks, graph, 'c')
    scheduler.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment