Skip to content

Instantly share code, notes, and snippets.

@vxgmichel
Last active October 13, 2021 15:04
Show Gist options
  • Save vxgmichel/ad2aef82dd849d346dc20c3127af026c to your computer and use it in GitHub Desktop.
Save vxgmichel/ad2aef82dd849d346dc20c3127af026c to your computer and use it in GitHub Desktop.
Asynchronous implementation of itertools.groupby for aiostream
import asyncio
from aiostream.aiter_utils import anext
from aiostream import stream, operator, streamcontext, pipe
import pytest
@operator(pipable=True)
async def groupby(source, key=None):
    """Group consecutive items of an async stream, like ``itertools.groupby``.

    Yields ``(group_key, substream)`` pairs, where ``substream`` is a
    streamer producing the consecutive items of ``source`` whose key
    compares equal (``==``) to ``group_key``.

    Args:
        source: The asynchronous sequence to group.
        key: Callable computing the grouping key of an item; either a plain
            function or a coroutine function. Defaults to the identity, as
            in ``itertools.groupby``.

    Notes:
        All substreams share the single streamer opened on ``source``.
        Advancing the outer stream skips any unconsumed items of the current
        group. Once the outer stream has moved on, the previous substream
        yields nothing more, even if a later group has an equal key. The
        source streamer is closed when the outer stream finishes, so
        substreams should be consumed while the outer stream is still open.
    """
    if key is None:
        # The original default (``lambda: None``) accepted no argument and
        # raised TypeError on the first item; use the identity instead.
        def key(item):
            return item
    iscorofunc = asyncio.iscoroutinefunction(key)

    # Shared iteration state: written by ``advance``, read by the outer loop
    # and by every substream.
    current_item = None
    # A fresh sentinel never equals a real key, so the outer loop fetches the
    # first item before announcing the first group.
    current_key = target_key = object()
    exhausted = False
    # Bumped for every new group so that a substream can tell it has gone
    # stale, even if a later group happens to have an equal key.
    group_id = 0

    async def advance(streamer):
        """Fetch the next item and its key; return False once source ends."""
        nonlocal current_item, current_key, exhausted
        try:
            current_item = await anext(streamer)
        except StopAsyncIteration:
            exhausted = True
            return False
        if iscorofunc:
            current_key = await key(current_item)
        else:
            current_key = key(current_item)
        return True

    async def groupby_values(streamer, group_key, my_group_id):
        """Yield items of one group until its key changes or it goes stale."""
        while (
            not exhausted
            and group_id == my_group_id
            and current_key == group_key
        ):
            yield current_item
            # While suspended, the outer stream may have moved on; do not
            # advance the shared streamer on its behalf.
            if group_id != my_group_id:
                return
            if not await advance(streamer):
                return

    async with streamcontext(source) as streamer:
        # Loop over groups
        while True:
            # Skip whatever remains of the previous group (or, on the first
            # pass, fetch the very first item).
            while current_key == target_key:
                if exhausted or not await advance(streamer):
                    return
            target_key = current_key
            group_id += 1
            substreamer = streamcontext(
                groupby_values(streamer, target_key, group_id))
            yield (current_key, substreamer)
@pytest.fixture(params=['even', 'aeven'])
def even(request):
    """Parametrized predicate telling whether a number is even.

    Provides a plain function for the ``'even'`` param and a coroutine
    function for the other param (``'aeven'``). This lets the tests exercise
    both the sync and the coroutine key paths of ``groupby``.
    """

    def is_even(number):
        return number % 2 == 0

    async def is_even_async(number):
        # The short sleep forces a real suspension point in the async variant.
        await asyncio.sleep(0.01)
        return is_even(number)

    if request.param == 'even':
        return is_even
    return is_even_async
async def _collect_groups(grouped):
    """Consume a grouped stream into a list of ``(key, values)`` pairs.

    Each substream is fully drained (via ``pipe.list``) before the outer
    stream is advanced, because all substreams read from the same source.
    """
    results = []
    async with grouped.stream() as streamer:
        async for key, values in streamer:
            results.append((key, await (values | pipe.list())))
    return results


@pytest.mark.asyncio
async def test_groupby(even):
    """Check ``groupby`` keys and group contents for several sources."""
    # Alternating parity: every item starts a new group.
    xs = stream.range(5) | groupby.pipe(even)
    keys = tuple(key for key, _ in await stream.list(xs))
    assert keys == (True, False, True, False, True)
    assert await _collect_groups(xs) == [
        (True, [0]), (False, [1]), (True, [2]), (False, [3]), (True, [4])]

    # All evens followed by all odds: two contiguous groups.
    zs = (stream.range(0, 5, 2) + stream.range(1, 5, 2)) | groupby.pipe(even)
    keys = tuple(key for key, _ in await stream.list(zs))
    assert keys == (True, False)
    assert await _collect_groups(zs) == [(True, [0, 2, 4]), (False, [1, 3])]

    # Empty source: no groups at all.
    empty = stream.empty() | groupby.pipe(even)
    assert await stream.list(empty) == []
@erikbern
Copy link

erikbern commented Oct 13, 2021

Can this be merged into aiostream? :) I realized I want something like this for something I'm building (I want to take an async generator and "chunk" it every 60 seconds into sub-generators).

@vxgmichel
Copy link
Author

@erikbern I don't have time to integrate this feature at the moment, but feel free to make a PR if you want this to happen; I'll be happy to review it :)

@erikbern
Copy link

Nice! Actually, I realized groupby won't quite work for my use case, but if you're interested in adding a time-slicing/chunking operator, I'm happy to take a stab at it!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment