Skip to content

Instantly share code, notes, and snippets.

@mathershifter
Last active August 29, 2015 14:06
Show Gist options
  • Save mathershifter/bad7fa37fd41ba1be1b2 to your computer and use it in GitHub Desktop.
Save mathershifter/bad7fa37fd41ba1be1b2 to your computer and use it in GitHub Desktop.
A simple SSH module with predefined prompts for Arists EOS CLI and bash modes
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2014 Arista Networks, Inc. All rights reserved.
# Arista Networks, Inc. Confidential and Proprietary.
r"""
Simple SSH client library or utility if run directly.
-----------------------------------------------------
**Examples:**
ssh = Ssh()
ssh.connect(device, username, password, timeout)
ssh.authorize(authorize_password)
# one at a time
ssh.send("configure")
ssh.send("management ssh")
ssh.send("exit")
ssh.send("end")
# batch script
print ssh.batch("configure\rmanagement ssh\rexit\rend")
# bad command in script
try:
print ssh.batch("configure\rinterface Et2000\rexit")
except ValueError as exp:
print "Caught an error message:", exp.message
# one-liner show command
print ssh.send("show interfaces status | include connected|disabled")
command with prompts
reload_prompts = [
re.compile(r"System configuration has been modified. Save\?", re.I),
re.compile(r"Proceed with reload\? \[confirm\]", re.I)
]
print ssh.send("reload", reload_prompts, ["yes", ""])
"""
import paramiko
import re
from StringIO import StringIO
class Ssh(object):
"""Specialize SSH class for interacting with Arista switches"""
_errors = [
re.compile(r"% ?error", re.I),
re.compile(r"% ?bad secret", re.I),
re.compile(r"% ?invalid input", re.I),
re.compile(r"% ?(?:incomplete|ambiguous) command", re.I),
re.compile(r"connection timed out", re.I),
re.compile(r"bash: \w+: [\w ]+"),
re.compile(r"returned error code:\d+")
]
_prompts = [
# Match on:
# cs-spine-2a......14:08:54#
# cs-spine-2a>
# cs-spine-2a#
# cs-spine-2a(s1)#
# cs-spine-2a(s1)(config)#
# cs-spine-2b(vrf:management)(config)#
# cs-spine-2b(s1)(vrf:management)(config)#
re.compile(r"[\r\n]?[\w+\-\.:\/]+(?:\([^\)]+\)){,3}(?:>|#) ?$"),
# Match on:
# [admin@cs-spine-2a /]$
# [admin@cs-spine-2a local]$
# [admin@cs-spine-2a ~]$
# -bash-4.1#
re.compile(r"\[\w+\@[\w\-\.]+(?: [^\]])\] ?[>#\$] ?$")
]
def __init__(self):
self._ssh = None
self._channel = None
def _handle_errors(self, response):
"""look for errors"""
for regex in self._errors:
match = regex.search(response)
if match:
# capture part of output that contains the error,
# but do not raise an exception yet. We need to make
# sure to receive all the data from that channel
return True
return False
def _handle_input(self, response, prompt, answer):
"""look for interactive prompts and send answer"""
if prompt is None or answer is None:
return
if not hasattr(prompt, "__iter__"):
prompt = [prompt]
if not hasattr(answer, "__iter__"):
answer = [answer]
if len(prompt) != len(answer):
raise ValueError(("Lists of prompts and answers have different"
"lengths"))
for _prompt, _answer in zip(prompt, answer):
match = _prompt.search(response)
if match:
self._channel.send(_answer + '\r')
def _handle_prompt(self, response):
"""look for cli prompt"""
for regex in self._prompts:
match = regex.search(response)
if match:
return True
def authorize(self, secret=None):
"""Authorize the session"""
self.send("enable", re.compile(r"password: ?$", re.I), secret)
def batch(self, script):
"""Send a series of commands to the device"""
response = ""
for line in script.splitlines():
response += self.send(line.strip())
return response
def close(self):
"""close the session"""
self._ssh.close()
def connect(self, host, port=22, username=None, password=None, timeout=30):
"""Connect to a host and invoke the shell. Returns nothing """
self._ssh = paramiko.SSHClient()
self._ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self._ssh.connect(host, port=port, username=username, password=password)
# we must invoke a shell, otherwise session commands like 'enable',
# 'terminal width', etc. won't stick
channel = self._ssh.invoke_shell()
channel.settimeout(timeout)
self._channel = channel
def send(self, command, input_prompt=None, input_answer=""):
"""Sends a command to the remote device and returns the response"""
buff = StringIO()
errored_response = ""
self._channel.sendall(str(command) + '\r')
while True:
try:
response = self._channel.recv(200)
except socket.timeout:
raise Timeout("% Timeout while running: {}".format(command))
buff.write(response)
buff.seek(buff.tell() - 150)
window = buff.read()
if self._handle_errors(window):
errored_response = buff.getvalue()
# deal with interactive input
self._handle_input(window, input_prompt, input_answer)
if self._handle_prompt(window):
if errored_response:
raise ValueError(errored_response)
else:
return buff.getvalue()
def main():
"""Main function called if script is run directly"""
import argparse
parser = argparse.ArgumentParser(prog="tacbot")
arg = parser.add_argument
arg("-d", "--device",
help="hostname or IP address of remote device.")
arg("-u", "--username", default="admin",
help=("Specifies the username on the remote device. If none is "
"provided the user will be prompted."))
arg("-p", "--password", default="",
help=("Specifies the remote password. The user will be prompted if one "
"is not provided."))
arg("-a", "--authorize-password", default=None,
help=("Use if a password for elevated prvilges is required"))
arg("-t", "--timeout", type=int, default=30,
help="Override the default 30 second timeout")
arg("-c", "--command", action="append",
help="Specifies the command to send to the device")
args = parser.parse_args()
ssh = Ssh()
ssh.connect(args.device, username=args.username, password=args.password,
timeout=args.timeout)
ssh.authorize(args.authorize_password)
# same as:
# ssh.send("enable", re.compile(r"password: ?$", re.I),
# args.authorize_password)
ssh.send("terminal length 0")
print ssh.batch("\r".join(args.command))
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment