Skip to content

Instantly share code, notes, and snippets.

@artem-hatchenko
Created June 8, 2022 18:55
Show Gist options
  • Save artem-hatchenko/a7ed678750ad8ea8e106e336d0459f92 to your computer and use it in GitHub Desktop.
Save artem-hatchenko/a7ed678750ad8ea8e106e336d0459f92 to your computer and use it in GitHub Desktop.
from __future__ import print_function
from datetime import datetime
import sys
import boto3
import botocore
import urllib.request
# Module-level EC2 client shared by the functions below; the region is
# hard-coded to eu-west-1.
ec2 = boto3.client('ec2', region_name='eu-west-1')
def create_snapshot(app):
    """Snapshot every EBS volume attached to the running instances of *app*.

    Instances are selected by the ``Application`` tag being equal to ``app``
    and by the ``running`` instance state. For each attached EBS volume a
    snapshot tagged ``Name=<app>-snapshot`` is created, and the function
    blocks until that snapshot completes (polled every 30 seconds, at most
    120 attempts) before moving on to the next volume.

    Args:
        app: Value of the ``Application`` tag identifying the instances;
            also used as the prefix of the snapshot ``Name`` tag.

    A ``WaiterError`` for one snapshot is printed and does not stop the
    remaining volumes from being processed.
    """
    print("Creating snapshot for " + str(app))

    # Get Instance ID
    # Paginate so that a large fleet is not fetched in one unbounded request.
    instance_ids = []
    instance_pages = ec2.get_paginator('describe_instances').paginate(
        Filters=[
            {'Name': 'tag:Application', 'Values': [app]},
            {'Name': 'instance-state-name', 'Values': ['running']},
        ]
    )
    for page in instance_pages:
        for reservation in page['Reservations']:
            for instance in reservation['Instances']:
                instance_ids.append(instance['InstanceId'])

    # Get Volumes ID
    volume_ids = []
    for instance_id in instance_ids:
        response = ec2.describe_instance_attribute(
            InstanceId=instance_id,
            Attribute='blockDeviceMapping',
        )
        for mapping in response['BlockDeviceMappings']:
            # Skip mappings that carry no EBS volume instead of raising
            # KeyError and aborting the whole run.
            volume_id = mapping.get('Ebs', {}).get('VolumeId')
            if volume_id:
                volume_ids.append(volume_id)

    if not volume_ids:
        print("No EBS volumes found on running instances for " + str(app))
        return

    # Create Volume Snapshot
    snapshot_complete_waiter = ec2.get_waiter('snapshot_completed')
    for volume_id in volume_ids:
        date = datetime.now()
        response = ec2.create_snapshot(
            Description=app + " snapshot created by Jenkins at " + str(date),
            VolumeId=volume_id,
            TagSpecifications=[
                {
                    'ResourceType': 'snapshot',
                    'Tags': [
                        {'Key': 'Name', 'Value': app + "-snapshot"},
                    ],
                },
            ],
        )
        snapshot_id = response['SnapshotId']
        print("Waiting until snapshot will be created...")
        try:
            snapshot_complete_waiter.wait(
                SnapshotIds=[snapshot_id],
                WaiterConfig={'Delay': 30, 'MaxAttempts': 120},
            )
        except botocore.exceptions.WaiterError as e:
            # Best effort: report the failure and continue with other volumes.
            print(e)
def cleanup_snapshot(app):
    """Delete all but the most recent snapshot tagged ``Name=<app>-snapshot``.

    Snapshots owned by the current account whose ``Name`` tag equals
    ``<app>-snapshot`` are listed, ordered by ``StartTime``, and every one
    except the newest is deleted.

    Args:
        app: Application name; ``app + "-snapshot"`` is the ``Name`` tag
            value used to select snapshots.
    """
    print("Clean up step.")
    print("Looking for all snapshots regarding " + str(app) + " application.")

    # Get snapshots
    # Paginate so that a long snapshot history is not fetched in one
    # unbounded request.
    snapshot_pages = ec2.get_paginator('describe_snapshots').paginate(
        Filters=[
            {'Name': 'tag:Name', 'Values': [app + "-snapshot"]},
        ],
        OwnerIds=['self'],
    )
    snapshots = []
    for page in snapshot_pages:
        snapshots.extend(page['Snapshots'])
    snapshots.sort(key=lambda snapshot: snapshot['StartTime'])

    # Clean up snapshots and keep only the latest snapshot
    # NOTE(review): selection is by Name tag only, and create_snapshot gives
    # the snapshot of every volume the same Name tag, so for an application
    # with several volumes only one volume's snapshot appears to survive.
    # Confirm this retention policy is intended.
    for snapshot in snapshots[:-1]:
        print("Deleting snapshot: " + str(snapshot['SnapshotId']))
        ec2.delete_snapshot(SnapshotId=snapshot['SnapshotId'])
def main():
    """Command-line entry point: run ``create`` or ``cleanup`` for an app.

    Reads ``sys.argv`` as ``[script, action, application_tag]`` and
    dispatches to ``create_snapshot`` or ``cleanup_snapshot``.

    Returns:
        0 after a recognised action has run; 2 when arguments are missing
        or the action is unknown, so that a calling CI job fails instead of
        passing silently.
    """
    if len(sys.argv) < 3:
        print("This script for create and cleanup snapshots")
        print("Usage : python3 " + sys.argv[0] + " {create|cleanup} " + "{application_tag}")
        print("Example: python3 " + sys.argv[0] + " create " + "ebs-snapshot-check")
        return 2

    action = sys.argv[1]
    app = sys.argv[2]
    if action == 'create':
        create_snapshot(app)
    elif action == 'cleanup':
        cleanup_snapshot(app)
    else:
        print("Wrong action: " + str(action))
        return 2
    return 0


if __name__ == '__main__':
    # Propagate main()'s status as the process exit code.
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment