#!/usr/bin/env python3
config="""
[data-sources]
[data-sources.mysql]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
[data-sources.tidb]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""
[task]
output-dir = "."
source-instance = "mysql"
target-instance = "tidb"
target-check-schema = ["test100"]
"""
"""
Schema comparator for MySQL/TiDB.
- Reads a TOML config like the one provided.
- Connects to the data sources named by [task].source-instance and [task].target-instance
- For each schema named in [task].target-check-schema:
* Compares table counts
* Compares SHOW CREATE TABLE outputs (normalized)
* Compares row counts and CRC32 checksums, and generates per-table sync SQL when the target table has a primary key
- Writes a Markdown report, per-table .diff files, and sql_fixes/*.sync.sql scripts into [task].output-dir
Dependencies:
pip install "pymysql>=1.0" toml   # pymysql>=1.0 so `with connect_mysql(...)` yields the connection itself
Usage:
python compare_schemas.py
# Config is embedded via the `config` variable at the top of the script.
"""
import base64
import difflib
import os
import re
from datetime import datetime
from typing import Any, Dict, List, Tuple
import pymysql
import toml
# --------------------------- Helpers ---------------------------
def maybe_b64(s: str) -> str:
"""
Heuristically decode Base64 passwords.
If decoding cleanly produces UTF-8 printables, use it; otherwise return original.
Also supports explicit 'base64:' prefix.
"""
if not s:
return s
raw = s
if s.lower().startswith("base64:"):
raw = s[7:]
try:
decoded = base64.b64decode(raw, validate=True)
text = decoded.decode("utf-8")
# require mostly printable
if sum(ch.isprintable() for ch in text) / max(1, len(text)) > 0.95:
return text
return s
except Exception:
return s
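# Example behaviour of the heuristic above (a sketch, not part of the original script):
#   maybe_b64("base64:cGFzc3dvcmQ=")  -> "password"   (explicit prefix)
#   maybe_b64("cGFzc3dvcmQ=")         -> "password"   (decodes cleanly to printable UTF-8)
#   maybe_b64("hunter2!")             -> "hunter2!"   (not valid Base64, returned unchanged)
# Note that a plain password which happens to be valid Base64 will also be decoded.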
def connect_mysql(cfg: Dict):
"""
cfg keys: host, port, user, password
"""
password = maybe_b64(cfg.get("password", ""))
return pymysql.connect(
host=cfg["host"],
port=int(cfg.get("port", 3306)),
user=cfg["user"],
password=password,
charset="utf8mb4",
autocommit=True,
cursorclass=pymysql.cursors.DictCursor,
)
def get_tables(conn, schema: str) -> List[str]:
"""
Return sorted list of base table names in a schema (excludes views).
Works for MySQL and TiDB.
"""
q = """
SELECT TABLE_NAME
FROM information_schema.TABLES
WHERE TABLE_SCHEMA=%s AND TABLE_TYPE='BASE TABLE'
ORDER BY TABLE_NAME
"""
with conn.cursor() as cur:
cur.execute(q, (schema,))
rows = cur.fetchall()
return [r["TABLE_NAME"] for r in rows]
def get_table_columns(conn, schema: str, table: str) -> List[str]:
"""Return ordered column names for a table."""
q = (
"SELECT COLUMN_NAME FROM information_schema.COLUMNS "
"WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"
)
with conn.cursor() as cur:
cur.execute(q, (schema, table))
rows = cur.fetchall()
return [r["COLUMN_NAME"] for r in rows]
def get_primary_key_columns(conn, schema: str, table: str) -> List[str]:
"""Return ordered primary key column names for a table. Empty if none."""
q = (
"""
SELECT COLUMN_NAME
FROM information_schema.KEY_COLUMN_USAGE
WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s AND CONSTRAINT_NAME='PRIMARY'
ORDER BY ORDINAL_POSITION
"""
)
with conn.cursor() as cur:
cur.execute(q, (schema, table))
rows = cur.fetchall()
return [r["COLUMN_NAME"] for r in rows]
def show_create_table(conn, schema: str, table: str) -> str:
with conn.cursor() as cur:
cur.execute(f"SHOW CREATE TABLE `{schema}`.`{table}`")
row = cur.fetchone()
# Both MySQL and TiDB return the keys: Table, Create Table (or Create View for views)
for k in ("Create Table", "Create View"):
if k in row:
return row[k]
# Fallback: stringify everything
return "\n".join(f"{k}: {v}" for k, v in row.items())
# Normalization patterns to ignore benign diffs between MySQL/TiDB or versions
_NORM_PATTERNS = [
# AUTO_INCREMENT value
(r"AUTO_INCREMENT=\d+\b", "AUTO_INCREMENT=?"),
# Row format / stats
(r"ROW_FORMAT=\w+", "ROW_FORMAT=?"),
(r"STATS_PERSISTENT=\w+", "STATS_PERSISTENT=?"),
(r"STATS_AUTO_RECALC=\w+", "STATS_AUTO_RECALC=?"),
(r"STATS_SAMPLE_PAGES=\d+", "STATS_SAMPLE_PAGES=?"),
(r"CHECKSUM=\d+", "CHECKSUM=?"),
(r"DELAY_KEY_WRITE=\d+", "DELAY_KEY_WRITE=?"),
(r"KEY_BLOCK_SIZE=\d+", "KEY_BLOCK_SIZE=?"),
# Engine options ordering
(r"ENGINE=\w+", "ENGINE=?"),
# Default charset/collation variants # Removed to preserve charset/collation differences
# (r"DEFAULT CHARSET=\w+", "DEFAULT CHARSET=?"),
# (r"DEFAULT CHARACTER SET=\w+", "DEFAULT CHARSET=?"),
# (r"CHARACTER SET \w+", "CHARACTER SET ?"),
# (r"COLLATE=\w+", "COLLATE=?"),
# TiDB clustered index annotation
(r"/\*T!\[clustered_index\]\s+CLUSTERED\s*\*/", ""),
# Comment timestamps
(r"COMMENT='.*?'", "COMMENT='?'"),
# Partitioning metadata (ids can vary)
(r"PARTITION(?: .|\n)*$", "/* PARTITION DEFINITION NORMALIZED */"),
# Backtick spacing inconsistencies
# (r"\s+", " "),
]
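# Illustration of the masking above: a DDL tail such as
#   ") ENGINE=InnoDB AUTO_INCREMENT=421 DEFAULT CHARSET=utf8mb4 ROW_FORMAT=DYNAMIC"
# is rewritten to
#   ") ENGINE=? AUTO_INCREMENT=? DEFAULT CHARSET=utf8mb4 ROW_FORMAT=?"
# i.e. engine/auto-increment/row-format noise is ignored while charset and collation
# differences are preserved (their patterns are deliberately commented out above).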
def normalize_bigint_specs(sql: str) -> str:
"""
Normalize any BIGINT(<n>) to BIGINT, preserving optional signed/unsigned (any case),
and ensure a trailing space after the type to avoid concatenation (e.g., bigintNOT).
Examples:
BIGINT(20) -> bigint
bigint(8) unsigned -> bigint unsigned
BIGINT ( 20 ) SIGNED -> bigint signed
"""
def _repl(m: re.Match) -> str:
suffix = m.group('suffix')
# Always include a trailing space after the base type
return 'bigint ' + (suffix.lower() if suffix else '')
# First pass: BIGINT(20) [SIGNED|UNSIGNED]
sql = re.sub(
r"(?is)\bbigint\s*\(\s*\d+\s*\)\s*(?P<suffix>unsigned|signed)?\b",
_repl,
sql,
)
# Second pass: any leftover BIGINT(n) without suffix
sql = re.sub(r"(?is)\bbigint\s*\(\s*\d+\s*\)", "bigint ", sql)
return sql
def normalize_ddl(ddl: str) -> str:
s = ddl.strip()
# Type-specific normalizations
s = normalize_bigint_specs(s)
# Keep identifier/keyword case as-is; only normalize whitespace and the pragmas matched by _NORM_PATTERNS.
for pat, repl in _NORM_PATTERNS:
s = re.sub(pat, repl, s, flags=re.IGNORECASE | re.DOTALL)
# Remove trailing commas before closing parentheses (style diffs)
s = re.sub(r",[ \t]*\)", ")", s)
# Normalize spacing around parentheses (do not cross newlines)
s = re.sub(r"[ \t]*\([ \t]*", " ( ", s)
s = re.sub(r"[ \t]*\)[ \t]*", " ) ", s)
# Collapse multiple spaces within lines but preserve newlines
s = re.sub(r"[ \t]{2,}", " ", s)
# Trim trailing spaces on each line, preserve line breaks
s = "\n".join(line.rstrip() for line in s.splitlines())
return s.strip()
def unified_diff(a: str, b: str, fromfile: str, tofile: str) -> str:
a_lines = a.splitlines(keepends=False)
b_lines = b.splitlines(keepends=False)
return "\n".join(
difflib.unified_diff(a_lines, b_lines, fromfile=fromfile, tofile=tofile, lineterm="")
)
def table_data_fingerprint(conn, schema: str, table: str, columns: List[str]) -> Tuple[int, int]:
"""
Return (row_count, checksum) for a table using BIT_XOR(CRC32(CONCAT_WS(...))).
Works across MySQL/TiDB. Empty tables return checksum 0.
"""
if not columns:
# No columns (shouldn't happen for BASE TABLE), treat as empty
with conn.cursor() as cur:
cur.execute(f"SELECT COUNT(*) AS cnt FROM `{schema}`.`{table}`")
row = cur.fetchone()
return int(row.get("cnt", 0)), 0
concat_parts = ", ".join([f"COALESCE(CAST(`{c}` AS CHAR), 'NULL')" for c in columns])
concat_expr = f"CONCAT_WS('||', {concat_parts})"
sql = (
f"SELECT COUNT(*) AS cnt, COALESCE(BIT_XOR(CRC32({concat_expr})), 0) AS crc "
f"FROM `{schema}`.`{table}`"
)
with conn.cursor() as cur:
cur.execute(sql)
row = cur.fetchone()
cnt = int(row.get("cnt", 0))
crc = int(row.get("crc", 0)) if row.get("crc") is not None else 0
return cnt, crc
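# For a hypothetical table `test100`.`t1` with columns (id, name), the query built
# above is roughly:
#   SELECT COUNT(*) AS cnt,
#          COALESCE(BIT_XOR(CRC32(CONCAT_WS('||',
#              COALESCE(CAST(`id` AS CHAR), 'NULL'),
#              COALESCE(CAST(`name` AS CHAR), 'NULL')))), 0) AS crc
#   FROM `test100`.`t1`
# CRC32 collisions are possible, so equal (cnt, crc) pairs are strong but not
# conclusive evidence that the data matches.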
def sql_ident(name: str) -> str:
return f"`{name.replace('`', '``')}`"
def sql_fqtn(schema: str, table: str) -> str:
return f"{sql_ident(schema)}.{sql_ident(table)}"
def sql_literal(conn, value: Any) -> str:
"""Use the connection's escape to render a SQL literal (NULL, numbers, strings, bytes)."""
if value is None:
return "NULL"
# pymysql Connection has .escape() which returns a quoted string for str/bytes, leaves numbers as-is
return conn.escape(value)
def row_crc_expr(columns: List[str]) -> str:
# Build a CRC32 over all columns (as text) to detect per-row changes
if not columns:
return "0"
concat_parts = ", ".join([f"COALESCE(CAST({sql_ident(c)} AS CHAR), 'NULL')" for c in columns])
return f"CRC32(CONCAT_WS('||', {concat_parts}))"
def fetch_pk_crc_map(conn, schema: str, table: str, pk_cols: List[str], all_cols: List[str]) -> Dict[Tuple, int]:
"""Return {pk_tuple: crc32} for every row in the table."""
if not pk_cols:
return {}
pk_sel = ", ".join([sql_ident(c) for c in pk_cols])
sql = (
f"SELECT {pk_sel}, {row_crc_expr(all_cols)} AS _crc FROM {sql_fqtn(schema, table)}"
)
out: Dict[Tuple, int] = {}
with conn.cursor() as cur:
cur.execute(sql)
for row in cur.fetchall():
key = tuple(row[c] for c in pk_cols)
out[key] = int(row["_crc"]) if row["_crc"] is not None else 0
return out
# Batch size for primary-key IN (...) lookups when fetching full rows
essential_batch = 500
def chunked(iterable, n):
it = iter(iterable)
while True:
chunk = []
try:
for _ in range(n):
chunk.append(next(it))
except StopIteration:
if chunk:
yield chunk
break
yield chunk
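# Example: chunked(range(5), 2) yields [0, 1], [2, 3], [4]; an empty iterable yields
# nothing. Used below to keep generated IN (...) lists and INSERT batches bounded.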
def fetch_rows_by_pks(conn, schema: str, table: str, pk_cols: List[str], pks: List[Tuple], all_cols: List[str]) -> List[Dict[str, Any]]:
"""Fetch full rows for the given primary key tuples."""
rows: List[Dict[str, Any]] = []
if not pks:
return rows
col_sel = ", ".join([sql_ident(c) for c in all_cols])
with conn.cursor() as cur:
for batch in chunked(pks, essential_batch):
if len(pk_cols) == 1:
# WHERE pk IN (...)
placeholders = ", ".join([sql_literal(conn, x[0]) for x in batch])
where = f"{sql_ident(pk_cols[0])} IN ({placeholders})"
else:
# WHERE (pk1,pk2,...) IN ((v11,v12,...),(v21,v22,...))
tuple_texts = []
for key in batch:
vals = ", ".join([sql_literal(conn, v) for v in key])
tuple_texts.append(f"({vals})")
where = f"({', '.join(sql_ident(c) for c in pk_cols)}) IN (" + ", ".join(tuple_texts) + ")"
q = f"SELECT {col_sel} FROM {sql_fqtn(schema, table)} WHERE {where}"
cur.execute(q)
rows.extend(cur.fetchall())
return rows
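# Example WHERE clauses produced above (values rendered via sql_literal):
#   single-column PK:      `id` IN (1, 2, 3)
#   composite PK (a, b):   (`a`, `b`) IN ((1, 'x'), (2, 'y'))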
def generate_sync_sql(source_conn, target_conn, schema: str, table: str, pk_cols: List[str], all_cols: List[str], out_dir: str) -> Tuple[str, Dict[str, int]]:
"""
Compare rows by primary key and generate a SQL script that syncs TARGET to SOURCE.
Returns (path, stats_dict).
"""
os.makedirs(out_dir, exist_ok=True)
path = os.path.join(out_dir, f"{schema}.{table}.sync.sql")
src_map = fetch_pk_crc_map(source_conn, schema, table, pk_cols, all_cols)
tgt_map = fetch_pk_crc_map(target_conn, schema, table, pk_cols, all_cols)
src_keys = set(src_map.keys())
tgt_keys = set(tgt_map.keys())
to_insert = sorted(src_keys - tgt_keys)
to_delete = sorted(tgt_keys - src_keys)
to_update = sorted([k for k in (src_keys & tgt_keys) if src_map[k] != tgt_map[k]])
non_pk_cols = [c for c in all_cols if c not in pk_cols]
stmts: List[str] = []
fqtn = sql_fqtn(schema, table)
cols_list = ", ".join(sql_ident(c) for c in all_cols)
# INSERTs and UPDATEs via INSERT ... ON DUPLICATE KEY UPDATE
if to_insert or to_update:
rows_needed = to_insert + to_update
rows = fetch_rows_by_pks(source_conn, schema, table, pk_cols, rows_needed, all_cols)
# index rows by pk tuple for deterministic batching
def key_of(row):
return tuple(row[c] for c in pk_cols)
row_map = {key_of(r): r for r in rows}
for batch_keys in chunked(rows_needed, 200):
values_groups = []
for k in batch_keys:
r = row_map.get(k)
if r is None:
continue # shouldn't happen
vals = ", ".join(sql_literal(source_conn, r[c]) for c in all_cols)
values_groups.append(f"({vals})")
if not values_groups:
continue
upd_list = ", ".join([f"{sql_ident(c)}=VALUES({sql_ident(c)})" for c in non_pk_cols])
stmts.append(
f"INSERT INTO {fqtn} ({cols_list}) VALUES\n " + ",\n ".join(values_groups) +
(f"\nON DUPLICATE KEY UPDATE {upd_list};" if non_pk_cols else ";")
)
# DELETEs
for batch_keys in chunked(to_delete, 500):
if len(pk_cols) == 1:
placeholders = ", ".join(sql_literal(target_conn, x[0]) for x in batch_keys)
where = f"{sql_ident(pk_cols[0])} IN ({placeholders})"
else:
tuple_texts = []
for key in batch_keys:
vals = ", ".join([sql_literal(target_conn, v) for v in key])
tuple_texts.append(f"({vals})")
where = f"({', '.join(sql_ident(c) for c in pk_cols)}) IN (" + ", ".join(tuple_texts) + ")"
stmts.append(f"DELETE FROM {fqtn} WHERE {where};")
with open(path, "w", encoding="utf-8") as f:
f.write(f"-- Sync {fqtn} to match SOURCE at generation time\n")
f.write(f"-- Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write(f"-- Primary key: {', '.join(pk_cols) if pk_cols else '(none)'}\n")
f.write(f"-- Batches: inserts/updates in groups of 200, deletes of 500\n\n")
if not stmts:
f.write("-- No changes necessary.\n")
else:
f.write("SET autocommit=0;\nSTART TRANSACTION;\n\n")
for s in stmts:
f.write(s + "\n\n")
f.write("COMMIT;\n")
return path, {
"insert": len(to_insert),
"update": len(to_update),
"delete": len(to_delete),
}
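# A generated sync script looks roughly like this (hypothetical table test100.t1
# with primary key `id` and one non-key column `name`):
#   -- Sync `test100`.`t1` to match SOURCE at generation time
#   SET autocommit=0;
#   START TRANSACTION;
#
#   INSERT INTO `test100`.`t1` (`id`, `name`) VALUES
#     (1, 'alice'),
#     (2, 'bob')
#   ON DUPLICATE KEY UPDATE `name`=VALUES(`name`);
#
#   DELETE FROM `test100`.`t1` WHERE `id` IN (9, 10);
#
#   COMMIT;
# The script reflects a point-in-time snapshot and assumes no concurrent writes to
# the target; review it before applying.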
# --------------------------- Core comparison ---------------------------
def compare_schema(
source_conn, target_conn, schema: str, diffs_dir: str
) -> Tuple[Dict, List[str]]:
"""
Returns:
summary: dict with counts, missing tables, and per-table status
diff_files: list of generated diff file paths
"""
diff_files: List[str] = []
src_tables = get_tables(source_conn, schema)
tgt_tables = get_tables(target_conn, schema)
src_set = set(src_tables)
tgt_set = set(tgt_tables)
only_in_source = sorted(src_set - tgt_set)
only_in_target = sorted(tgt_set - src_set)
in_both = sorted(src_set & tgt_set)
table_results = []
for t in in_both:
src_ddl_raw = show_create_table(source_conn, schema, t)
tgt_ddl_raw = show_create_table(target_conn, schema, t)
src_ddl = normalize_ddl(src_ddl_raw)
tgt_ddl = normalize_ddl(tgt_ddl_raw)
match = (src_ddl == tgt_ddl)
status = "MATCH" if match else "DIFF"
diff_path = None
if not match:
# Produce diff on normalized DDL to avoid cosmetic diffs (e.g., BIGINT(20) vs BIGINT)
diff_text = unified_diff(
src_ddl, tgt_ddl,
fromfile=f"{schema}.{t}@source (normalized)",
tofile=f"{schema}.{t}@target (normalized)"
)
os.makedirs(diffs_dir, exist_ok=True)
diff_path = os.path.join(diffs_dir, f"{schema}.{t}.diff")
with open(diff_path, "w", encoding="utf-8") as f:
f.write(diff_text)
diff_files.append(diff_path)
# Data comparison (row count + checksum) + optional sync SQL generation
fix_sql = None
fix_counts = None
warn_no_pk = False
try:
cols = get_table_columns(source_conn, schema, t)
# Ensure column ordering is consistent on both sides by using source order
src_cnt, src_crc = table_data_fingerprint(source_conn, schema, t, cols)
tgt_cnt, tgt_crc = table_data_fingerprint(target_conn, schema, t, cols)
if src_cnt != tgt_cnt:
data_status = "COUNT_MISMATCH"
else:
data_status = "SAME" if src_crc == tgt_crc else "DIFF"
# If data differs, and table has a primary key (on TARGET), generate sync SQL
if data_status != "SAME":
pk_cols = get_primary_key_columns(target_conn, schema, t)
if pk_cols:
# create per-table SQL under output/sql_fixes
sql_dir = os.path.join(diffs_dir, "..", "sql_fixes")
sql_dir = os.path.normpath(sql_dir)
try:
fix_sql, fix_counts = generate_sync_sql(
source_conn, target_conn, schema, t, pk_cols, cols, sql_dir
)
except Exception as gen_e:
# Surface a clear error in the report but don't crash the run
data_status = f"ERROR: {type(gen_e).__name__}"
fix_sql = None
fix_counts = None
else:
warn_no_pk = True
except Exception as e:
data_status = f"ERROR: {type(e).__name__}"
src_cnt = tgt_cnt = -1
table_results.append({
"table": t,
"status": status, # DDL status
"data_status": data_status,
"src_count": src_cnt,
"tgt_count": tgt_cnt,
"diff_file": diff_path,
"fix_sql": fix_sql,
"fix_counts": fix_counts,
"no_pk_warning": warn_no_pk,
})
summary = {
"schema": schema,
"source_table_count": len(src_tables),
"target_table_count": len(tgt_tables),
"only_in_source": only_in_source,
"only_in_target": only_in_target,
"common_tables": in_both,
"common_table_count": len(in_both),
"tables_compared": table_results,
}
return summary, diff_files
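# Shape of the returned summary (abridged, example values):
#   {
#     "schema": "test100",
#     "source_table_count": 12, "target_table_count": 11,
#     "only_in_source": ["t_new"], "only_in_target": [],
#     "common_tables": [...], "common_table_count": 10,
#     "tables_compared": [
#       {"table": "t1", "status": "MATCH", "data_status": "SAME",
#        "src_count": 100, "tgt_count": 100, "diff_file": None,
#        "fix_sql": None, "fix_counts": None, "no_pk_warning": False},
#       ...
#     ],
#   }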
def load_config_from_embedded() -> Dict:
"""Load configuration from the embedded `config` string at the top of this file."""
return toml.loads(config)
def pick_instance(cfg: Dict, name: str) -> Dict:
ds = cfg.get("data-sources", {})
if name not in ds:
raise KeyError(f'Instance "{name}" not found under [data-sources.*] in config.')
return ds[name]
def main():
# --- Load config ---
cfg = load_config_from_embedded()
task = cfg.get("task", {})
output_dir = task.get("output-dir", "./output")
source_name = task["source-instance"]
target_name = task["target-instance"]
check_schemas = task.get("target-check-schema", [])
if not check_schemas:
raise ValueError("No schemas set in [task].target-check-schema")
src_cfg = pick_instance(cfg, source_name)
tgt_cfg = pick_instance(cfg, target_name)
os.makedirs(output_dir, exist_ok=True)
diffs_dir = os.path.join(output_dir, "diffs")
os.makedirs(diffs_dir, exist_ok=True)
# --- Connect ---
print(f"Connecting to source: {source_name} ({src_cfg['host']}:{src_cfg.get('port',3306)})")
print(f"Connecting to target: {target_name} ({tgt_cfg['host']}:{tgt_cfg.get('port',3306)})")
with connect_mysql(src_cfg) as src_conn, connect_mysql(tgt_cfg) as tgt_conn:
all_summaries = []
all_diff_files = []
for schema in check_schemas:
print(f"Comparing schema: {schema}")
summary, diff_files = compare_schema(src_conn, tgt_conn, schema, diffs_dir)
if summary.get("common_table_count", 0) == 0:
print(f"No common tables in schema: {schema}")
all_summaries.append(summary)
all_diff_files.extend(diff_files)
# --- Write Markdown report ---
ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
report_path = os.path.join(output_dir, "schema_compare_report.md")
def md_escape(s: str) -> str:
return s.replace("|", r"\|")
lines = []
lines.append(f"# Schema Comparison Report")
lines.append("")
lines.append(f"- Generated: {ts}")
lines.append(f"- Source instance: **{md_escape(source_name)}**")
lines.append(f"- Target instance: **{md_escape(target_name)}**")
lines.append(f"- Schemas checked: {', '.join(check_schemas)}")
lines.append("")
for s in all_summaries:
lines.append(f"## Schema `{s['schema']}`")
lines.append("")
lines.append(f"- Source tables: **{s['source_table_count']}**")
lines.append(f"- Target tables: **{s['target_table_count']}**")
same_count = "✅" if s['source_table_count'] == s['target_table_count'] else "❌"
lines.append(f"- Same number of tables: {same_count}")
lines.append(f"- Common tables: **{s.get('common_table_count', 0)}**")
if s.get('common_table_count', 0) == 0:
lines.append("")
lines.append("> ⚠️ No common tables between source and target for this schema; DDL comparison table omitted.")
lines.append("")
if s["only_in_source"]:
lines.append("<details><summary>Tables only in source</summary>")
lines.append("")
for t in s["only_in_source"]:
lines.append(f"- `{t}`")
lines.append("")
lines.append("</details>")
lines.append("")
if s["only_in_target"]:
lines.append("<details><summary>Tables only in target</summary>")
lines.append("")
for t in s["only_in_target"]:
lines.append(f"- `{t}`")
lines.append("")
lines.append("</details>")
lines.append("")
# Table-by-table results (only if there are common tables)
if s.get('common_table_count', 0) > 0:
lines.append("### Table Definition Comparison")
lines.append("")
lines.append("| Table | DDL | Data | Diff File | Fix SQL | Notes |")
lines.append("|---|---|---|---|---|---|")
for tr in s["tables_compared"]:
ddl_status = "✅ MATCH" if tr["status"] == "MATCH" else "❌ DIFF"
ds = tr.get("data_status", "")
if ds == "SAME":
data_status = "✅ SAME"
elif ds == "DIFF":
data_status = "❌ DIFF"
elif ds == "COUNT_MISMATCH":
data_status = f"⚠️ COUNT {tr.get('src_count', '?')} vs {tr.get('tgt_count', '?')}"
elif isinstance(ds, str) and ds.startswith("ERROR:"):
data_status = f"⚠️ {ds}"
else:
data_status = ""
diff_link = (
f"[{os.path.basename(tr['diff_file'])}](./diffs/{os.path.basename(tr['diff_file'])})"
if tr["diff_file"] else ""
)
fix_link = ""
if tr.get("fix_sql"):
fix_base = os.path.basename(tr["fix_sql"])
fix_link = f"[${{fix_base}}](./{fix_base})" if fix_base else ""
notes = ""
if tr.get("fix_counts"):
c = tr["fix_counts"]
notes = f"ins={c['insert']}, upd={c['update']}, del={c['delete']}"
if tr.get("no_pk_warning"):
if notes:
notes += "; "
notes += "skipped: no primary key"
lines.append(f"| `{tr['table']}` | {ddl_status} | {data_status} | {diff_link} | {fix_link} | {notes} |")
lines.append("")
else:
lines.append("_No common tables to compare in this schema._")
lines.append("")
with open(report_path, "w", encoding="utf-8") as f:
f.write("\n".join(lines))
print(f"\nReport written to: {report_path}")
if all_diff_files:
print(f"Diff files in: {diffs_dir} ({len(all_diff_files)} files)")
sql_fix_dir = os.path.join(output_dir, "sql_fixes")
if os.path.isdir(sql_fix_dir):
print(f"SQL fix scripts in: {sql_fix_dir}")
else:
print("No diffs generated (all matching or no common tables).")
if __name__ == "__main__":
main()