import time
from typing import Dict, Optional

from dramatiq import Message, Broker
from dramatiq.results import ResultMissing


class TaskQueue:
    """A fixed-size task queue using the Dramatiq background task framework.

    https://dramatiq.io/

    The purpose of this queue is to feed tasks to the Dramatiq workers
    as fast as they can handle them. The queue should be set to the same size
    as the number of total worker threads available.

    This is useful for running task sets that are CPU bounded:
    you know the number of CPUs and the load capacity of a server and
    you do not want to exceed this capacity.

    When the queue is fully allocated,
    the queue will block until existing tasks are complete and there is more free space.

    An incomplete usage example:

    .. code-block:: python

        import random
        import time

        # Setup Dramatiq
        import dramatiq
        from dramatiq.brokers.redis import RedisBroker
        from dramatiq.results import Results
        from dramatiq.results.backends import RedisBackend

        redis_result_backend = RedisBackend(client=_client)
        redis_broker = RedisBroker(client=_client)
        redis_broker.add_middleware(Results(backend=redis_result_backend))
        dramatiq.set_broker(redis_broker)

        @dramatiq.actor
        def long_process(id: int) -> int:
            '''Example blocking background worker task'''
            r = random.randint(1, 5)
            print(f"Background process {id} sleeping {r} seconds")
            time.sleep(r)
            return r

        # Assume we have started dramatiq worker controller with 2 processes, 5 threads
        queue = TaskQueue(redis_broker, 10)

        # We are going to generate more messages than we have workers,
        # so after 10 messages the loop starts to slow down,
        # while the queue is blocking until more background workers
        # are cleared up
        total_slept = 0
        for i in range(100):
            message = long_process.message(i)
            results = queue.add_and_wait_until_space(message)
            for slept_time in results:
                total_slept += slept_time

        # Tasks still in flight when the loop ends are not included above;
        # poll queue.scoop_finished() to collect their results.
        print(f"All background workers slept {total_slept} seconds")

    """

    def __init__(self, broker: Broker, size: int, poll_delay: float = 0.5):
        """Create a queue with a fixed number of task slots.

        :param broker: Broker that queued messages are sent to
        :param size: Number of tasks that may be in flight at the same time
        :param poll_delay: Seconds to sleep between polls while the queue is full
        :raise ValueError: If ``size`` is less than one
        """
        # A queue without slots could never accept a task and
        # add_and_wait_until_space() would block forever.
        if size < 1:
            raise ValueError(f"size must be at least 1, got {size}")

        self.broker = broker
        # Slot number -> message occupying the slot, None when the slot is free
        self.tasks: Dict[int, Optional[Message]] = dict.fromkeys(range(size))
        self.poll_delay = poll_delay

    def add_and_wait_until_space(self, m: Message) -> list:
        """Add a new task.

        Sends a new task to the background workers as soon as there is a free slot
        in the queue. If all slots are taken then block, polling the occupied
        slots until at least one of the previous tasks has a result.

        :param m: Message to enqueue
        :return: List of results of previous tasks that have completed
        """
        ready: list = []
        while (free_slot := self.peak_free_slot()) is None:
            finished = self.scoop_finished()
            if finished:
                ready += finished
            else:
                # Nothing has completed yet, do not hammer the result backend
                time.sleep(self.poll_delay)

        # Send task to the worker pool first: if enqueueing fails,
        # the slot is not left occupied by a message that was never sent
        self.broker.enqueue(m)

        # Assign task to the queue
        self.tasks[free_slot] = m

        return ready

    def peak_free_slot(self) -> Optional[int]:
        """Get the first free slot in the queue (if any).

        The method name spelling is kept as is for backward compatibility.

        :return: Slot number, or None if every slot is occupied
        """
        for slot, message in self.tasks.items():
            if message is None:
                return slot
        return None

    def scoop_finished(self) -> list:
        """Find finished tasks and make space in the queue.

        Free slots are skipped. A task whose ``get_result()`` raises
        ``ResultMissing`` is treated as still running and keeps its slot.

        :return: Results of completed tasks
        """
        ready = {}

        for slot, message in self.tasks.items():
            if message is None:
                # Free slot, nothing to poll
                continue

            try:
                result = message.get_result()
            except ResultMissing:
                continue

            ready[slot] = result

        # Clean finished, done after the loop so the dict
        # is not modified while it is being iterated
        for slot in ready:
            self.tasks[slot] = None

        return list(ready.values())