Last active
July 28, 2023 11:31
-
-
Save Olegt0rr/d4bd881f2f3fdacede426b1368139582 to your computer and use it in GitHub Desktop.
Such a solution may be needed for the effective use of third-party asynchronous iterators, which imply a step-by-step return of one element, and under the hood make a request for several elements ahead. E.g. `.find()` method of motor mongo client.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import annotations | |
from typing import TYPE_CHECKING, TypeVar | |
if TYPE_CHECKING: | |
from collections.abc import AsyncIterator | |
T = TypeVar("T") | |
async def async_chunks( | |
async_iterator: AsyncIterator[T], | |
size: int, | |
) -> AsyncIterator[list[T]]: | |
"""Generate chunks from an asynchronous sequence. | |
Chunks are lists consists of original ``T`` elements. | |
The chunk can't contain more than ``size`` elements. | |
The last chunk might contain less than ``size`` elements, | |
but can't be empty. | |
""" | |
finished = False | |
while not finished: | |
results: list[T] = [] | |
for _ in range(size): | |
try: | |
# for python3.10+ `anext(async_iterator)` is available, | |
# but for lower versions compatibility let's use this: | |
result = await async_iterator.__anext__() | |
except StopAsyncIteration: | |
finished = True | |
else: | |
results.append(result) | |
if results: | |
yield results |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment