Created
November 9, 2017 15:52
-
-
Save astrikos/d782c2108d4bebaabbbd1528d2c3821d to your computer and use it in GitHub Desktop.
Script to detect stale AWS security groups
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import boto3 | |
import argparse | |
class StaleSGDetector(object):
    """
    Class to hold the logic for detecting AWS security groups that are stale.

    A group counts as "in use" when it is named ``default`` or is attached to
    an EC2 instance, a network interface, a classic ELB, an elbv2 load
    balancer, an RDS instance or a Redshift cluster in the configured region.
    Everything else found in the region ends up in ``stale_groups``.

    Keyword arguments:
        region: AWS region name handed to ``boto3.Session``.
        profile: AWS profile name handed to ``boto3.Session``.
    """

    def __init__(self, **kwargs):
        super(StaleSGDetector, self).__init__()
        self.region = kwargs["region"]
        self.profile = kwargs["profile"]
        # Created by run(); callers may also inject their own session.
        self.session = None
        self.all_groups = set()
        self.security_groups_in_use = set()
        self.stale_groups = set()
        self.ec2_instances_count = 0
        self.network_interfaces_count = 0
        self.elbs_count = 0
        self.albs_count = 0
        self.rds_count = 0
        self.redshift_count = 0

    def get_session(self):
        """Gets a new boto3 session for the configured profile and region."""
        return boto3.Session(
            profile_name=self.profile, region_name=self.region
        )

    def run(self):
        """Collects all groups, then every in-use group, then the stale set."""
        self.session = self.get_session()
        self.get_all_security_groups()
        self.get_ec2_security_groups()
        self.get_network_interfaces_security_groups()
        self.get_elb_security_groups()
        self.get_alb_security_groups()
        self.get_rds_security_groups()
        self.get_redshift_security_groups()
        self.calculate_stale_security_groups()

    def _iter_items(self, service, operation, result_key):
        """
        Yields every item under ``result_key`` across all response pages.

        Reading only the first page would miss resources on later pages and
        make their security groups look stale.
        """
        client = self.session.client(service)
        if client.can_paginate(operation):
            pages = client.get_paginator(operation).paginate()
        else:
            # No paginator available for this operation: single plain call.
            pages = [getattr(client, operation)()]
        for page in pages:
            for item in page[result_key]:
                yield item

    def get_all_security_groups(self):
        """Adds all existing security groups used in the region."""
        groups = self._iter_items(
            'ec2', 'describe_security_groups', 'SecurityGroups'
        )
        for group in groups:
            group_id = group['GroupId']
            # Default SGs are never removal candidates, so count them as used.
            if group['GroupName'] == 'default':
                self.security_groups_in_use.add(group_id)
            self.all_groups.add(group_id)

    def get_ec2_security_groups(self):
        """Adds security groups used by EC2 instances."""
        reservations = self._iter_items(
            'ec2', 'describe_instances', 'Reservations'
        )
        for reservation in reservations:
            for instance in reservation['Instances']:
                self.ec2_instances_count += 1
                for group in instance.get('SecurityGroups', []):
                    self.security_groups_in_use.add(group['GroupId'])

    def get_network_interfaces_security_groups(self):
        """Adds security groups used by Network Interfaces (ENIs)."""
        enis = self._iter_items(
            'ec2', 'describe_network_interfaces', 'NetworkInterfaces'
        )
        for eni in enis:
            self.network_interfaces_count += 1
            for group in eni.get('Groups', []):
                self.security_groups_in_use.add(group['GroupId'])

    def get_elb_security_groups(self):
        """Adds security groups used by classic elastic loadbalancers (ELBs)."""
        elbs = self._iter_items(
            'elb', 'describe_load_balancers', 'LoadBalancerDescriptions'
        )
        for elb in elbs:
            self.elbs_count += 1
            # Classic ELB responses list plain group ids, not dicts.
            self.security_groups_in_use.update(elb.get('SecurityGroups', []))

    def get_alb_security_groups(self):
        """Adds security groups used by elbv2 loadbalancers (ALBs etc.)."""
        albs = self._iter_items(
            'elbv2', 'describe_load_balancers', 'LoadBalancers'
        )
        for alb in albs:
            self.albs_count += 1
            # Some elbv2 load balancers carry no 'SecurityGroups' key at all.
            self.security_groups_in_use.update(alb.get('SecurityGroups', []))

    def get_rds_security_groups(self):
        """Adds security groups used by RDS."""
        instances = self._iter_items(
            'rds', 'describe_db_instances', 'DBInstances'
        )
        for rds in instances:
            self.rds_count += 1
            for group in rds.get('VpcSecurityGroups', []):
                self.security_groups_in_use.add(group['VpcSecurityGroupId'])

    def get_redshift_security_groups(self):
        """Adds security groups used by Redshift."""
        clusters = self._iter_items(
            'redshift', 'describe_clusters', 'Clusters'
        )
        for cluster in clusters:
            self.redshift_count += 1
            for group in cluster.get('VpcSecurityGroups', []):
                self.security_groups_in_use.add(group['VpcSecurityGroupId'])

    def calculate_stale_security_groups(self):
        """
        Stores in ``stale_groups`` every known group that is not in use.

        Raises:
            AssertionError: if a group is in use but was not returned by the
                region-wide listing. Raised explicitly so the check also
                runs under ``python -O``.
        """
        self.stale_groups = self.all_groups.difference(
            self.security_groups_in_use
        )
        # Check if there is a SG in used SGs but not in all.
        unknown = self.security_groups_in_use.difference(self.all_groups)
        if unknown:
            raise AssertionError(
                "Security groups in use but missing from the region "
                "listing: {}".format(", ".join(sorted(unknown)))
            )
def report(detector_object):
    """
    Prints a summary of a finished detection run to stdout.

    Args:
        detector_object: object exposing the resource counters
            (``ec2_instances_count``, ``network_interfaces_count``,
            ``elbs_count``, ``albs_count``, ``rds_count``,
            ``redshift_count``) and the group sets (``all_groups``,
            ``security_groups_in_use``, ``stale_groups``).
    """
    print("---------------")
    search_log = (
        "Searched through {} EC2 instances, "
        "{} network interfaces, {} ELBs, {} ALBs, "
        "{} RDS instances, {} Redshift clusters"
    ).format(
        detector_object.ec2_instances_count,
        detector_object.network_interfaces_count,
        detector_object.elbs_count, detector_object.albs_count,
        detector_object.rds_count, detector_object.redshift_count
    )
    print(search_log)
    sg_log = "Found {} Security Groups in total, which {} are in-use.".format(
        len(detector_object.all_groups),
        len(detector_object.security_groups_in_use)
    )
    print(sg_log)
    stale_log = "Following {} security groups don't seem to be used:".format(
        len(detector_object.stale_groups)
    )
    print(stale_log)
    # Sorted so that repeated runs list the groups in the same order.
    for sg in sorted(detector_object.stale_groups):
        # Single-argument print() behaves the same on Python 2 and 3; the
        # text matches what the old statement `print " - ", sg` produced.
        print(" -  {}".format(sg))
    print("---------------")
if __name__ == "__main__":
    # Command-line entry point: parse options, run the detector and, unless
    # suppressed, print the report.
    import sys

    parser = argparse.ArgumentParser(description="Show unused security groups")
    parser.add_argument(
        "-p", "--profile", type=str, default="default",
        help="The AWS profile that will be used."
    )
    parser.add_argument(
        "-r", "--region", type=str, default="eu-west-1",
        help="The default region is eu-west-1."
    )
    parser.add_argument(
        "-d", "--delete", help="Delete security groups from AWS",
        action="store_true"
    )
    parser.add_argument(
        "-n", "--no-report", help="Skip showing report for Security Groups",
        action="store_true", default=False
    )
    args = parser.parse_args()
    if args.delete:
        # Nothing in this script acts on --delete; say so rather than let
        # the user believe groups were removed.
        sys.stderr.write(
            "WARNING: --delete is not implemented by this script; "
            "no security groups will be deleted.\n"
        )
    detector = StaleSGDetector(region=args.region, profile=args.profile)
    detector.run()
    if not args.no_report:
        report(detector)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
line 146:
should read: