Skip to content

Instantly share code, notes, and snippets.

@dboyliao
Last active August 18, 2017 15:12
Show Gist options
  • Select an option

  • Save dboyliao/47dfbb2f3a36a12f7f54b6f2f0960096 to your computer and use it in GitHub Desktop.

Select an option

Save dboyliao/47dfbb2f3a36a12f7f54b6f2f0960096 to your computer and use it in GitHub Desktop.
Helper script for SSH tunneling a local host port to a destination port.
#!/usr/bin/env python3
# -*- coding: utf8 -*-
from __future__ import print_function
import re
import subprocess as sp
import argparse
import sys
import os
import yaml
# Everything below shells out to POSIX tools (ssh, ps, grep, kill), so bail
# out early on any other platform.
if os.name != "posix":
    sys.stderr.write("Only support Unix/Linux System. Sorry....\n")
    sys.exit(3)
def bind(port, user_name, dest, id_file=None, host='127.0.0.1', ssh_port='22'):
    """Open an ssh tunnel for ``port`` towards ``dest``.

    ``user_name`` and ``dest`` given by the caller win over the values stored
    in the config file; the stored values are only used as a fallback.

    NOTE: for ``host`` and ``ssh_port`` the precedence is the other way
    round: a value stored in the config file overrides the argument, and the
    argument is only used when the config file has no such entry.

    Args:
        port: port to forward; must not be None.
        user_name: login name; falsy to use the configured one.
        dest: destination address; falsy to use the configured one.
        id_file: optional ssh identity file path.
        host: forward target host used when the config has no ``host``.
        ssh_port: ssh port used when the config has no ``ssh_port``.

    Returns:
        Whatever ``_ssh_forward`` returns (the exit status of the ssh call).

    Raises:
        ValueError: if the user name, the destination or the port is missing.
    """
    config = _read_config()
    user_name = user_name or config.get("user_name")
    dest = dest or config.get("dest")

    # Explicit raises instead of ``assert``: assertions vanish under
    # ``python -O`` and would let a None slip into the ssh command line.
    if user_name is None:
        raise ValueError(
            "user name not given and can't find it in the config file")
    if dest is None:
        raise ValueError(
            "destination not given and can't find it in the config file")
    if port is None:
        raise ValueError("no port for tunneling")

    host = config.get("host", host)
    # A hand-edited config may hold ``ssh_port: 22`` which YAML loads as an
    # int; the ssh argument list needs strings.
    ssh_port = str(config.get("ssh_port", ssh_port))
    return _ssh_forward(user_name, dest, port, id_file, host, ssh_port)
def _ssh_forward(user_name, dest, port, id_file=None, host="127.0.0.1", ssh_port='22'):
    """Run ``ssh -L port:host:port user_name@dest -f -N`` and return its status.

    Args:
        user_name: login name on ``dest``.
        dest: address of the ssh server.
        port: port number used on both sides of the ``-L`` forward spec.
        id_file: optional identity file, passed with ``-i`` when truthy.
        host: host part of the ``-L`` forward spec.
        ssh_port: ssh server port, passed with ``-p``.

    Returns:
        The exit status returned by ``subprocess.call``.
    """
    cmd = ["ssh"]
    if id_file:
        cmd += ["-i", id_file]
    cmd += [
        # str(): subprocess rejects non-string arguments, and the port may
        # arrive as an int (e.g. loaded from YAML).
        "-p", str(ssh_port),
        "-L", "{port}:{host}:{port}".format(port=port, host=host),
        "{}@{}".format(user_name, dest),
        "-f", "-N",
    ]
    return sp.call(cmd)
def _tunnel_pids(ps_output, dest, port=None):
    """Return pids of ``ssh -L`` rows in ``ps aux`` output that target ``dest``."""
    # The destination must be a whole token (optionally prefixed by "user@"),
    # so "host.com" does not also select "otherhost.com".
    dest_re = re.compile(r"(?:^|[\s@])" + re.escape(dest) + r"(?=\s|$)")
    # -L [bind_address:]port:host:hostport ; group 1 is the local port.
    forward_re = re.compile(
        r"(?:^|\s)-L\s*(?:\S*:)?(\d+):(?:\[[^\]\s]+\]|[^\s:\[\]]+):\d+(?=\s|$)")
    wanted_port = str(port) if port else None
    own_pid = os.getpid()

    pids = []
    for line in ps_output.splitlines():
        fields = line.split()
        # Second column of ``ps aux`` is the pid; this also skips the header.
        if len(fields) < 2 or not fields[1].isdigit():
            continue
        pid = int(fields[1])
        if pid == own_pid:
            continue
        if "ssh" not in line or "-L" not in line:
            continue
        if not dest_re.search(line):
            continue
        if wanted_port is not None:
            local_ports = [m.group(1) for m in forward_re.finditer(line)]
            if wanted_port not in local_ports:
                continue
        pids.append(pid)
    return pids


def break_connect(**kwargs):
    """Terminate ssh tunnels to a destination.

    Keyword Args:
        dest: destination address; when falsy the ``dest`` stored in the
            config file is used.
        port: when given, only tunnels whose ``-L`` local port equals it are
            terminated; otherwise every tunnel to the destination is.
        Any other keyword is accepted and ignored.

    Returns:
        0 on success (also when no tunnel matched), 2 when ``ps`` could not
        be run or a matching process could not be signalled.

    Raises:
        ValueError: if no destination is given or configured.
    """
    import signal  # local import: only this function needs it

    dest = kwargs.get("dest", None)
    port = kwargs.get("port", None)
    if not dest:
        # _read_config() already returns the content of the "tunnel"
        # section, so the destination is looked up on it directly.
        dest = _read_config().get("dest", None)
    if not dest:
        raise ValueError(
            "destination not given and can't find it in config file")

    try:
        p_ps = sp.Popen(["ps", "aux"], stdout=sp.PIPE)
    except OSError as exc:
        print("Fail to run ps: {}".format(exc), file=sys.stderr)
        return 2
    out, _ = p_ps.communicate()
    if p_ps.returncode != 0:
        print("ps exited with status {}".format(p_ps.returncode),
              file=sys.stderr)
        return 2

    if port:
        print("Break pipe to {}:{}".format(dest, port))
    else:
        print("Breaking all pipes to {}".format(dest))

    pids = _tunnel_pids(out.decode("utf8", "replace"), dest, port)
    if not pids:
        print("No matching tunnel found", file=sys.stderr)
        return 0

    status = 0
    for pid in pids:
        try:
            # SIGTERM is what a plain ``kill <pid>`` sends.
            os.kill(pid, signal.SIGTERM)
        except OSError as exc:
            print("Fail to kill process {}: {}".format(pid, exc),
                  file=sys.stderr)
            status = 2
    return status
def _parse_tunnels(ps_output):
    """Return ``(host, port, user, dest)`` tuples found in ``ps`` output rows."""
    # -L [bind_address:]port:host:hostport ; groups: local port, host.
    forward_re = re.compile(
        r"(?:^|\s)-L\s*(?:\S*:)?(\d+):(\[[^\]\s]+\]|[^\s:\[\]]+):\d+(?=\s|$)")
    # "-" is allowed so hyphenated host names are not cut short.
    dest_re = re.compile(r"([\w.\-]+)@([\w.\-]+)")

    tunnels = []
    # Work row by row so a forward spec is always paired with the
    # destination of the same ssh process.
    for line in ps_output.splitlines():
        dest_m = dest_re.search(line)
        if dest_m is None:
            continue
        for forward_m in forward_re.finditer(line):
            tunnels.append((forward_m.group(2), forward_m.group(1),
                            dest_m.group(1), dest_m.group(2)))
    return tunnels


def ls_tunnel():
    """Print the ssh ``-L`` tunnels found in the process list.

    One line per forward is printed as ``host:port --> user@dest:port``.

    Returns:
        0 always; errors of the ``ps``/``grep`` pipeline are not captured
        here and go straight to the inherited stderr.
    """
    p_all_tunnel = sp.Popen("ps aux | grep ssh | grep -v grep | grep -- -L",
                            stdout=sp.PIPE,
                            shell=True)
    out, _ = p_all_tunnel.communicate()
    # "replace" keeps the listing alive if a process name is not valid utf8.
    text = out.decode("utf8", "replace")
    for host, port, user, dest in _parse_tunnels(text):
        print("{host}:{port} --> {user}@{dest}:{port}".format(host=host,
                                                              port=port,
                                                              user=user,
                                                              dest=dest))
    return 0
def config():
    """Interactively update the stored tunnel settings.

    Asks for the user name, destination, host and ssh port in turn, showing
    the stored value (if any) in the prompt. A blank answer keeps the stored
    value. The result is written back to the config file under the
    ``tunnel`` key.
    """
    settings = _read_config()
    print("leave blank to use original configuration")

    # (config key, prompt without a stored value, prompt showing the stored value)
    questions = (
        ("user_name", "user name: ", "user name ({}): "),
        ("dest", "destination: ", "destination ({}): "),
        ("host", "host: ", "host ({}): "),
        ("ssh_port", "ssh port: ", "ssh port ({}): "),
    )

    answers = []
    for key, bare_prompt, prompt_with_value in questions:
        stored = settings.get(key, None)
        if stored is None:
            prompt = bare_prompt
        else:
            prompt = prompt_with_value.format(stored)
        answers.append((key, input(prompt)))

    # Only non-blank answers replace what was stored.
    for key, answer in answers:
        if answer:
            settings[key] = answer

    with open(_get_config_path(), "w") as wf:
        yaml.dump({"tunnel": settings}, wf, default_flow_style=False)
def ls_config():
    """Print the raw content of the config file.

    Returns:
        0 after printing the file, 2 (with a message on stderr) when the
        config file does not exist.
    """
    path = _get_config_path()
    if os.path.exists(path):
        with open(path, "r") as handle:
            print(handle.read())
        return 0
    print("No config file found", file=sys.stderr)
    return 2
def _get_config_path():
    """Return the path of ``.tunnel_config.yaml`` in the user's home directory."""
    return os.path.join(os.path.expanduser("~"), ".tunnel_config.yaml")
def _read_config():
    """Load the ``tunnel`` section of the config file.

    Returns:
        The mapping stored under the top-level ``tunnel`` key, or an empty
        dict when the config file does not exist, is empty, or has no usable
        ``tunnel`` section.
    """
    config_path = _get_config_path()
    if not os.path.exists(config_path):
        return {}
    with open(config_path, "r") as rf:
        # safe_load only builds plain data and needs no Loader argument;
        # a bare yaml.load(stream) is unsafe and rejected by recent PyYAML.
        config_content = yaml.safe_load(rf)
    # An empty file loads as None and a hand-edited one may lack the section.
    if not isinstance(config_content, dict):
        return {}
    section = config_content.get("tunnel")
    return section if isinstance(section, dict) else {}
def main(subcmd, **kwargs):
    """Run the handler for ``subcmd`` and return its result.

    ``bind`` and ``break`` receive ``kwargs``; ``config``, ``ls-config`` and
    ``ls`` take no arguments. An unknown sub-command prints a message on
    stderr and returns 2.
    """
    # Lambdas keep each call lazy and give every handler exactly the
    # arguments it accepts.
    handlers = {
        "bind": lambda: bind(**kwargs),
        "config": lambda: config(),
        "break": lambda: break_connect(**kwargs),
        "ls-config": lambda: ls_config(),
        "ls": lambda: ls_tunnel(),
    }
    handler = handlers.get(subcmd)
    if handler is None:
        print("Unknown subcommand: {}".format(subcmd), file=sys.stderr)
        return 2
    return handler()
if __name__ == "__main__":
    # (flags, add_argument keywords) for every optional switch, in the order
    # they should appear in the help output.
    option_specs = [
        (("-u", "--user-name"),
         dict(dest="user_name", metavar="USER_NAME", default=None,
              help="loging name")),
        (("-d", "--destination"),
         dict(dest="dest", metavar="DESTINATION", default=None,
              help="destination address")),
        (("-p", "--port"),
         dict(dest="port", metavar="PORT", default=None,
              help="port to tunnel or break")),
        (("-i", "--id-file"),
         dict(metavar="PEM_FILE", dest="id_file", default=None,
              help="ssh identity file path")),
        (("--host",),
         dict(metavar="HOST", dest="host", default='127.0.0.1',
              help="host address (default: 127.0.0.1)")),
        (("--ssh-port",),
         dict(metavar="SSH_PORT", dest="ssh_port", default='22',
              help="ssh port (default: 22)")),
    ]

    cli = argparse.ArgumentParser()
    cli.add_argument("subcmd", metavar="SUB_COMMAND",
                     help="subcommand [bind, break, ls-config, config, ls]")
    for flags, options in option_specs:
        cli.add_argument(*flags, **options)

    # The parsed namespace maps one-to-one onto main()'s keyword arguments;
    # its return value becomes the process exit status.
    exit_code = main(**vars(cli.parse_args()))
    sys.exit(exit_code)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment