Last active
May 21, 2023 03:12
-
-
Save sjlongland/5a4fd21047cd505b91fc048eb6831049 to your computer and use it in GitHub Desktop.
fail2ban-subnet scraper
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Automatically coalesce IPs from fail2ban into subnets and flag them for | |
banning. | |
""" | |
# © 2023 Stuart Longland <[email protected]> | |
# SPDX-License-Identifier: BSD-2-Clause | |
# Usage, run this periodically from `cron`, then configure your | |
# `fail2ban` instance to actually act on the bans: | |
# | |
# ==> /etc/fail2ban/filter.d/fail2ban-subnet.conf <== | |
# [INCLUDES] | |
# | |
# before = common.conf | |
# | |
# [Definition] | |
# | |
# failregex = fail2ban-subnet-scrape\.py: <SUBNET> is currently misbehaving$ | |
# | |
# ==> /etc/fail2ban/jail.d/subnet.conf <== | |
# [subnet] | |
# enabled = true | |
# filter = fail2ban-subnet | |
# banaction= iptables-allports | |
# logpath = /var/log/messages | |
# maxretry = 1 | |
# | |
# Known bugs: | |
# - scraping /var/log/fail2ban.log is horrible, it'd be nice to retrieve all | |
# the "recently seen" IPs from the `fail2ban-client` (or better yet, import | |
# `fail2ban`'s API since it is Python too), but it works. | |
# - it'd also be nice to "filter out" the subnets currently banned so we can | |
# avoid the "WARNING: <subnet> is already banned" | |
import re | |
import sys | |
import os | |
import time | |
import datetime | |
import ipaddress | |
import sqlite3 | |
import hashlib | |
import logging | |
import syslog | |
# fail2ban.log full path | |
FAIL2BAN_LOG = "/var/log/fail2ban.log" | |
# Subnet status database | |
SUBNET_DB = "/var/lib/fail2ban/subnet.sqlite3" | |
# Min subnet length: we don't block anything bigger than this | |
# e.g. 16 subnet bits minimum → do not block subnets bigger than a /16 | |
# One for IPv4; the other for IPv6 | |
MIN_SUBNET4_LENGTH = 16 | |
MIN_SUBNET6_LENGTH = 56 | |
# Max subnet length: we don't block anything smaller than this | |
# One for IPv4; the other for IPv6 | |
MAX_SUBNET4_LENGTH = 28 | |
MAX_SUBNET6_LENGTH = 64 | |
# Ignore these jails | |
IGNORE_JAILS = set(["subnet"]) | |
# Step size for subnet length | |
SUBNET_LENGTH_STEP = 4 | |
# Base number of strikes before we ban a subnet | |
SUBNET_STRIKES = 4 | |
# Adjustment based on subnet size, bigger subnets require more hits | |
SUBNET_STRIKES_SIZE_FACTOR = 2 | |
# Decrement delay: after N seconds, we will decrement a record that has | |
# not been seen a second time. | |
DECREMENT_DELAY = 14400 | |
# How long do we ban whole subnets for? | |
SUBNET_BAN_EXPIRY = 86400 | |
# fail2ban.log date/time format (we'll ignore milliseconds) | |
# Assume system local time | |
LOG_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S" | |
# fail2ban.log log format regular expression | |
LOG_FORMAT_RE = re.compile( | |
r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3} " # log timestamp | |
r"fail2ban\.filter +\[\d+\]: +INFO +" # logger name and level | |
# jail name, offending address, daemon log date/time | |
r"\[([^\[\]]+)\] Found (.+) - (\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})$" | |
) | |
# Initialise logging | |
logging.basicConfig(level=logging.DEBUG) | |
logger = logging.getLogger("fail2ban.subnet") | |
# Initialise or open the database | |
db_new = not os.path.exists(SUBNET_DB) | |
db = sqlite3.connect(SUBNET_DB) | |
if db_new: | |
# Create the tables | |
db.execute( | |
""" | |
CREATE TABLE lastrun ( | |
path TEXT NOT NULL, | |
firstlinemd5 TEXT, | |
lastfilepos INTEGER, | |
mtime INTEGER | |
); | |
""" | |
) | |
db.execute( | |
""" | |
CREATE UNIQUE INDEX lastrun_path ON lastrun(path); | |
""" | |
) | |
db.execute( | |
""" | |
CREATE TABLE found_ips ( | |
ip TEXT NOT NULL, | |
count INTEGER NOT NULL DEFAULT 0, | |
lastseen INTEGER NOT NULL, | |
decrement INTEGER NOT NULL | |
); | |
""" | |
) | |
db.execute( | |
""" | |
CREATE UNIQUE INDEX found_ips_ip ON found_ips(ip); | |
""" | |
) | |
# Pick up the last-known state | |
logfile_firstlinemd5 = None | |
logfile_lastfilepos = None | |
logfile_mtime = 0 | |
for (logfile_firstlinemd5, logfile_filelinepos, logfile_mtime) in db.execute( | |
""" | |
SELECT firstlinemd5, lastfilepos, mtime | |
FROM lastrun | |
WHERE path=? | |
""", | |
(FAIL2BAN_LOG,), | |
): | |
# Should only be one record | |
break | |
# Get the current log file mtime | |
logfile_mtime_now = os.stat(FAIL2BAN_LOG).st_mtime | |
if logfile_mtime_now <= logfile_mtime: | |
# Nothing changed | |
logger.info("Log file has not changed") | |
sys.exit(0) | |
# Scan the log file | |
ips = {} | |
for (linenum, line) in enumerate(open(FAIL2BAN_LOG, "r")): | |
if linenum == 0: | |
# First line, does it match the MD5 of our existing line | |
md5hash = hashlib.md5() | |
md5hash.update(line.encode()) | |
md5 = md5hash.hexdigest() | |
logger.debug( | |
"First line MD5: was %s now %s", logfile_firstlinemd5, md5 | |
) | |
if md5 != logfile_firstlinemd5: | |
# MD5 has changed, start at the beginning! | |
logger.info( | |
"Log file newly rolled over, starting at the beginning." | |
) | |
logfile_filelinepos = 0 | |
logfile_firstlinemd5 = md5 | |
else: | |
logger.info( | |
"Log file continues, seeking to position %d.", | |
logfile_filelinepos, | |
) | |
if linenum < logfile_filelinepos: | |
# We've seen this, carry on! | |
continue | |
match = LOG_FORMAT_RE.match(line) | |
if not match: | |
logger.debug("Not matching: %r", line) | |
continue | |
try: | |
(jail, ip_str, ts_str) = match.groups() | |
if jail in IGNORE_JAILS: | |
continue | |
ts = datetime.datetime.strptime(ts_str, LOG_TIMESTAMP_FORMAT) | |
ips[ipaddress.ip_address(ip_str)] = int(ts.timestamp()) | |
except ValueError: | |
# Unparseable data, continue | |
logger.debug("Not parsed: %r", line, exc_info=1) | |
continue | |
if logfile_firstlinemd5 is None: | |
# File is empty | |
sys.exit(0) | |
# Record our position | |
logfile_lastfilepos = linenum | |
# Make a note of the IPs we just saw | |
NOW = int(time.time()) | |
for (ip, ts) in ips.items(): | |
# Did we have a count for this already? | |
count = 0 | |
for (count,) in db.execute( | |
"SELECT count FROM found_ips WHERE ip=?;", (str(ip),) | |
): | |
# Should be just one record | |
break | |
# New values | |
count += 1 | |
decrement = ts + DECREMENT_DELAY | |
db.execute( | |
""" | |
INSERT INTO found_ips | |
(ip, count, lastseen, decrement) | |
VALUES | |
(?, ?, ?, ?) | |
ON CONFLICT (ip) | |
DO UPDATE SET | |
count=?, lastseen=?, decrement=?; | |
""", | |
(str(ip), count, ts, decrement, count, ts, decrement), | |
) | |
logger.info( | |
"%s seen %d time(s), last at %s, decrement at %s", | |
ip, | |
count, | |
ts, | |
decrement, | |
) | |
# Update the last-seen record | |
db.execute( | |
""" | |
INSERT INTO lastrun | |
(path, firstlinemd5, lastfilepos, mtime) | |
VALUES | |
(?, ?, ?, ?) | |
ON CONFLICT (path) | |
DO UPDATE SET | |
firstlinemd5=?, lastfilepos=?, mtime=? | |
""", | |
( | |
FAIL2BAN_LOG, | |
logfile_firstlinemd5, | |
logfile_lastfilepos, | |
logfile_mtime_now, | |
logfile_firstlinemd5, | |
logfile_lastfilepos, | |
logfile_mtime_now, | |
), | |
) | |
logger.info( | |
"recording file %r firstline=%s, pos=%d, mtime=%s", | |
FAIL2BAN_LOG, | |
logfile_firstlinemd5, | |
logfile_lastfilepos, | |
logfile_mtime_now, | |
) | |
db.commit() | |
# Decrement those who are behaving | |
db.execute("UPDATE found_ips SET count=count-1 WHERE decrement < ?;", (NOW,)) | |
# Purge those who have continued behaving | |
db.execute("DELETE FROM found_ips WHERE count <= 0;") | |
db.commit() | |
# Coalesce misbehaving subnets | |
subnets = {} | |
for (ip_str, count, lastseen) in db.execute( | |
""" | |
SELECT ip, count, lastseen FROM found_ips; | |
""" | |
): | |
ip = ipaddress.ip_address(ip_str) | |
# IPv4 or IPv6? | |
if ip.version == 4: | |
min_length = MIN_SUBNET4_LENGTH | |
max_length = MAX_SUBNET4_LENGTH | |
else: | |
min_length = MIN_SUBNET6_LENGTH | |
max_length = MAX_SUBNET6_LENGTH | |
for length in range( | |
min_length, max_length + SUBNET_LENGTH_STEP, SUBNET_LENGTH_STEP | |
): | |
# strict=False → ignore host bits | |
subnet = ipaddress.ip_network((ip, length), strict=False) | |
try: | |
(subnet_count, subnet_lastseen) = subnets[subnet] | |
except KeyError: | |
subnet_count = 0 | |
subnet_lastseen = 0 | |
subnet_count += count | |
subnet_lastseen = max(subnet_lastseen, lastseen) | |
logger.debug( | |
"%s seen %d time(s) last at %s", | |
subnet, | |
subnet_count, | |
subnet_lastseen, | |
) | |
subnets[subnet] = (subnet_count, subnet_lastseen) | |
# Figure out who of these are misbehaving | |
subnet_expiry = NOW - SUBNET_BAN_EXPIRY | |
misbehaving = set() | |
for (subnet, (count, lastseen)) in subnets.items(): | |
# IPv4 or IPv6? | |
if ip.version == 4: | |
max_length = MAX_SUBNET4_LENGTH | |
else: | |
max_length = MAX_SUBNET6_LENGTH | |
# Size factor for the subnet | |
size = round((max_length - subnet.prefixlen) / SUBNET_LENGTH_STEP) | |
logger.debug("%s is size %d", subnet, size) | |
# Compute number of strikes | |
strikes = SUBNET_STRIKES + round(pow(SUBNET_STRIKES_SIZE_FACTOR, size)) | |
logger.debug("will accept %d strikes, has %d strikes", strikes, count) | |
if (count >= strikes) and (lastseen > subnet_expiry): | |
# Still misbehaving, don't bother looking at smaller subnets | |
logger.info( | |
"%s is misbehaving, seen %d time(s), last at %s", | |
subnet, | |
count, | |
lastseen, | |
) | |
misbehaving.add(subnet) | |
# Reduce overlapping subnets to the smallest possible set | |
done = False | |
while not done: | |
done = True | |
logger.debug( | |
"Cleaning up overlapping subnets: %d subnet(s)", len(misbehaving) | |
) | |
for subnet in list(misbehaving): | |
children = [ | |
s | |
for s in misbehaving | |
if (subnet is not s) and subnet.supernet_of(s) | |
] | |
num_children = len(children) | |
if num_children == 1: | |
# Keep the child, ditch the parent | |
logger.debug("Keeping %s in favour of %s", children[0], subnet) | |
misbehaving.discard(subnet) | |
done = False | |
break | |
elif num_children > 1: | |
# Prefer the parent since it's clear _multiple_ addresses in this | |
# subnet are behaving badly. | |
logger.debug( | |
"Keeping %s in favour of %s", | |
subnet, | |
", ".join([str(s) for s in children]), | |
) | |
for s in children: | |
misbehaving.discard(s) | |
done = False | |
break | |
if misbehaving: | |
logger.info( | |
"All misbehaving subnets: %s", | |
", ".join([str(s) for s in misbehaving]), | |
) | |
for subnet in misbehaving: | |
syslog.syslog( | |
syslog.LOG_NOTICE | syslog.LOG_DAEMON, | |
"%s is currently misbehaving" % subnet, | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment