Created
October 11, 2022 20:39
-
-
Save TheTechromancer/d69e3b9e4d8a659c845bb38081d636d1 to your computer and use it in GitHub Desktop.
A simple Python script to filter out unresolved/wildcard DNS records.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import sys | |
import string | |
import random | |
import dns.resolver | |
import threading | |
import tldextract | |
import concurrent.futures | |
# Memoized wildcard findings keyed by domain name; read and written by is_wildcard().
_cache = {}
# Guards creation of the per-domain locks stored in wildcard_locks.
__wildcard_lock = threading.Lock()
# domain name -> threading.Lock handed out by wildcard_lock().
wildcard_locks = {}
# domain name -> set of IP strings that random subdomains of it resolved to.
wildcards = {}
# Alphabet used by rand_string(): lowercase ASCII letters followed by digits.
rand_pool = f"{string.ascii_lowercase}{string.digits}"
def rand_string(length=10):
    """Return a random string of ``length`` characters drawn from ``rand_pool``.

    ``length`` is passed through ``int()`` first, so any value ``int()``
    accepts may be given. Zero or a negative length yields an empty string.
    """
    count = int(length)
    picked = (random.choice(rand_pool) for _ in range(count))
    return "".join(picked)
def is_domain(d):
    """Return True when tldextract finds a domain part in ``d`` and no subdomain part."""
    parts = tldextract.extract(d)
    return bool(parts.domain) and not parts.subdomain
def is_subdomain(d):
    """Return True when tldextract finds both a domain part and a subdomain part in ``d``."""
    parts = tldextract.extract(d)
    return bool(parts.domain and parts.subdomain)
def parent_domain(d):
    """Return the name one label above ``d``.

    A name accepted by ``is_domain`` is returned unchanged. Otherwise the
    leftmost label is removed; a name containing no dot at all yields ".".
    """
    if is_domain(d):
        return d
    # partition() reports an empty separator when the name has no dot.
    _, dot, remainder = str(d).partition(".")
    return remainder if dot else "."
def domain_parents(d):
    """
    Yield all parents of a subdomain, nearest first.
        test.www.evilcorp.com --> [www.evilcorp.com, evilcorp.com]

    Iteration ends after the first parent that ``is_domain`` accepts. A name
    that ``is_domain`` already accepts therefore yields itself once.

    If the walk reaches the root ("." or the empty string) without finding
    such a parent, the generator simply stops. Without this check,
    ``parent_domain`` would alternate between "." and "" forever for inputs
    such as an empty string.
    """
    parent = str(d)
    while True:
        parent = parent_domain(parent)
        if is_subdomain(parent):
            yield parent
        elif is_domain(parent):
            yield parent
            break
        elif parent in ("", "."):
            # Ran out of labels; nothing further up can be a parent.
            break
def resolve(query, rdtypes=None):
    """Look up ``query`` for each record type and collect the answers.

    Args:
        query: name to resolve.
        rdtypes: iterable of record type names; when None, the types
            A, AAAA, TXT, NS, SOA, MX, CNAME and SRV are all tried.

    Returns:
        A set of ``(rdtype, answer_text)`` tuples. Any exception raised
        while looking up a record type is ignored, so a failed lookup
        contributes nothing to the set.
    """
    record_types = (
        ("A", "AAAA", "TXT", "NS", "SOA", "MX", "CNAME", "SRV")
        if rdtypes is None
        else rdtypes
    )
    found = set()
    for record_type in record_types:
        try:
            for rdata in dns.resolver.resolve(query, rdtype=record_type):
                found.add((record_type, str(rdata)))
        except Exception:
            # Best effort: a failed lookup just means no records of this type.
            continue
    return found
def wildcard_lock(domain):
    """Return the lock stored for ``domain``, creating it on first request.

    The lookup and creation both happen while holding ``__wildcard_lock``,
    so every caller asking for the same domain gets the same lock object.
    """
    with __wildcard_lock:
        existing = wildcard_locks.get(domain)
        if existing is None:
            existing = wildcard_locks[domain] = threading.Lock()
        return existing
def is_wildcard(query):
    """Return True when ``query`` resolves only to wildcard addresses.

    Every ancestor from ``domain_parents(query)`` is probed by resolving five
    random subdomains (A/AAAA). Any addresses that come back are recorded in
    ``wildcards``. ``query`` counts as a wildcard match when it has A/AAAA
    answers and every one of them is among its ancestors' recorded addresses.

    A name accepted by ``is_domain`` has no parent to compare against, so a
    fixed made-up child of it is tested instead.

    Each ancestor is probed at most once; ``_cache[ancestor]`` records whether
    the probe produced any addresses. The comparison itself runs for every
    query, so the result for one host never depends on which of its siblings
    was checked first.

    Args:
        query: DNS name to test.

    Returns:
        bool: True if ``query`` looks like a wildcard result.
    """
    if is_domain(query):
        return is_wildcard(f"lskdgahgasldfl.{query}")

    parent = parent_domain(query)
    # Serialize work per immediate parent so sibling names do not all probe
    # the same ancestors at once. Different parents can share an ancestor and
    # may occasionally probe it concurrently; those results are merged.
    with wildcard_lock(parent):
        orig_results = {a[-1] for a in resolve(query, rdtypes=("A", "AAAA"))}
        wildcard_ips = set()
        # A distinct loop name keeps ``parent`` (the lock key) intact.
        for ancestor in set(domain_parents(query)):
            if ancestor not in _cache:
                probed = set()
                for _ in range(5):
                    rand_query = f"{rand_string(length=10)}.{ancestor}"
                    probed.update(
                        a[-1] for a in resolve(rand_query, rdtypes=("A", "AAAA"))
                    )
                if probed:
                    wildcards.setdefault(ancestor, set()).update(probed)
                # Written after ``wildcards`` so a reader that sees the cache
                # entry also sees the recorded addresses.
                _cache[ancestor] = bool(probed)
            wildcard_ips.update(wildcards.get(ancestor, ()))
        return bool(orig_results) and bool(wildcard_ips) and orig_results <= wildcard_ips
def clean(d):
    """Resolve ``d`` and report whether it is a wildcard result.

    Returns:
        A ``(answers, is_wildcard)`` tuple. ``answers`` is the set returned
        by ``resolve(d)``. The wildcard check only runs when that set is
        non-empty; otherwise the flag is False.
    """
    records = resolve(d)
    wildcard_flag = is_wildcard(d) if records else False
    return records, wildcard_flag
# Read DNS names from stdin (one per line), check them concurrently, and print
# "<name>\t<type:answer,...>" for each name that resolves and is not a wildcard.
# Maps each submitted future back to the name it was submitted for.
futures = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=250) as executor:
    try:
        for dnsname in map(str.rstrip, sys.stdin):
            if not dnsname:
                # A blank line has no name to look up; skip it instead of
                # submitting an empty string to the resolver.
                continue
            futures[executor.submit(clean, dnsname)] = dnsname
        for future in concurrent.futures.as_completed(futures):
            dnsname = futures[future]
            answers, is_wildcard_bool = future.result()
            if answers and not is_wildcard_bool:
                answers = [":".join(a) for a in answers]
                print(f"{dnsname}\t{','.join(answers)}")
    except KeyboardInterrupt:
        # Drop work that has not started yet so Ctrl-C exits promptly.
        executor.shutdown(cancel_futures=True)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment