Skip to content

Instantly share code, notes, and snippets.

@dagrz
Created May 20, 2025 03:22
Show Gist options
  • Save dagrz/8feacc8ad1a1018cd8921da00098c0ad to your computer and use it in GitHub Desktop.
Save dagrz/8feacc8ad1a1018cd8921da00098c0ad to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
AWS Identity Collector: Extract Trust and Privilege Data Across Accounts
This script collects identity and access metadata from one or more AWS accounts,
including IAM roles, IAM users, SSO (AWS IAM Identity Center) users, and their policies.
It’s used as a precursor for analyzing trust relationships and admin-equivalent access
across AWS environments.
It supports both the granular method (individual API calls) and a bulk method
(using `get_account_authorization_details`). All outputs are saved as structured JSON
to local directories for use in graphing or auditing tools.
Features:
- Extract trust policies and attached/inline policies for IAM roles and users
- Identify group memberships and permission set assignments for SSO users
- Download all AWS-managed policies for reference
- Saves data in `roles/`, `users/`, `sso-users/`, and `managed-policies/` directories
Usage:
./get-identities.py [--profiles prof1,prof2] [--skip-auth-details]
Options:
-p, --profiles Comma-separated list of AWS CLI profiles to use.
If not provided, uses the default profile.
-s, --skip-auth-details Uses individual IAM API calls instead of
`get_account_authorization_details`. Required if
your account disables that API.
Output:
- `roles/{account_id}/*.json`: Trust policy and permissions for each IAM role.
- `users/{account_id}/*.json`: Group memberships and permissions for each IAM user.
- `sso-users/{account_id}/*.json`: Detailed SSO user access data per region.
- `managed-policies/*.json`: Cached AWS-managed policy documents.
Recommended Use:
Run this before using the trust-graph analysis tool (`trust.py`) to provide
full context for privilege and trust graphing.
Written by: Daniel Grzelak (@dagrz on X, [email protected])
For more cloud security tools and research, visit: https://www.plerion.com
"""
import argparse
import boto3
import json
import os
import re
import requests
from urllib.parse import urljoin
def main(args):
    """Collect identity data for every requested AWS profile.

    AWS-managed policies are downloaded once, using the first profile. Each
    profile's account is then processed through the authorization-details API,
    or through individual IAM calls when ``args.skip_auth_details`` is set.
    """
    profiles = get_profiles(args)
    ensure_base_directories()

    # The managed-policy download only needs one set of credentials.
    if profiles:
        first_session = create_session(profiles[0])
        print("Downloading AWS managed policies...")
        download_managed_policies(first_session)

    for profile in profiles:
        session = create_session(profile)
        caller = session.client('sts').get_caller_identity()
        account_id = caller.get('Account')
        if args.skip_auth_details:
            print(f"Processing account {account_id} using individual API calls...")
            process_account(profile)
            continue
        print(f"Processing account {account_id} using authorization details API...")
        process_account_auth_details(session, account_id)
def get_profiles(args):
    """Return the AWS CLI profile names to process.

    ``args.profiles`` is a comma-separated string. Whitespace around each name
    is stripped and empty entries (for example from a trailing comma) are
    dropped, so ``"dev, prod,"`` yields ``['dev', 'prod']``.

    Returns:
        A list of profile names, or ``['']`` when no usable name was supplied.
        The empty string stands for the default profile.
    """
    raw = args.profiles or ''
    names = [name.strip() for name in raw.split(',')]
    # An empty entry would otherwise be processed as the default profile.
    names = [name for name in names if name]
    return names or ['']
def ensure_base_directories():
    """Create the top-level output directories under the current directory.

    Creates ``roles``, ``users``, ``sso-users`` and ``policies``. Directories
    that already exist are left untouched.
    """
    for directory in ('roles', 'users', 'sso-users', 'policies'):
        # exist_ok avoids the race between a separate exists() check and the
        # creation call.
        os.makedirs(directory, exist_ok=True)
def process_account(profile):
    """Collect roles, users and SSO users for one profile using individual API calls."""
    session = create_session(profile)
    iam_client = session.client('iam')
    caller = session.client('sts').get_caller_identity()
    output_dirs = ensure_account_directories(caller.get('Account'))

    roles_dir = output_dirs['roles']
    users_dir = output_dirs['users']
    sso_dir = output_dirs['sso-users']

    process_account_roles(iam_client, roles_dir)
    process_account_users(iam_client, users_dir)
    process_account_sso_users(session, sso_dir)
def create_session(profile):
    """Build a boto3 session, bound to ``profile`` when a non-empty name is given."""
    if not profile:
        # A falsy profile means "use the default credential chain".
        return boto3.Session()
    return boto3.Session(profile_name=profile)
def ensure_account_directories(account_id):
    """Create the per-account output directories and return their paths.

    Args:
        account_id: Account identifier used as the sub-directory name.

    Returns:
        A dict mapping ``'roles'``, ``'users'`` and ``'sso-users'`` to the
        relative path ``<kind>/<account_id>``.
    """
    directories = {
        kind: os.path.join(kind, account_id)
        for kind in ('roles', 'users', 'sso-users')
    }
    for directory in directories.values():
        # exist_ok makes repeated runs and concurrent creation harmless.
        os.makedirs(directory, exist_ok=True)
    return directories
def process_account_roles(iam_client, roles_dir):
    """Process every IAM role in the account and save each one under ``roles_dir``.

    The role listing is paginated so that roles beyond the first response page
    are not skipped. Any error while listing is printed and ends the
    processing of roles for this account.
    """
    try:
        paginator = iam_client.get_paginator('list_roles')
        for page in paginator.paginate():
            for role in page['Roles']:
                process_role(iam_client, role['RoleName'], roles_dir)
    except Exception as e:
        print(f"Error processing roles: {e}")
        return
def process_account_users(iam_client, users_dir):
    """Process every IAM user in the account and save each one under ``users_dir``.

    The user listing is paginated so that users beyond the first response page
    are not skipped. Any error while listing is printed and ends the
    processing of users for this account.
    """
    try:
        paginator = iam_client.get_paginator('list_users')
        for page in paginator.paginate():
            for user in page['Users']:
                process_user(iam_client, user['UserName'], users_dir)
    except Exception as e:
        print(f"Error processing users: {e}")
        return
def process_role(iam_client, role_name, account_dir):
    """Collect one IAM role's trust policy and permissions and save them as JSON.

    The saved document contains the role name and ARN, the trust policy, the
    inline policy documents, the ARNs of attached managed policies and, under
    ``customerPolicies``, the documents of attached customer-managed policies,
    which is the same shape ``process_role_from_auth_details`` writes.

    Args:
        iam_client: IAM client used for all lookups.
        role_name: Name of the role to collect.
        account_dir: Directory that receives ``<sanitised role name>.json``.

    Errors are printed and swallowed so that one bad role does not stop the run.
    """
    try:
        print(f"Role: {role_name}")
        role = iam_client.get_role(RoleName=role_name)['Role']
        trust_policy = role['AssumeRolePolicyDocument']
        role_arn = role['Arn']

        inline_policy_docs = {}
        for policy_name in iam_client.list_role_policies(RoleName=role_name)['PolicyNames']:
            response = iam_client.get_role_policy(RoleName=role_name, PolicyName=policy_name)
            inline_policy_docs[policy_name] = response['PolicyDocument']

        attached_policies = iam_client.list_attached_role_policies(RoleName=role_name)['AttachedPolicies']
        attached_policy_arns = [policy['PolicyArn'] for policy in attached_policies]

        # get_custom_policy returns None for AWS-managed policies, so only
        # customer-managed documents end up here.
        customer_policies = {}
        for policy_arn in attached_policy_arns:
            policy_details = get_custom_policy(iam_client, policy_arn)
            if policy_details:
                customer_policies[policy_details['policyName']] = policy_details['policyDocument']

        role_data = {
            "roleName": role_name,
            "roleArn": role_arn,
            "trustPolicy": trust_policy,
            "inlinePolicies": inline_policy_docs,
            "attachedPolicies": attached_policy_arns,
            "customerPolicies": customer_policies
        }
        # Replace anything outside [a-zA-Z0-9_-] so the name is a safe file name.
        safe_role_name = re.sub(r'[^a-zA-Z0-9_\-]', '-', role_name)
        file_path = os.path.join(account_dir, f"{safe_role_name}.json")
        with open(file_path, 'w') as f:
            json.dump(role_data, f, indent=4)
    except Exception as e:
        print(f"Error processing role {role_name}: {e}")
def process_user(iam_client, user_name, account_dir):
    """Collect one IAM user's groups and permissions and save them as JSON.

    The saved document contains the user name and ARN, group names, inline
    policy documents, the ARNs of attached managed policies and, under
    ``customerPolicies``, the documents of attached customer-managed policies,
    which is the same shape ``process_user_from_auth_details`` writes.

    Args:
        iam_client: IAM client used for all lookups.
        user_name: Name of the user to collect.
        account_dir: Directory that receives ``<sanitised user name>.json``.

    Errors are printed and swallowed so that one bad user does not stop the run.
    """
    try:
        print(f"User: {user_name}")
        user_arn = iam_client.get_user(UserName=user_name)['User']['Arn']

        inline_policy_docs = {}
        for policy_name in iam_client.list_user_policies(UserName=user_name)['PolicyNames']:
            response = iam_client.get_user_policy(UserName=user_name, PolicyName=policy_name)
            inline_policy_docs[policy_name] = response['PolicyDocument']

        attached_policies = iam_client.list_attached_user_policies(UserName=user_name)['AttachedPolicies']
        attached_policy_arns = [policy['PolicyArn'] for policy in attached_policies]

        groups = iam_client.list_groups_for_user(UserName=user_name)['Groups']
        group_names = [group['GroupName'] for group in groups]

        # get_custom_policy returns None for AWS-managed policies, so only
        # customer-managed documents end up here.
        customer_policies = {}
        for policy_arn in attached_policy_arns:
            policy_details = get_custom_policy(iam_client, policy_arn)
            if policy_details:
                customer_policies[policy_details['policyName']] = policy_details['policyDocument']

        user_data = {
            "userName": user_name,
            "userArn": user_arn,
            "groups": group_names,
            "inlinePolicies": inline_policy_docs,
            "attachedPolicies": attached_policy_arns,
            "customerPolicies": customer_policies
        }
        # Replace anything outside [a-zA-Z0-9_-] so the name is a safe file name.
        safe_user_name = re.sub(r'[^a-zA-Z0-9_\-]', '-', user_name)
        file_path = os.path.join(account_dir, f"{safe_user_name}.json")
        with open(file_path, 'w') as f:
            json.dump(user_data, f, indent=4)
    except Exception as e:
        print(f"Error processing user {user_name}: {e}")
def process_account_sso_users(session, sso_users_dir, regions=None):
    """
    Process SSO users in each candidate region that reports an SSO instance.

    Only the regions listed in ``regions`` are probed; this function does not
    enumerate all AWS regions. When ``regions`` is None the probe list is
    'ap-southeast-2' followed by 'us-east-1'. Pass an explicit list if the
    SSO instance lives elsewhere.

    Args:
        session: Session used to create the per-region 'sso-admin' client.
        sso_users_dir: Directory that receives the SSO user files.
        regions: Optional iterable of region names to probe.

    A failure in one region is printed and the remaining regions are still tried.
    """
    if regions is None:
        regions = ['ap-southeast-2', 'us-east-1']
    for region in regions:
        try:
            sso_client = session.client('sso-admin', region_name=region)
            instances = sso_client.list_instances()
            if instances['Instances']:
                process_account_region_sso_users(session, region, sso_users_dir)
        except Exception as e:
            print(f"Error processing SSO in region {region}: {e}")
            continue
def process_account_region_sso_users(session, region, sso_users_dir):
    """
    Collect every SSO user found in one region.

    Steps:
    1. Take the first SSO instance reported for the region
    2. Look for a SAML provider whose ARN contains 'AWSSSO' and the account id
    3. Map each permission set ARN to its name
    4. Hand every identity-store user to process_identity_center_user

    Errors are printed at the level they occur; nothing is raised to the caller.
    """
    try:
        sso_client = session.client('sso-admin', region_name=region)
        account_id = session.client('sts').get_caller_identity().get('Account')

        found = sso_client.list_instances()['Instances']
        if not found:
            print(f"No SSO instances found in {region}")
            return
        instance_arn = found[0]['InstanceArn']
        identity_store_id = found[0]['IdentityStoreId']

        # First matching provider wins; None when there is no match.
        saml_provider_arn = None
        providers = session.client('iam').list_saml_providers()['SAMLProviderList']
        for provider in providers:
            if 'AWSSSO' in provider['Arn'] and account_id in provider['Arn']:
                saml_provider_arn = provider['Arn']
                break

        identity_store = session.client('identitystore', region_name=region)

        # ARN -> name for every permission set; used later to label inline policies.
        permission_sets = {}
        set_pages = sso_client.get_paginator('list_permission_sets').paginate(InstanceArn=instance_arn)
        for set_page in set_pages:
            for permission_set_arn in set_page['PermissionSets']:
                try:
                    described = sso_client.describe_permission_set(
                        InstanceArn=instance_arn,
                        PermissionSetArn=permission_set_arn
                    )
                    permission_sets[permission_set_arn] = described['PermissionSet']['Name']
                except Exception as e:
                    print(f"Error getting permission set details: {e}")

        try:
            user_pages = identity_store.get_paginator('list_users').paginate(IdentityStoreId=identity_store_id)
            for user_page in user_pages:
                for user in user_page['Users']:
                    try:
                        process_identity_center_user(
                            identity_store,
                            sso_client,
                            instance_arn,
                            identity_store_id,
                            user,
                            permission_sets,
                            sso_users_dir,
                            account_id,
                            region,
                            saml_provider_arn
                        )
                    except Exception as e:
                        print(f"Error processing user {user.get('UserName')}: {e}")
        except Exception as e:
            print(f"Error listing users: {e}")
    except Exception as e:
        print(f"Error processing region {region}: {e}")
def process_identity_center_user(identity_store, sso_client, instance_arn, identity_store_id, user, permission_sets, sso_users_dir, account_id, region, saml_provider_arn):
    """
    Process a single SSO user and save their access information as JSON.

    Steps:
    1. Collect the user's group memberships from the identity store
    2. For each permission set, check whether the user or one of their groups
       is assigned to it for ``account_id``
    3. Collect the managed policy ARNs and inline policy of each assigned set
    4. Write ``<sanitised user name>-<region>.json`` into ``sso_users_dir``

    Args:
        identity_store: Identity store client.
        sso_client: SSO admin client.
        instance_arn: ARN of the SSO instance.
        identity_store_id: Id of the identity store holding the user.
        user: User record; must provide 'UserId' and 'UserName'.
        permission_sets: Mapping of permission set ARN to permission set name.
        sso_users_dir: Output directory.
        account_id: Account whose assignments are inspected.
        region: Region name, recorded in the output and the file name.
        saml_provider_arn: SAML provider ARN recorded in the output (may be None).

    Errors are printed and swallowed so that one bad user does not stop the run.
    """
    print(f'SSO user: {user.get("UserName")}')
    try:
        user_id = user['UserId']
        groups = []
        group_ids = set()
        paginator = identity_store.get_paginator('list_group_memberships_for_member')
        for page in paginator.paginate(IdentityStoreId=identity_store_id, MemberId={'UserId': user_id}):
            for membership in page['GroupMemberships']:
                try:
                    # Record the id before describing the group: a failed
                    # describe_group must not hide group-derived access.
                    group_id = membership['GroupId']
                    group_ids.add(group_id)
                    group = identity_store.describe_group(
                        IdentityStoreId=identity_store_id,
                        GroupId=group_id
                    )
                    groups.append(group['DisplayName'])
                except Exception as e:
                    print(f"Error getting group details: {e}")

        inline_policies = {}
        attached_policies = []
        # Both listings are paginated; reading only the first page would miss
        # assignments and policies on later pages.
        assignment_paginator = sso_client.get_paginator('list_account_assignments')
        policy_paginator = sso_client.get_paginator('list_managed_policies_in_permission_set')
        for permission_set_arn, permission_set_name in permission_sets.items():
            try:
                assignment_pages = assignment_paginator.paginate(
                    InstanceArn=instance_arn,
                    AccountId=account_id,
                    PermissionSetArn=permission_set_arn
                )
                # Access is granted either directly or through a group.
                has_permission_set = any(
                    (a.get('PrincipalType') == 'USER' and a.get('PrincipalId') == user_id) or
                    (a.get('PrincipalType') == 'GROUP' and a.get('PrincipalId') in group_ids)
                    for assignment_page in assignment_pages
                    for a in assignment_page.get('AccountAssignments', [])
                )
                if not has_permission_set:
                    continue

                policy_pages = policy_paginator.paginate(
                    InstanceArn=instance_arn,
                    PermissionSetArn=permission_set_arn
                )
                for policy_page in policy_pages:
                    attached_policies.extend(
                        p['Arn'] for p in policy_page.get('AttachedManagedPolicies', [])
                    )

                inline_policy = sso_client.get_inline_policy_for_permission_set(
                    InstanceArn=instance_arn,
                    PermissionSetArn=permission_set_arn
                ).get('InlinePolicy')
                if inline_policy:
                    # The inline policy arrives as a JSON string.
                    inline_policies[permission_set_name] = json.loads(inline_policy)
            except Exception as e:
                print(f"Error processing permission set: {e}")

        user_data = {
            "userName": user['UserName'],
            "userId": user_id,
            "email": next((attr['Value'] for attr in user.get('Emails', []) if attr.get('Primary')), None),
            "groups": groups,
            "inlinePolicies": inline_policies,
            "attachedPolicies": attached_policies,
            "region": region,
            "samlProviderArn": saml_provider_arn
        }
        safe_user_name = re.sub(r'[^a-zA-Z0-9_\-]', '-', user['UserName'])
        file_path = os.path.join(sso_users_dir, f"{safe_user_name}-{region}.json")
        with open(file_path, 'w') as f:
            json.dump(user_data, f, indent=4)
    except Exception as e:
        print(f"Error processing user {user.get('UserName')}: {e}")
def get_custom_policy(iam_client, policy_arn):
    """Return the name and default-version document of a customer-managed policy.

    AWS-managed policies are skipped. They are recognised by the ARN's account
    field being ``aws``, whatever the partition, so ``arn:aws-cn:...`` and
    ``arn:aws-us-gov:...`` ARNs are handled as well as ``arn:aws:...``.

    Args:
        iam_client: IAM client used to fetch the policy and its default version.
        policy_arn: ARN of the managed policy.

    Returns:
        ``{"policyName": ..., "policyDocument": ...}`` for a customer-managed
        policy, or None for an AWS-managed policy or when the lookup fails
        (the error is printed).
    """
    try:
        # ARN layout: arn:<partition>:iam::<account>:<resource>
        arn_fields = policy_arn.split(':', 5)
        if len(arn_fields) > 4 and arn_fields[2] == 'iam' and arn_fields[4] == 'aws':
            return None
        policy = iam_client.get_policy(PolicyArn=policy_arn)
        policy_version = iam_client.get_policy_version(
            PolicyArn=policy_arn,
            VersionId=policy['Policy']['DefaultVersionId']
        )
        return {
            "policyName": policy['Policy']['PolicyName'],
            "policyDocument": policy_version['PolicyVersion']['Document']
        }
    except Exception as e:
        print(f"Error getting policy {policy_arn}: {e}")
        return None
def process_role_from_auth_details(role, roles_dir, iam_client):
    """Save one role taken from an authorization-details response as JSON.

    Customer-managed policy documents are fetched through get_custom_policy;
    everything else is read from the ``role`` record itself. Errors are
    printed and swallowed.
    """
    try:
        role_name = role['RoleName']
        print(f"Role: {role_name}")

        # Resolve the documents of attached customer-managed policies.
        customer_policies = {}
        for attachment in role.get('AttachedManagedPolicies', []):
            details = get_custom_policy(iam_client, attachment['PolicyArn'])
            if not details:
                continue
            customer_policies[details['policyName']] = details['policyDocument']

        role_arn = role['Arn']
        trust_policy = role.get('AssumeRolePolicyDocument', {})

        inline_policies = {}
        for entry in role.get('RolePolicyList', []):
            inline_policies[entry['PolicyName']] = entry['PolicyDocument']

        attached_arns = []
        for attachment in role.get('AttachedManagedPolicies', []):
            attached_arns.append(attachment['PolicyArn'])

        role_data = {
            "roleName": role_name,
            "roleArn": role_arn,
            "trustPolicy": trust_policy,
            "inlinePolicies": inline_policies,
            "attachedPolicies": attached_arns,
            "customerPolicies": customer_policies
        }

        file_name = re.sub(r'[^a-zA-Z0-9_\-]', '-', role_name) + ".json"
        with open(os.path.join(roles_dir, file_name), 'w') as f:
            json.dump(role_data, f, indent=4)
    except Exception as e:
        print(f"Error processing role {role.get('RoleName', 'unknown')}: {e}")
def process_user_from_auth_details(user, users_dir, iam_client):
    """Save one user taken from an authorization-details response as JSON.

    Customer-managed policy documents are fetched through get_custom_policy;
    everything else is read from the ``user`` record itself. Errors are
    printed and swallowed.
    """
    try:
        user_name = user['UserName']
        print(f"User: {user_name}")

        # Resolve the documents of attached customer-managed policies.
        customer_policies = {}
        for attachment in user.get('AttachedManagedPolicies', []):
            details = get_custom_policy(iam_client, attachment['PolicyArn'])
            if not details:
                continue
            customer_policies[details['policyName']] = details['policyDocument']

        user_arn = user['Arn']

        group_names = []
        for membership in user.get('GroupList', []):
            group_names.append(membership['GroupName'])

        inline_policies = {}
        for entry in user.get('UserPolicyList', []):
            inline_policies[entry['PolicyName']] = entry['PolicyDocument']

        attached_arns = []
        for attachment in user.get('AttachedManagedPolicies', []):
            attached_arns.append(attachment['PolicyArn'])

        user_data = {
            "userName": user_name,
            "userArn": user_arn,
            "groups": group_names,
            "inlinePolicies": inline_policies,
            "attachedPolicies": attached_arns,
            "customerPolicies": customer_policies
        }

        file_name = re.sub(r'[^a-zA-Z0-9_\-]', '-', user_name) + ".json"
        with open(os.path.join(users_dir, file_name), 'w') as f:
            json.dump(user_data, f, indent=4)
    except Exception as e:
        print(f"Error processing user {user.get('UserName', 'unknown')}: {e}")
def process_account_auth_details(session, account_id):
    """Process an account through the get_account_authorization_details API.

    Only role and user details are requested, because those are the only parts
    of the response this function reads. SSO users are not part of that
    response and are collected separately afterwards.

    Args:
        session: Session used to create the IAM client and passed on for SSO.
        account_id: Account id used to name the output directories.

    Any error is printed and ends processing for this account.
    """
    try:
        iam_client = session.client('iam')
        paginator = iam_client.get_paginator('get_account_authorization_details')
        account_directories = ensure_account_directories(account_id)
        # Filtering avoids transferring group and managed-policy details
        # (including every AWS-managed policy) that are never used here.
        for page in paginator.paginate(Filter=['Role', 'User']):
            for role in page.get('RoleDetailList', []):
                process_role_from_auth_details(role, account_directories['roles'], iam_client)
            for user in page.get('UserDetailList', []):
                process_user_from_auth_details(user, account_directories['users'], iam_client)
        # Still need to process SSO users separately as they're not included in auth details
        process_account_sso_users(session, account_directories['sso-users'])
    except Exception as e:
        print(f"Error processing authorization details: {e}")
def download_managed_policies(session):
    """Save the default-version document of each AWS-managed policy.

    Documents are written to ``managed-policies/<PolicyName>.json``. A policy
    whose file already exists is skipped, so earlier downloads act as a cache.
    Per-policy errors are printed and the loop continues.
    """
    try:
        target_dir = 'managed-policies'
        if not os.path.exists(target_dir):
            os.makedirs(target_dir)

        iam_client = session.client('iam')
        pages = iam_client.get_paginator('list_policies').paginate(Scope='AWS')
        for page in pages:
            for policy in page['Policies']:
                try:
                    policy_arn = policy['Arn']
                    policy_name = policy['PolicyName']
                    file_path = os.path.join(target_dir, f"{policy_name}.json")

                    # Already cached by a previous run.
                    if os.path.exists(file_path):
                        print(f"Skipping existing policy: {policy_name}")
                        continue

                    version = iam_client.get_policy_version(
                        PolicyArn=policy_arn,
                        VersionId=policy['DefaultVersionId']
                    )
                    document = version['PolicyVersion']['Document']
                    with open(file_path, 'w') as f:
                        json.dump(document, f, indent=4)
                    print(f"Downloaded policy: {policy_name}")
                except Exception as e:
                    print(f"Error processing policy {policy.get('PolicyName', 'unknown')}: {e}")
    except Exception as e:
        print(f"Error downloading managed policies: {e}")
if __name__ == "__main__":
    # Command-line entry point: parse the options and hand them to main().
    parser = argparse.ArgumentParser(description="Download role trust policies for multiple AWS accounts.")
    parser.add_argument('-p', '--profiles', type=str, help='Comma-separated list of AWS profile names')
    # The help text names the API as the script calls it:
    # get_account_authorization_details.
    parser.add_argument('-s', '--skip-auth-details', action='store_true', help='Skip using get_account_authorization_details API and use individual API calls instead')
    args = parser.parse_args()
    main(args)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment