Skip to content

Instantly share code, notes, and snippets.

@benanne
Last active April 30, 2017 14:30
Show Gist options
  • Select an option

  • Save benanne/8752270 to your computer and use it in GitHub Desktop.

Select an option

Save benanne/8752270 to your computer and use it in GitHub Desktop.
buffered threaded generator
import logging
import threading
import time

try:
    import Queue  # Python 2 module name
except ImportError:  # Python 3 renamed the module to ``queue``
    import queue as Queue
def buffered_gen(source_gen, buffer_size=2, sleep_time=1):
    """
    Generator that runs a slow source generator in a separate thread.

    A daemon thread pulls items from ``source_gen`` and hands them over
    through a bounded queue, so the source can work ahead of the consumer by
    up to ``buffer_size`` items. Iteration ends once the source is exhausted;
    an exception raised by the source is re-raised in the consuming thread
    instead of leaving the consumer blocked forever.

    source_gen: the iterator (or any other iterable) to read items from.
    buffer_size: the maximal number of items to pre-generate (length of the
        buffer). As with ``Queue.Queue``, a value <= 0 means unbounded.
    sleep_time: number of seconds the worker thread sleeps between checks
        while the buffer is full.
    """
    logger = logging.getLogger(__name__)
    item_queue = Queue.Queue(maxsize=buffer_size)
    # Every queue entry is a (kind, payload) pair so that end-of-stream and
    # errors can never be confused with a legitimate item (including None).
    ITEM, DONE, ERROR = range(3)
    source_iter = iter(source_gen)

    def _buffered_generation_thread(source_iter, item_queue):
        try:
            while True:
                # We wait here while the buffer is full. There's no point in
                # generating more data when the buffer is full: it only causes
                # extra memory usage and effectively increases the buffer
                # size by one.
                while item_queue.full():
                    logger.debug("buffer is full, waiting to generate more data.")
                    time.sleep(sleep_time)
                try:
                    data = next(source_iter)
                except StopIteration:
                    break
                item_queue.put((ITEM, data))
        except BaseException as exc:
            # Forward anything the source raised; otherwise the consumer
            # would wait on an empty queue forever.
            item_queue.put((ERROR, exc))
        else:
            # Tell the consumer that the source is exhausted.
            item_queue.put((DONE, None))

    thread = threading.Thread(
        target=_buffered_generation_thread, args=(source_iter, item_queue)
    )
    # Daemon thread: an abandoned generator must not keep the process alive.
    thread.daemon = True
    thread.start()

    while True:
        kind, payload = item_queue.get()
        if kind == DONE:
            return
        if kind == ERROR:
            raise payload
        yield payload
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment