Skip to content

Instantly share code, notes, and snippets.

@cybercase
Last active August 9, 2017 11:55
Show Gist options
  • Save cybercase/2c7238e4d971102654ba to your computer and use it in GitHub Desktop.
Save cybercase/2c7238e4d971102654ba to your computer and use it in GitHub Desktop.
Simple (and limited) DNS query client
#!/usr/bin/env python
# -*- coding:utf8 -*-
import argparse
import io
import random
import socket
import struct
import StringIO
from collections import namedtuple
# Size in bytes of the receive buffer passed to recvfrom() for the reply.
MAX_PACKET_SIZE = 512
# struct format of the DNS header: six unsigned 16-bit fields in network
# byte order (id, flags, qdcount, ancount, nscount, arcount).
STRUCT_DNS_HEADER = '!HHHHHH'
# Number of header bytes read from the start of a reply (6 fields x 2 bytes).
HEADER_LEN = 12
# Documentation: http://www.ccs.neu.edu/home/amislove/teaching/cs4700/fall09/handouts/project1-primer.pdf
class Question(object):
    """A standard, recursive DNS query asking for the A record of one domain.

    The wire representation of the query (header followed by a single
    question entry) is available through the ``data`` property.

    Args:
        domain: ASCII domain name to look up, e.g. ``'www.example.com'``.
            One trailing dot (fully-qualified form) is accepted.
    """

    MAX_LABEL_LEN = 63   # a length byte >= 64 would collide with pointer flags
    MAX_NAME_LEN = 255   # limit on the whole encoded name, terminator included

    def __init__(self, domain):
        self.domain = domain

        # Header section
        self.id = random.randint(0, 65535)  # Request ID
        self.qr = 0       # Query or Response
        self.opcode = 0   # Query Type: Standard
        self.aa = 0       # Meaningful only in answers
        self.tc = 0       # Truncated Flag
        self.rd = 1       # Allow recursive query
        self.ra = 0       # Meaningful only in answers
        self.z = 0        # Reserved
        self.rcode = 0    # Response code (Meaningful only in answers)
        self.qdcount = 1  # Entries in Question (only 1 supported)
        self.ancount = 0
        self.nscount = 0
        self.arcount = 0

        # Question section
        self.qtype = 1    # Only A Records
        self.qclass = 1   # Address class (Internet addresses)

    def _header(self):
        """Return the packed 12-byte header built from the flag attributes."""
        control = (
            self.qr << 15
            | self.opcode << 11
            | self.aa << 10
            | self.tc << 9
            | self.rd << 8
            | self.ra << 7
            | self.z << 4
            | self.rcode
        )
        return struct.pack(STRUCT_DNS_HEADER, self.id, control, self.qdcount,
                           self.ancount, self.nscount, self.arcount)

    def _question(self):
        """Return the packed question entry: encoded name, qtype and qclass.

        The name is written as a sequence of length-prefixed labels closed by
        a zero byte.

        Raises:
            ValueError: if a label is empty, a label is longer than 63 bytes,
                the encoded name exceeds 255 bytes, or the name is not ASCII.
        """
        name = self.domain
        # 'example.com.' and 'example.com' denote the same name; without this
        # the trailing dot would yield an empty label, i.e. a second
        # terminator byte and a malformed packet.
        if name.endswith('.'):
            name = name[:-1]

        pieces = []
        # An empty name is the DNS root: no labels, just the terminator.
        for label in (name.split('.') if name else []):
            if not isinstance(label, bytes):
                label = label.encode('ascii')
            size = len(label)
            if size == 0:
                # A zero length byte terminates the name, so an empty label
                # in the middle would silently cut the name short.
                raise ValueError(
                    'Empty label in domain name: {0!r}'.format(self.domain))
            if size > self.MAX_LABEL_LEN:
                raise ValueError(
                    'Label longer than {0} bytes in domain name: {1!r}'.format(
                        self.MAX_LABEL_LEN, self.domain))
            pieces.append(struct.pack('B', size))
            pieces.append(label)
        pieces.append(b'\x00')

        encoded_name = b''.join(pieces)
        if len(encoded_name) > self.MAX_NAME_LEN:
            raise ValueError(
                'Domain name longer than {0} bytes: {1!r}'.format(
                    self.MAX_NAME_LEN, self.domain))
        return encoded_name + struct.pack('!HH', self.qtype, self.qclass)

    @property
    def data(self):
        """Complete query packet: header followed by the question entry."""
        return self._header() + self._question()
class Answer(object):
    """Parser for a DNS reply carrying A and CNAME records.

    Constructing an instance parses ``data`` immediately: the header, the
    question section, and then ``ancount`` answer records, which are
    collected in ``self.results`` as ``Answer.Record`` tuples. The authority
    and additional sections are not parsed.

    Args:
        data: raw bytes of the reply packet.

    Raises:
        RuntimeError: if the reply is truncated (TC flag set or the data
            ends early), carries a non-zero response code, contains a record
            of an unsupported class or type, or is otherwise malformed.
    """

    RCODE_ERRORS = {
        1: 'Format error - the name server was unable to interpret the query',
        2: 'Server failure - the name server was unable to process this query '
           'due to a problem with the name server',
        3: 'Name Error - Meaningful only for responses from an authoritative '
           'name server, this code signifies that the domain name referenced '
           'in the query does not exist.',
        4: 'Not Implemented - The name server does not support the requested '
           'kind of query.',
        5: 'Refused - The name server refuses to perform the specified '
           'operation for policy reasons.'
    }
    TYPE_ARECORD = 1
    TYPE_CNAME = 5
    Record = namedtuple('Record', 'domain type_ ttl addr alias')

    def __init__(self, data):
        self.data_stream = io.BytesIO(data)
        self._parse_header()
        self._parse_question()
        self._parse_answer()

    @staticmethod
    def _read_exact(stream, size):
        """Read exactly ``size`` bytes or raise RuntimeError on short data."""
        chunk = stream.read(size)
        if len(chunk) != size:
            raise RuntimeError('Malformed packet: unexpected end of data')
        return chunk

    @staticmethod
    def _to_text(raw):
        """Return a label as the native ``str`` type so labels can be joined."""
        if isinstance(raw, str):
            return raw
        return raw.decode('ascii', 'replace')

    def _parse_header(self):
        """Unpack the fixed-size header and split the flags word into fields.

        Raises:
            RuntimeError: if the TC flag is set or the response code is
                non-zero.
        """
        header_data = self._read_exact(self.data_stream, HEADER_LEN)
        (self.id, control, self.qdcount, self.ancount, self.nscount,
         self.arcount) = struct.unpack(STRUCT_DNS_HEADER, header_data)
        self.qr = control >> 15
        self.opcode = (control >> 11) & 0xF
        self.aa = (control >> 10) & 1
        self.tc = (control >> 9) & 1
        self.rd = (control >> 8) & 1
        self.ra = (control >> 7) & 1
        self.z = (control >> 4) & 0x7
        self.rcode = control & 0xF
        if self.tc != 0:
            raise RuntimeError("Can't handle truncated packets")
        if self.rcode != 0:
            raise RuntimeError(
                self.RCODE_ERRORS.get(self.rcode, 'Unknown Error'))

    def _parse_question(self):
        """Consume ``qdcount`` question entries, keeping the first one.

        Sets ``qlabels``, ``qtype`` and ``qclass``; they stay ``[]`` / ``None``
        when the reply carries no question entry.
        """
        self.qlabels = []
        self.qtype = None
        self.qclass = None
        # Every entry has to be consumed, otherwise the answer section would
        # be read from the wrong offset.
        for index in range(self.qdcount):
            labels = self._read_labels(self.data_stream)
            qtype, qclass = struct.unpack(
                '!HH', self._read_exact(self.data_stream, 4))
            if index == 0:
                self.qlabels, self.qtype, self.qclass = labels, qtype, qclass

    def _read_labels(self, stream):
        """Read one (possibly compressed) domain name from ``stream``.

        Compression pointers are followed; afterwards the stream is left
        right after the name as it appears at the starting position, i.e.
        after the terminating zero byte or after the first pointer.

        Args:
            stream: seekable binary stream positioned at the start of a name.

        Returns:
            List of the name's labels, in order.

        Raises:
            RuntimeError: on short data, a pointer loop, or a length byte
                whose two top bits are neither ``00`` nor ``11``.
        """
        labels = []
        resume_at = None   # where parsing continues once pointers were followed
        visited = set()    # pointer targets already taken, to detect loops
        while True:
            ln, = struct.unpack('B', self._read_exact(stream, 1))
            if ln == 0:
                break
            flags = ln & 0xC0
            if flags == 0xC0:
                # Compression pointer: the low 14 bits of the two bytes are
                # an offset from the start of the packet.
                low, = struct.unpack('B', self._read_exact(stream, 1))
                target = ((ln << 8) | low) & 0x3FFF
                if resume_at is None:
                    resume_at = stream.tell()
                if target in visited:
                    raise RuntimeError(
                        'Malformed packet: compression pointer loop')
                visited.add(target)
                stream.seek(target)
            elif flags == 0:
                labels.append(self._to_text(self._read_exact(stream, ln)))
            else:
                raise RuntimeError(
                    'Malformed packet: unsupported label type '
                    '0x{0:02x}'.format(flags))
        if resume_at is not None:
            stream.seek(resume_at)
        return labels

    def _parse_answer(self):
        """Parse ``ancount`` answer records into ``self.results``.

        Raises:
            RuntimeError: for a record class other than 1, a record type
                other than A or CNAME, or rdata whose size does not match
                the announced ``rdlength``.
        """
        self.results = []
        for _ in range(self.ancount):
            domain = '.'.join(self._read_labels(self.data_stream))
            type_, class_, ttl, rdlength = struct.unpack(
                '!HHIH', self._read_exact(self.data_stream, 10))
            if class_ != 1:
                raise RuntimeError(
                    'Unsupported record class: {0}'.format(class_))
            rdata_end = self.data_stream.tell() + rdlength
            addr = ''
            alias = ''
            if type_ == self.TYPE_ARECORD:  # A Record
                if rdlength != 4:
                    raise RuntimeError(
                        'Malformed packet: A record with rdlength '
                        '{0}'.format(rdlength))
                octets = struct.unpack(
                    'BBBB', self._read_exact(self.data_stream, 4))
                addr = '{0}.{1}.{2}.{3}'.format(*octets)
                type_ = 'A'
            elif type_ == self.TYPE_CNAME:  # CNAME
                alias = '.'.join(self._read_labels(self.data_stream))
                type_ = 'CNAME'
            else:
                raise RuntimeError('Unsupported rdata type: {}'.format(type_))
            # The next record starts right after the announced rdata; any
            # disagreement means the following records would be misparsed.
            if self.data_stream.tell() != rdata_end:
                raise RuntimeError('Malformed packet: rdata length mismatch')
            self.results.append(
                Answer.Record(domain, type_, ttl, addr, alias)
            )
if __name__ == '__main__':
    # Command-line entry point: send one A-record query for the given domain
    # to the chosen DNS server over UDP and print every record of the reply.
    ap = argparse.ArgumentParser()
    ap.add_argument('domain', help='Domain name')
    ap.add_argument('-d', '--dns', help='Dns address', default='8.8.8.8')
    ap.add_argument('-t', '--timeout', type=float, default=5.0,
                    help='Seconds to wait for the reply (default: 5)')
    args = ap.parse_args()

    question = Question(args.domain)
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # UDP gives no delivery guarantee: without a timeout a lost datagram
        # would block recvfrom() forever.
        s.settimeout(args.timeout)
        s.sendto(question.data, (args.dns, 53))
        data, _ = s.recvfrom(MAX_PACKET_SIZE)
    except socket.timeout:
        raise SystemExit('No reply from {0} within {1} seconds'.format(
            args.dns, args.timeout))
    finally:
        s.close()

    try:
        answer = Answer(data)
    except RuntimeError as err:
        raise SystemExit('Lookup failed: {0}'.format(err))
    # A reply whose id differs from the query's is not the answer to it
    # (stale or forged datagram).
    if answer.id != question.id:
        raise SystemExit('Reply id does not match the query id')
    for r in answer.results:
        print(r)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment