Skip to content

Instantly share code, notes, and snippets.

@lechuhuuha
Last active August 29, 2024 01:52
Show Gist options
  • Save lechuhuuha/3deddb1138cf2dd715411ce52f65caaf to your computer and use it in GitHub Desktop.
Save lechuhuuha/3deddb1138cf2dd715411ce52f65caaf to your computer and use it in GitHub Desktop.
Use multithreading to send the socket requests to the domains, and skip any domain that does not respond within 30 seconds.
#!/usr/bin/env python3
"""
Compares output of URLFWD records to URL Redirects
$ python3 check_url_redirects.py <apikey>
Prerequisites:
Python 3 install
pip install requests pydig
"""
import socket
import requests
import argparse
import pydig
from datetime import datetime
import csv
from concurrent.futures import ThreadPoolExecutor, as_completed
# Base URL of the API; endpoint paths such as "/v1/zones" are appended to it.
API_URL = "https://api.nsone.net"
# Zone names to leave out of the check. The main loop compares against the
# zone name after "/" has been replaced with "%2F".
ZONES_TO_SKIP = []
def get_zones_list(url, auth, verify=True, timeout=30):
    """Fetch the zone list from the API.

    Args:
        url: Base URL of the API; "/v1/zones" is appended to it.
        auth: Headers dict sent with the request.
        verify: Passed to ``requests.get`` as ``verify``.
        timeout: Seconds passed to ``requests.get`` as ``timeout``.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.exceptions.HTTPError: The server answered with an error
            status (raised by ``raise_for_status``).
        requests.exceptions.RequestException: The request itself failed,
            including when the timeout expires.
    """
    endpoint = url + "/v1/zones"
    # requests waits indefinitely when no timeout is given, so a stalled
    # server would hang the whole script.
    response = requests.get(endpoint, headers=auth, verify=verify, timeout=timeout)
    response.raise_for_status()
    return response.json()
def get_zone(zone, url, auth, verify=True, timeout=30):
    """Fetch a single zone from the API.

    Args:
        zone: Zone name, appended verbatim to the "/v1/zones/" path, so it
            must already be safe to place in a URL.
        url: Base URL of the API.
        auth: Headers dict sent with the request.
        verify: Passed to ``requests.get`` as ``verify``.
        timeout: Seconds passed to ``requests.get`` as ``timeout``.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.exceptions.HTTPError: The server answered with an error
            status (raised by ``raise_for_status``).
        requests.exceptions.RequestException: The request itself failed,
            including when the timeout expires.
    """
    endpoint = url + "/v1/zones/" + zone
    # requests waits indefinitely when no timeout is given, so a stalled
    # server would hang the whole script.
    response = requests.get(endpoint, headers=auth, verify=verify, timeout=timeout)
    response.raise_for_status()
    return response.json()
def is_ipv4(s):
    """Return True when *s* contains no ':' and is therefore not IPv6 notation."""
    return ':' not in s


# Maps (hostname, port) to the canned getaddrinfo() result for that pair.
dns_cache = {}


def add_custom_dns(domain, port, ip):
    """Make lookups of (domain, port) resolve to *ip*.

    Any earlier entry for the same (domain, port) pair is replaced.

    Args:
        domain: Hostname to override.
        port: Port the override applies to.
        ip: Address to hand back; treated as IPv6 when it contains ':'.
    """
    if is_ipv4(ip):
        family, sockaddr = socket.AF_INET, (ip, port)
    else:
        # IPv6 socket addresses also carry flowinfo and scope_id.
        family, sockaddr = socket.AF_INET6, (ip, port, 0, 0)
    # Both families advertise a TCP stream socket so the entry has the same
    # shape regardless of address type.
    dns_cache[(domain, port)] = [
        (family, socket.SOCK_STREAM, socket.IPPROTO_TCP, '', sockaddr)
    ]


# Keep the real resolver so names without an override still resolve normally.
prv_getaddrinfo = socket.getaddrinfo


def new_getaddrinfo(host, port, *args, **kwargs):
    """Stand-in for ``socket.getaddrinfo`` that consults ``dns_cache`` first.

    Accepts the same positional and keyword arguments as the real function.
    An overridden (host, port) pair returns the stored entry as-is; the
    remaining arguments are ignored in that case. Everything else is passed
    through to the original resolver.
    """
    try:
        return dns_cache[(host, port)]
    except KeyError:
        return prv_getaddrinfo(host, port, *args, **kwargs)


# Process-wide patch: every getaddrinfo caller in this process is affected.
socket.getaddrinfo = new_getaddrinfo
def output_csv(summary, display):
    """Write the comparison results to a timestamped CSV in the working directory.

    Args:
        summary: Iterable of dicts holding the keys "zone", "domain",
            "urlfwd_status", "urlfwd_location", "redirect_status",
            "redirect_location", "status_match" and "location_match".
        display: When truthy, also print the written file to stdout.

    Raises:
        KeyError: A record lacks one of the keys listed above.
    """
    filename = "URL_Redirect_summary_" + datetime.now().strftime("%Y%m%d-%H%M%S") + ".csv"
    header = ["zone", "domain", "urlfwd_status", "urlfwd_location", "url_redirect_status", "url_redirect_location", "status_match", "location_match"]
    # Record keys in column order; the header uses "url_redirect_*" names
    # while the records use "redirect_*".
    keys = ["zone", "domain", "urlfwd_status", "urlfwd_location",
            "redirect_status", "redirect_location", "status_match", "location_match"]
    # newline="" leaves line endings to the csv module; without it every row
    # ends in "\r\r\n" on platforms that translate "\n".
    with open(filename, "w", encoding="UTF8", newline="") as fout:
        csv_writer = csv.writer(fout)
        csv_writer.writerow(header)
        for record in summary:
            csv_writer.writerow([record[key] for key in keys])
    if display:
        # Read back with the encoding used for writing, and close the handle.
        with open(filename, encoding="UTF8") as fin:
            print(fin.read())
    print("Filename:", filename)
def output_error_csv(errors):
    """Write the failed domains to a timestamped CSV in the working directory.

    Args:
        errors: Iterable of rows, each written as-is under the header
            "zone", "domain", "error".
    """
    filename = "Error_Domains_" + datetime.now().strftime("%Y%m%d-%H%M%S") + ".csv"
    # newline="" leaves line endings to the csv module; without it every row
    # ends in "\r\r\n" on platforms that translate "\n".
    with open(filename, "w", encoding="UTF8", newline="") as fout:
        csv_writer = csv.writer(fout)
        csv_writer.writerow(["zone", "domain", "error"])
        csv_writer.writerows(errors)
    print("Error domains saved to:", filename)
def _fetch_redirect(domain, ip, timeout):
    """Point domain:80 at *ip*, GET http://<domain> and return (status, Location)."""
    add_custom_dns(domain, 80, ip)
    response = requests.get('http://' + domain, allow_redirects=False, timeout=timeout)
    # A missing Location header is reported as an empty string.
    return response.status_code, response.headers.get('Location', "")


def process_record(record, zone, urlfwd_ip, url_redirect_ip, timeout=30):
    """Request one domain through both services and compare the answers.

    The domain is requested twice over plain HTTP without following
    redirects: first with port 80 pinned to ``urlfwd_ip``, then pinned to
    ``url_redirect_ip``.

    Args:
        record: Dict with a "domain" key naming the host to request.
        zone: Dict with a "zone" key; its value is copied into the result.
        urlfwd_ip: Address used for the first request.
        url_redirect_ip: Address used for the second request.
        timeout: Seconds passed to ``requests.get`` as ``timeout`` for each
            of the two requests.

    Returns:
        On success, a dict with "zone", "domain", "urlfwd_status",
        "urlfwd_location", "redirect_status", "redirect_location",
        "status_match" and "location_match" ("YES"/"NO").
        When either request raises ``requests.exceptions.RequestException``,
        a dict with "zone", "domain" and "error" (the exception text).
    """
    domain = record["domain"]
    print("WORKING ON", domain)
    try:
        # The second call re-pins the same (domain, 80) entry, so the two
        # requests must stay sequential.
        urlfwd_status, urlfwd_location = _fetch_redirect(domain, urlfwd_ip, timeout)
        redirect_status, redirect_location = _fetch_redirect(domain, url_redirect_ip, timeout)
    except requests.exceptions.RequestException as e:
        return {"zone": zone["zone"], "domain": domain, "error": str(e)}
    return {
        "zone": zone["zone"],
        "domain": domain,
        "urlfwd_status": urlfwd_status,
        "urlfwd_location": urlfwd_location,
        "redirect_status": redirect_status,
        "redirect_location": redirect_location,
        "status_match": "YES" if urlfwd_status == redirect_status else "NO",
        "location_match": "YES" if urlfwd_location == redirect_location else "NO",
    }
def _first_a_record(hostname):
    """Return the first A record pydig reports for *hostname*, or exit with a message."""
    answers = pydig.query(hostname, 'A')
    if not answers:
        raise SystemExit(f"Could not resolve an A record for {hostname}")
    return answers[0]


def main():
    """Parse the command line, check every URLFWD record and write the CSV reports.

    Each record of type "URLFWD" in each zone (except those listed in
    ZONES_TO_SKIP) is handed to ``process_record`` on a pool of 10 worker
    threads. Successful comparisons go to the summary CSV; failures go to
    the error CSV.
    """
    help_texts = {
        "main": __doc__,
        "api_key": "NS1 API key",
        "customer": "Specify customer ID if using an operator key",
        "display": "Also print results to console",
        "verify": "Disable SSL verification"
    }
    parser = argparse.ArgumentParser(
        description=help_texts["main"], formatter_class=argparse.RawTextHelpFormatter
    )
    parser.add_argument("api_key", type=str, help=help_texts["api_key"])
    parser.add_argument("--customer", type=str, required=False, help=help_texts["customer"])
    parser.add_argument("-d", "--display", action="store_true", required=False, help=help_texts["display"])
    # store_false: args.verify is True by default and False when -v is given.
    parser.add_argument("-v", "--verify", action="store_false", required=False, help=help_texts["verify"])
    args = parser.parse_args()

    auth = {"X-NSONE-Key": args.api_key}
    if args.customer:
        auth["X-NSONE-Key"] += "!{customer}".format(customer=args.customer)

    # Get IPs to use for separate URLFWD and Redirect tests
    urlfwd_ip = _first_a_record('urlforward.nsone.net')
    url_redirect_ip = _first_a_record('redirectsv2.nsone.net')

    summary = []
    errors = []
    zones = get_zones_list(API_URL, auth, args.verify)
    with ThreadPoolExecutor(max_workers=10) as executor:
        # future -> (zone name, record), so a failure can be reported with
        # its zone.
        pending = {}
        for zone in zones:
            # The zone name is placed in a URL path, so "/" must be escaped.
            zone["zone"] = zone["zone"].replace("/", "%2F")
            if zone["zone"] in ZONES_TO_SKIP:
                print("SKIPPED:", zone["zone"])
                continue
            records = get_zone(zone["zone"], API_URL, auth, args.verify)["records"]
            for record in records:
                if record["type"] == "URLFWD":
                    future = executor.submit(process_record, record, zone, urlfwd_ip, url_redirect_ip)
                    pending[future] = (zone["zone"], record)

        for future in as_completed(pending):
            zone_name, record = pending[future]
            try:
                result = future.result()
            except Exception as exc:
                # Boundary for anything a worker did not handle itself; keep
                # it in the error report as well as on the console.
                print(f'Record {record["domain"]} generated an exception: {exc}')
                errors.append([zone_name, record["domain"], str(exc)])
                continue
            if not result:
                continue
            if "error" in result:
                errors.append([result["zone"], result["domain"], result["error"]])
            else:
                summary.append(result)

    if summary:
        output_csv(summary, args.display)
    elif not pending:
        # Only claim there are no records when none were actually found.
        print("No URLFWD records exist in your account")
    else:
        print("No URLFWD records could be compared; see the error report")
    if errors:
        output_error_csv(errors)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment