Skip to content

Instantly share code, notes, and snippets.

@haginara
Last active October 9, 2019 21:45
Show Gist options
  • Save haginara/ba362d6e5ff18a352c99c6614962579f to your computer and use it in GitHub Desktop.
Save haginara/ba362d6e5ff18a352c99c6614962579f to your computer and use it in GitHub Desktop.
Get NSM status to dict()
#!/usr/bin/env python3
import argparse
import json
import logging
import os
import pprint
import re
import subprocess
import sys
# Version tuple; get_options() joins it with "." for the --version option.
__version__ = (0, 1, 0)
# Module-level logger used by the functions and the script entry point below.
logger = logging.getLogger(__name__)
def get_options(argv=None):
    """Parse the command-line options of the NSM status checker.

    Args:
        argv: Optional list of argument strings to parse. ``None`` (the
            default) makes argparse read ``sys.argv[1:]``, which is what the
            function did before this parameter existed.

    Returns:
        argparse.Namespace: The parsed options; ``hostname`` defaults to
        ``"HQ-IDS1"``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-H", "--hostname", default="HQ-IDS1", help="Hostname")
    parser.add_argument(
        "-v", "--version", action="version", version=".".join(map(str, __version__))
    )
    return parser.parse_args(argv)
# Any ANSI CSI sequence: ESC [ , parameter bytes, intermediate bytes, final
# byte. Covers colour codes (ESC[1;32m, ESC[0m) and cursor moves (ESC[74G).
_ANSI_CSI_RE = re.compile(r"\x1b\[[0-?]*[ -/]*[@-~]")


def nsm_strip(line: bytes) -> str:
    """Decode one raw output line and remove terminal control noise.

    The line is decoded, stripped of surrounding whitespace, and then cleared
    of ANSI CSI escape sequences and of any carriage-return / line-feed
    characters left inside it. Visible text next to an escape sequence is
    kept.

    Args:
        line: One line of raw command output.

    Returns:
        The cleaned text; an empty string when nothing visible remains.
    """
    # Undecodable bytes become U+FFFD instead of aborting the whole parse.
    text = line.decode(errors="replace").strip()
    text = _ANSI_CSI_RE.sub("", text)
    return text.replace("\r", "").replace("\n", "")
def strip_status(l):
    """Return *l* without any square brackets and without outer whitespace."""
    # Delete every "[" and "]" in a single pass, then trim what is left.
    without_brackets = l.translate(str.maketrans("", "", "[]"))
    return without_brackets.strip()
def _read_bro_section(data, lines, hostname):
    """Record the rows of the Bro table and return the module that follows it.

    Rows are consumed from *lines* until a ``Status: <hostname>...`` header
    appears. That header opens the next section, so its module name is handed
    back to the caller rather than being lost.

    Args:
        data: Result dict; one ``bro-<first column>`` entry is added per row.
        lines: Iterator of cleaned, non-empty output lines, positioned just
            after the Bro table header.
        hostname: Host name that prefixes the section headers to stop at.

    Returns:
        The module name from the terminating header, or ``""`` when the input
        ends before such a header is seen.
    """
    for text in lines:
        if f"Status: {hostname}" in text:
            return text.replace("Status: ", "").strip()
        row = text.split()
        logger.debug(f"Bro: {row}")
        if len(row) < 4:
            # The state is read from the fourth column; skip shorter rows
            # instead of failing with IndexError.
            continue
        data[f"bro-{row[0]}"] = {
            "desc": " ".join(row),
            "status": "OK" if row[3] == "running" else row[3],
        }
    return ""


def _parse_nsm_lines(lines, hostname):
    """Turn cleaned status lines into a ``{name: {"desc", "status"}}`` dict.

    Lines are consumed two at a time. Inside a ``Status: <hostname>...``
    section each pair is (item label, status); the sguil and HIDS sections
    use one extra line for their status.
    """
    data = {}
    module = ""
    try:
        while True:
            line = next(lines)
            next_line = next(lines)
            logger.debug(f"line: {line}, nxt_line: {next_line}")

            if "Status: securityonion" in line:
                status = strip_status(next(lines))
                logger.debug(f"sguil server: {status}")
                data["sguil"] = {"status": status, "desc": ""}
                # Already recorded; must not also be stored as a module item.
                continue
            if "Status: HIDS" in line:
                status = strip_status(next(lines))
                logger.debug(f"ossec_agent: {status}")
                data["ossec_agent"] = {"desc": "", "status": status}
                continue
            if "Status: Bro" in line:
                # next_line is the table header. The helper consumes the
                # header of the following section, so take the module from it.
                module = _read_bro_section(data, lines, hostname)
                continue
            if line.startswith(f"Status: {hostname}"):
                module = line.replace("Status: ", "").strip()
                # The pair read above was (header, label); shift to
                # (label, status).
                line = next_line
                next_line = next(lines)

            if not module:
                continue
            status = strip_status(next_line)
            m = re.match(r"\* (?P<name>[-_\w]+) \((?P<desc>.*)\)", line)
            if m:
                header = m.groupdict()
                logger.debug(f"{module}-{header['name']}: {status}")
                data[f"{module}-{header['name']}"] = {
                    "desc": header["desc"],
                    "status": status,
                }
            else:
                logger.debug(f"{module}-{line}: {status}")
                data[f"{module}-{line}"] = {"desc": "", "status": status}
    except StopIteration:
        # Running out of lines, even in the middle of a pair, ends the parse.
        pass
    return data


def check_nsm_status(hostname):
    """Run ``nsm --all --status`` and return the reported states as a dict.

    Args:
        hostname: Host name used to recognise the ``Status: <hostname>...``
            section headers in the command output.

    Returns:
        dict: Maps an entry name to ``{"desc": str, "status": str}``. Names
        are ``"sguil"``, ``"ossec_agent"``, ``"bro-<name>"`` for Bro table
        rows, and ``"<module>-<item>"`` for items of a host section.

    Raises:
        OSError: Propagated from ``subprocess`` when the ``nsm`` command
            cannot be started.
    """
    # The exit code is deliberately not checked, as before: only stdout is
    # parsed.
    completed = subprocess.run(
        ["nsm", "--all", "--status"], stdout=subprocess.PIPE, check=False
    )
    lines = (text for text in map(nsm_strip, completed.stdout.splitlines()) if text)
    data = _parse_nsm_lines(lines, hostname)
    for name, entry in data.items():
        if entry.get("status", None) != "OK":
            logger.debug(f"{name}: {entry['status']}")
    return data
if __name__ == "__main__":
    options = get_options()
    # With no handler configured, INFO records fall through to logging's
    # last-resort handler, which only emits WARNING and above, so the results
    # below would never be printed.
    logging.basicConfig(level=logging.INFO, format="%(message)s")
    logger.setLevel(logging.INFO)
    # Check nsm status and report one "name: status" line per entry.
    data = check_nsm_status(options.hostname)
    for name, entry in data.items():
        logger.info(f"{name}: {entry.get('status', None)}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment