Created
July 7, 2025 15:37
-
-
Save mbutler/5f2b58e632370b058be67c2264acaa86 to your computer and use it in GitHub Desktop.
import 4e compendium sql files. Holy hell what a mess
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import re | |
import sqlparse | |
import pymysql | |
import sqlglot | |
from pathlib import Path | |
# CONFIG | |
DB_NAME = '4e_compendium' | |
DB_USER = 'mbutler' | |
DB_PASS = '' | |
SQL_DIR = './compendium_sql' | |
LOG_FILE = 'import_errors.log' | |
RETRY_FILE = 'retry_failed.sql' | |
# DB connection | |
conn = pymysql.connect( | |
host='localhost', | |
user=DB_USER, | |
password=DB_PASS, | |
database=DB_NAME, | |
charset='utf8mb4', | |
autocommit=True | |
) | |
cursor = conn.cursor() | |
# Clear logs | |
Path(LOG_FILE).write_text('') | |
original_retry = Path(RETRY_FILE).read_text(encoding='utf8') if Path(RETRY_FILE).exists() else '' | |
Path(RETRY_FILE).write_text('') # Clear after saving | |
def log_error(file, stmt, err): | |
with open(LOG_FILE, 'a', encoding='utf8') as f: | |
f.write(f"----- ERROR in {file} -----\n") | |
f.write(f"SQL: {stmt.strip()}\n") | |
f.write(f"Error: {err}\n\n") | |
with open(RETRY_FILE, 'a', encoding='utf8') as f: | |
f.write(stmt.strip() + ';\n\n') | |
def escape_inner_single_quotes(sql): | |
# Replace single quote between word characters with escaped quote | |
return re.sub(r"(?<=\w)'(?=\w)", "''", sql) | |
def clean_statement(sql): | |
fixed = sql | |
fixed = fixed.replace('\\"', '"') | |
fixed = re.sub(r"[‘’]", "'", fixed) | |
fixed = re.sub(r"[“”]", '"', fixed) | |
fixed = re.sub(r'[\u0000-\u001F\u007F\u2028\u2029]', '', fixed) | |
# Escape embedded quotes: e.g. wizard's -> wizard''s | |
fixed = escape_inner_single_quotes(fixed) | |
# Unclosed parentheses fix | |
if fixed.count('(') > fixed.count(')'): | |
fixed += ')' * (fixed.count('(') - fixed.count(')')) | |
# Strip trailing garbage | |
fixed = re.sub(r'(</script>\s*)[^;]*$', r'\1', fixed) | |
return fixed | |
def is_valid_sql_candidate(sql): | |
sql = sql.strip().lower() | |
return sql.startswith(('insert', 'update', 'delete', 'replace', 'create', 'drop')) | |
def is_garbage_fragment(sql): | |
cleaned = sql.strip().lower() | |
return ( | |
len(cleaned) < 25 or | |
all(c in '<>/ =; \'"' for c in cleaned) or | |
not re.search(r'[a-zA-Z0-9_]+\s+\(', cleaned) | |
) | |
def try_execute(stmt, path): | |
try: | |
cursor.execute(stmt) | |
return True | |
except Exception as e1: | |
# Try parsing to validate structure | |
try: | |
sqlglot.parse_one(stmt) | |
except Exception as e2: | |
log_error(path.name, stmt, f"(sqlglot) {e2}") | |
return False | |
# Try with cleaned version | |
fixed = clean_statement(stmt) | |
if fixed != stmt: | |
try: | |
cursor.execute(fixed) | |
return True | |
except Exception as e3: | |
log_error(path.name, fixed, f"(cleaned) {e3}") | |
return False | |
else: | |
log_error(path.name, stmt, f"(uncleaned) {e1}") | |
return False | |
def run_sql_file(path): | |
buffer = '' | |
open_quotes = 0 | |
with open(path, 'r', encoding='utf8') as f: | |
for line in f: | |
stripped = line.strip() | |
if not stripped or stripped.startswith('--'): | |
continue | |
buffer += line | |
open_quotes += line.count("'") % 2 | |
if ';' in line and open_quotes % 2 == 0: | |
statements = sqlparse.split(buffer) | |
for stmt in statements: | |
stmt = stmt.strip() | |
if not stmt or is_garbage_fragment(stmt): | |
continue | |
if not is_valid_sql_candidate(stmt): | |
continue | |
try_execute(stmt, path) | |
buffer = '' | |
open_quotes = 0 | |
# Remaining buffer | |
if buffer.strip(): | |
stmt = buffer.strip() | |
if is_valid_sql_candidate(stmt) and not is_garbage_fragment(stmt): | |
try_execute(stmt, path) | |
def retry_failed_statements(): | |
retry_path = Path(RETRY_FILE) | |
if not original_retry.strip(): | |
print("📭 No retry_failed.sql content found.") | |
return | |
print(f"🔁 Retrying from {RETRY_FILE}") | |
buffer = '' | |
for line in original_retry.splitlines(keepends=True): | |
buffer += line | |
if ';' in line: | |
for stmt in sqlparse.split(buffer): | |
stmt = stmt.strip() | |
if not stmt or is_garbage_fragment(stmt): | |
continue | |
if is_valid_sql_candidate(stmt): | |
try_execute(stmt, retry_path) | |
buffer = '' | |
# 🔁 First, retry failed statements | |
retry_failed_statements() | |
# 📥 Then, process all SQL files | |
for file in sorted(Path(SQL_DIR).glob('*.sql')): | |
print(f'📥 Importing {file.name}') | |
try: | |
run_sql_file(file) | |
print(f'✅ {file.name} completed') | |
except Exception as e: | |
log_error(file.name, '-- FILE LEVEL ERROR --', str(e)) | |
print(f'❌ {file.name} failed at file level') | |
print('📝 Import complete. See import_errors.log and retry_failed.sql for failed inserts.') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment