Last active
September 28, 2023 19:12
-
-
Save joshfinley/ce7d759ccc6c7772b251753a6ec32c3e to your computer and use it in GitHub Desktop.
Version 3 is current working version
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re
import select
import socket
import threading
def establish_tunnel(remote_socket, target_host, target_port):
    """Ask the upstream proxy to open a tunnel to ``target_host:target_port``.

    Sends an HTTP ``CONNECT`` request over ``remote_socket``, reads the
    proxy's reply up to the end of its header block, and prints it.

    Args:
        remote_socket: Socket already connected to the upstream proxy.
        target_host: Host name or IP address the tunnel should reach.
        target_port: TCP port the tunnel should reach.

    Returns:
        None. The reply is only printed; its status code is not validated.
    """
    authority = f"{target_host}:{target_port}"
    # HTTP/1.1 requests must carry a Host header; for CONNECT it repeats the
    # authority from the request line.
    connect_req = (
        f"CONNECT {authority} HTTP/1.1\r\n"
        f"Host: {authority}\r\n"
        "\r\n"
    ).encode()
    # sendall() keeps writing until every byte is sent; send() may stop short.
    remote_socket.sendall(connect_req)

    # Read until the blank line that ends the header block, so a reply split
    # across several packets does not leak into the relayed stream. The size
    # cap guards against a peer that never terminates its headers.
    # NOTE: bytes that arrive after the terminator in the same read are
    # printed with the reply and are not relayed.
    response = b""
    while b"\r\n\r\n" not in response and len(response) < 65536:
        chunk = remote_socket.recv(4096)
        if not chunk:
            break  # proxy closed the connection
        response += chunk

    # errors="replace" keeps logging from raising on non-UTF-8 reply bytes.
    print(f"Received response from corporate proxy: {response.decode(errors='replace')}")
def extract_target_address(data):
    """Extract the destination host and port from an HTTP ``Host`` header.

    Args:
        data: Raw bytes of the first chunk received from the client.

    Returns:
        A ``(host, port)`` tuple. ``port`` is an ``int`` when the header
        includes one and ``None`` otherwise. ``(None, None)`` is returned
        when no ``Host`` header is found, for example when the payload is
        not HTTP.
    """
    # latin-1 maps every byte to a code point, so arbitrary binary input
    # (such as a TLS handshake) cannot raise UnicodeDecodeError here.
    text = data.decode("latin-1")

    # Anchor at the start of a line so headers that merely end in "Host"
    # (e.g. "X-Forwarded-Host") are not mistaken for the real one; the port
    # is an optional group, so one search covers both forms.
    match = re.search(
        r'^Host:[ \t]*([a-zA-Z0-9.-]+)(?::([0-9]+))?',
        text,
        re.IGNORECASE | re.MULTILINE,
    )
    if match is None:
        return None, None

    host, port = match.group(1), match.group(2)
    return host, (int(port) if port is not None else None)
def handle_client(client_socket, proxy_host, proxy_port):
    """Relay one client connection through the upstream proxy.

    Connects to the proxy, reads the client's first chunk to find the target
    in its ``Host`` header, opens a CONNECT tunnel to that target, then
    copies bytes in both directions until either side closes.

    Args:
        client_socket: Accepted socket for the local client. Always closed
            before this function returns.
        proxy_host: Host name or IP address of the upstream proxy.
        proxy_port: TCP port of the upstream proxy.

    Returns:
        None.
    """
    remote_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        remote_socket.connect((proxy_host, proxy_port))

        data = client_socket.recv(4096)
        if not data:
            return  # client disconnected before sending anything

        target_host, target_port = extract_target_address(data)

        # If target host and port are not specified, fall back to a default proxy (e.g., Squid)
        if target_host is None:
            target_host = "default.proxy.server"  # Replace with your default proxy server
            target_port = 3128  # Replace with your default proxy port
        elif target_port is None:
            # A Host header without a port means the default HTTP port;
            # without this the request would read "CONNECT host:None".
            target_port = 80

        establish_tunnel(remote_socket, target_host, target_port)

        # Forward the chunk that was consumed to discover the target.
        remote_socket.sendall(data)

        # Pump whichever side has data ready. A strict send-then-receive
        # cycle stalls as soon as one peer sends two chunks in a row or
        # nothing at all.
        peer_of = {client_socket: remote_socket, remote_socket: client_socket}
        while True:
            readable, _, _ = select.select(list(peer_of), [], [])
            for sock in readable:
                chunk = sock.recv(4096)
                if not chunk:
                    return  # one side closed; tear down both
                peer_of[sock].sendall(chunk)
    except OSError as exc:
        # Network failures end this connection only; log and fall through
        # to cleanup instead of killing the thread with a traceback.
        print(f"[!] Connection error: {exc}")
    finally:
        client_socket.close()
        remote_socket.close()
def main(local_host, local_port, proxy_host, proxy_port):
    """Accept local connections forever, handling each on its own thread.

    Args:
        local_host: Address to listen on.
        local_port: TCP port to listen on.
        proxy_host: Host name or IP address of the upstream proxy, passed
            through to ``handle_client``.
        proxy_port: TCP port of the upstream proxy, passed through to
            ``handle_client``.

    Returns:
        Does not return normally; the accept loop runs until an exception
        (such as ``KeyboardInterrupt``) propagates.
    """
    # The context manager closes the listening socket on every exit path,
    # including Ctrl+C and bind/accept errors.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.bind((local_host, local_port))
        server.listen(5)
        print(f"[*] Listening on {local_host}:{local_port}")

        while True:
            client_socket, addr = server.accept()
            print(f"[*] Accepted connection from: {addr[0]}:{addr[1]}")

            # Spin a thread to handle the client. Daemon threads do not keep
            # the process alive once the accept loop has exited.
            client_thread = threading.Thread(
                target=handle_client,
                args=(client_socket, proxy_host, proxy_port),
                daemon=True,
            )
            client_thread.start()
if __name__ == "__main__":
    # Upstream (corporate) proxy that every tunnel is opened through.
    PROXY_HOST = "corporate.proxy.address"  # Replace with your corporate proxy address
    PROXY_PORT = 8080  # Replace with your corporate proxy port

    # Local endpoint this forwarder listens on.
    LOCAL_HOST, LOCAL_PORT = "127.0.0.1", 8080

    main(LOCAL_HOST, LOCAL_PORT, PROXY_HOST, PROXY_PORT)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment