Created
November 1, 2012 18:04
-
-
Save quarnster/3995421 to your computer and use it in GitHub Desktop.
A tiny python dns proxy script
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Based on code originally from http://code.activestate.com/recipes/491264-mini-fake-dns-server/
#
# DNS rfc: http://www.ietf.org/rfc/rfc1035.txt
# http://en.wikipedia.org/wiki/List_of_DNS_record_types
import socket | |
import re | |
import sys | |
import traceback | |
import struct | |
import Queue | |
import threading | |
import pickle | |
import os | |
def decode_ipv4(addr):
    """Return the dotted-quad text form of a 4-byte packed IPv4 address.

    Raises struct.error if *addr* is not exactly 4 bytes long.
    """
    return "%d.%d.%d.%d" % struct.unpack("BBBB", addr)
def encode_ipv4(addr):
    """Pack a dotted-quad IPv4 string into its 4-byte wire form.

    Raises ValueError for non-numeric octets and struct.error when the
    number of octets is not four or an octet is out of the 0-255 range.
    """
    octets = [int(part) for part in addr.split(".")]
    return struct.pack("BBBB", *octets)
def decode_ipv6(addr):
    """Return the text form of a 16-byte packed IPv6 address.

    The result is eight colon-separated hexadecimal groups without
    zero-padding and without "::" compression.

    Raises struct.error if *addr* is not exactly 16 bytes long.
    """
    groups = struct.unpack("!HHHHHHHH", addr)
    return ":".join("%x" % group for group in groups)
def encode_ipv6(addr):
    """Pack a textual IPv6 address into its 16-byte wire form.

    Accepts the full eight-group form as well as the "::" shorthand for
    one run of zero groups (e.g. "::1", "fe80::1").

    Raises ValueError for malformed hex groups or an invalid use of "::",
    and struct.error when the uncompressed form does not have eight
    groups.
    """
    if "::" in addr:
        head, tail = addr.split("::", 1)
        if "::" in tail:
            raise ValueError("IPv6 address may contain '::' only once: %r" % addr)
        left = head.split(":") if head else []
        right = tail.split(":") if tail else []
        missing = 8 - len(left) - len(right)
        if missing < 1:
            raise ValueError("IPv6 address has too many groups: %r" % addr)
        # "::" stands for as many zero groups as needed to reach eight.
        groups = left + ["0"] * missing + right
    else:
        groups = addr.split(":")
    return struct.pack("!HHHHHHHH", *[int(group, 16) for group in groups])
def decode_string2(data, fulldata):
    """Decode a DNS domain name and report how many bytes it occupied.

    data     -- byte string (Python 2 ``str``) starting at the first
                length octet of the name.
    fulldata -- the complete DNS message; compression pointers are
                resolved as offsets into it.

    Returns ``(name, consumed)`` where *name* is the lower-cased dotted
    name and *consumed* is the number of bytes the name takes up in
    *data* (up to and including the terminating zero octet, or the first
    2-byte compression pointer).

    Raises IndexError when the name runs past the end of the buffer and
    ValueError when compression pointers form a loop.
    """
    labels = []
    pos = 0
    retpos = -1
    seen_offsets = set()
    while True:
        length = ord(data[pos])
        if length == 0:
            pos += 1
            break
        if (length & 0xc0) == 0xc0:
            # Compression pointer: the low 14 bits of the two octets are an
            # offset into the full message.
            off = ((length & 0x3f) << 8) | ord(data[pos + 1])
            if retpos == -1:
                # Only the first pointer counts towards the bytes consumed
                # in the caller's buffer.
                retpos = pos + 2
            if off in seen_offsets:
                raise ValueError("compression pointer loop in domain name")
            seen_offsets.add(off)
            data = fulldata[off:]
            pos = 0
            # Re-read the length octet at the target: it may be another
            # pointer or the terminating zero.
            continue
        labels.append(data[pos + 1:pos + 1 + length])
        pos += length + 1
    if retpos == -1:
        retpos = pos
    return ".".join(labels).lower(), retpos
def decode_string(data, fulldata):
    """Decode a DNS domain name, discarding the consumed-byte count.

    Thin wrapper around decode_string2 for callers that only need the
    name itself.
    """
    name, _consumed = decode_string2(data, fulldata)
    return name
def encode_string(data, fulldata=""):
    """Encode a dotted domain name into DNS wire format.

    data     -- dotted name such as "www.example.com"; empty labels (an
                empty name or a trailing dot) are ignored, so "" encodes
                as the root name.
    fulldata -- the message built so far (Python 2 byte string). When a
                suffix of the name already appears in it, a 2-byte
                compression pointer to that occurrence is emitted instead
                of repeating the labels.

    Returns the encoded name as a byte string.
    Raises ValueError if a label is longer than 63 bytes.
    """
    labels = []
    for label in data.split("."):
        if not label:
            continue
        if len(label) > 63:
            # Lengths above 63 would collide with the pointer flag bits.
            raise ValueError("DNS label too long: %r" % label)
        labels.append(chr(len(label)) + label)

    pieces = []
    while labels:
        # Search including the terminating zero so that only a complete
        # name suffix is reused, never the front part of a longer name.
        # NOTE(review): this is still a plain byte search, so in principle
        # it can match bytes that are not an encoded name.
        suffix = "".join(labels) + "\x00"
        idx = fulldata.find(suffix)
        if 0 <= idx < 0x4000:  # pointer offsets are limited to 14 bits
            pieces.append(chr(0xc0 | (idx >> 8)) + chr(idx & 0xff))
            break
        pieces.append(labels.pop(0))
    else:
        # No pointer was emitted: terminate the name explicitly.
        pieces.append("\x00")
    return "".join(pieces)
def create_header(QR=1, OPCODE=0, AA=0, TC=0, RD=1, RA=0, Z=0, RCODE=0):
    """Pack the DNS header flag fields into one 16-bit integer.

    Bit layout, most significant first (RFC 1035 section 4.1.1):
    QR(1) OPCODE(4) AA(1) TC(1) RD(1) RA(1) Z(3) RCODE(4).
    Each argument is masked to the width of its field.
    """
    data = 0
    data |= (QR & 1) << 15
    data |= (OPCODE & 15) << 11
    data |= (AA & 1) << 10
    data |= (TC & 1) << 9
    data |= (RD & 1) << 8
    data |= (RA & 1) << 7
    data |= (Z & 7) << 4
    data |= (RCODE & 15) << 0
    return data
def decode_header(data):
    """Split a 16-bit DNS header flags word into its fields.

    Returns the tuple (QR, OPCODE, AA, TC, RD, RA, Z, RCODE), i.e. the
    inverse of create_header.
    """
    QR = (data >> 15) & 1
    OPCODE = (data >> 11) & 15
    AA = (data >> 10) & 1
    TC = (data >> 9) & 1
    RD = (data >> 8) & 1
    RA = (data >> 7) & 1
    Z = (data >> 4) & 7
    RCODE = (data >> 0) & 15
    return QR, OPCODE, AA, TC, RD, RA, Z, RCODE
# Maps a DNS record type number to a (decoder, encoder) pair for its RDATA.
# Both callables take (value, fulldata); fulldata is the message used to
# resolve or create name-compression pointers. Types missing from this table
# keep their RDATA as raw bytes.
typelut = {
    1: (lambda a, b: decode_ipv4(a), lambda a, b: encode_ipv4(a)),   # A
    # NS RDATA is a single (possibly compressed) domain name, so it has to
    # be decoded and re-encoded like CNAME/PTR to keep pointers valid.
    2: (decode_string, encode_string),                               # NS
    5: (decode_string, encode_string),                               # CNAME
    12: (decode_string, encode_string),                              # PTR
    28: (lambda a, b: decode_ipv6(a), lambda a, b: encode_ipv6(a)),  # AAAA
}
# NOTE(review): left as a classic (non-`object`) class on purpose; instances
# end up in the pickled ./dns.dump cache written by the script section, and
# changing the class kind may affect loading existing dumps -- confirm before
# modernizing.
class Response:
    """A single DNS resource record parsed from wire format.

    Attributes set by the constructor:
      rawdata -- the buffer the record was parsed from
      name    -- owner name (lower-cased, dotted)
      type2   -- record type number
      class2  -- record class number
      ttl     -- time to live
      value   -- decoded RDATA when the type is in ``typelut``, otherwise
                 the raw RDATA bytes
      length  -- number of bytes the record occupies in ``rawdata``
    """

    def __init__(self, rawdata, fulldata=""):
        """Parse one record starting at the beginning of *rawdata*.

        *fulldata* is the complete message, used to resolve name
        compression pointers. Raises ValueError when the RDATA is shorter
        than its declared length, and struct.error when the fixed part of
        the record is truncated.
        """
        self.rawdata = rawdata
        self.name, off = decode_string2(rawdata, fulldata)
        # Fixed part: TYPE(2) CLASS(2) TTL(4) RDLENGTH(2) = 10 bytes.
        self.type2, self.class2, self.ttl, valuelength = struct.unpack(
            "!HHIH", rawdata[off:off + 10])
        off += 10
        self.value = rawdata[off:off + valuelength]
        if len(self.value) != valuelength:
            raise ValueError("truncated resource record data")
        off += valuelength
        self.length = off
        if self.type2 in typelut:
            self.value = typelut[self.type2][0](self.value, fulldata)

    def add_self(self, data):
        """Return *data* with this record appended in wire format.

        *data* is the message built so far; it is also handed to the name
        encoders so they can emit compression pointers into it.
        """
        data += encode_string(self.name, data)
        encoded = self.value
        if self.type2 in typelut:
            encoded = typelut[self.type2][1](self.value, data)
        data += struct.pack("!HHIH", self.type2, self.class2, self.ttl, len(encoded))
        data += encoded
        return data

    def __repr__(self):
        shown = self.value if self.type2 in typelut else "Unknown decoder"
        return "%s %2d %2d %s" % (self.name, self.type2, self.class2, shown)
# NOTE(review): left as a classic (non-`object`) class on purpose; instances
# are pickled into ./dns.dump by the script section, and changing the class
# kind may affect loading existing dumps -- confirm before modernizing.
class DNSQuery:
    """A parsed DNS message (query or reply) with exactly one question.

    Attributes:
      data      -- the raw message
      id        -- the 2-byte transaction id, kept as raw bytes
      header    -- the 16-bit flags word
      aa, rcode -- AA and RCODE fields taken from the flags word
      counts    -- [QDCOUNT, ANCOUNT, NSCOUNT, ARCOUNT]
      qname, qtype, qclass -- the question
      responses -- Response objects for the answer, authority and
                   additional sections, in message order
    """

    def __init__(self, data):
        """Parse *data* as a DNS message.

        Raises ValueError unless the message is a standard query
        (opcode 0) with exactly one question of class IN (1).
        """
        self.data = data
        self.qname = ''
        self.responses = []
        # See rfc section 4.1.1
        self.id = data[:2]
        self.header = struct.unpack("!H", self.data[2:4])[0]
        header = decode_header(self.header)
        self.aa = header[2]
        self.rcode = header[7]
        opcode = header[1]
        self.counts = list(struct.unpack("!HHHH", self.data[4:4 + 2 * 4]))
        # Explicit checks instead of assert so they survive `python -O`.
        if opcode != 0:
            raise ValueError("unsupported DNS opcode: %d" % opcode)
        if self.counts[0] != 1:
            raise ValueError("expected exactly one question, got %d" % self.counts[0])
        pos = 12  # the fixed header is 12 bytes long
        for i in range(self.counts[0]):
            self.qname, pos2 = decode_string2(data[pos:], data)
            pos += pos2
            self.qtype, self.qclass = struct.unpack("!HH", data[pos:pos + 4])
            pos += 4
        if self.qclass != 1:
            raise ValueError("unsupported DNS class: %d" % self.qclass)
        for i in range(self.counts[1] + self.counts[2] + self.counts[3]):
            r = Response(data[pos:], data)
            pos += r.length
            self.responses.append(r)

    def key(self):
        """Return the (qname, qtype) pair identifying the question."""
        return (self.qname, self.qtype)

    def add_response(self, data, type=1):
        """Add an answer record for the question's name.

        data -- the record value in text form (e.g. "1.2.3.4" for type 1)
        type -- record type number; must have an encoder in ``typelut``

        The record is created with class IN and a TTL of 60. It is placed
        at the end of the answer section, i.e. before any authority or
        additional records already present, so the section counts stay
        consistent with the record order.

        Raises ValueError for a type without an encoder.
        """
        if type not in typelut:
            raise ValueError("no encoder for record type %r" % (type,))
        enc = encode_string(self.qname)
        encoded = typelut[type][1](data, "")
        enc += struct.pack("!HHIH", type, 1, 60, len(encoded))
        enc += encoded
        self.responses.insert(self.counts[1], Response(enc))
        self.counts[1] += 1

    def create_response(self, header=None):
        """Serialize this message to wire format.

        header -- 16-bit flags word to use; when None a reply header
                  carrying this message's AA and RCODE is generated. A
                  value of 0 is a valid header and is used as given.

        Returns the packet, or an empty string when there is no question
        name.
        """
        if header is None:
            header = create_header(AA=self.aa, RCODE=self.rcode)
        packet = ''
        if self.qname:
            packet += self.id  # id
            packet += struct.pack("!H", header)
            # Questions and Answers Counts
            packet += struct.pack("!HHHH", 1, self.counts[1], self.counts[2], self.counts[3])
            # Question
            packet += encode_string(self.qname)
            packet += struct.pack("!HH", self.qtype, 1)  # Type and internet class
            # Answers
            for resp in self.responses:
                packet = resp.add_self(packet)
        return packet

    def __repr__(self):
        ret = "Header: %s %s\n" % (str(decode_header(self.header)), str(self.counts))
        ret += "Question: %3d %2d %s\n" % (self.qtype, self.qclass, self.qname)
        ret += "Response:\n"
        for resp in self.responses:
            ret += "\t%s\n" % resp
        return ret
if __name__ == '__main__':
    # Caching DNS proxy. Listens on UDP port 53, answers from /etc/hosts or
    # the local cache when possible, and otherwise forwards the query to the
    # DNS server given with -proxydns.
    #
    # Options:
    #   -proxydns <ip>     upstream DNS server (required)
    #   -disablecache      never answer from the cache
    #   -disableetchosts   never answer from /etc/hosts
    #   -cachenew          store upstream replies in the cache
    #   -debug <bitmask>   1: upstream replies, 2: /etc/hosts answers,
    #                      4: cache answers, 8: running counters
    dns = None
    cacheEntries = False
    useCache = True
    useEtcHosts = True
    debug = 0

    i = 1
    while i < len(sys.argv):
        arg = sys.argv[i]
        if arg == "-proxydns":
            i += 1
            dns = sys.argv[i]
        elif arg == "-disablecache":
            useCache = False
        elif arg == "-disableetchosts":
            useEtcHosts = False
        elif arg == "-cachenew":
            cacheEntries = True
        elif arg == "-debug":
            i += 1
            debug = int(sys.argv[i])
        else:
            raise Exception("Unknown argument: %s" % arg)
        i += 1
    if dns is None:
        raise Exception("Please provide a proxydns argument")

    realdns = (dns, 53)
    # udps talks to the clients, sock2 talks to the upstream server.
    udps = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    udps.bind(('', 53))
    sock2 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock2.settimeout(5)

    running = True
    queue = Queue.Queue()
    cache = {}
    questions = {}  # transaction id -> client address awaiting a reply
    # Must exist even when /etc/hosts is unreadable, because the serving
    # loop below looks names up in it.
    hosts_sites = {}
    if os.access("/etc/hosts", os.R_OK):
        with open("/etc/hosts") as f:
            data = f.read()
        # Keys are lower-cased because decoded query names are lower-case;
        # '-' is accepted so hyphenated host names are picked up as well.
        hosts_sites = dict(
            (name.lower(), ip)
            for ip, name in re.findall(
                r"^(\d+\.\d+\.\d+\.\d+)\s+([\w.-]+)(?:\s|$)", data, re.MULTILINE))
    if os.access("./dns.dump", os.R_OK):
        # SECURITY: unpickling executes arbitrary code from the file; only
        # run this next to a dns.dump that this script wrote itself.
        with open("./dns.dump", "rb") as f:
            cache = pickle.load(f)

    def thread():
        # Worker: for every forwarded query (signalled through the queue)
        # read one upstream reply and relay it to the waiting client.
        while running:
            try:
                p = queue.get()
                if not p:
                    # None is the shutdown signal.
                    break
                data = sock2.recv(1024)
                p2 = DNSQuery(data)
                if cacheEntries:
                    cache[p2.key()] = p2
                addr = questions[p2.id]
                if (debug & 1) != 0:
                    print("Looked up:")
                    print(p2)
                del questions[p2.id]
                udps.sendto(p2.create_response(), addr)
            except Exception:
                traceback.print_exc()
            finally:
                queue.task_done()
        print("broke out of thread")

    counts = [0, 0, 0]  # answered from /etc/hosts, from cache, forwarded
    try:
        t = threading.Thread(target=thread)
        t.start()
        while running:
            try:
                data, addr = udps.recvfrom(1024)
                p = DNSQuery(data)
                if useEtcHosts and p.qname in hosts_sites:
                    counts[0] += 1
                    ip = hosts_sites[p.qname]
                    p.add_response(ip)
                    if (debug & 2) != 0:
                        print("Returning /etc/hosts lookup:")
                        print(p)
                    udps.sendto(p.create_response(), addr)
                elif useCache and p.key() in cache:
                    counts[1] += 1
                    p2 = cache[p.key()]
                    # Reuse the cached reply under the new transaction id.
                    p2.id = p.id
                    if (debug & 4) != 0:
                        print("Returning cached entry:")
                        print(p2)
                    udps.sendto(p2.create_response(), addr)
                else:
                    counts[2] += 1
                    questions[p.id] = addr
                    sock2.sendto(p.create_response(header=create_header(QR=0)), realdns)
                    queue.put(True)
                if (debug & 8) != 0:
                    # The trailing "\r" keeps the counters on a single line.
                    sys.stdout.write(
                        "/etc/hosts: %4d, cache: %4d, new lookups: %4d\r" % tuple(counts))
                    sys.stdout.flush()
            except KeyboardInterrupt:
                print("executing on keyboard break")
                running = False
                break
            except Exception:
                # Keep serving after a malformed packet or a socket error.
                traceback.print_exc()
    finally:
        print("shutting down")
        running = False
        queue.put(None)
        sock2.settimeout(0.1)
        udps.close()
        sock2.close()
        with open("./dns.dump", "wb") as f:
            pickle.dump(cache, f)
        print("dumped dns cache")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment