Created
August 28, 2024 15:10
-
-
Save lechuhuuha/12dfdb5d3303a0f6cfebed912eb58c23 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
"""
Compares the output of URLFWD records to URL Redirects.

Usage:
    $ python3 check_url_redirects.py <apikey>

Prerequisites:
    Python 3 install
    pip install requests pydig
"""
import socket | |
import requests | |
import argparse | |
import pydig | |
from datetime import datetime | |
import csv | |
# Base URL of the NS1 API; endpoint paths such as "/v1/zones" are appended to it.
API_URL = "https://api.nsone.net"

# Zone names to leave out of the comparison. Names are matched after any "/"
# in the zone name has been replaced with "%2F".
ZONES_TO_SKIP = []
def get_zones_list(url, auth, verify=True, timeout=30):
    """Fetch the list of zones from the API.

    Args:
        url: Base URL of the API; "/v1/zones" is appended to it.
        auth: Dict of HTTP headers sent with the request (carries the API key).
        verify: Passed to requests as ``verify``; False disables TLS
            certificate verification.
        timeout: Seconds to wait on the server before giving up, so a stalled
            connection cannot hang the script indefinitely.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the API answers with an error status.
        requests.Timeout: If the server does not respond within *timeout*.
    """
    endpoint = url + "/v1/zones"
    response = requests.get(endpoint, headers=auth, verify=verify, timeout=timeout)
    response.raise_for_status()
    return response.json()
def get_zone(zone, url, auth, verify=True, timeout=30):
    """Fetch a single zone from the API.

    Args:
        zone: Zone name, already safe to place in a URL path (it is appended
            to the endpoint as-is).
        url: Base URL of the API; "/v1/zones/<zone>" is appended to it.
        auth: Dict of HTTP headers sent with the request (carries the API key).
        verify: Passed to requests as ``verify``; False disables TLS
            certificate verification.
        timeout: Seconds to wait on the server before giving up, so a stalled
            connection cannot hang the script indefinitely.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the API answers with an error status.
        requests.Timeout: If the server does not respond within *timeout*.
    """
    endpoint = url + "/v1/zones/" + zone
    response = requests.get(endpoint, headers=auth, verify=verify, timeout=timeout)
    response.raise_for_status()
    return response.json()
def is_ipv4(s):
    """Return True when *s* contains no colon, i.e. is not an IPv6 literal.

    This is a cheap discriminator only; it does not validate the address.
    """
    return ':' not in s


# Maps (hostname, port) to the getaddrinfo()-style result list that should be
# handed back for that pair instead of performing a real lookup.
dns_cache = {}


def add_custom_dns(domain, port, ip):
    """Pin lookups of (*domain*, *port*) to the address *ip*.

    Any previous entry for the same (domain, port) pair is replaced.

    Args:
        domain: Hostname to override.
        port: Port number the override applies to.
        ip: IPv4 or IPv6 address literal to return for the lookup.
    """
    key = (domain, port)
    # Each entry mirrors one tuple returned by socket.getaddrinfo():
    #   (family, type, proto, canonname, sockaddr)
    # https://docs.python.org/3/library/socket.html#socket.getaddrinfo
    if is_ipv4(ip):
        family = socket.AF_INET
        sockaddr = (ip, port)
    else:  # ipv6
        family = socket.AF_INET6
        # IPv6 sockaddr is (address, port, flowinfo, scope_id).
        sockaddr = (ip, port, 0, 0)
    # Both families must advertise a TCP stream socket: consumers pass the
    # type/proto straight to socket.socket(), and a type of 0 is not a
    # usable socket type for an HTTP connection.
    value = (family, socket.SOCK_STREAM, socket.IPPROTO_TCP, '', sockaddr)
    dns_cache[key] = [value]
# Keep a handle on the real resolver so lookups without an override still work.
prv_getaddrinfo = socket.getaddrinfo


def new_getaddrinfo(host, port, *args, **kwargs):
    """Replacement for socket.getaddrinfo() that consults dns_cache first.

    If (host, port) has an entry in dns_cache, that entry is returned and the
    remaining arguments are ignored; otherwise the call is forwarded unchanged
    to the original socket.getaddrinfo().

    Keyword arguments are accepted because socket.getaddrinfo() allows its
    parameters to be passed by keyword; a positional-only wrapper would raise
    TypeError for such callers.
    """
    try:
        return dns_cache[(host, port)]
    except KeyError:
        return prv_getaddrinfo(host, port, *args, **kwargs)


# Install the override process-wide: every later name lookup made through the
# socket module goes through new_getaddrinfo.
socket.getaddrinfo = new_getaddrinfo
def output_csv(summary, display):
    """Write the comparison results to a timestamped CSV file.

    The file is created in the current working directory and is named
    "URL_Redirect_summary_<YYYYmmdd-HHMMSS>.csv".

    Args:
        summary: Iterable of dicts, one per compared record, each with the
            keys "zone", "domain", "urlfwd_status", "urlfwd_location",
            "redirect_status", "redirect_location", "status_match" and
            "location_match".
        display: When true, also print the CSV contents to stdout.

    Raises:
        KeyError: If a record is missing one of the expected keys.
    """
    filename = "URL_Redirect_summary_" + datetime.now().strftime("%Y%m%d-%H%M%S") + ".csv"
    header = ["zone", "domain", "urlfwd_status", "urlfwd_location", "url_redirect_status", "url_redirect_location", "status_match", "location_match"]
    # Record keys in column order; two column titles differ from their keys.
    keys = ["zone", "domain", "urlfwd_status", "urlfwd_location", "redirect_status", "redirect_location", "status_match", "location_match"]

    # newline="" lets the csv module control line endings; without it,
    # platforms that translate "\n" produce "\r\r\n" and blank rows appear.
    with open(filename, "w", encoding="UTF8", newline="") as fout:
        csv_writer = csv.writer(fout)
        csv_writer.writerow(header)
        for record in summary:
            csv_writer.writerow([record[key] for key in keys])

    if display:
        # Read back with the same encoding used for writing, and close the
        # handle deterministically.
        with open(filename, encoding="UTF8") as fin:
            print(fin.read())
    print("Filename:", filename)
def _first_a_record(hostname):
    """Return the first A-record answer for *hostname*, or exit with a message."""
    answers = pydig.query(hostname, 'A')
    if not answers:
        # Without this check an empty answer surfaces as a bare IndexError.
        raise SystemExit("Could not resolve an A record for {host}".format(host=hostname))
    return answers[0]


def _probe(domain, ip, timeout=30):
    """Request http://<domain> from the server at *ip* without following redirects.

    Returns a (status_code, location) tuple; location is "" when the response
    has no Location header.
    """
    # Point lookups of this domain on port 80 at the server under test.
    add_custom_dns(domain, 80, ip)
    res = requests.get('http://' + domain, allow_redirects=False, timeout=timeout)
    return res.status_code, res.headers.get("Location", "")


def main():
    """Compare URLFWD and URL Redirect responses for every URLFWD record.

    Parses the command line, walks all zones returned by the API, requests
    each URLFWD domain once from each service, and writes the comparison to
    a CSV file via output_csv().
    """
    help_texts = {
        "main": __doc__,
        "api_key": "NS1 API key",
        "customer": "Specify customer ID if using an operator key",
        "display": "Also print results to console",
        "verify": "Disable SSL verification"
    }
    parser = argparse.ArgumentParser(
        description=help_texts["main"], formatter_class=argparse.RawTextHelpFormatter
    )
    parser.add_argument("api_key", type=str, help=help_texts["api_key"])
    parser.add_argument(
        "--customer", type=str, required=False, help=help_texts["customer"]
    )
    parser.add_argument(
        "-d",
        "--display",
        action="store_true",
        required=False,
        help=help_texts["display"],
    )
    # store_false: args.verify is True by default and becomes False when the
    # flag is given, i.e. passing -v/--verify turns verification off.
    parser.add_argument(
        "-v",
        "--verify",
        action="store_false",
        required=False,
        help=help_texts["verify"],
    )
    args = parser.parse_args()

    auth = {"X-NSONE-Key": args.api_key}
    if args.customer:
        auth["X-NSONE-Key"] += "!{customer}".format(customer=args.customer)

    # Get IP's to use for separate URLFWD and Redirect tests
    urlfwd_ip = _first_a_record('urlforward.nsone.net')
    url_redirect_ip = _first_a_record('redirectsv2.nsone.net')

    summary = []
    for zone in get_zones_list(API_URL, auth, args.verify):
        # Encode "/" so the zone name can be used as a URL path segment.
        zone_name = zone["zone"].replace("/", "%2F")
        if zone_name in ZONES_TO_SKIP:
            print("SKIPPED:", zone_name)
            continue

        for record in get_zone(zone_name, API_URL, auth, args.verify)["records"]:
            if record["type"] != "URLFWD":
                continue
            domain = record["domain"]
            print("WORKING ON", domain)

            urlfwd_status, urlfwd_location = _probe(domain, urlfwd_ip)
            redirect_status, redirect_location = _probe(domain, url_redirect_ip)

            summary.append({
                "zone": zone_name,
                "domain": domain,
                "urlfwd_status": urlfwd_status,
                "urlfwd_location": urlfwd_location,
                "redirect_status": redirect_status,
                "redirect_location": redirect_location,
                "status_match": "YES" if urlfwd_status == redirect_status else "NO",
                "location_match": "YES" if urlfwd_location == redirect_location else "NO",
            })

    if summary:
        output_csv(summary, args.display)
    else:
        print("No URLFWD records exist in your account")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment