Created
March 16, 2022 02:11
-
-
Save honmaple/1b181456f58bed222c18047a725cac5f to your computer and use it in GitHub Desktop.
transparent proxy with python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
import socket | |
import select | |
import sys | |
IP_TRANSPARENT = 19 | |
HOST = "0.0.0.0" | |
PORT = 10 | |
buffer_size = 4096 | |
class TheServer: | |
input_list = [] | |
channel = {} | |
def __init__(self, host, port): | |
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
self.server.setsockopt(socket.IPPROTO_IP, IP_TRANSPARENT, 1) | |
self.server.bind((host, port)) | |
self.server.listen(32) | |
def forward(self, c_ip, c_port, r_ip, r_port): | |
forward = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
forward.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
forward.setsockopt(socket.IPPROTO_IP, IP_TRANSPARENT, 1) | |
forward.bind((c_ip, c_port)) | |
forward.settimeout(5) | |
try: | |
a = forward.connect((r_ip, r_port)) | |
print(a) | |
return forward | |
except Exception as e: | |
print(e) | |
return False | |
def main_loop(self): | |
self.input_list.append(self.server) | |
while True: | |
ss = select.select | |
inputready, outputready, exceptready = ss(self.input_list, [], []) | |
for s in inputready: | |
if s == self.server: | |
self.on_accept() | |
break | |
self.data = s.recv(buffer_size) | |
if len(self.data) == 0: | |
self.on_close(s) | |
break | |
else: | |
self.on_recv(s) | |
def on_accept(self): | |
clientsock, (c_ip, c_port) = self.server.accept() | |
r_ip, r_port = clientsock.getsockname() | |
print("[ ] Connection from tcp://%s:%d to tcp://%s:%d" % | |
(c_ip, c_port, r_ip, r_port)) | |
forward = self.forward(c_ip, c_port, r_ip, r_port) | |
if forward: | |
print("%s:%d has connected" % (c_ip, c_port)) | |
self.input_list.append(clientsock) | |
self.input_list.append(forward) | |
self.channel[clientsock] = forward | |
self.channel[forward] = clientsock | |
else: | |
print("Can't establish connection with remote server.") | |
print("Closing connection with client side", c_ip, ":", c_port) | |
clientsock.close() | |
def on_close(self, s): | |
print(s.getpeername(), "has disconnected") | |
# remove objects from input_list | |
self.input_list.remove(s) | |
self.input_list.remove(self.channel[s]) | |
out = self.channel[s] | |
# close the connection with client | |
self.channel[out].close() # equivalent to do self.s.close() | |
# close the connection with remote server | |
self.channel[s].close() | |
# delete both objects from channel dict | |
del self.channel[out] | |
del self.channel[s] | |
def on_recv(self, s): | |
data = self.data | |
# here we can parse and/or modify the data before send forward | |
print(data.decode("utf-8")) | |
self.channel[s].send(data) | |
if __name__ == '__main__': | |
server = TheServer(HOST, PORT) | |
try: | |
server.main_loop() | |
except KeyboardInterrupt: | |
print("Ctrl C - Stopping server") | |
sys.exit(1) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment