-
-
Save srand2/4d3f2222e06baecdeb2d5a01a6407da0 to your computer and use it in GitHub Desktop.
A simple Python script to filter out unresolved/wildcard DNS records.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import sys | |
import string | |
import random | |
import dns.resolver | |
import threading | |
import tldextract | |
import concurrent.futures | |
# Memoized wildcard verdicts (parent name -> bool), read and written by is_wildcard().
_cache = {}
# Guards lookup/creation of the per-domain locks held in wildcard_locks.
__wildcard_lock = threading.Lock()
# One lock per domain name, created on demand by wildcard_lock().
wildcard_locks = {}
# A/AAAA answers returned for random labels, keyed by the parent they were probed under.
wildcards = {}
# Alphabet that rand_string() draws from: lowercase letters followed by digits.
rand_pool = "".join((string.ascii_lowercase, string.digits))
def rand_string(length=10):
    """
    Build a random string of ``length`` characters taken from ``rand_pool``.

    ``length`` is passed through ``int()`` before use, so numeric strings and
    floats are accepted as well.
    """
    count = int(length)
    return "".join(random.choice(rand_pool) for _ in range(count))
def is_domain(d):
    """
    Return True when tldextract finds a domain part in ``d`` and no subdomain
    part; False otherwise.
    """
    parts = tldextract.extract(d)
    return bool(parts.domain) and not parts.subdomain
def is_subdomain(d):
    """
    Return True when tldextract finds both a domain part and a subdomain part
    in ``d``; False otherwise.
    """
    parts = tldextract.extract(d)
    return bool(parts.domain and parts.subdomain)
def parent_domain(d):
    """
    Return the name one level above ``d``.

    A name that ``is_domain`` accepts is returned unchanged. Otherwise the
    leftmost dot-separated label is dropped; a name with a single label has
    nothing left to drop and yields ".".
    """
    if is_domain(d):
        return d
    labels = str(d).split(".")
    if len(labels) > 1:
        return ".".join(labels[1:])
    return "."
def domain_parents(d):
    """
    Yield all parents of a subdomain, nearest first, stopping at the first
    name that ``is_domain`` accepts.
        test.www.evilcorp.com --> [www.evilcorp.com, evilcorp.com]

    A name that is already a domain yields itself once. If the walk reaches a
    name that is neither a subdomain nor a domain (for example a bare suffix
    such as "com", an empty string, or "."), the generator stops without
    yielding anything further.
    """
    parent = str(d)
    while True:
        parent = parent_domain(parent)
        if is_subdomain(parent):
            yield parent
        elif is_domain(parent):
            yield parent
            return
        else:
            # Without this exit, parent_domain() would bounce between "." and
            # "" forever and the generator would never terminate.
            return
def resolve(query, rdtypes=None):
    """
    Look up ``query`` for each record type and collect the answers.

    ``rdtypes`` is an iterable of record type names; when it is None the
    lookup covers A, AAAA, TXT, NS, SOA, MX, CNAME and SRV.

    Returns a set of ``(rdtype, answer_text)`` tuples. Any exception raised
    while querying a record type is swallowed on purpose (best effort), so a
    name that does not resolve simply produces an empty set.
    """
    record_types = rdtypes
    if record_types is None:
        record_types = ("A", "AAAA", "TXT", "NS", "SOA", "MX", "CNAME", "SRV")
    found = set()
    for record_type in record_types:
        try:
            for rdata in dns.resolver.resolve(query, rdtype=record_type):
                found.add((record_type, str(rdata)))
        except Exception:
            # Best effort: a failed lookup for one type must not stop the rest.
            continue
    return found
def wildcard_lock(domain):
    """
    Return the lock associated with ``domain``, creating it on first use.

    The lookup and creation happen while holding ``__wildcard_lock``, so two
    callers asking for the same domain receive the same lock object.
    """
    with __wildcard_lock:
        existing = wildcard_locks.get(domain)
        if existing is None:
            existing = wildcard_locks[domain] = threading.Lock()
        return existing
def is_wildcard(query):
    """
    Decide whether ``query`` looks like it only resolves because of a
    wildcard DNS record on one of its parents.

    For a bare domain the check is run on a made-up label beneath it. For
    anything else, five random labels are resolved (A/AAAA) under every parent
    of ``query``; any answers are recorded in ``wildcards``. The query counts
    as a wildcard when it has A/AAAA answers and all of them appear among the
    answers collected for the random labels.

    Caveats visible in this implementation:
      * the verdict is memoized in ``_cache`` under the *direct* parent, so
        later names under the same parent get the first verdict back;
      * if any parent already has an entry in ``wildcards``, the function
        returns True without comparing addresses.

    Returns a bool.
    """
    if is_domain(query):
        # Probe a fixed nonsense label under the domain instead of the domain itself.
        return is_wildcard(f"lskdgahgasldfl.{query}")

    parent = parent_domain(query)
    # Serialize work per direct parent so concurrent workers do not repeat the probes.
    with wildcard_lock(parent):
        # Check the cache first: a hit needs no DNS traffic at all.
        if parent in _cache:
            return _cache[parent]

        parents = set(domain_parents(query))
        if any(ancestor in wildcards for ancestor in parents):
            return True

        orig_results = {a[-1] for a in resolve(query, rdtypes=("A", "AAAA"))}

        wildcard_ips = set()
        # The loop variable is deliberately NOT called `parent`: reusing that
        # name would overwrite the cache key used below.
        for ancestor in parents:
            ips = set()
            for _ in range(5):
                rand_query = f"{rand_string(length=10)}.{ancestor}"
                ips.update(a[-1] for a in resolve(rand_query, rdtypes=("A", "AAAA")))
            if ips:
                wildcards.setdefault(ancestor, set()).update(ips)
                wildcard_ips.update(ips)

        is_wildcard_bool = bool(orig_results) and bool(wildcard_ips) and orig_results <= wildcard_ips
        # Store under the same key that the cache lookup above reads.
        _cache[parent] = is_wildcard_bool
        return is_wildcard_bool
def clean(d):
    """
    Resolve ``d`` and, only if it produced any answers, run the wildcard check.

    Returns ``(answers, is_wildcard_bool)`` where ``answers`` is the set from
    ``resolve`` and the flag is False whenever nothing resolved.
    """
    records = resolve(d)
    wildcard_hit = is_wildcard(d) if records else False
    return records, wildcard_hit
# Driver: read one DNS name per line from stdin, check them concurrently, and
# print "<name>\t<type:answer,...>" for every name that resolves and is not
# judged to be a wildcard.
futures = dict()
with concurrent.futures.ThreadPoolExecutor(max_workers=250) as executor:
    try:
        for dnsname in map(str.rstrip, sys.stdin):
            # Blank lines are not DNS names; submitting them only wastes lookups.
            if not dnsname:
                continue
            future = executor.submit(clean, dnsname)
            futures[future] = dnsname
        for future in concurrent.futures.as_completed(futures):
            dnsname = futures[future]
            try:
                answers, is_wildcard_bool = future.result()
            except Exception as e:
                # One bad name must not abort the remaining results.
                print(f"[!] {dnsname}: {e}", file=sys.stderr)
                continue
            if answers and not is_wildcard_bool:
                answers = [":".join(a) for a in answers]
                print(f"{dnsname}\t{','.join(answers)}")
    except KeyboardInterrupt:
        # Drop everything still queued so Ctrl-C does not wait for the backlog.
        executor.shutdown(cancel_futures=True)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment