Skip to content

Instantly share code, notes, and snippets.

@Romern
Created March 6, 2025 10:19
Show Gist options
  • Save Romern/4de7a0b4c27a8acb2b94f4a3dd250659 to your computer and use it in GitHub Desktop.
Save Romern/4de7a0b4c27a8acb2b94f4a3dd250659 to your computer and use it in GitHub Desktop.
import socket
import threading
# Configuration
LISTEN_HOST = '0.0.0.0' # Listen on all interfaces
LISTEN_PORT = 12345 # Port to listen on
TARGET_HOST = 'example.com' # Destination host
TARGET_PORT = 80 # Destination port
SEARCH_BYTES = b'hello' # Byte sequence to search for
REPLACE_BYTES = b'world' # Replacement byte sequence
def handle_client(client_socket):
"""Handles client connection and forwards data to the target server."""
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.connect((TARGET_HOST, TARGET_PORT))
def forward(source, destination):
"""Forwards data between sockets, replacing specific byte sequences."""
try:
while True:
data = source.recv(4096)
if not data:
break
modified_data = data.replace(SEARCH_BYTES, REPLACE_BYTES)
destination.sendall(modified_data)
except Exception as e:
print(f"Connection error: {e}")
finally:
source.close()
destination.close()
# Start bidirectional forwarding
threading.Thread(target=forward, args=(client_socket, server_socket)).start()
threading.Thread(target=forward, args=(server_socket, client_socket)).start()
def start_proxy():
"""Starts the TCP proxy."""
proxy_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
proxy_socket.bind((LISTEN_HOST, LISTEN_PORT))
proxy_socket.listen(5)
print(f"Listening on {LISTEN_HOST}:{LISTEN_PORT} and forwarding to {TARGET_HOST}:{TARGET_PORT}")
while True:
client_socket, addr = proxy_socket.accept()
print(f"Accepted connection from {addr}")
threading.Thread(target=handle_client, args=(client_socket,)).start()
if __name__ == "__main__":
start_proxy()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment