Created
February 6, 2018 10:49
-
-
Save xl7dev/98b653f5416bc205fcebc06f3f8b2996 to your computer and use it in GitHub Desktop.
AsyncResolver for python3.5+
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# encoding: utf-8 | |
""" | |
@author: xl7dev | |
@time: 2018/2/6 上午11:40 | |
for Python3.5+ | |
""" | |
import random | |
import string | |
import pycares | |
import aiodns | |
import asyncio | |
class AsyncResolver(object): | |
def __init__(self): | |
self.timeout = 0.1 | |
self.loop = asyncio.get_event_loop() | |
self.resolver = aiodns.DNSResolver(timeout=self.timeout, loop=self.loop) | |
self.resolver.nameservers = ['8.8.8.8'] | |
async def query(self, domain, type): | |
return await self.resolver.query(domain, type) | |
def wildcard_A(self, domain): | |
genrandstr = lambda i: ''.join(random.choices(string.ascii_lowercase + string.digits, k=i)) | |
tasks = [asyncio.ensure_future(self.query(genrandstr(20) + '.' + domain, 'A')) for _ in range(6)] | |
reqs = asyncio.gather(*tasks) | |
try: | |
f = self.loop.run_until_complete(reqs) | |
data = list(set(map(lambda x: x[0].host, f))) | |
print('[*] Found wildcard dns record:{data}'.format(data=data)) | |
return data | |
except: | |
print('[*] Not Found wildcard dns record') | |
return False | |
def query_A(self, domain): | |
"""query DNS A records. | |
ex: {'domain': 'www.google.com','type': 'A','data': [u'93.46.8.89']}} | |
""" | |
f = self.query(domain, 'A') | |
result = self.loop.run_until_complete(f) | |
data = list(map(lambda x: x.host, result)) | |
item = {"domain": domain, "type": "A", "data": data} | |
return item | |
def query_AAAA(self, domain): | |
"""query DNS AAAA records. | |
ex: {'domain': 'www.google.com','type': 'AAAA','data': [u'2404:6800:4005:809::200e']}} | |
""" | |
f = self.query(domain, 'A') | |
result = self.loop.run_until_complete(f) | |
data = list(map(lambda x: x.host, result)) | |
item = {"domain": domain, "type": "AAAA", "data": data} | |
return item | |
def query_CNAME(self, domain): | |
"""query DNS CNAME records. | |
ex: {'domain': 'www.baidu.com','type': 'CNAME','data': [u'www.a.shifen.com']}} | |
""" | |
f = self.query(domain, 'CNAME') | |
result = self.loop.run_until_complete(f) | |
data = result.cname | |
item = {"domain": domain, "type": "CNAME", "data": data} | |
return item | |
def query_MX(self, domain): | |
"""query DNS MX records. | |
ex: {'domain':'google.com', 'type': 'MX', 'data': ['aspmx.l.google.com.', | |
'alt3.aspmx.l.google.com.', | |
'alt4.aspmx.l.google.com.', | |
'alt1.aspmx.l.google.com.', | |
'alt2.aspmx.l.google.com.']}} | |
""" | |
f = self.query(domain, 'MX') | |
result = self.loop.run_until_complete(f) | |
data = list(map(lambda x: x.host, result)) | |
item = {"domain": domain, "type": "MX", "data": data} | |
return item | |
def query_NS(self, domain): | |
"""query DNS NS records. | |
ex: {'domain': 'google.com','type':'NS','data': ['ns3.google.com.', | |
'ns1.google.com.', | |
'ns4.google.com.', | |
'ns2.google.com.']}} | |
""" | |
f = self.query(domain, 'NS') | |
result = self.loop.run_until_complete(f) | |
data = list(map(lambda x: x.host, result)) | |
item = {"domain": domain, "type": "NS", "data": data} | |
return item | |
def query_SOA(self, domain): | |
"""query DNS SOA records. | |
ex: {'domain': 'google.com', 'type': 'SOA', 'data': ['ns4.google.com.']}} | |
""" | |
f = self.query(domain, 'SOA') | |
result = self.loop.run_until_complete(f) | |
data = result.nsname | |
item = {"domain": domain, "type": "SOA", "data": data} | |
return item | |
def query_SRV(self, domain): | |
"""query DNS SRV records.,ex: | |
{'domain','xmpp-server._tcp.jabber.org', 'type': 'SRV', 'data': ['hermes2v6.jabber.org','hermes2.jabber.org']} | |
""" | |
f = self.query(domain, 'SRV') | |
result = self.loop.run_until_complete(f) | |
data = list(map(lambda x: x.host, result)) | |
item = {"domain": domain, "type": "SRV", "data": data} | |
return item | |
def query_TXT(self, domain): | |
"""query DNS TXT records, normally, it includes [SPF] information. | |
ex: {'domain': 'yahoo.com', 'type': 'TXT', 'data': ['v=spf1 redirect=_spf.mail.yahoo.com']}} | |
""" | |
f = self.query(domain, 'TXT') | |
result = self.loop.run_until_complete(f) | |
data = list(map(lambda x: x.text, result)) | |
item = {"domain": domain, "type": "TXT", "data": data} | |
return item | |
def query_NAPTR(self, domain): | |
"""query DNS NAPTR records, ex: | |
{'domain': 'sip2sip.info', 'type': 'NAPTR', 'data': ['_sip._udp.sip2sip.info', | |
'_sip._tcp.sip2sip.info', | |
'_sips._tcp.sip2sip.info']}} | |
""" | |
f = self.query(domain, 'NAPTR') | |
result = self.loop.run_until_complete(f) | |
data = list(map(lambda x: x.replacement, result)) | |
item = {"domain": domain, "type": "NAPTR", "data": data} | |
return item | |
def query_PTR(self, ip): | |
"""query DNS PTR records, ex: | |
{'domain': 'google-public-dns-a.google.com', 'type': 'PTR', 'data': '8.8.8.8'}} | |
""" | |
f = self.query(pycares.reverse_address(ip), 'PTR') | |
result = self.loop.run_until_complete(f) | |
data = result.name | |
item = {"domain": data, "type": "PTR", "data": [ip]} | |
return item | |
def main(): | |
demo = AsyncResolver() | |
print(demo.wildcard_A('taobao.com')) | |
print(demo.query_A('google.com')) | |
print(demo.query_AAAA('google.com')) | |
print(demo.query_CNAME('www.baidu.com')) | |
print(demo.query_NS('google.com')) | |
print(demo.query_MX('google.com')) | |
print(demo.query_SOA('google.com')) | |
print(demo.query_TXT('google.com')) | |
print(demo.query_PTR('8.8.8.8')) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment