Skip to content

Instantly share code, notes, and snippets.

@kalloc
Last active September 7, 2015 20:53
Show Gist options
  • Save kalloc/49361f0d72ef201942aa to your computer and use it in GitHub Desktop.
Save kalloc/49361f0d72ef201942aa to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# ainmarh.lab and project daedalus.ru
# Best effort: switch Twisted to the epoll reactor before the default reactor
# is imported further down.  Any ordinary failure (module missing, reactor
# could not be installed) just leaves the default reactor in place.
try:
    from twisted.internet import epollreactor
    epollreactor.install()
except Exception:
    # `except Exception` rather than a bare `except` so that
    # KeyboardInterrupt / SystemExit are not swallowed here.
    print('skip poll')
from twisted.names import common, server, dns, authority
from twisted.application import service, internet
from twisted.internet import reactor
# Port used for both the TCP and the UDP listener.
PORT = 53
# Address returned in the A record built by MyDNSServerFactory.handleA.
IP = '127.0.0.1'
# Passed as the second argument of the SOA record in handleSOA; written with
# '.' in place of '@'.
EMAIL = 'some.mail.com' # in SOA . for @
# Name servers: NS1 is used in the SOA record and in NS answers, NS2 only in
# NS answers.
NS1 = 'ns1.your.ns'
NS2 = 'ns2.your.ns'
class NoFileAuthority(authority.FileAuthority):
    """A ``FileAuthority`` whose SOA and records are supplied directly.

    ``soa`` and ``records`` are stored unchanged on the instance.
    """

    def __init__(self, soa, records):
        # Only the ResolverBase initialiser runs; FileAuthority.__init__ is
        # bypassed (presumably because it expects a zone file -- confirm
        # against the installed Twisted version).
        common.ResolverBase.__init__(self)
        self.soa = soa
        self.records = records
class MyDNSServerFactory(server.DNSServerFactory):
    """DNS server factory that answers A, SOA and NS queries with fixed data.

    Whatever name is asked for, an A query is answered with ``IP``, an SOA
    query with a record built from ``NS1`` and ``EMAIL``, and an NS query
    with ``NS2`` followed by ``NS1``.  Every other query type is delegated to
    ``server.DNSServerFactory.handleQuery``.
    """

    @staticmethod
    def doLogQuery(proto, host, ip):
        """Call ``proto.set(host, ip)`` and close the transport afterwards.

        Returns whatever ``addCallback`` returns on the result of
        ``proto.set``.

        NOTE(review): nothing in this file calls this helper, and ``proto``
        is presumably some client protocol whose ``set`` returns a Deferred
        -- confirm before relying on it.
        """
        def doClose(result, proto):
            proto.transport.loseConnection()
        return proto.set(host, ip).addCallback(doClose, proto)

    def _reply(self, message, proto, address, rtype, ttl, payloads):
        """Append one answer per payload for the first query and send it.

        The incoming ``message`` is reused as the reply: the answers are
        appended to ``message.answers`` and the message is handed to
        ``sendReply``.
        """
        name = message.queries[0].name.name
        # Flag the message as a response.  NOTE(review): older Twisted
        # releases appear to set this in messageReceived before dispatching
        # while newer ones may not; setting it here is harmless either way
        # -- confirm against the installed version.
        message.answer = 1
        for payload in payloads:
            message.answers.append(
                dns.RRHeader(name, rtype, payload=payload, ttl=ttl, auth=True))
        self.sendReply(proto, message, address)

    def handleSOA(self, message, proto, address):
        """Reply with a fixed SOA record (``NS1`` / ``EMAIL``), TTL 3600."""
        payload = dns.Record_SOA(NS1,
                                 EMAIL,
                                 serial=37337, refresh=10800,
                                 retry=3600, expire=604800,
                                 minimum=86400)
        self._reply(message, proto, address, dns.SOA, 3600, [payload])

    def handleA(self, message, proto, address):
        """Reply with a single A record pointing at ``IP``, TTL 60."""
        self._reply(message, proto, address, dns.A, 60, [dns.Record_A(IP)])

    def handleNS(self, message, proto, address):
        """Reply with two NS records, ``NS2`` first and then ``NS1``."""
        payloads = [dns.Record_NS(NS2), dns.Record_NS(NS1)]
        self._reply(message, proto, address, dns.NS, 3600, payloads)

    def handleQuery(self, message, proto, address):
        """Log the first query and dispatch on its record type.

        Returns ``None`` for A, SOA and NS queries (the reply is sent
        directly), otherwise the result of the base class ``handleQuery``.
        """
        query = message.queries[0]
        # With no ``address`` the peer is read from the transport instead.
        ip = address[0] if address else proto.transport.client[0]
        query_host = query.name.name
        print('[%s] ip:%s lookup %s' % (query, ip, query_host))

        # Named record-type constants instead of the bare numbers 1, 6, 2.
        handlers = {
            dns.A: self.handleA,
            dns.SOA: self.handleSOA,
            dns.NS: self.handleNS,
        }
        handler = handlers.get(query.type)
        if handler is None:
            return server.DNSServerFactory.handleQuery(self, message, proto,
                                                       address)
        handler(message, proto, address)
        return None
class DNSServer(object):
    """Holds SOA parameters and the list of zone authorities built from them.

    Attributes:
        zones: list of ``NoFileAuthority`` objects, one per ``add_zone`` call.
        soa: dict of SOA parameters (``serial``, ``refresh``, ``minimum``,
            ``expire``, ``retry``, ``ttl``) used for newly added zones.
    """

    def __init__(self):
        # Per-instance containers; as class attributes they would be shared
        # (and mutated) across every DNSServer instance.
        self.zones = []
        self.soa = {}
        self.set_soa()

    def set_soa(self, serial=100, refresh=1234, minimum=7654,
                expire=19283784, retry=15, ttl=1):
        """Store the SOA parameters applied to zones added afterwards."""
        self.soa.update(
            serial=serial,
            refresh=refresh,
            minimum=minimum,
            expire=expire,
            retry=retry,
            ttl=ttl,
        )

    def add_zone(self, host, zones):
        """Create an authority for ``host`` and append it to ``self.zones``.

        Args:
            host: zone name; also used as the SOA ``mname`` and, prefixed
                with ``'postmaster.'``, as the SOA ``rname``.
            zones: mapping of name -> list of records.  A ``'@'`` key is
                treated as an alias for ``host``.  The mapping and its lists
                are copied, not modified, so the same mapping can be reused
                for several hosts.
        """
        soa_record = dns.Record_SOA(
            mname=host,
            rname='postmaster.' + host,
            serial=self.soa['serial'],
            # The configured refresh interval, not the retry interval.
            refresh=self.soa['refresh'],
            minimum=self.soa['minimum'],
            expire=self.soa['expire'],
            retry=self.soa['retry'],
            ttl=self.soa['ttl']
        )
        records = dict(zones)
        if '@' in records:
            records[host] = records.pop('@')
        if host in records:
            # The SOA record goes first among the records of the zone name.
            records[host] = [soa_record] + list(records[host])
        self.zones.append(NoFileAuthority(
            soa=(host, soa_record),
            records=records
        ))
# Module-level wiring: one DNSServer holding the zones and one factory shared
# by the TCP listener and the UDP datagram protocol.
srv = DNSServer()
# The zones are authorities, so pass them by keyword; positionally the third
# argument of DNSServerFactory is the `clients` slot.
f = MyDNSServerFactory(authorities=srv.zones, verbose=0)
p = dns.DNSDatagramProtocol(f)
if __name__ == '__main__':
    # Run directly: listen on TCP and UDP and start the reactor.
    reactor.listenTCP(PORT, f)
    reactor.listenUDP(PORT, p)
    reactor.run()
else:
    # Imported rather than run directly: expose an `application` object with
    # TCP and UDP services attached.  Note that `srv` is rebound here from
    # the DNSServer instance to the MultiService.
    srv = service.MultiService()
    application = service.Application("5WHO DNS Service")
    srv.setServiceParent(service.IServiceCollection(application))
    internet.TCPServer(PORT, f).setServiceParent(srv)
    internet.UDPServer(PORT, p).setServiceParent(srv)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment