Last active
May 15, 2018 08:41
-
-
Save aveek-adsnative/08c814556afffa9fbc10764c18c01501 to your computer and use it in GitHub Desktop.
A Fail Fast Multiprocess Generator
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
######################################## | |
# Test Fail Safe Multiprocessing # | |
######################################## | |
from fail_fast_multiprocessing_generator import MultiprocessGenerator | |
a=[1, 5, 7] | |
#Sample workers | |
def worker1(): | |
for i in range(3): | |
val = i, [a[i]*2] | |
yield val | |
def worker2(): | |
for i in range(3): | |
val = i, [a[i*2]*2] | |
yield val | |
def worker3(): | |
for i in range(3): | |
val = i, [a[i]*2] | |
yield val | |
def worker4(): | |
for i in range(3): | |
val = i, [a[i]*2] | |
yield val | |
def worker5(): | |
for i in range(3): | |
val = i, [a[i]*2] | |
yield val | |
DATA_FUNCTIONS = [ | |
(worker1, 'A'), | |
(worker2, 'B'), | |
(worker3, 'C'), | |
(worker4, 'D'), | |
(worker5, 'E')] | |
background_procs = [] | |
procs = {} | |
def run(): | |
try: | |
for func, id in DATA_FUNCTIONS: | |
procs[func] = MultiprocessGenerator(func, id) | |
background_procs.append(procs[func]) | |
def get_all_data(datafunc): | |
return procs[datafunc] | |
for func, id in DATA_FUNCTIONS: | |
all_data = get_all_data(func) | |
while True: | |
try: | |
key, val = all_data.next() | |
print key, val | |
except StopIteration, s: | |
print s | |
break | |
except Exception, e: | |
for proc in background_procs: | |
proc.terminate() | |
print "Don't do anything, ABORT!" | |
raise e | |
print "Do something with the accumulated data" | |
if __name__ == "__main__": | |
run() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
############################################### | |
# Python Recipe for Fail Safe multiprocessing # | |
############################################### | |
from multiprocessing import Process, Queue | |
class MultiprocessGenerator(object): | |
""" | |
Generator that runs in a separate process, returning values to calling | |
process | |
""" | |
def __init__(self, iterator, | |
# sentinel needs to be value based since it'll be serialized | |
# and sent over a pipe | |
id, | |
sentinel='STOP', | |
error='ERROR', | |
queue_maxsize=0, | |
multiprocessing_timeout=5 | |
): | |
self._iterator = iterator | |
self._id = id | |
self._sentinel = sentinel | |
self._error = error | |
self._queue = Queue(maxsize=queue_maxsize) | |
self._process = Process( | |
name=repr(iterator), | |
target=self._run | |
) | |
self._process.daemon = False | |
self.multiprocessing_timeout = multiprocessing_timeout | |
self._process.start() | |
def __repr__(self): | |
return 'MultiprocessGenerator({!r})'.format(self._iterator) | |
def _run(self): | |
try: | |
for key, value in self._iterator(): | |
self._queue.put((key, value)) | |
except Exception, e: | |
self._is_error = True | |
self._queue.put(self._error) | |
finally: | |
self._queue.put(self._sentinel) | |
self._queue.close() | |
def next(self): | |
next_obj = self._queue.get(True, self.multiprocessing_timeout) | |
if next_obj == self._sentinel: | |
raise StopIteration("finished worker {0}".format(self._id)) | |
elif next_obj == self._error: | |
raise Exception("exception raised in worker {0}".format(self._id)) | |
else: | |
return next_obj | |
def terminate(self): | |
self._process.terminate() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment