Created
June 3, 2020 05:34
-
-
Save tkalus/9bf1cae852012083314aebfe60f88c42 to your computer and use it in GitHub Desktop.
AWS: Attempt to delete all default VPCs in all active regions, provided they appear unused and unmodified.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
"""AWS: Safely delete all default VPCs in all active regions.""" | |
import logging | |
import sys | |
from functools import partial | |
from itertools import chain | |
from typing import Callable | |
from boto3.session import Session | |
from botocore.exceptions import ClientError | |
log = logging.getLogger(__name__) | |
def get_inactive_ec2_regions(session: Session):
    """
    Yield the names of EC2 regions that are not active (for logging).

    A region is reported here when ``describe_regions(AllRegions=True)``
    lists it but ``get_active_ec2_regions`` does not yield it.

    :param session: boto3.session.Session for AWS.
    :returns: generator of region names (the "RegionName" of each entry).
    """
    # Collect the active names up front; membership is tested once per region.
    active_names = frozenset(get_active_ec2_regions(session))
    response = session.client("ec2").describe_regions(AllRegions=True)
    for entry in response.get("Regions", []):
        name = entry.get("RegionName")
        if name in active_names:
            continue
        yield name
def get_active_ec2_regions(session: Session):
    """
    Yield the names of EC2 regions that are usable by this account.

    "Active" means the region's opt-in-status is either
    "opt-in-not-required" or "opted-in".

    :param session: boto3.session.Session for AWS.
    :returns: generator of region names (the "RegionName" of each entry).
    """
    opt_in_filter = {
        "Name": "opt-in-status",
        "Values": ["opt-in-not-required", "opted-in"],
    }
    response = session.client("ec2").describe_regions(
        Filters=[opt_in_filter],
        AllRegions=True,
    )
    yield from (entry.get("RegionName") for entry in response.get("Regions", []))
def get_default_vpcs(session: Session, region_name: str):
    """
    Yield the IDs of the VPCs in region_name that match the isDefault=true filter.

    :param session: boto3.session.Session for AWS.
    :param region_name: AWS Region to query.
    :returns: generator of VPC IDs (the "VpcId" of each entry).
    :raises: possible botocore.exceptions, uncaught.
    """
    response = session.client("ec2", region_name=region_name).describe_vpcs(
        Filters=[{"Name": "isDefault", "Values": ["true"]}]
    )
    # Default to an empty list, as the region helpers do, so a response
    # without a "Vpcs" key yields nothing instead of iterating over None.
    for vpc in response.get("Vpcs", []):
        yield vpc.get("VpcId")
def vpc_only_has_defaults(session: Session, region_name: str, vpc_id: str):  # noqa:C901
    """
    Iterate across the vpc_id, checking to ensure it appears unused and unmodified.

    Every finding is logged at ERROR level and the scan continues, so a single
    call reports all of the reasons a VPC is not considered default. The checks:

    * no "associated" CIDR block other than the VPC's primary CIDR block;
    * no accepted or requested peering connections;
    * no network interfaces and no instances;
    * internet gateways are attached to this VPC only;
    * every subnet is the default subnet for its AZ;
    * route table associations are all "Main", and every non-local route
      targets 0.0.0.0/0;
    * every network ACL is a default one;
    * at most one security group, and it is named "default".

    :param session: boto3.session.Session for AWS.
    :param region_name: AWS Region to operate in.
    :param vpc_id: VPC ID thought to be a default VPC.
    :returns bool: True if appears to be default; False if not.
    :raises: possible botocore.exceptions, uncaught.
    """
    vpc = session.resource("ec2", region_name=region_name).Vpc(vpc_id)
    is_default = True

    for assoc in chain(
        vpc.cidr_block_association_set or [], vpc.ipv6_cidr_block_association_set or []
    ):
        cidr_block = assoc.get("CidrBlock") or assoc.get("Ipv6CidrBlock")
        if cidr_block == vpc.cidr_block:
            # Safely skip the default cidr_block association
            continue
        state = assoc.get("CidrBlockState", {}).get("State") or assoc.get(
            "Ipv6CidrBlockState", {}
        ).get("State")
        if state != "associated":
            # Old and failed associated blocks are listed in the API;
            # skip all but "associated".
            continue
        # Should be no additional CIDR associations beyond the primary block
        log.error(f"CIDR Association {cidr_block}:{state} found in {vpc_id}")
        is_default = False

    for peering_connection in chain(
        vpc.accepted_vpc_peering_connections.all(),
        vpc.requested_vpc_peering_connections.all(),
    ):
        # Should be no Peering Connections
        log.error(f"Peering Connection {peering_connection.id} found in {vpc_id}")
        is_default = False

    for net_iface in vpc.network_interfaces.all():
        # Should be no Network Interfaces
        log.error(f"Network Interface {net_iface.id} found in {vpc_id}")
        is_default = False

    for instance in vpc.instances.all():
        # Should be no Instances
        log.error(f"Instance {instance.id} found in {vpc_id}")
        is_default = False

    for igw in vpc.internet_gateways.all():
        for vid in [a.get("VpcId") for a in igw.attachments]:
            if vid != vpc_id:
                # All IGW attachments should be exclusive to the default VPC
                log.error(
                    f"Non default IGW Attachment ({vid}) found on {igw.id} in {vpc_id}"
                )
                is_default = False

    for subnet in vpc.subnets.all():
        if not subnet.default_for_az:
            # Ensure that no subnets were added after-the-fact
            log.error(f"Non default Subnet ({subnet.id}) found in {vpc_id}")
            is_default = False

    for rtb in vpc.route_tables.all():
        if not all(attrib.get("Main") for attrib in rtb.associations_attribute):
            # Ensure rtb associations are _only_ main
            log.error(f"RTB ({rtb.id}) with a non-main assoc found in {vpc_id}")
            is_default = False
        for route in rtb.routes:
            if route.gateway_id == "local":
                continue
            if route.destination_cidr_block != "0.0.0.0/0":
                log.error(
                    f"Found non-default Route {route.destination_cidr_block} in RTB ({rtb.id}) in {vpc_id}"
                )
                is_default = False

    for nacl in vpc.network_acls.all():
        if not nacl.is_default:
            log.error(f"Non default NACL ({nacl.id}) found in {vpc_id}")
            is_default = False

    # Count the security groups themselves. Wrapping the collection object in a
    # list literal always has length 1, so the members must be materialized for
    # the "more than one SG" check to be able to fire. The list is reused below.
    security_groups = list(vpc.security_groups.all())
    if len(security_groups) > 1:
        log.error(f"Non default SG found in {vpc_id}")
        is_default = False
    for sg in security_groups:
        if sg.group_name != "default":
            log.error(f"Non default SG ({sg.id}) found in {vpc_id}")
            is_default = False

    return is_default
def try_except_dry_run(func: Callable[[], object]) -> None:
    """
    Call passed function, catching and logging if we ONLY hit a DryRun "problem".

    :param func: zero-argument callable to invoke; its return value is discarded.
    :raises botocore.exceptions.ClientError: re-raised unchanged when the error
        code is anything other than "DryRunOperation".
    """
    try:
        func()
    except ClientError as e:
        # Only the "DryRunOperation" error code is swallowed here; any other
        # client error is a real failure and propagates.
        if e.response["Error"]["Code"] != "DryRunOperation":
            raise
        log.debug(" DRYRUN: Skipped ^^^")
def delete_default_vpc_if_unmodified(
    session: Session, region_name: str, vpc_id: str, dry_run: bool = True
) -> None:
    """
    Remove the default VPC vpc_id, but only if it looks unused and unmodified.

    The VPC is first vetted with vpc_only_has_defaults. If it fails, the
    problem is logged (dry run) or raised as TypeError (real run) and nothing
    is touched. Otherwise internet gateways are detached and deleted, subnets
    are deleted, non-local routes are deleted, and finally the VPC itself is
    deleted. Every mutating call is passed DryRun=dry_run and is wrapped in
    try_except_dry_run.

    :param session: boto3.session.Session for AWS.
    :param region_name: AWS Region to operate in.
    :param vpc_id: VPC ID thought to be a default VPC.
    :param dry_run: Whether or not to attempt to delete resources OR log whether you could.
    :raises TypeError: when the VPC appears modified and dry_run is False.
    :raises: possible botocore.exceptions, uncaught.
    """
    untouched = vpc_only_has_defaults(
        session=session, region_name=region_name, vpc_id=vpc_id
    )
    if not untouched:
        if not dry_run:
            raise TypeError(f"VPC {vpc_id} appears to have been modified.")
        log.error(f"VPC {vpc_id} appears to have been modified.")
        return

    vpc = session.resource("ec2", region_name=region_name).Vpc(vpc_id)

    # Internet gateways: detach first, then delete.
    for igw in vpc.internet_gateways.all():
        log.info(f"Detaching igw {igw.id} from VPC {vpc_id}")
        detach_igw = partial(igw.detach_from_vpc, DryRun=dry_run, VpcId=vpc_id)
        try_except_dry_run(detach_igw)
        log.info(f"Deleting igw {igw.id} from VPC {vpc_id}")
        delete_igw = partial(igw.delete, DryRun=dry_run)
        try_except_dry_run(delete_igw)

    # Subnets.
    for subnet in vpc.subnets.all():
        log.info(f"Deleting subnet {subnet.id} from VPC {vpc_id}")
        delete_subnet = partial(subnet.delete, DryRun=dry_run)
        try_except_dry_run(delete_subnet)

    # Routes: everything except the local route, which can't be deleted.
    for rtb in vpc.route_tables.all():
        for route in rtb.routes:
            if route.gateway_id != "local":
                log.info(
                    f"Deleting route {route.destination_cidr_block} via"
                    f" {route.gateway_id} on rtb {rtb.id} in VPC {vpc_id}"
                )
                delete_route = partial(route.delete, DryRun=dry_run)
                try_except_dry_run(delete_route)
        log.info(f"Route Table {rtb.id} will be deleted with VPC {vpc_id}")

    # NACLs and security groups are only reported; no delete call is made for them.
    for nacl in vpc.network_acls.all():
        log.info(f"Default NACL {nacl.id} will be deleted wtih VPC {vpc_id}")
    for sg in vpc.security_groups.all():
        log.info(f"Default SG {sg.id} will be deleted with VPC {vpc_id}")

    log.info(f"Deleting VPC {vpc_id}")
    delete_vpc = partial(vpc.delete, DryRun=dry_run)
    try_except_dry_run(delete_vpc)
def main(dry_run: bool = False):
    """
    Walk every active region and try to delete each default VPC found there.

    Inactive regions are logged first for reference. For each active region,
    every default VPC ID is logged and handed to
    delete_default_vpc_if_unmodified.

    :param dry_run: forwarded to delete_default_vpc_if_unmodified. NOTE: the
        default here is False, i.e. a bare ``main()`` performs real deletions.
    """
    session = Session()

    inactive_regions = sorted(get_inactive_ec2_regions(session))
    for region_name in inactive_regions:
        log.info(f"Inactive REGION: {region_name}")

    active_regions = sorted(get_active_ec2_regions(session))
    for region_name in active_regions:
        log.info(f"Working on REGION: {region_name}")
        default_vpc_ids = get_default_vpcs(session=session, region_name=region_name)
        for vpc_id in default_vpc_ids:
            log.info(f" VPC Id: {vpc_id}")
            delete_default_vpc_if_unmodified(
                session=session,
                region_name=region_name,
                vpc_id=vpc_id,
                dry_run=dry_run,
            )
if __name__ == "__main__":
    # Root logger: DEBUG and above, written to stderr.
    logging.basicConfig(level=logging.DEBUG, stream=sys.stderr)
    # Raise the threshold to ERROR for the SDK and HTTP library loggers.
    for library_logger in ("botocore", "boto3", "urllib3"):
        logging.getLogger(library_logger).setLevel(logging.ERROR)
    # Script entry point always runs as a dry run.
    main(dry_run=True)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment