Created
May 20, 2025 03:18
-
-
Save dagrz/0efd3e94ac74aa3f3cd78e213abbed08 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
"""
AWS Trust Graph Analyzer

This script analyzes AWS IAM trust relationships across accounts, roles, users, and SSO principals.
It builds a graph of `sts:AssumeRole` relationships, identifies roles with admin-level privileges,
and traces trust chains to help you understand who really has root-equivalent access in your AWS environment.

The tool supports multiple output formats, including:
- A tab-indented trust tree (default: trust-tree.txt)
- A Mermaid diagram for Markdown-based visualization (default: mermaid.md)
- An optional interactive graph using matplotlib (--visualize)

Usage:
    ./trust.py [--visualize] [--require-assume-role]
               [--tree-file trust-tree.txt]
               [--mermaid-file mermaid.md]

Options:
    --visualize             Launches an interactive matplotlib visualization of the trust graph.
    --require-assume-role   Filters out principals that don’t have AssumeRole permissions,
                            even if they’re listed in trust policies.
    --tree-file FILE        Path to output trust tree text file. Default: trust-tree.txt
    --mermaid-file FILE     Path to output Mermaid Markdown diagram. Default: mermaid.md

Outputs:
- Prints all admin-like roles and roles with world-level trust.
- Prompts you to select one and then outputs a narrowed trust chain to the selected role.
- Outputs results in plaintext, Mermaid, and optionally interactive form.

Written by: Daniel Grzelak (@dagrz on X, [email protected])
For more cloud security tools and research, visit: https://www.plerion.com
"""
import argparse | |
import os | |
import json | |
import re | |
import random | |
import networkx as nx | |
import matplotlib.pyplot as plt | |
# AWS-managed policies that grant full admin or IAM control privileges.
# A principal with either of these attached is reported as admin by
# is_admin_principal().
ADMIN_POLICIES = {'arn:aws:iam::aws:policy/AdministratorAccess', 'arn:aws:iam::aws:policy/IAMFullAccess'}

# Policies that include sts:AssumeRole permissions, often used in automation or service roles.
# has_assume_role_permissions() treats a principal with any of these attached as
# able to assume roles; membership is tested by exact ARN string match.
# NOTE(review): several entries look like service-role policies, whose real ARNs
# normally carry a `service-role/` or `aws-service-role/` path segment - confirm
# these strings match what appears in the collected `attachedPolicies` data.
ASSUME_ROLE_POLICIES = [
    'arn:aws:iam::aws:policy/AdministratorAccess-Amplify',
    'arn:aws:iam::aws:policy/AWSControlTowerServiceRolePolicy',
    'arn:aws:iam::aws:policy/AmplifyBackendDeployFullAccess',
    'arn:aws:iam::aws:policy/AdministratorAccess',
    'arn:aws:iam::aws:policy/AWSElasticDisasterRecoveryEc2InstancePolicy',
    'arn:aws:iam::aws:policy/SageMakerStudioProjectUserRolePermissionsBoundary',
    'arn:aws:iam::aws:policy/AWSServiceRoleForImageBuilder',
    'arn:aws:iam::aws:policy/AWS-SSM-RemediationAutomation-OperationalAccountAdministrationRolePolicy',
    'arn:aws:iam::aws:policy/AWS-SSM-RemediationAutomation-AdministrationRolePolicy',
    'arn:aws:iam::aws:policy/SageMakerStudioEMRInstanceRolePolicy',
    'arn:aws:iam::aws:policy/CloudFormationStackSetsOrgAdminServiceRolePolicy',
    'arn:aws:iam::aws:policy/CloudWatch-CrossAccountAccess',
    'arn:aws:iam::aws:policy/AWS-SSM-DiagnosisAutomation-AdministrationRolePolicy',
    'arn:aws:iam::aws:policy/AWSElasticDisasterRecoveryRecoveryInstancePolicy',
    'arn:aws:iam::aws:policy/AWS-SSM-DiagnosisAutomation-OperationalAccountAdministrationRolePolicy',
    'arn:aws:iam::aws:policy/SageMakerStudioProjectUserRolePolicy',
]
def parse_trust_policy(trust_policy):
    """Parse a trust policy and return a list of trusted principals.

    Only ``Allow`` statements whose ``Action`` includes ``sts:AssumeRole``,
    ``sts:AssumeRoleWithSAML`` or ``sts:AssumeRoleWithWebIdentity`` are
    considered.

    Args:
        trust_policy: The decoded trust policy document (a dict). Anything
            else, including ``None``, yields an empty result.

    Returns:
        A list of ``(kind, value)`` tuples in statement order, where ``kind``
        is one of:

        - ``'world'``     -- the wildcard principal; ``value`` is ``'*'``
        - ``'aws'``       -- an IAM ARN (bare 12-digit account IDs are
          normalized to ``arn:aws:iam::<id>:root``)
        - ``'canonical'`` -- a unique principal ID / canonical user ID
        - ``'service'``   -- a service principal name
        - ``'saml'``      -- a SAML provider ARN
        - ``'federated'`` -- any other federated principal
    """
    assume_actions = ('sts:AssumeRole', 'sts:AssumeRoleWithSAML', 'sts:AssumeRoleWithWebIdentity')

    def listify(value):
        # Policy elements may be a single string or a list of strings.
        if isinstance(value, str):
            return [value]
        return value if isinstance(value, list) else []

    trusted = []
    if not isinstance(trust_policy, dict):
        return trusted

    # "Statement" may legally be a single object instead of an array.
    statements = trust_policy.get('Statement') or []
    if isinstance(statements, dict):
        statements = [statements]
    elif not isinstance(statements, list):
        return trusted

    for statement in statements:
        if not isinstance(statement, dict) or statement.get('Effect') != 'Allow':
            continue

        actions = listify(statement.get('Action'))
        if not any(action in assume_actions for action in actions):
            continue

        principal = statement.get('Principal', {})

        # "Principal": "*" (string form) trusts everyone, just like {"AWS": "*"}.
        if principal == '*':
            trusted.append(('world', '*'))
            continue
        if not isinstance(principal, dict):
            continue

        # Handle AWS principals
        for p in listify(principal.get('AWS')):
            if not isinstance(p, str):
                continue
            if p == '*':
                trusted.append(('world', '*'))
            elif re.match(r'^A[0-9A-Z]{20}$', p) or re.match(r'^[0-9a-f]{64}$', p):
                trusted.append(('canonical', p))
            elif re.match(r'^[0-9]{12}$', p):
                # A bare account ID is shorthand for that account's root ARN
                # (the standard "aws" partition is assumed here).
                trusted.append(('aws', f'arn:aws:iam::{p}:root'))
            else:
                trusted.append(('aws', p))

        # Handle Service principals
        trusted.extend(('service', p) for p in listify(principal.get('Service')))

        # Handle Federated principals (both SAML and OIDC)
        for p in listify(principal.get('Federated')):
            if isinstance(p, str) and ':saml-provider/' in p:
                trusted.append(('saml', p))
            else:
                trusted.append(('federated', p))

        # Handle Canonical IDs
        trusted.extend(('canonical', c) for c in listify(principal.get('CanonicalUser')))

    return trusted
def is_admin_principal(principal_data):
    """Check if a principal (role or user) has admin privileges.

    A principal counts as admin when any of the following holds:

    - one of its ``attachedPolicies`` is listed in ``ADMIN_POLICIES``;
    - an inline or customer policy has an ``Allow`` statement granting
      action ``*`` on resource ``*``;
    - an inline or customer policy has an ``Allow`` statement granting
      ``iam:*`` (on any resource).

    ``Action`` and ``Resource`` are accepted both as a single string and as
    a list of strings.

    Args:
        principal_data: Dict describing the principal; the keys read are
            ``attachedPolicies``, ``inlinePolicies`` and ``customerPolicies``.

    Returns:
        True if the principal looks admin-equivalent, otherwise False.
    """
    def listify(value):
        # Policy elements may be a single string or a list of strings.
        if isinstance(value, str):
            return [value]
        return value if isinstance(value, list) else []

    # Check attached policies
    for policy in principal_data.get('attachedPolicies') or []:
        if policy in ADMIN_POLICIES:
            return True

    # Check both inline and customer policies
    for policy_type in ('inlinePolicies', 'customerPolicies'):
        policies = principal_data.get(policy_type) or {}
        if not isinstance(policies, dict):
            continue
        for policy in policies.values():
            if not isinstance(policy, dict):
                continue

            # Handle both array and single object statements
            statements = policy.get('Statement', [])
            if isinstance(statements, dict):
                statements = [statements]
            elif not isinstance(statements, list):
                continue

            for statement in statements:
                if not isinstance(statement, dict) or statement.get('Effect') != 'Allow':
                    continue
                actions = listify(statement.get('Action'))
                resources = listify(statement.get('Resource'))

                # Full wildcard: every action on every resource.
                if '*' in actions and '*' in resources:
                    return True
                # Full IAM control is treated as admin regardless of resource.
                if 'iam:*' in actions:
                    return True

    return False
def has_assume_role_permissions(principal_data):
    """Check if a principal (role or user) has permissions to assume roles.

    Returns True when one of the principal's ``attachedPolicies`` is listed
    in ``ASSUME_ROLE_POLICIES``, or when any ``Allow`` statement of an inline
    or customer policy has an action pattern that covers ``sts:AssumeRole``.
    Action patterns are matched case-insensitively and may contain IAM
    wildcards, so ``*``, ``sts:*``, ``sts:Assume*`` and ``STS:assumerole``
    all count. ``Resource`` and ``Condition`` are not evaluated.

    Args:
        principal_data: Dict describing the principal; the keys read are
            ``attachedPolicies``, ``inlinePolicies`` and ``customerPolicies``.

    Returns:
        True if the principal appears able to call ``sts:AssumeRole``.
    """
    # Local import keeps this block self-contained; fnmatchcase gives us
    # `*` / `?` wildcard matching without OS-dependent case folding.
    from fnmatch import fnmatchcase

    # Check attached policies against ASSUME_ROLE_POLICIES
    for policy in principal_data.get('attachedPolicies') or []:
        if policy in ASSUME_ROLE_POLICIES:
            return True

    # Check both inline and customer policies
    for policy_type in ('inlinePolicies', 'customerPolicies'):
        policies = principal_data.get(policy_type) or {}
        if not isinstance(policies, dict):
            continue
        for policy in policies.values():
            if not isinstance(policy, dict):
                continue

            # Handle both array and single object statements
            statements = policy.get('Statement', [])
            if isinstance(statements, dict):
                statements = [statements]
            elif not isinstance(statements, list):
                continue

            for statement in statements:
                if not isinstance(statement, dict) or statement.get('Effect') != 'Allow':
                    continue

                actions = statement.get('Action') or []
                if isinstance(actions, str):
                    actions = [actions]
                elif not isinstance(actions, list):
                    continue

                # IAM action names are case-insensitive, so compare lowercased.
                for action in actions:
                    if isinstance(action, str) and fnmatchcase('sts:assumerole', action.lower()):
                        return True

    return False
def has_world_trust(role_data):
    """Check if a role trusts the world (can be assumed by anyone).

    A role has world trust when its ``trustPolicy`` contains an ``Allow``
    statement for ``sts:AssumeRole``, ``sts:AssumeRoleWithSAML`` or
    ``sts:AssumeRoleWithWebIdentity`` whose principal is the wildcard --
    written either as ``"Principal": "*"`` or as ``"Principal": {"AWS": "*"}``
    (string or list form). ``Condition`` elements are not evaluated.

    Args:
        role_data: Dict describing the role; only ``trustPolicy`` is read.

    Returns:
        True if a wildcard principal is allowed to assume the role.
    """
    # Same action set that parse_trust_policy() treats as assume-role actions,
    # so the world-trust report agrees with the world edges in the graph.
    assume_actions = ('sts:AssumeRole', 'sts:AssumeRoleWithSAML', 'sts:AssumeRoleWithWebIdentity')

    trust_policy = role_data.get('trustPolicy', {})
    if not isinstance(trust_policy, dict):
        return False

    # "Statement" may legally be a single object instead of an array.
    statements = trust_policy.get('Statement') or []
    if isinstance(statements, dict):
        statements = [statements]
    elif not isinstance(statements, list):
        return False

    for statement in statements:
        if not isinstance(statement, dict) or statement.get('Effect') != 'Allow':
            continue

        actions = statement.get('Action') or []
        if isinstance(actions, str):
            actions = [actions]
        elif not isinstance(actions, list):
            continue
        if not any(action in assume_actions for action in actions):
            continue

        principal = statement.get('Principal', {})
        if principal == '*':
            return True
        if not isinstance(principal, dict):
            continue

        aws_principals = principal.get('AWS') or []
        if isinstance(aws_principals, str):
            aws_principals = [aws_principals]
        if '*' in aws_principals:
            return True

    return False
def find_admin_roles(principal_data_cache):
    """Return the ARNs of all cached principals that have admin privileges.

    Args:
        principal_data_cache: Mapping of principal ARN -> principal data dict.

    Returns:
        A list of ARNs, in cache iteration order, for which
        ``is_admin_principal`` is true.
    """
    return [
        arn
        for arn, data in principal_data_cache.items()
        if is_admin_principal(data)
    ]
def find_world_trust_roles(principal_data_cache):
    """Return the ARNs of all cached principals that can be assumed by anyone.

    Args:
        principal_data_cache: Mapping of principal ARN -> principal data dict.

    Returns:
        A list of ARNs, in cache iteration order, for which
        ``has_world_trust`` is true.
    """
    return [
        arn
        for arn, data in principal_data_cache.items()
        if has_world_trust(data)
    ]
def _arn_account_id(arn, default='000000000000'):
    """Return the account-ID field (5th ':'-separated part) of *arn*, or *default* if absent."""
    parts = arn.split(':')
    return parts[4] if len(parts) > 4 else default


def _add_direct_trust(G, trusted_arn, principal_arn, principal_data_cache, require_assume_role):
    """Add a direct_trust edge trusted_arn -> principal_arn if the trusted principal's file exists."""
    try:
        parts = trusted_arn.split(':')
        if len(parts) < 6:
            print(f"Warning: Invalid ARN format for principal {trusted_arn}")
            return

        trusted_account = parts[4]
        kind_dir = 'users' if ':user/' in trusted_arn else 'roles'
        trusted_name = trusted_arn.split('/')[-1]

        # Both roles and users are stored as <kind>/<account>/<name>.json,
        # matching the directory layout walked by build_trust_graph().
        policy_path = os.path.join(kind_dir, trusted_account, f"{trusted_name}.json")
        if not os.path.exists(policy_path):
            return

        with open(policy_path, 'r') as f:
            trusted_data = json.load(f)

        # Cache the principal data
        principal_data_cache[trusted_arn] = trusted_data

        # With require_assume_role, keep the edge only if the trusted
        # principal can actually call sts:AssumeRole.
        if not require_assume_role or has_assume_role_permissions(trusted_data):
            G.add_edge(trusted_arn, principal_arn, type='direct_trust')
    except Exception as e:
        # Best effort: one unreadable principal must not abort the whole scan.
        print(f"Error checking principal {trusted_arn}: {e}")


def _add_trust_edges(G, principal_arn, account_dir, trusted_principals,
                     principal_data_cache, require_assume_role):
    """Add one incoming edge to principal_arn for each (kind, value) trusted principal."""
    for trust_kind, trusted in trusted_principals:
        if trust_kind == 'world':
            world_node = '*'
            if world_node not in G:
                G.add_node(world_node, type='world', account='000000000000')
            G.add_edge(world_node, principal_arn, type='world_trust')

        elif trust_kind == 'aws':
            if trusted.endswith(':root'):
                # Account-wide trust
                account_id = trusted.split(':')[4]
                account_node = f"account:{account_id}"
                if account_node not in G:
                    G.add_node(account_node, type='account', account=account_id)
                G.add_edge(account_node, principal_arn, type='account_trust')
            else:
                _add_direct_trust(G, trusted, principal_arn,
                                  principal_data_cache, require_assume_role)

        elif trust_kind == 'service':
            if trusted not in G:
                G.add_node(trusted, type='service', account='000000000000')
            G.add_edge(trusted, principal_arn, type='service_trust')

        elif trust_kind == 'saml':
            # Format: arn:aws:iam::<account_id>:saml-provider/<provider_name>
            if trusted not in G:
                G.add_node(trusted, type='saml', account=_arn_account_id(trusted))
            G.add_edge(trusted, principal_arn, type='saml_trust')

        elif trust_kind == 'federated':
            # Non-ARN providers (e.g. a bare hostname) get the default account.
            if trusted not in G:
                G.add_node(trusted, type='federated', account=_arn_account_id(trusted))
            G.add_edge(trusted, principal_arn, type='federated_trust')

        elif trust_kind == 'canonical':
            if trusted not in G:
                G.add_node(trusted, type='canonical', account=account_dir)
            G.add_edge(trusted, principal_arn, type='canonical_trust')


def build_trust_graph(require_assume_role=False):
    """Build a NetworkX graph representing trust relationships.

    Reads ``roles/<account>/*.json`` and ``users/<account>/*.json`` relative
    to the current working directory. Each file must decode to a dict with a
    ``roleArn`` (roles) or ``userArn`` (users) key; roles may also carry a
    ``trustPolicy``. Every principal becomes a node, and every trusted
    principal found by ``parse_trust_policy`` becomes an edge pointing from
    the trusted party to the role that trusts it.

    A second pass adds ``implied_trust`` edges from every known principal of
    an account to that account's node, whenever some role trusts the account
    as a whole.

    Args:
        require_assume_role: When True, direct and implied trust edges are
            only added for principals for which
            ``has_assume_role_permissions`` is true.

    Returns:
        A tuple ``(G, principal_data_cache)`` where ``G`` is the
        ``networkx.DiGraph`` and ``principal_data_cache`` maps principal ARN
        to the decoded JSON data of that principal.
    """
    G = nx.DiGraph()
    principal_data_cache = {}
    account_principals = {}  # account_id -> set of principal ARNs

    # First pass: collect all principals in each account and cache their data
    for principal_type_dir in ['roles', 'users']:
        if not os.path.exists(principal_type_dir):
            continue
        is_role_dir = principal_type_dir == 'roles'

        for account_dir in os.listdir(principal_type_dir):
            account_path = os.path.join(principal_type_dir, account_dir)
            if not os.path.isdir(account_path):
                continue

            account_principals.setdefault(account_dir, set())
            G.add_node(f"account:{account_dir}", type='account', account=account_dir)

            for principal_file in os.listdir(account_path):
                if not principal_file.endswith('.json'):
                    continue
                principal_path = os.path.join(account_path, principal_file)

                try:
                    with open(principal_path, 'r') as f:
                        principal_data = json.load(f)

                    if not isinstance(principal_data, dict):
                        print(f"Skipping {principal_path}: Invalid JSON format - not a dictionary")
                        continue

                    principal_arn = principal_data.get('roleArn' if is_role_dir else 'userArn')
                    if not principal_arn:
                        print(f"Skipping {principal_path}: No {principal_type_dir[:-1]}Arn found")
                        continue

                    # Add principal to account's principals and cache its data
                    account_principals[account_dir].add(principal_arn)
                    principal_data_cache[principal_arn] = principal_data

                    G.add_node(principal_arn,
                               type='role' if is_role_dir else 'user',
                               account=account_dir,
                               admin=is_admin_principal(principal_data),
                               original_data=principal_data)

                    # Only roles have a trust policy; users never get incoming trust edges.
                    trust_policy = principal_data.get('trustPolicy', {}) if is_role_dir else {}
                    if not isinstance(trust_policy, dict):
                        print(f"Warning: trustPolicy in {principal_path} is not a dictionary: {type(trust_policy)}")
                        trust_policy = {}

                    _add_trust_edges(G, principal_arn, account_dir,
                                     parse_trust_policy(trust_policy),
                                     principal_data_cache, require_assume_role)

                except json.JSONDecodeError as e:
                    print(f"Error parsing JSON in {principal_path}: {e}")
                except Exception as e:
                    # Best effort: a bad file is reported and skipped.
                    print(f"Error processing {principal_path}: {e}")

    # Second pass: add implied trust relationships. Snapshot the edges first
    # because the loop adds new edges to G.
    for u, v, data in list(G.edges(data=True)):
        if data.get('type') != 'account_trust':
            continue

        trusted_account = u.split(':')[1]  # account node IDs look like "account:<id>"
        for principal in account_principals.get(trusted_account, ()):
            if principal == v:
                # Don't link the trusting role back to its own account trust.
                continue
            if require_assume_role:
                cached = principal_data_cache.get(principal)
                if cached is None or not has_assume_role_permissions(cached):
                    continue
            G.add_edge(principal, u, type='implied_trust')

    return G, principal_data_cache
def build_trust_chain(G, target_role):
    """Build a subgraph containing only the trust chain leading to the target role.

    Walks the predecessors of ``target_role`` depth-first. Each node is added
    the first time it is reached, together with the single edge through which
    it was discovered, so the result is a tree rooted at ``target_role``.

    Args:
        G: The full trust graph.
        target_role: Node in ``G`` to trace trust back from.

    Returns:
        A new ``networkx.DiGraph`` holding the discovered nodes and edges,
        with their attributes copied from ``G``.
    """
    chain = nx.DiGraph()
    chain.add_node(target_role, **G.nodes[target_role])

    # Explicit stack of (node, iterator over its predecessors); mirrors the
    # depth-first order of a recursive walk.
    pending = [(target_role, iter(G.predecessors(target_role)))]
    while pending:
        current, remaining_preds = pending[-1]
        for pred in remaining_preds:
            if pred in chain:
                continue
            chain.add_node(pred, **G.nodes[pred])
            chain.add_edge(pred, current, **G.edges[pred, current])
            # Descend into the newly found node before finishing `current`.
            pending.append((pred, iter(G.predecessors(pred))))
            break
        else:
            # All predecessors of `current` have been handled.
            pending.pop()

    return chain
def visualize_graph(G, target_role):
    """Visualize the trust graph with interactive labels.

    Opens two matplotlib figures: figure 1 with the graph and figure 2 with
    the legends. Hovering a node shows its label and its connected edges;
    clicking a node prints its data to the console and toggles its
    "selected" state. Graphs with 20 or fewer nodes start with all edges
    drawn. The call blocks until the figures are closed.

    Args:
        G: The trust graph to draw (node attributes ``type``, ``account``,
            ``admin`` and ``original_data`` are used when present).
        target_role: Node to highlight; its last ``/``-separated segment is
            used in the figure title.
    """
    # Create figure for the graph with minimal margins
    plt.figure(1, figsize=(10, 8))
    plt.subplots_adjust(left=0.01, right=0.99, top=0.95, bottom=0.05)
    graph_ax = plt.gca()

    # Use spring layout with fixed center and scale
    pos = nx.spring_layout(G, k=1, iterations=50, center=(0.5, 0.5), scale=0.4)

    # One color per account; sorted so the assignment is stable between runs.
    accounts = sorted({data['account'] for _, data in G.nodes(data=True) if 'account' in data})
    account_colors = {
        account: plt.cm.hsv(i / len(accounts))
        for i, account in enumerate(accounts)
    }

    # Artists are remembered so the callbacks can restyle or remove them.
    node_artists = {}
    edge_artists = {}

    def draw_nodes(nodes, **style):
        """Draw *nodes* as one collection and remember it for each node."""
        if not nodes:
            return
        nx.draw_networkx_nodes(G, pos, nodelist=nodes, alpha=0.7, ax=graph_ax, **style)
        collection = graph_ax.collections[-1]
        for n in nodes:
            node_artists[n] = collection

    def set_node_alpha(node, alpha):
        """Set the alpha of the collection holding *node*, if it was drawn."""
        artist = node_artists.get(node)
        if artist is not None:
            artist.set_alpha(alpha)

    def draw_edges(edges):
        """Draw (u, v, data) edges: implied trust dotted, everything else solid."""
        for dotted in (False, True):
            subset = [(u, v) for u, v, d in edges
                      if (d.get('type') == 'implied_trust') == dotted]
            if not subset:
                continue
            extra = {'style': 'dotted'} if dotted else {}
            artists = nx.draw_networkx_edges(G, pos, edgelist=subset,
                                             edge_color='black', arrows=True,
                                             alpha=0.7, arrowsize=20, width=2,
                                             ax=graph_ax, **extra)
            for edge, artist in zip(subset, artists):
                edge_artists[edge] = artist

    def describe(node):
        """Build the multi-line hover label for *node*."""
        node_data = G.nodes[node]
        account = node_data.get('account', 'unknown')
        node_type = node_data.get('type', 'role')
        if node_type == 'account':
            return f"Account\n{account}"
        if node_type == 'service':
            return f"Service Principal\n{node}"
        if node_type == 'saml':
            return f"SAML Provider\nAccount: {account}\n{node}"
        if node_type == 'canonical':
            return f"Canonical User\nAccount: {account}\n{node}"
        if node_type == 'world':
            return "The World\n(Anyone can assume this role)"
        if node_type == 'user':
            return f"User\nAccount: {account}\n{node}"
        if node_type == 'federated':
            return f"Federated Principal\n{node}"
        if node_data.get('admin', False):
            return f"Admin Role\nAccount: {account}\n{node}"
        return f"Role\nAccount: {account}\n{node}"

    # Draw nodes account by account, with shape/color chosen by node type
    for account in accounts:
        color = [account_colors[account]]

        groups = {}
        for node, data in G.nodes(data=True):
            if data.get('account') != account:
                continue
            kind = data.get('type')
            if kind == 'role':
                kind = 'admin' if data.get('admin') else 'regular'
            groups.setdefault(kind, []).append(node)

        # World nodes (large octagons)
        draw_nodes(groups.get('world'), node_color='red', node_size=2000, node_shape='8')
        # Account nodes (squares)
        draw_nodes(groups.get('account'), node_color=color, node_size=500, node_shape='s')
        # Admin roles (filled circles); the first one per account is drawn larger
        admin_nodes = groups.get('admin', [])
        draw_nodes(admin_nodes[:1], node_color=color, node_size=1000)
        draw_nodes(admin_nodes[1:], node_color=color, node_size=500)
        # Regular roles (unfilled circles)
        draw_nodes(groups.get('regular'), node_color='none', node_size=500,
                   edgecolors=account_colors[account], linewidths=2)
        # Service principals (triangles)
        draw_nodes(groups.get('service'), node_color='gray', node_size=500, node_shape='^')
        # SAML providers (stars)
        draw_nodes(groups.get('saml'), node_color=color, node_size=500, node_shape='*')
        # Canonical users (diamonds)
        draw_nodes(groups.get('canonical'), node_color=color, node_size=500, node_shape='d')
        # Users (small filled circles)
        draw_nodes(groups.get('user'), node_color=color, node_size=300, node_shape='o')
        # Federated principals (hexagons)
        draw_nodes(groups.get('federated'), node_color='purple', node_size=500, node_shape='h')

    # Highlight the target role
    if target_role in G.nodes():
        # Red ring around the target
        nx.draw_networkx_nodes(G, pos, nodelist=[target_role],
                               node_color='none',
                               node_size=1500, alpha=1.0,
                               edgecolors='red',
                               linewidths=3, ax=graph_ax)
        # Larger, opaque copy of the target node itself. Fall back to gray
        # when the target carries no known account.
        target_account = G.nodes[target_role].get('account', 'unknown')
        nx.draw_networkx_nodes(G, pos, nodelist=[target_role],
                               node_color=[account_colors.get(target_account, 'gray')],
                               node_size=1000, alpha=1.0, ax=graph_ax)
        node_artists[target_role] = graph_ax.collections[-1]

    # Create separate figure for legends
    plt.figure(2, figsize=(2.5, 8))
    legend_ax = plt.gca()
    legend_ax.axis('off')

    # (marker, line color, label, face color, marker size)
    legend_specs = [
        ('s', 'w', 'Account', 'gray', 10),
        ('o', 'w', 'Admin Role', 'gray', 10),
        ('o', 'gray', 'Regular Role', 'none', 10),
        ('o', 'w', 'User', 'gray', 8),
        ('^', 'w', 'Service Principal', 'gray', 10),
        ('*', 'w', 'SAML Provider', 'gray', 10),
        ('d', 'w', 'Canonical User', 'gray', 10),
        ('8', 'w', 'The World (*)', 'red', 15),
        ('h', 'w', 'Federated Principal', 'purple', 10),
    ]
    node_type_legend_elements = [
        plt.Line2D([0], [0], marker=marker, color=line_color, label=text,
                   markerfacecolor=face, markersize=size)
        for marker, line_color, text, face, size in legend_specs
    ]
    node_type_legend_elements.append(
        plt.Line2D([0], [0], marker='o', color='red',
                   label='Target Role',
                   markerfacecolor='none', markersize=10,
                   markeredgewidth=2)
    )

    # The placeholder account used for world/service nodes is not listed.
    account_legend_elements = [
        plt.Line2D([0], [0], marker='o', color='w',
                   label=account,
                   markerfacecolor=color, markersize=10)
        for account, color in account_colors.items()
        if account != '000000000000'
    ]

    # Add legends to the separate figure - node types at top, account colors below
    node_type_legend = legend_ax.legend(handles=node_type_legend_elements, loc='upper left',
                                        title='Node Types')
    node_type_legend.get_title().set_fontweight('bold')
    legend_ax.add_artist(node_type_legend)
    account_legend = legend_ax.legend(handles=account_legend_elements, loc='lower left',
                                      title='Account Colors')
    account_legend.get_title().set_fontweight('bold')
    legend_ax.add_artist(account_legend)

    # Store selected nodes and current hover node
    selected_nodes = set()
    current_hover_node = None

    # If there are 20 or fewer nodes, draw all edges by default
    if len(G.nodes()) <= 20:
        draw_edges(list(G.edges(data=True)))

    def on_motion(event):
        nonlocal current_hover_node
        # Only respond to the graph window
        if event.inaxes is None or event.inaxes.figure.number != 1:
            return

        # Clear previous annotations. Iterate over a copy: removing an
        # artist mutates graph_ax.texts.
        for annotation in list(graph_ax.texts):
            annotation.remove()

        # Only clear edge highlights if we're not hovering over a selected node
        if current_hover_node not in selected_nodes:
            for artist in edge_artists.values():
                artist.remove()
            edge_artists.clear()

        # Check for node hover
        for node, (x, y) in pos.items():
            hit = abs(event.xdata - x) < 0.05 and abs(event.ydata - y) < 0.05
            if not hit:
                # Reset node alpha if not selected
                if node not in selected_nodes:
                    set_node_alpha(node, 0.7)
                continue

            current_hover_node = node
            set_node_alpha(node, 1.0)

            # Show connected edges if node is not selected
            if node not in selected_nodes:
                draw_edges([(u, v, d) for u, v, d in G.edges(data=True)
                            if u == node or v == node])

            graph_ax.text(0.02, 0.02, describe(node), transform=graph_ax.transAxes,
                          fontsize=14, verticalalignment='bottom',
                          bbox=dict(boxstyle='round', facecolor='white', alpha=0.8))
            plt.draw()
            return

        current_hover_node = None

    def on_click(event):
        # Only respond to the graph window
        if event.inaxes is None or event.inaxes.figure.number != 1:
            return

        for node, (x, y) in pos.items():
            if not (abs(event.xdata - x) < 0.05 and abs(event.ydata - y) < 0.05):
                continue

            # Print original JSON data to console
            node_data = G.nodes[node]
            print(f"\n{node}")
            if 'original_data' in node_data:
                print(json.dumps(node_data['original_data'], indent=2))
            else:
                print(json.dumps(node_data, indent=2))

            # Toggle selection
            if node in selected_nodes:
                selected_nodes.remove(node)
                set_node_alpha(node, 0.7)
            else:
                selected_nodes.add(node)
                set_node_alpha(node, 1.0)
            plt.draw()
            return

    plt.figure(1)  # Switch back to graph figure
    plt.connect('motion_notify_event', on_motion)
    plt.connect('button_press_event', on_click)

    # Extract role name from ARN
    role_name = target_role.split('/')[-1]
    plt.title(f"AWS Trust Chain to {role_name}")
    graph_ax.axis('off')

    # Show both figures
    plt.figure(1)
    plt.show(block=False)
    plt.figure(2)
    plt.show()
def print_trust_tree_to_file(G, target_role, output_file="trust-tree.txt", depth=0):
    """Print the trust tree to a file with tab-based indentation.

    Writes ``target_role`` at ``depth`` tabs, then every predecessor
    (recursively) one level deeper. Lines are appended to ``output_file``;
    the file is not truncated. If a predecessor is already on the path being
    printed, it is written once more with a ``[cycle]`` suffix and not
    expanded again, so cyclic graphs terminate.

    Args:
        G: The trust graph
        target_role: The target role to start from
        output_file: The file to write to (default: trust-tree.txt)
        depth: Depth of the starting node in the tree (for indentation)
    """
    def describe(node):
        """Format the one-line label for *node* based on its type."""
        node_data = G.nodes[node]
        node_type = node_data.get('type', 'role')
        account = node_data.get('account', 'unknown')
        is_admin = node_data.get('admin', False)

        if node_type == 'account':
            return f"Account: {account}"
        if node_type == 'service':
            return f"Service Principal: {node}"
        if node_type == 'saml':
            return f"SAML Provider: {node} (Account: {account})"
        if node_type == 'canonical':
            return f"Canonical User: {node} (Account: {account})"
        if node_type == 'world':
            return "The World (Anyone can assume this role)"
        if node_type == 'federated':
            return f"Federated Principal: {node}"
        if node_type == 'user':
            prefix = "Admin User" if is_admin else "User"
            return f"{prefix}: {node} (Account: {account})"
        if is_admin:
            return f"Admin Role: {node} (Account: {account})"
        return f"Role: {node} (Account: {account})"

    lines = []
    on_path = set()  # nodes between the root and the node being expanded

    def walk(node, level):
        indent = '\t' * level
        lines.append(f"{indent}{describe(node)}\n")
        on_path.add(node)
        for pred in G.predecessors(node):
            if pred in on_path:
                # Already an ancestor of this line: mark it, don't recurse.
                lines.append(f"{indent}\t{describe(pred)} [cycle]\n")
            else:
                walk(pred, level + 1)
        on_path.discard(node)

    walk(target_role, depth)

    # Single append instead of reopening the file once per node.
    with open(output_file, 'a') as f:
        f.writelines(lines)
def _mermaid_label_and_class(node, node_data):
    """Return the (label, classDef name) pair used to render *node*."""
    node_type = node_data.get('type', 'role')
    if node_type == 'account':
        return f"Account {node_data.get('account', 'unknown')}", "Account"
    if node_type == 'world':
        return "The World", "World"
    typed_classes = {'service': "Service", 'saml': "SAML", 'canonical': "Canonical"}
    if node_type in typed_classes:
        class_name = typed_classes[node_type]
    elif node_data.get('admin', False):
        class_name = "Admin"
    else:
        class_name = "Role"
    # Only the part after the last '/' is shown, to keep labels short.
    return f"{class_name} {node.split('/')[-1]}", class_name


def print_mermaid_diagram(G, target_role, output_file="mermaid.md"):
    """Write the trust graph as a Mermaid markdown diagram.

    Every node and edge of G is emitted inside a fenced ``mermaid`` block.
    Edges whose ``type`` attribute is ``'implied_trust'`` are drawn dotted,
    all others solid. The output file is overwritten.

    Args:
        G: The trust graph
        target_role: The target role; accepted for interface compatibility
            but not used, since the whole of G is rendered
        output_file: The file to write to (default: mermaid.md)
    """
    # Map each graph node to a short Mermaid-safe identifier (node0, node1, ...),
    # assigned in order of first use.
    node_ids = {}

    def get_node_id(node):
        return node_ids.setdefault(node, f"node{len(node_ids)}")

    lines = [
        "```mermaid\n",
        "graph TD\n",
        " classDef Account fill:#f9f,stroke:#333,stroke-width:2px\n",
        " classDef Service fill:#bbf,stroke:#333,stroke-width:2px\n",
        " classDef SAML fill:#bfb,stroke:#333,stroke-width:2px\n",
        " classDef Canonical fill:#fbb,stroke:#333,stroke-width:2px\n",
        " classDef World fill:#f00,stroke:#333,stroke-width:2px\n",
        " classDef Admin fill:#f00,stroke:#333,stroke-width:2px\n",
        " classDef Role fill:#fff,stroke:#333,stroke-width:2px\n",
    ]

    # Add nodes with appropriate styling
    for node in G.nodes():
        label, class_name = _mermaid_label_and_class(node, G.nodes[node])
        node_id = get_node_id(node)
        # Mermaid has no backslash escapes inside quoted labels; a double
        # quote must be written as the entity code #quot; or it would end
        # the label early.
        label = label.replace('"', '#quot;')
        lines.append(f" {node_id}[\"{label}\"]\n")
        lines.append(f" class {node_id} {class_name}\n")

    # Add edges
    for u, v, data in G.edges(data=True):
        arrow = "-.->" if data.get('type', 'trust') == 'implied_trust' else "-->"
        lines.append(f" {get_node_id(u)} {arrow} {get_node_id(v)}\n")

    lines.append("```\n")

    with open(output_file, 'w') as f:
        f.writelines(lines)
def _prompt_for_role(admin_roles, non_admin_world_trust):
    """Ask the user to pick one of the numbered roles.

    Numbers 1..len(admin_roles) select from admin_roles; the numbers after
    that select from non_admin_world_trust. Re-prompts on invalid input.

    Returns:
        The selected role, or None if the user entered 'q' or stdin reached
        end-of-file.
    """
    admin_count = len(admin_roles)
    total_roles = admin_count + len(non_admin_world_trust)
    while True:
        try:
            choice = input(f"\nEnter the number of the role to analyze (1-{total_roles}) or 'q' to quit: ")
        except EOFError:
            # stdin was closed or piped and is exhausted: treat it as quit
            # instead of crashing with a traceback.
            print()
            return None
        choice = choice.strip()
        if choice.lower() == 'q':
            return None
        try:
            index = int(choice) - 1
        except ValueError:
            print("Invalid input. Please enter a number or 'q' to quit.")
            continue
        if 0 <= index < admin_count:
            return admin_roles[index]
        if admin_count <= index < total_roles:
            return non_admin_world_trust[index - admin_count]
        print(f"Invalid choice. Please enter a number between 1 and {total_roles}.")


def main():
    """Run the interactive trust-chain analyzer.

    Parses the command-line options, builds the complete trust graph once,
    then loops: lists the admin roles and the roles that trust everyone,
    asks the user to pick one, and writes the trust chain to that role as a
    text tree and a Mermaid diagram (and shows it with matplotlib when
    --visualize is given). The loop ends when the user quits, stdin is
    closed, or there are no roles to list.
    """
    parser = argparse.ArgumentParser(description='Analyze AWS trust relationships')
    parser.add_argument('--visualize', action='store_true',
                        help='Visualize the trust graph')
    parser.add_argument('--tree-file', default='trust-tree.txt',
                        help='Output file for trust tree (default: trust-tree.txt)')
    parser.add_argument('--mermaid-file', default='mermaid.md',
                        help='Output file for Mermaid diagram (default: mermaid.md)')
    parser.add_argument('--require-assume-role', action='store_true',
                        help='Only include principals that have assume role permissions')
    args = parser.parse_args()

    # Build the complete graph and get the principal data cache
    G, principal_data_cache = build_trust_graph(require_assume_role=args.require_assume_role)
    print(f"Complete graph has {G.number_of_nodes()} nodes and {G.number_of_edges()} edges")

    while True:  # Main loop
        # Find admin roles and world trust roles using the cache
        admin_roles = find_admin_roles(principal_data_cache)
        world_trust_roles = find_world_trust_roles(principal_data_cache)
        if not admin_roles and not world_trust_roles:
            print("No roles found!")
            return

        # List all admin roles
        if admin_roles:
            print("\nAdmin roles:")
            for i, role in enumerate(admin_roles, 1):
                warning = " [⚠️ trusts everyone]" if role in world_trust_roles else ""
                print(f"{i}. {role}{warning}")

        # List all world trust roles that aren't admin roles; their numbering
        # continues after the admin roles so every entry has a unique number.
        non_admin_world_trust = [r for r in world_trust_roles if r not in admin_roles]
        if non_admin_world_trust:
            print("\nNon-admin roles that trust everyone:")
            for i, role in enumerate(non_admin_world_trust, len(admin_roles) + 1):
                print(f"{i}. {role}")

        # Get user input
        target_role = _prompt_for_role(admin_roles, non_admin_world_trust)
        if target_role is None:
            return

        print(f"\nAnalyzing trust chain to role: {target_role}")

        # Build the trust chain
        chain = build_trust_chain(G, target_role)
        print(f"Trust chain has {chain.number_of_nodes()} nodes and {chain.number_of_edges()} edges")

        # Write the trust tree: truncate the file and write the header here,
        # then let print_trust_tree_to_file add the tree below it.
        with open(args.tree_file, 'w') as f:
            f.write(f"Trust Tree for {target_role}\n\n")
        print_trust_tree_to_file(chain, target_role, args.tree_file)
        print(f"\nTrust tree written to {args.tree_file}")

        # Generate Mermaid diagram
        print_mermaid_diagram(chain, target_role, args.mermaid_file)
        print(f"Mermaid diagram written to {args.mermaid_file}")

        # Visualize if requested
        if args.visualize:
            visualize_graph(chain, target_role)
            # Clear the figure to prepare for next visualization
            plt.close('all')
# Run the interactive analyzer only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment