Skip to content

Instantly share code, notes, and snippets.

@cveilleux
Created December 7, 2023 22:08
Show Gist options
  • Save cveilleux/20fd7574a0a9fe796519336efc273cbe to your computer and use it in GitHub Desktop.
Save cveilleux/20fd7574a0a9fe796519336efc273cbe to your computer and use it in GitHub Desktop.
Advanced unbound python DNS logging module
"""
This is an advanced version of the log.py example found at
https://github.com/NLnetLabs/unbound/blob/master/pythonmod/examples/log.py
Changes:
- Updated for python 3
- Full parsing of resource records in the response using dnslib
- Output in JSON format for easy parsing by other tools.
Requirements:
- Python 3
- dnslib package (tested with 0.9.23)
Configuration:
```
server:
module-config: "validator python iterator"
[...]
python:
python-script: "/etc/unbound/log.py"
```
Example output:
info: DNS_LOG {"request": {"timestamp": 1701986395.7798707, "id": "971B747A2E48D323", "qname": "google.ca.", "qtype": "MX", "qclass": "IN", "client_ip": "192.168.10.138"}, "response": {"code": "NOERROR", "elapsed": 4, "records": [{"type": "MX", "hostname": "google.ca", "ttl": 205, "length": 19, "value": "0 smtp.google.com.", "parse_result": "SUCCESS"}]}}
info: DNS_LOG {"request": {"timestamp": 1701986414.0460334, "id": "4305F3A07ACF3360", "qname": "google.com.", "qtype": "NS", "qclass": "IN", "client_ip": "192.168.10.138"}, "response": {"code": "NOERROR", "elapsed": 5, "records": [{"type": "NS", "hostname": "google.com", "ttl": 17565, "length": 16, "value": "ns1.google.com.", "parse_result": "SUCCESS"}, {"type": "NS", "hostname": "google.com", "ttl": 17565, "length": 16, "value": "ns4.google.com.", "parse_result": "SUCCESS"}, {"type": "NS", "hostname": "google.com", "ttl": 17565, "length": 16, "value": "ns2.google.com.", "parse_result": "SUCCESS"}, {"type": "NS", "hostname": "google.com", "ttl": 17565, "length": 16, "value": "ns3.google.com.", "parse_result": "SUCCESS"}]}}
info: DNS_LOG {"request": {"timestamp": 1701986418.4660685, "id": "2BBFE335E48CEF91", "qname": "amazon.ca.", "qtype": "AAAA", "qclass": "IN", "client_ip": "192.168.10.138"}, "response": {"code": "NOERROR", "elapsed": 4, "records": [{"type": "SOA", "hostname": "amazon.ca", "ttl": 312, "length": 69, "value": "dns-external-master.amazon.com. root.amazon.com. 2010111888 600 300 3024000 900", "parse_result": "SUCCESS"}]}}
info: DNS_LOG {"request": {"timestamp": 1701986418.4618928, "id": "2B76CC367EFCBB8C", "qname": "amazon.ca.", "qtype": "A", "qclass": "IN", "client_ip": "192.168.10.138"}, "response": {"code": "NOERROR", "elapsed": 13, "records": [{"type": "A", "hostname": "amazon.ca", "ttl": 46, "length": 4, "value": "54.239.19.238", "parse_result": "SUCCESS"}, {"type": "A", "hostname": "amazon.ca", "ttl": 46, "length": 4, "value": "54.239.18.172", "parse_result": "SUCCESS"}, {"type": "A", "hostname": "amazon.ca", "ttl": 46, "length": 4, "value": "52.94.225.242", "parse_result": "SUCCESS"}]}}
info: DNS_LOG {"request": {"timestamp": 1701986418.496134, "id": "515FE3BE79FFDB78", "qname": "172.18.239.54.in-addr.arpa.", "qtype": "PTR", "qclass": "IN", "client_ip": "192.168.10.138"}, "response": {"code": "NXDOMAIN", "elapsed": 6, "records": [{"type": "SOA", "hostname": "239.54.in-addr.arpa", "ttl": 843, "length": 69, "value": "dns-external-master.amazon.com. root.amazon.com. 39 3600 900 604800 900", "parse_result": "SUCCESS"}]}}
"""
import json
import random
import time
from dnslib.label import DNSBuffer
from dnslib.dns import RDMAP
RETURN_CODES = {
RCODE_NOERROR: "NOERROR",
RCODE_FORMERR: "FORMERR",
RCODE_SERVFAIL: "SERVFAIL",
RCODE_NXDOMAIN: "NXDOMAIN",
RCODE_NOTIMPL: "NOTIMPL",
RCODE_REFUSED: "REFUSED",
RCODE_YXDOMAIN: "YXDOMAIN",
RCODE_YXRRSET: "YXRRSET",
RCODE_NXRRSET: "NXRRSET",
RCODE_NOTAUTH: "NOTAUTH",
RCODE_NOTZONE: "NOTZONE",
}
def gen_req_id(length=16, alphabet="0123456789ABCDEF"):
rnd = random.SystemRandom()
return "".join(rnd.choice(alphabet) for _ in range(length))
def bytes_to_int(data):
return int.from_bytes(data, byteorder="big", signed=False)
def log_dns_msg(qstate, qdata):
return_msg_rep = qstate.return_msg.rep
reply_list = qstate.mesh_info.reply_list
if not return_msg_rep:
log_warn("log.py: qstate.return_msg.rep missing.")
return
while reply_list:
if not reply_list.query_reply:
log_warn("log.py: qstate.mesh_info.reply_list.query_reply missing.")
else:
rcode = return_msg_rep.flags & 0xF
elapsed = round((time.perf_counter() - qdata["perf_counter"]) * 1000)
request = dict(
timestamp=qdata["request_time"],
id=qdata.get("request_id"),
qname=qstate.qinfo.qname_str,
qtype=qstate.qinfo.qtype_str,
qclass=qstate.qinfo.qclass_str,
client_ip=reply_list.query_reply.addr,
)
log = dict(
request=request,
response=dict(code=RETURN_CODES.get(rcode) or rcode, elapsed=elapsed, records=[]),
)
for i in range(0, return_msg_rep.rrset_count):
rr = return_msg_rep.rrsets[i]
rk = rr.rk
d = rr.entry.data
hostname = rk.dname_str
if hostname[-1] == ".":
hostname = hostname[:-1]
for j in range(0, d.count):
rr_data = d.rr_data[j]
length = bytes_to_int(rr_data[:2])
data = rr_data[2:]
value = None
parse_result = "UNSUPPORTED"
if rk.type_str in RDMAP:
try:
buffer = DNSBuffer(data)
parsed_record = RDMAP[rk.type_str].parse(buffer, length)
value = repr(parsed_record)
parse_result = "SUCCESS"
except Exception as e:
log_warn(
f"log.py: Parsing of {rk.type_str} record failed with error: {e}"
)
parse_result = "ERROR"
log_record = dict(
type=rk.type_str,
hostname=hostname,
ttl=d.rr_ttl[j],
length=length,
value=value,
parse_result=parse_result,
)
log["response"]["records"].append(log_record)
log_info("DNS_LOG " + json.dumps(log))
reply_list = reply_list.next
def init(id, cfg):
return True
def deinit(id):
return True
def inform_super(id, qstate, superqstate, qdata):
return True
def operate(id, event, qstate, qdata):
if event in (MODULE_EVENT_NEW, MODULE_EVENT_PASS):
qdata["request_id"] = gen_req_id()
qdata["request_time"] = time.time()
qdata["perf_counter"] = time.perf_counter()
qstate.ext_state[id] = MODULE_WAIT_MODULE
return True
elif event == MODULE_EVENT_MODDONE:
if qstate.return_msg:
log_dns_msg(qstate, qdata)
qstate.ext_state[id] = MODULE_FINISHED
return True
qstate.ext_state[id] = MODULE_ERROR
return True
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment