Last active
July 8, 2024 20:51
-
-
Save bneutra/e91f4ae2501c774d428e3f0425202472 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import boto3 | |
import sys | |
def has_ecs_create_permissions(policy_document):
    """
    Find the first allowed action that creates or launches ECS resources.

    Every ``Allow`` statement of the policy document is examined for an
    action containing ``ecs:Create``, ``ecs:RunTask``, ``ecs:StartTask`` or
    ``ecs:Register``. The comparison ignores letter case because IAM treats
    action names case-insensitively.

    Args:
        policy_document: Parsed IAM policy document (a dict). Its
            ``Statement`` entry may be a list of statements or a single
            statement object.

    Returns:
        A one-element list holding the first matching action exactly as it
        is written in the policy, or an empty list when nothing matches.
    """
    statements = policy_document.get("Statement", [])
    # IAM accepts a lone statement object in place of a list; iterating a
    # dict directly would walk its keys and break the lookups below.
    if isinstance(statements, dict):
        statements = [statements]

    create_markers = (
        "ecs:create",
        "ecs:runtask",
        "ecs:starttask",
        "ecs:register",
    )
    for statement in statements:
        # Statements without an "Action" key (e.g. NotAction) are skipped.
        if statement.get("Action") is None:
            continue
        if statement.get("Effect") != "Allow":
            continue
        actions = statement["Action"]
        if not isinstance(actions, list):
            actions = [actions]
        for action in actions:
            lowered = action.lower()
            if any(marker in lowered for marker in create_markers):
                return [action]
    return []
def has_ecs_tag_resource_permission(policy_document):
    """
    Report whether the policy document allows ``ecs:TagResource``.

    An ``Allow`` statement counts when any of its actions matches
    ``ecs:TagResource``, either literally or through IAM wildcards
    (``*`` and ``?``), e.g. ``ecs:*``, ``ecs:Tag*`` or ``*``. The comparison
    ignores letter case because IAM treats action names case-insensitively.

    Args:
        policy_document: Parsed IAM policy document (a dict). Its
            ``Statement`` entry may be a list of statements or a single
            statement object.

    Returns:
        True if some ``Allow`` statement grants ``ecs:TagResource``,
        otherwise False.
    """
    # Imported here so the function works without touching the file's
    # import block.
    from fnmatch import fnmatchcase

    statements = policy_document.get("Statement", [])
    # IAM accepts a lone statement object in place of a list.
    if isinstance(statements, dict):
        statements = [statements]

    for statement in statements:
        # Statements without an "Action" key (e.g. NotAction) are skipped.
        if statement.get("Action") is None:
            continue
        if statement.get("Effect") != "Allow":
            continue
        actions = statement["Action"]
        if not isinstance(actions, list):
            actions = [actions]
        # Each policy action is used as the pattern so that wildcard grants
        # covering ecs:TagResource are recognised.
        if any(
            fnmatchcase("ecs:tagresource", action.lower())
            for action in actions
        ):
            return True
    return False
def get_all_roles(iam_client):
    """
    Collect every role returned by the paginated ``list_roles`` call.

    Args:
        iam_client: IAM client exposing ``get_paginator``.

    Returns:
        A single list with the ``Roles`` entries of all pages, in page order.
    """
    role_pages = iam_client.get_paginator("list_roles").paginate()
    return [role for page in role_pages for role in page["Roles"]]
def scan_roles(iam_client):
    """
    Flag IAM roles that can create ECS resources but cannot tag them.

    For each role, the default version of every attached managed policy and
    every inline policy is collected. A role is flagged when at least one of
    those documents grants an ECS create-type action (as judged by
    ``has_ecs_create_permissions``) and none of them grants
    ``ecs:TagResource`` (as judged by ``has_ecs_tag_resource_permission``).
    Each flagged role is also printed.

    Args:
        iam_client: boto3 IAM client.

    Returns:
        List of the names of the flagged roles.
    """
    flagged_roles = []
    for role in get_all_roles(iam_client):
        role_name = role["RoleName"]
        policy_documents = []

        # Managed policies attached to the role: read each default version.
        attached_pages = iam_client.get_paginator(
            "list_attached_role_policies"
        ).paginate(RoleName=role_name)
        for page in attached_pages:
            for policy in page.get("AttachedPolicies", []):
                policy_arn = policy["PolicyArn"]
                default_version_id = iam_client.get_policy(
                    PolicyArn=policy_arn
                )["Policy"]["DefaultVersionId"]
                policy_version = iam_client.get_policy_version(
                    PolicyArn=policy_arn,
                    VersionId=default_version_id,
                )
                policy_documents.append(
                    policy_version["PolicyVersion"]["Document"]
                )

        # Inline policies also contribute to the role's permissions; leaving
        # them out can flag a role whose tag permission is granted inline.
        inline_pages = iam_client.get_paginator(
            "list_role_policies"
        ).paginate(RoleName=role_name)
        for page in inline_pages:
            for policy_name in page.get("PolicyNames", []):
                inline_policy = iam_client.get_role_policy(
                    RoleName=role_name,
                    PolicyName=policy_name,
                )
                # NOTE(review): relies on boto3 returning PolicyDocument
                # already decoded into a dict - confirm against the API docs.
                policy_documents.append(inline_policy["PolicyDocument"])

        create_actions = [
            has_ecs_create_permissions(doc) for doc in policy_documents
        ]
        create_ecs_resources = any(create_actions)
        has_tag_resource = any(
            has_ecs_tag_resource_permission(doc) for doc in policy_documents
        )
        if create_ecs_resources and not has_tag_resource:
            print(f"Role {role['RoleName']} flagged, actions {create_actions}")
            flagged_roles.append(role_name)
    return flagged_roles
def scan_policies(iam_client):
    """
    Flag policies that can create ECS resources but cannot tag them.

    Every ARN returned by ``list_policies`` is resolved to its default
    policy version. A policy is flagged, and a line is printed for it, when
    ``has_ecs_create_permissions`` finds a match in that document and
    ``has_ecs_tag_resource_permission`` does not.

    Args:
        iam_client: IAM client used for all lookups.

    Returns:
        List of the ARNs of the flagged policies.
    """
    flagged = []
    for policy in list_policies(iam_client):
        policy_info = iam_client.get_policy(PolicyArn=policy)
        default_version_id = policy_info['Policy']['DefaultVersionId']
        version_info = iam_client.get_policy_version(
            PolicyArn=policy,
            VersionId=default_version_id,
        )
        document = version_info['PolicyVersion']['Document']

        create_actions = has_ecs_create_permissions(document)
        can_tag = has_ecs_tag_resource_permission(document)
        # Only policies that create without being able to tag are reported.
        if can_tag or not any(create_actions):
            continue
        print(f"Policy {policy} flagged, actions {create_actions}")
        flagged.append(policy)
    return flagged
def list_policies(iam_client):
    """
    Return the ARNs of all policies listed with ``Scope='Local'``.

    Args:
        iam_client: IAM client exposing ``get_paginator``.

    Returns:
        List of the ``Arn`` values from every page of ``list_policies``,
        in page order.
    """
    pages = iam_client.get_paginator('list_policies').paginate(Scope='Local')
    return [entry['Arn'] for page in pages for entry in page['Policies']]
# IAM is a global service; a region is still handed to the client constructor.
region = "us-west-2"
iam_client = boto3.client(
    "iam",
    region_name=region,
)

# Scan the policies; each flagged policy is printed by scan_policies itself.
flagged_policies = scan_policies(iam_client)

# The role scan is currently switched off. To run it, uncomment:
# flagged_roles = scan_roles(iam_client)
# print("Flagged Roles:", flagged_roles)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment