-
-
Save mlapida/931c03cce1e9e43f147b to your computer and use it in GitHub Desktop.
from __future__ import print_function | |
import json | |
import boto3 | |
import logging | |
#setup simple logging for INFO | |
logger = logging.getLogger() | |
logger.setLevel(logging.ERROR) | |
#define the connection region | |
ec2 = boto3.resource('ec2', region_name="us-west-2") | |
#Set this to True if you don't want the function to perform any actions | |
debugMode = False | |
def lambda_handler(event, context):
    """Copy selected instance tags onto each instance's volumes and ENIs.

    Walks every instance returned by the module-level ``ec2`` resource (all
    instances, not only running ones), builds a tag list with
    ``tag_cleanup`` and applies it to each attached EBS volume and network
    interface. When the module-level ``debugMode`` flag is set, nothing is
    written: the tag list is only computed (``tag_cleanup`` prints it).

    Args:
        event: Lambda invocation event (unused).
        context: Lambda context object (unused).
    """
    for instance in ec2.instances.all():
        # Tag the volumes; the Name tag is suffixed with the device name.
        for vol in instance.volumes.all():
            device = vol.attachments[0]['Device']
            if debugMode:
                print("[DEBUG] " + str(vol))
                tag_cleanup(instance, device)
                continue
            tags = tag_cleanup(instance, device)
            # Nothing to copy: skip the API call, since CreateTags needs at
            # least one tag.
            if not tags:
                continue
            tag = vol.create_tags(Tags=tags)
            print("[INFO]: " + str(tag))

        # Tag the network interfaces; the Name tag is suffixed eth<index>.
        for eni in instance.network_interfaces:
            label = "eth" + str(eni.attachment['DeviceIndex'])
            if debugMode:
                print("[DEBUG] " + str(eni))
                tag_cleanup(instance, label)
                continue
            tags = tag_cleanup(instance, label)
            if not tags:
                continue
            tag = eni.create_tags(Tags=tags)
            print("[INFO]: " + str(tag))
#------------- Functions ------------------ | |
#returns the type of configuration that was performed | |
def tag_cleanup(instance, detail):
    """Build the list of tags to copy from an instance to an attached resource.

    The instance's ``Name`` tag is copied with ``" - <detail>"`` appended to
    its value; a fixed set of ownership/billing tags is copied unchanged;
    every other tag is skipped. Progress is printed for each tag.

    Args:
        instance: Object exposing ``tags`` as a list of
            ``{'Key': ..., 'Value': ...}`` dicts, or ``None`` when the
            instance has no tags.
        detail: Label appended to the Name value (e.g. a device name).

    Returns:
        A list of tag dicts in the order the instance tags were seen; empty
        when the instance has no tags or none of them qualify.
    """
    # Tags copied verbatim onto the attached resource.
    passthrough = (
        'Application Owner',
        'Cost Center',
        'Date Created',
        'Requestor',
        'System Owner',
    )
    tempTags = []
    # ``instance.tags`` is None for an untagged instance; treat it as empty.
    for t in instance.tags or []:
        key = t['Key']
        if key == 'Name':
            # Fresh dict per Name tag so the instance's own tag is untouched.
            tempTags.append({'Value': t['Value'] + " - " + str(detail), 'Key': 'Name'})
        elif key in passthrough:
            print("[INFO]: " + key + " Tag " + str(t))
            tempTags.append(t)
        else:
            print("[INFO]: Skip Tag - " + str(t))
    print("[INFO] " + str(tempTags))
    return tempTags
And another variation adding elb and elbv2 eni tagging. I took out the lambda bits, but it'd be easy to add them back.
import boto3
COPYABLE = ["Service", "Environment", "Team", "Name"]
def ec2():
    """Copy each instance's COPYABLE tags to its EBS volumes and ENIs.

    Instances with no tags, or with no tag whose key is in ``COPYABLE``,
    are skipped.
    """
    print('Processing EC2 Instances')

    def copy_to(resource, tag_list):
        # Announce, then write the tags onto the attached resource.
        print('Updating tags for {}'.format(resource.id))
        resource.create_tags(Tags=tag_list)

    for instance in boto3.resource('ec2').instances.all():
        instance_tags = instance.tags or []
        copied = [tag for tag in instance_tags if tag['Key'] in COPYABLE]
        if copied:
            # EBS volumes first, then the network interfaces.
            for volume in instance.volumes.all():
                copy_to(volume, copied)
            for interface in instance.network_interfaces:
                copy_to(interface, copied)
def elb():
    """Tag the network interfaces that belong to classic ELBs.

    An interface is selected when its requester is ``amazon-elb`` and its
    description starts with ``ELB`` and contains no ``/``. The load balancer
    name is taken as the second space-separated word of the description.
    """
    print('Processing ELB Instances')

    def is_classic_elb_interface(candidate):
        if candidate.get('RequesterId') != 'amazon-elb':
            return False
        description = candidate['Description']
        return description.startswith('ELB') and '/' not in description

    tags_by_name = _get_elb_tags('elb')
    for interface in _network_interfaces(is_classic_elb_interface):
        lb_name = interface['Description'].split(' ')[1]
        if lb_name in tags_by_name:
            _tag_network_interface(
                interface['NetworkInterfaceId'], tags_by_name[lb_name])
def elbv2():
    """Tag the network interfaces that belong to ELBv2 load balancers.

    An interface is selected when its requester is ``amazon-elb`` and its
    description starts with ``ELB`` and contains a ``/``. The load balancer
    name is taken as the second ``/``-separated part of the description.
    """
    print('Processing ELBv2 Instances')

    def is_elbv2_interface(candidate):
        if candidate.get('RequesterId') != 'amazon-elb':
            return False
        description = candidate['Description']
        return description.startswith('ELB') and '/' in description

    tags_by_name = _get_elb_tags('elbv2')
    for interface in _network_interfaces(is_elbv2_interface):
        lb_name = interface['Description'].split('/')[1]
        if lb_name in tags_by_name:
            _tag_network_interface(
                interface['NetworkInterfaceId'], tags_by_name[lb_name])
def _get_elb_tags(name='elb'):
    """Return the tags to copy for every load balancer, keyed by LB name.

    Args:
        name: boto3 client name, either ``'elb'`` (classic) or ``'elbv2'``.

    Returns:
        Dict mapping ``LoadBalancerName`` to a list of tag dicts: the load
        balancer's tags whose key is in ``COPYABLE`` (except ``Name``),
        followed by a ``Name`` tag set to the load balancer name.

    Raises:
        ValueError: If ``name`` is neither ``'elb'`` nor ``'elbv2'``.
    """
    if name == 'elb':
        page_name = 'LoadBalancerDescriptions'
        key = 'LoadBalancerName'
        kwname = 'LoadBalancerNames'
    elif name == 'elbv2':
        page_name = 'LoadBalancers'
        key = 'LoadBalancerArn'
        kwname = 'ResourceArns'
    else:
        raise ValueError('Invalid name: {}'.format(name))

    tags = {}
    client = boto3.client(name)
    paginator = client.get_paginator('describe_load_balancers')
    for page in paginator.paginate():
        for lb in page[page_name]:
            lb_name = lb['LoadBalancerName']
            response = client.describe_tags(**{kwname: [lb[key]]})
            # Drop any Name tag carried by the load balancer itself: a Name
            # entry is appended below and a tag list must not repeat a key.
            copied = [
                tag
                for description in response['TagDescriptions']
                for tag in description.get('Tags', [])
                if tag['Key'] in COPYABLE and tag['Key'] != 'Name'
            ]
            copied.append({'Key': 'Name', 'Value': lb_name})
            tags[lb_name] = copied
    return tags
def _network_interfaces(filter=None):
    """Yield network interface dicts from describe_network_interfaces.

    Args:
        filter: Optional predicate taking an interface dict; when given,
            only interfaces for which it returns a truthy value are yielded.
    """
    ec2_client = boto3.client('ec2')
    pages = ec2_client.get_paginator('describe_network_interfaces').paginate()
    for page in pages:
        for candidate in page['NetworkInterfaces']:
            if not filter or filter(candidate):
                yield candidate
def _tag_network_interface(eni_id, tags):
    """Apply ``tags`` to the network interface identified by ``eni_id``."""
    print('Updating tags for {}'.format(eni_id))
    interface = boto3.resource('ec2').NetworkInterface(eni_id)
    interface.create_tags(Tags=tags)
def main():
    """Run the EC2, ELB and ELBv2 tagging passes, in that order."""
    for tagging_pass in (ec2, elb, elbv2):
        tagging_pass()


if __name__ == '__main__':
    main()
@gmr this is awesome, just the thing I was looking for. I'm very new to Python, so I'm still learning. There are just two things I wanted to ask. Is there a way to include Elastic IPs in this code? Also, how could you implement a dry run so we can see what changes would be made without actually making them in the first instance?
I packaged the script from @gmr into self-contained CloudFormation for ease of use.
https://gist.github.com/tomekklas/f11172a0a2cdd5888c66ddb7f6d6c214
Even simpler. Thanks for your example.
import boto3
tags_to_use = ['SomeTag']
def lambda_handler(event, context):
    """Copy the tags listed in ``tags_to_use`` from instances to their volumes.

    Instances that have no tags, or none of the listed tags, are skipped so
    that ``create_tags`` is never called with an empty tag list.

    Args:
        event: Lambda invocation event (unused).
        context: Lambda context object (unused).
    """
    instances = boto3.resource('ec2').instances.all()
    for instance in instances:
        # ``instance.tags`` is None when the instance has no tags at all.
        to_tag = [t for t in instance.tags or [] if t['Key'] in tags_to_use]
        if not to_tag:
            continue
        for vol in instance.volumes.all():
            print(f"Tagging volume {vol.id} from instance {instance.id}")
            vol.create_tags(Tags=to_tag)
You can attach a CloudWatch event that listens for newly running instances. This way, your script runs whenever a new instance starts.
@n2taylor - thanks, this is great. but
Can I add a condition to check whether the volume has any existing tags before assigning the instance tags?
I don't want to remove existing tags from any of the EBS volumes; I just want to assign the EC2 tags in case the volume doesn't have any.
thanks
Hi,
Is it possible to extend the copying of tags to Snapshots?
@n2taylor - thanks, this is great. but
Can I add a condition to check whether the volume has any existing tags before assigning the instance tags?
I don't want to remove existing tags from any of the EBS volumes; I just want to assign the EC2 tags in case the volume doesn't have any. Thanks.
I updated my copy of this with this ec2
function, not sure if this works for you. I was specifically looking to avoid overwriting the Name tag as that was set by a persistent volume claim in EKS and didn't want to lose the relation.
# Tag keys that may be copied from an instance to its attached resources.
COPYABLE = ["Name", "role", "environment", "cost_category"]

# The same keys without "Name", for resources whose Name must be preserved.
NO_NAME_COPYABLE = [key for key in COPYABLE if key != "Name"]
def ec2():
    """Copy COPYABLE instance tags to volumes and ENIs, keeping volume Names.

    A volume that already carries a ``Name`` tag receives only the tags in
    ``NO_NAME_COPYABLE`` so its existing Name is not overwritten; other
    volumes and all network interfaces receive the full ``COPYABLE`` set.
    Instances without any copyable tag are skipped.
    """
    print('Processing EC2 Instances')
    instances = boto3.resource('ec2').instances.all()
    for instance in instances:
        instance_tags = instance.tags or []
        tags = [t for t in instance_tags if t['Key'] in COPYABLE]
        if not tags:
            continue
        tags_no_name = [t for t in instance_tags if t['Key'] in NO_NAME_COPYABLE]

        # Tag the EBS Volumes
        for vol in instance.volumes.all():
            print(f'Updating tags for {vol.id}')
            # Don't replace existing Name for volumes
            has_name = any(t['Key'] == 'Name' for t in vol.tags or [])
            vol_tags = tags_no_name if has_name else tags
            # tags_no_name is empty when Name is the only copyable tag on
            # the instance; skip rather than send an empty tag list.
            if vol_tags:
                vol.create_tags(Tags=vol_tags)

        # Tag the Elastic Network Interfaces
        for eni in instance.network_interfaces:
            print('Updating tags for {}'.format(eni.id))
            eni.create_tags(Tags=tags)
I added the ability to tag network interfaces for ElastiCache and Elasticsearch.
def cache():
    """Tag the network interfaces that belong to ElastiCache clusters.

    An interface is selected when its requester is ``amazon-elasticache``
    and its description starts with ``ElastiCache``. The cluster name is
    taken as the second space-separated word of the description.
    """
    print('Processing ElasticCache Instances')

    def is_elasticache_interface(candidate):
        if candidate.get('RequesterId') != 'amazon-elasticache':
            return False
        return candidate['Description'].startswith('ElastiCache')

    tags_by_cluster = _get_elasticcache_tags()
    pprint(tags_by_cluster)
    for interface in _network_interfaces(is_elasticache_interface):
        cluster_name = interface['Description'].split(' ')[1]
        print(cluster_name)
        if cluster_name in tags_by_cluster:
            _tag_network_interface(
                interface['NetworkInterfaceId'], tags_by_cluster[cluster_name])
def _get_elasticcache_tags():
    """Return the tags to copy for every ElastiCache cluster, keyed by id.

    Returns:
        Dict mapping ``CacheClusterId`` to a list of tag dicts: the cluster's
        tags whose key is in ``COPYABLE`` (except ``Name``), followed by a
        ``Name`` tag set to the cluster id.
    """
    tags = {}
    client = boto3.client('elasticache')
    # Paginate so clusters beyond the first response page are not missed.
    paginator = client.get_paginator('describe_cache_clusters')
    for page in paginator.paginate():
        for cluster in page['CacheClusters']:
            name = cluster['CacheClusterId']
            response = client.list_tags_for_resource(ResourceName=cluster['ARN'])
            # Drop the cluster's own Name tag: a Name entry is appended
            # below and a tag list must not repeat a key.
            copied = [
                t for t in response.get('TagList', [])
                if t['Key'] in COPYABLE and t['Key'] != 'Name'
            ]
            copied.append({'Key': 'Name', 'Value': name})
            tags[name] = copied
    return tags
def es():
    """Tag the network interfaces that belong to Elasticsearch domains.

    An interface is selected when its requester is ``amazon-elasticsearch``
    and its description starts with ``ES``. The domain name is taken as the
    second space-separated word of the description.
    """
    print('Processing Elasticsearch clusters')

    def is_es_interface(candidate):
        if candidate.get('RequesterId') != 'amazon-elasticsearch':
            return False
        return candidate['Description'].startswith('ES')

    tags_by_domain = _get_es_tags()
    for interface in _network_interfaces(is_es_interface):
        domain_name = interface['Description'].split(' ')[1]
        if domain_name in tags_by_domain:
            _tag_network_interface(
                interface['NetworkInterfaceId'], tags_by_domain[domain_name])
def _get_es_tags():
    """Return the tags to copy for every Elasticsearch domain, keyed by name.

    Returns:
        Dict mapping ``DomainName`` to a list of tag dicts: the domain's tags
        whose key is in ``COPYABLE`` (except ``Name``), followed by a
        ``Name`` tag set to the domain name.
    """
    tags = {}
    client = boto3.client('es')
    domain_names = client.list_domain_names()
    for domain in domain_names['DomainNames']:
        domain_name = domain['DomainName']
        domain_details = client.describe_elasticsearch_domain(DomainName=domain_name)
        response = client.list_tags(ARN=domain_details['DomainStatus']['ARN'])
        # Drop the domain's own Name tag: a Name entry is appended below and
        # a tag list must not repeat a key.
        copied = [
            t for t in response.get('TagList', [])
            if t['Key'] in COPYABLE and t['Key'] != 'Name'
        ]
        copied.append({'Key': 'Name', 'Value': domain_name})
        tags[domain_name] = copied
    return tags
And EKS:
def eks():
    """Tag the network interfaces whose description starts with 'Amazon EKS'.

    The cluster name is taken as the third space-separated word of the
    interface description.
    """
    print('Processing EKS Clusters')

    def is_eks_interface(candidate):
        description = candidate['Description']
        return description.startswith('Amazon EKS')

    tags_by_cluster = _get_eks_tags()
    for interface in _network_interfaces(is_eks_interface):
        cluster_name = interface['Description'].split(' ')[2]
        if cluster_name in tags_by_cluster:
            _tag_network_interface(
                interface['NetworkInterfaceId'], tags_by_cluster[cluster_name])
def _get_eks_tags():
    """Return the COPYABLE tags of every EKS cluster, keyed by cluster name.

    Returns:
        Dict mapping cluster name to a list of ``{'Key': ..., 'Value': ...}``
        dicts built from the cluster's ``tags`` mapping; the list is empty
        for a cluster with no copyable tags.
    """
    tags = {}
    client = boto3.client('eks')
    paginator = client.get_paginator('list_clusters')
    clusters = [c for p in paginator.paginate() for c in p['clusters']]
    for cluster in clusters:
        cluster_details = client.describe_cluster(name=cluster)['cluster']
        # ``tags`` is read as a key -> value mapping; fall back to an empty
        # dict (not a list) so ``.items()`` works when it is missing.
        cluster_tags = cluster_details.get('tags') or {}
        tags[cluster] = [
            {'Key': k, 'Value': v}
            for k, v in cluster_tags.items()
            if k in COPYABLE
        ]
    return tags
i got this:
import boto3
def lambda_handler(event, context):
    """Copy a fixed set of instance tags onto attached volumes and ENIs.

    When ``context.function_name`` equals ``'test'`` the function only
    prints what it would tag and makes no ``create_tags`` calls. Instances
    with no tags, or none of the copyable keys, are skipped.

    Args:
        event: Lambda invocation event (unused).
        context: Lambda context; only ``function_name`` is read.
    """
    is_test = context.function_name == 'test'  # this value is injected by SAM local
    instances = boto3.resource('ec2').instances.all()
    copyable_tag_keys = ["Hardening", "appname", "appdescription", "Owner", "env", "Name"]

    for instance in instances:
        # ``instance.tags`` is None when the instance has no tags at all.
        copyable_tags = [t for t in instance.tags or []
                         if t["Key"] in copyable_tag_keys]
        if not copyable_tags:
            continue

        print(f"{instance.instance_id}: {instance.tags}")

        # Tag the EBS Volumes
        for vol in instance.volumes.all():
            print(f"Volume{vol.attachments[0]['Device']}: %volume,{copyable_tags}")
            if not is_test:
                vol.create_tags(Tags=copyable_tags)

        # Tag the Elastic Network Interfaces
        for eni in instance.network_interfaces:
            print(f"eth{str(eni.attachment['DeviceIndex'])}: %eni,{copyable_tags}")
            if not is_test:
                eni.create_tags(Tags=copyable_tags)
------------
But actually i need to exclude some ec2 instances and volumes from tagging, how can i do that?
@jamesduffy i want to exclude some ec2 instances and volumes from tagging, how can i do that?
@jamesduffy i want to exclude some ec2 instances and volumes from tagging, how can i do that?
I don't know if negative filters work in boto3. I haven't tried it, but I would start there. I would try adding a tag to the instances/resources that tells the script not to copy the tags. For example, in my ideal setup I would create a copy-tags-ignore=true
tag on anything I wanted to ignore. Then, if negative filters work, I would use one in place of boto3.resource('ec2').instances.all()
. If you can't use a negative filter, then you can check for that tag inside the for loop and continue
if it exists and is set to true.
A less ideal option: instead of using a tag named something like copy-tags-ignore
you could create a list of resource names you don't want the script to copy and, in the loop, do a check like if instance.tags['Name'] in do_not_copy
and continue instead of copying.
I borrowed this code. Thanks. Here's a condensed version for py3.6 with less logging: