Last active
August 9, 2017 11:55
-
-
Save cybercase/2c7238e4d971102654ba to your computer and use it in GitHub Desktop.
Simple (and limited) DNS query client
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
# -*- coding:utf8 -*-
import argparse
import io
import random
import socket
import struct
import StringIO
from collections import namedtuple
# Buffer size handed to recvfrom(); 512 bytes is the classic limit for a DNS
# message carried over UDP.
MAX_PACKET_SIZE = 512
# struct format of the fixed DNS header: six big-endian unsigned 16-bit
# fields (ID, flags, QDCOUNT, ANCOUNT, NSCOUNT, ARCOUNT).
STRUCT_DNS_HEADER = '!HHHHHH'
# Derived from the format so the length can never drift out of sync with it.
HEADER_LEN = struct.calcsize(STRUCT_DNS_HEADER)
# Documentation: http://www.ccs.neu.edu/home/amislove/teaching/cs4700/fall09/handouts/project1-primer.pdf
class Question(object):
    """A single-question DNS query asking for the A record of a domain.

    The constructor fills in every header field of a standard recursive
    query; the ``data`` property renders the header followed by the
    question section as the bytes to send to a name server.
    """

    def __init__(self, domain):
        """Prepare a query for ``domain``, a dotted name like 'example.com'.

        The name is validated lazily, when the packet is rendered.
        """
        self.domain = domain
        # Request ID. Taken from the OS entropy source rather than the
        # default PRNG so an off-path attacker cannot predict it.
        self.id = random.SystemRandom().randint(0, 65535)
        self.qr = 0  # Query or Response
        self.opcode = 0  # Query Type: Standard
        self.aa = 0  # Meaningful only in answers
        self.tc = 0  # Truncated Flag
        self.rd = 1  # Allow recursive query
        self.ra = 0  # Meaningful only in answers
        self.z = 0  # Reserved
        self.rcode = 0  # Response code (Meaningful only in answers)
        self.qdcount = 1  # Entries in Question (only 1 supported)
        self.ancount = 0
        self.nscount = 0
        self.arcount = 0
        # Question section
        self.qtype = 1  # Only A Records
        self.qclass = 1  # Address class (Internet addresses)

    def _header(self):
        """Return the packed 12-byte header.

        The second 16-bit word packs the flags, most significant bit first:
        QR(1) OPCODE(4) AA(1) TC(1) RD(1) RA(1) Z(3) RCODE(4).
        """
        control = 0
        control |= self.qr << 15
        control |= self.opcode << 11
        control |= self.aa << 10
        control |= self.tc << 9
        control |= self.rd << 8
        control |= self.ra << 7
        control |= self.z << 4
        control |= self.rcode
        return struct.pack(STRUCT_DNS_HEADER, self.id, control, self.qdcount,
                           self.ancount, self.nscount, self.arcount)

    def _question(self):
        """Return the packed question section: QNAME, QTYPE, QCLASS.

        QNAME is a sequence of length-prefixed labels ending with a zero
        byte.  A single trailing dot (fully-qualified form) is accepted and
        an empty name encodes the root.

        Raises:
            ValueError: if the name is not ASCII, contains an empty label or
                one longer than 63 bytes, or exceeds 255 bytes once encoded.
        """
        name = self.domain
        if not isinstance(name, bytes):
            # UnicodeEncodeError is a ValueError, so non-ASCII names are
            # reported the same way as the other validation failures.
            name = name.encode('ascii')
        if name.endswith(b'.'):
            # 'example.com.' and 'example.com' are the same name; without
            # this the split below would yield a spurious empty label.
            name = name[:-1]

        parts = []
        for label in (name.split(b'.') if name else []):
            size = len(label)
            if not 0 < size < 64:
                raise ValueError(
                    'Invalid label {0!r} in domain {1!r}: labels must be '
                    '1 to 63 bytes long'.format(label, self.domain))
            parts.append(struct.pack('B', size))
            parts.append(label)
        parts.append(struct.pack('B', 0))  # root label terminates the name

        qname = b''.join(parts)
        if len(qname) > 255:
            raise ValueError(
                'Domain {0!r} is too long: encoded name exceeds 255 '
                'bytes'.format(self.domain))
        return qname + struct.pack('!HH', self.qtype, self.qclass)

    @property
    def data(self):
        """The complete query packet (header + question) as bytes."""
        return self._header() + self._question()
class Answer(object):
    """Parsed DNS response to an A-record query.

    All parsing happens in the constructor: the header fields become
    attributes (``id``, ``qr``, ``opcode``, ``rcode``, the four counts, ...),
    the echoed question is stored in ``qlabels`` / ``qtype`` / ``qclass`` and
    the answer section is decoded into ``results``, a list of
    ``Answer.Record`` tuples.  Only A and CNAME records of class IN are
    understood; the authority and additional sections are not read.

    Raises:
        RuntimeError: if the packet ends early or is malformed, has the
            truncation flag set, carries a non-zero response code, or
            contains a record of an unsupported class or type.
    """

    RCODE_ERRORS = {
        1: 'Format error - the name server was unable to interpret the query',
        2: 'Server failure - the name server was unable to process this query '
           'due to a problem with the name server',
        3: 'Name Error - Meaningful only for responses from an authoritative '
           'name server, this code signifies that the domain name referenced '
           'in the query does not exist.',
        4: 'Not Implemented - The name server does not support the requested '
           'kind of query.',
        5: 'Refused - The name server refuses to perform the specified '
           'operation for policy reasons.'
    }
    TYPE_ARECORD = 1
    TYPE_CNAME = 5
    Record = namedtuple('Record', 'domain type_ ttl addr alias')

    def __init__(self, data):
        """Parse ``data``, the raw bytes of a DNS response packet."""
        self.data_stream = io.BytesIO(data)
        self._parse_header()
        self._parse_question()
        self._parse_answer()

    @staticmethod
    def _read_exact(stream, size):
        """Read exactly ``size`` bytes or raise RuntimeError on a short read."""
        chunk = stream.read(size)
        if len(chunk) != size:
            raise RuntimeError('Malformed packet: unexpected end of data')
        return chunk

    @staticmethod
    def _to_text(raw):
        """Return a label as the native ``str`` type.

        Where ``bytes`` is ``str`` (Python 2) the label is returned as is;
        otherwise it is decoded as latin-1, which maps every byte to exactly
        one character and therefore never fails or loses data.
        """
        if isinstance(raw, str):
            return raw
        return raw.decode('latin-1')

    def _parse_header(self):
        """Decode the fixed header and reject packets that cannot be used."""
        header_data = self._read_exact(self.data_stream, HEADER_LEN)
        self.id, control, self.qdcount, self.ancount, self.nscount, \
            self.arcount = struct.unpack(STRUCT_DNS_HEADER, header_data)
        # Flag word, most significant bit first:
        # QR(1) OPCODE(4) AA(1) TC(1) RD(1) RA(1) Z(3) RCODE(4)
        self.qr = control >> 15
        self.opcode = control >> 11 & 0xF
        self.aa = control >> 10 & 1
        self.tc = control >> 9 & 1
        self.rd = control >> 8 & 1
        self.ra = control >> 7 & 1
        self.z = control >> 4 & 0x7
        self.rcode = control & 0xF
        if self.tc != 0:
            raise RuntimeError("Can't handle truncated packets")
        if self.rcode != 0:
            raise RuntimeError(
                self.RCODE_ERRORS.get(self.rcode, 'Unknown Error'))

    def _parse_question(self):
        """Consume the question section, keeping the first entry.

        Every entry announced by ``qdcount`` is read so the stream ends up
        at the start of the answer section whatever the count is.  With no
        question present ``qlabels`` is empty and ``qtype`` / ``qclass`` are
        ``None``.
        """
        self.qlabels = []
        self.qtype = None
        self.qclass = None
        for index in range(self.qdcount):
            labels = self._read_labels(self.data_stream)
            qtype, qclass = struct.unpack(
                '!HH', self._read_exact(self.data_stream, 4))
            if index == 0:
                self.qlabels, self.qtype, self.qclass = labels, qtype, qclass

    def _read_labels(self, stream):
        """Read one domain name at the current position of ``stream``.

        Compression pointers are followed; afterwards the stream is placed
        just past the name as it appears at the starting position (i.e.
        right after the first pointer, when there is one).

        Returns:
            The list of labels, without the terminating root label.

        Raises:
            RuntimeError: on a short read, a reserved label type, or a
                chain of pointers that loops.
        """
        labels = []
        resume_at = None   # where to continue once pointers were followed
        visited = set()    # pointer targets already taken, to detect loops
        while True:
            ln, = struct.unpack('B', self._read_exact(stream, 1))
            if ln == 0:
                break
            # The two top bits select the kind: 00 = plain label,
            # 11 = compression pointer, 01/10 = reserved.
            kind = ln & 0xC0
            if kind == 0x00:
                labels.append(self._to_text(self._read_exact(stream, ln)))
            elif kind == 0xC0:
                low, = struct.unpack('B', self._read_exact(stream, 1))
                target = ((ln << 8) | low) & 0x3FFF  # low 14 bits = offset
                if resume_at is None:
                    resume_at = stream.tell()
                if target in visited:
                    raise RuntimeError(
                        'Malformed packet: compression pointer loop')
                visited.add(target)
                stream.seek(target)
            else:
                raise RuntimeError(
                    'Malformed packet: unsupported label type '
                    '0x{0:02x}'.format(ln))
        if resume_at is not None:
            stream.seek(resume_at)
        return labels

    def _parse_answer(self):
        """Decode ``ancount`` resource records into ``self.results``."""
        self.results = []
        stream = self.data_stream
        for _ in range(self.ancount):
            domain = '.'.join(self._read_labels(stream))
            type_, class_, ttl, rdlength = \
                struct.unpack('!HHIH', self._read_exact(stream, 10))
            if class_ != 1:
                raise RuntimeError(
                    'Unsupported record class: {0}'.format(class_))
            addr = ''
            alias = ''
            if type_ == self.TYPE_ARECORD:  # A Record
                if rdlength != 4:
                    raise RuntimeError(
                        'Malformed A record: rdlength is {0}, expected '
                        '4'.format(rdlength))
                addr = struct.unpack('BBBB', self._read_exact(stream, 4))
                addr = '{0}.{1}.{2}.{3}'.format(*addr)
                type_ = 'A'
            elif type_ == self.TYPE_CNAME:  # CNAME
                alias = '.'.join(self._read_labels(stream))
                type_ = 'CNAME'
            else:
                raise RuntimeError('Unsupported rdata type: {}'.format(type_))
            self.results.append(
                Answer.Record(domain, type_, ttl, addr, alias)
            )
def main(argv=None):
    """Resolve one domain given on the command line and print each record.

    Args:
        argv: Argument list to parse instead of the process arguments;
            ``None`` lets argparse read ``sys.argv``.

    Raises:
        socket.timeout: if no reply arrives within the timeout.
        RuntimeError: if the reply cannot be parsed or its ID does not
            match the ID of the query that was sent.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument('domain', help='Domain name')
    ap.add_argument('-d', '--dns', help='Dns address', default='8.8.8.8')
    ap.add_argument('-t', '--timeout', type=float, default=5.0,
                    help='Seconds to wait for the reply')
    args = ap.parse_args(argv)

    question = Question(args.domain)
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # UDP gives no delivery guarantee: without a timeout a lost
        # datagram would block recvfrom() forever.
        s.settimeout(args.timeout)
        s.sendto(question.data, (args.dns, 53))
        data, address = s.recvfrom(MAX_PACKET_SIZE)
    finally:
        s.close()

    answer = Answer(data)
    if answer.id != question.id:
        # A reply carrying a different ID does not belong to this query
        # (stray or forged datagram), so its records must not be trusted.
        raise RuntimeError(
            'Response ID {0} does not match request ID {1}'.format(
                answer.id, question.id))
    for r in answer.results:
        print(r)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment