Skip to content

Instantly share code, notes, and snippets.

@nanpuyue
Last active January 20, 2026 04:23
Show Gist options
  • Select an option

  • Save nanpuyue/65ff0d0fc3504c9990897c62865cf7ce to your computer and use it in GitHub Desktop.

Select an option

Save nanpuyue/65ff0d0fc3504c9990897c62865cf7ce to your computer and use it in GitHub Desktop.
import socket
import struct
import random
import argparse
from typing import List, Tuple
class DnsQueryBuilder:
    """Build a standard DNS query message."""

    @staticmethod
    def build(domain: str) -> bytes:
        """Return a wire-format DNS query (QTYPE=1, QCLASS=1) for *domain*.

        The message consists of a 12-byte header with a random transaction
        id, flags 0x0100 and QDCOUNT=1, followed by a single question.

        A single trailing dot ("example.com.") is accepted and ignored; an
        empty name encodes the root. Labels are length-prefixed using their
        encoded byte length.

        Raises:
            ValueError: if a label is empty or longer than 63 bytes.
        """
        tid = random.randint(0, 0xFFFF)
        flags = 0x0100
        qdcount = 1
        chunks = [struct.pack("!HHHHHH", tid, flags, qdcount, 0, 0, 0)]

        # Drop one trailing dot so a fully-qualified name does not produce
        # an empty label (which would terminate the name prematurely).
        name = domain[:-1] if domain.endswith(".") else domain
        if name:
            for label in name.split("."):
                raw = label.encode()
                # The length prefix must describe the encoded bytes, and a
                # zero length would be read as the end-of-name marker.
                if not 1 <= len(raw) <= 63:
                    raise ValueError(
                        f"Invalid DNS label {label!r} in domain {domain!r}"
                    )
                chunks.append(bytes([len(raw)]) + raw)
        chunks.append(b"\x00")  # root label terminates the name
        chunks.append(struct.pack("!HH", 1, 1))  # QTYPE, QCLASS
        return b"".join(chunks)
class DnsResponseParser:
    """Parse a DNS response payload and print its A records."""

    @staticmethod
    def _skip_name(payload: bytes, pos: int) -> int:
        """Return the offset just past the domain name starting at *pos*.

        Handles both a sequence of length-prefixed labels ending in a zero
        byte and a name that ends in (or consists of) a 2-byte compression
        pointer (top two bits of the length byte set).

        Raises:
            ValueError: if the name runs past the end of *payload* or uses
                a label type other than a plain label or a pointer.
        """
        size = len(payload)
        while True:
            if pos >= size:
                raise ValueError("Truncated DNS name")
            length = payload[pos]
            if length == 0:
                return pos + 1
            if length & 0xC0 == 0xC0:
                # A pointer is two bytes long and always ends the name.
                if pos + 2 > size:
                    raise ValueError("Truncated DNS name")
                return pos + 2
            if length & 0xC0:
                raise ValueError("Unsupported DNS label type")
            pos += 1 + length

    @staticmethod
    def parse_and_print(payload: bytes) -> None:
        """Print the answer count and every 4-byte type-1 (A) record.

        The question section is skipped according to QDCOUNT, then each
        answer record's owner name, fixed fields and RDATA are walked in
        order. Records of other types are skipped using their RDLENGTH.

        Raises:
            ValueError: if *payload* is shorter than its header and counts
                claim.
        """
        if len(payload) < 12:
            raise ValueError("Truncated DNS header")
        questions, answers = struct.unpack_from("!HH", payload, 4)
        print(f"DNS Answer count: {answers}")

        pos = 12
        for _ in range(questions):
            # Each question is a name followed by QTYPE and QCLASS.
            pos = DnsResponseParser._skip_name(payload, pos) + 4

        for _ in range(answers):
            pos = DnsResponseParser._skip_name(payload, pos)
            if pos + 10 > len(payload):
                raise ValueError("Truncated DNS resource record")
            rtype, _rclass, _ttl, rdlen = struct.unpack_from("!HHIH", payload, pos)
            pos += 10
            if pos + rdlen > len(payload):
                raise ValueError("Truncated DNS resource record data")
            if rtype == 1 and rdlen == 4:
                ip = socket.inet_ntop(socket.AF_INET, payload[pos:pos + 4])
                print(f"DNS A record: {ip}")
            pos += rdlen
class Socks5Client:
    """SOCKS5 TCP control connection (no-auth handshake and UDP ASSOCIATE)."""

    def __init__(self, host: str, port: int):
        """Remember the proxy endpoint; no connection is made yet."""
        self.host = host
        self.port = port
        self.sock: socket.socket | None = None

    def connect(self) -> None:
        """Open the TCP connection to the proxy and perform the handshake.

        The socket is closed again if the handshake fails.
        """
        self.sock = socket.create_connection((self.host, self.port))
        try:
            self._handshake()
        except Exception:
            self.close()
            raise

    def _require_sock(self) -> socket.socket:
        """Return the connected socket or raise RuntimeError if there is none."""
        if self.sock is None:
            raise RuntimeError("SOCKS5 client is not connected")
        return self.sock

    def _recv_exact(self, count: int) -> bytes:
        """Read exactly *count* bytes from the control connection.

        A single recv() on a stream socket may return fewer bytes than
        requested, so keep reading until the full amount has arrived.

        Raises:
            RuntimeError: if the peer closes the connection first.
        """
        sock = self._require_sock()
        chunks = []
        remaining = count
        while remaining > 0:
            chunk = sock.recv(remaining)
            if not chunk:
                raise RuntimeError("SOCKS5 server closed the connection")
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    def _handshake(self) -> None:
        """Offer the single method 0x00 and require the server to select it.

        Raises:
            RuntimeError: if the reply is anything other than 05 00.
        """
        self._require_sock().sendall(b"\x05\x01\x00")
        if self._recv_exact(2) != b"\x05\x00":
            raise RuntimeError("SOCKS5 auth failed")

    def udp_associate(self) -> Tuple[str, int]:
        """Send UDP ASSOCIATE and return the (address, port) of the UDP relay.

        The reply is read field by field so that its length follows ATYP.
        If the server reports an unspecified bind address (0.0.0.0 or ::),
        the address of the TCP peer is returned instead, since a wildcard
        is not a usable destination.

        Raises:
            RuntimeError: if not connected, the server closes early, REP is
                non-zero, or BND.ADDR is neither IPv4 nor IPv6.
        """
        sock = self._require_sock()
        print(">>> TCP SOCKS5 UDP ASSOCIATE")
        print(" VER=5 CMD=UDP_ASSOCIATE ATYP=IPV4 DST=0.0.0.0:0")
        sock.sendall(b"\x05\x03\x00\x01\x00\x00\x00\x00\x00\x00")

        # Fixed part of the reply: VER, REP, RSV, ATYP.
        _ver, rep, _rsv, atyp = self._recv_exact(4)
        if rep != 0x00:
            raise RuntimeError(f"UDP_ASSOCIATE failed, REP={rep}")
        if atyp == 1:
            addr = socket.inet_ntop(socket.AF_INET, self._recv_exact(4))
            atyp_name = "IPV4"
        elif atyp == 4:
            addr = socket.inet_ntop(socket.AF_INET6, self._recv_exact(16))
            atyp_name = "IPV6"
        else:
            raise RuntimeError("Unsupported BND.ADDR")
        port = struct.unpack("!H", self._recv_exact(2))[0]

        print("<<< TCP SOCKS5 UDP ASSOCIATE")
        print(f" REP=SUCCESS ATYP={atyp_name} BND={self.format_dst(addr, port, atyp_name)}")

        if addr in ("0.0.0.0", "::"):
            # Wildcard bind address: fall back to the proxy's own address.
            addr = sock.getpeername()[0]
            print(f" BND.ADDR is unspecified, using proxy address {addr}")
        return addr, port

    @staticmethod
    def format_dst(addr: str, port: int, atyp: str) -> str:
        """Format addr:port, bracketing the address when *atyp* is "IPV6"."""
        if atyp == "IPV6":
            return f"[{addr}]:{port}"
        return f"{addr}:{port}"

    def close(self) -> None:
        """Close the control connection if one is open; safe to call twice."""
        if self.sock:
            self.sock.close()
            self.sock = None
class Socks5UdpClient:
    """SOCKS5 UDP relay client."""

    def __init__(self, relay_addr: str, relay_port: int, timeout: "float | None" = 5.0):
        """Create a UDP socket aimed at the relay.

        Args:
            relay_addr: relay address; a value containing ":" is treated as
                IPv6, anything else as IPv4.
            relay_port: relay UDP port.
            timeout: seconds to wait for a reply in send_and_recv, or None
                to block indefinitely.
        """
        self.relay_addr = relay_addr
        self.relay_port = relay_port
        if ":" in relay_addr:
            self.sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
            self.target = (relay_addr, relay_port, 0, 0)
        else:
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            self.target = (relay_addr, relay_port)
        # UDP gives no delivery guarantee; without a timeout a single lost
        # datagram would block recvfrom() forever.
        self.sock.settimeout(timeout)

    @staticmethod
    def build_packet(addr_type: str, addr: str, port: int, payload: bytes) -> bytes:
        """Wrap *payload* in a SOCKS5 UDP request header (RSV, FRAG=0, ATYP, DST).

        Args:
            addr_type: "ipv4", "ipv6" or "domain".
            addr: destination address in the form matching *addr_type*.
            port: destination port.
            payload: datagram body appended after the header.

        Raises:
            ValueError: if *addr_type* is unknown, or a domain name encodes
                to zero or more than 255 bytes.
        """
        rsv = b"\x00\x00"
        frag = b"\x00"
        if addr_type == "ipv4":
            atyp = b"\x01"
            addr_bin = socket.inet_pton(socket.AF_INET, addr)
        elif addr_type == "ipv6":
            atyp = b"\x04"
            addr_bin = socket.inet_pton(socket.AF_INET6, addr)
        elif addr_type == "domain":
            atyp = b"\x03"
            # The one-byte length prefix counts encoded bytes, not characters.
            name = addr.encode()
            if not 1 <= len(name) <= 255:
                raise ValueError("Domain name must encode to 1..255 bytes")
            addr_bin = bytes([len(name)]) + name
        else:
            raise ValueError("Invalid addr type")
        return b"".join((rsv, frag, atyp, addr_bin, struct.pack("!H", port), payload))

    def send_and_recv(self, packet: bytes) -> bytes:
        """Send *packet* to the relay and return the next datagram received.

        Raises the socket's timeout error if no datagram arrives within the
        timeout configured in the constructor.
        """
        self.sock.sendto(packet, self.target)
        data, _ = self.sock.recvfrom(4096)
        return data

    @staticmethod
    def parse_response(data: bytes) -> Tuple[str, str, int, bytes]:
        """Split a relay datagram into (atyp_name, address, port, payload).

        atyp_name is "IPV4", "IPV6" or "DOMAIN". The first three bytes
        (RSV and FRAG) are skipped without inspection.

        Raises:
            RuntimeError: if ATYP is unknown or the datagram is too short
                to hold the header it announces.
        """
        if len(data) < 4:
            raise RuntimeError("Truncated SOCKS5 UDP datagram")
        atyp = data[3]
        offset = 4
        if atyp == 1:
            addr_len, atyp_name = 4, "IPV4"
        elif atyp == 4:
            addr_len, atyp_name = 16, "IPV6"
        elif atyp == 3:
            if len(data) < 5:
                raise RuntimeError("Truncated SOCKS5 UDP datagram")
            addr_len, atyp_name = data[4], "DOMAIN"
            offset = 5
        else:
            raise RuntimeError("Unknown ATYP")

        # Address plus the two port bytes must fit inside the datagram.
        if len(data) < offset + addr_len + 2:
            raise RuntimeError("Truncated SOCKS5 UDP datagram")

        raw_addr = data[offset:offset + addr_len]
        offset += addr_len
        if atyp == 1:
            addr = socket.inet_ntop(socket.AF_INET, raw_addr)
        elif atyp == 4:
            addr = socket.inet_ntop(socket.AF_INET6, raw_addr)
        else:
            addr = raw_addr.decode()
        port = struct.unpack_from("!H", data, offset)[0]
        offset += 2
        return atyp_name, addr, port, data[offset:]

    @staticmethod
    def format_dst(addr: str, port: int, atyp: str) -> str:
        """Format addr:port, bracketing the address when *atyp* is "IPV6"."""
        if atyp == "IPV6":
            return f"[{addr}]:{port}"
        return f"{addr}:{port}"

    def close(self) -> None:
        """Close the UDP socket."""
        self.sock.close()
class DnsTester:
    """Runs DNS-over-UDP queries through a SOCKS5 UDP relay."""

    DNS_PORT = 53

    def __init__(self, socks_host: str, socks_port: int):
        """Create the SOCKS5 control client; nothing is connected yet."""
        self.socks = Socks5Client(socks_host, socks_port)

    def run(self, tests: List[Tuple[str, str]], domain: str) -> None:
        """Connect, request a UDP association and run every test in order.

        Args:
            tests: (addr_type, server) pairs passed to run_single.
            domain: name to query on each server.

        Both sockets are closed even when a step raises; the exception
        still propagates to the caller.
        """
        udp = None
        try:
            self.socks.connect()
            relay_addr, relay_port = self.socks.udp_associate()
            udp = Socks5UdpClient(relay_addr, relay_port)
            for addr_type, server in tests:
                self.run_single(udp, addr_type, server, domain)
        finally:
            if udp is not None:
                udp.close()
            self.socks.close()

    def run_single(self, udp: Socks5UdpClient, addr_type: str, server: str, domain: str) -> None:
        """Send one query for *domain* to *server* via *udp* and print the reply."""
        atyp_label = addr_type.upper()
        print(f"\n=== {atyp_label} → {server} ===")
        payload = DnsQueryBuilder.build(domain)
        packet = udp.build_packet(addr_type, server, self.DNS_PORT, payload)
        # Format the destination the same way as the response line below, so
        # the port tracks DNS_PORT and IPv6 literals are bracketed.
        request_dst = udp.format_dst(server, self.DNS_PORT, atyp_label)
        print(f">>> ATYP={atyp_label:6} DST={request_dst} LEN={len(packet)}")
        resp = udp.send_and_recv(packet)
        atyp, addr, port, dns_payload = udp.parse_response(resp)
        print(f"<<< ATYP={atyp:6} DST={udp.format_dst(addr, port, atyp)} LEN={len(resp)}")
        DnsResponseParser.parse_and_print(dns_payload)
class ProgramOptions:
    """Command-line argument parsing."""

    DEFAULT = "127.0.0.1:1080"
    DEFAULT_PORT = 1080

    def __init__(self):
        """Build the argument parser with the single -x HOST:PORT option."""
        self.parser = argparse.ArgumentParser(description="SOCKS5 UDP DNS tester")
        self.parser.add_argument(
            "-x",
            metavar="HOST:PORT",
            default=self.DEFAULT,
            help="SOCKS5 proxy, same as curl -x, default 127.0.0.1:1080",
        )

    def parse(self) -> Tuple[str, int]:
        """Parse sys.argv and return the proxy as (host, port).

        Accepted forms: HOST:PORT, HOST (port defaults to DEFAULT_PORT),
        [IPV6]:PORT and [IPV6]; each may carry a socks5:// or socks5h://
        prefix. Brackets around an IPv6 literal are removed from the host.

        Invalid values are reported through argparse, which prints a usage
        message and exits.
        """
        args = self.parser.parse_args()
        spec = args.x.strip()

        scheme, sep, rest = spec.partition("://")
        if sep:
            if scheme.lower() not in ("socks5", "socks5h"):
                self.parser.error(f"unsupported proxy scheme: {scheme!r}")
            spec = rest.rstrip("/")

        if spec.startswith("["):
            # Bracketed IPv6 literal, optionally followed by :PORT.
            host, closing, tail = spec[1:].partition("]")
            if not closing or (tail and not tail.startswith(":")):
                self.parser.error(f"invalid proxy address: {args.x!r}")
            port_text = tail[1:]
        else:
            host, colon, port_text = spec.rpartition(":")
            if not colon:
                # No colon at all: the whole value is the host.
                host, port_text = spec, ""

        if not host:
            self.parser.error(f"invalid proxy address: {args.x!r}")
        if not port_text:
            return host, self.DEFAULT_PORT
        try:
            port = int(port_text)
        except ValueError:
            self.parser.error(f"invalid proxy port: {port_text!r}")
        if not 0 < port < 65536:
            self.parser.error(f"proxy port out of range: {port}")
        return host, port
if __name__ == "__main__":
    # The proxy endpoint comes from the -x command-line option.
    socks_host, socks_port = ProgramOptions().parse()
    print(f"SOCKS5 server: {socks_host}:{socks_port}")

    # One literal-address target per address family, followed by each
    # domain-named server listed twice in a row.
    literal_targets = [
        ("ipv4", "8.8.8.8"),
        ("ipv6", "2001:4860:4860::8888"),
    ]
    named_targets = [
        ("domain", server_name)
        for server_name in ("dns4.stdio.link", "dns6.stdio.link")
        for _ in range(2)
    ]
    tests = literal_targets + named_targets

    DnsTester(socks_host, socks_port).run(tests, "example.com")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment