Last active
August 29, 2024 01:52
-
-
Save lechuhuuha/3deddb1138cf2dd715411ce52f65caaf to your computer and use it in GitHub Desktop.
use mutil threading for calling socket request to domains, skip any domains that does not respond after 30 seconds
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Compares output of URLFWD records to URL Redirects | |
$ python3 check_url_redirects.py <apikey> | |
Prerequisites: | |
Python 3 install | |
pip install requests pydig | |
""" | |
import socket | |
import requests | |
import argparse | |
import pydig | |
from datetime import datetime | |
import csv | |
from concurrent.futures import ThreadPoolExecutor, as_completed | |
API_URL = "https://api.nsone.net" | |
ZONES_TO_SKIP = [] | |
def get_zones_list(url, auth, verify=True): | |
endpoint = url + "/v1/zones" | |
response = requests.get(endpoint, headers=auth, verify=verify) | |
response.raise_for_status() | |
return response.json() | |
def get_zone(zone, url, auth, verify=True): | |
endpoint = url + "/v1/zones/" + zone | |
response = requests.get(endpoint, headers=auth, verify=verify) | |
response.raise_for_status() | |
return response.json() | |
def is_ipv4(s): | |
return ':' not in s | |
dns_cache = {} | |
def add_custom_dns(domain, port, ip): | |
key = (domain, port) | |
if is_ipv4(ip): | |
value = (socket.AddressFamily.AF_INET, socket.SocketKind.SOCK_STREAM, 6, '', (ip, port)) | |
else: # ipv6 | |
value = (socket.AddressFamily.AF_INET6, 0, 0, '', (ip, port, 0, 0)) | |
dns_cache[key] = [value] | |
prv_getaddrinfo = socket.getaddrinfo | |
def new_getaddrinfo(*args): | |
try: | |
return dns_cache[args[:2]] # hostname and port | |
except KeyError: | |
return prv_getaddrinfo(*args) | |
socket.getaddrinfo = new_getaddrinfo | |
def output_csv(summary, display): | |
filename = "URL_Redirect_summary_" + datetime.now().strftime("%Y%m%d-%H%M%S") + ".csv" | |
with open(filename, "w", encoding="UTF8") as fout: | |
csv_writer = csv.writer(fout) | |
header = ["zone", "domain", "urlfwd_status", "urlfwd_location", "url_redirect_status", "url_redirect_location", "status_match", "location_match"] | |
csv_writer.writerow(header) | |
for record in summary: | |
row = [record["zone"], record["domain"], record["urlfwd_status"], record["urlfwd_location"], | |
record["redirect_status"], record["redirect_location"], record["status_match"], record["location_match"]] | |
csv_writer.writerow(row) | |
if display: | |
print(open(filename).read()) | |
print("Filename:", filename) | |
def output_error_csv(errors): | |
filename = "Error_Domains_" + datetime.now().strftime("%Y%m%d-%H%M%S") + ".csv" | |
with open(filename, "w", encoding="UTF8") as fout: | |
csv_writer = csv.writer(fout) | |
header = ["zone", "domain", "error"] | |
csv_writer.writerow(header) | |
for error in errors: | |
csv_writer.writerow(error) | |
print("Error domains saved to:", filename) | |
def process_record(record, zone, urlfwd_ip, url_redirect_ip): | |
domain = record["domain"] | |
print("WORKING ON", domain) | |
# Set timeout to 30 seconds for HTTP requests | |
timeout = 30 | |
try: | |
# Get URLFWD Response | |
add_custom_dns(domain, 80, urlfwd_ip) | |
res = requests.get('http://' + domain, allow_redirects=False, timeout=timeout) | |
urlfwd_status = res.status_code | |
urlfwd_location = res.headers.get('Location', "") | |
# Get Redirect Response | |
add_custom_dns(domain, 80, url_redirect_ip) | |
redirect_res = requests.get('http://' + domain, allow_redirects=False, timeout=timeout) | |
redirect_status = redirect_res.status_code | |
redirect_location = redirect_res.headers.get('location', "") | |
# Compare Status | |
status_match = "YES" if urlfwd_status == redirect_status else "NO" | |
# Compare Location | |
location_match = "YES" if urlfwd_location == redirect_location else "NO" | |
return { | |
"zone": zone["zone"], | |
"domain": domain, | |
"urlfwd_status": urlfwd_status, | |
"urlfwd_location": urlfwd_location, | |
"redirect_status": redirect_status, | |
"redirect_location": redirect_location, | |
"status_match": status_match, | |
"location_match": location_match | |
} | |
except requests.exceptions.RequestException as e: | |
return {"zone": zone["zone"], "domain": domain, "error": str(e)} | |
if __name__ == "__main__": | |
help_texts = { | |
"main": __doc__, | |
"api_key": "NS1 API key", | |
"customer": "Specify customer ID if using an operator key", | |
"display": "Also print results to console", | |
"verify": "Disable SSL verification" | |
} | |
parser = argparse.ArgumentParser( | |
description=help_texts["main"], formatter_class=argparse.RawTextHelpFormatter | |
) | |
parser.add_argument("api_key", type=str, help=help_texts["api_key"]) | |
parser.add_argument("--customer", type=str, required=False, help=help_texts["customer"]) | |
parser.add_argument("-d", "--display", action="store_true", required=False, help=help_texts["display"]) | |
parser.add_argument("-v", "--verify", action="store_false", required=False, help=help_texts["verify"]) | |
args = parser.parse_args() | |
AUTH = {"X-NSONE-Key": args.api_key} | |
if args.customer: | |
AUTH["X-NSONE-Key"] += "!{customer}".format(customer=args.customer) | |
# Get IPs to use for separate URLFWD and Redirect tests | |
urlfwd_ip = pydig.query('urlforward.nsone.net', 'A')[0] | |
url_redirect_ip = pydig.query('redirectsv2.nsone.net', 'A')[0] | |
summary = [] | |
errors = [] | |
zones = get_zones_list(API_URL, AUTH, args.verify) | |
with ThreadPoolExecutor(max_workers=10) as executor: | |
future_to_record = {} | |
for zone in zones: | |
zone["zone"] = zone["zone"].replace("/", "%2F") | |
if zone["zone"] in ZONES_TO_SKIP: | |
print("SKIPPED:", zone["zone"]) | |
continue | |
records = [r for r in get_zone(zone["zone"], API_URL, AUTH, args.verify)["records"]] | |
for record in records: | |
if record["type"] == "URLFWD": | |
future = executor.submit(process_record, record, zone, urlfwd_ip, url_redirect_ip) | |
future_to_record[future] = record | |
for future in as_completed(future_to_record): | |
record = future_to_record[future] | |
try: | |
result = future.result() | |
if result and "error" not in result: | |
summary.append(result) | |
elif result and "error" in result: | |
errors.append([result["zone"], result["domain"], result["error"]]) | |
except Exception as exc: | |
print(f'Record {record["domain"]} generated an exception: {exc}') | |
if summary: | |
output_csv(summary, args.display) | |
else: | |
print("No URLFWD records exist in your account") | |
if errors: | |
output_error_csv(errors) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment