Skip to content

Instantly share code, notes, and snippets.

@AfroThundr3007730
Last active January 9, 2025 05:46
Show Gist options
  • Save AfroThundr3007730/a4c58c13a97cb20a08720eed5d53d4c5 to your computer and use it in GitHub Desktop.
Save AfroThundr3007730/a4c58c13a97cb20a08720eed5d53d4c5 to your computer and use it in GitHub Desktop.
Borg-like ZFS pool snapshot and retention with configurable policies
#!/usr/bin/env python3
# Borg-like ZFS pool snapshot and retention with configurable policies
# with inspiration from borgbackup: borg/src/borg/archiver/prune_cmd.py
# Version 0.2.4 modified 2025-01-08 by AfroThundr
# SPDX-License-Identifier: GPL-3.0-or-later
# For issues or updated versions of this script, browse to the following URL:
# https://gist.github.com/AfroThundr3007730/a4c58c13a97cb20a08720eed5d53d4c5
"""Borg-like ZFS pool snapshot and retention with configurable policies"""
from datetime import datetime, UTC
from subprocess import getoutput
from sys import argv, exit as die
from typing import Callable, TypeAlias as T
from dateutil.parser import parse
IntervalString: T = str | None
LongInterval: T = str
PolicySpec: T = str
RetentionSlots: T = int
RetentionTag: T = str | None
ShortInterval: T = str
TimeStampObject: T = datetime
TimeStampString: T = str
ZFSPoolName: T = str
IntervalFilter: T = Callable[[TimeStampObject], IntervalString]
IntervalSpec: T = dict[ShortInterval, tuple[LongInterval, IntervalFilter]]
RetentionPolicy: T = dict[LongInterval, tuple[RetentionSlots, IntervalFilter]]
SnapshotList: T = dict[TimeStampString, RetentionTag]
INTERVALS: IntervalSpec = {
"S": ("secondly", lambda t: t.strftime("%Y-%m-%d %H:%M:%S")),
"M": ("minutely", lambda t: t.strftime("%Y-%m-%d %H:%M")),
"H": ("hourly", lambda t: t.strftime("%Y-%m-%d %H")),
"Q": ("4xdaily", lambda t: f"{t.toordinal()}-P{-(t.hour // -6)}"),
"d": ("daily", lambda t: t.strftime("%Y-%m-%d")),
"w": ("weekly", lambda t: t.strftime("%G-W%V")),
"m": ("monthly", lambda t: t.strftime("%Y-%m")),
"q": ("quarterly", lambda t: f"{t.year}-Q{-(t.month // -4)}"),
"y": ("yearly", lambda t: t.strftime("%Y")),
}
def make_policy(spec: PolicySpec) -> RetentionPolicy:
"""Create a snapshot retention policy from input spec"""
return {
value[0]: (sdict[term], value[1])
for term, value in INTERVALS.items()
if term in (sdict := {i[-1]: int(i[:-1]) for i in spec.split(",")})
}
def take_snapshot(pool: ZFSPoolName, timestamp: TimeStampString) -> None:
"""Create a ZFS snapshot based on a timestamp"""
print(f"Taking snapshot: {pool}@{timestamp}")
getoutput(f"zfs snapshot -r {pool}@{timestamp}")
def get_snapshots(pool: ZFSPoolName) -> SnapshotList:
"""Get all snapshots in a pool (top dataset)"""
return {
snap.split("@")[1]: None
for snap in getoutput(
f"zfs list -Ht snapshot -S name -o name {pool}"
).split("\n")
}
def tag_snapshots(
policy: RetentionPolicy, snap_list: SnapshotList
) -> SnapshotList:
"""Tag snapshots based on the retention policy"""
last: IntervalString = None
for term in policy:
count: RetentionSlots = 0
for snap in snap_list:
current: IntervalString = policy[term][1](parse(snap))
if last != current and count < policy[term][0]:
last = current
if snap_list[snap] is None:
snap_list[snap] = f"{term} #{(count := count + 1)}"
oldest: TimeStampString = list(snap_list.keys())[-1]
if count < policy[term][0] and snap_list[oldest] is None:
snap_list[oldest] = f"{term} (oldest) #{count + 1}"
return snap_list
def prune_snapshots(pool: ZFSPoolName, snap_list: SnapshotList) -> None:
"""Remove snapshots not tagged for retention by the policy"""
print(f"Pruning snapshots on pool: {pool}")
for snap in snap_list:
if tag := snap_list[snap]:
print(f"Keeping snapshot: {pool}@{snap} [{tag}]")
else:
print(f"Pruning snapshot: {pool}@{snap}")
getoutput(f"zfs destroy -r {pool}@{snap}")
print(f"Pruning complete on pool: {pool}")
def snapshot_lifecycle(pool: ZFSPoolName, spec: PolicySpec) -> None:
"""Apply the snapshot lifecycle policy on the specified pool"""
take_snapshot(pool, datetime.now(UTC).strftime("%FT%TZ"))
prune_snapshots(pool, tag_snapshots(make_policy(spec), get_snapshots(pool)))
def main() -> None:
"""ZFS Autosnap entrypoint"""
try:
snapshot_lifecycle(argv[1], argv[2])
except (IndexError, KeyError):
print("Usage: zfs_autosnap pool_name snapshot_policy")
print("Specify policy in the form: #S,#M,#H,#Q,#d,#w,#m,#q,#y")
print("Unneeded intervals may be ommitted. Example: 7d,4w,12m")
die(1)
# pylint: disable=broad-exception-caught
except Exception:
print("Something odd happened.")
die(1)
if __name__ == "__main__":
main()
@AfroThundr3007730
Copy link
Author

AfroThundr3007730 commented Jan 2, 2025

With inspiration from borg's prune_cmd.py

Automated via systemd timer (example: pool0):

cat <<'EOF' >/etc/systemd/system/[email protected]
[Unit]
Description=Snapshot entire ZFS pool - %I
RequiresMountsFor=/mnt/%I

[Service]
Type=simple
ExecStart=/usr/local/sbin/zfs-autosnap %I 7d,4w,12m
Restart=no
EOF
cat <<'EOF' >/etc/systemd/system/[email protected]
[Unit]
Description=Snapshot entire zfs pool - %I

[Timer]
OnCalendar=0:05
Persistent=1

[Install]
WantedBy=multi-user.target
EOF
systemctl enable --now [email protected]

Or to make the schedule tuneable per pool, edit the service like so:

-ExecStart=/usr/local/sbin/zfs-autosnap %I 7d,4w,12m
+ExecStart=/bin/bash -c "exec zfs-autosnap %I $SCHEDULE"

Then systemctl edit [email protected] and add:

[Service]
Environment=SCHEDULE=<the_schedule>

Naturally adjust the timer to the desired frequency (with a similar override of OnCalendar).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment