Last active
May 17, 2023 03:03
-
-
Save 6d61726b760a/b9b2f7d65eca0a4f3143200fb34b24e4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
# a quick and dirty dns test script | |
# checks to see if we can resolve a list of hosts | |
# writes to the hec endpoint on the localhost | |
# (expected to be run from hfw) | |
# | |
# cron: * * * * * /root/dnstest.py >/tmp/dnstest.out 2>&1 | |
# | |
# [email protected] | |
import ssl | |
import socket | |
import json | |
import urllib.request | |
import urllib.parse | |
import urllib.error | |
import logging | |
from logging.handlers import RotatingFileHandler | |
ssl._create_default_https_context = ssl._create_unverified_context | |
# logging config | |
logger = logging.getLogger(__name__) # Gets or creates a logger | |
logger.setLevel(logging.INFO) # set log level | |
formatter = logging.Formatter( | |
"%(asctime)s %(levelname)s %(funcName)s(%(lineno)d) %(message)s" | |
) | |
logFile = "/tmp/dnstest.log" | |
file_handler = RotatingFileHandler( | |
logFile, mode="a", maxBytes=5 * 1024 * 1024, backupCount=2, encoding=None, delay=0 | |
) | |
file_handler.setFormatter(formatter) | |
logger.addHandler(file_handler) # add file handler to logger | |
def get_addr_info_wrapper(hostname): | |
# see `man getent` `/ hosts ` | |
# see `man getaddrinfo` | |
results = [] | |
try: | |
result = socket.getaddrinfo(hostname, 0) | |
for r in result: | |
# socket.AddressFamily.AF_INET = ipv4 | |
if ( | |
r[0] is socket.AddressFamily.AF_INET | |
and r[1] is socket.SocketKind.SOCK_RAW | |
): | |
results.append(r[4][0]) | |
return { | |
"host": hostname, | |
"status": "success", | |
# "results": list(dict.fromkeys(results)), | |
"results": results, | |
} | |
except socket.error: | |
return { | |
"host": hostname, | |
"status": "fail", | |
"results": [], | |
} | |
host_list = [ | |
"http-inputs-your.splunkcloud.com", | |
"http-inputs-firehose-your.splunkcloud.com", | |
"inputs1.your.splunkcloud.com", | |
"...", | |
"inputs15.your.splunkcloud.com", | |
] | |
# create our result object | |
result_object = {"source_host": socket.gethostname(), "results": []} | |
# iterate over our host list | |
for host in host_list: | |
# and append test results to our result object | |
result_object["results"].append(get_addr_info_wrapper(host)) | |
# send the result to splunk - we're concerned that we | |
# might have DNS resolution issues so log it locally | |
# (this is running on a hfw) | |
splunk_payload = { | |
"index": "your_index", | |
"sourcetype": "json:dnstest", | |
"event": result_object, | |
} | |
splunk_url = "https://localhost:8088/services/collector/event" | |
# encode our payload | |
encoded_data = json.dumps(splunk_payload).encode("utf8") | |
# create the request, add headers | |
request = urllib.request.Request(splunk_url, data=encoded_data, method="POST") | |
request.add_header("Content-Type", "application/json") | |
request.add_header("Authorization", "Splunk HECTOKEN-HECT-OKEN-HECT-TOKENHECTOKE") | |
try: | |
# send request, read response, log response | |
response = urllib.request.urlopen(request) | |
response_data = response.read().decode("utf-8") | |
logger.info(response_data) | |
except Exception as e: | |
# else log exception | |
logger.error("Exception!", exc_info=True) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment