Skip to content

Instantly share code, notes, and snippets.

@goneri
Created August 4, 2022 20:23
Show Gist options
  • Save goneri/cbe342a16f398c1745290accda44197b to your computer and use it in GitHub Desktop.
Save goneri/cbe342a16f398c1745290accda44197b to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import boto3
from datetime import datetime
# CloudTrail client; no region or credentials are passed explicitly here.
client = boto3.client("cloudtrail")
import json
# Instance id -> instance item taken from a RunInstances response. Entries are
# added on RunInstances events and dropped on TerminateInstances events.
instances = {}
def instanceIds(reponseElements):
    """Return the instance ids listed in a CloudTrail ``responseElements`` value.

    Args:
        reponseElements: The ``responseElements`` entry of a decoded
            CloudTrail event (parameter name kept as-is for compatibility).
            Expected shape is ``{"instancesSet": {"items": [{"instanceId":
            ...}, ...]}}``; ``None`` or a dict without that structure is
            accepted as well.

    Returns:
        A list of instance id strings, empty when there is nothing to report.
    """
    # A null/empty responseElements carries no instance set; report no ids
    # instead of raising TypeError on the subscript.
    if not reponseElements:
        return []
    instances_set = reponseElements.get("instancesSet") or {}
    items = instances_set.get("items") or []
    return [item["instanceId"] for item in items]
for e in (
boto3.client("cloudtrail")
.get_paginator("lookup_events")
.paginate(
LookupAttributes=[
{"AttributeKey": "ReadOnly", "AttributeValue": "false"},
{"AttributeKey": "EventSource", "AttributeValue": "ec2.amazonaws.com"},
],
PaginationConfig={"MaxItems": 1000},
)
):
for event in reversed(e.get("Events")):
cloudTrailEvent = json.loads(event["CloudTrailEvent"])
print(f"-> {len(instances)} running instance(s)")
match cloudTrailEvent:
case {
"eventSource": "ec2.amazonaws.com",
"eventName": "TerminateInstances",
"responseElements": responseElements,
}:
print(f"Terminate instance(s) {instanceIds(responseElements)}")
instances = {
k: v
for k, v in instances.items()
if k not in instanceIds(responseElements)
}
case {
"eventSource": "ec2.amazonaws.com",
"eventName": "RunInstances",
"responseElements": responseElements,
}:
for instance in responseElements["instancesSet"]["items"]:
print(f"New instance: {instance['instanceId']}")
instances[instance["instanceId"]] = instance
case {
"eventSource": "ec2.amazonaws.com",
"eventName": "CreateTags"
| "SharedSnapshotVolumeCreated"
| "DeleteVolume",
}:
pass
case {
"eventSource": "ec2.amazonaws.com",
"readOnly": False,
"eventName": unknown,
}:
raise ValueError(f"unknown {unknown}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment