Skip to content

Instantly share code, notes, and snippets.

@vxgmichel
Last active November 15, 2018 13:42
Show Gist options
  • Save vxgmichel/9d38cfb058f3699a6adbab9965541d20 to your computer and use it in GitHub Desktop.
Save vxgmichel/9d38cfb058f3699a6adbab9965541d20 to your computer and use it in GitHub Desktop.
Merging async generators with trio
import trio
import random
async def merge(agens):
async def produce(agen, channel):
async with channel:
async for item in agen:
await channel.send(item)
async with trio.open_nursery() as nursery:
send_channel, receive_channel = trio.open_memory_channel(0)
async with receive_channel:
async with send_channel:
for agen in agens:
nursery.start_soon(produce, agen, send_channel.clone())
async for item in receive_channel:
yield item
async def random_agen(i):
for value in range(random.randint(1, 10)):
await trio.sleep(random.random())
yield i, value
if random.random() > 0.99:
raise RuntimeError('Oops')
async def main():
agens = [random_agen(i) for i in range(10)]
async for item in merge(agens):
print(*item)
if __name__ == '__main__':
trio.run(main)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment