|
from typing import Tuple, Dict, Union |
|
import dataclasses |
|
import datetime |
|
from time import sleep |
|
|
|
|
|
from kombu import Connection |
|
from kombu.log import get_logger |
|
from kombu.mixins import ConsumerProducerMixin |
|
from kombu.pools import producers |
|
import kombu |
|
|
|
from queues import task_queues |
|
|
|
logger = get_logger(__name__) |
|
|
|
|
|
def timestep(timeout=1): |
|
while True: |
|
yield datetime.datetime.now() |
|
sleep(timeout) |
|
|
|
|
|
class TokenWorker(ConsumerProducerMixin): |
|
def __init__(self, connection, token_queue, message_queues, task_callback): |
|
self.connection = connection |
|
self.token_queue = token_queue |
|
self.message_queues = message_queues |
|
self.task_callback = task_callback |
|
self.accept = ["pickle", "json"] |
|
|
|
def get_consumers(self, Consumer, channel): |
|
self.token_queue = self.token_queue(channel) |
|
self.message_queues = [m(channel) for m in self.message_queues] |
|
return [ |
|
Consumer( |
|
queues=[self.token_queue(channel)], |
|
on_message=self.handle_token, |
|
accept=self.accept, |
|
# Only take one token at a time |
|
prefetch_count=1, |
|
) |
|
] |
|
|
|
def get_first_available_message(self): |
|
return next( |
|
filter(None, (m.get(accept=self.accept) for m in self.message_queues)), None |
|
) |
|
|
|
def handle_token(self, token): |
|
message = self.get_first_available_message() |
|
if message is None: |
|
# back off otherwise we'll simply be hammering: |
|
# a) the message queue server |
|
# b) the local cpu |
|
sleep(1) |
|
# requeue the token for |
|
token.requeue() |
|
res = None |
|
else: |
|
token.ack() |
|
res = self.handle_message(message.decode(), message) |
|
|
|
return res |
|
|
|
def handle_message(self, task, message): |
|
logger.debug(task) |
|
try: |
|
res = self.task_callback(name=task.name, args=task.args, kwargs=task.kwargs) |
|
logger.info(res) |
|
message.ack() |
|
except: |
|
message.requeue() |
|
|
|
|
|
class App: |
|
def __init__(self, name, broker): |
|
self.name = name |
|
self.broker = broker |
|
self._tasks = dict() |
|
self._tokens = dict() |
|
|
|
def run_worker(self): |
|
with Connection(self.broker) as conn: |
|
try: |
|
worker = TokenWorker( |
|
connection=conn, |
|
token_queue=task_queues["token-bucket"], |
|
message_queues=[task_queues["tasks"]], |
|
task_callback=self.run_task, |
|
) |
|
worker.run() |
|
except KeyboardInterrupt: |
|
print("bye bye") |
|
|
|
def task(self, func): |
|
"""Decorator to create a task callable out of any function""" |
|
|
|
name = self.gen_task_name(func) |
|
self._tasks[name] = func |
|
|
|
def _send_as_task(*args, **kwargs): |
|
self.send_task(name, args, kwargs) |
|
|
|
return _send_as_task |
|
|
|
def gen_task_name(self, func): |
|
return f"{self.name}.{func.__name__}" |
|
|
|
def run_task(self, name, args, kwargs): |
|
func = self._tasks[name] |
|
return func(*args, **kwargs) |
|
|
|
def send_task(self, name, args, kwargs): |
|
# payload = {"name": name, "args": args, "kwargs": kwargs} |
|
payload = Task(name=name, args=args, kwargs=kwargs) |
|
|
|
with Connection(self.broker) as conn: |
|
with producers[conn].acquire(block=True) as producer: |
|
producer.publish( |
|
payload, |
|
serializer="pickle", |
|
compression="bzip2", |
|
exchange=task_queues["tasks"].exchange, |
|
declare=[task_queues["tasks"].exchange, task_queues["tasks"]], |
|
routing_key=task_queues["tasks"].routing_key, |
|
) |
|
|
|
def token(self, func): |
|
"""Decorator to create a token callable out of any function""" |
|
|
|
name = self.gen_task_name(func) |
|
self._tokens[name] = func |
|
|
|
def _send_as_token(*args, **kwargs): |
|
self.send_token() |
|
|
|
return _send_as_token |
|
|
|
def send_token(self): |
|
payload = {} |
|
with Connection(self.broker) as conn: |
|
with producers[conn].acquire(block=True) as producer: |
|
producer.publish( |
|
payload, |
|
serializer="pickle", |
|
compression="bzip2", |
|
exchange=task_queues["token-bucket"].exchange, |
|
declare=[ |
|
task_queues["token-bucket"].exchange, |
|
task_queues["token-bucket"], |
|
], |
|
routing_key=task_queues["token-bucket"].routing_key, |
|
) |
|
|
|
|
|
@dataclasses.dataclass |
|
class Task: |
|
name: str |
|
args: Tuple |
|
kwargs: Dict |