Last active
March 13, 2024 21:48
-
-
Save imneonizer/5a4709b0b3f3a4374e716574eab3e5f0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# Copyright (C) 2003-2007 Robey Pointer <[email protected]> | |
# | |
# This file is part of paramiko. | |
# | |
# Paramiko is free software; you can redistribute it and/or modify it under the | |
# terms of the GNU Lesser General Public License as published by the Free | |
# Software Foundation; either version 2.1 of the License, or (at your option) | |
# any later version. | |
# | |
# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY | |
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR | |
# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more | |
# details. | |
# | |
# You should have received a copy of the GNU Lesser General Public License | |
# along with Paramiko; if not, write to the Free Software Foundation, Inc., | |
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. | |
""" | |
Sample script showing how to do local port forwarding over paramiko. | |
This script connects to the requested SSH server and sets up local port | |
forwarding (the openssh -L option) from a local port through a tunneled | |
connection to a destination reachable from the SSH server machine. | |
""" | |
import getpass | |
import os | |
import socket | |
import select | |
try: | |
import SocketServer | |
except ImportError: | |
import socketserver as SocketServer | |
import sys | |
from optparse import OptionParser | |
import paramiko | |
SSH_PORT = 22 | |
DEFAULT_PORT = 4000 | |
g_verbose = True | |
class ForwardServer(SocketServer.ThreadingTCPServer): | |
daemon_threads = True | |
allow_reuse_address = True | |
class Handler(SocketServer.BaseRequestHandler): | |
def handle(self): | |
try: | |
chan = self.ssh_transport.open_channel( | |
"direct-tcpip", | |
(self.chain_host, self.chain_port), | |
self.request.getpeername(), | |
) | |
except Exception as e: | |
verbose( | |
"Incoming request to %s:%d failed: %s" | |
% (self.chain_host, self.chain_port, repr(e)) | |
) | |
return | |
if chan is None: | |
verbose( | |
"Incoming request to %s:%d was rejected by the SSH server." | |
% (self.chain_host, self.chain_port) | |
) | |
return | |
verbose( | |
"Connected! Tunnel open %r -> %r -> %r" | |
% ( | |
self.request.getpeername(), | |
chan.getpeername(), | |
(self.chain_host, self.chain_port), | |
) | |
) | |
while True: | |
r, w, x = select.select([self.request, chan], [], []) | |
if self.request in r: | |
data = self.request.recv(1024) | |
if len(data) == 0: | |
break | |
chan.send(data) | |
if chan in r: | |
data = chan.recv(1024) | |
if len(data) == 0: | |
break | |
self.request.send(data) | |
peername = self.request.getpeername() | |
chan.close() | |
self.request.close() | |
verbose("Tunnel closed from %r" % (peername,)) | |
def forward_tunnel(local_port, remote_host, remote_port, transport): | |
# this is a little convoluted, but lets me configure things for the Handler | |
# object. (SocketServer doesn't give Handlers any way to access the outer | |
# server normally.) | |
class SubHander(Handler): | |
chain_host = remote_host | |
chain_port = remote_port | |
ssh_transport = transport | |
ForwardServer(("", local_port), SubHander).serve_forever() | |
def verbose(s): | |
if g_verbose: | |
print(s) | |
HELP = """\ | |
Set up a forward tunnel across an SSH server, using paramiko. A local port | |
(given with -p) is forwarded across an SSH session to an address:port from | |
the SSH server. This is similar to the openssh -L option. | |
""" | |
def get_host_port(spec, default_port): | |
"parse 'hostname:22' into a host and port, with the port optional" | |
args = (spec.split(":", 1) + [default_port])[:2] | |
args[1] = int(args[1]) | |
return args[0], args[1] | |
def parse_options(): | |
global g_verbose | |
parser = OptionParser( | |
usage="usage: %prog [options] <ssh-server>[:<server-port>]", | |
version="%prog 1.0", | |
description=HELP, | |
) | |
parser.add_option( | |
"-q", | |
"--quiet", | |
action="store_false", | |
dest="verbose", | |
default=True, | |
help="squelch all informational output", | |
) | |
parser.add_option( | |
"-p", | |
"--local-port", | |
action="store", | |
type="int", | |
dest="port", | |
default=DEFAULT_PORT, | |
help="local port to forward (default: %d)" % DEFAULT_PORT, | |
) | |
parser.add_option( | |
"-u", | |
"--user", | |
action="store", | |
type="string", | |
dest="user", | |
default=getpass.getuser(), | |
help="username for SSH authentication (default: %s)" | |
% getpass.getuser(), | |
) | |
parser.add_option( | |
"-K", | |
"--key", | |
action="store", | |
type="string", | |
dest="keyfile", | |
default=None, | |
help="private key file to use for SSH authentication", | |
) | |
parser.add_option( | |
"", | |
"--no-key", | |
action="store_false", | |
dest="look_for_keys", | |
default=True, | |
help="don't look for or use a private key file", | |
) | |
parser.add_option( | |
"-P", | |
"--password", | |
action="store_true", | |
dest="readpass", | |
default=False, | |
help="read password (for key or password auth) from stdin", | |
) | |
parser.add_option( | |
"-r", | |
"--remote", | |
action="store", | |
type="string", | |
dest="remote", | |
default=None, | |
metavar="host:port", | |
help="remote host and port to forward to", | |
) | |
options, args = parser.parse_args() | |
if len(args) != 1: | |
parser.error("Incorrect number of arguments.") | |
if options.remote is None: | |
parser.error("Remote address required (-r).") | |
g_verbose = options.verbose | |
server_host, server_port = get_host_port(args[0], SSH_PORT) | |
remote_host, remote_port = get_host_port(options.remote, SSH_PORT) | |
return options, (server_host, server_port), (remote_host, remote_port) | |
def main(): | |
options, server, remote = parse_options() | |
password = None | |
if options.readpass: | |
password = getpass.getpass("Enter SSH password: ") | |
client = paramiko.SSHClient() | |
client.load_system_host_keys() | |
client.set_missing_host_key_policy(paramiko.WarningPolicy()) | |
verbose("Connecting to ssh host %s:%d ..." % (server[0], server[1])) | |
try: | |
client.connect( | |
server[0], | |
server[1], | |
username=options.user, | |
key_filename=options.keyfile, | |
look_for_keys=options.look_for_keys, | |
password=password, | |
) | |
except Exception as e: | |
print("*** Failed to connect to %s:%d: %r" % (server[0], server[1], e)) | |
sys.exit(1) | |
verbose( | |
"Now forwarding port %d to %s:%d ..." | |
% (options.port, remote[0], remote[1]) | |
) | |
try: | |
forward_tunnel( | |
options.port, remote[0], remote[1], client.get_transport() | |
) | |
except KeyboardInterrupt: | |
print("C-c: Port forwarding stopped.") | |
sys.exit(0) | |
if __name__ == "__main__": | |
main() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import paramiko | |
import re | |
import os | |
import time | |
import collections | |
import threading | |
import hashlib | |
import concurrent.futures | |
pprint_lock = threading.Lock() | |
class ShellHistory: | |
def __init__(self, stdin_maxlen, stdout_maxlen): | |
self._stdin = collections.deque(maxlen=stdin_maxlen) | |
self._stdout = collections.deque(maxlen=stdout_maxlen) | |
@property | |
def stdin(self): | |
return self._stdin | |
@property | |
def stdout(self): | |
return self._stdout | |
class RemoteSSH: | |
def __init__(self, *args, **kwargs): | |
if args or kwargs: | |
self.connect(*args, **kwargs) | |
def connect(self, username, host, password, \ | |
host_keys="/dev/null", port=22, timeout=5, \ | |
stdin_history_size=10, stdout_history_size=20): | |
self.username = username.strip() | |
self.host = host.strip() | |
self.hostname = host | |
self.password = password | |
self.host_keys = host_keys | |
self.port = port | |
self.timeout = timeout | |
self.stdin_history_size = stdin_history_size | |
self.stdout_history_size = stdout_history_size | |
self.init_session() | |
self.pattern = [ | |
self.host, | |
self.host, | |
self.hostname, | |
"{}@{}".format(self.username, self.host), | |
"{}@{}".format(self.username, self.hostname), | |
"{}@{} => {}".format(self.username, self.hostname, self.host), | |
] | |
return self | |
def init_session(self): | |
self.session = paramiko.SSHClient() | |
self.session.load_host_keys(self.host_keys) | |
self.session.set_missing_host_key_policy(paramiko.AutoAddPolicy()) | |
self.session.connect( | |
username=self.username, | |
hostname=self.host, | |
password=self.password, | |
port=self.port, | |
timeout=self.timeout | |
) | |
self.channel = self.session.invoke_shell() | |
self.stdin = self.channel.makefile('wb') | |
self.stdout = self.channel.makefile('r') | |
self.sftp = self.session.open_sftp() | |
self.history = ShellHistory( | |
self.stdin_history_size, | |
self.stdout_history_size | |
) | |
self.hostname = self.execute("hostname").strip() | |
def __del__(self): | |
try: | |
self.sftp.close() | |
self.session.close() | |
except: pass | |
def put(self, localpath, remotepath): | |
sftp = self.session.open_sftp() | |
if os.path.isdir(localpath): | |
try: | |
# test if remote_path exists | |
sftp.chdir(remotepath) | |
except IOError: | |
# create remote dir | |
sftp.mkdir(remotepath) | |
# copy file to remote one by one | |
for file in os.listdir(localpath): | |
self.put(os.path.join(localpath, file), os.path.join(remotepath, file)) | |
else: | |
sftp.put(localpath, remotepath) | |
sftp.close() | |
def get(self, remotepath, localpath, isdir=False): | |
sftp = self.session.open_sftp() | |
if isdir: | |
# create local dir | |
os.makedirs(localpath, exist_ok=True) | |
# copy files from remote one by one | |
for file in sftp.listdir(path=remotepath): | |
self.get(os.path.join(remotepath, file), os.path.join(localpath, file)) | |
else: | |
try: | |
sftp.get(remotepath, localpath) | |
except OSError: | |
# exception means it's a directory | |
if os.path.exists(localpath) and os.path.isfile(localpath) and os.path.getsize(localpath) == 0: | |
# remove temporary file which is created by sftp | |
os.remove(localpath) | |
# recursively download directory | |
self.get(remotepath, localpath, isdir=True) | |
sftp.close() | |
def reset(self): | |
self.session.close() | |
self.init_session() | |
def terminate(self): | |
# sends program terminate command, i.e, ctrl+c | |
self.stdin.write("\x03") | |
def change_password(self, current_password=None, new_password=None, username=None): | |
current_password = current_password or self.password | |
new_password = new_password or self.password | |
username = username or self.username | |
if current_password == new_password: | |
return True | |
if (username == self.username) and (current_password != self.password): | |
raise ValueError("current_password doesn't matches with self.password") | |
cmd = "sudo echo -n; echo '{username}:{new_password}' | sudo chpasswd".format( | |
username=username, | |
new_password=new_password | |
) | |
self.execute(cmd) | |
if "incorrect password attempt" in self.execute("sudo ls"): | |
# this means password changed successfully! | |
# update current password in the object | |
self.password = new_password | |
return True | |
# return false if password is unchanged | |
return False | |
def write_password(self, password=None): | |
# write password to the shell | |
self.stdin.write(password or self.password + "\n") | |
def write(self, command, postfix="\n", sudo=False): | |
if command.startswith("sudo") or sudo: | |
# if commands starts with sudo, then automatically enter password | |
command = "echo {} | sudo -S {}".format(self.password, command.strip()) | |
# execute command normally | |
self.stdin.write(command + postfix) | |
self.history._stdin.append(command) | |
def execute(self, command, pprint=0, end='', sudo=False): | |
# execute a single command and return output without writing to stdin | |
# faster than stdin.write | |
if command.startswith("sudo") or sudo: | |
command = "echo {} | sudo -S {}".format(self.password, command.strip()) | |
stdin, stdout, stderr = self.session.exec_command(command.strip() + "\n") | |
stdout.channel.recv_exit_status() | |
error = "" | |
if stderr: | |
error = "\n".join(stderr.readlines()) | |
output = '\n'.join([self.clean(line.rstrip()) for line in stdout.readlines()]) | |
output += error | |
# replace traces of 'sudo enter password' from output | |
output = output.replace("[sudo] password for {}: ".format(self.username), "") | |
if pprint: | |
self.pprint(output, pattern=pprint, end=end) | |
return output | |
def clean(self, line): | |
# clean string to remove any kind of shell color formatting | |
return re.compile(r'(\x9B|\x1B\[)[0-?]*[ -/]*[@-~]').sub('', line).replace('\b', '').replace('\r', '') | |
def get_color_escape(self, r, g, b, background=False): | |
return '\033[{};2;{};{};{}m'.format(48 if background else 38, r, g, b) | |
def colorize(self, string): | |
RESET = '\033[0m' | |
return self.get_color_escape(*self.get_rgb(self.host))+string+RESET | |
def get_rgb(self, string): | |
if "." in string: | |
string = string.split(".")[-1] | |
h = str(int(hashlib.md5(string.encode('utf-8')).hexdigest(), 16) % 10**9) | |
r, g, b = int(h[:3]), int(h[3:6]), int(h[6:]) | |
s, scale = r + g +b, 255 | |
r, g, b = round((r/s)*scale), round((g/s)*scale), round((b/s)*scale) | |
h = int(hashlib.md5(string.encode('utf-8')).hexdigest(), 32) % 10**1 | |
h = round(h / 3) | |
if h == 0: | |
r = r**10 | |
elif h == 1: | |
g = g**10 | |
else: | |
b = b**10 | |
return r, g, b | |
def pprint(self, string, pattern=0, end=None): | |
try: | |
pattern = self.pattern[pattern] | |
except: | |
pattern = self.pattern[0] | |
pprint_lock.acquire() | |
try: | |
for line in string.split("\n"): | |
print(self.colorize(pattern), "|", line) | |
if end: | |
print(end, end='') | |
except Exception: | |
raise | |
finally: | |
pprint_lock.release() | |
def read(self, nbytes=2048, data="", bypass_sudo=True, repeat_last=False, clean=True, include_stdin=False): | |
# receive data in buffers | |
while True: | |
if self.channel.recv_ready(): | |
data += self.channel.recv(nbytes).decode() | |
else: | |
break | |
# logic to bypass sudo | |
if "[sudo] password for" in data and bypass_sudo == True: | |
# automatically enter password | |
self.write_password() | |
time.sleep(1) | |
# read more from stdout after | |
# password is written to stdin | |
return self.read(data=data, bypass_sudo=False) | |
# save stdout history | |
output = "" | |
for line in data.split("\n"): | |
# if clean = True, use regex to get rid of colorful content | |
line = self.clean(line.rstrip()) if clean else line.rstrip() | |
# don't include executed command in stdout | |
if include_stdin == False: | |
try: | |
if line == self.history._stdin[-1]: | |
continue | |
except: pass | |
self.history._stdout.append(line) | |
output += line+"\n" | |
# logic to return data from history if output is empty | |
if (not output.rstrip()) and (repeat_last == True): | |
output = '\n'.join(self.history.stdout) | |
return output.rstrip() | |
class Thread(threading.Thread): | |
def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None): | |
threading.Thread.__init__(self, group, target, name, args, kwargs, daemon=daemon) | |
self._return = None | |
def run(self): | |
if self._target is not None: | |
self._return = self._target(*self._args, **self._kwargs) | |
def join(self): | |
threading.Thread.join(self) | |
return self._return | |
class ConcurrentThreadPool: | |
def map(self, target, args): | |
result = [] | |
with concurrent.futures.ProcessPoolExecutor() as executor: | |
futures = [executor.submit(target, arg) for arg in args] | |
for future in concurrent.futures.as_completed(futures): | |
result.append(future.result()) | |
return result | |
class ThreadPool: | |
def map(self, target, args): | |
threads = [Thread(target=target, args=(arg,)) for arg in args] | |
[t.start() for t in threads] | |
return [t.join() for t in threads if t.join()] | |
def map(target, args, concurrent=False): | |
if concurrent: | |
return ConcurrentThreadPool().map(target=target, args=args) | |
return ThreadPool().map(target=target, args=args) | |
sync_lock = threading.Lock() | |
def sync(func): | |
def inner(*args, **kwargs): | |
sync_lock.acquire() | |
try: | |
func(*args, **kwargs) | |
except Exception: | |
raise | |
finally: | |
sync_lock.release() | |
return inner |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Uh oh!
There was an error while loading. Please reload this page.