| 
          from typing import Tuple, Dict, Union | 
        
        
           | 
          import dataclasses | 
        
        
           | 
          import datetime | 
        
        
           | 
          from time import sleep | 
        
        
           | 
          
 | 
        
        
           | 
          
 | 
        
        
           | 
          from kombu import Connection | 
        
        
           | 
          from kombu.log import get_logger | 
        
        
           | 
          from kombu.mixins import ConsumerProducerMixin | 
        
        
           | 
          from kombu.pools import producers | 
        
        
           | 
          import kombu | 
        
        
           | 
          
 | 
        
        
           | 
          from queues import task_queues | 
        
        
           | 
          
 | 
        
        
           | 
          logger = get_logger(__name__) | 
        
        
           | 
          
 | 
        
        
           | 
          
 | 
        
        
           | 
          def timestep(timeout=1):
              """Endlessly yield the current local time, pausing between values.

              The first value is produced immediately; the pause happens only when
              the generator is resumed to fetch the next value.

              Args:
                  timeout: Seconds passed to ``sleep`` after each yielded value.

              Yields:
                  datetime.datetime: The naive result of ``datetime.datetime.now()``
                  taken at the moment the value is requested.
              """
              pause = timeout
              while True:
                  current = datetime.datetime.now()
                  yield current
                  # Runs only once the consumer asks for the next timestamp.
                  sleep(pause)
        
        
           | 
          
 | 
        
        
           | 
          
 | 
        
        
           | 
          class TokenWorker(ConsumerProducerMixin):
              """Worker that runs one task for every token it consumes.

              The worker consumes from ``token_queue``. Each time a token arrives it
              polls ``message_queues`` in order for a pending task message. If one is
              found the token is acknowledged and the task is handed to
              ``task_callback``; otherwise the token is put back after a short pause.

              Args:
                  connection: Broker connection used by the kombu mixin.
                  token_queue: Unbound queue that tokens are consumed from.
                  message_queues: Iterable of unbound queues polled for task messages.
                  task_callback: Callable invoked as
                      ``task_callback(name=..., args=..., kwargs=...)`` for each task.
              """

              def __init__(self, connection, token_queue, message_queues, task_callback):
                  self.connection = connection
                  self.token_queue = token_queue
                  self.message_queues = message_queues
                  self.task_callback = task_callback
                  # NOTE(security): accepting "pickle" means whoever can publish to
                  # these queues can run arbitrary code in this process when the
                  # payload is decoded. Only use this with a trusted broker.
                  self.accept = ["pickle", "json"]

              def get_consumers(self, Consumer, channel):
                  """Bind all queues to ``channel`` and return the token consumer.

                  Args:
                      Consumer: Consumer factory supplied by the kombu mixin.
                      channel: Channel the queues are bound to.

                  Returns:
                      list: A single consumer on the token queue whose messages are
                      passed to :meth:`handle_token`.
                  """
                  # Bind once and reuse the bound queue for the consumer; binding a
                  # second time only produced another equivalent copy.
                  self.token_queue = self.token_queue(channel)
                  self.message_queues = [m(channel) for m in self.message_queues]
                  return [
                      Consumer(
                          queues=[self.token_queue],
                          on_message=self.handle_token,
                          accept=self.accept,
                          # Only take one token at a time
                          prefetch_count=1,
                      )
                  ]

              def get_first_available_message(self):
                  """Return the first pending message from ``message_queues``.

                  Queues are polled lazily in order, so polling stops at the first
                  queue that yields a message.

                  Returns:
                      The first truthy result of ``queue.get(accept=self.accept)``,
                      or ``None`` when every queue came back empty.
                  """
                  for queue in self.message_queues:
                      message = queue.get(accept=self.accept)
                      if message:
                          return message
                  return None

              def handle_token(self, token):
                  """Spend ``token`` on the next pending task, if there is one.

                  Args:
                      token: Message received from the token queue.

                  Returns:
                      The value returned by :meth:`handle_message`, or ``None`` when
                      no task message was available.
                  """
                  message = self.get_first_available_message()
                  if message is None:
                      # back off otherwise we'll simply be hammering:
                      # a) the message queue server
                      # b) the local cpu
                      sleep(1)
                      # requeue the token so it can be consumed again later
                      token.requeue()
                      return None

                  # The token is spent as soon as a task is picked up, whether or
                  # not the task itself succeeds.
                  token.ack()
                  return self.handle_message(message.decode(), message)

              def handle_message(self, task, message):
                  """Run ``task`` through the callback and settle ``message``.

                  Args:
                      task: Decoded payload exposing ``name``, ``args`` and ``kwargs``.
                      message: The message the payload was decoded from.

                  Returns:
                      The callback's result when the task succeeded, otherwise
                      ``None`` (the message is requeued in that case).
                  """
                  logger.debug(task)
                  try:
                      res = self.task_callback(name=task.name, args=task.args, kwargs=task.kwargs)
                      logger.info(res)
                      message.ack()
                  except Exception:
                      # Only ordinary errors are handled here: KeyboardInterrupt and
                      # SystemExit must propagate so the worker can be stopped while
                      # a task is running.
                      logger.exception(
                          "Task %r failed; requeueing its message",
                          getattr(task, "name", task),
                      )
                      message.requeue()
                      return None
                  return res
        
        
           | 
          
 | 
        
        
           | 
          
 | 
        
        
           | 
          class App:
              """Registry of task/token functions plus helpers to publish and run them.

              Args:
                  name: Prefix used when generating task names
                      (``"<name>.<function name>"``).
                  broker: Broker URL handed to ``Connection``.
              """

              def __init__(self, name, broker):
                  self.name = name
                  self.broker = broker
                  self._tasks = {}
                  self._tokens = {}

              def run_worker(self):
                  """Run a :class:`TokenWorker` until it is interrupted with Ctrl-C."""
                  with Connection(self.broker) as conn:
                      try:
                          worker = TokenWorker(
                              connection=conn,
                              token_queue=task_queues["token-bucket"],
                              message_queues=[task_queues["tasks"]],
                              task_callback=self.run_task,
                          )
                          worker.run()
                      except KeyboardInterrupt:
                          print("bye bye")

              def task(self, func):
                  """Decorator to create a task callable out of any function.

                  ``func`` is registered under its generated task name. The returned
                  callable does not run ``func``; it publishes a task message
                  carrying the call's arguments and returns ``None``.
                  """
                  import functools

                  name = self.gen_task_name(func)
                  self._tasks[name] = func

                  # Keep func's __name__/__doc__ on the wrapper so the decorated
                  # object still identifies as the original function.
                  @functools.wraps(func)
                  def _send_as_task(*args, **kwargs):
                      self.send_task(name, args, kwargs)

                  return _send_as_task

              def gen_task_name(self, func):
                  """Return the registry name for ``func``: ``"<app name>.<func name>"``."""
                  return f"{self.name}.{func.__name__}"

              def run_task(self, name, args, kwargs):
                  """Call the function registered as ``name`` and return its result.

                  Args:
                      name: Task name as produced by :meth:`gen_task_name`.
                      args: Positional arguments for the function.
                      kwargs: Keyword arguments for the function.

                  Raises:
                      KeyError: If no task is registered under ``name``.
                  """
                  try:
                      func = self._tasks[name]
                  except KeyError:
                      raise KeyError(f"no task registered under {name!r}") from None
                  return func(*args, **kwargs)

              def _publish(self, queue, payload):
                  """Publish ``payload`` to ``queue`` over a fresh broker connection.

                  The queue and its exchange are declared as part of the publish.
                  """
                  with Connection(self.broker) as conn:
                      with producers[conn].acquire(block=True) as producer:
                          producer.publish(
                              payload,
                              # NOTE(security): pickle payloads execute code when
                              # decoded; the broker must be trusted.
                              serializer="pickle",
                              compression="bzip2",
                              exchange=queue.exchange,
                              declare=[queue.exchange, queue],
                              routing_key=queue.routing_key,
                          )

              def send_task(self, name, args, kwargs):
                  """Publish a :class:`Task` for ``name`` to the ``"tasks"`` queue."""
                  payload = Task(name=name, args=args, kwargs=kwargs)
                  self._publish(task_queues["tasks"], payload)

              def token(self, func):
                  """Decorator to create a token callable out of any function.

                  ``func`` is registered under its generated name. The returned
                  callable ignores its arguments and publishes a single token.
                  """
                  import functools

                  name = self.gen_task_name(func)
                  self._tokens[name] = func

                  @functools.wraps(func)
                  def _send_as_token(*args, **kwargs):
                      self.send_token()

                  return _send_as_token

              def send_token(self):
                  """Publish one empty token to the ``"token-bucket"`` queue."""
                  self._publish(task_queues["token-bucket"], {})
        
        
           | 
          
 | 
        
        
           | 
          
 | 
        
        
           | 
          @dataclasses.dataclass(init=True, repr=True, eq=True)
          class Task:
              """Message payload describing a single task invocation.

              ``App.send_task`` builds an instance and publishes it;
              ``TokenWorker.handle_message`` reads ``name``, ``args`` and ``kwargs``
              back from the decoded payload.
              """

              # Name the task function was registered under.
              name: str
              # Positional arguments for the task function.
              args: Tuple
              # Keyword arguments for the task function.
              kwargs: Dict