Skip to content

Instantly share code, notes, and snippets.

@onstatus
Last active November 4, 2019 16:25
Show Gist options
  • Save onstatus/3f268e12ec6dff0a2046b50ff9ba9fa0 to your computer and use it in GitHub Desktop.
Save onstatus/3f268e12ec6dff0a2046b50ff9ba9fa0 to your computer and use it in GitHub Desktop.
Script to check the DNSSEC validation chain using the getdns Python binding and save the result in InfluxDB
import getdns
import datetime
import time
from influxdb import InfluxDBClient
from influxdb.client import InfluxDBClientError
# InfluxDB connection settings; main() passes these to InfluxDBClient.
DBHOST = "localhost"
DBPORT = 8086
DBNAME = "influx_dbname"
DBUSER = "influx_dbuser"
DBPASSWORD = "influx_dbpass"
# Human-readable names for the numeric "dnssec_status" codes found in getdns
# replies. The name is stored next to the numeric code in every data point,
# so the spelling here is what ends up in the database.
dnssec_status = {
    400: "DNSSEC_SECURE",
    401: "DNSSEC_BOGUS",
    402: "DNSSEC_INDETERMINATE",  # was misspelled "DNSSEC_INDETERINATE"
    403: "DNSSEC_INSECURE",
    404: "DNSSEC_NOT_PERFORMED",
}
# Upstream resolvers to test. Each entry is a one-element list because it is
# assigned as-is to the getdns context's upstream_recursive_servers.
recursive_servers = [
    [dict(address_type="IPv4", address_data="8.8.8.8")],
    [dict(address_type="IPv4", address_data="9.9.9.9")],
]

# Domain names looked up through every resolver above.
monitored_domains = ["icann.org", "iana.org", "dnssec-failed.org"]
# Extensions sent with every lookup: request the DNSSEC status of each reply
# and the validation chain.
result_extension = dict(
    dnssec_return_status=getdns.EXTENSION_TRUE,
    dnssec_return_validation_chain=getdns.EXTENSION_TRUE,
)

# Short labels for the getdns response status codes; stored as the
# "result_str" tag of each data point.
getdns_result = dict([
    (getdns.RESPSTATUS_GOOD, "GOOD"),
    (getdns.RESPSTATUS_NO_NAME, "NO_NAME"),
    (getdns.RESPSTATUS_ALL_TIMEOUT, "ALL_TIMEOUT"),
    (getdns.RESPSTATUS_NO_SECURE_ANSWERS, "NO_SECURE_ANSWERS"),
    (getdns.RESPSTATUS_ALL_BOGUS_ANSWERS, "ALL_BOGUS_ANSWERS"),
])
# Known key tags and the label stored for each in the "ksk_str" field.
KSK_tag = dict([
    (19036, "KSK2010"),
    (20326, "KSK2017"),
])

# Measurement name used for every data point.
metric = "ksk"
def _first_dnssec_status(replies_tree):
    """Return (code, name) from the first reply that has "dnssec_status".

    Returns (0, "") when no reply carries the key. A code missing from the
    dnssec_status table raises KeyError, which the caller treats as a
    failed measurement.
    """
    for reply in replies_tree:
        if "dnssec_status" in reply:
            code = reply["dnssec_status"]
            return code, dnssec_status[code]
    return 0, ""


def _build_point(ctx, resolver_address, domain):
    """Look up *domain* through *ctx* and return one complete data point.

    The point is returned only if every value could be collected; any
    exception (lookup failure, empty validation chain, unknown status code)
    propagates so the caller can skip this measurement.
    """
    results = ctx.address(name=domain, extensions=result_extension)
    # The key tag is read from the last entry of the validation chain.
    ksk = results.validation_chain[-1]["rdata"]["key_tag"]
    status_id, status_str = _first_dnssec_status(results.replies_tree)
    return {
        "measurement": metric,
        "tags": {
            "resolver": resolver_address,
            "domain": domain,
            "value": results.status,
            "result_str": getdns_result[results.status],
        },
        "fields": {
            "dnssec_status": status_id,
            "dnssec_status_str": status_str,
            "ksk": ksk,
            # A key tag missing from KSK_tag is still recorded (numerically
            # in "ksk") instead of discarding the whole measurement.
            "ksk_str": KSK_tag.get(ksk, "UNKNOWN"),
        },
    }


def main():
    """Query every monitored domain through every resolver and store results.

    For each (resolver, domain) pair a stub lookup is made with the DNSSEC
    extensions enabled. Successful measurements are collected and written
    to InfluxDB in one batch under the 'ksk_roll' retention policy. Failed
    measurements are reported on stdout and skipped.
    """
    series = []
    ctx = getdns.Context()
    ctx.resolution_type = getdns.RESOLUTION_STUB

    for resolver in recursive_servers:
        ctx.upstream_recursive_servers = resolver
        resolver_address = resolver[0]["address_data"]

        for domain in monitored_domains:
            print("Info: Start using resolver {0} to validate {1}".format(resolver_address, domain))
            # Original note (presumably metrics still to be added):
            # EDNS, packet size, response time (latency)
            try:
                point = _build_point(ctx, resolver_address, domain)
            except Exception as e:
                # Boundary of one measurement: report and move on so a single
                # failing lookup does not abort the whole run.
                print("Error: {0} while using resolver {1} to validate {2}".format(str(e), resolver_address, domain))
            else:
                # Only fully populated points are queued; an empty or partial
                # dict would be rejected when the batch is written.
                series.append(point)
            print("Info: End using resolver {0} to validate {1}".format(resolver_address, domain))

    client = InfluxDBClient(DBHOST, DBPORT, DBUSER, DBPASSWORD, DBNAME)
    retention_policy = 'ksk_roll'
    client.create_retention_policy(retention_policy, 'INF', 3, default=True)
    if series:
        client.write_points(series, retention_policy=retention_policy)
    else:
        print("Info: No data points collected, nothing written")


if __name__ == "__main__":
    main()
@onstatus
Copy link
Author

Result using Grafana
ksk

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment