Last active
April 27, 2023 21:54
-
-
Save jcalvento/92861eb7ebda3fa064f3fbbb71acba41 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import os | |
import webbrowser | |
from configparser import ConfigParser | |
from datetime import datetime, timedelta, timezone | |
from pathlib import Path | |
import boto3 | |
import dateutil | |
from botocore import UNSIGNED | |
from botocore.config import Config | |
# Locations of the AWS CLI files used by this script, all rooted at the
# current user's home directory and kept as plain strings.
AWS_CONFIG_PATH = str(Path.home()) + "/.aws/config"
AWS_CREDENTIAL_PATH = str(Path.home()) + "/.aws/credentials"
AWS_SSO_CACHE_PATH = str(Path.home()) + "/.aws/sso/cache"
def get_sso_cached_login():
    """Return a still-valid SSO access token from the local SSO cache.

    The files in ``AWS_SSO_CACHE_PATH`` are examined in the order produced by
    ``list_directory``. An entry is accepted only when its ``startUrl`` starts
    with the module-level ``sso_start_url``, its ``region`` equals the
    module-level ``aws_region`` and its ``expiresAt`` timestamp has not passed.

    Returns:
        The ``accessToken`` value of the first acceptable cache entry.

    Raises:
        ExpiredSSOCredentialsError: if no cache entry is acceptable.
    """
    now = iso_time_now()
    for file_path in list_directory(AWS_SSO_CACHE_PATH):
        data = load_json(file_path)
        # load_json yields None for files that are not valid JSON, and a JSON
        # document that is not an object cannot be a token entry either.
        if not isinstance(data, dict):
            continue

        start_url = data.get("startUrl")
        if not (start_url and start_url.startswith(sso_start_url)):
            continue
        if data.get("region") != aws_region:
            continue

        expires_at = data.get("expiresAt")
        token = data.get("accessToken")
        if not expires_at or not token:
            continue  # incomplete entry: skip it instead of raising KeyError

        try:
            expired = now > parse_timestamp(expires_at)
        except (ValueError, TypeError):
            # Unparsable timestamp, or a naive one that cannot be compared
            # with the timezone-aware "now": treat the entry as unusable.
            continue
        if expired:
            continue

        return token

    raise ExpiredSSOCredentialsError("Current cached SSO login is expired or invalid")
def iso_time_now():
    """Return the current time as a timezone-aware ``datetime`` in UTC."""
    utc_now = datetime.now(tz=timezone.utc)
    return utc_now
def list_directory(path):
    """List the entries of *path* as strings, most recently modified first.

    A path that does not exist yields an empty list.
    """
    if not os.path.exists(path):
        return []
    # Sort ascending by modification time, then walk the result backwards so
    # the newest entry comes first.
    oldest_first = sorted(Path(path).iterdir(), key=os.path.getmtime)
    return [str(entry) for entry in reversed(oldest_first)]
def load_json(path):
    """Decode the file at *path* as JSON.

    Returns the decoded document, or ``None`` when decoding raises
    ``ValueError`` (for example because the content is not valid JSON).
    """
    try:
        with open(path) as source:
            return json.loads(source.read())
    except ValueError:
        # Invalid JSON is deliberately ignored; the caller receives None.
        return None
def parse_timestamp(value):
    """Parse the timestamp string *value* with ``dateutil`` and return the result.

    Args:
        value: timestamp text, e.g. the ``expiresAt`` field of a cache entry.

    Returns:
        Whatever ``dateutil.parser.parse`` produces for *value*.
    """
    # A bare ``import dateutil`` does not guarantee that the ``parser``
    # submodule has been loaded, so import it explicitly rather than relying
    # on another library having imported it first.
    from dateutil import parser as dateutil_parser

    return dateutil_parser.parse(value)
def read_config(path):
    """Load the INI file at *path* into a new ``ConfigParser`` and return it."""
    parser = ConfigParser()
    parser.read(path)
    return parser
def write_config(path, config):
    """Write *config* to the file at *path*, replacing its previous content."""
    with open(path, mode="w") as out_file:
        config.write(out_file)
def role_name(role_data):
    """Return the ``roleName`` entry of *role_data* (used as a sort key)."""
    name = role_data["roleName"]
    return name
def update_aws_credentials(new_credentials):
    """Store one profile per credential set in the AWS credentials file.

    Args:
        new_credentials: iterable of dicts, each providing ``accountName``
            (used as the profile/section name), ``accessKeyId``,
            ``secretAccessKey`` and ``sessionToken``.

    Existing sections with the same name are replaced; other sections already
    in the file are kept. The file is rewritten once, after every profile has
    been updated.
    """
    config = read_config(AWS_CREDENTIAL_PATH)
    print("Updating credentials")
    for profile_credential in new_credentials:
        profile_name = profile_credential['accountName']
        # Start from an empty section so no stale option survives.
        # remove_section simply returns False when the section is absent.
        config.remove_section(profile_name)
        config.add_section(profile_name)
        config.set(profile_name, "aws_access_key_id", profile_credential["accessKeyId"])
        # The option name must not carry trailing whitespace.
        config.set(profile_name, "aws_secret_access_key", profile_credential["secretAccessKey"])
        config.set(profile_name, "aws_session_token", profile_credential["sessionToken"])
    write_config(AWS_CREDENTIAL_PATH, config)
class ExpiredSSOCredentialsError(Exception):
    """Signals that no usable cached SSO login is available."""
def fetch_access_token():
    """Return an SSO access token, preferring the cached login.

    When the cache lookup raises ``ExpiredSSOCredentialsError`` the error is
    printed and a new token is obtained through ``renew_access_token``.
    """
    try:
        token = get_sso_cached_login()
    except ExpiredSSOCredentialsError as cache_miss:
        print(cache_miss)
        print("Fetching credentials again")
        token = renew_access_token()
    return token
def renew_access_token():
    """Run the SSO OIDC device-authorization flow and cache the new token.

    Registers a public OIDC client, starts device authorization for the
    module-level ``sso_start_url``, opens the verification page in a browser,
    waits for the user to confirm, exchanges the device code for an access
    token and stores it as ``aws-sso-script.json`` in ``AWS_SSO_CACHE_PATH``.

    Returns:
        The newly issued access token.
    """
    # Talk to the OIDC endpoint of the selected SSO region; a value of None
    # leaves region resolution to boto3's defaults, as before.
    client = boto3.client('sso-oidc', region_name=aws_region)
    client_name = 'aws-sso-script'
    register_client_response = client.register_client(clientName=client_name, clientType='public')
    client_id = register_client_response['clientId']
    client_secret = register_client_response['clientSecret']

    start_authorization_response = client.start_device_authorization(
        clientId=client_id,
        clientSecret=client_secret,
        startUrl=sso_start_url,
    )
    device_code = start_authorization_response['deviceCode']
    verification_uri = start_authorization_response['verificationUriComplete']

    webbrowser.open(verification_uri, new=2)
    # Show the URL as well so the flow can be completed when no browser opens.
    print(f"If the browser did not open, visit: {verification_uri}")
    input("Waiting for device authorization, once authorized Press [any key] to continue: ")

    # NOTE(review): ``code`` is sent alongside ``deviceCode`` as in the
    # original script; confirm against the API reference whether it is needed
    # for the device_code grant.
    create_token_response = client.create_token(
        clientId=client_id,
        clientSecret=client_secret,
        grantType='urn:ietf:params:oauth:grant-type:device_code',
        deviceCode=device_code,
        code=device_code,
    )
    expiration_date = iso_time_now() + timedelta(seconds=create_token_response['expiresIn'])
    access_token = create_token_response['accessToken']

    # The cache directory may not exist yet; create it so the freshly issued
    # token is not lost to a FileNotFoundError.
    os.makedirs(AWS_SSO_CACHE_PATH, exist_ok=True)
    cache_path = f'{AWS_SSO_CACHE_PATH}/{client_name}.json'
    # The file holds a bearer token: when it has to be created, make it
    # readable and writable by the owner only.
    descriptor = os.open(cache_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(descriptor, 'w') as cache_file:
        json.dump({
            'accessToken': access_token,
            'expiresAt': expiration_date.isoformat(),
            'region': aws_region,
            'startUrl': sso_start_url,
        }, cache_file)
    return access_token
def fetch_accouts_credentials():
    """Fetch role credentials for every account listed for the SSO session.

    Uses the module-level ``access_token``. For each account the available
    roles are sorted by role name and credentials are requested for the first
    one; an account without any role is reported and skipped.

    Returns:
        A list of ``roleCredentials`` dicts, each extended with an
        ``accountName`` key holding the account name with ``_`` replaced
        by ``-``.
    """
    credentials = []
    # Use the selected SSO region instead of a hard-coded one; us-east-1
    # remains the fallback when no region was selected.
    client_config = Config(signature_version=UNSIGNED, region_name=aws_region or 'us-east-1')
    sso = boto3.client("sso", config=client_config)

    print("Fetching accounts")
    account_pages = sso.get_paginator('list_accounts').paginate(accessToken=access_token)
    account_list = account_pages.build_full_result()['accountList']
    print("Available accounts: " + str(account_list))

    # The role paginator does not depend on the account, so build it once.
    roles_paginator = sso.get_paginator('list_account_roles')
    for account in account_list:
        sso_account_id = account['accountId']
        sso_account_name = account['accountName'].replace("_", "-")

        role_pages = roles_paginator.paginate(
            accountId=sso_account_id,
            accessToken=access_token,
        )
        role_list = sorted(role_pages.build_full_result()['roleList'], key=role_name)
        print(f"Available roles for account: {sso_account_id} - {sso_account_name}: {role_list}")
        if not role_list:
            # Nothing to assume here; indexing [0] would raise IndexError.
            print(f"No roles available for account: {sso_account_id} - {sso_account_name}, skipping")
            continue

        role = role_list[0]
        print(f"Fetching credentials for account: {sso_account_id} - {sso_account_name} and role: {role}")
        role_credentials = sso.get_role_credentials(
            roleName=role['roleName'], accountId=sso_account_id, accessToken=access_token
        )['roleCredentials']
        role_credentials['accountName'] = sso_account_name
        credentials.append(role_credentials)
    return credentials
'''
- You need botocore and boto3 with python3
- Run this with: python path/to/this/file.py
- Default values for sso_region and sso_start_url are read from your ~/.aws/config file; you can override them when you run the script
- It updates ~/.aws/credentials with all credentials assigned to your SSO account
'''
if __name__ == '__main__':
    # Defaults come from the [default] section of ~/.aws/config when that
    # section exists; otherwise both values must be typed in at the prompts.
    aws_config = read_config(AWS_CONFIG_PATH)
    if aws_config.has_section('default'):
        aws_default_config = dict(aws_config.items('default'))
    else:
        aws_default_config = {}
    sso_start_url = aws_default_config.get('sso_start_url', None)
    aws_region = aws_default_config.get('sso_region', None)

    # An empty answer keeps the default shown in the prompt.
    sso_start_url = input(f"Insert SSO start url ['{sso_start_url}' as default]: ").strip() or sso_start_url
    aws_region = input(f"Insert AWS region ['{aws_region}' as default]: ").strip() or aws_region
    if not sso_start_url or not aws_region:
        raise SystemExit("Both an SSO start url and an AWS region are required")

    # These stay module-level names: the functions above read sso_start_url,
    # aws_region and access_token as globals.
    access_token = fetch_access_token()
    credentials = fetch_accouts_credentials()
    update_aws_credentials(credentials)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment