Skip to content

Instantly share code, notes, and snippets.

@thehesiod
Created October 24, 2019 16:55
Show Gist options
  • Save thehesiod/05298c1c89c7b5a38da0abc4ccbed7b7 to your computer and use it in GitHub Desktop.
Save thehesiod/05298c1c89c7b5a38da0abc4ccbed7b7 to your computer and use it in GitHub Desktop.
import asyncio
import os
import time
import logging
from multiprocessing import Process, Queue
from queue import Empty
import botocore.session
from botocore.credentials import Credentials, CredentialResolver, CredentialProvider, AssumeRoleProvider
_original_create_credential_resolver = None
_current_credentials = None
_credential_queue = Queue(maxsize=1)
_credential_acquisition_process = None
_async_refresh_task = None
def _background_botocore_credential_acquisition() -> None:
"""
This loop gets the credentials from the default botocore credentials provider and passes them back to the main
process - botocore itself ensure it will refresh before the credentials are actually expired. We check at a higher
frequency than the window it uses to decide when we're too close to a token expirys
:return:
"""
global _current_credentials
session = botocore.session.get_session()
while True:
x = session.get_credentials()
credentials = Credentials(x.access_key, x.secret_key, token=x.token, method=x.method)
if _current_credentials != credentials:
_current_credentials = credentials
_credential_queue.put(credentials)
time.sleep(AssumeRoleProvider.EXPIRY_WINDOW_SECONDS / 5)
async def _credential_refresher() -> None:
"""
Checks the multiprocessing queue to see if the background process has provided a refreshed AWS credential. Must
use the non-blocking form to prevent this from taking CPU away from real work in the main process.
:return:
"""
global _current_credentials
while True:
try:
_current_credentials = _credential_queue.get(block=False)
except Empty:
pass
await asyncio.sleep(AssumeRoleProvider.EXPIRY_WINDOW_SECONDS / 5)
def start_background_botocore_credential_refresh() -> None:
"""
Starts a background process that will retrieve the real AWS credentials using the normal botocore credential
providers, and monkey patches the current process' credential provider to use the credentials provided.
This hack is needed to work around aiobotocore's lack of async credential provider support
:return:
"""
aws_profile = os.environ.get('AWS_PROFILE', None)
if aws_profile:
logging.getLogger().info('aws profile {} specified, starting aiobotocore credential refresh'.format(aws_profile))
else:
logging.getLogger().info('aws profile not specified, no need to start aiobotocore credential refresh')
return
global _credential_acquisition_process
global _current_credentials
global _async_refresh_task
_credential_acquisition_process = Process(target=_background_botocore_credential_acquisition, daemon=True)
_credential_acquisition_process.start()
# We block synchronously so we can't leave this function without getting the first set of credentials
_current_credentials = _credential_queue.get(block=True)
# Replace the default credential resolver with one patched to return the asyncronously refreshed credentials
_original_create_credential_resolver = botocore.credentials.create_credential_resolver
def patched_create_credential_resolver(session, cache=None):
class BackgroundProvider(CredentialProvider):
def load(self):
return _current_credentials
return CredentialResolver(providers=[BackgroundProvider()])
botocore.credentials.create_credential_resolver = patched_create_credential_resolver
# Start the background task that will keep
_async_refresh_task = asyncio.ensure_future(_credential_refresher())
@thehesiod
Copy link
Author

thehesiod commented Mar 11, 2020

@rrrix
most likely the helper process is dying/stuck. I suggest debugging it to see what's going on, or add try/except/printfs in there. IOW _background_botocore_credential_acquisition is dying. NOTE this isn't prod-ready code, there's a lot of prod'ification needed, logging, except handlers, etc

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment