Created
December 2, 2024 22:05
-
-
Save sylr/95a99aeeac4987e0961132fa701aa0a7 to your computer and use it in GitHub Desktop.
Convert CloudTrail JSON files from S3 into a SQLite database
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import sqlite3 | |
import click | |
import re | |
import gzip | |
def create_db(db_name: str) -> sqlite3.Connection:
    """Open (creating if needed) the SQLite database and ensure the schema exists.

    Schema:
      * ``identity``, ``session`` and ``user_agent`` are lookup tables whose
        integer ids are referenced from ``event``.
      * ``event`` holds one row per CloudTrail eventID.
      * ``resource`` holds one row per ARN base (``arn_base`` is UNIQUE) and is
        linked to events through ``event_resources``.

    Every statement uses ``IF NOT EXISTS``, so calling this on an existing
    database is safe.

    Args:
        db_name: Path of the SQLite database file (or ":memory:").

    Returns:
        An open connection with the schema committed. It is opened with
        ``autocommit=False`` (Python 3.12+), so subsequent writes remain in a
        transaction until the caller commits.

    Raises:
        sqlite3.Error: if the database cannot be opened or the schema cannot
            be created. In the latter case the connection is closed before the
            exception propagates.
    """
    # NOTE: with autocommit=False the sqlite3 module controls transactions
    # itself and, per its documentation, isolation_level has no effect; it is
    # kept only so the connection arguments are unchanged.
    conn = sqlite3.connect(db_name, autocommit=False, isolation_level="DEFERRED")
    schema = (
        """
        CREATE TABLE IF NOT EXISTS identity (
            identity_id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_name TEXT,
            arn TEXT
        );
        """,
        """
        CREATE TABLE IF NOT EXISTS session (
            session_id INTEGER PRIMARY KEY AUTOINCREMENT,
            session_name TEXT
        );
        """,
        """
        CREATE TABLE IF NOT EXISTS user_agent (
            user_agent_id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_agent TEXT
        );
        """,
        # CloudTrail events
        """
        CREATE TABLE IF NOT EXISTS event (
            event_id TEXT PRIMARY KEY,
            event_version TEXT,
            event_time TEXT,
            event_source TEXT,
            event_name TEXT,
            identity_id INTEGER REFERENCES identity (identity_id),
            session_id INTEGER REFERENCES session (session_id),
            aws_region TEXT,
            source_ip_address TEXT,
            user_agent_id INTEGER REFERENCES user_agent (user_agent_id),
            request_parameters TEXT,
            response_elements TEXT,
            error_code TEXT,
            error_message TEXT,
            additional_event_data TEXT
        );
        """,
        # Resources, deduplicated by ARN base
        """
        CREATE TABLE IF NOT EXISTS resource (
            resource_id INTEGER PRIMARY KEY AUTOINCREMENT,
            resource_type TEXT,
            arn_base TEXT UNIQUE
        );
        """,
        # Association table linking events and resources
        """
        CREATE TABLE IF NOT EXISTS event_resources (
            event_id TEXT REFERENCES event (event_id),
            resource_id INTEGER REFERENCES resource (resource_id),
            arn_path TEXT,
            PRIMARY KEY (event_id, resource_id, arn_path)
        );
        """,
        # The importer resolves lookup rows by value (e.g. WHERE arn = ?).
        # Without these indexes every cache miss is a full table scan.
        # resource.arn_base needs none: UNIQUE already indexes it.
        "CREATE INDEX IF NOT EXISTS idx_identity_arn ON identity (arn);",
        "CREATE INDEX IF NOT EXISTS idx_session_session_name ON session (session_name);",
        "CREATE INDEX IF NOT EXISTS idx_user_agent_user_agent ON user_agent (user_agent);",
    )
    try:
        cursor = conn.cursor()
        try:
            for statement in schema:
                cursor.execute(statement)
        finally:
            cursor.close()
        conn.commit()
    except BaseException:
        # Do not leak an open connection when schema creation fails; closing
        # also discards the uncommitted partial schema.
        conn.close()
        raise
    return conn
reAssumedRoleArn = re.compile("^([^/]+)/([^/]+)(/.*)$") | |
reArn = re.compile("^([^/]+)(/.*)?$") | |
def split_arn(arn: str) -> tuple[str, str]: | |
if arn == "": | |
return ("", "") | |
if ":assumed-role/" in arn: | |
m = reAssumedRoleArn.search(arn) | |
if m: | |
return (m.group(1)+'/'+m.group(2), m.group(3)) | |
m = reArn.search(arn) | |
return (m.group(1), m.group(2) if m.group(2) else "") | |
def _get_or_create_id(
    cursor: sqlite3.Cursor,
    cache: dict,
    key,
    select_sql: str,
    insert_sql: str,
    insert_params: tuple,
) -> int:
    """Return the id of the lookup row for *key*, inserting the row if needed.

    Resolution order: *cache*, then ``select_sql`` executed with ``(key,)``,
    then ``insert_sql`` executed with *insert_params* (using its
    ``lastrowid``). The resolved id is stored in *cache* under *key*.
    """
    if key in cache:
        return cache[key]
    cursor.execute(select_sql, (key,))
    row = cursor.fetchone()
    if row:
        row_id = int(row[0])
    else:
        cursor.execute(insert_sql, insert_params)
        row_id = cursor.lastrowid
    cache[key] = row_id
    return row_id


def insert_event(cursor: sqlite3.Cursor, event: dict, resource_cache: dict[str, int], identity_cache: dict[str, int], session_cache: dict[str, int], ua_cache: dict[str, int]) -> None:
    """Insert one CloudTrail event record plus its lookup and resource rows.

    The user agent, session (path part of the caller ARN), identity (base part
    of the caller ARN) and each listed resource (keyed by ARN base) are
    resolved to row ids with a get-or-create lookup. The caches avoid repeated
    SELECTs. The event row is written with INSERT OR REPLACE, so importing the
    same eventID again overwrites the earlier row. requestParameters,
    responseElements and additionalEventData are stored as JSON text; a
    missing key is stored as "{}". Nothing is committed here.

    Args:
        cursor: Cursor on a database with the create_db schema.
        event: One CloudTrail record as decoded from JSON.
        resource_cache, identity_cache, session_cache, ua_cache: value -> row
            id caches; read and updated in place.
    """
    event_id = event.get("eventID", "")
    # `or {}` / `or ""` also cover keys that are present with a JSON null value.
    user_identity = event.get("userIdentity") or {}
    user_name = user_identity.get("userName", "")
    user_arn, user_session = split_arn(user_identity.get("arn") or "")
    user_agent = event.get("userAgent", "")

    # The lookups use `IS ?` rather than `= ?` so that a NULL key (e.g. a JSON
    # null userAgent) finds its existing row instead of inserting a duplicate.
    user_agent_id = _get_or_create_id(
        cursor, ua_cache, user_agent,
        "SELECT user_agent_id FROM user_agent WHERE user_agent IS ?",
        "INSERT INTO user_agent (user_agent) VALUES (?)",
        (user_agent,),
    )
    session_id = _get_or_create_id(
        cursor, session_cache, user_session,
        "SELECT session_id FROM session WHERE session_name IS ?",
        "INSERT INTO session (session_name) VALUES (?)",
        (user_session,),
    )
    identity_id = _get_or_create_id(
        cursor, identity_cache, user_arn,
        "SELECT identity_id FROM identity WHERE arn IS ?",
        "INSERT INTO identity (user_name, arn) VALUES (?, ?)",
        (user_name, user_arn),
    )

    cursor.execute("""
        INSERT OR REPLACE INTO event (
            event_id, event_version, identity_id, session_id, event_time,
            event_source, event_name, aws_region, source_ip_address, user_agent_id,
            request_parameters, response_elements, error_code, error_message, additional_event_data
        )
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """, (
        event_id,
        event.get("eventVersion", ""),
        identity_id,
        session_id,
        event.get("eventTime", ""),
        event.get("eventSource", ""),
        event.get("eventName", ""),
        event.get("awsRegion", ""),
        event.get("sourceIPAddress", ""),
        user_agent_id,
        json.dumps(event.get("requestParameters", {})),
        json.dumps(event.get("responseElements", {})),
        event.get("errorCode", ""),
        event.get("errorMessage", ""),
        json.dumps(event.get("additionalEventData", {})),
    ))

    for resource in event.get("resources") or []:
        resource_type = resource.get("type", "")
        # Some resource entries carry ARNPrefix instead of ARN. Without this
        # fallback they would all collapse onto the same empty arn_base.
        resource_arn = resource.get("ARN") or resource.get("ARNPrefix") or ""
        resource_arn_base, resource_arn_path = split_arn(resource_arn)
        resource_id = _get_or_create_id(
            cursor, resource_cache, resource_arn_base,
            "SELECT resource_id FROM resource WHERE arn_base IS ?",
            "INSERT INTO resource (resource_type, arn_base) VALUES (?, ?)",
            (resource_type, resource_arn_base),
        )
        cursor.execute(
            "INSERT OR REPLACE INTO event_resources (event_id, resource_id, arn_path) VALUES (?, ?, ?)",
            (event_id, resource_id, resource_arn_path),
        )
# Function to process the CloudTrail JSON file and populate the SQLite database | |
def process_cloudtrail_json(input_json_files: list[str], db_name: str): | |
# Connect to the SQLite database and create the tables | |
conn = create_db(db_name) | |
for input_json_file in input_json_files: | |
if input_json_file.endswith(".gz"): | |
with gzip.open(input_json_file, "rb") as file: | |
events = json.load(file) | |
else: | |
with open(input_json_file, "r") as file: | |
events = json.load(file) | |
# Cache | |
identity_cache = {} | |
resource_cache = {} | |
session_cache = {} | |
ua_cache = {} | |
cursor = conn.cursor() | |
# Check if the events are in a list (standard case) | |
if isinstance(events, list): | |
for event in events: | |
insert_event(cursor, event, resource_cache, identity_cache, session_cache, ua_cache) | |
else: | |
# In case the CloudTrail data is a dictionary with an 'Records' key (older format) | |
for event in events.get('Records', []): | |
insert_event(cursor, event, resource_cache, identity_cache, session_cache, ua_cache) | |
conn.commit() | |
cursor.close() | |
conn.close() | |
print(f"CloudTrail events and resources successfully imported into {db_name}") | |
# Command-line interface using click.
# dir_okay=False makes click reject directories with a usage error. Otherwise
# a directory passes exists=True and then fails with a raw IsADirectoryError
# traceback (or an opaque sqlite error for the db argument).
@click.command()
@click.argument('files', type=click.Path(exists=True, dir_okay=False), required=True, nargs=-1)
@click.argument('db', type=click.Path(dir_okay=False), required=True)
def cli(files: tuple[str, ...], db: str):
    """
    Process CloudTrail events from JSON files and store them in a SQLite database.

    files: The paths to the CloudTrail JSON files.

    db: The SQLite database file name or path.
    """
    # With nargs=-1 click passes the file paths as a tuple.
    process_cloudtrail_json(files, db)
# Script entry point: hand control to the click command, which parses
# sys.argv. Nothing runs when this module is imported.
if __name__ == "__main__":
    cli()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment