Created
March 15, 2024 08:17
-
-
Save onjin/fcfe93df71803a6d4741b44ace0b6252 to your computer and use it in GitHub Desktop.
example creator/producers/consumers async pattern - reusable
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import httpx | |
from typing import Callable, List, Any | |
from rich.console import Console | |
console = Console() | |
async def producer( | |
callback: Callable[[int, Any], Any], | |
input_queue: asyncio.Queue[str | None], | |
output_queue: asyncio.Queue[Any], | |
producer_id: int, | |
) -> None: | |
while True: | |
line = await input_queue.get() | |
if line is None: | |
for _ in range(num_consumers): | |
await output_queue.put(None) | |
break | |
await output_queue.put(await callback(producer_id, line)) | |
input_queue.task_done() | |
async def consumer( | |
callback: Callable[[int, Any], Any], | |
output_queue: asyncio.Queue[Any], | |
consumer_id: int, | |
) -> None: | |
while True: | |
data = await output_queue.get() | |
if data is None: | |
output_queue.task_done() | |
break | |
await callback(consumer_id, data) | |
output_queue.task_done() | |
async def run(num_producers: int, num_consumers: int, endpoints_file: str) -> None: | |
input_queue: asyncio.Queue[str | None] = asyncio.Queue() | |
output_queue: asyncio.Queue[Any] = asyncio.Queue() | |
async def input_provider( | |
input_queue: asyncio.Queue[str | None], endpoints_file: str, num_producers: int | |
) -> None: | |
with open(endpoints_file, "r") as file: | |
for line in file: | |
console.print(f"[green]Providing: {line.strip()}") | |
await input_queue.put(line.strip()) | |
for _ in range(num_producers): | |
await input_queue.put(None) | |
async def producer_callback(producer_id: int, data: str): | |
console.print(f"[blue]Producer: {producer_id} got line {data}") | |
async with httpx.AsyncClient() as client: | |
result = await client.get(data) | |
return result.headers["date"] | |
async def consumer_callback(consumer_id: int, data: str): | |
console.print(f"[white]Consumer: {consumer_id} got line {data}") | |
tasks: List[asyncio.Task[Any]] = [ | |
asyncio.create_task(input_provider(input_queue, endpoints_file, num_producers)) | |
] | |
tasks.extend( | |
[ | |
asyncio.create_task( | |
producer(producer_callback, input_queue, output_queue, pid) | |
) | |
for pid in range(num_producers) | |
] | |
) | |
tasks.extend( | |
[ | |
asyncio.create_task(consumer(consumer_callback, output_queue, cid)) | |
for cid in range(num_consumers) | |
] | |
) | |
await asyncio.gather(*tasks) | |
if __name__ == "__main__":
    # Script entry point: read the pipeline settings from the command line
    # and drive run() to completion on a fresh event loop.
    from argparse import ArgumentParser

    cli = ArgumentParser()
    cli.add_argument("-p", "--number_of_producers", default=5, type=int)
    cli.add_argument("-c", "--number_of_consumers", default=10, type=int)
    cli.add_argument("-f", "--endpoints_file", required=True, type=str)
    options = cli.parse_args()

    # The three settings are kept as module-level names.
    num_producers: int = options.number_of_producers
    num_consumers: int = options.number_of_consumers
    endpoints_file: str = options.endpoints_file

    asyncio.run(run(num_producers, num_consumers, endpoints_file))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment