Created
April 15, 2020 19:21
-
-
Save multun/618e833116b37068d802440bf68f2b1e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
async def _cancel_and_drain(tasks):
    """Cancel every task in *tasks* and wait until each one has settled."""
    # Snapshot first so the caller's mapping can change without affecting us.
    pending = list(tasks)
    for task in pending:
        task.cancel()
    # return_exceptions=True collects each task's outcome (including
    # CancelledError) as a value instead of raising it here.
    await asyncio.gather(*pending, return_exceptions=True)


async def merge_iter(iterables):
    """Merge several async iterables into one stream, yielding items as they arrive.

    One ``__anext__()`` task is kept in flight per source. Whenever at least
    one task completes, its item is yielded and a new ``__anext__()`` task is
    scheduled for the same source. A source that raises
    ``StopAsyncIteration`` is dropped; the merged stream ends once every
    source has been dropped.

    Args:
        iterables: An iterable of objects implementing ``__aiter__``.

    Yields:
        Items from the sources, in completion order. No ordering between
        different sources is guaranteed.

    Raises:
        Any exception raised by a source (other than ``StopAsyncIteration``),
        or thrown into this generator (including ``GeneratorExit`` on close
        and ``asyncio.CancelledError``). In every such case all outstanding
        ``__anext__()`` tasks are cancelled and awaited before the exception
        is re-raised.
    """
    # Create async iterators from the iterables.
    iterators = [iterable.__aiter__() for iterable in iterables]
    # Map each in-flight __anext__() task to the iterator that produced it.
    tasks = {asyncio.ensure_future(it.__anext__()): it for it in iterators}
    try:
        while tasks:
            # Wait until at least one source has produced something.
            done, _ = await asyncio.wait(
                tasks, return_when=asyncio.FIRST_COMPLETED
            )
            for finished in done:
                # Find out which iterator this task belongs to.
                source = tasks.pop(finished)
                try:
                    item = finished.result()
                except StopAsyncIteration:
                    # This source is exhausted; do not reschedule it.
                    continue
                # The yield is deliberately outside the try above so that an
                # exception thrown in by the consumer is never mistaken for
                # exhaustion of the source.
                yield item
                # Ask the same source for its next item.
                tasks[asyncio.ensure_future(source.__anext__())] = source
    except BaseException:
        # Deliberately broad: GeneratorExit and CancelledError must trigger
        # cleanup too. shield() lets the cleanup finish even if the
        # surrounding task is cancelled again while we wait.
        await asyncio.shield(_cancel_and_drain(tasks))
        raise
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment