Created
October 26, 2018 04:24
-
-
Save losymear/aa6a9376dde0534f042e3ea4010b7e45 to your computer and use it in GitHub Desktop.
Python 3 event loop demo
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
import heapq | |
import types | |
import time | |
""" | |
https://snarky.ca/how-the-heck-does-async-await-work-in-python-3-5/ | |
""" | |
class Task:
    """Pair a coroutine with the moment at which it should next be resumed.

    Comparison operators are implemented for use by heapq. Two-item
    tuples unfortunately don't work because when the datetime.datetime
    instances are equal, comparison falls to the coroutine and they don't
    implement comparison methods, triggering an exception.

    Only ``waiting_until`` takes part in comparisons; the coroutine itself
    is ignored. Because ``__eq__`` is overridden without ``__hash__``,
    instances are unhashable.

    Think of this as being like asyncio.Task/curio.Task.
    """

    def __init__(self, wait_until, coro):
        """Store the coroutine and the time it is waiting for."""
        self.coro = coro
        self.waiting_until = wait_until

    def __eq__(self, other):
        """Return whether both tasks are waiting for the same moment."""
        try:
            other_deadline = other.waiting_until
        except AttributeError:
            # Not task-like: let Python fall back to its default handling
            # instead of leaking an AttributeError out of `==`.
            return NotImplemented
        return self.waiting_until == other_deadline

    def __lt__(self, other):
        """Return whether this task should be resumed before `other`."""
        try:
            other_deadline = other.waiting_until
        except AttributeError:
            # Not task-like: Python turns this into a regular TypeError.
            return NotImplemented
        return self.waiting_until < other_deadline
class SleepingLoop:
    """An event loop focused on delaying execution of coroutines.

    Every coroutine handed to the loop is expected to yield the
    ``datetime.datetime`` at which it wants to be resumed; the loop sends
    the current time back into it once that moment has arrived.

    Think of this as being like asyncio.BaseEventLoop/curio.Kernel.
    """

    def __init__(self, *coros):
        """Remember the coroutines to start on `run_until_complete()`."""
        self._new = coros
        self._waiting = []

    def run_until_complete(self):
        """Drive every coroutine until all of them have finished.

        Blocks the calling thread with `time.sleep()` whenever the next
        coroutine is not yet due. Returns None.
        """
        # Hand off the not-yet-started coroutines so that a repeated call
        # does not try to start the same coroutines a second time.
        pending, self._new = self._new, ()

        # Start all the coroutines.
        for coro in pending:
            try:
                wait_for = coro.send(None)
            except StopIteration:
                # The coroutine finished without ever asking to sleep.
                continue
            heapq.heappush(self._waiting, Task(wait_for, coro))

        # Keep running until there is no more work to do.
        while self._waiting:
            now = datetime.datetime.now()
            # Get the coroutine with the soonest resumption time.
            task = heapq.heappop(self._waiting)
            if now < task.waiting_until:
                # We're ahead of schedule; wait until it's time to resume.
                delta = task.waiting_until - now
                time.sleep(delta.total_seconds())
                now = datetime.datetime.now()
            try:
                # It's time to resume the coroutine.
                wait_until = task.coro.send(now)
            except StopIteration:
                # The coroutine is done.
                continue
            heapq.heappush(self._waiting, Task(wait_until, task.coro))
@types.coroutine
def sleep(seconds):
    """Suspend the awaiting coroutine for `seconds` seconds.

    Yields the datetime at which the caller wants to be woken up and
    expects the event loop to send back the datetime at which it actually
    resumed the coroutine. Returns the difference between that resumption
    time and the time this call started, as a `datetime.timedelta`.

    Think of this as being like asyncio.sleep()/curio.sleep().
    """
    started = datetime.datetime.now()
    # A bare `yield` is what suspends the whole chain of awaiting
    # coroutines, which is why this is a generator-based coroutine rather
    # than an `async def` one.
    resumed = yield started + datetime.timedelta(seconds=seconds)
    # Back in control: report how long the pause really lasted.
    return resumed - started
async def countdown(label, length, *, delay=0):
    """Countdown a launch for `length` seconds, waiting `delay` seconds.

    Prints a line when the initial delay begins, one when it ends, one
    "T-minus" line per remaining second, and finally a lift-off line, each
    prefixed with `label`. A `length` of zero or less skips the countdown
    and goes straight to lift-off.

    This is what a user would typically write.
    """
    print(label, 'waiting', delay, 'seconds before starting countdown')
    delta = await sleep(delay)
    print(label, 'starting after waiting', delta)
    # Compare against zero instead of relying on truthiness: a negative or
    # fractional `length` never becomes exactly 0 and would loop forever.
    while length > 0:
        print(label, 'T-minus', length)
        await sleep(1)
        length -= 1
    print(label, 'lift-off!')
def main():
    """Run three staggered countdowns on one event loop and time the run.

    This is what a user would typically write.
    """
    launches = (
        countdown('A', 5),
        countdown('B', 3, delay=2),
        countdown('C', 4, delay=1),
    )
    loop = SleepingLoop(*launches)
    began = datetime.datetime.now()
    loop.run_until_complete()
    elapsed = datetime.datetime.now() - began
    print('Total elapsed time is', elapsed)


# Only start the demo when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment