Skip to content

Instantly share code, notes, and snippets.

@aculich
Created December 27, 2024 21:26
Show Gist options
  • Save aculich/2f18504b2678f3dcca0d5b6bb60ad9e4 to your computer and use it in GitHub Desktop.
Save aculich/2f18504b2678f3dcca0d5b6bb60ad9e4 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import os
import sqlite3
from tqdm import tqdm
def get_table_schema(cursor, table_name):
    """Return the columns of *table_name* as a ``{column_name: declared_type}`` dict.

    Args:
        cursor: An open ``sqlite3`` cursor on the database to inspect.
        table_name: Name of the table; may contain spaces or punctuation.

    Returns:
        A dict mapping each column name to its declared type, in column
        order. Empty when the table does not exist.
    """
    # Quote the identifier (doubling embedded quotes) so that names with
    # spaces, punctuation or keywords do not produce a syntax error.
    quoted_name = '"' + table_name.replace('"', '""') + '"'
    cursor.execute(f"PRAGMA table_info({quoted_name})")
    # Each row is (cid, name, type, notnull, dflt_value, pk).
    return {row[1]: row[2] for row in cursor.fetchall()}
def align_schemas(source_conn, dest_conn, table_name):
    """Make the destination table able to hold every column of the source table.

    If the table does not exist in the destination yet, it is created with
    the source's column names and declared types (constraints, defaults and
    indexes are not copied). Otherwise every source column missing from the
    destination is added with ``ALTER TABLE ... ADD COLUMN``.

    Args:
        source_conn: ``sqlite3`` connection to the database being merged in.
        dest_conn: ``sqlite3`` connection to the database being merged into.
        table_name: Name of the table to align in both databases.
    """

    def quote(identifier):
        # Double-quote an identifier, doubling any embedded quotes.
        return '"' + identifier.replace('"', '""') + '"'

    dest_cursor = dest_conn.cursor()
    source_schema = get_table_schema(source_conn.cursor(), table_name)
    dest_schema = get_table_schema(dest_cursor, table_name)

    if not source_schema:
        # The source has no such table, so there is nothing to align.
        return

    quoted_table = quote(table_name)
    if not dest_schema:
        # An empty schema means the table is absent from the destination;
        # ALTER TABLE would fail with "no such table", so create it instead.
        column_defs = ", ".join(
            f"{quote(column)} {col_type}".rstrip()
            for column, col_type in source_schema.items()
        )
        dest_cursor.execute(f"CREATE TABLE {quoted_table} ({column_defs})")
    else:
        # SQLite column names are case-insensitive, so compare them that way
        # to avoid a "duplicate column name" error on ADD COLUMN.
        existing = {name.lower() for name in dest_schema}
        for column, col_type in source_schema.items():
            if column.lower() not in existing:
                dest_cursor.execute(
                    f"ALTER TABLE {quoted_table} "
                    f"ADD COLUMN {quote(column)} {col_type}".rstrip()
                )
                existing.add(column.lower())
    dest_conn.commit()
def merge_tables(source_conn, dest_conn, table_name):
    """Append every row of the source table to the same-named destination table.

    The destination schema is aligned first, then only the columns that exist
    in the source are copied; destination-only columns receive their default
    value (NULL unless the column declares one).

    Args:
        source_conn: ``sqlite3`` connection to the database being merged in.
        dest_conn: ``sqlite3`` connection to the database being merged into.
        table_name: Name of the table to copy.

    Raises:
        sqlite3.Error: Propagated from SQLite, e.g. ``IntegrityError`` when a
            copied row violates a constraint of the destination table.
    """

    def quote(identifier):
        # Double-quote an identifier, doubling any embedded quotes.
        return '"' + identifier.replace('"', '""') + '"'

    # Align schemas before merging so every source column has a target.
    align_schemas(source_conn, dest_conn, table_name)

    source_cursor = source_conn.cursor()
    # Use the SOURCE column list: selecting a destination-only column from the
    # source would make SQLite fall back to treating the double-quoted name as
    # a string literal and silently copy the column name as data.
    source_columns = list(get_table_schema(source_cursor, table_name))
    if not source_columns:
        return

    quoted_table = quote(table_name)
    columns = ", ".join(quote(column) for column in source_columns)
    placeholders = ", ".join("?" * len(source_columns))

    source_cursor.execute(f"SELECT {columns} FROM {quoted_table}")
    rows = source_cursor.fetchall()
    if rows:
        insert_sql = (
            f"INSERT INTO {quoted_table} ({columns}) VALUES ({placeholders})"
        )
        dest_conn.cursor().executemany(insert_sql, rows)
        dest_conn.commit()
def merge_databases(source_db, dest_conn):
    """Merge every user table of the SQLite file *source_db* into *dest_conn*.

    Args:
        source_db: Path to the source SQLite database file.
        dest_conn: Open ``sqlite3`` connection to the destination database.

    Raises:
        sqlite3.Error: Propagated from SQLite; the source connection is
            closed before the exception leaves this function.
    """
    source_conn = sqlite3.connect(source_db)
    try:
        source_cursor = source_conn.cursor()
        # Skip SQLite's internal bookkeeping tables (e.g. sqlite_sequence):
        # names starting with "sqlite_" are reserved and must not be copied
        # like user data.
        source_cursor.execute(
            "SELECT name FROM sqlite_master "
            "WHERE type='table' AND name NOT LIKE 'sqlite_%'"
        )
        for (table_name,) in source_cursor.fetchall():
            merge_tables(source_conn, dest_conn, table_name)
    finally:
        # Close the source even when a merge fails part-way through.
        source_conn.close()
def main(rawdata_dir, output_db):
    """Merge every ``.db`` file found under *rawdata_dir* into *output_db*.

    Args:
        rawdata_dir: Directory searched recursively for files ending in
            ``.db``.
        output_db: Path of the destination SQLite database; it is created if
            it does not exist.
    """
    # Connect to the destination database (created if it doesn't exist).
    dest_conn = sqlite3.connect(output_db)
    try:
        output_path = os.path.realpath(output_db)

        # Traverse the rawdata directory and its subdirectories for .db files.
        db_files = []
        for root, _, files in os.walk(rawdata_dir):
            for file_name in files:
                if not file_name.endswith('.db'):
                    continue
                path = os.path.join(root, file_name)
                # The output file may itself live under rawdata_dir; never
                # merge the destination into itself.
                if os.path.realpath(path) == output_path:
                    continue
                db_files.append(path)
        # os.walk order depends on the filesystem; sort so that repeated runs
        # merge in the same order.
        db_files.sort()

        # Merge each database into the destination with a progress bar.
        for db_file in tqdm(db_files, desc="Merging databases"):
            merge_databases(db_file, dest_conn)
    finally:
        # Release the destination even if a merge raised.
        dest_conn.close()
    print(f"All databases have been merged into {output_db}")
# Command-line entry point: merge_databases.py <rawdata_dir> <output_db>
if __name__ == "__main__":
    import sys

    if len(sys.argv) != 3:
        # sys.exit() with a string writes it to stderr and exits with status
        # 1, so shell callers can detect the usage error.
        sys.exit("Usage: merge_databases.py <rawdata_dir> <output_db>")
    main(sys.argv[1], sys.argv[2])
@aculich
Copy link
Author

aculich commented Dec 27, 2024

Assuming there is a rawdata directory containing the Excel files and a data directory where we will work and save data, we can run: ./dchb_excel_to_sqlite.py rawdata data/unified.db

❯ ./dchb_excel_to_sqlite.py rawdata data/unified.db
❯ sqlite3 data/unified.db
SQLite version 3.45.3 2024-04-15 13:34:05
Enter ".help" for usage hints.
sqlite> .tables
Hamlet            Misc_Information  Village
sqlite>

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment