Skip to content

Instantly share code, notes, and snippets.

@toabctl
Last active December 12, 2024 17:58
Show Gist options
  • Save toabctl/2c88faa8f0165faa949447920806c3f6 to your computer and use it in GitHub Desktop.
Save toabctl/2c88faa8f0165faa949447920806c3f6 to your computer and use it in GitHub Desktop.
delete images and snapshots
#!/usr/bin/env python3
"""
Delete all images (AMIs) and snapshots owned by the current account in the active region
!!!!!!!!!!!!!!!! BE CAREFUL !!!!!!!!!!!!!!!!!!!!!!!!!
AWS_DEFAULT_REGION=sa-east-1 AWS_PROFILE=my-profile ./aws-snapshots-delete.py
Or to iterate over all regions:
for region in $(aws ec2 describe-regions | jq -r '.Regions[] | .RegionName'); do
echo $region; AWS_DEFAULT_REGION=$region ./aws-images-delete.py
done
"""
import concurrent.futures
import boto3
import botocore
import sys
# AWS handles built from the default session (region/profile come from the
# environment, e.g. AWS_DEFAULT_REGION / AWS_PROFILE).
ec2res = boto3.resource('ec2')
ec2client = boto3.client('ec2')
stsclient = boto3.client("sts")

# Safety net - refuse to delete anything from important Canonical accounts!
account_id = stsclient.get_caller_identity()["Account"]
print(f"account id: {account_id}")
_protected_accounts = (
    "099720109477",
    "513442679011",
    "837727238323",
    "156445921216",
)
if account_id in _protected_accounts:
    _banner = "#" * 80
    print(_banner)
    print(f"This looks like the wrong account ID: {account_id}!!! STOP !!!")
    print(_banner)
    sys.exit(1)

# Region the EC2 client is bound to; used as a prefix for every message below.
region_name = ec2client.meta.region_name

# Restrict the image listing to machine images. The commented-out entry is an
# optional extra filter that would narrow the listing to public images only.
filters = [
    {"Name": "image-type", "Values": ["machine"]},
    # {"Name": "is-public", "Values": ["true"]},
]

# Collect everything owned by the calling account ("self") up front.
_images_response = ec2client.describe_images(Owners=["self"], Filters=filters)
image_list = _images_response["Images"]
_snapshots_response = ec2client.describe_snapshots(OwnerIds=["self"])
snapshot_list = _snapshots_response["Snapshots"]

print(f'{region_name}: {len(image_list)} images will be deleted ...')
print(f'{region_name}: {len(snapshot_list)} snapshot will be deleted ...')
def _snapshot_delete(snapshot):
    """
    Delete a single snapshot.

    :param snapshot: one entry of the "Snapshots" list returned by
        describe_snapshots; only its "SnapshotId" key is read.
    :return: None. A botocore ClientError raised by the API call is printed
        instead of re-raised, so one failing snapshot does not stop the
        remaining deletions.
    """
    snapshot_id = snapshot["SnapshotId"]
    try:
        # Reuse the module-level client instead of building a new one here:
        # boto3 clients are safe to share between threads, while creating
        # clients from the shared default session concurrently is not, and
        # doing so per item repeats the client construction for every call.
        ec2client.delete_snapshot(SnapshotId=snapshot_id)
    except botocore.exceptions.ClientError as e:
        print(e)
    else:
        print(f'{region_name}: snapshot {snapshot_id} deleted')
def _image_delete(image):
    """
    Deregister a single image (AMI).

    Only the image itself is deregistered here; snapshots are removed
    separately by _snapshot_delete.

    :param image: one entry of the "Images" list returned by describe_images;
        its "ImageId" key is required, "Name" is only used for the message.
    :return: None. A botocore ClientError raised by the API call is printed
        instead of re-raised, so one failing image does not stop the
        remaining deletions.
    """
    image_id = image["ImageId"]
    try:
        # Reuse the module-level client instead of building a new one here:
        # boto3 clients are safe to share between threads, while creating
        # clients from the shared default session concurrently is not, and
        # doing so per item repeats the client construction for every call.
        ec2client.deregister_image(ImageId=image_id)
    except botocore.exceptions.ClientError as e:
        print(e)
    else:
        # Use .get so an image entry without a "Name" key cannot raise
        # KeyError after the image has already been deregistered.
        image_name = image.get("Name", "<unnamed>")
        print(f'{region_name}: image {image_name} ({image_id}) deleted')
# Phase 1: deregister all images. The pool is drained completely before the
# snapshot phase starts (presumably because snapshots that still back a
# registered image cannot be deleted - confirm against the EC2 documentation).
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [executor.submit(_image_delete, image) for image in image_list]
    print(f'{region_name}: waiting for {len(futures)} futures to complete to delete images ...')
    for future in concurrent.futures.as_completed(futures):
        # result() re-raises anything the worker did not handle itself.
        future.result()

# Phase 2: delete all snapshots, again one task per item.
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [executor.submit(_snapshot_delete, snapshot) for snapshot in snapshot_list]
    print(f'{region_name}: waiting for {len(futures)} futures to complete to delete snapshots ...')
    for future in concurrent.futures.as_completed(futures):
        # result() re-raises anything the worker did not handle itself.
        future.result()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment