Last active
December 6, 2019 23:13
-
-
Save chowyi/d2b9cb2e555fc45726735ea1f2421bea to your computer and use it in GitHub Desktop.
Python script to clean up unused AWS security groups.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # coding:utf-8 | |
| import boto3 | |
| import logging | |
# Send log records of level INFO and above to a file in the current working
# directory, appending to it across runs. Each record is written as a
# "LEVEL timestamp" line followed by the message on the next line.
logging.basicConfig(
    filename='./clean_security_groups.log',
    filemode='a',
    level=logging.INFO,
    format='%(levelname)s %(asctime)s\n%(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)
def get_all_security_groups(client):
    """Return every security group reported by ``client.describe_security_groups``.

    Results are requested 1000 per page. The ``NextToken`` of each response
    is passed to the following request until a response carries none.

    :param client: object exposing ``describe_security_groups(**kwargs)``
        (the script passes a boto3 EC2 client).
    :return: list with the ``SecurityGroups`` entries of all pages, in order.
    """
    collected = []
    next_token = None
    has_more = True
    while has_more:
        request = {'MaxResults': 1000}
        if next_token:
            request['NextToken'] = next_token
        page = client.describe_security_groups(**request)
        collected.extend(page['SecurityGroups'])
        next_token = page.get('NextToken')
        # An absent or empty token means this was the last page.
        has_more = bool(next_token)
    return collected
def get_all_instances(client):
    """Return every instance reported by ``client.describe_instances``.

    Results are requested 1000 per page and the ``NextToken`` of each
    response is fed into the next request until none is returned. The
    ``Instances`` lists of all reservations are flattened into one list.

    :param client: object exposing ``describe_instances(**kwargs)``
        (the script passes a boto3 EC2 client).
    :return: flat list of instance dicts; empty when there are no
        reservations.
    """
    instances = []
    params = {'MaxResults': 1000}
    while True:
        result = client.describe_instances(**params)
        # Extend per reservation instead of reduce(list.__add__, ...):
        # reduce() without an initial value raises TypeError on a page with
        # no reservations, and it is not a builtin on Python 3.
        for reservation in result['Reservations']:
            instances.extend(reservation['Instances'])
        if not result.get('NextToken'):
            break
        params['NextToken'] = result['NextToken']
    return instances
def get_all_load_balancers(client):
    """Return every entry of ``LoadBalancerDescriptions`` across all pages.

    Calls ``client.describe_load_balancers`` with a page size of 400 and
    keeps following ``NextMarker`` (sent back as ``Marker``) until a response
    has none.

    :param client: object exposing ``describe_load_balancers(**kwargs)``
        (the script passes a boto3 ``elb`` client).
    :return: list of load balancer description dicts, in page order.
    """
    found = []
    query = {'PageSize': 400}
    response = client.describe_load_balancers(**query)
    found.extend(response['LoadBalancerDescriptions'])
    while response.get('NextMarker'):
        query['Marker'] = response['NextMarker']
        response = client.describe_load_balancers(**query)
        found.extend(response['LoadBalancerDescriptions'])
    return found
def get_all_load_balancers_v2(client):
    """Return every entry of ``LoadBalancers`` across all pages.

    Calls ``client.describe_load_balancers`` with a page size of 400,
    passing each response's ``NextMarker`` as ``Marker`` on the next call
    until no marker is returned.

    :param client: object exposing ``describe_load_balancers(**kwargs)``
        (the script passes a boto3 ``elbv2`` client).
    :return: list of load balancer dicts, in page order.
    """
    balancers = []
    marker = None
    while True:
        options = {'PageSize': 400}
        if marker:
            options['Marker'] = marker
        reply = client.describe_load_balancers(**options)
        balancers.extend(reply['LoadBalancers'])
        marker = reply.get('NextMarker')
        if not marker:
            # No further pages.
            return balancers
def get_all_rds_instances(client):
    """Return every entry of ``DBInstances`` across all pages.

    Calls ``client.describe_db_instances`` 100 records at a time. The
    ``Marker`` found in a response is sent back on the next request; the loop
    ends when a response has no marker.

    :param client: object exposing ``describe_db_instances(**kwargs)``
        (the script passes a boto3 ``rds`` client).
    :return: list of DB instance dicts, in page order.
    """
    databases = []
    request = {'MaxRecords': 100}
    more_pages = True
    while more_pages:
        response = client.describe_db_instances(**request)
        databases.extend(response['DBInstances'])
        marker = response.get('Marker')
        if marker:
            request['Marker'] = marker
        more_pages = bool(marker)
    return databases
def get_all_network_interfaces(client):
    """Return every network interface reported by ``client.describe_network_interfaces``.

    Results are requested 1000 per page and the ``NextToken`` of each
    response is passed to the next request until none is returned, the same
    way the other ``get_all_*`` fetchers in this file page through results.
    A single unpaginated call would drop every interface after the first
    page whenever the service returns a ``NextToken``.

    :param client: object exposing ``describe_network_interfaces(**kwargs)``
        (the script passes a boto3 EC2 client).
    :return: list with the ``NetworkInterfaces`` entries of all pages.
    """
    interfaces = []
    params = {'MaxResults': 1000}
    while True:
        result = client.describe_network_interfaces(**params)
        interfaces += result['NetworkInterfaces']
        if not result.get('NextToken'):
            break
        params['NextToken'] = result['NextToken']
    return interfaces
def _attached_group_ids(instances, load_balancers, load_balancers_v2, rds_instances, interfaces):
    """Return the set of security group IDs attached to any given resource."""
    attached = set()
    for instance in instances:
        attached.update(group['GroupId'] for group in instance.get('SecurityGroups') or [])
    # Both load balancer flavours list plain group-ID strings; the key may be
    # missing or empty, so fall back to an empty list.
    for elb in list(load_balancers) + list(load_balancers_v2):
        attached.update(elb.get('SecurityGroups') or [])
    for rds in rds_instances:
        attached.update(group['VpcSecurityGroupId'] for group in rds.get('VpcSecurityGroups') or [])
    for interface in interfaces:
        attached.update(group['GroupId'] for group in interface.get('Groups') or [])
    return attached


def _referenced_group_ids(security_groups):
    """Return the set of group IDs named in any group's ingress or egress rules."""
    referenced = set()
    for group in security_groups:
        for direction in ('IpPermissions', 'IpPermissionsEgress'):
            for perm in group.get(direction) or []:
                for pair in perm.get('UserIdGroupPairs') or []:
                    # Skip pairs that carry no GroupId instead of raising.
                    if pair.get('GroupId'):
                        referenced.add(pair['GroupId'])
    return referenced


def clean_useless_security_groups(ak, sk, region='us-east-1'):
    """Delete the security groups of one region that nothing appears to use.

    A group counts as "in use" when its ID is attached to an EC2 instance, a
    classic or v2 load balancer, an RDS instance or a network interface, or
    when it appears in the ``UserIdGroupPairs`` of an ingress or egress rule
    of any security group (its own rules included). Every other group is
    passed to ``delete_security_group``. A failed deletion is logged and the
    loop moves on to the next group.

    Progress is printed to stdout and each deletion attempt is written to the
    log configured at module level.

    :param ak: AWS access key ID handed to ``boto3.client``.
    :param sk: AWS secret access key handed to ``boto3.client``.
    :param region: AWS region name; defaults to ``'us-east-1'``.
    :return: None.
    """
    ec2_client = boto3.client('ec2', aws_access_key_id=ak, aws_secret_access_key=sk, region_name=region)
    elb_client = boto3.client('elb', aws_access_key_id=ak, aws_secret_access_key=sk, region_name=region)
    elbv2_client = boto3.client('elbv2', aws_access_key_id=ak, aws_secret_access_key=sk, region_name=region)
    rds_client = boto3.client('rds', aws_access_key_id=ak, aws_secret_access_key=sk, region_name=region)

    instances = get_all_instances(ec2_client)
    security_groups = get_all_security_groups(ec2_client)
    load_balancers = get_all_load_balancers(elb_client)
    load_balancers_v2 = get_all_load_balancers_v2(elbv2_client)
    rds_instances = get_all_rds_instances(rds_client)
    interfaces = get_all_network_interfaces(ec2_client)

    print('instances count:', len(instances))
    print('security_groups count:', len(security_groups))
    print('load_balancers count:', len(load_balancers) + len(load_balancers_v2))
    print('rds_instances count:', len(rds_instances))

    # Sets are built with plain loops rather than reduce(list.__add__, ...):
    # reduce() without an initial value raises TypeError as soon as one
    # resource category is empty, and it is not a builtin on Python 3.
    security_group_ids_in_use = _attached_group_ids(
        instances, load_balancers, load_balancers_v2, rds_instances, interfaces
    )
    security_group_ids_in_use |= _referenced_group_ids(security_groups)

    useless_security_groups = [
        group for group in security_groups
        if group['GroupId'] not in security_group_ids_in_use
    ]
    print('find {count} useless security_groups.'.format(count=len(useless_security_groups)))

    count = 0
    for group in useless_security_groups:
        group_id, group_name = group['GroupId'], group['GroupName']
        logging.info('try to delete {groupid} {groupname}'.format(groupid=group_id, groupname=group_name))
        print(group_id, group_name)
        try:
            ec2_client.delete_security_group(GroupId=group_id)
        except Exception as e:
            # Best effort: one group that cannot be deleted must not stop the
            # cleanup of the remaining ones, so record the error and go on.
            logging.error('Failed to delete {groupid} {groupname}. {e}'.format(groupid=group_id, groupname=group_name, e=str(e)))
        else:
            logging.info('Already deleted {groupid} {groupname}'.format(groupid=group_id, groupname=group_name))
            count += 1
    print('deleted security groups {count}.'.format(count=count))
def main():
    """Run the cleanup with the placeholder credentials below.

    Replace the two placeholder strings with real values before running;
    the region is left at the default of ``clean_useless_security_groups``.
    """
    credentials = {
        'ak': 'your-aws-access-key',
        'sk': 'your-aws-secret-key',
    }
    clean_useless_security_groups(**credentials)


# Only start the cleanup when this file is executed as a script.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment