Created
July 3, 2025 17:34
-
-
Save kaniini/821a9eedee96e060ad46fe44393f5148 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# Cobbled together from: https://gist.github.com/im-redactd/73528b6e9cdbe28d2554e2821778a6c6 | |
import re | |
from datetime import datetime | |
import argparse | |
import sys | |
from pathlib import Path | |
import time | |
import logging | |
def setup_logging(debug=False):
    """Initialise root logging and hand back this module's logger.

    Args:
        debug: When true, log at DEBUG level; otherwise log at INFO level.

    Returns:
        The logger registered under this module's ``__name__``.
    """
    if debug:
        verbosity = logging.DEBUG
    else:
        verbosity = logging.INFO

    # Collected in one mapping so the whole configuration is visible at a glance.
    options = {
        'level': verbosity,
        'format': '%(asctime)s - %(levelname)s - %(message)s',
        'datefmt': '%Y-%m-%d %H:%M:%S',
    }
    logging.basicConfig(**options)
    return logging.getLogger(__name__)
def parse_timestamp(date_str):
    """Convert an ISC dhcpd lease timestamp to Unix epoch seconds.

    The expected input is the value of an ``ends`` statement with the
    trailing semicolon removed, e.g. ``"1 2025/02/17 20:08:29"``: a weekday
    number followed by a date and a time.

    Args:
        date_str: Timestamp string in ``<weekday> YYYY/MM/DD HH:MM:SS`` form.

    Returns:
        The timestamp as an integer number of seconds since the Unix epoch,
        or ``None`` (after logging an error) when the value cannot be parsed,
        for example ``"never"`` or an empty string.
    """
    # Imported here because the file-level import only provides `datetime`.
    from datetime import timezone

    # The original referenced an undefined `logger` in the error path, which
    # turned every malformed timestamp into a NameError.
    logger = logging.getLogger(__name__)

    try:
        # Drop the leading weekday number; the date itself is unambiguous.
        clean_date = ' '.join(date_str.split()[1:])
        dt = datetime.strptime(clean_date, '%Y/%m/%d %H:%M:%S')
    except (AttributeError, TypeError, ValueError):
        logger.error(f"Failed to parse timestamp: {date_str}")
        return None

    # dhcpd writes this date format in UTC. A naive datetime would be
    # interpreted as local time by .timestamp(), shifting every expiry by the
    # host's UTC offset, so attach the UTC zone explicitly.
    return int(dt.replace(tzinfo=timezone.utc).timestamp())
def get_lease_file(path='/var/lib/dhcp/dhcpd.leases'):
    """Read a DHCP lease file from the local filesystem.

    Despite the wording of the earlier docstring, nothing here uses SSH: the
    file at ``path`` is simply opened and read in text mode.

    Args:
        path: Location of the lease file to read.

    Returns:
        The full text content of the file.

    Raises:
        OSError: If the file cannot be opened or read (for example when it
            does not exist or is not readable).
    """
    with open(path, 'r') as lease_file:
        return lease_file.read()
def _lease_field_value(line):
    """Return the value of a ``keyword value;`` statement, or None if absent.

    A double-quoted value is returned without its quotes; otherwise the second
    whitespace-separated token is returned with the trailing ``;`` removed.
    """
    if '"' in line:
        return line.split('"')[1]
    tokens = line.split()
    return tokens[1].rstrip(';') if len(tokens) > 1 else None


def parse_lease_file(lease_content):
    """Parse DHCP lease file content and return the currently active leases.

    Each ``lease <ip> { ... }`` block is turned into a dict that always has
    an ``ip`` key and, when the corresponding statements are present,
    ``mac``, ``hostname``, ``state``, ``ends`` and ``uid`` keys.

    The lease file is an append-only log, so the same address can be declared
    several times; only the LAST declaration for each address is kept. This
    prevents a stale ``active`` entry from being reported after the lease was
    later released, and prevents duplicate output lines for one address.

    Args:
        lease_content: The full text of the lease file.

    Returns:
        A list of lease dicts whose latest declaration has binding state
        ``active`` and a MAC address, ordered by each address's first
        appearance in the file.
    """
    logger = logging.getLogger(__name__)
    latest_by_ip = {}      # ip -> most recent lease dict for that address
    current_lease = None   # lease block being filled in, if any

    def remember(lease, message):
        # A later declaration for the same address replaces the earlier one.
        latest_by_ip[lease['ip']] = lease
        logger.debug(f"{message}: {lease}")

    for line in lease_content.split('\n'):
        line = line.strip()
        # Skip blanks, comments and shell prompts from copy-pasted output.
        if not line or line.startswith(('root@', '#')):
            continue
        logger.debug(f"Processing line: {line}")
        tokens = line.split()

        # Start of a lease block
        if tokens[0] == 'lease' and len(tokens) > 1:
            # A new block while one is still open means the closing brace
            # was missing; keep what was collected so far.
            if current_lease is not None:
                remember(current_lease, "Added lease")
            current_lease = {'ip': tokens[1]}
            continue

        # Statements outside any lease block (server-duid, failover state,
        # stray fragments) carry no lease data; ignoring them also avoids
        # indexing into None.
        if current_lease is None:
            continue

        # End of lease block
        if line == '}':
            remember(current_lease, "Added lease at block end")
            current_lease = None
        # MAC address
        elif tokens[:2] == ['hardware', 'ethernet']:
            if len(tokens) > 2:
                current_lease['mac'] = tokens[2].rstrip(';')
        # Hostname
        elif tokens[0] == 'client-hostname':
            hostname = _lease_field_value(line)
            if hostname is not None:
                current_lease['hostname'] = hostname
        # Lease state ("next binding state" does not match on purpose)
        elif tokens[:2] == ['binding', 'state']:
            if len(tokens) > 2:
                current_lease['state'] = ' '.join(tokens[2:]).rstrip(';')
        # End time, kept as text; conversion happens at output time
        elif tokens[0] == 'ends':
            current_lease['ends'] = ' '.join(tokens[1:]).rstrip(';')
        # Client ID
        elif tokens[0] == 'uid':
            uid = _lease_field_value(line)
            if uid is not None:
                current_lease['uid'] = uid

    # Add the last lease if the file ended inside an unterminated block
    if current_lease is not None:
        remember(current_lease, "Added final lease")

    # Filter only active leases ('ip' is always present by construction)
    active_leases = [lease for lease in latest_by_ip.values()
                     if lease.get('state') == 'active' and 'mac' in lease]

    logger.debug(f"Found {len(active_leases)} active leases out of {len(latest_by_ip)} total leases")
    logger.debug("Active leases:")
    for lease in active_leases:
        logger.debug(f"  {lease}")
    return active_leases
def convert_to_dnsmasq(leases):
    """Render parsed leases as dnsmasq lease-file lines.

    Each output line has the form
    ``<expiry epoch> <MAC> <IP> <hostname> <client ID>``.

    Args:
        leases: Iterable of lease dicts with ``mac``, ``ip`` and ``ends``
            keys, and optionally ``hostname`` and ``uid``.

    Returns:
        A list of formatted lines. Leases lacking a MAC or IP are skipped
        silently; leases with a missing or unparseable expiry are skipped
        with an error log entry.
    """
    log = logging.getLogger(__name__)
    output = []

    for entry in leases:
        log.debug(f"Converting lease: {entry}")

        # Nothing can be emitted without both addresses.
        if 'mac' not in entry or 'ip' not in entry:
            continue

        ends = entry.get('ends', '')
        if not ends:
            log.error(f"Skipping lease due to missing expiry time: {entry}")
            continue

        epoch = parse_timestamp(ends)
        if not epoch:
            log.error(f"Skipping lease due to invalid timestamp: {entry}")
            continue

        hw_addr = entry['mac']
        address = entry['ip']
        host = entry.get('hostname', '*')

        # The client ID defaults to the MAC address with a '01:' prefix; a
        # recorded uid is used only if, once backslashes and quotes are
        # stripped, it already has that prefix.
        fallback_id = f"01:{hw_addr}"
        cleaned_id = entry.get('uid', fallback_id).replace('\\', '').replace('"', '')
        identifier = cleaned_id if cleaned_id.startswith('01:') else fallback_id

        # Format: [epoch timestamp] [MAC address] [IP address] [hostname] [client ID]
        record = ' '.join(str(part) for part in (epoch, hw_addr, address, host, identifier))
        output.append(record)
        log.debug(f"Added dnsmasq lease line: {record}")

    return output
def main():
    """Command-line entry point: convert an ISC DHCP lease file to dnsmasq format.

    Reads the lease file named by ``--input``, keeps the leases selected by
    ``parse_lease_file``, converts them with ``convert_to_dnsmasq`` and writes
    one line per lease to ``--output`` (creating parent directories as
    needed). ``--debug`` enables DEBUG-level logging.

    Any exception raised along the way is logged and the process exits with
    status 1.
    """
    parser = argparse.ArgumentParser(description='Convert ISC DHCP leases to dnsmasq format')
    parser.add_argument('--input', required=True, help='Input file path')
    parser.add_argument('--output', required=True, help='Output file path')
    parser.add_argument('--debug', action='store_true', help='Enable debug logging')
    args = parser.parse_args()

    # Setup logging
    logger = setup_logging(args.debug)

    try:
        # Get lease file content. The path must be passed explicitly:
        # previously --input was required but ignored, so the hard-coded
        # default path was always read instead of the requested file.
        lease_content = get_lease_file(args.input)

        # Parse leases
        logger.info("Parsing lease file content")
        leases = parse_lease_file(lease_content)

        # Convert to dnsmasq format
        logger.info("Converting to dnsmasq format")
        dnsmasq_lines = convert_to_dnsmasq(leases)

        # Write output file
        output_path = Path(args.output)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        logger.info(f"Writing output to {args.output}")
        with open(output_path, 'w') as f:
            f.write('\n'.join(dnsmasq_lines) + '\n')
        logger.info(f"Successfully wrote {len(dnsmasq_lines)} entries to {args.output}")
    except Exception as e:
        # Top-level boundary: report the failure and signal it to the shell.
        logger.error(f"Error: {str(e)}")
        sys.exit(1)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment