Created
April 22, 2019 19:20
-
-
Save spacelatte/b7572a9583a0e9f5634868669ec2daa7 to your computer and use it in GitHub Desktop.
usage: python3 whois.py http://data.iana.org/TLD/tlds-alpha-by-domain.txt
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import os, sys, time, json, socket, threading, hashlib, pickle | |
from urllib import request | |
SERVERS = [ | |
"whois.arin.net", | |
"whois.apnic.net", | |
"whois.abuse.net", | |
"whois.afrinic.net", | |
"whois.nic.gov", | |
"whois.internic.net", | |
"whois.iana.org", | |
"whois.krnic.net", | |
"whois.lacnic.net", | |
"whois.ra.net", | |
"whois.peeringdb.com", | |
"whois.ripe.net", | |
] | |
def wait(count=11, delay=0.1): | |
while threading.active_count() > count: | |
time.sleep(0.1) | |
continue | |
return | |
def query(server, domain, size=1048576): | |
print("#", domain, server) | |
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: | |
#s.settimeout(None) | |
s.setblocking(True) | |
s.connect((server, 43)) | |
s.sendall("domain {}\r\n".format(domain).encode()) | |
return s.recv(size).decode() | |
return None | |
def whois(domain): | |
return { s: query(s, domain) for s in SERVERS } | |
def parallel(servers, domain): | |
results = dict() | |
fnproxy = lambda server, domain: results.setdefault(server, query(server, domain)) | |
threads = [ | |
threading.Thread(group=None, name=server, target=fnproxy, args=(server, domain)) | |
for server in servers | |
] | |
[ (wait(), thread.start()) for thread in threads ] | |
[ thread.join() for thread in threads ] | |
print(">", domain, json.dumps(results, indent='\t')) | |
return results | |
def handle(name, data, bdir="data"): | |
if not os.path.exists(bdir): | |
os.mkdir(bdir) | |
with open(os.path.join(bdir, name), "w") as file: | |
json.dump(data, file, indent='\t') | |
return print(">", name, json.dumps(data, indent='\t')) | |
def m1main(args): | |
__final = dict() | |
for arg in args: | |
results = dict() | |
fnproxy = lambda domain: results.setdefault(domain, parallel(SERVERS, domain)) | |
threads = [ | |
threading.Thread(group=None, name=line, target=fnproxy, args=(line, )) | |
for line in request.urlopen(arg).read().decode().split("\n") | |
if line and not line.startswith("#") | |
] | |
[ (wait(), thread.start()) for thread in threads ] | |
[ thread.join() for thread in threads ] | |
__final[arg] = results | |
handle(hashlib.md5(arg.encode()).hexdigest(), results) | |
continue | |
return __final | |
def m2main(args): | |
return [ | |
[ | |
handle(line, parallel(SERVERS, line)) | |
for line in request.urlopen(arg).read().decode().split("\n") | |
if line and not line.startswith("#") | |
] for arg in args | |
] | |
def m3main(args): | |
return [ | |
[ | |
handle(line, json,dumps(whois(line))) | |
for line in request.urlopen(arg).read().decode().split("\n") | |
if line and not line.startswith("#") | |
] for arg in args | |
] | |
if __name__ == '__main__': | |
sys.exit(m2main(sys.argv[1:])) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment