Created
May 6, 2021 06:57
-
-
Save Burekasim/f3621ae91313b104e4c73fb4c4205280 to your computer and use it in GitHub Desktop.
lambda script to maintain lifecycle rules on AWS EC2 AMI with slack notification, and controlling ami via tags
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import boto3 | |
import os | |
import re | |
import logging | |
import json | |
from botocore.exceptions import ClientError | |
from time import sleep | |
def env_var(env: str): | |
if os.environ: | |
if env in os.environ: | |
return os.environ[env] | |
else: | |
return '' | |
else: | |
return '' | |
images_to_keep = int(env_var('images_to_keep')) if env_var('images_to_keep') else int(7) | |
# AWS boto3 settings | |
region = env_var('region') if env_var('region') else 'us-west-2' | |
client = boto3.client('ec2', region_name=region) | |
ami_images = client.describe_images(Owners=['self'], Filters=[{'Name': 'state', 'Values': ['available']}]) | |
# AMI tag to skip | |
skip_tag_name = env_var('skip_tag_name') if env_var('skip_tag_name') else 'SkipAMI' | |
# SNS settings for slack | |
send_sns = True if env_var('send_sns') else False | |
DEBUG = True if env_var('debug') == "True" else False | |
sns_region = env_var('sns_region') | |
sns_topic = env_var('sns_topic') | |
# Logging | |
logger = logging.getLogger() | |
if DEBUG: | |
logger.setLevel('DEBUG') | |
else: | |
logger.setLevel('INFO') | |
def delete_ami(aws_ami_id: list): | |
try: | |
if client.deregister_image(ImageId=aws_ami_id): | |
return True | |
except ClientError as e: | |
if e.response['Error']['Code'] == 'InvalidSnapshot.InUse': | |
return False | |
return False | |
def delete_snapshot(aws_snapshot_id: str): | |
try: | |
if client.delete_snapshot(SnapshotId=aws_snapshot_id): | |
return True | |
except ClientError as e: | |
if e.response['Error']['Code'] == 'InvalidSnapshot.InUse': | |
return False | |
return False | |
def get_running_instances_ami(): | |
instance_ami_list = [] | |
instances = client.describe_instances(Filters=[{'Name': 'instance-state-name', 'Values': ['running']}]) | |
for instance in instances['Reservations']: | |
instance_ami_list.append(instance['Instances'][0]['ImageId']) | |
return instance_ami_list | |
def get_ami(): | |
ami_list = [] | |
ami_response = client.describe_images(Owners=['self']) | |
for ami_image in ami_response['Images']: | |
ami_list.append(ami_image['ImageId']) | |
return ami_list | |
def get_ami_name(ami_id: str): | |
ami_response = client.describe_images(Owners=['self'], ImageIds=[ami_id]) | |
return str(ami_response['Images'][0]['Name']) | |
def lambda_handler(event, context): | |
regex = '^instance-(.+)\-([v]?\d+\.\d+\.\d+).+(\d{4}-\d{2}-\d{2}.+Z)' | |
ami_images_list = [] | |
ami_images = client.describe_images(Owners=['self'], Filters=[{'Name': 'state', 'Values': ['available']}]) | |
for ami_details in ami_images['Images']: | |
if 'Tags' in ami_details: | |
skip_ami = False | |
for ami_tags in ami_details['Tags']: | |
if ami_tags['Key'] == skip_tag_name and ami_tags['Value'] == 'True': | |
skip_ami = True | |
if DEBUG: | |
logging.debug('DEBUG: skipping {} because tag {} has been found'.format(ami_details['Name'], skip_tag_name)) | |
if not skip_ami: | |
ami_images_list.append(ami_details) | |
else: | |
ami_images_list.append(ami_details) | |
ami_map = {} | |
bad_ami_names = [] | |
for ami_dict in ami_images_list: | |
ami_id = ami_dict['ImageId'] | |
ami_regex = re.search(regex, ami_dict['Name']) | |
if ami_regex: | |
ami_api_created = ami_dict['CreationDate'] | |
ami_service = ami_regex.group(1) | |
ami_name_created = ami_regex.group(3) | |
if ami_map.get(ami_service): | |
ami_map[ami_service].append({'ami_id': ami_id, 'ami_name': ami_dict['Name'], 'ami_date': ami_api_created}) | |
else: | |
ami_map[ami_service] = [{'ami_id': ami_id, 'ami_name': ami_dict['Name'], 'ami_date': ami_api_created}] | |
else: | |
# insert AMI's that don't match the regex to a list | |
bad_ami_names.append(ami_dict['Name']) | |
# send sns message of all the ami's that don't match the regex | |
if bad_ami_names and send_sns: | |
message = 'The following AMI failed to match regex in region:\n\n{}'.format('\n'.join(bad_ami_names)) | |
try: | |
print(boto3.client('sns', region_name=sns_region).publish(TopicArn=sns_topic, Message=message)) | |
except Exception as e: | |
logger.debug(e) | |
new_dict = {} | |
for service, service_list in ami_map.items(): | |
new_dict[service] = sorted(service_list, key=lambda k: k['ami_date'], reverse=False)[:- images_to_keep] | |
ami_to_delete_list = [] | |
for service, service_list in new_dict.items(): | |
i = 0 | |
for service_ami in service_list: | |
ami_to_delete_list.append(service_ami['ami_id']) | |
# Cleanup duplicate ami and ami used by running instances | |
running_instances_ami = get_running_instances_ami() | |
ami_to_delete_list = list(set(ami_to_delete_list) - set(running_instances_ami)) | |
for ami_to_remove in ami_to_delete_list: | |
if ami_to_remove in running_instances_ami: | |
ami_to_skip_name = get_ami_name(ami_to_remove) | |
logger.info(f'{ami_to_remove} (name: {ami_to_skip_name}) is on running instance, skipping') | |
if ami_to_delete_list: | |
ami_describe_response = client.describe_images(ImageIds=ami_to_delete_list) | |
else: | |
logging.info('Nothing to delete, exiting') | |
return { | |
'statusCode': 200, | |
'body': json.dumps('Done, deatailed log in CloudWatch') | |
} | |
snapshot_list = [] | |
if ami_describe_response['Images']: | |
for ami_image in ami_describe_response['Images']: | |
for snapshot_dict in ami_image['BlockDeviceMappings']: | |
try: | |
if 'Ebs' in snapshot_dict: | |
if snapshot_dict['Ebs']['SnapshotId']: | |
snapshot_list.append(snapshot_dict['Ebs']['SnapshotId']) | |
except Exception as e: | |
logger.debug('could not extract snapshot id for ami') | |
for ami_id in ami_to_delete_list: | |
ami_name = get_ami_name(ami_id) | |
if DEBUG: | |
logging.debug(f'DEBUG: dry-run delete: {ami_id}, ami name is: {ami_name}') | |
else: | |
if delete_ami(ami_id): | |
logger.info(f'successfully deleted AMI {ami_id}, ami name is: {ami_name}') | |
else: | |
logger.info(f'could not delete AMI {ami_id}, ami name is: {ami_name}, will try again one more time') | |
delete_snapshot(ami_id) | |
# Sleep for 5 seconds to avoid api call errors | |
sleep(5) | |
for snapshot_id in snapshot_list: | |
if DEBUG: | |
logging.debug(f'DEBUG: dry-run delete: {snapshot_id}') | |
else: | |
if delete_snapshot(snapshot_id): | |
logger.info(f'successfully deleted {snapshot_id}') | |
else: | |
logger.info(f'could not delete snapshot {snapshot_id} will try again one more time') | |
delete_snapshot(snapshot_id) | |
return { | |
'statusCode': 200, | |
'body': json.dumps('Done, detailed log in CloudWatch') | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment