Created
July 16, 2017 04:49
-
-
Save cognifloyd/52df0cb7ad345a9d6a0ef5b47284c536 to your computer and use it in GitHub Desktop.
pxssh modified to support preexec_fn, dimensions, and config_file options.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import print_function | |
import os | |
from pexpect import TIMEOUT, EOF, spawn | |
from pexpect.pxssh import pxssh as orig_pxssh, ExceptionPxssh | |
__all__ = ['ExceptionPxssh', 'pxssh'] | |
class ExceptionPxsshHostKey(ExceptionPxssh): | |
pass | |
# noinspection PyPep8Naming | |
class pxssh(orig_pxssh): | |
# a mix of pexpect.spawn (specify command) and pexpect.pxssh (specify options) | |
def __init__(self, command=None, args=None, timeout=30, maxread=2000, | |
searchwindowsize=None, logfile=None, cwd=None, env=None, | |
ignore_sighup=False, echo=True, preexec_fn=None, | |
encoding=None, codec_errors='strict', dimensions=None, | |
options=None, | |
config_file=None): | |
self.ssh_cmd = command if command else 'ssh' | |
self.ssh_args = [] if not args else args | |
self.config_file = config_file | |
# preexec_fn and dimensions get passed to _spawn directly unless command is None | |
self.preexec_fn = preexec_fn | |
self.dimensions = dimensions | |
options = options if options else {} | |
orig_pxssh.__init__( | |
self, | |
# None, args=[], | |
timeout=timeout, maxread=maxread, | |
searchwindowsize=searchwindowsize, logfile=logfile, cwd=cwd, env=env, | |
ignore_sighup=ignore_sighup, echo=echo, # preexec_fn=preexec_fn, | |
encoding=encoding, codec_errors=codec_errors, # dimensions=dimensions | |
options=options, | |
) | |
def login(self, server, username=None, password='', terminal_type='ansi', | |
original_prompt=r"[#$]", login_timeout=10, port=None, | |
auto_prompt_reset=True, ssh_key=None, quiet=True, | |
sync_multiplier=1, check_local_ip=True, | |
config_file=None, save_host_key=True): | |
ssh_options = ''.join([" -o '%s=%s'" % (o, v) for (o, v) in self.options.items()]) | |
if not check_local_ip: | |
ssh_options = ssh_options + " -o'NoHostAuthenticationForLocalhost=yes'" | |
if self.force_password: | |
ssh_options = ssh_options + ' ' + self.SSH_OPTS | |
ssh_args = ' '.join(self.ssh_args) | |
if quiet: | |
ssh_args = ssh_args + ' -q' | |
if port is not None: | |
ssh_args = ssh_args + ' -p %s' % (str(port)) | |
if username is not None: | |
ssh_args = ssh_args + ' -l %s' % (str(username)) | |
if config_file is not None or self.config_file is not None: | |
if not config_file: | |
config_file = self.config_file | |
ssh_args = ssh_args + ' -F %s' % config_file | |
if ssh_key is not None: | |
try: | |
os.path.isfile(ssh_key) | |
except: | |
raise ExceptionPxssh('private ssh key does not exist') | |
ssh_args = ssh_args + ' -i %s' % ssh_key | |
cmd = "%s %s %s %s" % (self.ssh_cmd, ssh_options, ssh_args, server) | |
# This does not distinguish between a remote server 'password' prompt | |
# and a local ssh 'passphrase' prompt (for unlocking a private key). | |
self._spawn(cmd, preexec_fn=self.preexec_fn, dimensions=self.dimensions) | |
connect_prompts = ["(?i)are you sure you want to continue connecting", | |
original_prompt, | |
"(?i)(?:password)|(?:passphrase for key)", | |
"(?i)permission denied", | |
"(?i)terminal type", | |
TIMEOUT] | |
i = self.expect(connect_prompts + ["(?i)connection closed by remote host", EOF], timeout=login_timeout) | |
# First phase | |
if i == 0: | |
# New certificate -- always accept it. | |
# This is what you get if SSH does not have the remote host's | |
# public key stored in the 'known_hosts' cache. | |
if not save_host_key: | |
self.close() | |
raise ExceptionPxsshHostKey('Host key not in known_hosts!') | |
self.sendline("yes") | |
i = self.expect(connect_prompts) | |
if i == 2: # password or passphrase | |
self.sendline(password) | |
i = self.expect(connect_prompts) | |
if i == 4: | |
self.sendline(terminal_type) | |
i = self.expect(connect_prompts) | |
if i == 7: | |
self.close() | |
raise ExceptionPxssh('Could not establish connection to host') | |
# Second phase | |
if i == 0: | |
# This is weird. This should not happen twice in a row. | |
self.close() | |
raise ExceptionPxssh('Weird error. Got "are you sure" prompt twice.') | |
elif i == 1: # can occur if you have a public key pair set to authenticate. | |
# # TODO: May NOT be OK if expect() got tricked and matched a false prompt. | |
pass | |
elif i == 2: # password prompt again | |
# For incorrect passwords, some ssh servers will | |
# ask for the password again, others return 'denied' right away. | |
# If we get the password prompt again then this means | |
# we didn't get the password right the first time. | |
self.close() | |
raise ExceptionPxssh('password refused') | |
elif i == 3: # permission denied -- password was bad. | |
self.close() | |
raise ExceptionPxssh('permission denied') | |
elif i == 4: # terminal type again? WTF? | |
self.close() | |
raise ExceptionPxssh('Weird error. Got "terminal type" prompt twice.') | |
elif i == 5: # Timeout | |
# This is tricky... I presume that we are at the command-line prompt. | |
# It may be that the shell prompt was so weird that we couldn't match | |
# it. Or it may be that we couldn't log in for some other reason. I | |
# can't be sure, but it's safe to guess that we did login because if | |
# I presume wrong and we are not logged in then this should be caught | |
# later when I try to set the shell prompt. | |
pass | |
elif i == 6: # Connection closed by remote host | |
self.close() | |
raise ExceptionPxssh('connection closed') | |
else: # Unexpected | |
self.close() | |
raise ExceptionPxssh('unexpected login response') | |
if not self.sync_original_prompt(sync_multiplier): | |
self.close() | |
raise ExceptionPxssh('could not synchronize with original prompt') | |
# We appear to be in. | |
# set shell prompt to something unique. | |
if auto_prompt_reset: | |
if not self.set_unique_prompt(): | |
self.close() | |
raise ExceptionPxssh('could not set shell prompt ' | |
'(received: %r, expected: %r).' % ( | |
self.before, self.PROMPT,)) | |
return True | |
login.__doc__ = orig_pxssh.login.__doc__ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment