-
-
Save foarsitter/e1c8bc4351a48e07f9f82a145eea315b to your computer and use it in GitHub Desktop.
Implement Producer/Consumer pattern with asyncio.Queue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
DRY approach of https://gist.github.com/showa-yojyo/4ed200d4c41f496a45a7af2612912df3 | |
""" | |
import asyncio | |
import random | |
from collections.abc import Callable | |
async def producer(n): | |
for x in range(1, n + 1): | |
# produce an item | |
print(f"producing {x}/{n}") | |
# simulate i/o operation using sleep | |
await asyncio.sleep(random.random()) | |
# put the item in the queue | |
yield x | |
async def consumer(item): | |
# process the item | |
print(f"consuming {item}...") | |
# simulate i/o operation using sleep | |
await asyncio.sleep(random.random()) | |
# Notify the queue that the item has been processed | |
class Worker: | |
def __init__(self, consume_method: Callable, producer_method: Callable): | |
self.queue: asyncio.Queue = asyncio.Queue() | |
self.producer_method: Callable = producer_method | |
self.consume_method: Callable = consume_method | |
self.consumers = [] | |
def create_consumers(self, workers: int = 3): | |
for _ in range(workers): | |
consumer = asyncio.create_task(self.consume()) | |
self.consumers.append(consumer) | |
async def produce(self, n): | |
async for x in self.producer_method(n): | |
await self.queue.put(x) | |
async def consume(self): | |
while True: | |
# wait for an item from the producer | |
item = await self.queue.get() | |
await self.consume_method(item) | |
self.queue.task_done() | |
async def run(self, n): | |
self.create_consumers(10) | |
await self.produce(n) | |
# wait until the consumer has processed all items | |
await self.queue.join() | |
# the consumers are still awaiting for an item, cancel them | |
for consumer in self.consumers: | |
consumer.cancel() | |
# wait until all worker tasks are cancelled | |
await asyncio.gather(*self.consumers, return_exceptions=True) | |
def asyncio_run(self): | |
asyncio.run(self.run(10)) | |
if __name__ == "__main__": | |
worker = Worker(consumer, producer) | |
worker.asyncio_run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment