Asynchronous Rate-Limited Queue System with Producer-Consumer Pattern in Python
import asyncio
import enum
import logging
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import Any, AsyncGenerator, Callable, List, Optional

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class TaskSignal(enum.Enum):
    """Enumeration for task control signals."""

    STOP = "STOP"


@dataclass
class QueueMessage:
    """Message structure for queue communication."""

    payload: Any
    signal: Optional[TaskSignal] = None

class RateLimitedQueue(asyncio.Queue):
    """
    A rate-limited queue for processing tasks.

    Args:
        max_rate: Maximum number of tasks to process per second.
        name: Queue identifier for logging purposes.
    """

    def __init__(self, max_rate: float, name: str = "default"):
        super().__init__()
        self.max_rate = max_rate
        self.tokens = max_rate
        self.last_token_issued = asyncio.get_event_loop().time()
        self.name = name
        self.active = True

    async def get(self) -> QueueMessage:
        """Get an item from the queue, respecting the rate limit."""
        if self.active:
            await self.acquire_token()
            return await super().get()
        raise RuntimeError("Queue is no longer active")

    async def put(self, item: Any) -> None:
        """Put an item into the queue."""
        if self.active:
            message = QueueMessage(payload=item) if not isinstance(item, QueueMessage) else item
            await super().put(message)
        else:
            raise RuntimeError("Queue is no longer active")

    async def acquire_token(self) -> None:
        """Acquire a token to process a task, respecting the rate limit."""
        now = asyncio.get_event_loop().time()
        elapsed = now - self.last_token_issued
        # Replenish tokens for the elapsed time, capped at the bucket size.
        self.tokens = min(self.tokens + elapsed * self.max_rate, self.max_rate)
        self.last_token_issued = now
        if self.tokens < 1:
            # Not enough tokens yet: wait one token interval before consuming.
            await asyncio.sleep(1 / self.max_rate)
        self.tokens -= 1

    def close(self) -> None:
        """Mark the queue as inactive."""
        self.active = False

@asynccontextmanager
async def managed_queue(name: str, max_rate: float) -> AsyncGenerator[RateLimitedQueue, None]:
    """
    Context manager for queue lifecycle management.

    Args:
        name: Queue identifier
        max_rate: Maximum processing rate
    """
    logger.info(f"Initializing queue: {name}")
    queue = RateLimitedQueue(max_rate=max_rate, name=name)
    try:
        yield queue
    finally:
        queue.close()
        logger.info(f"Shutting down queue: {name}")

class Producer:
    """
    Task producer that generates work items.

    Args:
        queue: The queue to produce items to
        item_range: Range of items to produce
        producer_id: Unique identifier for this producer
    """

    def __init__(self, queue: RateLimitedQueue, item_range: range, producer_id: int):
        self.queue = queue
        self.item_range = item_range
        self.producer_id = producer_id
        self.task: Optional[asyncio.Task] = None

    async def produce(self) -> None:
        """Produce items to the queue."""
        try:
            for item in self.item_range:
                if not self.queue.active:
                    break
                logger.info(f"Producer {self.producer_id} producing item: {item}")
                await self.queue.put(item)
                await asyncio.sleep(0.05)  # Simulate work
            logger.info(f"Producer {self.producer_id} completed")
        except Exception as e:
            logger.error(f"Producer {self.producer_id} error: {e}")
            raise

    def start(self) -> asyncio.Task:
        """Start the producer as an independent task."""
        self.task = asyncio.create_task(self.produce())
        return self.task

class Consumer:
    """
    Task consumer that processes work items.

    Args:
        queue: The queue to consume items from
        plugins: List of plugin functions to process items
        consumer_id: Unique identifier for this consumer
    """

    def __init__(self, queue: RateLimitedQueue, plugins: List[Callable], consumer_id: int):
        self.queue = queue
        self.plugins = plugins
        self.consumer_id = consumer_id
        self.task: Optional[asyncio.Task] = None

    async def consume(self) -> None:
        """Consume and process items from the queue."""
        try:
            while self.queue.active:
                try:
                    message = await self.queue.get()
                    if isinstance(message, QueueMessage):
                        if message.signal == TaskSignal.STOP:
                            logger.info(f"Consumer {self.consumer_id} received stop signal")
                            self.queue.task_done()  # Important: mark the stop signal as done
                            break
                        item = message.payload
                        logger.info(f"Consumer {self.consumer_id} processing item: {item}")
                        for plugin in self.plugins:
                            await plugin(item)
                    self.queue.task_done()
                except asyncio.CancelledError:
                    break
                except Exception as e:
                    logger.error(f"Consumer {self.consumer_id} processing error: {e}")
                    if self.queue.active:  # Only mark as done if queue is still active
                        self.queue.task_done()
            logger.info(f"Consumer {self.consumer_id} shutting down")
        except Exception as e:
            logger.error(f"Consumer {self.consumer_id} error: {e}")
            raise

    def start(self) -> asyncio.Task:
        """Start the consumer as an independent task."""
        self.task = asyncio.create_task(self.consume())
        return self.task

async def example_plugin(item: Any) -> None:
    """Example plugin function to process items."""
    logger.info(f"Processing item {item} with example plugin")
    await asyncio.sleep(0.1)  # Simulate processing work


async def shutdown_consumers(queue: RateLimitedQueue, consumers: List[Consumer]) -> None:
    """Gracefully shut down consumers."""
    logger.info("Beginning consumer shutdown sequence")
    # Send one stop signal per consumer
    for _ in range(len(consumers)):
        await queue.put(QueueMessage(payload=None, signal=TaskSignal.STOP))
    # Wait for all consumers to process their stop signals
    await queue.join()
    # Cancel any remaining consumer tasks
    for consumer in consumers:
        if consumer.task and not consumer.task.done():
            consumer.task.cancel()
    logger.info("Consumer shutdown sequence completed")


async def main() -> None:
    """Main function to orchestrate the producer-consumer system."""
    async with managed_queue("main_queue", max_rate=2) as queue:
        try:
            # Create multiple producers
            producers = [
                Producer(queue, range(i * 5, (i + 1) * 5), i)
                for i in range(3)
            ]
            # Create multiple consumers
            consumers = [
                Consumer(queue, [example_plugin], i)
                for i in range(2)
            ]
            # Start all producers and consumers as independent tasks
            producer_tasks = [producer.start() for producer in producers]
            consumer_tasks = [consumer.start() for consumer in consumers]
            # Wait for all producers to complete
            await asyncio.gather(*producer_tasks)
            logger.info("All producers have completed")
            # Shut down consumers gracefully
            await shutdown_consumers(queue, consumers)
            # Wait for all consumers to complete their shutdown
            await asyncio.gather(*consumer_tasks)
            logger.info("All consumers have completed")
        except Exception as e:
            logger.error(f"Error in main: {e}")
            raise
        finally:
            logger.info("Cleanup completed")


if __name__ == "__main__":
    asyncio.run(main())
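
A minimal usage sketch of the same pieces with a second plugin and a different rate limit, assuming it lives in the same module as the code above so that logger, managed_queue, Producer, Consumer, example_plugin, and shutdown_consumers are in scope; custom_plugin and run_custom are hypothetical names used only for illustration:

# Usage sketch (assumes the classes above are importable or in the same module).
async def custom_plugin(item: Any) -> None:
    # Any async callable taking one item can serve as a plugin.
    logger.info(f"Custom plugin handled: {item}")

async def run_custom() -> None:
    # One producer, one consumer running two plugins, limited to 5 items/second.
    async with managed_queue("custom_queue", max_rate=5) as queue:
        producers = [Producer(queue, range(10), 0)]
        consumers = [Consumer(queue, [example_plugin, custom_plugin], 0)]
        producer_tasks = [p.start() for p in producers]
        consumer_tasks = [c.start() for c in consumers]
        await asyncio.gather(*producer_tasks)
        await shutdown_consumers(queue, consumers)
        await asyncio.gather(*consumer_tasks)

# asyncio.run(run_custom())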