Skip to content

Instantly share code, notes, and snippets.

@cedricvidal
Created October 22, 2018 17:34
Show Gist options
  • Save cedricvidal/06e4570cedb1c03c68c6184ec12d5405 to your computer and use it in GitHub Desktop.
Save cedricvidal/06e4570cedb1c03c68c6184ec12d5405 to your computer and use it in GitHub Desktop.
Python 2 non-blocking asyncore port-forwarding TCP proxy
#!/usr/bin/env python
#
# s.py - a simple port forwarder using asyncore
#
# by Yusuke Shinyama, *public domain*
# http://www.unixuser.org/~euske/python/s.py
#
import sys, socket, asyncore
## Server
##
class Server(asyncore.dispatcher):
    """Listening socket that starts one Proxy session per accepted connection.

    port     -- local TCP port to listen on
    destaddr -- (host, port) pair passed to Proxy.connect_remote()
    bindaddr -- local address to bind to (defaults to loopback)
    """

    def __init__(self, port, destaddr, bindaddr="127.0.0.1"):
        asyncore.dispatcher.__init__(self)
        self.destaddr = destaddr
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind((bindaddr, port))
        self.listen(1)
        # Running counter used to label each session in the log output.
        self.session = 0
        sys.stderr.write("Listening: %s:%d\n" % (bindaddr, port))

    def handle_accept(self):
        """Accept one pending connection and forward it to self.destaddr."""
        # dispatcher.accept() may return None instead of a (socket, address)
        # pair (e.g. the pending connection went away before it was
        # accepted); unpacking it blindly would raise TypeError.
        pair = self.accept()
        if pair is None:
            return
        conn, peer = pair
        sys.stderr.write("Accepted: %s\n" % (peer[0],))
        proxy = Proxy(conn, self.session)
        self.session += 1
        try:
            proxy.connect_remote(self.destaddr)
        except socket.error as exc:
            # A failed outbound connect must end only this session.  If the
            # exception escaped, asyncore's default handle_error() would
            # close this listening channel.
            proxy.disp("(connect failed: %s)" % (exc,))
            proxy.handle_close()
## Proxy
##
class Proxy(asyncore.dispatcher):
    """Local side of one forwarded session.

    Wraps the socket accepted from the local peer, owns the Client that
    talks to the remote end, and buffers data coming back from the remote
    end until the local socket can take it.

    sock    -- already-connected socket of the local peer
    session -- label used to prefix this session's log lines
    """

    def __init__(self, sock, session):
        # Set before dispatcher.__init__ because disp() reads self.session.
        self.session = session
        # Bytes buffer: recv() yields bytes on Python 3 (b"" is a plain str
        # on Python 2, so behaviour there is unchanged).
        self.sendbuffer = b""
        self.client = None
        self.disp("BEGIN")
        asyncore.dispatcher.__init__(self, sock)

    # overridable methods

    def local2remote(self, s):
        """Hook for data going local -> remote; returns the data to forward."""
        sys.stdout.write(">>> %r\n" % (s,))
        return s

    def remote2local(self, s):
        """Hook for data going remote -> local; returns the data to forward."""
        sys.stdout.write("<<< %r\n" % (s,))
        return s

    def connect_remote(self, addr):
        """Create the Client for this session and start connecting to addr.

        addr is a (host, port) pair.  Any exception raised by the connect
        attempt is re-raised after the half-created Client has been closed.
        """
        assert not self.client, "already connected"
        self.addr = addr
        self.disp("(connecting to %s:%d)" % self.addr)
        self.client = Client(self)
        try:
            self.client.connect(addr)
        except Exception:
            # Do not leave an unconnected Client registered with asyncore.
            self.disconnect_remote()
            raise

    def disconnect_remote(self):
        """Close the remote connection and forget the Client."""
        assert self.client, "not connected"
        self.client.close()
        self.client = None

    def disp(self, s):
        """Write one log line for this session to stderr."""
        sys.stderr.write("SESSION %s: %s\n" % (self.session, s))

    def remote_connected(self):
        """Called by the Client once the remote connection is established."""
        self.disp("(connected to remote %s:%d)" % self.addr)

    def remote_closed(self):
        """Called by the Client when the remote end has closed."""
        self.disp("(closed by remote %s:%d)" % self.addr)
        self.client = None
        # With data still queued for the local peer, stay open;
        # handle_write() finishes the teardown once the buffer is flushed.
        if not self.sendbuffer:
            self.handle_close()

    def remote_error(self):
        """Called by the Client when its connection hit an error."""
        self.disp("(error by remote %s:%d)" % self.addr)

    def remote_read(self, data):
        """Queue data received from the remote end for the local peer."""
        if data:
            data = self.remote2local(data)
            if data:
                self.sendbuffer += data

    def handle_read(self):
        """Forward data received from the local peer to the remote end."""
        data = self.recv(8192)
        if data:
            data = self.local2remote(data)
            # Nothing is forwarded once the remote side is gone.
            if data and self.client:
                self.client.remote_write(data)

    def writable(self):
        """Ask for write events only while data is queued."""
        return 0 < len(self.sendbuffer)

    def handle_write(self):
        """Send as much queued data to the local peer as the socket takes."""
        n = self.send(self.sendbuffer)
        self.sendbuffer = self.sendbuffer[n:]
        # Remote already closed and everything delivered: end the session.
        if not self.sendbuffer and not self.client:
            self.handle_close()

    def handle_close(self):
        """Tear down the session: remote connection first, then local."""
        if self.client:
            self.disp("(closed by local)")
            self.disconnect_remote()
        self.close()
        self.disp("END")
## Client
##
class Client(asyncore.dispatcher):
    """Remote side of one forwarded session.

    Owns the outbound socket and reports connection events and received
    data back to the owning proxy object.

    proxy -- object providing remote_connected(), remote_closed(),
             remote_error() and remote_read(data)
    """

    def __init__(self, proxy):
        self.proxy = proxy
        # Bytes queued for the remote end (b"" is a plain str on Python 2).
        self.sendbuffer = b""
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)

    def handle_connect(self):
        """Tell the proxy that the remote connection is established."""
        self.proxy.remote_connected()

    def handle_close(self):
        """Tell the proxy that the remote end is gone, then close."""
        self.proxy.remote_closed()
        self.close()

    def handle_error(self):
        """Report the error to the proxy and shut this connection down.

        Only logging here would leave the failed socket registered with
        asyncore, so the session would never be torn down.
        """
        self.proxy.remote_error()
        self.handle_close()

    def handle_read(self):
        """Pass data received from the remote end to the proxy."""
        self.proxy.remote_read(self.recv(8192))

    def remote_write(self, data):
        """Queue data to be sent to the remote end."""
        self.sendbuffer += data

    def writable(self):
        """Ask for write events while data is queued or a connect is pending.

        A non-blocking connect signals completion through a write event, so
        polling for it lets handle_connect() fire as soon as the connection
        is up instead of only once there is data to send.
        """
        return bool(self.sendbuffer) or bool(getattr(self, "connecting", False))

    def handle_write(self):
        """Send as much queued data to the remote end as the socket takes."""
        # A write event that only signalled connect completion has nothing
        # to send.
        if not self.sendbuffer:
            return
        n = self.send(self.sendbuffer)
        self.sendbuffer = self.sendbuffer[n:]
# main
if __name__ == '__main__':
    # Command line: s.py local_port remote_host remote_port
    usage = "usage: s.py local_port remote_host remote_port\n"
    if len(sys.argv) < 4:
        sys.stderr.write(usage)
        sys.exit(2)
    try:
        local_port = int(sys.argv[1])
        remote_port = int(sys.argv[3])
    except ValueError:
        # A non-numeric port is a usage error, not a crash with a traceback.
        sys.stderr.write(usage)
        sys.exit(2)
    # The Server registers itself with asyncore; loop() then runs until no
    # channels remain.
    Server(local_port, (sys.argv[2], remote_port))
    asyncore.loop()
@cedricvidal
Copy link
Author

Saving it because finding an actually working Python TCP proxy was quite hard ...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment