Created
June 3, 2018 18:44
-
-
Save orenshk/84b3760484b7330db4205c2de10f77ce to your computer and use it in GitHub Desktop.
AWS exponential back off with boto3
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json
import logging
import random
import time

import boto3
from botocore.exceptions import ClientError
class BotoBackoff(object):
    """Wrap a boto3 client so that every call is retried with exponential backoff and jitter.

    Attribute access is forwarded to the wrapped client. Non-callable
    attributes are returned as-is; callables are returned wrapped in a retry
    loop that sleeps and retries whenever the call raises a ``ClientError``
    whose message contains ``"Rate exceeded"``. Any other error propagates to
    the caller unchanged.

    Examples:
        >>> ecs = BotoBackoff('ecs')
        >>> ecs.list_tasks(cluster='my-cluster')

    Args:
        service (str): Name of AWS Service to wrap.
        min_sleep_time (float): The minimum amount of time (seconds) to sleep
            in case of failure.
        max_retries (int): The retry count at which the backoff exponent is
            capped. The call is never abandoned; once the cap is passed the
            exponent is re-drawn at random from ``[1, max_retries]``.
    """

    def __init__(self, service, min_sleep_time=1e-2, max_retries=15):
        self._service = boto3.client(service)
        self.min_sleep_time = min_sleep_time
        self.max_retries = max_retries
        # Use the module-level LOGGER_NAME when the file defines one; fall
        # back to the module name so construction never fails on a NameError.
        self.logger = logging.getLogger(globals().get('LOGGER_NAME', __name__))

    def __getattr__(self, item):
        """Forward ``item`` to the wrapped client, adding backoff to callables.

        Only invoked when normal attribute lookup fails.

        Raises:
            AttributeError: If the wrapped client has no such attribute, or
                if the wrapped client itself has not been set yet.
        """
        # If `_service` is missing (e.g. the instance was created by
        # copy/pickle without running __init__), reading self._service below
        # would re-enter __getattr__ forever. Fail fast instead.
        if item == '_service':
            raise AttributeError(item)

        fn = getattr(self._service, item)
        if not callable(fn):
            return fn

        def call_with_backoff(*api_args, **api_kwargs):
            """Call ``fn`` until it succeeds or fails with a non-throttling error."""
            num_retries = 0
            while True:
                try:
                    self.logger.debug('BotoBackoff Calling {}'.format(fn))
                    return fn(*api_args, **api_kwargs)
                except ClientError as err:
                    if "Rate exceeded" not in str(err):
                        # let the caller handle every other error.
                        raise

                    # We hit the rate limit: sleep for a bit, then try again.
                    # The number of retries determines the sleep time. This
                    # thread sleeps for
                    # min_sleep_time * random.randint(1, 2 ** num_retries),
                    # i.e. at most min_sleep_time * 2 ** max_retries.
                    # After max_retries we can't give up, so we re-draw the
                    # retry count at random to avoid collision with other
                    # threads.
                    num_retries += 1
                    if num_retries > self.max_retries:
                        num_retries = random.randint(1, self.max_retries)
                    sleep_time = self.min_sleep_time * random.randint(1, 2 ** num_retries)

                    self.logger.debug("{} Hit retry limit, sleeping for {} seconds".format(item, sleep_time))
                    if self.logger.isEnabledFor(logging.DEBUG):
                        # default=str keeps non-JSON values (bytes, datetimes,
                        # ...) from raising inside the retry path.
                        self.logger.debug("arguments: {}".format(
                            json.dumps(api_kwargs, indent=4, separators=(',', ': '), default=str)))
                    self.logger.error(err)
                    time.sleep(sleep_time)

        return call_with_backoff
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
When the rate is exceeded, is there a way to capture the revised payload (UnprocessedItems), or do we need to retry writing the whole payload again?