Last active
December 29, 2024 10:21
-
-
Save patroqueeet/ed48078de9b758cd35d92fa667894a84 to your computer and use it in GitHub Desktop.
naxsi munin monitoring
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
"""Munin plugin to monitor naxsi rule hits | |
from https://gist.github.com/patroqueeet/ed48078de9b758cd35d92fa667894a84 | |
Requires the logtail command to be installed.
=head1 NAME | |
naxsi - monitor naxsi rule hits | |
=head1 CONFIGURATION | |
The following configuration is required:
[naxsi_exceptions] | |
user root | |
group root | |
env.nginx_error_log_file /var/log/nginx/error.log | |
=head1 AUTHOR | |
Jirka Schaefer <[email protected]> | |
=head1 LICENSE | |
GPLv2 | |
=head1 MAGIC MARKERS | |
#%# family=auto | |
#%# capabilities=autoconf | |
=cut | |
""" | |
import os | |
import subprocess | |
import sys | |
import re | |
import json | |
# Path of the JSON file in which the plugin remembers every rule id it has
# reported so far.  It is built from the MUNIN_PLUGSTATE and MUNIN_STATEFILE
# environment variables.  The defaults only apply when those variables are
# unset (e.g. a manual run outside munin); without them os.path.join() would
# receive None and the module would fail with a TypeError on import, before
# even ``autoconf`` could answer.  The fallback is a file in the current
# working directory.
state_file = os.path.join(
    os.environ.get('MUNIN_PLUGSTATE', ''),
    os.environ.get('MUNIN_STATEFILE', 'naxsi_exceptions.state'),
)
def run_binary(args):
    """Execute *args* as a subprocess and return everything it printed.

    stderr is merged into stdout, the bytes are decoded as UTF-8 with
    undecodable sequences dropped, and the exit status is not checked.
    An empty string is returned when the executable cannot be found.
    """
    try:
        completed = subprocess.run(
            args,
            check=False,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            encoding='utf-8',
            errors='ignore',
        )
    except FileNotFoundError:
        # Missing binary: report "no output" instead of failing.
        return ''
    return completed.stdout
def get_values():
    """Print the naxsi hit count per rule id seen since the previous run.

    The output of ``logtail -f <logfile>`` is scanned for lines containing
    ``NAXSI_FMT``; every ``&id<N>=<rule>&`` occurrence on such a line counts
    as one hit for ``<rule>``.  Rule ids listed in the state file are
    reported with a count of 0 when they did not occur this time.  One
    ``ID_<rule>.value <count>`` line is printed per rule, after which the
    list of all known rule ids is written back to the state file.  Nothing
    is printed or written when no rule id is known at all.

    Raises:
        ValueError: if the ``nginx_error_log_file`` environment variable is
            unset or empty.
    """
    logfile = os.environ.get('nginx_error_log_file')
    if not logfile:
        raise ValueError('no nginx err log file configured')

    output = run_binary(['logtail', '-f', logfile])

    # The state file is only created at the end of this function, so it does
    # not exist on the very first run.  Treat a missing (or unparsable) file
    # as "no rule ids known yet" instead of crashing, otherwise the plugin
    # could never create its own state.
    try:
        with open(state_file, 'rt') as f:
            rule_ids = json.loads(f.read() or '[]')
    except (FileNotFoundError, json.JSONDecodeError):
        rule_ids = []

    # Start every known rule at 0 so it keeps being reported between hits.
    res = {k: 0 for k in rule_ids}

    # Raw string: "\d" is not a valid escape in a normal string literal.
    # "\d+" for the match index so that id10, id11, ... are counted as well.
    id_pattern = re.compile(r'&id\d+=(\d+)&')
    for line in output.splitlines():
        if 'NAXSI_FMT' not in line:
            continue
        for rule_id in id_pattern.findall(line):
            res[rule_id] = res.get(rule_id, 0) + 1

    if not res:
        return

    for k, v in res.items():
        print(f"ID_{k}.value {v}")

    # Persist the known rule ids for later runs.
    with open(state_file, 'wt') as f:
        f.write(json.dumps(list(res)))
def print_config():
    """Print the munin graph configuration for this plugin.

    Emits the graph-level settings, then a label plus warning (10) and
    critical (100) thresholds for every rule id recorded in the state file.
    When the state file does not exist yet or cannot be parsed, only the
    graph-level settings are printed.
    """
    print('graph_title Naxsi exceptions count by rule id')
    print('graph_vlabel count')
    print('graph_args --base 1000 --lower-limit 0')
    print('graph_category waf')
    # NOTE(review): this line carries no value -- confirm munin accepts it.
    print('env.nginx_error_log_file')

    # The state file is written elsewhere and may not exist when ``config``
    # is requested; a missing file must not abort the config output.
    try:
        with open(state_file, 'rt') as f:
            rule_ids = json.loads(f.read() or '[]')
    except (FileNotFoundError, json.JSONDecodeError):
        rule_ids = []

    for k in rule_ids:
        print(f'ID_{k}.label ID_{k}')
        print(f'ID_{k}.warning 10')
        print(f'ID_{k}.critical 100')
if __name__ == '__main__':
    # First command-line argument selects the mode; absent means "fetch values".
    mode = sys.argv[1] if len(sys.argv) > 1 else None
    if mode == 'autoconf':
        print('no')
    elif mode == 'config':
        print_config()
    else:
        get_values()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment