Skip to content

Instantly share code, notes, and snippets.

@mrpapercut
Last active August 9, 2024 15:17
Show Gist options
  • Save mrpapercut/92422ecf06b5ab8e64e502da5e33b9f7 to your computer and use it in GitHub Desktop.
Save mrpapercut/92422ecf06b5ab8e64e502da5e33b9f7 to your computer and use it in GitHub Desktop.
Raw DNS requests with python
#!/usr/bin/env python3
import binascii
import socket
import sys
from collections import OrderedDict
# See https://web.archive.org/web/20180919041301/https://routley.io/tech/2017/12/28/hand-writing-dns-messages.html
# See https://tools.ietf.org/html/rfc1035
def send_udp_message(message, address, port, timeout=5.0):
    """Send a hex-encoded message to a UDP server and return its reply.

    message is a hexadecimal encoded string; any whitespace in it is ignored.
    address and port identify the IPv4 UDP server to contact.
    timeout is the number of seconds to wait on the socket before giving up
    (None waits forever).

    Returns the first datagram received (at most 4096 bytes) as a
    hexadecimal string.

    Raises binascii.Error if message is not valid hexadecimal, and
    socket.timeout if no reply arrives within timeout seconds.
    """
    # Decode first, so malformed input fails before any socket is opened.
    payload = binascii.unhexlify("".join(message.split()))

    # The context manager closes the socket on every exit path.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        # UDP gives no delivery guarantee: without a timeout a lost datagram
        # would leave recvfrom() blocked forever.
        sock.settimeout(timeout)
        sock.sendto(payload, (address, port))
        data, _ = sock.recvfrom(4096)

    return binascii.hexlify(data).decode("utf-8")
def build_message(type="A", address=""):
    """Build a single-question DNS query and return it as a hex string.

    type is the record type to ask for, as a name understood by get_type
    (for example "A" or "MX").
    address is the domain name to look up. One trailing dot is accepted,
    and an empty string (or ".") asks about the root.

    Returns the whole message (header followed by one question) encoded as
    a hexadecimal string.

    Raises ValueError if a label of address is empty or longer than the
    63 bytes a DNS label may hold, or if get_type rejects type.
    """
    # --- Header (RFC 1035 section 4.1.1): six 16-bit fields ---
    message_id = 43690  # 16-bit identifier (0-65535); 43690 equals 'aaaa'

    qr = 0      # 1 bit:  0 = query, 1 = response
    opcode = 0  # 4 bits: 0 = standard query
    aa = 0      # 1 bit:  authoritative answer (set by servers)
    tc = 0      # 1 bit:  message was truncated
    rd = 1      # 1 bit:  recursion desired
    ra = 0      # 1 bit:  recursion available (set by servers)
    z = 0       # 3 bits: reserved, must be zero
    rcode = 0   # 4 bits: response code (set by servers)
    # Pack the flag fields by bit position rather than by gluing decimal
    # strings together, which only works while every field is 0 or 1.
    flags = (
        (qr << 15) | (opcode << 11) | (aa << 10) | (tc << 9)
        | (rd << 8) | (ra << 7) | (z << 4) | rcode
    )

    qdcount = 1  # number of questions (16 bit)
    ancount = 0  # number of answers (16 bit)
    nscount = 0  # number of authority records (16 bit)
    arcount = 0  # number of additional records (16 bit)

    header = (message_id, flags, qdcount, ancount, nscount, arcount)
    pieces = ["{:04x}".format(field) for field in header]

    # --- QNAME: each label preceded by one byte holding its length ---
    name = address[:-1] if address.endswith(".") else address
    if name:
        for label in name.split("."):
            raw = label.encode()
            # The length byte counts encoded bytes, not characters, and a
            # zero length would be read as the end of the name.
            if not 1 <= len(raw) <= 63:
                raise ValueError(
                    "invalid DNS label {!r} in {!r}: labels must be 1-63 bytes"
                    .format(label, address)
                )
            pieces.append("{:02x}".format(len(raw)))
            pieces.append(binascii.hexlify(raw).decode())
    pieces.append("00")  # Terminating zero-length label for QNAME

    # --- QTYPE and QCLASS ---
    pieces.append(get_type(type))
    qclass = 1  # Class for lookup. 1 is Internet
    pieces.append("{:04x}".format(qclass))

    return "".join(pieces)
# Numeric TYPE codes whose RDATA is a single domain name: NS, CNAME, PTR.
_NAME_RDATA_TYPES = (2, 5, 12)

# Upper bound on compression pointers followed for one name, so a message
# whose pointers form a loop cannot hang the decoder.
_MAX_POINTER_HOPS = 32


def _type_label(code):
    """Return get_type's name for a numeric type code, or "UNKNOWN" if it has none."""
    try:
        return get_type(code)
    except (IndexError, ValueError):
        return "UNKNOWN"


def _encoded_name_length(message, start):
    """Return how many hex characters the name at start occupies (0 if truncated)."""
    position = start
    while True:
        length_field = message[position:position + 2]
        if len(length_field) < 2:
            return 0
        length = int(length_field, 16)
        if length & 0xC0 == 0xC0:
            # A compression pointer is two bytes long and always ends the name.
            return position + 4 - start
        position += 2
        if length == 0:
            return position - start
        position += length * 2


def _read_name(message, start):
    """Return the hex-encoded labels of the name at start, following pointers."""
    labels = []
    position = start
    hops = 0
    while hops <= _MAX_POINTER_HOPS:
        length_field = message[position:position + 2]
        if len(length_field) < 2:
            break
        length = int(length_field, 16)
        if length == 0:
            break
        if length & 0xC0 == 0xC0:
            pointer_field = message[position:position + 4]
            if len(pointer_field) < 4:
                break
            # The low 14 bits are a byte offset from the start of the
            # message; two hex characters encode each byte.
            position = (int(pointer_field, 16) & 0x3FFF) * 2
            hops += 1
            continue
        label = message[position + 2:position + 2 + length * 2]
        if len(label) < length * 2:
            break  # truncated label
        labels.append(label)
        position += 2 + length * 2
    return labels


def decode_message(message):
    """Return a human-readable dump of a hex-encoded DNS message.

    message is the whole DNS message (query or response) as a hexadecimal
    string, as produced by build_message or send_udp_message.

    The dump lists the header, the first question and then every resource
    record that follows it. Records of the answer, authority and additional
    sections are all listed under "# ANSWER SECTION" in wire order. Decoding
    stops quietly at the first record that is cut short.

    Raises ValueError if message is too short to hold the header and the
    question, or contains non-hexadecimal characters where numbers are
    expected.
    """
    if len(message) < 24:
        raise ValueError("DNS message is shorter than the 12-byte header")

    res = []

    # Header: six 16-bit fields, four hex characters each.
    ID = message[0:4]
    query_params = message[4:8]
    QDCOUNT = int(message[8:12], 16)
    ANCOUNT = int(message[12:16], 16)
    NSCOUNT = int(message[16:20], 16)
    ARCOUNT = int(message[20:24], 16)

    params = "{:016b}".format(int(query_params, 16))
    QPARAMS = OrderedDict([
        ("QR", params[0:1]),
        ("OPCODE", params[1:5]),
        ("AA", params[5:6]),
        ("TC", params[6:7]),
        ("RD", params[7:8]),
        ("RA", params[8:9]),
        ("Z", params[9:12]),
        ("RCODE", params[12:16]),
    ])

    # Question section
    QUESTION_SECTION_STARTS = 24
    question_parts = parse_parts(message, QUESTION_SECTION_STARTS, [])
    QNAME = ".".join(binascii.unhexlify(p).decode() for p in question_parts)

    qname_length = _encoded_name_length(message, QUESTION_SECTION_STARTS)
    if qname_length == 0:
        raise ValueError("DNS message ends inside the question name")
    QTYPE_STARTS = QUESTION_SECTION_STARTS + qname_length
    QCLASS_STARTS = QTYPE_STARTS + 4
    QTYPE = message[QTYPE_STARTS:QCLASS_STARTS]
    QCLASS = message[QCLASS_STARTS:QCLASS_STARTS + 4]

    res.append("\n# HEADER")
    res.append("ID: " + ID)
    res.append("QUERYPARAMS: ")
    for qp, value in QPARAMS.items():
        res.append(" - " + qp + ": " + value)
    res.append("\n# QUESTION SECTION")
    res.append("QNAME: " + QNAME)
    res.append("QTYPE: " + QTYPE + " (\"" + _type_label(int(QTYPE, 16)) + "\")")
    res.append("QCLASS: " + QCLASS)

    # Resource records. The three sections follow each other on the wire,
    # so the number of records to read is the sum of their counts.
    offset = QCLASS_STARTS + 4
    record_total = ANCOUNT + NSCOUNT + ARCOUNT
    if record_total > 0:
        res.append("\n# ANSWER SECTION")

    for record_index in range(record_total):
        name_length = _encoded_name_length(message, offset)
        fields_start = offset + name_length
        # TYPE (4) + CLASS (4) + TTL (8) + RDLENGTH (4) hex characters.
        rdata_start = fields_start + 20
        if name_length == 0 or rdata_start > len(message):
            break  # the message ends before this record does

        ANAME = message[offset:fields_start]  # usually a pointer to the question
        ATYPE = message[fields_start:fields_start + 4]
        ACLASS = message[fields_start + 4:fields_start + 8]
        TTL = int(message[fields_start + 8:fields_start + 16], 16)
        RDLENGTH = int(message[fields_start + 16:rdata_start], 16)
        rdata_end = rdata_start + RDLENGTH * 2
        RDDATA = message[rdata_start:rdata_end]

        if ATYPE == get_type("A"):
            # An address is printed as dot-separated decimal bytes.
            RDDATA_decoded = ".".join(
                str(int(RDDATA[i:i + 2], 16)) for i in range(0, len(RDDATA), 2)
            )
        else:
            if int(ATYPE, 16) in _NAME_RDATA_TYPES:
                # These hold a domain name that may point back into the
                # message, so it has to be read from the full message.
                labels = _read_name(message, rdata_start)
            else:
                # Best effort: treat the data as length-prefixed chunks.
                labels = parse_parts(RDDATA, 0, [])
            RDDATA_decoded = ".".join(
                binascii.unhexlify(label).decode('iso8859-1') for label in labels
            )

        res.append("# ANSWER " + str(record_index + 1))
        res.append("QDCOUNT: " + str(QDCOUNT))
        res.append("ANCOUNT: " + str(ANCOUNT))
        res.append("NSCOUNT: " + str(NSCOUNT))
        res.append("ARCOUNT: " + str(ARCOUNT))
        res.append("ANAME: " + ANAME)
        res.append("ATYPE: " + ATYPE + " (\"" + _type_label(int(ATYPE, 16)) + "\")")
        res.append("ACLASS: " + ACLASS)
        res.append("\nTTL: " + str(TTL))
        res.append("RDLENGTH: " + str(RDLENGTH))
        res.append("RDDATA: " + RDDATA)
        res.append("RDDATA decoded (result): " + RDDATA_decoded + "\n")

        offset = rdata_end

    return "\n".join(res)
def get_type(type):
    """Translate between DNS record type names and their numeric codes.

    Given a name (str), return the code as a four-digit hexadecimal string,
    e.g. "A" -> "0001". Given a code (int), return the name, e.g. 1 -> "A".
    A code without a known name is rendered as "TYPE<code>", e.g. "TYPE41".

    Raises ValueError for an unknown name, or for a code outside 0-65535.
    """
    names_by_code = {
        0: "ERROR",  # type 0 does not exist
        1: "A",
        2: "NS",
        3: "MD",
        4: "MF",
        5: "CNAME",
        6: "SOA",
        7: "MB",
        8: "MG",
        9: "MR",
        10: "NULL",
        11: "WKS",
        12: "PTR",
        13: "HINFO",
        14: "MINFO",
        15: "MX",
        16: "TXT",
        28: "AAAA",
    }

    if isinstance(type, str):
        # Type 12 used to be misspelled "PTS" here; keep accepting that
        # spelling so existing callers do not break.
        wanted = "PTR" if type == "PTS" else type
        for code, name in names_by_code.items():
            if name == wanted:
                return "{:04x}".format(code)
        raise ValueError("unknown DNS record type name: {!r}".format(type))

    if type in names_by_code:
        return names_by_code[type]
    if 0 <= type <= 0xFFFF:
        # Valid code this table has no name for.
        return "TYPE{}".format(type)
    raise ValueError("DNS record type code out of range: {!r}".format(type))
def parse_parts(message, start, parts):
    """Collect the length-prefixed parts of message beginning at start.

    message is a hexadecimal string and start an index into it. Each part
    is one length byte (two hex characters) followed by that many bytes of
    data. The hex-encoded data of every part is appended to parts, which is
    also returned.

    Parsing stops at a zero length byte, at the end of message, or at a
    part that claims more data than message still holds. The last case
    covers a DNS compression pointer at the end of the data, which cannot
    be resolved here and used to be appended as a truncated, bogus part.

    Raises ValueError if a length byte is not valid hexadecimal.
    """
    position = start
    while True:
        length_field = message[position:position + 2]
        if len(length_field) < 2:
            return parts  # ran off the end of the message

        part_start = position + 2
        part_end = part_start + int(length_field, 16) * 2
        if part_end == part_start or part_end > len(message):
            # Zero length terminates the name; an overrunning length is a
            # pointer or truncated data, not a part.
            return parts

        parts.append(message[part_start:part_end])
        position = part_end
# Usage: python3 raw-dns-req.py github.com
# The name to look up is the first command line argument, if one was given.
url = sys.argv[1] if len(sys.argv) > 1 else "github.com"

# Build the query; get_type lists the other values accepted as first argument.
message = build_message("A", url)
print("Request:\n" + message)
print("\nRequest (decoded):" + decode_message(message))

# Send it to an external DNS server (second argument) on the given port
# (third argument), then show the raw and decoded reply.
response = send_udp_message(message, "1.1.1.1", 53)
print("\nResponse:\n" + response)
print("\nResponse (decoded):" + decode_message(response))
@oscar230
Copy link

oscar230 commented Nov 4, 2020

Hi, can I use a subset of this code in my thesis? This is exactly what I was looking for. I will leave credits.
What license do you use? If none, can you add a license like GNU Public License 3.0 to this?
Great Regards. Oscar Andersson, Karlstad University.

@mrpapercut
Copy link
Author

Please feel free to use the code, it's public for a reason :) I added a GNU Public License here: https://gist.github.com/mrpapercut/b73e8748c4cf22a541f442622f5672ff

@mrpapercut
Copy link
Author

Hi Ian, I've changed the license from GPL to WTFPL, meaning you can do whatever you want with the code :)

I've also updated the link to the original article that inspired this gist.

@iancummings
Copy link

Hi, many thanks!
Ian

(my original comment seems to have gone - for the sake of others, I was asking about GPL V3 restrictions concerning publishing source code).

@robertsdotpm
Copy link

@mrpapercut hey there, I wanted to say thanks for the nice DNS code. It's very elegant and gives you exactly what you need. You've saved me a lot of work and I appreciate it. Good job!

@null-Exception1
Copy link

Hello, I just wanted to say thank you so much for this code. I'm not looking to use it anywhere; I was just curious about the workings of DNS, and looking at the code helps me so much more than any explanation.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment