Distributed rate limiting of multiple task queues (RabbitMQ) and the Kombu package

Basic task processing with distributed rate limiting

Proof of concept of distributed rate limiting of the processing speed of multiple workers.

Rate limiting follows a leaky bucket algorithm. The bucket is implemented as a special token-bucket queue: the maximum size of the bucket is enforced by the max length of the token queue. The bucket is refilled by a single worker, which is also responsible for the refill rate.

Workers must get a token before fetching and processing any tasks. If no tasks are available, the token is returned to the bucket.
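
At its core, each worker runs a loop like the sketch below (the bucket/tasks helpers are hypothetical, for illustration only; the real implementation further down uses kombu's ConsumerProducerMixin):

while True:
    token = bucket.get()           # block until the refiller provides a token
    task = tasks.get(block=False)  # try to fetch a task without blocking
    if task is None:
        sleep(1)                   # back off so we don't hammer the broker
        bucket.requeue(token)      # no work: put the token back
    else:
        bucket.ack(token)          # spend the token
        run(task)                  # exactly one task per token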

Setup

  1. install Docker
  2. install dependencies: pipenv sync, then pipenv shell

Running

# Setup
docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 rabbitmq:3

# Terminal 1
python worker.py

# Terminal 2
python tokens.py

# Terminal 3
python scheduler.py

app.py

from kombu.log import get_logger
from kombu.utils.debug import setup_logging

from utils import App

logger = get_logger(__name__)

app = App(name="test", broker="amqp://guest:guest@localhost:5672//")

Pipfile

[[source]]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true

[dev-packages]

[packages]
kombu = "*"

[requires]
python_version = "3.7"

queues.py

from kombu import Exchange, Queue

task_exchange = Exchange("tasks", type="direct")

task_queues = {
    # The token bucket: max_length caps the bucket at 10 tokens
    "token-bucket": Queue(
        "token-bucket", task_exchange, routing_key="token-bucket", max_length=10
    ),
    "tasks": Queue("tasks", task_exchange, routing_key="tasks"),
}
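
kombu's max_length maps to RabbitMQ's x-max-length queue argument; by default RabbitMQ drops messages from the head of a full queue, so the bucket discards the oldest token instead of growing past 10. This also means an idle system can burst up to 10 tasks before settling back to the refill rate. A quick, hypothetical check (assumes the broker from the Setup section is running):

from kombu import Connection
from queues import task_queues

with Connection("amqp://guest:guest@localhost:5672//") as conn:
    bucket = task_queues["token-bucket"](conn.channel())
    bucket.declare()  # declares the exchange, the queue and the binding
    with conn.Producer() as producer:
        for _ in range(15):
            producer.publish(
                {}, exchange=bucket.exchange, routing_key=bucket.routing_key
            )
    # Only the 10 newest tokens remain; the rest were dropped from the head
    print(bucket.queue_declare(passive=True).message_count)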

scheduler.py

from kombu.log import get_logger
from kombu.utils.debug import setup_logging

from tasks import hello

logger = get_logger(__name__)

if __name__ == "__main__":
    setup_logging(loglevel="INFO", loggers=[""])

    for i in range(5):
        message = f"world - {i}"
        logger.info(f"Sending message: {message}")
        hello(message)

tasks.py

from kombu.log import get_logger
from kombu.utils.debug import setup_logging

from app import app

logger = get_logger(__name__)


@app.token
def token():
    return "OK"


@app.task
def hello(message):
    return f"Hello {message}"

tokens.py

from kombu.log import get_logger
from kombu.utils.debug import setup_logging

from utils import timestep
from tasks import token

logger = get_logger(__name__)

TOKEN_RATE = 1  # tokens per second

if __name__ == "__main__":
    setup_logging(loglevel="INFO", loggers=[""])

    # timestep() sleeps for its argument in seconds, so convert the
    # rate (tokens per second) into a period (seconds per token)
    for dt in timestep(1 / TOKEN_RATE):
        message = "Token created at {0}".format(dt)
        token()
        logger.info(message)

utils.py

from typing import Tuple, Dict
import dataclasses
import datetime
from time import sleep

from kombu import Connection
from kombu.log import get_logger
from kombu.mixins import ConsumerProducerMixin
from kombu.pools import producers

from queues import task_queues

logger = get_logger(__name__)


def timestep(timeout=1):
    """Yield the current time, then sleep for `timeout` seconds."""
    while True:
        yield datetime.datetime.now()
        sleep(timeout)


class TokenWorker(ConsumerProducerMixin):
    def __init__(self, connection, token_queue, message_queues, task_callback):
        self.connection = connection
        self.token_queue = token_queue
        self.message_queues = message_queues
        self.task_callback = task_callback
        self.accept = ["pickle", "json"]

    def get_consumers(self, Consumer, channel):
        self.token_queue = self.token_queue(channel)
        self.message_queues = [m(channel) for m in self.message_queues]
        return [
            Consumer(
                queues=[self.token_queue],
                on_message=self.handle_token,
                accept=self.accept,
                # Only take one token at a time
                prefetch_count=1,
            )
        ]

    def get_first_available_message(self):
        return next(
            filter(None, (m.get(accept=self.accept) for m in self.message_queues)),
            None,
        )

    def handle_token(self, token):
        message = self.get_first_available_message()
        if message is None:
            # Back off, otherwise we'd simply be hammering:
            #   a) the message queue server
            #   b) the local CPU
            sleep(1)
            # Requeue the token so another worker can use it
            token.requeue()
            res = None
        else:
            token.ack()
            res = self.handle_message(message.decode(), message)
        return res

    def handle_message(self, task, message):
        logger.debug(task)
        try:
            res = self.task_callback(name=task.name, args=task.args, kwargs=task.kwargs)
            logger.info(res)
            message.ack()
        except Exception:
            message.requeue()


class App:
    def __init__(self, name, broker):
        self.name = name
        self.broker = broker
        self._tasks = dict()
        self._tokens = dict()

    def run_worker(self):
        with Connection(self.broker) as conn:
            try:
                worker = TokenWorker(
                    connection=conn,
                    token_queue=task_queues["token-bucket"],
                    message_queues=[task_queues["tasks"]],
                    task_callback=self.run_task,
                )
                worker.run()
            except KeyboardInterrupt:
                print("bye bye")

    def task(self, func):
        """Decorator to create a task callable out of any function."""
        name = self.gen_task_name(func)
        self._tasks[name] = func

        def _send_as_task(*args, **kwargs):
            self.send_task(name, args, kwargs)

        return _send_as_task

    def gen_task_name(self, func):
        return f"{self.name}.{func.__name__}"

    def run_task(self, name, args, kwargs):
        func = self._tasks[name]
        return func(*args, **kwargs)

    def send_task(self, name, args, kwargs):
        payload = Task(name=name, args=args, kwargs=kwargs)
        with Connection(self.broker) as conn:
            with producers[conn].acquire(block=True) as producer:
                producer.publish(
                    payload,
                    serializer="pickle",
                    compression="bzip2",
                    exchange=task_queues["tasks"].exchange,
                    declare=[task_queues["tasks"].exchange, task_queues["tasks"]],
                    routing_key=task_queues["tasks"].routing_key,
                )

    def token(self, func):
        """Decorator to create a token callable out of any function."""
        name = self.gen_task_name(func)
        self._tokens[name] = func

        def _send_as_token(*args, **kwargs):
            self.send_token()

        return _send_as_token

    def send_token(self):
        payload = {}
        with Connection(self.broker) as conn:
            with producers[conn].acquire(block=True) as producer:
                producer.publish(
                    payload,
                    serializer="pickle",
                    compression="bzip2",
                    exchange=task_queues["token-bucket"].exchange,
                    declare=[
                        task_queues["token-bucket"].exchange,
                        task_queues["token-bucket"],
                    ],
                    routing_key=task_queues["token-bucket"].routing_key,
                )


@dataclasses.dataclass
class Task:
    name: str
    args: Tuple
    kwargs: Dict
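
Because tasks travel with serializer="pickle", the producer and every worker must be able to import the same utils.Task class. A quick round-trip shows the payload that message.decode() hands back to the worker:

import pickle
from utils import Task

task = Task(name="test.hello", args=("world - 0",), kwargs={})
assert pickle.loads(pickle.dumps(task)) == task  # dataclass equality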

worker.py

from kombu.log import get_logger
from kombu.utils.debug import setup_logging

from app import app
# Needed to ensure the tasks are imported and registered on the app
from tasks import hello

logger = get_logger(__name__)

if __name__ == "__main__":
    setup_logging(loglevel="INFO", loggers=[""])
    app.run_worker()