Last active
August 3, 2016 02:08
-
-
Save mathershifter/377c330442472197391cd1fa109769ec to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
# Copyright (c) 2014 Arista Networks, Inc. All rights reserved. | |
# Arista Networks, Inc. Confidential and Proprietary. | |
r""" | |
Simple SSH client library or utility if run directly. | |
----------------------------------------------------- | |
**Examples:** | |
Connect | |
ssh = Ssh() | |
ssh.connect(device, username, password, timeout) | |
Sending | |
ssh.send("enable") | |
ssh.send("configure") | |
ssh.send("management ssh") | |
ssh.send("exit") | |
ssh.send("end") | |
Sending a batch scrpt | |
print ssh.batch("configure\rmanagement ssh\rexit\rend") | |
Bad command in script | |
try: | |
print ssh.batch("configure\rinterface Et2000\rexit") | |
except ValueError as exp: | |
print "Caught an error message:", exp.message | |
One-liner show command | |
print ssh.send("show interfaces status | include connected|disabled") | |
Command with prompts | |
prompts = [ | |
re.compile(r"System configuration has been modified. Save\?", re.I), | |
re.compile(r"Proceed with reload\? \[confirm\]", re.I) | |
] | |
print ssh.send("reload", prompts, ["yes", ""]) | |
""" | |
from __future__ import (absolute_import, division, print_function, | |
unicode_literals) | |
import time | |
import paramiko | |
import re | |
try: | |
from StringIO import StringIO | |
except: | |
from io import StringIO | |
class Ssh(object): | |
"""Specialize SSH class for interacting with Arista switches""" | |
_errors = [ | |
re.compile(r"% ?error", re.I), | |
re.compile(r"% ?bad secret", re.I), | |
re.compile(r"% ?invalid input", re.I), | |
re.compile(r"% ?(?:incomplete|ambiguous) command", re.I), | |
re.compile(r"connection timed out", re.I), | |
re.compile(r"bash: \w+: [\w ]+"), | |
re.compile(r"returned error code:\d+") | |
] | |
_prompts = [ | |
# Match on: | |
# cs-spine-2a......14:08:54# | |
# cs-spine-2a> | |
# cs-spine-2a# | |
# cs-spine-2a(s1)# | |
# cs-spine-2a(s1)(config)# | |
# cs-spine-2b(vrf:management)(config)# | |
# cs-spine-2b(s1)(vrf:management)(config)# | |
re.compile(r"[\r\n]?[\w+\-\.:\/]+(?:\([^\)]+\)){,3}(?:>|#) ?$"), | |
# Match on: | |
# [admin@cs-spine-2a /]$ | |
# [admin@cs-spine-2a local]$ | |
# [admin@cs-spine-2a ~]$ | |
re.compile(r"\[\w+\@[\w\-\.]+(?: [^\]])\] ?[>#\$] ?$"), | |
# user@host:~$ | |
re.compile(r"\w+\@[\w\-\.]+:(?:[^\$])+\$ ?$"), | |
# # | |
re.compile(r"^[>#\$] ?$") | |
] | |
def __init__(self): | |
self._ssh = None | |
self._channel = None | |
self._banner = None | |
def _handle_errors(self, response): | |
"""look for errors""" | |
for regex in self._errors: | |
match = regex.search(response) | |
if match: | |
# capture part of output that contains the error, | |
# but do not raise an exception yet. We need to make | |
# sure to receive all the data from that channel | |
return True | |
return False | |
def _handle_input(self, response, prompt, answer): | |
"""look for interactive prompts and send answer""" | |
if prompt is None or answer is None: | |
return | |
if not hasattr(prompt, "__iter__"): | |
prompt = [prompt] | |
if not hasattr(answer, "__iter__"): | |
answer = [answer] | |
if len(prompt) != len(answer): | |
raise ValueError(("Lists of prompts and answers have different" | |
"lengths")) | |
for _prompt, _answer in zip(prompt, answer): | |
match = _prompt.search(response) | |
if match: | |
self._channel.send(_answer + '\r') | |
def _handle_prompt(self, response): | |
"""look for cli prompt""" | |
last = response.splitlines()[-1] | |
for regex in self._prompts: | |
match = regex.search(last) | |
if match: | |
return True | |
def batch(self, script): | |
"""Send a series of commands to the device""" | |
response = "" | |
for line in script.splitlines(): | |
response += self.send(line.strip()) | |
return response | |
def close(self): | |
"""close the session""" | |
self._ssh.close() | |
def connect(self, host, port=22, username=None, password=None, timeout=30): | |
"""Connect to a host and invoke the shell. Returns nothing """ | |
self._ssh = paramiko.SSHClient() | |
self._ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) | |
self._ssh.connect(host, port=port, username=username, password=password) | |
# we must invoke a shell, otherwise session commands like 'enable', | |
# 'terminal width', etc. won't stick | |
channel = self._ssh.invoke_shell() | |
channel.settimeout(timeout) | |
self._channel = channel | |
self._banner = self.send('\r') | |
def send(self, command, input_prompt=None, input_answer=""): | |
"""Sends a command to the remote device and returns the response""" | |
buff = StringIO() | |
errored_response = "" | |
self._channel.sendall(command + '\r') | |
# wait for channel to be recv_ready (only seems to be a problem in py3) | |
while not self._channel.recv_ready(): | |
#print("waiting for channel to be recv_ready...") | |
time.sleep(.01) | |
while True: | |
try: | |
response = self._channel.recv(9999).decode("utf-8") | |
except socket.timeout: | |
raise Timeout("% Timeout while running: {}".format(command)) | |
buff.write(response) | |
place = buff.tell() - 150 | |
if place < 0: | |
place = 0 | |
buff.seek(place) | |
window = buff.read() | |
if self._handle_errors(window): | |
errored_response = buff.getvalue() | |
# deal with interactive input | |
self._handle_input(window, input_prompt, input_answer) | |
if self._handle_prompt(window): | |
if errored_response: | |
raise ValueError(errored_response) | |
else: | |
return buff.getvalue() | |
def main(): | |
"""Main function called if script is run directly""" | |
import argparse | |
parser = argparse.ArgumentParser(prog="tacbot") | |
arg = parser.add_argument | |
arg("commands", type=str, nargs='+', | |
help="Specifies the command to send to the device") | |
arg("-d", "--device", | |
help="hostname or IP address of remote device.") | |
arg("-u", "--username", default="admin", | |
help=("Specifies the username on the remote device. If none is " | |
"provided the user will be prompted.")) | |
arg("-p", "--password", default="", | |
help=("Specifies the remote password. The user will be prompted if one " | |
"is not provided.")) | |
arg("-t", "--timeout", type=int, default=30, | |
help="Override the default 30 second timeout") | |
arg("-P", "--port", type=int, default=22, | |
help="Specify a port number, default is 22") | |
args = parser.parse_args() | |
ssh = Ssh() | |
ssh.connect(args.device, port=args.port, username=args.username, | |
password=args.password, timeout=args.timeout) | |
ssh.send('\r') | |
print(ssh.batch("\r".join(args.commands))) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment