Skip to content

Instantly share code, notes, and snippets.

@yunzheng
Created June 9, 2023 08:42
Show Gist options
  • Save yunzheng/944bcd2962e0fcc12ab8037f52e5f6e6 to your computer and use it in GitHub Desktop.
Save yunzheng/944bcd2962e0fcc12ab8037f52e5f6e6 to your computer and use it in GitHub Desktop.
Convert flow records to sqlite3 database
#!/usr/bin/env python3
#
# Convert flow records to sqlite3 database
#
# Author: Yun Zheng Hu <[email protected]>
import argparse
import logging
import sqlite3
import sys
from flow.record import RecordDescriptor, RecordReader
logger = logging.getLogger(__name__)

# flow.record field type names mapped to SQLite column types. Any type name
# not listed here ends up in a TEXT column (the callers fall back to "TEXT").
FIELD_MAP = {
    "int": "INTEGER",
    "varint": "INTEGER",
    "uint16": "INTEGER",
    "uint32": "INTEGER",
    "filesize": "INTEGER",
    "float": "REAL",
    # flow.record registers its boolean field type as "boolean"; "bool" is
    # kept so existing lookups keep working.
    "bool": "INTEGER",
    "boolean": "INTEGER",
    "bytes": "BLOB",
}
def create_descriptor_table(con: sqlite3.Connection, descriptor: RecordDescriptor):
    """Create the table for *descriptor* unless one with that name exists.

    The table is named after ``descriptor.name`` and gets one column per
    field returned by ``descriptor.get_all_fields()``. Field types without an
    entry in FIELD_MAP are declared as TEXT.

    Returns the cursor produced by ``con.execute``.
    """
    name_of_table = descriptor.name
    fields = descriptor.get_all_fields()

    # One " [name] TYPE" line per field, comma-separated.
    body = ",\n".join(
        f" [{field_name}] {FIELD_MAP.get(field.typename, 'TEXT')}"
        for field_name, field in fields.items()
    )

    statement = f"CREATE TABLE IF NOT EXISTS [{name_of_table}] (\n{body}\n);"
    logger.debug(statement)
    return con.execute(statement)
def create_descriptor_columns(con, descriptor: "RecordDescriptor"):
    """Add any fields of *descriptor* that are missing from its table.

    Existing columns are read with ``PRAGMA table_info`` and compared to the
    descriptor's fields case-insensitively. SQLite column identifiers are
    case-insensitive, so ``ALTER TABLE ... ADD COLUMN`` would reject a name
    that only differs in case from an existing column.

    Missing columns are added in a single ``executescript`` wrapped in
    BEGIN/COMMIT; field types without a FIELD_MAP entry become TEXT.

    Returns None when nothing is missing, otherwise the cursor returned by
    ``con.executescript``.
    """
    table_name = descriptor.name

    # Column 1 of each PRAGMA table_info row is the column name.
    cursor = con.execute(f"PRAGMA table_info([{table_name}])")
    existing = {row[1].lower() for row in cursor.fetchall()}

    # Build one ALTER TABLE statement per missing column
    column_defs = []
    for column_name, fieldset in descriptor.get_all_fields().items():
        if column_name.lower() in existing:
            continue
        column_type = FIELD_MAP.get(fieldset.typename, "TEXT")
        column_defs.append(f" ALTER TABLE [{table_name}] ADD COLUMN [{column_name}] {column_type}")

    # No missing columns
    if not column_defs:
        return None

    # Add the new columns in one transaction
    sql_columns = ";\n".join(column_defs)
    sql = f"""BEGIN TRANSACTION;\n{sql_columns};\nCOMMIT;"""
    logger.info(sql)
    return con.executescript(sql)
# Bounds of SQLite's INTEGER storage class (signed 64-bit). Python ints outside
# this range cannot be bound as parameters (sqlite3 raises OverflowError).
_SQLITE_INT_MIN = -(2**63)
_SQLITE_INT_MAX = 2**63 - 1


def _to_sqlite_value(value):
    """Return *value* in a form sqlite3 can bind as a query parameter.

    None, bytes, floats, bools and ints within SQLite's signed 64-bit range
    pass through unchanged. Everything else, including out-of-range ints, is
    converted with ``str()``.
    """
    if value is None or isinstance(value, (bytes, float)):
        return value
    if isinstance(value, int):  # bool is an int subclass and lands here too
        if _SQLITE_INT_MIN <= value <= _SQLITE_INT_MAX:
            return value
        return str(value)
    return str(value)


def db_insert_record(con, record):
    """Insert *record* into the table named after its descriptor.

    Every entry of ``record._asdict()`` becomes a column/value pair. Values
    are bound as ``?`` parameters after passing through ``_to_sqlite_value``.

    Returns the cursor produced by ``con.execute``.
    """
    table_name = record._desc.name
    rdict = record._asdict()

    # Construct placeholder SQL query
    column_names = ", ".join(f"[{name}]" for name in rdict)
    value_placeholder = ", ".join(["?"] * len(rdict))
    sql = f"""INSERT INTO [{table_name}] ({column_names}) VALUES ({value_placeholder})"""

    values = [_to_sqlite_value(value) for value in rdict.values()]

    # Insert record into database
    logger.debug(sql)
    logger.debug(values)
    return con.execute(sql, values)
def recordreader_to_sqlite(reader: "RecordReader", con: sqlite3.Connection, batch_size=1000) -> int:
    """Write every record yielded by *reader* into the SQLite database *con*.

    The first time a record descriptor is seen, its table is created if
    needed and any missing columns are added. The transaction is committed
    after every *batch_size* records and once more at the end.

    Returns the number of records written (0 when *reader* yields nothing).
    """
    desc_seen = set()
    # Pre-set so an empty reader returns 0 instead of hitting an unbound name.
    count = 0
    # start=1 so `count` is the number of records processed so far.
    for count, record in enumerate(reader, start=1):
        desc = record._desc

        # Create a new table for each new descriptor, or add missing columns
        if desc not in desc_seen:
            desc_seen.add(desc)
            create_descriptor_table(con, desc)
            create_descriptor_columns(con, desc)

        db_insert_record(con, record)

        # Commit after every full batch of batch_size records
        if count % batch_size == 0:
            con.commit()

    con.commit()
    return count
def main():
    """Command-line entry point: convert flow records into a SQLite database.

    Parses OUTPUT_DATABASE and -v/--verbose from the command line and
    configures logging. Records are read from a default ``RecordReader()``,
    which the original comment says reads from stdin. They are written with
    ``recordreader_to_sqlite``, and progress is reported on stderr.

    Returns None, so ``sys.exit(main())`` exits with status 0 on success.
    """
    parser = argparse.ArgumentParser(description="Convert flow records to sqlite3 database")
    parser.add_argument("database", metavar="OUTPUT_DATABASE", help="Output sqlite3 database")
    parser.add_argument("-v", "--verbose", action="count", default=0, help="Increase verbosity")
    args = parser.parse_args()

    # No flag -> WARNING, -v -> INFO, -vv or more -> DEBUG
    levels = [logging.WARNING, logging.INFO, logging.DEBUG]
    level = levels[min(len(levels) - 1, args.verbose)]
    logging.basicConfig(level=level, format="%(asctime)s %(levelname)s %(message)s")

    print(f"Writing records to {args.database!r}", file=sys.stderr)
    con = sqlite3.connect(args.database)
    try:
        # The connection's context manager commits on success and rolls back
        # on error, but it does not close the connection.
        with con:
            # Currently just read records from stdin
            with RecordReader() as reader:
                count = recordreader_to_sqlite(reader, con)
    finally:
        con.close()
    print(f"Wrote {count} records", file=sys.stderr)
if __name__ == "__main__":
    # Use main()'s return value as the process exit status (None -> 0).
    raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment