Skip to content

Instantly share code, notes, and snippets.

@agoose77
Last active April 9, 2020 18:28
Show Gist options
  • Save agoose77/10c2d91762a76e509019a0c3c12a5e7c to your computer and use it in GitHub Desktop.
Save agoose77/10c2d91762a76e509019a0c3c12a5e7c to your computer and use it in GitHub Desktop.
from collections import deque, namedtuple, defaultdict
from time import time
from types import coroutine
from enum import auto, Enum
from operator import itemgetter
def async_contextmanager(func):
    """Turn an async generator function into an async context manager factory.

    *func* must be an async generator function that yields exactly once.  The
    yielded value is bound by ``async with ... as value``; the code after the
    ``yield`` runs when the block exits.

    If the ``async with`` body raises, the exception is thrown into the
    generator at its ``yield``.  The generator's own ``try/except/finally``
    therefore runs as expected, and cleanup that must happen on failure
    belongs in a ``finally`` clause.  A generator that catches the exception
    and returns normally suppresses it.

    Returns:
        A class; calling it with the arguments for *func* gives an object
        usable in ``async with``.
    """

    class ctx:
        def __init__(self, *args, **kwargs):
            # The generator is created eagerly but does not start running
            # until __aenter__ is awaited.
            self.f = func(*args, **kwargs)

        async def __aenter__(self):
            # Run the generator up to its first yield and hand back the value.
            return await self.f.asend(None)

        async def __aexit__(self, exc_type, exc, tb):
            if exc_type is None:
                # Normal exit: resume the generator so its teardown runs.
                try:
                    await self.f.asend(None)
                except StopAsyncIteration:
                    pass
                return False

            if exc is None:
                # Only reachable when __aexit__ is called by hand with a bare
                # exception type.
                exc = exc_type()
            try:
                await self.f.athrow(exc)
            except StopAsyncIteration as stop:
                # The generator handled the exception and finished, so
                # suppress it -- unless the body itself raised this very
                # StopAsyncIteration.
                return stop is not exc
            except BaseException as raised:
                if raised is exc:
                    # Propagated unchanged: let ``async with`` re-raise the
                    # original exception with its original traceback.
                    return False
                raise
            # The generator swallowed the exception and yielded again.
            raise RuntimeError("async generator didn't stop after athrow()")

    return ctx
class Trap(Enum):
    """Request codes a task yields (as the first tuple item) to the loop."""

    # Values are numbered consecutively from 1.
    SLEEP = 1      # suspend the task for a number of seconds
    SPAWN = 2      # schedule another coroutine as a new task
    WHOAMI = 3     # ask for the Task object wrapping the caller
    CANCEL = 4     # request cancellation of another task
    REQ_LOOP = 5   # ask for the loop instance itself
    GET_EVENT = 6  # park the task until a named event is posted
class TaskCancelled(Exception):
    """Thrown into a task's coroutine to signal that it has been cancelled."""
class Task:
    """A coroutine plus the value or exception to deliver on its next resume."""

    def __init__(self, coro):
        self.coro = coro
        # Exception (class or instance) to throw on the next update, if any.
        self.next_exc = None
        # Value to send into the coroutine on the next update.
        self.next_result = None

    def cancel(self):
        """Arrange for ``TaskCancelled`` to be thrown on the next update."""
        self.next_exc = TaskCancelled

    def update(self):
        """Resume the coroutine once and return whatever it yields next.

        A pending exception takes priority over a pending result.  Both
        fields are consumed before the coroutine is resumed, so a result
        that was queued before a cancellation cannot leak into a later
        resume if the coroutine survives the exception.

        Raises:
            StopIteration: when the coroutine finishes.
            Exception: anything the coroutine raises or fails to handle.
        """
        pending_exc, pending_result = self.next_exc, self.next_result
        self.next_exc = None
        self.next_result = None
        if pending_exc is not None:
            return self.coro.throw(pending_exc)
        return self.coro.send(pending_result)
@coroutine
def get_event(event_id):
    """Park the calling task until an event tagged *event_id* is posted.

    Evaluates to whatever the loop sends back on wake-up; the loop in this
    file sends the list of items that follow the id in the event tuple.
    """
    request = (Trap.GET_EVENT, event_id)
    payload = yield request
    return payload
@coroutine
def get_loop():
    """Ask the scheduler for a reference to itself and return it."""
    request = (Trap.REQ_LOOP,)
    running_loop = yield request
    return running_loop
@coroutine
def sleep(dt=0.0):
    """Suspend the calling task for at least *dt* seconds of loop time.

    With the default of ``0.0`` the task simply gives up its turn and
    becomes runnable again on a later step.
    """
    request = (Trap.SLEEP, dt)
    yield request
@coroutine
def whoami():
    """Return the ``Task`` object the scheduler uses for the calling coroutine."""
    request = (Trap.WHOAMI,)
    current_task = yield request
    return current_task
@coroutine
def spawn_task(coro, delay=None):
    """Ask the scheduler to run *coro* as a new task and return that task.

    Args:
        coro: the coroutine object to schedule.
        delay: seconds to wait before the first run; ``None`` queues the new
            task straight away.
    """
    request = (Trap.SPAWN, coro, delay)
    child = yield request
    return child
@coroutine
def cancel(task):
    """Ask the scheduler to cancel *task*.

    Evaluates to whatever the loop sends back; the loop in this file sets no
    explicit result for this request.
    """
    request = (Trap.CANCEL, task)
    reply = yield request
    return reply
async def make_collision(name, delay):
    """Wait *delay* seconds, then post a ``('collision', name)`` event."""
    await sleep(delay)
    event = ('collision', name)
    scheduler = await get_loop()
    # Events are picked up by the loop at the start of its next step.
    scheduler.events.append(event)
async def example():
    """Demo task: nap, spawn three collision emitters, then report collisions.

    Never returns on its own; it keeps waiting for 'collision' events.
    """
    await sleep(1.5)
    this_task = await whoami()
    print("Now awake", this_task)

    # Each emitter sleeps for its own delay and then posts one event.
    emitters = (('Bob', 1.0), ('Janice', 1.9), ('Carl', 1.4))
    for who, pause in emitters:
        await spawn_task(make_collision(who, delay=pause))

    while True:
        payload = await get_event('collision')
        # The payload is expected to hold exactly one item: the collider name.
        (collider,) = payload
        print("Collided with", collider)
# Sort key for (wake_time, task) pairs: order by wake time only, so Task
# objects themselves are never compared.
_get_wake_time = itemgetter(0)
class Loop:
    """Minimal cooperative scheduler for generator-based coroutines.

    Tasks talk to the loop by yielding tuples whose first item is a ``Trap``
    member.  ``step`` runs every runnable task once, interprets the request
    each one yields, and then re-queues it, puts it to sleep or parks it
    until an event arrives.

    External code posts events by appending ``(event_id, *payload)`` tuples
    to ``events``; they are delivered at the start of the next step.  Events
    with no waiting task are dropped.
    """

    def __init__(self):
        # Tasks that will be resumed on the next step, in FIFO order.
        self._active_tasks = deque()
        # Every task that has not finished yet, whatever its state.
        self._tasks = []
        # (wake_time, task) pairs kept sorted by wake_time.
        self._sleeping_tasks = []
        # event_id -> tasks parked until that event is posted.
        self._waiting_tasks = defaultdict(list)
        # Pending events; cleared once dispatched.
        self.events = []

    def create_task(self, coro, delay=None):
        """Wrap *coro* in a ``Task`` and schedule it.

        Args:
            coro: coroutine (or generator) object to drive.
            delay: seconds before the first run; ``None`` makes the task
                runnable immediately.

        Returns:
            The new ``Task``.
        """
        task = Task(coro)
        self._tasks.append(task)
        if delay is None:
            self._active_tasks.append(task)
        else:
            self._sleep_task(task, self._now() + delay)
        return task

    def run_until_complete(self, *initial_coros):
        """Schedule *initial_coros* and step until no unfinished task remains.

        Note: this calls ``step`` back-to-back without pausing, so it spins
        while tasks are merely sleeping or waiting.
        """
        for coro in initial_coros:
            self.create_task(coro)
        while self._tasks:
            self.step()

    def step(self,):
        """Run one scheduling round.

        Wakes due sleepers, delivers pending events, then resumes each task
        that was runnable at that point exactly once.  Tasks that become
        runnable during the round wait for the next call.

        Exceptions other than ``StopIteration``/``TaskCancelled`` raised by a
        task propagate to the caller.
        """
        self._wake_sleepers(self._now())
        self._dispatch_events()

        # Snapshot the queue length: tasks appended while we iterate (spawned
        # children, re-queued tasks) must not run again within this round.
        for _ in range(len(self._active_tasks)):
            task = self._active_tasks.popleft()
            try:
                result = task.update()
            except (StopIteration, TaskCancelled):
                # Finished normally, or let the cancellation propagate.
                self._tasks.remove(task)
                continue

            if isinstance(result, tuple):
                trap, *args = result
                if not self._handle_trap(task, trap, args):
                    continue  # task was parked; do not re-queue
            self._active_tasks.append(task)

    def _wake_sleepers(self, now):
        """Move every sleeper whose wake time has passed to the run queue."""
        due = 0
        for wake_time, task in self._sleeping_tasks:
            if wake_time > now:
                break  # list is sorted, so nothing later is due either
            self._active_tasks.append(task)
            due += 1
        del self._sleeping_tasks[:due]

    def _dispatch_events(self):
        """Hand each pending event's payload to the tasks waiting for it."""
        for event_id, *payload in self.events:
            # pop() rather than indexing, so that events nobody waits for do
            # not leave empty lists behind in the defaultdict.
            for task in self._waiting_tasks.pop(event_id, ()):
                task.next_result = payload
                self._active_tasks.append(task)
        self.events.clear()

    def _handle_trap(self, task, trap, args):
        """Service one request yielded by *task*.

        Returns:
            True if the task should go back on the run queue, False if it
            has been parked (sleeping or waiting for an event).
        """
        if trap == Trap.SLEEP:
            delay, = args
            self._sleep_task(task, self._now() + delay)
            return False
        if trap == Trap.GET_EVENT:
            event_id, = args
            self._waiting_tasks[event_id].append(task)
            return False

        if trap == Trap.SPAWN:
            coro, delay = args
            task.next_result = self.create_task(coro, delay)
        elif trap == Trap.WHOAMI:
            task.next_result = task
        elif trap == Trap.CANCEL:
            target_task, = args
            target_task.cancel()
            # A parked target would otherwise only see the cancellation when
            # its timer expires or its event fires -- possibly never.
            self._unpark(target_task)
        elif trap == Trap.REQ_LOOP:
            task.next_result = self
        return True

    def _unpark(self, task):
        """Make *task* runnable if it is currently sleeping or waiting.

        Does nothing for tasks that are already runnable or have finished.
        """
        for index, (_, sleeper) in enumerate(self._sleeping_tasks):
            if sleeper is task:
                del self._sleeping_tasks[index]
                self._active_tasks.append(task)
                return
        for waiters in self._waiting_tasks.values():
            if task in waiters:
                waiters.remove(task)
                self._active_tasks.append(task)
                return

    def _sleep_task(self, task, wake_time):
        """Park *task* until the loop clock reaches *wake_time*."""
        self._sleeping_tasks.append((wake_time, task))
        # Stable sort on wake_time only: ties keep insertion order and Task
        # objects are never compared.
        self._sleeping_tasks.sort(key=_get_wake_time)

    def _now(self):
        """Return the current loop time in seconds (wall clock)."""
        return time()
if __name__ == "__main__":
    # Imported under an alias because the module-level name ``time`` is the
    # time() function and ``sleep`` is this file's coroutine helper.
    import time as t

    # Drive the scheduler by hand at roughly 60 steps per second for
    # 300 steps (about five seconds), the way a game loop would.
    frame_period = 1/60
    loop = Loop()
    loop.create_task(example())
    for i in range(60*5):
        loop.step()
        t.sleep(frame_period)
    print("Finished")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment