Created
August 9, 2019 16:35
-
-
Save maxfischer2781/51d49519a100b5487416e011c9fb20f6 to your computer and use it in GitHub Desktop.
An iterable variant of multiprocessing.Queue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from multiprocessing.queues import Queue | |
from multiprocessing import get_context, Process, cpu_count | |
import os | |
class IterableQueue(Queue): | |
""" | |
``multiprocessing.Queue`` that can be iterated to ``get`` values | |
:param sentinel: signal that no more items will be received | |
""" | |
def __init__(self, maxsize=0, *, ctx=None, sentinel=None): | |
self.sentinel = sentinel | |
super().__init__( | |
maxsize=maxsize, | |
ctx=ctx if ctx is not None else get_context() | |
) | |
def close(self): | |
self.put(self.sentinel) | |
super().close() | |
def __iter__(self): | |
return self | |
def __next__(self): | |
result = self.get() | |
if result == self.sentinel: | |
# re-queue sentinel for other listeners | |
self.put(result) | |
raise StopIteration | |
return result | |
def __enter__(self): | |
return self | |
def __exit__(self, exc_type, exc_val, exc_tb): | |
self.close() | |
def fib(n): | |
if n < 2: | |
return n | |
return fib(n-1) + fib(n-2) | |
def consumer(queue): | |
print(f"[{os.getpid()}] Consuming") | |
for i in queue: | |
print(f"[{os.getpid()}] < {i}") | |
n = fib(i) | |
print(f"[{os.getpid()}] {i} > {n}") | |
print(f"[{os.getpid()}] Closing") | |
def producer(): | |
print("Enqueueing") | |
with IterableQueue() as queue: | |
procs = [Process(target=consumer, args=(queue,)) for _ in range(cpu_count())] | |
[p.start() for p in procs] | |
[queue.put(i) for i in range(36)] | |
print("Finished") | |
if __name__ == "__main__": | |
producer() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment