Last active
September 28, 2024 03:46
-
-
Save scturtle/7967cb4e7c2bb0f91ca5 to your computer and use it in GitHub Desktop.
python socks5 proxy server with asyncio (async/await)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3.5 | |
import socket | |
import asyncio | |
from struct import pack, unpack | |
class Client(asyncio.Protocol): | |
def connection_made(self, transport): | |
self.transport = transport | |
self.server_transport = None | |
def data_received(self, data): | |
# print('recv:', repr(data)) | |
self.server_transport.write(data) | |
def connection_lost(self, *args): | |
self.server_transport.close() | |
class Server(asyncio.Protocol): | |
INIT, HOST, DATA = 0, 1, 2 | |
def connection_made(self, transport): | |
print('from:', transport.get_extra_info('peername')) | |
self.transport = transport | |
self.state = self.INIT | |
def connection_lost(self, exc): | |
self.transport.close() | |
def data_received(self, data): | |
# print('send:', repr(data)) | |
if self.state == self.INIT: | |
assert data[0] == 0x05 | |
self.transport.write(pack('!BB', 0x05, 0x00)) # no auth | |
self.state = self.HOST | |
elif self.state == self.HOST: | |
ver, cmd, rsv, atype = data[:4] | |
assert ver == 0x05 and cmd == 0x01 | |
if atype == 3: # domain | |
length = data[4] | |
hostname, nxt = data[5:5+length], 5+length | |
elif atype == 1: # ipv4 | |
hostname, nxt = socket.inet_ntop(socket.AF_INET, data[4:8]), 8 | |
elif atype == 4: # ipv6 | |
hostname, nxt = socket.inet_ntop(socket.AF_INET6, data[4:20]), 20 | |
port = unpack('!H', data[nxt:nxt+2])[0] | |
print('to:', hostname, port) | |
asyncio.ensure_future(self.connect(hostname, port)) | |
self.state = self.DATA | |
elif self.state == self.DATA: | |
self.client_transport.write(data) | |
async def connect(self, hostname, port): | |
loop = asyncio.get_event_loop() | |
transport, client = \ | |
await loop.create_connection(Client, hostname, port) | |
client.server_transport = self.transport | |
self.client_transport = transport | |
hostip, port = transport.get_extra_info('sockname') | |
host = unpack("!I", socket.inet_aton(hostip))[0] | |
self.transport.write( | |
pack('!BBBBIH', 0x05, 0x00, 0x00, 0x01, host, port)) | |
if __name__ == '__main__': | |
loop = asyncio.get_event_loop() | |
srv = loop.create_server(Server, 'localhost', 8000) | |
loop.run_until_complete(srv) | |
loop.run_forever() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3.5 | |
import struct | |
import asyncio | |
import socket | |
from socket import gethostbyname, inet_aton, inet_ntoa | |
class Client(asyncio.Protocol): | |
def connection_made(self, transport): | |
self.transport = transport | |
def data_received(self, data): | |
# print('recv:', repr(data)) | |
self.server.write(data) | |
def connection_lost(self, *args): | |
self.server.close() | |
async def handler(reader, writer): | |
async def read(fmt, size): | |
return struct.unpack(fmt, await reader.read(size)) | |
def write(*argv): | |
writer.write(struct.pack(*argv)) | |
loop = asyncio.get_event_loop() | |
async def call(func, *argv): | |
return await loop.run_in_executor(None, func, *argv) | |
ver, meth = await read('BB', 2) | |
assert ver == 0x05 | |
await reader.read(meth) | |
write('!BB', 0x05, 0x00) # no auth | |
ver, cmd, rsv, atype = await read('BBBB', 4) | |
assert ver == 0x05 and cmd == 0x01 | |
if atype == 0x03: # domain | |
length = (await read('B', 1))[0] | |
hostname = (await read("!{}s".format(length), length))[0] | |
hostip = socket.gethostbyname(hostname) | |
host = struct.unpack("!I", inet_aton(hostip))[0] | |
elif atype == 0x01: # ipv4 | |
host = await reader.read(4) | |
hostname = isocket.inet_ntop(socket.AF_INET, host) | |
elif atype == 0x03: # ipv6 | |
host = await reader.read(16) | |
hostname = isocket.inet_ntop(socket.AF_INET, host) | |
port = (await read('!H', 2))[0] | |
print('hostname:', hostname, 'port:', port) | |
transport, client = await loop.create_connection(Client, hostname, port) | |
client.server = writer | |
write('!BBBBIH', 0x05, 0x00, 0x00, 0x01, host, port) | |
data = await reader.read(8192) | |
while data: | |
client.transport.write(data) | |
if len(data) < 8192: | |
break | |
data = await reader.read(8192) | |
if __name__ == '__main__': | |
loop = asyncio.get_event_loop() | |
loop.run_until_complete(asyncio.start_server(handler, '0.0.0.0', 8000)) | |
try: | |
loop.run_forever() | |
finally: | |
loop.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
when I try to write a socks server with asyncio, I read data_received() will be called when receive some data, I don't know how to write my codes until read your server.py. It is amazing! Your code broaden my horizon! Thanks for your excellent codes.