Skip to content

Instantly share code, notes, and snippets.

@jaycosaur
Last active February 1, 2020 12:11
Show Gist options
  • Save jaycosaur/e76947484639ce901273e3d05edb9a89 to your computer and use it in GitHub Desktop.
Save jaycosaur/e76947484639ce901273e3d05edb9a89 to your computer and use it in GitHub Desktop.
The Multiplex class merges messages from multiple queues into a single queue using QueuePipes. Useful for fan-in jobs.
from queue import Queue
from threading import Thread
from typing import Any, List, Iterator
class PipeClose:
    """Sentinel message that tells a pipe worker to shut down.

    Instances carry no data; they are recognised purely by type
    (``isinstance(message, PipeClose)``).
    """

    def __repr__(self) -> str:
        # Sentinels travel through queues alongside real messages, so give
        # them a readable form for logs and debugging output.
        return f"{type(self).__name__}()"
class QueuePipe:
    """Forward every item from one queue to another on a worker thread.

    The worker is created and started by the constructor. It keeps moving
    items from ``queue_in`` to ``queue_out`` until it has forwarded a
    ``PipeClose`` instance, at which point it exits. The sentinel itself is
    also placed on ``queue_out``.
    """

    def __init__(self, queue_in: Queue, queue_out: Queue) -> None:
        """Store both queues and start the forwarding thread.

        Args:
            queue_in: Queue the worker reads from.
            queue_out: Queue the worker writes to.
        """
        self._queue_in = queue_in
        self._queue_out = queue_out
        self.thread = Thread(target=self._pipe, args=(queue_in, queue_out))
        self.thread.start()

    def _pipe(self, queue_in: Queue, queue_out: Queue) -> None:
        """Worker loop: copy items across until a ``PipeClose`` has been copied."""
        forwarded: Any = None
        # The check runs after the put, so the closing sentinel is passed
        # downstream before the loop ends.
        while not isinstance(forwarded, PipeClose):
            forwarded = queue_in.get()
            queue_out.put(forwarded)

    def stop(self) -> None:
        """Ask the worker to finish by enqueueing a ``PipeClose`` on the input queue.

        Items already waiting in the input queue are forwarded first; this
        method does not wait for the worker thread to exit.
        """
        self._queue_in.put(PipeClose())
class Multiplex:
    """Fan-in helper: merge messages from several queues into one queue.

    One ``QueuePipe`` is created per input queue; each pipe's worker thread
    forwards items into a single shared, unbounded output queue. Messages
    can be consumed by calling the instance, by iterating over it, or by
    reading the queue returned from ``output()`` directly.

    Iteration hides the ``PipeClose`` sentinels that the pipes forward and
    ends once ``stop()`` has been called or every pipe has delivered its
    sentinel. Calling the instance, or reading the output queue directly,
    returns items exactly as queued, sentinels included.
    """

    def __init__(
        self, *queues: Queue,
    ):
        """Start one forwarding pipe per input queue.

        Args:
            *queues: Input queues whose messages should be merged.
        """
        self.__multiplexed_queue = Queue(maxsize=0)
        self._input_queues = queues
        # Number of PipeClose sentinels consumed by __next__ so far.
        self._closed_pipes = 0
        self.is_stopped = False
        self._pipes: List[QueuePipe] = [
            QueuePipe(q_in, self.__multiplexed_queue) for q_in in self._input_queues
        ]

    def stop(self) -> None:
        """Ask every pipe to stop and mark the multiplexer as stopped.

        Does not wait for the pipe threads to exit.
        """
        for pipe in self._pipes:
            pipe.stop()
        self.is_stopped = True

    def output(self) -> Queue:
        """Return the shared queue that receives all forwarded messages."""
        return self.__multiplexed_queue

    # Original (misspelled) name, kept so existing callers keep working.
    ouput = output

    def __call__(self) -> Any:
        """Block until a message is available and return it unfiltered."""
        return self.__multiplexed_queue.get()

    def __next__(self) -> Any:
        """Return the next real message, blocking until one arrives.

        Raises:
            StopIteration: If ``stop()`` has been called, or once every pipe
                has forwarded its ``PipeClose`` sentinel (this includes the
                case of a multiplexer with no input queues).
        """
        while not self.is_stopped and self._closed_pipes < len(self._pipes):
            message = self.__multiplexed_queue.get()
            if not isinstance(message, PipeClose):
                return message
            # A sentinel means one pipe has shut down; it is bookkeeping,
            # not data, so count it and keep waiting for a real message.
            self._closed_pipes += 1
        raise StopIteration

    def __iter__(self) -> Iterator[Any]:
        """Return ``self``; the multiplexer is its own iterator."""
        return self
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment