Last active
August 3, 2025 05:28
-
-
Save raku-cat/e14d6c1d4e8a0665373f0486bc909c7d to your computer and use it in GitHub Desktop.
Python script to generate a DNS zone file for hickory-dns using the ZTNet API. Mostly machine-generated with AI.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# ztnet-dns: Generate and update a Hickory DNS zone file from ZTNet API | |
import os | |
import sys | |
import re | |
import argparse | |
import json | |
import requests | |
from datetime import datetime | |
try: | |
import dbus | |
except ImportError: | |
dbus = None | |
# Configuration from environment
# Base address of the ZTNet instance; overridable through the API_ADDRESS
# environment variable.
API_ADDRESS = os.getenv("API_ADDRESS", "http://localhost:3000")
# Root of the versioned REST API that every request URL is built from.
API_URL = f"{API_ADDRESS}/api/v1"
# Directory in which "<zone>.zone" files are read and written.
ZONE_STORAGE_DIR = "/var/named"
# Comment markers delimiting the part of a zone file this script owns.
MANAGED_START = "; ztnet-dns managed area START"
MANAGED_END = "; ztnet-dns managed area END"
# Notice written at the top of zone files the script creates or takes over.
MANAGEMENT_NOTICE = (
    "; This file is managed by ztnet-dns.py, contents may be overwritten"
)
# Global for org context
# Set by find_network(): the owning organization's id, or None when the
# network was found among the personal networks.
ORG_ID = None
def log(message):
    """Emit *message* as one line on standard output."""
    print(str(message))
def is_valid_zone(zone):
    """Return True when *zone* is a syntactically valid multi-label domain.

    A valid zone consists of one or more dot-terminated labels (1-63
    characters, alphanumeric with inner hyphens) followed by an alphabetic
    top-level label of at least two characters.  No trailing dot is accepted.

    The whole string must match: ``re.match`` with ``$`` would also accept a
    trailing newline, which must not leak into the zone file path or the
    records built from this value.  Non-string input is rejected instead of
    raising ``TypeError``.
    """
    if not isinstance(zone, str):
        return False
    pattern = r"(?:[a-zA-Z0-9](?:[a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}"
    return re.fullmatch(pattern, zone) is not None
def api_url_prefix():
    """Return the API base URL, scoped to ORG_ID when one is set."""
    if ORG_ID:
        return f"{API_URL}/org/{ORG_ID}"
    return API_URL
def api_request(url, headers, timeout=30):
    """GET *url* and return the decoded JSON body, or None on failure.

    Args:
        url: Full endpoint URL.
        headers: HTTP headers to send (carries the auth token).
        timeout: Seconds to wait for the server before giving up.  Without
            a timeout ``requests`` may block indefinitely on a stalled
            connection.

    Returns:
        The parsed JSON payload, or None when the request fails, the server
        answers with an error status, or the body is not valid JSON.  The
        failure is logged.
    """
    try:
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()
        return response.json()
    # ValueError covers JSON decoding errors on requests versions whose
    # decode error is not a RequestException subclass.
    except (requests.exceptions.RequestException, ValueError) as e:
        log(f"API request failed: {e}")
        return None
def get_personal_networks(api_url, headers):
    """Fetch the ``/network`` listing under *api_url* (None on failure)."""
    endpoint = f"{api_url}/network"
    return api_request(endpoint, headers)
def get_orgs(api_url, headers):
    """Fetch the ``/org`` listing under *api_url* (None on failure)."""
    endpoint = f"{api_url}/org"
    return api_request(endpoint, headers)
def get_org_networks(org_id, api_url, headers):
    """Fetch the networks of organization *org_id* (None on failure)."""
    endpoint = f"{api_url}/org/{org_id}/network"
    return api_request(endpoint, headers)
def get_network_members(nwid, api_url_prefix, headers):
    """Fetch the member list of network *nwid* (None on failure).

    *api_url_prefix* is the already-resolved base URL string, i.e. either
    the plain API root or the organization-scoped one.
    """
    endpoint = f"{api_url_prefix}/network/{nwid}/member"
    return api_request(endpoint, headers)
def find_network(nwid, api_url, headers):
    """Locate network *nwid* and record which organization owns it.

    Personal networks are searched first, then the networks of every
    organization returned by the API.  On success the module-level ORG_ID
    is set (None for a personal network, the organization id otherwise) and
    the network's info object is returned.  Returns None when no network
    matches; ORG_ID is left untouched in that case.
    """
    global ORG_ID

    def _first_match(networks):
        # A failed or empty listing simply yields no candidates.
        for candidate in networks or ():
            if candidate and candidate.get("id") == nwid:
                return candidate
        return None

    # Personal networks take precedence over organization networks.
    hit = _first_match(get_personal_networks(api_url, headers))
    if hit is not None:
        ORG_ID = None
        return hit

    for org in get_orgs(api_url, headers) or ():
        org_id = org.get("id") if org else None
        if not org_id:
            continue
        hit = _first_match(get_org_networks(org_id, api_url, headers))
        if hit is not None:
            ORG_ID = org_id
            return hit

    return None
def generate_managed_content(zone, ns_ip, record_tuples):
    """Build the NS record, the name server address record and host records.

    Args:
        zone: Zone name without trailing dot (e.g. ``"example.com"``).
        ns_ip: Address of the name server published as ``ns.<zone>.``.
        record_tuples: Iterable of ``(node_name, ip)`` pairs.  It is not
            modified; a sorted copy is used so output ordering is stable.

    Returns:
        ``(ns_record, ns_a_record, record_lines)`` where ``record_lines`` is
        a list of address records sorted by node name, then address.

    IPv6 addresses are published as AAAA records, because an ``A`` record
    carrying an IPv6 address is not valid zone data.  Strings that do not
    parse as an IP address are still emitted as ``A`` records.
    """
    import ipaddress  # function-scope import: the only user in this file

    def _record_type(address):
        try:
            version = ipaddress.ip_address(address).version
        except ValueError:
            return "A"
        return "AAAA" if version == 6 else "A"

    ordered = sorted(record_tuples, key=lambda rec: (rec[0], rec[1]))
    record_lines = [
        f"{node_name}.{zone}. IN {_record_type(ip)} {ip}"
        for node_name, ip in ordered
    ]

    ns_record = f"{zone}. IN NS ns.{zone}."
    ns_a_record = f"ns.{zone}. IN {_record_type(ns_ip)} {ns_ip}"
    return ns_record, ns_a_record, record_lines
def extract_soa_serial(soa_content):
    """Extract the serial number from an SOA record.

    The serial is taken from the first line containing the word ``serial``:
    it is the first standalone run of digits found before that line's ``;``
    comment.  This is the token ``update_soa_serial`` is meant to rewrite,
    and it stays correct when the serial shares a line with the SOA header
    (where the first whitespace-separated token is the owner name, not the
    serial).

    Returns:
        The serial as a string of digits, or None when none is found.
    """
    if not soa_content:
        return None
    # Digits delimited by whitespace, parentheses or the string boundaries,
    # so digits embedded in host names are never mistaken for the serial.
    serial_token = re.compile(r"(?<![^\s(])\d+(?![^\s)])")
    for line in soa_content.split("\n"):
        if "serial" not in line:
            continue
        value_part = line.split(";", 1)[0]
        found = serial_token.search(value_part)
        if found:
            return found.group(0)
    return None
def update_soa_serial(soa_content, new_serial):
    """Return *soa_content* with its serial replaced by *new_serial*.

    On every line containing the word ``serial``, the first standalone run
    of digits before the ``;`` comment is replaced; indentation and the
    comment itself are preserved.  Only standalone numbers are touched, so
    digits inside a host name on the same line (``net1.example.com.``) are
    never rewritten.  Lines without a serial are returned unchanged.
    """
    # Digits delimited by whitespace, parentheses or the string boundaries.
    serial_token = re.compile(r"(?<![^\s(])\d+(?![^\s)])")
    replacement = str(new_serial)
    updated = []
    for line in soa_content.split("\n"):
        if "serial" in line:
            value_part, separator, comment = line.partition(";")
            # A callable replacement keeps the new serial literal (no
            # backslash/group-reference interpretation).
            value_part = serial_token.sub(
                lambda _match: replacement, value_part, count=1
            )
            line = value_part + separator + comment
        updated.append(line)
    return "\n".join(updated)
def find_soa_record(content):
    """Locate the SOA record inside *content*.

    Returns ``(record_text, end_offset)``.  The record runs from the start
    of the line containing ``IN SOA`` to the end of the line holding the
    first ``)`` after it, trailing newline included.  ``end_offset`` is the
    index in *content* just past the record.  ``(None, None)`` is returned
    when there is no ``IN SOA`` or no closing parenthesis.
    """
    marker = content.find("IN SOA")
    if marker < 0:
        return None, None

    # rfind yields -1 when the record sits on the first line, so adding one
    # gives offset 0 in that case and the character after the newline
    # otherwise.
    begin = content.rfind("\n", 0, marker) + 1

    closing = content.find(")", begin)
    if closing < 0:
        return None, None

    # Extend to the end of the line that carries the closing parenthesis.
    line_break = content.find("\n", closing)
    finish = len(content) if line_break < 0 else line_break + 1
    return content[begin:finish], finish
def parse_zone_file(content, zone):
    """Parse a zone file into the sections this script manages.

    Args:
        content: Full text of the zone file.
        zone: Zone name without trailing dot.

    Returns:
        A dict with the keys:

        * ``is_managed`` - True when both managed-area markers are present
          in the right order; every other key keeps its default otherwise.
        * ``managed_start`` / ``managed_end`` - offsets of the start marker
          and of the first character after the end marker (-1 if unmanaged).
        * ``header`` / ``footer`` - text before the start marker and after
          the end marker.
        * ``soa_record`` - raw SOA record text, or None.
        * ``ns_record`` / ``ns_a_record`` - stripped ``<zone>. IN NS
          ns.<zone>.`` and ``ns.<zone>.`` address record lines, or None.
        * ``a_records`` - remaining stripped A/AAAA record lines.

    Records are parsed even when the managed area has no SOA record.
    """
    sections = {
        "header": "",
        "managed_start": -1,
        "managed_end": -1,
        "soa_record": None,
        "ns_record": None,
        "ns_a_record": None,
        "a_records": [],
        "footer": "",
        "is_managed": False,
    }

    # Check if managed area exists
    start_idx = content.find(MANAGED_START)
    end_idx = content.find(MANAGED_END)
    if start_idx == -1 or end_idx == -1 or end_idx <= start_idx:
        return sections

    block_end = end_idx + len(MANAGED_END)
    sections["is_managed"] = True
    sections["managed_start"] = start_idx
    sections["managed_end"] = block_end
    sections["header"] = content[:start_idx]
    sections["footer"] = content[block_end:]
    managed_content = content[start_idx + len(MANAGED_START):end_idx]

    # Extract SOA record if exists; records follow it.
    soa_record, soa_end = find_soa_record(managed_content)
    if soa_record:
        sections["soa_record"] = soa_record
        record_content = managed_content[soa_end:]
    else:
        record_content = managed_content

    zone_re = re.escape(zone)
    # "A <dotted quad>" or "AAAA <address containing a colon>".  Horizontal
    # whitespace only, so one match can never straddle two lines.
    rdata = (
        r"(?:A[ \t]+\d+\.\d+\.\d+\.\d+"
        r"|AAAA[ \t]+[0-9A-Fa-f:.]*:[0-9A-Fa-f:.]*)"
    )

    # Find NS record
    ns_match = re.search(
        rf"^{zone_re}\.?\s+IN\s+NS\s+ns\.{zone_re}\.?\s*$",
        record_content,
        re.MULTILINE,
    )
    if ns_match:
        sections["ns_record"] = ns_match.group(0).strip()
        # Cut out exactly the matched span (not the first textual
        # occurrence, which could sit inside a comment).
        record_content = (
            record_content[: ns_match.start()] + record_content[ns_match.end():]
        )

    # Find the name server's own address record
    ns_a_match = re.search(
        rf"^ns\.{zone_re}\.?[ \t]+IN[ \t]+{rdata}\s*$",
        record_content,
        re.MULTILINE,
    )
    if ns_a_match:
        sections["ns_a_record"] = ns_a_match.group(0).strip()
        record_content = (
            record_content[: ns_a_match.start()]
            + record_content[ns_a_match.end():]
        )

    # Extract all other address records
    a_records = re.findall(
        rf"^.*[ \t]+IN[ \t]+{rdata}\s*$",
        record_content,
        re.MULTILINE,
    )
    sections["a_records"] = [r.strip() for r in a_records if r.strip()]
    return sections
def extract_records_from_content(content, zone):
    """Pull the records this script manages out of an unmanaged zone file.

    Looks for ``$ORIGIN``, ``$TTL``, the SOA record, the zone's NS record
    and the ``ns.<zone>`` A record in *content*.  Each one found is stored
    (stripped) under ``origin``, ``ttl``, ``soa``, ``ns`` or ``ns_a`` and
    its first textual occurrence is removed from the remaining text.  What
    is left, with runs of blank lines collapsed, is returned under
    ``other_content``.  Keys for records that were not found stay None.
    """
    zone_re = re.escape(zone)
    remainder = content
    records = dict.fromkeys(("origin", "ttl", "soa", "ns", "ns_a"))

    def _take(key, match):
        # Store the stripped match and drop its first occurrence from the
        # text that is still unclaimed.
        nonlocal remainder
        if match:
            matched_text = match.group(0)
            records[key] = matched_text.strip()
            remainder = remainder.replace(matched_text, "", 1)

    # Directives
    _take("origin", re.search(r"^\$ORIGIN\s+(\S+)", content, re.MULTILINE))
    _take("ttl", re.search(r"^\$TTL\s+(\S+)", content, re.MULTILINE))

    # SOA record (multi-line, located by find_soa_record)
    soa_text, _soa_end = find_soa_record(content)
    if soa_text:
        records["soa"] = soa_text.strip()
        remainder = remainder.replace(soa_text, "", 1)

    # NS record and the name server's A record
    _take(
        "ns",
        re.search(
            rf"^{zone_re}\.?\s+IN\s+NS\s+ns\.{zone_re}\.?\s*$",
            content,
            re.MULTILINE,
        ),
    )
    _take(
        "ns_a",
        re.search(
            rf"^ns\.{zone_re}\.?\s+IN\s+A\s+(\d+\.\d+\.\d+\.\d+)\s*$",
            content,
            re.MULTILINE,
        ),
    )

    # Collapse the holes left behind by the removed records.
    records["other_content"] = re.sub(r"\n\s*\n", "\n\n", remainder).strip()
    return records
def restart_hickory_dns():
    """Ask systemd over D-Bus to restart ``hickory-dns.service``.

    Returns True when the restart call went through, False when the dbus
    module is unavailable or the call raised; every outcome is logged.
    """
    if dbus is None:
        log("WARNING: dbus module not available. Cannot restart Hickory DNS.")
        return False

    restarted = False
    try:
        systemd = dbus.SystemBus().get_object(
            "org.freedesktop.systemd1", "/org/freedesktop/systemd1"
        )
        manager = dbus.Interface(systemd, "org.freedesktop.systemd1.Manager")
        manager.RestartUnit("hickory-dns.service", "replace")
        log("Successfully restarted Hickory DNS service")
        restarted = True
    except dbus.exceptions.DBusException as e:
        log(f"DBus error restarting Hickory DNS: {e}")
    except Exception as e:
        log(f"Error restarting Hickory DNS: {e}")
    return restarted
def should_include_node(node_name):
    """Tell whether *node_name* should get DNS records.

    Empty or missing names are excluded, as are names ending in ``-nodns``
    (compared case-insensitively).
    """
    return bool(node_name) and not node_name.lower().endswith("-nodns")
def _default_soa(zone, serial):
    """Return a fresh SOA record for *zone* carrying *serial*."""
    return (
        f"{zone}. IN SOA ns.{zone}. hostmaster.{zone}. (\n"
        f"{serial} ; serial\n"
        "3600 ; refresh\n"
        "600 ; retry\n"
        "604800 ; expire\n"
        "86400 ) ; minimum\n"
    )


def _render_managed_block(zone, soa_record, ns_record, ns_a_record, a_records):
    """Assemble the managed area, markers included, from its parts."""
    # Built by concatenation instead of str.format so that braces in
    # API-supplied node names cannot be misread as format fields.
    head = (
        f"{MANAGED_START}\n"
        f"$ORIGIN {zone}.\n"
        "$TTL 3600\n"
        f"{soa_record}\n"
        f"{ns_record}\n"
        f"{ns_a_record}\n"
    )
    return head + "\n".join(a_records) + f"\n{MANAGED_END}\n"


def _collect_records(netmembers):
    """Turn API member objects into ``(node_name, ip)`` tuples.

    Members that are unauthorized, have no IP assignment, have no name, or
    are rejected by should_include_node() are skipped.
    """
    records = []
    for member in netmembers:
        if not isinstance(member, dict):
            continue
        if not member.get("authorized", False) or not member.get("ipAssignments"):
            continue
        raw_name = member.get("name")  # tolerate a missing "name" key
        if raw_name is None:
            continue
        node_name = raw_name.replace(" ", "_")
        if not should_include_node(node_name):
            continue
        # skip empty strings
        records.extend((node_name, ip) for ip in member["ipAssignments"] if ip)
    return records


def _bump_soa_serial(soa_record, new_serial):
    """Return *soa_record* with a serial strictly greater than its current one."""
    current_serial = extract_soa_serial(soa_record) or "0000000000"
    try:
        current_value = int(current_serial)
    except ValueError:
        # Unparseable serial: treat as zero so the new serial wins.
        current_value = 0
    # Compare numerically; string comparison misorders serials of
    # different lengths.
    if int(new_serial) > current_value:
        log(f"Updating SOA serial: {current_serial} -> {new_serial}")
        return update_soa_serial(soa_record, new_serial)
    incremented = str(current_value + 1)
    log(f"Incrementing SOA serial: {current_serial} -> {incremented}")
    return update_soa_serial(soa_record, incremented)


def _log_record_diff(label, records):
    """Log how many A records were added/removed, with up to three samples."""
    sample_str = ", ".join(sorted(records)[:3])
    if len(records) > 3:
        sample_str += "..."
    log(f"{label} {len(records)} A records: {sample_str}")


def main():
    """Generate or update the zone file for one ZeroTier network.

    Reads the network id and flags from the command line and the API token
    from ZTNET_API_TOKEN, fetches network and member data from ZTNet, then
    creates, takes over (``--force``) or updates ``<zone>.zone`` in
    ZONE_STORAGE_DIR.  Hickory DNS is restarted after a write unless
    ``--no-systemd`` is given.

    Returns:
        0 on success (including dry runs), 1 on error, 3 when an existing
        managed zone file needed no change.
    """
    parser = argparse.ArgumentParser(
        description="Generate DNS zone from ZTNet API",
        epilog="""
environment variables:
ZTNET_API_TOKEN Valid ztnet API token
API_ADDRESS (optional) Url to the ztnet instance, defaults to http://localhost:3000
""",
        formatter_class=argparse.RawTextHelpFormatter
    )
    parser.add_argument("network_id", help="ZeroTier network ID")
    parser.add_argument(
        "-d",
        "--dry-run",
        action="store_true",
        help="Print generated content without writing files",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Force processing of existing unmanaged files",
    )
    parser.add_argument(
        "--no-systemd", action="store_true", help="Skip restarting Hickory DNS service"
    )
    args = parser.parse_args()

    # Read token after parsing arguments to allow --help without token
    api_token = os.getenv("ZTNET_API_TOKEN")
    if not api_token:
        sys.exit("ERROR: must set ZTNET_API_TOKEN!")
    headers = {"x-ztnet-auth": api_token}
    network_id = args.network_id
    dry_run = args.dry_run

    # Find network information
    netinfo = find_network(network_id, API_URL, headers)
    if not netinfo:
        log(f"ERROR: Network ID {network_id} not found in personal or any organization")
        return 1
    try:
        zone = netinfo["dns"]["domain"]
    except (KeyError, TypeError):  # TypeError: "dns" present but null
        log("ERROR: Missing DNS domain in network info")
        return 1
    if not isinstance(zone, str) or not is_valid_zone(zone):
        log(f"ERROR: Invalid domain name from network info: '{zone}'")
        return 1
    zone_path = os.path.join(ZONE_STORAGE_DIR, f"{zone}.zone")

    # Get network members
    netmembers = get_network_members(network_id, api_url_prefix(), headers)
    if not netmembers:
        log(f"ERROR: Failed to get members for network {network_id}")
        return 1
    try:
        ns_ip = netinfo["dns"]["servers"][0]
    except (KeyError, IndexError, TypeError):
        log("ERROR: No valid DNS server IP found in .dns.servers")
        return 1

    # Process members - skip nodes with no name or names ending in '-nodns'
    record_tuples = _collect_records(netmembers)

    # Generate managed content
    ns_record, ns_a_record, a_records = generate_managed_content(
        zone, ns_ip, record_tuples
    )
    new_serial = datetime.now().strftime("%Y%m%d%H")

    def render(soa_record):
        return _render_managed_block(
            zone, soa_record, ns_record, ns_a_record, a_records
        )

    file_written = False

    if not os.path.exists(zone_path):
        # New zone file
        output_content = f"{MANAGEMENT_NOTICE}\n" + render(
            _default_soa(zone, new_serial)
        )
        if dry_run:
            print(output_content)
            return 0
        log(f"Creating new zone file: {zone_path}")
        try:
            with open(zone_path, "w") as f:
                f.write(output_content)
            file_written = True
        except (OSError, UnicodeError) as e:
            log(f"ERROR creating file: {e}")
            return 1
    else:
        # Process existing file
        try:
            with open(zone_path, "r") as f:
                current_content = f.read()
        except (OSError, UnicodeError) as e:
            log(f"ERROR reading file: {e}")
            return 1

        # Parse existing content
        sections = parse_zone_file(current_content, zone)

        if not sections["is_managed"]:
            if not args.force:
                log(
                    f"ERROR: {zone_path} is not managed by ztnet-dns "
                    "(missing managed block markers). Use --force to override."
                )
                return 1

            # Handle unmanaged file (force)
            log(f"Processing unmanaged file with force: {zone_path}")
            extracted = extract_records_from_content(current_content, zone)

            # Use existing SOA if available, otherwise create new
            if extracted["soa"]:
                soa_record = update_soa_serial(extracted["soa"], new_serial)
            else:
                soa_record = _default_soa(zone, new_serial)

            # The NS and name-server address records always come from the
            # API data; the ones found in the file are only removed from
            # the preserved remainder.
            managed_block = render(soa_record)
            output_content = (
                f"{MANAGEMENT_NOTICE}\n{managed_block}\n{extracted['other_content']}"
            )
            if dry_run:
                print(output_content)
                return 0
            try:
                with open(zone_path, "w") as f:
                    f.write(output_content)
                log(f"Updated zone file: {zone_path}")
                file_written = True
            except (OSError, UnicodeError) as e:
                log(f"ERROR updating file: {e}")
                return 1
        else:
            # File is managed - check for changes
            has_changes = False

            if sections["ns_record"] != ns_record:
                log(f"NS record changed: '{sections['ns_record']}' -> '{ns_record}'")
                has_changes = True

            if sections["ns_a_record"] != ns_a_record:
                log(
                    f"NS A record changed: '{sections['ns_a_record']}' -> '{ns_a_record}'"
                )
                has_changes = True

            new_a_records = set(a_records)
            current_a_records = set(sections["a_records"])
            added = new_a_records - current_a_records
            removed = current_a_records - new_a_records
            if added:
                _log_record_diff("Added", added)
                has_changes = True
            if removed:
                _log_record_diff("Removed", removed)
                has_changes = True

            soa_missing = not sections["soa_record"]
            if soa_missing:
                # A zone without SOA is unusable; regenerate it rather
                # than writing the block back without one.
                log("SOA record missing from managed block; generating a new one")
                soa_record = _default_soa(zone, new_serial)
            elif has_changes:
                soa_record = _bump_soa_serial(sections["soa_record"], new_serial)
            else:
                soa_record = sections["soa_record"]

            # Rebuild managed content
            new_managed = render(soa_record)
            footer = sections["footer"]
            if footer.startswith("\n"):
                # The footer already carries the newline that ends the
                # marker line; emitting another would add one blank line
                # on every rewrite.
                new_managed = new_managed[:-1]
            output_content = sections["header"] + new_managed + footer

            if dry_run:
                print(output_content)
                return 0

            # Write file if changes detected
            if not (has_changes or soa_missing):
                log(f"Zone file unchanged: {zone_path}")
                return 3
            try:
                log(f"Updating zone file: {zone_path}")
                with open(zone_path, "w") as f:
                    f.write(output_content)
                file_written = True
            except (OSError, UnicodeError) as e:
                log(f"ERROR updating file: {e}")
                return 1

    # Restart Hickory DNS if we wrote a file
    if file_written and not dry_run and not args.no_systemd:
        if not restart_hickory_dns():
            log("WARNING: Hickory DNS not restarted - zone changes might not be active")
    return 0
# Script entry point: exit with main()'s return code; Ctrl-C exits with 1.
if __name__ == "__main__":
    try:
        sys.exit(main())
    except KeyboardInterrupt:
        sys.exit(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment