Skip to content

Instantly share code, notes, and snippets.

@cognifloyd
Created July 16, 2017 04:49
Show Gist options
  • Save cognifloyd/52df0cb7ad345a9d6a0ef5b47284c536 to your computer and use it in GitHub Desktop.
Save cognifloyd/52df0cb7ad345a9d6a0ef5b47284c536 to your computer and use it in GitHub Desktop.
pxssh modified to support preexec_fn, dimensions, and config_file options.
from __future__ import print_function
import os
from pexpect import TIMEOUT, EOF, spawn
from pexpect.pxssh import pxssh as orig_pxssh, ExceptionPxssh
# Public API of this module. ExceptionPxsshHostKey is exported as well because
# pxssh.login() raises it and callers need the name to catch it specifically.
__all__ = ['ExceptionPxssh', 'ExceptionPxsshHostKey', 'pxssh']
class ExceptionPxsshHostKey(ExceptionPxssh):
    """Raised by ``pxssh.login()`` when ssh asks to confirm an unknown host key
    and the caller passed ``save_host_key=False``.

    Subclasses ``ExceptionPxssh``, so handlers written for the base exception
    still catch it.
    """
# noinspection PyPep8Naming
class pxssh(orig_pxssh):
    """``pexpect.pxssh.pxssh`` extended with spawn-style and ssh-config options.

    In addition to the parent's constructor arguments this class accepts:

    * ``command`` / ``args`` -- the ssh executable (default ``'ssh'``) and a
      list of extra arguments that ``login()`` puts on the command line;
    * ``preexec_fn`` / ``dimensions`` -- stored on the instance and handed to
      ``_spawn()`` when ``login()`` starts the ssh process;
    * ``config_file`` -- instance-wide default for ssh's ``-F`` option.

    ``login()`` additionally accepts ``config_file`` (overrides the instance
    default) and ``save_host_key`` (when false, an unknown host key aborts the
    login with ``ExceptionPxsshHostKey`` instead of being accepted).
    """

    # a mix of pexpect.spawn (specify command) and pexpect.pxssh (specify options)
    def __init__(self, command=None, args=None, timeout=30, maxread=2000,
                 searchwindowsize=None, logfile=None, cwd=None, env=None,
                 ignore_sighup=False, echo=True, preexec_fn=None,
                 encoding=None, codec_errors='strict', dimensions=None,
                 options=None,
                 config_file=None):
        """Store the ssh-specific settings and initialise the parent class.

        No process is started here; the ssh command is built and spawned by
        ``login()``.

        :param command: ssh executable to run; falls back to ``'ssh'``.
        :param args: list of extra command-line arguments for ssh.
        :param preexec_fn: passed to ``_spawn()`` by ``login()``.
        :param dimensions: passed to ``_spawn()`` by ``login()``.
        :param options: dict of ssh ``-o`` options; ``None`` means no options.
        :param config_file: default ssh config file (``-F``) for ``login()``.

        The remaining parameters are forwarded unchanged to the parent
        constructor.
        """
        self.ssh_cmd = command if command else 'ssh'
        self.ssh_args = [] if not args else args
        self.config_file = config_file

        # preexec_fn and dimensions get passed to _spawn directly unless command is None
        self.preexec_fn = preexec_fn
        self.dimensions = dimensions

        options = options if options else {}

        # command, args, preexec_fn and dimensions are intentionally not
        # forwarded here: they are only used when login() spawns ssh.
        orig_pxssh.__init__(
            self,
            timeout=timeout, maxread=maxread,
            searchwindowsize=searchwindowsize, logfile=logfile, cwd=cwd, env=env,
            ignore_sighup=ignore_sighup, echo=echo,
            encoding=encoding, codec_errors=codec_errors,
            options=options,
        )

    def _build_ssh_command(self, server, username, port, ssh_key, quiet,
                           check_local_ip, config_file):
        """Return the ssh command line used by ``login()`` as a single string.

        :raises ExceptionPxssh: if ``ssh_key`` is given but is not an existing
            regular file.
        """
        ssh_options = ''.join([" -o '%s=%s'" % (o, v) for (o, v) in self.options.items()])
        if not check_local_ip:
            ssh_options = ssh_options + " -o'NoHostAuthenticationForLocalhost=yes'"
        if self.force_password:
            ssh_options = ssh_options + ' ' + self.SSH_OPTS

        ssh_args = ' '.join(self.ssh_args)
        if quiet:
            ssh_args = ssh_args + ' -q'
        if port is not None:
            ssh_args = ssh_args + ' -p %s' % (str(port))
        if username is not None:
            ssh_args = ssh_args + ' -l %s' % (str(username))

        # The per-call value wins over the instance default. Only add -F when
        # there is a usable path, so an empty value never becomes "-F None".
        config_file = config_file or self.config_file
        if config_file:
            ssh_args = ssh_args + ' -F %s' % config_file

        if ssh_key is not None:
            # os.path.isfile() reports a missing file by returning False, not
            # by raising, so its result has to be tested. '~' is expanded for
            # the check only; the key is passed to ssh exactly as given.
            try:
                key_found = os.path.isfile(os.path.expanduser(ssh_key))
            except (TypeError, AttributeError, ValueError):
                # ssh_key is not something that can be interpreted as a path.
                key_found = False
            if not key_found:
                raise ExceptionPxssh('private ssh key does not exist')
            ssh_args = ssh_args + ' -i %s' % ssh_key

        return "%s %s %s %s" % (self.ssh_cmd, ssh_options, ssh_args, server)

    # NOTE: login() carries no docstring of its own because login.__doc__ is
    # replaced with the parent's docstring right after the definition.
    # Parameters not present on every parent version:
    #   config_file   -- ssh config file for -F; overrides self.config_file.
    #   save_host_key -- when false, an "are you sure you want to continue
    #                    connecting" prompt closes the session and raises
    #                    ExceptionPxsshHostKey instead of answering "yes".
    # Returns True on success; raises ExceptionPxssh (or the host-key subclass)
    # on every failure path handled below.
    def login(self, server, username=None, password='', terminal_type='ansi',
              original_prompt=r"[#$]", login_timeout=10, port=None,
              auto_prompt_reset=True, ssh_key=None, quiet=True,
              sync_multiplier=1, check_local_ip=True,
              config_file=None, save_host_key=True):
        cmd = self._build_ssh_command(server, username, port, ssh_key, quiet,
                                      check_local_ip, config_file)

        # This does not distinguish between a remote server 'password' prompt
        # and a local ssh 'passphrase' prompt (for unlocking a private key).
        self._spawn(cmd, preexec_fn=self.preexec_fn, dimensions=self.dimensions)

        # The index returned by expect() is the position in this list; the
        # numeric comparisons below depend on this exact order.
        connect_prompts = ["(?i)are you sure you want to continue connecting",
                           original_prompt,
                           "(?i)(?:password)|(?:passphrase for key)",
                           "(?i)permission denied",
                           "(?i)terminal type",
                           TIMEOUT]
        # Only the very first expect also watches for "connection closed"
        # (index 6) and EOF (index 7), and only it uses login_timeout.
        i = self.expect(connect_prompts + ["(?i)connection closed by remote host", EOF],
                        timeout=login_timeout)

        # First phase
        if i == 0:
            # New certificate -- always accept it.
            # This is what you get if SSH does not have the remote host's
            # public key stored in the 'known_hosts' cache.
            if not save_host_key:
                self.close()
                raise ExceptionPxsshHostKey('Host key not in known_hosts!')
            self.sendline("yes")
            i = self.expect(connect_prompts)
        if i == 2:  # password or passphrase
            self.sendline(password)
            i = self.expect(connect_prompts)
        if i == 4:
            self.sendline(terminal_type)
            i = self.expect(connect_prompts)
        if i == 7:  # EOF on the initial expect
            self.close()
            raise ExceptionPxssh('Could not establish connection to host')

        # Second phase
        if i == 0:
            # This is weird. This should not happen twice in a row.
            self.close()
            raise ExceptionPxssh('Weird error. Got "are you sure" prompt twice.')
        elif i == 1:  # can occur if you have a public key pair set to authenticate.
            # TODO: May NOT be OK if expect() got tricked and matched a false prompt.
            pass
        elif i == 2:  # password prompt again
            # For incorrect passwords, some ssh servers will
            # ask for the password again, others return 'denied' right away.
            # If we get the password prompt again then this means
            # we didn't get the password right the first time.
            self.close()
            raise ExceptionPxssh('password refused')
        elif i == 3:  # permission denied -- password was bad.
            self.close()
            raise ExceptionPxssh('permission denied')
        elif i == 4:  # terminal type again? WTF?
            self.close()
            raise ExceptionPxssh('Weird error. Got "terminal type" prompt twice.')
        elif i == 5:  # Timeout
            # This is tricky... I presume that we are at the command-line prompt.
            # It may be that the shell prompt was so weird that we couldn't match
            # it. Or it may be that we couldn't log in for some other reason. I
            # can't be sure, but it's safe to guess that we did login because if
            # I presume wrong and we are not logged in then this should be caught
            # later when I try to set the shell prompt.
            pass
        elif i == 6:  # Connection closed by remote host
            self.close()
            raise ExceptionPxssh('connection closed')
        else:  # Unexpected
            self.close()
            raise ExceptionPxssh('unexpected login response')

        if not self.sync_original_prompt(sync_multiplier):
            self.close()
            raise ExceptionPxssh('could not synchronize with original prompt')

        # We appear to be in.
        # set shell prompt to something unique.
        if auto_prompt_reset:
            if not self.set_unique_prompt():
                self.close()
                raise ExceptionPxssh('could not set shell prompt '
                                     '(received: %r, expected: %r).' % (
                                         self.before, self.PROMPT,))
        return True

    # Reuse the parent's documentation for login().
    login.__doc__ = orig_pxssh.login.__doc__
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment