-
-
Save danpritts/1089d878a76b14393478a7476476f97b to your computer and use it in GitHub Desktop.
# most credit to the original: https://gist.github.com/brandond/6b4d22eaefbd66895f230f68f27ee586
# Tag snapshots based on their associated AMI and volumes based on attached instance.
# format:
# (AMI:db5|db5) /dev/sda1 (1/4)
# (AMI:db5|db5) /dev/sdb (2/4)
# Best practice: create IAM user
# Simplest privilege to get it to work with reasonable security: use predefined policy "ReadOnlyAccess"
# and add your own custom policy that grants "ec2:CreateTags"
import copy | |
import logging | |
import os | |
import boto3 | |
# Root logging verbosity comes from the LOG_LEVEL environment variable
# (default 'INFO'); configured before the client is created.
logging.basicConfig(level=os.environ.get('LOG_LEVEL', 'INFO'))
logger = logging.getLogger(__name__)
# Single EC2 client used by the tagging functions below.
ec2 = boto3.client('ec2')
def tag_snapshots():
    """Tag each owned snapshot after the AMI that references it; flag the rest.

    Every snapshot referenced by one of this account's AMIs receives the
    AMI's tags plus ``ImageId`` and a ``Name`` of the form
    ``AMI:<image name> <device> (<n>/<total>)``.  Owned snapshots that no
    owned AMI references get ``UNUSED `` prepended to their ``Name`` (the
    snapshot id is used when there is no ``Name`` tag yet).

    Tags are written with ``ec2.create_tags`` only when they differ from
    the snapshot's current tags.
    """
    snapshots = {}
    for response in ec2.get_paginator('describe_snapshots').paginate(OwnerIds=['self']):
        for snapshot in response['Snapshots']:
            snapshots[snapshot['SnapshotId']] = snapshot

    for image in ec2.describe_images(Owners=['self'])['Images']:
        tags = boto3_tag_list_to_ansible_dict(image.get('Tags', []))
        # Ephemeral (instance-store) mappings have no 'Ebs' entry, so look it
        # up defensively.  Only snapshot-backed devices are numbered, which
        # keeps the "(n/total)" suffix consistent, e.g. (1/3) (2/3) (3/3).
        devices = [
            device
            for device in image.get('BlockDeviceMappings', [])
            if 'SnapshotId' in device.get('Ebs', {})
        ]
        numberofdevs = len(devices)
        for devnum, device in enumerate(devices, 1):
            snapshot_id = device['Ebs']['SnapshotId']
            snapshot = snapshots.get(snapshot_id)
            if snapshot is None:
                # The snapshot was not returned by the owned-snapshot listing
                # above; skip it instead of aborting the whole run.
                logger.warning(
                    '%s: referenced by %s but not among owned snapshots; skipping',
                    snapshot_id, image['ImageId'],
                )
                continue
            snapshot['Used'] = True
            cur_tags = boto3_tag_list_to_ansible_dict(snapshot.get('Tags', []))
            new_tags = dict(cur_tags)
            new_tags.update(tags)
            new_tags['ImageId'] = image['ImageId']
            # here's where to change formatting
            new_tags['Name'] = 'AMI:{0} {1} ({2}/{3})'.format(
                image['Name'], device['DeviceName'], devnum, numberofdevs)
            if new_tags != cur_tags:
                logger.info('%s: Tags changed to %s', snapshot_id, new_tags)
                ec2.create_tags(
                    Resources=[snapshot_id],
                    Tags=ansible_dict_to_boto3_tag_list(new_tags),
                )

    # Anything not marked above is referenced by none of our AMIs.
    for snapshot in snapshots.values():
        if 'Used' in snapshot:
            continue
        cur_tags = boto3_tag_list_to_ansible_dict(snapshot.get('Tags', []))
        name = cur_tags.get('Name', snapshot['SnapshotId'])
        # Already-flagged snapshots are left alone so the prefix is not stacked.
        if not name.startswith('UNUSED'):
            logger.warning('%s Unused!', snapshot['SnapshotId'])
            cur_tags['Name'] = 'UNUSED ' + name
            ec2.create_tags(
                Resources=[snapshot['SnapshotId']],
                Tags=ansible_dict_to_boto3_tag_list(cur_tags),
            )
def tag_volumes():
    """Tag each volume after the instance it is attached to; flag the rest.

    Every volume listed in an instance's block device mappings receives the
    instance's tags and a ``Name`` of the form
    ``<instance name> <device> (<n>/<total>)``.  When the instance has no
    ``Name`` tag its instance id is used instead.  Volumes attached to no
    instance get ``UNUSED `` prepended to their ``Name`` (the volume id is
    used when there is no ``Name`` tag yet).

    Tags are written with ``ec2.create_tags`` only when they differ from
    the volume's current tags.
    """
    volumes = {}
    for response in ec2.get_paginator('describe_volumes').paginate():
        for volume in response['Volumes']:
            volumes[volume['VolumeId']] = volume

    for response in ec2.get_paginator('describe_instances').paginate():
        for reservation in response['Reservations']:
            for instance in reservation['Instances']:
                tags = boto3_tag_list_to_ansible_dict(instance.get('Tags', []))
                # Untagged instances would otherwise raise KeyError on 'Name'.
                instance_name = tags.get('Name', instance['InstanceId'])
                # Number only mappings that actually name a volume, so the
                # "(n/total)" suffix reads (1/3) (2/3) (3/3).
                devices = [
                    device
                    for device in instance.get('BlockDeviceMappings', [])
                    if 'VolumeId' in device.get('Ebs', {})
                ]
                numberofdevs = len(devices)
                for devnum, device in enumerate(devices, 1):
                    volume_id = device['Ebs']['VolumeId']
                    volume = volumes.get(volume_id)
                    if volume is None:
                        # Not in the volume listing fetched above; skip it
                        # rather than abort the whole run.
                        logger.warning(
                            '%s: attached to %s but not in volume listing; skipping',
                            volume_id, instance['InstanceId'],
                        )
                        continue
                    volume['Used'] = True
                    cur_tags = boto3_tag_list_to_ansible_dict(volume.get('Tags', []))
                    new_tags = dict(cur_tags)
                    new_tags.update(tags)
                    # here's where to change formatting
                    new_tags['Name'] = '{0} {1} ({2}/{3})'.format(
                        instance_name, device['DeviceName'], devnum, numberofdevs)
                    if new_tags != cur_tags:
                        logger.info('%s Tags changed to %s', volume_id, new_tags)
                        ec2.create_tags(
                            Resources=[volume_id],
                            Tags=ansible_dict_to_boto3_tag_list(new_tags),
                        )

    # Anything not marked above is attached to no instance.
    for volume in volumes.values():
        if 'Used' in volume:
            continue
        cur_tags = boto3_tag_list_to_ansible_dict(volume.get('Tags', []))
        name = cur_tags.get('Name', volume['VolumeId'])
        # Already-flagged volumes are left alone so the prefix is not stacked.
        if not name.startswith('UNUSED'):
            logger.warning('%s Unused!', volume['VolumeId'])
            cur_tags['Name'] = 'UNUSED ' + name
            ec2.create_tags(
                Resources=[volume['VolumeId']],
                Tags=ansible_dict_to_boto3_tag_list(cur_tags),
            )
def tag_everything():
    """Run the snapshot tagging pass, then the volume tagging pass."""
    for tagging_pass in (tag_snapshots, tag_volumes):
        tagging_pass()
def boto3_tag_list_to_ansible_dict(tags_list):
    """Flatten a list of tag dicts into a plain ``{key: value}`` dict.

    Each tag may use lowercase ``key``/``value`` or capitalised
    ``Key``/``Value`` field names; the lowercase pair is tried first.
    Tags whose key starts with ``aws:`` are left out.
    """
    field_pairs = (('key', 'value'), ('Key', 'Value'))
    result = {}
    for entry in tags_list:
        for key_field, value_field in field_pairs:
            if key_field in entry and not entry[key_field].startswith('aws:'):
                result[entry[key_field]] = entry[value_field]
                # First matching spelling wins, mirroring an if/elif chain.
                break
    return result
def ansible_dict_to_boto3_tag_list(tags_dict):
    """Expand a ``{key: value}`` dict into a list of ``{'Key', 'Value'}`` dicts.

    The list follows the dict's iteration order.
    """
    return [{'Key': key, 'Value': value} for key, value in tags_dict.items()]
def handler(event, context):
    """Run the full tagging pass; neither argument is read.

    NOTE(review): the ``(event, context)`` signature suggests this is the
    AWS Lambda entry point - confirm against the deployment config.
    """
    tag_everything()
# Command-line entry point: tag everything when run as a script.
if __name__ == '__main__':
    tag_everything()
Ansible functions were in the original code I copied (see url in comments at top). At a guess, the original author knew about this method from other work and reused it.
Not sure whether it is an access or a credentials problem, but it is deployed with a predefined CI/CD pipeline to different environments. It works in the other ones; only in this one environment is it not working. Thanks for the feedback as well.
Looks like the key "Ebs" is not present in all BlockDeviceMappings as some might be ephemeral volumes.
I added a try catch around that to see what the error is
and the output of the exception block is
Having a try/except block around these kinds of things helps narrow down the error and lets the script continue executing.
Thanksssss a lot mate! Cheers, owe you one! @yyashwanth
Thanks, your script helped me a lot...
I did some small improvements for this gist
#!/usr/bin/env python3
import logging
import os
import boto3
# Root logging verbosity comes from the LOG_LEVEL environment variable
# (default "INFO"); configured before the client is created.
logging.basicConfig(level=os.environ.get("LOG_LEVEL", "INFO"))
logger = logging.getLogger(__name__)
# Single EC2 client used by the tagging functions below.
ec2 = boto3.client("ec2")
def tag_snapshots():
    """Tag each owned snapshot after the AMI that references it; flag the rest.

    Every snapshot referenced by one of this account's AMIs receives the
    AMI's tags plus ``ImageId`` and a ``Name`` of the form
    ``AMI: <image name> <device> (<n>/<total>)``.  Owned snapshots that no
    owned AMI references get ``UNUSED `` prepended to their ``Name`` (the
    snapshot id is used when there is no ``Name`` tag yet).

    Tags are written with ``ec2.create_tags`` only when they differ from
    the snapshot's current tags.
    """
    snapshots = {
        snapshot["SnapshotId"]: snapshot
        for page in ec2.get_paginator("describe_snapshots").paginate(OwnerIds=["self"])
        for snapshot in page["Snapshots"]
    }

    for image in ec2.describe_images(Owners=["self"])["Images"]:
        tags = extract_tags(image.get("Tags", []))
        # Only snapshot-backed devices are numbered, so the "(n/total)"
        # suffix stays consistent when the AMI also has ephemeral mappings
        # (which carry no "Ebs" entry).
        devices = [
            device
            for device in image.get("BlockDeviceMappings", [])
            if "SnapshotId" in device.get("Ebs", {})
        ]
        numberofdevs = len(devices)
        for devnum, device in enumerate(devices, start=1):
            snapshot_id = device["Ebs"]["SnapshotId"]
            snapshot = snapshots.get(snapshot_id)
            if snapshot is None:
                # The snapshot was not returned by the owned-snapshot listing
                # above; skip it instead of aborting the whole run.
                logger.warning(
                    "%s: referenced by %s but not among owned snapshots; skipping",
                    snapshot_id,
                    image["ImageId"],
                )
                continue
            snapshot["Used"] = True
            name = f"AMI: {image['Name']} {device['DeviceName']} ({devnum}/{numberofdevs})"
            cur_tags = extract_tags(snapshot.get("Tags", []))
            new_tags = {
                **cur_tags,
                **tags,
                "Name": name,
                "ImageId": image["ImageId"],
            }
            # Compare against the snapshot's own current tags (not the
            # image's) so unchanged snapshots are not rewritten every run.
            if new_tags != cur_tags:
                logger.info("%s: Tags changed to %s", snapshot_id, repr(new_tags))
                ec2.create_tags(Resources=[snapshot_id], Tags=load_tags(new_tags))

    # Anything not marked above is referenced by none of our AMIs.
    for snapshot in snapshots.values():
        if "Used" in snapshot:
            continue
        tags = extract_tags(snapshot.get("Tags", []))
        name = tags.get("Name", snapshot["SnapshotId"])
        # Already-flagged snapshots are left alone so the prefix is not stacked.
        if not name.startswith("UNUSED"):
            logger.warning("%s: Unused", snapshot["SnapshotId"])
            tags["Name"] = f"UNUSED {name}"
            ec2.create_tags(Resources=[snapshot["SnapshotId"]], Tags=load_tags(tags))
def tag_volumes():
    """Tag each volume after the instance it is attached to; flag the rest.

    Every volume listed in an instance's block device mappings receives the
    instance's tags and a ``Name`` of the form
    ``<instance name> <device> (<n>/<total>)``.  When the instance has no
    ``Name`` tag its instance id is used instead.  Volumes attached to no
    instance get ``UNUSED `` prepended to their ``Name`` (the volume id is
    used when there is no ``Name`` tag yet).

    Tags are written with ``ec2.create_tags`` only when they differ from
    the volume's current tags.
    """
    volumes = {
        volume["VolumeId"]: volume
        for page in ec2.get_paginator("describe_volumes").paginate()
        for volume in page["Volumes"]
    }

    for page in ec2.get_paginator("describe_instances").paginate():
        for reservation in page["Reservations"]:
            for instance in reservation["Instances"]:
                tags = extract_tags(instance.get("Tags", []))
                # Untagged instances would otherwise raise KeyError on "Name".
                instance_name = tags.get("Name", instance["InstanceId"])
                # Number only mappings that actually name a volume.
                devices = [
                    device
                    for device in instance.get("BlockDeviceMappings", [])
                    if "VolumeId" in device.get("Ebs", {})
                ]
                numberofdevs = len(devices)
                for devnum, device in enumerate(devices, start=1):
                    volume_id = device["Ebs"]["VolumeId"]
                    volume = volumes.get(volume_id)
                    if volume is None:
                        # Not in the volume listing fetched above; skip it
                        # rather than abort the whole run.
                        logger.warning(
                            "%s: attached to %s but not in volume listing; skipping",
                            volume_id,
                            instance["InstanceId"],
                        )
                        continue
                    volume["Used"] = True
                    name = f"{instance_name} {device['DeviceName']} ({devnum}/{numberofdevs})"
                    cur_tags = extract_tags(volume.get("Tags", []))
                    new_tags = {
                        **cur_tags,
                        **tags,
                        "Name": name,
                    }
                    # Compare against the volume's own current tags (not the
                    # instance's) so unchanged volumes are not rewritten.
                    if new_tags != cur_tags:
                        logger.info("%s: Tags changed to %s", volume_id, repr(new_tags))
                        ec2.create_tags(Resources=[volume_id], Tags=load_tags(new_tags))

    # Anything not marked above is attached to no instance.
    for volume in volumes.values():
        if "Used" in volume:
            continue
        tags = extract_tags(volume.get("Tags", []))
        name = tags.get("Name", volume["VolumeId"])
        # Already-flagged volumes are left alone so the prefix is not stacked.
        if not name.startswith("UNUSED"):
            logger.warning("%s: Unused", volume["VolumeId"])
            tags["Name"] = f"UNUSED {name}"
            ec2.create_tags(Resources=[volume["VolumeId"]], Tags=load_tags(tags))
def tag_everything():
    """Run the snapshot tagging pass, then the volume tagging pass."""
    for tagging_pass in (tag_snapshots, tag_volumes):
        tagging_pass()
def extract_tags(tags):
    """Flatten a list of ``{"Key", "Value"}`` dicts into ``{key: value}``.

    Tags whose key starts with ``aws:`` are left out.
    """
    flattened = {}
    for tag in tags:
        key = tag["Key"]
        if key.startswith("aws:"):
            continue
        flattened[key] = tag["Value"]
    return flattened
def load_tags(tags):
    """Expand a ``{key: value}`` dict into a list of ``{"Key", "Value"}`` dicts.

    Keys starting with ``aws:`` are dropped; order follows the dict.
    """
    tag_list = []
    for key, value in tags.items():
        if key.startswith("aws:"):
            continue
        tag_list.append({"Key": key, "Value": value})
    return tag_list
# Command-line entry point: tag everything when run as a script.
if __name__ == "__main__":
    tag_everything()
Hello everyone, and thanks for this thread.
I've made some further changes, tried to include as many improvements as possible, and added a feature for deleting resources, since I needed to delete resources that are not used by any AMI or instance. I have published a gist here in case it is useful to anyone: https://gist.github.com/gannino/17b41c316f874af5d4068c2674a54048
Can you help me understand why you are using Ansible here?