Last active
November 4, 2019 16:25
-
-
Save onstatus/3f268e12ec6dff0a2046b50ff9ba9fa0 to your computer and use it in GitHub Desktop.
Script that checks the DNSSEC validation chain using the getdns Python bindings and saves the results in InfluxDB
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import getdns | |
import datetime | |
import time | |
from influxdb import InfluxDBClient | |
from influxdb.client import InfluxDBClientError | |
# InfluxDB connection settings; main() passes them to InfluxDBClient as
# (host, port, user, password, database).
# NOTE(review): the credential values look like placeholders -- confirm they
# are replaced (or injected) before deployment.
DBUSER = 'influx_dbuser'
DBPASSWORD = 'influx_dbpass'
DBNAME = 'influx_dbname'
DBHOST = 'localhost'
DBPORT = 8086
# Human-readable labels for the numeric ``dnssec_status`` codes found in the
# replies returned by getdns.  main() stores the label in the
# ``dnssec_status_str`` field of each InfluxDB point.
# NOTE: the label for 402 is spelled "DNSSEC_INDETERMINATE"; queries or
# dashboards that filter on a different spelling will not match it.
dnssec_status = {
    400: "DNSSEC_SECURE",
    401: "DNSSEC_BOGUS",
    402: "DNSSEC_INDETERMINATE",
    403: "DNSSEC_INSECURE",
    404: "DNSSEC_NOT_PERFORMED",
}
# Upstream resolvers to test.  Each entry is itself a list of address dicts
# that main() assigns as-is to ``ctx.upstream_recursive_servers``; only the
# first dict's "address_data" is used to label log lines and InfluxDB points.
recursive_servers = [
    [{"address_type": "IPv4", "address_data": '8.8.8.8'}],
    [{"address_type": "IPv4", "address_data": '9.9.9.9'}],
]

# Domain names looked up through every resolver above.
# NOTE(review): "dnssec-failed.org" is presumably included as a deliberately
# failing validation case -- confirm.
monitored_domains = [
    "icann.org",
    "iana.org",
    "dnssec-failed.org",
]
# getdns extensions passed to ``ctx.address()``: ask for the per-reply DNSSEC
# status and for the validation chain, both of which main() reads back.
result_extension = {
    "dnssec_return_status": getdns.EXTENSION_TRUE,
    "dnssec_return_validation_chain": getdns.EXTENSION_TRUE,
}

# Labels for the overall getdns response status (``results.status``); main()
# stores the label in the ``result_str`` tag of each InfluxDB point.
getdns_result = {
    getdns.RESPSTATUS_GOOD: "GOOD",
    getdns.RESPSTATUS_NO_NAME: "NO_NAME",
    getdns.RESPSTATUS_ALL_TIMEOUT: "ALL_TIMEOUT",
    getdns.RESPSTATUS_NO_SECURE_ANSWERS: "NO_SECURE_ANSWERS",
    getdns.RESPSTATUS_ALL_BOGUS_ANSWERS: "ALL_BOGUS_ANSWERS",
}
# Labels for the key tags main() reads from the last entry of the validation
# chain; the label is stored in the ``ksk_str`` field of each InfluxDB point.
KSK_tag = {
    19036: "KSK2010",
    20326: "KSK2017",
}

# InfluxDB measurement name used for every point written by main().
metric = "ksk"
def _first_dnssec_status(replies_tree):
    """Return the ``dnssec_status`` of the first reply carrying one, else 0."""
    for reply in replies_tree:
        if "dnssec_status" in reply:
            return reply["dnssec_status"]
    return 0


def _build_point(ctx, address, domain):
    """Look up *domain* through *ctx* and return one InfluxDB point dict.

    *address* is the resolver address used only to tag the point.  Any
    exception raised by the lookup or by reading the validation chain
    (for example an empty chain) propagates to the caller.
    """
    results = ctx.address(name=domain, extensions=result_extension)
    # The key tag is taken from the last entry of the validation chain.
    ksk = results.validation_chain[-1]["rdata"]["key_tag"]
    status_id = _first_dnssec_status(results.replies_tree)
    return {
        "measurement": metric,
        "tags": {
            "resolver": address,
            "domain": domain,
            "value": results.status,
            # Fall back to a label instead of raising KeyError so that an
            # unlisted status does not discard the whole measurement.
            "result_str": getdns_result.get(results.status, "UNKNOWN"),
        },
        "fields": {
            "dnssec_status": status_id,
            # Empty string when no reply carried a status (status_id == 0).
            "dnssec_status_str": dnssec_status.get(status_id, ""),
            "ksk": ksk,
            # A key tag missing from KSK_tag is still recorded numerically
            # in "ksk"; only its label is unknown.
            "ksk_str": KSK_tag.get(ksk, "UNKNOWN"),
        },
    }


def main():
    """Check every monitored domain through every resolver and store results.

    A single getdns context in stub-resolution mode is pointed at each entry
    of ``recursive_servers`` in turn and used to look up each name in
    ``monitored_domains``.  Every successful lookup yields one InfluxDB
    point; a failed lookup is logged and skipped so that no empty or
    half-built point is sent to InfluxDB.  The collected points are written
    under the 'ksk_roll' retention policy, which is (re)declared on each run
    with duration 'INF', replication factor 3, as the default policy.
    """
    series = []
    ctx = getdns.Context()
    ctx.resolution_type = getdns.RESOLUTION_STUB

    for resolver in recursive_servers:
        ctx.upstream_recursive_servers = resolver
        address = resolver[0]["address_data"]
        for domain in monitored_domains:
            print("Info: Start using resolver {0} to validate {1}".format(address, domain))
            # NOTE(review): the original carried the note "EDNS, packet size,
            # response time (latency)" here -- presumably measurements still
            # to be added.
            try:
                point = _build_point(ctx, address, domain)
            except Exception as e:
                # Best-effort per lookup: report the failure and carry on
                # with the remaining domains and resolvers.
                print("Error: {0} while using resolver {1} to validate {2}".format(str(e), address, domain))
            else:
                series.append(point)
            print("Info: End using resolver {0} to validate {1}".format(address, domain))

    if not series:
        # Nothing was collected, so there is nothing to write.
        print("Info: No measurement collected, nothing written to InfluxDB")
        return

    client = InfluxDBClient(DBHOST, DBPORT, DBUSER, DBPASSWORD, DBNAME)
    retention_policy = 'ksk_roll'
    client.create_retention_policy(retention_policy, 'INF', 3, default=True)
    client.write_points(series, retention_policy=retention_policy)
# Run the check only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Result using Grafana
