Skip to content

Instantly share code, notes, and snippets.

@xl7dev
Created February 6, 2018 10:49
Show Gist options
  • Save xl7dev/98b653f5416bc205fcebc06f3f8b2996 to your computer and use it in GitHub Desktop.
Save xl7dev/98b653f5416bc205fcebc06f3f8b2996 to your computer and use it in GitHub Desktop.
AsyncResolver for python3.5+
#!/usr/bin/env python
# encoding: utf-8
"""
@author: xl7dev
@time: 2018/2/6 上午11:40
for Python3.5+
"""
import random
import string
import pycares
import aiodns
import asyncio
class AsyncResolver(object):
def __init__(self):
self.timeout = 0.1
self.loop = asyncio.get_event_loop()
self.resolver = aiodns.DNSResolver(timeout=self.timeout, loop=self.loop)
self.resolver.nameservers = ['8.8.8.8']
async def query(self, domain, type):
return await self.resolver.query(domain, type)
def wildcard_A(self, domain):
genrandstr = lambda i: ''.join(random.choices(string.ascii_lowercase + string.digits, k=i))
tasks = [asyncio.ensure_future(self.query(genrandstr(20) + '.' + domain, 'A')) for _ in range(6)]
reqs = asyncio.gather(*tasks)
try:
f = self.loop.run_until_complete(reqs)
data = list(set(map(lambda x: x[0].host, f)))
print('[*] Found wildcard dns record:{data}'.format(data=data))
return data
except:
print('[*] Not Found wildcard dns record')
return False
def query_A(self, domain):
"""query DNS A records.
ex: {'domain': 'www.google.com','type': 'A','data': [u'93.46.8.89']}}
"""
f = self.query(domain, 'A')
result = self.loop.run_until_complete(f)
data = list(map(lambda x: x.host, result))
item = {"domain": domain, "type": "A", "data": data}
return item
def query_AAAA(self, domain):
"""query DNS AAAA records.
ex: {'domain': 'www.google.com','type': 'AAAA','data': [u'2404:6800:4005:809::200e']}}
"""
f = self.query(domain, 'A')
result = self.loop.run_until_complete(f)
data = list(map(lambda x: x.host, result))
item = {"domain": domain, "type": "AAAA", "data": data}
return item
def query_CNAME(self, domain):
"""query DNS CNAME records.
ex: {'domain': 'www.baidu.com','type': 'CNAME','data': [u'www.a.shifen.com']}}
"""
f = self.query(domain, 'CNAME')
result = self.loop.run_until_complete(f)
data = result.cname
item = {"domain": domain, "type": "CNAME", "data": data}
return item
def query_MX(self, domain):
"""query DNS MX records.
ex: {'domain':'google.com', 'type': 'MX', 'data': ['aspmx.l.google.com.',
'alt3.aspmx.l.google.com.',
'alt4.aspmx.l.google.com.',
'alt1.aspmx.l.google.com.',
'alt2.aspmx.l.google.com.']}}
"""
f = self.query(domain, 'MX')
result = self.loop.run_until_complete(f)
data = list(map(lambda x: x.host, result))
item = {"domain": domain, "type": "MX", "data": data}
return item
def query_NS(self, domain):
"""query DNS NS records.
ex: {'domain': 'google.com','type':'NS','data': ['ns3.google.com.',
'ns1.google.com.',
'ns4.google.com.',
'ns2.google.com.']}}
"""
f = self.query(domain, 'NS')
result = self.loop.run_until_complete(f)
data = list(map(lambda x: x.host, result))
item = {"domain": domain, "type": "NS", "data": data}
return item
def query_SOA(self, domain):
"""query DNS SOA records.
ex: {'domain': 'google.com', 'type': 'SOA', 'data': ['ns4.google.com.']}}
"""
f = self.query(domain, 'SOA')
result = self.loop.run_until_complete(f)
data = result.nsname
item = {"domain": domain, "type": "SOA", "data": data}
return item
def query_SRV(self, domain):
"""query DNS SRV records.,ex:
{'domain','xmpp-server._tcp.jabber.org', 'type': 'SRV', 'data': ['hermes2v6.jabber.org','hermes2.jabber.org']}
"""
f = self.query(domain, 'SRV')
result = self.loop.run_until_complete(f)
data = list(map(lambda x: x.host, result))
item = {"domain": domain, "type": "SRV", "data": data}
return item
def query_TXT(self, domain):
"""query DNS TXT records, normally, it includes [SPF] information.
ex: {'domain': 'yahoo.com', 'type': 'TXT', 'data': ['v=spf1 redirect=_spf.mail.yahoo.com']}}
"""
f = self.query(domain, 'TXT')
result = self.loop.run_until_complete(f)
data = list(map(lambda x: x.text, result))
item = {"domain": domain, "type": "TXT", "data": data}
return item
def query_NAPTR(self, domain):
"""query DNS NAPTR records, ex:
{'domain': 'sip2sip.info', 'type': 'NAPTR', 'data': ['_sip._udp.sip2sip.info',
'_sip._tcp.sip2sip.info',
'_sips._tcp.sip2sip.info']}}
"""
f = self.query(domain, 'NAPTR')
result = self.loop.run_until_complete(f)
data = list(map(lambda x: x.replacement, result))
item = {"domain": domain, "type": "NAPTR", "data": data}
return item
def query_PTR(self, ip):
"""query DNS PTR records, ex:
{'domain': 'google-public-dns-a.google.com', 'type': 'PTR', 'data': '8.8.8.8'}}
"""
f = self.query(pycares.reverse_address(ip), 'PTR')
result = self.loop.run_until_complete(f)
data = result.name
item = {"domain": data, "type": "PTR", "data": [ip]}
return item
def main():
demo = AsyncResolver()
print(demo.wildcard_A('taobao.com'))
print(demo.query_A('google.com'))
print(demo.query_AAAA('google.com'))
print(demo.query_CNAME('www.baidu.com'))
print(demo.query_NS('google.com'))
print(demo.query_MX('google.com'))
print(demo.query_SOA('google.com'))
print(demo.query_TXT('google.com'))
print(demo.query_PTR('8.8.8.8'))
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment