Last active
March 28, 2025 14:04
-
-
Save jonaslejon/8919731d05cb6c2f5f0ae4ed329b4c2f to your computer and use it in GitHub Desktop.
Rapberry Pi sensor IDS healthchecks for tcpdump, zeek, suricata and disk space
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
""" Runs on the Rapberry Pi sensor IDS and alerts if the disk space is low or if the tcpdump process is not running """ | |
import os | |
import shutil | |
import requests | |
import psutil | |
from dotenv import load_dotenv | |
from retry import retry | |
import argparse | |
# Load environment variables from .env file | |
load_dotenv() | |
# Read the API key, UUID, and Pushover keys from the .env file | |
MAILGUN_API_KEY = os.getenv('MAILGUN_API_KEY') | |
HEALTHCHECKS_UUID = os.getenv('HEALTHCHECKS_UUID') | |
PUSHOVER_USER_KEY = os.getenv('PUSHOVER_USER_KEY') | |
PUSHOVER_API_TOKEN = os.getenv('PUSHOVER_API_TOKEN') | |
MAILGUN_DOMAIN = os.getenv('MAILGUN_DOMAIN') | |
ALERT_EMAIL = os.getenv('ALERT_EMAIL') | |
def check_disk_space(partition='/data'): | |
if not os.path.exists(partition): | |
raise FileNotFoundError(f"Partition '{partition}' does not exist. Exiting and sending alert.") | |
total, used, free = shutil.disk_usage(partition) | |
free_percent = (free / total) * 100 | |
return free_percent | |
def check_all_processes_running(process_names): | |
print(f"Checking if all these processes are running: {', '.join(process_names)}...") | |
running_processes = set() | |
# Iterate through running processes | |
for proc in psutil.process_iter(['pid', 'name']): | |
if proc.info['name'] in process_names: | |
running_processes.add(proc.info['name']) | |
# Check if all specified processes are found | |
missing_processes = set(process_names) - running_processes | |
if not missing_processes: | |
print(f"All specified processes ({', '.join(process_names)}) are running.") | |
return True | |
else: | |
print(f"The following processes are not running: {', '.join(missing_processes)}") | |
return False | |
def send_mailgun_alert(subject, message): | |
if MAILGUN_API_KEY is None: | |
print("Mailgun API key not found. Please check your .env file.") | |
return | |
print(f"Sending alert email to {ALERT_EMAIL} via Mailgun EU endpoint...") | |
response = requests.post( | |
f"https://api.eu.mailgun.net/v3/{MAILGUN_DOMAIN}/messages", # EU API endpoint | |
auth=("api", MAILGUN_API_KEY), | |
data={ | |
"from": f"Disk Monitor <mailgun@{MAILGUN_DOMAIN}>", | |
"to": ALERT_EMAIL, | |
"subject": subject, | |
"text": message | |
} | |
) | |
if response.status_code == 200: | |
print("Alert sent successfully via Mailgun.") | |
else: | |
print("Failed to send alert via Mailgun. Response:", response.text) | |
def send_pushover_alert(message): | |
if PUSHOVER_USER_KEY is None or PUSHOVER_API_TOKEN is None: | |
print("Pushover keys not found. Please check your .env file.") | |
return | |
print("Sending alert via Pushover...") | |
response = requests.post( | |
"https://api.pushover.net/1/messages.json", | |
data={ | |
"token": PUSHOVER_API_TOKEN, | |
"user": PUSHOVER_USER_KEY, | |
"message": message, | |
"title": "Alert Notification" | |
} | |
) | |
if response.status_code == 200: | |
print("Alert sent successfully via Pushover.") | |
else: | |
print("Failed to send alert via Pushover. Response:", response.text) | |
@retry(tries=3, delay=5) | |
def ping_healthchecks(): | |
if HEALTHCHECKS_UUID is None: | |
print("Healthchecks UUID not found. Please check your .env file.") | |
return | |
print("Pinging Healthchecks.io...") | |
response = requests.get(f"https://hc-ping.com/{HEALTHCHECKS_UUID}") | |
if response.status_code == 200 and response.text == "OK": | |
print("Ping successful.") | |
else: | |
raise Exception("Failed to ping Healthchecks.io. Retrying...") | |
def main(): | |
parser = argparse.ArgumentParser(description='Disk and process monitoring script.') | |
parser.add_argument('--test-mailgun', action='store_true', help='Send a test email via Mailgun.') | |
parser.add_argument('--test-pushover', action='store_true', help='Send a test notification via Pushover.') | |
args = parser.parse_args() | |
# Handle test arguments | |
if args.test_mailgun: | |
send_mailgun_alert("Test Alert", "This is a test email from the monitoring script.") | |
return | |
if args.test_pushover: | |
send_pushover_alert("This is a test notification from the monitoring script.") | |
return | |
# Perform regular monitoring tasks | |
try: | |
free_percent = check_disk_space() | |
except FileNotFoundError as e: | |
print(e) | |
send_mailgun_alert("Disk Space Alert", str(e)) | |
send_pushover_alert(str(e)) | |
return | |
except Exception as e: | |
print(f"Error checking disk space: {e}") | |
send_mailgun_alert("Disk Space Alert", f"Error checking disk space: {e}") | |
send_pushover_alert(f"Error checking disk space: {e}") | |
else: | |
# Only print the free disk space if no exceptions were raised | |
print(f"Free disk space on /data: {free_percent:.2f}%") | |
if free_percent < 10: | |
send_mailgun_alert("Disk Space Alert", f"Warning! The /data partition is running low on disk space: {free_percent:.2f}% free.") | |
send_pushover_alert(f"Warning! The /data partition is running low on disk space: {free_percent:.2f}% free.") | |
# Check if tcpdump process is running | |
process_list = ['tcpdump', 'zeek', 'Suricata-Main'] | |
if not check_all_processes_running(process_list): | |
send_mailgun_alert("tcpdump, suricata or zeek Process Alert", "tcpdump or zeek process is not running!") | |
send_pushover_alert("tcpdump, suricata or zeek process is not running!") | |
# Always ping Healthchecks.io in the end | |
try: | |
ping_healthchecks() | |
except Exception as e: | |
print(f"Final attempt to ping Healthchecks.io failed: {e}") | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment