Last active
October 13, 2021 15:04
-
-
Save vxgmichel/ad2aef82dd849d346dc20c3127af026c to your computer and use it in GitHub Desktop.
Asynchronous implementation of itertools.groupby for aiostream
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from aiostream.aiter_utils import anext | |
from aiostream import stream, operator, streamcontext, pipe | |
import pytest | |
@operator(pipable=True)
async def groupby(source, key=None):
    """Group consecutive items of an asynchronous source by key.

    Asynchronous counterpart of ``itertools.groupby``: yields
    ``(key, substream)`` pairs, where ``substream`` iterates over the
    consecutive items of ``source`` sharing that key.

    Args:
        source: The asynchronous sequence to group.
        key: Optional function or coroutine function computing the key of
            an item. Defaults to the identity function, so that
            consecutive equal items are grouped together.

    Yields:
        Tuples ``(key, substream)``. All sub-streams share the underlying
        source iterator: once the outer stream advances to the next
        group, previously yielded sub-streams produce no further items
        (remaining items of a skipped group are discarded).
    """
    # Default to the identity function, as itertools.groupby does. The key
    # is always called with one argument, so it must accept the item.
    key = key if key is not None else (lambda item: item)
    iscorofunc = asyncio.iscoroutinefunction(key)

    # Unique sentinels: no item has been read yet, so the first pass
    # through the skip loop below always fetches from the source.
    current_item = object()
    current_key = target_key = object()
    # Identifier of the group most recently yielded by the outer stream;
    # used to invalidate sub-streams belonging to earlier groups.
    current_group_id = 0

    async def compute_key(item):
        # Support both plain functions and coroutine functions as key
        if iscorofunc:
            return await key(item)
        return key(item)

    async def groupby_values(streamer, target_key, group_id):
        nonlocal current_key, current_item
        while group_id == current_group_id and current_key == target_key:
            yield current_item
            # The consumer may have advanced the outer stream while this
            # generator was suspended: a stale group must not pull items
            # that belong to a later group.
            if group_id != current_group_id:
                return
            try:
                current_item = await anext(streamer)
            except StopAsyncIteration:
                return
            current_key = await compute_key(current_item)

    async with streamcontext(source) as streamer:
        # Loop over groups
        while True:
            # Loop over items: skip whatever is left of the current group
            while current_key == target_key:
                try:
                    current_item = await anext(streamer)
                except StopAsyncIteration:
                    return
                current_key = await compute_key(current_item)
            target_key = current_key
            current_group_id += 1
            substreamer = streamcontext(
                groupby_values(streamer, target_key, current_group_id)
            )
            yield (current_key, substreamer)
@pytest.fixture(params=['even', 'aeven'])
def even(request):
    """Provide a parity predicate, either synchronous or asynchronous.

    The fixture is parametrized so each dependent test runs once with a
    plain function and once with a coroutine function.
    """

    def is_even(value):
        return value % 2 == 0

    async def is_even_async(value):
        # Short sleep so the coroutine variant actually suspends
        await asyncio.sleep(0.01)
        return is_even(value)

    if request.param == 'even':
        return is_even
    return is_even_async
@pytest.mark.asyncio
async def test_groupby(even):
    """Check keys and grouped values for alternating and sorted inputs."""

    async def collect_groups(grouped):
        # Consume each sub-stream fully before moving to the next group
        collected = []
        async with grouped.stream() as streamer:
            async for group_key, group_values in streamer:
                items = await (group_values | pipe.list())
                collected.append((group_key, items))
        return collected

    # Alternating parity: every item forms its own group
    alternating = stream.range(5) | groupby.pipe(even)
    keys, _ = zip(*await stream.list(alternating))
    assert keys == (True, False, True, False, True)

    expected_alternating = [
        (True, [0]),
        (False, [1]),
        (True, [2]),
        (False, [3]),
        (True, [4]),
    ]
    assert await collect_groups(alternating) == expected_alternating

    # Evens followed by odds: exactly two groups
    evens = stream.range(0, 5, 2)
    odds = stream.range(1, 5, 2)
    chained = (evens + odds) | groupby.pipe(even)
    keys, _ = zip(*await stream.list(chained))
    assert keys == (True, False)

    assert await collect_groups(chained) == [
        (True, [0, 2, 4]),
        (False, [1, 3]),
    ]
@erikbern I don't have time to integrate this feature at the moment but feel free to make a PR if you want this to happen, I'll be happy to review it :)
Nice! I actually realized that groupby won't quite work for my use case, but if you're interested in adding a time slicing / chunking operator, I'm happy to take a stab at it!
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Can this be merged into aiostream? :) I realized I want something like this for a project I'm building (I want to take an async generator and "chunk" it every 60s into a sub-generator).