Skip to content

Instantly share code, notes, and snippets.

@mchestr
Last active November 4, 2015 17:29
Show Gist options
  • Save mchestr/cec7d970b08c46af4a8e to your computer and use it in GitHub Desktop.
Save mchestr/cec7d970b08c46af4a8e to your computer and use it in GitHub Desktop.
Clean Your AWS Regions
import argparse
import collections
import functools
import logging
import os
import re
import sys
import threading
from boto import ec2
from boto import vpc
from eventlet import greenthread
import taskflow.engines
from taskflow.patterns import graph_flow as gf
from taskflow.patterns import linear_flow as lf
from taskflow import task
from taskflow import exceptions
from taskflow.types import futures
# Credentials are read from the environment; each is None when its variable
# is unset.
AWS_ACCESS_KEY = os.environ.get('AWS_ACCESS_KEY')
AWS_SECRET_ACCESS_KEY = os.environ.get('AWS_SECRET_KEY')

# Regions that are never offered for cleaning (presumably because they need
# separate account credentials -- confirm).
IGNORED_REGIONS = ['cn-north-1', 'us-gov-west-1']

# Names of every region returned by ec2.regions(), minus the ignored ones.
REGIONS = [
    _region.name
    for _region in ec2.regions()
    if _region.name not in IGNORED_REGIONS
]

# Extra command-line shortcuts: the first '-'-separated component of each
# region name (e.g. 'us' for 'us-east-1') plus the literal 'all'.
EXTRA_REGION_VALUES = set(_name.split('-')[0] for _name in REGIONS)
EXTRA_REGION_VALUES.add('all')

logging.basicConfig(level=logging.INFO)
# Only let CRITICAL messages from boto through.
logging.getLogger('boto').setLevel(logging.CRITICAL)
LOG = logging.getLogger(__name__)
def ignore_not_found(func):
    """Decorate *func* so that AWS "...NotFound" errors count as success.

    If *func* raises an exception whose ``code`` attribute is a string ending
    in ``'NotFound'`` (e.g. ``'InvalidGroup.NotFound'``), the wrapper returns
    ``True`` -- the resource is already gone. Any other exception, including
    ones whose ``code`` is missing, ``None`` or not a string, is re-raised
    unchanged with its original traceback.

    :param func: callable to wrap.
    :returns: wrapper with the same call signature as *func*.
    """
    # Covers str and unicode on Python 2, and str on Python 3.
    string_types = (str, type(u''))

    @functools.wraps(func)
    def on_call(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            # boto errors may carry code=None (unparseable response) and other
            # exceptions may have non-string codes; calling .endswith on those
            # would raise AttributeError and hide the real error.
            code = getattr(exc, 'code', None)
            if isinstance(code, string_types) and code.endswith('NotFound'):
                return True
            # Bare raise keeps the original traceback (`raise exc` loses it on
            # Python 2).
            raise
    return on_call
def log_entry_and_exit(func):
    """Wrap a task ``execute`` method with start/finish log lines.

    Before the call it logs the region, the task class name split into words
    (``CleanVpc`` -> ``Clean Vpc``) and the task name. If the wrapped method
    returns a truthy value, it also logs "<name> Deleted.". The wrapped
    method's return value is passed through.
    """
    @functools.wraps(func)
    def on_call(self, *args, **kwargs):
        class_words = re.findall('[A-Z][^A-Z]*', self.__class__.__name__)
        LOG.info("[%s] %s %s", self._region, ' '.join(class_words), self.name)
        outcome = func(self, *args, **kwargs)
        if outcome:
            LOG.info("[%s] %s Deleted.", self._region, self.name)
        return outcome
    return on_call
class CleanNetworkInterface(task.Task):
    """Detach (if attached) and delete one elastic network interface."""

    # Upper bound on one-second polls while waiting for a detach to finish.
    _DETACH_POLL_LIMIT = 60

    def __init__(self, conn, interface):
        self._conn = conn
        self._region = conn.region.name
        self._interface_id = interface.id
        self._attachment = interface.attachment
        super(CleanNetworkInterface, self).__init__(name=self._interface_id)

    @log_entry_and_exit
    @ignore_not_found
    def execute(self):
        if self._attachment:
            LOG.info(
                "[%s] Detaching %s...",
                self._region,
                self._interface_id
            )
            # self._attachment was captured when resources were gathered,
            # before instances were terminated, so it may already be gone.
            # Tolerate NotFound here; otherwise the outer ignore_not_found
            # would skip the delete below yet still report "Deleted.".
            ignore_not_found(self._conn.detach_network_interface)(
                self._attachment.id
            )
            # EC2 detaches asynchronously; wait for the interface to be free
            # before trying to delete it.
            self._wait_until_available()
            LOG.info("[%s] %s Detached.", self._region, self._interface_id)
        self._conn.delete_network_interface(self._interface_id)
        return True

    def _wait_until_available(self):
        """Poll until the interface is 'available', disappears, or we give up."""
        for _ in range(self._DETACH_POLL_LIMIT):
            interfaces = self._conn.get_all_network_interfaces(
                filters={'network-interface-id': self._interface_id}
            )
            if not interfaces or interfaces[0].status == 'available':
                return
            greenthread.sleep(1)
class CleanElasticIP(task.Task):
    """Disassociate (if associated) and release one Elastic IP allocation."""

    def __init__(self, conn, elastic_ip):
        self._conn = conn
        self._region = conn.region.name
        # NOTE(review): allocation_id may be None for non-VPC (EC2-Classic)
        # addresses, which would then need release by public_ip -- confirm
        # whether such accounts matter.
        self._allocation_id = elastic_ip.allocation_id
        self._association_id = elastic_ip.association_id
        super(CleanElasticIP, self).__init__(name=self._allocation_id)

    @log_entry_and_exit
    @ignore_not_found
    def execute(self):
        if self._association_id:
            LOG.info(
                "[%s] Disassociating %s...",
                self._region,
                self._allocation_id
            )
            # The association was recorded before instances were terminated
            # and may already be gone. A NotFound here must not skip the
            # release below; otherwise the address leaks while "Deleted." is
            # still logged.
            ignore_not_found(self._conn.disassociate_address)(
                association_id=self._association_id
            )
            LOG.info(
                "[%s] %s Disassociated.",
                self._region,
                self._allocation_id
            )
        self._conn.release_address(allocation_id=self._allocation_id)
        return True
class CleanInstance(task.Task):
    """Terminate one EC2 instance and block until it reports 'terminated'."""

    def __init__(self, conn, instance):
        self._conn = conn
        self._region = conn.region.name
        self._instance_id = instance.id
        super(CleanInstance, self).__init__(name=self._instance_id)

    @log_entry_and_exit
    @ignore_not_found
    def execute(self):
        terminating = self._conn.terminate_instances([self._instance_id])
        target = terminating[0]
        # Poll once per second, yielding to other green threads via eventlet.
        while True:
            current_state = target.update()
            if current_state == 'terminated':
                break
            LOG.info(
                "[%s] %s State: %s",
                self._region,
                self._instance_id,
                target.state
            )
            greenthread.sleep(1)
        return True
class CleanSecurityGroup(task.Task):
    """Delete one security group, skipping any group named 'default'."""

    def __init__(self, conn, security_group):
        self._conn = conn
        self._region = conn.region.name
        self._security_group_id = security_group.id
        self._security_group_name = security_group.name
        super(CleanSecurityGroup, self).__init__(name=self._security_group_id)

    @log_entry_and_exit
    @ignore_not_found
    def execute(self):
        is_default_group = self._security_group_name == 'default'
        if not is_default_group:
            self._conn.delete_security_group(group_id=self._security_group_id)
            return True
        LOG.info(
            "[%s] Ignoring %s as it is a default security group.",
            self._region,
            self._security_group_id
        )
        return False
class CleanKeyPair(task.Task):
    """Delete one EC2 key pair by name."""

    def __init__(self, conn, key_pair):
        self._conn = conn
        self._region = conn.region.name
        self._key_pair_name = key_pair.name
        # The task name is prefixed with the region, presumably because key
        # pair names (unlike resource ids) can repeat across regions.
        task_name = "{0}-{1}".format(self._region, self._key_pair_name)
        super(CleanKeyPair, self).__init__(name=task_name)

    @log_entry_and_exit
    @ignore_not_found
    def execute(self):
        key_name = self._key_pair_name
        self._conn.delete_key_pair(key_name)
        return True
class CleanInternetGateway(task.Task):
    """Detach and delete an internet gateway unless a default VPC uses it."""

    def __init__(self, conn, internet_gateway):
        self._conn = conn
        self._region = conn.region.name
        self._igw_id = internet_gateway.id
        self._igw_attachments = internet_gateway.attachments
        # Look up each attached VPC in order; the gateway counts as "default"
        # if any of them is a default VPC. The last VPC looked up is the one
        # execute() detaches from.
        attached_vpcs = [
            conn.get_all_vpcs(attachment.vpc_id)[0]
            for attachment in self._igw_attachments
        ]
        self._is_default = any(
            attached.is_default for attached in attached_vpcs
        )
        if attached_vpcs:
            self._vpc = attached_vpcs[-1]
        super(CleanInternetGateway, self).__init__(name=self._igw_id)

    @log_entry_and_exit
    @ignore_not_found
    def execute(self):
        if not self._is_default:
            if self._igw_attachments:
                LOG.info("[%s] Detaching %s...", self._region, self._igw_id)
                self._conn.detach_internet_gateway(self._igw_id, self._vpc.id)
                LOG.info("[%s] %s Detached.", self._region, self._igw_id)
            self._conn.delete_internet_gateway(self._igw_id)
            return True
        LOG.info(
            "[%s] Ignoring %s as it is attached to a default VPC",
            self._region,
            self._igw_id
        )
        return False
class CleanSubnet(task.Task):
    """Delete one subnet, skipping subnets that belong to a default VPC."""

    def __init__(self, conn, subnet):
        self._conn = conn
        self._region = conn.region.name
        self._subnet_id = subnet.id
        # One API call at construction time to learn whether the owning VPC
        # is a default VPC.
        owning_vpc = conn.get_all_vpcs(subnet.vpc_id)[0]
        self._is_default = owning_vpc.is_default
        super(CleanSubnet, self).__init__(name=self._subnet_id)

    @log_entry_and_exit
    @ignore_not_found
    def execute(self):
        if not self._is_default:
            self._conn.delete_subnet(self._subnet_id)
            return True
        LOG.info(
            "[%s] Ignoring %s as it belongs to a default VPC",
            self._region,
            self._subnet_id
        )
        return False
class CleanVpc(task.Task):
    """Delete one VPC, leaving default VPCs alone."""

    def __init__(self, conn, _vpc):
        self._conn = conn
        self._region = conn.region.name
        self._vpc_id = _vpc.id
        self._is_default = _vpc.is_default
        super(CleanVpc, self).__init__(name=self._vpc_id)

    @log_entry_and_exit
    @ignore_not_found
    def execute(self):
        if not self._is_default:
            self._conn.delete_vpc(self._vpc_id)
            return True
        LOG.info(
            "[%s] Ignoring %s as it is a default VPC",
            self._region,
            self._vpc_id
        )
        return False
class CleanNetworkAcl(task.Task):
    """Disassociate every subnet from a network ACL, then delete the ACL.

    An ACL whose ``default`` attribute is the string ``u'true'`` is skipped.
    """

    def __init__(self, conn, network_acl):
        self._conn = conn
        self._region = conn.region.name
        self._network_acl = network_acl
        self._network_acl_id = network_acl.id
        self._is_default = network_acl.default == u'true'
        super(CleanNetworkAcl, self).__init__(name=self._network_acl_id)

    @log_entry_and_exit
    @ignore_not_found
    def execute(self):
        if not self._is_default:
            for association in self._network_acl.associations:
                self._conn.disassociate_network_acl(association.subnet_id)
            self._conn.delete_network_acl(self._network_acl_id)
            return True
        LOG.info(
            "[%s] Ignoring %s as it is a default NetworkACL",
            self._region,
            self._network_acl_id
        )
        return False
class CleanRouteTable(task.Task):
    """Disassociate and delete a route table, skipping a VPC's main table."""

    def __init__(self, conn, route_table):
        self._conn = conn
        self._region = conn.region.name
        self._route_table = route_table
        self._route_table_id = route_table.id
        # A table is "main" if any of its associations is flagged main.
        self._is_main = any(
            association.main for association in route_table.associations
        )
        super(CleanRouteTable, self).__init__(name=self._route_table_id)

    @log_entry_and_exit
    @ignore_not_found
    def execute(self):
        if not self._is_main:
            for association in self._route_table.associations:
                self._conn.disassociate_route_table(association.id)
            self._conn.delete_route_table(self._route_table_id)
            return True
        LOG.info(
            "[%s] Ignoring %s as it is a main RouteTable",
            self._region,
            self._route_table_id
        )
        return False
class CleanCustomerGateway(task.Task):
    """Delete one customer gateway."""

    def __init__(self, conn, cgw):
        self._conn = conn
        self._region = conn.region.name
        self._cgw_id = cgw.id
        super(CleanCustomerGateway, self).__init__(name=self._cgw_id)

    @log_entry_and_exit
    @ignore_not_found
    def execute(self):
        gateway_id = self._cgw_id
        self._conn.delete_customer_gateway(gateway_id)
        return True
class CleanVpnGateway(task.Task):
    """Detach a virtual private gateway from each attached VPC, then delete it."""

    def __init__(self, conn, vgw):
        self._conn = conn
        self._region = conn.region.name
        self._vgw = vgw
        self._vgw_id = vgw.id
        super(CleanVpnGateway, self).__init__(name=self._vgw_id)

    @log_entry_and_exit
    @ignore_not_found
    def execute(self):
        # The attachments were captured when resources were gathered and may
        # be stale by now. Tolerate NotFound per attachment so that one stale
        # entry does not skip the remaining detaches and the delete, which
        # the outer ignore_not_found would otherwise still report as
        # "Deleted.".
        detach = ignore_not_found(self._conn.detach_vpn_gateway)
        for attachment in self._vgw.attachments:
            detach(self._vgw_id, attachment.vpc_id)
        self._conn.delete_vpn_gateway(self._vgw_id)
        return True
class CleanVpnConnection(task.Task):
    """Delete one VPN connection."""

    def __init__(self, conn, vpn_conn):
        self._conn = conn
        self._region = conn.region.name
        self._vpn_conn_id = vpn_conn.id
        super(CleanVpnConnection, self).__init__(name=self._vpn_conn_id)

    @log_entry_and_exit
    @ignore_not_found
    def execute(self):
        connection_id = self._vpn_conn_id
        self._conn.delete_vpn_connection(connection_id)
        return True
def parse_arguments(argv=None, choices=None):
    """Parse the command line.

    Region values are validated after parsing instead of through argparse's
    ``choices=``. A positional with ``nargs='*'`` and ``choices`` rejects an
    empty argument list ("invalid choice: []"), which made running with no
    regions (e.g. just ``--info``) impossible.

    :param argv: arguments to parse; ``None`` means ``sys.argv[1:]``.
    :param choices: accepted region values; defaults to every known region
        plus the prefix/``all`` shortcuts.
    :returns: namespace with ``regions`` (list of str) and ``info`` (bool).
    """
    if choices is None:
        choices = REGIONS + list(EXTRA_REGION_VALUES)
    parser = argparse.ArgumentParser(
        description='Clean AWS Regions.',
        usage='%(prog)s [--info] [region [region ...]]'
    )
    parser.add_argument(
        'regions',
        nargs='*',
        metavar='region',
        help='region(s) to clean, any of: %s' % ', '.join(choices)
    )
    parser.add_argument('--info',
                        action='store_true',
                        help='List all region info, do not clean.')
    args = parser.parse_args(argv)
    for region in args.regions:
        if region not in choices:
            # parser.error prints usage and exits with status 2, like the
            # built-in choices check would.
            parser.error(
                'argument region: invalid choice: %r (choose from %s)'
                % (region, ', '.join(repr(choice) for choice in choices))
            )
    return args
def connect(service, region):
    """Connect to *region* via ``service.connect_to_region``.

    Passes the module-level AWS_ACCESS_KEY / AWS_SECRET_ACCESS_KEY values
    (which may be None) and returns whatever the service module returns.
    """
    credentials = dict(
        aws_access_key_id=AWS_ACCESS_KEY,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    )
    return service.connect_to_region(region, **credentials)
def gather_regions_to_clean(args, known_regions=None):
    """Expand region arguments into a de-duplicated set of region names.

    ``'all'`` selects every known region. A prefix such as ``'us'`` selects
    the regions whose first ``'-'``-separated component equals it. Any other
    value is taken as a literal region name.

    Prefixes are compared on the whole first component, the same way
    EXTRA_REGION_VALUES is built, rather than with ``startswith``. Otherwise
    ``'eu'`` would also select a region named like ``'eusc-de-east-1'``.

    :param args: parsed arguments; only ``args.regions`` is read.
    :param known_regions: region names to expand against; defaults to REGIONS.
    :returns: set of region names.
    """
    if known_regions is None:
        known_regions = REGIONS
    prefixes = set(name.split('-')[0] for name in known_regions)
    regions_to_clean = set()
    for region in args.regions:
        if region == 'all':
            regions_to_clean.update(known_regions)
        elif region in prefixes:
            regions_to_clean.update(
                name for name in known_regions
                if name.split('-')[0] == region
            )
        else:
            regions_to_clean.add(region)
    return regions_to_clean
def generate_tasks_flow(conn, task_class, objects):
    """Build a graph flow with one ``task_class(conn, obj)`` task per object.

    The flow is named "<task class> <region>". No links are added between
    the tasks.
    """
    region_name = conn.region.name
    flow = gf.Flow("%s %s" % (task_class.__name__, region_name))
    for aws_object in objects:
        LOG.info(
            "[%s] Generating clean task for resource %s",
            region_name,
            aws_object
        )
        flow.add(task_class(conn, aws_object))
    return flow
def update_shared_resources(conn, get_resource_fn, shared_resource):
    """Call ``conn.<get_resource_fn>()`` and record the result.

    The result is stored at ``shared_resource[conn.region.name][get_resource_fn]``.
    ``shared_resource`` must create missing region entries itself (main()
    passes a ``defaultdict(dict)``). Nothing is stored if the call raises.
    """
    region_name = conn.region.name
    fetch = getattr(conn, get_resource_fn)
    fetched = fetch()
    per_region = shared_resource[region_name]
    per_region[get_resource_fn] = fetched
# Worker threads started by gather_resources(); main() pops and joins every
# entry before building the clean flows.
threads = []


def gather_resources(shared_resource, vpc_conn, ec2_conn,
                     ec2_resources, vpc_resources):
    """Start one daemon thread per resource getter for a single region.

    Each thread runs update_shared_resources() with its connection. EC2
    getters (from ``ec2_resources``) start first, then VPC getters (from
    ``vpc_resources``). Every thread is appended to the module-level
    ``threads`` list; this function does not wait for them.
    """
    jobs = ((ec2_conn, ec2_resources), (vpc_conn, vpc_resources))
    for conn, resource_specs in jobs:
        for _task_class, get_resource_fn in resource_specs:
            worker = threading.Thread(
                target=update_shared_resources,
                args=(conn, get_resource_fn, shared_resource,)
            )
            worker.daemon = True
            threads.append(worker)
            worker.start()
def _wait_for_gather_threads():
    """Pop and join every thread in the module-level ``threads`` list.

    While draining, redraws a single "Gathering Resources... N left" line
    using ANSI clear-line / cursor-up escapes.
    """
    print('')
    while threads:
        sys.stdout.write("\033[K")
        sys.stdout.write('\r\033[FGathering Resources... %d left' % len(threads))
        sys.stdout.flush()
        thread = threads.pop()
        thread.join()
    sys.stdout.write("\033[K")
    sys.stdout.write('\r\033[FGathering Resources... %d left' % len(threads))
    sys.stdout.flush()
    print("\nDone gathering resources for all regions.")


def _missing_resources(regions, resources, resource_specs):
    """Return (region, getter) pairs for which no result was gathered.

    update_shared_resources() stores a result only when its call succeeds,
    so a missing entry means that worker thread raised.
    """
    return [
        (region, get_resource_fn)
        for region in sorted(regions)
        for _task_class, get_resource_fn in resource_specs
        if get_resource_fn not in resources[region]
    ]


def _print_resource_summary(resources):
    """Print non-empty resource counts per region, in sorted order."""
    print("\n==========Resources on AWS==========")
    for region in sorted(resources):
        region_resources = resources[region]
        for get_all_fn in sorted(region_resources):
            aws_object = region_resources[get_all_fn]
            if len(aws_object) > 0:
                print("[%s] %s - %s" % (
                    region,
                    ' '.join(get_all_fn.split('_')[2:]),
                    len(aws_object)))
    print('')


def main():
    """Gather, report and (unless --info) clean resources in the chosen regions.

    Resources for every region are fetched on background threads first. If
    any fetch failed, the run aborts before anything is deleted. Otherwise
    one flow per region is built (resource types cleaned in the order listed
    below), and all region flows run under a parallel taskflow engine.
    """
    args = parse_arguments()
    # Gather all regions to clean so we only clean each region once
    regions = gather_regions_to_clean(args)
    # Order matters, resources will be cleaned in order
    ec2_resources_to_clean = [
        (CleanInstance, 'get_only_instances'),
        (CleanKeyPair, 'get_all_key_pairs'),
        (CleanElasticIP, 'get_all_addresses'),
        (CleanNetworkInterface, 'get_all_network_interfaces'),
        (CleanSecurityGroup, 'get_all_security_groups'),
    ]
    vpc_resources_to_clean = [
        (CleanNetworkAcl, 'get_all_network_acls'),
        (CleanRouteTable, 'get_all_route_tables'),
        (CleanInternetGateway, 'get_all_internet_gateways'),
        (CleanVpnConnection, 'get_all_vpn_connections'),
        (CleanVpnGateway, 'get_all_vpn_gateways'),
        (CleanCustomerGateway, 'get_all_customer_gateways'),
        (CleanSubnet, 'get_all_subnets'),
        (CleanVpc, 'get_all_vpcs'),
    ]
    # Regions have no dependencies on each other so they can be executed in
    # parallel
    clean_flow = gf.Flow("Clean AWS")
    LOG.info("Regions to clean: %s", regions)
    resources = collections.defaultdict(dict)
    region_conns = collections.defaultdict(dict)
    for region in sorted(regions):
        ec2_conn = connect(ec2, region)
        vpc_conn = connect(vpc, region)
        region_conns[region]['ec2'] = ec2_conn
        region_conns[region]['vpc'] = vpc_conn
        gather_resources(resources, vpc_conn, ec2_conn,
                         ec2_resources_to_clean, vpc_resources_to_clean)
    _wait_for_gather_threads()

    # A failed fetch would otherwise surface as a KeyError halfway through
    # building the flows; stop before deleting anything instead.
    missing = _missing_resources(
        regions, resources, ec2_resources_to_clean + vpc_resources_to_clean
    )
    if missing:
        for region, get_resource_fn in missing:
            LOG.error(
                "[%s] Could not gather %s; aborting without cleaning.",
                region,
                get_resource_fn
            )
        sys.exit(1)

    conn_kinds = (
        ('ec2', ec2_resources_to_clean),
        ('vpc', vpc_resources_to_clean),
    )
    for region in sorted(regions):
        region_flow = lf.Flow("Clean Region %s" % region)
        for conn_kind, resource_specs in conn_kinds:
            for task_class, get_resource_fn in resource_specs:
                region_flow.add(
                    generate_tasks_flow(
                        region_conns[region][conn_kind],
                        task_class,
                        resources[region][get_resource_fn]
                    )
                )
        clean_flow.add(region_flow)

    _print_resource_summary(resources)

    if not args.info:
        with futures.GreenThreadPoolExecutor() as executor:
            try:
                engine = taskflow.engines.load(
                    clean_flow,
                    executor=executor,
                    engine='parallel'
                )
                engine.run()
            except exceptions.Empty:
                LOG.info("Nothing to clean.")


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment