Skip to content

Instantly share code, notes, and snippets.

@sdressler
Created July 14, 2018 13:03
Show Gist options
  • Save sdressler/3aba8c306471c1d49fcc8fa9cb61c3fc to your computer and use it in GitHub Desktop.
Save sdressler/3aba8c306471c1d49fcc8fa9cb61c3fc to your computer and use it in GitHub Desktop.
Naive Scheduler
#!/usr/bin/env python3
import asyncio
import time
class Scheduler:
    """Run async tasks in an order that respects a dependency graph.

    ``tasks`` maps a task id to a ``(coroutine_function, argument)`` pair.
    ``graph`` maps a task id to the list of task ids it depends on; ids that
    are absent from ``graph`` have no dependencies.  ``entrypoint`` is a key
    of ``graph`` whose dependencies are the first tasks to be scheduled.
    """

    def __init__(self, tasks, graph, entrypoint):
        self.tasks = tasks
        self.graph = graph
        self.entrypoint = entrypoint
        # Ids of tasks whose coroutine has completed successfully.
        self.finished_tasks = set()

    def get_following_tasks(self, node):
        """Return the set of task ids that list ``node`` as a dependency."""
        # Collect the *dependent* task id, not ``node`` itself.
        return {
            task_id
            for task_id, dependencies in self.graph.items()
            if node in dependencies
        }

    def get_dependencies(self, node):
        """Return the dependencies of ``node`` (empty when it has none)."""
        return self.graph.get(node, [])

    def task_is_runnable(self, task):
        """Return True when every dependency of ``task`` has finished."""
        dependencies = set(self.get_dependencies(task))
        return dependencies.issubset(self.finished_tasks)

    def on_task_done(self, task):
        """Record a completed task.

        ``task`` is a finished future whose result is a
        ``(task_id, result)`` tuple.  If the wrapped coroutine raised,
        ``task.result()`` re-raises that exception here.
        """
        task_id, result = task.result()
        print(f'Task {task_id} done with {result}')
        self.finished_tasks.add(task_id)

    async def _task_run_wrapper(self, task_id):
        """Await the coroutine registered for ``task_id``.

        Returns a ``(task_id, result)`` tuple so the completion handler
        knows which task produced the result.
        """
        task, arg = self.tasks[task_id]
        return (task_id, await task(arg))

    async def _run_impl(self):
        """Schedule tasks as their dependencies finish and wait for them all.

        Raises:
            KeyError: if ``entrypoint`` is not a key of ``graph``.
            RuntimeError: if tasks are still waiting but nothing is running,
                i.e. their dependencies can never be satisfied.
        """
        waiting = list(self.graph[self.entrypoint])
        scheduled = set()  # ids already started, so each task runs only once
        running = set()    # futures currently in flight

        while waiting or running:
            next_round = []
            for task_id in waiting:
                if task_id in scheduled:
                    # Reached again through another dependency; skip it.
                    continue
                if not self.task_is_runnable(task_id):
                    next_round.append(task_id)
                    continue
                scheduled.add(task_id)
                running.add(
                    asyncio.ensure_future(self._task_run_wrapper(task_id))
                )
                # Followers are examined once something has finished.
                next_round.extend(self.get_following_tasks(task_id))
            waiting = next_round

            if not running:
                if waiting:
                    # Nothing in flight can unblock the remaining tasks.
                    raise RuntimeError(
                        f'Tasks can never become runnable: {waiting}'
                    )
                break

            # Suspend until at least one task completes.  This hands control
            # to the event loop so the scheduled tasks actually execute,
            # instead of spinning on tasks that are not runnable yet.
            done, running = await asyncio.wait(
                running, return_when=asyncio.FIRST_COMPLETED
            )
            for finished in done:
                self.on_task_done(finished)

    def run(self):
        """Run the whole schedule to completion on a fresh event loop."""
        asyncio.run(self._run_impl())
async def some_task(foo):
    """Demo task: note the entry time, sleep one second, then print a greeting.

    The timestamp is taken before the sleep, so the printed time is when
    the task was entered rather than when it finished.
    """
    entrance_time = time.time()
    await asyncio.sleep(1)
    greeting = f'Hello from {foo} @ {entrance_time}'
    print(greeting)
if __name__ == '__main__':
    # Four demo tasks, each running ``some_task`` with its own id as argument.
    tasks = {name: (some_task, name) for name in ('a', 'b', 'c', 'd')}

    # Each key lists the tasks it depends on: 'c' needs 'a' and 'b',
    # and 'd' needs 'c'.
    graph = dict(c=['a', 'b'], d=['c'])

    scheduler = Scheduler(tasks, graph, 'c')
    scheduler.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment