Skip to content

Instantly share code, notes, and snippets.

@spacelatte
Created April 22, 2019 19:20
Show Gist options
  • Save spacelatte/b7572a9583a0e9f5634868669ec2daa7 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import os, sys, time, json, socket, threading, hashlib, pickle
from urllib import request
# WHOIS hosts contacted for every looked-up domain. whois() walks this list
# in order, and the entry points hand it to parallel(); query() connects to
# each host on TCP port 43.
SERVERS = [
    "whois.arin.net",
    "whois.apnic.net",
    "whois.abuse.net",
    "whois.afrinic.net",
    "whois.nic.gov",
    "whois.internic.net",
    "whois.iana.org",
    "whois.krnic.net",
    "whois.lacnic.net",
    "whois.ra.net",
    "whois.peeringdb.com",
    "whois.ripe.net",
]
def wait(count=11, delay=0.1):
    """Block until at most ``count`` threads are alive in this process.

    Args:
        count: Highest value of ``threading.active_count()`` at which this
            function returns. That number includes the calling thread.
        delay: Seconds to sleep between two polls of the thread count.

    Returns:
        None.
    """
    while threading.active_count() > count:
        # Honour the caller-supplied polling interval (the previous code
        # ignored ``delay`` and always slept 0.1 s).
        time.sleep(delay)
def query(server, domain, size=1048576, port=43, timeout=None):
    """Send a ``domain <name>`` WHOIS request to ``server`` and return the reply.

    Args:
        server: Host name or IPv4 address of the WHOIS server.
        domain: Name to look up; sent as ``"domain {domain}\\r\\n"``.
        size: Upper bound, in bytes, on how much of the reply is read.
        port: TCP port to connect to (43 unless overridden).
        timeout: Socket timeout in seconds; ``None`` means block
            indefinitely, which is what the original blocking socket did.

    Returns:
        The reply decoded as text. Bytes that are not valid UTF-8 are
        replaced instead of raising, so one odd server cannot lose a result.

    Raises:
        OSError: If connecting, sending or receiving fails (this includes
            ``socket.timeout`` when ``timeout`` is set).
    """
    print("#", domain, server)
    chunks = []
    remaining = size
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.settimeout(timeout)
        s.connect((server, port))
        s.sendall("domain {}\r\n".format(domain).encode())
        # A single recv() may hand back only the first TCP segment, so keep
        # reading until the peer closes the connection or the cap is reached.
        while remaining > 0:
            chunk = s.recv(min(remaining, 65536))
            if not chunk:
                break
            chunks.append(chunk)
            remaining -= len(chunk)
    # Decode once at the end so multi-byte characters that straddle two
    # chunks are not corrupted.
    return b"".join(chunks).decode(errors="replace")
def whois(domain):
    """Ask every host in SERVERS about ``domain``, one after another.

    Returns:
        A dict mapping each server host name to whatever query() returned
        for that server.
    """
    answers = {}
    for host in SERVERS:
        answers[host] = query(host, domain)
    return answers
def parallel(servers, domain):
    """Query all ``servers`` for ``domain`` using one thread per server.

    Each thread is started only after wait() reports that the process-wide
    thread count allows it. All threads are joined before the collected
    answers are printed as tab-indented JSON and returned.

    Returns:
        A dict mapping server name to the text returned by query(). A server
        whose query raised has no entry.
    """
    results = {}

    def worker(server, domain):
        # setdefault keeps the first answer stored for a given server.
        results.setdefault(server, query(server, domain))

    threads = []
    for server in servers:
        threads.append(
            threading.Thread(
                group=None, name=server, target=worker, args=(server, domain)
            )
        )

    for thread in threads:
        wait()
        thread.start()
    for thread in threads:
        thread.join()

    print(">", domain, json.dumps(results, indent='\t'))
    return results
def handle(name, data, bdir="data"):
    """Write ``data`` as tab-indented JSON to ``bdir/name`` and echo it.

    Args:
        name: File name to create inside ``bdir``.
        data: JSON-serialisable object to store.
        bdir: Output directory; created (including parents) when missing.

    Returns:
        None.

    Raises:
        OSError: If the directory or the file cannot be created.
        TypeError: If ``data`` is not JSON-serialisable.
    """
    # makedirs(exist_ok=True) avoids the check-then-create race of
    # os.path.exists() + os.mkdir() and also supports nested directories.
    os.makedirs(bdir, exist_ok=True)
    # NOTE(review): ``name`` is joined into the path unmodified. The entry
    # points pass lines fetched from a URL, so a name containing path
    # separators would land outside ``bdir`` -- consider sanitising upstream.
    with open(os.path.join(bdir, name), "w") as file:
        json.dump(data, file, indent='\t')
    print(">", name, json.dumps(data, indent='\t'))
    return None
def m1main(args):
    """Look up every domain listed at each URL, several domains at a time.

    For each URL in ``args`` the body is downloaded and split into lines;
    blank lines and lines starting with ``#`` are skipped. Every remaining
    domain gets its own thread running parallel(SERVERS, domain). Once all
    threads for a URL have finished, the per-domain results are stored via
    handle() under the MD5 hex digest of the URL.

    Args:
        args: Iterable of URLs pointing at newline-separated domain lists.

    Returns:
        A dict mapping each URL to a dict of domain -> parallel() result.
    """
    # parallel() throttles its per-server threads with wait()'s default
    # limit. If the per-domain threads were throttled with that same limit,
    # they alone could fill it, leaving every one of them stuck inside
    # parallel() waiting for room that never appears. A clearly lower limit
    # here always leaves headroom for the per-server threads.
    outer_limit = 5

    final = {}
    for arg in args:
        # Close the HTTP response deterministically.
        with request.urlopen(arg) as response:
            text = response.read().decode()

        # splitlines() + strip() also copes with CRLF line endings, which
        # split("\n") would leave attached to every domain name.
        domains = [line.strip() for line in text.splitlines()]
        domains = [d for d in domains if d and not d.startswith("#")]

        results = {}

        def lookup(domain, results=results):
            results.setdefault(domain, parallel(SERVERS, domain))

        threads = [
            threading.Thread(group=None, name=domain, target=lookup, args=(domain,))
            for domain in domains
        ]
        for thread in threads:
            wait(count=outer_limit)
            thread.start()
        for thread in threads:
            thread.join()

        final[arg] = results
        handle(hashlib.md5(arg.encode()).hexdigest(), results)
    return final
def m2main(args):
    """Look up the domains listed at each URL, one domain at a time.

    For each URL in ``args`` the body is downloaded and split into lines;
    blank lines and lines starting with ``#`` are skipped. Each remaining
    domain is resolved with parallel(SERVERS, domain) (one thread per
    server) and the result is stored via handle() under the domain name.

    Args:
        args: Iterable of URLs pointing at newline-separated domain lists.

    Returns:
        A list with one entry per URL; each entry is the list of handle()
        return values for that URL's domains.
    """
    outcome = []
    for arg in args:
        # Close the HTTP response deterministically.
        with request.urlopen(arg) as response:
            text = response.read().decode()

        per_url = []
        # splitlines() + strip() also copes with CRLF line endings, which
        # split("\n") would leave attached to every domain name.
        for raw in text.splitlines():
            domain = raw.strip()
            if domain and not domain.startswith("#"):
                per_url.append(handle(domain, parallel(SERVERS, domain)))
        outcome.append(per_url)
    return outcome
def m3main(args):
    """Look up the domains listed at each URL fully sequentially.

    For each URL in ``args`` the body is downloaded and split into lines;
    blank lines and lines starting with ``#`` are skipped. Each remaining
    domain is resolved with whois() (servers queried one after another) and
    the resulting dict is stored via handle() under the domain name.

    Args:
        args: Iterable of URLs pointing at newline-separated domain lists.

    Returns:
        A list with one entry per URL; each entry is the list of handle()
        return values for that URL's domains.
    """
    outcome = []
    for arg in args:
        # Close the HTTP response deterministically.
        with request.urlopen(arg) as response:
            text = response.read().decode()

        per_url = []
        # splitlines() + strip() also copes with CRLF line endings, which
        # split("\n") would leave attached to every domain name.
        for raw in text.splitlines():
            domain = raw.strip()
            if domain and not domain.startswith("#"):
                # The original call read ``handle(line, json,dumps(...))``:
                # the comma passed the json module as the data and raised
                # NameError on ``dumps``. handle() serialises to JSON itself,
                # so it gets the plain dict, as in m2main.
                per_url.append(handle(domain, whois(domain)))
        outcome.append(per_url)
    return outcome
if __name__ == '__main__':
    # m2main() returns a nested list. Passing that to sys.exit() would print
    # the list to stderr and exit with status 1 even after a successful run,
    # so run it first and then exit with an explicit success status.
    m2main(sys.argv[1:])
    sys.exit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment