Skip to content

Instantly share code, notes, and snippets.

@quarnster
Created November 1, 2012 18:04
Show Gist options
  • Save quarnster/3995421 to your computer and use it in GitHub Desktop.
Save quarnster/3995421 to your computer and use it in GitHub Desktop.
A tiny Python DNS proxy script
# Based on code originally from http://code.activestate.com/recipes/491264-mini-fake-dns-server/
#
# DNS rfc: http://www.ietf.org/rfc/rfc1035.txt
# http://en.wikipedia.org/wiki/List_of_DNS_record_types
import socket
import re
import sys
import traceback
import struct
import Queue
import threading
import pickle
import os
def decode_ipv4(addr):
    """Turn a 4-byte packed IPv4 address into dotted-decimal text."""
    octets = struct.unpack("4B", addr)
    return ".".join([str(octet) for octet in octets])
def encode_ipv4(addr):
    """Pack a dotted-decimal IPv4 address string into its 4-byte form."""
    octets = map(int, addr.split('.'))
    return struct.pack("4B", *octets)
def decode_ipv6(addr):
    """Turn a 16-byte packed IPv6 address into eight colon-separated hex groups.

    The result is always the full eight-group form; zero runs are not
    collapsed into "::".
    """
    groups = struct.unpack("!8H", addr)
    return ":".join([format(group, "x") for group in groups])
def encode_ipv6(addr):
    """Pack a textual IPv6 address into its 16-byte network-order form.

    Accepts both the full eight-group notation and the "::" zero-compressed
    notation (e.g. "::1", "2001:db8::1").

    Raises ValueError for a non-hex group and struct.error when the address
    does not expand to exactly eight 16-bit groups.
    """
    if "::" in addr:
        # Expand the single "::" into however many zero groups are missing.
        head, _, tail = addr.partition("::")
        head_groups = head.split(":") if head else []
        tail_groups = tail.split(":") if tail else []
        missing = 8 - len(head_groups) - len(tail_groups)
        groups = head_groups + ["0"] * missing + tail_groups
    else:
        groups = addr.split(":")
    return struct.pack("!HHHHHHHH", *[int(group, 16) for group in groups])
def decode_string2(data, fulldata):
    """Decode a DNS domain name that starts at the beginning of `data`.

    data     -- bytes beginning at the first length byte of the name.
    fulldata -- the complete DNS message, used to resolve compression
                pointers (rfc1035 section 4.1.4).

    Returns (domain, consumed): the lower-cased dotted name and the number
    of bytes of `data` the name occupies. When the name ends in a
    compression pointer, `consumed` stops right after that 2-byte pointer.

    Raises IndexError when the name runs past the end of the buffer and
    ValueError when the compression pointers form a loop.
    """
    labels = []
    pos = 0
    retpos = -1
    jumps = 0
    while True:
        length = ord(data[pos])
        if length == 0:
            pos += 1
            break
        if (length & 0xc0) == 0xc0:
            # Compression pointer: the low 14 bits are an offset into the
            # whole message where the rest of the name continues.
            off = ((length & 0x3f) << 8) | ord(data[pos + 1])
            if retpos == -1:
                # Only the first pointer marks where the caller resumes.
                retpos = pos + 2
            jumps += 1
            # A well-formed message cannot need more jumps than it has
            # bytes; anything beyond that is a pointer cycle.
            if jumps > len(fulldata):
                raise ValueError("DNS name compression pointer loop")
            data = fulldata[off:]
            pos = 0
            # Re-examine the target byte: it may itself be a pointer or the
            # terminating zero.
            continue
        labels.append(data[pos + 1:pos + 1 + length])
        pos += length + 1
    if retpos == -1:
        retpos = pos
    return ".".join(labels).lower(), retpos
def decode_string(data, fulldata):
    """Decode a DNS name like decode_string2, returning only the name."""
    return decode_string2(data, fulldata)[0]
def encode_string(data, fulldata=""):
    """Encode a dotted domain name into DNS wire format.

    data     -- the dotted name, e.g. "www.example.com". Empty labels (the
                root name "" or a trailing dot) are skipped.
    fulldata -- the message built so far. When a complete, zero-terminated
                suffix of the name already occurs in it, a 2-byte
                compression pointer to that occurrence replaces the rest of
                the name (rfc1035 section 4.1.4).

    Returns the encoded name.

    Raises ValueError when a label is longer than 63 bytes, since larger
    length bytes collide with the compression-pointer marker bits.
    """
    encoded = []
    for label in data.split("."):
        if not label:
            continue
        if len(label) > 63:
            raise ValueError("DNS label too long: %r" % label)
        encoded.append(chr(len(label)) + label)
    ret = ""
    for index, chunk in enumerate(encoded):
        # The terminator is part of the search so a match can never be the
        # beginning of a longer, different name.
        suffix = "".join(encoded[index:]) + "\x00"
        offset = fulldata.find(suffix)
        # A pointer only has 14 bits for the offset.
        if 0 <= offset <= 0x3fff:
            return ret + chr(0xc0 | (offset >> 8)) + chr(offset & 0xff)
        ret += chunk
    return ret + "\x00"
def create_header(QR=1, OPCODE=0, AA=0, TC=0, RD=1, RA=0, Z=0, RCODE=0):
    """Pack the DNS header flag fields into one 16-bit integer.

    Each argument is masked to its field width and shifted into place; the
    returned value is the flags word that follows the message id.
    """
    # (value, mask, shift) for every field, most significant first.
    layout = (
        (QR, 1, 15),
        (OPCODE, 15, 11),
        (AA, 1, 10),
        (TC, 1, 9),
        (RD, 1, 8),
        (RA, 1, 7),
        (Z, 7, 4),
        (RCODE, 15, 0),
    )
    word = 0
    for value, mask, shift in layout:
        word |= (value & mask) << shift
    return word
def decode_header(data):
    """Split a 16-bit DNS flags word into its individual fields.

    Returns the tuple (QR, OPCODE, AA, TC, RD, RA, Z, RCODE).
    """
    # (shift, mask) for every field, in the order they are returned.
    layout = ((15, 1), (11, 15), (10, 1), (9, 1), (8, 1), (7, 1), (4, 7), (0, 15))
    return tuple([(data >> shift) & mask for shift, mask in layout])
# Record type number -> (decoder, encoder) for the record's value.
# Both callables take (value, message); the address codecs ignore the message,
# the name codecs use it for name compression.
typelut = {
    # A: IPv4 address
    1: (lambda raw, message: decode_ipv4(raw), lambda text, message: encode_ipv4(text)),
    # CNAME: domain name
    5: (decode_string, encode_string),
    # PTR: domain name
    12: (decode_string, encode_string),
    # AAAA: IPv6 address
    28: (lambda raw, message: decode_ipv6(raw), lambda text, message: encode_ipv6(text)),
}
class Response:
    """One resource record (name, type, class, ttl, value) of a DNS message."""

    def __init__(self, rawdata, fulldata=""):
        """Parse the record that starts at the beginning of `rawdata`.

        rawdata  -- bytes starting at the record's name.
        fulldata -- the whole message, for resolving compressed names.

        After construction `length` holds the number of bytes of `rawdata`
        the record occupies, and `value` holds the decoded value when the
        type has an entry in `typelut`, otherwise the raw value bytes.
        """
        self.rawdata = rawdata
        self.name, cursor = decode_string2(rawdata, fulldata)
        # Fixed part after the name: type, class, ttl, value length.
        fixed = rawdata[cursor:cursor + 10]
        self.type2, self.class2, self.ttl, valuelength = struct.unpack("!HHIH", fixed)
        start = cursor + 10
        end = start + valuelength
        self.length = end
        payload = rawdata[start:end]
        codec = typelut.get(self.type2)
        if codec is not None:
            payload = codec[0](payload, fulldata)
        # NOTE(review): values of types without a codec stay raw; if they
        # contain compression pointers those presumably refer to offsets in
        # the original message -- confirm before re-emitting them elsewhere.
        self.value = payload

    def add_self(self, data):
        """Append this record to the message `data` and return the result.

        `data` is also handed to the encoders so names can be compressed
        against what the message already contains.
        """
        data += encode_string(self.name, data)
        if self.type2 in typelut:
            body = typelut[self.type2][1](self.value, data)
        else:
            body = self.value
        fixed = struct.pack("!HHIH", self.type2, self.class2, self.ttl, len(body))
        return data + fixed + body

    def __repr__(self):
        if self.type2 in typelut:
            shown = self.value
        else:
            shown = "Unknown decoder"
        return "%s %2d %2d %s" % (self.name, self.type2, self.class2, shown)
class DNSQuery:
    """A parsed DNS message: header, a single question and its records.

    Attributes set by the constructor:
      id        -- the 2 raw id bytes of the message
      header    -- the 16-bit flags word
      aa, rcode -- the AA and RCODE fields taken from the flags word
      counts    -- [questions, answers, authority, additional] counts
      qname, qtype, qclass -- the question
      responses -- Response objects for all records, in message order
    """

    def __init__(self, data):
        """Parse the raw message `data`.

        Raises ValueError when the message is not a standard query
        (opcode 0) with exactly one question of class 1 (IN). Truncated
        input surfaces as struct.error / IndexError from the parsers.
        """
        self.data = data
        self.qname = ''
        self.responses = []
        # See rfc section 4.1.1
        self.id = data[:2]
        self.header = struct.unpack("!H", self.data[2:4])[0]
        header = decode_header(self.header)
        self.aa = header[2]
        self.rcode = header[7]
        opcode = header[1]
        self.counts = list(struct.unpack("!HHHH", self.data[4:4+2*4]))
        # Explicit checks instead of assert: the data comes off the network
        # and asserts vanish under "python -O".
        if opcode != 0:
            raise ValueError("Unsupported DNS opcode: %d" % opcode)
        if self.counts[0] != 1:
            raise ValueError("Expected exactly one question, got %d" % self.counts[0])
        pos = 12  # the header is 12 bytes long
        for _ in range(self.counts[0]):
            self.qname, consumed = decode_string2(data[pos:], data)
            pos += consumed
            self.qtype, self.qclass = struct.unpack("!HH", data[pos:pos+4])
            pos += 4
        if self.qclass != 1:
            raise ValueError("Unsupported DNS class: %d" % self.qclass)
        for _ in range(self.counts[1] + self.counts[2] + self.counts[3]):
            r = Response(data[pos:], data)
            pos += r.length
            self.responses.append(r)

    def key(self):
        """Return the (qname, qtype) pair identifying the question."""
        return (self.qname, self.qtype)

    def add_response(self, data, type=1):
        """Add an answer record for the question.

        data -- the value in decoded form (e.g. "1.2.3.4" for type 1).
        type -- record type number; must have an entry in `typelut`.

        The record gets class 1 and a ttl of 60. It is placed after the
        existing answers but before any authority/additional records, so
        the section counts keep matching the record order.

        Raises ValueError for a type without an encoder.
        """
        if type not in typelut:
            raise ValueError("No encoder for record type %d" % type)
        encoded = typelut[type][1](data, "")
        enc = encode_string(self.qname)
        enc += struct.pack("!HHIH", type, 1, 60, len(encoded))
        enc += encoded
        self.responses.insert(self.counts[1], Response(enc))
        self.counts[1] += 1

    def create_response(self, header=None):
        """Serialize the message and return the raw packet.

        header -- 16-bit flags word to use; when None, a response header
                  carrying this message's AA and RCODE is generated.

        Returns '' when the question name is empty.
        """
        # "is None" rather than truthiness: 0 is a legitimate flags word.
        if header is None:
            header = create_header(AA=self.aa, RCODE=self.rcode)
        if not self.qname:
            return ''
        packet = ''.join([
            self.id,  # id
            struct.pack("!H", header),
            # Questions and Answers Counts
            struct.pack("!HHHH", 1, self.counts[1], self.counts[2], self.counts[3]),
            # Question
            encode_string(self.qname),
            struct.pack("!HH", self.qtype, 1),  # Type and internet class
        ])
        # Answers
        for resp in self.responses:
            packet = resp.add_self(packet)
        return packet

    def __repr__(self):
        ret = "Header: %s %s\n" % (str(decode_header(self.header)), str(self.counts))
        ret += "Question: %3d %2d %s\n" % (self.qtype, self.qclass, self.qname)
        ret += "Response:\n"
        for resp in self.responses:
            ret += "\t%s\n" % resp
        return ret
if __name__ == '__main__':
    # Command line:
    #   -proxydns <ip>     upstream DNS server to forward unknown queries to (required)
    #   -disablecache      do not answer from the cache
    #   -disableetchosts   do not answer from /etc/hosts
    #   -cachenew          store upstream answers in the cache
    #   -debug <mask>      1: upstream lookups, 2: /etc/hosts answers,
    #                      4: cache hits, 8: running counters
    dns = None
    cacheEntries = False
    useCache = True
    useEtcHosts = True
    debug = 0
    i = 1
    while i < len(sys.argv):
        arg = sys.argv[i]
        if arg == "-proxydns":
            i += 1
            dns = sys.argv[i]
        elif arg == "-disablecache":
            useCache = False
        elif arg == "-disableetchosts":
            useEtcHosts = False
        elif arg == "-cachenew":
            cacheEntries = True
        elif arg == "-debug":
            i += 1
            debug = int(sys.argv[i])
        else:
            raise Exception("Unknown argument: %s" % arg)
        i += 1
    if dns is None:
        raise Exception("Please provide a proxydns argument")

    realdns = (dns, 53)
    # udps talks to the clients, sock2 to the upstream server.
    udps = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    udps.bind(('', 53))
    sock2 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock2.settimeout(5)

    running = True
    queue = Queue.Queue()
    cache = {}
    questions = {}  # message id -> client address awaiting the upstream answer

    # Must exist even when /etc/hosts is unreadable, the main loop looks
    # names up in it unconditionally.
    hosts_sites = {}
    if os.access("/etc/hosts", os.R_OK):
        with open("/etc/hosts") as f:
            data = f.read()
        # Names are lower-cased because decoded query names are lower-case;
        # "-" is allowed so hyphenated host names are picked up too.
        hosts_sites = dict(
            (name.lower(), ip)
            for ip, name in re.findall(r"^(\d+\.\d+\.\d+\.\d+)\s+([\w\.-]+)(?:\s|$)", data, re.MULTILINE)
        )

    if os.access("./dns.dump", os.R_OK):
        # NOTE: unpickling executes whatever the file describes; only run
        # this from a directory whose dns.dump you trust.
        with open("./dns.dump", "rb") as f:
            cache = pickle.load(f)

    def thread():
        # Worker: for every forwarded query signalled through the queue,
        # wait for one upstream answer and relay it to the waiting client.
        while running:
            try:
                p = queue.get()
                if not p:
                    # None is the shutdown signal.
                    break
                data = sock2.recv(1024)
                p2 = DNSQuery(data)
                if cacheEntries:
                    cache[p2.key()] = p2
                addr = questions[p2.id]
                if (debug & 1) != 0:
                    print("Looked up:")
                    print(p2)
                del questions[p2.id]
                udps.sendto(p2.create_response(), addr)
            except Exception:
                # Keep serving; a timeout or a malformed answer only costs
                # this one lookup.
                traceback.print_exc()
            finally:
                queue.task_done()
        print("broke out of thread")

    counts = [0, 0, 0]  # answered from /etc/hosts, from cache, forwarded
    try:
        t = threading.Thread(target=thread)
        t.start()
        while running:
            try:
                data, addr = udps.recvfrom(1024)
                p = DNSQuery(data)
                if useEtcHosts and p.qname in hosts_sites:
                    counts[0] += 1
                    ip = hosts_sites[p.qname]
                    p.add_response(ip)
                    if (debug & 2) != 0:
                        print("Returning /etc/hosts lookup:")
                        print(p)
                    udps.sendto(p.create_response(), addr)
                elif useCache and p.key() in cache:
                    counts[1] += 1
                    p2 = cache[p.key()]
                    # Reuse the cached answer under the new query's id.
                    p2.id = p.id
                    if (debug & 4) != 0:
                        print("Returning cached entry:")
                        print(p2)
                    udps.sendto(p2.create_response(), addr)
                else:
                    counts[2] += 1
                    questions[p.id] = addr
                    sock2.sendto(p.create_response(header=create_header(QR=0)), realdns)
                    queue.put(True)
                if (debug & 8) != 0:
                    # "\r" keeps the counters on a single, rewritten line.
                    sys.stdout.write("/etc/hosts: %4d, cache: %4d, new lookups: %4d\r" % tuple(counts))
                    sys.stdout.flush()
            except KeyboardInterrupt:
                print("executing on keyboard break")
                running = False
                break
            except Exception:
                traceback.print_exc()
    finally:
        print("shutting down")
        running = False
        queue.put(None)  # wake the worker so it can exit
        sock2.settimeout(0.1)
        udps.close()
        sock2.close()
        with open("./dns.dump", "wb") as f:
            pickle.dump(cache, f)
        print("dumped dns cache")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment