-
-
Save helderjnpinto/e968eb32a497aa43f319a7faf399c24c to your computer and use it in GitHub Desktop.
DNS domain bitsquat checker
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from __future__ import unicode_literals | |
# Original idea: http://dinaburg.org/bitsquatting.html | |
import gevent.monkey | |
gevent.monkey.patch_all() | |
import dns.name, dns.resolver | |
import gevent, gevent.pool | |
import sys | |
DNS_TIMEOUT = 5 | |
USAGE = """Usage: | |
bitsquat.py <domain name> | |
Example: | |
bitsquat.py google.com | |
""" | |
EXISTING_TLDS = frozenset(""" | |
ac ad ae aero af ag ai al am an ao aq ar arpa as asia at au aw ax az ba bb bd be bf bg bh bi biz bj bm bn bo br bs bt bv bw by bz | |
ca cat cc cd cf cg ch ci ck cl cm cn co com coop cr cu cv cw cx cy cz de dj dk dm do dz ec edu ee eg er es et eu fi | |
fj fk fm fo fr ga gb gd ge gf gg gh gi gl gm gn gov gp gq gr gs gt gu gw gy hk hm hn hr ht hu id ie il im in info int io iq ir is it | |
je jm jo jobs jp ke kg kh ki km kn kp kr kw ky kz la lb lc li lk lr ls lt lu lv ly | |
ma mc md me mg mh mil mk ml mm mn mo mobi mp mq mr ms mt mu museum mv mw mx my mz na name nc ne net nf ng ni nl no np nr nu nz | |
om org pa pe pf pg ph pk pl pm pn post pr pro ps pt pw py qa re ro rs ru rw sa sb sc sd se sg sh si sj sk sl sm sn so sr st su sv sx sy sz | |
tc td tel tf tg th tj tk tl tm tn to tp tr travel tt tv tw tz ua ug uk us uy uz va vc ve vg vi vn vu wf ws | |
xn--0zwm56d xn--11b5bs3a9aj6g xn--3e0b707e xn--45brj9c xn--80akhbyknj4f xn--80ao21a xn--90a3ac xn--9t4b11yi5a xn--clchc0ea0b2g2a9gcd | |
xn--deba0ad xn--fiqs8s xn--fiqz9s xn--fpcrj9c3d xn--fzc2c9e2c xn--g6w251d xn--gecrj9c xn--h2brj9c xn--hgbk6aj7f53bba xn--hlcj6aya9esc7a | |
xn--j6w193g xn--jxalpdlp xn--kgbechtv xn--kprw13d xn--kpry57d xn--lgbbat1ad8j xn--mgb9awbf xn--mgbaam7a8h xn--mgbayh7gpa xn--mgbbh1a71e | |
xn--mgbc0a9azcg xn--mgberp4a5d4ar xn--mgbx4cd0ab xn--o3cw4h xn--ogbpf8fl xn--p1ai xn--pgbs0dh xn--s9brj9c xn--wgbh1c xn--wgbl6a | |
xn--xkc2al3hye2a xn--xkc2dl3a5ee0h xn--yfro4i67o xn--ygbi2ammx xn--zckzah | |
xxx ye yt za zm zw | |
""".split()) | |
# Remove configure=False and nameservers assignment to use /etc/resolv.conf | |
resolver = dns.resolver.Resolver(configure=False) | |
resolver.timeout = DNS_TIMEOUT | |
resolver.nameservers = ["127.0.0.1"] | |
def ibitflips(name): | |
for i in range(0, len(name)): | |
ch = name[i] | |
# Change 1-7 bits. Don't try bit 7 (0x80) because DNS is ASCII only. | |
for bit in range(0, 7): | |
newch = unichr(ord(ch) ^ (1 << bit)) | |
if (newch.isalnum() or newch == '-') and ch.lower() != newch.lower(): | |
newname = name[:i] + newch + name[i + 1:] | |
yield newname | |
def check_availability(hostname): | |
qname = dns.name.from_text(hostname) | |
try: | |
answer = gevent.with_timeout(DNS_TIMEOUT, resolver.query, qname, 'SOA', raise_on_no_answer=False) | |
return hostname, "taken", answer.rrset[0] if answer.rrset else "" | |
except dns.resolver.NXDOMAIN: | |
# The only good response. | |
return hostname, "free", "" | |
except (gevent.timeout.Timeout, dns.resolver.Timeout) as e: | |
return hostname, "error", "timeout" | |
except Exception as e: | |
sys.stderr.write(u"error: {hostname}: {e!r}\n".format(hostname=hostname, e=e).encode('utf-8', 'replace')) | |
return hostname, "error", "other" + str(e) | |
def main(): | |
if len(sys.argv) < 1: | |
print USAGE | |
sys.exit() | |
original = sys.argv[1] | |
assert "." in original | |
name, suffix = original.rsplit(".", 1) | |
flipped_hostnames = ( | |
[f + "." + suffix for f in ibitflips(name)] | |
+ [name + "." + f for f in ibitflips(suffix) if f.lower() in EXISTING_TLDS] | |
) | |
par = gevent.pool.Pool(size=30) | |
check_results = list(par.imap_unordered(check_availability, flipped_hostnames)) | |
check_results.sort(key=lambda t: (t[1], t[0])) # free/error/taken | |
taken_hostnames = [rt[0] for rt in check_results if rt[1] == 'taken'] | |
free_hostnames = [rt[0] for rt in check_results if rt[1] == 'free'] | |
error_hostnames = [rt[0] for rt in check_results if rt[1] == 'error'] | |
count_all = len(check_results) | |
count_taken = len(taken_hostnames) | |
count_free = len(free_hostnames) | |
count_error = len(error_hostnames) | |
taken_soa_groups = {} | |
longest_soa_email = 0 | |
for hostname, result, detail in check_results: | |
if result == 'taken' and detail != "": | |
soa_email = unicode(detail.rname).rstrip(".").replace(".", "@", 1) | |
longest_soa_email = max(longest_soa_email, len(soa_email)) | |
taken_soa_groups.setdefault(soa_email, []).append(hostname) | |
taken_soa_groups_text = "\n".join([ | |
soa_email.ljust(longest_soa_email, " ") + ": " + " ".join(group) | |
for soa_email, group in taken_soa_groups.viewitems() | |
]) | |
sys.stdout.write("""All flips: {all} taken: {taken} ({taken_perc}%) free: {free} ({free_perc}%) error: {error}\n | |
Free: {free_hostnames}\n | |
Taken:\n{taken_soa_groups}\n""".format( | |
all=count_all, taken=count_taken, free=count_free, error=count_error, | |
taken_perc=int(float(count_taken) / count_all * 100), | |
free_perc=int(float(count_free) / count_all * 100), | |
free_hostnames=" ".join(free_hostnames), | |
taken_soa_groups=taken_soa_groups_text, | |
).encode('utf-8', 'replace')) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment