Skip to content

Instantly share code, notes, and snippets.

@dagrz
Created May 20, 2025 03:18
Show Gist options
  • Save dagrz/0efd3e94ac74aa3f3cd78e213abbed08 to your computer and use it in GitHub Desktop.
Save dagrz/0efd3e94ac74aa3f3cd78e213abbed08 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
AWS Trust Graph Analyzer
This script analyzes AWS IAM trust relationships across accounts, roles, users, and SSO principals.
It builds a graph of `sts:AssumeRole` relationships, identifies roles with admin-level privileges,
and traces trust chains to help you understand who really has root-equivalent access in your AWS environment.
The tool supports multiple output formats, including:
- A tab-indented trust tree (default: trust-tree.txt)
- A Mermaid diagram for Markdown-based visualization (default: mermaid.md)
- An optional interactive graph using matplotlib (--visualize)
Usage:
./trust.py [--visualize] [--require-assume-role]
[--tree-file trust-tree.txt]
[--mermaid-file mermaid.md]
Options:
--visualize Launches an interactive matplotlib visualization of the trust graph.
--require-assume-role Filters out principals that don’t have AssumeRole permissions,
even if they’re listed in trust policies.
--tree-file FILE Path to output trust tree text file. Default: trust-tree.txt
--mermaid-file FILE Path to output Mermaid Markdown diagram. Default: mermaid.md
Outputs:
- Prints all admin-like roles and roles with world-level trust.
- Prompts you to select one and then outputs a narrowed trust chain to the selected role.
- Outputs results in plaintext, Mermaid, and optionally interactive form.
Written by: Daniel Grzelak (@dagrz on X, [email protected])
For more cloud security tools and research, visit: https://www.plerion.com
"""
import argparse
import os
import json
import re
import random
import networkx as nx
import matplotlib.pyplot as plt
# AWS-managed policies that grant full admin or IAM control privileges
ADMIN_POLICIES = {'arn:aws:iam::aws:policy/AdministratorAccess', 'arn:aws:iam::aws:policy/IAMFullAccess'}
# Policies that include sts:AssumeRole permissions, often used in automation or service roles
ASSUME_ROLE_POLICIES = [
'arn:aws:iam::aws:policy/AdministratorAccess-Amplify',
'arn:aws:iam::aws:policy/AWSControlTowerServiceRolePolicy',
'arn:aws:iam::aws:policy/AmplifyBackendDeployFullAccess',
'arn:aws:iam::aws:policy/AdministratorAccess',
'arn:aws:iam::aws:policy/AWSElasticDisasterRecoveryEc2InstancePolicy',
'arn:aws:iam::aws:policy/SageMakerStudioProjectUserRolePermissionsBoundary',
'arn:aws:iam::aws:policy/AWSServiceRoleForImageBuilder',
'arn:aws:iam::aws:policy/AWS-SSM-RemediationAutomation-OperationalAccountAdministrationRolePolicy',
'arn:aws:iam::aws:policy/AWS-SSM-RemediationAutomation-AdministrationRolePolicy',
'arn:aws:iam::aws:policy/SageMakerStudioEMRInstanceRolePolicy',
'arn:aws:iam::aws:policy/CloudFormationStackSetsOrgAdminServiceRolePolicy',
'arn:aws:iam::aws:policy/CloudWatch-CrossAccountAccess',
'arn:aws:iam::aws:policy/AWS-SSM-DiagnosisAutomation-AdministrationRolePolicy',
'arn:aws:iam::aws:policy/AWSElasticDisasterRecoveryRecoveryInstancePolicy',
'arn:aws:iam::aws:policy/AWS-SSM-DiagnosisAutomation-OperationalAccountAdministrationRolePolicy',
'arn:aws:iam::aws:policy/SageMakerStudioProjectUserRolePolicy',
]
def parse_trust_policy(trust_policy):
"""Parse a trust policy and return a list of trusted principals."""
trusted = []
if not trust_policy or 'Statement' not in trust_policy:
return trusted
for statement in trust_policy['Statement']:
if statement.get('Effect') != 'Allow':
continue
# Handle both string and list actions
actions = statement.get('Action', [])
if isinstance(actions, str):
actions = [actions]
# Check if any of the actions are assume role actions
if not any(action in ['sts:AssumeRole', 'sts:AssumeRoleWithSAML', 'sts:AssumeRoleWithWebIdentity'] for action in actions):
continue
principal = statement.get('Principal', {})
# Handle AWS principals
if 'AWS' in principal:
aws_principals = principal['AWS']
if isinstance(aws_principals, str):
aws_principals = [aws_principals]
for p in aws_principals:
if p == '*':
trusted.append(('world', '*'))
elif re.match(r'^A[0-9A-Z]{20}$', p) or re.match(r'^[0-9a-f]{64}$', p):
trusted.append(('canonical', p))
else:
trusted.append(('aws', p))
# Handle Service principals
if 'Service' in principal:
service_principals = principal['Service']
if isinstance(service_principals, str):
service_principals = [service_principals]
trusted.extend(('service', p) for p in service_principals)
# Handle Federated principals (both SAML and OIDC)
if 'Federated' in principal:
federated = principal['Federated']
if isinstance(federated, str):
federated = [federated]
for p in federated:
if ':saml-provider/' in p:
# This is a SAML provider
trusted.append(('saml', p))
else:
# Other types of federated principals
trusted.append(('federated', p))
# Handle Canonical IDs
if 'CanonicalUser' in principal:
canonical = principal['CanonicalUser']
if isinstance(canonical, str):
canonical = [canonical]
trusted.extend(('canonical', c) for c in canonical)
return trusted
def is_admin_principal(principal_data):
"""Check if a principal (role or user) has admin privileges."""
# Check attached policies
for policy in principal_data.get('attachedPolicies', []):
if policy in ADMIN_POLICIES:
return True
# Check both inline and customer policies
for policy_type in ['inlinePolicies', 'customerPolicies']:
for policy_name, policy in principal_data.get(policy_type, {}).items():
if not isinstance(policy, dict):
continue
# Handle both array and single object statements
statements = policy.get('Statement', [])
if isinstance(statements, dict):
statements = [statements]
elif not isinstance(statements, list):
continue
for statement in statements:
if not isinstance(statement, dict):
continue
# Check for wildcard permissions
if (statement.get('Effect') == 'Allow' and
statement.get('Action') == '*' and
statement.get('Resource') == '*'):
return True
# Check for IAM wildcard permissions
actions = statement.get('Action', [])
if isinstance(actions, str):
actions = [actions]
if 'iam:*' in actions and statement.get('Effect') == 'Allow':
return True
return False
def has_assume_role_permissions(principal_data):
"""Check if a principal (role or user) has permissions to assume roles."""
# Check attached policies against ASSUME_ROLE_POLICIES
for policy in principal_data.get('attachedPolicies', []):
if policy in ASSUME_ROLE_POLICIES:
return True
# Check both inline and customer policies
for policy_type in ['inlinePolicies', 'customerPolicies']:
for policy_name, policy in principal_data.get(policy_type, {}).items():
if not isinstance(policy, dict):
continue
# Handle both array and single object statements
statements = policy.get('Statement', [])
if isinstance(statements, dict):
statements = [statements]
elif not isinstance(statements, list):
continue
for statement in statements:
if not isinstance(statement, dict):
continue
if statement.get('Effect') != 'Allow':
continue
# Check for assume role permissions
actions = statement.get('Action', [])
if isinstance(actions, str):
actions = [actions]
if any(action in ['*', 'sts:*', 'sts:AssumeRole'] for action in actions):
return True
return False
def has_world_trust(role_data):
"""Check if a role trusts the world (can be assumed by anyone)."""
trust_policy = role_data.get('trustPolicy', {})
if not isinstance(trust_policy, dict):
return False
for statement in trust_policy.get('Statement', []):
if statement.get('Effect') != 'Allow':
continue
# Check if the action is sts:AssumeRole
actions = statement.get('Action', [])
if isinstance(actions, str):
actions = [actions]
if not any(action in ['sts:AssumeRole', 'sts:AssumeRoleWithSAML'] for action in actions):
continue
principal = statement.get('Principal', {})
if 'AWS' in principal:
aws_principals = principal['AWS']
if isinstance(aws_principals, str):
aws_principals = [aws_principals]
if '*' in aws_principals:
return True
return False
def find_admin_roles(principal_data_cache):
"""Find all roles with admin privileges."""
admin_roles = []
for role_arn, role_data in principal_data_cache.items():
if is_admin_principal(role_data):
admin_roles.append(role_arn)
return admin_roles
def find_world_trust_roles(principal_data_cache):
"""Find all roles that can be assumed by anyone."""
world_trust_roles = []
for role_arn, role_data in principal_data_cache.items():
if has_world_trust(role_data):
world_trust_roles.append(role_arn)
return world_trust_roles
def build_trust_graph(require_assume_role=False):
"""Build a NetworkX graph representing trust relationships."""
G = nx.DiGraph()
# Cache for principal data
principal_data_cache = {}
# First pass: collect all principals in each account and cache their data
account_principals = {} # account_id -> set of principals
# Walk through roles and users directories
for principal_type_dir in ['roles', 'users']:
if not os.path.exists(principal_type_dir):
continue
for account_dir in os.listdir(principal_type_dir):
account_path = os.path.join(principal_type_dir, account_dir)
if not os.path.isdir(account_path):
continue
# Initialize set for this account if not already done
if account_dir not in account_principals:
account_principals[account_dir] = set()
# Add account node first
account_node = f"account:{account_dir}"
G.add_node(account_node, type='account', account=account_dir)
for principal_file in os.listdir(account_path):
if not principal_file.endswith('.json'):
continue
principal_path = os.path.join(account_path, principal_file)
try:
with open(principal_path, 'r') as f:
principal_data = json.load(f)
# Skip if principal_data is not a dictionary
if not isinstance(principal_data, dict):
print(f"Skipping {principal_path}: Invalid JSON format - not a dictionary")
continue
principal_arn = principal_data.get('roleArn') if principal_type_dir == 'roles' else principal_data.get('userArn')
if not principal_arn:
print(f"Skipping {principal_path}: No {principal_type_dir[:-1]}Arn found")
continue
# Add principal to account's principals and cache its data
account_principals[account_dir].add(principal_arn)
principal_data_cache[principal_arn] = principal_data
# Add principal node with original data
is_admin = is_admin_principal(principal_data)
G.add_node(principal_arn,
type='role' if principal_type_dir == 'roles' else 'user',
account=account_dir,
admin=is_admin,
original_data=principal_data)
# Parse trust policy (empty for users)
trust_policy = principal_data.get('trustPolicy', {}) if principal_type_dir == 'roles' else {}
if not isinstance(trust_policy, dict):
print(f"Warning: trustPolicy in {principal_path} is not a dictionary: {type(trust_policy)}")
trust_policy = {}
trusted_principals = parse_trust_policy(trust_policy)
for principal_type, principal in trusted_principals:
# Handle different types of trust relationships
if principal_type == 'world':
# Add world node if it doesn't exist
world_node = '*'
if world_node not in G:
G.add_node(world_node, type='world', account='000000000000')
G.add_edge(world_node, principal_arn, type='world_trust')
elif principal_type == 'aws':
if principal.endswith(':root'):
# Account-wide trust
account_id = principal.split(':')[4]
account_node = f"account:{account_id}"
# Add the account node if it doesn't exist
if account_node not in G:
G.add_node(account_node, type='account', account=account_id)
G.add_edge(account_node, principal_arn, type='account_trust')
else:
# Direct principal trust - only add if principal can assume roles
try:
# Try to find the principal's policy file
parts = principal.split(':')
if len(parts) < 6:
print(f"Warning: Invalid ARN format for principal {principal}")
continue
principal_account = parts[4]
principal_type = 'user' if ':user/' in principal else 'role'
principal_name = principal.split('/')[-1]
# Check both roles and users directories
policy_path = None
if principal_type == 'role':
policy_path = os.path.join('roles', principal_account, f"{principal_name}.json")
else:
policy_path = os.path.join('users', f"{principal_name}.json")
if os.path.exists(policy_path):
with open(policy_path, 'r') as f:
principal_data = json.load(f)
# Cache the principal data
principal_data_cache[principal] = principal_data
# If require_assume_role is True, check if the principal has assume role permissions
if not require_assume_role or has_assume_role_permissions(principal_data):
G.add_edge(principal, principal_arn, type='direct_trust')
except Exception as e:
print(f"Error checking principal {principal}: {e}")
elif principal_type == 'service':
# Add service node if it doesn't exist
if principal not in G:
G.add_node(principal, type='service', account='000000000000')
G.add_edge(principal, principal_arn, type='service_trust')
elif principal_type == 'saml':
# Add federated node if it doesn't exist
if principal not in G:
# Extract account ID from SAML provider ARN
# Format: arn:aws:iam::<account_id>:saml-provider/<provider_name>
try:
saml_account_id = principal.split(':')[4]
G.add_node(principal, type='saml', account=saml_account_id)
except IndexError:
# If we can't parse the ARN, use the default account
G.add_node(principal, type='saml', account='000000000000')
G.add_edge(principal, principal_arn, type='saml_trust')
elif principal_type == 'federated':
# Add federated node if it doesn't exist
if principal not in G:
# Extract account ID from federated provider ARN if possible
try:
if ':' in principal:
account_id = principal.split(':')[4]
G.add_node(principal, type='federated', account=account_id)
else:
G.add_node(principal, type='federated', account='000000000000')
except IndexError:
# If we can't parse the ARN, use the default account
G.add_node(principal, type='federated', account='000000000000')
G.add_edge(principal, principal_arn, type='federated_trust')
elif principal_type == 'canonical':
# Add canonical node if it doesn't exist
if principal not in G:
G.add_node(principal, type='canonical', account=account_dir)
G.add_edge(principal, principal_arn, type='canonical_trust')
except json.JSONDecodeError as e:
print(f"Error parsing JSON in {principal_path}: {e}")
except Exception as e:
print(f"Error processing {principal_path}: {e}")
# Second pass: add implied trust relationships
for u, v, data in list(G.edges(data=True)):
if data.get('type') == 'account_trust':
# This is an account-wide trust
trusted_account = u.split(':')[1] # Extract account ID from account node
if trusted_account in account_principals:
# Add implied edges from all principals in the trusted account to the account node
for principal in account_principals[trusted_account]:
if principal != v: # Don't add self-loops
# If require_assume_role is True, check if the principal has assume role permissions
if not require_assume_role:
G.add_edge(principal, u, type='implied_trust')
else:
# Use cached principal data
if principal in principal_data_cache:
if has_assume_role_permissions(principal_data_cache[principal]):
G.add_edge(principal, u, type='implied_trust')
return G, principal_data_cache
def build_trust_chain(G, target_role):
"""Build a subgraph containing only the trust chain leading to the target role."""
# Create a new graph for the trust chain
chain = nx.DiGraph()
# Add the target role
chain.add_node(target_role, **G.nodes[target_role])
# Function to recursively add predecessors
def add_predecessors(node):
for pred in G.predecessors(node):
if pred not in chain:
chain.add_node(pred, **G.nodes[pred])
chain.add_edge(pred, node, **G.edges[pred, node])
add_predecessors(pred)
# Build the chain
add_predecessors(target_role)
return chain
def visualize_graph(G, target_role):
"""Visualize the trust graph with interactive labels."""
# Create figure for the graph with minimal margins
plt.figure(1, figsize=(10, 8))
plt.subplots_adjust(left=0.01, right=0.99, top=0.95, bottom=0.05)
graph_ax = plt.gca()
# Use spring layout with fixed center and scale
pos = nx.spring_layout(G, k=1, iterations=50, center=(0.5, 0.5), scale=0.4)
# Get unique accounts and assign colors
accounts = set()
for _, data in G.nodes(data=True):
if 'account' in data:
accounts.add(data['account'])
# Generate a color for each account
account_colors = {}
for i, account in enumerate(accounts):
hue = i / len(accounts)
account_colors[account] = plt.cm.hsv(hue)
# Store node artists for later reference
node_artists = {}
edge_artists = {}
# Draw nodes with different colors based on account and type
for account in accounts:
# Get nodes for this account
account_nodes = [n for n, d in G.nodes(data=True)
if d.get('account') == account]
# Split into different node types
admin_nodes = []
regular_nodes = []
service_nodes = []
saml_nodes = []
canonical_nodes = []
world_nodes = []
account_nodes_list = []
user_nodes = []
federated_nodes = []
for node in account_nodes:
node_type = G.nodes[node].get('type')
if node_type == 'account':
account_nodes_list.append(node)
elif node_type == 'role':
if G.nodes[node].get('admin'):
admin_nodes.append(node)
else:
regular_nodes.append(node)
elif node_type == 'service':
service_nodes.append(node)
elif node_type == 'saml':
saml_nodes.append(node)
elif node_type == 'canonical':
canonical_nodes.append(node)
elif node_type == 'world':
world_nodes.append(node)
elif node_type == 'user':
user_nodes.append(node)
elif node_type == 'federated':
federated_nodes.append(node)
# Draw world nodes (large octagons)
if world_nodes:
nx.draw_networkx_nodes(G, pos, nodelist=world_nodes,
node_color='red',
node_size=2000, alpha=0.7,
node_shape='8', ax=graph_ax)
for node in world_nodes:
node_artists[node] = graph_ax.collections[-1]
# Draw account nodes (squares)
if account_nodes_list:
nx.draw_networkx_nodes(G, pos, nodelist=account_nodes_list,
node_color=[account_colors[account]],
node_size=500, alpha=0.7,
node_shape='s', ax=graph_ax)
for node in account_nodes_list:
node_artists[node] = graph_ax.collections[-1]
# Draw admin nodes (filled circles)
if admin_nodes:
# Make the first admin node (start node) larger
if admin_nodes:
first_admin = admin_nodes[0]
other_admins = admin_nodes[1:]
# Draw the start node larger
nx.draw_networkx_nodes(G, pos, nodelist=[first_admin],
node_color=[account_colors[account]],
node_size=1000, alpha=0.7, ax=graph_ax)
node_artists[first_admin] = graph_ax.collections[-1]
# Draw other admin nodes normal size
if other_admins:
nx.draw_networkx_nodes(G, pos, nodelist=other_admins,
node_color=[account_colors[account]],
node_size=500, alpha=0.7, ax=graph_ax)
for node in other_admins:
node_artists[node] = graph_ax.collections[-1]
# Draw regular nodes (unfilled circles)
if regular_nodes:
nx.draw_networkx_nodes(G, pos, nodelist=regular_nodes,
node_color='none',
node_size=500, alpha=0.7,
edgecolors=account_colors[account],
linewidths=2, ax=graph_ax)
for node in regular_nodes:
node_artists[node] = graph_ax.collections[-1]
# Draw service nodes (triangles)
if service_nodes:
nx.draw_networkx_nodes(G, pos, nodelist=service_nodes,
node_color='gray',
node_size=500, alpha=0.7,
node_shape='^', ax=graph_ax)
for node in service_nodes:
node_artists[node] = graph_ax.collections[-1]
# Draw SAML nodes (stars)
if saml_nodes:
nx.draw_networkx_nodes(G, pos, nodelist=saml_nodes,
node_color=[account_colors[account]],
node_size=500, alpha=0.7,
node_shape='*', ax=graph_ax)
for node in saml_nodes:
node_artists[node] = graph_ax.collections[-1]
# Draw canonical nodes (diamonds)
if canonical_nodes:
nx.draw_networkx_nodes(G, pos, nodelist=canonical_nodes,
node_color=[account_colors[account]],
node_size=500, alpha=0.7,
node_shape='d', ax=graph_ax)
for node in canonical_nodes:
node_artists[node] = graph_ax.collections[-1]
# Draw user nodes (small filled circles)
if user_nodes:
nx.draw_networkx_nodes(G, pos, nodelist=user_nodes,
node_color=[account_colors[account]],
node_size=300, alpha=0.7,
node_shape='o', ax=graph_ax)
for node in user_nodes:
node_artists[node] = graph_ax.collections[-1]
# Draw federated nodes (hexagons)
if federated_nodes:
nx.draw_networkx_nodes(G, pos, nodelist=federated_nodes,
node_color='purple',
node_size=500, alpha=0.7,
node_shape='h', ax=graph_ax)
for node in federated_nodes:
node_artists[node] = graph_ax.collections[-1]
# Highlight the target role
if target_role in G.nodes():
# Draw a larger circle around the target role
nx.draw_networkx_nodes(G, pos, nodelist=[target_role],
node_color='none',
node_size=1500, alpha=1.0,
edgecolors='red',
linewidths=3, ax=graph_ax)
# Make the target role node itself larger
node_data = G.nodes[target_role]
account = node_data.get('account', 'unknown')
nx.draw_networkx_nodes(G, pos, nodelist=[target_role],
node_color=[account_colors[account]],
node_size=1000, alpha=1.0, ax=graph_ax)
node_artists[target_role] = graph_ax.collections[-1]
# Create separate figure for legends
plt.figure(2, figsize=(2.5, 8)) # Made the legend window even thinner
legend_ax = plt.gca()
legend_ax.axis('off')
# Create legend elements
node_type_legend_elements = [
plt.Line2D([0], [0], marker='s', color='w',
label='Account',
markerfacecolor='gray', markersize=10),
plt.Line2D([0], [0], marker='o', color='w',
label='Admin Role',
markerfacecolor='gray', markersize=10),
plt.Line2D([0], [0], marker='o', color='gray',
label='Regular Role',
markerfacecolor='none', markersize=10),
plt.Line2D([0], [0], marker='o', color='w',
label='User',
markerfacecolor='gray', markersize=8),
plt.Line2D([0], [0], marker='^', color='w',
label='Service Principal',
markerfacecolor='gray', markersize=10),
plt.Line2D([0], [0], marker='*', color='w',
label='SAML Provider',
markerfacecolor='gray', markersize=10),
plt.Line2D([0], [0], marker='d', color='w',
label='Canonical User',
markerfacecolor='gray', markersize=10),
plt.Line2D([0], [0], marker='8', color='w',
label='The World (*)',
markerfacecolor='red', markersize=15),
plt.Line2D([0], [0], marker='h', color='w',
label='Federated Principal',
markerfacecolor='purple', markersize=10),
plt.Line2D([0], [0], marker='o', color='red',
label='Target Role',
markerfacecolor='none', markersize=10,
markeredgewidth=2)
]
account_legend_elements = []
for account, color in account_colors.items():
if account != '000000000000':
account_legend_elements.append(
plt.Line2D([0], [0], marker='o', color='w',
label=account,
markerfacecolor=color, markersize=10)
)
# Add legends to the separate figure - node types at top, account colors below
node_type_legend = legend_ax.legend(handles=node_type_legend_elements, loc='upper left',
title='Node Types')
node_type_legend.get_title().set_fontweight('bold')
legend_ax.add_artist(node_type_legend)
account_legend = legend_ax.legend(handles=account_legend_elements, loc='lower left',
title='Account Colors')
account_legend.get_title().set_fontweight('bold')
legend_ax.add_artist(account_legend)
# Store selected nodes and current hover node
selected_nodes = set()
current_hover_node = None
# If there are 20 or fewer nodes, draw all edges by default
if len(G.nodes()) <= 20:
# Draw regular edges
regular_edges = [(u, v) for u, v, d in G.edges(data=True) if d.get('type') != 'implied_trust']
if regular_edges:
artists = nx.draw_networkx_edges(G, pos, edgelist=regular_edges,
edge_color='black', arrows=True,
alpha=0.7, arrowsize=20, width=2, ax=graph_ax)
for edge, artist in zip(regular_edges, artists):
edge_artists[edge] = artist
# Draw implied edges
implied_edges = [(u, v) for u, v, d in G.edges(data=True) if d.get('type') == 'implied_trust']
if implied_edges:
artists = nx.draw_networkx_edges(G, pos, edgelist=implied_edges,
edge_color='black', arrows=True,
alpha=0.7, arrowsize=20, width=2,
style='dotted', ax=graph_ax)
for edge, artist in zip(implied_edges, artists):
edge_artists[edge] = artist
def on_motion(event):
nonlocal current_hover_node
if event.inaxes is not None and event.inaxes.figure.number == 1: # Only respond to graph window
# Clear previous annotations
for annotation in graph_ax.texts:
annotation.remove()
# Only clear edge highlights if we're not hovering over a selected node
if current_hover_node not in selected_nodes:
for artist in edge_artists.values():
artist.remove()
edge_artists.clear()
# Check for node hover
for node, (x, y) in pos.items():
if abs(event.xdata - x) < 0.05 and abs(event.ydata - y) < 0.05:
current_hover_node = node
# Highlight the node
node_artists[node].set_alpha(1.0)
# Show connected edges if node is not selected
if node not in selected_nodes:
connected_edges = []
for u, v, data in G.edges(data=True):
if u == node or v == node:
connected_edges.append((u, v, data))
# Draw regular edges
regular_edges = [(u, v) for u, v, d in connected_edges if d.get('type') != 'implied_trust']
if regular_edges:
artists = nx.draw_networkx_edges(G, pos, edgelist=regular_edges,
edge_color='black', arrows=True,
alpha=0.7, arrowsize=20, width=2, ax=graph_ax)
for edge, artist in zip(regular_edges, artists):
edge_artists[edge] = artist
# Draw implied edges
implied_edges = [(u, v) for u, v, d in connected_edges if d.get('type') == 'implied_trust']
if implied_edges:
artists = nx.draw_networkx_edges(G, pos, edgelist=implied_edges,
edge_color='black', arrows=True,
alpha=0.7, arrowsize=20, width=2,
style='dotted', ax=graph_ax)
for edge, artist in zip(implied_edges, artists):
edge_artists[edge] = artist
# Show node label
node_data = G.nodes[node]
account = node_data.get('account', 'unknown')
node_type = node_data.get('type', 'role')
is_admin = node_data.get('admin', False)
if node_type == 'account':
label = f"Account\n{account}"
elif node_type == 'service':
label = f"Service Principal\n{node}"
elif node_type == 'saml':
label = f"SAML Provider\nAccount: {account}\n{node}"
elif node_type == 'canonical':
label = f"Canonical User\nAccount: {account}\n{node}"
elif node_type == 'world':
label = f"The World\n(Anyone can assume this role)"
elif node_type == 'user':
label = f"User\nAccount: {account}\n{node}"
elif node_type == 'federated':
label = f"Federated Principal\n{node}"
elif is_admin:
label = f"Admin Role\nAccount: {account}\n{node}"
else:
label = f"Role\nAccount: {account}\n{node}"
graph_ax.text(0.02, 0.02, label, transform=graph_ax.transAxes,
fontsize=14, verticalalignment='bottom',
bbox=dict(boxstyle='round', facecolor='white', alpha=0.8))
plt.draw()
return
else:
# Reset node alpha if not selected
if node not in selected_nodes:
node_artists[node].set_alpha(0.7)
current_hover_node = None
def on_click(event):
if event.inaxes is not None and event.inaxes.figure.number == 1: # Only respond to graph window
for node, (x, y) in pos.items():
if abs(event.xdata - x) < 0.05 and abs(event.ydata - y) < 0.05:
# Print original JSON data to console
node_data = G.nodes[node]
print(f"\n{node}")
if 'original_data' in node_data:
print(json.dumps(node_data['original_data'], indent=2))
else:
print(json.dumps(node_data, indent=2))
if node in selected_nodes:
# Deselect the node
selected_nodes.remove(node)
node_artists[node].set_alpha(0.7)
else:
# Select the node
selected_nodes.add(node)
node_artists[node].set_alpha(1.0)
plt.draw()
return
plt.figure(1) # Switch back to graph figure
plt.connect('motion_notify_event', on_motion)
plt.connect('button_press_event', on_click)
# Extract role name from ARN
role_name = target_role.split('/')[-1]
plt.title(f"AWS Trust Chain to {role_name}")
graph_ax.axis('off')
# Show both figures
plt.figure(1)
plt.show(block=False)
plt.figure(2)
plt.show()
def print_trust_tree_to_file(G, target_role, output_file="trust-tree.txt", depth=0):
"""Print the trust tree to a file with tab-based indentation.
Args:
G: The trust graph
target_role: The target role to start from
output_file: The file to write to (default: trust-tree.txt)
depth: Current depth in the tree (for indentation)
"""
# Get node data
node_data = G.nodes[target_role]
node_type = node_data.get('type', 'role')
account = node_data.get('account', 'unknown')
is_admin = node_data.get('admin', False)
# Create indentation string
indent = '\t' * depth
# Format node label based on type
if node_type == 'account':
label = f"Account: {account}"
elif node_type == 'service':
label = f"Service Principal: {target_role}"
elif node_type == 'saml':
label = f"SAML Provider: {target_role} (Account: {account})"
elif node_type == 'canonical':
label = f"Canonical User: {target_role} (Account: {account})"
elif node_type == 'world':
label = "The World (Anyone can assume this role)"
elif is_admin:
label = f"Admin Role: {target_role} (Account: {account})"
else:
label = f"Role: {target_role} (Account: {account})"
# Write the current node
with open(output_file, 'a') as f:
f.write(f"{indent}{label}\n")
# Recursively process predecessors
for pred in G.predecessors(target_role):
print_trust_tree_to_file(G, pred, output_file, depth + 1)
def print_mermaid_diagram(G, target_role, output_file="mermaid.md"):
"""Print the trust graph as a Mermaid markdown diagram.
Args:
G: The trust graph
target_role: The target role to start from
output_file: The file to write to (default: mermaid.md)
"""
# Create a unique ID for each node
node_ids = {}
id_counter = 0
def get_node_id(node):
nonlocal id_counter
if node not in node_ids:
node_ids[node] = f"node{id_counter}"
id_counter += 1
return node_ids[node]
# Write the Mermaid diagram
with open(output_file, 'w') as f:
f.write("```mermaid\n")
f.write("graph TD\n")
f.write(" classDef Account fill:#f9f,stroke:#333,stroke-width:2px\n")
f.write(" classDef Service fill:#bbf,stroke:#333,stroke-width:2px\n")
f.write(" classDef SAML fill:#bfb,stroke:#333,stroke-width:2px\n")
f.write(" classDef Canonical fill:#fbb,stroke:#333,stroke-width:2px\n")
f.write(" classDef World fill:#f00,stroke:#333,stroke-width:2px\n")
f.write(" classDef Admin fill:#f00,stroke:#333,stroke-width:2px\n")
f.write(" classDef Role fill:#fff,stroke:#333,stroke-width:2px\n")
# Add nodes with appropriate styling
for node in G.nodes():
node_data = G.nodes[node]
node_type = node_data.get('type', 'role')
account = node_data.get('account', 'unknown')
is_admin = node_data.get('admin', False)
node_id = get_node_id(node)
# Format node label - simplified to avoid special characters
if node_type == 'account':
label = f"Account {account}"
class_name = "Account"
elif node_type == 'service':
label = f"Service {node.split('/')[-1]}"
class_name = "Service"
elif node_type == 'saml':
label = f"SAML {node.split('/')[-1]}"
class_name = "SAML"
elif node_type == 'canonical':
label = f"Canonical {node.split('/')[-1]}"
class_name = "Canonical"
elif node_type == 'world':
label = "The World"
class_name = "World"
elif is_admin:
label = f"Admin {node.split('/')[-1]}"
class_name = "Admin"
else:
label = f"Role {node.split('/')[-1]}"
class_name = "Role"
# Escape any special characters in the label
label = label.replace('"', '\\"')
label = label.replace('\\', '\\\\')
f.write(f" {node_id}[\"{label}\"]\n")
f.write(f" class {node_id} {class_name}\n")
# Add edges
for u, v, data in G.edges(data=True):
edge_type = data.get('type', 'trust')
u_id = get_node_id(u)
v_id = get_node_id(v)
if edge_type == 'implied_trust':
f.write(f" {u_id} -.-> {v_id}\n")
else:
f.write(f" {u_id} --> {v_id}\n")
f.write("```\n")
def main():
parser = argparse.ArgumentParser(description='Analyze AWS trust relationships')
parser.add_argument('--visualize', action='store_true',
help='Visualize the trust graph')
parser.add_argument('--tree-file', default='trust-tree.txt',
help='Output file for trust tree (default: trust-tree.txt)')
parser.add_argument('--mermaid-file', default='mermaid.md',
help='Output file for Mermaid diagram (default: mermaid.md)')
parser.add_argument('--require-assume-role', action='store_true',
help='Only include principals that have assume role permissions')
args = parser.parse_args()
# Build the complete graph and get the principal data cache
G, principal_data_cache = build_trust_graph(require_assume_role=args.require_assume_role)
print(f"Complete graph has {G.number_of_nodes()} nodes and {G.number_of_edges()} edges")
while True: # Main loop
# Find admin roles and world trust roles using the cache
admin_roles = find_admin_roles(principal_data_cache)
world_trust_roles = find_world_trust_roles(principal_data_cache)
if not admin_roles and not world_trust_roles:
print("No roles found!")
return
# List all admin roles
if admin_roles:
print("\nAdmin roles:")
for i, role in enumerate(admin_roles, 1):
warning = " [⚠️ trusts everyone]" if role in world_trust_roles else ""
print(f"{i}. {role}{warning}")
# List all world trust roles that aren't admin roles
non_admin_world_trust = [r for r in world_trust_roles if r not in admin_roles]
if non_admin_world_trust:
print("\nNon-admin roles that trust everyone:")
for i, role in enumerate(non_admin_world_trust, len(admin_roles) + 1):
print(f"{i}. {role}")
# Get user input
while True:
try:
total_roles = len(admin_roles) + len(non_admin_world_trust)
choice = input(f"\nEnter the number of the role to analyze (1-{total_roles}) or 'q' to quit: ")
if choice.lower() == 'q':
return
index = int(choice) - 1
if 0 <= index < total_roles:
if index < len(admin_roles):
target_role = admin_roles[index]
else:
target_role = non_admin_world_trust[index - len(admin_roles)]
break
else:
print(f"Invalid choice. Please enter a number between 1 and {total_roles}.")
except ValueError:
print("Invalid input. Please enter a number or 'q' to quit.")
print(f"\nAnalyzing trust chain to role: {target_role}")
# Build the trust chain
chain = build_trust_chain(G, target_role)
print(f"Trust chain has {chain.number_of_nodes()} nodes and {chain.number_of_edges()} edges")
# Write the trust tree
with open(args.tree_file, 'w') as f:
f.write(f"Trust Tree for {target_role}\n\n")
print_trust_tree_to_file(chain, target_role, args.tree_file)
print(f"\nTrust tree written to {args.tree_file}")
# Generate Mermaid diagram
print_mermaid_diagram(chain, target_role, args.mermaid_file)
print(f"Mermaid diagram written to {args.mermaid_file}")
# Visualize if requested
if args.visualize:
visualize_graph(chain, target_role)
# Clear the figure to prepare for next visualization
plt.close('all')
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment