Skip to content

Instantly share code, notes, and snippets.

@Burekasim
Created May 6, 2021 06:57
Show Gist options
  • Save Burekasim/f3621ae91313b104e4c73fb4c4205280 to your computer and use it in GitHub Desktop.
Save Burekasim/f3621ae91313b104e4c73fb4c4205280 to your computer and use it in GitHub Desktop.
lambda script to maintain lifecycle rules on AWS EC2 AMI with slack notification, and controlling ami via tags
import boto3
import os
import re
import logging
import json
from botocore.exceptions import ClientError
from time import sleep
def env_var(env: str):
if os.environ:
if env in os.environ:
return os.environ[env]
else:
return ''
else:
return ''
images_to_keep = int(env_var('images_to_keep')) if env_var('images_to_keep') else int(7)
# AWS boto3 settings
region = env_var('region') if env_var('region') else 'us-west-2'
client = boto3.client('ec2', region_name=region)
ami_images = client.describe_images(Owners=['self'], Filters=[{'Name': 'state', 'Values': ['available']}])
# AMI tag to skip
skip_tag_name = env_var('skip_tag_name') if env_var('skip_tag_name') else 'SkipAMI'
# SNS settings for slack
send_sns = True if env_var('send_sns') else False
DEBUG = True if env_var('debug') == "True" else False
sns_region = env_var('sns_region')
sns_topic = env_var('sns_topic')
# Logging
logger = logging.getLogger()
if DEBUG:
logger.setLevel('DEBUG')
else:
logger.setLevel('INFO')
def delete_ami(aws_ami_id: list):
try:
if client.deregister_image(ImageId=aws_ami_id):
return True
except ClientError as e:
if e.response['Error']['Code'] == 'InvalidSnapshot.InUse':
return False
return False
def delete_snapshot(aws_snapshot_id: str):
try:
if client.delete_snapshot(SnapshotId=aws_snapshot_id):
return True
except ClientError as e:
if e.response['Error']['Code'] == 'InvalidSnapshot.InUse':
return False
return False
def get_running_instances_ami():
instance_ami_list = []
instances = client.describe_instances(Filters=[{'Name': 'instance-state-name', 'Values': ['running']}])
for instance in instances['Reservations']:
instance_ami_list.append(instance['Instances'][0]['ImageId'])
return instance_ami_list
def get_ami():
ami_list = []
ami_response = client.describe_images(Owners=['self'])
for ami_image in ami_response['Images']:
ami_list.append(ami_image['ImageId'])
return ami_list
def get_ami_name(ami_id: str):
ami_response = client.describe_images(Owners=['self'], ImageIds=[ami_id])
return str(ami_response['Images'][0]['Name'])
def lambda_handler(event, context):
regex = '^instance-(.+)\-([v]?\d+\.\d+\.\d+).+(\d{4}-\d{2}-\d{2}.+Z)'
ami_images_list = []
ami_images = client.describe_images(Owners=['self'], Filters=[{'Name': 'state', 'Values': ['available']}])
for ami_details in ami_images['Images']:
if 'Tags' in ami_details:
skip_ami = False
for ami_tags in ami_details['Tags']:
if ami_tags['Key'] == skip_tag_name and ami_tags['Value'] == 'True':
skip_ami = True
if DEBUG:
logging.debug('DEBUG: skipping {} because tag {} has been found'.format(ami_details['Name'], skip_tag_name))
if not skip_ami:
ami_images_list.append(ami_details)
else:
ami_images_list.append(ami_details)
ami_map = {}
bad_ami_names = []
for ami_dict in ami_images_list:
ami_id = ami_dict['ImageId']
ami_regex = re.search(regex, ami_dict['Name'])
if ami_regex:
ami_api_created = ami_dict['CreationDate']
ami_service = ami_regex.group(1)
ami_name_created = ami_regex.group(3)
if ami_map.get(ami_service):
ami_map[ami_service].append({'ami_id': ami_id, 'ami_name': ami_dict['Name'], 'ami_date': ami_api_created})
else:
ami_map[ami_service] = [{'ami_id': ami_id, 'ami_name': ami_dict['Name'], 'ami_date': ami_api_created}]
else:
# insert AMI's that don't match the regex to a list
bad_ami_names.append(ami_dict['Name'])
# send sns message of all the ami's that don't match the regex
if bad_ami_names and send_sns:
message = 'The following AMI failed to match regex in region:\n\n{}'.format('\n'.join(bad_ami_names))
try:
print(boto3.client('sns', region_name=sns_region).publish(TopicArn=sns_topic, Message=message))
except Exception as e:
logger.debug(e)
new_dict = {}
for service, service_list in ami_map.items():
new_dict[service] = sorted(service_list, key=lambda k: k['ami_date'], reverse=False)[:- images_to_keep]
ami_to_delete_list = []
for service, service_list in new_dict.items():
i = 0
for service_ami in service_list:
ami_to_delete_list.append(service_ami['ami_id'])
# Cleanup duplicate ami and ami used by running instances
running_instances_ami = get_running_instances_ami()
ami_to_delete_list = list(set(ami_to_delete_list) - set(running_instances_ami))
for ami_to_remove in ami_to_delete_list:
if ami_to_remove in running_instances_ami:
ami_to_skip_name = get_ami_name(ami_to_remove)
logger.info(f'{ami_to_remove} (name: {ami_to_skip_name}) is on running instance, skipping')
if ami_to_delete_list:
ami_describe_response = client.describe_images(ImageIds=ami_to_delete_list)
else:
logging.info('Nothing to delete, exiting')
return {
'statusCode': 200,
'body': json.dumps('Done, deatailed log in CloudWatch')
}
snapshot_list = []
if ami_describe_response['Images']:
for ami_image in ami_describe_response['Images']:
for snapshot_dict in ami_image['BlockDeviceMappings']:
try:
if 'Ebs' in snapshot_dict:
if snapshot_dict['Ebs']['SnapshotId']:
snapshot_list.append(snapshot_dict['Ebs']['SnapshotId'])
except Exception as e:
logger.debug('could not extract snapshot id for ami')
for ami_id in ami_to_delete_list:
ami_name = get_ami_name(ami_id)
if DEBUG:
logging.debug(f'DEBUG: dry-run delete: {ami_id}, ami name is: {ami_name}')
else:
if delete_ami(ami_id):
logger.info(f'successfully deleted AMI {ami_id}, ami name is: {ami_name}')
else:
logger.info(f'could not delete AMI {ami_id}, ami name is: {ami_name}, will try again one more time')
delete_snapshot(ami_id)
# Sleep for 5 seconds to avoid api call errors
sleep(5)
for snapshot_id in snapshot_list:
if DEBUG:
logging.debug(f'DEBUG: dry-run delete: {snapshot_id}')
else:
if delete_snapshot(snapshot_id):
logger.info(f'successfully deleted {snapshot_id}')
else:
logger.info(f'could not delete snapshot {snapshot_id} will try again one more time')
delete_snapshot(snapshot_id)
return {
'statusCode': 200,
'body': json.dumps('Done, detailed log in CloudWatch')
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment