Last active
April 8, 2025 08:19
-
-
Save jpic/6b9df2a08906eda4dbf7057bdd055179 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Save as proxy2.py on your LOCAL machine
import socket | |
import threading | |
from socketserver import ThreadingMixIn, TCPServer, StreamRequestHandler | |
class SOCKSProxyHandler(StreamRequestHandler):
    """Per-connection handler for a minimal SOCKS4 / SOCKS5 proxy.

    Only the CONNECT command is implemented and no authentication is
    performed.  The first byte sent by the client selects the protocol
    version; the matching ``handle_socks*`` method parses the rest of the
    request, opens the outgoing connection and then relays bytes in both
    directions until both sides have finished sending.
    """

    # SOCKS4 reply codes (second byte of the 8-byte reply).
    _SOCKS4_GRANTED = 0x5A
    _SOCKS4_REJECTED = 0x5B

    # SOCKS5 reply codes (REP field of the 10-byte reply).
    _SOCKS5_SUCCEEDED = 0x00
    _SOCKS5_GENERAL_FAILURE = 0x01
    _SOCKS5_HOST_UNREACHABLE = 0x04
    _SOCKS5_CONNECTION_REFUSED = 0x05
    _SOCKS5_COMMAND_NOT_SUPPORTED = 0x07
    _SOCKS5_ADDRESS_TYPE_NOT_SUPPORTED = 0x08

    # Upper bound for the null-terminated SOCKS4 user ID so that a client
    # that never sends the terminator cannot make the buffer grow forever.
    _MAX_USER_ID_LENGTH = 255

    # Chunk size used while relaying data.
    _BUFFER_SIZE = 4096

    def handle(self):
        """Dispatch the connection to the SOCKS4 or SOCKS5 handler.

        Socket errors and premature disconnects during the handshake are
        logged and end the connection instead of propagating as a
        traceback.
        """
        print("New connection")
        try:
            # Read initial byte to determine SOCKS version
            version = self._recv_exact(1)
            print(f"Received version: {version}")
            if version == b'\x04':  # SOCKS4
                self.handle_socks4()
            elif version == b'\x05':  # SOCKS5
                self.handle_socks5()
            else:
                print("Unsupported SOCKS version")
        except OSError as e:
            # Covers ConnectionError raised by _recv_exact on early EOF as
            # well as resets/timeouts while talking to the client.
            print(f"Connection aborted: {e}")

    def handle_socks4(self):
        """Serve a SOCKS4 request whose version byte was already consumed.

        The whole request (command, port, IPv4 address, user ID) is read
        before any reply is sent, so the reply is never followed by a
        close with unread client data pending.

        Raises:
            ConnectionError: if the client disconnects mid-request.
        """
        print("Handling SOCKS4")
        cmd = self._recv_exact(1)
        port_bytes = self._recv_exact(2)
        addr = socket.inet_ntoa(self._recv_exact(4))
        port = int.from_bytes(port_bytes, 'big')

        rejected = self._socks4_reply(self._SOCKS4_REJECTED, port_bytes)

        # Read until null-terminated user ID
        user_id = bytearray()
        while True:
            byte = self._recv_exact(1)
            if byte == b'\x00':
                break
            user_id += byte
            if len(user_id) > self._MAX_USER_ID_LENGTH:
                print("SOCKS4: user ID too long")
                self.connection.sendall(rejected)
                return

        if cmd != b'\x01':  # CONNECT (0x01)
            print("SOCKS4: Only CONNECT supported")
            self.connection.sendall(rejected)
            return

        # errors='replace' keeps a non-ASCII user ID from aborting the request.
        print(f"SOCKS4: Connecting to {addr}:{port}, user_id={user_id.decode('ascii', errors='replace')}")
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as remote:
            try:
                remote.connect((addr, port))
            except OSError as e:
                print(f"SOCKS4 error: {e}")
                self.connection.sendall(rejected)  # Failed
                return
            # The reply is sent only for the connect outcome; errors during
            # relaying must not inject a SOCKS reply into the data stream.
            self.connection.sendall(
                self._socks4_reply(self._SOCKS4_GRANTED, port_bytes)
            )
            self.forward_data(remote)

    def handle_socks5(self):
        """Serve a SOCKS5 session whose version byte was already consumed.

        Performs the method negotiation (only "no authentication" is
        accepted), parses a CONNECT request with an IPv4, domain-name or
        IPv6 destination, connects and relays.

        Raises:
            ConnectionError: if the client disconnects mid-handshake.
        """
        print("Handling SOCKS5")
        # SOCKS5 greeting
        nmethods = self._recv_exact(1)[0]
        methods = self._recv_exact(nmethods)
        print(f"Auth methods: {methods}")
        if 0x00 not in methods:
            print("SOCKS5: client does not offer 'no authentication'")
            self.connection.sendall(b'\x05\xFF')  # No acceptable methods
            return
        self.connection.sendall(b'\x05\x00')  # No authentication

        # SOCKS5 request
        request = self._recv_exact(4)
        print(f"Received request: {request}")
        version, cmd, _reserved, atyp = request

        # The address is read before the command is validated so that the
        # full request has been consumed by the time a reply is written.
        if atyp == 1:  # IPv4
            dest_addr = socket.inet_ntoa(self._recv_exact(4))
        elif atyp == 3:  # Domain name
            addr_len = self._recv_exact(1)[0]
            dest_addr = self._recv_exact(addr_len)  # decoded below
        elif atyp == 4:  # IPv6
            dest_addr = socket.inet_ntop(socket.AF_INET6, self._recv_exact(16))
        else:
            print("Unsupported address type")
            self.connection.sendall(
                self._socks5_reply(self._SOCKS5_ADDRESS_TYPE_NOT_SUPPORTED)
            )
            return
        dest_port = int.from_bytes(self._recv_exact(2), 'big')

        if version != 5 or cmd != 1:
            print("Invalid SOCKS5 request")
            # Command not supported
            self.connection.sendall(
                self._socks5_reply(self._SOCKS5_COMMAND_NOT_SUPPORTED)
            )
            return

        if atyp == 3:
            try:
                dest_addr = dest_addr.decode('utf-8')
            except UnicodeDecodeError as e:
                print(f"SOCKS5 error: {e}")
                self.connection.sendall(
                    self._socks5_reply(self._SOCKS5_GENERAL_FAILURE)
                )
                return

        print(f"SOCKS5: Connecting to {dest_addr}:{dest_port}")
        try:
            # create_connection resolves the name and tries every returned
            # address, so IPv6-only destinations work as well.
            remote = socket.create_connection((dest_addr, dest_port))
        except (OSError, ValueError) as e:
            # ValueError covers host names that cannot be encoded for lookup.
            print(f"SOCKS5 error: {e}")
            if isinstance(e, ConnectionRefusedError):
                code = self._SOCKS5_CONNECTION_REFUSED
            else:
                code = self._SOCKS5_HOST_UNREACHABLE
            self.connection.sendall(self._socks5_reply(code))
            return

        with remote:
            self.connection.sendall(
                self._socks5_reply(self._SOCKS5_SUCCEEDED)
            )  # Success
            self.forward_data(remote)

    def forward_data(self, remote):
        """Relay bytes between the client and ``remote`` until both are done.

        One helper thread copies client -> remote while the calling thread
        copies remote -> client.  When a direction reaches EOF (or fails),
        the write side of its destination is shut down so the peer sees
        end-of-stream.  The method returns once both directions finished,
        which guarantees the caller does not close ``remote`` underneath
        the helper thread.

        Args:
            remote: connected socket to the destination host.
        """
        print("Forwarding data")
        bufsize = self._BUFFER_SIZE

        def pump(src, dst):
            try:
                while True:
                    data = src.recv(bufsize)
                    if not data:
                        break
                    dst.sendall(data)
            except OSError:
                # A reset or timeout on either side ends this direction
                # just like a regular EOF.
                pass
            finally:
                try:
                    dst.shutdown(socket.SHUT_WR)
                except OSError:
                    pass  # already closed or never fully connected

        upstream = threading.Thread(
            target=pump, args=(self.connection, remote), daemon=True
        )
        upstream.start()
        pump(remote, self.connection)
        upstream.join()

    def _recv_exact(self, count):
        """Read exactly ``count`` bytes from the client socket.

        ``socket.recv`` may return fewer bytes than requested, so this
        loops until the full amount arrived.

        Raises:
            ConnectionError: if the client closes the connection first.
        """
        chunks = []
        remaining = count
        while remaining:
            chunk = self.connection.recv(remaining)
            if not chunk:
                raise ConnectionError(
                    f"peer closed connection with {remaining} byte(s) still expected"
                )
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    @staticmethod
    def _socks4_reply(code, port_bytes):
        """Build the 8-byte SOCKS4 reply: null version, code, port, 0.0.0.0."""
        return b'\x00' + bytes([code]) + port_bytes + socket.inet_aton('0.0.0.0')

    @staticmethod
    def _socks5_reply(code):
        """Build the 10-byte SOCKS5 reply with an all-zero IPv4 bind address."""
        return b'\x05' + bytes([code]) + b'\x00\x01' + b'\x00' * 6
class ThreadingTCPServer(ThreadingMixIn, TCPServer):
    """TCP server that serves every accepted connection in its own thread."""

    # Allow rebinding the listening port right after a restart instead of
    # failing with "Address already in use" while old sockets linger.
    allow_reuse_address = True

    # Handler threads may block for as long as a tunnel stays open; marking
    # them as daemons lets the process exit without waiting for them.
    daemon_threads = True
if __name__ == "__main__":
    # Listen on the loopback interface only; the context manager closes the
    # listening socket when the server stops.
    with ThreadingTCPServer(('127.0.0.1', 1337), SOCKSProxyHandler) as server:
        print("SOCKS proxy running on localhost:1337")
        try:
            server.serve_forever()
        except KeyboardInterrupt:
            # Ctrl-C is the normal way to stop the proxy; exit without a
            # traceback.
            print("Shutting down")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment