Skip to content

Instantly share code, notes, and snippets.

@sylr
Created December 2, 2024 22:05
Show Gist options
  • Save sylr/95a99aeeac4987e0961132fa701aa0a7 to your computer and use it in GitHub Desktop.
Save sylr/95a99aeeac4987e0961132fa701aa0a7 to your computer and use it in GitHub Desktop.
Convert CloudTrail JSON files from S3 to a SQLite database
import json
import sqlite3
import click
import re
import gzip
# Function to create the SQLite database and the necessary tables
def create_db(db_name: str) -> sqlite3.Connection:
conn = sqlite3.connect(db_name, autocommit=False, isolation_level="DEFERRED")
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS identity (
identity_id INTEGER PRIMARY KEY AUTOINCREMENT,
user_name TEXT,
arn TEXT
);
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS session (
session_id INTEGER PRIMARY KEY AUTOINCREMENT,
session_name TEXT
);
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS user_agent (
user_agent_id INTEGER PRIMARY KEY AUTOINCREMENT,
user_agent TEXT
);
""")
# Create a table for CloudTrail events
cursor.execute("""
CREATE TABLE IF NOT EXISTS event (
event_id TEXT PRIMARY KEY,
event_version TEXT,
event_time TEXT,
event_source TEXT,
event_name TEXT,
identity_id INTEGER REFERENCES identity (identity_id),
session_id INTEGER REFERENCES session (session_id),
aws_region TEXT,
source_ip_address TEXT,
user_agent_id INTEGER REFERENCES user_agent (user_agent_id),
request_parameters TEXT,
response_elements TEXT,
error_code TEXT,
error_message TEXT,
additional_event_data TEXT
);
""")
# Create a table for resources
cursor.execute("""
CREATE TABLE IF NOT EXISTS resource (
resource_id INTEGER PRIMARY KEY AUTOINCREMENT,
resource_type TEXT,
arn_base TEXT UNIQUE
);
""")
# Create an association table to link event and resources
cursor.execute("""
CREATE TABLE IF NOT EXISTS event_resources (
event_id TEXT REFERENCES event (event_id),
resource_id INTEGER REFERENCES resource (resource_id),
arn_path TEXT,
PRIMARY KEY (event_id, resource_id, arn_path)
);
""")
conn.commit()
cursor.close()
return conn
# Assumed-role ARNs keep the role name in the base part:
#   <prefix>:assumed-role/<role>[/<session...>]
# The trailing session part is optional so that a role ARN without a session
# is still split after the role name.
reAssumedRoleArn = re.compile(r"^([^/]+)/([^/]+)(/.*)?$")
# Any other ARN is split at its first "/".
reArn = re.compile(r"^([^/]+)(/.*)?$")


def split_arn(arn: str) -> tuple[str, str]:
    """Split an ARN into a ``(base, path)`` pair.

    For ARNs containing ``:assumed-role/`` the base is everything up to and
    including the role name and the path is the remainder (the session part,
    with its leading ``/``). For every other ARN the base is the text before
    the first ``/`` and the path is the rest, including that ``/``.

    Args:
        arn: The ARN to split. ``None`` or an empty string is accepted.

    Returns:
        ``(base, path)``. ``path`` is ``""`` when there is nothing after the
        base. ``("", "")`` is returned for empty/``None`` input. Input that
        matches neither pattern (for example a string starting with ``/``)
        is returned unsplit as ``(arn, "")`` instead of raising.
    """
    if not arn:
        return ("", "")

    if ":assumed-role/" in arn:
        m = reAssumedRoleArn.search(arn)
        if m:
            return (m.group(1) + "/" + m.group(2), m.group(3) or "")

    m = reArn.search(arn)
    if m is None:
        # Nothing sensible to split on; keep the value rather than crash.
        return (arn, "")
    return (m.group(1), m.group(2) or "")
# Look up (or create) the row id for a value in one of the lookup tables.
def _get_or_create_id(cursor: sqlite3.Cursor, cache: dict, key, select_sql: str, insert_sql: str, insert_params: tuple) -> int:
    """Return the row id for ``key``, inserting a row when none exists.

    The in-memory ``cache`` is consulted first, then the database through
    ``select_sql`` (which takes ``key`` as its only parameter); only if both
    miss is ``insert_sql`` executed with ``insert_params``. The resolved id
    is stored in ``cache`` before returning.
    """
    if key in cache:
        return cache[key]

    cursor.execute(select_sql, (key,))
    row = cursor.fetchone()
    if row:
        row_id = int(row[0])
    else:
        cursor.execute(insert_sql, insert_params)
        row_id = cursor.lastrowid

    cache[key] = row_id
    return row_id


# Insert one CloudTrail event, plus its lookup rows and resources.
def insert_event(cursor: sqlite3.Cursor, event: dict, resource_cache: dict[str, int], identity_cache: dict[str, int], session_cache: dict[str, int], ua_cache: dict[str, int]) -> None:
    """Store one CloudTrail event record in the database.

    The user agent, session and identity are resolved to ids in their lookup
    tables (creating rows as needed), the event row is written with
    ``INSERT OR REPLACE``, and every entry of the event's ``resources`` list
    is linked to it through ``event_resources``. Nothing is committed here.

    Args:
        cursor: Cursor on a database that already has the importer schema.
        event: One decoded CloudTrail record.
        resource_cache: Maps resource ARN base -> ``resource_id``; updated.
        identity_cache: Maps identity ARN base -> ``identity_id``; updated.
        session_cache: Maps session name -> ``session_id``; updated.
        ua_cache: Maps user-agent string -> ``user_agent_id``; updated.
    """
    # Extract relevant details from the event. ``or`` guards against keys
    # that are present but null in the JSON.
    user_identity = event.get("userIdentity") or {}
    user_name = user_identity.get("userName", "")
    user_arn, user_session = split_arn(user_identity.get("arn") or "")
    user_agent = event.get("userAgent", "")
    event_id = event.get("eventID", "")

    # Resolve the lookup-table ids (user agent, then session, then identity).
    user_agent_id = _get_or_create_id(
        cursor, ua_cache, user_agent,
        """SELECT user_agent_id FROM user_agent WHERE user_agent = ?""",
        """INSERT INTO user_agent (user_agent) VALUES (?)""",
        (user_agent,),
    )
    session_id = _get_or_create_id(
        cursor, session_cache, user_session,
        """SELECT session_id FROM session WHERE session_name = ?""",
        """INSERT INTO session (session_name) VALUES (?)""",
        (user_session,),
    )
    # The user name is only recorded when the identity row is first created.
    identity_id = _get_or_create_id(
        cursor, identity_cache, user_arn,
        """SELECT identity_id FROM identity WHERE arn = ?""",
        """INSERT INTO identity (user_name, arn) VALUES (?, ?)""",
        (user_name, user_arn),
    )

    # Insert the event itself; structured fields are stored as JSON text.
    cursor.execute(
        """
        INSERT OR REPLACE INTO event (
            event_id, event_version, identity_id, session_id, event_time,
            event_source, event_name, aws_region, source_ip_address, user_agent_id,
            request_parameters, response_elements, error_code, error_message, additional_event_data
        )
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        (
            event_id,
            event.get("eventVersion", ""),
            identity_id,
            session_id,
            event.get("eventTime", ""),
            event.get("eventSource", ""),
            event.get("eventName", ""),
            event.get("awsRegion", ""),
            event.get("sourceIPAddress", ""),
            user_agent_id,
            json.dumps(event.get("requestParameters", {})),
            json.dumps(event.get("responseElements", {})),
            event.get("errorCode", ""),
            event.get("errorMessage", ""),
            json.dumps(event.get("additionalEventData", {})),
        ),
    )

    # Link every resource referenced by the event.
    for resource in event.get("resources", []):
        resource_type = resource.get("type", "")
        resource_arn_base, resource_arn_path = split_arn(resource.get("ARN") or "")
        resource_id = _get_or_create_id(
            cursor, resource_cache, resource_arn_base,
            """SELECT resource_id FROM resource WHERE arn_base = ?""",
            """INSERT INTO resource (resource_type, arn_base) VALUES (?, ?)""",
            (resource_type, resource_arn_base),
        )
        cursor.execute(
            """INSERT OR REPLACE INTO event_resources (event_id, resource_id, arn_path) VALUES (?, ?, ?)""",
            (event_id, resource_id, resource_arn_path),
        )
# Import a set of CloudTrail JSON files into the SQLite database.
def process_cloudtrail_json(input_json_files: list[str], db_name: str):
    """Load CloudTrail JSON files and store their events in ``db_name``.

    Each file may be plain JSON or gzip-compressed (``.gz`` suffix). A file
    may contain either a bare list of events or an object whose ``Records``
    key holds the list. Events are committed once per file, so files that
    were fully processed stay in the database if a later file fails.

    Args:
        input_json_files: Paths of the files to import.
        db_name: SQLite database file name or path (created if needed).

    Raises:
        Whatever opening, decoding or inserting raises; the connection is
        closed before the exception propagates.
    """
    # Connect to the SQLite database and create the tables.
    conn = create_db(db_name)

    # Key -> row-id caches. Row ids stay valid for the whole run, so the
    # caches are shared across files instead of being rebuilt (and re-queried
    # from the database) for each one.
    identity_cache: dict[str, int] = {}
    resource_cache: dict[str, int] = {}
    session_cache: dict[str, int] = {}
    ua_cache: dict[str, int] = {}

    try:
        for input_json_file in input_json_files:
            # Both branches hand bytes to json.load so that the JSON decoder
            # picks the encoding, not the platform locale.
            if input_json_file.endswith(".gz"):
                with gzip.open(input_json_file, "rb") as file:
                    events = json.load(file)
            else:
                with open(input_json_file, "rb") as file:
                    events = json.load(file)

            # Accept a bare list of events, or an object wrapping the list
            # under its 'Records' key.
            if isinstance(events, list):
                records = events
            else:
                records = events.get('Records', [])

            cursor = conn.cursor()
            try:
                for event in records:
                    insert_event(cursor, event, resource_cache, identity_cache, session_cache, ua_cache)
                conn.commit()
            finally:
                cursor.close()
    finally:
        # Always release the database handle, even when a file is malformed.
        conn.close()

    print(f"CloudTrail events and resources successfully imported into {db_name}")
# click entry point: one or more input files followed by the database path.
# The docstring below is what click shows as the command's --help text, so its
# wording is left as is.
@click.command()
@click.argument("files", nargs=-1, required=True, type=click.Path(exists=True))
@click.argument("db", required=True, type=click.Path())
def cli(files: list[str], db: str):
    """
    Process CloudTrail events from JSON files and store them in a SQLite database.
    files: The paths to the CloudTrail JSON files.
    db: The SQLite database file name or path.
    """
    process_cloudtrail_json(files, db)
# Script entry point: run the click command only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    cli()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment