Last active
November 15, 2018 13:42
-
-
Save vxgmichel/9d38cfb058f3699a6adbab9965541d20 to your computer and use it in GitHub Desktop.
Merging async generators with trio
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import trio | |
import random | |
async def merge(agens): | |
async def produce(agen, channel): | |
async with channel: | |
async for item in agen: | |
await channel.send(item) | |
async with trio.open_nursery() as nursery: | |
send_channel, receive_channel = trio.open_memory_channel(0) | |
async with receive_channel: | |
async with send_channel: | |
for agen in agens: | |
nursery.start_soon(produce, agen, send_channel.clone()) | |
async for item in receive_channel: | |
yield item | |
async def random_agen(i): | |
for value in range(random.randint(1, 10)): | |
await trio.sleep(random.random()) | |
yield i, value | |
if random.random() > 0.99: | |
raise RuntimeError('Oops') | |
async def main(): | |
agens = [random_agen(i) for i in range(10)] | |
async for item in merge(agens): | |
print(*item) | |
if __name__ == '__main__': | |
trio.run(main) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment