@njsmith
Created October 21, 2018 08:24
Idea for ergonomic, safe alternative to async generators
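from functools import wraps, partial
from contextlib import asynccontextmanager

import trio


def producer(wrapped):
    @asynccontextmanager
    @wraps(wrapped)  # `wraps` is imported by name above; `functools` itself is not
    async def wrapper(*args, **kwargs):
        # The decorator supplies send_channel itself, so callers must not pass it.
        if "send_channel" in kwargs:
            raise TypeError
        send_channel, receive_channel = trio.open_memory_channel(0)
        kwargs["send_channel"] = send_channel
        async with trio.open_nursery() as nursery:
            async with send_channel, receive_channel:
                # Run the wrapped function as its own task in the nursery.
                nursery.start_soon(partial(wrapped, *args, **kwargs))
                yield receive_channel

    # Keep the undecorated function reachable for callers that bring their own channel.
    wrapper.raw = wrapped
    return wrapper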


# Defining an async-generator-like function that can use nurseries etc.
# without all the usual mess that async generators cause
@producer
async def squares_in_range(low, high, *, send_channel):
    for i in range(low, high):
        await send_channel.send(i ** 2)


# Using it (from inside an async function)
async def main():
    async with squares_in_range(0, 10) as sqiter:
        async for square in sqiter:
            ...


# Or, if you want to use your own channel, or your own nursery
# (my_send_channel is a send channel you opened yourself):
async def main_raw(my_send_channel):
    await squares_in_range.raw(0, 10, send_channel=my_send_channel)

vxgmichel commented Nov 15, 2018

I had to tweak a few things in order to get the async iteration to stop when the wrapped function is finished:

def producer(wrapped):

    @asynccontextmanager
    @wraps(wrapped)
    async def wrapper(*args, **kwargs):
        if "send_channel" in kwargs:
            raise TypeError

        send_channel, receive_channel = trio.open_memory_channel(0)

        async def target():
            # Close the send side as soon as the wrapped function returns,
            # so the consumer's `async for` sees the end of the stream.
            async with send_channel:
                await wrapped(*args, **kwargs, send_channel=send_channel)

        async with trio.open_nursery() as nursery:
            async with receive_channel:
                nursery.start_soon(target)
                yield receive_channel
                # The consumer has left its `async with` block: stop the producer.
                nursery.cancel_scope.cancel()

    wrapper.raw = wrapped
    return wrapper

Also, this is conceptually much closer to Go-style generators than to Python async generators, since the producer runs in its own task.
