Created
December 27, 2024 21:26
-
-
Save aculich/2f18504b2678f3dcca0d5b6bb60ad9e4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import os
import sqlite3
from contextlib import closing

from tqdm import tqdm
def get_table_schema(cursor, table_name):
    """Return the columns of ``table_name`` as a ``{name: declared_type}`` dict.

    Args:
        cursor: An open ``sqlite3`` cursor on the database to inspect.
        table_name: Name of the table to describe.

    Returns:
        A dict mapping each column name to its declared type, in the order
        reported by SQLite. The dict is empty when the PRAGMA returns no
        rows (for example, when the table does not exist).
    """
    # Quote the identifier (doubling embedded quotes) so that names containing
    # spaces, punctuation or SQL keywords are accepted by the PRAGMA.
    quoted_name = '"' + table_name.replace('"', '""') + '"'
    cursor.execute(f"PRAGMA table_info({quoted_name})")
    # Index 1 of each row is the column name, index 2 its declared type.
    return {row[1]: row[2] for row in cursor.fetchall()}
def align_schemas(source_conn, dest_conn, table_name):
    """Make the destination table able to hold every column of the source.

    If the table is absent from the destination it is created with the
    source's column names and declared types (constraints such as primary
    keys or defaults are not copied). Otherwise every column that exists
    only in the source is appended with ``ALTER TABLE ... ADD COLUMN``.

    Args:
        source_conn: ``sqlite3`` connection to the database being read.
        dest_conn: ``sqlite3`` connection to the database being extended.
        table_name: Name of the table to align.
    """

    def _quote(identifier):
        # Double embedded quotes so any identifier is a valid SQL name.
        return '"' + identifier.replace('"', '""') + '"'

    source_schema = get_table_schema(source_conn.cursor(), table_name)
    dest_cursor = dest_conn.cursor()
    dest_schema = get_table_schema(dest_cursor, table_name)
    table = _quote(table_name)

    if source_schema and not dest_schema:
        # An empty destination schema means the table is missing there;
        # ALTER TABLE would fail, so create the table instead.
        column_defs = ", ".join(
            f"{_quote(column)} {col_type}".rstrip()
            for column, col_type in source_schema.items()
        )
        dest_cursor.execute(f"CREATE TABLE {table} ({column_defs})")
    else:
        for column, col_type in source_schema.items():
            if column not in dest_schema:
                dest_cursor.execute(
                    f"ALTER TABLE {table} ADD COLUMN {_quote(column)} {col_type}".rstrip()
                )
    dest_conn.commit()
def merge_tables(source_conn, dest_conn, table_name):
    """Append every row of a source table to the same table in the destination.

    The destination schema is aligned first via ``align_schemas``. Rows are
    then copied using the *source's* column list, so destination columns
    that this source does not have are simply left unset rather than
    breaking the ``SELECT``.

    Args:
        source_conn: ``sqlite3`` connection to the database being read.
        dest_conn: ``sqlite3`` connection to the database being written.
        table_name: Name of the table to copy.
    """

    def _quote(identifier):
        # Double embedded quotes so any identifier is a valid SQL name.
        return '"' + identifier.replace('"', '""') + '"'

    align_schemas(source_conn, dest_conn, table_name)

    source_cursor = source_conn.cursor()
    source_columns = list(get_table_schema(source_cursor, table_name))
    if not source_columns:
        # Nothing to select from: the source reports no columns for the table.
        return

    table = _quote(table_name)
    column_list = ", ".join(_quote(column) for column in source_columns)
    placeholders = ", ".join("?" for _ in source_columns)

    source_cursor.execute(f"SELECT {column_list} FROM {table}")
    # Feed the source cursor straight into executemany so rows are streamed
    # instead of being loaded into memory all at once.
    dest_conn.cursor().executemany(
        f"INSERT INTO {table} ({column_list}) VALUES ({placeholders})",
        source_cursor,
    )
    dest_conn.commit()
def merge_databases(source_db, dest_conn):
    """Merge every user table of the database file ``source_db`` into ``dest_conn``.

    Args:
        source_db: Path of the SQLite database file to read.
        dest_conn: Open ``sqlite3`` connection to the destination database.
    """
    # closing() guarantees the source connection is released even when one
    # of the table merges raises.
    with closing(sqlite3.connect(source_db)) as source_conn:
        rows = source_conn.execute(
            "SELECT name FROM sqlite_master WHERE type='table'"
        ).fetchall()
        # Names starting with "sqlite_" are SQLite's own bookkeeping tables
        # (e.g. sqlite_sequence); they must not be copied like user data.
        table_names = [
            name for (name,) in rows if not name.lower().startswith("sqlite_")
        ]
        for table_name in table_names:
            merge_tables(source_conn, dest_conn, table_name)
def main(rawdata_dir, output_db):
    """Merge every ``.db`` file found under ``rawdata_dir`` into ``output_db``.

    Args:
        rawdata_dir: Directory that is walked recursively for ``.db`` files.
        output_db: Path of the destination SQLite database; it is created
            if it does not exist.
    """
    output_path = os.path.realpath(output_db)

    # Collect the source databases from the directory tree.
    db_files = []
    for root, _, files in os.walk(rawdata_dir):
        for file in files:
            if not file.endswith('.db'):
                continue
            candidate = os.path.join(root, file)
            # Never merge the output database into itself when it happens
            # to live inside the directory being scanned.
            if os.path.realpath(candidate) != output_path:
                db_files.append(candidate)
    # Sort so the merge order does not depend on directory listing order.
    db_files.sort()

    # closing() releases the destination connection even if a merge fails.
    with closing(sqlite3.connect(output_db)) as dest_conn:
        for db_file in tqdm(db_files, desc="Merging databases"):
            merge_databases(db_file, dest_conn)
    print(f"All databases have been merged into {output_db}")
if __name__ == "__main__":
    import sys

    if len(sys.argv) != 3:
        # sys.exit with a message writes it to stderr and exits with a
        # non-zero status, so shells and calling scripts can detect misuse.
        sys.exit("Usage: merge_databases.py <rawdata_dir> <output_db>")
    main(sys.argv[1], sys.argv[2])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Assuming there is a `rawdata` directory where the Excel files are and a `data` directory that we'll be working with and saving data into, we can run: `./dchb_excel_to_sqlite.py rawdata data/unified.db`