Skip to content

Instantly share code, notes, and snippets.

@patroqueeet
Last active December 29, 2024 10:21
Show Gist options
  • Save patroqueeet/ed48078de9b758cd35d92fa667894a84 to your computer and use it in GitHub Desktop.
Save patroqueeet/ed48078de9b758cd35d92fa667894a84 to your computer and use it in GitHub Desktop.
naxsi munin monitoring
#!/usr/bin/env python3
"""Munin plugin to monitor naxsi rule hits
from https://gist.github.com/patroqueeet/ed48078de9b758cd35d92fa667894a84
Requires the logtail command to be installed.
=head1 NAME
naxsi - monitor naxsi rule hits
=head1 CONFIGURATION
The following configuration is needed:
[naxsi_exceptions]
user root
group root
env.nginx_error_log_file /var/log/nginx/error.log
=head1 AUTHOR
Jirka Schaefer <[email protected]>
=head1 LICENSE
GPLv2
=head1 MAGIC MARKERS
#%# family=auto
#%# capabilities=autoconf
=cut
"""
import os
import subprocess
import sys
import re
import json
# Path of the JSON file that remembers which naxsi rule ids have been seen.
# It is built from the MUNIN_PLUGSTATE and MUNIN_STATEFILE environment
# variables. The fallbacks exist only so that a manual run outside munin
# (e.g. "autoconf" or debugging) does not crash at import time with a
# TypeError from os.path.join(None, None).
state_file = os.path.join(
    os.environ.get('MUNIN_PLUGSTATE') or os.curdir,
    os.environ.get('MUNIN_STATEFILE') or 'naxsi_exceptions.state',
)
def run_binary(args):
    """Execute a command and hand back everything it printed.

    stderr is folded into stdout, the exit status is not checked, and the
    output is decoded as UTF-8 with undecodable bytes dropped.

    Args:
        args: Command and arguments as a list, passed to subprocess.run.

    Returns:
        The combined output as a string, or '' when the executable
        cannot be found.
    """
    try:
        completed = subprocess.run(
            args,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            check=False,
            encoding='utf-8',
            errors='ignore',
        )
    except FileNotFoundError:
        # Missing binary is reported as "no output" rather than an error.
        return ''
    return completed.stdout
# Matches "&id<N>=<rule id>" pairs inside a NAXSI_FMT log line. A raw string
# is used so "\d" is a regex digit class rather than an invalid string escape,
# and "\d+" allows match indexes above 9 (id10, id11, ...).
_NAXSI_RULE_ID_RE = re.compile(r'&id\d+=(\d+)')


def _load_rule_ids():
    """Return the rule ids stored in the state file, or [] if unavailable."""
    try:
        with open(state_file, 'rt') as f:
            rule_ids = json.loads(f.read() or '[]')
    except (FileNotFoundError, json.JSONDecodeError):
        # First run (no state file yet) or a damaged file: start from scratch.
        return []
    if not isinstance(rule_ids, list):
        return []
    # Regex matches are strings, so keep the keys comparable.
    return [str(rule_id) for rule_id in rule_ids]


def get_values():
    """Print one "ID_<rule>.value <count>" line per known naxsi rule id.

    New lines of the nginx error log are fetched through ``logtail``; every
    rule id found in a NAXSI_FMT line is counted. Rule ids remembered in the
    state file are always reported (with 0 when they did not occur), and the
    state file is rewritten with the full set of ids afterwards.

    Nothing is printed or written when no rule id is known at all.

    Raises:
        ValueError: if the ``nginx_error_log_file`` environment variable is
            not set.
    """
    logfile = os.environ.get('nginx_error_log_file')
    if not logfile:
        raise ValueError('no nginx err log file configured')

    # Read the state before calling logtail, so a state-file problem cannot
    # throw away log lines that logtail has already handed out.
    counts = dict.fromkeys(_load_rule_ids(), 0)

    # NOTE: add '-t' (logtail test mode) before '-f' when debugging.
    output = run_binary(['logtail', '-f', logfile])

    for line in output.splitlines():
        if 'NAXSI_FMT' not in line:
            continue
        for rule_id in _NAXSI_RULE_ID_RE.findall(line):
            counts[rule_id] = counts.get(rule_id, 0) + 1

    if not counts:
        return

    for rule_id, hits in counts.items():
        print(f"ID_{rule_id}.value {hits}")

    with open(state_file, 'wt') as f:
        f.write(json.dumps(list(counts)))
def print_config():
    """Print the Munin graph configuration.

    Emits the graph-level attributes, then a label plus warning/critical
    thresholds for every rule id stored in the state file. A missing or
    unparsable state file is treated as "no rule ids known yet" so that
    ``config`` still works before the first values have been collected.
    """
    print('graph_title Naxsi exceptions count by rule id')
    print('graph_vlabel count')
    print('graph_args --base 1000 --lower-limit 0')
    print('graph_category waf')
    # The bare 'env.nginx_error_log_file' line is no longer printed: it is
    # plugin configuration, not a graph or field attribute.

    try:
        with open(state_file, 'rt') as f:
            rule_ids = json.loads(f.read() or '[]')
    except (FileNotFoundError, json.JSONDecodeError):
        rule_ids = []

    for k in rule_ids:
        print(f'ID_{k}.label ID_{k}')
        print(f'ID_{k}.warning 10')
        print(f'ID_{k}.critical 100')
if __name__ == '__main__':
    # The first command-line argument, if any, selects the mode:
    # 'autoconf', 'config', or anything else / nothing for value output.
    mode = sys.argv[1] if len(sys.argv) > 1 else None
    if mode == 'autoconf':
        print('no')
    elif mode == 'config':
        print_config()
    else:
        get_values()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment