Skip to content

Instantly share code, notes, and snippets.

@raku-cat
Last active August 3, 2025 05:28
Show Gist options
  • Save raku-cat/e14d6c1d4e8a0665373f0486bc909c7d to your computer and use it in GitHub Desktop.
Save raku-cat/e14d6c1d4e8a0665373f0486bc909c7d to your computer and use it in GitHub Desktop.
Python script to generate a DNS zone file for Hickory DNS using the ZTNet API. Mostly machine-generated with AI.
#!/usr/bin/env python3
# ztnet-dns: Generate and update a Hickory DNS zone file from ZTNet API
import os
import sys
import re
import argparse
import json
import requests
from datetime import datetime
try:
import dbus
except ImportError:
dbus = None
# Configuration from environment.
# API_ADDRESS is the base URL of the ZTNet instance. A trailing slash is
# stripped when deriving API_URL so the result never contains "//api/v1".
API_ADDRESS = os.getenv("API_ADDRESS", "http://localhost:3000")
API_URL = f"{API_ADDRESS.rstrip('/')}/api/v1"
# Directory in which "<zone>.zone" files are read and written.
ZONE_STORAGE_DIR = "/var/named"
# Marker comments delimiting the part of a zone file owned by this script.
MANAGED_START = "; ztnet-dns managed area START"
MANAGED_END = "; ztnet-dns managed area END"
# Notice written at the top of files created or taken over by this script.
MANAGEMENT_NOTICE = (
    "; This file is managed by ztnet-dns.py, contents may be overwritten"
)
# Global for org context: find_network() sets it to the id of the organization
# owning the requested network, or to None for a personal network.
ORG_ID = None
def log(message):
    """Write *message* followed by a newline to standard output."""
    print(message, file=sys.stdout)
def is_valid_zone(zone):
    """Return True when *zone* looks like a valid DNS domain name.

    The name must consist of one or more dot-separated labels (letters,
    digits and inner hyphens, at most 63 characters each) followed by an
    alphabetic top-level label of at least two characters.

    Args:
        zone: Candidate domain name, without a trailing dot.

    Returns:
        bool: False for non-string input, for names longer than 253
        characters, and for anything that does not match the label rules.
    """
    # The value comes from an API response, so it may be None or another type.
    if not isinstance(zone, str) or len(zone) > 253:
        return False
    pattern = r"([a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}"
    # fullmatch (unlike match + "$") rejects a trailing newline, which would
    # otherwise end up in the zone file name and in every generated record.
    return re.fullmatch(pattern, zone) is not None
def api_url_prefix():
    """Return the API base URL, scoped to the current organization if any.

    When the module-level ORG_ID is set, the organization path is appended
    to API_URL; otherwise API_URL is returned unchanged.
    """
    if ORG_ID:
        return f"{API_URL}/org/{ORG_ID}"
    return API_URL
def api_request(url, headers, timeout=30):
    """Perform a GET request and return the decoded JSON body.

    Args:
        url: Full URL to request.
        headers: Mapping of HTTP headers sent with the request.
        timeout: Seconds passed to ``requests.get`` as its timeout, so an
            unresponsive server cannot block the script indefinitely.

    Returns:
        The parsed JSON document, or None when the request fails, the server
        answers with an error status, or the body is not valid JSON. Failures
        are reported through log().
    """
    try:
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()
        return response.json()
    # ValueError covers JSON decoding errors on requests versions whose
    # decode error is not a RequestException subclass.
    except (requests.exceptions.RequestException, ValueError) as e:
        log(f"API request failed: {e}")
        return None
def get_personal_networks(api_url, headers):
    """Fetch ``{api_url}/network`` and return the api_request() result."""
    endpoint = f"{api_url}/network"
    return api_request(endpoint, headers)
def get_orgs(api_url, headers):
    """Fetch ``{api_url}/org`` and return the api_request() result."""
    endpoint = f"{api_url}/org"
    return api_request(endpoint, headers)
def get_org_networks(org_id, api_url, headers):
    """Fetch ``{api_url}/org/{org_id}/network`` via api_request()."""
    endpoint = f"{api_url}/org/{org_id}/network"
    return api_request(endpoint, headers)
def get_network_members(nwid, api_url_prefix, headers):
    """Fetch ``{api_url_prefix}/network/{nwid}/member`` via api_request().

    Note that the *api_url_prefix* parameter is a string here and shadows
    the module-level function of the same name inside this function.
    """
    endpoint = f"{api_url_prefix}/network/{nwid}/member"
    return api_request(endpoint, headers)
def find_network(nwid, api_url, headers):
    """Locate the network with id *nwid* and record which org owns it.

    Personal networks are searched first, then the networks of every
    organization returned by get_orgs(). As a side effect the module-level
    ORG_ID is set to None for a personal network or to the owning
    organization's id otherwise; it is left untouched when nothing matches.

    Returns:
        The matching network entry, or None when no network has that id.
    """
    global ORG_ID

    # Personal networks take precedence over organization networks.
    for candidate in get_personal_networks(api_url, headers) or []:
        if candidate and candidate.get("id") == nwid:
            ORG_ID = None
            return candidate

    for org in get_orgs(api_url, headers) or []:
        owner_id = org.get("id") if org else None
        if not owner_id:
            continue
        for candidate in get_org_networks(owner_id, api_url, headers) or []:
            if candidate and candidate.get("id") == nwid:
                ORG_ID = owner_id
                return candidate

    return None
def generate_managed_content(zone, ns_ip, record_tuples):
    """Build the NS record, the name server address record and host records.

    Args:
        zone: Zone name without trailing dot.
        ns_ip: IP address published for ``ns.<zone>.``.
        record_tuples: Iterable of ``(node_name, ip)`` pairs. It is not
            modified; a sorted copy is used for stable output ordering.

    Returns:
        tuple: ``(ns_record, ns_a_record, record_lines)`` where the first two
        are single record strings and the last is a list of record strings
        sorted by node name, then by address.
    """
    import ipaddress

    def record_type(address):
        # IPv6 addresses need AAAA; "IN A <ipv6>" is not a valid record.
        # Anything ipaddress cannot parse keeps the historical "A" type.
        try:
            version = ipaddress.ip_address(address).version
        except ValueError:
            return "A"
        return "AAAA" if version == 6 else "A"

    # sorted() instead of list.sort() so the caller's list is left untouched.
    ordered = sorted(record_tuples, key=lambda item: (item[0], item[1]))
    record_lines = [
        f"{node_name}.{zone}. IN {record_type(ip)} {ip}"
        for node_name, ip in ordered
    ]

    ns_record = f"{zone}. IN NS ns.{zone}."
    ns_a_record = f"ns.{zone}. IN {record_type(ns_ip)} {ns_ip}"
    return ns_record, ns_a_record, record_lines
def extract_soa_serial(soa_content):
    """Extract the serial number from an SOA record.

    The serial is located through its ``; serial`` comment (matched
    case-insensitively): on the first line whose comment mentions "serial",
    the last standalone number before the ``;`` is returned.

    Args:
        soa_content: Text of the SOA record, possibly spanning several lines.

    Returns:
        The serial as a string of digits, or None when no such line exists.
    """
    for line in soa_content.split("\n"):
        data, sep, comment = line.partition(";")
        # Only the comment identifies the serial line; looking at the whole
        # line would also hit owner names such as "serial.example.com.".
        if not sep or "serial" not in comment.lower():
            continue
        # Standalone numbers only: digits that are part of a host name
        # (adjacent to letters, dots or hyphens) are ignored.
        numbers = re.findall(r"(?<![\w.\-])\d+(?![\w.\-])", data)
        if numbers:
            return numbers[-1]
    return None
def update_soa_serial(soa_content, new_serial):
    """Return *soa_content* with its serial number replaced by *new_serial*.

    The serial line is the first line whose ``;`` comment mentions "serial"
    (case-insensitive). Only the last standalone number before the comment is
    replaced; indentation, the comment and every other line are preserved.

    Args:
        soa_content: Text of the SOA record, possibly spanning several lines.
        new_serial: Replacement serial; converted with str().

    Returns:
        The updated text, or the input unchanged when no serial line exists.
    """
    lines = soa_content.split("\n")
    for index, line in enumerate(lines):
        data, sep, comment = line.partition(";")
        # Match on the comment only, so an owner name containing "serial"
        # (or digits) is never rewritten.
        if not sep or "serial" not in comment.lower():
            continue
        numbers = list(re.finditer(r"(?<![\w.\-])\d+(?![\w.\-])", data))
        if not numbers:
            continue
        target = numbers[-1]
        # Splice by position rather than re.sub so the replacement text is
        # never interpreted as a regex template.
        lines[index] = (
            data[: target.start()] + str(new_serial) + data[target.end():]
            + sep + comment
        )
        break  # an SOA record has exactly one serial
    return "\n".join(lines)
def find_soa_record(content):
    """Locate the first SOA record in *content*.

    The record starts at the beginning of the line containing ``IN SOA``
    (any run of spaces or tabs between the two words). If that line opens a
    parenthesis, the record extends through the end of the line holding the
    next ``)``; otherwise the record is that single line.

    Args:
        content: Zone file text to search.

    Returns:
        tuple: ``(record_text, end_index)`` where ``end_index`` is the offset
        in *content* just past the record (including its trailing newline,
        when present), or ``(None, None)`` when there is no SOA record or its
        parenthesis is never closed.
    """
    keyword = re.search(r"\bIN[ \t]+SOA\b", content)
    if keyword is None:
        return None, None

    # rfind() yields -1 when the record is on the first line, so +1 gives 0.
    soa_start = content.rfind("\n", 0, keyword.start()) + 1

    line_end = content.find("\n", keyword.end())
    if line_end == -1:
        line_end = len(content)

    # Ignore anything after ';' so a parenthesis in a comment does not count.
    first_line_data = content[keyword.end():line_end].split(";", 1)[0]
    if "(" not in first_line_data:
        # Single-line SOA: do not run on to an unrelated ')' further down.
        soa_end = min(line_end + 1, len(content))
        return content[soa_start:soa_end], soa_end

    closing = content.find(")", keyword.end())
    if closing == -1:
        return None, None

    # Include the rest of the closing line (typically a trailing comment).
    newline = content.find("\n", closing)
    soa_end = len(content) if newline == -1 else newline + 1
    return content[soa_start:soa_end], soa_end
def parse_zone_file(content, zone):
    """Split a zone file into header, managed area and footer.

    The managed area is the text between MANAGED_START and the first
    MANAGED_END that follows it. Inside it, the SOA record, the zone's NS
    record, the ``ns.<zone>`` A record and all remaining A/AAAA records are
    extracted.

    Args:
        content: Full text of the zone file.
        zone: Zone name without trailing dot.

    Returns:
        dict with the keys:
            header: text before MANAGED_START ("" if unmanaged).
            footer: text after MANAGED_END ("" if unmanaged).
            managed_start: offset of MANAGED_START, or -1.
            managed_end: offset just past MANAGED_END, or -1.
            soa_record: SOA text as returned by find_soa_record(), or None.
            ns_record / ns_a_record: stripped record lines, or None.
            a_records: list of stripped address record lines.
            is_managed: True when both markers were found in order.
    """
    sections = {
        "header": "",
        "managed_start": -1,
        "managed_end": -1,
        "soa_record": None,
        "ns_record": None,
        "ns_a_record": None,
        "a_records": [],
        "footer": "",
        "is_managed": False,
    }

    start_idx = content.find(MANAGED_START)
    if start_idx == -1:
        return sections
    body_start = start_idx + len(MANAGED_START)
    # Search for the end marker after the start marker, so a stray end
    # marker earlier in the file cannot hide the managed area.
    end_idx = content.find(MANAGED_END, body_start)
    if end_idx == -1:
        return sections
    block_end = end_idx + len(MANAGED_END)

    sections["is_managed"] = True
    sections["managed_start"] = start_idx
    sections["managed_end"] = block_end
    sections["header"] = content[:start_idx]
    sections["footer"] = content[block_end:]
    managed_content = content[body_start:end_idx]

    def cut(pattern, text):
        """Return (stripped first match or None, text without that match)."""
        match = pattern.search(text)
        if match is None:
            return None, text
        # Remove exactly the matched span; replacing by text could hit an
        # earlier line that merely contains the same characters.
        return match.group(0).strip(), text[: match.start()] + text[match.end():]

    soa_record, soa_end = find_soa_record(managed_content)
    if soa_record:
        sections["soa_record"] = soa_record
        record_content = managed_content[soa_end:]
    else:
        # Still parse the records so change detection stays meaningful when
        # the SOA record has been lost.
        record_content = managed_content

    escaped_zone = re.escape(zone)
    ns_pattern = re.compile(
        rf"^{escaped_zone}\.?\s+IN\s+NS\s+ns\.{escaped_zone}\.?\s*$",
        re.MULTILINE,
    )
    sections["ns_record"], record_content = cut(ns_pattern, record_content)

    ns_a_pattern = re.compile(
        rf"^ns\.{escaped_zone}\.?\s+IN\s+A\s+(\d+\.\d+\.\d+\.\d+)\s*$",
        re.MULTILINE,
    )
    sections["ns_a_record"], record_content = cut(ns_a_pattern, record_content)

    # All remaining address records: IPv4 (A) and IPv6 (AAAA).
    address_records = re.findall(
        r"^.*\.?\s+IN\s+(?:A\s+\d+\.\d+\.\d+\.\d+|AAAA\s+[0-9A-Fa-f:.]+)\s*$",
        record_content,
        re.MULTILINE,
    )
    sections["a_records"] = [r.strip() for r in address_records if r.strip()]
    return sections
def extract_records_from_content(content, zone):
    """Pull the records this script manages out of an unmanaged zone file.

    The first ``$ORIGIN`` directive, the first ``$TTL`` directive, the SOA
    record, the zone's NS record and the ``ns.<zone>`` A record are each
    removed from the text and returned separately.

    Args:
        content: Full text of the existing zone file.
        zone: Zone name without trailing dot.

    Returns:
        dict with the keys ``origin``, ``ttl``, ``soa``, ``ns`` and ``ns_a``
        (stripped text or None when absent) and ``other_content``: the
        remaining text with runs of blank lines collapsed and surrounding
        whitespace stripped.
    """
    records = {
        "origin": None,
        "ttl": None,
        "soa": None,
        "ns": None,
        "ns_a": None,
        "other_content": content,
    }
    remaining = content

    def take(pattern):
        """Remove the first match of *pattern* from the remaining text."""
        nonlocal remaining
        match = re.search(pattern, remaining, re.MULTILINE)
        if match is None:
            return None
        # Cut the matched span itself. Replacing by text would remove the
        # first occurrence of the same characters, which may sit inside a
        # different line (e.g. "dns.<zone>" contains "ns.<zone>").
        remaining = remaining[: match.start()] + remaining[match.end():]
        return match.group(0).strip()

    records["origin"] = take(r"^\$ORIGIN\s+(\S+)")
    records["ttl"] = take(r"^\$TTL\s+(\S+)")

    soa_record, soa_end = find_soa_record(remaining)
    if soa_record:
        records["soa"] = soa_record.strip()
        # find_soa_record() returns the end offset; the record text ends
        # there, so its start is the end minus its length.
        soa_start = soa_end - len(soa_record)
        remaining = remaining[:soa_start] + remaining[soa_end:]

    escaped_zone = re.escape(zone)
    records["ns"] = take(
        rf"^{escaped_zone}\.?\s+IN\s+NS\s+ns\.{escaped_zone}\.?\s*$"
    )
    records["ns_a"] = take(
        rf"^ns\.{escaped_zone}\.?\s+IN\s+A\s+(\d+\.\d+\.\d+\.\d+)\s*$"
    )

    # Clean up the blank lines left behind by the removed records.
    records["other_content"] = re.sub(r"\n\s*\n", "\n\n", remaining).strip()
    return records
def restart_hickory_dns():
    """Restart Hickory DNS service using dbus.

    Calls ``RestartUnit("hickory-dns.service", "replace")`` on the systemd
    manager interface over the system bus.

    Returns:
        bool: True when the call returned without raising, False when the
        dbus module is unavailable or the call failed (the reason is logged).
    """
    if dbus is None:
        log("WARNING: dbus module not available. Cannot restart Hickory DNS.")
        return False
    try:
        bus = dbus.SystemBus()
        systemd = bus.get_object(
            "org.freedesktop.systemd1", "/org/freedesktop/systemd1"
        )
        manager = dbus.Interface(systemd, "org.freedesktop.systemd1.Manager")
        # The return value is not needed here.
        # NOTE(review): RestartUnit presumably only queues the restart job,
        # so the success message may precede the actual restart - confirm.
        manager.RestartUnit("hickory-dns.service", "replace")
        log("Successfully restarted Hickory DNS service")
        return True
    except dbus.exceptions.DBusException as e:
        log(f"DBus error restarting Hickory DNS: {e}")
    except Exception as e:
        # Best effort: a failed restart must not abort the zone update.
        log(f"Error restarting Hickory DNS: {e}")
    return False
def should_include_node(node_name):
    """Tell whether *node_name* should get DNS records.

    A node is excluded when its name is empty/None or when the name ends,
    case-insensitively, with ``-nodns``.
    """
    excluded = not node_name or node_name.lower().endswith("-nodns")
    return not excluded
def _default_soa_record(zone, serial):
    """Return a fresh SOA record for *zone* carrying *serial*."""
    return f"""{zone}. IN SOA ns.{zone}. hostmaster.{zone}. (
{serial} ; serial
3600 ; refresh
600 ; retry
604800 ; expire
86400 ) ; minimum
"""


def _render_managed_block(zone, soa_record, ns_record, ns_a_record, a_records):
    """Assemble the managed area, from start marker to end marker."""
    # Built by concatenation rather than str.format(): record lines contain
    # API-supplied node names, and a '{' or '}' in one would break format().
    head = (
        f"{MANAGED_START}\n"
        f"$ORIGIN {zone}.\n"
        "$TTL 3600\n"
        f"{soa_record}\n"
        f"{ns_record}\n"
        f"{ns_a_record}\n"
    )
    return head + "\n".join(a_records) + f"\n{MANAGED_END}\n"


def _collect_record_tuples(netmembers):
    """Return ``(node_name, ip)`` pairs for every member that gets records."""
    record_tuples = []
    for member in netmembers:
        if not member:
            continue
        if not member.get("authorized", False) or not member.get("ipAssignments"):
            continue
        # .get() so a member without a "name" key is skipped, not a KeyError.
        name = member.get("name")
        if name is None:
            continue
        node_name = name.replace(" ", "_")
        # Skip nodes with no name or names ending in '-nodns'
        if not should_include_node(node_name):
            continue
        # Empty strings in ipAssignments are skipped.
        record_tuples.extend(
            (node_name, ip) for ip in member["ipAssignments"] if ip
        )
    return record_tuples


def _serial_as_int(serial):
    """Return *serial* as an int, or 0 when it is not a plain number."""
    try:
        return int(serial)
    except (TypeError, ValueError):
        return 0


def _write_zone_file(zone_path, content, action):
    """Write *content* to *zone_path*; log and return False on failure."""
    try:
        with open(zone_path, "w") as f:
            f.write(content)
    except (OSError, UnicodeError) as e:
        log(f"ERROR {action} file: {e}")
        return False
    return True


def main():
    """Generate or update the zone file for one ZeroTier network.

    Reads the network and its members from the ZTNet API, renders the
    managed area, and creates or updates ``<zone>.zone`` in ZONE_STORAGE_DIR.
    Hickory DNS is restarted after a file was written unless --no-systemd is
    given.

    Returns:
        int: 0 on success (including dry runs), 1 on error, 3 when an
        existing managed file needed no change.

    Raises:
        SystemExit: when ZTNET_API_TOKEN is not set (or by argparse).
    """
    parser = argparse.ArgumentParser(
        description="Generate DNS zone from ZTNet API",
        epilog="""
environment variables:
ZTNET_API_TOKEN Valid ztnet API token
API_ADDRESS (optional) Url to the ztnet instance, defaults to http://localhost:3000
""",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument("network_id", help="ZeroTier network ID")
    parser.add_argument(
        "-d",
        "--dry-run",
        action="store_true",
        help="Print generated content without writing files",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Force processing of existing unmanaged files",
    )
    parser.add_argument(
        "--no-systemd", action="store_true", help="Skip restarting Hickory DNS service"
    )
    args = parser.parse_args()

    # Read token after parsing arguments to allow --help without token
    token = os.getenv("ZTNET_API_TOKEN")
    if not token:
        sys.exit("ERROR: must set ZTNET_API_TOKEN!")
    headers = {"x-ztnet-auth": token}

    network_id = args.network_id
    dry_run = args.dry_run
    force = args.force
    no_systemd = args.no_systemd

    # Find network information
    netinfo = find_network(network_id, API_URL, headers)
    if not netinfo:
        log(f"ERROR: Network ID {network_id} not found in personal or any organization")
        return 1
    try:
        zone = netinfo["dns"]["domain"]
    except (KeyError, TypeError):
        # TypeError: "dns" is present but not a mapping (e.g. None).
        log("ERROR: Missing DNS domain in network info")
        return 1
    if not isinstance(zone, str) or not is_valid_zone(zone):
        log(f"ERROR: Invalid domain name from network info: '{zone}'")
        return 1
    zone_path = os.path.join(ZONE_STORAGE_DIR, f"{zone}.zone")

    # Get network members
    netmembers = get_network_members(network_id, api_url_prefix(), headers)
    if not netmembers:
        log(f"ERROR: Failed to get members for network {network_id}")
        return 1
    try:
        ns_ip = netinfo["dns"]["servers"][0]
    except (KeyError, IndexError, TypeError):
        log("ERROR: No valid DNS server IP found in .dns.servers")
        return 1

    record_tuples = _collect_record_tuples(netmembers)
    ns_record, ns_a_record, a_records = generate_managed_content(
        zone, ns_ip, record_tuples
    )
    new_serial = datetime.now().strftime("%Y%m%d%H")

    def render(soa_record):
        return _render_managed_block(
            zone, soa_record, ns_record, ns_a_record, a_records
        )

    file_written = False

    if not os.path.exists(zone_path):
        # New file: notice line followed by the managed area.
        output_content = f"{MANAGEMENT_NOTICE}\n" + render(
            _default_soa_record(zone, new_serial)
        )
        if dry_run:
            print(output_content)
            return 0
        log(f"Creating new zone file: {zone_path}")
        if not _write_zone_file(zone_path, output_content, "creating"):
            return 1
        file_written = True
    else:
        # Process existing file
        try:
            with open(zone_path, "r") as f:
                current_content = f.read()
        except (OSError, UnicodeError) as e:
            log(f"ERROR reading file: {e}")
            return 1

        sections = parse_zone_file(current_content, zone)

        if not sections["is_managed"]:
            if not force:
                log(
                    f"ERROR: {zone_path} is not managed by ztnet-dns "
                    "(missing managed block markers). Use --force to override."
                )
                return 1

            # Handle unmanaged file (force)
            log(f"Processing unmanaged file with force: {zone_path}")
            extracted = extract_records_from_content(current_content, zone)

            # Use existing SOA if available, otherwise create new
            if extracted["soa"]:
                soa_record = update_soa_serial(extracted["soa"], new_serial)
            else:
                soa_record = _default_soa_record(zone, new_serial)

            # The freshly generated NS and NS address records are used; the
            # extracted ones are only removed from the leftover content.
            managed_block = render(soa_record)
            output_content = (
                f"{MANAGEMENT_NOTICE}\n{managed_block}\n{extracted['other_content']}"
            )
            if dry_run:
                print(output_content)
                return 0
            if not _write_zone_file(zone_path, output_content, "updating"):
                return 1
            log(f"Updated zone file: {zone_path}")
            file_written = True
        else:
            # File is managed - check for changes
            has_changes = False

            if sections["ns_record"] != ns_record:
                log(
                    f"NS record changed: '{sections['ns_record']}' -> '{ns_record}'"
                )
                has_changes = True

            if sections["ns_a_record"] != ns_a_record:
                log(
                    f"NS A record changed: '{sections['ns_a_record']}' -> '{ns_a_record}'"
                )
                has_changes = True

            new_a_records = set(a_records)
            current_a_records = set(sections["a_records"])
            added = new_a_records - current_a_records
            removed = current_a_records - new_a_records
            for label, changed in (("Added", added), ("Removed", removed)):
                if not changed:
                    continue
                sample_str = ", ".join(sorted(changed)[:3])
                if len(changed) > 3:
                    sample_str += "..."
                log(f"{label} {len(changed)} A records: {sample_str}")
                has_changes = True

            soa_record = sections["soa_record"]
            soa_missing = not soa_record
            if soa_missing:
                # Without this the file would be rewritten with no SOA at all.
                soa_record = _default_soa_record(zone, new_serial)
                log(f"SOA record missing, creating one with serial {new_serial}")
            elif has_changes:
                current_serial = extract_soa_serial(soa_record) or "0000000000"
                current_value = _serial_as_int(current_serial)
                # Compare numerically: string comparison misorders serials
                # of different lengths.
                if int(new_serial) > current_value:
                    soa_record = update_soa_serial(soa_record, new_serial)
                    log(f"Updating SOA serial: {current_serial} -> {new_serial}")
                else:
                    # Increment serial if current is newer
                    incremented = str(current_value + 1)
                    soa_record = update_soa_serial(soa_record, incremented)
                    log(f"Incrementing SOA serial: {current_serial} -> {incremented}")

            # The managed block already ends with a newline; drop the one
            # that followed the old end marker so the file does not gain a
            # blank line on every update.
            footer = sections["footer"]
            if footer.startswith("\n"):
                footer = footer[1:]
            output_content = sections["header"] + render(soa_record) + footer

            if dry_run:
                print(output_content)
                return 0

            if not (has_changes or soa_missing):
                log(f"Zone file unchanged: {zone_path}")
                return 3

            log(f"Updating zone file: {zone_path}")
            if not _write_zone_file(zone_path, output_content, "updating"):
                return 1
            file_written = True

    # Restart Hickory DNS if we wrote a file
    if file_written and not dry_run and not no_systemd:
        if not restart_hickory_dns():
            log("WARNING: Hickory DNS not restarted - zone changes might not be active")
    return 0
# Script entry point: main()'s return value becomes the process exit status;
# Ctrl-C exits with status 1 instead of printing a traceback.
if __name__ == "__main__":
    try:
        sys.exit(main())
    except KeyboardInterrupt:
        sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment