Created June 9, 2023 08:42
-
-
Save yunzheng/944bcd2962e0fcc12ab8037f52e5f6e6 to your computer and use it in GitHub Desktop.
Convert flow records to sqlite3 database
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
#
# Convert flow records to sqlite3 database
#
# Author: Yun Zheng Hu <[email protected]>
import argparse | |
import logging | |
import sqlite3 | |
import sys | |
from flow.record import RecordDescriptor, RecordReader | |
logger = logging.getLogger(__name__)

# flow.record field type names mapped to SQLite column types.
# Field types not listed here are stored in TEXT columns by the table helpers.
FIELD_MAP = {
    "int": "INTEGER",
    "varint": "INTEGER",
    "float": "REAL",
    "bool": "INTEGER",
    "bytes": "BLOB",
    # Integer-like flow.record type names. Without these entries their int
    # values land in TEXT-affinity columns, where SQLite converts them to text.
    # NOTE(review): type names taken from flow.record.fieldtypes — confirm
    # against the flow.record version in use.
    "boolean": "INTEGER",
    "uint16": "INTEGER",
    "uint32": "INTEGER",
    "filesize": "INTEGER",
}
def create_descriptor_table(con: sqlite3.Connection, descriptor: RecordDescriptor):
    """Create the table backing *descriptor* unless a table of that name exists.

    The table is named ``descriptor.name`` and gets one column per entry of
    ``descriptor.get_all_fields()``. Each column type is looked up in
    FIELD_MAP by the field's ``typename``; unknown types fall back to TEXT.

    Returns:
        The cursor returned by ``con.execute``.
    """
    name = descriptor.name
    fields = descriptor.get_all_fields()

    body = ",\n".join(
        f" [{field_name}] {FIELD_MAP.get(field.typename, 'TEXT')}"
        for field_name, field in fields.items()
    )
    statement = f"CREATE TABLE IF NOT EXISTS [{name}] (\n{body}\n);"

    logger.debug(statement)
    return con.execute(statement)
def create_descriptor_columns(con, descriptor: RecordDescriptor):
    """Add to the descriptor's existing table every field column it still lacks.

    Existing column names are read with ``PRAGMA table_info``. Each missing
    field is added with ``ALTER TABLE ... ADD COLUMN``, typed through
    FIELD_MAP (TEXT fallback). All ALTERs run in one ``executescript`` call
    wrapped in BEGIN/COMMIT.

    Returns:
        None if no column was missing, otherwise the cursor returned by
        ``con.executescript``.
    """
    table = descriptor.name

    existing = {info[1] for info in con.execute(f"PRAGMA table_info([{table}])").fetchall()}

    alters = [
        f" ALTER TABLE [{table}] ADD COLUMN [{field_name}] {FIELD_MAP.get(field.typename, 'TEXT')}"
        for field_name, field in descriptor.get_all_fields().items()
        if field_name not in existing
    ]
    if not alters:
        # Schema already up to date.
        return None

    statements = ";\n".join(alters)
    script = f"BEGIN TRANSACTION;\n{statements};\nCOMMIT;"
    logger.info(script)
    return con.executescript(script)
def db_insert_record(con, record):
    """Insert one flow record as a row of the table named after its descriptor.

    Every field from ``record._asdict()`` is written to the column of the same
    name. ``None``, ``bytes``, ``int`` (``bool`` included, being an ``int``
    subclass) and ``float`` values are bound as-is. Anything else is stored as
    ``str(value)``.

    Integers outside SQLite's signed 64-bit range are also stored as
    ``str(value)``, because the ``sqlite3`` module raises ``OverflowError``
    when binding them. Note that a column with INTEGER/NUMERIC affinity may
    coerce such text to REAL. The exact digits survive only in TEXT or
    untyped columns.

    Args:
        con: Open SQLite connection; the target table and columns must exist.
        record: Flow record; ``record._desc.name`` and ``record._asdict()`` are used.

    Returns:
        The cursor returned by ``con.execute``.
    """
    table_name = record._desc.name
    rdict = record._asdict()

    # Construct placeholder SQL query
    column_names = ", ".join(f"[{name}]" for name in rdict)
    value_placeholder = ", ".join(["?"] * len(rdict))
    sql = f"""INSERT INTO [{table_name}] ({column_names}) VALUES ({value_placeholder})"""

    values = []
    for value in rdict.values():
        if isinstance(value, int):
            # Unbounded ints (e.g. varint fields) cannot be bound as SQLite INTEGER.
            if not -(2**63) <= value < 2**63:
                value = str(value)
        elif not (value is None or isinstance(value, (bytes, float))):
            # convert values to str() for types we don't support
            value = str(value)
        values.append(value)

    # Insert record into database
    logger.debug(sql)
    logger.debug(values)
    return con.execute(sql, values)
def recordreader_to_sqlite(reader: RecordReader, con: sqlite3.Connection, batch_size=1000) -> int:
    """Write every record from *reader* into *con*, one table per descriptor name.

    The first time a descriptor is seen, its table is created if needed and
    any missing columns are added. Each record is then inserted as a row. The
    connection is committed after every *batch_size* records and once more at
    the end.

    Args:
        reader: Iterable of flow records.
        con: Open SQLite connection to write into.
        batch_size: Number of inserted records between intermediate commits.

    Returns:
        The number of records written (0 for an empty reader).
    """
    desc_seen = set()
    # Pre-initialise so an empty reader returns 0 instead of hitting UnboundLocalError.
    count = 0
    # start=1 so `count` is the number of records written so far.
    for count, record in enumerate(reader, start=1):
        desc = record._desc

        # Create a new table for each new descriptor, or add missing columns
        if desc not in desc_seen:
            desc_seen.add(desc)
            create_descriptor_table(con, desc)
            create_descriptor_columns(con, desc)

        db_insert_record(con, record)

        # Commit every batch_size records
        if count % batch_size == 0:
            con.commit()

    con.commit()
    return count
def main():
    """Command-line entry point: convert flow records into a SQLite database.

    Parses ``OUTPUT_DATABASE`` and ``-v/--verbose`` and configures logging:
    WARNING by default, INFO with -v, DEBUG with -vv or more. It then writes
    every record from a default ``RecordReader`` into the database and
    reports the record count on stderr.
    """
    parser = argparse.ArgumentParser(description="Convert flow records to sqlite3 database")
    parser.add_argument("database", metavar="OUTPUT_DATABASE", help="Output sqlite3 database")
    parser.add_argument("-v", "--verbose", action="count", default=0, help="Increase verbosity")
    args = parser.parse_args()

    levels = [logging.WARNING, logging.INFO, logging.DEBUG]
    level = levels[min(len(levels) - 1, args.verbose)]
    logging.basicConfig(level=level, format="%(asctime)s %(levelname)s %(message)s")

    print(f"Writing records to {args.database!r}", file=sys.stderr)
    con = sqlite3.connect(args.database)
    try:
        # The connection context manager commits on success and rolls back on
        # error, but it does NOT close the connection, so close it explicitly.
        with con:
            # Currently just read records from stdin
            with RecordReader() as reader:
                count = recordreader_to_sqlite(reader, con)
    finally:
        con.close()
    print(f"Wrote {count} records", file=sys.stderr)
if __name__ == "__main__":
    # Raising SystemExit directly is what sys.exit() does; main()'s return
    # value (None here) becomes the process exit status (0).
    raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.