Last active
January 7, 2024 19:57
-
-
Save crizCraig/88c0294b395f341bfb4f646b16c9bfa7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from typing import AsyncGenerator | |
# Shared queue of strings: producer() puts onto it and consumer() reads from it.
async_q: asyncio.Queue[str] = asyncio.Queue()
async def producer() -> None:
    """Try to enqueue ``str(a)`` on ``async_q``, then sleep for 0.1 seconds.

    Any exception raised along the way is printed with an
    "Error in producer" prefix and then re-raised to the caller/task.

    NOTE(review): ``a`` is not defined anywhere in this snippet (the
    original carried a commented-out ``a = 1``), so building the payload
    raises ``NameError``. That looks deliberate -- a way to make every
    producer task fail -- so confirm before "fixing" it.
    """
    try:
        # `a` should presumably be a value like 1, 2, etc.; it is left
        # undefined here, so this line raises NameError.
        payload = str(a)  # a not defined
        await async_q.put(payload)
        await asyncio.sleep(0.1)
    except Exception as exc:
        print(f"Error in producer: {exc}")
        raise exc
async def consumer(timeout: float = 3) -> AsyncGenerator[str, None]:
    """Yield items from ``async_q`` until none arrives within ``timeout``.

    Args:
        timeout: Seconds to wait for each item before giving up. Defaults
            to 3, the value that used to be hard-coded, so existing
            ``consumer()`` calls behave as before.

    Yields:
        Each string taken from the module-level ``async_q``, in the order
        the queue hands them out.

    The generator ends (after printing "Timeout in consumer") the first
    time a ``get`` does not complete within ``timeout`` seconds.
    """
    while True:
        try:
            # Only the awaited get can time out, so keep the try body to
            # that single call; the yield stays outside the handler.
            item = await asyncio.wait_for(async_q.get(), timeout)
        except asyncio.TimeoutError:
            print("Timeout in consumer")
            break
        yield item
async def test_async_gen() -> None:
    """Start ten producer tasks, print what the consumer yields, then gather.

    The producer tasks are created first, the consumer generator is drained
    to completion, and only afterwards are the tasks gathered. "Done" is
    printed if the gather completes without raising.
    """
    producer_tasks = [asyncio.create_task(producer()) for _ in range(10)]

    # Draining the consumer before gathering means a failure inside a
    # producer task goes unnoticed until the consumer has hit its timeout:
    # gather is the call that checks the tasks for exceptions.
    async for received in consumer():
        print(received)

    # return_exceptions defaults to False, so the first task exception
    # propagates out of this await.
    await asyncio.gather(*producer_tasks)
    print("Done")
# Script entry point: run the demo coroutine on a fresh event loop when the
# file is executed directly.
if __name__ == "__main__":
    asyncio.run(test_async_gen())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment