Created
August 9, 2021 14:24
-
-
Save diasjorge/bf4e392b20beffbf46256d76ce5f879f to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/local/bin/python3 | |
import csv | |
import re | |
from collections import OrderedDict | |
from pprint import pprint | |
import boto3 | |
import click | |
from botocore.exceptions import ClientError | |
ec2 = None | |
exists_icon = '✅' | |
not_exists_icon = '❌' | |
@click.group() | |
def cli(): | |
''' | |
Helper commands for Snapshots management. | |
''' | |
pass | |
@cli.command() | |
@click.option('-r', '--regions', required=True, help='Comma separated list') | |
def snapshot_report(regions): | |
''' | |
Find unreferenced snapshots. | |
''' | |
global ec2 | |
with open('report.csv', 'w') as csv_file: | |
csv_writer = csv.writer(csv_file, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL) | |
csv_writer.writerow([ | |
'id', | |
'volume_id', | |
'volume_exists', | |
'ami_id', | |
'ami_exists', | |
'instance_id', | |
'instance_exists', | |
'size', | |
'start_time', | |
'description' ]) | |
for region in regions: | |
ec2 = boto3.client('ec2', region_name=region) | |
for snapshot in get_snapshots(): | |
csv_writer.writerow([ | |
snapshot['id'], | |
snapshot['volume_id'], | |
snapshot['volume_exists'], | |
snapshot['ami_id'], | |
snapshot['ami_exists'], | |
snapshot['instance_id'], | |
snapshot['instance_exists'], | |
str(snapshot['size']) + 'gb', | |
str(snapshot['start_time']), | |
snapshot['description']]) | |
@cli.command() | |
@click.option('-f', '--force', is_flag=True) | |
@click.option('-r', '--regions', required=True, help='Comma separated list') | |
def snapshot_cleanup(force, regions): | |
''' | |
Find and delete unreferenced snapshots. | |
''' | |
global ec2 | |
for region in regions.split(','): | |
ec2 = boto3.client('ec2', region_name=region) | |
print('region={}'.format(region)) | |
print('{:22} {:23} {:23} {:23} {:>7} {:25} {:30}'.format('snapshot id', 'volume id', 'ami id', | |
'instance id', 'size', 'start time', 'description')) | |
for snapshot in get_snapshots(): | |
volume_exists = exists_icon if snapshot['volume_exists'] else not_exists_icon | |
ami_exists = exists_icon if snapshot['ami_exists'] else not_exists_icon | |
instance_exists = exists_icon if snapshot['instance_exists'] else not_exists_icon | |
print('{:22} {:22} {:22} {:22} {:>7} {:25} {:30}'.format( | |
snapshot['id'], | |
snapshot['volume_id'] + volume_exists, | |
snapshot['ami_id'] + ami_exists, | |
snapshot['instance_id'] + instance_exists, | |
str(snapshot['size']) + 'gb', | |
str(snapshot['start_time']), | |
snapshot['description'] | |
)) | |
if not snapshot['volume_exists'] and not snapshot['ami_exists'] and not snapshot['instance_exists'] and (force or click.confirm('Delete?', default=True)): | |
snapshot_delete(snapshot['id']) | |
@cli.command() | |
@click.option('-f', '--force', is_flag=True) | |
@click.option('-r', '--regions', required=True, help='Comma separated list') | |
def volume_cleanup(force, regions): | |
''' | |
Find and delete unused volumes. | |
''' | |
global ec2 | |
print('{:23} {:20} {:>7} {:10} {:23}'.format( | |
'volume id', 'status', 'size', 'created', 'snapshot id')) | |
for region in regions: | |
ec2 = boto3.client('ec2', region_name=region) | |
print('region={}'.format(region)) | |
for volume in get_available_volumes(): | |
snapshot_exists = exists_icon if volume['snapshot_exists'] else not_exists_icon | |
print('{:23} {:20} {:>7} {:10} {:22}'.format( | |
volume['id'], | |
volume['status'], | |
str(volume['size']) + 'gb', | |
volume['create_time'].strftime('%Y-%m-%d'), | |
volume['snapshot_id'] + snapshot_exists | |
)) | |
if not volume['snapshot_exists']: | |
print('Tags:') | |
print(' '+('\n '.join(['{}={}'.format(click.style(key, fg='blue'), tag) | |
for key, tag in volume['tags'].items()]))) | |
if force or click.confirm('Delete?', default=True): | |
ec2.delete_volume(VolumeId=volume['id']) | |
def snapshot_delete(snapshot_id): | |
''' | |
Delete single snapshot by id. | |
''' | |
try: | |
ec2.delete_snapshot(SnapshotId=snapshot_id) | |
print('Deleted ' + snapshot_id) | |
except ClientError as e: | |
print('Failed to delete ' + snapshot_id) | |
print(e) | |
def get_snapshots(): | |
''' | |
Get all snapshots. | |
''' | |
paginator = ec2.get_paginator('describe_snapshots') | |
page_iterator = paginator.paginate(OwnerIds=['self']) | |
for page in page_iterator: | |
for snapshot in page['Snapshots']: | |
instance_id, image_id = parse_description(snapshot['Description']) | |
yield { | |
'id': snapshot['SnapshotId'], | |
'description': snapshot['Description'], | |
'start_time': snapshot['StartTime'], | |
'size': snapshot['VolumeSize'], | |
'volume_id': snapshot['VolumeId'], | |
'volume_exists': volume_exists(snapshot['VolumeId']), | |
'instance_id': instance_id, | |
'instance_exists': instance_exists(instance_id), | |
'ami_id': image_id, | |
'ami_exists': image_exists(image_id), | |
} | |
def get_available_volumes(): | |
''' | |
Get all volumes in 'available' state. (Volumes not attached to any instance) | |
''' | |
paginator = ec2.get_paginator('describe_volumes') | |
page_iterator = paginator.paginate(Filters=[{'Name': 'status', 'Values': ['available']}]) | |
for page in page_iterator: | |
for volume in page['Volumes']: | |
yield { | |
'id': volume['VolumeId'], | |
'create_time': volume['CreateTime'], | |
'status': volume['State'], | |
'size': volume['Size'], | |
'snapshot_id': volume['SnapshotId'], | |
'snapshot_exists': str(snapshot_exists(volume['SnapshotId'])), | |
'tags': OrderedDict(sorted([(tag['Key'], tag['Value']) for tag in volume['Tags']])), | |
} | |
snapshot_cache = {} | |
def snapshot_exists(snapshot_id): | |
global snapshot_cache | |
if not snapshot_id: | |
return '' | |
try: | |
if snapshot_id not in snapshot_cache: | |
ec2.describe_snapshots(SnapshotIds=[snapshot_id]) | |
snapshot_cache[snapshot_id] = True | |
except ClientError: | |
snapshot_cache[snapshot_id] = False | |
return snapshot_cache[snapshot_id] | |
volume_cache = {} | |
def volume_exists(volume_id): | |
global volume_cache | |
if not volume_id: | |
return False | |
try: | |
if volume_id not in volume_cache: | |
ec2.describe_volumes(VolumeIds=[volume_id]) | |
volume_cache[volume_id] = True | |
except ClientError: | |
volume_cache[volume_id] = False | |
return volume_cache[volume_id] | |
instance_cache = {} | |
def instance_exists(instance_id): | |
global instance_cache | |
if not instance_id: | |
return '' | |
try: | |
instance_cache[instance_id] = (len(ec2.describe_instances(InstanceIds=[instance_id])['Reservations']) != 0) | |
except ClientError: | |
instance_cache[instance_id] = False | |
return instance_cache[instance_id] | |
image_cache = {} | |
def image_exists(image_id): | |
global image_cache | |
if not image_id: | |
return '' | |
try: | |
image_cache[image_id] = (len(ec2.describe_images(ImageIds=[image_id])['Images']) != 0) | |
except ClientError: | |
image_cache[image_id] = False | |
return image_cache[image_id] | |
def parse_description(description): | |
regex = r"^Created by CreateImage\((.*?)\) for (.*?) " | |
matches = re.finditer(regex, description, re.MULTILINE) | |
for matchNum, match in enumerate(matches): | |
return match.groups() | |
return '', '' | |
if __name__ == '__main__': | |
cli() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment