Skip to content

Instantly share code, notes, and snippets.

@Dgadavin
Created May 11, 2022 07:27
Show Gist options
  • Save Dgadavin/5aa7dcdf41182fbb2261f4c581b39878 to your computer and use it in GitHub Desktop.
Save Dgadavin/5aa7dcdf41182fbb2261f4c581b39878 to your computer and use it in GitHub Desktop.
from __future__ import print_function
from datetime import date
import boto3
import json
import os
bucket_name = os.getenv('BUCKET')
def main():
store = {}
client = boto3.client('ssm')
paginator = client.get_paginator('describe_parameters')
pages = paginator.paginate(MaxResults=50)
for page in pages:
for p in page['Parameters']:
resp = client.get_parameter(Name=p['Name'], WithDecryption=True)
store[resp['Parameter']['Name']] = resp['Parameter']['Value']
s3 = boto3.client('s3')
s3.put_object(
Bucket=bucket_name,
Key=str(date.today().year) + "/" + str(date.today().month) + "/" + str(date.today().day) + ".json",
Body=json.dumps(store),
ServerSideEncryption='aws:kms'
)
def lambda_handler(event, context):
main()
if __name__ == "__main__":
main()
from __future__ import print_function
import boto3
import os
import botocore
limit = os.getenv('LIMIT', 1)
client = boto3.client('ec2')
account_id = boto3.client('sts').get_caller_identity().get('Account')
def get_snapshots():
snapshots = client.describe_snapshots(OwnerIds=[account_id])
print(snapshots)
return snapshots
def cleanup():
volumes = {}
volume_ids = []
for v in get_snapshots()['Snapshots']:
if v['VolumeId'] not in volumes:
volumes[v['VolumeId']] = {}
volumes[v['VolumeId']][v['SnapshotId']] = v['StartTime']
for v in client.describe_volumes()['Volumes']:
volume_ids.append(v['VolumeId'])
for v, i in volumes.items():
if v in volume_ids:
for snap in sorted(i, key=i.get, reverse=True)[limit:]:
print('REMOVE (volume exists): ' + str(snap) + " for " + v)
try:
client.delete_snapshot(SnapshotId=snap)
except botocore.exceptions.ClientError as e:
print("ERROR: " + str(e))
else:
for snap in sorted(i, key=i.get, reverse=True):
print('REMOVE: ' + str(snap) + " for " + v)
try:
client.delete_snapshot(SnapshotId=snap)
except botocore.exceptions.ClientError as e:
print("ERROR: " + str(e))
def lambda_handler(event, context):
cleanup()
if __name__ == "__main__":
cleanup()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment