Created
February 20, 2016 00:02
-
-
Save Mec-iS/df08afcb70b8ab56469b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
<http://www.snarky.ca/how-the-heck-does-async-await-work-in-python-3-5> | |
""" | |
import datetime | |
import heapq | |
import types | |
import time | |
class Task:
    """Pair a coroutine with the moment it should next be resumed.

    Plays the role of an asyncio.Task or curio.Task in this toy loop.
    Instances order and compare by ``waiting_until`` only, so they can be
    stored directly in a ``heapq`` priority queue.

    Args:
        wait_until: The value the task is ordered by (the event loop in
            this file passes the datetime yielded by the coroutine).
        coro: The coroutine to resume once ``wait_until`` is reached.
    """

    # Equality is based on a mutable attribute, so instances are
    # deliberately unhashable (this is also what defining __eq__ alone
    # would do implicitly).
    __hash__ = None

    def __init__(self, wait_until, coro):
        self.coro = coro
        self.waiting_until = wait_until

    def __repr__(self):
        return '{}({!r}, {!r})'.format(
            type(self).__name__, self.waiting_until, self.coro)

    def __eq__(self, other):
        # Returning NotImplemented lets Python fall back to its default
        # comparison instead of failing on a missing attribute.
        if not isinstance(other, Task):
            return NotImplemented
        return self.waiting_until == other.waiting_until

    def __lt__(self, other):
        if not isinstance(other, Task):
            return NotImplemented
        return self.waiting_until < other.waiting_until
class Loop:
    """A minimal event loop that only knows how to delay coroutines.

    Analogous to an asyncio.BaseEventLoop or a curio.Kernel. Each
    coroutine is expected to yield the datetime at which it wants to be
    resumed; the loop sleeps until the earliest such time and then sends
    the current datetime back into that coroutine.

    Args:
        *coros: The coroutines to drive to completion.
    """

    def __init__(self, *coros):
        self._new = coros
        # Heap of Task objects ordered by their resumption time.
        self._waiting = []

    def run_until_complete(self):
        """Run every coroutine passed to the constructor until all finish."""
        for coro in self._new:
            # Prime the coroutine: the first value sent must be None.
            self._advance(coro, None)
        while self._waiting:
            # Take the coroutine with the soonest resumption time.
            task = heapq.heappop(self._waiting)
            now = datetime.datetime.now()
            if now < task.waiting_until:
                # Too early: block until it is time to resume.
                delta = task.waiting_until - now
                time.sleep(delta.total_seconds())
                now = datetime.datetime.now()
            self._advance(task.coro, now)

    def _advance(self, coro, value):
        """Send ``value`` into ``coro`` and reschedule it if it yields again."""
        try:
            wait_until = coro.send(value)
        except StopIteration:
            # The coroutine is done (possibly without ever waiting).
            return
        heapq.heappush(self._waiting, Task(wait_until, coro))
@types.coroutine
def sleep(seconds):
    """Suspend the awaiting coroutine for ``seconds`` seconds.

    Counterpart of asyncio.sleep() or curio.sleep(). This is a
    generator-based coroutine: it yields the datetime at which it wants
    to be resumed and expects a datetime to be sent back in.

    Returns:
        The timedelta between the call and the datetime that was sent in
        on resumption.
    """
    started = datetime.datetime.now()
    # Hand the target time to whoever is driving this coroutine.
    resumed = yield started + datetime.timedelta(seconds=seconds)
    return resumed - started
async def countdown(label, length, *, delay=0):
    """Countdown a launch for 'length' seconds, waiting 'delay' seconds.

    Args:
        label: Text printed at the start of every message.
        length: Number of one-second ticks to count down. Values that
            are zero or negative produce no ticks.
        delay: Seconds to wait before the countdown starts.
    """
    print(label, 'waiting', delay, 'seconds before starting countdown')
    delta = await sleep(delay)
    print(label, 'start after waiting', delta)
    # Compare against zero rather than testing truthiness so a negative
    # or fractional length cannot skip past 0 and loop forever.
    while length > 0:
        print(label, 'T-minus', length)
        await sleep(1)
        length -= 1
    print(label)
def main():
    """Run three countdowns on the event loop and report the total time."""
    launches = (
        countdown('A', 5),
        countdown('B', 3, delay=2),
        countdown('C', 4, delay=1),
    )
    loop = Loop(*launches)
    began = datetime.datetime.now()
    loop.run_until_complete()
    finished = datetime.datetime.now()
    print('Total elapsed time is', finished - began)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment