Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save filipeandre/f9a6e04f7e07ff3d8c971c3b54411d97 to your computer and use it in GitHub Desktop.
Save filipeandre/f9a6e04f7e07ff3d8c971c3b54411d97 to your computer and use it in GitHub Desktop.
Python script to clean up unused AWS security groups.
# coding:utf-8
import boto3
import logging
# Configure the root logger: records at INFO and above are appended to a
# log file in the current working directory, each with a timestamp header.
logging.basicConfig(
    filename='./clean_security_groups.log',
    filemode='a',
    level=logging.INFO,
    format='%(levelname)s %(asctime)s\n%(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)
def get_all_security_groups(client):
    """Collect the security groups from every page of results.

    Calls ``client.describe_security_groups`` with ``MaxResults=1000`` and
    keeps following the ``NextToken`` of each response until a response
    carries none.

    :param client: object exposing ``describe_security_groups(**kwargs)``
        (the script passes a boto3 EC2 client).
    :return: list with the ``SecurityGroups`` entries of all pages, in order.
    """
    collected = []
    request = {'MaxResults': 1000}
    response = client.describe_security_groups(**request)
    collected.extend(response['SecurityGroups'])
    while response.get('NextToken'):
        # Ask for the following page using the continuation token.
        request['NextToken'] = response['NextToken']
        response = client.describe_security_groups(**request)
        collected.extend(response['SecurityGroups'])
    return collected
def get_all_instances(client):
    """Collect the instances from every reservation on every result page.

    Calls ``client.describe_instances`` with ``MaxResults=1000`` and follows
    ``NextToken`` until a response carries none. The ``Instances`` lists of
    all reservations are flattened into a single list.

    A page without reservations contributes nothing (the previous
    ``reduce(list.__add__, ...)`` raised ``TypeError`` on an empty page and
    relied on ``reduce`` being a builtin).

    :param client: object exposing ``describe_instances(**kwargs)``
        (the script passes a boto3 EC2 client).
    :return: flat list of instance dicts, in response order.
    """
    instances = []
    params = {'MaxResults': 1000}
    while True:
        result = client.describe_instances(**params)
        for reservation in result['Reservations']:
            instances.extend(reservation['Instances'])
        next_token = result.get('NextToken')
        if not next_token:
            break
        params['NextToken'] = next_token
    return instances
def get_all_load_balancers(client):
    """Collect the load balancer descriptions from every result page.

    Calls ``client.describe_load_balancers`` with ``PageSize=400``; while a
    response carries a ``NextMarker`` it is sent back as ``Marker`` to fetch
    the next page.

    :param client: object exposing ``describe_load_balancers(**kwargs)``
        (the script passes a boto3 ``elb`` client).
    :return: list with the ``LoadBalancerDescriptions`` of all pages.
    """
    descriptions = []
    query = {'PageSize': 400}
    more_pages = True
    while more_pages:
        page = client.describe_load_balancers(**query)
        descriptions.extend(page['LoadBalancerDescriptions'])
        marker = page.get('NextMarker')
        if marker:
            query['Marker'] = marker
        more_pages = bool(marker)
    return descriptions
def get_all_load_balancers_v2(client):
    """Collect the ``LoadBalancers`` entries from every result page.

    Calls ``client.describe_load_balancers`` with ``PageSize=400`` and, as
    long as the response carries a ``NextMarker``, repeats the call with
    that value as ``Marker``.

    :param client: object exposing ``describe_load_balancers(**kwargs)``
        (the script passes a boto3 ``elbv2`` client).
    :return: list with the ``LoadBalancers`` entries of all pages, in order.
    """
    found = []
    kwargs = {'PageSize': 400}
    page = client.describe_load_balancers(**kwargs)
    found.extend(page['LoadBalancers'])
    while page.get('NextMarker'):
        kwargs['Marker'] = page['NextMarker']
        page = client.describe_load_balancers(**kwargs)
        found.extend(page['LoadBalancers'])
    return found
def get_all_rds_instances(client):
    """Collect the ``DBInstances`` entries from every result page.

    Calls ``client.describe_db_instances`` with ``MaxRecords=100``; the
    ``Marker`` of each response is passed to the next request until a
    response carries none.

    :param client: object exposing ``describe_db_instances(**kwargs)``
        (the script passes a boto3 RDS client).
    :return: list with the ``DBInstances`` entries of all pages, in order.
    """
    databases = []
    request = {'MaxRecords': 100}
    while True:
        reply = client.describe_db_instances(**request)
        databases.extend(reply['DBInstances'])
        marker = reply.get('Marker')
        if not marker:
            return databases
        request['Marker'] = marker
def get_all_network_interfaces(client):
    """Collect the network interfaces from every result page.

    Pages through ``client.describe_network_interfaces`` the same way the
    other ``get_all_*`` helpers in this file page through their calls:
    ``MaxResults=1000`` per request, following ``NextToken`` until a
    response carries none. The earlier single unpaginated call ignored any
    continuation token, so interfaces past the first page were never seen.

    :param client: object exposing ``describe_network_interfaces(**kwargs)``
        (the script passes a boto3 EC2 client).
    :return: list with the ``NetworkInterfaces`` entries of all pages.
    """
    interfaces = []
    params = {'MaxResults': 1000}
    while True:
        result = client.describe_network_interfaces(**params)
        interfaces.extend(result['NetworkInterfaces'])
        next_token = result.get('NextToken')
        if not next_token:
            break
        params['NextToken'] = next_token
    return interfaces
def _collect_in_use_group_ids(instances, load_balancers, load_balancers_v2,
                              rds_instances, interfaces, security_groups):
    """Return the set of security group ids referenced by any given resource."""
    in_use = set()
    for instance in instances:
        in_use.update(group['GroupId'] for group in instance['SecurityGroups'])
    for elb in load_balancers:
        in_use.update(elb.get('SecurityGroups') or [])
    for elb in load_balancers_v2:
        # Not every v2 load balancer entry carries security groups.
        in_use.update(elb.get('SecurityGroups') or [])
    for rds in rds_instances:
        in_use.update(group['VpcSecurityGroupId'] for group in rds['VpcSecurityGroups'])
    for interface in interfaces:
        in_use.update(group['GroupId'] for group in interface['Groups'])
    # A group named in another group's rules counts as in use. Both ingress
    # and egress rules are scanned.
    # NOTE(review): a group that only references itself is also kept, as in
    # the original logic -- confirm that this is intended.
    for group in security_groups:
        rules = group.get('IpPermissions', []) + group.get('IpPermissionsEgress', [])
        for perm in rules:
            for pair in perm.get('UserIdGroupPairs') or []:
                if 'GroupId' in pair:
                    in_use.add(pair['GroupId'])
    return in_use


def clean_useless_security_groups(ak, sk, region='us-east-1'):
    """Delete every security group in ``region`` that nothing references.

    Fetches instances, security groups, classic and v2 load balancers, RDS
    instances and network interfaces, gathers the security group ids they
    reference (plus ids named in other groups' rules), and calls
    ``delete_security_group`` for each remaining group. Progress is printed
    and each attempt is written to the log; a failed deletion is logged and
    skipped so the remaining groups are still processed.

    Empty resource lists are handled: the previous
    ``reduce(list.__add__, ...)`` calls raised ``TypeError`` when, for
    example, the region had no load balancers.

    :param ak: AWS access key id passed to ``boto3.client``.
    :param sk: AWS secret access key passed to ``boto3.client``.
    :param region: region name passed to ``boto3.client``.
    :return: None.
    """
    credentials = {
        'aws_access_key_id': ak,
        'aws_secret_access_key': sk,
        'region_name': region,
    }
    ec2_client = boto3.client('ec2', **credentials)
    elb_client = boto3.client('elb', **credentials)
    elbv2_client = boto3.client('elbv2', **credentials)
    rds_client = boto3.client('rds', **credentials)

    instances = get_all_instances(ec2_client)
    security_groups = get_all_security_groups(ec2_client)
    load_balancers = get_all_load_balancers(elb_client)
    load_balancers_v2 = get_all_load_balancers_v2(elbv2_client)
    rds_instances = get_all_rds_instances(rds_client)
    interfaces = get_all_network_interfaces(ec2_client)

    print('instances count:', len(instances))
    print('security_groups count:', len(security_groups))
    print('load_balancers count:', len(load_balancers) + len(load_balancers_v2))
    print('rds_instances count:', len(rds_instances))

    security_group_ids_in_use = _collect_in_use_group_ids(
        instances, load_balancers, load_balancers_v2,
        rds_instances, interfaces, security_groups,
    )
    useless_security_groups = [
        group for group in security_groups
        if group['GroupId'] not in security_group_ids_in_use
    ]
    print('find {count} useless security_groups.'.format(count=len(useless_security_groups)))

    count = 0
    for group in useless_security_groups:
        group_id, group_name = group['GroupId'], group['GroupName']
        logging.info('try to delete {groupid} {groupname}'.format(groupid=group_id, groupname=group_name))
        print(group_id, group_name)
        try:
            ec2_client.delete_security_group(GroupId=group_id)
        except Exception as e:
            # Best effort: one group that cannot be deleted must not stop
            # the cleanup of the others.
            logging.info('Failed to delete {groupid} {groupname}. {e}'.format(groupid=group_id, groupname=group_name, e=str(e)))
        else:
            logging.info('Already deleted {groupid} {groupname}'.format(groupid=group_id, groupname=group_name))
            count += 1
    print('deleted security groups {count}.'.format(count=count))
def main():
    """Run the security group cleanup with the credentials written below.

    The two key strings are literal placeholders in this file; no region is
    passed, so ``clean_useless_security_groups`` uses its default.
    """
    access_key = 'your-aws-access-key'
    secret_key = 'your-aws-secret-key'
    clean_useless_security_groups(ak=access_key, sk=secret_key)
# Start the cleanup only when this file is executed as a script; importing
# the module does not trigger it.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment