Skip to content

Instantly share code, notes, and snippets.

@joshfinley
Created September 28, 2023 20:13
Show Gist options
  • Save joshfinley/d4503df7f7f407dd50b6a9236b9425b3 to your computer and use it in GitHub Desktop.
Save joshfinley/d4503df7f7f407dd50b6a9236b9425b3 to your computer and use it in GitHub Desktop.
tunssh.py
import socket
import threading
import socks # PySocks
def handle_client(client_socket):
# Establish a tunnel through the corporate proxy using CONNECT
remote_socket = socks.socksocket()
remote_socket.set_proxy(socks.PROXY_TYPE_HTTP, "corporate_proxy_ip", corporate_proxy_port)
try:
remote_socket.connect(("target_ssh_server", 22))
except Exception as e:
print(f"Could not establish tunnel: {e}")
client_socket.close()
return
def forward_data(src_socket, dst_socket):
while True:
data = src_socket.recv(4096)
if len(data) == 0:
break
dst_socket.sendall(data)
threading.Thread(target=forward_data, args=(client_socket, remote_socket)).start()
threading.Thread(target=forward_data, args=(remote_socket, client_socket)).start()
def main():
local_host = "0.0.0.0"
local_port = 8888 # This port should match with what you use in the SSH tunnel
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind((local_host, local_port))
server.listen(5)
print(f"Listening on {local_host}:{local_port}")
while True:
client_sock, addr = server.accept()
print(f"Received incoming connection from {addr[0]}:{addr[1]}")
# Start a new thread to handle this client
client_thread = threading.Thread(target=handle_client, args=(client_sock,))
client_thread.start()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment