Skip to content

Instantly share code, notes, and snippets.

@Kurry
Created October 29, 2024 02:29
Show Gist options
  • Save Kurry/4c30e9d82ca78716d0e090eb1c79cede to your computer and use it in GitHub Desktop.
Save Kurry/4c30e9d82ca78716d0e090eb1c79cede to your computer and use it in GitHub Desktop.
Asynchronous Rate-Limited Queue System with Producer-Consumer Pattern in Python
import asyncio
import enum
import logging
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import AsyncGenerator, Callable, List, Any, Optional
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class TaskSignal(enum.Enum):
    """Control signals that can be attached to a queue message."""

    # Tells the consumer that receives it to leave its processing loop.
    STOP = "STOP"
@dataclass
class QueueMessage:
    """Envelope placed on the queue: a payload plus an optional control signal."""

    # The work item itself; may be anything (None for pure control messages).
    payload: Any
    # Optional control signal; None means this is an ordinary work item.
    signal: Optional[TaskSignal] = None
class RateLimitedQueue(asyncio.Queue):
    """
    An ``asyncio.Queue`` whose ``get`` is throttled by a token bucket.

    Tokens refill continuously at ``max_rate`` per second and are capped at
    ``max_rate``; every ``get`` consumes one token before it waits for an
    item. Items handed to ``put`` are wrapped in a ``QueueMessage`` unless
    they already are one.

    Args:
        max_rate: Maximum number of tasks to process per second. Must be > 0.
        name: Queue identifier for logging purposes.

    Raises:
        ValueError: If ``max_rate`` is not strictly positive.
    """

    def __init__(self, max_rate: float, name: str = "default"):
        # Reject zero/negative (and NaN) rates up front; they would otherwise
        # surface later as a ZeroDivisionError or a nonsensical sleep.
        if not max_rate > 0:
            raise ValueError(f"max_rate must be positive, got {max_rate!r}")
        super().__init__()
        self.max_rate = max_rate
        # Start with a full bucket so the first ``max_rate`` gets are immediate.
        self.tokens = max_rate
        self.last_token_issued = asyncio.get_event_loop().time()
        self.name = name
        self.active = True

    async def get(self) -> "QueueMessage":
        """
        Get an item from the queue, respecting the rate limit.

        The token is acquired first, then the call waits for an item.

        Raises:
            RuntimeError: If the queue has been closed.
        """
        if not self.active:
            raise RuntimeError("Queue is no longer active")
        await self.acquire_token()
        return await super().get()

    async def put(self, item: Any) -> None:
        """
        Put an item into the queue, wrapping it in a ``QueueMessage`` if needed.

        Raises:
            RuntimeError: If the queue has been closed.
        """
        if not self.active:
            raise RuntimeError("Queue is no longer active")
        message = item if isinstance(item, QueueMessage) else QueueMessage(payload=item)
        await super().put(message)

    async def acquire_token(self) -> None:
        """
        Take one token, sleeping as long as needed to honour ``max_rate``.

        The token is reserved *before* sleeping, so the balance may go
        negative. Each concurrent caller therefore sees the debt left by the
        callers ahead of it and sleeps proportionally longer, which keeps the
        combined throughput of several consumers at ``max_rate`` instead of
        letting them all wake up together.
        """
        now = asyncio.get_running_loop().time()
        refill = (now - self.last_token_issued) * self.max_rate
        self.tokens = min(self.tokens + refill, self.max_rate)
        self.last_token_issued = now

        self.tokens -= 1
        if self.tokens < 0:
            # Time until the refill covers this caller's share of the debt.
            await asyncio.sleep(-self.tokens / self.max_rate)

    def close(self) -> None:
        """Mark the queue as inactive; later ``get``/``put`` calls raise."""
        self.active = False
@asynccontextmanager
async def managed_queue(name: str, max_rate: float) -> AsyncGenerator[RateLimitedQueue, None]:
    """
    Create a rate-limited queue for the duration of an ``async with`` block.

    The queue is closed (marked inactive) when the block exits, whether it
    exits normally or through an exception.

    Args:
        name: Queue identifier
        max_rate: Maximum processing rate
    """
    logger.info(f"Initializing queue: {name}")
    rate_limited = RateLimitedQueue(name=name, max_rate=max_rate)
    try:
        yield rate_limited
    finally:
        # Close first, then report, so the log line reflects the final state.
        rate_limited.close()
        logger.info(f"Shutting down queue: {name}")
class Producer:
    """
    Generates work items and feeds them to a queue.

    Args:
        queue: The queue to produce items to
        item_range: Range of items to produce
        producer_id: Unique identifier for this producer
    """

    def __init__(self, queue: RateLimitedQueue, item_range: range, producer_id: int):
        self.producer_id = producer_id
        self.item_range = item_range
        self.queue = queue
        # Populated by start(); None until the producer has been scheduled.
        self.task: Optional[asyncio.Task] = None

    async def produce(self) -> None:
        """
        Put every item of ``item_range`` on the queue, pausing briefly between items.

        Stops early if the queue is found inactive. Any exception is logged
        and re-raised.
        """
        pid = self.producer_id
        try:
            for work in self.item_range:
                if self.queue.active:
                    logger.info(f"Producer {pid} producing item: {work}")
                    await self.queue.put(work)
                    await asyncio.sleep(0.05)  # Simulate work
                else:
                    break
            logger.info(f"Producer {pid} completed")
        except Exception as exc:
            logger.error(f"Producer {pid} error: {exc}")
            raise

    def start(self) -> asyncio.Task:
        """Schedule ``produce`` as its own task, remember it, and return it."""
        scheduled = asyncio.create_task(self.produce())
        self.task = scheduled
        return scheduled
class Consumer:
    """
    Task consumer that processes work items.

    Args:
        queue: The queue to consume items from
        plugins: List of plugin functions to process items; each is awaited
            with the item as its only argument, in list order
        consumer_id: Unique identifier for this consumer
    """

    def __init__(self, queue: RateLimitedQueue, plugins: List[Callable], consumer_id: int):
        self.queue = queue
        self.plugins = plugins
        self.consumer_id = consumer_id
        # Populated by start(); None until the consumer has been scheduled.
        self.task: Optional[asyncio.Task] = None

    async def _run_plugins(self, item: Any) -> None:
        """Pass one payload through every plugin, in order."""
        logger.info(f"Consumer {self.consumer_id} processing item: {item}")
        for plugin in self.plugins:
            await plugin(item)

    async def consume(self) -> None:
        """
        Consume and process items from the queue until stopped.

        The loop ends when the queue becomes inactive, a STOP signal is
        received, or the task is cancelled. A failing plugin is logged and the
        loop moves on to the next message. Every message that was actually
        fetched is acknowledged with exactly one ``task_done()`` call so that
        ``queue.join()`` cannot be left waiting.

        Raises:
            Exception: Anything raised while fetching from the queue (other
                than cancellation) is logged and re-raised.
        """
        try:
            while self.queue.active:
                # Fetching is kept outside the per-item handler: if get()
                # fails, nothing was retrieved and no task_done() is owed.
                try:
                    message = await self.queue.get()
                except asyncio.CancelledError:
                    break

                try:
                    if not isinstance(message, QueueMessage):
                        # Only possible if something bypassed put(); make the
                        # drop visible instead of discarding silently.
                        logger.warning(
                            f"Consumer {self.consumer_id} ignoring unexpected item: {message!r}"
                        )
                        continue
                    if message.signal == TaskSignal.STOP:
                        logger.info(f"Consumer {self.consumer_id} received stop signal")
                        break
                    await self._run_plugins(message.payload)
                except asyncio.CancelledError:
                    break
                except Exception as e:
                    logger.exception(f"Consumer {self.consumer_id} processing error: {e}")
                finally:
                    # Runs on success, failure, STOP, and cancellation alike.
                    self.queue.task_done()
            logger.info(f"Consumer {self.consumer_id} shutting down")
        except Exception as e:
            logger.error(f"Consumer {self.consumer_id} error: {e}")
            raise

    def start(self) -> asyncio.Task:
        """Start the consumer as an independent task and return that task."""
        self.task = asyncio.create_task(self.consume())
        return self.task
async def example_plugin(item: Any) -> None:
    """Demo plugin: log the item, then pause briefly to imitate real work."""
    logger.info(f"Processing item {item} with example plugin")
    simulated_work_seconds = 0.1
    await asyncio.sleep(simulated_work_seconds)
async def shutdown_consumers(
    queue: RateLimitedQueue,
    consumers: List[Consumer],
    timeout: Optional[float] = None,
) -> None:
    """
    Gracefully shut down consumers.

    One STOP message is queued for each consumer that is not known to have
    finished, then the function waits for the queue to drain and finally
    cancels any consumer task that is still running.

    Args:
        queue: The queue the consumers read from.
        consumers: The consumers to stop.
        timeout: Maximum number of seconds to wait for the queue to drain.
            ``None`` (the default) waits indefinitely.
    """
    logger.info("Beginning consumer shutdown sequence")

    # A consumer whose task already finished will never pick up its STOP
    # message, and that unconsumed message would keep queue.join() waiting
    # forever. Consumers without a recorded task are assumed to be running.
    pending = [c for c in consumers if c.task is None or not c.task.done()]

    # Send stop signals
    for _ in pending:
        await queue.put(QueueMessage(payload=None, signal=TaskSignal.STOP))

    # Wait for the remaining items and the stop signals to be processed.
    if pending:
        try:
            await asyncio.wait_for(queue.join(), timeout)
        except asyncio.TimeoutError:
            logger.warning("Timed out waiting for the queue to drain; cancelling consumers")

    # Cancel any remaining consumer tasks
    for consumer in pending:
        if consumer.task and not consumer.task.done():
            consumer.task.cancel()

    logger.info("Consumer shutdown sequence completed")
async def main() -> None:
    """
    Run the demo: three producers feed one rate-limited queue that two
    consumers drain, followed by an orderly consumer shutdown.
    """
    async with managed_queue("main_queue", max_rate=2) as queue:
        try:
            # Producer i emits the five integers 5*i .. 5*i + 4.
            producers = [
                Producer(queue, range(5 * pid, 5 * pid + 5), pid)
                for pid in range(3)
            ]
            # Every consumer gets its own single-plugin list.
            consumers = [
                Consumer(queue, [example_plugin], cid)
                for cid in range(2)
            ]

            # Schedule producers first, then consumers.
            producer_tasks = [p.start() for p in producers]
            consumer_tasks = [c.start() for c in consumers]

            # Block until every producer has finished.
            await asyncio.gather(*producer_tasks)
            logger.info("All producers have completed")

            # Ask the consumers to stop once the queue has drained.
            await shutdown_consumers(queue, consumers)

            # Wait for the consumer tasks themselves to finish.
            await asyncio.gather(*consumer_tasks)
            logger.info("All consumers have completed")
        except Exception as exc:
            logger.error(f"Error in main: {exc}")
            raise
        finally:
            logger.info("Cleanup completed")
# Script entry point: run the demo only when executed directly, not on import.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment