Created
May 20, 2025 03:22
-
-
Save dagrz/8feacc8ad1a1018cd8921da00098c0ad to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
"""
AWS Identity Collector: Extract Trust and Privilege Data Across Accounts

This script collects identity and access metadata from one or more AWS accounts,
including IAM roles, IAM users, SSO (AWS IAM Identity Center) users, and their policies.
It's used as a precursor for analyzing trust relationships and admin-equivalent access
across AWS environments.

It supports both the granular method (individual API calls) and a bulk method
(using `get_account_authorization_details`). All outputs are saved as structured JSON
to local directories for use in graphing or auditing tools.

Features:
- Extract trust policies and attached/inline policies for IAM roles and users
- Identify group memberships and permission set assignments for SSO users
- Download all AWS-managed policies for reference
- Saves data in `roles/`, `users/`, `sso-users/`, and `managed-policies/` directories

Usage:
    ./get-identities.py [--profiles prof1,prof2] [--skip-auth-details]

Options:
    -p, --profiles            Comma-separated list of AWS CLI profiles to use.
                              If not provided, uses the default profile.
    -s, --skip-auth-details   Uses individual IAM API calls instead of
                              `get_account_authorization_details`. Required if
                              your account disables that API.

Output:
- `roles/{account_id}/*.json`: Trust policy and permissions for each IAM role.
- `users/{account_id}/*.json`: Group memberships and permissions for each IAM user.
- `sso-users/{account_id}/*.json`: Detailed SSO user access data per region.
- `managed-policies/*.json`: Cached AWS-managed policy documents.

Recommended Use:
    Run this before using the trust-graph analysis tool (`trust.py`) to provide
    full context for privilege and trust graphing.

Written by: Daniel Grzelak (@dagrz on X, [email protected])
For more cloud security tools and research, visit: https://www.plerion.com
"""
import argparse | |
import boto3 | |
import json | |
import os | |
import re | |
import requests | |
from urllib.parse import urljoin | |
def main(args):
    """Collect identity information for every requested AWS profile.

    AWS-managed policies are downloaded once, using the first profile. Each
    profile is then processed either through the bulk
    ``get_account_authorization_details`` path or, when
    ``args.skip_auth_details`` is set, through individual IAM API calls.

    A profile whose session or caller identity cannot be obtained is reported
    and skipped, so one broken profile does not abort the remaining accounts.

    Args:
        args: Parsed command-line arguments; ``profiles`` and
            ``skip_auth_details`` are read.
    """
    profiles = get_profiles(args)
    ensure_base_directories()

    # Download managed policies from the first profile
    if profiles:
        try:
            session = create_session(profiles[0])
            print("Downloading AWS managed policies...")
            download_managed_policies(session)
        except Exception as e:
            print(f"Error downloading managed policies: {e}")

    for profile in profiles:
        try:
            session = create_session(profile)
            account_id = session.client('sts').get_caller_identity().get('Account')
        except Exception as e:
            # An empty profile name stands for the default credential chain.
            print(f"Error initialising profile {profile or 'default'}: {e}")
            continue

        if not args.skip_auth_details:
            print(f"Processing account {account_id} using authorization details API...")
            process_account_auth_details(session, account_id)
        else:
            print(f"Processing account {account_id} using individual API calls...")
            process_account(profile)
def get_profiles(args):
    """Return the list of AWS profile names to process.

    ``args.profiles`` is split on commas. Surrounding whitespace is stripped
    and empty entries (for example from a trailing comma) are dropped, so
    ``"a, b,"`` yields ``['a', 'b']``.

    Args:
        args: Parsed command-line arguments; only ``profiles`` is read.

    Returns:
        A list of profile names, or ``['']`` when none were supplied. The
        empty string stands for the default profile.
    """
    if args.profiles:
        names = [name.strip() for name in args.profiles.split(',')]
        names = [name for name in names if name]
        if names:
            return names
    return ['']
def ensure_base_directories():
    """Create the top-level output directories in the current working directory.

    Creates ``roles``, ``users``, ``sso-users`` and ``policies``. Directories
    that already exist are left untouched; ``exist_ok=True`` avoids the race
    between checking for a directory and creating it.
    """
    for directory in ('roles', 'users', 'sso-users', 'policies'):
        os.makedirs(directory, exist_ok=True)
def process_account(profile):
    """Export roles, users and SSO users of one account via individual API calls.

    A session is created for ``profile``, the account ID is resolved through
    STS, the per-account output directories are prepared, and the three
    collectors are then run in turn.

    Args:
        profile: AWS profile name; passed unchanged to ``create_session``.
    """
    aws_session = create_session(profile)
    iam = aws_session.client('iam')
    caller = aws_session.client('sts').get_caller_identity()
    output_dirs = ensure_account_directories(caller.get('Account'))

    process_account_roles(iam, output_dirs['roles'])
    process_account_users(iam, output_dirs['users'])
    process_account_sso_users(aws_session, output_dirs['sso-users'])
def create_session(profile):
    """Build a boto3 session for ``profile``.

    Args:
        profile: AWS profile name. A falsy value (empty string or ``None``)
            creates the session without a ``profile_name`` argument.

    Returns:
        The created ``boto3.Session``.
    """
    session_kwargs = {'profile_name': profile} if profile else {}
    return boto3.Session(**session_kwargs)
def ensure_account_directories(account_id):
    """Create the per-account output directories and return their paths.

    Existing directories are reused; ``exist_ok=True`` avoids the race between
    checking for a directory and creating it.

    Args:
        account_id: AWS account ID, used as the sub-directory name.

    Returns:
        A dict with the keys ``'roles'``, ``'users'`` and ``'sso-users'``
        mapping to the relative directory path for that kind of identity.
    """
    directories = {
        'roles': os.path.join('roles', account_id),
        'users': os.path.join('users', account_id),
        'sso-users': os.path.join('sso-users', account_id),
    }
    for directory in directories.values():
        os.makedirs(directory, exist_ok=True)
    return directories
def process_account_roles(iam_client, roles_dir):
    """Export every IAM role in the account.

    Roles are listed through a paginator, so accounts with more roles than a
    single ``list_roles`` response returns are processed completely.

    Args:
        iam_client: boto3 IAM client for the account.
        roles_dir: Directory that receives one JSON file per role.

    Errors while listing are printed and swallowed; the function returns None.
    """
    try:
        paginator = iam_client.get_paginator('list_roles')
        for page in paginator.paginate():
            for role in page['Roles']:
                process_role(iam_client, role['RoleName'], roles_dir)
    except Exception as e:
        print(f"Error processing roles: {e}")
def process_account_users(iam_client, users_dir):
    """Export every IAM user in the account.

    Users are listed through a paginator, so accounts with more users than a
    single ``list_users`` response returns are processed completely.

    Args:
        iam_client: boto3 IAM client for the account.
        users_dir: Directory that receives one JSON file per user.

    Errors while listing are printed and swallowed; the function returns None.
    """
    try:
        paginator = iam_client.get_paginator('list_users')
        for page in paginator.paginate():
            for user in page['Users']:
                process_user(iam_client, user['UserName'], users_dir)
    except Exception as e:
        print(f"Error processing users: {e}")
def process_role(iam_client, role_name, account_dir):
    """Collect and save the trust policy and permissions of one IAM role.

    Writes ``<account_dir>/<sanitised role name>.json`` containing the role
    ARN, its trust policy, every inline policy document and the ARNs of all
    attached managed policies. Both policy listings are paginated so roles
    with many policies are captured completely.

    Args:
        iam_client: boto3 IAM client for the account that owns the role.
        role_name: Name of the role to export.
        account_dir: Directory the JSON file is written to.

    Any error is printed and swallowed so one bad role does not stop the run.
    """
    try:
        print(f"Role: {role_name}")
        role = iam_client.get_role(RoleName=role_name)['Role']
        trust_policy = role['AssumeRolePolicyDocument']
        role_arn = role['Arn']

        inline_policy_docs = {}
        inline_pages = iam_client.get_paginator('list_role_policies').paginate(RoleName=role_name)
        for page in inline_pages:
            for policy_name in page['PolicyNames']:
                inline_policy_docs[policy_name] = iam_client.get_role_policy(
                    RoleName=role_name, PolicyName=policy_name
                )['PolicyDocument']

        attached_pages = iam_client.get_paginator('list_attached_role_policies').paginate(RoleName=role_name)
        attached_policy_arns = [
            policy['PolicyArn']
            for page in attached_pages
            for policy in page['AttachedPolicies']
        ]

        role_data = {
            "roleName": role_name,
            "roleArn": role_arn,
            "trustPolicy": trust_policy,
            "inlinePolicies": inline_policy_docs,
            "attachedPolicies": attached_policy_arns
        }

        # Replace anything outside [A-Za-z0-9_-] so the name is a safe file name.
        safe_role_name = re.sub(r'[^a-zA-Z0-9_\-]', '-', role_name)
        file_path = os.path.join(account_dir, f"{safe_role_name}.json")
        with open(file_path, 'w') as f:
            json.dump(role_data, f, indent=4)
    except Exception as e:
        print(f"Error processing role {role_name}: {e}")
def process_user(iam_client, user_name, account_dir):
    """Collect and save the groups and permissions of one IAM user.

    Writes ``<account_dir>/<sanitised user name>.json`` containing the user
    ARN, group names, every inline policy document and the ARNs of all
    attached managed policies. All three listings are paginated so users with
    many policies or groups are captured completely.

    Args:
        iam_client: boto3 IAM client for the account that owns the user.
        user_name: Name of the user to export.
        account_dir: Directory the JSON file is written to.

    Any error is printed and swallowed so one bad user does not stop the run.
    """
    try:
        print(f"User: {user_name}")
        user_arn = iam_client.get_user(UserName=user_name)['User']['Arn']

        inline_policy_docs = {}
        inline_pages = iam_client.get_paginator('list_user_policies').paginate(UserName=user_name)
        for page in inline_pages:
            for policy_name in page['PolicyNames']:
                inline_policy_docs[policy_name] = iam_client.get_user_policy(
                    UserName=user_name, PolicyName=policy_name
                )['PolicyDocument']

        attached_pages = iam_client.get_paginator('list_attached_user_policies').paginate(UserName=user_name)
        attached_policy_arns = [
            policy['PolicyArn']
            for page in attached_pages
            for policy in page['AttachedPolicies']
        ]

        group_pages = iam_client.get_paginator('list_groups_for_user').paginate(UserName=user_name)
        group_names = [
            group['GroupName']
            for page in group_pages
            for group in page['Groups']
        ]

        user_data = {
            "userName": user_name,
            "userArn": user_arn,
            "groups": group_names,
            "inlinePolicies": inline_policy_docs,
            "attachedPolicies": attached_policy_arns
        }

        # Replace anything outside [A-Za-z0-9_-] so the name is a safe file name.
        safe_user_name = re.sub(r'[^a-zA-Z0-9_\-]', '-', user_name)
        file_path = os.path.join(account_dir, f"{safe_user_name}.json")
        with open(file_path, 'w') as f:
            json.dump(user_data, f, indent=4)
    except Exception as e:
        print(f"Error processing user {user_name}: {e}")
def process_account_sso_users(session, sso_users_dir, regions=None):
    """Export SSO (IAM Identity Center) users from each candidate region.

    For every region an ``sso-admin`` client is created and
    ``list_instances`` is called; regions that report at least one instance
    are handed to ``process_account_region_sso_users``. This function does
    NOT enumerate all AWS regions: only the regions given (or the defaults)
    are checked.

    Args:
        session: boto3 session for the account.
        sso_users_dir: Directory that receives the SSO user JSON files.
        regions: Optional iterable of region names to probe. Defaults to
            ``['ap-southeast-2', 'us-east-1']``.

    Errors for one region are printed and the next region is tried.
    """
    # Use default regions instead of trying to list all regions
    if regions is None:
        regions = ['ap-southeast-2', 'us-east-1']

    for region in regions:
        try:
            sso_client = session.client('sso-admin', region_name=region)
            instances = sso_client.list_instances()
            if instances['Instances']:
                process_account_region_sso_users(session, region, sso_users_dir)
        except Exception as e:
            print(f"Error processing SSO in region {region}: {e}")
def process_account_region_sso_users(session, region, sso_users_dir):
    """Export every Identity Center user found in one region.

    Steps:
    1. Look up the SSO instance in ``region`` (only the first one is used).
    2. Find the IAM SAML provider whose ARN contains ``AWSSSO`` and the
       account ID, if any.
    3. Build a map of permission-set ARN -> permission-set name.
    4. Page through the identity store's users and hand each one to
       ``process_identity_center_user``.

    Failures are printed at the narrowest level possible (single permission
    set, single user, user listing, whole region) and never raised.
    """
    try:
        sso_admin = session.client('sso-admin', region_name=region)
        account_id = session.client('sts').get_caller_identity().get('Account')

        found_instances = sso_admin.list_instances()['Instances']
        if not found_instances:
            print(f"No SSO instances found in {region}")
            return
        first_instance = found_instances[0]
        instance_arn = first_instance['InstanceArn']
        identity_store_id = first_instance['IdentityStoreId']

        # Locate the SAML provider that backs this SSO instance, if present.
        saml_provider_arn = None
        for provider in session.client('iam').list_saml_providers()['SAMLProviderList']:
            if 'AWSSSO' in provider['Arn'] and account_id in provider['Arn']:
                saml_provider_arn = provider['Arn']
                break

        identity_store = session.client('identitystore', region_name=region)

        # Permission sets are resolved up front so each user can be checked
        # against all of them.
        permission_sets = {}
        set_pages = sso_admin.get_paginator('list_permission_sets').paginate(InstanceArn=instance_arn)
        for set_page in set_pages:
            for permission_set_arn in set_page['PermissionSets']:
                try:
                    described = sso_admin.describe_permission_set(
                        InstanceArn=instance_arn,
                        PermissionSetArn=permission_set_arn
                    )
                    permission_sets[permission_set_arn] = described['PermissionSet']['Name']
                except Exception as e:
                    print(f"Error getting permission set details: {e}")

        # Walk all identity-store users; one failing user must not stop the rest.
        try:
            user_pages = identity_store.get_paginator('list_users').paginate(
                IdentityStoreId=identity_store_id
            )
            for user_page in user_pages:
                for user in user_page['Users']:
                    try:
                        process_identity_center_user(
                            identity_store,
                            sso_admin,
                            instance_arn,
                            identity_store_id,
                            user,
                            permission_sets,
                            sso_users_dir,
                            account_id,
                            region,
                            saml_provider_arn
                        )
                    except Exception as e:
                        print(f"Error processing user {user.get('UserName')}: {e}")
        except Exception as e:
            print(f"Error listing users: {e}")
    except Exception as e:
        print(f"Error processing region {region}: {e}")
def process_identity_center_user(identity_store, sso_client, instance_arn, identity_store_id, user, permission_sets, sso_users_dir, account_id, region, saml_provider_arn):
    """Export the effective access of a single Identity Center (SSO) user.

    Steps:
    1. Resolve the user's identity-store group memberships.
    2. For each permission set, check whether the user, or one of their
       groups, is assigned to it in ``account_id``.
    3. Collect the managed policy ARNs and inline policy of every assigned
       permission set.
    4. Write the result to ``<sso_users_dir>/<sanitised user name>-<region>.json``.

    Account assignments and managed-policy listings are read through
    paginators so that results beyond the first page are not missed.

    Args:
        identity_store: boto3 ``identitystore`` client.
        sso_client: boto3 ``sso-admin`` client.
        instance_arn: ARN of the SSO instance.
        identity_store_id: ID of the identity store that holds the user.
        user: User record from ``list_users``; ``UserId``, ``UserName`` and
            optionally ``Emails`` are read.
        permission_sets: Mapping of permission-set ARN to permission-set name.
        sso_users_dir: Output directory.
        account_id: Account whose assignments are inspected.
        region: Region name; recorded in the output and in the file name.
        saml_provider_arn: SAML provider ARN recorded in the output (may be None).

    Errors are printed and swallowed at the narrowest level possible.
    """
    print(f'SSO user: {user.get("UserName")}')
    try:
        user_id = user['UserId']
        groups = []
        group_ids = set()  # set: membership is tested once per assignment

        # Get all groups the user belongs to in SSO
        # SSO groups are separate from IAM groups and are managed through the identity store
        paginator = identity_store.get_paginator('list_group_memberships_for_member')
        for page in paginator.paginate(IdentityStoreId=identity_store_id, MemberId={'UserId': user_id}):
            for membership in page['GroupMemberships']:
                try:
                    group = identity_store.describe_group(
                        IdentityStoreId=identity_store_id,
                        GroupId=membership['GroupId']
                    )
                    groups.append(group['DisplayName'])
                    group_ids.add(membership['GroupId'])
                except Exception as e:
                    print(f"Error getting group details: {e}")

        def is_assigned(assignment):
            # Access is granted either directly to the user or via a group.
            principal_type = assignment.get('PrincipalType')
            principal_id = assignment.get('PrincipalId')
            if principal_type == 'USER':
                return principal_id == user_id
            return principal_type == 'GROUP' and principal_id in group_ids

        inline_policies = {}
        attached_policies = []
        assignment_paginator = sso_client.get_paginator('list_account_assignments')
        managed_paginator = sso_client.get_paginator('list_managed_policies_in_permission_set')

        # Check each permission set to see if the user or their groups have access
        for permission_set_arn in permission_sets:
            try:
                assignment_pages = assignment_paginator.paginate(
                    InstanceArn=instance_arn,
                    AccountId=account_id,
                    PermissionSetArn=permission_set_arn
                )
                has_permission_set = any(
                    is_assigned(assignment)
                    for page in assignment_pages
                    for assignment in page.get('AccountAssignments', [])
                )
                if not has_permission_set:
                    continue

                # Get managed policies attached to the permission set
                managed_pages = managed_paginator.paginate(
                    InstanceArn=instance_arn,
                    PermissionSetArn=permission_set_arn
                )
                for page in managed_pages:
                    attached_policies.extend(
                        p['Arn'] for p in page.get('AttachedManagedPolicies', [])
                    )

                # Get inline policy for the permission set
                inline_policy = sso_client.get_inline_policy_for_permission_set(
                    InstanceArn=instance_arn,
                    PermissionSetArn=permission_set_arn
                ).get('InlinePolicy')
                if inline_policy:
                    inline_policies[permission_sets[permission_set_arn]] = json.loads(inline_policy)
            except Exception as e:
                print(f"Error processing permission set: {e}")

        # Compile all the user's access information
        user_data = {
            "userName": user['UserName'],
            "userId": user_id,
            "email": next((attr['Value'] for attr in user.get('Emails', []) if attr.get('Primary')), None),
            "groups": groups,
            "inlinePolicies": inline_policies,
            "attachedPolicies": attached_policies,
            "region": region,
            "samlProviderArn": saml_provider_arn
        }

        # Replace anything outside [A-Za-z0-9_-] so the name is a safe file name.
        safe_user_name = re.sub(r'[^a-zA-Z0-9_\-]', '-', user['UserName'])
        file_path = os.path.join(sso_users_dir, f"{safe_user_name}-{region}.json")
        with open(file_path, 'w') as f:
            json.dump(user_data, f, indent=4)
    except Exception as e:
        print(f"Error processing user {user.get('UserName')}: {e}")
def get_custom_policy(iam_client, policy_arn):
    """Return the name and default-version document of a customer-managed policy.

    AWS-managed policies are skipped. They are recognised by the ARN's
    account field being ``aws`` (``arn:<partition>:iam::aws:policy/...``), so
    the check also holds in partitions other than ``aws``, such as ``aws-cn``
    and ``aws-us-gov``.

    Args:
        iam_client: boto3 IAM client used to fetch the policy.
        policy_arn: ARN of the managed policy.

    Returns:
        ``{"policyName": ..., "policyDocument": ...}`` for a customer-managed
        policy, or ``None`` for an AWS-managed policy or when any error
        occurs (the error is printed).
    """
    try:
        # Skip AWS managed policies
        # ARN layout: arn:partition:service:region:account-id:resource
        arn_parts = policy_arn.split(':', 5)
        if len(arn_parts) == 6 and arn_parts[2] == 'iam' and arn_parts[4] == 'aws':
            return None

        # Get policy details
        policy = iam_client.get_policy(PolicyArn=policy_arn)
        policy_version = iam_client.get_policy_version(
            PolicyArn=policy_arn,
            VersionId=policy['Policy']['DefaultVersionId']
        )
        return {
            "policyName": policy['Policy']['PolicyName'],
            "policyDocument": policy_version['PolicyVersion']['Document']
        }
    except Exception as e:
        print(f"Error getting policy {policy_arn}: {e}")
        return None
def process_role_from_auth_details(role, roles_dir, iam_client):
    """Write one role taken from the authorization-details response to disk.

    The output file ``<roles_dir>/<sanitised role name>.json`` holds the role
    name and ARN, its trust policy, inline policy documents, attached policy
    ARNs and, for attached policies that ``get_custom_policy`` resolves, the
    policy documents keyed by policy name.

    Args:
        role: One entry of ``RoleDetailList``.
        roles_dir: Output directory for this account's roles.
        iam_client: boto3 IAM client, used to fetch customer-managed policies.

    Any error is printed and swallowed.
    """
    try:
        role_name = role['RoleName']
        print(f"Role: {role_name}")

        # Resolve documents for attached policies that are not AWS managed.
        customer_policies = {}
        for attachment in role.get('AttachedManagedPolicies', []):
            resolved = get_custom_policy(iam_client, attachment['PolicyArn'])
            if not resolved:
                continue
            customer_policies[resolved['policyName']] = resolved['policyDocument']

        role_arn = role['Arn']
        trust_policy = role.get('AssumeRolePolicyDocument', {})
        inline_policies = {}
        for inline in role.get('RolePolicyList', []):
            inline_policies[inline['PolicyName']] = inline['PolicyDocument']
        attached_arns = []
        for attachment in role.get('AttachedManagedPolicies', []):
            attached_arns.append(attachment['PolicyArn'])

        role_data = {
            "roleName": role_name,
            "roleArn": role_arn,
            "trustPolicy": trust_policy,
            "inlinePolicies": inline_policies,
            "attachedPolicies": attached_arns,
            "customerPolicies": customer_policies
        }

        file_name = re.sub(r'[^a-zA-Z0-9_\-]', '-', role_name) + ".json"
        with open(os.path.join(roles_dir, file_name), 'w') as f:
            json.dump(role_data, f, indent=4)
    except Exception as e:
        print(f"Error processing role {role.get('RoleName', 'unknown')}: {e}")
def process_user_from_auth_details(user, users_dir, iam_client):
    """Write one user taken from the authorization-details response to disk.

    The output file ``<users_dir>/<sanitised user name>.json`` holds the user
    name and ARN, group names, inline policy documents, attached policy ARNs
    and, for attached policies that ``get_custom_policy`` resolves, the
    policy documents keyed by policy name.

    Args:
        user: One entry of ``UserDetailList``.
        users_dir: Output directory for this account's users.
        iam_client: boto3 IAM client, used to fetch customer-managed policies.

    Any error is printed and swallowed.
    """
    try:
        user_name = user['UserName']
        print(f"User: {user_name}")

        # Resolve documents for attached policies that are not AWS managed.
        customer_policies = {}
        for attachment in user.get('AttachedManagedPolicies', []):
            resolved = get_custom_policy(iam_client, attachment['PolicyArn'])
            if not resolved:
                continue
            customer_policies[resolved['policyName']] = resolved['policyDocument']

        user_arn = user['Arn']
        group_names = []
        for group in user.get('GroupList', []):
            group_names.append(group['GroupName'])
        inline_policies = {}
        for inline in user.get('UserPolicyList', []):
            inline_policies[inline['PolicyName']] = inline['PolicyDocument']
        attached_arns = []
        for attachment in user.get('AttachedManagedPolicies', []):
            attached_arns.append(attachment['PolicyArn'])

        user_data = {
            "userName": user_name,
            "userArn": user_arn,
            "groups": group_names,
            "inlinePolicies": inline_policies,
            "attachedPolicies": attached_arns,
            "customerPolicies": customer_policies
        }

        file_name = re.sub(r'[^a-zA-Z0-9_\-]', '-', user_name) + ".json"
        with open(os.path.join(users_dir, file_name), 'w') as f:
            json.dump(user_data, f, indent=4)
    except Exception as e:
        print(f"Error processing user {user.get('UserName', 'unknown')}: {e}")
def process_account_auth_details(session, account_id):
    """Export an account's roles and users via ``get_account_authorization_details``.

    Only the ``Role`` and ``User`` entity types are requested, because those
    are the only lists this function reads; this avoids downloading group and
    managed-policy details that would be discarded. SSO users are not part of
    the authorization details and are collected afterwards.

    Args:
        session: boto3 session for the account.
        account_id: Account ID, used to build the output directories.

    Any error is printed and swallowed.
    """
    try:
        iam_client = session.client('iam')
        paginator = iam_client.get_paginator('get_account_authorization_details')

        # Create directories for the account
        account_directories = ensure_account_directories(account_id)

        # Process all entities from the API
        for page in paginator.paginate(Filter=['Role', 'User']):
            # Process roles
            for role in page.get('RoleDetailList', []):
                process_role_from_auth_details(role, account_directories['roles'], iam_client)
            # Process users
            for user in page.get('UserDetailList', []):
                process_user_from_auth_details(user, account_directories['users'], iam_client)

        # Still need to process SSO users separately as they're not included in auth details
        process_account_sso_users(session, account_directories['sso-users'])
    except Exception as e:
        print(f"Error processing authorization details: {e}")
def download_managed_policies(session):
    """Cache the default version of every AWS-managed policy as JSON.

    Policies are listed with ``Scope='AWS'`` and written to
    ``managed-policies/<PolicyName>.json`` in the current working directory.
    A policy whose file already exists is skipped, so repeated runs only
    fetch what is missing.

    Args:
        session: boto3 session used to create the IAM client.

    Per-policy failures are printed and the loop continues; any other
    failure is printed and ends the download.
    """
    try:
        target_dir = 'managed-policies'
        if not os.path.exists(target_dir):
            os.makedirs(target_dir)

        iam = session.client('iam')
        policy_pages = iam.get_paginator('list_policies').paginate(Scope='AWS')

        for policy_page in policy_pages:
            for policy in policy_page['Policies']:
                try:
                    arn = policy['Arn']
                    name = policy['PolicyName']
                    destination = os.path.join(target_dir, f"{name}.json")

                    if not os.path.exists(destination):
                        # Only the default version's document is stored.
                        version = iam.get_policy_version(
                            PolicyArn=arn,
                            VersionId=policy['DefaultVersionId']
                        )
                        with open(destination, 'w') as f:
                            json.dump(version['PolicyVersion']['Document'], f, indent=4)
                        print(f"Downloaded policy: {name}")
                    else:
                        print(f"Skipping existing policy: {name}")
                except Exception as e:
                    print(f"Error processing policy {policy.get('PolicyName', 'unknown')}: {e}")
    except Exception as e:
        print(f"Error downloading managed policies: {e}")
# Script entry point: parse the command line and run the collector.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Download role trust policies for multiple AWS accounts.")
    parser.add_argument('-p', '--profiles', type=str, help='Comma-separated list of AWS profile names')
    # The help text names the API actually called by the bulk path
    # (get_account_authorization_details).
    parser.add_argument('-s', '--skip-auth-details', action='store_true', help='Skip using get_account_authorization_details API and use individual API calls instead')
    args = parser.parse_args()
    main(args)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment