Created
July 22, 2023 02:03
-
-
Save iljavs/518c0ddb338a344ffce5bc67370560ea to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import impacket.dns | |
import socket | |
import select | |
import sys | |
import multiprocessing | |
import signal | |
# DNS server to connect to | |
dns_server = "8.8.8.8" | |
# todo | |
def udp_pre_hook(address: tuple, data: bytes) -> bytes: | |
dns_packet = impacket.dns.DNS(data) | |
txid = dns_packet.get_transaction_id() | |
sport = address[1] | |
print ("udp request, pktlen: %d TXID: %d, source port: %d" % | |
(len(data), txid, sport)) | |
return data | |
# todo | |
def udp_post_hook(data: bytes) -> bytes: | |
return data | |
# todo | |
def tcp_pre_hook(data: bytes) -> bytes: | |
return data | |
# todo | |
def tcp_post_hook(data: bytes) -> bytes: | |
return data | |
def handle_udp(data: bytes, address: tuple, udp_conn: socket.socket): | |
data = udp_pre_hook(address, data) | |
dns_server_ip = socket.gethostbyname(dns_server) | |
print ("Sending DNS query to %s" % dns_server_ip) | |
dns_server_udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
dns_server_udp_sock.sendto(data, (dns_server_ip, 53)) | |
dns_server_udp_sock.settimeout(15) | |
try: | |
dns_response_pkt, dns_server_addr = dns_server_udp_sock.recvfrom(1024) | |
except socket.timeout: | |
print ("DNS server timeout") | |
dns_server_udp_sock.close() | |
udp_conn.close() | |
return | |
print ("Received DNS response from %s" % dns_server_addr[0]) | |
dns_server_udp_sock.close() | |
dns_response_pkt = udp_post_hook(dns_response_pkt) | |
udp_conn.sendto(dns_response_pkt, address) | |
print ("Sent DNS response to client: %s" % dns_response_pkt) | |
udp_conn.close() # ???? | |
print ("Closed UDP connection") | |
def handle_tcp(tcp_conn: socket.socket): | |
data = tcp_conn.recv(1024) | |
print ("Received TCP data from client: %s" % data) | |
data = tcp_pre_hook(data) | |
dns_server_ip = socket.gethostbyname(dns_server) | |
print ("Sending DNS query to %s" % dns_server_ip) | |
dns_server_tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
dns_server_tcp_sock.connect((dns_server_ip, 53)) | |
dns_server_tcp_sock.send(data) | |
dns_response_pkt = dns_server_tcp_sock.recv(1024) | |
print ("Received DNS response from %s" % dns_server_ip) | |
dns_response_pkt = tcp_post_hook(dns_response_pkt) | |
tcp_conn.send(dns_response_pkt) | |
print ("Sent DNS response to client: %s" % dns_response_pkt) | |
tcp_conn.close() # ???? | |
print ("Closed TCP connection") | |
def signal_handler(sig, frame): | |
print('You pressed Ctrl+C!') | |
sys.exit(0) | |
if __name__ == "__main__": | |
udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
udp.bind(('', 53)) | |
tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
tcp.bind(('', 53)) | |
tcp.listen(1) | |
signal.signal(signal.SIGINT, signal_handler) | |
try: | |
while True: | |
rsock, wsock, esock = select.select([udp, tcp], [], []) | |
for sock in rsock: | |
if sock == udp: | |
data, address = udp.recvfrom(1024) | |
print ("UDP connection from %s" % address[0]) | |
udp_process = multiprocessing.Process( | |
target=handle_udp, | |
args=(data, address, udp)) | |
udp_process.start() | |
elif sock == tcp: | |
tcp_conn, tcp_addr = tcp.accept() | |
print ("TCP connection from %s" % tcp_addr[0]) | |
tcp_process = multiprocessing.Process( | |
target=handle_tcp, | |
args=(tcp_conn,)) | |
tcp_process.start() | |
except KeyboardInterrupt: | |
print("\nCtrl+C received. Exiting gracefully?.") | |
sys.exit(0) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment