Skip to content

Instantly share code, notes, and snippets.

@teeberg
Created February 7, 2025 10:39
Show Gist options
  • Save teeberg/ba26276db5359f53f90def4801b5438d to your computer and use it in GitHub Desktop.
Save teeberg/ba26276db5359f53f90def4801b5438d to your computer and use it in GitHub Desktop.
SOCKS5 server
#!/usr/bin/env python3
import logging
import select
import socket
import struct
import sys
from socketserver import ThreadingMixIn, TCPServer, StreamRequestHandler
# Configure the root logger to emit DEBUG-and-above records.
logging.basicConfig(level=logging.DEBUG)

# Protocol version byte expected in every SOCKS5 message.
SOCKS_VERSION = 0x05
# Authentication method identifiers exchanged during method negotiation.
AUTH_METHOD_NOPASSWORD = 0x00
AUTH_METHOD_USERNAME_PASSWORD = 0x02
class ThreadingTCPServer(ThreadingMixIn, TCPServer):
    """TCP server that handles every accepted connection in its own thread."""

    # Allow rebinding the listening port immediately after a restart.
    allow_reuse_address = True
    # Proxy sessions can stay open indefinitely. With non-daemon handler
    # threads, server_close() (run when the server's `with` block exits)
    # joins every one of them, so Ctrl-C would hang until all clients
    # disconnect. Daemon threads let the process exit promptly instead.
    daemon_threads = True
class SocksProxy(StreamRequestHandler):
    """Request handler implementing a minimal SOCKS5 proxy (CONNECT only).

    Each client connection goes through method negotiation, an optional
    username/password sub-negotiation, one CONNECT request and finally a
    bidirectional relay between the client and the requested destination.
    Only IPv4 and domain-name destinations are supported.
    """

    # Credentials accepted by the username/password sub-negotiation.
    username = 'username'
    password = 'password'

    # Version byte of the username/password sub-negotiation (RFC 1929).
    _AUTH_VERSION = 1
    # Method-selection value meaning "no acceptable methods" (RFC 1928).
    _NO_ACCEPTABLE_METHODS = 0xFF
    # Command and address-type codes handled by this server.
    _CMD_CONNECT = 1
    _ATYP_IPV4 = 1
    _ATYP_DOMAIN = 3
    # Reply codes sent back to the client (RFC 1928, section 6).
    _REPLY_HOST_UNREACHABLE = 4
    _REPLY_CONNECTION_REFUSED = 5
    _REPLY_COMMAND_NOT_SUPPORTED = 7
    _REPLY_ADDRESS_TYPE_NOT_SUPPORTED = 8

    def log(self, msg):
        """Print *msg* prefixed with the client socket's file descriptor."""
        fileno = self.request.fileno()
        msg = f'[{fileno}] {msg}'
        print(msg)

    def handle(self):
        """Serve one client: negotiate, authenticate, connect and relay.

        The connection itself is closed by the socketserver framework once
        this method returns, so error paths simply return.
        """
        self.log('Accepting connection from {}:{} on fd {}'.format(*self.client_address, self.request.fileno()))
        try:
            self._serve()
        except ConnectionError as err:
            # The client vanished mid-protocol; nothing more can be sent.
            self.log(f'Connection aborted: {err}')

    def _serve(self):
        """Run the protocol phases in order, stopping at the first failure."""
        if not self._negotiate_auth():
            return
        target = self._read_request()
        if target is None:
            return
        address, port, address_type = target

        self.log(f'Connecting to {address}:{port}')
        # The context manager guarantees the outbound socket is closed on
        # every exit path (connect failure, relay end, exception).
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as remote:
            try:
                remote.connect((address, port))
            except OSError as err:
                logging.exception(err)
                self.connection.sendall(
                    self.generate_failed_reply(address_type, self._REPLY_CONNECTION_REFUSED))
                return
            bind_host, bind_port = remote.getsockname()
            self.log('Connected to %s %s' % (address, port))
            # Success reply carries the local address of the outbound socket.
            self.connection.sendall(struct.pack(
                "!BBBB4sH", SOCKS_VERSION, 0, 0, self._ATYP_IPV4,
                socket.inet_aton(bind_host), bind_port))
            self.exchange_loop(self.connection, remote)

    def _negotiate_auth(self):
        """Perform method negotiation; return True when the client may proceed."""
        version, nmethods = struct.unpack("!BB", self._recv_exact(2))
        if version != SOCKS_VERSION:
            self.log(f'Unsupported SOCKS version in greeting: {version}')
            return False

        methods = self.get_available_methods(nmethods)
        self.log(f'Client offered methods: {methods}')
        # accept only either passwordless or USERNAME/PASSWORD auth
        supported_methods = {AUTH_METHOD_NOPASSWORD, AUTH_METHOD_USERNAME_PASSWORD} & set(methods)
        self.log(f'Available methods after negotiation: {supported_methods}')
        if not supported_methods:
            # Tell the client explicitly that nothing it offered is usable.
            self.connection.sendall(
                struct.pack("!BB", SOCKS_VERSION, self._NO_ACCEPTABLE_METHODS))
            return False

        if AUTH_METHOD_USERNAME_PASSWORD in supported_methods:
            chosen_method = AUTH_METHOD_USERNAME_PASSWORD
        else:
            chosen_method = AUTH_METHOD_NOPASSWORD
        self.log(f'Auth method: {chosen_method}')
        self.connection.sendall(struct.pack("!BB", SOCKS_VERSION, chosen_method))

        # The sub-negotiation must run whenever username/password was the
        # method announced to the client, even if it also offered "no auth";
        # otherwise its credential message would be parsed as the request.
        if chosen_method == AUTH_METHOD_USERNAME_PASSWORD:
            return self.verify_credentials()
        return True

    def _read_request(self):
        """Read the client's request and resolve its destination.

        Returns:
            ``(ipv4_address, port, address_type)`` for a valid CONNECT
            request, or ``None`` after a failure has been logged (and, where
            the protocol allows, reported to the client).
        """
        version, cmd, _, address_type = struct.unpack("!BBBB", self._recv_exact(4))
        if version != SOCKS_VERSION:
            self.log(f'Unsupported SOCKS version in request: {version}')
            return None

        if address_type == self._ATYP_IPV4:
            raw_host = self._recv_exact(4)
        elif address_type == self._ATYP_DOMAIN:
            domain_length = self._recv_exact(1)[0]
            raw_host = self._recv_exact(domain_length)
        else:
            self.log(f'Unsupported address type: {address_type}')
            self.connection.sendall(
                self.generate_failed_reply(address_type, self._REPLY_ADDRESS_TYPE_NOT_SUPPORTED))
            return None
        port = struct.unpack('!H', self._recv_exact(2))[0]

        if cmd != self._CMD_CONNECT:
            self.log(f'Unsupported cmd: {cmd}')
            self.connection.sendall(
                self.generate_failed_reply(address_type, self._REPLY_COMMAND_NOT_SUPPORTED))
            return None

        if address_type == self._ATYP_IPV4:
            address = socket.inet_ntoa(raw_host)
            self.log(f'Connecting to IPv4: {address}')
        else:
            try:
                domain_name = raw_host.decode()
                address = socket.gethostbyname(domain_name)
            except (OSError, UnicodeError) as ex:
                # Covers socket.gaierror/herror on any platform as well as
                # names that cannot be decoded or IDNA-encoded.
                self.log(f'Could not resolve domain {raw_host!r}: {ex}')
                self.connection.sendall(
                    self.generate_failed_reply(address_type, self._REPLY_HOST_UNREACHABLE))
                return None
            self.log(f'Connecting to Domain name: {domain_name} (IP: {address})')
        return address, port, address_type

    def _recv_exact(self, n):
        """Read exactly *n* bytes from the client.

        A single ``recv`` may return fewer bytes than requested, so keep
        reading until the message is complete.

        Raises:
            ConnectionError: the client closed the connection early.
        """
        chunks = []
        remaining = n
        while remaining > 0:
            chunk = self.connection.recv(remaining)
            if not chunk:
                raise ConnectionError(
                    f'client closed the connection with {remaining} of {n} bytes still expected')
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def get_available_methods(self, n):
        """Return the *n* authentication method ids offered by the client.

        Raises:
            ConnectionError: the client closed the connection early.
        """
        return list(self._recv_exact(n))

    def verify_credentials(self):
        """Run the username/password sub-negotiation.

        Sends the status reply to the client and returns True when the
        supplied credentials match ``self.username`` / ``self.password``,
        False otherwise.

        Raises:
            ConnectionError: the client closed the connection early.
        """
        version = self._recv_exact(1)[0]
        if version != self._AUTH_VERSION:
            self.log(f'Unsupported auth sub-negotiation version: {version}')
            self.connection.sendall(struct.pack("!BB", self._AUTH_VERSION, 0xFF))
            return False

        username_len = self._recv_exact(1)[0]
        username = self._recv_exact(username_len)
        password_len = self._recv_exact(1)[0]
        password = self._recv_exact(password_len)

        # Compare as bytes so that malformed UTF-8 from the client is simply
        # a mismatch instead of an unhandled decode error.
        if username == self.username.encode('utf-8') and password == self.password.encode('utf-8'):
            self.connection.sendall(struct.pack("!BB", self._AUTH_VERSION, 0))
            return True

        # failure, status != 0
        self.connection.sendall(struct.pack("!BB", self._AUTH_VERSION, 0xFF))
        return False

    def generate_failed_reply(self, address_type, error_number):
        """Build a SOCKS5 failure reply carrying *error_number*.

        *address_type* is accepted for backward compatibility but not echoed:
        the reply always contains a zeroed 4-byte address, so its address
        type must be IPv4 for the message to be well-formed.
        """
        return struct.pack("!BBBBIH", SOCKS_VERSION, error_number, 0, self._ATYP_IPV4, 0, 0)

    def exchange_loop(self, client, remote):
        """Relay data between *client* and *remote* until either side closes."""
        directions = ((client, remote, 'Client'), (remote, client, 'Remote'))
        while True:
            # wait until client or remote is available for read
            readable, _, _ = select.select([client, remote], [], [])
            for source, sink, source_name in directions:
                if source not in readable:
                    continue
                try:
                    data = source.recv(4096)
                except ConnectionResetError:
                    self.log(f'{source_name} forcibly closed connection')
                    return
                if not data:
                    # An empty read means the peer performed an orderly close.
                    self.log(f'{source_name} closed connection')
                    return
                try:
                    # sendall() retries partial writes; a bare send() could
                    # silently drop the unsent tail of the chunk.
                    sink.sendall(data)
                except ConnectionError as err:
                    self.log(f'Failed to forward data from {source_name.lower()}: {err}')
                    return
if __name__ == '__main__':
    # Optional positional arguments: [host [port]].
    cli_args = sys.argv[1:]
    host = cli_args[0] if len(cli_args) >= 1 else '127.0.0.1'
    port = int(cli_args[1]) if len(cli_args) >= 2 else 9011

    with ThreadingTCPServer((host, port), SocksProxy) as server:
        listening_on = ":".join(str(part) for part in server.server_address)
        print(f'Listening on {listening_on}')
        try:
            server.serve_forever()
        except KeyboardInterrupt:
            # Ctrl-C interrupted serve_forever(); flag the server as shut
            # down before the `with` block closes it.
            server.shutdown()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment