import time
from typing import Dict, Optional

from dramatiq import Message, Broker
from dramatiq.results import ResultMissing


class TaskQueue:
    """A fixed-size task queue on top of the Dramatiq background task framework.

    https://dramatiq.io/

    The purpose of this queue is to feed tasks to the Dramatiq workers as fast
    as they can handle them. The queue should be set to the same size as the
    total number of worker threads available.

    This is useful for running task sets that are CPU bound: you know the
    number of CPUs and the load capacity of a server and you do not want to
    exceed this capacity.

    When the queue is fully allocated, adding a task blocks until existing
    tasks are complete and there is free space again.

    Results are only collected while :meth:`add_and_wait_until_space` is
    waiting for a slot. Tasks that are still in flight after the last message
    has been added must be collected separately by polling
    :meth:`scoop_finished`.

    An incomplete usage example:

    .. code-block:: python

        import random
        import time

        # Setup Dramatiq
        import dramatiq
        from dramatiq.brokers.redis import RedisBroker
        from dramatiq.results import Results
        from dramatiq.results.backends import RedisBackend

        redis_result_backend = RedisBackend(client=_client)
        redis_broker = RedisBroker(client=_client)
        redis_broker.add_middleware(Results(backend=redis_result_backend))
        dramatiq.set_broker(redis_broker)

        @dramatiq.actor
        def long_process(id: int) -> int:
            '''Example blocking background worker task'''
            r = random.randint(1, 5)
            print(f"Background process {id} sleeping {r} seconds")
            time.sleep(r)
            return r

        # Assume we have started dramatiq worker controller with
        # 2 processes, 5 threads
        queue = TaskQueue(redis_broker, 10)

        # We are going to generate more messages than we have workers,
        # so after 10 messages the loop starts to slow down,
        # while the queue is blocking until more background workers
        # are cleared up
        total_slept = 0
        for i in range(100):
            message = long_process.message(i)
            results = queue.add_and_wait_until_space(message)
            for slept_time in results:
                total_slept += slept_time

        print(f"All background workers slept {total_slept} seconds")
    """

    def __init__(self, broker: Broker, size: int, poll_delay: float = 0.5):
        """Create a queue with a fixed number of task slots.

        :param broker: Broker used to enqueue the messages
        :param size: Number of slots, i.e. the maximum number of tasks tracked
            at the same time. Should be at least 1: with no slots,
            :meth:`add_and_wait_until_space` never finds a free slot and
            blocks forever.
        :param poll_delay: Seconds to sleep between two polls for finished
            tasks while the queue is full
        """
        self.broker = broker
        # Slot number -> message currently occupying the slot, None when free
        self.tasks: Dict[int, Optional[Message]] = {i: None for i in range(size)}
        self.poll_delay = poll_delay

    def add_and_wait_until_space(self, m: Message) -> list:
        """Add a new task.

        Sends a new task to the background workers as soon as there is a free
        slot in the queue. If all slots are taken, block and poll until at
        least one of the previous tasks has finished.

        :param m: Message to enqueue
        :return: List of results of previous tasks that completed while
            waiting. Empty if a slot was free right away.
        """
        ready = []
        while (free_slot := self.peak_free_slot()) is None:
            time.sleep(self.poll_delay)
            ready += self.scoop_finished()

        # Assign the task to the queue
        self.tasks[free_slot] = m

        # Send the task to the worker pool
        self.broker.enqueue(m)
        return ready

    def peak_free_slot(self) -> Optional[int]:
        """Get the first free slot in the queue (if any).

        :return: Slot number, or None if every slot is occupied
        """
        for slot, message in self.tasks.items():
            if message is None:
                return slot
        return None

    def scoop_finished(self) -> list:
        """Find finished tasks and make space in the queue.

        Free slots are skipped, so this is safe to call at any time, e.g. to
        collect the results of the remaining tasks after the last message has
        been added.

        Only ``ResultMissing`` is treated as "not finished yet"; any other
        exception raised by ``Message.get_result()`` propagates to the caller.

        :return: Results of completed tasks
        """
        ready = {}
        for slot, message in self.tasks.items():
            if message is None:
                # Free slot, there is no task to ask a result for
                continue
            try:
                result = message.get_result()
            except ResultMissing:
                # Task has not produced a result yet
                continue
            ready[slot] = result

        # Clean finished. Done after the loop so the dict is not modified
        # while it is being iterated.
        for ready_slot in ready:
            self.tasks[ready_slot] = None

        return list(ready.values())