@hensing
Created October 4, 2025 14:50
TrueNAS: check and run overdue snapshots on boot

TrueNAS Periodic Snapshot Overdue Checker

This Python script is designed for TrueNAS CORE/SCALE systems to proactively monitor all configured Periodic Snapshot Tasks. It checks if any task is significantly overdue based on its schedule and manually triggers a run using midclt to ensure no snapshots are missed.

This is particularly useful after a system reboot, a power outage, or any prolonged downtime where the TrueNAS internal scheduler might have missed its execution window.
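Internally, midclt reports each task's last successful run as a millisecond epoch timestamp under a `$date` key in the task's `state` payload. A minimal sketch of that conversion, using a made-up sample value (corresponding to early October 2025):

```python
from datetime import datetime, timezone

# Shape of the state payload returned by midclt for a finished task
# (the timestamp value here is a sample, not real output)
state = {"state": "FINISHED", "datetime": {"$date": 1759585800000}}

# "$date" holds milliseconds since the epoch; guard against a missing key
last_run_msec = (state.get("datetime") or {}).get("$date")
last_run_dt = datetime.fromtimestamp(last_run_msec / 1000, tz=timezone.utc)
print(last_run_dt.isoformat())
```

The division by 1000 and the explicit `tz=timezone.utc` keep all later age comparisons timezone-aware.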

(c) by Henning Dickten 2025 (@hensing)

🚀 Key Features

  • Dynamic Task Handling: Fetches all active tasks using midclt call pool.snapshottask.query and does not require hardcoded IDs.
  • Overdue Logic: Compares the task's configured frequency (Daily, Weekly, Monthly) against its last successful run time to determine whether it has missed its window.
  • Safe Execution: Uses midclt call pool.snapshottask.run [ID] for safe, controlled manual execution.
  • Clean Logging: Implements the Python logging module to record all checks, warnings, and errors to a dedicated log file.
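The overdue check itself reduces to a simple timedelta comparison: the gap since the last successful run is measured against the maximum tolerated interval for the task's frequency. A self-contained sketch of that logic, using the daily bucket and hypothetical last-run times:

```python
from datetime import datetime, timedelta, timezone

# Maximum tolerated gap for a daily task: two days plus slack,
# mirroring the script's MAX_INTERVALS table.
max_interval = timedelta(days=2, minutes=15)

def is_overdue(last_run_dt: datetime, now: datetime) -> bool:
    """A task is overdue when the gap since its last successful
    run exceeds the tolerated maximum interval."""
    return (now - last_run_dt) > max_interval

now = datetime.now(timezone.utc)
print(is_overdue(now - timedelta(days=3), now))    # gap of 3 days
print(is_overdue(now - timedelta(hours=20), now))  # gap of 20 hours
```

The generous buckets mean the script only fires when a run has clearly been skipped, not when cron is a few minutes late.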

🛠️ Setup and Installation

  1. Save the Script

    • SSH into your TrueNAS system.

    • Create a dedicated directory for the script:

      mkdir -p /root/scripts
      
    • Save the Python code as snapshot_check.py inside the /root/scripts/ directory.

    • Make the script executable:

      chmod +x /root/scripts/snapshot_check.py
      
  2. Configure Logging

     The script writes logs to /var/log/snapshot_checker.log. No manual setup is typically needed, as the script will create the file if it doesn't exist, but it's good practice to verify the location.

  3. Verification (Optional)

     You can run the script manually to ensure it executes without errors and checks your current tasks:

    /root/scripts/snapshot_check.py
    

⚙️ Automating the Check (TrueNAS Tasks)

To ensure continuous coverage, the script should be configured to run on startup and hourly.

  1. Run on System Startup (Post Init): This catches any snapshots missed during the system's downtime.

    • Navigate to Tasks → Init/Shutdown Scripts in the TrueNAS Web UI.
    • Click ADD.
    • Set the following:
    • Type: Command
    • Command: /root/scripts/snapshot_check.py
    • When: Post Init
    • Enabled: (Checked)
    • Click SUBMIT.
  2. Run Hourly (Cron Job): This ensures that the checker runs frequently enough to catch delays that occur between reboots.

    • Navigate to Tasks → Cron Jobs in the TrueNAS Web UI.
    • Click ADD.
    • Set the following:
    • User: root
    • Command: /root/scripts/snapshot_check.py
    • Description: Snapshot Overdue Check (Hourly)
    • Schedule: Select Custom.
    • Minute: 0 (Runs on the hour)
    • Hour: * (Every hour)
    • Day of Month/Month/Day of Week: *
    • Hide Standard Output/Error: (Checked) — Recommended, as all output is sent to the log file.
    • Click SUBMIT.
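The hourly schedule configured above (minute 0, everything else `*`) is exactly the pattern the script's `determine_interval_type` classifies as HOURLY. A trimmed-down sketch of that classification, omitting the step/list handling of the full version:

```python
def classify(schedule: dict) -> str:
    """Simplified version of the script's determine_interval_type:
    map a cron-style schedule dict to a frequency bucket."""
    minute = schedule.get("minute", "*")
    hour = schedule.get("hour", "*")
    dom = schedule.get("dom", "*")  # day of month
    dow = schedule.get("dow", "*")  # day of week

    if dom != "*":
        return "MONTHLY"   # pinned to a day of the month
    if dow != "*":
        return "WEEKLY"    # pinned to a weekday
    if hour != "*" and minute != "*":
        return "DAILY"     # pinned to one time of day
    if minute != "*" and hour == "*":
        return "HOURLY"    # fires once per hour
    return "DEFAULT"

# The cron job configured above: minute 0, every hour, every day
print(classify({"minute": "0", "hour": "*", "dom": "*", "dow": "*"}))  # → HOURLY
```

The ordering matters: a monthly pin takes precedence over a weekly one, and so on down to the hourly case, with DEFAULT as the conservative fallback.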

📝 Checking Logs

All activity, including successful checks and manual executions, is logged. To monitor the script's output in real-time from the SSH shell:

tail -f /var/log/snapshot_checker.log
snapshot_check.py

#!/usr/bin/env python3
#
# TrueNAS run overdue snapshots-script by @hensing (Henning Dickten 2025)
#
import json
import subprocess
from datetime import datetime, timedelta, timezone
import sys
import logging
import os

# --- Configuration ---
# The log file will be saved in the system log directory.
LOG_DIR = "/var/log/"
LOG_FILE = os.path.join(LOG_DIR, "snapshot_checker.log")
MIDCLT_QUERY = ["midclt", "call", "pool.snapshottask.query"]
MIDCLT_RUN = ["midclt", "call", "pool.snapshottask.run"]

# Conservative estimates for the maximum expected time between two runs.
# We add a buffer (slack) to prevent triggering due to minor cron delays.
MAX_INTERVALS = {
    # If set to run hourly (e.g., at :00), allow 2 hours + slack
    "HOURLY": timedelta(hours=2, minutes=15),
    # If set to run daily (e.g., at 00:00), allow 2 days + slack
    "DAILY": timedelta(days=2, minutes=15),
    # If set to run weekly (e.g., every Sunday), allow 8 days + slack
    "WEEKLY": timedelta(days=8, minutes=15),
    # If set to run monthly (e.g., on the 1st), allow 32 days + slack
    "MONTHLY": timedelta(days=32, minutes=15),
    # Fallback for unknown/complex schedules
    "DEFAULT": timedelta(hours=25),
}


# --- Logging Setup ---
def setup_logging():
    """Configures the logging system."""
    # Ensure the log directory exists
    if not os.path.exists(LOG_DIR):
        os.makedirs(LOG_DIR)
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(LOG_FILE),
            logging.StreamHandler(sys.stdout)  # Optional: output to console as well
        ]
    )


# --- Helper Functions ---
def determine_interval_type(schedule: dict) -> str:
    """Attempts to determine the task frequency (daily/weekly/monthly) from the cron schedule."""
    minute = schedule.get('minute', '*')
    hour = schedule.get('hour', '*')
    dom = schedule.get('dom', '*')      # Day of Month
    month = schedule.get('month', '*')
    dow = schedule.get('dow', '*')      # Day of Week

    # Check for MONTHLY schedules (runs on a specific Day of Month)
    # E.g., dom="1" (runs on the 1st of the month)
    if dom != '*' and not any(c in dom for c in ['/', ',']):
        return "MONTHLY"

    # Check for WEEKLY schedules (runs on a specific Day of Week)
    # E.g., dow="sun" (runs every Sunday)
    if dow != '*' and not any(c in dow for c in ['/', ',']):
        return "WEEKLY"

    # Check for DAILY schedules (runs at a specific Hour/Minute)
    if hour != '*' and minute != '*' and dom == '*' and month == '*' and dow == '*':
        # Check if hour/minute are single values or simple ranges/steps
        if not any(c in hour for c in ['/', ',']) and not any(c in minute for c in ['/', ',']):
            return "DAILY"

    # Check for HOURLY schedules (minute is specific, hour is '*')
    if minute != '*' and hour == '*' and dom == '*' and month == '*' and dow == '*':
        return "HOURLY"

    # Default fallback for complex or undefined schedules
    return "DEFAULT"


def is_overdue(task: dict, last_run_dt: datetime) -> tuple[bool, str]:
    """Checks if the task is overdue based on its expected maximum interval."""
    interval_type = determine_interval_type(task['schedule'])
    expected_delta = MAX_INTERVALS.get(interval_type, MAX_INTERVALS["DEFAULT"])
    now_utc = datetime.now(timezone.utc)
    time_since_last_run = now_utc - last_run_dt

    log_msg = f"Expected interval: {expected_delta}. Actual time since last run: {time_since_last_run}."
    if time_since_last_run > expected_delta:
        return True, "OVERDUE. " + log_msg
    return False, "On schedule. " + log_msg


# --- Main Logic ---
def main():
    """Main function to query, analyze, and execute overdue tasks."""
    setup_logging()
    logging.info("=========================================================")
    logging.info(">>> TrueNAS Snapshot Task Overdue Checker Started <<<")
    logging.info("=========================================================")

    try:
        # 1. Query tasks
        result = subprocess.run(MIDCLT_QUERY, capture_output=True, text=True, check=True)
        tasks = json.loads(result.stdout)
    except subprocess.CalledProcessError as e:
        logging.error(f"Error querying tasks with midclt: {e.stderr}")
        sys.exit(1)
    except json.JSONDecodeError:
        logging.error("Error parsing midclt output (not valid JSON).")
        sys.exit(1)

    overdue_tasks = []

    # 2. Dynamically analyze tasks
    for task in tasks:
        task_id = task['id']
        dataset = task['dataset']
        is_enabled = task['enabled']
        naming_schema = task['naming_schema']
        log_prefix = f"[Task ID {task_id} / {dataset} / {naming_schema}]"

        if not is_enabled:
            logging.info(f"{log_prefix} Skipped (Disabled).")
            continue

        # Extract last run timestamp and status; guard against a null datetime
        state = task.get('state', {})
        last_run_msec = (state.get('datetime') or {}).get('$date')
        status = state.get('state')

        if not last_run_msec or status != 'FINISHED':
            logging.warning(f"{log_prefix} Could not determine last successful run (Status: {status}). Marking as overdue.")
            overdue_tasks.append(task_id)
            continue

        # Convert milliseconds timestamp to a Python datetime object (UTC)
        last_run_dt = datetime.fromtimestamp(last_run_msec / 1000, tz=timezone.utc)

        # Check for overdue status
        is_over, reason = is_overdue(task, last_run_dt)
        logging.info(f"{log_prefix} Last Run: {last_run_dt.strftime('%Y-%m-%d %H:%M:%S UTC')}. Check: {reason}")
        if is_over:
            overdue_tasks.append(task_id)

    # 3. Execute overdue tasks
    if overdue_tasks:
        logging.warning("=" * 70)
        logging.warning(f"!!! {len(overdue_tasks)} overdue task(s) found. Starting manual execution: {overdue_tasks} !!!")
        logging.warning("=" * 70)
        for task_id in overdue_tasks:
            log_prefix = f"[Task ID {task_id}]"
            logging.info(f"{log_prefix} Starting execution...")
            try:
                run_cmd = MIDCLT_RUN + [str(task_id)]
                run_result = subprocess.run(run_cmd, capture_output=True, text=True, check=True)
                # Successful execution updates the internal timestamp.
                logging.info(f"{log_prefix} Successfully executed. Status code: {run_result.returncode}")
            except subprocess.CalledProcessError as e:
                logging.error(f"{log_prefix} ERROR executing: {e.stderr}")
    else:
        logging.info("All found and enabled snapshot tasks are on schedule.")

    logging.info(">>> TrueNAS Snapshot Task Overdue Checker Finished <<<")


if __name__ == "__main__":
    main()