'''
1. The on_exception decorator retries when the specified exception is raised. In the first
   example below, any requests exception triggers a retry with exponential backoff
   (backoff.expo, i.e. the wait time grows exponentially between attempts).
2. The on_predicate decorator schedules retries when the target function's return value
   matches a given condition. This is useful when polling a resource for content that is
   generated externally.
3. Both backoff decorators optionally accept event-handler functions through the keyword
   arguments on_success, on_backoff and on_giveup. This is useful for reporting statistics
   or doing other custom logging.
'''
import logging
import random
import time

import backoff
import requests


@backoff.on_predicate(backoff.fibo, max_value=13)
@backoff.on_exception(backoff.expo,
                      (requests.exceptions.HTTPError, requests.exceptions.Timeout),
                      max_time=60)
@backoff.on_exception(backoff.expo,
                      requests.exceptions.Timeout,
                      max_time=300)
def poll_for_message(queue):
    return queue.get()


my_logger = logging.getLogger('my_logger')
my_handler = logging.StreamHandler()
my_logger.addHandler(my_handler)
my_logger.setLevel(logging.ERROR)


@backoff.on_exception(backoff.expo,
                      requests.exceptions.RequestException,
                      logger=my_logger)
def test():
    print(1)


class MyException(Exception):
    def __init__(self, status, message):
        super().__init__(status, message)
        self.status = status
        self.message = message


def backoff_hdlr(details):
    print("Backing off {wait:0.1f} seconds after {tries} tries "
          "calling function {target} with args {args} and kwargs "
          "{kwargs}".format(**details))


def success_hdlr(details):
    print("Success after {tries} tries "
          "calling function {target} with args {args} and kwargs "
          "{kwargs}".format(**details))


def giveup_hdlr(details):
    print("Giving up after {tries} tries "
          "calling function {target} with args {args} and kwargs "
          "{kwargs}".format(**details))


@backoff.on_predicate(
    backoff.constant,
    # keep retrying while the random number is not 10009;
    # once it equals 10009, on_success is called
    lambda x: x != 10009,
    on_success=success_hdlr,
    on_backoff=backoff_hdlr,
    on_giveup=giveup_hdlr,
    max_time=2
)
def main():
    num = random.randint(10000, 10010)
    print("time is {}, num is {}, retry...".format(time.time(), num))
    return num


@backoff.on_exception(
    backoff.constant,
    MyException,
    # give up when the exception instance's status equals 10009;
    # when that condition holds, on_giveup is called
    giveup=lambda e: e.status == 10009,
    on_success=success_hdlr,
    on_backoff=backoff_hdlr,
    on_giveup=giveup_hdlr,
)
def main2():
    num = random.randint(10000, 10010)
    print("time is {}, num is {}, retry...".format(time.time(), num))
    # if we return through this branch, on_success is called
    if num == 10010:
        return
    raise MyException(num, "hhh")


if __name__ == "__main__":
    # main()
    main2()
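The three points in the docstring at the top can also be combined on a single function. The sketch below is appended for illustration only: fetch_status and its url argument are invented, while backoff.expo, max_tries and the on_* handler keywords are part of backoff's documented API.

# Minimal sketch (assumed names): retry an HTTP call with exponential backoff,
# give up after 5 attempts, and report progress through the handlers above.
@backoff.on_exception(backoff.expo,
                      requests.exceptions.RequestException,
                      max_tries=5,
                      on_success=success_hdlr,
                      on_backoff=backoff_hdlr,
                      on_giveup=giveup_hdlr)
def fetch_status(url):
    # raises RequestException on network errors, and HTTPError
    # (a RequestException subclass) on 4xx/5xx responses
    resp = requests.get(url, timeout=3)
    resp.raise_for_status()
    return resp.status_code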
import asyncio
import copy
import inspect
import logging
from functools import wraps

import async_timeout

logger = logging.getLogger(__name__)

__version__ = '0.2.2'

propagate = forever = ...


class RetryError(Exception):
    pass


class ConditionError(Exception):
    pass


def unpartial(fn):
    while hasattr(fn, 'func'):
        fn = fn.func
    return fn


def isexception(obj):
    return (
        isinstance(obj, Exception) or
        (inspect.isclass(obj) and (issubclass(obj, Exception)))
    )


@asyncio.coroutine
def callback(attempt, exc, args, kwargs, delay=None, *, loop):
    if delay is None:
        delay = callback.delay
    yield from asyncio.sleep(attempt * delay, loop=loop)
    return retry


callback.delay = 0.5


def retry(
    fn=None,
    *,
    attempts=3,
    immutable=False,
    cls=False,
    kwargs=False,
    callback=callback,
    fallback=RetryError,
    timeout=None,
    retry_exceptions=(Exception,),
    fatal_exceptions=(asyncio.CancelledError,),
    loop=None  # noqa
):
    def wrapper(fn):
        @wraps(fn)
        @asyncio.coroutine
        def wrapped(*fn_args, **fn_kwargs):
            if isinstance(loop, str):
                assert cls ^ kwargs, 'choose self.loop or kwargs["loop"]'

            if cls:
                _self = getattr(unpartial(fn), '__self__', None)
                if _self is None:
                    assert fn_args, 'seems not unbound function'
                    _self = fn_args[0]
                _loop = getattr(_self, loop)
            elif kwargs:
                _loop = fn_kwargs[loop]
            elif loop is None:
                _loop = asyncio.get_event_loop()
            else:
                _loop = loop

            if (
                timeout is not None and
                asyncio.TimeoutError not in retry_exceptions
            ):
                _retry_exceptions = (asyncio.TimeoutError,) + retry_exceptions
            else:
                _retry_exceptions = retry_exceptions

            attempt = 1
            if cls:
                assert fn_args
                self, *fn_args = fn_args
                fn_args = tuple(fn_args)

            while True:
                if immutable:
                    _fn_args = copy.deepcopy(fn_args)
                    kwargs_loop = isinstance(loop, str) and kwargs
                    if kwargs_loop:
                        obj = fn_kwargs.pop(loop)
                    _fn_kwargs = copy.deepcopy(fn_kwargs)
                    if kwargs_loop:
                        fn_kwargs[loop] = _fn_kwargs[loop] = obj
                else:
                    _fn_args, _fn_kwargs = fn_args, fn_kwargs

                if cls:
                    _fn_args = (self,) + _fn_args

                try:
                    ret = fn(*_fn_args, **_fn_kwargs)

                    if timeout is None:
                        if asyncio.iscoroutinefunction(unpartial(fn)):
                            ret = yield from ret
                    else:
                        if not asyncio.iscoroutinefunction(unpartial(fn)):
                            raise ConditionError(
                                'Can\'t set timeout for non coroutinefunction',
                            )
                        with async_timeout.timeout(timeout, loop=_loop):
                            ret = yield from ret

                    return ret
                except ConditionError:
                    raise
                except fatal_exceptions:
                    raise
                except _retry_exceptions as exc:
                    _attempts = 'infinity' if attempts is forever else attempts
                    context = {
                        'fn': fn,
                        'attempt': attempt,
                        'attempts': _attempts,
                    }

                    if (
                        _loop.get_debug() or
                        (attempts is not forever and attempt == attempts)
                    ):
                        logger.warning(
                            exc.__class__.__name__ + ' -> Attempts (%(attempt)d) are over for %(fn)r',  # noqa
                            context,
                            exc_info=exc,
                        )

                        if fallback is propagate:
                            raise exc

                        if isexception(fallback):
                            raise fallback from exc

                        if callable(fallback):
                            ret = fallback(fn_args, fn_kwargs, loop=_loop)
                            if asyncio.iscoroutinefunction(unpartial(fallback)):  # noqa
                                ret = yield from ret
                        else:
                            ret = fallback

                        return ret

                    logger.debug(
                        exc.__class__.__name__ + ' -> Tried attempt #%(attempt)d from total %(attempts)s for %(fn)r',  # noqa
                        context,
                        exc_info=exc,
                    )

                    ret = callback(
                        attempt, exc, fn_args, fn_kwargs, loop=_loop,
                    )

                    attempt += 1

                    if asyncio.iscoroutinefunction(unpartial(callback)):
                        ret = yield from ret

                    if ret is not retry:
                        return ret

        return wrapped

    if fn is None:
        return wrapper

    if callable(fn):
        return wrapper(fn)

    raise NotImplementedError
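A minimal usage sketch for the decorator above, appended for illustration (the flaky() coroutine and its counter are invented, not part of the original module). It keeps the same pre-async/await style (@asyncio.coroutine, yield from) that the module itself uses.

# Usage sketch (assumed example): fail twice, then succeed on the third attempt.
if __name__ == '__main__':
    state = {'calls': 0}

    @retry(attempts=5, retry_exceptions=(ConnectionError,))
    @asyncio.coroutine
    def flaky():
        state['calls'] += 1
        if state['calls'] < 3:
            raise ConnectionError('transient failure')
        return 'ok'

    loop = asyncio.get_event_loop()
    print(loop.run_until_complete(flaky()))  # -> 'ok' on the third attempt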
import logging
import random
import time
from functools import partial

from decorator import decorator

logging_logger = logging.getLogger(__name__)


def __retry_internal(f, exceptions=Exception, tries=-1, delay=0, max_delay=None, backoff=1, jitter=0,
                     logger=logging_logger):
    """
    Executes a function and retries it if it failed.

    :param f: the function to execute.
    :param exceptions: an exception or a tuple of exceptions to catch. default: Exception.
    :param tries: the maximum number of attempts. default: -1 (infinite).
    :param delay: initial delay between attempts. default: 0.
    :param max_delay: the maximum value of delay. default: None (no limit).
    :param backoff: multiplier applied to delay between attempts. default: 1 (no backoff).
    :param jitter: extra seconds added to delay between attempts. default: 0.
                   fixed if a number, random if a range tuple (min, max)
    :param logger: logger.warning(fmt, error, delay) will be called on failed attempts.
                   default: retry.logging_logger. if None, logging is disabled.
    :returns: the result of the f function.
    """
    _tries, _delay = tries, delay
    while _tries:
        try:
            return f()
        except exceptions as e:
            _tries -= 1
            if not _tries:
                raise

            if logger is not None:
                logger.warning('%s, retrying in %s seconds...', e, _delay)

            time.sleep(_delay)
            _delay *= backoff

            if isinstance(jitter, tuple):
                _delay += random.uniform(*jitter)
            else:
                _delay += jitter

            if max_delay is not None:
                _delay = min(_delay, max_delay)


def retry(exceptions=Exception, tries=-1, delay=0, max_delay=None, backoff=1, jitter=0, logger=logging_logger):
    """Returns a retry decorator.

    :param exceptions: an exception or a tuple of exceptions to catch. default: Exception.
    :param tries: the maximum number of attempts. default: -1 (infinite).
    :param delay: initial delay between attempts. default: 0.
    :param max_delay: the maximum value of delay. default: None (no limit).
    :param backoff: multiplier applied to delay between attempts. default: 1 (no backoff).
    :param jitter: extra seconds added to delay between attempts. default: 0.
                   fixed if a number, random if a range tuple (min, max)
    :param logger: logger.warning(fmt, error, delay) will be called on failed attempts.
                   default: retry.logging_logger. if None, logging is disabled.
    :returns: a retry decorator.
    """
    @decorator
    def retry_decorator(f, *fargs, **fkwargs):
        args = fargs if fargs else list()
        kwargs = fkwargs if fkwargs else dict()
        return __retry_internal(partial(f, *args, **kwargs), exceptions, tries, delay, max_delay, backoff, jitter,
                                logger)

    return retry_decorator


def retry_call(f, fargs=None, fkwargs=None, exceptions=Exception, tries=-1, delay=0, max_delay=None, backoff=1,
               jitter=0,
               logger=logging_logger):
    """
    Calls a function and re-executes it if it failed.

    :param f: the function to execute.
    :param fargs: the positional arguments of the function to execute.
    :param fkwargs: the named arguments of the function to execute.
    :param exceptions: an exception or a tuple of exceptions to catch. default: Exception.
    :param tries: the maximum number of attempts. default: -1 (infinite).
    :param delay: initial delay between attempts. default: 0.
    :param max_delay: the maximum value of delay. default: None (no limit).
    :param backoff: multiplier applied to delay between attempts. default: 1 (no backoff).
    :param jitter: extra seconds added to delay between attempts. default: 0.
                   fixed if a number, random if a range tuple (min, max)
    :param logger: logger.warning(fmt, error, delay) will be called on failed attempts.
                   default: retry.logging_logger. if None, logging is disabled.
    :returns: the result of the f function.
    """
    args = fargs if fargs else list()
    kwargs = fkwargs if fkwargs else dict()
    return __retry_internal(partial(f, *args, **kwargs), exceptions, tries, delay, max_delay, backoff, jitter, logger)
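A small self-check appended for illustration (not part of the original module): it exercises both the decorator form and retry_call with the parameters documented above. The unreliable() function and its counter are invented.

# Usage sketch: fail twice, then succeed; delay starts at 0.1 s and doubles after each failure.
if __name__ == '__main__':
    state = {'calls': 0}

    def unreliable():
        state['calls'] += 1
        if state['calls'] < 3:
            raise IOError('transient failure')
        return 'payload'

    # decorator form
    guarded = retry(exceptions=IOError, tries=5, delay=0.1, backoff=2)(unreliable)
    print(guarded())  # -> 'payload' on the third attempt

    # call form: same behaviour without decorating
    state['calls'] = 0
    print(retry_call(unreliable, exceptions=IOError, tries=5, delay=0.1, backoff=2))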
from typing import List, Type

import requests
# missing in the original snippet: retry/Retrying come from the 'retrying' package
# (the Retrying implementation later in this gist exposes the same interface)
from retrying import Retrying, retry


def retry_handler(retry_time: int, retry_interval: float, retry_on_exception: List[Type[BaseException]], *args, **kwargs):
    def is_exception(exception: BaseException):
        for exp in retry_on_exception:
            if isinstance(exception, exp):
                return True
        return False
        # return isinstance(exception, retry_on_exception)

    def _retry(*args, **kwargs):
        return Retrying(wait_fixed=retry_interval * 1000).fixed_sleep(*args, **kwargs)

    return retry(
        wait_func=_retry,
        stop_max_attempt_number=retry_time,
        retry_on_exception=is_exception
    )


class ProxyUtil:
    def get_proxies(self):
        r = requests.get('<proxy address>')
        print('fetching the latest proxy...')
        # deliberately fail so the retry decorator below kicks in;
        # the code after the raise is unreachable in this demo
        raise IOError
        # raise IndexError
        print('got latest proxy = %s' % r.text)
        params = dict()
        if r and r.status_code == 200:
            proxy = str(r.content, encoding='utf-8')
            params['http'] = 'http://' + proxy
            params['https'] = 'https://' + proxy

    @retry_handler(retry_time=2, retry_interval=5, retry_on_exception=[IOError, IndexError])
    # @retry(stop_max_attempt_number=5, retry_on_exception=wraper)
    def retry_test(self):
        self.get_proxies()
        print('io')


if __name__ == '__main__':
    proxy = ProxyUtil()
    proxy.retry_test()


# demo usage: each decorator below gets its own placeholder function so the options read independently
@retry
def never_give_up_never_surrender():
    print("Retry forever ignoring Exceptions, don't wait between retries")


@retry(stop_max_attempt_number=7)  # stop after 7 attempts
def stop_after_7_attempts():
    print("Stopping after 7 attempts")


@retry(stop_max_delay=10000)  # stop after 10 seconds
def stop_after_10_s():
    print("Stopping after 10 seconds")


@retry(wait_fixed=2000)  # wait 2 seconds between retries
def wait_2_s():
    print("Wait 2 seconds between retries")


@retry(wait_random_min=1000, wait_random_max=2000)  # randomly wait 1 to 2 seconds between retries
def wait_random_1_to_2_s():
    print("Randomly wait 1 to 2 seconds between retries")


def retry_if_io_error(exception):
    """Return True if we should retry (in this case when it's an IOError), False otherwise"""
    return isinstance(exception, IOError)


@retry(retry_on_exception=retry_if_io_error)
def might_io_error():
    print("Retry forever with no wait if an IOError occurs, raise any other errors")


@retry(retry_on_exception=retry_if_io_error, wrap_exception=True)
def only_raise_retry_error_when_not_io_error():
    print("Retry forever with no wait if an IOError occurs, raise any other errors wrapped in RetryError")


def retry_if_result_none(result):
    """Return True if we should retry (in this case when result is None), False otherwise"""
    return result is None


@retry(retry_on_result=retry_if_result_none)
def might_return_none():
    print("Retry forever ignoring Exceptions with no wait if return value is None")
import inspect
import random
import sys
import time
import traceback

import six

# sys.maxint / 2, since Python 3.2 doesn't have a sys.maxint...
MAX_WAIT = 1073741823


def _retry_if_exception_of_type(retryable_types):
    def _retry_if_exception_these_types(exception):
        return isinstance(exception, retryable_types)
    return _retry_if_exception_these_types


def retry(*dargs, **dkw):
    """
    Decorator function that instantiates the Retrying object
    @param *dargs: positional arguments passed to Retrying object
    @param **dkw: keyword arguments passed to the Retrying object
    """
    # support both @retry and @retry() as valid syntax
    if len(dargs) == 1 and callable(dargs[0]):
        def wrap_simple(f):
            @six.wraps(f)
            def wrapped_f(*args, **kw):
                if dkw.get('deterministic_generators') and \
                        (inspect.isasyncgenfunction(f) or inspect.isgeneratorfunction(f)):
                    return Retrying().call_async(f, *args, **kw)
                return Retrying().call(f, *args, **kw)
            return wrapped_f

        return wrap_simple(dargs[0])
    else:
        def wrap(f):
            @six.wraps(f)
            def wrapped_f(*args, **kw):
                if dkw.get('deterministic_generators') and \
                        (inspect.isasyncgenfunction(f) or inspect.isgeneratorfunction(f)):
                    return Retrying(*dargs, **dkw).call_async(f, *args, **kw)
                return Retrying(*dargs, **dkw).call(f, *args, **kw)
            return wrapped_f

        return wrap


class Retrying(object):

    def __init__(self,
                 stop=None, wait=None,
                 stop_max_attempt_number=None,
                 stop_max_delay=None,
                 wait_fixed=None,
                 wait_random_min=None, wait_random_max=None,
                 wait_incrementing_start=None, wait_incrementing_increment=None,
                 wait_incrementing_max=None,
                 wait_exponential_multiplier=None, wait_exponential_max=None,
                 retry_on_exception=None,
                 retry_on_result=None,
                 wrap_exception=False,
                 stop_func=None,
                 wait_func=None,
                 wait_jitter_max=None,
                 before_attempts=None,
                 after_attempts=None,
                 deterministic_generators=False):

        self._stop_max_attempt_number = 5 if stop_max_attempt_number is None else stop_max_attempt_number
        self._stop_max_delay = 100 if stop_max_delay is None else stop_max_delay
        self._wait_fixed = 1000 if wait_fixed is None else wait_fixed
        self._wait_random_min = 0 if wait_random_min is None else wait_random_min
        self._wait_random_max = 1000 if wait_random_max is None else wait_random_max
        self._wait_incrementing_start = 0 if wait_incrementing_start is None else wait_incrementing_start
        self._wait_incrementing_increment = 100 if wait_incrementing_increment is None else wait_incrementing_increment
        self._wait_exponential_multiplier = 1 if wait_exponential_multiplier is None else wait_exponential_multiplier
        self._wait_exponential_max = MAX_WAIT if wait_exponential_max is None else wait_exponential_max
        self._wait_incrementing_max = MAX_WAIT if wait_incrementing_max is None else wait_incrementing_max
        self._wait_jitter_max = 0 if wait_jitter_max is None else wait_jitter_max
        self._before_attempts = before_attempts
        self._after_attempts = after_attempts
        self._deterministic_generators = deterministic_generators
        self._deterministic_offset = -1

        # TODO add chaining of stop behaviors
        # stop behavior
        stop_funcs = []
        if stop_max_attempt_number is not None:
            stop_funcs.append(self.stop_after_attempt)

        if stop_max_delay is not None:
            stop_funcs.append(self.stop_after_delay)

        if stop_func is not None:
            self.stop = stop_func
        elif stop is None:
            self.stop = lambda attempts, delay: any(f(attempts, delay) for f in stop_funcs)
        else:
            self.stop = getattr(self, stop)

        # TODO add chaining of wait behaviors
        # wait behavior
        wait_funcs = [lambda *args, **kwargs: 0]
        if wait_fixed is not None:
            wait_funcs.append(self.fixed_sleep)

        if wait_random_min is not None or wait_random_max is not None:
            wait_funcs.append(self.random_sleep)

        if wait_incrementing_start is not None or wait_incrementing_increment is not None:
            wait_funcs.append(self.incrementing_sleep)

        if wait_exponential_multiplier is not None or wait_exponential_max is not None:
            wait_funcs.append(self.exponential_sleep)

        if wait_func is not None:
            self.wait = wait_func
        elif wait is None:
            self.wait = lambda attempts, delay: max(f(attempts, delay) for f in wait_funcs)
        else:
            self.wait = getattr(self, wait)

        # retry on exception filter
        if retry_on_exception is None:
            self._retry_on_exception = self.always_reject
        else:
            # this allows for providing a tuple of exception types that
            # should be allowed to retry on, and avoids having to create
            # a callback that does the same thing
            if isinstance(retry_on_exception, tuple):
                retry_on_exception = _retry_if_exception_of_type(
                    retry_on_exception)
            self._retry_on_exception = retry_on_exception

        # retry on result filter
        if retry_on_result is None:
            self._retry_on_result = self.never_reject
        else:
            self._retry_on_result = retry_on_result

        self._wrap_exception = wrap_exception

    def stop_after_attempt(self, previous_attempt_number, delay_since_first_attempt_ms):
        """Stop after the previous attempt >= stop_max_attempt_number."""
        return previous_attempt_number >= self._stop_max_attempt_number

    def stop_after_delay(self, previous_attempt_number, delay_since_first_attempt_ms):
        """Stop after the time from the first attempt >= stop_max_delay."""
        return delay_since_first_attempt_ms >= self._stop_max_delay

    @staticmethod
    def no_sleep(previous_attempt_number, delay_since_first_attempt_ms):
        """Don't sleep at all before retrying."""
        return 0

    def fixed_sleep(self, previous_attempt_number, delay_since_first_attempt_ms):
        """Sleep a fixed amount of time between each retry."""
        return self._wait_fixed

    def random_sleep(self, previous_attempt_number, delay_since_first_attempt_ms):
        """Sleep a random amount of time between wait_random_min and wait_random_max"""
        return random.randint(self._wait_random_min, self._wait_random_max)

    def incrementing_sleep(self, previous_attempt_number, delay_since_first_attempt_ms):
        """
        Sleep an incremental amount of time after each attempt, starting at
        wait_incrementing_start and incrementing by wait_incrementing_increment
        """
        result = self._wait_incrementing_start + (self._wait_incrementing_increment * (previous_attempt_number - 1))
        if result > self._wait_incrementing_max:
            result = self._wait_incrementing_max
        if result < 0:
            result = 0
        return result

    def exponential_sleep(self, previous_attempt_number, delay_since_first_attempt_ms):
        exp = 2 ** previous_attempt_number
        result = self._wait_exponential_multiplier * exp
        if result > self._wait_exponential_max:
            result = self._wait_exponential_max
        if result < 0:
            result = 0
        return result

    @staticmethod
    def never_reject(result):
        return False

    @staticmethod
    def always_reject(result):
        return True

    def should_reject(self, attempt):
        reject = False
        if attempt.has_exception:
            reject |= self._retry_on_exception(attempt.value[1])
        else:
            reject |= self._retry_on_result(attempt.value)

        return reject

    def call(self, fn, *args, **kwargs):
        self._deterministic_offset = -1
        assert not self._deterministic_generators
        is_generator = inspect.isasyncgenfunction(fn) or inspect.isgeneratorfunction(fn)
        start_time = int(round(time.time() * 1000))
        attempt_number = 1
        while True:
            if self._before_attempts:
                self._before_attempts(attempt_number)

            try:
                if is_generator:
                    result = list(fn(*args, **kwargs))
                else:
                    result = fn(*args, **kwargs)
                attempt = Attempt(result, attempt_number, False)
            except:
                tb = sys.exc_info()
                attempt = Attempt(tb, attempt_number, True)

            if not self.should_reject(attempt):
                if is_generator:
                    return self._yielded_data(attempt)
                return attempt.get(self._wrap_exception)

            if self._after_attempts:
                self._after_attempts(attempt_number)

            delay_since_first_attempt_ms = int(round(time.time() * 1000)) - start_time
            if self.stop(attempt_number, delay_since_first_attempt_ms):
                if not self._wrap_exception and attempt.has_exception:
                    # get() on an attempt with an exception should cause it to be raised, but raise just in case
                    raise attempt.get()
                else:
                    raise RetryError(attempt)
            else:
                sleep = self.wait(attempt_number, delay_since_first_attempt_ms)
                if self._wait_jitter_max:
                    jitter = random.random() * self._wait_jitter_max
                    sleep = sleep + max(0, jitter)
                time.sleep(sleep / 1000.0)

            attempt_number += 1

    def call_async(self, fn, *args, **kwargs):
        self._deterministic_offset = -1
        assert self._deterministic_generators
        assert inspect.isasyncgenfunction(fn) or inspect.isgeneratorfunction(fn)
        start_time = int(round(time.time() * 1000))
        attempt_number = 1
        while True:
            if self._before_attempts:
                self._before_attempts(attempt_number)

            try:
                result = yield from self._deterministic_generation(fn, *args, **kwargs)
                attempt = Attempt(result, attempt_number, False)
            except:
                tb = sys.exc_info()
                attempt = Attempt(tb, attempt_number, True)

            if not self.should_reject(attempt):
                return self._yielded_data(attempt)

            if self._after_attempts:
                self._after_attempts(attempt_number)

            delay_since_first_attempt_ms = int(round(time.time() * 1000)) - start_time
            if self.stop(attempt_number, delay_since_first_attempt_ms):
                if not self._wrap_exception and attempt.has_exception:
                    # get() on an attempt with an exception should cause it to be raised, but raise just in case
                    raise attempt.get()
                else:
                    raise RetryError(attempt)
            else:
                sleep = self.wait(attempt_number, delay_since_first_attempt_ms)
                if self._wait_jitter_max:
                    jitter = random.random() * self._wait_jitter_max
                    sleep = sleep + max(0, jitter)
                time.sleep(sleep / 1000.0)

            attempt_number += 1

    def _yielded_data(self, attempt):
        yield from attempt.get(self._wrap_exception)

    def _deterministic_generation(self, fn, *args, **kwargs):
        for i, v in enumerate(fn(*args, **kwargs)):
            if i <= self._deterministic_offset:
                continue
            yield v
            self._deterministic_offset = i


class Attempt(object):
    """
    An Attempt encapsulates a call to a target function that may end as a
    normal return value from the function or an Exception depending on what
    occurred during the execution.
    """

    def __init__(self, value, attempt_number, has_exception):
        self.value = value
        self.attempt_number = attempt_number
        self.has_exception = has_exception

    def get(self, wrap_exception=False):
        """
        Return the return value of this Attempt instance or raise an Exception.
        If wrap_exception is true, this Attempt is wrapped inside of a
        RetryError before being raised.
        """
        if self.has_exception:
            if wrap_exception:
                raise RetryError(self)
            else:
                six.reraise(self.value[0], self.value[1], self.value[2])
        else:
            return self.value

    def __repr__(self):
        if self.has_exception:
            return "Attempts: {0}, Error:\n{1}".format(self.attempt_number, "".join(traceback.format_tb(self.value[2])))
        else:
            return "Attempts: {0}, Value: {1}".format(self.attempt_number, self.value)


class RetryError(Exception):
    """
    A RetryError encapsulates the last Attempt instance right before giving up.
    """

    def __init__(self, last_attempt):
        self.last_attempt = last_attempt

    def __str__(self):
        return "RetryError[{0}]".format(self.last_attempt)