@aveek-adsnative
Last active May 15, 2018 08:41
A Fail Fast Multiprocess Generator
#!/usr/bin/env python
# -*- coding: utf-8 -*-
########################################
# Test Fail Fast Multiprocessing       #
########################################
from fail_fast_multiprocessing_generator import MultiprocessGenerator

a = [1, 5, 7]


# Sample workers
def worker1():
    for i in range(3):
        val = i, [a[i] * 2]
        yield val


def worker2():
    # Deliberately faulty worker: a[i * 2] raises IndexError once i reaches 2,
    # which exercises the fail-fast abort path in run().
    for i in range(3):
        val = i, [a[i * 2] * 2]
        yield val


def worker3():
    for i in range(3):
        val = i, [a[i] * 2]
        yield val


def worker4():
    for i in range(3):
        val = i, [a[i] * 2]
        yield val


def worker5():
    for i in range(3):
        val = i, [a[i] * 2]
        yield val


DATA_FUNCTIONS = [
    (worker1, 'A'),
    (worker2, 'B'),
    (worker3, 'C'),
    (worker4, 'D'),
    (worker5, 'E')]

background_procs = []
procs = {}


def run():
    try:
        # Start every worker in its own background process up front.
        for func, id in DATA_FUNCTIONS:
            procs[func] = MultiprocessGenerator(func, id)
            background_procs.append(procs[func])

        def get_all_data(datafunc):
            return procs[datafunc]

        # Drain each worker in turn; StopIteration marks a finished worker.
        for func, id in DATA_FUNCTIONS:
            all_data = get_all_data(func)
            while True:
                try:
                    key, val = all_data.next()
                    print key, val
                except StopIteration, s:
                    print s
                    break
    except Exception, e:
        # Any worker error (or queue timeout) aborts every background process.
        for proc in background_procs:
            proc.terminate()
        print "Don't do anything, ABORT!"
        raise e
    print "Do something with the accumulated data"


if __name__ == "__main__":
    run()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
###############################################
# Python Recipe for Fail Fast multiprocessing #
###############################################
from multiprocessing import Process, Queue


class MultiprocessGenerator(object):
    """
    Generator that runs in a separate process, returning values to the
    calling process.
    """

    def __init__(self, iterator, id,
                 # sentinel and error need to be value based since they'll be
                 # serialized and sent over a pipe
                 sentinel='STOP',
                 error='ERROR',
                 queue_maxsize=0,
                 multiprocessing_timeout=5):
        self._iterator = iterator
        self._id = id
        self._sentinel = sentinel
        self._error = error
        self._queue = Queue(maxsize=queue_maxsize)
        self._process = Process(
            name=repr(iterator),
            target=self._run
        )
        self._process.daemon = False
        self.multiprocessing_timeout = multiprocessing_timeout
        self._process.start()

    def __repr__(self):
        return 'MultiprocessGenerator({!r})'.format(self._iterator)

    def _run(self):
        # Runs in the child process: push each yielded item onto the queue,
        # push the error marker if the generator raises, and always finish
        # with the sentinel.
        try:
            for key, value in self._iterator():
                self._queue.put((key, value))
        except Exception:
            self._queue.put(self._error)
        finally:
            self._queue.put(self._sentinel)
            self._queue.close()

    def next(self):
        # Blocks for at most multiprocessing_timeout seconds, so a hung worker
        # surfaces as Queue.Empty in the caller.
        next_obj = self._queue.get(True, self.multiprocessing_timeout)
        if next_obj == self._sentinel:
            raise StopIteration("finished worker {0}".format(self._id))
        elif next_obj == self._error:
            raise Exception("exception raised in worker {0}".format(self._id))
        else:
            return next_obj

    def terminate(self):
        self._process.terminate()
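A minimal usage sketch, assuming the recipe above is saved as fail_fast_multiprocessing_generator.py (as the test script's import suggests); the numbers worker and its id are illustrative and not part of the gist:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Minimal usage sketch: wrap a single worker generator, drain it until
# StopIteration, and terminate the background process on any error.
from fail_fast_multiprocessing_generator import MultiprocessGenerator


def numbers():
    # Illustrative worker: yields (key, value) pairs like the samples above.
    for i in range(5):
        yield i, i * i


if __name__ == "__main__":
    gen = MultiprocessGenerator(numbers, 'numbers')
    try:
        while True:
            try:
                key, val = gen.next()
                print key, val
            except StopIteration, s:
                print s
                break
    except Exception:
        # Covers a worker error re-raised by next() as well as Queue.Empty
        # from a hung worker; in either case the child process is killed.
        gen.terminate()
        raise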