Last active
November 4, 2015 17:29
-
-
Save mchestr/cec7d970b08c46af4a8e to your computer and use it in GitHub Desktop.
Clean Your AWS Regions
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import collections | |
import functools | |
import logging | |
import os | |
import re | |
import sys | |
import threading | |
from boto import ec2 | |
from boto import vpc | |
from eventlet import greenthread | |
import taskflow.engines | |
from taskflow.patterns import graph_flow as gf | |
from taskflow.patterns import linear_flow as lf | |
from taskflow import task | |
from taskflow import exceptions | |
from taskflow.types import futures | |
AWS_ACCESS_KEY = os.environ.get('AWS_ACCESS_KEY') | |
AWS_SECRET_ACCESS_KEY = os.environ.get('AWS_SECRET_KEY') | |
IGNORED_REGIONS = [ | |
'cn-north-1', | |
'us-gov-west-1', | |
] | |
REGIONS = [ | |
_r.name for _r in ec2.regions() if _r.name not in IGNORED_REGIONS | |
] | |
EXTRA_REGION_VALUES = set([ | |
_r.split('-')[0] for _r in REGIONS | |
]) | |
EXTRA_REGION_VALUES.add('all') | |
logging.basicConfig(level=logging.INFO) | |
logging.getLogger('boto').setLevel(logging.CRITICAL) | |
LOG = logging.getLogger(__name__) | |
def ignore_not_found(func): | |
@functools.wraps(func) | |
def on_call(*args, **kwargs): | |
try: | |
return func(*args, **kwargs) | |
except Exception as exc: | |
if hasattr(exc, 'code'): | |
if exc.code.endswith('NotFound'): | |
return True | |
raise exc | |
return on_call | |
def log_entry_and_exit(func): | |
@functools.wraps(func) | |
def on_call(self, *args, **kwargs): | |
LOG.info( | |
"[%s] %s %s", | |
self._region, | |
' '.join(re.findall('[A-Z][^A-Z]*', self.__class__.__name__)), | |
self.name | |
) | |
ret_val = func(self, *args, **kwargs) | |
if ret_val: | |
LOG.info("[%s] %s Deleted.", self._region, self.name) | |
return ret_val | |
return on_call | |
class CleanNetworkInterface(task.Task): | |
def __init__(self, conn, interface): | |
self._conn = conn | |
self._region = conn.region.name | |
self._interface_id = interface.id | |
self._attachment = interface.attachment | |
super(CleanNetworkInterface, self).__init__(name=self._interface_id) | |
@log_entry_and_exit | |
@ignore_not_found | |
def execute(self): | |
if self._attachment: | |
LOG.info( | |
"[%s] Detaching %s...", | |
self._region, | |
self._interface_id | |
) | |
self._conn.detach_network_interface(self._attachment.id) | |
LOG.info("[%s] %s Detached.", self._region, self._interface_id) | |
self._conn.delete_network_interface(self._interface_id) | |
return True | |
class CleanElasticIP(task.Task): | |
def __init__(self, conn, elastic_ip): | |
self._conn = conn | |
self._region = conn.region.name | |
self._allocation_id = elastic_ip.allocation_id | |
self._association_id = elastic_ip.association_id | |
super(CleanElasticIP, self).__init__(name=self._allocation_id) | |
@log_entry_and_exit | |
@ignore_not_found | |
def execute(self): | |
if self._association_id: | |
LOG.info( | |
"[%s] Disassociating %s...", | |
self._region, | |
self._allocation_id | |
) | |
self._conn.disassociate_address( | |
association_id=self._association_id | |
) | |
LOG.info( | |
"[%s] %s Disassociated.", | |
self._region, | |
self._allocation_id | |
) | |
self._conn.release_address(allocation_id=self._allocation_id) | |
return True | |
class CleanInstance(task.Task): | |
def __init__(self, conn, instance): | |
self._conn = conn | |
self._region = conn.region.name | |
self._instance_id = instance.id | |
super(CleanInstance, self).__init__(name=self._instance_id) | |
@log_entry_and_exit | |
@ignore_not_found | |
def execute(self): | |
instance = self._conn.terminate_instances([self._instance_id])[0] | |
while instance.update() != 'terminated': | |
LOG.info( | |
"[%s] %s State: %s", | |
self._region, | |
self._instance_id, | |
instance.state | |
) | |
greenthread.sleep(1) | |
return True | |
class CleanSecurityGroup(task.Task): | |
def __init__(self, conn, security_group): | |
self._conn = conn | |
self._region = conn.region.name | |
self._security_group_id = security_group.id | |
self._security_group_name = security_group.name | |
super(CleanSecurityGroup, self).__init__(name=self._security_group_id) | |
@log_entry_and_exit | |
@ignore_not_found | |
def execute(self): | |
if self._security_group_name == 'default': | |
LOG.info( | |
"[%s] Ignoring %s as it is a default security group.", | |
self._region, | |
self._security_group_id | |
) | |
return False | |
self._conn.delete_security_group(group_id=self._security_group_id) | |
return True | |
class CleanKeyPair(task.Task): | |
def __init__(self, conn, key_pair): | |
self._conn = conn | |
self._region = conn.region.name | |
self._key_pair_name = key_pair.name | |
super(CleanKeyPair, self).__init__( | |
name="{0}-{1}".format(self._region, self._key_pair_name) | |
) | |
@log_entry_and_exit | |
@ignore_not_found | |
def execute(self): | |
self._conn.delete_key_pair(self._key_pair_name) | |
return True | |
class CleanInternetGateway(task.Task): | |
def __init__(self, conn, internet_gateway): | |
self._conn = conn | |
self._region = conn.region.name | |
self._igw_id = internet_gateway.id | |
self._igw_attachments = internet_gateway.attachments | |
self._is_default = False | |
for attachment in self._igw_attachments: | |
self._vpc = self._conn.get_all_vpcs(attachment.vpc_id)[0] | |
if self._vpc.is_default: | |
self._is_default = True | |
super(CleanInternetGateway, self).__init__(name=self._igw_id) | |
@log_entry_and_exit | |
@ignore_not_found | |
def execute(self): | |
if self._is_default: | |
LOG.info( | |
"[%s] Ignoring %s as it is attached to a default VPC", | |
self._region, | |
self._igw_id | |
) | |
return False | |
if self._igw_attachments: | |
LOG.info("[%s] Detaching %s...", self._region, self._igw_id) | |
self._conn.detach_internet_gateway(self._igw_id, self._vpc.id) | |
LOG.info("[%s] %s Detached.", self._region, self._igw_id) | |
self._conn.delete_internet_gateway(self._igw_id) | |
return True | |
class CleanSubnet(task.Task): | |
def __init__(self, conn, subnet): | |
self._conn = conn | |
self._region = conn.region.name | |
self._subnet_id = subnet.id | |
_vpc = self._conn.get_all_vpcs(subnet.vpc_id)[0] | |
self._is_default = _vpc.is_default | |
super(CleanSubnet, self).__init__(name=self._subnet_id) | |
@log_entry_and_exit | |
@ignore_not_found | |
def execute(self): | |
if self._is_default: | |
LOG.info( | |
"[%s] Ignoring %s as it belongs to a default VPC", | |
self._region, | |
self._subnet_id | |
) | |
return False | |
self._conn.delete_subnet(self._subnet_id) | |
return True | |
class CleanVpc(task.Task): | |
def __init__(self, conn, _vpc): | |
self._conn = conn | |
self._region = conn.region.name | |
self._vpc_id = _vpc.id | |
self._is_default = _vpc.is_default | |
super(CleanVpc, self).__init__(name=self._vpc_id) | |
@log_entry_and_exit | |
@ignore_not_found | |
def execute(self): | |
if self._is_default: | |
LOG.info( | |
"[%s] Ignoring %s as it is a default VPC", | |
self._region, | |
self._vpc_id | |
) | |
return False | |
self._conn.delete_vpc(self._vpc_id) | |
return True | |
class CleanNetworkAcl(task.Task): | |
def __init__(self, conn, network_acl): | |
self._conn = conn | |
self._region = conn.region.name | |
self._network_acl = network_acl | |
self._network_acl_id = network_acl.id | |
self._is_default = network_acl.default == u'true' | |
super(CleanNetworkAcl, self).__init__(name=self._network_acl_id) | |
@log_entry_and_exit | |
@ignore_not_found | |
def execute(self): | |
if self._is_default: | |
LOG.info( | |
"[%s] Ignoring %s as it is a default NetworkACL", | |
self._region, | |
self._network_acl_id | |
) | |
return False | |
for ass in self._network_acl.associations: | |
self._conn.disassociate_network_acl(ass.subnet_id) | |
self._conn.delete_network_acl(self._network_acl_id) | |
return True | |
class CleanRouteTable(task.Task): | |
def __init__(self, conn, route_table): | |
self._conn = conn | |
self._region = conn.region.name | |
self._route_table = route_table | |
self._route_table_id = route_table.id | |
self._is_main = False | |
for ass in route_table.associations: | |
if ass.main: | |
self._is_main = True | |
break | |
super(CleanRouteTable, self).__init__(name=self._route_table_id) | |
@log_entry_and_exit | |
@ignore_not_found | |
def execute(self): | |
if self._is_main: | |
LOG.info( | |
"[%s] Ignoring %s as it is a main RouteTable", | |
self._region, | |
self._route_table_id | |
) | |
return False | |
for ass in self._route_table.associations: | |
self._conn.disassociate_route_table(ass.id) | |
self._conn.delete_route_table(self._route_table_id) | |
return True | |
class CleanCustomerGateway(task.Task): | |
def __init__(self, conn, cgw): | |
self._conn = conn | |
self._region = conn.region.name | |
self._cgw_id = cgw.id | |
super(CleanCustomerGateway, self).__init__(name=self._cgw_id) | |
@log_entry_and_exit | |
@ignore_not_found | |
def execute(self): | |
self._conn.delete_customer_gateway(self._cgw_id) | |
return True | |
class CleanVpnGateway(task.Task): | |
def __init__(self, conn, vgw): | |
self._conn = conn | |
self._region = conn.region.name | |
self._vgw = vgw | |
self._vgw_id = vgw.id | |
super(CleanVpnGateway, self).__init__(name=self._vgw_id) | |
@log_entry_and_exit | |
@ignore_not_found | |
def execute(self): | |
for attachment in self._vgw.attachments: | |
self._conn.detach_vpn_gateway(self._vgw_id, attachment.vpc_id) | |
self._conn.delete_vpn_gateway(self._vgw_id) | |
return True | |
class CleanVpnConnection(task.Task): | |
def __init__(self, conn, vpn_conn): | |
self._conn = conn | |
self._region = conn.region.name | |
self._vpn_conn_id = vpn_conn.id | |
super(CleanVpnConnection, self).__init__(name=self._vpn_conn_id) | |
@log_entry_and_exit | |
@ignore_not_found | |
def execute(self): | |
self._conn.delete_vpn_connection(self._vpn_conn_id) | |
return True | |
def parse_arguments(): | |
parser = argparse.ArgumentParser( | |
description='Clean AWS Regions.', | |
usage='%(prog)s [region [ region ... ]]' | |
) | |
parser.add_argument( | |
'regions', | |
nargs='*', | |
choices=REGIONS+list(EXTRA_REGION_VALUES), | |
help='region(s) to clean' | |
) | |
parser.add_argument('--info', | |
action='store_true', | |
help='List all region info, do not clean.') | |
return parser.parse_args() | |
def connect(service, region): | |
return service.connect_to_region( | |
region, | |
aws_access_key_id=AWS_ACCESS_KEY, | |
aws_secret_access_key=AWS_SECRET_ACCESS_KEY | |
) | |
def gather_regions_to_clean(args): | |
regions_to_clean = set() | |
for region in args.regions: | |
if region == 'all': | |
regions_to_clean.update(r for r in REGIONS) | |
elif region in EXTRA_REGION_VALUES: | |
regions_to_clean.update(r for r in REGIONS if r.startswith(region)) | |
else: | |
regions_to_clean.add(region) | |
return regions_to_clean | |
def generate_tasks_flow(conn, task_class, objects): | |
region = conn.region.name | |
flow = gf.Flow("%s %s" % (task_class.__name__, region)) | |
for obj in objects: | |
LOG.info("[%s] Generating clean task for resource %s", region, obj) | |
t = task_class( | |
conn, | |
obj | |
) | |
flow.add(t) | |
return flow | |
def update_shared_resources(conn, get_resource_fn, shared_resource): | |
region = conn.region.name | |
resources = getattr(conn, get_resource_fn)() | |
shared_resource[region][get_resource_fn] = resources | |
def gather_resources(shared_resource, vpc_conn, ec2_conn, | |
ec2_resources, vpc_resources): | |
for __, get_resource_fn in ec2_resources: | |
thread = threading.Thread( | |
target=update_shared_resources, | |
args=(ec2_conn, get_resource_fn, shared_resource,) | |
) | |
thread.daemon = True | |
threads.append(thread) | |
thread.start() | |
for __, get_resource_fn in vpc_resources: | |
thread = threading.Thread( | |
target=update_shared_resources, | |
args=(vpc_conn, get_resource_fn, shared_resource,) | |
) | |
thread.daemon = True | |
threads.append(thread) | |
thread.start() | |
threads = [] | |
def main(): | |
args = parse_arguments() | |
# Gather all regions to clean so we only clean each region once | |
regions = gather_regions_to_clean(args) | |
# Order matters, resources will be cleaned in order | |
ec2_resources_to_clean = [ | |
(CleanInstance, 'get_only_instances'), | |
(CleanKeyPair, 'get_all_key_pairs'), | |
(CleanElasticIP, 'get_all_addresses'), | |
(CleanNetworkInterface, 'get_all_network_interfaces'), | |
(CleanSecurityGroup, 'get_all_security_groups'), | |
] | |
vpc_resources_to_clean = [ | |
(CleanNetworkAcl, 'get_all_network_acls'), | |
(CleanRouteTable, 'get_all_route_tables'), | |
(CleanInternetGateway, 'get_all_internet_gateways'), | |
(CleanVpnConnection, 'get_all_vpn_connections'), | |
(CleanVpnGateway, 'get_all_vpn_gateways'), | |
(CleanCustomerGateway, 'get_all_customer_gateways'), | |
(CleanSubnet, 'get_all_subnets'), | |
(CleanVpc, 'get_all_vpcs'), | |
] | |
# Regions have no dependencies on each other so they can be executed in | |
# parallel | |
clean_flow = gf.Flow("Clean AWS") | |
LOG.info("Regions to clean: %s", regions) | |
resources = collections.defaultdict(dict) | |
region_conns = collections.defaultdict(dict) | |
for region in sorted(regions): | |
ec2_conn = connect(ec2, region) | |
vpc_conn = connect(vpc, region) | |
region_conns[region]['ec2'] = ec2_conn | |
region_conns[region]['vpc'] = vpc_conn | |
gather_resources(resources, vpc_conn, ec2_conn, | |
ec2_resources_to_clean, vpc_resources_to_clean) | |
print '' | |
while threads: | |
sys.stdout.write("\033[K") | |
sys.stdout.write('\r\033[FGathering Resources... %d left' % len(threads)) | |
sys.stdout.flush() | |
thread = threads.pop() | |
thread.join() | |
sys.stdout.write("\033[K") | |
sys.stdout.write('\r\033[FGathering Resources... %d left' % len(threads)) | |
sys.stdout.flush() | |
print "\nDone gathering resources for all regions." | |
for region in sorted(regions): | |
region_flow = lf.Flow("Clean Region %s" % region) | |
for t, get_resource_fn in ec2_resources_to_clean: | |
region_flow.add( | |
generate_tasks_flow( | |
region_conns[region]['ec2'], | |
t, | |
resources[region][get_resource_fn] | |
) | |
) | |
for t, get_resource_fn in vpc_resources_to_clean: | |
region_flow.add( | |
generate_tasks_flow( | |
region_conns[region]['vpc'], | |
t, | |
resources[region][get_resource_fn] | |
) | |
) | |
clean_flow.add(region_flow) | |
print "\n==========Resources on AWS==========" | |
for region, aws_objects in resources.iteritems(): | |
for get_all_fn, aws_object in aws_objects.iteritems(): | |
if len(aws_object) > 0: | |
print "[%s] %s - %s" % ( | |
region, | |
' '.join(get_all_fn.split('_')[2:]), | |
len(aws_object)) | |
print '' | |
if not args.info: | |
with futures.GreenThreadPoolExecutor() as executor: | |
try: | |
engine = taskflow.engines.load( | |
clean_flow, | |
executor=executor, | |
engine='parallel' | |
) | |
engine.run() | |
except exceptions.Empty: | |
LOG.info("Nothing to clean.") | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment