Created
August 1, 2019 15:40
-
-
Save tomdottom/63f14433a739c646e3cb39dc9cd7525e to your computer and use it in GitHub Desktop.
Distributed rate limited task queues.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""demo.py | |
Distributed rate limited task queues. | |
Uses a leaky-bucket token-queue to ensure multiple workers don't exceed global rate limit. | |
# Setup & Dependencies | |
docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 rabbitmq:3 | |
pip install kombu | |
# Term 1 & 2 | |
python demo.py worker | |
# Term 3 | |
python demo.py tokens | |
# Term 4 | |
python demo.py scheduler | |
""" | |
from __future__ import absolute_import, unicode_literals | |
from functools import partial | |
from time import sleep | |
import datetime | |
import itertools | |
import sys | |
from kombu import Connection, Exchange, Queue | |
BROKER_URL = "amqp://localhost:5672//" | |
TASK_EXCHANGE = Exchange("tasks", type="direct") | |
TASK_QUEUES = { | |
# Leaky bucket. Can only be filled up to 10 tokens | |
"token-bucket": Queue( | |
"token-bucket", TASK_EXCHANGE, routing_key="token-bucket", max_length=10 | |
), | |
"tasks": Queue("tasks", TASK_EXCHANGE, routing_key="tasks"), | |
} | |
def timestep(timeout=1): | |
while True: | |
yield datetime.datetime.now() | |
sleep(timeout) | |
def publish(queue, message): | |
message = queue.exchange.Message(body=message, content_encoding="utf-8") | |
return queue.exchange.publish(message=message, routing_key=queue.routing_key) | |
def init(): | |
print("Initialising queues") | |
with Connection(BROKER_URL) as conn: | |
channel = conn.channel() | |
task_queue = TASK_QUEUES["tasks"](channel) | |
token_queue = TASK_QUEUES["token-bucket"](channel) | |
task_queue.declare() | |
token_queue.declare() | |
def worker(): | |
print("Starting worker") | |
with Connection(BROKER_URL) as conn: | |
channel = conn.channel() | |
task_queue = TASK_QUEUES["tasks"](channel) | |
token_queue = TASK_QUEUES["token-bucket"](channel) | |
for dt in timestep(0.1): | |
token = token_queue.get() | |
if not token: | |
continue | |
task_message = task_queue.get() | |
if not task_message: | |
token.requeue() | |
continue | |
token.ack() | |
print("Received: {0}".format(task_message.payload)) | |
task_message.ack() | |
def tokens(): | |
print("Starting tokens generator") | |
with Connection(BROKER_URL) as conn: | |
channel = conn.channel() | |
token_queue = TASK_QUEUES["token-bucket"](channel) | |
token_put = partial(publish, token_queue) | |
for dt in timestep(1): | |
message = "Token created at {0}".format(dt) | |
foo = token_put(message) | |
print(message) | |
def scheduler(num=20): | |
print("Starting scheduler") | |
print(f"Sending {num} messages") | |
with Connection(BROKER_URL) as conn: | |
channel = conn.channel() | |
task_queue = TASK_QUEUES["tasks"](channel) | |
task_put = partial(publish, task_queue) | |
for dt in itertools.islice(timestep(0.05), 0, num): | |
message = "helloworld, sent at {0}".format(dt) | |
task_put(message) | |
print("Sent: {0}".format(message)) | |
if __name__ == "__main__": | |
try: | |
component_name = sys.argv[1] | |
component = locals()[component_name] | |
init() | |
component() | |
except (IndexError, KeyError): | |
print(r"Start a worker, tokens generator, and scheduler in separate terminals") | |
print(r"python demo.py {worker,tokens,scheduler}") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment