Skip to content

Instantly share code, notes, and snippets.

@mbutler
Created July 7, 2025 15:37
Show Gist options
  • Save mbutler/5f2b58e632370b058be67c2264acaa86 to your computer and use it in GitHub Desktop.
Save mbutler/5f2b58e632370b058be67c2264acaa86 to your computer and use it in GitHub Desktop.
import 4e compendium sql files. Holy hell what a mess
import os
import re
import sqlparse
import pymysql
import sqlglot
from pathlib import Path
# CONFIG
DB_NAME = '4e_compendium'
DB_USER = 'mbutler'
DB_PASS = ''
SQL_DIR = './compendium_sql'
LOG_FILE = 'import_errors.log'
RETRY_FILE = 'retry_failed.sql'
# DB connection
conn = pymysql.connect(
host='localhost',
user=DB_USER,
password=DB_PASS,
database=DB_NAME,
charset='utf8mb4',
autocommit=True
)
cursor = conn.cursor()
# Clear logs
Path(LOG_FILE).write_text('')
original_retry = Path(RETRY_FILE).read_text(encoding='utf8') if Path(RETRY_FILE).exists() else ''
Path(RETRY_FILE).write_text('') # Clear after saving
def log_error(file, stmt, err):
with open(LOG_FILE, 'a', encoding='utf8') as f:
f.write(f"----- ERROR in {file} -----\n")
f.write(f"SQL: {stmt.strip()}\n")
f.write(f"Error: {err}\n\n")
with open(RETRY_FILE, 'a', encoding='utf8') as f:
f.write(stmt.strip() + ';\n\n')
def escape_inner_single_quotes(sql):
# Replace single quote between word characters with escaped quote
return re.sub(r"(?<=\w)'(?=\w)", "''", sql)
def clean_statement(sql):
fixed = sql
fixed = fixed.replace('\\"', '"')
fixed = re.sub(r"[‘’]", "'", fixed)
fixed = re.sub(r"[“”]", '"', fixed)
fixed = re.sub(r'[\u0000-\u001F\u007F\u2028\u2029]', '', fixed)
# Escape embedded quotes: e.g. wizard's -> wizard''s
fixed = escape_inner_single_quotes(fixed)
# Unclosed parentheses fix
if fixed.count('(') > fixed.count(')'):
fixed += ')' * (fixed.count('(') - fixed.count(')'))
# Strip trailing garbage
fixed = re.sub(r'(</script>\s*)[^;]*$', r'\1', fixed)
return fixed
def is_valid_sql_candidate(sql):
sql = sql.strip().lower()
return sql.startswith(('insert', 'update', 'delete', 'replace', 'create', 'drop'))
def is_garbage_fragment(sql):
cleaned = sql.strip().lower()
return (
len(cleaned) < 25 or
all(c in '<>/&nbsp;=; \'"' for c in cleaned) or
not re.search(r'[a-zA-Z0-9_]+\s+\(', cleaned)
)
def try_execute(stmt, path):
try:
cursor.execute(stmt)
return True
except Exception as e1:
# Try parsing to validate structure
try:
sqlglot.parse_one(stmt)
except Exception as e2:
log_error(path.name, stmt, f"(sqlglot) {e2}")
return False
# Try with cleaned version
fixed = clean_statement(stmt)
if fixed != stmt:
try:
cursor.execute(fixed)
return True
except Exception as e3:
log_error(path.name, fixed, f"(cleaned) {e3}")
return False
else:
log_error(path.name, stmt, f"(uncleaned) {e1}")
return False
def run_sql_file(path):
buffer = ''
open_quotes = 0
with open(path, 'r', encoding='utf8') as f:
for line in f:
stripped = line.strip()
if not stripped or stripped.startswith('--'):
continue
buffer += line
open_quotes += line.count("'") % 2
if ';' in line and open_quotes % 2 == 0:
statements = sqlparse.split(buffer)
for stmt in statements:
stmt = stmt.strip()
if not stmt or is_garbage_fragment(stmt):
continue
if not is_valid_sql_candidate(stmt):
continue
try_execute(stmt, path)
buffer = ''
open_quotes = 0
# Remaining buffer
if buffer.strip():
stmt = buffer.strip()
if is_valid_sql_candidate(stmt) and not is_garbage_fragment(stmt):
try_execute(stmt, path)
def retry_failed_statements():
retry_path = Path(RETRY_FILE)
if not original_retry.strip():
print("📭 No retry_failed.sql content found.")
return
print(f"🔁 Retrying from {RETRY_FILE}")
buffer = ''
for line in original_retry.splitlines(keepends=True):
buffer += line
if ';' in line:
for stmt in sqlparse.split(buffer):
stmt = stmt.strip()
if not stmt or is_garbage_fragment(stmt):
continue
if is_valid_sql_candidate(stmt):
try_execute(stmt, retry_path)
buffer = ''
# 🔁 First, retry failed statements
retry_failed_statements()
# 📥 Then, process all SQL files
for file in sorted(Path(SQL_DIR).glob('*.sql')):
print(f'📥 Importing {file.name}')
try:
run_sql_file(file)
print(f'✅ {file.name} completed')
except Exception as e:
log_error(file.name, '-- FILE LEVEL ERROR --', str(e))
print(f'❌ {file.name} failed at file level')
print('📝 Import complete. See import_errors.log and retry_failed.sql for failed inserts.')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment