Skip to content

Instantly share code, notes, and snippets.

@hholst80
Last active October 5, 2024 17:44
Show Gist options
  • Save hholst80/80a5c350d63d44fe69b8a2e7f3664729 to your computer and use it in GitHub Desktop.
Save hholst80/80a5c350d63d44fe69b8a2e7f3664729 to your computer and use it in GitHub Desktop.
async generator playground
import asyncio
from asyncio.exceptions import CancelledError
async def gen(nam="gen"):
for x in range(10):
r = range(10*x, 10*(x+1))
print(f"{nam} produced range: [{10*x},{10*(x+1)})")
yield r
await asyncio.sleep(1)
async def flatten(nam, s):
Inf = float('Inf')
while True:
try:
x = (yield)
except (GeneratorExit, CancelledError):
break
else:
m0 = +Inf
m1 = -Inf
for y in x:
m0 = min(m0, y)
m1 = max(m1, y)
await s.asend(y)
await asyncio.sleep(0.2)
print(f"{nam} consumed range: [{m0},{m1+1})")
# NOTE: This is push based, the logic will only happen when the await asend()
# is called.
async def reduce(nam="reduce"):
s = 0
while True:
try:
x = (yield)
s = s + x
except (GeneratorExit, CancelledError):
break
print(f"{nam} sum: {s}")
# NOTE: To make the logic run TWO flatten at the same time,
# we cannot await the first we must let it be in flight
# NOTE: This is is not a generator but a co-routine.
async def dealer(g, a, b):
print("starting dealer")
tasks = set()
async for x in g:
t = asyncio.create_task(a.asend(x))
a, b = b, a
tasks.add(t)
if len(tasks) >= 2:
_, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
await asyncio.wait(tasks)
print("closing down dealer")
async def main():
print(f"The sum should be: {sum(range(100))}")
g = gen()
s = reduce()
await s.asend(None)
a = flatten("a", s)
b = flatten("b", s)
await a.asend(None)
await b.asend(None)
await dealer(g, a, b)
await a.aclose()
await b.aclose()
await s.aclose()
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment