Skip to content

Instantly share code, notes, and snippets.

@wware
Last active March 9, 2016 18:24
Show Gist options
  • Save wware/b0e4cdada686d0948c7f to your computer and use it in GitHub Desktop.
Save wware/b0e4cdada686d0948c7f to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import logging
from Queue import Empty
from multiprocessing import Queue, Pool, Process
# Lower the root logger's threshold to DEBUG so the logging.debug() calls
# made by the workers are not filtered out.
logging.getLogger('').setLevel(logging.DEBUG)
"""
We can send instances through Queues, and because a Queue is not a list,
we are not asking all the instances to exist simultaneously in all processes.
We count the returned results and, once we have the number we expect, we
terminate every worker process.
"""
class Foo(object):
    """Tiny payload object that is passed between processes through queues.

    Attributes:
        x: The wrapped value, stored exactly as given to the constructor.
    """

    def __init__(self, x):
        """Store *x* on the instance."""
        self.x = x

    def __repr__(self):
        """Return a constructor-like representation, e.g. ``Foo(3)``."""
        return '%s(%r)' % (type(self).__name__, self.x)

    def y(self):
        """Return ``self.x * 2``."""
        return self.x * 2
def worker_func(inq, outq, max_backlog=22):
    """Consume ``Foo`` items from *inq* and put ``Foo(item.y())`` on *outq*.

    The loop runs until one of the following happens:

    * ``None`` is received from *inq* (treated as a shutdown request),
    * any exception is raised, in which case it is logged and the worker
      returns, or
    * the process is terminated from outside.

    Args:
        inq: Queue the items to process are read from.
        outq: Queue the results are written to.
        max_backlog: If *inq* is observed to hold this many items or more,
            the worker logs an error and returns. The check is skipped on
            platforms where the queue cannot report its size.

    Returns:
        None.
    """
    try:
        while True:
            try:
                backlog = inq.qsize()
            except NotImplementedError:
                # multiprocessing queues cannot report their size on
                # platforms without sem_getvalue() (e.g. macOS); skip the
                # sanity check there instead of killing the worker.
                backlog = None
            # Explicit check rather than ``assert`` so that it still runs
            # when Python is started with -O.
            if backlog is not None and backlog >= max_backlog:
                raise RuntimeError(
                    'input queue backlog %d reached limit %d'
                    % (backlog, max_backlog))

            logging.debug('Before get')
            foo = inq.get()
            if foo is None:
                # Shutdown sentinel: leave quietly without producing output.
                return
            logging.debug('After get %d', foo.x)
            outq.put(Foo(foo.y()))
    except Exception as e:
        # Any failure ends this worker; the traceback goes to the log.
        logging.exception(e)
        return
if __name__ == '__main__':
    # Demo driver: push `n` Foo instances through a set of worker processes
    # and log each result, keeping only a bounded number of items in flight.
    inq = Queue()
    outq = Queue()
    n = expected = 250
    workers = 20

    pool = []
    for _ in range(workers):
        p = Process(target=worker_func, args=(inq, outq))
        p.start()
        pool.append(p)

    try:
        # Prime the pipeline with one item per worker so that every worker
        # has something to do; the backlog in `inq` never exceeds `workers`.
        i = 0
        while i < n and i < workers:
            inq.put(Foo(i))
            i += 1

        while expected:
            try:
                # Wake up periodically so a crashed worker cannot leave us
                # blocked here forever.
                x = outq.get(block=True, timeout=1)
            except Empty:
                # Workers are never asked to stop, so one that has exited
                # died on an error and the item it held is lost.
                if not all(p.is_alive() for p in pool):
                    raise RuntimeError(
                        'a worker exited with %d results outstanding'
                        % expected)
                continue
            logging.info('Done hacking %d', x.y())
            expected -= 1
            # Each result received frees one slot: submit the next item.
            if i < n:
                inq.put(Foo(i))
                i += 1
    finally:
        # Workers loop indefinitely, so stop them explicitly (also on
        # error), then join to reap the child processes.
        for p in pool:
            p.terminate()
        for p in pool:
            p.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment