Last active
January 20, 2026 04:23
-
-
Save nanpuyue/65ff0d0fc3504c9990897c62865cf7ce to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import socket | |
| import struct | |
| import random | |
| import argparse | |
| from typing import List, Tuple | |
class DnsQueryBuilder:
    """Build a standard DNS query message."""

    @staticmethod
    def build(domain: str) -> bytes:
        """Return a wire-format DNS query for the A record of *domain*.

        The message consists of a 12-byte header (random transaction id,
        flags 0x0100, one question, no other records) followed by a single
        question with QTYPE=1 and QCLASS=1.

        Args:
            domain: Dotted name such as ``"example.com"``. One trailing dot
                (fully-qualified form) is accepted, and an empty string or
                ``"."`` yields the root name.

        Returns:
            The encoded query bytes.

        Raises:
            ValueError: If any label is empty or encodes to more than
                63 bytes, which cannot be represented in a DNS name.
        """
        tid = random.randint(0, 0xFFFF)
        # 0x0100 sets only the RD (recursion desired) bit of the flags word.
        header = struct.pack("!HHHHHH", tid, 0x0100, 1, 0, 0, 0)

        # A single trailing dot only marks the name as fully qualified; it
        # must not become an extra zero-length label before the terminator.
        name = domain[:-1] if domain.endswith(".") else domain
        labels = []
        for part in (name.split(".") if name else []):
            raw = part.encode()
            # The length byte describes the encoded bytes, not characters.
            if not 1 <= len(raw) <= 63:
                raise ValueError(f"Invalid DNS label in {domain!r}: {part!r}")
            labels.append(bytes([len(raw)]) + raw)

        question = b"".join(labels) + b"\x00" + struct.pack("!HH", 1, 1)
        return header + question
class DnsResponseParser:
    """Parse a DNS response payload."""

    @staticmethod
    def _skip_name(payload: bytes, pos: int) -> int:
        """Return the offset just past the domain name that starts at *pos*.

        A name is a run of length-prefixed labels ended either by a zero
        byte or by a 2-byte compression pointer (top two bits set).
        """
        while True:
            length = payload[pos]
            if length & 0xC0 == 0xC0:
                # A pointer always terminates the name and is 2 bytes long.
                return pos + 2
            if length == 0:
                return pos + 1
            pos += 1 + length

    @staticmethod
    def parse_and_print(payload: bytes) -> None:
        """Print the answer count and every IPv4 A record in *payload*.

        Args:
            payload: A raw DNS response message, starting at the header.

        Raises:
            struct.error, IndexError: If the payload ends before the
                records announced in its header.
        """
        qdcount, answers = struct.unpack_from("!HH", payload, 4)
        print(f"DNS Answer count: {answers}")

        # Walk past the question section: each entry is a name followed by
        # 4 bytes of QTYPE + QCLASS.
        pos = 12
        for _ in range(qdcount):
            pos = DnsResponseParser._skip_name(payload, pos) + 4

        for _ in range(answers):
            # The owner name may be a pointer or a full label sequence, so
            # it cannot be assumed to occupy exactly 2 bytes.
            pos = DnsResponseParser._skip_name(payload, pos)
            rtype, _rclass, _ttl, rdlen = struct.unpack_from("!HHIH", payload, pos)
            pos += 10
            if rtype == 1 and rdlen == 4:
                ip = socket.inet_ntop(socket.AF_INET, payload[pos:pos + 4])
                print(f"DNS A record: {ip}")
            # Non-A records are skipped using their declared RDATA length.
            pos += rdlen
class Socks5Client:
    """SOCKS5 TCP control connection."""

    def __init__(self, host: str, port: int):
        """Remember the proxy endpoint; no connection is opened yet."""
        self.host = host
        self.port = port
        self.sock: socket.socket | None = None

    def connect(self) -> None:
        """Open the TCP connection and negotiate the "no auth" method.

        Raises:
            OSError: If the TCP connection cannot be established.
            RuntimeError: If the server does not accept the handshake.
        """
        self.sock = socket.create_connection((self.host, self.port))
        try:
            self._handshake()
        except BaseException:
            # Do not leave a half-negotiated socket open on failure.
            self.close()
            raise

    def _require_sock(self) -> socket.socket:
        """Return the open control socket or raise if there is none."""
        if self.sock is None:
            raise RuntimeError("SOCKS5 control connection is not open")
        return self.sock

    def _recv_exact(self, count: int) -> bytes:
        """Read exactly *count* bytes from the control connection.

        TCP is a byte stream, so one ``recv`` may return only part of a
        reply; keep reading until the requested amount has arrived.
        """
        sock = self._require_sock()
        chunks = []
        remaining = count
        while remaining:
            chunk = sock.recv(remaining)
            if not chunk:
                raise RuntimeError("SOCKS5 server closed the connection")
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    def _handshake(self) -> None:
        """Offer only method 0x00 and require the server to select it."""
        sock = self._require_sock()
        sock.sendall(b"\x05\x01\x00")
        if self._recv_exact(2) != b"\x05\x00":
            raise RuntimeError("SOCKS5 auth failed")

    def udp_associate(self) -> Tuple[str, int]:
        """Send UDP ASSOCIATE and return the relay ``(address, port)``.

        The request advertises 0.0.0.0:0 as the client's UDP endpoint.

        Raises:
            RuntimeError: If the connection is not open, the server replies
                with a non-zero REP, closes the connection, or returns a
                BND.ADDR that is neither IPv4 nor IPv6.
        """
        sock = self._require_sock()
        print(">>> TCP SOCKS5 UDP ASSOCIATE")
        print(" VER=5 CMD=UDP_ASSOCIATE ATYP=IPV4 DST=0.0.0.0:0")
        sock.sendall(b"\x05\x03\x00\x01\x00\x00\x00\x00\x00\x00")

        # Fixed part of the reply: VER, REP, RSV, ATYP.
        _ver, rep, _rsv, atyp = self._recv_exact(4)
        if rep != 0x00:
            raise RuntimeError(f"UDP_ASSOCIATE failed, REP={rep}")
        if atyp == 1:
            family, addr_len, atyp_name = socket.AF_INET, 4, "IPV4"
        elif atyp == 4:
            family, addr_len, atyp_name = socket.AF_INET6, 16, "IPV6"
        else:
            raise RuntimeError("Unsupported BND.ADDR")

        # Variable part: the address followed by a 2-byte big-endian port.
        tail = self._recv_exact(addr_len + 2)
        addr = socket.inet_ntop(family, tail[:addr_len])
        port = struct.unpack("!H", tail[addr_len:])[0]
        print("<<< TCP SOCKS5 UDP ASSOCIATE")
        print(f" REP=SUCCESS ATYP={atyp_name} BND={self.format_dst(addr, port, atyp_name)}")
        return addr, port

    @staticmethod
    def format_dst(addr: str, port: int, atyp: str) -> str:
        """Format ``addr:port``, bracketing the address when *atyp* is "IPV6"."""
        if atyp == "IPV6":
            return f"[{addr}]:{port}"
        return f"{addr}:{port}"

    def close(self) -> None:
        """Close the control connection if it is open; safe to call twice."""
        if self.sock:
            self.sock.close()
            self.sock = None
class Socks5UdpClient:
    """SOCKS5 UDP relay client."""

    def __init__(self, relay_addr: str, relay_port: int, timeout: "float | None" = None):
        """Create the UDP socket used to talk to the relay.

        Args:
            relay_addr: Relay address; a value containing ":" is treated as
                IPv6, anything else as IPv4.
            relay_port: Relay UDP port.
            timeout: Optional socket timeout in seconds. ``None`` (the
                default) keeps the socket blocking indefinitely.
        """
        self.relay_addr = relay_addr
        self.relay_port = relay_port
        if ":" in relay_addr:
            self.sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
            self.target = (relay_addr, relay_port, 0, 0)
        else:
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            self.target = (relay_addr, relay_port)
        if timeout is not None:
            self.sock.settimeout(timeout)

    @staticmethod
    def build_packet(addr_type: str, addr: str, port: int, payload: bytes) -> bytes:
        """Wrap *payload* in a SOCKS5 UDP request header.

        Args:
            addr_type: One of ``"ipv4"``, ``"ipv6"`` or ``"domain"``.
            addr: Destination address in the form matching *addr_type*.
            port: Destination port.
            payload: Datagram body to relay.

        Raises:
            ValueError: If *addr_type* is unknown, or a domain name does
                not encode to between 1 and 255 bytes.
            OSError: If an IP literal cannot be parsed.
        """
        rsv = b"\x00\x00"
        frag = b"\x00"
        if addr_type == "ipv4":
            atyp = b"\x01"
            addr_bin = socket.inet_pton(socket.AF_INET, addr)
        elif addr_type == "ipv6":
            atyp = b"\x04"
            addr_bin = socket.inet_pton(socket.AF_INET6, addr)
        elif addr_type == "domain":
            atyp = b"\x03"
            raw = addr.encode()
            # The length prefix is one byte and counts encoded bytes.
            if not 1 <= len(raw) <= 255:
                raise ValueError("Domain name must encode to 1-255 bytes")
            addr_bin = bytes([len(raw)]) + raw
        else:
            raise ValueError("Invalid addr type")
        return rsv + frag + atyp + addr_bin + struct.pack("!H", port) + payload

    def send_and_recv(self, packet: bytes) -> bytes:
        """Send *packet* to the relay and return the next datagram received.

        Raises:
            OSError: On socket errors, including a timeout when one was
                configured on the socket.
        """
        self.sock.sendto(packet, self.target)
        data, _ = self.sock.recvfrom(4096)
        return data

    @staticmethod
    def parse_response(data: bytes) -> Tuple[str, str, int, bytes]:
        """Split a relay datagram into ``(atyp_name, addr, port, payload)``.

        ``atyp_name`` is ``"IPV4"``, ``"IPV6"`` or ``"DOMAIN"``.

        Raises:
            RuntimeError: If the address type is unknown or the datagram is
                shorter than its header requires.
        """
        truncated = "Truncated SOCKS5 UDP response"
        # RSV(2) + FRAG(1) + ATYP(1) are always present.
        if len(data) < 4:
            raise RuntimeError(truncated)
        atyp = data[3]
        offset = 4
        if atyp == 1:
            addr_len, atyp_name = 4, "IPV4"
        elif atyp == 4:
            addr_len, atyp_name = 16, "IPV6"
        elif atyp == 3:
            if len(data) < 5:
                raise RuntimeError(truncated)
            addr_len, atyp_name = data[4], "DOMAIN"
            offset = 5
        else:
            raise RuntimeError("Unknown ATYP")

        addr_end = offset + addr_len
        if len(data) < addr_end + 2:
            raise RuntimeError(truncated)
        raw_addr = data[offset:addr_end]
        if atyp == 1:
            addr = socket.inet_ntop(socket.AF_INET, raw_addr)
        elif atyp == 4:
            addr = socket.inet_ntop(socket.AF_INET6, raw_addr)
        else:
            addr = raw_addr.decode()
        port = struct.unpack("!H", data[addr_end:addr_end + 2])[0]
        return atyp_name, addr, port, data[addr_end + 2:]

    @staticmethod
    def format_dst(addr: str, port: int, atyp: str) -> str:
        """Format ``addr:port``, bracketing the address when *atyp* is "IPV6"."""
        if atyp == "IPV6":
            return f"[{addr}]:{port}"
        return f"{addr}:{port}"

    def close(self) -> None:
        """Close the UDP socket."""
        self.sock.close()
class DnsTester:
    """DNS UDP test runner."""

    # Destination port used for every DNS server under test.
    DNS_PORT = 53

    def __init__(self, socks_host: str, socks_port: int):
        """Create the SOCKS5 control client; nothing is connected yet."""
        self.socks = Socks5Client(socks_host, socks_port)

    def run(self, tests: List[Tuple[str, str]], domain: str) -> None:
        """Run every ``(addr_type, server)`` test through one UDP association.

        Args:
            tests: Pairs of address type ("ipv4", "ipv6" or "domain") and
                DNS server address.
            domain: Name to query on each server.

        Any exception from connecting or from a test propagates to the
        caller, but both sockets are always closed first.
        """
        try:
            self.socks.connect()
            relay_addr, relay_port = self.socks.udp_associate()
            udp = Socks5UdpClient(relay_addr, relay_port)
            try:
                for addr_type, server in tests:
                    self.run_single(udp, addr_type, server, domain)
            finally:
                udp.close()
        finally:
            # The control connection stays open while the tests run and is
            # released here even when one of them fails.
            self.socks.close()

    def run_single(self, udp: Socks5UdpClient, addr_type: str, server: str, domain: str) -> None:
        """Send one DNS query for *domain* to *server* via the relay and print the reply."""
        print(f"\n=== {addr_type.upper()} → {server} ===")
        payload = DnsQueryBuilder.build(domain)
        packet = udp.build_packet(addr_type, server, self.DNS_PORT, payload)
        print(f">>> ATYP={addr_type.upper():6} DST={server}:{self.DNS_PORT} LEN={len(packet)}")
        resp = udp.send_and_recv(packet)
        atyp, addr, port, dns_payload = udp.parse_response(resp)
        print(f"<<< ATYP={atyp:6} DST={udp.format_dst(addr, port, atyp)} LEN={len(resp)}")
        DnsResponseParser.parse_and_print(dns_payload)
class ProgramOptions:
    """Command-line argument parsing."""

    # Proxy endpoint used when -x is not given.
    DEFAULT = "127.0.0.1:1080"

    def __init__(self):
        """Build the argument parser with its single ``-x HOST:PORT`` option."""
        self.parser = argparse.ArgumentParser(description="SOCKS5 UDP DNS tester")
        self.parser.add_argument(
            "-x",
            metavar="HOST:PORT",
            default=self.DEFAULT,
            help="SOCKS5 proxy, same as curl -x, default 127.0.0.1:1080",
        )

    def parse(self, argv: "List[str] | None" = None) -> Tuple[str, int]:
        """Parse the command line and return the proxy ``(host, port)``.

        Args:
            argv: Argument list to parse; ``None`` (the default) lets
                argparse read ``sys.argv[1:]``.

        The value is split at its last colon, so an IPv6 host must be
        written in brackets (``[::1]:1080``); the brackets are removed from
        the returned host. A value without a port, with a non-numeric
        port, or with a port outside 1-65535 is reported via
        ``parser.error``, which prints usage and exits with status 2.
        """
        args = self.parser.parse_args(argv)
        host, sep, port_text = args.x.rpartition(":")
        if not sep:
            self.parser.error(f"-x expects HOST:PORT, got {args.x!r}")
        if host.startswith("[") and host.endswith("]"):
            # Brackets only delimit an IPv6 literal; sockets want it bare.
            host = host[1:-1]
        try:
            port = int(port_text)
        except ValueError:
            self.parser.error(f"invalid port in -x: {port_text!r}")
        if not 1 <= port <= 65535:
            self.parser.error(f"port out of range in -x: {port}")
        return host, port
if __name__ == "__main__":
    # Resolve the proxy endpoint from the command line (-x HOST:PORT).
    proxy_host, proxy_port = ProgramOptions().parse()
    print(f"SOCKS5 server: {proxy_host}:{proxy_port}")

    # One target per IP address type, then each domain-type target twice in
    # a row.
    # NOTE(review): the repetition is presumably deliberate (exercising the
    # relay with the same domain destination twice) - confirm with the author.
    targets = [
        ("ipv4", "8.8.8.8"),
        ("ipv6", "2001:4860:4860::8888"),
    ]
    for resolver_name in ("dns4.stdio.link", "dns6.stdio.link"):
        targets += [("domain", resolver_name)] * 2

    DnsTester(proxy_host, proxy_port).run(targets, "example.com")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment