Skip to content

Instantly share code, notes, and snippets.

@evansd
Last active December 22, 2016 13:10
Show Gist options
  • Select an option

  • Save evansd/ab3161634eebb992eba5 to your computer and use it in GitHub Desktop.

Select an option

Save evansd/ab3161634eebb992eba5 to your computer and use it in GitHub Desktop.
import sys
from threading import Thread

try:
    from queue import Queue  # Python 3
except ImportError:
    from Queue import Queue  # Python 2
# Unique marker object: the producer puts it on the queue after the last item,
# and the consumer stops reading when it receives it.
END_OF_STREAM = object()
class QueueWithException(Queue):
    """
    A `Queue` that can additionally carry exception details alongside its
    items.
    """

    # Either None (nothing recorded) or a `sys.exc_info()`-style triple of
    # (type, value, traceback). Assigning on an instance leaves this
    # class-level default untouched for other instances.
    exc_info = None
def consume_in_new_thread(iterable, buffer_size=-1):
    """
    Start consuming `iterable` immediately in a new thread, buffering values
    up to `buffer_size`, or without limit if `buffer_size` is zero or negative
    (it is passed straight through as the queue's `maxsize`).

    Return an iterator over the buffered values. If consuming `iterable`
    raises, that exception is re-raised from the returned iterator once the
    values buffered before the failure have been yielded.
    """
    queue = QueueWithException(buffer_size)
    thread = Thread(target=populate_queue, args=(iterable, queue))
    # Mark thread as daemon so it doesn't keep the process alive after the
    # main thread exits
    thread.daemon = True
    thread.start()
    return read_from_queue(queue)
def populate_queue(iterable, queue):
    """
    Put every item from `iterable` onto `queue`, followed by END_OF_STREAM.

    If anything raises while doing so, the `sys.exc_info()` triple is stored
    on `queue.exc_info` instead of propagating out of this function.
    END_OF_STREAM is put on the queue afterwards in every case.
    """
    try:
        for item in iterable:
            queue.put(item)
    except BaseException:
        # Deliberate catch-all: nothing is swallowed here, the exception
        # details are handed over via the queue so the reader can re-raise.
        queue.exc_info = sys.exc_info()
    finally:
        # Always signal the end, otherwise the reader would block forever
        # waiting for another item.
        queue.put(END_OF_STREAM)
def read_from_queue(queue):
    """
    Yield items taken from `queue` until the END_OF_STREAM marker is received.

    Once the marker has been seen, if `queue.exc_info` holds an exception
    triple, that exception is re-raised with its recorded traceback.
    """
    while True:
        item = queue.get()
        # Compare by identity. `iter(queue.get, END_OF_STREAM)` compares with
        # `==`, so an item whose `__eq__` is permissive would end the stream
        # early, and one whose `==` result has no truth value would raise.
        if item is END_OF_STREAM:
            break
        yield item
    if queue.exc_info is not None:
        exc_type, exc_value, exc_tb = queue.exc_info
        if hasattr(exc_value, "with_traceback"):
            # Python 3 spelling of "raise with this traceback".
            raise exc_value.with_traceback(exc_tb)
        # Python 2: the three-argument raise is a syntax error on Python 3,
        # so it is kept in a string that is only compiled when reached.
        exec(
            "raise exc_type, exc_value, exc_tb",
            {"exc_type": exc_type, "exc_value": exc_value, "exc_tb": exc_tb},
        )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment