-
-
Save acumenix/388aee8974c514915162253d53c1cd37 to your computer and use it in GitHub Desktop.
Fancy Security Group Cleanup
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
"""Find, report on, and optionally delete unused EC2 Security Groups.

For each selected region the script lists every security group, works out
which ones are in use (attached to instances, network interfaces, classic or
v2 load balancers, RDS instances, referenced by another group's rules,
carrying an exclusion tag, or having a protected name) and treats the rest
as delete candidates.  Candidates can be printed as a table, dumped as JSON
(--json) and/or deleted (--delete, optionally with --dry-run).
"""
import os
import boto3
import argparse
import json

# `pip install -U PTable` will get you the right fork of PrettyTable
from prettytable import PrettyTable
from botocore.exceptions import ClientError

# Tags in this list will be checked against any tags Security Groups may have on them
# If a match is found, the SG will be excluded. Matches are case insensitive for both key and value
exclusion_tags = [{"Key": "ephemeral", "Value": "true"}]


def _collect(client, operation, result_key):
    """Return the items under `result_key` from every page of `operation`.

    A single describe_* call returns only one page of results; walking the
    paginator makes sure no resource (and therefore no in-use group) is missed.
    """
    items = []
    for page in client.get_paginator(operation).paginate():
        items.extend(page.get(result_key, []))
    return items


def _is_tag_excluded(tags):
    """Return True if any tag in `tags` matches an entry of `exclusion_tags`.

    Keys and values are compared separately and case-insensitively, so the
    result does not depend on the key order of the tag dicts.
    """
    for tag in tags or []:
        key = str(tag.get("Key", "")).casefold()
        value = str(tag.get("Value", "")).casefold()
        for excluded in exclusion_tags:
            if (
                str(excluded["Key"]).casefold() == key
                and str(excluded["Value"]).casefold() == value
            ):
                return True
    return False


def _referenced_group_ids(groupobj):
    """Return the IDs of all groups referenced by the rules of `groupobj`.

    Both ingress and egress rules are scanned: EC2 refuses to delete a group
    that is still referenced by another group's rules.  Note that a
    self-referencing rule also counts, so such a group is never proposed
    for deletion.
    """
    referenced = set()
    for rules_key in ("IpPermissions", "IpPermissionsEgress"):
        for ruleset in groupobj.get(rules_key, []):
            for pair in ruleset.get("UserIdGroupPairs", []):
                if "GroupId" in pair:
                    referenced.add(pair["GroupId"])
    return referenced


try:
    parser = argparse.ArgumentParser(description="Find and delete unused Security Groups")
    parser.add_argument(
        "-r", "--region", type=str, default="us-east-1", help="The default region is us-east-1"
    )
    parser.add_argument(
        "-p",
        "--profile",
        type=str,
        default="default",
        help="The AWS profile to use for the connection",
    )
    parser.add_argument(
        "-d", "--delete", action="store_true", help="Try to delete the security groups we find"
    )
    parser.add_argument("--dry-run", dest="dry_run", action="store_true", help="Simulate deletes")
    parser.add_argument(
        "--todos", dest="all_regions", action="store_true", help="Run on each region in turn"
    )
    parser.add_argument(
        "--json",
        dest="json_output",
        action="store_true",
        help="Output JSON Doc of rules for each SG to be deleted",
    )
    parser.add_argument(
        "-q",
        "--quiet",
        action="store_true",
        help="Don't show summaries for non-deletable resources",
    )
    parser.add_argument(
        "-o",
        "--output",
        action="store",
        default="output",
        help="Optional directory prefix for the output json files if json_output is specified.",
    )
    args = parser.parse_args()

    session = boto3.session.Session(profile_name=args.profile)

    if args.all_regions:
        regions = list(session.get_available_regions("ec2"))
    else:
        regions = [args.region]

    # The account alias and ID do not depend on the region, so look them up once.
    # An account is not required to have an alias; fall back to a placeholder
    # instead of failing with an IndexError.
    aliases = session.client("iam").list_account_aliases()["AccountAliases"]
    acct_name = aliases[0] if aliases else "n/a"
    acct_id = session.client("sts").get_caller_identity().get("Account")

    for region in regions:
        ec2 = session.resource("ec2", region_name=region)
        client = session.client("ec2", region_name=region)

        # Get ALL security groups
        try:
            security_groups = _collect(client, "describe_security_groups", "SecurityGroups")
        except ClientError as e:
            if e.response["Error"]["Code"] != "AuthFailure":
                # Anything other than "no access to this region" is unexpected;
                # carrying on would mean working without (or with stale) data.
                raise
            if not args.quiet:
                error_table = PrettyTable(["Error Message"])
                error_table.add_row(
                    ["Authentication Failure: You may not have access to this Region"]
                )
                print(
                    error_table.get_string(
                        title="Account: {} ({}) - {}".format(acct_name, acct_id, region)
                    )
                )
            continue

        # Keep the describe output around so later reporting needs no extra API calls.
        groups_by_id = {groupobj["GroupId"]: groupobj for groupobj in security_groups}
        all_groups = list(groups_by_id)
        total_groups = len(all_groups)

        security_groups_in_use = set()
        rule_referenced_sgs = set()
        tag_excluded_sgs = set()

        for groupobj in security_groups:
            group_id = groupobj["GroupId"]
            group_name = groupobj["GroupName"]
            # Groups with these names are always treated as in use.
            if group_name == "default" or group_name.startswith(("d-", "AWS-OpsWorks-")):
                security_groups_in_use.add(group_id)
            rule_referenced_sgs.update(_referenced_group_ids(groupobj))
            if _is_tag_excluded(groupobj.get("Tags")):
                tag_excluded_sgs.add(group_id)

        # Groups referenced by other groups' rules, or excluded by tag, count as in use.
        # References to groups that are not in this listing are ignored.
        security_groups_in_use.update(rule_referenced_sgs.intersection(groups_by_id))
        security_groups_in_use.update(tag_excluded_sgs)

        # Security groups used by instances
        reservations = _collect(client, "describe_instances", "Reservations")
        for reservation in reservations:
            for instance in reservation["Instances"]:
                for attached in instance.get("SecurityGroups", []):
                    security_groups_in_use.add(attached["GroupId"])

        # Security groups in use by Network Interfaces
        network_interfaces = _collect(client, "describe_network_interfaces", "NetworkInterfaces")
        for interface in network_interfaces:
            for attached in interface.get("Groups", []):
                security_groups_in_use.add(attached["GroupId"])

        # Security groups used by classic ELBs
        elb_client = session.client("elb", region_name=region)
        classic_lbs = _collect(elb_client, "describe_load_balancers", "LoadBalancerDescriptions")
        for load_balancer in classic_lbs:
            security_groups_in_use.update(load_balancer.get("SecurityGroups", []))

        # Security groups used by ALBs (not every v2 load balancer lists any)
        elb2_client = session.client("elbv2", region_name=region)
        v2_lbs = _collect(elb2_client, "describe_load_balancers", "LoadBalancers")
        for load_balancer in v2_lbs:
            security_groups_in_use.update(load_balancer.get("SecurityGroups", []))

        # Security groups used by RDS
        rds_client = session.client("rds", region_name=region)
        db_instances = _collect(rds_client, "describe_db_instances", "DBInstances")
        for db_instance in db_instances:
            for attached in db_instance.get("VpcSecurityGroups", []):
                security_groups_in_use.add(attached["VpcSecurityGroupId"])

        delete_candidates = [group for group in all_groups if group not in security_groups_in_use]

        if args.json_output:
            # Create json docs in directory structure output/account_id/region_unused_sg.json
            path = "./{}/{}".format(args.output, acct_id)
            os.makedirs(path, exist_ok=True)
            sg_docs = []
            for group in sorted(delete_candidates):
                groupobj = groups_by_id[group]
                sg_doc = {
                    "id": group,
                    "region": region,
                    "name": groupobj.get("GroupName"),
                    "description": groupobj.get("Description"),
                    "owner_id": groupobj.get("OwnerId"),
                    "vpc_id": groupobj.get("VpcId"),
                    "tags": groupobj.get("Tags"),
                    "ingress_rules": groupobj.get("IpPermissions"),
                    "egress_rules": groupobj.get("IpPermissionsEgress"),
                }
                sg_docs.append({group: sg_doc})
            # No file is written for a region without delete candidates.
            if sg_docs:
                filename = path + "/" + region + "_unused_sg.json"
                with open(filename, "w") as outfile:
                    outfile.write(json.dumps({region: sg_docs}, indent=2))

        # Both counters are initialised unconditionally so the summary below can
        # always read them, whichever combination of flags was given.
        dry_run_deletes = 0
        deleted = 0
        if args.delete:
            print("We will now delete security groups identified to not be in use.")
            for group in delete_candidates:
                security_group = ec2.SecurityGroup(group)
                group_name = groups_by_id[group]["GroupName"]
                try:
                    if args.dry_run:
                        security_group.delete(DryRun=True)
                    else:
                        security_group.delete()
                except ClientError as e:
                    error_code = e.response["Error"]["Code"]
                    if error_code == "DependencyViolation":
                        print(
                            "{0} requires manual remediation. DependencyViolation".format(
                                group_name
                            )
                        )
                    elif error_code == "DryRunOperation":
                        # A dry run reports "would have succeeded" through this error code.
                        dry_run_deletes += 1
                    else:
                        print("{0} requires manual remediation.".format(group_name))
                else:
                    if not args.dry_run:
                        deleted += 1
        elif not (args.quiet and not delete_candidates):
            table = PrettyTable(["Region", "VPC ID", "SecurityGroup ID", "SecurityGroup Name"])
            table.align["SecurityGroup ID"] = "l"
            table.align["VPC ID"] = "c"
            table.align["SecurityGroup Name"] = "l"
            table.sortby = "VPC ID"
            for group in sorted(delete_candidates):
                groupobj = groups_by_id[group]
                table.add_row([region, groupobj.get("VpcId"), group, groupobj["GroupName"]])
            print(
                table.get_string(title="Account: {} ({}) - {}".format(acct_name, acct_id, region))
            )

        if args.quiet:
            continue

        summary_table = PrettyTable(["Category Evaluated", "Count"])
        summary_table.align["Category Evaluated"] = "l"
        summary_table.align["Count"] = "r"
        summary_table.add_row(["Total Security Groups", total_groups])
        # A reservation can hold several instances, so count the instances themselves.
        summary_table.add_row(
            [
                "Total EC2 Instances",
                sum(len(reservation["Instances"]) for reservation in reservations),
            ]
        )
        summary_table.add_row(["Total Load Balancers", len(classic_lbs) + len(v2_lbs)])
        summary_table.add_row(["Total RDS Instances", len(db_instances)])
        summary_table.add_row(["Total Network Interfaces", len(network_interfaces)])
        summary_table.add_row(["In-Use Security Groups", len(security_groups_in_use)])
        summary_table.add_row(["Security Groups Excluded by Tag", len(tag_excluded_sgs)])
        summary_table.add_row(["---", "---"])
        if args.delete and args.dry_run:
            summary_table.add_row(["Unused SG to Delete (DRY-RUN)", dry_run_deletes])
        elif args.delete:
            # Report only the deletions that actually succeeded.
            summary_table.add_row(["Unused SG Deleted", deleted])
        else:
            summary_table.add_row(["Unused SG to Delete", len(delete_candidates)])
        print(summary_table.get_string(title="Summary"))
except KeyboardInterrupt:
    print("\nCtrl+C Caught, Terminating")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment