Skip to content

Instantly share code, notes, and snippets.

@jcalvento
Last active April 27, 2023 21:54
Show Gist options
  • Save jcalvento/92861eb7ebda3fa064f3fbbb71acba41 to your computer and use it in GitHub Desktop.
Save jcalvento/92861eb7ebda3fa064f3fbbb71acba41 to your computer and use it in GitHub Desktop.
import json
import os
import webbrowser
from configparser import ConfigParser
from datetime import datetime, timedelta, timezone
from pathlib import Path
import boto3
import dateutil
from botocore import UNSIGNED
from botocore.config import Config
AWS_CONFIG_PATH = f"{Path.home()}/.aws/config"
AWS_CREDENTIAL_PATH = f"{Path.home()}/.aws/credentials"
AWS_SSO_CACHE_PATH = f"{Path.home()}/.aws/sso/cache"
def get_sso_cached_login():
file_paths = list_directory(AWS_SSO_CACHE_PATH)
for file_path in file_paths:
data = load_json(file_path)
if not (data.get("startUrl") and data.get("startUrl").startswith(sso_start_url)) or\
data.get("region") != aws_region or iso_time_now() > parse_timestamp(data["expiresAt"]):
continue
return data['accessToken']
raise ExpiredSSOCredentialsError("Current cached SSO login is expired or invalid")
def iso_time_now():
return datetime.now(timezone.utc)
def list_directory(path):
file_paths = []
if os.path.exists(path):
file_paths = Path(path).iterdir()
file_paths = sorted(file_paths, key=os.path.getmtime)
file_paths.reverse() # sort by recently updated
return [str(f) for f in file_paths]
def load_json(path):
try:
with open(path) as context:
return json.load(context)
except ValueError:
pass # ignore invalid json
def parse_timestamp(value):
return dateutil.parser.parse(value)
def read_config(path):
config = ConfigParser()
config.read(path)
return config
def write_config(path, config):
with open(path, "w") as destination:
config.write(destination)
def role_name(role_data):
return role_data['roleName']
def update_aws_credentials(new_credentials):
config = read_config(AWS_CREDENTIAL_PATH)
print("Updating credentials")
for profile_credential in new_credentials:
profile_name = profile_credential['accountName']
if config.has_section(profile_name):
config.remove_section(profile_name)
config.add_section(profile_name)
config.set(profile_name, "aws_access_key_id", profile_credential["accessKeyId"])
config.set(profile_name, "aws_secret_access_key ", profile_credential["secretAccessKey"])
config.set(profile_name, "aws_session_token", profile_credential["sessionToken"])
write_config(AWS_CREDENTIAL_PATH, config)
class ExpiredSSOCredentialsError(Exception):
pass
def fetch_access_token():
try:
return get_sso_cached_login()
except ExpiredSSOCredentialsError as error:
print(error)
print("Fetching credentials again")
return renew_access_token()
def renew_access_token():
client = boto3.client('sso-oidc')
client_name = 'aws-sso-script'
register_client_response = client.register_client(clientName=client_name, clientType='public')
client_id = register_client_response['clientId']
client_secret = register_client_response['clientSecret']
start_authorization_response = client.start_device_authorization(clientId=client_id, clientSecret=client_secret,
startUrl=sso_start_url)
device_code = start_authorization_response['deviceCode']
verification_uri = start_authorization_response['verificationUriComplete']
webbrowser.open(verification_uri, new=2)
input("Waiting for device authorization, once authorized Press [any key] to continue: ")
create_token_response = client.create_token(
clientId=client_id,
clientSecret=client_secret,
grantType='urn:ietf:params:oauth:grant-type:device_code',
deviceCode=device_code,
code=device_code
)
expiration_date = iso_time_now() + timedelta(0, create_token_response['expiresIn'])
expiration_date_iso = expiration_date.isoformat()
access_token = create_token_response['accessToken']
with open(f'{AWS_SSO_CACHE_PATH}/{client_name}.json', 'w') as cache_file:
cache_file.write(json.dumps({
'accessToken': access_token,
'expiresAt': expiration_date_iso,
'region': aws_region,
'startUrl': sso_start_url
}))
return access_token
def fetch_accouts_credentials():
credentials = []
client_config = Config(signature_version=UNSIGNED, region_name='us-east-1')
sso = boto3.client("sso", config=client_config)
paginator = sso.get_paginator('list_accounts')
results = paginator.paginate(accessToken=access_token)
print("Fetching accounts")
account_list = results.build_full_result()['accountList']
print("Available accounts: " + str(account_list))
for account in account_list:
sso_account_id = account['accountId']
sso_account_name = account['accountName'].replace("_", "-")
paginator = sso.get_paginator('list_account_roles')
results = paginator.paginate(
accountId=sso_account_id,
accessToken=access_token
)
role_list = results.build_full_result()['roleList']
role_list.sort(key=role_name)
print(f"Available roles for account: {sso_account_id} - {sso_account_name}: {role_list}")
role = role_list[0]
print(f"Fetching credentials for account: {sso_account_id} - {sso_account_name} and role: {role}")
role_credentials = sso.get_role_credentials(
roleName=role['roleName'], accountId=sso_account_id, accessToken=access_token
)['roleCredentials']
role_credentials['accountName'] = sso_account_name
credentials.append(role_credentials)
return credentials
'''
- You need botocore and boto3 with python3
- Exec this with python path/to/this/file.py
- I'll get default values for sso_region and sso_start_url from your ~/.aws/config file, you can overwrite it anyways when you run the script
- It updates ~/.aws/credentials will all credentials assigned in your SSO account
'''
if __name__ == '__main__':
aws_config = ConfigParser()
aws_config.read(AWS_CONFIG_PATH)
aws_default_config = dict(aws_config.items('default'))
sso_start_url = aws_default_config.get('sso_start_url', None)
aws_region = aws_default_config.get('sso_region', None)
sso_start_url = input(f"Insert SSO start url ['{sso_start_url}' as default]: ") or sso_start_url
aws_region = input(f"Insert AWS region ['{aws_default_config['sso_region']}' as default]: ") or aws_region
access_token = fetch_access_token()
credentials = fetch_accouts_credentials()
update_aws_credentials(credentials)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment