|
#!/usr/bin/env python3 |
|
""" |
|
DNS Migration Validation Tool |
|
============================== |
|
|
|
Author: @jspiro |
|
Co-authored by: Claude (Anthropic) |
|
License: MIT |
|
|
|
This script validates DNS record equivalency when migrating between DNS providers. |
|
It compares DNS responses from two nameservers to ensure that a migration maintains |
|
functional equivalency, even when the underlying implementations differ (e.g., AWS |
|
ALIAS records vs standard CNAMEs). |
|
|
|
IMPORTANT: The zone file is used ONLY to determine which hostnames and record types |
|
to query. The actual values in the zone file are IGNORED - all comparisons are based |
|
on live DNS responses from the specified nameservers. This allows you to compare |
|
production DNS against a staging setup or validate a migration before/after cutover. |
|
|
|
TESTED WITH: |
|
------------ |
|
- Python 3.6+ |
|
- DiG 9.10.6+ |
|
|
|
REQUIREMENTS: |
|
------------- |
|
- Python 3.7 or higher (subprocess.run's capture_output/text arguments were added in 3.7)
|
- dnspython library (pip install dnspython) |
|
- 'dig' command-line tool (part of bind-utils/dnsutils package) |
|
- Network access to query specified nameservers |
|
- Read access to the zone file |
|
|
|
INPUT FORMATS SUPPORTED: |
|
------------------------ |
|
1. Standard BIND zone file format: |
|
example.com. 300 IN A 192.0.2.1 |
|
www.example.com. 300 IN CNAME example.com. |
|
|
|
2. CLI53 AWS export format with ALIAS records: |
|
subdomain 300 AWS ALIAS A target.elb.amazonaws.com. Z1234567890ABC true |
|
|
|
3. Mixed formats with $ORIGIN directives and relative names |
|
|
|
USAGE: |
|
------ |
|
./nscompare.py <zone_file> <nameserver2> [nameserver1] [record_type] |
|
|
|
Arguments: |
|
zone_file - Path to BIND/CLI53 format zone file (used for hostnames/types only) |
|
nameserver2 - Secondary nameserver to query (e.g., austin.ns.cloudflare.com) |
|
nameserver1 - Primary nameserver to query (default: 8.8.8.8) |
|
record_type - Filter by record type: A, AAAA, CNAME, MX, NS, TXT, SOA, CAA, PTR, SRV, or 'all' (default) |
|
|
|
Examples: |
|
./nscompare.py example.zone ns1.cloudflare.com |
|
./nscompare.py example.zone austin.ns.cloudflare.com 8.8.8.8 MX |
|
""" |
|
|
|
import os |
|
import re |
|
import signal |
|
import subprocess |
|
import sys |
|
import tempfile |
|
|
|
from dns import zone, name, rdatatype |
|
|
|
# ANSI escape sequences used to colorize terminal output
NC = "\033[0m"  # reset ("no color"); appended after every colored span
BOLD = "\033[1m"
RED = "\033[0;31m"
GREEN = "\033[0;32m"
YELLOW = "\033[1;33m"
BLUE = "\033[0;34m"
MAGENTA = "\033[0;35m"
CYAN = "\033[0;36m"
GRAY = "\033[0;37m"
BOLD_GRAY = "\033[1;37m"

# DNS defaults
# Nameserver used as "nameserver1" when none is given on the command line
DEFAULT_NAMESERVER = "8.8.8.8"
# Dummy address written in place of CLI53 "AWS ALIAS" targets during conversion
PLACEHOLDER_IP = "0.0.0.0"
|
|
|
|
|
def normalize_hostname(hostname):
    """Return *hostname* lowercased with all trailing dots removed.

    Falsy input (``None`` or an empty string) yields an empty string.
    """
    if not hostname:
        return ""
    without_root_dot = hostname.rstrip(".")
    return without_root_dot.lower()
|
|
|
|
|
def signal_handler(_sig, _frame):
    """Handle SIGINT: report the interruption and exit with status 130.

    The process is terminated with ``os._exit`` so that no further Python
    cleanup runs. Both parameters are the standard signal-handler arguments
    and are ignored.
    """
    try:
        # os._exit() does not flush stdio buffers, so flush explicitly;
        # otherwise this message (and earlier buffered output) can be lost
        # when stdout is a pipe or a file.
        print(f"\n{YELLOW}Interrupted by user{NC}", flush=True)
        sys.stderr.flush()
    except (OSError, ValueError):
        # stdout/stderr already closed or broken - still exit below
        pass
    os._exit(130)  # Force exit without cleanup
|
|
|
|
|
def format_ns_result(nameserver, result, max_len, is_match):
    """Build one display line: right-aligned nameserver label, then its result.

    The label is padded to ``max_len`` columns; the result is gray when
    ``is_match`` is true and red otherwise.
    """
    if is_match:
        result_color = GRAY
    else:
        result_color = RED
    label = f"{nameserver:>{max_len}}"
    return f" {BOLD_GRAY}{label}{NC}: {result_color}{result}{NC}"
|
|
|
|
|
def get_type_color(record_type):
    """Return the ANSI color escape used for ``record_type`` (gray if unlisted)."""
    orange = "\033[38;5;208m"  # shared by CNAME and SOA
    palette = {
        "A": MAGENTA,
        "CNAME": orange,
        "MX": "\033[38;5;135m",
        "NS": "\033[38;5;93m",
        "TXT": "\033[38;5;46m",
        "SOA": orange,
    }
    if record_type in palette:
        return palette[record_type]
    return GRAY
|
|
|
|
|
def format_test_header(test_num, total_tests, record_type, name):
    """Build the "Test [n/total]: TYPE name" heading with the type colorized."""
    counter = f"{CYAN}Test [{test_num}/{total_tests}]{NC}"
    type_label = f"{get_type_color(record_type)}{record_type}{NC}"
    host_label = f"{BOLD}{name}{NC}"
    return f"{counter}: {type_label} {host_label}"
|
|
|
|
|
def compare_and_display_results(
    ns1, ns2, ns1_result, ns2_result, record_type, expected_differ=False
):
    """Print both nameservers' answers plus a verdict line; return the verdict.

    Empty answers are shown as ``FAILED``. TXT answers are compared exactly;
    every other record type is compared case-insensitively.

    Returns True when the pair counts as a match: identical answers, both
    lookups failed, or a difference flagged via ``expected_differ``.
    Returns False for an unexpected difference.
    """
    left = ns1_result or "FAILED"
    right = ns2_result or "FAILED"

    # TXT data is case-sensitive; everything else is compared case-insensitively
    if record_type == "TXT":
        is_match = left == right
    else:
        is_match = left.lower() == right.lower()

    label_width = max(len(ns1), len(ns2))
    for server, answer in ((ns1, left), (ns2, right)):
        print(format_ns_result(server, answer, label_width, is_match))

    left_failed = left == "FAILED"
    right_failed = right == "FAILED"

    if left_failed and right_failed:
        print(f" {RED}✗ FAILED{NC}")
        return True  # both failed the same way - not an actual difference

    if is_match and not (left_failed or right_failed):
        print(f" {GREEN}✓ MATCH{NC}")
        return True

    if expected_differ:
        print(f" {YELLOW}✗ DIFFER (expected){NC}")
        return True  # expected differences are counted as matches

    print(f" {RED}✗ DIFFER{NC}")
    return False
|
|
|
|
|
def convert_cli53_to_bind(filename):
    """Return the zone file's text with CLI53 ``AWS ALIAS A`` lines rewritten.

    Each ``<owner> <ttl> AWS ALIAS A <target> <zone-id> <flag>`` line becomes
    ``<owner> <ttl> IN A <PLACEHOLDER_IP>`` so the file parses as a standard
    zone. Only the owner name and record type matter to the callers in this
    file, so the placeholder address is never compared.

    Leading whitespace is preserved: in master-file syntax a line that starts
    with whitespace inherits the owner of the previous record, so stripping
    it would turn the TTL/class field into a bogus owner name. Trailing
    whitespace (including the newline) is removed.

    Args:
        filename: Path of the zone file to read.

    Returns:
        The converted zone text, lines joined with ``"\\n"``.
    """
    alias_marker = re.compile(r"\bAWS\s+ALIAS\s+A\b")
    # Group 1 is either "<owner><whitespace>" or bare leading whitespace
    # (inherited owner); group 2 is the TTL.
    alias_record = re.compile(
        r"^(\S+\s+|\s+)(\d+)\s+AWS\s+ALIAS\s+A\s+(\S+)\s+\S+\s+\S+"
    )

    def to_placeholder(match):
        owner_field = match.group(1)
        if owner_field.strip():
            # Explicit owner: emit "<owner> <ttl> ..." with a single space
            owner_field = f"{owner_field.rstrip()} "
        return f"{owner_field}{match.group(2)} IN A {PLACEHOLDER_IP}"

    converted_lines = []
    with open(filename, "r") as f:
        for raw_line in f:
            line = raw_line.rstrip()
            if alias_marker.search(line):
                line = alias_record.sub(to_placeholder, line)
            converted_lines.append(line)

    return "\n".join(converted_lines)
|
|
|
|
|
def extract_records_by_type(bind_file, record_type, skip_apex=False):
    """Return the sorted, de-duplicated owner names that have ``record_type``.

    The zone file is parsed in full via ``parse_zone_file``; the type is
    matched case-insensitively. With ``skip_apex`` set, the zone origin
    itself is left out.
    """
    parsed_records, zone_origin = parse_zone_file(bind_file, "ALL")

    owners = {
        owner
        for owner, rtype, _unused in parsed_records
        if rtype.upper() == record_type.upper()
        and not (skip_apex and owner == zone_origin)
    }
    return sorted(owners)
|
|
|
|
|
def _run_dig(arguments, timeout=5):
    """Run ``dig`` with ``arguments``; return stdout, or None on a non-zero exit."""
    completed = subprocess.run(
        ["dig", *arguments],
        capture_output=True,
        text=True,
        timeout=timeout,
        check=False,
    )
    return completed.stdout if completed.returncode == 0 else None


def _parse_ns_targets(dig_output):
    """Collect normalized NS targets from full dig output (ANSWER and AUTHORITY)."""
    targets = []
    for raw_line in dig_output.split("\n"):
        line = raw_line.strip()
        if line.startswith(";"):
            continue  # section headers, question line, diagnostics
        if "\tNS\t" in line:
            fields = line.split()
            if len(fields) >= 4:
                targets.append(normalize_hostname(fields[-1]))
    return targets


def _is_referral(dig_output):
    """Return True for a referral: NS records in AUTHORITY and no ANSWER section.

    Servers may attach the zone's own NS records to the AUTHORITY section of
    an ordinary answer, so NS records there only indicate a delegation when
    the response carries no answer.
    """
    in_authority = False
    authority_has_ns = False
    for line in dig_output.split("\n"):
        if "ANSWER SECTION:" in line:
            return False
        if "AUTHORITY SECTION:" in line:
            in_authority = True
            continue
        if "ADDITIONAL SECTION:" in line:
            in_authority = False
            continue
        if (
            in_authority
            and line
            and not line.startswith(";")
            and ("\tNS\t" in line or " NS " in line)
        ):
            authority_has_ns = True
    return authority_has_ns


def _normalize_short_line(line, record_type):
    """Normalize one ``dig +short`` line according to its record type."""
    if record_type == "MX":
        # MX format: "priority hostname"
        parts = line.split(None, 1)
        if len(parts) == 2:
            priority, exchange = parts
            return f"{priority} {normalize_hostname(exchange)}"
        return normalize_hostname(line)
    if record_type in ("CNAME", "NS"):
        return normalize_hostname(line)
    # Addresses are kept as-is; TXT data is case-sensitive and must not be folded
    return line


def dig_query(nameserver, name, record_type):
    """Query ``nameserver`` for ``name``/``record_type`` with dig.

    Returns the answers as one space-joined, sorted string:

    * NS: targets taken from both the ANSWER and AUTHORITY sections,
      lowercased and without trailing dots.
    * CNAME: ``"[DELEGATED]"`` when a non-recursive query yields a referral
      (NS records in AUTHORITY, no answer); otherwise the normalized target.
    * MX: ``"priority hostname"`` entries with the hostname normalized.
    * Anything else: the ``+short`` lines unchanged.

    An empty string is returned when dig exits non-zero, produces no records,
    times out (5 seconds per invocation) or cannot be started at all - note
    that a missing ``dig`` binary is therefore indistinguishable from an
    empty answer.
    """
    server = f"@{nameserver}"
    try:
        if record_type == "NS":
            output = _run_dig([server, name, record_type])
            if output is None:
                return ""
            return " ".join(sorted(_parse_ns_targets(output)))

        if record_type == "CNAME":
            output = _run_dig([server, name, record_type, "+norecurse"])
            if output is not None and _is_referral(output):
                return "[DELEGATED]"

        output = _run_dig(["+short", server, name, record_type])
        if output is None or not output.strip():
            return ""

        answers = [
            _normalize_short_line(line, record_type)
            for line in output.strip().split("\n")
            # ";"-prefixed lines are dig diagnostics (e.g. the TCP-retry
            # notice), not record data, and must not take part in comparisons
            if line and not line.startswith(";")
        ]
        return " ".join(sorted(answers))
    except (OSError, subprocess.SubprocessError, ValueError):
        # dig missing, timed out, or produced undecodable output: best effort
        return ""
|
|
|
|
|
def recursive_resolve_local(hostname, record_type="A"):
    """Resolve ``hostname`` from the root with ``dig +trace +short``.

    Only lines of the form ``<TYPE> <rdata> from server ...`` are considered.
    Records whose type equals ``record_type`` are the answers; CNAME targets
    are lowercased and stripped of their trailing dot, other rdata is
    returned as printed by dig.

    If a trace yields no record of the requested type but does yield a CNAME,
    the alias target is traced in turn (bounded number of hops, loops are
    detected) so that an alias chain still resolves to its final records.

    Returns:
        The answers sorted and space-joined, or an empty string when nothing
        could be resolved, dig failed, or a trace exceeded its 15 second
        timeout.
    """
    max_cname_hops = 8
    target = hostname
    visited = set()

    try:
        for _ in range(max_cname_hops):
            if target in visited:
                return ""  # CNAME loop
            visited.add(target)

            completed = subprocess.run(
                ["dig", "+trace", "+short", target, record_type],
                capture_output=True,
                text=True,
                timeout=15,
                check=False,
            )
            if completed.returncode != 0 or not completed.stdout.strip():
                return ""

            answers = []
            aliases = []
            for raw_line in completed.stdout.strip().split("\n"):
                line = raw_line.strip()
                if " from server " not in line:
                    continue
                # Keep the "<TYPE> <rdata>" part in front of " from server"
                record_part = line.split(" from server ")[0].strip()
                fields = record_part.split(None, 1)
                if len(fields) != 2:
                    continue
                rtype, rdata = fields
                if rtype == "CNAME":
                    alias = rdata.rstrip(".").lower()
                    aliases.append(alias)
                    if record_type == "CNAME":
                        answers.append(alias)
                elif rtype == record_type:
                    answers.append(rdata)

            if answers:
                return " ".join(sorted(answers))
            if not aliases:
                return ""
            # The trace ended on an alias: continue with its target
            target = aliases[-1]
        return ""
    except (OSError, subprocess.SubprocessError, ValueError):
        # dig missing, timed out, or produced undecodable output: best effort
        return ""
|
|
|
|
|
def parse_zone_file(filename, filter_type):
    """Parse a zone file with dnspython, converting CLI53 ALIAS lines first.

    Args:
        filename: Path to the BIND or CLI53-export zone file.
        filter_type: Record type to keep (case-insensitive), or ``"all"`` /
            a falsy value to keep every record type.

    Returns:
        ``(records, origin)`` where ``records`` is a list of
        ``(fqdn, record_type, 0)`` tuples - one per owner name and record
        type, the last element being an unused placeholder - and ``origin``
        is the zone origin. Names carry no trailing dot.

    Raises:
        Whatever ``open`` or ``dns.zone.from_file`` raise for unreadable or
        malformed input; callers in this file report these to the user.
    """
    with open(filename) as f:
        content = f.read()

    wanted_type = (filter_type or "ALL").upper()

    tmp_file = None
    try:
        # Convert CLI53 AWS ALIAS format to standard BIND if needed. The
        # temporary file is created inside the try block so it is removed
        # even when writing it fails.
        if re.search(r"\bAWS\s+ALIAS\s+A\b", content):
            content = convert_cli53_to_bind(filename)
            with tempfile.NamedTemporaryFile(
                mode="w", suffix=".zone", delete=False
            ) as tmp:
                tmp_file = tmp.name
                tmp.write(content)
            filename = tmp_file

        dns_zone = zone.from_file(filename, check_origin=False)
        origin = str(dns_zone.origin).rstrip(".")
        records = []

        for dns_name, node in dns_zone.nodes.items():
            if dns_name == name.empty:
                fqdn = origin
            else:
                fqdn = str(dns_name.derelativize(dns_zone.origin)).rstrip(".")

            for rdataset in node.rdatasets:
                record_type = rdatatype.to_text(rdataset.rdtype)
                if wanted_type in ("ALL", record_type.upper()):
                    records.append((fqdn, record_type, 0))

        return records, origin

    finally:
        if tmp_file:
            os.unlink(tmp_file)
|
|
|
|
|
def _split_a_answer(answer):
    """Split a space-joined A-query answer into (IPv4 literals, hostnames)."""
    addresses = []
    hostnames = []
    for token in (answer or "").split():
        if re.fullmatch(r"[\d.]+", token):
            addresses.append(token)
        else:
            hostnames.append(token)
    return addresses, hostnames


def _show_a_resolution(nameserver, answer, width):
    """Print one nameserver's A answer, resolving any CNAME hop; return the result.

    When the answer already contains addresses next to the CNAME hop(s), those
    addresses are the resolution. When it contains only a hostname, that name
    is resolved locally with ``recursive_resolve_local``.
    """
    addresses, hostnames = _split_a_answer(answer)
    shown = answer or "FAILED"
    print(f" {BOLD_GRAY}{nameserver:>{width}}{NC}: {shown}")
    if not hostnames:
        return shown

    if addresses:
        resolved = " ".join(sorted(addresses))
    else:
        resolved = recursive_resolve_local(hostnames[0].rstrip("."), "A")

    if resolved:
        print(f" {BOLD_GRAY}{'':>{width-1}}{NC}└→ {resolved}")
        return resolved
    return shown


def _resolve_cname_side(nameserver, fqdn, cname, delegated):
    """Return the A records behind one nameserver's CNAME answer ("" if none)."""
    if delegated:
        return ""  # Delegated, can't resolve here
    if cname:
        # Resolve the CNAME target independently of either nameserver
        return recursive_resolve_local(cname.rstrip("."), "A")
    # No CNAME at all: ask the nameserver for A records directly
    return dig_query(nameserver, fqdn, "A")


def _print_cname_side(nameserver, cname, delegated, addresses, width):
    """Print one nameserver's CNAME resolution path."""
    if delegated:
        print(
            f" {BOLD_GRAY}{nameserver:>{width}}{NC}: {CYAN}(delegated to other NS){NC}"
        )
    elif cname:
        print(f" {BOLD_GRAY}{nameserver:>{width}}{NC}: {cname}")
        if addresses:
            print(f" {BOLD_GRAY}{'':>{width-1}}{NC}└→ {addresses}")
        else:
            print(f" {BOLD_GRAY}{'':>{width-1}}{NC}└→ (unable to resolve)")
    else:
        print(
            f" {BOLD_GRAY}{nameserver:>{width}}{NC}: {addresses if addresses else 'FAILED'}"
        )


def _cname_side_result(cname, delegated, addresses):
    """Return the value compared (and reported) for one nameserver's CNAME test."""
    if delegated:
        return "DELEGATED"
    if addresses:
        return addresses
    if cname:
        return f"CNAME {cname}"
    return "FAILED"


def main():
    """Command-line entry point: compare a zone's records on two nameservers.

    Arguments (``sys.argv``): zone file, nameserver2, optional nameserver1
    (default ``DEFAULT_NAMESERVER``) and optional record-type filter (default
    ``all``). The zone file only supplies the names and types to query.

    Every record is first tested individually - MX, NS and TXT records are
    only announced there and then compared per owner name as complete sets.
    A summary and the list of differences is printed at the end. The process
    exits with status 1 on usage errors or when the zone file cannot be read
    or parsed.
    """
    signal.signal(signal.SIGINT, signal_handler)

    if len(sys.argv) < 3:
        print(
            f"{RED}Usage: {sys.argv[0]} <bind_file.zone> <nameserver2> [nameserver1] [record_type]{NC}"
        )
        print("Record types: A, CNAME, MX, NS, TXT, SOA, all (default)")
        print(f"Example: {sys.argv[0]} example.zone austin.ns.cloudflare.com 8.8.8.8 CNAME")
        sys.exit(1)

    bind_file = sys.argv[1]
    ns2 = sys.argv[2]
    ns1 = sys.argv[3] if len(sys.argv) > 3 else DEFAULT_NAMESERVER
    filter_type = sys.argv[4] if len(sys.argv) > 4 else "all"
    wanted_type = filter_type.upper()
    grouped_types = ("MX", "NS", "TXT")  # compared per name as complete sets

    if not os.path.isfile(bind_file):
        print(f"{RED}Error: BIND file {bind_file} not found{NC}")
        sys.exit(1)

    try:
        records, origin = parse_zone_file(bind_file, filter_type)

        # Apply the record-type filter here as well so that it takes effect
        # regardless of whether the parser filters.
        if wanted_type != "ALL":
            records = [rec for rec in records if rec[1].upper() == wanted_type]

        # Sort by record type, then by normalized hostname
        records.sort(key=lambda rec: (rec[1], normalize_hostname(rec[0])))

        # Names of the grouped record sets selected by the filter, parsed once
        # and reused for both the test count and the set tests below.
        set_names = {
            rtype: extract_records_by_type(bind_file, rtype)
            for rtype in grouped_types
            if wanted_type in ("ALL", rtype)
        }
        total_all_tests = len(records) + sum(len(names) for names in set_names.values())

        all_records, _ = parse_zone_file(bind_file, "ALL")
        total_found = len(all_records)

    except Exception as e:
        print(f"{RED}Error parsing file: {getattr(e, 'msg', str(e))}{NC}")
        sys.exit(1)

    print(f"{BOLD}DNS Migration Validation{NC}")
    print(f"{CYAN}└ Nameservers:{NC} {BOLD_GRAY}{ns1}{NC} vs {BOLD_GRAY}{ns2}{NC}")
    print(f"{CYAN}└ Zone file:{NC} {GRAY}{bind_file}{NC}")
    print(f"{CYAN}└ Filter:{NC} {YELLOW}{filter_type.upper()}{NC} records")
    print(f"{CYAN}└ Scope:{NC} {total_found} found → {BOLD}{total_all_tests}{NC} testing")
    print(f"{BLUE}{'='*66}{NC}")

    match_count = 0
    differ_count = 0
    differences = []

    for i, (fqdn, record_type, _line_num) in enumerate(records, 1):
        print(format_test_header(i, len(records), record_type, fqdn))

        # Grouped types are only announced here; the real comparison happens
        # in the record-set tests further down.
        if record_type in grouped_types:
            print(
                f" (Individual {record_type} record - will be tested as complete {record_type} set)"
            )
            match_count += 1
            print()
            continue

        if record_type == "CNAME":
            # Compare CNAMEs first, fall back to A records only if needed
            ns1_cname = dig_query(ns1, fqdn, "CNAME")
            ns2_cname = dig_query(ns2, fqdn, "CNAME")

            ns1_delegated = ns1_cname == "[DELEGATED]"
            ns2_delegated = ns2_cname == "[DELEGATED]"

            # Normalized CNAMEs for comparison (delegation markers excluded)
            ns1_cname_norm = (
                ns1_cname.rstrip(".").lower() if ns1_cname and not ns1_delegated else ""
            )
            ns2_cname_norm = (
                ns2_cname.rstrip(".").lower() if ns2_cname and not ns2_delegated else ""
            )

            if ns1_cname_norm and ns2_cname_norm:
                # Both returned CNAMEs - compare them directly
                max_ns_len = max(len(ns1), len(ns2))
                is_match = ns1_cname_norm == ns2_cname_norm
                print(format_ns_result(ns1, ns1_cname, max_ns_len, is_match))
                print(format_ns_result(ns2, ns2_cname, max_ns_len, is_match))
                ns1_result = f"CNAME {ns1_cname}"
                ns2_result = f"CNAME {ns2_cname}"
            else:
                # Mixed response types or a missing CNAME: resolve both sides
                # to A records as the least common denominator.
                ns1_a = _resolve_cname_side(ns1, fqdn, ns1_cname, ns1_delegated)
                ns2_a = _resolve_cname_side(ns2, fqdn, ns2_cname, ns2_delegated)

                max_ns_len = max(len(ns1), len(ns2), len("trace"))
                _print_cname_side(ns1, ns1_cname, ns1_delegated, ns1_a, max_ns_len)
                _print_cname_side(ns2, ns2_cname, ns2_delegated, ns2_a, max_ns_len)

                ns1_result = _cname_side_result(ns1_cname, ns1_delegated, ns1_a)
                ns2_result = _cname_side_result(ns2_cname, ns2_delegated, ns2_a)
        else:
            ns1_result = dig_query(ns1, fqdn, record_type)
            ns2_result = dig_query(ns2, fqdn, record_type)

            if record_type == "A":
                # An A query can come back with CNAME hop(s) instead of, or in
                # addition to, addresses. Reduce both sides to addresses so an
                # alias on one side compares equal to plain A records on the
                # other.
                ns1_hosts = _split_a_answer(ns1_result)[1]
                ns2_hosts = _split_a_answer(ns2_result)[1]
                if ns1_hosts or ns2_hosts:
                    max_ns_len = max(len(ns1), len(ns2))
                    ns1_result = _show_a_resolution(ns1, ns1_result, max_ns_len)
                    ns2_result = _show_a_resolution(ns2, ns2_result, max_ns_len)
                    # Marks the results as already printed
                    record_type = "TRACE_A"

        if record_type not in ("CNAME", "TRACE_A"):
            ns1_result = ns1_result or "FAILED"
            ns2_result = ns2_result or "FAILED"

            max_ns_len = max(len(ns1), len(ns2))
            is_match = ns1_result.lower() == ns2_result.lower()
            print(format_ns_result(ns1, ns1_result, max_ns_len, is_match))
            print(format_ns_result(ns2, ns2_result, max_ns_len, is_match))

        expected_differ = (record_type == "SOA") or (record_type == "NS" and fqdn == origin)

        if ns1_result == "FAILED" and ns2_result == "FAILED":
            print(f" {RED}✗ FAILED{NC}")
            match_count += 1
        elif (
            ns1_result != "FAILED"
            and ns2_result != "FAILED"
            and ns1_result.lower() == ns2_result.lower()
        ):
            print(f" {GREEN}✓ MATCH{NC}")
            match_count += 1
        elif expected_differ:
            print(f" {YELLOW}✗ DIFFER (expected){NC}")
            match_count += 1
        else:
            print(f" {RED}✗ DIFFER{NC}")
            differ_count += 1
            differences.append(
                {
                    "name": fqdn,
                    "type": record_type,
                    "ns1": ns1,
                    "ns1_result": ns1_result,
                    "ns2": ns2,
                    "ns2_result": ns2_result,
                }
            )
        print()
    print()

    # Test grouped records as complete sets
    group_differences = []
    test_counts = {"MX": 0, "NS": 0, "TXT": 0}
    match_counts = {"MX": 0, "NS": 0, "TXT": 0}

    def test_record_set(record_type, names):
        """Compare the full ``record_type`` set of every name on both nameservers."""
        print(f"{BLUE}{'='*66}{NC}")
        print(f"{BLUE}Test {record_type} records as complete sets...{NC}")
        print(f"{BLUE}{'='*66}{NC}")

        for fqdn_name in names:
            test_counts[record_type] += 1
            print(
                format_test_header(
                    test_counts[record_type], len(names), record_type, fqdn_name
                )
            )

            ns1_result = dig_query(ns1, fqdn_name, record_type)
            ns2_result = dig_query(ns2, fqdn_name, record_type)

            # Apex NS records are expected to differ between providers
            expected_differ = record_type == "NS" and fqdn_name == origin
            is_match = compare_and_display_results(
                ns1, ns2, ns1_result, ns2_result, record_type, expected_differ
            )
            if is_match:
                match_counts[record_type] += 1
            else:
                group_differences.append(
                    {
                        "name": fqdn_name,
                        "type": record_type,
                        "ns1": ns1,
                        "ns1_result": ns1_result if ns1_result else "FAILED",
                        "ns2": ns2,
                        "ns2_result": ns2_result if ns2_result else "FAILED",
                    }
                )
            print()

    # set_names keeps the MX, NS, TXT order and holds only the selected types
    for set_type, names in set_names.items():
        test_record_set(set_type, names)

    all_differences = differences + group_differences

    # Final summary
    print(f"{BLUE}=================================================================={NC}")
    total_matches = match_count + sum(match_counts.values())
    total_differences = differ_count + sum(test_counts[t] - match_counts[t] for t in test_counts)
    total_tested = len(records) + sum(test_counts.values())

    print(
        f"Tests: {total_tested}, {GREEN}Matches: {total_matches}{NC}, {RED}Differences: {total_differences}{NC}"
    )
    if total_differences > 0:
        success_rate = (total_matches / total_tested) * 100
        print(f"Success rate: {success_rate:.1f}%")

    # Print difference summary
    if all_differences:
        print(f"\n{RED}DIFFERENCES SUMMARY:{NC}")
        print("=" * 50)
        for diff in all_differences:
            type_color = get_type_color(diff["type"])
            print(f"{type_color}{diff['type']}{NC} {BOLD}{diff['name']}{NC}")
            max_ns_len = max(len(diff["ns1"]), len(diff["ns2"]))
            print(f" {BOLD_GRAY}{diff['ns1']:>{max_ns_len}}{NC}: {YELLOW}{diff['ns1_result']}{NC}")
            print(f" {BOLD_GRAY}{diff['ns2']:>{max_ns_len}}{NC}: {YELLOW}{diff['ns2_result']}{NC}")
            print()


if __name__ == "__main__":
    main()