Skip to content

Instantly share code, notes, and snippets.

@nanpuyue
Created January 21, 2026 04:47
Show Gist options
  • Select an option

  • Save nanpuyue/3c5999c30a4e1d03b3709fb9745ae12e to your computer and use it in GitHub Desktop.

Select an option

Save nanpuyue/3c5999c30a4e1d03b3709fb9745ae12e to your computer and use it in GitHub Desktop.
import socket
import struct
import argparse
from typing import Tuple, List
class LocalResolver:
    """Local DNS resolver that keeps IPv4 and IPv6 lookups explicitly separate."""

    @staticmethod
    def resolve_ipv4(domain: str) -> str:
        """Return the first IPv4 address ``socket.getaddrinfo`` reports for *domain*."""
        candidates = socket.getaddrinfo(domain, None, socket.AF_INET)
        # Each entry is (family, type, proto, canonname, sockaddr); the host
        # string is the first element of sockaddr.
        _family, _socktype, _proto, _canonname, sockaddr = candidates[0]
        return sockaddr[0]

    @staticmethod
    def resolve_ipv6(domain: str) -> str:
        """Return the first IPv6 address ``socket.getaddrinfo`` reports for *domain*."""
        candidates = socket.getaddrinfo(domain, None, socket.AF_INET6)
        _family, _socktype, _proto, _canonname, sockaddr = candidates[0]
        return sockaddr[0]
class Socks5Client:
    """SOCKS5 TCP control connection.

    Typical use: ``connect()`` to reach the proxy and negotiate the
    "no authentication" method, ``connect_target()`` to issue a CONNECT
    request, then ``close()`` when done.
    """

    def __init__(self, host: str, port: int):
        self.host = host
        self.port = port
        # None until connect() succeeds, and again after close().
        self.sock: socket.socket | None = None

    def connect(self) -> None:
        """Open the TCP connection to the proxy and run the method handshake.

        Raises:
            OSError: the TCP connection could not be established.
            RuntimeError: the proxy rejected the handshake or closed early.
        """
        self.sock = socket.create_connection((self.host, self.port))
        try:
            self._handshake()
        except BaseException:
            # Do not leave a half-negotiated socket open behind a failure.
            self.close()
            raise

    def _connected_sock(self) -> socket.socket:
        """Return the live socket, or raise if connect() has not been called."""
        if self.sock is None:
            raise RuntimeError("SOCKS5 client is not connected")
        return self.sock

    def _handshake(self) -> None:
        """Offer exactly one method (0x00, no authentication) and require it."""
        self._connected_sock().sendall(b"\x05\x01\x00")
        resp = self._recv_exact(2)
        if resp != b"\x05\x00":
            raise RuntimeError("SOCKS5 handshake failed")

    def connect_target(self, atyp: str, addr: str, port: int) -> socket.socket:
        """Send a CONNECT request and return the socket on success.

        Args:
            atyp: address type name, one of "IPV4", "IPV6" or "DOMAIN".
            addr: destination address in the textual form matching *atyp*.
            port: destination TCP port.

        Returns:
            The same socket that carried the negotiation; callers send
            application data over it.

        Raises:
            ValueError: *atyp* is unknown or the domain name is too long.
            RuntimeError: the client is not connected, the proxy replied
                with a non-zero REP code, or the connection closed early.
        """
        sock = self._connected_sock()
        req = self._build_connect_request(atyp, addr, port)
        print(">>> TCP SOCKS5 CONNECT")
        print(f" ATYP={atyp:6} DST={self.format_dst(addr, port, atyp)}")
        sock.sendall(req)
        rep, bnd_atyp, bnd_addr, bnd_port = self._recv_connect_reply()
        if rep != 0x00:
            raise RuntimeError(f"CONNECT failed, REP={rep}")
        print("<<< TCP SOCKS5 CONNECT")
        print(f" REP=SUCCESS ATYP={bnd_atyp} BND={self.format_dst(bnd_addr, bnd_port, bnd_atyp)}")
        return sock

    def _recv_connect_reply(self) -> Tuple[int, str, str, int]:
        """Read one CONNECT reply.

        Returns:
            ``(rep, atyp_name, bnd_addr, bnd_port)`` where *atyp_name* is
            "IPV4", "IPV6" or "DOMAIN".

        Raises:
            RuntimeError: unknown ATYP byte, or the connection closed early.
        """
        # Reply header is VER, REP, RSV, ATYP; only REP and ATYP are used.
        header = self._recv_exact(4)
        rep = header[1]
        atyp = header[3]
        if atyp == 1:
            addr = socket.inet_ntop(socket.AF_INET, self._recv_exact(4))
            atyp_name = "IPV4"
        elif atyp == 4:
            addr = socket.inet_ntop(socket.AF_INET6, self._recv_exact(16))
            atyp_name = "IPV6"
        elif atyp == 3:
            # Domain form: one length octet followed by that many bytes.
            ln = self._recv_exact(1)[0]
            # The bound name is only displayed, so never fail on odd bytes.
            addr = self._recv_exact(ln).decode(errors="replace")
            atyp_name = "DOMAIN"
        else:
            raise RuntimeError("Unknown ATYP in reply")
        port = struct.unpack("!H", self._recv_exact(2))[0]
        return rep, atyp_name, addr, port

    @staticmethod
    def _build_connect_request(atyp: str, addr: str, port: int) -> bytes:
        """Serialize a CONNECT request for the given address type.

        Raises:
            ValueError: *atyp* is not "IPV4", "IPV6" or "DOMAIN", or the
                encoded domain name exceeds 255 bytes.
            OSError: *addr* is not a valid literal for an IP address type.
        """
        if atyp == "IPV4":
            prefix = b"\x05\x01\x00\x01"
            dst = socket.inet_pton(socket.AF_INET, addr)
        elif atyp == "IPV6":
            prefix = b"\x05\x01\x00\x04"
            dst = socket.inet_pton(socket.AF_INET6, addr)
        elif atyp == "DOMAIN":
            prefix = b"\x05\x01\x00\x03"
            name = addr.encode()
            # The length octet counts encoded bytes, not characters; the two
            # differ for any non-ASCII name.
            if len(name) > 255:
                raise ValueError("Domain name too long for SOCKS5 (max 255 bytes)")
            dst = bytes([len(name)]) + name
        else:
            raise ValueError("Invalid ATYP")
        return prefix + dst + struct.pack("!H", port)

    @staticmethod
    def format_dst(addr: str, port: int, atyp: str) -> str:
        """Format ``addr:port`` for display, bracketing IPv6 addresses."""
        if atyp == "IPV6":
            return f"[{addr}]:{port}"
        return f"{addr}:{port}"

    def _recv_exact(self, n: int) -> bytes:
        """Read exactly *n* bytes from the socket.

        Raises:
            RuntimeError: the client is not connected, or the peer closed
                the connection before *n* bytes arrived.
        """
        sock = self._connected_sock()
        buf = bytearray()
        while len(buf) < n:
            chunk = sock.recv(n - len(buf))
            if not chunk:
                raise RuntimeError("Unexpected EOF")
            buf += chunk
        return bytes(buf)

    def close(self) -> None:
        """Close the socket if one is open; safe to call more than once."""
        if self.sock:
            self.sock.close()
            self.sock = None
class TcpTester:
    """TCP CONNECT behavior test.

    Issues the same HTTP request through a SOCKS5 proxy three times,
    addressing the target by IPv4 literal, IPv6 literal and domain name.
    """

    def __init__(self, socks_host: str, socks_port: int):
        self.socks_host = socks_host
        self.socks_port = socks_port

    def run(self, domain: str, port: int) -> None:
        """Run the IPV4, IPV6 and DOMAIN tests against *domain*:*port*.

        Both address families are resolved locally before the first test
        starts, so a resolution failure aborts the whole run.
        """
        tests = [
            ("IPV4", LocalResolver.resolve_ipv4(domain)),
            ("IPV6", LocalResolver.resolve_ipv6(domain)),
            ("DOMAIN", domain),
        ]
        for atyp, target in tests:
            self.run_single(atyp, target, domain, port)

    def run_single(self, atyp: str, target: str, host_header: str, port: int) -> None:
        """Open a fresh proxy connection, CONNECT to *target*, send one request.

        Args:
            atyp: address type name passed to the SOCKS5 client.
            target: destination address in the form matching *atyp*.
            host_header: value for the HTTP ``Host`` header.
            port: destination TCP port.
        """
        print(f"\n=== TCP connect via {target} (ATYP={atyp}) ===")
        client = Socks5Client(self.socks_host, self.socks_port)
        try:
            client.connect()
            sock = client.connect_target(atyp, target, port)
            self.send_http(sock, host_header)
        finally:
            # Release the proxy connection even when a step above fails.
            client.close()

    @staticmethod
    def send_http(sock: socket.socket, host: str) -> None:
        """Send ``GET /`` over *sock* and print the response header lines.

        Reads until the blank line that ends the header block, or until the
        peer closes the connection, whichever comes first.
        """
        req = (
            f"GET / HTTP/1.1\r\n"
            f"Host: {host}\r\n"
            f"User-Agent: socks5-test\r\n"
            f"Connection: close\r\n\r\n"
        ).encode()
        print(">>> HTTP request")
        sock.sendall(req)
        resp = b""
        while b"\r\n\r\n" not in resp:
            data = sock.recv(4096)
            if not data:
                break
            resp += data
        # Keep only the header block; any body bytes already read are dropped.
        header = resp.split(b"\r\n\r\n", 1)[0].decode(errors="replace")
        print("<<< HTTP response header")
        for line in header.splitlines():
            print(f" {line}")
class ProgramOptions:
    """Command-line options for the SOCKS5 TCP CONNECT tester."""

    def __init__(self):
        self.parser = argparse.ArgumentParser(description="SOCKS5 TCP CONNECT tester")
        self.parser.add_argument(
            "-x",
            required=True,
            metavar="HOST:PORT",
            help="SOCKS5 proxy address, e.g. 127.0.0.1:1080 or [::1]:1080",
        )

    def parse(self) -> Tuple[str, int]:
        """Parse ``sys.argv`` and return the proxy ``(host, port)``.

        Accepts ``HOST:PORT`` and bracketed IPv6 ``[ADDR]:PORT``; the
        brackets are removed from the returned host. Malformed values end
        the program through ``ArgumentParser.error`` (usage message and
        exit status 2) instead of an uncaught ``ValueError``.
        """
        args = self.parser.parse_args()
        # Split on the last colon so IPv6 literals keep their inner colons.
        host, sep, port_text = args.x.rpartition(":")
        if not sep:
            self.parser.error(f"-x expects HOST:PORT, got {args.x!r}")
        if host.startswith("[") and host.endswith("]"):
            host = host[1:-1]
        try:
            port = int(port_text)
        except ValueError:
            self.parser.error(f"-x has a non-numeric port: {port_text!r}")
        if not 0 < port < 65536:
            self.parser.error(f"-x port out of range (1-65535): {port}")
        return host, port
if __name__ == "__main__":
    # Read the proxy address from -x HOST:PORT, echo it, then run the three
    # CONNECT tests against the fixed target bing.com:80.
    socks_host, socks_port = ProgramOptions().parse()
    print(f"SOCKS5 server: {socks_host}:{socks_port}")
    TcpTester(socks_host, socks_port).run("bing.com", 80)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment