@benanne
Last active August 29, 2015 13:55
Generator that runs a slow source generator in a separate process.
import time
import multiprocessing as mp


def buffered_gen_mp(source_gen, buffer_size=2, sleep_time=1):
    """
    Generator that runs a slow source generator in a separate process.
    buffer_size: the maximal number of items to pre-generate (length of the buffer)
    sleep_time: seconds the producer waits before re-checking a full buffer
    """
    # a generator object cannot be pickled, so the child process has to inherit it
    # through fork. This makes the function POSIX-only.
    ctx = mp.get_context("fork")
    buffer = ctx.Queue(maxsize=buffer_size)

    def _buffered_generation_process(source_gen, buffer):
        while True:
            # we wait here while the buffer is full. There's no point in generating more
            # data when the buffer is full, it only causes extra memory usage and
            # effectively increases the buffer size by one.
            while buffer.full():
                # print("DEBUG: buffer is full, waiting to generate more data.")
                time.sleep(sleep_time)

            try:
                data = next(source_gen)
            except StopIteration:
                # signal that we're done putting data in the buffer. Closing the queue
                # in this process is not visible to the consumer, so we send an
                # explicit (done, data) end marker instead.
                buffer.put((True, None))
                break

            buffer.put((False, data))

    process = ctx.Process(target=_buffered_generation_process, args=(source_gen, buffer))
    process.start()
    # we could daemonize the process, but then the buffered generator would
    # be unable to spawn any more processes itself.

    while True:
        done, data = buffer.get()
        if done:
            # the producer has exhausted source_gen, so we're done iterating.
            break
        yield data

    process.join()
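
A minimal, self-contained usage sketch of the same idea, not the gist's exact code. It assumes a POSIX system where the "fork" start method is available, and the names `buffered`, `_produce`, and `slow_squares` are hypothetical, introduced only for illustration. Unlike the function above, this condensed variant does not poll `full()`; it relies on `put()` blocking, so the producer may hold one extra item beyond `buffer_size`. End of stream is signalled with an explicit `(done, item)` marker.

```python
import multiprocessing as mp
import time


def _produce(source_gen, queue):
    # Runs in the child process: forward every item, then an end marker.
    for item in source_gen:
        queue.put((False, item))
    queue.put((True, None))


def buffered(source_gen, buffer_size=2):
    # Assumption: the "fork" start method (POSIX only) is used, because a
    # generator object cannot be pickled and sent to a spawned child.
    ctx = mp.get_context("fork")
    queue = ctx.Queue(maxsize=buffer_size)
    process = ctx.Process(target=_produce, args=(source_gen, queue))
    process.start()
    while True:
        done, item = queue.get()
        if done:
            break
        yield item
    process.join()


def slow_squares(n, delay=0.05):
    # Hypothetical stand-in for an expensive data loader.
    for i in range(n):
        time.sleep(delay)
        yield i * i


if __name__ == "__main__":
    for value in buffered(slow_squares(5)):
        time.sleep(0.05)  # simulated consumer work, overlapping with production
        print(value)
```

Because the next item is produced while the consumer is still working on the current one, the total wall-clock time approaches the larger of the two costs rather than their sum, as long as the buffer never runs empty.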