Skip to content

Instantly share code, notes, and snippets.

@crizCraig
Last active January 7, 2024 19:57
Show Gist options
  • Save crizCraig/88c0294b395f341bfb4f646b16c9bfa7 to your computer and use it in GitHub Desktop.
Save crizCraig/88c0294b395f341bfb4f646b16c9bfa7 to your computer and use it in GitHub Desktop.
import asyncio
from typing import AsyncGenerator
async_q: asyncio.Queue[str] = asyncio.Queue()
async def producer() -> None:
try:
# Assuming `a` should be a value like 1, 2, etc.
# a = 1
await async_q.put(str(a)) # a not defined
await asyncio.sleep(0.1)
except Exception as e:
print(f"Error in producer: {e}")
raise e
async def consumer() -> AsyncGenerator[str, None]:
while True:
try:
item = await asyncio.wait_for(async_q.get(), 3)
yield item
except asyncio.TimeoutError:
print("Timeout in consumer")
break
async def test_async_gen() -> None:
tasks = [asyncio.create_task(producer()) for _ in range(10)]
# Waiting on consumer first means exceptions won't be noticed
# until after the timeout in consumer since gather is what
# checks for exceptions
async for item in consumer():
print(item)
await asyncio.gather(*tasks, return_exceptions=False)
print("Done")
if __name__ == "__main__":
asyncio.run(test_async_gen())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment