Skip to content

Instantly share code, notes, and snippets.

@vvalorous
Forked from magnetikonline/README.md
Created September 30, 2018 07:57
Show Gist options
  • Save vvalorous/9f7d185e0584a43707e633e0b8613cfe to your computer and use it in GitHub Desktop.
Save vvalorous/9f7d185e0584a43707e633e0b8613cfe to your computer and use it in GitHub Desktop.
Python AWS CloudTrail parser class.

Python AWS CloudTrail parser

A Python parser class for CloudTrail event archives previously dumped to an S3 bucket. The class provides an iterator which will:

  • Scan a given directory for archive files matching the required pattern.
  • Decompress each archive in memory.
  • Parse JSON payload and return each event in turn.

The parser is contained in cloudtrailparser.py; timezone.py supplies a simple concrete datetime.tzinfo implementation that provides the UTC timezone.

Example

$ ls -l1 /path/to/cloudtrail/archives
ACCOUNT_IDXX_CloudTrail_ap-southeast-2_20160101T2155Z_uiGgE0mgD8GUpvNi.json.gz
ACCOUNT_IDXX_CloudTrail_ap-southeast-2_20160101T2305Z_BNBEUH14QUAV0dNd.json.gz

$ ./example.py

Event name: ListContainerInstances
Event time: 2016-01-01 23:02:08+00:00

Event name: DescribeContainerInstances
Event time: 2016-01-01 23:02:08+00:00

Event name: ListContainerInstances
Event time: 2016-01-01 23:02:11+00:00

Event name: DiscoverPollEndpoint
Event time: 2016-01-01 22:59:36+00:00

Event name: DescribeInstanceHealth
Event time: 2016-01-01 23:00:41+00:00
from datetime import datetime
import gzip
import json
import os
import re
import timezone
class Parser:
    """Iterate over the events held in gzipped CloudTrail JSON archives.

    Walks a base directory for files whose names match
    ARCHIVE_FILENAME_REGEXP, decompresses each one in memory, parses the
    JSON payload and yields one trimmed-down dict per entry of its
    'Records' list.
    """

    # archive names look like:
    # <12 digits>_CloudTrail_<region>_<8 digits>T<4 digits>Z_<16 alphanumerics>.json.gz
    ARCHIVE_FILENAME_REGEXP = re.compile(r'^[0-9]{12}_CloudTrail_[a-z]{2}-[a-z]+-[0-9]_[0-9]{8}T[0-9]{4}Z_[a-zA-Z0-9]{16}\.json\.gz$')
    CLOUDTRAIL_EVENT_DATETIME_FORMAT = '%Y-%m-%dT%H:%M:%SZ'
    TIMEZONE_UTC = timezone.UTC()

    # the `unicode` builtin only exists on Python 2 - on Python 3 `str` is
    # already the text type, so there is nothing to cast and this stays None
    try:
        _UNICODE_TYPE = unicode
    except NameError:
        _UNICODE_TYPE = None

    def __init__(self, archive_base_dir):
        """Remember the directory to scan for archives.

        archive_base_dir: path to the directory tree holding the archives;
        trailing '/' characters are removed.
        """

        # store base dir to CloudTrail archives
        self.archive_base_dir = archive_base_dir.rstrip('/')

    def events(self):
        """Yield a dict (see build_trail_data()) for every archived event."""

        # work over CloudTrail archive files
        for archive_file_item in self.archive_file_list():
            # open archive - parse JSON contents to dictionary
            # context manager ensures the handle is closed even if the read or JSON parse raises
            with gzip.open(archive_file_item, 'rb') as fp:
                cloudtrail_data = json.loads(fp.read())

            if ('Records' in cloudtrail_data):
                for trail_item in cloudtrail_data['Records']:
                    yield self.build_trail_data(trail_item)

    def archive_file_list(self):
        """Yield the path of every file under the base dir matching the archive pattern."""

        for (base_path, dir_list, file_list) in os.walk(self.archive_base_dir):
            # work over files in directory
            for file_item in file_list:
                # does file item match archive pattern?
                if (not Parser.ARCHIVE_FILENAME_REGEXP.search(file_item)):
                    # nope - skip file
                    continue

                # full path to archive file
                yield '{0}/{1}'.format(base_path, file_item)

    def build_trail_data(self, source):
        """Reduce one CloudTrail record to the fields of interest.

        source: dict for a single record. Must contain the keys
        'eventTime', 'recipientAccountId', 'awsRegion', 'eventName',
        'requestParameters' and 'responseElements' (KeyError otherwise).

        Returns a dict with keys 'account_id', 'region', 'event_name',
        'event_time' (datetime carrying TIMEZONE_UTC), 'request' and
        'response'.

        Raises ValueError if 'eventTime' does not match
        CLOUDTRAIL_EVENT_DATETIME_FORMAT.
        """

        # convert time string to datetime at UTC
        event_time_utc = (
            datetime.strptime(
                source['eventTime'],
                Parser.CLOUDTRAIL_EVENT_DATETIME_FORMAT
            )
            .replace(tzinfo = Parser.TIMEZONE_UTC)
        )

        # extract the data we care about from the CloudTrail item into dict
        return {
            'account_id': str(source['recipientAccountId']),
            'region': str(source['awsRegion']),
            'event_name': str(source['eventName']),
            'event_time': event_time_utc,
            'request': self.strip_data_unicode(source['requestParameters']),
            'response': self.strip_data_unicode(source['responseElements'])
        }

    def strip_data_unicode(self, data):
        """Return a copy of data with Python 2 unicode strings cast to str.

        Lists and dicts (keys and values) are processed recursively; any
        other value is returned unchanged. On Python 3 no cast is needed,
        so only the container copy takes place.
        """

        # recursively process via strip_data_unicode() both list and dictionary structures
        if (isinstance(data, list)):
            return [
                self.strip_data_unicode(list_item)
                for list_item in data
            ]

        if (isinstance(data, dict)):
            return {
                self.strip_data_unicode(dict_key): self.strip_data_unicode(dict_value)
                for (dict_key, dict_value) in data.items()
            }

        # simple value
        unicode_type = Parser._UNICODE_TYPE
        if ((unicode_type is not None) and isinstance(data, unicode_type)):
            # if unicode cast to string
            data = str(data)

        return data
#!/usr/bin/env python
import cloudtrailparser
def main():
    """Print the name and time of every event found under the example archive path."""

    print('Example')

    trail_parser = cloudtrailparser.Parser('/path/to/cloudtrail/archives')
    for trail_event in trail_parser.events():
        name_line = 'Event name: {0}'.format(trail_event['event_name'])
        print(name_line)

        # trailing newline leaves a blank line between events
        time_line = 'Event time: {0}\n'.format(trail_event['event_time'])
        print(time_line)


# only run when executed as a script, not when imported
if __name__ == '__main__':
    main()
import datetime
class BaseTimezone(datetime.tzinfo):
    """Concrete tzinfo with a fixed UTC offset and no daylight-saving shift.

    timezone_name: value reported by tzname().
    offset_seconds: offset from UTC, in seconds, reported by utcoffset().
    """

    # shared zero delta handed back by dst() for every instance
    TIMEDELTA_ZERO = datetime.timedelta(0)

    def __init__(self, timezone_name, offset_seconds):
        super(BaseTimezone, self).__init__()

        self.timezone_name = timezone_name
        self.offset = datetime.timedelta(seconds=offset_seconds)

    def tzname(self, dt):
        """Return the name given at construction; dt is ignored."""
        return self.timezone_name

    def utcoffset(self, dt):
        """Return the fixed offset from UTC as a timedelta; dt is ignored."""
        return self.offset

    def dst(self, dt):
        """Return a zero timedelta - no daylight-saving adjustment is modelled."""
        return BaseTimezone.TIMEDELTA_ZERO
# define timezones
class UTC(BaseTimezone):
    """Timezone named 'UTC' with a zero-second offset."""

    def __init__(self):
        super(UTC, self).__init__('UTC', 0)
class Melbourne(BaseTimezone):
    """Timezone named 'Melbourne' fixed at ten hours ahead of UTC.

    The offset is constant; BaseTimezone.dst() always reports a zero
    delta, so no daylight-saving adjustment is applied.
    """

    def __init__(self):
        # offset is expressed in seconds: 10 hours
        ten_hours_in_seconds = 10 * 3600
        super(Melbourne, self).__init__('Melbourne', ten_hours_in_seconds)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment