Last active
August 18, 2017 15:12
-
-
Save dboyliao/47dfbb2f3a36a12f7f54b6f2f0960096 to your computer and use it in GitHub Desktop.
Helper script for SSH-tunneling a local host port to a destination port.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # -*- coding: utf8 -*- | |
| from __future__ import print_function | |
| import re | |
| import subprocess as sp | |
| import argparse | |
| import sys | |
| import os | |
| import yaml | |
# Bail out early on non-POSIX platforms with exit status 3.
if not os.name == "posix":
    sys.stderr.write("Only support Unix/Linux System. Sorry....\n")
    raise SystemExit(3)
def bind(port, user_name, dest, id_file=None, host='127.0.0.1', ssh_port='22'):
    """Open an SSH tunnel for ``port`` using arguments and saved configuration.

    ``user_name`` and ``dest`` fall back to the values stored in the config
    file when they are not given.  ``host`` and ``ssh_port`` fall back to the
    config file only when they were left at their defaults, so an explicitly
    supplied value is no longer silently overridden by the config file.

    Args:
        port: port to forward (used for both ends of the ``-L`` spec).
        user_name: login name on the destination, or a falsy value to use
            the configured one.
        dest: destination address, or a falsy value to use the configured one.
        id_file: optional path of the ssh identity file.
        host: host part of the forwarding spec.
        ssh_port: port the ssh server listens on.

    Returns:
        The exit status returned by ``_ssh_forward``.

    Raises:
        AssertionError: if the user name, destination or port cannot be
            determined.
    """
    config = _read_config()
    if user_name:
        config["user_name"] = user_name
    if dest:
        config["dest"] = dest

    # Raised explicitly (rather than via ``assert``) so the validation still
    # runs under ``python -O``; the exception type is kept for compatibility.
    if config.get("user_name", None) is None:
        raise AssertionError(
            "user name not given and can't find it in the config file")
    if config.get("dest", None) is None:
        raise AssertionError(
            "destination not given and can't find it in the config file")
    if port is None:
        raise AssertionError("no port for tunneling")

    # Only consult the config file when the caller left the default in place.
    if host == '127.0.0.1':
        host = config.get("host", host)
    if str(ssh_port) == '22':
        ssh_port = config.get("ssh_port", ssh_port)

    return _ssh_forward(config["user_name"], config["dest"], port,
                        id_file, host, ssh_port)
def _ssh_forward(user_name, dest, port, id_file=None, host="127.0.0.1", ssh_port='22'):
    """Run ``ssh -L`` in the background to forward ``port`` through ``dest``.

    The command is ``ssh [-i id_file] -p ssh_port -L port:host:port
    user_name@dest -f -N``.

    Args:
        user_name: login name on the destination.
        dest: destination address.
        port: port used on both sides of the forwarding spec.
        id_file: optional identity file passed with ``-i``.
        host: host part of the forwarding spec.
        ssh_port: port passed with ``-p``.

    Returns:
        The exit status of the ``ssh`` command.
    """
    cmd = ["ssh"]
    if id_file:
        cmd += ["-i", str(id_file)]
    cmd += [
        # str(): values loaded from YAML may be ints, which subprocess rejects.
        "-p", str(ssh_port),
        "-L", "{port}:{host}:{port}".format(port=port, host=host),
        "{}@{}".format(user_name, dest),
        "-f", "-N",
    ]
    return sp.call(cmd)
def break_connect(**kwargs):
    """Terminate running ``ssh -L`` tunnels to a destination.

    Keyword Args:
        dest: destination address; falls back to the configured one.
        port: if given, only tunnels whose ``-L`` spec contains this port
            are terminated; otherwise every tunnel to ``dest`` is.
        Any other keyword is ignored.

    Returns:
        0 on success, 2 if the process list could not be read or a matching
        process could not be signalled.

    Raises:
        AssertionError: if no destination is given or configured.
    """
    import signal  # local import: not among the file's top-level imports

    dest = kwargs.get("dest", None)
    port = kwargs.get("port", None)
    # _read_config() already returns the content of the "tunnel" section;
    # looking "tunnel" up a second time always produced an empty dict.
    config = _read_config()
    if dest:
        config["dest"] = dest
    # Raised explicitly so the check survives ``python -O``.
    if config.get("dest", None) is None:
        raise AssertionError(
            "destination not given and can't find it in config file")
    dest = str(config["dest"])
    port = str(port) if port else None

    try:
        p_ps = sp.Popen(["ps", "-A", "-o", "pid=", "-o", "args="],
                        stdout=sp.PIPE)
    except OSError as exc:
        print("can't list processes: {}".format(exc), file=sys.stderr)
        return 2
    out, _ = p_ps.communicate()

    if port:
        print("Break pipe to {}:{}".format(dest, port))
    else:
        print("Breaking all pipes to {}".format(dest))

    own_pid = os.getpid()
    failed = False
    for line in out.decode("utf8", "replace").splitlines():
        fields = line.split(None, 1)
        if len(fields) != 2 or not fields[0].isdigit():
            continue
        pid, command = int(fields[0]), fields[1]
        # Plain substring tests: the destination is not treated as a regex.
        if pid == own_pid or "ssh" not in command or dest not in command:
            continue

        # Collect the forwarding specs given as "-L spec" or "-Lspec".
        tokens = command.split()
        specs = [tokens[i + 1] for i, tok in enumerate(tokens[:-1])
                 if tok == "-L"]
        specs += [tok[2:] for tok in tokens
                  if tok.startswith("-L") and len(tok) > 2]
        if not specs:
            continue
        # Compare whole spec components so that e.g. port 80 does not match
        # 8080 or digits that merely appear elsewhere in the process listing.
        if port and not any(port in spec.split(":") for spec in specs):
            continue

        try:
            os.kill(pid, signal.SIGTERM)
        except OSError as exc:
            print("can't kill {}: {}".format(pid, exc), file=sys.stderr)
            failed = True
    return 2 if failed else 0
def ls_tunnel():
    """Print the ``ssh -L`` tunnels found in the process list.

    Each tunnel is printed as ``host:port --> user@dest:port``.

    Returns:
        0 on success, 2 if the listing pipeline wrote anything to stderr.
    """
    p_all_tunnel = sp.Popen("ps aux | grep ssh | grep -v grep | grep -- -L",
                            stdout=sp.PIPE,
                            stderr=sp.PIPE,  # without this ``err`` is always None
                            shell=True)
    out, err = p_all_tunnel.communicate()
    if err:
        print(err.decode("utf8", "replace"), file=sys.stderr)
        return 2

    # Anchor on "-L" so unrelated colon-separated text is not picked up, and
    # accept "-" in host names.
    forward_pattern = re.compile(r"-L\s*(\d+):([\w.\-]+):\d+")
    dest_pattern = re.compile(r"([\w.\-]+)@([\w.\-]+)")

    # Match both patterns per line; pairing two independent global match
    # lists goes out of step as soon as one line matches only one of them.
    for line in out.decode("utf8", "replace").splitlines():
        forward_m = forward_pattern.search(line)
        dest_m = dest_pattern.search(line)
        if forward_m is None or dest_m is None:
            continue
        print("{host}:{port} --> {user}@{dest}:{port}".format(
            host=forward_m.group(2),
            port=forward_m.group(1),
            user=dest_m.group(1),
            dest=dest_m.group(2)))
    return 0
def config():
    """Interactively update the saved tunnel settings.

    Prompts in turn for the user name, destination, host and ssh port,
    showing the stored value (if any) in parentheses.  A blank answer keeps
    the stored value.  The result is written back to the config file under
    the ``tunnel`` key.
    """
    settings = _read_config()
    print("leave blank to use original configuration")

    # (config key, prompt without a stored value, prompt with a stored value)
    questions = (
        ("user_name", "user name: ", "user name ({}): "),
        ("dest", "destination: ", "destination ({}): "),
        ("host", "host: ", "host ({}): "),
        ("ssh_port", "ssh port: ", "ssh port ({}): "),
    )

    answers = []
    for key, bare_prompt, prompt_template in questions:
        current = settings.get(key, None)
        if current is None:
            prompt = bare_prompt
        else:
            prompt = prompt_template.format(current)
        answers.append((key, input(prompt)))

    # Apply the answers only after every question has been asked.
    for key, answer in answers:
        if answer:
            settings[key] = answer

    with open(_get_config_path(), "w") as wf:
        yaml.dump({"tunnel": settings}, wf, default_flow_style=False)
def ls_config():
    """Print the raw content of the config file.

    Returns:
        0 after printing the file, 2 if the file does not exist.
    """
    path = _get_config_path()
    if os.path.exists(path):
        with open(path, "r") as rf:
            content = rf.read()
        print(content)
        return 0
    print("No config file found", file=sys.stderr)
    return 2
def _get_config_path():
    """Return the path of ``.tunnel_config.yaml`` in the user's home directory."""
    return os.path.join(os.path.expanduser("~"), ".tunnel_config.yaml")
def _read_config():
    """Load the ``tunnel`` section of the config file.

    Returns:
        The mapping stored under the ``tunnel`` key, or an empty dict when
        the file does not exist, is empty, or has no such mapping.
    """
    config_path = _get_config_path()
    if not os.path.exists(config_path):
        return {}
    with open(config_path, "r") as rf:
        # safe_load: plain ``yaml.load(rf)`` can construct arbitrary objects
        # and, without an explicit Loader, is rejected by PyYAML >= 6.
        config_content = yaml.safe_load(rf)
    # An empty file loads as None; guard against that and a missing section.
    if not isinstance(config_content, dict):
        return {}
    tunnel = config_content.get("tunnel")
    return tunnel if isinstance(tunnel, dict) else {}
def main(subcmd, **kwargs):
    """Dispatch ``subcmd`` to its handler and return the handler's result.

    ``bind`` and ``break`` receive ``kwargs``; ``config``, ``ls-config`` and
    ``ls`` take no arguments.  An unknown subcommand is reported on stderr
    and yields 2.
    """
    # Lambdas keep the lookup of each handler lazy, as in an if/elif chain.
    handlers = {
        "bind": lambda: bind(**kwargs),
        "config": lambda: config(),
        "break": lambda: break_connect(**kwargs),
        "ls-config": lambda: ls_config(),
        "ls": lambda: ls_tunnel(),
    }
    handler = handlers.get(subcmd)
    if handler is None:
        print("Unknown subcommand: {}".format(subcmd), file=sys.stderr)
        return 2
    return handler()
if __name__ == "__main__":
    # Command-line entry point: parse the arguments, hand them to main() as
    # keyword arguments and exit with the status it returns.
    parser = argparse.ArgumentParser()
    parser.add_argument("subcmd", metavar="SUB_COMMAND",
                        help="subcommand [bind, break, ls-config, config, ls]")
    parser.add_argument("-u", "--user-name", dest="user_name",
                        metavar="USER_NAME",
                        default=None,
                        help="login name")  # was misspelled "loging name"
    parser.add_argument("-d", "--destination", dest="dest",
                        metavar="DESTINATION",
                        default=None,
                        help="destination address")
    parser.add_argument("-p", "--port", dest="port",
                        metavar="PORT",
                        default=None,
                        help="port to tunnel or break")
    parser.add_argument("-i", "--id-file", metavar="PEM_FILE",
                        dest="id_file",
                        default=None,
                        help="ssh identity file path")
    parser.add_argument('--host', metavar="HOST",
                        dest="host", default='127.0.0.1',
                        help="host address (default: 127.0.0.1)")
    parser.add_argument("--ssh-port", metavar="SSH_PORT",
                        dest="ssh_port", default='22',
                        help="ssh port (default: 22)")
    args = vars(parser.parse_args())
    sys.exit(main(**args))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment