Skip to content

Instantly share code, notes, and snippets.

@snixon
Forked from TomRyan-321/security-group-cleanup.py
Last active August 25, 2022 18:08
Show Gist options
  • Save snixon/059b0a0edf87e9a34d020bb2c9546874 to your computer and use it in GitHub Desktop.
Save snixon/059b0a0edf87e9a34d020bb2c9546874 to your computer and use it in GitHub Desktop.
Fancy Security Group Cleanup
#!/usr/bin/env python
import os
import boto3
import argparse
import json
# `pip install -U PTable` will get you the right fork of PrettyTable
from prettytable import PrettyTable
from botocore.exceptions import ClientError
# Tags in this list will be checked against any tags Security Groups may have on them
# If a match is found, the SG will be excluded. Matches are case insensitive for both key and value
exclusion_tags = [{"Key": "ephemeral", "Value": "true"}]
try:
parser = argparse.ArgumentParser(description="Find and delete unused Security Groups")
parser.add_argument(
"-r", "--region", type=str, default="us-east-1", help="The default region is us-east-1"
)
parser.add_argument(
"-p",
"--profile",
type=str,
default="default",
help="The AWS profile to use for the connection",
)
parser.add_argument(
"-d", "--delete", action="store_true", help="Try to delete the security groups we find"
)
parser.add_argument("--dry-run", dest="dry_run", action="store_true", help="Simulate deletes")
parser.add_argument(
"--todos", dest="all_regions", action="store_true", help="Run on each region in turn"
)
parser.add_argument(
"--json",
dest="json_output",
action="store_true",
help="Output JSON Doc of rules for each SG to be deleted",
)
parser.add_argument(
"-q",
"--quiet",
action="store_true",
help="Don't show summaries for non-deletable resources",
)
parser.add_argument(
"-o",
"--output",
action="store",
default="output",
help="Optional directory prefix for the output json files if json_output is specified.",
)
args = parser.parse_args()
session = boto3.session.Session(profile_name=args.profile)
regions = []
if args.all_regions:
for region in session.get_available_regions("ec2"):
regions.append(region)
else:
regions.append(args.region)
for region in regions:
ec2 = session.resource("ec2", region_name=region)
client = session.client("ec2", region_name=region)
acct_name = session.client("iam").list_account_aliases()["AccountAliases"][0]
acct_id = session.client("sts").get_caller_identity().get("Account")
all_groups = []
security_groups_in_use = []
rule_referenced_sgs = []
tag_excluded_sgs = []
def lookup_by_id(sgid):
sg = ec2.get_all_security_groups(group_ids=sgid)
return sg[0].name
# Get ALL security groups names
try:
security_groups_dict = client.describe_security_groups()
except ClientError as e:
if e.response["Error"]["Code"] == "AuthFailure":
if args.quiet:
continue
else:
error_table = PrettyTable(["Error Message"])
error_table.add_row(
["Authentication Failure: You may not have access to this Region"]
)
print(
error_table.get_string(
title="Account: {} ({}) - {}".format(acct_name, acct_id, region)
)
)
continue
security_groups = security_groups_dict["SecurityGroups"]
for groupobj in security_groups:
if (
groupobj["GroupName"] == "default"
or groupobj["GroupName"].startswith("d-")
or groupobj["GroupName"].startswith("AWS-OpsWorks-")
):
security_groups_in_use.append(groupobj["GroupId"])
for ruleset in groupobj["IpPermissions"]:
if len(ruleset["UserIdGroupPairs"]) > 0:
for group in ruleset["UserIdGroupPairs"]:
rule_referenced_sgs.append(group["GroupId"])
if len(exclusion_tags) > 0:
if "Tags" in groupobj:
for tag_group in exclusion_tags:
for tags in groupobj["Tags"]:
if str(tag_group).casefold() == str(tags).casefold():
tag_excluded_sgs.append(groupobj["GroupId"])
all_groups.append(groupobj["GroupId"])
total_groups = len(all_groups)
# Prune the groups that are referenced by other groups
for group in rule_referenced_sgs:
if group in all_groups:
all_groups.remove(group)
security_groups_in_use.append(group)
for group in tag_excluded_sgs:
if group in all_groups:
all_groups.remove(group)
security_groups_in_use.append(group)
# Get all security groups used by instances
instances_dict = client.describe_instances()
reservations = instances_dict["Reservations"]
network_interface_count = 0
for i in reservations:
for j in i["Instances"]:
for k in j["SecurityGroups"]:
if k["GroupId"] not in security_groups_in_use:
security_groups_in_use.append(k["GroupId"])
# Security Groups in use by Network Interfaces
eni_dict = client.describe_network_interfaces()
for i in eni_dict["NetworkInterfaces"]:
for j in i["Groups"]:
if j["GroupId"] not in security_groups_in_use:
security_groups_in_use.append(j["GroupId"])
# Security groups used by classic ELBs
elb_client = session.client("elb", region_name=region)
elb_dict = elb_client.describe_load_balancers()
for i in elb_dict["LoadBalancerDescriptions"]:
for j in i["SecurityGroups"]:
if j not in security_groups_in_use:
security_groups_in_use.append(j)
# Security groups used by ALBs
elb2_client = session.client("elbv2", region_name=region)
elb2_dict = elb2_client.describe_load_balancers()
for i in elb2_dict["LoadBalancers"]:
if "SecurityGroups" in i.keys():
for j in i["SecurityGroups"]:
if j not in security_groups_in_use:
security_groups_in_use.append(j)
# Security groups used by RDS
rds_client = session.client("rds", region_name=region)
rds_dict = rds_client.describe_db_instances()
for i in rds_dict["DBInstances"]:
for j in i["VpcSecurityGroups"]:
if j["VpcSecurityGroupId"] not in security_groups_in_use:
security_groups_in_use.append(j["VpcSecurityGroupId"])
delete_candidates = []
for group in all_groups:
if group not in security_groups_in_use:
delete_candidates.append(group)
if args.json_output:
region_dict = {}
# Create json docs in directory structure account_id/vpc_id/sg_id.json
path = "./{}/{}".format(args.output, acct_id)
os.makedirs(path, exist_ok=True)
for group in sorted(delete_candidates):
security_group = ec2.SecurityGroup(group)
sg_doc = {
"id": security_group.id,
"region": region,
"name": security_group.group_name,
"description": security_group.description,
"owner_id": security_group.owner_id,
"vpc_id": security_group.vpc_id,
"tags": security_group.tags,
"ingress_rules": security_group.ip_permissions,
"egress_rules": security_group.ip_permissions_egress,
}
if region not in region_dict:
region_dict[region] = []
region_dict[region].append({security_group.id: sg_doc})
for region_name in region_dict:
filename = path + "/" + region_name + "_unused_sg.json"
with open(filename, "w") as outfile:
outfile.write(json.dumps(region_dict, indent=2))
if args.delete:
print("We will now delete security groups identified to not be in use.")
dry_run_deletes = 0
for group in delete_candidates:
security_group = ec2.SecurityGroup(group)
try:
if args.dry_run:
security_group.delete(DryRun=True)
else:
security_group.delete()
except ClientError as e:
if e.response["Error"]["Code"] == "DependencyViolation":
print(
"{0} requires manual remediation. DependencyViolation".format(
security_group.group_name
)
)
elif e.response["Error"]["Code"] == "DryRunOperation":
dry_run_deletes += 1
else:
print("{0} requires manual remediation.".format(security_group.group_name))
else:
if args.quiet and len(delete_candidates) == 0:
continue
else:
table = PrettyTable(["Region", "VPC ID", "SecurityGroup ID", "SecurityGroup Name"])
table.align["SecurityGroup ID"] = "l"
table.align["VPC ID"] = "c"
table.align["SecurityGroup Name"] = "l"
table.sortby = "VPC ID"
for group in sorted(delete_candidates):
security_group = ec2.SecurityGroup(group)
table.add_row([region, security_group.vpc_id or "None", group, security_group.group_name])
print(
table.get_string(
title="Account: {} ({}) - {}".format(acct_name, acct_id, region)
)
)
if args.quiet:
continue
else:
summary_table = PrettyTable(["Category Evaluated", "Count"])
summary_table.align["Category Evaluated"] = "l"
summary_table.align["Count"] = "r"
summary_table.add_row(["Total Security Groups", total_groups])
summary_table.add_row(["Total EC2 Instances", len(reservations)])
summary_table.add_row(
[
"Total Load Balancers",
len(elb_dict["LoadBalancerDescriptions"]) + len(elb2_dict["LoadBalancers"]),
]
)
summary_table.add_row(["Total RDS Instances", len(rds_dict["DBInstances"])])
summary_table.add_row(["Total Network Interfaces", len(eni_dict["NetworkInterfaces"])])
summary_table.add_row(["In-Use Security Groups", len(set(security_groups_in_use))])
summary_table.add_row(["Security Groups Excluded by Tag", len(tag_excluded_sgs)])
summary_table.add_row(["---", "---"])
if args.dry_run:
summary_table.add_row(["Unused SG to Delete (DRY-RUN)", dry_run_deletes])
elif args.delete:
summary_table.add_row(["Unused SG Deleted", len(delete_candidates)])
else:
summary_table.add_row(["Unused SG to Delete", len(delete_candidates)])
print(summary_table.get_string(title="Summary"))
except KeyboardInterrupt:
print("\nCtrl+C Caught, Terminating")
@srikrishnt
Copy link

https://gist.github.com/snixon/059b0a0edf87e9a34d020bb2c9546874#file-security-group-cleanup-py-L118 errors out when using exclusion tags;

FIX :
tag_excluded_sgs.append(groupobj["GroupId"])

@snixon
Copy link
Author

snixon commented Dec 5, 2019

https://gist.github.com/snixon/059b0a0edf87e9a34d020bb2c9546874#file-security-group-cleanup-py-L118 errors out when using exclusion tags;

FIX :
tag_excluded_sgs.append(groupobj["GroupId"])

Hey thanks! updated.

@evolart
Copy link

evolart commented May 1, 2020

This works for me on my account where I only have like 10 regular EC2 SGs but on my main account with over 300 SGs that spread across Workspaces, Lambdas, RDS, and all kinds of other services I end up with the following error:

Traceback (most recent call last):
File "security-group-cleanup.py", line 247, in
title="Account: {} ({}) - {}".format(acct_name, acct_id, region)
File "/home/csmith/.local/lib/python3.6/site-packages/prettytable/prettytable.py", line 1140, in get_string
rows = self._get_rows(options)
File "/home/csmith/.local/lib/python3.6/site-packages/prettytable/prettytable.py", line 1083, in _get_rows
rows.sort(reverse=options["reversesort"], key=options["sort_key"])
TypeError: '<' not supported between instances of 'NoneType' and 'str'

@josharrington
Copy link

I ran into this issue also. Change line 244 to:

table.add_row([region, security_group.vpc_id or "None", group, security_group.group_name])

Copy link

ghost commented Jul 13, 2020

Hi, I'm beginner to this, i'm getting error while running this .py file.
Anyone can help?

acct_name = session.client("iam").list_account_aliases()["AccountAliases"][0]

IndexError: list index out of range

@snixon
Copy link
Author

snixon commented Jul 13, 2020

I ran into this issue also. Change line 244 to:

table.add_row([region, security_group.vpc_id or "None", group, security_group.group_name])
@josharrington
Thanks for the notes, updated!

@snixon
Copy link
Author

snixon commented Jul 13, 2020

Hi, I'm beginner to this, i'm getting error while running this .py file.
Anyone can help?

acct_name = session.client("iam").list_account_aliases()["AccountAliases"][0]

IndexError: list index out of range

@yinghan1221 The script is expecting that you've set an account alias on your AWS account, like a friendly name, add one to it and it should work for you

@evolart
Copy link

evolart commented Oct 27, 2020

It looks like the Dry Run option is broken? This is what I get when using --dry-run.

Traceback (most recent call last): File ".\security-group-cleanup.py", line 271, in <module> summary_table.add_row(["Unused SG to Delete (DRY-RUN)", dry_run_deletes]) NameError: name 'dry_run_deletes' is not defined

Honestly though I'm not sure why the dry-run code even exists...the default mode without specify the delete argument is a dry-run....

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment