Created
April 7, 2026 17:48
-
-
Save TrevorBenson/6699f27caabd43d59f5a10b8a73b4d8c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """Get metrics from endpoints via SSH, parse to spec; send to Influx | |
| Uses a provided inventory YAML file which defines the devices and | |
| commands to process. Then references the environment | |
| optionsconfig.yaml which contains management IP addresses and | |
| credentials. Once the commands are efficiently extracted via SSH | |
| it references the parsing specifications (from the original | |
| inventory YAML file) which defines the regex matching rules, | |
| and the Influx measurement name, tags, keys, and data types. | |
| Finally the script forms the output into Influx line protocol for | |
| injection to InfluxDB. | |
| Args: | |
| usage: SSH2Influx.py [-h] [-d] -p paramfile [-g group] [-f frequency] | |
| Obtain metrics from a device via SSH; parse and format for InfluxDB | |
| options: | |
| -h, --help show this help message and exit | |
| -d, --debug Enables debug with copious console output, but none to InfluxDB | |
| -p paramfile, --paramfile paramfile | |
| YAML file with inventory and parsing specs | |
| -g group, --group group | |
| Device group from optionsconfig.yaml (default of "device_inventory") | |
| -f frequency, --frequency frequency | |
| Frequency (in seconds) to repeat collection (default of 300 seconds) | |
| Inputs/Reference files: | |
| parameters.yaml - (optional name) contains inventory, command, | |
| regex pattern and influx key/tag specs in YAML format | |
| (See examples/*.yaml for formatting guidance) | |
| optionsconfig.yaml - contains Influx server parameters and | |
| device secret credential information | |
| Returns: | |
| Output to console while running shows progress of periodic | |
| polls. Running in 'debug mode' will provide much more detailed | |
| information | |
| Version History: | |
| 5 2022-0420 | |
| normalized to take any command input | |
| 6 2023-0605 | |
| Added capability to specify target Influx server in YAML file | |
| 7 2023-0629 | |
| Updates to documentation | |
| 8 2023-0815 | |
| Enhancements to use ThreadPoolExecutor, better prompt handling | |
| and NX-OS inclusion in examples/ and processing | |
| 9 2023-0826 | |
| Packaging fixups for IMPACT24 | |
| 10 2023-0826 | |
| Added per param file thread parameter | |
| """ | |
| # Credits: | |
| __version__ = '10' | |
| __author__ = 'Jason Davis - jadavis@cisco.com' | |
| __license__ = 'Cisco Sample Code License, Version 1.1 - ' \ | |
| 'https://developer.cisco.com/site/license/cisco-sample-code-license/' | |
| import asyncio | |
| import asyncssh | |
| import sys | |
| import time | |
| import yaml | |
| import re | |
| import argparse | |
| import os | |
| import pprint | |
| import datetime | |
| import schedule | |
| import threading | |
| from common import getEnv | |
| import requests | |
| from concurrent.futures import ThreadPoolExecutor | |
| from concurrent.futures import as_completed | |
| import logging | |
| import cryptography.fernet | |
| from cryptography.fernet import Fernet | |
| # Global vars | |
| #MAX_THREADS = 10 # Number of threads to run in parallel - adjust to suit | |
| if os.path.exists('.key.file'): | |
| key_file = open('.key.file', 'r') | |
| key = key_file.read().encode('ASCII') | |
| key_file.close() | |
| if key: | |
| try: | |
| cipher_suite = Fernet(key) | |
| except ValueError: | |
| sys.exit("The key isn't valid") | |
| else: | |
| sys.exit('No key found in file') | |
| else: | |
| sys.exit('Error: key file is missing') | |
| class SSHTarget: | |
| # Main class for devices to be polled; includes device parameters | |
| # including prompt definition for follow-on polling | |
| # Additional methods for running commands | |
| def __init__(self, info): | |
| self.alias = info["hostalias"] | |
| self.mgmt = info["host"] | |
| self.reachable = None | |
| # self.username = info["username"] | |
| # self.password = info["password"] | |
| try: | |
| self.username = cipher_suite.decrypt(info["username"]).decode('ASCII') | |
| self.password = cipher_suite.decrypt(info["password"]).decode('ASCII') | |
| except cryptography.fernet.InvalidToken: | |
| sys.exit('Error - key failed check') | |
| self.commands = info["commands"] | |
| #if DEBUG: print(f'\n\n=====Learning device: {self.alias}') | |
| logging.debug(f'=====Learning device: {self.alias}') | |
| presult = self.get_prompt(self.mgmt, self.username, self.password) | |
| if presult == 'Failed': | |
| self.server_version, self.prompt = ('Undefined', 'Undefined') | |
| self.reachable = False | |
| else: | |
| self.server_version, self.prompt = presult[0], presult[1] | |
| self.reachable = True | |
| #print(self.server_version, self.prompt) | |
| if self.reachable: | |
| print(f'{self.alias} initialized') | |
| logging.debug(f'prompt is [{self.prompt}]\n' | |
| f'SSH server type is [{self.server_version}]') | |
| def __str__(self): | |
| return str(self.__class__) + ": " + str(self.__dict__) | |
| def __exit__(self, exc_type, exc_value, traceback): | |
| print("exited") | |
| async def _get_prompt(self, device, username, password): | |
| async with asyncssh.connect(device, username=username, | |
| password=password, | |
| client_keys=None, | |
| known_hosts=None, | |
| connect_timeout=10) as conn: | |
| server_version = conn.get_extra_info(name='server_version') | |
| #if DEBUG: print(f'DEBUG: Socket connection info:\n' | |
| # f'{conn.get_extra_info("socket")}' | |
| # f'\nserver version: {server_version}') | |
| logging.debug(f'DEBUG: Socket connection info:\n' | |
| f'{conn.get_extra_info("socket")}\n' | |
| f'server version: {server_version}') | |
| if 'Cisco' in server_version or 'PKIX' in server_version or \ | |
| 'SSH-2.0-OpenSSH' in server_version: | |
| # Yeah - got a Cisco device | |
| # Initial list of common prompt delimiters/separators; | |
| # we use generic ones until we learn the device-specific | |
| delims = ('#', '$', '>') | |
| result = '' | |
| async with conn.create_process(term_type="vt100") as process: | |
| #process.stdin.write('!test\n\n') | |
| process.stdin.write('\n\n') | |
| try: | |
| result = await asyncio.wait_for( | |
| process.stdout.readuntil(delims), | |
| timeout=45) | |
| except Exception as e: | |
| print(f'prompt timeout in _get_prompt step {e}') | |
| NEWLINE = '\n' | |
| prompt = result.strip().split(NEWLINE)[-1].strip() | |
| return server_version, prompt | |
| elif 'Ubuntu' in server_version: | |
| # OK - we got an Ubuntu endpoint | |
| return server_version, ':~$' | |
| else: | |
| # OK - something else, assume a simple $ ending prompt | |
| return server_version, '$' | |
| def get_prompt(self, device, username, password): | |
| #if DEBUG: print('Starting get_prompt') | |
| logging.debug('Starting get_prompt') | |
| try: | |
| return asyncio.run(self._get_prompt(device, username, | |
| password)) | |
| except (OSError, asyncssh.Error) as exc: | |
| print(f'SSH connection failed in get_prompt to {device}') | |
| #sys.exit('SSH connection failed: ' + str(exc)) | |
| return ('Failed') | |
| async def _run_command(self): | |
| async with asyncssh.connect(self.mgmt, username=self.username, | |
| password=self.password, | |
| client_keys=None, | |
| known_hosts=None, | |
| connect_timeout=10) as conn: | |
| print(f'Connection made to {self.alias} / ' | |
| f'{conn.get_extra_info("peername")[0]}:' | |
| f'{conn.get_extra_info("peername")[1]} with prompt ' | |
| f'<{self.prompt}>') | |
| process = await conn.create_process(request_pty='force', | |
| term_type="vt100") | |
| result = '' | |
| try: | |
| result += await asyncio.wait_for( | |
| process.stdout.readuntil(self.prompt), | |
| timeout=45) | |
| except Exception as e: | |
| print(f'Prompt timeout in get_prompt for header with error:\n {e}') | |
| #print(f'Login session header/output: [{result}]') | |
| logging.debug(f'Login session header/output: [{result}]') | |
| if 'Cisco' in self.server_version or 'PKIX' in self.server_version: | |
| # Prep env with 'term len 0' | |
| command = 'terminal length 0' | |
| logging.debug(f'Working setup command - [{command}]') | |
| process.stdin.write(command + '\n') | |
| logging.debug(f'Sent - [{command}]') | |
| logging.debug(f'Waiting for prompt [{self.prompt}] for {self.alias}') | |
| time.sleep(1) | |
| result = '' | |
| try: | |
| result += await asyncio.wait_for( | |
| process.stdout.readuntil(self.prompt), | |
| timeout=45) | |
| except Exception as e: | |
| print(f'prompt timeout in _run_command step {e}') | |
| #print(f'tl0 command [{command}] output:\n[{result}]') | |
| logging.debug(f'tl0 command [{command}] output:\n[{result}]') | |
| result = '' | |
| #print('COMPLETE TERM LEN 0 injection') | |
| # Start specific command collection | |
| output_records = [] | |
| logging.debug(f'Commands to execute are:\n{self.commands}') | |
| for item in self.commands: | |
| command = item['cmd'] | |
| parsespec = item['parsespec'] | |
| #if DEBUG: print(f'DEBUG: Working command - <{command}> ' | |
| # f'and parsespec <{parsespec}>') | |
| logging.debug(f'Working job command - <{command}> ' | |
| f'and parsespec <{parsespec}>') | |
| process.stdin.write(command + "\n") | |
| time.sleep(1) | |
| #await asyncio.wait_for(process.stdout.readuntil(self.prompt), | |
| # timeout=10) | |
| #process.stdin.write("\n") | |
| #result = '' | |
| result = '' | |
| try: | |
| result += await asyncio.wait_for( | |
| process.stdout.readuntil(self.prompt), | |
| timeout=90) | |
| except Exception as e: | |
| print(f'prompt timeout in _run_command with error:\n {e}') | |
| #if DEBUG: print(f'Command specific [{command}] output:\n[{result}]') | |
| #print(f'Command specific [{command}] output:\n[{result}]') | |
| logging.debug(f'Command specific [{command}] output:\n[{result}]') | |
| output_records.append((self.alias, command, parsespec, | |
| result)) | |
| conn.close() | |
| return output_records | |
| def run_commands(self): | |
| #if DEBUG: print(f' =Collecting commands for device: {self.alias}') | |
| logging.debug(f' =Collecting commands for device: {self.alias}') | |
| try: | |
| return asyncio.run(self._run_command()) | |
| except Exception as exc: | |
| print(f'ALERT - Got an exception - [{exc}]') | |
| print(f'SSH connection failed in run_commands to ' | |
| f'{self.alias}: ' + str(exc)) | |
| def get_arguments(): | |
| # Obtain user options - parameters YAML file is required | |
| # defaults of no debug mode and polling every 300 seconds | |
| parser = argparse.ArgumentParser(description='Obtain metrics from ' | |
| 'a device via SSH; parse and ' | |
| 'format for InfluxDB') | |
| parser.add_argument('-d', '--debug', action='store_const', | |
| default=False, | |
| const=True, | |
| dest='debug', | |
| help='Enables debug with copious console ' | |
| 'output, but none to InfluxDB') | |
| parser.add_argument('-p', '--paramfile', metavar='paramfile', | |
| required=True, | |
| help='YAML file with inventory and parsing specs') | |
| parser.add_argument('-g', '--group', metavar='group', | |
| default="device_inventory", | |
| help=('Device group from optionsconfig.yaml ' | |
| '(default of "device_inventory")')) | |
| parser.add_argument('-f', '--frequency', metavar='frequency', | |
| default=300, | |
| type=int, | |
| help='Frequency (in seconds) to repeat ' | |
| 'collection (default of 300 seconds)') | |
| parser.add_argument('-t', '--threads', metavar='threads', | |
| default=1, | |
| type=int, | |
| help='Thread count to use with collection ' | |
| '(default of 1)') | |
| args = parser.parse_args() | |
| return args | |
| def get_params(paramfile, params): | |
| # Read specifications file (YAML) that defines how to parse the | |
| # CLI output - return to keep in memory for other processing | |
| with open(paramfile, 'r') as file: | |
| dictionary = yaml.safe_load(file) | |
| try: | |
| paramresults = eval(f'dictionary{params}') | |
| except KeyError: | |
| paramresults = None | |
| return paramresults | |
| def get_work(workparams, devicecreds): | |
| # Get items, credentials and commands to execute | |
| # Start by getting list of environment devices from optionsconfig.yaml | |
| # allow user to define device group that maps to optionconfig.yaml | |
| groupcommands = workparams.get('groupcommands', None) | |
| if groupcommands == None: groupcommands = list() | |
| default_cred_set = workparams['credential_set'] | |
| default_creds = getEnv.getparam(default_cred_set) | |
| logging.debug(f'Host list for processing {workparams["hosts"]}') | |
| worklist = [] | |
| for item in workparams['hosts']: | |
| host = item.get("host") | |
| specificcommands = item.get("commands") | |
| logging.debug(f'Working host {host} with commands {specificcommands}') | |
| try: | |
| device = [device for device in devicecreds if device['alias'] == host][0] | |
| except IndexError: | |
| print(f'WARNING: device {host} is not found in ' | |
| 'optionsconfig.yaml - skipping') | |
| continue | |
| logging.debug(f'Working device - {device}') | |
| username = device.get('username', default_creds["username"]) | |
| password = device.get('password', default_creds["password"]) | |
| mgmthostnameip = device['mgmt_hostnameip'] | |
| if specificcommands is None: | |
| commands = groupcommands | |
| else: | |
| commands = item['commands'] + groupcommands | |
| worklist.append({"hostalias": host, | |
| "host": mgmthostnameip, | |
| "username": username, | |
| "password": password, | |
| "commands": commands, | |
| }) | |
| logging.debug(f'Entire worklist is:\n{worklist}') | |
| return worklist | |
| def get_run_specs(args): | |
| # Read Influx target, if needed | |
| altinflux = get_params(args.paramfile, "['InfluxDB']") | |
| if altinflux is None: | |
| influxenv = getEnv.getparam("InfluxDB") | |
| print(f'Using project-wide Influx server: {influxenv["alias"]}') | |
| else: | |
| influxenv = getEnv.getparam(altinflux) | |
| print(f'Using alternative Influx server: {influxenv["alias"]}') | |
| # Read inventory from job-specific parameters file to build work list | |
| inventory = get_params(args.paramfile, '["inventory"]') | |
| logging.debug(f'==Inventory specs\n{inventory}') | |
| # Read group parameters info from environment optionconfig.yaml to | |
| # map device IPs and creds | |
| deviceparams = getEnv.getparam(args.group) | |
| logging.debug(f'==Device Parameters\n{deviceparams}') | |
| worklist = get_work(inventory, deviceparams) | |
| logging.debug(f'==Work list\n{worklist}') | |
| # Do initial connections and prompt determination with devices | |
| print('\n=====Learning device prompts') | |
| with ThreadPoolExecutor(max_workers=THREADS, | |
| thread_name_prefix='DeviceLearn') as executor: | |
| processed_results = list(executor.map(SSHTarget, worklist)) | |
| inventory = {} | |
| logging.debug(f'Processed device targets are:\n{processed_results}') | |
| for item in processed_results: | |
| logging.debug(f'{item.alias}: {item}') | |
| inventory[item.alias] = item | |
| logging.debug(inventory) | |
| logging.debug(f'==Inventory\n{inventory}') | |
| reachable_devices = [device.alias for device in processed_results if device.reachable] | |
| unreachable_devices = [device.alias for device in processed_results if not device.reachable] | |
| if len(unreachable_devices) > 0: | |
| print(f'Unreachable devices {len(unreachable_devices)}\n' | |
| f'{unreachable_devices}\n') | |
| logging.debug(f'==Reachable Devices:\n{reachable_devices}') | |
| logging.debug(f'==UNReachable Devices:\n{unreachable_devices}') | |
| # Read the parsing specifications file containing regex matches and | |
| # influx measurement/tag/key assignments | |
| parse_specs = get_params(args.paramfile, '["parsespecs"]') | |
| logging.debug(f'== Pattern Matching Specs are:\n{parse_specs}') | |
| return worklist, inventory, parse_specs, influxenv, \ | |
| reachable_devices, unreachable_devices | |
| def extract_matches(parsespecs, aggregate_output): | |
| # Use the specs input to do pattern matches against the collected output | |
| measurements = [] | |
| print(f'\n=====Processing output of hosts...') | |
| for device_results in aggregate_output: | |
| static_measurements = [] | |
| for output in device_results: | |
| print(f'Processing: [{output[0]}]') | |
| logging.debug(f'Working on device <{output[0]}> ' | |
| f'for command <{output[1]}> ' | |
| f'with parsespec <{output[2]}>') | |
| parsespec = [parsespec for parsespec in parsespecs | |
| if parsespec["parsespec"] == output[2]][0] | |
| measurement = [output[0], parsespec["measurement"]] | |
| logging.debug(f'Measurement currently: {measurement}') | |
| statictags = parsespec.get("statictags") | |
| staticfields = parsespec.get("staticfields") | |
| if statictags: | |
| logging.debug(f'extract_matches: Static tags are: {statictags}') | |
| for statictag in statictags: | |
| tagname = statictag.get("tagname") | |
| tagvalue = statictag.get("tagvalue") | |
| tagtype = statictag.get("tagtype") | |
| measurement.append((tagname, 'tag', 'string', | |
| tagvalue)) | |
| found_existing_element = False | |
| for counter, static_measure_element in enumerate(static_measurements): | |
| logging.debug(f'static_measure_element: {counter}, {static_measure_element}') | |
| if tagname in static_measure_element: | |
| found_existing_element = True | |
| # static_measurements[counter] = (fieldname, 'field', fieldtype, fieldvalue) | |
| static_measurements[counter] = (tagname, 'tag', tagtype, tagvalue) | |
| if not found_existing_element: | |
| # static_measurements.append((fieldname, 'field', fieldtype, fieldvalue)) | |
| static_measurements.append((tagname, 'tag', tagtype, tagvalue)) | |
| if staticfields: | |
| logging.debug(f'extract_matches: Static fields are: {staticfields}') | |
| for staticfield in staticfields: | |
| fieldname = staticfield.get("fieldname") | |
| fieldvalue = staticfield.get("fieldvalue") | |
| fieldtype = staticfield.get("fieldtype") | |
| measurement.append((fieldname, 'field', fieldtype, | |
| fieldvalue)) | |
| found_existing_element = False | |
| for counter, static_measure_element in enumerate(static_measurements): | |
| logging.debug(f'static_measure_element: {counter}, {static_measure_element}') | |
| if fieldname in static_measure_element: | |
| found_existing_element = True | |
| static_measurements[counter] = (fieldname, 'field', fieldtype, fieldvalue) | |
| if not found_existing_element: | |
| static_measurements.append((fieldname, 'field', fieldtype, fieldvalue)) | |
| ''' There are three matchtypes to handle - | |
| single - scans over output and associates tags to output serially | |
| multiple - scans over output and associates tags to output | |
| multiple times - e.g. interface or process data, line-by-line | |
| iterative - multiple scans over the same output | |
| ''' | |
| if parsespec["matchtype"] == 'single': | |
| logging.debug(f'Working a single matchtype') | |
| logging.debug(f'regex pattern is:\n{parsespec["regex"]}') | |
| logging.debug(f'output to search is:\n{output[3]}') | |
| x = re.search(fr'{parsespec["regex"]}', output[3], | |
| re.S | re.M) | |
| logging.debug(f'Got a search result of {x}') | |
| if x: | |
| logging.debug(f'Matching groups - {x.groups()}') | |
| for index, item in enumerate(x.groups(), start=1): | |
| matchname = f'parsespec["match{index}"]' | |
| matchkeytype = f'parsespec["match{index}keytype"]' | |
| matchvaluetype = f'parsespec["match{index}valuetype"]' | |
| matchvalue = f'{item}' | |
| logging.debug(f'Tag |{eval(matchname)}|' | |
| f'is a |{eval(matchkeytype)}| ' | |
| f'of type {eval(matchvaluetype)} ' | |
| f'with value: |{matchvalue}|') | |
| measurement.append((eval(matchname), | |
| eval(matchkeytype), | |
| eval(matchvaluetype), | |
| matchvalue)) | |
| measurements.append(measurement) | |
| elif parsespec["matchtype"] == 'multiple': | |
| logging.debug(f'Working a multiple matchtype') | |
| logging.debug(f'regex pattern is:\n{parsespec["regex"]}') | |
| logging.debug(f'output to search is:\n{output[3]}') | |
| matches = re.findall(fr'{parsespec["regex"]}', output[3], | |
| re.S | re.M) | |
| logging.debug(f'Groups matching are:\n{matches}') | |
| for matcheditem in matches: | |
| logging.debug(f'Processing item: {matcheditem}') | |
| # Reset measurement for each instance (only multiple) | |
| if static_measurements: | |
| logging.debug(f'static_measurements is: {static_measurements}') | |
| measurement = [output[0], parsespec["measurement"]] | |
| for static_measurement in static_measurements: | |
| measurement.append(static_measurement) | |
| else: | |
| measurement = [output[0], parsespec["measurement"]] | |
| logging.debug(f'Current measurement list after resetting: {measurement}') | |
| for index, item in enumerate(matcheditem, start=1): | |
| # Get individual match data | |
| matchname = f'parsespec["match{index}"]' | |
| matchkeytype = f'parsespec["match{index}keytype"]' | |
| matchvaluetype = f'parsespec["match{index}valuetype"]' | |
| matchvalue = f'matcheditem[{index - 1}]' | |
| logging.debug(f' Tag |{eval(matchname)}| ' | |
| f'is a |{eval(matchkeytype)}| ' | |
| f'of type {eval(matchvaluetype)} ' | |
| f'with value: |{eval(matchvalue)}|') | |
| measurement.append((eval(matchname), | |
| eval(matchkeytype), | |
| eval(matchvaluetype), | |
| eval(matchvalue))) | |
| measurements.append(measurement) | |
| logging.debug(f'Current measurements are:' | |
| f'{pprint.pprint(measurements)}') | |
| elif parsespec["matchtype"] == 'iterative': | |
| logging.debug(f'Working an iterative matchtype') | |
| measurement = [output[0], parsespec["measurement"]] | |
| logging.debug(f'extract_matches(iterative): Current measurement is: {measurement}') | |
| regexmatches = parsespec["regexmatches"] | |
| for groupspec in regexmatches: | |
| logging.debug(f'DEBUG extract_matches(iterative): Current groupspec is: {groupspec}') | |
| # See if we have a multimatch group - special handling | |
| if "groups" in groupspec: | |
| x = re.findall(fr'{groupspec["regex"]}', | |
| output[3], | |
| re.S | re.M) | |
| # TO-DO: Add logic for no match | |
| logging.debug(f'DEBUG: group match(es) is/are:\n {x}') | |
| for count, match in enumerate(x): | |
| measurement.append((groupspec["groups"][count]["groupname"], | |
| groupspec["groups"][count]["groupkeytype"], | |
| groupspec["groups"][count]["groupvaluetype"], | |
| match.strip())) | |
| else: | |
| # Regular processing | |
| x = re.search(fr'{groupspec["regex"]}', | |
| output[3], | |
| re.S | re.M) | |
| if x == None: | |
| print(f'WARNING: No match of [{groupspec["regex"]}] on {output[0]} - skipping') | |
| continue | |
| logging.debug(f'DEBUG groupmatch is: {x.group(1)}') | |
| measurement.append((groupspec["groupname"], | |
| groupspec["groupkeytype"], | |
| groupspec["groupvaluetype"], | |
| x.group(1).strip())) | |
| measurements.append(measurement) | |
| return measurements | |
| def assemble_influx_lp(measurements): | |
| # Take list of measurements and assemble into Influx Line Protocol | |
| logging.debug(f'assemble_influx_lp: Measurements to process:\n{measurements}') | |
| influxlines = "" | |
| for item in measurements: | |
| logging.debug(f'assemble_influx_lp: Working item:\n{item}') | |
| device = item.pop(0) | |
| measurement = item.pop(0) | |
| mtags = [x for x in item if x[1] == 'tag'] | |
| mfields = [x for x in item if x[1] == 'field'] | |
| logging.debug(f'Tags: {mtags}\nFields: {mfields}') | |
| influxline = f'{measurement},device={device},' | |
| for mtagitem in mtags: | |
| tagkey = f'{mtagitem[0]}={mtagitem[3]}'.strip().replace(' ', '\ ') | |
| # print(f'|{tagkey}|') | |
| nonspacestr = re.sub(r'(\\\s){2,}', "", tagkey) | |
| if nonspacestr.endswith('\ '): | |
| nonspacestr = nonspacestr.replace('\ ', '') | |
| influxline += nonspacestr + ',' | |
| influxline = influxline.rstrip(',') | |
| influxline += ' ' | |
| for mfielditem in mfields: | |
| # if string | |
| if mfielditem[2] == 'string': | |
| influxline += f'{mfielditem[0]}="{mfielditem[3]}",' | |
| else: | |
| # if float, int, boolean, decimal | |
| influxline += f'{mfielditem[0]}={mfielditem[3]},' | |
| influxline = influxline.rstrip(',') | |
| logging.debug(f'DEBUG assemble_influx_lp: current influxline - {influxline}') | |
| influxlines += influxline + '\n' | |
| logging.debug(f'DEBUG assemble_influx_lp: influxlines are\n{influxlines}') | |
| return influxlines | |
| def send_to_influx(influxenv, measurements): | |
| # Send incoming measurements to InfluxDB - measurements must be | |
| # in Influx line protocol format. influxenv is a dictionary of | |
| # Influx server parameters - protcol, host, port, bucket, org, | |
| # and API token for writing | |
| influxurl = (f'{influxenv["protocol"]}://' | |
| f'{influxenv["host"]}:{influxenv["port"]}' | |
| f'/api/v2/write?bucket={influxenv["bucket"]}' | |
| f'&org={influxenv["org"]}&precision=s') | |
| headers = { | |
| 'Accept': 'application/json', | |
| 'Authorization': 'Token ' + influxenv["token"], | |
| 'Content-Type': 'text/plain' | |
| } | |
| try: | |
| response = requests.request("POST", influxurl, headers=headers, | |
| data=measurements) | |
| response.raise_for_status() | |
| print(f'{response.status_code} - {response.reason} - {response.text}') | |
| if response.status_code == 204: | |
| print('Good data push to InfluxDB') | |
| except requests.exceptions.HTTPError as errh: | |
| print("Http Error:", errh) | |
| except requests.exceptions.ConnectionError as errc: | |
| print("Error Connecting to InfluxDB server! Error was:\n", errc) | |
| except requests.exceptions.Timeout as errt: | |
| print("Timeout Error in send_to_influx:", errt) | |
| except requests.exceptions.RequestException as err: | |
| print("Oops: Something Else", err) | |
| print(f'\nFinished at: {str(time.ctime())}') | |
| def main_loop(worklist, inventory, parse_specs, influxenv): | |
| # At this point we've built a list of workable items, we can now | |
| # build an asynchronous work queue and execute as fast as they | |
| # respond | |
| startTime = time.time() | |
| command_results = [] | |
| print(f'\n=====Collecting commands for hosts on {time.ctime()}') | |
| # Refactored | |
| with ThreadPoolExecutor(max_workers=THREADS, | |
| thread_name_prefix='CollectCmds') as executor: | |
| futureresults = {executor.submit(inventory[item['hostalias']].run_commands) for item in worklist} | |
| for future in as_completed(futureresults): | |
| command_results.append(future.result()) | |
| logging.debug(f'Total command_results:\n{command_results}') | |
| #print(command_results) | |
| measurements = extract_matches(parse_specs, command_results) | |
| logging.debug(f'measurements after extract_matches: {measurements}') | |
| influx_lines = assemble_influx_lp(measurements) | |
| print(f'\n=====COMPLETED processing - Final Influx line protocol output is:\n{influx_lines}') | |
| # Send to Influx | |
| if not DEBUG: | |
| send_to_influx(influxenv, influx_lines) | |
| executionTime = (time.time() - startTime) | |
| print(f'Execution time in seconds: {executionTime:.3f}') | |
| print('==========\n') | |
| #print(f'\nWaiting for next polling interval [scheduled every {FREQUENCY} sec]', end='', flush=True) | |
| print(f'\nWaiting for next polling interval [scheduled every {FREQUENCY} sec]') | |
| def run_threaded(job_func, worklist, inventory, parse_specs, influxenv): | |
| job_thread = threading.Thread(target=job_func, args=(worklist, | |
| inventory, | |
| parse_specs, | |
| influxenv,)) | |
| print(f'\nRunning new thread {threading.current_thread().ident} at {time.ctime()}') | |
| job_thread.start() | |
| #### | |
| if __name__ == '__main__': | |
| execstartTime = datetime.datetime.now() | |
| # Get command-line arguments or warn and exit | |
| args = get_arguments() | |
| DEBUG = args.debug | |
| FREQUENCY = args.frequency | |
| THREADS = args.threads | |
| if DEBUG: | |
| logging.basicConfig(level=logging.DEBUG, | |
| format='%(relativeCreated)6d %(threadName)s %(message)s') | |
| pool = ThreadPoolExecutor() | |
| print(f'Starting {os.path.basename(__file__)} with ' | |
| f'parameters file "{args.paramfile}" at {time.ctime()}\n' | |
| f'DEBUG mode is {DEBUG}\nThread count is {THREADS} of ' | |
| f'{pool._max_workers} maximum available on this platform') | |
| # Run process manually first, then schedule per spec | |
| worklist, inventory, parse_specs, influxenv, reachable_devices, \ | |
| unreachable_devices = get_run_specs(args) | |
| if len(reachable_devices) == 0: | |
| sys.exit('EXITING - NO reachable devices') | |
| # TO-DO Add some logic to handle unreachable devices being skipped | |
| # in next process...later to be added after polling cycle for retry | |
| main_loop(worklist, inventory, parse_specs, influxenv) | |
| # If we're running in DEBUG mode we won't schedule repeated runs | |
| if DEBUG: sys.exit('\nCompleted debug run') | |
| schedule.every(args.frequency).seconds.do(run_threaded, | |
| main_loop, | |
| worklist, | |
| inventory, | |
| parse_specs, | |
| influxenv) | |
| # Keep looping while scheduler is running, output incremental | |
| # periods to console so user can see continued progress | |
| try: | |
| while True: | |
| if FREQUENCY - schedule.idle_seconds() > 20: | |
| print(f'\rNext run in {round(schedule.idle_seconds())} seconds.', end='', flush=True) | |
| schedule.run_pending() | |
| time.sleep(5) | |
| #print(schedule.next_run()) | |
| #print(schedule.idle_seconds()) | |
| except KeyboardInterrupt: | |
| print('\nUser initiated stop - shutting down...') | |
| try: | |
| sys.exit(0) | |
| except SystemExit: | |
| os._exit(0) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment