Skip to content

Instantly share code, notes, and snippets.

@mathershifter
Last active August 3, 2016 02:08
Show Gist options
  • Save mathershifter/377c330442472197391cd1fa109769ec to your computer and use it in GitHub Desktop.
Save mathershifter/377c330442472197391cd1fa109769ec to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2014 Arista Networks, Inc. All rights reserved.
# Arista Networks, Inc. Confidential and Proprietary.
r"""
Simple SSH client library or utility if run directly.
-----------------------------------------------------
**Examples:**
Connect
ssh = Ssh()
ssh.connect(device, username=username, password=password, timeout=timeout)
Sending
ssh.send("enable")
ssh.send("configure")
ssh.send("management ssh")
ssh.send("exit")
ssh.send("end")
Sending a batch script
print(ssh.batch("configure\rmanagement ssh\rexit\rend"))
Bad command in script
try:
    print(ssh.batch("configure\rinterface Et2000\rexit"))
except ValueError as exp:
    print("Caught an error message:", exp)
One-liner show command
print(ssh.send("show interfaces status | include connected|disabled"))
Command with prompts
prompts = [
re.compile(r"System configuration has been modified. Save\?", re.I),
re.compile(r"Proceed with reload\? \[confirm\]", re.I)
]
print(ssh.send("reload", prompts, ["yes", ""]))
"""
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

import codecs
import re
import socket
import time

import paramiko

try:
    from StringIO import StringIO
except ImportError:
    from io import StringIO
class Timeout(Exception):
    """Raised when the device does not answer within the channel timeout."""


class Ssh(object):
    """Specialized SSH client for interacting with Arista switches.

    The client opens an interactive shell on the device and exchanges
    commands and responses over that single channel.  The end of a response
    is detected by matching the last received line against ``_prompts``;
    device-side failures are detected by matching ``_errors``.
    """

    # Patterns that mark a response as failed.
    _errors = [
        re.compile(r"% ?error", re.I),
        re.compile(r"% ?bad secret", re.I),
        re.compile(r"% ?invalid input", re.I),
        re.compile(r"% ?(?:incomplete|ambiguous) command", re.I),
        re.compile(r"connection timed out", re.I),
        re.compile(r"bash: \w+: [\w ]+"),
        re.compile(r"returned error code:\d+")
    ]

    # Patterns that recognise a CLI / shell prompt on the last output line.
    _prompts = [
        # Match on:
        # cs-spine-2a......14:08:54#
        # cs-spine-2a>
        # cs-spine-2a#
        # cs-spine-2a(s1)#
        # cs-spine-2a(s1)(config)#
        # cs-spine-2b(vrf:management)(config)#
        # cs-spine-2b(s1)(vrf:management)(config)#
        re.compile(r"[\r\n]?[\w+\-\.:\/]+(?:\([^\)]+\)){,3}(?:>|#) ?$"),
        # Match on:
        # [admin@cs-spine-2a /]$
        # [admin@cs-spine-2a local]$
        # [admin@cs-spine-2a ~]$
        # (the directory part may be longer than one character, hence "+")
        re.compile(r"\[\w+\@[\w\-\.]+(?: [^\]]+)\] ?[>#\$] ?$"),
        # user@host:~$
        re.compile(r"\w+\@[\w\-\.]+:(?:[^\$])+\$ ?$"),
        # #
        re.compile(r"^[>#\$] ?$")
    ]

    # Number of trailing characters of the output inspected after each read.
    _window_size = 150

    def __init__(self):
        self._ssh = None
        self._channel = None
        self._banner = None
        # Timeout (seconds) given to connect(); None means "wait forever".
        self._timeout = None

    @staticmethod
    def _as_list(value):
        """Return *value* as a list, treating strings and patterns as scalars."""
        # ``type("")`` is ``unicode`` on Python 2 (unicode_literals) and
        # ``str`` on Python 3; strings are iterable on Python 3 but must not
        # be exploded into single characters here.
        if isinstance(value, (bytes, type(""))):
            return [value]
        if not hasattr(value, "__iter__"):
            return [value]
        return list(value)

    def _handle_errors(self, response):
        """Return True if *response* matches any known error pattern.

        No exception is raised here: the caller must first drain the
        channel so that the complete output can be reported.
        """
        return any(regex.search(response) for regex in self._errors)

    def _handle_input(self, response, prompt, answer):
        """Answer interactive prompts found in *response*.

        *prompt* is a single pattern (compiled or string) or a sequence of
        them; *answer* is a single string or a sequence of the same length.
        For every prompt that matches, the paired answer followed by a
        carriage return is sent on the channel.

        Returns the offset in *response* just past the last matched prompt
        (0 when nothing matched) so that the caller can avoid answering the
        same prompt twice.  Raises ValueError if the numbers of prompts and
        answers differ.
        """
        if prompt is None or answer is None:
            return 0
        prompts = self._as_list(prompt)
        answers = self._as_list(answer)
        if len(prompts) != len(answers):
            raise ValueError("Lists of prompts and answers have different "
                             "lengths")
        consumed = 0
        for _prompt, _answer in zip(prompts, answers):
            if not hasattr(_prompt, "search"):
                _prompt = re.compile(_prompt)
            match = _prompt.search(response)
            if match:
                self._channel.send(_answer + '\r')
                consumed = max(consumed, match.end())
        return consumed

    def _handle_prompt(self, response):
        """Return True if the last line of *response* looks like a prompt."""
        lines = response.splitlines()
        if not lines:
            # nothing received yet, so there cannot be a prompt
            return False
        last = lines[-1]
        return any(regex.search(last) for regex in self._prompts)

    def _wait_until_ready(self, command):
        """Poll until the channel has data, is closed, or the timeout expires.

        Raises Timeout when a timeout was configured through connect() and
        no data arrived within it.
        """
        deadline = None
        if self._timeout is not None:
            deadline = time.time() + self._timeout
        # wait for channel to be recv_ready (only seems to be a problem in py3)
        while not self._channel.recv_ready():
            if getattr(self._channel, "closed", False):
                # recv() returns immediately on a closed channel
                return
            if deadline is not None and time.time() >= deadline:
                raise Timeout("% Timeout while running: {}".format(command))
            time.sleep(.01)

    def batch(self, script):
        """Send every line of *script* in order and return the joined output.

        Each line is stripped before being sent.  A ValueError raised by
        send() for a failing line aborts the batch.
        """
        return "".join(self.send(line.strip()) for line in script.splitlines())

    def close(self):
        """Close the session; safe to call when not connected."""
        if self._ssh is not None:
            self._ssh.close()
        self._ssh = None
        self._channel = None

    def connect(self, host, port=22, username=None, password=None, timeout=30):
        """Connect to *host*, invoke an interactive shell and read the banner.

        *timeout* (seconds) bounds the TCP connect and every later read on
        the channel.  Returns nothing.
        """
        client = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy accepts unknown host keys silently,
        # which leaves the session open to man-in-the-middle attacks.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(host, port=port, username=username, password=password,
                       timeout=timeout)
        try:
            # we must invoke a shell, otherwise session commands like
            # 'enable', 'terminal width', etc. won't stick
            channel = client.invoke_shell()
            channel.settimeout(timeout)
        except Exception:
            # do not leak the transport when the shell cannot be opened
            client.close()
            raise
        self._ssh = client
        self._channel = channel
        self._timeout = timeout
        self._banner = self.send('\r')

    def send(self, command, input_prompt=None, input_answer=""):
        """Send *command* to the device and return its complete response.

        *input_prompt* / *input_answer* optionally describe interactive
        prompts to answer while the command runs (see _handle_input).
        Reading stops when a CLI prompt is seen or the channel is closed.

        Raises ValueError (carrying the output received so far) when the
        output matched an error pattern, and Timeout when the device stops
        responding.
        """
        buff = StringIO()
        # Incremental decoding copes with multi-byte characters that are
        # split across two recv() calls.
        decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
        errored_response = ""
        # absolute buffer offset up to which prompts were already answered
        answered_upto = 0
        self._channel.sendall(command + '\r')
        self._wait_until_ready(command)
        while True:
            try:
                data = self._channel.recv(9999)
            except socket.timeout:
                raise Timeout("% Timeout while running: {}".format(command))
            # an empty read means the remote side closed the channel
            buff.write(decoder.decode(data, final=not data))
            place = max(buff.tell() - self._window_size, 0)
            buff.seek(place)
            # read() leaves the position at the end, ready for the next write
            window = buff.read()
            if self._handle_errors(window):
                errored_response = buff.getvalue()
            # deal with interactive input, skipping text already answered
            input_start = max(place, answered_upto)
            consumed = self._handle_input(window[input_start - place:],
                                          input_prompt, input_answer)
            if consumed:
                answered_upto = input_start + consumed
            if not data or self._handle_prompt(window):
                if errored_response:
                    raise ValueError(errored_response)
                return buff.getvalue()
def main():
    """Run the given commands on a device from the command line.

    Parses the command-line arguments, connects to the device, sends the
    commands as one batch and prints the combined output.

    Returns 0 on success and 1 when the device output matched an error
    pattern (the output is then written to stderr).
    """
    import argparse
    import sys

    parser = argparse.ArgumentParser(prog="tacbot")
    arg = parser.add_argument
    arg("commands", type=str, nargs='+',
        help="Specifies the command to send to the device")
    # without a device there is nothing to connect to
    arg("-d", "--device", required=True,
        help="hostname or IP address of remote device.")
    arg("-u", "--username", default="admin",
        help="Specifies the username on the remote device (default: admin).")
    arg("-p", "--password", default="",
        help="Specifies the remote password (default: empty password).")
    arg("-t", "--timeout", type=int, default=30,
        help="Override the default 30 second timeout")
    arg("-P", "--port", type=int, default=22,
        help="Specify a port number, default is 22")
    args = parser.parse_args()

    ssh = Ssh()
    ssh.connect(args.device, port=args.port, username=args.username,
                password=args.password, timeout=args.timeout)
    try:
        ssh.send('\r')
        print(ssh.batch("\r".join(args.commands)))
    except ValueError as exc:
        # send() raises ValueError carrying the device output on error
        print(exc, file=sys.stderr)
        return 1
    finally:
        # always release the connection, even when a command failed
        ssh.close()
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment