Skip to content

Instantly share code, notes, and snippets.

@iTrooz
Last active August 9, 2024 21:29
Show Gist options
  • Save iTrooz/e410e753f0e8f89dfb413c1dfe939900 to your computer and use it in GitHub Desktop.
Save iTrooz/e410e753f0e8f89dfb413c1dfe939900 to your computer and use it in GitHub Desktop.
Utility to setup SSH UDP port forwarding easily
#!/usr/bin/env python3
"""
This script setups the UDP translators in order to enable UDP traffic forwarding over SSH.
-L example usage: `py ssh_udp.py -L 53000:8.8.8.8:53 your_ssh_host` an then `dig @127.0.0.1 -p 53000 google.com` locally
-R example usage: `py ssh_udp.py -R 53000:8.8.8.8:53 your_ssh_host` an then `dig @127.0.0.1 -p 53000 google.com` on the server
Note: I know this code is ugly.
Thanks to https://stackpointer.io/network/ssh-port-forwarding-tcp-udp/365/ for the help
"""
import subprocess
import time
import signal
import sys
import random
def has_exited(process: subprocess.Popen) -> bool:
return process.poll() != None
def random_port() -> int:
return random.randint(32768, 60999)
class SSH_UDP:
def __init__(self):
self.exiting = False
def run(self):
processes = []
signal.signal(signal.SIGINT, lambda *_: self.exit_processes(processes))
self.local_tmp_port = random_port()
self.remote_tmp_port = random_port()
self.parse_args()
self.run_local_udp_translator()
self.run_remote_udp_translator()
self.run_port_forward()
processes += [self.local_udp_translator_process, self.remote_udp_translator_process, self.port_forward_process]
print("Ready")
self.wait_and_exit(processes)
# Commands have exited
def run_process(self, cmd: list[str]):
print(f"Running command: {" ".join(cmd)}")
return subprocess.Popen(cmd)
def run_local_udp_translator(self):
if self.forward_mode == "-L":
cmd = ["socat", "-T15", f"udp4-recvfrom:{self.local_port},reuseaddr,fork", f"tcp:localhost:{self.local_tmp_port}"]
elif self.forward_mode == "-R":
cmd = ["socat", "-T15", f"tcp4-listen:{self.local_tmp_port},reuseaddr,fork", f"udp:{self.local_host}:{self.local_port}"]
else:
raise ValueError("Invalid forward mode")
self.local_udp_translator_process = self.run_process(cmd)
def run_remote_udp_translator(self):
if self.forward_mode == "-L":
inner_cmd = ["socat", f"tcp4-listen:{self.remote_tmp_port},reuseaddr,fork", f"udp:{self.remote_host}:{self.remote_port}"]
elif self.forward_mode == "-R":
inner_cmd = ["socat", f"udp4-recvfrom:{self.remote_port},reuseaddr,fork", f"tcp:localhost:{self.remote_tmp_port}"]
else:
raise ValueError("Invalid forward mode")
# -t makes the remote process exit when ssh exits
cmd = ["ssh", "-t", self.host, *inner_cmd]
self.remote_udp_translator_process = self.run_process(cmd)
def run_port_forward(self):
if self.forward_mode == "-L":
# Forward to our local udp translator (localhost)
cmd = ["ssh", "-N", "-L", f"{self.local_host}:{self.local_tmp_port}:localhost:{self.remote_tmp_port}", self.host]
elif self.forward_mode == "-R":
# Forward to the remote udp translator (localhost)
cmd = ["ssh", "-N", "-R", f"{self.remote_host}:{self.remote_tmp_port}:localhost:{self.local_tmp_port}", self.host]
else:
raise ValueError("Invalid forward mode")
self.port_forward_process = self.run_process(cmd)
def parse_args(self):
iterator = iter(sys.argv)
next(iterator) # Skip program name
host_parsed = False
ports_parsed = False
for arg in iterator:
if arg == "-L" or arg == "-R":
if ports_parsed:
raise ValueError("Ports already parsed")
else:
self.forward_mode = arg
self.ports_arg = next(iterator)
self.parse_ports_arg(self.forward_mode, self.ports_arg)
ports_parsed = True
else:
if host_parsed:
raise ValueError("Host already parsed")
else:
self.host = arg
host_parsed = True
if not (host_parsed and ports_parsed):
raise ValueError("Invalid arguments")
def parse_ports_arg(self, forward_mode: str, ports_arg: str):
split = ports_arg.split(":")
if len(split) == 3: # 4444:localhost:4444
self.local_host = "0.0.0.0"
self.local_port = int(split[0])
self.remote_host = split[1]
self.remote_port = int(split[2])
elif len(split) == 4: # localhost:4444:localhost:4444
self.local_host = split[0]
self.local_port = int(split[1])
self.remote_host = split[2]
self.remote_port = int(split[3])
else:
raise ValueError("Invalid port argument")
if forward_mode == "-L":
pass
elif forward_mode == "-R":
self.local_port, self.remote_port = self.remote_port, self.local_port
self.local_host, self.remote_host = self.remote_host, self.local_host
else:
raise ValueError("Invalid forward mode")
def wait_and_exit(self, processes: list[subprocess.Popen]):
while not self.exiting:
if any(map(has_exited, processes)):
self.exit_processes(processes)
break
time.sleep(1)
def exit_processes(self, processes: list[subprocess.Popen]):
print("Exiting...")
self.exiting = True
for process in processes:
process.terminate()
for process in processes:
process.wait()
sys.exit(0)
if __name__ == "__main__":
c = SSH_UDP()
c.run()
@Miniontoby
Copy link

Hi! I like this script.

However, consider using from <module> import <item> to save some resources. (not that it really matters, but it is better in the long run)

Here is a diff of what changes should be made to do that.

9,13c9,13
< import subprocess
< import time
< import signal
< import sys
< import random
---
> from subprocess import Popen
> from time import sleep
> from signal import signal, SIGINT
> from sys import argv, exit
> from random import randint
15c15
< def has_exited(process: subprocess.Popen) -> bool:
---
> def has_exited(process: Popen) -> bool:
19c19
<     return random.randint(32768, 60999)
---
>     return randint(32768, 60999)
27c27
<         signal.signal(signal.SIGINT, lambda *_: self.exit_processes(processes))
---
>         signal(SIGINT, lambda *_: self.exit_processes(processes))
42,43c42,43
<         print(f"Running command: {" ".join(cmd)}")
<         return subprocess.Popen(cmd)
---
>         print(f"Running command: {' '.join(cmd)}")
>         return Popen(cmd)
80c80
<         iterator = iter(sys.argv)
---
>         iterator = iter(argv)
127c127
<     def wait_and_exit(self, processes: list[subprocess.Popen]):
---
>     def wait_and_exit(self, processes: list[Popen]):
132c132
<             time.sleep(1)
---
>             sleep(1)
134c134
<     def exit_processes(self, processes: list[subprocess.Popen]):
---
>     def exit_processes(self, processes: list[Popen]):
142c142
<         sys.exit(0)
---
>         exit(0)

@iTrooz
Copy link
Author

iTrooz commented Aug 9, 2024

Hey ! I just tried it, and didn't find noticeable performance gain (maybe 1 ms gained out of avg 35ms)
And I prefer the longer syntax I currently have, to be able to clearly see where a function comes from.
So I'm not going to change it

But thanks for the input

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment