Last active
August 29, 2025 00:56
-
-
Save mzhang77/e3e2bd26a56db2ea998a3a2446ceae1e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# Embedded TOML configuration, parsed by load_config_from_embedded().
# [data-sources.<name>] sections hold connection settings (host, port,
# user, password — password may be plain or Base64, see maybe_b64()).
# [task] selects the source/target instance names, the schemas to
# compare, and the directory where the report and diff files are written.
config="""
[data-sources]
[data-sources.mysql]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
[data-sources.tidb]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""
[task]
output-dir = "."
source-instance = "mysql"
target-instance = "tidb"
target-check-schema = ["test100"]
"""
""" | |
Schema comparator for MySQL/TiDB. | |
- Reads a TOML config like the one provided. | |
- Connects to the data source named by [task].source-instance and [task].target-instance | |
- For each schema named in [task].target-check-schema: | |
* Compares table counts | |
* Compares SHOW CREATE TABLE outputs (normalized) | |
- Writes a Markdown report + per-table .diff files into [task].output-dir | |
Dependencies: | |
pip install pymysql toml | |
Usage: | |
python compare_schemas.py | |
# Config is embedded via the `config` variable at the top of the script. | |
""" | |
import base64 | |
import difflib | |
import os | |
import re | |
from datetime import datetime | |
from typing import Dict, List, Tuple | |
from typing import Any | |
import pymysql | |
import toml | |
# --------------------------- Helpers --------------------------- | |
def maybe_b64(s: str) -> str:
    """Best-effort Base64 password decoding.

    Accepts an optional case-insensitive ``base64:`` prefix. When the
    payload decodes under the strict Base64 alphabet to mostly-printable
    (>95%) UTF-8 text, the decoded text is returned; otherwise the input
    is returned unchanged (including any prefix).
    """
    if not s:
        return s
    payload = s[7:] if s.lower().startswith("base64:") else s
    try:
        candidate = base64.b64decode(payload, validate=True).decode("utf-8")
    except Exception:
        # Not valid Base64 / not UTF-8: treat as a plain-text password.
        return s
    printable = sum(1 for ch in candidate if ch.isprintable())
    if printable / max(1, len(candidate)) > 0.95:
        return candidate
    return s
def connect_mysql(cfg: Dict):
    """Open a pymysql connection from a data-source config dict.

    Expected keys: ``host`` and ``user`` (required), plus optional
    ``port`` (default 3306) and ``password`` (plain or Base64 — decoded
    via maybe_b64). Connections use utf8mb4, autocommit, and DictCursor
    so every fetched row is a dict keyed by column name.
    """
    return pymysql.connect(
        host=cfg["host"],
        port=int(cfg.get("port", 3306)),
        user=cfg["user"],
        password=maybe_b64(cfg.get("password", "")),
        charset="utf8mb4",
        autocommit=True,
        cursorclass=pymysql.cursors.DictCursor,
    )
def get_tables(conn, schema: str) -> List[str]:
    """List base-table names in *schema*, sorted; views are excluded.

    Queries information_schema, which both MySQL and TiDB expose.
    """
    query = """
    SELECT TABLE_NAME
    FROM information_schema.TABLES
    WHERE TABLE_SCHEMA=%s AND TABLE_TYPE='BASE TABLE'
    ORDER BY TABLE_NAME
    """
    with conn.cursor() as cur:
        cur.execute(query, (schema,))
        return [record["TABLE_NAME"] for record in cur.fetchall()]
def get_table_columns(conn, schema: str, table: str) -> List[str]:
    """Column names for *table*, in ordinal (definition) order."""
    query = (
        "SELECT COLUMN_NAME FROM information_schema.COLUMNS "
        "WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"
    )
    with conn.cursor() as cur:
        cur.execute(query, (schema, table))
        return [entry["COLUMN_NAME"] for entry in cur.fetchall()]
def get_primary_key_columns(conn, schema: str, table: str) -> List[str]:
    """Primary-key column names in key order; empty list when no PK exists."""
    query = """
        SELECT COLUMN_NAME
        FROM information_schema.KEY_COLUMN_USAGE
        WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s AND CONSTRAINT_NAME='PRIMARY'
        ORDER BY ORDINAL_POSITION
        """
    with conn.cursor() as cur:
        cur.execute(query, (schema, table))
        return [entry["COLUMN_NAME"] for entry in cur.fetchall()]
def show_create_table(conn, schema: str, table: str) -> str:
    """Return the SHOW CREATE TABLE DDL for `schema`.`table`.

    Backticks inside identifiers are doubled before interpolation so a
    name containing ` cannot break out of the quoted identifier (the
    original interpolated names verbatim). MySQL and TiDB both return
    the DDL under the 'Create Table' key ('Create View' for views);
    any other result shape is stringified as a fallback.
    """
    safe_schema = schema.replace("`", "``")
    safe_table = table.replace("`", "``")
    with conn.cursor() as cur:
        cur.execute(f"SHOW CREATE TABLE `{safe_schema}`.`{safe_table}`")
        row = cur.fetchone()
    for key in ("Create Table", "Create View"):
        if key in row:
            return row[key]
    # Fallback: stringify everything
    return "\n".join(f"{k}: {v}" for k, v in row.items())
# Normalization patterns to ignore benign diffs between MySQL/TiDB or versions | |
# Regex rewrites that blank out pragmas known to differ benignly between
# MySQL and TiDB (or between versions) so they don't show up as diffs.
_NORM_PATTERNS = [
    # AUTO_INCREMENT value
    (r"AUTO_INCREMENT=\d+\b", "AUTO_INCREMENT=?"),
    # Row format / stats
    (r"ROW_FORMAT=\w+", "ROW_FORMAT=?"),
    (r"STATS_PERSISTENT=\w+", "STATS_PERSISTENT=?"),
    (r"STATS_AUTO_RECALC=\w+", "STATS_AUTO_RECALC=?"),
    (r"STATS_SAMPLE_PAGES=\d+", "STATS_SAMPLE_PAGES=?"),
    (r"CHECKSUM=\d+", "CHECKSUM=?"),
    (r"DELAY_KEY_WRITE=\d+", "DELAY_KEY_WRITE=?"),
    (r"KEY_BLOCK_SIZE=\d+", "KEY_BLOCK_SIZE=?"),
    # Engine options ordering
    (r"ENGINE=\w+", "ENGINE=?"),
    # Default charset/collation variants # Removed to preserve charset/collation differences
    # (r"DEFAULT CHARSET=\w+", "DEFAULT CHARSET=?"),
    # (r"DEFAULT CHARACTER SET=\w+", "DEFAULT CHARSET=?"),
    # (r"CHARACTER SET \w+", "CHARACTER SET ?"),
    # (r"COLLATE=\w+", "COLLATE=?"),
    # TiDB clustered index annotation
    (r"/\*T!\[clustered_index\]\s+CLUSTERED\s*\*/", ""),
    # Comment timestamps
    (r"COMMENT='.*?'", "COMMENT='?'"),
    # Partitioning metadata (ids can vary)
    (r"PARTITION(?: .|\n)*$", "/* PARTITION DEFINITION NORMALIZED */"),
    # Backtick spacing inconsistencies
    # (r"\s+", " "),
]


def normalize_bigint_specs(sql: str) -> str:
    """Collapse BIGINT(<n>) display widths to plain ``bigint``.

    A signed/unsigned suffix is preserved (lower-cased), and a trailing
    space is always emitted so the type never fuses with the following
    token (e.g. "bigintNOT").
        BIGINT(20)          -> bigint
        bigint(8) unsigned  -> bigint unsigned
        BIGINT ( 20 ) SIGNED -> bigint signed
    """
    def _replace(match: re.Match) -> str:
        suffix = match.group("suffix") or ""
        return "bigint " + suffix.lower()

    # Pass 1: width plus optional signedness suffix.
    sql = re.sub(
        r"(?is)\bbigint\s*\(\s*\d+\s*\)\s*(?P<suffix>unsigned|signed)?\b",
        _replace,
        sql,
    )
    # Pass 2: widths the word-boundary above misses (e.g. "bigint(20),").
    return re.sub(r"(?is)\bbigint\s*\(\s*\d+\s*\)", "bigint ", sql)


def normalize_ddl(ddl: str) -> str:
    """Canonicalize a SHOW CREATE TABLE statement for comparison.

    Applies the type normalization and pragma blanking above, then
    regularizes punctuation and intra-line whitespace so purely cosmetic
    differences don't register as diffs. Line breaks are preserved.
    """
    text = normalize_bigint_specs(ddl.strip())
    for pattern, replacement in _NORM_PATTERNS:
        text = re.sub(pattern, replacement, text, flags=re.IGNORECASE | re.DOTALL)
    text = re.sub(r",[ \t]*\)", ")", text)          # drop trailing commas before ')'
    text = re.sub(r"[ \t]*\([ \t]*", " ( ", text)   # uniform '(' spacing
    text = re.sub(r"[ \t]*\)[ \t]*", " ) ", text)   # uniform ')' spacing
    text = re.sub(r"[ \t]{2,}", " ", text)          # collapse runs of blanks
    trimmed = (line.rstrip() for line in text.splitlines())
    return "\n".join(trimmed).strip()
def unified_diff(a: str, b: str, fromfile: str, tofile: str) -> str:
    """Render a unified diff of two strings as one newline-joined string.

    Empty when *a* and *b* are identical.
    """
    diff_lines = difflib.unified_diff(
        a.splitlines(),
        b.splitlines(),
        fromfile=fromfile,
        tofile=tofile,
        lineterm="",
    )
    return "\n".join(diff_lines)
def table_data_fingerprint(conn, schema: str, table: str, columns: List[str]) -> Tuple[int, int]:
    """Compute a (row_count, checksum) fingerprint for a table.

    The checksum is BIT_XOR(CRC32(CONCAT_WS(...))) over every column cast
    to CHAR — supported by both MySQL and TiDB; empty tables checksum to
    0. With no columns (shouldn't happen for base tables) only the row
    count is fetched and the checksum is 0.
    """
    if not columns:
        with conn.cursor() as cur:
            cur.execute(f"SELECT COUNT(*) AS cnt FROM `{schema}`.`{table}`")
            row = cur.fetchone()
        return int(row.get("cnt", 0)), 0
    casts = [f"COALESCE(CAST(`{col}` AS CHAR), 'NULL')" for col in columns]
    fingerprint = f"CONCAT_WS('||', {', '.join(casts)})"
    query = (
        f"SELECT COUNT(*) AS cnt, COALESCE(BIT_XOR(CRC32({fingerprint})), 0) AS crc "
        f"FROM `{schema}`.`{table}`"
    )
    with conn.cursor() as cur:
        cur.execute(query)
        row = cur.fetchone()
    crc_value = row.get("crc")
    return int(row.get("cnt", 0)), int(crc_value) if crc_value is not None else 0
def sql_ident(name: str) -> str:
    """Backtick-quote an identifier, doubling any embedded backticks."""
    escaped = name.replace("`", "``")
    return f"`{escaped}`"


def sql_fqtn(schema: str, table: str) -> str:
    """Fully-qualified, backtick-quoted `schema`.`table` name."""
    return ".".join((sql_ident(schema), sql_ident(table)))


def sql_literal(conn, value: Any) -> str:
    """Render *value* as a SQL literal via the connection's escaping.

    None maps to NULL; otherwise pymysql's Connection.escape quotes
    strings/bytes and passes numbers through unchanged.
    """
    return "NULL" if value is None else conn.escape(value)


def row_crc_expr(columns: List[str]) -> str:
    """SQL expression computing a per-row CRC32 over all columns as text."""
    if not columns:
        return "0"
    parts = [f"COALESCE(CAST({sql_ident(col)} AS CHAR), 'NULL')" for col in columns]
    return f"CRC32(CONCAT_WS('||', {', '.join(parts)}))"
def fetch_pk_crc_map(conn, schema: str, table: str, pk_cols: List[str], all_cols: List[str]) -> Dict[Tuple, int]:
    """Map each row's primary-key tuple to its CRC32 row fingerprint.

    Returns {} when the table has no primary key (pk_cols empty).
    """
    if not pk_cols:
        return {}
    select_cols = ", ".join([sql_ident(c) for c in pk_cols])
    query = f"SELECT {select_cols}, {row_crc_expr(all_cols)} AS _crc FROM {sql_fqtn(schema, table)}"
    crc_by_pk: Dict[Tuple, int] = {}
    with conn.cursor() as cur:
        cur.execute(query)
        for record in cur.fetchall():
            pk = tuple(record[c] for c in pk_cols)
            crc = record["_crc"]
            crc_by_pk[pk] = int(crc) if crc is not None else 0
    return crc_by_pk
essential_batch = 500  # max number of PK values interpolated per IN (...) batch


def chunked(iterable, n):
    """Yield lists of up to *n* consecutive items from *iterable*.

    The final chunk may be shorter; nothing is yielded for an empty
    iterable. Raises ValueError for n < 1 — the original implementation
    looped forever yielding empty chunks in that case.
    """
    if n < 1:
        raise ValueError("chunk size must be a positive integer")
    chunk = []
    for item in iterable:
        chunk.append(item)
        if len(chunk) == n:
            yield chunk
            chunk = []
    if chunk:  # flush the partial trailing chunk
        yield chunk
def fetch_rows_by_pks(conn, schema: str, table: str, pk_cols: List[str], pks: List[Tuple], all_cols: List[str]) -> List[Dict[str, Any]]:
    """Fetch the full rows whose primary keys appear in *pks*.

    Keys are queried in batches of ``essential_batch``: a scalar
    ``pk IN (...)`` list for single-column keys, or a tuple
    ``(pk1, pk2, ...) IN ((...), ...)`` list for composite keys. Values
    are rendered as literals via the connection's escaping.
    """
    if not pks:
        return []
    select_list = ", ".join([sql_ident(c) for c in all_cols])
    fetched: List[Dict[str, Any]] = []
    with conn.cursor() as cur:
        for batch in chunked(pks, essential_batch):
            if len(pk_cols) == 1:
                literals = ", ".join([sql_literal(conn, key[0]) for key in batch])
                predicate = f"{sql_ident(pk_cols[0])} IN ({literals})"
            else:
                tuple_texts = []
                for key in batch:
                    vals = ", ".join([sql_literal(conn, v) for v in key])
                    tuple_texts.append(f"({vals})")
                predicate = f"({', '.join(sql_ident(c) for c in pk_cols)}) IN (" + ", ".join(tuple_texts) + ")"
            cur.execute(f"SELECT {select_list} FROM {sql_fqtn(schema, table)} WHERE {predicate}")
            fetched.extend(cur.fetchall())
    return fetched
def generate_sync_sql(source_conn, target_conn, schema: str, table: str, pk_cols: List[str], all_cols: List[str], out_dir: str) -> Tuple[str, Dict[str, int]]:
    """
    Compare rows by primary key and generate a SQL script that syncs TARGET to SOURCE.

    Rows are fingerprinted on both sides via fetch_pk_crc_map; keys only
    in source become INSERTs, keys only in target become DELETEs, and
    shared keys with differing CRCs become updates (via INSERT ... ON
    DUPLICATE KEY UPDATE). The script is written to
    out_dir/<schema>.<table>.sync.sql.

    Returns (path, stats_dict) where stats_dict has 'insert', 'update',
    and 'delete' counts.
    """
    os.makedirs(out_dir, exist_ok=True)
    path = os.path.join(out_dir, f"{schema}.{table}.sync.sql")
    # Per-row CRC fingerprints keyed by primary-key tuple, both sides.
    src_map = fetch_pk_crc_map(source_conn, schema, table, pk_cols, all_cols)
    tgt_map = fetch_pk_crc_map(target_conn, schema, table, pk_cols, all_cols)
    src_keys = set(src_map.keys())
    tgt_keys = set(tgt_map.keys())
    # sorted() keeps the generated script deterministic run-to-run.
    to_insert = sorted(src_keys - tgt_keys)
    to_delete = sorted(tgt_keys - src_keys)
    to_update = sorted([k for k in (src_keys & tgt_keys) if src_map[k] != tgt_map[k]])
    non_pk_cols = [c for c in all_cols if c not in pk_cols]
    stmts: List[str] = []
    fqtn = sql_fqtn(schema, table)
    cols_list = ", ".join(sql_ident(c) for c in all_cols)
    # INSERTs and UPDATEs via INSERT ... ON DUPLICATE KEY UPDATE
    if to_insert or to_update:
        rows_needed = to_insert + to_update
        rows = fetch_rows_by_pks(source_conn, schema, table, pk_cols, rows_needed, all_cols)
        # index rows by pk tuple for deterministic batching
        def key_of(row):
            # Primary-key tuple of a fetched row (dict keyed by column name).
            return tuple(row[c] for c in pk_cols)
        row_map = {key_of(r): r for r in rows}
        for batch_keys in chunked(rows_needed, 200):
            values_groups = []
            for k in batch_keys:
                r = row_map.get(k)
                if r is None:
                    continue  # shouldn't happen
                vals = ", ".join(sql_literal(source_conn, r[c]) for c in all_cols)
                values_groups.append(f"({vals})")
            if not values_groups:
                continue
            # VALUES(col) refers to the value being inserted for that column.
            upd_list = ", ".join([f"{sql_ident(c)}=VALUES({sql_ident(c)})" for c in non_pk_cols])
            stmts.append(
                f"INSERT INTO {fqtn} ({cols_list}) VALUES\n " + ",\n ".join(values_groups) +
                (f"\nON DUPLICATE KEY UPDATE {upd_list};" if non_pk_cols else ";")
            )
    # DELETEs
    for batch_keys in chunked(to_delete, 500):
        if len(pk_cols) == 1:
            placeholders = ", ".join(sql_literal(target_conn, x[0]) for x in batch_keys)
            where = f"{sql_ident(pk_cols[0])} IN ({placeholders})"
        else:
            # Composite key: tuple-IN predicate, one tuple per doomed row.
            tuple_texts = []
            for key in batch_keys:
                vals = ", ".join([sql_literal(target_conn, v) for v in key])
                tuple_texts.append(f"({vals})")
            where = f"({', '.join(sql_ident(c) for c in pk_cols)}) IN (" + ", ".join(tuple_texts) + ")"
        stmts.append(f"DELETE FROM {fqtn} WHERE {where};")
    # Write the script; everything runs inside one explicit transaction.
    with open(path, "w", encoding="utf-8") as f:
        f.write(f"-- Sync {fqtn} to match SOURCE at generation time\n")
        f.write(f"-- Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write(f"-- Primary key: {', '.join(pk_cols) if pk_cols else '(none)'}\n")
        f.write(f"-- Batches: inserts/updates in groups of 200, deletes of 500\n\n")
        if not stmts:
            f.write("-- No changes necessary.\n")
        else:
            f.write("SET autocommit=0;\nSTART TRANSACTION;\n\n")
            for s in stmts:
                f.write(s + "\n\n")
            f.write("COMMIT;\n")
    return path, {
        "insert": len(to_insert),
        "update": len(to_update),
        "delete": len(to_delete),
    }
# --------------------------- Core comparison --------------------------- | |
def compare_schema(
    source_conn, target_conn, schema: str, diffs_dir: str
) -> Tuple[Dict, List[str]]:
    """
    Compare one schema between source and target: table sets, normalized
    DDL, and data fingerprints; optionally generate sync SQL for tables
    whose data differs and that have a primary key on the target.

    Returns:
        summary: dict with counts, missing tables, and per-table status
        diff_files: list of generated diff file paths
    """
    diff_files: List[str] = []
    src_tables = get_tables(source_conn, schema)
    tgt_tables = get_tables(target_conn, schema)
    src_set = set(src_tables)
    tgt_set = set(tgt_tables)
    only_in_source = sorted(src_set - tgt_set)
    only_in_target = sorted(tgt_set - src_set)
    in_both = sorted(src_set & tgt_set)
    table_results = []
    for t in in_both:
        # --- DDL comparison (on normalized SHOW CREATE TABLE output) ---
        src_ddl_raw = show_create_table(source_conn, schema, t)
        tgt_ddl_raw = show_create_table(target_conn, schema, t)
        src_ddl = normalize_ddl(src_ddl_raw)
        tgt_ddl = normalize_ddl(tgt_ddl_raw)
        match = (src_ddl == tgt_ddl)
        status = "MATCH" if match else "DIFF"
        diff_path = None
        if not match:
            # Produce diff on normalized DDL to avoid cosmetic diffs (e.g., BIGINT(20) vs BIGINT)
            diff_text = unified_diff(
                src_ddl, tgt_ddl,
                fromfile=f"{schema}.{t}@source (normalized)",
                tofile=f"{schema}.{t}@target (normalized)"
            )
            os.makedirs(diffs_dir, exist_ok=True)
            diff_path = os.path.join(diffs_dir, f"{schema}.{t}.diff")
            with open(diff_path, "w", encoding="utf-8") as f:
                f.write(diff_text)
            diff_files.append(diff_path)
        # Data comparison (row count + checksum) + optional sync SQL generation
        fix_sql = None
        fix_counts = None
        warn_no_pk = False
        try:
            cols = get_table_columns(source_conn, schema, t)
            # Ensure column ordering is consistent on both sides by using source order
            src_cnt, src_crc = table_data_fingerprint(source_conn, schema, t, cols)
            tgt_cnt, tgt_crc = table_data_fingerprint(target_conn, schema, t, cols)
            if src_cnt != tgt_cnt:
                data_status = "COUNT_MISMATCH"
            else:
                data_status = "SAME" if src_crc == tgt_crc else "DIFF"
            # If data differs, and table has a primary key (on TARGET), generate sync SQL
            if data_status != "SAME":
                pk_cols = get_primary_key_columns(target_conn, schema, t)
                if pk_cols:
                    # create per-table SQL under output/sql_fixes
                    # (diffs_dir is <output>/diffs, so ../sql_fixes is a sibling)
                    sql_dir = os.path.join(diffs_dir, "..", "sql_fixes")
                    sql_dir = os.path.normpath(sql_dir)
                    try:
                        fix_sql, fix_counts = generate_sync_sql(
                            source_conn, target_conn, schema, t, pk_cols, cols, sql_dir
                        )
                    except Exception as gen_e:
                        # Surface a clear error in the report but don't crash the run
                        data_status = f"ERROR: {type(gen_e).__name__}"
                        fix_sql = None
                        fix_counts = None
                else:
                    warn_no_pk = True
        except Exception as e:
            # Any failure during data comparison is reported per-table;
            # -1 counts flag that the fingerprints could not be computed.
            data_status = f"ERROR: {type(e).__name__}"
            src_cnt = tgt_cnt = -1
        table_results.append({
            "table": t,
            "status": status,  # DDL status
            "data_status": data_status,
            "src_count": src_cnt,
            "tgt_count": tgt_cnt,
            "diff_file": diff_path,
            "fix_sql": fix_sql,
            "fix_counts": fix_counts,
            "no_pk_warning": warn_no_pk,
        })
    summary = {
        "schema": schema,
        "source_table_count": len(src_tables),
        "target_table_count": len(tgt_tables),
        "only_in_source": only_in_source,
        "only_in_target": only_in_target,
        "common_tables": in_both,
        "common_table_count": len(in_both),
        "tables_compared": table_results,
    }
    return summary, diff_files
def load_config_from_embedded() -> Dict:
    """Parse the module-level embedded ``config`` TOML string into a dict."""
    return toml.loads(config)


def pick_instance(cfg: Dict, name: str) -> Dict:
    """Return the [data-sources.<name>] section of the parsed config.

    Raises KeyError when the named instance is not configured.
    """
    instances = cfg.get("data-sources", {})
    if name not in instances:
        raise KeyError(f'Instance "{name}" not found under [data-sources.*] in config.')
    return instances[name]
def main():
    """Entry point: compare schemas between the configured instances.

    Loads the embedded TOML config, connects to the source and target
    data sources, compares every schema listed in
    [task].target-check-schema, and writes a Markdown report plus
    per-table diff files (diffs/) and sync-SQL scripts (sql_fixes/)
    under [task].output-dir.

    Fixes vs. the original: the "Fix SQL" report cell rendered the
    literal text ${fix_base} (broken f-string escape) and linked to the
    report directory instead of ./sql_fixes/; and the sql_fixes notice
    only printed when DDL diff files also existed.
    """
    # --- Load config ---
    cfg = load_config_from_embedded()
    task = cfg.get("task", {})
    output_dir = task.get("output-dir", "./output")
    source_name = task["source-instance"]
    target_name = task["target-instance"]
    check_schemas = task.get("target-check-schema", [])
    if not check_schemas:
        raise ValueError("No schemas set in [task].target-check-schema")
    src_cfg = pick_instance(cfg, source_name)
    tgt_cfg = pick_instance(cfg, target_name)
    os.makedirs(output_dir, exist_ok=True)
    diffs_dir = os.path.join(output_dir, "diffs")
    os.makedirs(diffs_dir, exist_ok=True)

    # --- Connect and compare every requested schema ---
    print(f"Connecting to source: {source_name} ({src_cfg['host']}:{src_cfg.get('port',3306)})")
    print(f"Connecting to target: {target_name} ({tgt_cfg['host']}:{tgt_cfg.get('port',3306)})")
    with connect_mysql(src_cfg) as src_conn, connect_mysql(tgt_cfg) as tgt_conn:
        all_summaries = []
        all_diff_files = []
        for schema in check_schemas:
            print(f"Comparing schema: {schema}")
            summary, diff_files = compare_schema(src_conn, tgt_conn, schema, diffs_dir)
            if summary.get("common_table_count", 0) == 0:
                print(f"No common tables in schema: {schema}")
            all_summaries.append(summary)
            all_diff_files.extend(diff_files)

    # --- Write Markdown report (connections no longer needed) ---
    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    report_path = os.path.join(output_dir, "schema_compare_report.md")

    def md_escape(s: str) -> str:
        # Escape pipes so names cannot break the Markdown tables.
        return s.replace("|", r"\|")

    lines = []
    lines.append("# Schema Comparison Report")
    lines.append("")
    lines.append(f"- Generated: {ts}")
    lines.append(f"- Source instance: **{md_escape(source_name)}**")
    lines.append(f"- Target instance: **{md_escape(target_name)}**")
    lines.append(f"- Schemas checked: {', '.join(check_schemas)}")
    lines.append("")
    for s in all_summaries:
        lines.append(f"## Schema `{s['schema']}`")
        lines.append("")
        lines.append(f"- Source tables: **{s['source_table_count']}**")
        lines.append(f"- Target tables: **{s['target_table_count']}**")
        same_count = "✅" if s['source_table_count'] == s['target_table_count'] else "❌"
        lines.append(f"- Same number of tables: {same_count}")
        lines.append(f"- Common tables: **{s.get('common_table_count', 0)}**")
        if s.get('common_table_count', 0) == 0:
            lines.append("")
            lines.append("> ⚠️ No common tables between source and target for this schema; DDL comparison table omitted.")
            lines.append("")
        if s["only_in_source"]:
            lines.append("<details><summary>Tables only in source</summary>")
            lines.append("")
            for t in s["only_in_source"]:
                lines.append(f"- `{t}`")
            lines.append("")
            lines.append("</details>")
            lines.append("")
        if s["only_in_target"]:
            lines.append("<details><summary>Tables only in target</summary>")
            lines.append("")
            for t in s["only_in_target"]:
                lines.append(f"- `{t}`")
            lines.append("")
            lines.append("</details>")
            lines.append("")
        # Table-by-table results (only if there are common tables)
        if s.get('common_table_count', 0) > 0:
            lines.append("### Table Definition Comparison")
            lines.append("")
            lines.append("| Table | DDL | Data | Diff File | Fix SQL | Notes |")
            lines.append("|---|---|---|---|---|---|")
            for tr in s["tables_compared"]:
                ddl_status = "✅ MATCH" if tr["status"] == "MATCH" else "❌ DIFF"
                ds = tr.get("data_status", "")
                if ds == "SAME":
                    data_status = "✅ SAME"
                elif ds == "DIFF":
                    data_status = "❌ DIFF"
                elif ds == "COUNT_MISMATCH":
                    data_status = f"⚠️ COUNT {tr.get('src_count', '?')} vs {tr.get('tgt_count', '?')}"
                elif isinstance(ds, str) and ds.startswith("ERROR:"):
                    data_status = f"⚠️ {ds}"
                else:
                    data_status = ""
                diff_link = (
                    f"[{os.path.basename(tr['diff_file'])}](./diffs/{os.path.basename(tr['diff_file'])})"
                    if tr["diff_file"] else ""
                )
                fix_link = ""
                if tr.get("fix_sql"):
                    fix_base = os.path.basename(tr["fix_sql"])
                    # BUGFIX: was f"[${{fix_base}}](./{fix_base})", which rendered
                    # the literal text ${fix_base} and pointed at the wrong dir.
                    fix_link = f"[{fix_base}](./sql_fixes/{fix_base})" if fix_base else ""
                notes = ""
                if tr.get("fix_counts"):
                    c = tr["fix_counts"]
                    notes = f"ins={c['insert']}, upd={c['update']}, del={c['delete']}"
                if tr.get("no_pk_warning"):
                    if notes:
                        notes += "; "
                    notes += "skipped: no primary key"
                lines.append(f"| `{tr['table']}` | {ddl_status} | {data_status} | {diff_link} | {fix_link} | {notes} |")
            lines.append("")
        else:
            lines.append("_No common tables to compare in this schema._")
            lines.append("")
    with open(report_path, "w", encoding="utf-8") as f:
        f.write("\n".join(lines))
    print(f"\nReport written to: {report_path}")
    if all_diff_files:
        print(f"Diff files in: {diffs_dir} ({len(all_diff_files)} files)")
    else:
        print("No diffs generated (all matching or no common tables).")
    # Sync scripts can exist even without DDL diffs (data-only mismatches),
    # so report them independently of all_diff_files.
    sql_fix_dir = os.path.join(output_dir, "sql_fixes")
    if os.path.isdir(sql_fix_dir):
        print(f"SQL fix scripts in: {sql_fix_dir}")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment