Skip to content

Instantly share code, notes, and snippets.

@AfroThundr3007730
Last active September 2, 2025 03:19
Show Gist options
  • Save AfroThundr3007730/a4c58c13a97cb20a08720eed5d53d4c5 to your computer and use it in GitHub Desktop.
Save AfroThundr3007730/a4c58c13a97cb20a08720eed5d53d4c5 to your computer and use it in GitHub Desktop.
Borg-like ZFS pool snapshot and retention with configurable policies
#!/usr/bin/env python3
# Borg-like ZFS pool snapshot and retention with configurable policies
# with inspiration from borgbackup: borg/src/borg/archiver/prune_cmd.py
# Version 0.2.6 modified 2025-09-01 by AfroThundr
# SPDX-License-Identifier: GPL-3.0-or-later
# For issues or updated versions of this script, browse to the following URL:
# https://gist.github.com/AfroThundr3007730/a4c58c13a97cb20a08720eed5d53d4c5
"""Borg-like ZFS pool snapshot and retention with configurable policies"""
from collections.abc import Callable
from datetime import datetime, UTC
from shlex import quote
from subprocess import getoutput
from sys import argv, exit as die

from dateutil.parser import parse
type IntervalString = str | None
type LongInterval = str
type PolicySpec = str
type RetentionSlots = int
type RetentionTag = str | None
type ShortInterval = str
type TimeStampObject = datetime
type TimeStampString = str
type ZFSPoolName = str
type IntervalFilter = Callable[[TimeStampObject], IntervalString]
type IntervalSpec = dict[ShortInterval, tuple[LongInterval, IntervalFilter]]
type RetentionPolicy = dict[LongInterval, tuple[RetentionSlots, IntervalFilter]]
type SnapshotList = dict[TimeStampString, RetentionTag]
INTERVALS: IntervalSpec = {
"S": ("secondly", lambda t: t.strftime("%Y-%m-%d %H:%M:%S")),
"M": ("minutely", lambda t: t.strftime("%Y-%m-%d %H:%M")),
"H": ("hourly", lambda t: t.strftime("%Y-%m-%d %H")),
"Q": ("4xdaily", lambda t: f"{t.toordinal()}-P{-(t.hour // -6)}"),
"d": ("daily", lambda t: t.strftime("%Y-%m-%d")),
"w": ("weekly", lambda t: t.strftime("%G-W%V")),
"m": ("monthly", lambda t: t.strftime("%Y-%m")),
"q": ("quarterly", lambda t: f"{t.year}-Q{-(t.month // -4)}"),
"y": ("yearly", lambda t: t.strftime("%Y")),
}
def make_policy(spec: PolicySpec) -> RetentionPolicy:
    """Create a snapshot retention policy from input spec

    The spec is a comma-separated list of ``<count><interval>`` items such as
    ``7d,4w,12m``, where the interval is one of the single-letter keys of
    INTERVALS. The returned policy is keyed by the long interval name and
    ordered like INTERVALS, whatever the item order in the spec. If an
    interval is given more than once, the last count wins.

    Raises:
        IndexError: an item is empty (for example a trailing comma).
        KeyError: an item names an interval that is not in INTERVALS.
        ValueError: an item's count is not an integer.
    """
    # Parse the spec once up front instead of once per known interval.
    slots: dict[ShortInterval, RetentionSlots] = {}
    for item in spec.split(","):
        item = item.strip()
        slots[item[-1]] = int(item[:-1])
    # An unrecognised interval used to be dropped silently; with no valid
    # interval left the policy was empty, nothing got tagged for retention
    # and every snapshot was pruned. Reject the spec instead.
    if unknown := sorted(slots.keys() - INTERVALS.keys()):
        raise KeyError(f"unknown policy interval(s): {', '.join(unknown)}")
    return {
        name: (slots[term], bucket_of)
        for term, (name, bucket_of) in INTERVALS.items()
        if term in slots
    }
def take_snapshot(pool: ZFSPoolName, timestamp: TimeStampString) -> None:
    """Create a recursive ZFS snapshot named after a timestamp

    Runs ``zfs snapshot -r <pool>@<timestamp>`` through the shell. Anything
    the command prints (stdout and stderr combined) is echoed so that a
    failure is not silently discarded; the exit status is not checked.
    """
    target = f"{pool}@{timestamp}"
    print(f"Taking snapshot: {target}")
    # The name ends up on a shell command line, so it must be quoted.
    if output := getoutput(f"zfs snapshot -r {quote(target)}"):
        print(output)
def get_snapshots(pool: ZFSPoolName) -> SnapshotList:
    """Get all snapshots in a pool (top dataset), all of them untagged

    Runs ``zfs list -Ht snapshot -S name -o name <pool>`` and returns a dict
    keyed by snapshot name (the part after ``@``), in the order zfs printed
    the lines. Every value is None, meaning "not tagged yet". Empty command
    output yields an empty dict.

    Raises:
        IndexError: a non-blank output line contains no ``@``. getoutput()
            mixes stderr into the result, so such a line is most likely an
            error message from zfs.
    """
    # The pool name ends up on a shell command line, so it must be quoted.
    listing = getoutput(
        f"zfs list -Ht snapshot -S name -o name {quote(pool)}"
    )
    snapshots: SnapshotList = {}
    for line in listing.split("\n"):
        if not line:
            # Empty output means "no snapshots", not a malformed line.
            continue
        parts = line.split("@")
        if len(parts) < 2:
            raise IndexError(f"unexpected zfs output: {line}")
        snapshots[parts[1]] = None
    return snapshots
def tag_snapshots(
policy: RetentionPolicy, snap_list: SnapshotList
) -> SnapshotList:
"""Tag snapshots based on the retention policy"""
last: IntervalString = None
for term in policy:
count: RetentionSlots = 0
for snap in snap_list:
current: IntervalString = policy[term][1](parse(snap))
if last != current and count < policy[term][0]:
last = current
if snap_list[snap] is None:
snap_list[snap] = f"{term} #{(count := count + 1)}"
oldest: TimeStampString = list(snap_list.keys())[-1]
if count < policy[term][0] and snap_list[oldest] is None:
snap_list[oldest] = f"{term} (oldest) #{count + 1}"
return snap_list
def prune_snapshots(pool: ZFSPoolName, snap_list: SnapshotList) -> None:
"""Remove snapshots not tagged for retention by the policy"""
print(f"Pruning snapshots on pool: {pool}")
for snap in snap_list:
if tag := snap_list[snap]:
print(f"Keeping snapshot: {pool}@{snap} [{tag}]")
else:
print(f"Pruning snapshot: {pool}@{snap}")
getoutput(f"zfs destroy -r {pool}@{snap}")
print(f"Pruning complete on pool: {pool}")
def snapshot_lifecycle(pool: ZFSPoolName, spec: PolicySpec) -> None:
    """Apply the specified snapshot lifecycle policy to the target pool

    Builds the retention policy from spec, takes a new snapshot named with
    the current UTC time (``YYYY-MM-DDTHH:MM:SSZ``), then tags the pool's
    snapshots according to the policy and prunes the untagged ones.
    """
    # Parse the spec before touching the pool, so a malformed policy aborts
    # the run without leaving a new snapshot behind.
    policy = make_policy(spec)
    # Spelled-out equivalent of "%FT%TZ"; the short codes are platform-specific.
    take_snapshot(pool, datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ"))
    prune_snapshots(pool, tag_snapshots(policy, get_snapshots(pool)))
# Command-line entry point: <script> pool_name snap_policy
if __name__ == "__main__":
    try:
        snapshot_lifecycle(argv[1], argv[2])
    except (IndexError, KeyError):
        # Raised when a command-line argument is missing; any IndexError or
        # KeyError escaping the lifecycle itself also ends up here.
        print(f"Usage: {argv[0].rsplit('/', 1)[-1]} pool_name snap_policy\n")
        print("Specify snapshot policy in the form: #S,#M,#H,#Q,#d,#w,#m,#q,#y")
        print("Unneeded policy intervals may be omitted (e.g. 7d,4w,12m)")
        die(1)
    # pylint: disable=broad-exception-caught
    except Exception as e:
        # Last-resort handler: say what went wrong and exit non-zero.
        print(f"Something odd happened (got {type(e).__name__}: {e}).")
        die(1)
@AfroThundr3007730
Copy link
Author

AfroThundr3007730 commented Jan 2, 2025

With inspiration from borg's prune_cmd.py (for 2.x), or misc.py (for 1.x).

It can be automated via systemd timer using the following template units:

# /etc/systemd/system/zfs-autosnap@.service
[Unit]
Description=Snapshot entire ZFS pool - %I
RequiresMountsFor=/mnt/%I

[Service]
Type=simple
ExecStart=/bin/bash -c 'exec zfs-autosnap %I $POLICY'
Restart=no
# /etc/systemd/system/zfs-autosnap@.timer
[Unit]
Description=Snapshot entire zfs pool - %I

[Timer]
OnCalendar=0:05
Persistent=1

[Install]
WantedBy=multi-user.target

Enable an instance for each ZFS pool to be snapshotted (example: pool0):

systemctl enable --now zfs-autosnap@pool0.timer

To set the snapshot policy per-instance, edit the instance like so:

systemctl edit --drop-in policy zfs-autosnap@pool0.service

Then add the following with your chosen policy (example: 7d,4w,12m):

[Service]
Environment=POLICY=7d,4w,12m

The timer can be adjusted with a similar override of OnCalendar.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment