Skip to content

Instantly share code, notes, and snippets.

@maxfischer2781
Created August 9, 2019 16:35
Show Gist options
  • Save maxfischer2781/51d49519a100b5487416e011c9fb20f6 to your computer and use it in GitHub Desktop.
Save maxfischer2781/51d49519a100b5487416e011c9fb20f6 to your computer and use it in GitHub Desktop.
An iterable variant of multiprocessing.Queue
from multiprocessing.queues import Queue
from multiprocessing import get_context, Process, cpu_count
import os
class IterableQueue(Queue):
    """
    ``multiprocessing.Queue`` that can be iterated to ``get`` values

    Iterating blocks on ``get`` and ends once the ``sentinel`` is received.
    Closing the queue (explicitly or by leaving a ``with`` block) enqueues
    the sentinel first, so consumers iterating the queue terminate.

    :param maxsize: maximum number of queued items, passed on to ``Queue``
    :param ctx: multiprocessing context, defaults to ``get_context()``
    :param sentinel: signal that no more items will be received
    """
    def __init__(self, maxsize=0, *, ctx=None, sentinel=None):
        self.sentinel = sentinel
        super().__init__(
            maxsize=maxsize,
            ctx=ctx if ctx is not None else get_context()
        )

    def __getstate__(self):
        # ``Queue`` pickles only its own fields. Ship the sentinel as well,
        # otherwise a copy sent to a spawned (non-forked) process has no
        # ``sentinel`` attribute and ``__next__`` fails with AttributeError.
        return super().__getstate__(), self.sentinel

    def __setstate__(self, state):
        queue_state, self.sentinel = state
        super().__setstate__(queue_state)

    def close(self):
        """Enqueue the sentinel, then close the queue for further ``put``"""
        self.put(self.sentinel)
        super().close()

    def __iter__(self):
        return self

    def __next__(self):
        """
        Block until a value is available and return it

        :raises StopIteration: when the sentinel is received
        """
        result = self.get()
        if result == self.sentinel:
            # re-queue sentinel for other listeners
            self.put(result)
            raise StopIteration
        return result

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # always signal end-of-data, even if the ``with`` body raised
        self.close()
def fib(n):
    """
    Return the ``n``-th Fibonacci number using naive double recursion

    Any ``n`` below 2 is returned unchanged.

    :param n: index into the Fibonacci sequence
    """
    return n if n < 2 else fib(n - 1) + fib(n - 2)
def consumer(queue):
    """
    Compute ``fib`` for every item taken from ``queue`` and print progress

    Each printed line is prefixed with the id of the current process.

    :param queue: iterable providing the inputs for ``fib``
    """
    pid = os.getpid()
    print(f"[{pid}] Consuming")
    for item in queue:
        print(f"[{pid}] < {item}")
        value = fib(item)
        print(f"[{pid}] {item} > {value}")
    print(f"[{pid}] Closing")
def producer():
    """
    Feed the numbers 0..35 to one ``consumer`` process per CPU

    The queue is closed when the ``with`` block is left, which enqueues the
    sentinel that lets the consumers stop iterating. The function returns
    only after all consumer processes have exited.
    """
    print("Enqueueing")
    with IterableQueue() as queue:
        procs = [
            Process(target=consumer, args=(queue,))
            for _ in range(cpu_count())
        ]
        # plain loops: these calls are made for their side effects only
        for proc in procs:
            proc.start()
        for i in range(36):
            queue.put(i)
    print("Finished")
    # wait for the workers explicitly instead of relying on interpreter exit
    for proc in procs:
        proc.join()
# Run the demo only when this file is executed as a script, not on import.
# NOTE(review): multiprocessing's "spawn" start method re-imports the main
# module in each child, so this guard presumably also keeps workers from
# re-running the demo — confirm against the multiprocessing guidelines.
if __name__ == "__main__":
    producer()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment