-
-
Save snixon/059b0a0edf87e9a34d020bb2c9546874 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import argparse
import json
import os

import boto3
from botocore.exceptions import ClientError
# `pip install -U PTable` will get you the right fork of PrettyTable
from prettytable import PrettyTable
# Tags in this list will be checked against any tags Security Groups may have on them
# If a match is found, the SG will be excluded. Matches are case insensitive for both key and value
exclusion_tags = [{"Key": "ephemeral", "Value": "true"}]

# Groups named "default", or whose name starts with one of these prefixes,
# are always counted as in use and never offered for deletion.
PROTECTED_NAME_PREFIXES = ("d-", "AWS-OpsWorks-")


def build_parser():
    """Build the command-line parser for the cleanup script."""
    parser = argparse.ArgumentParser(description="Find and delete unused Security Groups")
    parser.add_argument(
        "-r", "--region", type=str, default="us-east-1", help="The default region is us-east-1"
    )
    parser.add_argument(
        "-p",
        "--profile",
        type=str,
        default="default",
        help="The AWS profile to use for the connection",
    )
    parser.add_argument(
        "-d", "--delete", action="store_true", help="Try to delete the security groups we find"
    )
    parser.add_argument("--dry-run", dest="dry_run", action="store_true", help="Simulate deletes")
    parser.add_argument(
        "--todos", dest="all_regions", action="store_true", help="Run on each region in turn"
    )
    parser.add_argument(
        "--json",
        dest="json_output",
        action="store_true",
        help="Output JSON Doc of rules for each SG to be deleted",
    )
    parser.add_argument(
        "-q",
        "--quiet",
        action="store_true",
        help="Don't show summaries for non-deletable resources",
    )
    parser.add_argument(
        "-o",
        "--output",
        action="store",
        default="output",
        help="Optional directory prefix for the output json files if json_output is specified.",
    )
    return parser


def is_protected_group(group_name):
    """Return True when a group name marks the group as never deletable."""
    return group_name == "default" or group_name.startswith(PROTECTED_NAME_PREFIXES)


def has_exclusion_tag(tags, exclusions=None):
    """Return True when any tag matches an exclusion tag, ignoring case.

    Args:
        tags: list of ``{"Key": ..., "Value": ...}`` dicts, or None when the
            group carries no tags.
        exclusions: exclusion tag dicts; defaults to the module-level
            ``exclusion_tags``.
    """
    if exclusions is None:
        exclusions = exclusion_tags
    # Compare key and value separately so the result does not depend on the
    # ordering of keys inside the tag dicts.
    wanted = {
        (str(item.get("Key", "")).casefold(), str(item.get("Value", "")).casefold())
        for item in exclusions
    }
    return any(
        (str(tag.get("Key", "")).casefold(), str(tag.get("Value", "")).casefold()) in wanted
        for tag in tags or []
    )


def referenced_group_ids(group):
    """Return the IDs of all groups named in ``group``'s ingress or egress rules."""
    referenced = set()
    for direction in ("IpPermissions", "IpPermissionsEgress"):
        for rule in group.get(direction, []):
            for pair in rule.get("UserIdGroupPairs", []):
                if pair.get("GroupId"):
                    referenced.add(pair["GroupId"])
    return referenced


def classify_groups(security_groups, exclusions=None):
    """Split described security groups into bookkeeping collections.

    Args:
        security_groups: group dicts as returned by ``describe_security_groups``.
        exclusions: exclusion tag dicts; defaults to ``exclusion_tags``.

    Returns:
        ``(all_ids, in_use, excluded)`` where ``all_ids`` is the list of every
        group ID in input order, ``in_use`` is the set of IDs that are
        protected by name, referenced by a rule of some listed group, or
        excluded by tag, and ``excluded`` is the set of IDs excluded by tag.
    """
    all_ids = []
    in_use = set()
    referenced = set()
    excluded = set()
    for group in security_groups:
        group_id = group["GroupId"]
        all_ids.append(group_id)
        if is_protected_group(group["GroupName"]):
            in_use.add(group_id)
        referenced |= referenced_group_ids(group)
        if has_exclusion_tag(group.get("Tags"), exclusions):
            excluded.add(group_id)
    # Only references to groups that exist in this listing count as in use.
    in_use |= referenced & set(all_ids)
    in_use |= excluded
    return all_ids, in_use, excluded


def attached_group_ids(reservations, interfaces, classic_elbs, elbs_v2, db_instances):
    """Return the set of group IDs attached to any of the given resources.

    Each argument is the list of items from the matching ``describe_*`` call
    (EC2 reservations, network interfaces, classic ELBs, v2 load balancers,
    RDS instances).
    """
    attached = set()
    for reservation in reservations:
        for instance in reservation.get("Instances", []):
            for group in instance.get("SecurityGroups", []):
                attached.add(group["GroupId"])
    for interface in interfaces:
        for group in interface.get("Groups", []):
            attached.add(group["GroupId"])
    for balancer in classic_elbs:
        attached.update(balancer.get("SecurityGroups", []))
    for balancer in elbs_v2:
        # Not every v2 load balancer description carries security groups.
        attached.update(balancer.get("SecurityGroups", []))
    for database in db_instances:
        for group in database.get("VpcSecurityGroups", []):
            attached.add(group["VpcSecurityGroupId"])
    return attached


def security_group_document(group, region):
    """Build the JSON-serialisable summary written for one deletable group."""
    return {
        "id": group["GroupId"],
        "region": region,
        "name": group.get("GroupName"),
        "description": group.get("Description"),
        "owner_id": group.get("OwnerId"),
        "vpc_id": group.get("VpcId"),
        "tags": group.get("Tags"),
        "ingress_rules": group.get("IpPermissions"),
        "egress_rules": group.get("IpPermissionsEgress"),
    }


def paginate(client, operation, result_key):
    """Collect ``result_key`` items from every page of a boto3 operation."""
    items = []
    for page in client.get_paginator(operation).paginate():
        items.extend(page.get(result_key, []))
    return items


def write_json_report(output_prefix, acct_id, region, delete_candidates, groups_by_id):
    """Write ``<prefix>/<account_id>/<region>_unused_sg.json`` for the candidates.

    The directory is always created; the file is only written when there is
    at least one candidate.
    """
    path = "./{}/{}".format(output_prefix, acct_id)
    os.makedirs(path, exist_ok=True)
    if not delete_candidates:
        return
    region_dict = {
        region: [
            {group_id: security_group_document(groups_by_id[group_id], region)}
            for group_id in sorted(delete_candidates)
        ]
    }
    filename = path + "/" + region + "_unused_sg.json"
    with open(filename, "w") as outfile:
        outfile.write(json.dumps(region_dict, indent=2))


def delete_groups(ec2, delete_candidates, groups_by_id, dry_run):
    """Delete (or simulate deleting) each candidate group.

    Returns:
        ``(deleted, dry_run_deletes)``: the number of groups actually deleted
        and the number of dry-run requests that reported ``DryRunOperation``.
    """
    deleted = 0
    dry_run_deletes = 0
    for group_id in delete_candidates:
        group_name = groups_by_id[group_id]["GroupName"]
        security_group = ec2.SecurityGroup(group_id)
        try:
            if dry_run:
                security_group.delete(DryRun=True)
            else:
                security_group.delete()
                deleted += 1
        except ClientError as e:
            code = e.response["Error"]["Code"]
            if code == "DependencyViolation":
                print("{0} requires manual remediation. DependencyViolation".format(group_name))
            elif code == "DryRunOperation":
                dry_run_deletes += 1
            else:
                print("{0} requires manual remediation.".format(group_name))
    return deleted, dry_run_deletes


def process_region(session, region, args, acct_name, acct_id):
    """Find, report and optionally delete the unused security groups of one region."""
    title = "Account: {} ({}) - {}".format(acct_name, acct_id, region)
    ec2 = session.resource("ec2", region_name=region)
    client = session.client("ec2", region_name=region)

    # Get ALL security groups
    try:
        security_groups = paginate(client, "describe_security_groups", "SecurityGroups")
    except ClientError as e:
        if e.response["Error"]["Code"] != "AuthFailure":
            # Anything else is unexpected; continuing without fresh data for
            # this region would be unsafe, so let it propagate.
            raise
        if not args.quiet:
            error_table = PrettyTable(["Error Message"])
            error_table.add_row(["Authentication Failure: You may not have access to this Region"])
            print(error_table.get_string(title=title))
        return

    groups_by_id = {group["GroupId"]: group for group in security_groups}
    all_ids, in_use, excluded = classify_groups(security_groups)
    total_groups = len(all_ids)

    # Everything that can have a security group attached to it
    reservations = paginate(client, "describe_instances", "Reservations")
    interfaces = paginate(client, "describe_network_interfaces", "NetworkInterfaces")
    classic_elbs = paginate(
        session.client("elb", region_name=region),
        "describe_load_balancers",
        "LoadBalancerDescriptions",
    )
    elbs_v2 = paginate(
        session.client("elbv2", region_name=region), "describe_load_balancers", "LoadBalancers"
    )
    db_instances = paginate(
        session.client("rds", region_name=region), "describe_db_instances", "DBInstances"
    )
    in_use |= attached_group_ids(reservations, interfaces, classic_elbs, elbs_v2, db_instances)

    # Keep the describe order so deletes run in the same order as listed.
    delete_candidates = [group_id for group_id in all_ids if group_id not in in_use]

    if args.json_output:
        write_json_report(args.output, acct_id, region, delete_candidates, groups_by_id)

    deleted = 0
    dry_run_deletes = 0
    if args.delete:
        print("We will now delete security groups identified to not be in use.")
    if args.delete or args.dry_run:
        # --dry-run always wins over --delete, so nothing is removed when it is set.
        deleted, dry_run_deletes = delete_groups(
            ec2, delete_candidates, groups_by_id, args.dry_run
        )
    if not args.delete and (delete_candidates or not args.quiet):
        table = PrettyTable(["Region", "VPC ID", "SecurityGroup ID", "SecurityGroup Name"])
        table.align["SecurityGroup ID"] = "l"
        table.align["VPC ID"] = "c"
        table.align["SecurityGroup Name"] = "l"
        table.sortby = "VPC ID"
        for group_id in sorted(delete_candidates):
            group = groups_by_id[group_id]
            # A missing VPC ID must not be None or the sort on "VPC ID" fails.
            table.add_row([region, group.get("VpcId") or "None", group_id, group["GroupName"]])
        print(table.get_string(title=title))

    if args.quiet:
        return

    instance_count = sum(len(reservation.get("Instances", [])) for reservation in reservations)
    summary_table = PrettyTable(["Category Evaluated", "Count"])
    summary_table.align["Category Evaluated"] = "l"
    summary_table.align["Count"] = "r"
    summary_table.add_row(["Total Security Groups", total_groups])
    summary_table.add_row(["Total EC2 Instances", instance_count])
    summary_table.add_row(["Total Load Balancers", len(classic_elbs) + len(elbs_v2)])
    summary_table.add_row(["Total RDS Instances", len(db_instances)])
    summary_table.add_row(["Total Network Interfaces", len(interfaces)])
    summary_table.add_row(["In-Use Security Groups", len(in_use)])
    summary_table.add_row(["Security Groups Excluded by Tag", len(excluded)])
    summary_table.add_row(["---", "---"])
    if args.dry_run:
        summary_table.add_row(["Unused SG to Delete (DRY-RUN)", dry_run_deletes])
    elif args.delete:
        summary_table.add_row(["Unused SG Deleted", deleted])
    else:
        summary_table.add_row(["Unused SG to Delete", len(delete_candidates)])
    print(summary_table.get_string(title="Summary"))


def main():
    """Parse arguments and run the cleanup for each selected region."""
    try:
        args = build_parser().parse_args()
        session = boto3.session.Session(profile_name=args.profile)
        if args.all_regions:
            regions = list(session.get_available_regions("ec2"))
        else:
            regions = [args.region]

        # Account identity is the same for every region, so look it up once.
        aliases = session.client("iam").list_account_aliases()["AccountAliases"]
        # An account is not required to have an alias.
        acct_name = aliases[0] if aliases else "<no alias>"
        acct_id = session.client("sts").get_caller_identity().get("Account")

        for region in regions:
            process_region(session, region, args, acct_name, acct_id)
    except KeyboardInterrupt:
        print("\nCtrl+C Caught, Terminating")


if __name__ == "__main__":
    main()
https://gist.github.com/snixon/059b0a0edf87e9a34d020bb2c9546874#file-security-group-cleanup-py-L118 errors out when using exclusion tags;
FIX :
tag_excluded_sgs.append(groupobj["GroupId"])
Hey thanks! updated.
This works for me on an account where I only have about 10 regular EC2 SGs, but on my main account, with over 300 SGs spread across Workspaces, Lambdas, RDS, and all kinds of other services, I end up with the following error:
Traceback (most recent call last):
File "security-group-cleanup.py", line 247, in <module>
title="Account: {} ({}) - {}".format(acct_name, acct_id, region)
File "/home/csmith/.local/lib/python3.6/site-packages/prettytable/prettytable.py", line 1140, in get_string
rows = self._get_rows(options)
File "/home/csmith/.local/lib/python3.6/site-packages/prettytable/prettytable.py", line 1083, in _get_rows
rows.sort(reverse=options["reversesort"], key=options["sort_key"])
TypeError: '<' not supported between instances of 'NoneType' and 'str'
I ran into this issue also. Change line 244 to:
table.add_row([region, security_group.vpc_id or "None", group, security_group.group_name])
Hi, I'm a beginner at this, and I'm getting an error while running this .py file.
Can anyone help?
acct_name = session.client("iam").list_account_aliases()["AccountAliases"][0]
IndexError: list index out of range
I ran into this issue also. Change line 244 to:
table.add_row([region, security_group.vpc_id or "None", group, security_group.group_name])
@josharrington
Thanks for the notes, updated!
Hi, I'm a beginner at this, and I'm getting an error while running this .py file.
Can anyone help?
acct_name = session.client("iam").list_account_aliases()["AccountAliases"][0]
IndexError: list index out of range
@yinghan1221 The script expects that you've set an account alias (a friendly name) on your AWS account. Add one and it should work for you.
It looks like the Dry Run option is broken? This is what I get when using --dry-run.
Traceback (most recent call last): File ".\security-group-cleanup.py", line 271, in <module> summary_table.add_row(["Unused SG to Delete (DRY-RUN)", dry_run_deletes]) NameError: name 'dry_run_deletes' is not defined
Honestly, though, I'm not sure why the dry-run code even exists... the default mode, without specifying the delete argument, is already a dry run.
https://gist.github.com/snixon/059b0a0edf87e9a34d020bb2c9546874#file-security-group-cleanup-py-L118 errors out when using exclusion tags;
FIX :
tag_excluded_sgs.append(groupobj["GroupId"])