Created
September 30, 2023 17:05
-
-
Save bshlgrs/d0d002aeabb8c1884786c986af8b7878 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import trio | |
from typing import AsyncIterator, Awaitable, Callable, Coroutine, Optional, TypeVar | |
A = TypeVar("A") | |
B = TypeVar("B") | |
async def yield_with_delay(val, delay): | |
for i in range(5): | |
await trio.sleep(delay) | |
yield val | |
async def bind(xs: list[A], fn: Callable[[A], AsyncIterator[B]]) -> AsyncIterator[B]: | |
"""Monadic bind for async iterators""" | |
yield_thing: Optional[B] = None | |
yield_event = trio.Event() | |
yielding_lock = trio.Lock() | |
yielding_is_done_event = trio.Event() | |
num_completed = 0 | |
async def run(x: A): | |
nonlocal yield_thing | |
async for result in fn(x): | |
async with yielding_lock: | |
yield_thing = result | |
yield_event.set() | |
await yielding_is_done_event.wait() | |
nonlocal num_completed | |
num_completed += 1 | |
if num_completed == len(xs): | |
raise StopAsyncIteration | |
try: | |
async with trio.open_nursery() as nursery: | |
for x in xs: | |
nursery.start_soon(run, x) | |
while num_completed < len(xs): | |
await yield_event.wait() | |
yield yield_thing | |
yield_event = trio.Event() | |
yielding_is_done_event.set() | |
yielding_is_done_event = trio.Event() | |
except StopAsyncIteration: | |
return | |
async def bind_main(): | |
async def slow_thing(x): | |
async for res in yield_with_delay(x, x * 0.01): | |
yield res | |
async for thing in bind([1, 2, 3, 4, 5], slow_thing): | |
print(thing) | |
trio.run(bind_main) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment