Skip to content

Instantly share code, notes, and snippets.

@artem-hatchenko
Created January 5, 2023 13:20
Show Gist options
  • Save artem-hatchenko/4c914c50a330029d2dadf34f937d9740 to your computer and use it in GitHub Desktop.
Save artem-hatchenko/4c914c50a330029d2dadf34f937d9740 to your computer and use it in GitHub Desktop.
import boto3
# EC2 filter: only instances that are running or still starting up are selected.
custom_ec2_filter = [
    {'Name': 'instance-state-name', 'Values': ['running', 'pending']},
]

# Tag key/value pair that exempts a resource from being stopped or scaled down.
skip_tag_key = 'prevent_stop'
skip_tag_value = 'true'

# Region names are fetched once, when the module is loaded.
ec2 = boto3.client('ec2')
regions = [entry['RegionName'] for entry in ec2.describe_regions()['Regions']]
def _is_protected(tags):
    """Return True when *tags* holds skip_tag_key set to skip_tag_value.

    *tags* is a list of ``{'Key': ..., 'Value': ...}`` dicts, or None when the
    resource has no tags at all (boto3 reports untagged EC2 instances that way).
    """
    return any(
        tag["Key"] == skip_tag_key and tag["Value"] == skip_tag_value
        for tag in tags or []
    )


def _scale_down_asgs(region):
    """Set min/max/desired capacity to 0 on every unprotected ASG in *region*."""
    print("Searching for ASG...")
    asg = boto3.client('autoscaling', region_name=region)
    # Paginate: a single describe call only returns the first page of groups.
    paginator = asg.get_paginator('describe_auto_scaling_groups')
    for page in paginator.paginate():
        for group in page['AutoScalingGroups']:
            if _is_protected(group.get('Tags')):
                continue
            already_zero = (
                group['MinSize'] == 0
                and group['MaxSize'] == 0
                and group['DesiredCapacity'] == 0
            )
            if already_zero:
                continue
            asg_name = group['AutoScalingGroupName']
            print("Scalling down ASG: " + str(asg_name))
            asg.update_auto_scaling_group(
                AutoScalingGroupName=asg_name,
                MinSize=0,
                MaxSize=0,
                DesiredCapacity=0,
            )


def _stop_ec2_instances(region):
    """Stop every unprotected instance in *region* matching custom_ec2_filter."""
    print("Searching for running instances...")
    ec2_resource = boto3.resource('ec2', region_name=region)
    for instance in ec2_resource.instances.filter(Filters=custom_ec2_filter):
        if _is_protected(instance.tags):
            continue
        print("Stopping instance: " + str(instance.id))
        instance.stop()


def _stop_rds_databases(region):
    """Stop every unprotected RDS instance / Aurora cluster in *region*.

    Protection is decided from the tags of the DB instance. For engines whose
    name starts with ``aurora`` the owning cluster is stopped instead of the
    individual instance.
    """
    print("Searching for running RDS instances...")
    rds = boto3.client('rds', region_name=region)
    # Clusters already asked to stop in this region. Several member instances
    # share one cluster, and the stop request must only be sent once.
    stopped_clusters = set()
    # Paginate: a single describe call only returns the first page of instances.
    paginator = rds.get_paginator('describe_db_instances')
    for page in paginator.paginate():
        for db in page['DBInstances']:
            instance = db['DBInstanceIdentifier']
            tags = rds.list_tags_for_resource(
                ResourceName=db['DBInstanceArn']
            )['TagList']
            if _is_protected(tags):
                continue
            if db['DBInstanceStatus'] in ("stopping", "stopped"):
                continue
            engine = db['Engine']
            print("Engine: " + str(engine))
            if engine.startswith('aurora'):
                cluster_id = db['DBClusterIdentifier']
                if cluster_id in stopped_clusters:
                    continue
                stopped_clusters.add(cluster_id)
                print("Stopping Aurora cluster: " + str(cluster_id))
                rds.stop_db_cluster(DBClusterIdentifier=cluster_id)
            else:
                print("Stopping RDS instances: " + str(instance))
                rds.stop_db_instance(DBInstanceIdentifier=instance)


def main(event, context):
    """Scale down ASGs and stop EC2/RDS resources in every region in ``regions``.

    Resources tagged ``skip_tag_key = skip_tag_value`` are left untouched.

    Args:
        event: Unused by this function.
        context: Unused by this function.
    """
    for region in regions:
        print("Region: " + str(region))
        _scale_down_asgs(region)
        _stop_ec2_instances(region)
        _stop_rds_databases(region)
        print("----------------------------------------")
if __name__ == '__main__':
    # main() declares two required positional parameters (event, context) and
    # does not read either of them, so placeholders are passed for a direct run.
    main(None, None)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment