|
from datetime import datetime |
|
import gzip |
|
import json |
|
import os |
|
import re |
|
|
|
import timezone |
|
|
|
|
|
class Parser:
    """Iterate over the events held in a directory tree of CloudTrail archives.

    Archive files are selected by file name (``ARCHIVE_FILENAME_REGEXP``),
    gunzipped and decoded as JSON. Every entry of an archive's ``"Records"``
    list is then reduced to a small dictionary by ``build_trail_data()``.
    """

    # file name an archive must have to be picked up by archive_file_list()
    ARCHIVE_FILENAME_REGEXP = re.compile(
        r"^[0-9]{12}_CloudTrail_[a-z]{2}-[a-z]+-[0-9]_[0-9]{8}T[0-9]{4}Z_[a-zA-Z0-9]{16}\.json\.gz$"
    )

    # strptime() format used to parse the "eventTime" field of a record
    CLOUDTRAIL_EVENT_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

    # object attached as ``tzinfo`` to every parsed event time
    # NOTE(review): presumably a UTC tzinfo provided by the project's
    # ``timezone`` module - confirm against that module
    TIMEZONE_UTC = timezone.UTC()

    # Python 2 has a separate ``unicode`` text type. On Python 3 that name does
    # not exist (referencing it raises NameError) and ``str`` is the text type.
    try:
        _TEXT_TYPE = unicode  # noqa: F821 - only defined on Python 2
    except NameError:
        _TEXT_TYPE = str

    def __init__(self, archive_base_dir):
        """Remember the directory that is searched for archive files.

        Args:
            archive_base_dir: Path of the directory tree holding the archives.
                Trailing ``/`` characters are removed before it is stored.
        """
        # store base dir to CloudTrail archives
        self.archive_base_dir = archive_base_dir.rstrip("/")

    def events(self):
        """Yield one trail dictionary per record of every archive found.

        Archives are read one at a time in the order produced by
        ``archive_file_list()``. An archive whose JSON document has no
        ``"Records"`` key contributes no events.

        Yields:
            dict: The result of ``build_trail_data()`` for each record.
        """
        # work over CloudTrail archive files
        for archive_file_item in self.archive_file_list():
            # open archive - parse JSON contents to dictionary; the context
            # manager closes the file even if reading or decoding fails
            with gzip.open(archive_file_item, "rb") as fp:
                cloudtrail_data = json.loads(fp.read())

            if "Records" in cloudtrail_data:
                for trail_item in cloudtrail_data["Records"]:
                    yield self.build_trail_data(trail_item)

    def archive_file_list(self):
        """Yield the path of every archive file below ``archive_base_dir``.

        The tree is walked with ``os.walk()``; only files whose name matches
        ``ARCHIVE_FILENAME_REGEXP`` are reported.

        Yields:
            str: ``"<directory>/<file name>"`` for each matching file.
        """
        for base_path, _dir_list, file_list in os.walk(self.archive_base_dir):
            # work over files in directory
            for file_item in file_list:
                # does file item match archive pattern?
                if not Parser.ARCHIVE_FILENAME_REGEXP.search(file_item):
                    # nope - skip file
                    continue

                # full path to archive file
                yield "{0}/{1}".format(base_path, file_item)

    def build_trail_data(self, source):
        """Extract the fields of interest from a single CloudTrail record.

        Args:
            source: Decoded record; must provide the keys ``eventTime``,
                ``recipientAccountId``, ``awsRegion``, ``eventName``,
                ``requestParameters`` and ``responseElements``.

        Returns:
            dict: Keys ``account_id``, ``region`` and ``event_name`` (``str``),
            ``event_time`` (``datetime`` carrying ``TIMEZONE_UTC`` as tzinfo),
            ``request`` and ``response`` (the request parameters and response
            elements after ``strip_data_unicode()``).

        Raises:
            KeyError: If one of the required keys is missing from ``source``.
            ValueError: If ``eventTime`` does not match
                ``CLOUDTRAIL_EVENT_DATETIME_FORMAT``.
        """
        # convert time string to datetime at UTC
        event_time_utc = datetime.strptime(
            source["eventTime"], Parser.CLOUDTRAIL_EVENT_DATETIME_FORMAT
        ).replace(tzinfo=Parser.TIMEZONE_UTC)

        # extract the data we care about from the CloudTrail item into dict
        return {
            "account_id": str(source["recipientAccountId"]),
            "region": str(source["awsRegion"]),
            "event_name": str(source["eventName"]),
            "event_time": event_time_utc,
            "request": self.strip_data_unicode(source["requestParameters"]),
            "response": self.strip_data_unicode(source["responseElements"]),
        }

    def strip_data_unicode(self, data):
        """Return a copy of ``data`` with every text value passed through ``str()``.

        Lists and dictionaries (keys and values) are processed recursively;
        any other non-text value is returned unchanged. On Python 3 text is
        already ``str``, so only the containers are rebuilt.

        NOTE: on Python 2, ``str()`` of a ``unicode`` value containing
        non-ASCII characters raises ``UnicodeEncodeError``.

        Args:
            data: A JSON-decoded value (list, dict or scalar).

        Returns:
            The converted structure.
        """
        # recursively process via strip_data_unicode() both list and dictionary structures
        if isinstance(data, list):
            return [self.strip_data_unicode(list_item) for list_item in data]

        if isinstance(data, dict):
            return {
                self.strip_data_unicode(dict_key): self.strip_data_unicode(dict_value)
                for (dict_key, dict_value) in data.items()
            }

        # simple value
        if isinstance(data, Parser._TEXT_TYPE):
            # if unicode cast to string
            data = str(data)

        return data