Skip to content

Instantly share code, notes, and snippets.

@TheFlyingCorpse
Created July 16, 2020 15:43
Show Gist options
  • Save TheFlyingCorpse/d9ce43139ddb200b312028278d01fda6 to your computer and use it in GitHub Desktop.
Save TheFlyingCorpse/d9ce43139ddb200b312028278d01fda6 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
description = "Fetches the QuickStatus data from the HPE MSL Tape Library to report on the real health status"
__version__ = "0.1.0"
__version_date__ = "2020-06-06"
__author__ = "Rune Darrud <[email protected]>"
__description__ = "Check HPE MSL QuickStatus Plugin"
__license__ = "MIT"
# Options
import logging
from argparse import ArgumentParser, RawDescriptionHelpFormatter
import sys
import traceback
import json
import time
from datetime import timedelta
from seleniumwire import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import TimeoutException
import requests
activity_status = {
'0':'Idle',
'1':'Moving',
'2':'Scanning',
'3':'Test Running',
'4':'Initializing',
'5':'Configuring',
'6':'Upgrading',
'7':'Drive Upgrading',
'8':'Magazine removed',
'9':'Magazine unlocked',
'10':'Drive cleaning in progress',
'11':'System starting',
'12':'System shutting down',
'13':'Unlocking',
'14':'Startup halted',
'15':'Calibrating',
'16':'Configuring Drives',
'20':'Stopped',
'21':'Unit lock unlocked',
'22':'Bottom cover removed',
'23':'Top cover removed',
'24':'Connection lost to module'
}
drive_load_status = {
'0':'Empty',
'1':'Media present',
'2':'Seated',
'3':'Threaded',
'4':'Mounted',
}
def scrapeEvents(args,entry):
"""
"""
if args.verbose:
print('Get the events')
# URLLIB3 emits a warning about the SSL security, disable it if we already ignore SSL verification.
if not args.verify_ssl:
requests.packages.urllib3.disable_warnings()
# Read the token from the body
token = str(entry.body.decode()).split('token=')[1]
if args.verbose:
print('Cookie obtained (expecting just a PHPSESSID_SECURE=..): ' + str(token))
# Create a new requests session so we can query for data we're not able to read using headless Chrome navigated in a normal manner
s = requests.Session()
s.headers.update({'Accept': 'application/json, text/javascript, */*; q=0.01'})
s.headers.update({'Accept-Language':'en-US'})
s.headers.update({'X-Requested-With': 'XMLHttpRequest'})
s.headers.update({'Sec-Fetch-Mode':'cors'})
s.headers.update({'Sec-Fetch-Site':'same-origin'})
s.headers.update({'Sec-Fetch-Dest':'empty'})
s.headers.update({'Cache-Control':'max-age=0'})
s.headers.update({'referer': 'https://' + args.host + '/index.php'})
s.headers.update({'origin': 'https://' + args.host + ''})
s.headers.update({'Cookie':entry.headers['Cookie']}) # Expecting a 'PHPSESSID_SECURE=...' cookie
# Create a POST request to get the EVENTSINFO with TICKET, not INFOCONFIG as is what we get as the user (normally).
# TICKETS are the open / unresolved issues that might not be directly tied to hardware / software issues
r = s.post('https://' + args.host + '/inc/CUCEvents.php', verify=args.verify_ssl,
data={
'uc':'GET_EVENTSINFO',
'data[CASE]':'TICKET',
'data[LIMIT]':'10',
'data[INCLUDE_CLOSED]':'FALSE',
'token':token
}
)
events = r.json()['data']
return events
def scrapeData(args,driver):
"""
"""
# Get the data from the URL
if args.verbose:
print("VERBOSE: Fetching the URL: " + url)
driver.get(url)
# Wait for the request to finish, ie get the response body
if args.verbose:
print("VERBOSE: Waiting for '/inc/CUCLogin.php' to show as a completed request")
null = driver.wait_for_request('/inc/CUCLogin.php',timeout=args.timeout_login)
# Find the dropdown of usernames, select the username used as the argument
if args.verbose:
print("VERBOSE: Selecting the username '" + str(args.username) + "' from the dropdown")
driver.find_element_by_xpath("//select[@name='slctAccount']/option[text()='" + args.username + "']").click()
# Fill in the password
if args.verbose:
print("VERBOSE: Typing in the password")
driver.find_element_by_id("logPwd").send_keys(args.password)
# "Click" login
if args.verbose:
print("VERBOSE: 'Clicking' the login button")
driver.find_element_by_id("BTNLOGIN").click()
# Wait for the QuickStatus to be fetched in the background, this is what we're really here for
if args.verbose:
print("VERBOSE: Waiting for '/inc/CUCQuickStatus.php' to show as a completed request")
null = driver.wait_for_request('/inc/CUCQuickStatus.php',timeout=args.timeout_quickstatus)
# Iterate over the returned requests - QuickStatus
if args.verbose:
print("VERBOSE: Iterating over requests")
for entry in driver.requests:
if entry.path == url + '/inc/CUCQuickStatus.php':
# Decode the body before deserializing the JSON to Python.
if args.verbose:
print("VERBOSE: Decode and deserialize the JSON response")
r = json.loads(entry.response.body.decode())
# We just need the first response
break
# Bad authorization handling allows us to just query for what we're not shown in the webinterface.
events = scrapeEvents(args,entry)
# Store the result here
scrapeResult = {}
# Iterate over every key inside of 'data' in the response
if args.verbose:
print("VERBOSE: Transform the result into a usable dictionary")
for status in r['data']:
# Ignore this key, I haven't found any documentation for it
if status == 'QUICKSTATUS_ERRORS_ADVANCED':
continue
# "zip" two arrays together as they are expected to be of equal lenght, one key, one val, into a dictionary
# If multiple entries, we skip the first as thats our "headers" and just add a new entry for every data row
scrapeResult[status] = list()
if len(r['data'][status]) > 2:
for i in range(len(r['data'][status])):
if i == 0:
continue
scrapeResult[status].append(dict(zip(r['data'][status][0],r['data'][status][i])))
# Continue to the next, dont add it again by letting it continue further
continue
# "zip" two arrays together as they are expected to be of equal lenght, one key, one val, into a dictionary
scrapeResult[status].append(dict(zip(r['data'][status][0],r['data'][status][1])))
for i in range(len(events['TICKET_LOG'])):
if 'TICKET_LOG' not in scrapeResult:
scrapeResult['TICKET_LOG'] = list()
if i == 0:
continue
# "zip" two arrays together as they are expected to be of equal lenght, one key, one val, into a dictionary
# If multiple entries, we skip the first as thats our "headers" and just add a new entry for every data row
scrapeResult['TICKET_LOG'].append(dict(zip(events['TICKET_LOG'][0],events['TICKET_LOG'][i])))
# Add in the logStatus
# scrapeResult['logStatus'] = logStatus
# Return the result
return scrapeResult
def parse_command_line():
"""parse command line arguments
Also add current version and version date to description
"""
# define command line options
parser = ArgumentParser(
description=description + "\nVersion: " + __version__ + " (" + __version_date__ + ")",
formatter_class=RawDescriptionHelpFormatter, add_help=False)
group = parser.add_argument_group(title="mandatory arguments")
group.add_argument("-H", "--host",
help="define the host to request. To change the port just add ':portnumber' to this parameter.")
group = parser.add_argument_group(title="authentication arguments")
group.add_argument("-u", "--username", help="the login user name", default="user")
group.add_argument("-p", "--password", help="the login password")
group = parser.add_argument_group(title="optional arguments")
group.add_argument("-h", "--help", action='store_true',
help="show this help message and exit")
group.add_argument("-w", "--warning", default="",
help="set warning value")
group.add_argument("-c", "--critical", default="",
help="set critical value")
group.add_argument("--timeout-login", type=int, default=10,
help="timeout wait for login to appear, default 10 (seconds)")
group.add_argument("--timeout-quickstatus", type=int, default=10,
help="timeout wait for quickstatus to appear, default 10 (seconds)")
group.add_argument("-v", "--verbose", action='store_true',
help="this will add verbose output")
group.add_argument("-d", "--debug", action='store_true',
help="this will log all webdriver events to output")
group.add_argument("-i", "--ignoreCertificateErrors", action='store_true',
help="ignoreCertificateErrors")
result = parser.parse_args()
if result.ignoreCertificateErrors:
result.verify_ssl = False
else:
result.verify_ssl = True
if result.help:
parser.print_help()
print("")
exit(0)
# if result.requested_query is None:
# parser.error("You need to specify at least one query command.")
# need to check this our self otherwise it's not
# possible to put the help command into a arguments group
if result.host is None:
parser.error("no remote host defined")
return result
if __name__ == "__main__":
# Get the arguments from the command line
args = parse_command_line()
# If verbosity is set, increase verbosity of logging
if args.debug:
logging.basicConfig(level="DEBUG", format='%(asctime)s - %(levelname)s: %(message)s')
try:
# Create a usable URL
url = 'https://' + args.host + ""
# Initialize the Options from Chrome
options = Options()
# Headless chrome is a requirement
options.add_argument('--headless')
# Initialize the webdriver, verify_ssl must be set straight to seleniumwire if used
driver = webdriver.Chrome('/usr/bin/chromedriver',options=options,seleniumwire_options={'verify_ssl':args.verify_ssl})
# Scrape the data
result = scrapeData(args,driver)
output = list()
perfdata = list()
outputSummary = str()
returncode = 0
# Iterate over the event log
eventRead = False
for i in range(len(result['TICKET_LOG'])):
ticket = result['TICKET_LOG'][i]
if ticket['STATUS'] not in ['TICKET_RESOLVED','CLOSED']:
if ticket['SEVERITY'] == 'WARNING':
returncode = 1
else:
returncode = 3
eventRead = True
output.append('Event severity \'' + ticket['SEVERITY'] + '\' for \'' + ticket['COMPONENTTYPE'] + '\' at \'' + ticket['TIMESTAMP'] + '\' - ' +str(ticket['ERRORTEXT']))
outputSummary = 'Event severity \'' + ticket['SEVERITY'] + '\' for \'' + ticket['COMPONENTTYPE'] + '\' at \'' + ticket['TIMESTAMP'] + '\' - ' + str(ticket['ERRORTEXT'])
# If there was no uncleared events to print, print out the newest one that was read.
if not eventRead:
output.append('Latest event was at \'' + result['TICKET_LOG'][0]['TIMESTAMP'] + '\' for \'' + result['TICKET_LOG'][0]['COMPONENTTYPE'] + '\'')
# The stack
if int(result['QUICKSTATUS_STACK'][0]['ERRORS']) > 0 and returncode < 2:
returncode = 2
outputSummary = 'Number of errors in the library \'' + str(result['QUICKSTATUS_STACK'][0]['ERRORS']) + '\''
elif int(result['QUICKSTATUS_STACK'][0]['WARNINGS']) > 0 and returncode < 1:
returncode = 1
outputSummary = 'Number of warnings in the library \'' + str(result['QUICKSTATUS_STACK'][0]['WARNINGS']) + '\''
elif returncode == 0:
outputSummary = 'The stack is currently \'' + str(activity_status[result['QUICKSTATUS_STACK'][0]['STACK_ACTIVITY_STATUS']]) + '\' and the library has been running for a total of \'' + '{}'.format(str(timedelta(seconds=int(result['QUICKSTATUS_SYSTEM'][0]['STACK_POWERON_TIME_RT'])))) + '\''
# Stack Activity Status
# Is the status currently in an unknown state?
if int(result['QUICKSTATUS_STACK'][0]['STACK_ACTIVITY_STATUS']) in [4,5,6,7,8,14,15,16,20,22,23,24] and returncode < 3:
returncode = 3
# Is the status currently in a critical state?
elif int(result['QUICKSTATUS_STACK'][0]['STACK_ACTIVITY_STATUS']) in [11,12] and returncode < 2:
returncode = 2
# Is the status currently in a warning state?
elif int(result['QUICKSTATUS_STACK'][0]['STACK_ACTIVITY_STATUS']) in [10] and returncode < 1:
returncode = 1
output.append('Stack Activity Status: \'' + str(activity_status[result['QUICKSTATUS_STACK'][0]['STACK_ACTIVITY_STATUS']]) + '\'')
perfdata.append('stack_activity_status=' + str(result['QUICKSTATUS_STACK'][0]['STACK_ACTIVITY_STATUS']))
# The module status
n = 0
module_to_phys = {}
for i in range(len(result['QUICKSTATUS_UNIT'])):
n = n + 1
module_to_phys[result['QUICKSTATUS_UNIT'][i]['UNIT_PHY_NUM']] = n
if result['QUICKSTATUS_UNIT'][i]['UNIT_READY'] != 'TRUE' and returncode < 3:
returncode = 3
outputSummary = 'Module ' + str(n) + ' is reporting it is currently in a UNIT_READY state of \'' + str(result['QUICKSTATUS_UNIT'][i]['UNIT_READY']) + '\''
elif int(result['QUICKSTATUS_UNIT'][i]['ERRORS']) > 0 and returncode < 2:
returncode = 2
outputSummary = 'Number of errors in Module ' + str(n) + ' \'' + str(result['QUICKSTATUS_UNIT'][i]['ERRORS']) + '\''
elif int(result['QUICKSTATUS_UNIT'][i]['WARNINGS']) > 0 and returncode < 1:
returncode = 1
outputSummary = 'Number of warnings in Module ' + str(n) + ' \'' + str(result['QUICKSTATUS_UNIT'][i]['WARNINGS']) + '\''
if int(result['QUICKSTATUS_UNIT'][i]['UNIT_POWER_STATUS']) != 0 and returncode < 2:
returncode = 2
outputSummary = 'Module ' + str(n) + ' reports its power status is not OK'
if int(result['QUICKSTATUS_UNIT'][i]['WARNINGS_CART']) > 0 and returncode < 1:
returncode = 1
outputSummary = 'Module ' + str(n) + ' reports there are \'' + int(result['QUICKSTATUS_UNIT']['WARNINGS_CART']) + '\' cartridges with warnings'
perfdata.append('module_' + str(n) + '_warnings_cart=' + str(result['QUICKSTATUS_UNIT'][i]['WARNINGS_CART']))
perfdata.append('module_' + str(n) + '_cartridges=' + str(result['QUICKSTATUS_UNIT'][i]['CARTRIDGES']) + ';;;0;' + str(result['QUICKSTATUS_UNIT'][i]['SLOTS']))
perfdata.append('module_' + str(n) + '_cartridges_in_drives=' + str(result['QUICKSTATUS_UNIT'][i]['CARTRIDGES_IN_DRIVES']))
perfdata.append('module_' + str(n) + '_errors=' + str(result['QUICKSTATUS_UNIT'][i]['ERRORS']))
perfdata.append('module_' + str(n) + '_warnings=' + str(result['QUICKSTATUS_UNIT'][i]['WARNINGS']))
output.append('Module ' + str(n) + ' Ready: \'' + str(result['QUICKSTATUS_UNIT'][i]['UNIT_READY']) + '\'')
# The drive status
n = 0
for i in range(len(result['QUICKSTATUS_DRIVE'])):
n = n + 1
module_int = module_to_phys[result['QUICKSTATUS_DRIVE'][i]['DRIVE_UNIT_PHY_NUM']]
if int(result['QUICKSTATUS_DRIVE'][i]['ERRORS']) > 0 and returncode < 2:
returncode = 2
outputSummary = 'Number of errors in Module ' + str(module_int) + ' Drive ' + str(n) + ' \'' + str(result['QUICKSTATUS_DRIVE'][i]['ERRORS']) + '\''
elif int(result['QUICKSTATUS_DRIVE'][i]['WARNINGS']) > 0 and returncode < 1:
returncode = 1
outputSummary = 'Number of warnings in Module ' + str(module_int) + ' Drive ' + str(n) + ' \'' + str(result['QUICKSTATUS_DRIVE'][i]['WARNINGS']) + '\''
elif result['QUICKSTATUS_DRIVE'][i]['DRIVE_ERROR'] != 'FALSE' and returncode < 2:
returncode = 2
outputSummary = 'Module ' + str(module_int) + ' Drive ' + str(n) + ' reports it has errors'
elif result['QUICKSTATUS_DRIVE'][i]['DRIVE_READY'] != 'TRUE' and returncode < 2:
returncode = 2
outputSummary = 'Module ' + str(module_int) + ' Drive ' + str(n) + ' reports it is not ready'
output.append('Module ' + str(module_int) + ' Drive ' + str(n) + ' Load status \'' + str(drive_load_status[result['QUICKSTATUS_DRIVE'][i]['DRIVE_LOAD_STATUS']]) + '\'')
output.append('Module ' + str(module_int) + ' Drive ' + str(n) + ' Ready \'' + str(result['QUICKSTATUS_DRIVE'][i]['DRIVE_READY']) + '\'')
perfdata.append('module_' + str(module_int) + '_drive_' + str(n) + '_errors=' + str(result['QUICKSTATUS_DRIVE'][i]['ERRORS']))
perfdata.append('module_' + str(module_int) + '_drive_' + str(n) + '_warnings=' + str(result['QUICKSTATUS_DRIVE'][i]['WARNINGS']))
# Not sure what this is, seems related to security
#output.append('STACK_PLK_TOKEN_STATUS: \'' + str(result['QUICKSTATUS_STACK'][0]['STACK_PLK_TOKEN_STATUS']) + '\'')
if returncode == 3:
print('UNKNOWN: ' + outputSummary)
elif returncode == 2:
print('CRITICAL: ' + outputSummary)
elif returncode == 1:
print('WARNING: ' + outputSummary)
elif returncode == 0:
print('OK: ' + outputSummary)
else:
print('UNKNOWN unhandled: ' + outputSummary)
print('\n'.join(output) + ' | ' + ' '.join(perfdata))
# print("-"*20)
# if 'QUICKSTATUS_STACK' in result:
# print(result['QUICKSTATUS_STACK'][0])
# if 'QUICKSTATUS_UNIT' in result:
# print(result['QUICKSTATUS_UNIT'])
# if 'QUICKSTATUS_DRIVE' in result:
# print(result['QUICKSTATUS_DRIVE'])
# if 'QUICKSTATUS_SYSTEM' in result:
# print(result['QUICKSTATUS_SYSTEM'])
except TimeoutException:
print("UNKNOWN: Timeout Exception occured, is the certificate trusted by the host? Try with the switch to ignore certificate errors or use the switch for verbose output")
sys.exit(3)
except Exception:
print("UNKNOWN: Exception occured")
traceback.print_exc(file=sys.stdout)
sys.exit(3)
finally:
driver.close()
driver.quit()
sys.exit(returncode)
@TheFlyingCorpse
Copy link
Author

Icinga2 command definition

object CheckCommand "hpe_msl_health" {
    import "plugin-check-command"
    command = [ PluginDir + "/check_hpe_msl_quickstatus.py" ]
    timeout = 30s
    arguments += {
        "--host" = "$host.address$"
        "--ignoreCertificateErrors" = {
            set_if = "$msl_ignoreCertificateErrors$"
        }
        "--password" = "$msl_password$"
        "--username" = "$msl_username$"
    }
    vars.msl_ignoreCertificateErrors = false
    vars.msl_username = "user"
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment