Skip to content

Instantly share code, notes, and snippets.

@meramsey
Last active September 1, 2021 12:41
Show Gist options
  • Save meramsey/b4a2ab23b7674da3123b672885943ea2 to your computer and use it in GitHub Desktop.
Save meramsey/b4a2ab23b7674da3123b672885943ea2 to your computer and use it in GitHub Desktop.
Automated Accesslog Alerts with optional Cloudflare API Firewall rules enabling
#!/usr/bin/python3
import sys
import time
import os
import platform
import re
import urllib.request
from datetime import date, timedelta
from datetime import datetime
import collections
from collections import defaultdict
import requests
import json
import logging
try:
import configparser # py3
except ImportError:
import ConfigParser as configparser # py2
# Access-log parsing setup.
# The pattern targets the nginx "main" format declared in /etc/nginx/nginx.conf:
# log_format main '$remote_addr - $remote_user [$time_local] "$request" ' '$status $body_bytes_sent "$http_referer" ' '"$http_user_agent" "$http_x_forwarded_for" "$http_cf_connecting_ip" "$sent_http_location" ' '$request_time $upstream_response_time $pipe';
# Sample line:
# 10.10.84.3 - - [05/Aug/2021:14:49:45 -0400] "POST /dologin.php HTTP/1.1" 302 5 "-" "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.159 Safari/537.36" "176.103.45.63" "176.103.45.63" "clientarea.php?incorrect=true" 16.785 0.125 .
logpats = r'(\S+) (\S+) (\S+) \[(.*?)\] "(\S+) (\S+) (\S+)" (\S+) (\S+) "(\S+)" "(.*)" "(\S+)" "(\S+)" "(\S+)"'

# Names paired positionally with the capture groups of ``logpats``.
# The pattern has 14 groups while 17 names are listed, so the last three
# names have no group to pair with. The second name ("referrer") lines up
# with the literal "-" that follows $remote_addr in the log format.
colnames = (
    "remote_addr", "referrer", "remote_user", "time_local",
    "method", "request", "proto",
    "status", "body_bytes_sent",
    "http_referer", "http_user_agent",
    "http_x_forwarded_for", "http_cf_connecting_ip", "sent_http_location",
    "request_time", "upstream_response_time", "pipe",
)

access_log_name, error_log_name = "access_log", "error_log"
zone_identifier = ""

# The hostname decides whether the low (dev) or high (live) thresholds apply.
machine_nicename = str(platform.node())
print("Machine Nicename: " + machine_nicename)
default_hourly_threshold, default_minute_threshold = (
    (10, 5) if "somedevhostname" in str(platform.node()) else (1000, 500)
)
####### Begin Config ########
# Directory holding this script, e.g. <base>/scripts for
# scripts/access_log_attack_mitigator.py
current_script_path = os.path.abspath(os.path.dirname(sys.argv[0]))
# The base directory is one level above the scripts directory.
base_directory = os.path.dirname(current_script_path)
# Resolves to e.g. /home/whmcs/whmcs/logs/access_log
access_logfile = os.path.join(base_directory, "logs", access_log_name)
error_logfile = os.path.join(base_directory, "logs", error_log_name)
logname = 'access_log_attack_mitigator'
print("Script path: ", current_script_path)
print("Access Log: ", access_logfile)
print("Error Log: ", error_logfile)
# The script's own log is appended next to the access/error logs.
output_log_path = os.path.join(base_directory, "logs", logname)
logging.basicConfig(filename=output_log_path,
                    filemode='a',
                    format='%(asctime)s %(name)s %(levelname)s %(message)s',
                    # %m is the month; the previous '%Y-%M-%d' wrote the
                    # minute in the month position of every timestamp.
                    datefmt='%Y-%m-%d:%H:%M:%S',
                    level=logging.DEBUG)
logging.info("Running Accesslog Attack Mitigator!")
logger = logging.getLogger('access_log_attack_mitigator')
####### End Config ########
def read_configs(config_paths, config_dict):
    """Build a settings profile from a template dict plus config files.

    Values already present in ``config_dict`` win. Only options whose value
    is ``None`` are looked up in the ``[profile]`` section of the config
    files, and all whitespace is stripped from values read that way. Options
    that end up ``None`` or empty are left out of the result.

    ``config_dict`` itself is not modified, so the same template can be
    passed again later to pick up edits made to the config files.

    :param config_paths: A list of config file paths (missing files are skipped).
    :type config_paths: list
    :param config_dict: Template profile; must contain a ``'profile'`` key
        naming the config section to read.
    :type config_dict: dict
    :return: New dictionary holding only the populated options
    :rtype: dict
    :raises Exception: if a config file cannot be parsed, or if config files
        were read but none of them contains the profile's section.
    """
    # Copy first: filling or popping keys on the caller's dict would make
    # every later call with the same template skip the config file.
    config = dict(config_dict)
    profile = config['profile']

    cp = configparser.ConfigParser()
    try:
        cp.read(config_paths)
    except Exception as e:
        raise Exception("%s: configuration file error" % profile) from e

    section = None
    if cp.sections():
        # A configuration file exists, so the profile's section is required.
        try:
            section = cp[profile]
        except KeyError as e:
            raise Exception("%s: configuration section missing" % profile) from e

    if section is not None:
        for option, current in list(config.items()):
            if current is not None:
                continue
            try:
                raw = section.get(option)
            except configparser.Error as e:
                # Best effort: an unreadable option is treated as unset.
                logging.warning("%s: could not read option %s: %s", profile, option, e)
                continue
            if raw is not None:
                config[option] = re.sub(r"\s+", '', raw)

    # Drop blank entries so callers can test membership with `in`.
    return {key: value for key, value in config.items()
            if value is not None and value != ''}
# Candidate Slack config files, handed to read_configs() below.
slack_config_paths = ['.slack.cfg'] + [
    os.path.expanduser(candidate)
    for candidate in ('~/.slack.cfg', '~/.slack/slack.cfg')
]
# Environment variables seed the profile; unset variables are None here.
slack_config_dict = dict(
    slack_token=os.getenv('SLACK_TOKEN'),
    slack_channel=os.getenv('SLACK_CHANNEL'),
    slack_icon_emoji=os.getenv('SLACK_ICON_EMOJI'),
    slack_user_name=os.getenv('SLACK_USER_NAME'),
    slack_webhook_url=os.getenv('SLACK_WEBHOOK_URL'),
    profile='SLACK',
)
"""
To setup config securely:
nano ~/.slack.cfg
"""
# Then enter the settings below with your own details
"""
[SLACK]
slack_token =
slack_channel = devalerts
slack_icon_emoji =
slack_user_name =
slack_webhook_url =
"""
# Resolve the profile, then expose each setting as a module global.
# A setting missing from the resolved profile is None.
slack_profile = read_configs(slack_config_paths, slack_config_dict)
slack_token = slack_profile.get('slack_token')
slack_channel = slack_profile.get('slack_channel')
slack_icon_emoji = slack_profile.get('slack_icon_emoji')
slack_user_name = slack_profile.get('slack_user_name')
slack_webhook_url = slack_profile.get('slack_webhook_url')
"""
To setup config securely:
mkdir -p ~/.cloudflare/
nano ~/.cloudflare/cloudflare.cfg
"""
# Then enter the settings below, as outlined in:
# https://github.com/cloudflare/python-cloudflare#using-configuration-file-to-store-email-and-keys
# certtoken and extras can be empty or undefined, as we're not using them currently.
# zone_identifier is per domain in the CF dashboard.
# cloudflare_auto_mitigation defines whether the auto-created rule should be active after creation. It defaults to False even if unset.
"""
[CloudFlare]
email = user@example.com
token =
certtoken =
zone_identifier =
cloudflare_auto_mitigation = False
"""
# Candidate Cloudflare config files, handed to read_configs().
cf_config_paths = ['.cloudflare.cfg'] + [
    os.path.expanduser(candidate)
    for candidate in ('~/.cloudflare.cfg', '~/.cloudflare/cloudflare.cfg')
]
# Environment variables seed the profile; unset variables are None here.
cf_config_dict = dict(
    email=os.getenv('CF_API_EMAIL'),
    token=os.getenv('CF_API_KEY'),
    certtoken=os.getenv('CF_API_CERTKEY'),
    zone_identifier=os.getenv('CF_ZONE_ID'),
    cloudflare_auto_mitigation=os.getenv('CF_AUTO_MITIGATION'),
    profile='CloudFlare',
)
BASE_URL = 'https://api.cloudflare.com/client/v4'
# Defaults for the Cloudflare globals; cf_config_read() overwrites whichever
# ones the resolved profile provides.
x_auth_email = x_auth_key = zone_identifier = certtoken = None
cloudflare_auto_mitigation = False
base_url = BASE_URL
def cf_config_read():
    """Refresh the Cloudflare module globals from the resolved profile.

    Calls ``read_configs(cf_config_paths, cf_config_dict)`` and, for every
    setting present in the result, overwrites the matching global. Settings
    absent from the result keep their current value.

    ``cloudflare_auto_mitigation`` is normalised to a real ``bool``. The
    profile delivers it as text (from the environment or a config file), and
    the rest of the script tests it with ``== True`` / ``is True``, which a
    string can never satisfy.
    """
    global x_auth_email, x_auth_key, zone_identifier, certtoken, cloudflare_auto_mitigation, base_url
    cf_profile = read_configs(cf_config_paths, cf_config_dict)
    if 'email' in cf_profile:
        x_auth_email = cf_profile['email']
    if 'token' in cf_profile:
        x_auth_key = cf_profile['token']
    if 'zone_identifier' in cf_profile:
        zone_identifier = cf_profile['zone_identifier']
    if 'certtoken' in cf_profile:
        certtoken = cf_profile['certtoken']
    if 'cloudflare_auto_mitigation' in cf_profile:
        flag = cf_profile['cloudflare_auto_mitigation']
        if isinstance(flag, str):
            # Same truthy spellings that configparser's getboolean() accepts.
            flag = flag.strip().lower() in ('1', 'true', 'yes', 'on')
        cloudflare_auto_mitigation = bool(flag)
    if 'base_url' in cf_profile:
        base_url = cf_profile['base_url']
# Load the Cloudflare settings once at start-up.
cf_config_read()

# Request headers used for every Cloudflare API call in this file.
headers = dict([
    ("X-Auth-Email", x_auth_email),
    ("X-Auth-Key", x_auth_key),
    ("Content-Type", "application/json"),
])
script_start_time = datetime.now()
logpat = re.compile(logpats)


# Auto-nesting dictionary: looking up a missing key creates another tree.
# See https://book.pythontips.com/en/testing/collections.html#defaultdict
def tree():
    return defaultdict(tree)


# Hit counters.
hits = tree()
# Running record of URLs that have had mitigation applied.
mitigated_urls = tree()
# URLs that crossed a threshold but are only alerted on, not auto-mitigated.
unmitigated_urls_notifications = tree()
# Some functions
# Functions for printing colored text out: Source: https://stackoverflow.com/a/34443116/1621381
def black(text):
    """Print ``text`` wrapped in the ANSI black (30) colour sequence."""
    print('\033[30m' + str(text) + '\033[0m')
def red(text):
    """Print ``text`` wrapped in the ANSI red (31) colour sequence."""
    print('\033[31m' + str(text) + '\033[0m')
def green(text):
    """Print ``text`` wrapped in the ANSI green (32) colour sequence."""
    print('\033[32m' + str(text) + '\033[0m')
def yellow(text):
    """Print ``text`` wrapped in the ANSI yellow (33) colour sequence."""
    print('\033[33m' + str(text) + '\033[0m')
def blue(text):
    """Print ``text`` wrapped in the ANSI blue (34) colour sequence."""
    print('\033[34m' + str(text) + '\033[0m')
def magenta(text):
    """Print ``text`` wrapped in the ANSI magenta (35) colour sequence."""
    print('\033[35m' + str(text) + '\033[0m')
def cyan(text):
    """Print ``text`` wrapped in the ANSI cyan (36) colour sequence."""
    print('\033[36m' + str(text) + '\033[0m')
def gray(text):
    """Print ``text`` wrapped in the ANSI bright-black/gray (90) colour sequence."""
    print('\033[90m' + str(text) + '\033[0m')
# Some slackbot post functions
def post_message_to_slack_channel(text, channel=None, blocks=None):
    """Post ``text`` through Slack's ``chat.postMessage`` endpoint.

    :param text: Message text.
    :type text: str
    :param channel: Target channel; the module-level ``slack_channel`` is
        used when omitted.
    :type channel: str
    :param blocks: Optional blocks structure, sent JSON-encoded when truthy.
    :return: The decoded JSON body of Slack's response.
    :rtype: dict
    """
    destination = slack_channel if channel is None else channel
    encoded_blocks = json.dumps(blocks) if blocks else None
    form_fields = {
        'token': slack_token,
        'channel': destination,
        'text': text,
        'icon_emoji': slack_icon_emoji,
        'username': slack_user_name,
        'blocks': encoded_blocks,
    }
    reply = requests.post('https://slack.com/api/chat.postMessage', form_fields)
    return reply.json()
def post_file_to_slack(text, file_name, file_bytes, channel=None, file_type=None, title=None):
    """Upload a file through Slack's ``files.upload`` endpoint.

    :param text: Comment posted alongside the file.
    :type text: str
    :param file_name: File name reported to Slack.
    :type file_name: str
    :param file_bytes: File content, passed to ``requests`` as the ``file`` part.
    :param channel: Target channel(s); the module-level ``slack_channel`` is
        used when omitted.
    :type channel: str
    :param file_type: Optional Slack file type identifier.
    :type file_type: str
    :param title: Optional title for the upload.
    :type title: str
    :return: The decoded JSON body of Slack's response.
    :rtype: dict
    """
    destination = slack_channel if channel is None else channel
    form_fields = {
        'token': slack_token,
        'filename': file_name,
        'channels': destination,
        'filetype': file_type,
        'initial_comment': text,
        'title': title,
    }
    upload = requests.post(
        'https://slack.com/api/files.upload',
        form_fields,
        files={'file': file_bytes},
    )
    return upload.json()
def post_message_to_slack_webhook(message, webhook_url=None):
    """Send ``message`` to a Slack incoming webhook.

    :param message: Message text, sent as the ``text`` field of a JSON body.
    :type message: str
    :param webhook_url: Webhook URL; the module-level ``slack_webhook_url``
        is used when omitted.
    :type webhook_url: str
    :raises ValueError: if the webhook answers with a status other than 200.
    """
    target = slack_webhook_url if webhook_url is None else webhook_url
    body = json.dumps({'text': message})
    response = requests.post(
        target,
        data=body,
        headers={'Content-Type': 'application/json'},
    )
    if response.status_code == 200:
        return
    raise ValueError(
        'Request to slack returned an error %s, the response is:\n%s'
        % (response.status_code, response.text)
    )
def post_message_to_slack(message, endpoint=None, blocks=None):
    """Send ``message`` to Slack using whichever transport is configured.

    Exactly one transport is chosen per call:

    * the incoming webhook, when ``slack_webhook_url`` is set;
    * otherwise the ``chat.postMessage`` API, when ``slack_token`` is set and
      a channel is available (``endpoint`` or ``slack_channel``);
    * otherwise the message is logged as a warning and not sent.

    The previous two independent ``if`` checks sent nothing when both a
    webhook and a channel/token were configured. With nothing configured they
    fell through to posting to a ``None`` webhook URL, which raises.

    :param message: Message text.
    :type message: str
    :param endpoint: Optional override, forwarded as the webhook URL or the
        channel depending on the transport chosen.
    :type endpoint: str
    :param blocks: Optional blocks structure (channel transport only).
    """
    if slack_webhook_url is not None:
        post_message_to_slack_webhook(message, endpoint)
    elif slack_token is not None and (endpoint is not None or slack_channel is not None):
        post_message_to_slack_channel(message, endpoint, blocks)
    else:
        logging.warning('Slack is not configured; notification not sent: %s', message)
# post_message_to_slack_webhook('Testing devalerts slack from python. Ignore me')
def get_remote_file_data(file):
    """Download ``file`` and return its content and advertised filename.

    :param file: URL (or ``urllib.request.Request``) to open.
    :type file: str
    :return: ``{'data': <bytes read>, 'filename': <name from the response
        headers, or None when they carry none>}``
    :rtype: dict
    """
    # The context manager closes the response even if read() fails; the
    # original left it open.
    with urllib.request.urlopen(file) as response:
        data = response.read()
        filename = response.info().get_filename()
    return {'data': data, 'filename': filename}
def field_map(dictseq, name, func):
    """Lazily apply ``func`` to one field of every dict in ``dictseq``.

    Each dict is updated in place (``d[name]`` becomes ``func(d[name])``)
    and then yielded.

    :param dictseq: Iterable of dictionaries.
    :param name: Key of the field to convert.
    :param func: One-argument callable producing the new value.
    :return: Generator over the same, now modified, dictionaries.
    """
    for record in dictseq:
        converted = func(record[name])
        record[name] = converted
        yield record
def get_datetime_object_from_string(str):
    """Parse an access-log timestamp into a timezone-aware ``datetime``.

    :param str: Timestamp such as ``05/Aug/2021:14:49:45 -0400``.
    :return: The parsed ``datetime``.
    :raises ValueError: if the text does not match the expected format.
    """
    access_log_time_format = "%d/%b/%Y:%H:%M:%S %z"
    parsed = datetime.strptime(str, access_log_time_format)
    return parsed
def keyfunction(k):
    """Return the value stored under ``k`` in the module-level mapping ``d``.

    ``d`` is not a parameter: it is resolved as a global when the function
    runs, so it must have been assigned beforehand. The ``__main__`` loop
    assigns ``d`` right before calling ``print_sorted_dict_top``, which uses
    this function as its sort key.

    :param k: key to look up in ``d``
    :return: ``d[k]``
    """
    return d[k]
def access_log(lines):
    """Turn raw access-log lines into a lazy stream of parsed records.

    Lines that do not match ``logpat`` are skipped. Each matching line
    becomes a dict keyed by ``colnames``, after which these conversions are
    applied in order: ``status`` to int, ``time_local`` to datetime, a "-"
    in ``remote_user`` / ``http_referer`` / ``referrer`` /
    ``http_user_agent`` to None, and ``body_bytes_sent`` to int ("-" gives 0).

    :param lines: Access Log Lines to parse
    :type lines: Generator[Any, Any, None]
    :return: Generator yielding one dictionary per parsed line
    :rtype: Generator[dict, Any, None]
    """
    def dash_to_none(text):
        return None if text == "-" else text

    def size_to_int(text):
        return 0 if text == "-" else int(text)

    matches = (logpat.match(raw) for raw in lines)
    records = (dict(zip(colnames, found.groups())) for found in matches if found)

    # (column, converter) pairs, applied in this order.
    conversions = (
        ('status', int),
        ('time_local', get_datetime_object_from_string),
        ('remote_user', dash_to_none),
        ('http_referer', dash_to_none),
        ('referrer', dash_to_none),
        ('http_user_agent', dash_to_none),
        ('body_bytes_sent', size_to_int),
    )
    for column, convert in conversions:
        records = field_map(records, column, convert)
    return records
def print_sorted_dict_top(d, top):
    """Print the ``top`` highest-count entries of ``d`` and the overall total.

    :param d: Mapping of item -> hit count.
    :type d: dict
    :param top: Maximum number of entries to list.
    :type top: int
    """
    print("============================================")
    # Sort by the counts of the mapping that was passed in. The previous sort
    # key read a module-level global named ``d``, so it only worked when the
    # caller had assigned that global to this same mapping.
    for key in sorted(d, key=d.get, reverse=True)[:top]:
        print(" %5d %s" % (d[key], key))
    print(" %5d total hits" % sum(d.values()))
    print("============================================")
def tail_file(file):
    """Generator that follows ``file`` and yields what ``readline()`` returns.

    On the first open the read position is moved to the end of the file, so
    only content appended afterwards is produced. When ``readline()`` returns
    an empty string, the current offset is compared with the file's size on
    disk: if the offset is larger, the file is treated as rotated/truncated
    and is reopened and read from its start. Otherwise (or if the path
    currently does not exist) the generator sleeps one second.

    NOTE(review): indentation was lost in the source this was recovered from.
    ``yield line`` is placed at the level of the inner loop, which means the
    empty string read at end-of-file is yielded too. Confirm against the
    original.

    :param file: File path as a string
    :type file: str
    :return: Generator of lines (the loop never ends on its own)
    :rtype: object
    """
    seek_end = True
    while True:  # handle moved/truncated files by allowing to reopen
        with open(file) as f:
            if seek_end:  # reopened files must not seek end
                f.seek(0, 2)
            while True:  # line reading loop
                line = f.readline()
                if not line:
                    try:
                        # An offset beyond the on-disk size means the path now
                        # refers to a shorter (new or truncated) file.
                        if f.tell() > os.path.getsize(file):
                            # rotation occurred (copytruncate/create)
                            f.close()
                            seek_end = False
                            break
                    except FileNotFoundError:
                        # rotation occurred but new file still not created
                        pass  # wait 1 second and retry
                    time.sleep(1)
                yield line
def purge_old_stats():
    """Drop hit counters that no longer belong to the current time window.

    In the global ``hits`` mapping, every day other than today is removed.
    Within today, only the bucket for the current hour
    (``%Y-%m-%d:%H``) and the bucket for the current minute
    (``%Y-%m-%d:%H:%M``) are kept. This keeps the in-memory counters small.

    Two defects of the earlier version are avoided:

    * minute buckets were compared against the hour key only, so the current
      minute's counter was deleted on every call and could never grow;
    * after deleting a stale day it was indexed again, which, with an
      auto-creating mapping, brought the key back as an empty entry.
    """
    current_time = datetime.now()
    current_day_of = current_time.strftime("%Y-%m-%d")
    live_buckets = {
        current_time.strftime("%Y-%m-%d:%H"),
        current_time.strftime("%Y-%m-%d:%H:%M"),
    }
    # Iterate over snapshots of the keys because entries are deleted below.
    for day in list(hits):
        if day != current_day_of:
            del hits[day]
            continue
        day_buckets = hits[day]
        for bucket in list(day_buckets):
            if bucket not in live_buckets:
                del day_buckets[bucket]
def check_date_past_interval(check_date, interval=None):
    """Return True if ``check_date`` lies more than ``interval`` hours in the past.

    Prints a short notice to stdout when the date is older than the interval.

    :param check_date: Datetime string like '2021-08-23 18:14:05'
    :type check_date: str
    :param interval: Number of hours to look back (fractions allowed);
        defaults to 1 when omitted.
    :type interval: int or float
    :return: True if the date is older than the interval, otherwise False.
    :rtype: bool
    :raises ValueError: if ``check_date`` is not in '%Y-%m-%d %H:%M:%S' form.
    """
    if interval is None:
        interval = 1
    # Unit named in the printed notice; the timedelta below is in hours.
    period = 'hours'
    checked = datetime.strptime(str(check_date), '%Y-%m-%d %H:%M:%S')
    cutoff = datetime.now() - timedelta(hours=interval)
    if cutoff > checked:
        print(f"This is older than {interval} {period}")
        return True
    return False
def cf_list_firewall_rules_by_zone(zone_identifier):
    """List the Cloudflare firewall rules of a zone.

    See https://api.cloudflare.com/#firewall-rules-list-of-firewall-rules

    :param zone_identifier: Cloudflare zone id.
    :type zone_identifier: str
    :return: The decoded JSON body of the API response.
    :rtype: dict
    """
    endpoint = f"{BASE_URL}/zones/{zone_identifier}/firewall/rules"
    body = requests.get(endpoint, headers=headers).text
    return json.loads(str(body))
def cf_list_filters_by_zone(zone_identifier):
    """List the Cloudflare filters of a zone.

    See https://developers.cloudflare.com/firewall/api/cf-filters/get/#get-all-filters

    :param zone_identifier: Cloudflare zone id.
    :type zone_identifier: str
    :return: The decoded JSON body of the API response.
    :rtype: dict
    """
    endpoint = f"{BASE_URL}/zones/{zone_identifier}/filters"
    body = requests.get(endpoint, headers=headers).text
    return json.loads(str(body))
def cf_create_filter(uri, expression=None, description=None):
    """Create a Cloudflare filter for use in a firewall rule.

    See https://developers.cloudflare.com/firewall/api/cf-filters/post

    The filter is created in the zone named by the module-level
    ``zone_identifier``. Any query string is removed from ``uri`` before it
    is used in the default expression and description.

    :param uri: Request path the filter should match.
    :type uri: str
    :param expression: Filter expression; defaults to
        ``(http.request.uri contains "<path>")``.
    :type expression: str
    :param description: Filter description; defaults to a note containing the
        path and the creation time.
    :type description: str
    :return: The decoded JSON body of the API response.
    :rtype: dict
    """
    endpoint = f"{BASE_URL}/zones/{zone_identifier}/filters"
    # Keep only the base path, without query parameters.
    uri = uri.split("?")[0]
    if expression is None:
        expression = '(http.request.uri contains \"' + uri + '\")'
    if description is None:
        description = f'Rule for {uri} created {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
    # The API expects a JSON array of filter objects, hence the wrapping list:
    # https://community.cloudflare.com/t/whats-the-problem-with-firewallrules-api-malformed-request-body/109433/2
    filter_spec = {
        "expression": expression,
        "description": description
    }
    body = requests.post(endpoint, headers=headers, data=json.dumps([filter_spec])).text
    return json.loads(str(body))
def _cf_extract_filter_id(filter_response):
    """Return the filter id from a cf_create_filter() response, or None.

    On success the id of the first result is used. On failure the id is taken
    from the first error's ``meta`` block, when one is present.
    """
    if filter_response.get("success"):
        results = filter_response.get("result") or []
        first = results[0] if results else None
        return first.get("id") if isinstance(first, dict) else None
    errors = filter_response.get("errors") or []
    first = errors[0] if isinstance(errors, list) and errors else None
    meta = first.get("meta") if isinstance(first, dict) else None
    return meta.get("id") if isinstance(meta, dict) else None


def cloudflare_toggle_mitigation_for_uri(uri, action, paused=True, id=None, filter_id=None, description=None, ref=None):
    """Create, or update, a Cloudflare firewall rule for an uri.

    https://api.cloudflare.com/#firewall-rules-create-firewall-rules

    Without ``id`` a new rule is created (POST). With ``id`` that existing
    rule is updated in place (PUT), so callers can pause a rule they created
    earlier. Previously ``id`` was ignored and a second rule was created.

    When ``filter_id`` is not given, a filter is created through
    ``cf_create_filter(uri)``. If that call fails but the error carries the
    id of an existing filter, that id is reused. If no filter id can be
    determined, the filter response is returned unchanged; it then reports
    ``success: False`` rather than raising.

    :param uri: Uri path
    :type uri: str
    :param action: The action to apply to a matched request. Actions allowed: block, challenge, js_challenge, allow, log, bypass
    :type action: str
    :param paused: Whether this rule is currently paused
    :type paused: bool
    :param id: Firewall Rule identifier of an existing rule to update
    :type id: str
    :param filter_id: Filter identifier
    :type filter_id: str
    :param description: A note that you can use to describe the purpose of the rule
    :type description: str
    :param ref: Short reference tag to quickly select related rules; only
        sent when provided.
    :type ref: str
    :return: Returns dict of the cloudflare API response for the rule. For a
        create, ``result`` is a list of rules; for an update, presumably a
        single rule object (verify against the API reference).
    :rtype: dict
    """
    if description is None:
        description = (
            f'Rule for {uri} created {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
        )
    if filter_id is None:
        filters_json_response_dict = cf_create_filter(uri)
        filter_id = _cf_extract_filter_id(filters_json_response_dict)
        if not filter_id:
            return filters_json_response_dict

    rule = {
        "filter": {
            "id": filter_id
        },
        "action": action,
        "paused": paused,
        "description": description,
    }
    if ref is not None:
        rule["ref"] = ref

    url = f"{BASE_URL}/zones/{zone_identifier}/firewall/rules"
    if id is None:
        # Create: the endpoint takes a JSON array of rule objects.
        json_response = requests.post(url, data=json.dumps([rule]), headers=headers).text
    else:
        # Update: the single-rule endpoint takes one object that includes its id.
        rule["id"] = id
        json_response = requests.put(f"{url}/{id}", data=json.dumps(rule), headers=headers).text
    return json.loads(str(json_response))
def check_notification_url(url):
    """Report whether a notification was already sent for ``url``.

    Looks ``url`` up in the global ``unmitigated_urls_notifications``. An
    entry that is empty is initialised with ``"Active": False``.

    Args:
        url (str): Url path to check

    Returns:
        bool: True if the entry's "Active" flag is True, False if it is False
        or the entry was just created. If the flag is neither, an error is
        logged and None is returned.
    """
    entry = unmitigated_urls_notifications[url]
    if not entry:
        # First time this url is seen: record it as not yet notified.
        entry["Active"] = False
        return False
    if entry["Active"] is True:
        return True
    if entry["Active"] is False:
        return False
    logging.error(f'check_notification_url failed for an unknown reason for {url}')
def apply_mitigation(url, threshold, period):
    """React to ``url`` crossing a hit threshold.

    Logs and prints the event, then consults the global ``mitigated_urls``
    entry for ``url``:

    * If the entry's "Active" flag is False, a Slack message is posted and a
      Cloudflare rule is requested through
      ``cloudflare_toggle_mitigation_for_uri`` with action "js_challenge".
      The rule is unpaused when ``cloudflare_auto_mitigation == True`` and an
      API key is set, paused when only the API key is set, and no API call is
      made when there is no key. The outcome is logged and posted to Slack.
    * If the flag is already True, only a console/debug message is produced.

    Finally the entry is marked active, "Start" is set to the current time
    and "Stop" to None.

    :param url: Request path that crossed the threshold.
    :type url: str
    :param threshold: Threshold value, used in the messages.
    :type threshold: int
    :param period: Label for the window ('hour' or 'minute' at the call
        sites), used in the messages.
    :type period: str
    """
    # Put this here so we can toggle stuff in configs and its read again before taking actions.
    # This prevents us from needing to stop and restart the script to make changes such as auto_mitigation_enabled.
    cf_config_read()
    message = f'Accesslog threshold {threshold} hit for {url} in {period}'
    logging.debug(message)
    red(message)
    # If key does not exist (an untouched entry in the tree is empty, hence falsy)
    if not mitigated_urls[url]:
        # If not already mitigated aka set to true
        print("Creating key: ", url)
        mitigated_urls[url]["Active"] = False
    # If key exists lets see if its True indicating mitigation already is enabled
    if mitigated_urls[url]["Active"] is False:
        message = f'Accesslog threshold {threshold} hit for {url} in {period}. ' \
                  f'Attack Mitigation not enabled for: {url}. Attempting Mitigation now for: {url} '
        post_message_to_slack(message)
        logging.info(message)
        print("Attack Mitigation not enabled for: ", url)
        # Add url to track in mitigated_urls
        print("Attempting Mitigation for: ", url)
        # Lets only explicitly enable rules if environmental variable is enabled
        if cloudflare_auto_mitigation == True and x_auth_key is not None:
            message = f'As cloudflare_auto_mitigation is set to true creating and enabling rule for: {url}'
            green(message)
            logging.info(message)
            post_message_to_slack(message)
            cloudflare_response = cloudflare_toggle_mitigation_for_uri(url, "js_challenge", paused=False)
            # print(cloudflare_response)
        else:
            if x_auth_key is not None:
                message = f'As cloudflare_auto_mitigation is false creating but NOT enabling rule for: {url}. ' \
                          f'Please Login to Cloudflare Dashboard and enable rule manually if desired.'
                yellow(message)
                logging.info(message)
                post_message_to_slack(message)
                cloudflare_response = cloudflare_toggle_mitigation_for_uri(url, "js_challenge", paused=True)
            else:
                # Let's ensure that if no API key is set we can skip trying to hit the api which will fail.
                # The stand-in dict mimics the API response shape that the checks below read.
                cloudflare_response = {"success": False,
                                       "errors": 'No API key is set'}
        # If successfully added
        if cloudflare_response["success"]:
            # Let's set this uri as actively under mitigation
            mitigated_urls[url]["Active"] = True
            # Record time
            mitigated_urls[url]["Start"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            # Record Cloudflare rule id for later use to toggle off
            mitigated_urls[url]["FirewallID"] = cloudflare_response["result"][0]["id"]
            message = f'Mitigation {cloudflare_response["result"][0]["id"]} enabled for attacks against {url}'
            green(message)
            logging.info(message)
            post_message_to_slack(message)
        elif not cloudflare_response["success"]:
            # print(cloudflare_response)
            if cloudflare_response["errors"]:
                message = f'Mitigation failed to enable for attacks against {url} due to {cloudflare_response["errors"]}'
            else:
                message = f'Mitigation failed to enable for attacks against {url} due to: ' + str(cloudflare_response)
            red(message)
            logging.info(message)
            post_message_to_slack(message)
        else:
            # The two branches above cover both truth values of "success".
            message = f'Mitigation failed to enable for attacks against {url} due to Unknown Error condition'
            red(message)
            logging.info(message)
            post_message_to_slack(message)
    elif mitigated_urls[url]["Active"] == True:
        message = f'Mitigation already enabled for attacks against {url} since: {mitigated_urls[url]["Start"]}'
        green(message)
        logging.debug(message)
    # Let's set mitigation as active so we don't get followup messages for each hit after threshold till hour resets
    # NOTE(review): indentation was lost in the source this was recovered
    # from. These three statements are placed at function level because
    # "Stop" is set to None nowhere else and the expiry check in the main
    # loop tests it against None. Confirm against the original.
    mitigated_urls[url]["Active"] = True
    mitigated_urls[url]["Start"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    mitigated_urls[url]["Stop"] = None
# Only these request paths are eligible for automatic rule creation; hits on
# any other path are alerted on by the main loop instead.
urls_of_interest = [
    '/cart.php',
    '/dologin.php',
    '/clientarea.php',
    '/clientarea.php?incorrect=true',
]
# Per-URL thresholds, kept for future use.
thresholds = {
    '/cart.php': 500,
    '/dologin.php': 250,
    '/clientarea.php': 1000,
    '/clientarea.php?incorrect=true': 250,
}
# Script entry point: follow the access log, count hits per field value for
# the line's hour and minute, raise alerts/mitigations when thresholds are
# crossed, and expire mitigations that have been active long enough.
# NOTE(review): indentation was lost in the source this was recovered from;
# the nesting below is reconstructed from the statements' data dependencies.
# Confirm against the original.
if __name__ == "__main__":
    # loglines = tail_file(access_logfile)
    loglines = access_log(tail_file(access_logfile))
    # iterate over the generator
    for line in loglines:
        # print(line)
        # Lines are an associated array dictionary like below.
        # {'remote_addr': '10.10.84.3', 'referrer': None, 'remote_user': None, 'time_local': datetime.datetime(2021, 8, 25, 8, 37, 10, tzinfo=datetime.timezone(datetime.timedelta(-1, 72000))), 'method': 'GET', 'request': '/dologin.php', 'proto': 'HTTP/2.0', 'status': 302, 'body_bytes_sent': 5, 'http_referer': '-', 'http_user_agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.159 Safari/537.36', 'http_x_forwarded_for': '176.103.45.63', 'http_cf_connecting_ip': '176.103.45.63', 'sent_http_location': 'clientarea.php?incorrect=true'}
        # Bucket keys derived from the timestamp of the log line itself.
        day_time = line["time_local"].strftime("%Y-%m-%d:%H:%M:%S")
        day_minute = line["time_local"].strftime("%Y-%m-%d:%H:%M")
        day_hour = line["time_local"].strftime("%Y-%m-%d:%H")
        day_of = line["time_local"].strftime("%Y-%m-%d")
        # Loops through all logline keys and initializes hit counters for all keys and unique values
        for item in line.items():
            key, value = item
            # print(key, value)
            if value is not None and value != '-':
                # the hour
                # (an untouched leaf of the tree is an empty mapping, hence falsy)
                if hits[day_of][day_hour][key][value]:
                    hits[day_of][day_hour][key][value] += 1
                else:
                    hits[day_of][day_hour][key][value] = 1
                # the minute
                if hits[day_of][day_minute][key][value]:
                    hits[day_of][day_minute][key][value] += 1
                else:
                    hits[day_of][day_minute][key][value] = 1
        # Lets now loop through hour stats for each logline key and see top hits
        for item in hits[day_of][day_hour].items():
            key, value = item
            # Filter the console output to the more useful fields for live viewing pleasure.
            if key in ['request', 'remote_addr', 'http_user_agent', 'http_referer']:
                print("")
                print(f'Top hits for {key} for {day_hour}')
                # `d` is a module-level name here; keyfunction() reads this
                # same global when it is used as a sort key.
                d = hits[day_of][day_hour][key]
                print_sorted_dict_top(d, 5)
        url = line["request"]
        # Custom defined rules for automitigation or alerting
        # Set hourly hit thresholds for known urls
        if hits[day_of][day_hour]["request"][url] >= default_hourly_threshold and url in urls_of_interest:
            message = f'Accesslog threshold {default_hourly_threshold} hit for {url} in last hour'
            # logging.info(message)
            # red(message)
            apply_mitigation(url, default_hourly_threshold, 'hour')
        # Set minute hit thresholds for known urls
        if hits[day_of][day_minute]["request"][url] >= default_minute_threshold and url in urls_of_interest:
            message = f'Accesslog threshold {default_minute_threshold} hit for {url} in minute'
            # logging.info(message)
            # red(message)
            apply_mitigation(url, default_minute_threshold, 'minute')
        # Set hour hit thresholds for urls not in urls of interest
        if hits[day_of][day_hour]["request"][url] >= default_hourly_threshold and url not in urls_of_interest:
            message = f'Accesslog threshold {default_hourly_threshold} hit for {url} in hour.' \
                      f'Url is not in whitelist to mitigate. ' \
                      f'Please add to urls of interest if this is not a false positive'
            red(message)
            logging.info(message)
            # Need to add some logic so this alerts only once per period when enabled
            # This is so were notified of unknown urls being targeted and can know to enable a rule.
            if not check_notification_url(url):
                logging.debug('Check notification url was false')
                unmitigated_urls_notifications[url]["Active"] = True
                unmitigated_urls_notifications[url]["Start"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                post_message_to_slack(message)
        # This will check and prune old hours or days stats when day roll's over and next line iteration hits this.
        purge_old_stats()
        # Split the tracked urls by their "Active" flag for the report below.
        active_mitigations = [x for x in mitigated_urls.keys() if mitigated_urls[x]["Active"]]
        inactive_mitigations = [x for x in mitigated_urls.keys() if not mitigated_urls[x]["Active"]]
        print()
        print('Active Mitigations:')
        for mitigation in active_mitigations:
            message = f'Url: {mitigation} active since: {mitigated_urls[mitigation]["Start"]}'
            green(message)
            logging.info(f'Active Mitigation: {message}')
        print('============================================================')
        print()
        print('Previously active Mitigations:')
        for mitigation in inactive_mitigations:
            message = f'Url: {mitigation} last active : {mitigated_urls[mitigation]["Stop"]}'
            yellow(message)
            logging.info(f'Previously Active Mitigation: {message}')
        print('============================================================')
        print()
        print('Checking if active mitigations should be disabled')
        for mitigation in active_mitigations:
            green(f'Url: {mitigation} active since: {mitigated_urls[mitigation]["Start"]}')
            # Lets disable previously active mitigations after .5 hrs aka 30 mins if not already set with a Stop time
            if check_date_past_interval(mitigated_urls[mitigation]["Start"], .5) and mitigated_urls[mitigation]["Stop"] is None:
                message = f'Disabling mitigation for: {mitigation} due to expiration.'
                yellow(message)
                logging.info(message)
                if cloudflare_auto_mitigation is True and x_auth_key is not None:
                    # Let's only try to disable something we have a rule for.
                    if mitigated_urls[mitigation]["FirewallID"]:
                        cf_response = cloudflare_toggle_mitigation_for_uri(mitigation, 'log', paused=True,
                                                                           id=mitigated_urls[mitigation]["FirewallID"])
                        message = message + str(cf_response)
                # Mark the url inactive locally whether or not Cloudflare was called.
                mitigated_urls[mitigation]["Active"] = False
                mitigated_urls[mitigation]["Stop"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                print()
                logging.debug(message)
                post_message_to_slack(message)
        print('============================================================')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment