Skip to content

Instantly share code, notes, and snippets.

@vxgmichel
Last active April 29, 2019 21:01
Show Gist options
  • Select an option

  • Save vxgmichel/e5d15559050784e44b54c9511764db4d to your computer and use it in GitHub Desktop.

Select an option

Save vxgmichel/e5d15559050784e44b54c9511764db4d to your computer and use it in GitHub Desktop.
Hot stream implementation with aiostream
import asyncio
from aiostream import streamcontext
from aiostream.aiter_utils import anext
from aiostream.core import Streamer
async def cancel_and_join(task):
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
class HotAsyncIterator:
def __init__(self, queue):
self.queue = queue
def __aiter__(self):
return self
async def __anext__(self):
next_ = await self.queue.get()
value = await asyncio.shield(next_)
return value
class HotStreamer(Streamer):
def __init__(self, source, maxlen=1):
self.source = source
self.maxlen = maxlen
self.queues = []
self.task = None
self.future = None
self.started = asyncio.Event()
async def __aenter__(self):
self.task = asyncio.create_task(self._target())
await self.started.wait()
return self
async def __aexit__(self, *args):
await self.aclose()
async def _target(self):
async with streamcontext(self.source) as streamer:
while True:
try:
coro = anext(streamer)
self.future = asyncio.create_task(coro)
for queue in self.queues:
if queue.full():
_ = queue.get_nowait()
queue.put_nowait(self.future)
self.started.set()
await self.future
except Exception:
break
finally:
await cancel_and_join(self.future)
def __aiter__(self):
queue = asyncio.Queue(maxsize=self.maxlen)
queue.put_nowait(self.future)
self.queues.append(queue)
return HotAsyncIterator(queue)
async def aclose(self):
await cancel_and_join(self.task)
def hotstream(*args, **kwargs):
return HotStreamer(*args, **kwargs)
async def main():
from aiostream import stream, pipe
xs = stream.count(interval=1)
async def worker(wid):
template = f'w{wid}: {{}}'
ys = hot | pipe.print(template)
await ys[:5]
async with hotstream(xs) as hot:
t1 = asyncio.create_task(worker(1))
await asyncio.sleep(1.5)
t2 = asyncio.create_task(worker(2))
await asyncio.gather(t1, t2)
if __name__ == '__main__':
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment