Skip to content

Instantly share code, notes, and snippets.

@bneutra
Last active July 8, 2024 20:51
Show Gist options
  • Save bneutra/e91f4ae2501c774d428e3f0425202472 to your computer and use it in GitHub Desktop.
Save bneutra/e91f4ae2501c774d428e3f0425202472 to your computer and use it in GitHub Desktop.
import boto3
import sys
def has_ecs_create_permissions(policy_document):
"""
Check if the policy document contains permissions to create ECS resources.
"""
for statement in policy_document.get("Statement", []):
if statement.get("Action") is None:
continue
if statement["Effect"] == "Allow":
actions = (
statement["Action"]
if isinstance(statement["Action"], list)
else [statement["Action"]]
)
for action in actions:
if any(
ecs_action in action
for ecs_action in [
"ecs:Create",
"ecs:RunTask",
"ecs:StartTask",
"ecs:Register",
]
):
return [action]
return []
def has_ecs_tag_resource_permission(policy_document):
"""
Check if the policy document contains permission to tag ECS resources.
"""
for statement in policy_document.get("Statement", []):
if statement.get("Action") is None:
continue
if statement["Effect"] == "Allow":
actions = (
statement["Action"]
if isinstance(statement["Action"], list)
else [statement["Action"]]
)
if "ecs:TagResource" in actions:
return True
return False
def get_all_roles(iam_client):
"""
Use a paginator to get all roles in the specified region.
"""
paginator = iam_client.get_paginator("list_roles")
roles = []
for page in paginator.paginate():
roles.extend(page["Roles"])
return roles
def scan_roles(iam_client):
"""
Scan all IAM roles in the specified region and flag roles related to ECS creation without ecs:TagResource permission.
"""
roles = get_all_roles(iam_client)
flagged_roles = []
for role in roles:
# print(f"Checking role {role['RoleName']}")
policy_documents = []
breakpoint()
attached_policies = iam_client.list_attached_role_policies(
RoleName=role["RoleName"]
)
for policy in attached_policies.get("AttachedPolicies", []):
policy_version = iam_client.get_policy_version(
PolicyArn=policy["PolicyArn"],
VersionId=iam_client.get_policy(PolicyArn=policy["PolicyArn"])[
"Policy"
]["DefaultVersionId"],
)
policy_documents.append(policy_version["PolicyVersion"]["Document"])
create_actions = [has_ecs_create_permissions(doc) for doc in policy_documents]
create_ecs_resources = any(create_actions)
has_tag_resource = any(
has_ecs_tag_resource_permission(doc) for doc in policy_documents
)
if create_ecs_resources and not has_tag_resource:
print(f"Role {role['RoleName']} flagged, actions {create_actions}")
flagged_roles.append(role["RoleName"])
return flagged_roles
def scan_policies(iam_client):
policies = list_policies(iam_client)
flagged_policies = []
for policy in policies:
policy_version = iam_client.get_policy(PolicyArn=policy)['Policy']['DefaultVersionId']
policy_document = iam_client.get_policy_version(PolicyArn=policy, VersionId=policy_version)['PolicyVersion']['Document']
#print(policy_document)
create_actions = has_ecs_create_permissions(policy_document)
create_ecs_resources = any(create_actions)
has_tag_resource = has_ecs_tag_resource_permission(policy_document)
if create_ecs_resources and not has_tag_resource:
print(f"Policy {policy} flagged, actions {create_actions}")
flagged_policies.append(policy)
return flagged_policies
def list_policies(iam_client):
# List all IAM policies with pagination
paginator = iam_client.get_paginator('list_policies')
policy_list = []
for response in paginator.paginate(Scope='Local'):
for policy in response['Policies']:
policy_list.append(policy['Arn'])
return policy_list
# IAM is global
region = "us-west-2"
iam_client = boto3.client("iam", region_name=region)
flagged_policies = scan_policies(iam_client)
#flagged_roles = scan_roles(iam_client)
#print("Flagged Roles:", flagged_roles)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment