Last active
August 29, 2015 14:06
-
-
Save mathershifter/bad7fa37fd41ba1be1b2 to your computer and use it in GitHub Desktop.
A simple SSH module with predefined prompts for Arista EOS CLI and bash modes.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
# Copyright (c) 2014 Arista Networks, Inc. All rights reserved. | |
# Arista Networks, Inc. Confidential and Proprietary. | |
r""" | |
Simple SSH client library or utility if run directly. | |
----------------------------------------------------- | |
**Examples:** | |
ssh = Ssh() | |
ssh.connect(device, username=username, password=password, timeout=timeout)
ssh.authorize(authorize_password) | |
# one at a time | |
ssh.send("configure") | |
ssh.send("management ssh") | |
ssh.send("exit") | |
ssh.send("end") | |
# batch script | |
print ssh.batch("configure\rmanagement ssh\rexit\rend") | |
# bad command in script | |
try: | |
print ssh.batch("configure\rinterface Et2000\rexit") | |
except ValueError as exp: | |
print "Caught an error message:", exp.message | |
# one-liner show command | |
print ssh.send("show interfaces status | include connected|disabled") | |
# command with prompts
reload_prompts = [ | |
re.compile(r"System configuration has been modified. Save\?", re.I), | |
re.compile(r"Proceed with reload\? \[confirm\]", re.I) | |
] | |
print ssh.send("reload", reload_prompts, ["yes", ""]) | |
""" | |
import re
import socket
from StringIO import StringIO

import paramiko
class Timeout(Exception):
    """Raised by ``Ssh.send`` when reading from the channel times out."""


class Ssh(object):
    """SSH session helper with predefined Arista EOS CLI and bash prompts.

    A session is opened with ``connect``; commands are then written to an
    interactive shell channel with ``send`` (one command) or ``batch``
    (several commands).  Output is read until one of ``_prompts`` matches
    the end of the received text.  If any of ``_errors`` matched while
    reading, ``send`` raises ``ValueError`` carrying the captured output.
    """

    # Number of bytes requested from the channel per recv() call.
    _recv_size = 200

    # Number of trailing characters carried over between reads so that a
    # pattern split across two recv() chunks can still be matched.
    _window = 150

    # Patterns that mark the device output as an error response.
    _errors = [
        re.compile(r"% ?error", re.I),
        re.compile(r"% ?bad secret", re.I),
        re.compile(r"% ?invalid input", re.I),
        re.compile(r"% ?(?:incomplete|ambiguous) command", re.I),
        re.compile(r"connection timed out", re.I),
        re.compile(r"bash: \w+: [\w ]+"),
        re.compile(r"returned error code:\d+")
    ]

    # Patterns that mark the end of a response (the shell is ready again).
    _prompts = [
        # Match on:
        # cs-spine-2a......14:08:54#
        # cs-spine-2a>
        # cs-spine-2a#
        # cs-spine-2a(s1)#
        # cs-spine-2a(s1)(config)#
        # cs-spine-2b(vrf:management)(config)#
        # cs-spine-2b(s1)(vrf:management)(config)#
        # -bash-4.1#
        re.compile(r"[\r\n]?[\w+\-\.:\/]+(?:\([^\)]+\)){,3}(?:>|#) ?$"),
        # Match on:
        # [admin@cs-spine-2a /]$
        # [admin@cs-spine-2a local]$
        # [admin@cs-spine-2a ~]$
        # The directory part must allow more than one character, otherwise
        # "[admin@cs-spine-2a local]$" is never recognized as a prompt.
        re.compile(r"\[\w+\@[\w\-\.]+(?: [^\]]+)\] ?[>#\$] ?$")
    ]

    def __init__(self):
        """Create an unconnected session; call ``connect`` before ``send``."""
        self._ssh = None
        self._channel = None

    @staticmethod
    def _as_list(value):
        """Return ``value`` as a list, wrapping scalars in a one-item list.

        Strings and compiled patterns count as scalars.  The explicit
        string check keeps the behavior identical on interpreters where
        ``str`` defines ``__iter__``.
        """
        if hasattr(value, "__iter__") and not isinstance(value, (str, bytes)):
            return list(value)
        return [value]

    def _handle_errors(self, response):
        """Return True if ``response`` matches any pattern in ``_errors``.

        No exception is raised here: the caller must keep reading until
        the prompt returns so the channel is drained before reporting.
        """
        return any(regex.search(response) for regex in self._errors)

    def _handle_input(self, response, prompt, answer):
        """Answer interactive prompts found in ``response``.

        ``prompt`` is a compiled pattern or an iterable of them; ``answer``
        is a string or an iterable of strings of the same length.  For each
        prompt that matches, the paired answer plus a carriage return is
        written to the channel.

        Returns True if at least one answer was sent, else False.
        Raises ValueError if the numbers of prompts and answers differ.
        """
        if prompt is None or answer is None:
            return False

        prompts = self._as_list(prompt)
        answers = self._as_list(answer)
        if len(prompts) != len(answers):
            raise ValueError("Lists of prompts and answers have different "
                             "lengths")

        answered = False
        for pattern, reply in zip(prompts, answers):
            if pattern.search(response):
                # sendall() so a partial write cannot truncate the answer
                self._channel.sendall(reply + '\r')
                answered = True
        return answered

    def _handle_prompt(self, response):
        """Return True if ``response`` ends with a known CLI/bash prompt."""
        return any(regex.search(response) for regex in self._prompts)

    def authorize(self, secret=None):
        """Send ``enable`` and answer the password prompt with ``secret``.

        When ``secret`` is None no answer is sent for a password prompt.
        """
        self.send("enable", re.compile(r"password: ?$", re.I), secret)

    def batch(self, script):
        """Send each line of ``script`` as a command; return all output.

        Lines are split with ``str.splitlines`` and stripped before being
        sent.  Any exception raised by ``send`` propagates to the caller.
        """
        return "".join(self.send(line.strip()) for line in script.splitlines())

    def close(self):
        """Close the session; does nothing if it was never connected."""
        if self._ssh is not None:
            self._ssh.close()
        self._ssh = None
        self._channel = None

    def connect(self, host, port=22, username=None, password=None, timeout=30):
        """Connect to ``host`` and open an interactive shell channel.

        ``timeout`` (seconds) is applied both to the connection attempt and
        to subsequent reads on the shell channel.  Returns nothing.
        """
        client = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy accepts unknown host keys without
        # verification; confirm this is acceptable for the deployment.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(host, port=port, username=username, password=password,
                       timeout=timeout)
        try:
            # we must invoke a shell, otherwise session commands like
            # 'enable', 'terminal width', etc. won't stick
            channel = client.invoke_shell()
            channel.settimeout(timeout)
        except Exception:
            # do not leak the connected client if the shell cannot be opened
            client.close()
            raise
        self._ssh = client
        self._channel = channel

    def send(self, command, input_prompt=None, input_answer=""):
        """Send ``command`` to the device and return the full response.

        ``input_prompt`` / ``input_answer`` describe interactive prompts to
        answer while the command runs (see ``_handle_input``).

        Reading stops when a CLI prompt is seen or the channel reports
        end-of-stream (an empty read).

        Raises:
            Timeout: reading from the channel timed out.
            ValueError: the output matched one of ``_errors``; the message
                is the output captured when the error was last seen.
        """
        received = []
        tail = ""        # last ``_window`` characters of everything received
        unanswered = ""  # recent text not yet consumed by an answered prompt
        errored_response = ""

        self._channel.sendall(str(command) + '\r')

        while True:
            try:
                chunk = self._channel.recv(self._recv_size)
            except socket.timeout:
                raise Timeout("% Timeout while running: {}".format(command))

            if not chunk:
                # An empty read means the channel was closed; without this
                # check the loop would spin forever.
                break

            if not isinstance(chunk, str):
                # recv() may hand back bytes instead of str; normalize so
                # the text patterns can be applied.
                chunk = chunk.decode("utf-8", "replace")

            received.append(chunk)

            # Scan the carried-over tail plus the whole new chunk so that
            # an error at the start of a large chunk is not missed.
            scan = tail + chunk
            tail = scan[-self._window:]

            if self._handle_errors(scan):
                errored_response = "".join(received)

            # deal with interactive input; text that already triggered an
            # answer is dropped so the same prompt is not answered twice
            candidate = unanswered + chunk
            if self._handle_input(candidate, input_prompt, input_answer):
                unanswered = ""
            else:
                unanswered = candidate[-self._window:]

            if self._handle_prompt(tail):
                break

        if errored_response:
            raise ValueError(errored_response)
        return "".join(received)
def main(argv=None):
    """Run the command-line utility.

    Parses the arguments, connects to the device, enters enable mode,
    disables paging with ``terminal length 0`` and prints the output of
    every ``--command`` given.

    ``argv`` is the list of command-line arguments to parse; when None,
    argparse falls back to ``sys.argv[1:]``.
    """
    import argparse

    parser = argparse.ArgumentParser(prog="tacbot")
    arg = parser.add_argument

    # the device is mandatory: there is nothing to connect to without it
    arg("-d", "--device", required=True,
        help="hostname or IP address of remote device.")

    arg("-u", "--username", default="admin",
        help=("Specifies the username on the remote device. Defaults to "
              "\"admin\"."))

    arg("-p", "--password", default="",
        help=("Specifies the remote password. Defaults to an empty "
              "password."))

    arg("-a", "--authorize-password", default=None,
        help=("Use if a password for elevated privileges is required"))

    arg("-t", "--timeout", type=int, default=30,
        help="Override the default 30 second timeout")

    arg("-c", "--command", action="append",
        help="Specifies the command to send to the device")

    args = parser.parse_args(argv)

    ssh = Ssh()
    ssh.connect(args.device, username=args.username, password=args.password,
                timeout=args.timeout)

    try:
        ssh.authorize(args.authorize_password)
        # same as:
        # ssh.send("enable", re.compile(r"password: ?$", re.I),
        #          args.authorize_password)

        ssh.send("terminal length 0")

        # args.command is None when no -c option was given
        if args.command:
            print(ssh.batch("\r".join(args.command)))
    finally:
        # always release the connection, even if a command failed
        ssh.close()


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment