# Gist benanne/8761111 (last active August 29, 2015 13:55).
# Generator that runs a slow source generator in a separate process.
# NOTE: GitHub flagged this file as possibly containing hidden or bidirectional
# Unicode text, which may be interpreted or compiled differently than it
# appears. Review it in an editor that reveals hidden Unicode characters.
import multiprocessing as mp
import time
import traceback
# Message tags exchanged over the queue. Every message is a ``(tag, payload)``
# tuple so that any data value (including ``None``) can be forwarded unambiguously.
_ITEM = "item"    # payload: the next value produced by the source generator
_DONE = "done"    # payload: None; the source generator is exhausted
_ERROR = "error"  # payload: formatted traceback of an exception raised in the child


def _mp_context():
    """Return the 'fork' multiprocessing context when available, else the default one."""
    # A live generator cannot be pickled, so 'fork' is the only start method that can
    # hand ``source_gen`` to the child process. On platforms without 'fork' the default
    # context is used, and Process.start() will fail to pickle the generator.
    if "fork" in mp.get_all_start_methods():
        return mp.get_context("fork")
    return mp.get_context()


def _buffered_generation_process(source_gen, buffer, sleep_time):
    """Producer loop run in the child process: move items from source_gen into buffer.

    Sends exactly one terminal message: ``(_DONE, None)`` after normal exhaustion, or
    ``(_ERROR, traceback_text)`` if pulling from the source raised.
    """
    try:
        while True:
            # we block here when the buffer is full. There's no point in generating more data
            # when the buffer is full, it only causes extra memory usage and effectively
            # increases the buffer size by one.
            while buffer.full():
                time.sleep(sleep_time)
            try:
                data = next(source_gen)
            except StopIteration:
                break
            buffer.put((_ITEM, data))
    except Exception:
        # Process boundary: forward the failure to the consumer instead of dying silently.
        buffer.put((_ERROR, traceback.format_exc()))
    else:
        # An explicit end-of-stream message is required: closing the queue in this
        # process is invisible to the consumer's get() in the parent process.
        buffer.put((_DONE, None))


def buffered_gen_mp(source_gen, buffer_size=2, sleep_time=1):
    """
    Generator that runs a slow source generator in a separate process.

    source_gen: iterator/generator to pull items from in the child process. It is handed
        over by forking, so this requires a platform with the 'fork' start method.
    buffer_size: the maximal number of items to pre-generate (length of the buffer)
    sleep_time: seconds the producer sleeps between checks while the buffer is full

    Yields the items of source_gen in order. Every item must be picklable, since it
    crosses a process boundary.
    Raises RuntimeError (with the child's traceback text) if source_gen raises.
    If iteration stops early (close(), break, garbage collection), the producer process
    is terminated. In every case the producer process is joined.
    """
    ctx = _mp_context()
    buffer = ctx.Queue(maxsize=buffer_size)
    process = ctx.Process(
        target=_buffered_generation_process, args=(source_gen, buffer, sleep_time)
    )
    process.start()
    # we could daemonize the process, but then the buffered generator would
    # be unable to spawn any more processes itself.

    finished = False  # True once the producer's single terminal message has arrived
    try:
        while True:
            tag, payload = buffer.get()
            if tag == _ITEM:
                yield payload
                continue
            # _DONE or _ERROR: the producer has nothing left to send and is exiting.
            finished = True
            if tag == _ERROR:
                raise RuntimeError(
                    "source generator raised in the buffering process:\n" + payload
                )
            return
    finally:
        if not finished:
            # The consumer stopped early; the producer may be blocked on a full queue,
            # so joining it without terminating could hang forever.
            process.terminate()
        process.join()
# (GitHub page footer: sign up or sign in to comment on this gist.)