Last active
October 5, 2024 17:44
-
-
Save hholst80/80a5c350d63d44fe69b8a2e7f3664729 to your computer and use it in GitHub Desktop.
async generator playground
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from asyncio.exceptions import CancelledError | |
async def gen(nam="gen", count=10, width=10, delay=1):
    """Asynchronously produce consecutive integer ranges.

    Yields ``count`` ranges, each ``width`` integers wide, that tile
    ``[0, count * width)`` without gaps: ``range(0, width)``,
    ``range(width, 2 * width)``, and so on.  A line announcing each range is
    printed before it is yielded.

    Args:
        nam: Label used as the prefix of the printed progress lines.
        count: Number of ranges to produce.
        width: Number of integers in each range.
        delay: Seconds to sleep after each range has been consumed, before
            the next one is produced (pass 0 to only yield to the loop).

    Yields:
        ``range`` objects in increasing order.
    """
    for idx in range(count):
        chunk = range(width * idx, width * (idx + 1))
        print(f"{nam} produced range: [{chunk.start},{chunk.stop})")
        yield chunk
        # Pace the producer; this runs when the consumer asks for the next item.
        await asyncio.sleep(delay)
async def flatten(nam, s, delay=0.2):
    """Push-driven stage that forwards every element of a batch to ``s``.

    This is an async generator used as a consumer: prime it with
    ``asend(None)``, then ``asend(batch)`` for each iterable of items.  Each
    item is forwarded with ``await s.asend(item)``; after the batch is
    exhausted a summary line with the half-open ``[min, max + 1)`` span of
    the items is printed.

    Args:
        nam: Label used as the prefix of the printed summary lines.
        s: Downstream object exposing an awaitable ``asend(item)``.
        delay: Seconds to sleep after forwarding each item.

    The generator finishes when ``GeneratorExit`` or ``CancelledError`` is
    raised at its ``yield`` (for example by ``aclose()``).
    """
    while True:
        try:
            batch = yield
        except (GeneratorExit, CancelledError):
            # Closed or cancelled while idle: finish without an error.
            break

        # Track the bounds with None instead of +/-Inf sentinels so an empty
        # batch is reported as such rather than as "[inf,-inf)".
        lo = hi = None
        for item in batch:
            lo = item if lo is None else min(lo, item)
            hi = item if hi is None else max(hi, item)
            await s.asend(item)
            await asyncio.sleep(delay)

        if lo is None:
            print(f"{nam} consumed empty range")
        else:
            print(f"{nam} consumed range: [{lo},{hi + 1})")
# NOTE: This is push-based: the logic only runs when `await asend()` is
# called.
async def reduce(nam="reduce"):
    """Push-driven accumulator that sums every value sent into it.

    Prime the generator with ``asend(None)``, then ``asend(value)`` for each
    value to add.  When ``GeneratorExit`` or ``CancelledError`` is raised
    inside the loop (for example by ``aclose()``), accumulation stops and
    the total is printed as ``"<nam> sum: <total>"``.

    Args:
        nam: Label used as the prefix of the printed total.
    """
    total = 0
    try:
        while True:
            total = total + (yield)
    except (GeneratorExit, CancelledError):
        # Shutdown signal, not an error: fall through and report the total.
        pass
    print(f"{nam} sum: {total}")
# NOTE: To run TWO flatten consumers at the same time, we cannot await the
# first asend(); we must leave it in flight.
# NOTE: This is not a generator but a coroutine.
async def dealer(g, a, b):
    """Deal items from ``g`` alternately to consumers ``a`` and ``b``.

    Each item is pushed with ``asend`` wrapped in a task, so the send to one
    consumer stays in flight while the next item goes to the other.  Once two
    sends are in flight the dealer waits for at least one to finish before
    pulling the next item from ``g``.

    Args:
        g: Async iterable producing the items to distribute.
        a: First consumer; must expose ``asend(item)`` and already be primed.
        b: Second consumer, same requirements as ``a``.

    Raises:
        Any exception raised by a consumer's ``asend``.  It is re-raised as
        soon as the failed send is observed; other sends still in flight are
        not cancelled.
    """
    print("starting dealer")
    in_flight = set()
    async for item in g:
        in_flight.add(asyncio.create_task(a.asend(item)))
        # Alternate the target so the next item goes to the other consumer.
        a, b = b, a
        if len(in_flight) >= 2:
            finished, in_flight = await asyncio.wait(
                in_flight, return_when=asyncio.FIRST_COMPLETED
            )
            for task in finished:
                # Surface consumer failures instead of silently dropping them.
                task.result()
    # asyncio.wait() rejects an empty set, which happens when g yields nothing
    # or every send has already completed.
    if in_flight:
        finished, _ = await asyncio.wait(in_flight)
        for task in finished:
            task.result()
    print("closing down dealer")
async def main():
    """Wire up and run the pipeline: gen -> dealer -> two flatten -> reduce.

    Prints the expected total first, primes every push-driven stage with
    ``asend(None)``, lets the dealer drain the producer, and finally closes
    the two flatten stages followed by the reducer.
    """
    print(f"The sum should be: {sum(range(100))}")

    source = gen()

    # The reducer must be primed before anything can send values into it.
    sink = reduce()
    await sink.asend(None)

    workers = [flatten(label, sink) for label in ("a", "b")]
    for worker in workers:
        await worker.asend(None)

    await dealer(source, *workers)

    # Close upstream stages first, then the reducer they feed.
    for stage in (*workers, sink):
        await stage.aclose()
# Script entry point: drive main() to completion on an asyncio event loop.
asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment