Created November 14, 2012 17:28
-
-
Save jrydberg/4073498 to your computer and use it in GitHub Desktop.
Simple gevent supervisor.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class SupervisorError(Exception):
    """Base class for all errors raised by the supervisor."""


class UnhandledExceptionError(SupervisorError):
    """The exception could not be handled by the supervisor.

    Raised with two arguments: the original exception, and the exception
    info recorded on the supervised greenlet (its C{_exc_info}).
    """


class SupervisorKilledError(SupervisorError):
    """The supervisor was killed (see C{_Supervisor.close})."""


class SupervisorAbortedError(SupervisorError):
    """The supervisor gave up because the C{abort_after} deadline passed."""
class _Supervisor(SilentGreenlet):
    """Run a given function inside a separate greenlet, and possibly
    restart it if it fails.

    The supervisor is itself a greenlet.  Configure it with the chainable
    C{handle}, C{timeout} and C{abort_after} methods, then call C{start};
    C{get} returns the supervised function's result.  After a handled
    failure (or a per-attempt timeout) the function is restarted after a
    randomly growing delay that never exceeds C{max_delay} seconds.  Any
    other exception is re-raised as L{UnhandledExceptionError}.
    """

    def __init__(self, log, clock, max_delay, fn, args, kw):
        """Create a supervisor; it does not run until C{start} is called.

        @param log: logger-like object; its C{debug}, C{info} and C{error}
            methods are called.
        @param clock: object providing C{time()} and C{sleep(seconds)};
            used for the restart delay and the C{abort_after} deadline.
        @param max_delay: upper bound, in seconds, on the restart delay.
        @param fn: the function to supervise.
        @param args: positional arguments passed to C{fn}.
        @param kw: keyword arguments passed to C{fn}.
        """
        SilentGreenlet.__init__(self, self._run, fn, args, kw)
        self.log = log
        self.clock = clock
        self.max_delay = max_delay
        self.handled = set()
        # Back-off state in seconds; grown by _next_delay() after each failure.
        self._delay = 1
        self._timeout = None
        self._abort_timeout = None

    def handle(self, *exctypes):
        """Tell the supervisor that the given exception types are
        expected, and that the supervisor should restart the action if
        one of them (or a subclass of one of them) is raised.

        @return: C{self}, to allow chaining.
        """
        self.handled.update(exctypes)
        return self

    def timeout(self, timeout):
        """Set a timeout, in seconds, on how long a single attempt of the
        supervised action may take.  If it times out, the attempt is
        stopped and treated as a handled error (i.e. it is restarted).

        @return: C{self}, to allow chaining.
        """
        self._timeout = timeout
        return self

    def abort_after(self, timeout):
        """Abort the supervisor with L{SupervisorAbortedError} if the
        action hasn't completed within C{timeout} seconds, including
        restarts.  Individual attempts are cut short so that they do not
        run past this deadline.

        @return: C{self}, to allow chaining.
        """
        self._abort_timeout = timeout
        return self

    def start(self):
        """Start supervising.

        @return: C{self}, so that C{start().get()} can be chained.
        """
        SilentGreenlet.start(self)
        return self

    def close(self):
        """Terminate the supervisor by raising L{SupervisorKilledError}
        inside it."""
        self.kill(SupervisorKilledError())

    def _time_left(self, started_at):
        """Return the seconds left before the C{abort_after} deadline, or
        C{None} if no deadline was set."""
        if not self._abort_timeout:
            return None
        return self._abort_timeout - (self.clock.time() - started_at)

    def _abort(self):
        """Log and raise L{SupervisorAbortedError}."""
        self.log.error("supervisor aborted: timed out")
        raise SupervisorAbortedError()

    def _next_delay(self):
        """Grow the back-off by a random factor in [1, 2), capped at
        C{max_delay}, and return it."""
        # Capping the stored value (not only the returned one) yields the
        # same sequence of delays while keeping _delay from growing
        # without bound.
        self._delay = min(self.max_delay,
                          self._delay + random.random() * self._delay)
        return self._delay

    def _run(self, fn, args, kw):
        """Greenlet body: call C{fn} until it returns, restarting it after
        handled failures, and return its result."""
        started_at = self.clock.time()
        while True:
            time_left = self._time_left(started_at)
            if time_left is not None and time_left <= 0:
                # The back-off sleep used up the whole budget; there is no
                # time left for another attempt.
                self._abort()
            # Bound each attempt by the per-attempt timeout and by what is
            # left of the abort_after() budget, whichever is smaller.
            # NOTE(review): gevent.Timeout counts event-loop time, while the
            # budget is measured with self.clock; these agree only if the
            # clock is a real-time clock.
            limits = [t for t in (self._timeout, time_left) if t]
            timeout = gevent.Timeout(min(limits) if limits else None)
            g = SilentGreenlet.spawn(fn, *args, **kw)
            try:
                # Only our own timeout counts as a failed attempt; any other
                # Timeout is re-raised by the inner handler.
                try:
                    with timeout:
                        return g.get()
                except gevent.Timeout as t:
                    if t is not timeout:
                        raise
                    self.log.info("supervised action timed out")
            except SupervisorKilledError:
                self.log.info("supervisor killed")
                raise
            except Exception as e:
                if not isinstance(e, tuple(self.handled)):
                    self.log.error("got *UNHANDLED* exception",
                                   exc_info=g._exc_info)
                    raise UnhandledExceptionError(e, g._exc_info)
                self.log.info("caught handled exception: %r" % (
                    g._exception,), exc_info=g._exc_info)
            finally:
                # After a timeout, or when the supervisor itself is killed,
                # the attempt is still running: stop it so it cannot keep
                # working alongside the next attempt.  No-op if finished.
                g.kill(block=False)
            time_left = self._time_left(started_at)
            if time_left is not None and time_left < 0:
                self._abort()
            delay = self._next_delay()
            self.log.debug("will restart action in %f seconds" % (delay,))
            try:
                self.clock.sleep(delay)
            except SupervisorKilledError:
                self.log.info("supervisor killed")
                raise
def supervise(log, time, max_delay, fn, *args, **kw):
    """Build a supervisor for function C{fn}; it is returned unstarted.

    Once started, the supervisor keeps calling C{fn(*args, **kw)} until it
    succeeds, as long as every error it raises is one the supervisor was
    told to handle.  Between attempts it backs off by a randomly growing
    delay that never exceeds C{max_delay} seconds.

    @param log: logger-like object used for progress and error messages.
    @param time: clock-like object providing C{time()} and C{sleep()}.
    @param max_delay: upper bound, in seconds, on the restart delay.
    @return: the supervisor; configure it with C{handle}, C{timeout} and
        C{abort_after}, then call C{start}.
    """
    supervisor = _Supervisor(log=log, clock=time, max_delay=max_delay,
                             fn=fn, args=args, kw=kw)
    return supervisor
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class SupervisorTestCase(TestCase):
    """Test cases for our supervisor."""

    def setUp(self):
        self.log = mock()
        self.clock = Clock()
        self.max_delay = 10

    def test_unhandled_errors_is_propagated(self):
        def test():
            raise ValueError("HELLO WORLD")

        s = supervise(self.log, self.clock, self.max_delay, test)
        self.assertRaises(UnhandledExceptionError, s.start().get)

    def test_handled_errors_restarts_function(self):
        # Shared call log kept in the closure instead of a mutable default
        # argument.
        calls = []

        def test():
            calls.append('x')
            if len(calls) == 1:
                raise ValueError("WHAT")
            return len(calls)

        s = supervise(self.log, self.clock, self.max_delay, test).handle(
            ValueError).start()
        self.assertEqual(2, s.get())

    def test_supervisor_aborts_after_timeout(self):
        def test():
            # Raise explicitly: a bare ``assert`` statement is removed when
            # Python runs with -O, which would make this function succeed.
            raise AssertionError("oh no")

        supervisor = supervise(self.log, self.clock, self.max_delay, test)
        supervisor.handle(AssertionError).abort_after(2)
        self.assertRaises(SupervisorAbortedError, supervisor.start().get)

    def test_supervisor_action_timeout_restarts_action(self):
        calls = []

        def test():
            calls.append(1)
            gevent.sleep(10)

        supervisor = supervise(self.log, self.clock, self.max_delay, test)
        supervisor.timeout(1).abort_after(3)
        self.assertRaises(SupervisorAbortedError, supervisor.start().get)
        # Just make sure that we have called test more than once.
        self.assertTrue(len(calls) > 1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.