Skip to content

Instantly share code, notes, and snippets.

@efrenfuentes
Last active April 19, 2016 04:33
Show Gist options
  • Save efrenfuentes/eeb5993e16750ddffb9b15f786f2544b to your computer and use it in GitHub Desktop.
Save efrenfuentes/eeb5993e16750ddffb9b15f786f2544b to your computer and use it in GitHub Desktop.
Device
import subprocess
from snimpy.manager import Manager as M
from snimpy.manager import load
import paramiko
import MySQLdb
import socket
USER = 'zenossmon'
KEY_FILE = '/home/efren/.ssh/id_rsa.pub'
class Device:
def __init__(self, id, host, ip, snmp_group,
alive=False, snmp=False, ssh=False, ssh_error="", mysql=False,
mysql_user='', mysql_password='', uname="", scanned=False):
self.id = id
self.host = host
self.ip = ip
if snmp_group != "":
self.snmp_group = snmp_group
else:
self.snmp_group = "public"
self.alive = alive == 1
self.snmp = snmp == 1
self.ssh = ssh == 1
self.ssh_error = ssh_error
self.mysql = mysql == 1
self.mysql_user = mysql_user
self.mysql_password = mysql_password
self.uname = uname
self.scanned = scanned
def host_or_ip(self):
if self.ip != "" and self.ip is not None:
return self.ip
else:
return self.host
def scan(self):
self.is_alive()
self.has_snmp()
self.has_ssh()
if self.mysql_user != "" and self.mysql_user is not None:
self.has_mysql()
self.scanned = True
def is_alive(self):
alive = False
try:
output = subprocess.check_output(
"ping -c 1 -w 20 {}".format(self.host_or_ip()), shell=True)
alive = True
except subprocess.CalledProcessError:
alive = False
self.alive = alive
def has_ssh(self):
result = False
self.ssh_error = ""
ssh = paramiko.SSHClient()
ssh.load_host_keys(KEY_FILE)
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
ssh.connect(self.host_or_ip(), username=USER, timeout=3)
stdin, stdout, stderr = ssh.exec_command('uname -a')
self.uname = stdout.read()
self.ssh = True
result = True
except paramiko.SSHException, e:
self.ssh_error = "{'paramiko.SSHException': 'Password is invalid'}"
except paramiko.AuthenticationException:
self.ssh_error = "{'paramiko.AuthenticationException': 'Authentication failed for some reason'}"
except socket.error, e:
self.ssh_error = "{'socket.error': 'Socket connection failed'}"
finally:
self.ssh = result
def has_snmp(self):
load("SNMPv2-MIB")
try:
m = M(host=self.host_or_ip(), community=self.snmp_group,
version=2, timeout=2)
if m.sysName is not None:
snmp = True
except:
snmp = False
self.snmp = snmp
def has_mysql(self):
try:
db = MySQLdb.connect(self.host_or_ip(), self.mysql_user,
self.mysql_password)
cursor = db.cursor()
cursor.execute("SELECT VERSION()")
results = cursor.fetchone()
# Check if anything at all is returned
if results:
self.mysql = True
else:
self.mysql = False
except MySQLdb.Error, e:
self.mysql = False
def to_dict(self):
device_dict = {
"id": self.id,
"host": self.host,
"ip": self.ip,
"snmp_group": self.snmp_group,
"alive": self.alive,
"snmp": self.snmp,
"ssh": self.ssh,
"mysql_user": self.mysql_user,
"mysql_password": self.mysql_password,
"uname": self.uname,
"scanned": self.scanned
}
return device_dict
def __repr__(self):
return "{0} ({1})".format(self.host, self.ip)
def __str__(self):
return str(self.to_dict())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment