# Source: GitHub gist 68161828ef661bb155b66a469cbf7114 by @huynhbaoan,
# created November 1, 2024 12:15.
import requests
import boto3
import getpass
import base64
import os
import csv
import xml.etree.ElementTree as ET
from bs4 import BeautifulSoup
import sys
from urllib3.exceptions import InsecureRequestWarning
import argparse
from typing import Dict
# Suppress urllib3's InsecureRequestWarning. The IDP requests in this file are
# sent with verify=False, so each call would otherwise emit this warning.
# NOTE(review): verify=False disables TLS certificate checking for requests that
# carry the user's password — confirm this is really required for the IDP.
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
# Per-environment settings for the SAML login step, keyed by the value of the
# `service_env` command-line argument. Each entry names the services account
# (ID and name), the role to auto-select, and the environment label used to
# filter rows of the account-mapping CSV.
ENV_CONFIGS = {
    'np': dict(
        account_id='803264201466',
        account_name='servicesnp',
        role_name='AUR-Resource-AWS-servicesnp-InfraAnalysis',
        environment='nonprod',
    ),
    'preprod': dict(
        account_id='150897553596',
        account_name='servicespreprod',
        role_name='AUR-Resource-AWS-servicespreprod-InfraAnalysis',
        environment='preprod',
    ),
    'prod': dict(
        account_id='522412867873',
        account_name='servicesprod',
        role_name='AUR-Resource-AWS-servicesprod-InfraAnalysis',
        environment='prod',
    ),
}
def load_account_mappings(csv_file: str, target_environment: str) -> Dict[str, Dict[str, str]]:
    """Load account mappings from a CSV file, filtering by target environment.

    The CSV must have a header row containing the columns ``Account ID``,
    ``Environment``, ``Description`` and ``URL``. Only rows whose
    ``Environment`` value equals ``target_environment`` exactly are kept.

    Args:
        csv_file: Path to the CSV file.
        target_environment: Value the ``Environment`` column must match.

    Returns:
        A mapping keyed by the row's ``Environment`` value. Each entry holds
        ``account_id``, ``account_name``, ``env_type``, ``url`` and
        ``switch_role_arn`` (the ARN of ``HIPViewOnlyRole`` in that account).
        Empty when no row matches.

    Raises:
        SystemExit: If the file cannot be read, has no header row, or is
            missing a required column.
    """
    required_columns = {'Account ID', 'Environment', 'Description', 'URL'}
    account_mappings: Dict[str, Dict[str, str]] = {}
    try:
        # newline='' is what the csv module asks for, so that quoted fields
        # containing line breaks are parsed correctly.
        with open(csv_file, 'r', newline='') as f:
            reader = csv.DictReader(f)
            fieldnames = reader.fieldnames
            # fieldnames is None when the file has no header row at all;
            # report that directly instead of failing on the set operations.
            if fieldnames is None:
                sys.exit("CSV file is empty or has no header row")
            missing = required_columns - set(fieldnames)
            if missing:
                # Sorted so the message is deterministic.
                sys.exit(f"CSV file is missing required columns: {', '.join(sorted(missing))}")
            for row in reader:
                if row['Environment'] != target_environment:
                    continue
                # NOTE(review): the key is the Environment value, which is the
                # same for every matching row, so when several rows match only
                # the last one is kept. Confirm whether one entry per account
                # was intended before changing the key (callers use the key
                # and 'account_name' as the credentials profile name).
                account_mappings[row['Environment']] = {
                    'account_id': row['Account ID'],
                    'account_name': row['Environment'],
                    'env_type': row['Description'],
                    'url': row['URL'],
                    'switch_role_arn': f"arn:aws:iam::{row['Account ID']}:role/HIPViewOnlyRole",
                }
    except Exception as e:
        # sys.exit() above raises SystemExit, which is not an Exception
        # subclass, so those exits pass through this handler untouched.
        sys.exit(f"Error reading CSV file: {str(e)}")
    return account_mappings
def get_session(account_name, account_id, quiet=False):
    """Open a requests session against the IDP for the given account.

    Builds the IDP URL from ``account_name`` and ``account_id``, issues a GET
    (certificate verification disabled, 10 second timeout) and checks where
    the request ended up after redirects.

    Args:
        account_name: Account name used in the URL path; any ``aws-`` substring
            is removed to form the ``sn`` query parameter.
        account_id: Account ID passed as the ``acc`` query parameter.
        quiet: When true, the IDP URL is not printed.

    Returns:
        Tuple ``(session, response)`` of the session and the GET response.

    Raises:
        SystemExit: On connection error, too many redirects, timeout, or when
            the final URL is not the expected ``my.policy`` page.
    """
    hangup_url = "https://idp.CDNXHG.com.au/vdesk/hangup.php3"
    policy_url = "https://idp.CDNXHG.com.au/my.policy"

    short_name = account_name.replace('aws-', '')
    base_url = f"https://idp.CDNXHG.com.au/noWebtop/CDNXHG-{account_name}/api/res?id=/Common/AWS-IdP"
    idp_url = f"{base_url}&sn={short_name}&acc={account_id}"
    if not quiet:
        print(f"IDP URL: {idp_url}")

    session = requests.Session()
    try:
        response = session.get(idp_url, verify=False, timeout=10)
    except (
        requests.exceptions.ConnectionError,
        requests.exceptions.TooManyRedirects,
        requests.exceptions.Timeout,
    ) as err:
        # Checked in this order on purpose: a connect timeout is both a
        # ConnectionError and a Timeout and is reported as a connection failure.
        if isinstance(err, requests.exceptions.ConnectionError):
            sys.exit(f"Failed to connect to {idp_url} " + repr(err))
        if isinstance(err, requests.exceptions.TooManyRedirects):
            sys.exit(f"Too many redirects from {idp_url} " + repr(err))
        sys.exit(f"Connect timeout from {idp_url} " + repr(err))

    landed_on = response.url
    if landed_on == hangup_url:
        sys.exit('ERROR: Redirected to hangup URL. Invalid account name or ID.')
    if landed_on != policy_url:
        sys.exit(f'ERROR: Unexpected redirect URL: {response.url}')
    return session, response
def authenticate(session, username, password, mfa_token=None, quiet=False):
    """Post credentials to the IDP and return the SAML assertion.

    Args:
        session: requests session previously opened against the IDP.
        username: Login name sent in the form.
        password: Password sent in the form.
        mfa_token: Optional MFA token; sent as ``token`` only when non-empty.
        quiet: Accepted for interface compatibility; currently unused.

    Returns:
        The ``value`` attribute of the ``SAMLResponse`` input element found in
        the IDP's HTML response.

    Raises:
        SystemExit: If the request fails, the session expired, authentication
            failed, or no SAML assertion is present in the response.
    """
    payload = {
        'username': username,
        'password': password,
        'domain': 'aur.national.com.au',
        'vhost': 'standard'
    }
    if mfa_token:
        payload['token'] = mfa_token

    # One retry when the IDP drops the connection. Previously the "trying
    # again" branch only printed a message and fell through with no response,
    # which surfaced as an unrelated unbound-variable error.
    max_attempts = 2
    try:
        auth_response = None
        for attempt in range(1, max_attempts + 1):
            try:
                auth_response = session.post(
                    'https://idp.CDNXHG.com.au/my.policy',
                    data=payload,
                    verify=False,
                    headers={
                        'Content-Type': 'application/x-www-form-urlencoded',
                        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/91.0.4472.124 Safari/537.36'
                    },
                    allow_redirects=True
                )
                auth_response.raise_for_status()
                break
            except requests.exceptions.ConnectionError as e:
                if 'Connection aborted.' in str(e) and attempt < max_attempts:
                    print("IDP closed the connection prematurely - trying again")
                    continue
                raise

        soup = BeautifulSoup(auth_response.text, 'html.parser')
        assertion = soup.find('input', {'name': 'SAMLResponse'})
        if assertion is not None:
            return assertion.get('value')
        if 'Session Expired/Timeout' in auth_response.text:
            sys.exit("Session expired or timed out")
        if 'Authentication failed' in auth_response.text:
            sys.exit("Authentication failed - check your credentials")
        sys.exit("No SAML assertion found in response")
    except Exception as e:
        # SystemExit from the sys.exit() calls above is not an Exception
        # subclass, so those messages are not rewrapped here.
        sys.exit(f"Authentication error: {str(e)}")
def select_role(roles, target_role_name):
    """Pick a role entry from ``roles``, preferring one that names the target.

    Every entry is printed with its index. If ``target_role_name`` occurs as a
    substring of one or more entries, the last such entry is returned without
    prompting. Otherwise the user is asked for an index until a valid one is
    entered.

    Args:
        roles: Sequence of role entry strings.
        target_role_name: Substring identifying the role to auto-select.

    Returns:
        The selected entry from ``roles``.

    Raises:
        SystemExit: If ``roles`` is empty (there is nothing to choose, and the
            manual prompt could never be satisfied).
    """
    if not roles:
        sys.exit("No roles available to select from")

    print("\nAvailable roles:")
    selected_index = None
    for i, role in enumerate(roles):
        print(f"[{i}]: {role}")
        # Guard against None entries so the substring test cannot raise.
        if role is not None and target_role_name in role:
            selected_index = i

    if selected_index is not None:
        print(f"\nAuto-selecting role: {roles[selected_index]}")
        return roles[selected_index]

    print(f"\nWARNING: Target role '{target_role_name}' not found!")
    while True:
        try:
            selection = int(input("Please select a role manually [0-{}]: ".format(len(roles) - 1)))
        except ValueError:
            print("Please enter a valid number.")
            continue
        if 0 <= selection < len(roles):
            return roles[selection]
        print("Invalid selection. Try again.")
def write_credentials(profile_name: str, credentials: Dict):
    """Write AWS credentials to the shared credentials file, keeping other profiles.

    The file is the one named by the ``AWS_SHARED_CREDENTIALS_FILE``
    environment variable when set, otherwise ``~/.aws/credentials``. The
    existing file is parsed into profiles, the ``profile_name`` section is
    replaced (or added), and everything is written back. Blank lines are not
    preserved; one blank line is written after each section.

    Args:
        profile_name: Section name to create or replace.
        credentials: Mapping with ``AccessKeyId``, ``SecretAccessKey`` and
            ``SessionToken`` keys.

    Raises:
        KeyError: If ``credentials`` lacks one of the required keys.
        SystemExit: If the existing file cannot be read (it is left untouched
            rather than overwritten) or the file cannot be written.
    """
    # Honour the same override boto3 uses, so profiles written here are the
    # ones a boto3 session will subsequently read.
    credentials_path = os.path.expanduser(
        os.environ.get('AWS_SHARED_CREDENTIALS_FILE') or "~/.aws/credentials"
    )
    directory = os.path.dirname(credentials_path)
    if directory:
        os.makedirs(directory, exist_ok=True)

    # Lines that appear before the first [profile] header (e.g. comments).
    preamble = []
    existing_credentials = {}
    current_profile = None
    if os.path.exists(credentials_path):
        try:
            with open(credentials_path, 'r') as f:
                for line in f:
                    line = line.strip()
                    if not line:  # Skip empty lines
                        continue
                    if line.startswith('[') and line.endswith(']'):
                        current_profile = line[1:-1]
                        existing_credentials[current_profile] = []
                    elif current_profile is not None:
                        existing_credentials[current_profile].append(line)
                    else:
                        preamble.append(line)
        except (OSError, UnicodeError) as e:
            # Continuing would rewrite the file from an incomplete view and
            # silently drop the profiles that could not be read.
            sys.exit(f"Error reading existing credentials file, refusing to overwrite it: {str(e)}")

    # Update or add the profile.
    existing_credentials[profile_name] = [
        "region = ap-southeast-2",
        f"aws_access_key_id = {credentials['AccessKeyId']}",
        f"aws_secret_access_key = {credentials['SecretAccessKey']}",
        f"aws_session_token = {credentials['SessionToken']}",
        f"aws_security_token = {credentials['SessionToken']}"  # Same as session token
    ]

    # Write back all profiles.
    try:
        # Create the file owner-read/write only; it holds secret keys.
        fd = os.open(credentials_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, 'w') as f:
            # The mode passed to os.open only applies to newly created files,
            # so tighten a pre-existing file before secrets are written to it.
            os.chmod(credentials_path, 0o600)
            if preamble:
                for line in preamble:
                    f.write(f"{line}\n")
                f.write("\n")
            for profile, lines in existing_credentials.items():
                f.write(f"[{profile}]\n")
                for line in lines:
                    f.write(f"{line}\n")
                f.write("\n")  # Add empty line between profiles
    except (OSError, UnicodeError) as e:
        sys.exit(f"Error writing credentials file: {str(e)}")
def switch_role(source_profile: str, target_role_arn: str, session_name: str) -> Dict:
    """Assume ``target_role_arn`` using the credentials of ``source_profile``.

    Args:
        source_profile: Name of the credentials profile used to create the
            boto3 session.
        target_role_arn: ARN of the role to assume.
        session_name: Value passed as ``RoleSessionName``.

    Returns:
        The ``Credentials`` entry of the STS ``assume_role`` response.

    Raises:
        Exception: Any failure, rewrapped with a "Failed to switch role" prefix.
    """
    try:
        # STS client bound to the source profile's credentials.
        sts = boto3.Session(profile_name=source_profile).client('sts')
        assumed = sts.assume_role(
            RoleArn=target_role_arn,
            RoleSessionName=session_name,
        )
        return assumed['Credentials']
    except Exception as err:
        raise Exception(f"Failed to switch role: {str(err)}")
