Last active
September 2, 2025 03:19
-
-
Save AfroThundr3007730/a4c58c13a97cb20a08720eed5d53d4c5 to your computer and use it in GitHub Desktop.
Borg-like ZFS pool snapshot and retention with configurable policies
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# Borg-like ZFS pool snapshot and retention with configurable policies | |
# with inspiration from borgbackup: borg/src/borg/archiver/prune_cmd.py | |
# Version 0.2.6 modified 2025-09-01 by AfroThundr | |
# SPDX-License-Identifier: GPL-3.0-or-later | |
# For issues or updated versions of this script, browse to the following URL: | |
# https://gist.github.com/AfroThundr3007730/a4c58c13a97cb20a08720eed5d53d4c5 | |
"""Borg-like ZFS pool snapshot and retention with configurable policies""" | |
from collections.abc import Callable
from datetime import datetime, UTC
from subprocess import PIPE, getoutput, run
from sys import argv, exit as die

from dateutil.parser import parse
type IntervalString = str | None | |
type LongInterval = str | |
type PolicySpec = str | |
type RetentionSlots = int | |
type RetentionTag = str | None | |
type ShortInterval = str | |
type TimeStampObject = datetime | |
type TimeStampString = str | |
type ZFSPoolName = str | |
type IntervalFilter = Callable[[TimeStampObject], IntervalString] | |
type IntervalSpec = dict[ShortInterval, tuple[LongInterval, IntervalFilter]] | |
type RetentionPolicy = dict[LongInterval, tuple[RetentionSlots, IntervalFilter]] | |
type SnapshotList = dict[TimeStampString, RetentionTag] | |
INTERVALS: IntervalSpec = { | |
"S": ("secondly", lambda t: t.strftime("%Y-%m-%d %H:%M:%S")), | |
"M": ("minutely", lambda t: t.strftime("%Y-%m-%d %H:%M")), | |
"H": ("hourly", lambda t: t.strftime("%Y-%m-%d %H")), | |
"Q": ("4xdaily", lambda t: f"{t.toordinal()}-P{-(t.hour // -6)}"), | |
"d": ("daily", lambda t: t.strftime("%Y-%m-%d")), | |
"w": ("weekly", lambda t: t.strftime("%G-W%V")), | |
"m": ("monthly", lambda t: t.strftime("%Y-%m")), | |
"q": ("quarterly", lambda t: f"{t.year}-Q{-(t.month // -4)}"), | |
"y": ("yearly", lambda t: t.strftime("%Y")), | |
} | |
def make_policy(spec: PolicySpec) -> RetentionPolicy:
    """Create a snapshot retention policy from input spec.

    The spec is a comma-separated list of ``<count><interval>`` terms (for
    example ``7d,4w,12m``) where ``<interval>`` is a key of ``INTERVALS``.
    The result maps each requested interval's long name to a
    ``(slots, filter)`` tuple. Entries follow the order of ``INTERVALS``,
    not the order in which the terms were written. If an interval is given
    more than once, the last count wins.

    Raises:
        IndexError: if a term is empty (e.g. a trailing comma).
        KeyError: if a term names an interval that is not in ``INTERVALS``.
        ValueError: if a term's count is not an integer.
    """
    slots: dict[ShortInterval, RetentionSlots] = {}
    for item in spec.split(","):
        item = item.strip()
        code = item[-1]
        # Reject unknown intervals instead of silently dropping them: a typo
        # would otherwise shrink the policy (possibly to nothing), leaving
        # snapshots untagged that the caller expected to be retained.
        if code not in INTERVALS:
            raise KeyError(f"unknown policy interval: {code!r}")
        slots[code] = int(item[:-1])
    return {
        name: (slots[code], bucket)
        for code, (name, bucket) in INTERVALS.items()
        if code in slots
    }
def take_snapshot(pool: ZFSPoolName, timestamp: TimeStampString) -> None:
    """Create a recursive ZFS snapshot named ``pool@timestamp``.

    The command is run as an argument list (no shell), so the pool name and
    timestamp are passed to ``zfs`` verbatim.

    Raises:
        subprocess.CalledProcessError: if ``zfs snapshot`` exits non-zero.
        OSError: if the ``zfs`` executable cannot be started.
    """
    print(f"Taking snapshot: {pool}@{timestamp}")
    # check=True: a failed snapshot must not go unnoticed by the caller.
    run(["zfs", "snapshot", "-r", f"{pool}@{timestamp}"], check=True)
def get_snapshots(pool: ZFSPoolName) -> SnapshotList:
    """Get all snapshots in a pool (top dataset).

    Returns a dict keyed by snapshot name (the part after ``@``) with every
    value set to ``None`` (untagged). Keys keep the order in which
    ``zfs list -S name`` prints them. An empty dict is returned when the
    command prints no snapshots.

    Raises:
        subprocess.CalledProcessError: if ``zfs list`` exits non-zero.
        OSError: if the ``zfs`` executable cannot be started.
    """
    # Only stdout is captured, so diagnostics written to stderr are never
    # mistaken for snapshot names.
    listing = run(
        ["zfs", "list", "-Ht", "snapshot", "-S", "name", "-o", "name", pool],
        stdout=PIPE,
        text=True,
        check=True,
    ).stdout
    return {
        line.split("@", 1)[1]: None
        for line in listing.splitlines()
        if "@" in line  # skips blank lines, e.g. when there are no snapshots
    }
def tag_snapshots(
    policy: RetentionPolicy, snap_list: SnapshotList
) -> SnapshotList:
    """Tag snapshots based on the retention policy.

    For each policy term, walks the snapshots in ``snap_list`` order and
    tags the first snapshot seen in each distinct period with
    ``"<term> #<n>"`` until the term's slots are used up. A snapshot that
    already carries a tag still starts its period but does not consume a
    slot. If a term has slots left afterwards and the last snapshot in the
    list is untagged, that snapshot is tagged ``"<term> (oldest) #<n>"``.

    ``snap_list`` is expected to be ordered newest first, since its last
    key is treated as the oldest snapshot. It is modified in place and also
    returned. An empty ``policy`` or ``snap_list`` is returned unchanged.
    """
    if not policy or not snap_list:
        return snap_list
    # Parse every snapshot name once instead of once per policy term.
    stamps: dict[TimeStampString, TimeStampObject] = {
        snap: parse(snap) for snap in snap_list
    }
    oldest: TimeStampString = next(reversed(snap_list))
    for term, (slots, bucket) in policy.items():
        last: IntervalString = None  # period of the previous snapshot seen
        count: RetentionSlots = 0
        for snap, stamp in stamps.items():
            if count >= slots:
                break
            current: IntervalString = bucket(stamp)
            if current == last:
                continue
            last = current
            if snap_list[snap] is None:
                count += 1
                snap_list[snap] = f"{term} #{count}"
        # Slots left over: also keep the oldest snapshot for this term.
        if count < slots and snap_list[oldest] is None:
            snap_list[oldest] = f"{term} (oldest) #{count + 1}"
    return snap_list
def prune_snapshots(pool: ZFSPoolName, snap_list: SnapshotList) -> None:
    """Remove snapshots not tagged for retention by the policy.

    Every snapshot in ``snap_list`` with a falsy tag is destroyed
    recursively with ``zfs destroy -r``; tagged snapshots are only
    reported. Pruning is best-effort: a failed destroy is reported and the
    remaining snapshots are still processed.

    Raises:
        OSError: if the ``zfs`` executable cannot be started.
    """
    print(f"Pruning snapshots on pool: {pool}")
    for snap, tag in snap_list.items():
        target = f"{pool}@{snap}"
        if tag:
            print(f"Keeping snapshot: {target} [{tag}]")
            continue
        print(f"Pruning snapshot: {target}")
        # Argument list (no shell); zfs's own error output is not captured.
        if run(["zfs", "destroy", "-r", target], check=False).returncode:
            print(f"Failed to prune snapshot: {target}")
    print(f"Pruning complete on pool: {pool}")
def snapshot_lifecycle(pool: ZFSPoolName, spec: PolicySpec) -> None:
    """Apply the specified snapshot lifecycle policy to the target pool.

    Builds the retention policy from ``spec``, takes a new snapshot named
    after the current UTC time, then tags the pool's snapshots against the
    policy and prunes the untagged ones.

    Raises:
        KeyError: if the spec yields an empty policy.
        Exception: whatever the helper functions raise is propagated.
    """
    # Build the policy first so a bad spec aborts before anything is changed.
    policy = make_policy(spec)
    # With an empty policy nothing would be tagged, so every snapshot would
    # be pruned; refuse to continue.
    if not policy:
        raise KeyError(f"no known intervals in snapshot policy: {spec!r}")
    # Spelled-out directives; "%F" and "%T" are platform-specific shorthands.
    take_snapshot(pool, datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ"))
    prune_snapshots(pool, tag_snapshots(policy, get_snapshots(pool)))
# Script entry point: expects the pool name and the policy spec as the first
# two command-line arguments and exits with status 1 on any failure.
if __name__ == "__main__":
    try:
        snapshot_lifecycle(argv[1], argv[2])
    except (IndexError, KeyError):
        # Raised when an argument is missing; any IndexError or KeyError from
        # the lifecycle itself also ends up here.
        print(f"Usage: {argv[0].rsplit('/', 1)[-1]} pool_name snap_policy\n")
        print("Specify snapshot policy in the form: #S,#M,#H,#Q,#d,#w,#m,#q,#y")
        print("Unneeded policy intervals may be omitted (e.g. 7d,4w,12m)")
        die(1)
    # pylint: disable=broad-exception-caught
    except Exception as e:
        # Last-resort handler: include the message so the failure can be
        # diagnosed from the output alone.
        print(f"Something odd happened (got {type(e).__name__}: {e}).")
        die(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
With inspiration from borg's
prune_cmd.py
(for 2.x), or misc.py
(for 1.x). It can be automated via a systemd timer using the following template units:
Enable an instance for each ZFS pool to be snapshotted (example:
pool0
): To set the snapshot policy per instance, edit the instance like so:
Then add the following with your chosen policy (example:
7d,4w,12m
): The timer can be adjusted with a similar override of
OnCalendar
.