#!/usr/bin/env python3
"""
ZFS Snapshot Staging Script for TrueNAS Backups

This script prepares a consistent, read-only view of ZFS snapshots for backup tools
such as restic. It cleans up existing mounts, finds a common snapshot across all
target datasets, and mounts that snapshot for each dataset under a staging root.

Usage:
    python3 zfs_backup_staging.py [--dry-run] [-v]
    python3 zfs_backup_staging.py --cleanup [--dry-run] [-v]

Designed for cron: run this script first to stage the snapshots, then trigger the
backup (e.g., backrest). The --cleanup flag can be used after a backup is complete.

Assumes snapshots are created with consistent naming (e.g., via TrueNAS periodic
snapshot tasks) and replicated where needed.

Configuration:
    - TARGET_TREES: Dict mapping root datasets to staging subdirectories.
    - REFERENCE_DATASET: Dataset used as the baseline for snapshot candidates.
    - SNAP_PREFIX: Only snapshots whose names start with this prefix are considered.
    - STAGING_ROOT: Parent directory for the mounts; must already exist and be backed
      by its own ZFS dataset (STAGING_DATASET).
    - STAGING_DATASET: The ZFS dataset backing STAGING_ROOT.
    - LOCK_FILE: Prevents concurrent runs.
    - LOG_FILE: Log file path (rewritten on each run).
    - BACKREST_CONTAINER: Name of the Docker container running backrest/restic.
    - ADMIN_EMAIL: Email address for failure notifications.

Dependencies: Python 3, ZFS commands (zfs, mount, umount, mountpoint),
Docker (for checking restic), midclt.
"""
import argparse
import json
import logging
import os
import subprocess
import sys
import traceback
from typing import Dict, List, Optional, Tuple
# Configuration
STAGING_ROOT: str = "/mnt/data-pool/views"
STAGING_DATASET: str = "data-pool/views" # The ZFS dataset name for the root
TARGET_TREES: Dict[str, str] = {
    "data-pool/users": "users",
    "data-pool/photos": "photos",
    "data-pool/replications/app-pool/apps": "apps",
}
REFERENCE_DATASET: str = "data-pool/replications/app-pool/apps"
SNAP_PREFIX: str = "hourly-"
LOCK_FILE: str = "/var/run/backup-staging.lock"
LOG_FILE: str = os.path.join(STAGING_ROOT, "log.txt") # Save inside the views dataset
CANDIDATE_LIMIT: int = 10 # Max recent snapshots to check for commonality
BACKREST_CONTAINER: str = "backrest" # Docker container name for backrest
ADMIN_EMAIL: str = "<admin>@gmail.com" # Email for failure notifications
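# With the TARGET_TREES above, staging produces roughly this layout (illustrative;
# the snapshot names are placeholders):
#
#   /mnt/data-pool/views/users   <- data-pool/users@hourly-...
#   /mnt/data-pool/views/photos  <- data-pool/photos@hourly-...
#   /mnt/data-pool/views/apps    <- data-pool/replications/app-pool/apps@hourly-...
#
# Child datasets are mounted at the matching relative path below these directories.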
# Logger is defined globally, but configured in main()
logger = logging.getLogger(__name__)
def send_failure_email(subject: str, body: str, dry_run: bool = False) -> None:
    """Sends a failure notification email using TrueNAS midclt."""
    logger.info(f"Preparing to send failure email: {subject}")
    if dry_run:
        logger.info(f"DRY-RUN: Would send email with subject '{subject}'")
        return
    payload = {
        "to": [ADMIN_EMAIL],
        "subject": subject,
        "text": body,
    }
    payload_str = json.dumps(payload)
    # Build the command as a list to avoid shell injection/quoting issues
    cmd_list = ["midclt", "call", "mail.send", payload_str]
    try:
        # Run the command as a list, with shell=False (the default)
        subprocess.run(cmd_list, check=True, capture_output=True, text=True)
        logger.info("Failure email sent successfully.")
    except subprocess.CalledProcessError as e:
        # If sending the email fails, log the command list and error for easier debugging
        logger.error(
            f"CRITICAL: FAILED TO SEND FAILURE EMAIL. Command '{cmd_list}' failed "
            f"(exit {e.returncode}): {e.stderr.strip()}"
        )

def run_cmd(
    cmd: str,
    dry_run: bool = False,
    check: bool = True,
    success_exit_codes: Optional[Tuple[int, ...]] = None,
) -> str:
    """
    Run a shell command, optionally in dry-run mode.

    Args:
        cmd: Command string to execute.
        dry_run: If True, log but don't execute.
        check: If True, raise on non-zero exit.
        success_exit_codes: A tuple of exit codes to consider as success, in addition to 0.

    Returns:
        Stripped stdout.

    Raises:
        subprocess.CalledProcessError: If check=True and the command fails with an
        unapproved exit code.
    """
    if dry_run:
        logger.info(f"DRY-RUN: Would execute: {cmd}")
        return ""
    try:
        process = subprocess.run(
            cmd, shell=True, check=True, capture_output=True, text=True
        )
        return process.stdout.strip()
    except subprocess.CalledProcessError as e:
        if success_exit_codes and e.returncode in success_exit_codes:
            logger.debug(
                f"Command '{cmd}' exited with acceptable code {e.returncode}. Treating as success."
            )
            return e.stdout.strip()
        err_msg = f"Command '{cmd}' failed (exit {e.returncode}): {e.stderr.strip()}"
        logger.error(err_msg)
        if check:
            raise
        return ""

def is_restic_running(dry_run: bool = False) -> bool:
    """
    Check if restic is running inside the backrest Docker container.

    Args:
        dry_run: If True, simulate the check (always return False).

    Returns:
        True if a restic process is found, False otherwise.
    """
    if dry_run:
        logger.info("DRY-RUN: Skipping restic running check.")
        return False
    # '|| true' keeps the exit code at 0 even when pgrep finds no process
    cmd = f"docker exec {BACKREST_CONTAINER} pgrep -f restic || true"
    output = run_cmd(cmd, check=False)
    return bool(output)

def acquire_lock() -> None:
    """Acquire lock file to prevent concurrent runs."""
    if os.path.exists(LOCK_FILE):
        raise RuntimeError(
            f"Lock file {LOCK_FILE} exists. Another instance may be running."
        )
    try:
        with open(LOCK_FILE, "w") as f:
            f.write(str(os.getpid()))
    except IOError as e:
        raise IOError(f"Failed to create lock file: {e}") from e


def release_lock() -> None:
    """Release lock file if it exists."""
    if os.path.exists(LOCK_FILE):
        try:
            os.remove(LOCK_FILE)
        except IOError as e:
            logger.warning(f"Failed to remove lock file: {e}")

def get_snapshots(dataset: str) -> List[str]:
    """
    Get list of snapshots for a dataset, filtered by prefix, newest first.

    Args:
        dataset: ZFS dataset name.

    Returns:
        List of snapshot names (without the leading 'dataset@').
    """
    cmd = f"zfs list -t snapshot -d 1 -H -o name -s creation {dataset}"
    raw = run_cmd(cmd)
    if not raw:
        return []
    snaps = [
        line.split("@")[1]
        for line in raw.split("\n")
        if "@" in line and line.split("@")[1].startswith(SNAP_PREFIX)
    ]
    return snaps[::-1]  # Newest first
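# Example (illustrative): with SNAP_PREFIX = "hourly-", a TrueNAS periodic snapshot
# task using a naming schema such as "hourly-%Y-%m-%d_%H-%M" yields names like
# "hourly-2025-11-27_05-00"; get_snapshots() returns those names, newest first.
# The exact schema depends on your snapshot task configuration.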
def snapshot_exists(dataset: str, snap_name: str) -> bool:
    """Check if a specific snapshot exists for the dataset."""
    # 'zfs list' exits 1 when the snapshot does not exist; treat that as a normal
    # "not found" result instead of logging an error on every probe.
    return bool(
        run_cmd(
            f"zfs list -t snapshot {dataset}@{snap_name}",
            check=False,
            success_exit_codes=(1,),
        )
    )

def get_recursive_datasets(root_ds: str) -> List[str]:
    """
    Get all datasets under a root, including itself.

    Args:
        root_ds: Root ZFS dataset.

    Returns:
        List of dataset names.
    """
    raw = run_cmd(f"zfs list -r -H -o name {root_ds}")
    return raw.split("\n") if raw else []


def get_path_depth(path: str) -> int:
    """Get depth of a path (number of / segments)."""
    return path.count("/")


def is_path_mounted(path: str) -> bool:
    """
    Checks if a path is a mountpoint.

    Returns True if mounted (exit code 0), False otherwise.
    """
    # We use subprocess.run directly because we need the return code, not stdout.
    result = subprocess.run(["mountpoint", "-q", path], check=False)
    return result.returncode == 0

def ensure_staging_root_ready(path: str, dataset: str, dry_run: bool = False) -> None:
    """
    Ensures the staging root is mounted and has 'rshared' propagation.
    """
    # 1. Check if mounted
    if not is_path_mounted(path):
        logger.warning(
            f"Staging root {path} is not mounted. Attempting to mount {dataset}..."
        )
        # Try to mount the ZFS dataset
        run_cmd(f"zfs mount {dataset}", dry_run=dry_run)
        # Double check
        if not is_path_mounted(path):
            raise RuntimeError(f"Failed to mount {dataset} at {path}")
    # 2. Force 'rshared' propagation
    # This allows Docker to see mounts created inside this folder
    logger.info(f"Setting propagation to rshared for {path}...")
    run_cmd(f"mount --make-rshared {path}", dry_run=dry_run)
def cleanup(dry_run: bool = False) -> None:
    """
    Cleanup existing mounts by removing directories from leaf to parent.
    """
    logger.info("Starting cleanup...")
    # 1. Unmount everything currently mounted in the staging area (deepest first)
    mounts_cmd = f"mount | grep '{STAGING_ROOT}' | awk '{{print $3}}'"
    mounts_raw = run_cmd(mounts_cmd, check=False)
    mounts = mounts_raw.split("\n") if mounts_raw else []
    # Filter for our staging root, BUT EXCLUDE THE ROOT ITSELF
    # This prevents unmounting 'data-pool/views'
    mounts = [m for m in mounts if m.startswith(STAGING_ROOT) and m != STAGING_ROOT]
    mounts.sort(key=get_path_depth, reverse=True)
    for mount_point in mounts:
        if mount_point:
            logger.info(f"Unmounting: {mount_point}")
            run_cmd(f"umount -l {mount_point}", check=False, dry_run=dry_run)
    if not dry_run:
        # 2. Remove the now-empty staging directories from deepest to shallowest.
        logger.info("Cleaning staging directories from leaf to root...")
        if os.path.isdir(STAGING_ROOT):
            for dirpath, _, _ in os.walk(STAGING_ROOT, topdown=False):
                if dirpath == STAGING_ROOT:
                    continue
                try:
                    os.rmdir(dirpath)
                except OSError as e:
                    logger.warning(
                        f"Could not remove directory '{dirpath}': {e}. It might not be empty."
                    )
        # 3. Verify that nothing below the staging root is still mounted.
        mounts_raw = run_cmd(
            f"mount | grep '{STAGING_ROOT}' | awk '{{print $3}}'", check=False
        )
        remaining_mounts = []
        if mounts_raw:
            remaining_mounts = [
                m for m in mounts_raw.split("\n") if m and m != STAGING_ROOT
            ]
        if remaining_mounts:
            raise RuntimeError(
                f"Cleanup failed: The following mount points could not be unmounted: "
                f"{remaining_mounts}. Manual intervention required."
            )
    logger.info("Cleanup complete.")

def main() -> None:
    parser = argparse.ArgumentParser(description="ZFS Snapshot Staging for Backups")
    parser.add_argument(
        "--dry-run", action="store_true", help="Simulate without making changes"
    )
    parser.add_argument(
        "--cleanup", action="store_true", help="Only perform cleanup and exit"
    )
    parser.add_argument(
        "-v", "--verbose", action="store_true", help="Enable verbose output to console"
    )
    args = parser.parse_args()
    dry_run = args.dry_run

    # Setup logging: log to file always, log to console only if verbose.
    log_fmt = logging.Formatter(
        "[%(asctime)s] %(levelname)s: %(message)s", datefmt="%Y-%m-%d %H:%M:%S"
    )
    log_handlers = []
    if args.verbose:
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setFormatter(log_fmt)
        log_handlers.append(console_handler)
    # Apply basic config
    logging.basicConfig(level=logging.INFO, handlers=log_handlers, force=True)
    # If no verbose flag, we still want to capture critical errors to stderr for cron
    if not args.verbose:
        stderr_handler = logging.StreamHandler(sys.stderr)
        stderr_handler.setLevel(logging.ERROR)
        stderr_handler.setFormatter(log_fmt)
        logging.getLogger().addHandler(stderr_handler)

    try:
        acquire_lock()
        try:
            # Main logic block, wrapped to ensure the lock is always released
            if is_restic_running(dry_run):
                raise RuntimeError(
                    "Restic is currently running in the backrest container. Aborting to avoid conflicts."
                )
            ensure_staging_root_ready(STAGING_ROOT, STAGING_DATASET, dry_run)
            if not dry_run:
                try:
                    file_handler = logging.FileHandler(LOG_FILE, mode="w")
                    file_handler.setFormatter(log_fmt)
                    logger.addHandler(file_handler)
                    logger.info(f"Log file initialized at {LOG_FILE}")
                except IOError as e:
                    logger.error(f"Could not write to log file {LOG_FILE}: {e}")

            if args.cleanup:
                logger.info("Cleanup-only mode activated.")
                cleanup(dry_run)
                logger.info("Cleanup-only mode finished.")
                return  # Exit successfully

            # --- Normal execution path (staging) ---
            if not os.path.isdir(STAGING_ROOT):
                raise FileNotFoundError(
                    f"Staging root {STAGING_ROOT} does not exist. Create it first."
                )
            cleanup(dry_run)  # Always start with cleanup for idempotency

            # Map every dataset (roots and children) to its mount point under the staging root
            dataset_map: Dict[str, str] = {}
            for root_ds, dest_folder in TARGET_TREES.items():
                children = get_recursive_datasets(root_ds)
                for child in children:
                    rel_path = child[len(root_ds) + 1 :] if child != root_ds else ""
                    full_mount = os.path.join(STAGING_ROOT, dest_folder, rel_path)
                    dataset_map[child] = full_mount
            # Mount shallow paths first so parent directories exist before their children
            sorted_datasets = sorted(
                dataset_map.keys(), key=lambda ds: get_path_depth(dataset_map[ds])
            )

            logger.info("Finding common snapshot...")
            candidates = get_snapshots(REFERENCE_DATASET)[:CANDIDATE_LIMIT]
            chosen_snap = None
            for snap in candidates:
                if all(snapshot_exists(root_ds, snap) for root_ds in TARGET_TREES):
                    chosen_snap = snap
                    break
            if not chosen_snap:
                raise RuntimeError("No common snapshot found across targets.")
            logger.info(f"Selected common snapshot: {chosen_snap}")

            success_count = 0
            try:
                for ds in sorted_datasets:
                    mount_point = dataset_map[ds]
                    if not snapshot_exists(ds, chosen_snap):
                        logger.warning(
                            f"Skipping {ds} (snapshot {chosen_snap} missing)"
                        )
                        continue
                    logger.info(f"Mounting {ds}@{chosen_snap} at {mount_point}")
                    if not dry_run:
                        os.makedirs(mount_point, exist_ok=True)
                    run_cmd(
                        f"mount -t zfs -o ro {ds}@{chosen_snap} {mount_point}",
                        dry_run=dry_run,
                    )
                    success_count += 1
            except Exception:
                logger.error(
                    "A failure occurred during mounting. Rolling back changes."
                )
                cleanup(dry_run)  # Rollback
                raise  # Re-raise to trigger email notification

            logger.info("=" * 40)
            logger.info("BACKUP MANIFEST SUMMARY")
            logger.info(f"Snapshot: {chosen_snap}")
            logger.info(f"Source: {REFERENCE_DATASET}")
            logger.info("Mounted Views:")
            for ds in sorted_datasets:
                if snapshot_exists(ds, chosen_snap):
                    # Clean up the path for display
                    relative_mount = dataset_map[ds].replace(STAGING_ROOT, "")
                    logger.info(f" {relative_mount:<30} <- {ds}")
            logger.info("=" * 40)
            logger.info(
                f"\nStaging complete. Mounted {success_count} datasets with snapshot {chosen_snap}."
            )
        finally:
            release_lock()
    except Exception as e:
        # This is the master error handler. It catches any exception, logs it,
        # sends an email, and exits with a non-zero status code.
        err_msg = f"ZFS backup staging script failed: {type(e).__name__}: {e}"
        tb_str = traceback.format_exc()
        # Log the full error to the file
        logger.critical(f"{err_msg}\n{tb_str}")
        # Also print a concise error to stderr for cron logs
        print(f"CRITICAL: {err_msg}", file=sys.stderr)
        email_subject = "CRITICAL: TrueNAS Backup Staging Failed"
        email_body = (
            "The ZFS snapshot staging script encountered a critical error.\n\n"
            f"Error:\n{err_msg}\n\n"
            f"Traceback:\n{tb_str}\n\n"
            f"Please check the log file for more details: {LOG_FILE}"
        )
        send_failure_email(email_subject, email_body, dry_run)
        sys.exit(1)


if __name__ == "__main__":
    main()