def _extract_saml_roles(saml_response):
    """Return the AWS role entries carried in a base64-encoded SAML response."""
    root = ET.fromstring(base64.b64decode(saml_response))
    roles = []
    for attr in root.iter('{urn:oasis:names:tc:SAML:2.0:assertion}Attribute'):
        if attr.get('Name') != 'https://aws.amazon.com/SAML/Attributes/Role':
            continue
        for value in attr.iter('{urn:oasis:names:tc:SAML:2.0:assertion}AttributeValue'):
            # Skip empty elements and drop surrounding whitespace so the ARNs
            # can be passed to STS as-is.
            text = (value.text or '').strip()
            if text:
                roles.append(text)
    return roles


def _split_role_entry(selected_role):
    """Split a SAML role entry into ``(role_arn, principal_arn)``."""
    first, second = (part.strip() for part in selected_role.split(','))
    # The pair may be listed provider-first; identify the SAML provider ARN by
    # its resource type rather than by position.
    if ':saml-provider/' in first:
        first, second = second, first
    return first, second


def main():
    """Command-line entry point.

    Parses arguments, loads the account CSV for the chosen environment,
    prompts for credentials, authenticates to the services account via SAML,
    writes those credentials to a profile, then assumes the switch role of
    each CSV account and writes a profile for it too. A failure for one CSV
    account is reported and the loop continues with the next.

    Raises:
        SystemExit: On missing CSV accounts, no roles in the SAML assertion,
            or any error during the SAML authentication step.
    """
    parser = argparse.ArgumentParser(description='AWS SAML Authentication and Role Switching Tool')
    parser.add_argument('service_env', choices=['np', 'preprod', 'prod'],
                        help='Service environment to authenticate with SAML')
    parser.add_argument('--csv', required=True,
                        help='Path to CSV file containing account mappings')
    parser.add_argument('--username', '-u', help='Username (optional, defaults to $USER)')
    parser.add_argument('--quiet', '-q', action='store_true', help='Minimize output')
    parser.add_argument('--mfa-token', help='MFA Token (optional)')
    args = parser.parse_args()

    print("CDNXHG HIS AWS CLI Access Tool")
    print("===========================\n")

    # Determine target environment for filtering accounts in CSV
    env_config = ENV_CONFIGS[args.service_env]
    target_environment = env_config['environment']

    # Load and validate CSV content before prompting for any secrets
    print("\nLoading and validating CSV content...")
    account_mappings = load_account_mappings(args.csv, target_environment)
    if not account_mappings:
        sys.exit(f"No accounts found in the CSV file for environment: {target_environment}. Please check the file and try again.")

    # Username comes from the command line, then $USER, then a prompt
    username = args.username or os.environ.get('USER') or input("Enter username: ")
    password = getpass.getpass("Password: ")
    mfa_token = args.mfa_token or input("MFA Token (Optional): [Press enter to skip] ")

    # First authenticate to service account using SAML
    try:
        print(f"\nAuthenticating to {env_config['account_name']}")
        session, _ = get_session(env_config['account_name'],
                                 env_config['account_id'],
                                 args.quiet)
        saml_response = authenticate(session, username, password, mfa_token, args.quiet)

        # Decode SAML response and get roles
        roles = _extract_saml_roles(saml_response)
        if not roles:
            # Without this guard the manual role prompt has no valid answer.
            sys.exit("No AWS roles found in the SAML assertion")

        # Select appropriate role
        selected_role = select_role(roles, env_config['role_name'])
        role_arn, principal_arn = _split_role_entry(selected_role)

        # Assume role with SAML for the primary service account
        sts_client = boto3.client('sts')
        response = sts_client.assume_role_with_saml(
            RoleArn=role_arn,
            PrincipalArn=principal_arn,
            SAMLAssertion=saml_response
        )

        # Write service account credentials
        write_credentials(env_config['account_name'], response['Credentials'])
        print(f"\nCredentials for {env_config['account_name']} updated successfully!")

        # Switch roles for each account in the CSV using the SAML-authenticated profile
        print("\nSwitching roles based on CSV content...")
        for account_name, account_config in account_mappings.items():
            try:
                print(f"\nSwitching to role in account: {account_name} ({account_config['account_name']})")
                credentials = switch_role(
                    source_profile=env_config['account_name'],
                    target_role_arn=account_config['switch_role_arn'],
                    session_name=f"{username}-session"
                )
                write_credentials(account_config['account_name'], credentials)
                print(f"Successfully switched to role in {account_name}")
                # Report the profile name that was actually written above.
                print(f"To use these credentials: export AWS_PROFILE={account_config['account_name']}")
                print(f"Credentials expire: {credentials['Expiration']}")
            except Exception as e:
                print(f"Error switching to account {account_name}: {str(e)}")
                print("Continuing with next account...")
                continue
    except Exception as e:
        sys.exit(f"Error in SAML authentication: {str(e)}")


if __name__ == "__main__":
    main()
# (GitHub page footer, not part of the script: "Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment")