Forked from TomRyan-321/security-group-cleanup.py
Last active
August 25, 2022 18:08
-
-
Save snixon/059b0a0edf87e9a34d020bb2c9546874 to your computer and use it in GitHub Desktop.
Fancy Security Group Cleanup
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import os | |
import boto3 | |
import argparse | |
import json | |
# `pip install -U PTable` will get you the right fork of PrettyTable | |
from prettytable import PrettyTable | |
from botocore.exceptions import ClientError | |
# Tags in this list are checked against the tags on every Security Group.
# If a match is found, the SG is excluded. Matches are case insensitive for both key and value.
exclusion_tags = [{"Key": "ephemeral", "Value": "true"}]

# Groups named "default", or whose name starts with one of these prefixes, are always
# treated as in use and are never offered for deletion.
# NOTE(review): the prefixes look like service-managed groups (Directory Service,
# OpsWorks) - confirm against the accounts this is run on.
PROTECTED_NAME_PREFIXES = ("d-", "AWS-OpsWorks-")


def parse_args(argv=None):
    """Parse the command line.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Returns:
        The populated ``argparse.Namespace``.
    """
    parser = argparse.ArgumentParser(description="Find and delete unused Security Groups")
    parser.add_argument(
        "-r", "--region", type=str, default="us-east-1", help="The default region is us-east-1"
    )
    parser.add_argument(
        "-p",
        "--profile",
        type=str,
        default="default",
        help="The AWS profile to use for the connection",
    )
    parser.add_argument(
        "-d", "--delete", action="store_true", help="Try to delete the security groups we find"
    )
    parser.add_argument("--dry-run", dest="dry_run", action="store_true", help="Simulate deletes")
    # "--todos" is the historical spelling; "--all-regions" is an additional alias.
    parser.add_argument(
        "--todos",
        "--all-regions",
        dest="all_regions",
        action="store_true",
        help="Run on each region in turn",
    )
    parser.add_argument(
        "--json",
        dest="json_output",
        action="store_true",
        help="Output JSON Doc of rules for each SG to be deleted",
    )
    parser.add_argument(
        "-q",
        "--quiet",
        action="store_true",
        help="Don't show summaries for non-deletable resources",
    )
    parser.add_argument(
        "-o",
        "--output",
        action="store",
        default="output",
        help="Optional directory prefix for the output json files if json_output is specified.",
    )
    return parser.parse_args(argv)


def is_protected_name(name):
    """Return True when a group name marks the group as never deletable."""
    return name == "default" or name.startswith(PROTECTED_NAME_PREFIXES)


def has_excluded_tag(tags, exclusions):
    """Return True when any tag matches an exclusion entry, ignoring case.

    Args:
        tags: List of ``{"Key": ..., "Value": ...}`` dicts from a security group
            description, or ``None`` when the group has no tags.
        exclusions: List of ``{"Key": ..., "Value": ...}`` dicts to match against.
    """
    if not tags or not exclusions:
        return False
    wanted = {
        (str(entry["Key"]).casefold(), str(entry["Value"]).casefold()) for entry in exclusions
    }
    return any(
        (str(tag.get("Key", "")).casefold(), str(tag.get("Value", "")).casefold()) in wanted
        for tag in tags
    )


def referenced_group_ids(group):
    """Return the IDs of every group named in this group's ingress or egress rules.

    A group's reference to itself is included too, so a self-referencing group is
    treated as in use (the conservative choice).
    """
    found = set()
    for direction in ("IpPermissions", "IpPermissionsEgress"):
        for rule in group.get(direction, []):
            for pair in rule.get("UserIdGroupPairs", []):
                if "GroupId" in pair:
                    found.add(pair["GroupId"])
    return found


def classify_groups(security_groups, exclusions=None):
    """Work out which groups are in use based on the group descriptions alone.

    Args:
        security_groups: List of security group description dicts.
        exclusions: Tag exclusion list; defaults to the module-level ``exclusion_tags``.

    Returns:
        ``(in_use, tag_excluded)`` - two sets of group IDs. ``in_use`` contains the
        protected-name groups, the groups referenced by a rule of any listed group,
        and the tag-excluded groups. ``tag_excluded`` is the tag-matched subset.
    """
    if exclusions is None:
        exclusions = exclusion_tags
    known_ids = {group["GroupId"] for group in security_groups}
    protected = {
        group["GroupId"] for group in security_groups if is_protected_name(group["GroupName"])
    }
    referenced = set()
    for group in security_groups:
        referenced |= referenced_group_ids(group)
    # Rules can point at groups that are not in this listing; only count the ones that are.
    referenced &= known_ids
    tag_excluded = {
        group["GroupId"]
        for group in security_groups
        if has_excluded_tag(group.get("Tags"), exclusions)
    }
    return protected | referenced | tag_excluded, tag_excluded


def find_delete_candidates(security_groups, in_use):
    """Return the group descriptions whose ID is not in ``in_use``, in listing order."""
    return [group for group in security_groups if group["GroupId"] not in in_use]


def build_sg_document(group, region):
    """Build the JSON-serialisable record written for one deletable group."""
    return {
        "id": group["GroupId"],
        "region": region,
        "name": group["GroupName"],
        "description": group.get("Description"),
        "owner_id": group.get("OwnerId"),
        "vpc_id": group.get("VpcId"),
        "tags": group.get("Tags"),
        "ingress_rules": group.get("IpPermissions"),
        "egress_rules": group.get("IpPermissionsEgress"),
    }


def unused_summary_row(delete, dry_run, candidate_count, deleted=0, dry_run_deletes=0):
    """Pick the final summary row.

    ``--dry-run`` only has an effect together with ``--delete``; on its own the run
    is a plain listing, so the candidate count is reported.
    """
    if delete and dry_run:
        return ["Unused SG to Delete (DRY-RUN)", dry_run_deletes]
    if delete:
        return ["Unused SG Deleted", deleted]
    return ["Unused SG to Delete", candidate_count]


def fetch_all(client, operation, result_key):
    """Collect ``result_key`` from every page of a paginated boto3 operation."""
    items = []
    for page in client.get_paginator(operation).paginate():
        items.extend(page.get(result_key, []))
    return items


def write_json_report(candidates, region, output_dir, acct_id):
    """Write ``./<output_dir>/<acct_id>/<region>_unused_sg.json`` for the candidates.

    The directory is always created; the file is only written when there is at
    least one candidate. Returns the file name, or ``None`` if nothing was written.
    """
    path = "./{}/{}".format(output_dir, acct_id)
    os.makedirs(path, exist_ok=True)
    if not candidates:
        return None
    documents = [
        {group["GroupId"]: build_sg_document(group, region)}
        for group in sorted(candidates, key=lambda g: g["GroupId"])
    ]
    filename = path + "/" + region + "_unused_sg.json"
    with open(filename, "w") as outfile:
        outfile.write(json.dumps({region: documents}, indent=2))
    return filename


def delete_groups(client, candidates, dry_run):
    """Delete (or simulate deleting) each candidate group.

    Returns:
        ``(deleted, dry_run_deletes)`` - the number of deletes that succeeded and the
        number of simulated deletes answered with ``DryRunOperation``.
    """
    deleted = 0
    dry_run_deletes = 0
    extra = {"DryRun": True} if dry_run else {}
    for group in candidates:
        try:
            client.delete_security_group(GroupId=group["GroupId"], **extra)
        except ClientError as e:
            code = e.response["Error"]["Code"]
            if code == "DependencyViolation":
                print(
                    "{0} requires manual remediation. DependencyViolation".format(
                        group["GroupName"]
                    )
                )
            elif code == "DryRunOperation":
                # A dry run signals "would have succeeded" with this error code.
                dry_run_deletes += 1
            else:
                print("{0} requires manual remediation.".format(group["GroupName"]))
        else:
            deleted += 1
    return deleted, dry_run_deletes


def process_region(session, region, args, acct_name, acct_id):
    """Scan one region, then list, export and/or delete its unused security groups."""
    client = session.client("ec2", region_name=region)
    title = "Account: {} ({}) - {}".format(acct_name, acct_id, region)

    try:
        security_groups = fetch_all(client, "describe_security_groups", "SecurityGroups")
    except ClientError as e:
        # Only an authentication failure is an expected, skippable condition.
        if e.response["Error"]["Code"] != "AuthFailure":
            raise
        if not args.quiet:
            error_table = PrettyTable(["Error Message"])
            error_table.add_row(["Authentication Failure: You may not have access to this Region"])
            print(error_table.get_string(title=title))
        return

    in_use, tag_excluded = classify_groups(security_groups, exclusion_tags)

    # Security groups used by instances
    reservations = fetch_all(client, "describe_instances", "Reservations")
    instances = [inst for res in reservations for inst in res.get("Instances", [])]
    for inst in instances:
        in_use.update(sg["GroupId"] for sg in inst.get("SecurityGroups", []))

    # Security groups used by network interfaces
    interfaces = fetch_all(client, "describe_network_interfaces", "NetworkInterfaces")
    for eni in interfaces:
        in_use.update(sg["GroupId"] for sg in eni.get("Groups", []))

    # Security groups used by classic ELBs
    elb_client = session.client("elb", region_name=region)
    classic_lbs = fetch_all(elb_client, "describe_load_balancers", "LoadBalancerDescriptions")
    for lb in classic_lbs:
        in_use.update(lb.get("SecurityGroups", []))

    # Security groups used by elbv2 load balancers (not every one carries the key)
    elb2_client = session.client("elbv2", region_name=region)
    v2_lbs = fetch_all(elb2_client, "describe_load_balancers", "LoadBalancers")
    for lb in v2_lbs:
        in_use.update(lb.get("SecurityGroups", []))

    # Security groups used by RDS
    rds_client = session.client("rds", region_name=region)
    db_instances = fetch_all(rds_client, "describe_db_instances", "DBInstances")
    for db in db_instances:
        in_use.update(sg["VpcSecurityGroupId"] for sg in db.get("VpcSecurityGroups", []))

    candidates = find_delete_candidates(security_groups, in_use)

    if args.json_output:
        write_json_report(candidates, region, args.output, acct_id)

    deleted = 0
    dry_run_deletes = 0
    if args.delete:
        print("We will now delete security groups identified to not be in use.")
        deleted, dry_run_deletes = delete_groups(client, candidates, args.dry_run)
    elif candidates or not args.quiet:
        table = PrettyTable(["Region", "VPC ID", "SecurityGroup ID", "SecurityGroup Name"])
        table.align["SecurityGroup ID"] = "l"
        table.align["VPC ID"] = "c"
        table.align["SecurityGroup Name"] = "l"
        table.sortby = "VPC ID"
        for group in sorted(candidates, key=lambda g: g["GroupId"]):
            table.add_row(
                [region, group.get("VpcId") or "None", group["GroupId"], group["GroupName"]]
            )
        print(table.get_string(title=title))

    if args.quiet:
        return

    summary_table = PrettyTable(["Category Evaluated", "Count"])
    summary_table.align["Category Evaluated"] = "l"
    summary_table.align["Count"] = "r"
    summary_table.add_row(["Total Security Groups", len(security_groups)])
    summary_table.add_row(["Total EC2 Instances", len(instances)])
    summary_table.add_row(["Total Load Balancers", len(classic_lbs) + len(v2_lbs)])
    summary_table.add_row(["Total RDS Instances", len(db_instances)])
    summary_table.add_row(["Total Network Interfaces", len(interfaces)])
    summary_table.add_row(["In-Use Security Groups", len(in_use)])
    summary_table.add_row(["Security Groups Excluded by Tag", len(tag_excluded)])
    summary_table.add_row(["---", "---"])
    summary_table.add_row(
        unused_summary_row(
            args.delete, args.dry_run, len(candidates), deleted, dry_run_deletes
        )
    )
    print(summary_table.get_string(title="Summary"))


def main(argv=None):
    """Entry point: parse arguments and process each requested region in turn."""
    args = parse_args(argv)
    session = boto3.session.Session(profile_name=args.profile)
    regions = session.get_available_regions("ec2") if args.all_regions else [args.region]

    # Account identity does not depend on the region, so look it up once.
    acct_id = session.client("sts").get_caller_identity().get("Account")
    aliases = session.client("iam").list_account_aliases()["AccountAliases"]
    # An account is not required to have an alias; fall back to a placeholder.
    acct_name = aliases[0] if aliases else "<no alias>"

    for region in regions:
        process_region(session, region, args, acct_name, acct_id)


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\nCtrl+C Caught, Terminating")
Author
snixon
commented
Jul 13, 2020
•
Hi, I'm a beginner at this and I'm getting an error while running this .py file. Can anyone help?
acct_name = session.client("iam").list_account_aliases()["AccountAliases"][0]
IndexError: list index out of range
@yinghan1221 The script expects that you've set an account alias (a friendly name) on your AWS account. Add one and it should work for you.
It looks like the Dry Run option is broken? This is what I get when using --dry-run.
Traceback (most recent call last): File ".\security-group-cleanup.py", line 271, in <module> summary_table.add_row(["Unused SG to Delete (DRY-RUN)", dry_run_deletes]) NameError: name 'dry_run_deletes' is not defined
Honestly though, I'm not sure why the dry-run code even exists... the default mode, without specifying the delete argument, is already a dry run.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment