Created
October 25, 2017 15:49
-
-
Save vxgmichel/445651b2d5b68426a348bad88ff15430 to your computer and use it in GitHub Desktop.
Isolate an asynchronous generator by running it in a background task
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from itertools import count | |
from collections import AsyncIterable | |
async def clock(start=0, step=1, interval=1.): | |
for i in count(start, step): | |
yield i | |
await asyncio.sleep(interval) | |
class isolate(AsyncIterable): | |
def __init__(self, agen): | |
self._agen = agen | |
self._loop = asyncio.get_event_loop() | |
self._future = self._loop.create_future() | |
self._task = self._loop.create_task(self._target()) | |
self._last = None | |
@property | |
def last_value(self): | |
return self._last | |
async def _target(self): | |
try: | |
async for item in self._agen: | |
self._set_result(item) | |
raise StopAsyncIteration | |
except Exception as exc: | |
self._set_exception(exc) | |
def _set_result(self, item): | |
self._future.set_result(item) | |
self._future = self._loop.create_future() | |
self._last = item | |
def _set_exception(self, exc): | |
self._future.set_exception(exc) | |
def __aiter__(self): | |
return self | |
def __anext__(self): | |
return self._future | |
async def test(): | |
clk = isolate(clock()) | |
await asyncio.sleep(1.5) | |
async for i in clk: | |
print(i) | |
if __name__ == '__main__': | |
loop = asyncio.get_event_loop() | |
loop.run_until_complete(test()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment