Last active
April 29, 2019 21:01
-
-
Save vxgmichel/e5d15559050784e44b54c9511764db4d to your computer and use it in GitHub Desktop.
Hot stream implementation with aiostream
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import asyncio | |
| from aiostream import streamcontext | |
| from aiostream.aiter_utils import anext | |
| from aiostream.core import Streamer | |
async def cancel_and_join(task):
    """Cancel ``task`` and wait until it has actually finished.

    Args:
        task: The ``asyncio.Task`` to stop, or ``None`` when nothing was
            ever started (in which case this is a no-op).

    Raises:
        Exception: Whatever ``task`` raised, if it had already finished
            with an error other than ``asyncio.CancelledError``.
    """
    # Callers in this module keep their task attributes initialised to
    # ``None`` until something is started; tolerate that instead of
    # failing with AttributeError on ``None.cancel()``.
    if task is None:
        return
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        # Expected outcome of the cancellation requested just above.
        pass
class HotAsyncIterator:
    """Async iterator yielding the results of awaitables taken from a queue."""

    def __init__(self, queue):
        # Queue of awaitables; it is filled by whoever created this iterator.
        self.queue = queue

    def __aiter__(self):
        return self

    async def __anext__(self):
        pending = await self.queue.get()
        # Shielded so that cancelling this consumer does not cancel the
        # awaitable that was taken from the queue.
        return await asyncio.shield(pending)
class HotStreamer(Streamer):
    """Pull from ``source`` in a background task and fan each fetch out.

    Entering the context starts a producer task that repeatedly fetches the
    next item of ``source``. Each fetch is wrapped in a task and pushed to
    the queue of every iterator obtained through ``__aiter__``, so all
    consumers observe the same items from the moment they subscribe.

    NOTE: queues are only ever appended to ``self.queues``; a consumer that
    stops iterating keeps receiving (and dropping) fetches until the
    streamer itself is discarded.
    """

    def __init__(self, source, maxlen=1):
        """Store the configuration; nothing runs until ``__aenter__``.

        Args:
            source: Async iterable passed to ``streamcontext`` by the
                producer task.
            maxlen: ``maxsize`` of each consumer queue. When a queue is
                full the oldest pending fetch is dropped to make room.
        """
        self.source = source
        self.maxlen = maxlen
        self.queues = []  # one queue per consumer created by __aiter__
        self.task = None  # producer task running _target()
        self.future = None  # task wrapping the most recently started fetch
        self.started = asyncio.Event()  # set once the producer is going

    async def __aenter__(self):
        """Start the producer and wait until its first fetch is scheduled.

        Raises:
            Exception: Whatever stopped the producer before it could
                schedule a fetch (for instance a failure while opening
                the source).
        """
        self.task = asyncio.create_task(self._target())
        await self.started.wait()
        if self.task.done() and not self.task.cancelled():
            # The producer already stopped: surface its failure right here
            # rather than handing out a streamer that will never produce.
            self.task.result()
        return self

    async def __aexit__(self, *args):
        await self.aclose()

    async def _target(self):
        """Producer loop: fetch items one by one and publish each fetch."""
        try:
            async with streamcontext(self.source) as streamer:
                while True:
                    try:
                        coro = anext(streamer)
                        self.future = asyncio.create_task(coro)
                        for queue in self.queues:
                            if queue.full():
                                # Slow consumer: drop its oldest pending
                                # fetch so the newest one always fits.
                                queue.get_nowait()
                            queue.put_nowait(self.future)
                        self.started.set()
                        await self.future
                    except Exception:
                        # End of the source or a source error. Consumers
                        # get the same exception through the shared fetch
                        # task, so the producer just stops quietly.
                        break
                    finally:
                        fetch = self.future
                        # Only a fetch that is still in flight needs to be
                        # cancelled (the producer itself is being
                        # cancelled). Re-awaiting a fetch that already
                        # failed would re-raise the exception swallowed
                        # above and leak it out of the context exit.
                        if fetch is not None and not fetch.done():
                            await cancel_and_join(fetch)
        finally:
            # Never leave __aenter__ blocked if the producer stops before
            # scheduling its first fetch.
            self.started.set()

    def __aiter__(self):
        """Register a new consumer and return its iterator."""
        queue = asyncio.Queue(maxsize=self.maxlen)
        # Let a late subscriber share the fetch currently in flight; before
        # any fetch exists there is nothing to hand over, and enqueueing
        # ``None`` would make the consumer fail in ``asyncio.shield``.
        if self.future is not None:
            queue.put_nowait(self.future)
        self.queues.append(queue)
        return HotAsyncIterator(queue)

    async def aclose(self):
        """Stop the producer task; a no-op if it was never started."""
        if self.task is not None:
            await cancel_and_join(self.task)
def hotstream(*args, **kwargs):
    """Create a ``HotStreamer``, forwarding every argument unchanged."""
    streamer = HotStreamer(*args, **kwargs)
    return streamer
async def main():
    """Demo: two workers read the same hot counter, one subscribing late."""
    from aiostream import pipe, stream

    xs = stream.count(interval=1)

    async def worker(wid):
        # Print the first five items this worker receives, tagged with
        # its id.
        printer = pipe.print(f'w{wid}: {{}}')
        await (hot | printer)[:5]

    async with hotstream(xs) as hot:
        early = asyncio.create_task(worker(1))
        # Subscribe the second worker while the stream is already running.
        await asyncio.sleep(1.5)
        late = asyncio.create_task(worker(2))
        await asyncio.gather(early, late)


if __name__ == '__main__':
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment