Skip to content

Instantly share code, notes, and snippets.

@jonaslejon
Last active March 28, 2025 14:04
Show Gist options
  • Save jonaslejon/8919731d05cb6c2f5f0ae4ed329b4c2f to your computer and use it in GitHub Desktop.
Save jonaslejon/8919731d05cb6c2f5f0ae4ed329b4c2f to your computer and use it in GitHub Desktop.
Raspberry Pi sensor IDS healthchecks for tcpdump, zeek, suricata and disk space
#!/usr/bin/python
"""Health checks for the Raspberry Pi sensor IDS.

Sends alerts via Mailgun and Pushover when free disk space is low or when
any of the tcpdump, zeek or Suricata-Main processes is not running, then
pings Healthchecks.io.
"""
import os
import shutil
import requests
import psutil
from dotenv import load_dotenv
from retry import retry
import argparse
# Populate the process environment from a local .env file, if one exists
load_dotenv()

# Settings for the alerting services; each value is None when the variable is unset.
# Mailgun (email alerts)
MAILGUN_API_KEY = os.environ.get('MAILGUN_API_KEY')
MAILGUN_DOMAIN = os.environ.get('MAILGUN_DOMAIN')
ALERT_EMAIL = os.environ.get('ALERT_EMAIL')
# Pushover (push notifications)
PUSHOVER_USER_KEY = os.environ.get('PUSHOVER_USER_KEY')
PUSHOVER_API_TOKEN = os.environ.get('PUSHOVER_API_TOKEN')
# Healthchecks.io (liveness ping)
HEALTHCHECKS_UUID = os.environ.get('HEALTHCHECKS_UUID')
def check_disk_space(partition='/data'):
    """Return the percentage of free space on *partition*.

    Args:
        partition: Path whose filesystem usage is inspected.

    Returns:
        Free space as a float percentage of the total capacity, or 0.0 when
        the filesystem reports a total capacity of zero.

    Raises:
        FileNotFoundError: If *partition* does not exist.
    """
    if not os.path.exists(partition):
        raise FileNotFoundError(f"Partition '{partition}' does not exist. Exiting and sending alert.")
    usage = shutil.disk_usage(partition)
    # A filesystem can report zero total capacity (e.g. pseudo filesystems);
    # report that as "no free space" instead of dividing by zero.
    if usage.total == 0:
        return 0.0
    return (usage.free / usage.total) * 100
def check_all_processes_running(process_names):
    """Report whether every name in *process_names* matches a running process.

    Process names are compared exactly against the names psutil reports.
    Progress and the outcome are printed to stdout.

    Args:
        process_names: Process names that must all be running.

    Returns:
        True when every name was found, False otherwise.
    """
    print(f"Checking if all these processes are running: {', '.join(process_names)}...")
    # Collect the requested names that show up among the live processes
    found = {
        proc.info['name']
        for proc in psutil.process_iter(['pid', 'name'])
        if proc.info['name'] in process_names
    }
    absent = set(process_names) - found
    if absent:
        print(f"The following processes are not running: {', '.join(absent)}")
        return False
    print(f"All specified processes ({', '.join(process_names)}) are running.")
    return True
def send_mailgun_alert(subject, message, timeout=10):
    """Send a plain-text alert email through the Mailgun EU API.

    Delivery is best-effort: missing configuration, network errors and
    non-200 responses are printed to stdout and never raised, so a broken
    email channel cannot stop the caller from using other alert channels.

    Args:
        subject: Subject line of the email.
        message: Plain-text body of the email.
        timeout: Seconds to wait for Mailgun before giving up.
    """
    if MAILGUN_API_KEY is None:
        print("Mailgun API key not found. Please check your .env file.")
        return
    # Without these the request would target ".../v3/None/messages"
    if not MAILGUN_DOMAIN or not ALERT_EMAIL:
        print("Mailgun domain or alert email not found. Please check your .env file.")
        return
    print(f"Sending alert email to {ALERT_EMAIL} via Mailgun EU endpoint...")
    try:
        response = requests.post(
            f"https://api.eu.mailgun.net/v3/{MAILGUN_DOMAIN}/messages",  # EU API endpoint
            auth=("api", MAILGUN_API_KEY),
            data={
                "from": f"Disk Monitor <mailgun@{MAILGUN_DOMAIN}>",
                "to": ALERT_EMAIL,
                "subject": subject,
                "text": message,
            },
            # requests waits forever by default; bound it so the run cannot hang
            timeout=timeout,
        )
    except requests.RequestException as e:
        print("Failed to send alert via Mailgun. Error:", e)
        return
    if response.status_code == 200:
        print("Alert sent successfully via Mailgun.")
    else:
        print("Failed to send alert via Mailgun. Response:", response.text)
def send_pushover_alert(message, timeout=10):
    """Send a push notification through the Pushover API.

    Delivery is best-effort: missing keys, network errors and non-200
    responses are printed to stdout and never raised, so a broken push
    channel cannot abort the rest of the monitoring run.

    Args:
        message: Text of the notification.
        timeout: Seconds to wait for Pushover before giving up.
    """
    if PUSHOVER_USER_KEY is None or PUSHOVER_API_TOKEN is None:
        print("Pushover keys not found. Please check your .env file.")
        return
    print("Sending alert via Pushover...")
    try:
        response = requests.post(
            "https://api.pushover.net/1/messages.json",
            data={
                "token": PUSHOVER_API_TOKEN,
                "user": PUSHOVER_USER_KEY,
                "message": message,
                "title": "Alert Notification",
            },
            # requests waits forever by default; bound it so the run cannot hang
            timeout=timeout,
        )
    except requests.RequestException as e:
        print("Failed to send alert via Pushover. Error:", e)
        return
    if response.status_code == 200:
        print("Alert sent successfully via Pushover.")
    else:
        print("Failed to send alert via Pushover. Response:", response.text)
@retry(tries=3, delay=5)
def ping_healthchecks(timeout=10):
    """Ping Healthchecks.io to signal that the monitoring run completed.

    Does nothing (beyond printing a notice) when HEALTHCHECKS_UUID is unset.
    Any exception raised here, including a request timeout, is handed to the
    ``retry`` decorator, which is configured with tries=3 and delay=5.

    Args:
        timeout: Seconds to wait for Healthchecks.io on each attempt.

    Raises:
        Exception: If the response is not a 200 with body "OK".
    """
    if HEALTHCHECKS_UUID is None:
        print("Healthchecks UUID not found. Please check your .env file.")
        return
    print("Pinging Healthchecks.io...")
    # Bound the wait: without a timeout a stalled connection blocks forever
    # and the retry logic never gets a chance to run.
    response = requests.get(f"https://hc-ping.com/{HEALTHCHECKS_UUID}", timeout=timeout)
    if response.status_code == 200 and response.text == "OK":
        print("Ping successful.")
    else:
        raise Exception("Failed to ping Healthchecks.io. Retrying...")
def main():
    """Run the sensor health checks and alert on any failure.

    With --test-mailgun or --test-pushover, sends one test message through
    that channel and exits. Otherwise checks free disk space, checks that the
    tcpdump, zeek and Suricata-Main processes are running, sends Mailgun and
    Pushover alerts for each problem found, and finally pings Healthchecks.io.
    """
    parser = argparse.ArgumentParser(description='Disk and process monitoring script.')
    parser.add_argument('--test-mailgun', action='store_true', help='Send a test email via Mailgun.')
    parser.add_argument('--test-pushover', action='store_true', help='Send a test notification via Pushover.')
    args = parser.parse_args()

    # Test flags send a single message and skip all monitoring
    if args.test_mailgun:
        send_mailgun_alert("Test Alert", "This is a test email from the monitoring script.")
        return
    if args.test_pushover:
        send_pushover_alert("This is a test notification from the monitoring script.")
        return

    # Perform regular monitoring tasks
    try:
        free_percent = check_disk_space()
    except FileNotFoundError as e:
        # Missing partition: alert and stop here, without the process checks
        # and without pinging Healthchecks.io.
        print(e)
        send_mailgun_alert("Disk Space Alert", str(e))
        send_pushover_alert(str(e))
        return
    except Exception as e:
        # Any other failure is reported, then the remaining checks still run
        error_message = f"Error checking disk space: {e}"
        print(error_message)
        send_mailgun_alert("Disk Space Alert", error_message)
        send_pushover_alert(error_message)
    else:
        # Only print the free disk space if no exceptions were raised
        print(f"Free disk space on /data: {free_percent:.2f}%")
        if free_percent < 10:
            low_space_message = (
                f"Warning! The /data partition is running low on disk space: {free_percent:.2f}% free."
            )
            send_mailgun_alert("Disk Space Alert", low_space_message)
            send_pushover_alert(low_space_message)

    # Check that every sensor process is running
    process_list = ['tcpdump', 'zeek', 'Suricata-Main']
    if not check_all_processes_running(process_list):
        # Same text on both channels so the email body also mentions suricata
        process_message = "tcpdump, suricata or zeek process is not running!"
        send_mailgun_alert("tcpdump, suricata or zeek Process Alert", process_message)
        send_pushover_alert(process_message)

    # Ping Healthchecks.io once the checks above have run
    try:
        ping_healthchecks()
    except Exception as e:
        print(f"Final attempt to ping Healthchecks.io failed: {e}")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment