-
-
Save thehesiod/05298c1c89c7b5a38da0abc4ccbed7b7 to your computer and use it in GitHub Desktop.
import asyncio | |
import os | |
import time | |
import logging | |
from multiprocessing import Process, Queue | |
from queue import Empty | |
import botocore.session | |
from botocore.credentials import Credentials, CredentialResolver, CredentialProvider, AssumeRoleProvider | |
_original_create_credential_resolver = None | |
_current_credentials = None | |
_credential_queue = Queue(maxsize=1) | |
_credential_acquisition_process = None | |
_async_refresh_task = None | |
def _background_botocore_credential_acquisition() -> None:
    """
    Helper-process loop: poll the default botocore credential providers and publish changes.

    Each pass asks the default botocore session for credentials and, when they differ from the last set that was
    published, puts a plain ``Credentials`` copy on ``_credential_queue`` for the main process. botocore itself
    ensures it will refresh before the credentials actually expire; we poll every
    ``AssumeRoleProvider.EXPIRY_WINDOW_SECONDS / 5`` seconds, which is more frequent than the window it uses to
    decide when we're too close to a token expiry.

    Missing credentials and unexpected errors are logged and retried on the next pass rather than killing the
    process silently (which would leave the main process waiting on an empty queue).
    :return: never returns; the loop runs until the process is terminated
    """
    global _current_credentials

    logger = logging.getLogger(__name__)
    poll_interval = AssumeRoleProvider.EXPIRY_WINDOW_SECONDS / 5
    session = botocore.session.get_session()

    # botocore's Credentials objects compare by identity, so change detection is done on the immutable
    # (access_key, secret_key, token) snapshot instead of on the objects themselves.
    last_published = None

    while True:
        try:
            resolved = session.get_credentials()
            if resolved is None:
                logger.error('botocore could not find any AWS credentials; retrying in %s seconds', poll_interval)
            else:
                # One consistent snapshot: reading access_key/secret_key/token one at a time could straddle
                # a refresh of refreshable credentials.
                snapshot = resolved.get_frozen_credentials()
                if snapshot != last_published:
                    credentials = Credentials(
                        snapshot.access_key,
                        snapshot.secret_key,
                        token=snapshot.token,
                        method=resolved.method,
                    )
                    # Blocks while the single-slot queue still holds an unconsumed set.
                    _credential_queue.put(credentials)
                    _current_credentials = credentials
                    last_published = snapshot
        except Exception:
            # Top-level boundary of the helper process: report and keep polling.
            logger.exception('failed to acquire AWS credentials; retrying in %s seconds', poll_interval)
        time.sleep(poll_interval)
async def _credential_refresher() -> None:
    """
    Main-process coroutine that picks up credentials published by the background process.

    Every pass makes one non-blocking read of the multiprocessing queue (a blocking read would hold up the real
    work in the main process) and, if something was waiting, stores it in ``_current_credentials``. It then
    sleeps for ``AssumeRoleProvider.EXPIRY_WINDOW_SECONDS / 5`` seconds before checking again.
    :return:
    """
    global _current_credentials

    while True:
        try:
            refreshed = _credential_queue.get_nowait()
        except Empty:
            # Nothing new has been published since the previous pass.
            pass
        else:
            _current_credentials = refreshed
        await asyncio.sleep(AssumeRoleProvider.EXPIRY_WINDOW_SECONDS / 5)
def _run_credential_acquisition(credential_queue) -> None:
    """
    Process target: bind the parent's queue in the child, then run the acquisition loop.

    The queue is handed over explicitly because with the ``spawn`` start method the child re-imports this module
    and would otherwise end up with its own, unconnected module-level queue.
    :param credential_queue: the multiprocessing queue created by the parent process
    :return: never returns
    """
    global _credential_queue
    _credential_queue = credential_queue
    _background_botocore_credential_acquisition()


def start_background_botocore_credential_refresh() -> None:
    """
    Starts a background process that will retrieve the real AWS credentials using the normal botocore credential
    providers, and monkey patches the current process' credential provider to use the credentials provided.
    This hack is needed to work around aiobotocore's lack of async credential provider support.

    Does nothing (apart from logging) when the ``AWS_PROFILE`` environment variable is unset or empty.
    Otherwise it blocks synchronously until the first set of credentials has arrived, replaces
    ``botocore.credentials.create_credential_resolver`` and schedules ``_credential_refresher`` with
    ``asyncio.ensure_future``.
    NOTE(review): presumably this must be called where an asyncio event loop is available - confirm with callers.
    :return:
    :raises RuntimeError: if the background process exits before delivering the first credentials
    """
    global _credential_acquisition_process
    global _current_credentials
    global _async_refresh_task
    global _original_create_credential_resolver

    aws_profile = os.environ.get('AWS_PROFILE', None)
    if not aws_profile:
        logging.getLogger().info('aws profile not specified, no need to start aiobotocore credential refresh')
        return
    logging.getLogger().info('aws profile %s specified, starting aiobotocore credential refresh', aws_profile)

    _credential_acquisition_process = Process(
        target=_run_credential_acquisition,
        args=(_credential_queue,),
        daemon=True,
    )
    _credential_acquisition_process.start()

    # We block synchronously so we can't leave this function without getting the first set of credentials.
    # The wait is sliced so that a dead helper process is reported instead of hanging forever.
    first_credentials_poll_seconds = 1
    while True:
        try:
            _current_credentials = _credential_queue.get(block=True, timeout=first_credentials_poll_seconds)
            break
        except Empty:
            if not _credential_acquisition_process.is_alive():
                raise RuntimeError(
                    'credential acquisition process exited (exit code {}) before providing credentials'.format(
                        _credential_acquisition_process.exitcode
                    )
                )

    # Remember botocore's own factory at module level; never overwrite it with our patched version on a
    # repeated call.
    if _original_create_credential_resolver is None:
        _original_create_credential_resolver = botocore.credentials.create_credential_resolver

    class BackgroundProvider(CredentialProvider):
        """Credential provider that returns whatever the background process delivered last."""

        def load(self):
            # Looked up at call time, so refreshed credentials are picked up.
            return _current_credentials

    # Replace the default credential resolver with one patched to return the asynchronously refreshed credentials.
    def patched_create_credential_resolver(session, cache=None, region_name=None, **_unused):
        # region_name/**_unused: accept (and ignore) keyword arguments that newer botocore releases pass
        # to this factory.
        return CredentialResolver(providers=[BackgroundProvider()])

    botocore.credentials.create_credential_resolver = patched_create_credential_resolver

    # Start the background task that will keep _current_credentials up to date from the queue.
    _async_refresh_task = asyncio.ensure_future(_credential_refresher())
Credits to @alexmac for the original implementation :)
Hi @thehesiod,
Thank you for all of your contributions to aioboto3/aiobotocore! I came across the same problems that are now well documented in aiobotocore #619.
Can you advise on how to use the botocore_credential_refresher.py
module? An example snippet?
This gets stuck on line 79 from above, essentially waiting (blocking) on an empty queue, which seems to never get the credentials put there.
_current_credentials = _credential_queue.get(block=True)
My test code:
import os
import logging
import asyncio
import aiobotocore
import botocore_assume_role

# Select the AWS profile through the environment before the refresher is started.
# NOTE(review): botocore_assume_role is presumably the module above saved under that name - confirm.
os.environ['AWS_PROFILE'] = 'dev1'
logging.basicConfig(level=logging.DEBUG)


async def main():
    # Reproduction: start the credential refresher, then make a single STS call and print the response.
    botocore_assume_role.start_background_botocore_credential_refresh() ## <<--- hangs forever here
    async with aiobotocore.get_session().create_client('sts') as client:
        response = await client.get_caller_identity()
        print(response)


if __name__ == '__main__':
    asyncio.run(main(), debug=True)
Thanks in advance!
@rrrix
Most likely the helper process is dying or getting stuck; in other words, _background_botocore_credential_acquisition is failing. I suggest debugging it to see what is going on, or adding try/except blocks and print statements in there. NOTE: this isn't production-ready code; it still needs a lot of hardening (logging, exception handlers, etc.).
In My Case.
Anyway, thanks a lot for sharing the monkey-patch code.
We can now use it to work around the EKS assume-role issue.
botocore: 1.13.14
aiobotocore: 0.11.0