Skip to content

Instantly share code, notes, and snippets.

@6d61726b760a
Last active May 17, 2023 03:03
Show Gist options
  • Save 6d61726b760a/b9b2f7d65eca0a4f3143200fb34b24e4 to your computer and use it in GitHub Desktop.
Save 6d61726b760a/b9b2f7d65eca0a4f3143200fb34b24e4 to your computer and use it in GitHub Desktop.
#!/usr/bin/python3
# a quick and dirty dns test script
# checks to see if we can resolve a list of hosts
# writes to the hec endpoint on the localhost
# (expected to be run from hfw)
#
# cron: * * * * * /root/dnstest.py >/tmp/dnstest.out 2>&1
#
# [email protected]
import ssl
import socket
import json
import urllib.request
import urllib.parse
import urllib.error
import logging
from logging.handlers import RotatingFileHandler
# make urllib's default HTTPS context skip certificate verification
# (applies to every HTTPS request made through urllib in this process)
ssl._create_default_https_context = ssl._create_unverified_context

# logging config: a size-capped, rotating log file
logFile = "/tmp/dnstest.log"
formatter = logging.Formatter(
    "%(asctime)s %(levelname)s %(funcName)s(%(lineno)d) %(message)s"
)

# rotate at 5 MiB, keeping two old files next to the live one
file_handler = RotatingFileHandler(
    logFile,
    mode="a",
    maxBytes=5 * 1024 * 1024,
    backupCount=2,
    encoding=None,
    delay=0,
)
file_handler.setFormatter(formatter)

# module logger: INFO and above go to the rotating file
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(file_handler)
def get_addr_info_wrapper(hostname):
    """Resolve *hostname* and report the IPv4 addresses it maps to.

    Args:
        hostname: Host name (or literal address) handed to
            ``socket.getaddrinfo``.

    Returns:
        A dict with three keys:

        * ``"host"`` - the ``hostname`` argument, unchanged.
        * ``"status"`` - ``"success"`` when the lookup completed,
          ``"fail"`` when it raised.
        * ``"results"`` - the unique IPv4 addresses, in the order the
          resolver returned them; empty on failure or when the name only
          has non-IPv4 addresses.
    """
    # see `man getent` `/ hosts `
    # see `man getaddrinfo`
    try:
        addr_info = socket.getaddrinfo(hostname, 0)
    except (socket.error, UnicodeError):
        # socket.error covers resolver failures (socket.gaierror);
        # UnicodeError is raised before any lookup happens when the name
        # cannot be IDNA-encoded (e.g. an empty or over-long label)
        return {
            "host": hostname,
            "status": "fail",
            "results": [],
        }

    # getaddrinfo yields one entry per (address, socket type) pair, and which
    # socket types are listed differs between platforms. Filter on the
    # address family only (AF_INET = ipv4) and de-duplicate, instead of
    # depending on one particular socket type being present.
    ipv4_addresses = [
        sockaddr[0]
        for family, _socktype, _proto, _canonname, sockaddr in addr_info
        if family == socket.AF_INET
    ]
    return {
        "host": hostname,
        "status": "success",
        # dict.fromkeys drops duplicates while keeping first-seen order
        "results": list(dict.fromkeys(ipv4_addresses)),
    }
# hosts whose DNS resolution is checked on every run
# NOTE(review): "..." looks like a placeholder for the hosts between inputs1
# and inputs15 - replace it with real hostnames before deploying
host_list = [
    "http-inputs-your.splunkcloud.com",
    "http-inputs-firehose-your.splunkcloud.com",
    "inputs1.your.splunkcloud.com",
    "...",
    "inputs15.your.splunkcloud.com",
]

# create our result object: one lookup result per entry of host_list
result_object = {
    "source_host": socket.gethostname(),
    "results": [get_addr_info_wrapper(host) for host in host_list],
}

# send the result to splunk - we're concerned that we
# might have DNS resolution issues so log it locally
# (this is running on a hfw)
splunk_payload = {
    "index": "your_index",
    "sourcetype": "json:dnstest",
    "event": result_object,
}
splunk_url = "https://localhost:8088/services/collector/event"

# the script is scheduled from cron every minute; cap the HEC call so a
# hung endpoint cannot leave runs piling up behind each other
hec_timeout_seconds = 10

# encode our payload
encoded_data = json.dumps(splunk_payload).encode("utf8")

# create the request, add headers
request = urllib.request.Request(splunk_url, data=encoded_data, method="POST")
request.add_header("Content-Type", "application/json")
request.add_header("Authorization", "Splunk HECTOKEN-HECT-OKEN-HECT-TOKENHECTOKE")

try:
    # send request, read response (the with-block closes the connection)
    with urllib.request.urlopen(request, timeout=hec_timeout_seconds) as response:
        response_data = response.read().decode("utf-8")
except Exception:
    # top-level boundary of the script: record the traceback in the log file
    # rather than letting the cron run die with it
    logger.exception("Exception!")
else:
    # log what the endpoint answered
    logger.info(response_data)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment