Skip to content

Instantly share code, notes, and snippets.

@maesoser
Created May 13, 2024 10:29
Show Gist options
  • Save maesoser/a51bfa72db27eec80079feacc8e6bc95 to your computer and use it in GitHub Desktop.
Save maesoser/a51bfa72db27eec80079feacc8e6bc95 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
'''
atlas-dig --country ES --probes 3 www.google.es
'''
import urllib3
import requests, argparse, base64, json, os
from time import sleep
from tabulate import tabulate
from dnslib import DNSRecord
from urllib3.exceptions import InsecureRequestWarning
urllib3.disable_warnings(category=InsecureRequestWarning)
headers={
"Accept": "application/json",
"Content-Type": "application/json"
}
waittime = 30
def get_args():
parser = argparse.ArgumentParser(description="Performs distributed dig requests using RIPE ATLAS")
parser.add_argument('target', metavar='target', type=str, nargs='+',help='Target to get information')
parser.add_argument('--type', dest='dnstype', help='DNS Query Type', default='A')
parser.add_argument('--country', help='Target Country')
parser.add_argument('--asn', help='Target ASN')
parser.add_argument('--id', help='Measurement ID')
parser.add_argument('--resolver', help='DNS Resolver to use')
parser.add_argument('--probes', help='Number of probes', default=10)
parser.add_argument('--key', help='Number of probes', default=os.getenv("ATLAS_KEY"))
parser.add_argument('--email', help='Number of probes', default=os.getenv("ATLAS_EMAIL"))
parser.add_argument('--json', dest='json_out', help='JSON Output', action='store_true')
args = parser.parse_args()
args.target = args.target[0]
return args
def check_measurement_status(args):
url = "https://atlas.ripe.net/api/v2/measurements/{}/?key={}".format(args.id,args.key)
response = requests.get(url, headers=headers, verify=False)
reply = response.json()
return reply["status"]["name"]
def is_measurement_done(args):
return (check_measurement_status(args) == "Stopped")
def get_results(args):
url = "https://atlas.ripe.net/api/v2/measurements/{}/results/?key={}".format(args.id,args.key)
response = requests.get(url, headers=headers, verify=False)
reply = response.json()
return reply
def create_measurement(args):
url = "https://atlas.ripe.net/api/v2/measurements//?key={}".format(args.key)
payload = {
"definitions": [
{
"af": 4,
"query_class": "IN",
"query_type": args.dnstype,
"query_argument": args.target,
"use_macros": False,
"description": "{} query to {}".format(args.dnstype, args.target),
"use_probe_resolver": False,
"resolve_on_probe": False,
"set_nsid_bit": False,
"protocol": "UDP",
"udp_payload_size": 512,
"retry": 0,
"skip_dns_check": False,
"include_qbuf": False,
"include_abuf": True,
"prepend_probe_id": False,
"set_rd_bit": False,
"set_do_bit": False,
"set_cd_bit": False,
"timeout": 5000,
"type": "dns"
}
],
"probes": [
{
"tags": {
"include": [],
"exclude": []
},
"type": "asn",
"value": 13335,
"requested": args.probes
}
],
"is_oneoff": True,
"bill_to": args.email
}
if args.resolver != None:
payload["definitions"][0]["target"] = args.resolver
else:
payload["definitions"][0]["use_probe_resolver"] = True
if args.asn != None:
payload["probes"][0]["type"] = "asn"
payload["probes"][0]["value"] = args.asn
if args.country != None:
payload["probes"][0]["type"] = "country"
payload["probes"][0]["value"] = args.country
response = requests.post(url, headers=headers, data=json.dumps(payload), verify=False)
reply = response.json()
return reply
def show_results(args):
results = get_results(args)
data = []
headers = [ "Probe ID", "Probe Address", "Resolver", "RTT", "Type", "Value" ]
total_rtt = 0.0
total_measurements = 0.0
for probe in results:
for resultset in probe["resultset"]:
result = resultset.get("result")
if result != None:
result["abuf"] = base64.b64decode(result["abuf"])
result["abuf"] = DNSRecord.parse(result["abuf"])
for rr in result["abuf"].rr:
data.append([
probe["prb_id"],
probe["from"],
resultset["dst_addr"],
result["rt"],
rr.rtype,
rr.rdata
])
total_rtt += float(result["rt"])
total_measurements += 1.0
print(tabulate(data, headers=headers))
print("\nAvg RTT: {}\tValid measurements: {}".format(total_rtt/total_measurements, total_measurements))
args = get_args()
if args.id is None:
print("Query:\t{}\t{}".format(args.dnstype,args.target))
response = create_measurement(args)
if response.get("error") != None:
raw_json = json.dumps(response.get("error"), indent=4, sort_keys=False)
print(raw_json)
exit(-1)
args.id = response.get("measurements")[0]
print("Created job with ID {}".format(args.id))
sleep(5)
status = check_measurement_status(args)
while status != "Stopped":
print("Test {} status is {}, waiting {} seconds".format(args.id, status.lower(),waittime))
sleep(waittime)
status = check_measurement_status(args)
print("Query:\t{}\t{}".format(args.dnstype,args.target))
show_results(args)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment