Last active
November 6, 2021 01:47
-
-
Save miohtama/6b455958cc88a873e8b659346c71a23a to your computer and use it in GitHub Desktop.
Throttling background task queue using Dramatiq - wait until more workers are freed
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
from typing import Dict, Optional | |
from dramatiq import Message, Broker | |
from dramatiq.results import ResultMissing | |
class TaskQueue:
    """A task queue using the Dramatiq background task framework.

    https://dramatiq.io/

    The purpose of this queue is to feed tasks to the Dramatiq workers
    as fast as they can handle. The queue should be set to the same size
    as the number of total worker threads available.

    This is useful for running task sets that are CPU bounded:
    you know the number of CPUs and the load capacity of a server and
    you do not want to exceed this capacity.

    When the queue is fully allocated,
    the queue will block until existing tasks are complete and there is
    more free space.

    An incomplete usage example:

    .. code-block:: python

        import random
        import time

        # Setup Dramatiq
        import dramatiq
        from dramatiq.brokers.redis import RedisBroker
        from dramatiq.results import Results
        from dramatiq.results.backends import RedisBackend

        redis_result_backend = RedisBackend(client=_client)
        redis_broker = RedisBroker(client=_client)
        redis_broker.add_middleware(Results(backend=redis_result_backend))
        dramatiq.set_broker(redis_broker)

        @dramatiq.actor
        def long_process(id: int) -> int:
            '''Example blocking background worker task'''
            r = random.randint(1, 5)
            print(f"Background process {id} sleeping {r} seconds")
            time.sleep(r)
            return r

        # Assume we have started dramatiq worker controller
        # with 2 processes, 5 threads
        queue = TaskQueue(redis_broker, 10)

        # We are going to generate more messages than we have workers,
        # so after 10 messages the loop starts to slow down,
        # while the queue is blocking until more background workers
        # are cleared up
        total_slept = 0
        for i in range(100):
            message = long_process.message(i)
            results = queue.add_and_wait_until_space(message)
            for slept_time in results:
                total_slept += slept_time

        # Tasks still in flight when the loop ends are not included above;
        # poll queue.scoop_finished() to collect their results.
        print(f"All background workers slept {total_slept} seconds")
    """

    def __init__(self, broker: Broker, size: int, poll_delay: float = 0.5):
        """Create a queue with a fixed number of task slots.

        :param broker: Broker that messages are enqueued on
        :param size: Number of slots, i.e. the maximum number of tasks
            tracked by this queue at the same time
        :param poll_delay: Seconds to sleep between polls while the
            queue is full
        :raise ValueError: If ``size`` is less than one
        """
        if size < 1:
            # A queue without slots could never accept a task and
            # add_and_wait_until_space() would block forever.
            raise ValueError(f"Queue size must be at least 1, got {size}")

        self.broker = broker
        # Slot number -> message occupying the slot, None when the slot is free
        self.tasks: Dict[int, Optional[Message]] = {
            slot: None for slot in range(size)
        }
        self.poll_delay = poll_delay

    def add_and_wait_until_space(self, m: Message) -> list:
        """Add a new task.

        Sends a new task to the background workers as soon as there is
        a free slot in the queue. If all slots are taken then block,
        polling the occupied slots until at least one task has finished.

        :param m: Message to enqueue on the broker
        :return: List of results of previous tasks that have completed
        """
        ready = []

        while (free_slot := self.peak_free_slot()) is None:
            finished = self.scoop_finished()
            if finished:
                ready += finished
            else:
                # Nothing has completed yet: wait before polling again.
                # Sleeping only here means a task that is already done
                # frees its slot without any extra delay.
                time.sleep(self.poll_delay)

        # Assign task to the queue
        self.tasks[free_slot] = m

        # Send task to the worker pool
        self.broker.enqueue(m)

        return ready

    def peak_free_slot(self) -> Optional[int]:
        """Get the first free slot in the queue (if any).

        :return: Slot number, or None when every slot is occupied
        """
        for slot, message in self.tasks.items():
            if message is None:
                return slot
        return None

    def scoop_finished(self) -> list:
        """Find finished tasks and make space in the queue.

        Free slots are skipped, so this can be called at any time,
        not only when the queue is full. Any exception other than
        ``ResultMissing`` raised by ``get_result()`` propagates
        to the caller.

        :return: Results of completed tasks
        """
        ready = {}

        for slot, message in self.tasks.items():
            if message is None:
                # Free slot, there is no result to ask for
                continue
            try:
                ready[slot] = message.get_result()
            except ResultMissing:
                # Task is still pending or running
                continue

        # Clean finished
        for ready_slot in ready:
            self.tasks[ready_slot] = None

        return list(ready.values())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment