Last active
April 19, 2016 04:33
-
-
Save efrenfuentes/eeb5993e16750ddffb9b15f786f2544b to your computer and use it in GitHub Desktop.
Device
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import subprocess | |
from snimpy.manager import Manager as M | |
from snimpy.manager import load | |
import paramiko | |
import MySQLdb | |
import socket | |
USER = 'zenossmon' | |
KEY_FILE = '/home/efren/.ssh/id_rsa.pub' | |
class Device: | |
def __init__(self, id, host, ip, snmp_group, | |
alive=False, snmp=False, ssh=False, ssh_error="", mysql=False, | |
mysql_user='', mysql_password='', uname="", scanned=False): | |
self.id = id | |
self.host = host | |
self.ip = ip | |
if snmp_group != "": | |
self.snmp_group = snmp_group | |
else: | |
self.snmp_group = "public" | |
self.alive = alive == 1 | |
self.snmp = snmp == 1 | |
self.ssh = ssh == 1 | |
self.ssh_error = ssh_error | |
self.mysql = mysql == 1 | |
self.mysql_user = mysql_user | |
self.mysql_password = mysql_password | |
self.uname = uname | |
self.scanned = scanned | |
def host_or_ip(self): | |
if self.ip != "" and self.ip is not None: | |
return self.ip | |
else: | |
return self.host | |
def scan(self): | |
self.is_alive() | |
self.has_snmp() | |
self.has_ssh() | |
if self.mysql_user != "" and self.mysql_user is not None: | |
self.has_mysql() | |
self.scanned = True | |
def is_alive(self): | |
alive = False | |
try: | |
output = subprocess.check_output( | |
"ping -c 1 -w 20 {}".format(self.host_or_ip()), shell=True) | |
alive = True | |
except subprocess.CalledProcessError: | |
alive = False | |
self.alive = alive | |
def has_ssh(self): | |
result = False | |
self.ssh_error = "" | |
ssh = paramiko.SSHClient() | |
ssh.load_host_keys(KEY_FILE) | |
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) | |
try: | |
ssh.connect(self.host_or_ip(), username=USER, timeout=3) | |
stdin, stdout, stderr = ssh.exec_command('uname -a') | |
self.uname = stdout.read() | |
self.ssh = True | |
result = True | |
except paramiko.SSHException, e: | |
self.ssh_error = "{'paramiko.SSHException': 'Password is invalid'}" | |
except paramiko.AuthenticationException: | |
self.ssh_error = "{'paramiko.AuthenticationException': 'Authentication failed for some reason'}" | |
except socket.error, e: | |
self.ssh_error = "{'socket.error': 'Socket connection failed'}" | |
finally: | |
self.ssh = result | |
def has_snmp(self): | |
load("SNMPv2-MIB") | |
try: | |
m = M(host=self.host_or_ip(), community=self.snmp_group, | |
version=2, timeout=2) | |
if m.sysName is not None: | |
snmp = True | |
except: | |
snmp = False | |
self.snmp = snmp | |
def has_mysql(self): | |
try: | |
db = MySQLdb.connect(self.host_or_ip(), self.mysql_user, | |
self.mysql_password) | |
cursor = db.cursor() | |
cursor.execute("SELECT VERSION()") | |
results = cursor.fetchone() | |
# Check if anything at all is returned | |
if results: | |
self.mysql = True | |
else: | |
self.mysql = False | |
except MySQLdb.Error, e: | |
self.mysql = False | |
def to_dict(self): | |
device_dict = { | |
"id": self.id, | |
"host": self.host, | |
"ip": self.ip, | |
"snmp_group": self.snmp_group, | |
"alive": self.alive, | |
"snmp": self.snmp, | |
"ssh": self.ssh, | |
"mysql_user": self.mysql_user, | |
"mysql_password": self.mysql_password, | |
"uname": self.uname, | |
"scanned": self.scanned | |
} | |
return device_dict | |
def __repr__(self): | |
return "{0} ({1})".format(self.host, self.ip) | |
def __str__(self): | |
return str(self.to_dict()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment