Last active
August 28, 2023 21:26
-
-
Save napsternxg/fe29bb094acc90e62d99f6d393d647af to your computer and use it in GitHub Desktop.
asyncio_queue_event
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import logging | |
import random | |
import time | |
from dataclasses import dataclass | |
from typing import Any | |
from tqdm.auto import tqdm | |
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Configure root logging once at import: timestamped records that include the
# emitting file and line number, with DEBUG and above enabled.
logging.basicConfig(
    format="%(asctime)s,%(msecs)03d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s",
    datefmt="%Y-%m-%d:%H:%M:%S",
    level=logging.DEBUG,
)
# NOTE(review): the string below follows the imports and assignments, so it is
# a plain expression statement rather than the module docstring.
"""
Related video: https://www.youtube.com/watch?v=kMcwcJdIvHI
"""
@dataclass
class Item:
    """One unit of work travelling from the producer to the sink."""

    # Position of the value in the producer's input iterator.
    idx: int
    # Payload that the consumer processes.
    value: Any
    # How many more attempts are allowed after a failed one.
    retries_left: int = 0
class AsyncQueueRunner(object):
    """Push items from an iterator through concurrent consumers into one sink.

    Data flow: ``producer`` wraps every value in an ``Item`` and puts it on
    ``source_queue``; each ``consumer`` task takes an item, runs ``consume``
    on it and forwards it to ``sink_queue``; the single ``sink`` task calls
    ``flush`` for every forwarded item.  A failed item is re-queued until its
    retry budget is used up, and every failure pauses all consumers for a
    short back-off via ``ready_event``.

    Shutdown is driven by ``cleanup``: once ``source_queue`` is fully
    processed (retries included) a ``None`` end marker is sent to the sink,
    and the consumer tasks are cancelled.  The end marker deliberately does
    not travel through ``source_queue``: retries are re-queued *behind*
    anything already there, so a marker in that queue could stop a consumer
    and the sink while work is still pending.
    """

    def __init__(self, num_consumers=3, source_queue_size=10, max_retries=3):
        """Create the queues and the event shared by all tasks.

        Args:
            num_consumers: number of consumer tasks started by ``run``.
            source_queue_size: ``maxsize`` of the producer -> consumer queue.
            max_retries: retry budget given to every produced ``Item``.
        """
        self.pbar = None
        # Stream that ``flush`` prints to.  Callers normally assign an open
        # file; ``None`` makes ``print`` fall back to standard output.
        self.fp = None
        self.source_queue = asyncio.Queue(maxsize=source_queue_size)
        self.sink_queue = asyncio.Queue()
        self.ready_event = asyncio.Event()
        self.num_consumers = num_consumers
        self.max_retries = max_retries
        # Task handles, filled in by ``run``.
        self.sink_coroutine = None
        self.consumer_coroutines = []

    async def producer(self, iterator):
        """Wrap each value of *iterator* in an ``Item`` and queue it."""
        logger.debug("producer with iterator is ready")
        self.ready_event.set()
        for i, val in enumerate(iterator):
            item = Item(i, val, self.max_retries)
            await self.source_queue.put(item)
            logger.debug(f"Produced item {item}")
            if self.pbar is not None:
                self.pbar.total += 1

    async def consume(self, item):
        """Process one item; raise if its value is a multiple of 10.

        The pause uses ``asyncio.sleep`` so that other tasks keep running
        while this one waits (a blocking ``time.sleep`` would stall the
        whole event loop).
        """
        await asyncio.sleep(random.random() * 0.1)
        if item.value % 10 == 0:
            raise Exception(f"{item} is bad")
        return item

    async def flush(self, item):
        """Print *item* to ``self.fp`` and return it."""
        print(item, file=self.fp)
        return item

    async def consumer(self, name):
        """Process items from ``source_queue`` until the task is cancelled.

        On failure all consumers are paused for 0.5 s (``ready_event`` is
        cleared and set again) and the item is re-queued with one retry
        less, or dropped once no retries are left.
        """
        logger.debug(f"consumer {name} ready")
        try:
            while True:
                logger.debug(f"Consumer {name} waiting for event to clear")
                await self.ready_event.wait()
                logger.debug(f"Consumer {name} waiting for item")
                item = await self.source_queue.get()
                try:
                    await self.consume(item)
                    logger.debug(f"Consumer {name} consumed item {item}")
                    await self.sink_queue.put(item)
                except Exception as e:
                    logger.exception(
                        f"Consumer {name} failed on item {item}. Exception: {e}"
                    )
                    self.ready_event.clear()
                    await asyncio.sleep(0.5)
                    self.ready_event.set()
                    if item.retries_left > 0:
                        retry = Item(item.idx, item.value, item.retries_left - 1)
                        logger.debug(f"Retries remaining: readding {retry}")
                        # NOTE(review): source_queue is bounded, so this put
                        # can block while the queue is full.
                        await self.source_queue.put(retry)
                finally:
                    # Runs after a retry has been queued, so the queue's
                    # unfinished count cannot reach zero while a retry is
                    # still outstanding.
                    self.source_queue.task_done()
        finally:
            logger.debug(f"Consumer {name} exiting.")

    async def sink(self):
        """Flush forwarded items until the ``None`` end marker arrives."""
        logger.debug("Sink is ready")
        while True:
            item = await self.sink_queue.get()
            try:
                if item is None:
                    logger.debug(f"Sink got {item=}. Exiting Loop.")
                    break
                await self.flush(item)
                logger.debug(f"Sink consumed item {item}")
                if self.pbar is not None:
                    self.pbar.update(1)
            finally:
                self.sink_queue.task_done()
        logger.debug("Sink exiting.")

    async def _cancel_tasks(self, tasks):
        """Cancel *tasks* and wait until every one of them has finished."""
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)

    async def cleanup(self):
        """Drain both queues, stop the sink and cancel the consumers."""
        logger.debug(
            f"Finished run. {self.source_queue.qsize()=}, {self.sink_queue.qsize()=}"
        )
        workers = list(self.consumer_coroutines)
        try:
            await self.source_queue.join()
            logger.debug("Finished source_queue join")
            if self.sink_coroutine is not None:
                # Every item (retries included) has been handled, so it is
                # now safe to tell the sink that nothing more will come.
                await self.sink_queue.put(None)
                # Awaiting the task first surfaces a sink failure as an
                # exception instead of blocking forever in join() below.
                await self.sink_coroutine
            await self.sink_queue.join()
            logger.debug("Finished sink_queue join")
            logger.debug(
                f"Finished sink. {self.source_queue.qsize()=}, {self.sink_queue.qsize()=}"
            )
        finally:
            pending = list(workers)
            if self.sink_coroutine is not None:
                pending.append(self.sink_coroutine)
            await self._cancel_tasks(pending)
            for i, _ in enumerate(workers):
                logger.debug(
                    f"Finished consumer {i}. {self.source_queue.qsize()=}, {self.sink_queue.qsize()=}"
                )

    async def run(self, iterator):
        """Process every value of *iterator* through the pipeline."""
        logger.debug(
            f"Starting run. {self.source_queue.qsize()=}, {self.sink_queue.qsize()=}"
        )
        with tqdm(total=0) as self.pbar:
            self.ready_event.clear()
            self.sink_coroutine = asyncio.create_task(self.sink())
            self.consumer_coroutines = [
                asyncio.create_task(self.consumer(i)) for i in range(self.num_consumers)
            ]
            try:
                await self.producer(iterator)
            finally:
                # Also runs when the producer fails, so the worker tasks are
                # always shut down.
                await self.cleanup()
async def main():
    """Run two rounds on one runner, printing flushed items to ``log.txt``."""
    runner = AsyncQueueRunner()
    with open("log.txt", "w+") as log_file:
        # The runner's flush step prints to whatever ``fp`` refers to.
        runner.fp = log_file
        await runner.run(range(3))
        logger.info("Running another round")
        await runner.run(range(4))


if __name__ == "__main__":
    asyncio.run(main())
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import random | |
""" | |
Related video: https://www.youtube.com/watch?v=kMcwcJdIvHI | |
""" | |
async def producer(queue, event):
    """Endlessly put random integers from 1 to 10 on *queue*.

    *event* is accepted but not used by this function.
    """
    while True:
        number = random.randint(1, 10)
        await queue.put(number)
        print(f'Produced item {number}')
async def consumer(queue, event, name, delay=1):
    """Take items from *queue* forever, pausing while *event* is set.

    An item equal to 5 counts as a failure: the consumer sets *event*, waits
    *delay* seconds and clears it again.  Other consumers that see the event
    set skip their turn and wait another *delay* seconds.

    Args:
        queue: ``asyncio.Queue`` to read items from.
        event: ``asyncio.Event`` shared by all consumers; set means "paused".
        name: label used in the printed messages.
        delay: seconds to sleep per loop iteration, while paused, and after
            a failure (default 1).
    """
    while True:
        await asyncio.sleep(delay)
        if event.is_set():
            print(f'Consumer {name} waiting for event to clear')
            await asyncio.sleep(delay)
            continue

        print(f'Consumer {name} waiting for item')
        item = await queue.get()
        try:
            if item == 5:
                print(f'Consumer {name} failed on item {item}')
                event.set()
                await asyncio.sleep(delay)
                event.clear()
            else:
                print(f'Consumer {name} consumed item {item}')
        finally:
            # Mark the item as handled even if this task is cancelled during
            # the back-off, so queue.join() cannot be left waiting for it.
            queue.task_done()
async def main():
    """Run three consumers and one producer on a shared bounded queue."""
    queue = asyncio.Queue(maxsize=10)
    event = asyncio.Event()
    jobs = []
    for idx in range(3):
        jobs.append(consumer(queue, event, idx))
    jobs.append(producer(queue, event))
    await asyncio.gather(*jobs)
# Script entry point: start an event loop and run main().
# NOTE(review): there is no `if __name__ == "__main__"` guard, so this call
# also executes when the module is imported.
asyncio.run(main())
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import random | |
import logging | |
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Configure root logging once at import: timestamped records that include the
# emitting file and line number, with INFO and above enabled.
logging.basicConfig(
    format="%(asctime)s,%(msecs)03d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s",
    datefmt="%Y-%m-%d:%H:%M:%S",
    level=logging.INFO,
)
# NOTE(review): the string below follows the imports and assignments, so it is
# a plain expression statement rather than the module docstring.
"""
Related video: https://www.youtube.com/watch?v=kMcwcJdIvHI
"""
async def producer(queue, event):
    """Put ten random integers from 1 to 10 on *queue*, then a ``None`` marker.

    *event* is accepted but not used by this function.
    """
    for _ in range(10):
        number = random.randint(1, 10)
        await queue.put(number)
        logger.info(f'Produced item {number}')
    # End-of-stream marker for the consumers.
    await queue.put(None)
async def consumer(queue, event, name, sink_queue, close_producer):
    """Move items from *queue* to *sink_queue* until the stream has ended.

    The loop stops when this consumer reads the ``None`` marker (which it
    forwards to *sink_queue* before setting *close_producer*) or when it
    finds *close_producer* already set.  An item equal to 5 counts as a
    failure: it is not forwarded, and *event* is set for one second, during
    which other consumers skip their turn.
    """
    while True:
        await asyncio.sleep(1)

        # Another consumer is in its failure back-off: skip this turn.
        if event.is_set():
            logger.info(f'Consumer {name} waiting for event to clear')
            await asyncio.sleep(1)
            continue

        logger.info(f'Consumer {name} waiting for item')

        # Some consumer already saw the end marker: nothing more to read.
        if close_producer.is_set():
            logger.info(f'Consumer {name}: close_producer is set. Exiting Loop.')
            break

        item = await queue.get()

        if item is None:
            logger.info(f'Consumer {name} got {item=}. Exiting Loop.')
            await sink_queue.put(item)
            close_producer.set()
            logger.info(f'Consumer {name} setting close_producer.')
            queue.task_done()
            break

        if item != 5:
            logger.info(f'Consumer {name} consumed item {item}')
            await sink_queue.put(item)
        else:
            logger.info(f'Consumer {name} failed on item {item}')
            event.set()
            await asyncio.sleep(1)
            event.clear()
        queue.task_done()
async def sink(sink_queue):
    """Log every item taken from *sink_queue*; return after a ``None`` item."""
    while True:
        await asyncio.sleep(1)
        received = await sink_queue.get()
        logger.info(f'Sink consumed item {received}')
        sink_queue.task_done()
        if received is not None:
            continue
        item = received  # keeps the `item=` label in the message below
        logger.info(f'Sink got {item=}. Exiting Loop.')
        return
async def main():
    """Run three consumers, one producer and one sink until all of them return."""
    queue = asyncio.Queue(maxsize=10)
    sink_queue = asyncio.Queue(maxsize=10)
    event = asyncio.Event()
    close_producer = asyncio.Event()

    jobs = []
    for idx in range(3):
        jobs.append(consumer(queue, event, idx, sink_queue, close_producer))
    jobs.append(producer(queue, event))
    jobs.append(sink(sink_queue))
    await asyncio.gather(*jobs)
# Script entry point: start an event loop and run main().
# NOTE(review): there is no `if __name__ == "__main__"` guard, so this call
# also executes when the module is imported.
asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment