Skip to content

Instantly share code, notes, and snippets.

@foarsitter
Forked from showa-yojyo/producer_consumer.py
Last active January 22, 2024 15:28
Show Gist options
  • Save foarsitter/e1c8bc4351a48e07f9f82a145eea315b to your computer and use it in GitHub Desktop.
Save foarsitter/e1c8bc4351a48e07f9f82a145eea315b to your computer and use it in GitHub Desktop.
Implement Producer/Consumer pattern with asyncio.Queue
"""
DRY approach of https://gist.github.com/showa-yojyo/4ed200d4c41f496a45a7af2612912df3
"""
import asyncio
import random
from collections.abc import Callable
async def producer(n):
for x in range(1, n + 1):
# produce an item
print(f"producing {x}/{n}")
# simulate i/o operation using sleep
await asyncio.sleep(random.random())
# put the item in the queue
yield x
async def consumer(item):
# process the item
print(f"consuming {item}...")
# simulate i/o operation using sleep
await asyncio.sleep(random.random())
# Notify the queue that the item has been processed
class Worker:
def __init__(self, consume_method: Callable, producer_method: Callable):
self.queue: asyncio.Queue = asyncio.Queue()
self.producer_method: Callable = producer_method
self.consume_method: Callable = consume_method
self.consumers = []
def create_consumers(self, workers: int = 3):
for _ in range(workers):
consumer = asyncio.create_task(self.consume())
self.consumers.append(consumer)
async def produce(self, n):
async for x in self.producer_method(n):
await self.queue.put(x)
async def consume(self):
while True:
# wait for an item from the producer
item = await self.queue.get()
await self.consume_method(item)
self.queue.task_done()
async def run(self, n):
self.create_consumers(10)
await self.produce(n)
# wait until the consumer has processed all items
await self.queue.join()
# the consumers are still awaiting for an item, cancel them
for consumer in self.consumers:
consumer.cancel()
# wait until all worker tasks are cancelled
await asyncio.gather(*self.consumers, return_exceptions=True)
def asyncio_run(self):
asyncio.run(self.run(10))
if __name__ == "__main__":
worker = Worker(consumer, producer)
worker.asyncio_run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment