Last active
January 9, 2025 05:46
-
-
Save AfroThundr3007730/a4c58c13a97cb20a08720eed5d53d4c5 to your computer and use it in GitHub Desktop.
Borg-like ZFS pool snapshot and retention with configurable policies
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# Borg-like ZFS pool snapshot and retention with configurable policies | |
# with inspiration from borgbackup: borg/src/borg/archiver/prune_cmd.py | |
# Version 0.2.4 modified 2025-01-08 by AfroThundr | |
# SPDX-License-Identifier: GPL-3.0-or-later | |
# For issues or updated versions of this script, browse to the following URL: | |
# https://gist.github.com/AfroThundr3007730/a4c58c13a97cb20a08720eed5d53d4c5 | |
"""Borg-like ZFS pool snapshot and retention with configurable policies""" | |
from datetime import datetime, UTC | |
from subprocess import getoutput | |
from sys import argv, exit as die | |
from typing import Callable, TypeAlias as T | |
from dateutil.parser import parse | |
IntervalString: T = str | None | |
LongInterval: T = str | |
PolicySpec: T = str | |
RetentionSlots: T = int | |
RetentionTag: T = str | None | |
ShortInterval: T = str | |
TimeStampObject: T = datetime | |
TimeStampString: T = str | |
ZFSPoolName: T = str | |
IntervalFilter: T = Callable[[TimeStampObject], IntervalString] | |
IntervalSpec: T = dict[ShortInterval, tuple[LongInterval, IntervalFilter]] | |
RetentionPolicy: T = dict[LongInterval, tuple[RetentionSlots, IntervalFilter]] | |
SnapshotList: T = dict[TimeStampString, RetentionTag] | |
INTERVALS: IntervalSpec = { | |
"S": ("secondly", lambda t: t.strftime("%Y-%m-%d %H:%M:%S")), | |
"M": ("minutely", lambda t: t.strftime("%Y-%m-%d %H:%M")), | |
"H": ("hourly", lambda t: t.strftime("%Y-%m-%d %H")), | |
"Q": ("4xdaily", lambda t: f"{t.toordinal()}-P{-(t.hour // -6)}"), | |
"d": ("daily", lambda t: t.strftime("%Y-%m-%d")), | |
"w": ("weekly", lambda t: t.strftime("%G-W%V")), | |
"m": ("monthly", lambda t: t.strftime("%Y-%m")), | |
"q": ("quarterly", lambda t: f"{t.year}-Q{-(t.month // -4)}"), | |
"y": ("yearly", lambda t: t.strftime("%Y")), | |
} | |
def make_policy(spec: PolicySpec) -> RetentionPolicy:
    """Create a snapshot retention policy from input spec.

    ``spec`` is a comma-separated list of ``<count><interval>`` terms, where
    the interval is one of the single-letter keys of ``INTERVALS`` (for
    example ``7d,4w,12m``).  The returned policy maps each requested long
    interval name to ``(count, filter)`` and is ordered like ``INTERVALS``,
    not like ``spec``.  When an interval is repeated, the last count wins.

    Raises:
        IndexError: a term is empty (an empty spec or a stray comma).
        ValueError: a term's count is not an integer.
        KeyError: a term names an interval that is not in ``INTERVALS``.
    """
    # Parse the spec once up front rather than once per INTERVALS entry.
    slots: dict[ShortInterval, RetentionSlots] = {}
    for term in spec.split(","):
        term = term.strip()
        unit = term[-1]
        # Reject unknown intervals instead of silently dropping them: a
        # policy that ends up empty tags nothing, and prune_snapshots
        # destroys every snapshot that carries no tag.
        if unit not in INTERVALS:
            raise KeyError(unit)
        slots[unit] = int(term[:-1])
    return {
        name: (slots[unit], interval_filter)
        for unit, (name, interval_filter) in INTERVALS.items()
        if unit in slots
    }
def take_snapshot(pool: ZFSPoolName, timestamp: TimeStampString) -> None:
    """Announce and create the snapshot ``<pool>@<timestamp>``.

    Runs ``zfs snapshot -r`` through the shell.  The command's output is
    discarded, so a failed snapshot is not reported to the caller.
    """
    announcement = f"Taking snapshot: {pool}@{timestamp}"
    command = f"zfs snapshot -r {pool}@{timestamp}"
    print(announcement)
    getoutput(command)
def get_snapshots(pool: ZFSPoolName) -> SnapshotList:
    """List the snapshots of a pool's top dataset, all initially untagged.

    Returns a dict keyed by the part of each listed name after the ``@``,
    in the order ``zfs list -S name`` prints them, with every value ``None``.

    Raises:
        IndexError: an output line contains no ``@`` (for instance when the
            command prints nothing or an error message instead of names).
    """
    listing = getoutput(f"zfs list -Ht snapshot -S name -o name {pool}")
    snapshots: SnapshotList = {}
    for line in listing.split("\n"):
        # Keep only the snapshot part: "<dataset>@<snapshot>" -> "<snapshot>".
        snapshots[line.split("@")[1]] = None
    return snapshots
def tag_snapshots(
    policy: RetentionPolicy, snap_list: SnapshotList
) -> SnapshotList:
    """Tag snapshots based on the retention policy.

    For each interval in ``policy`` the snapshots are walked in ``snap_list``
    order.  The first snapshot seen in each new period claims one of the
    interval's slots, unless an earlier interval already tagged it, until
    all slots are used.  If an interval still has free slots after the walk,
    the last snapshot in ``snap_list`` is kept as ``(oldest)`` provided it
    is still untagged.

    ``snap_list`` is modified in place and also returned.  Snapshots left
    with a ``None`` tag are the ones no interval retained.

    NOTE(review): assumes ``snap_list`` is ordered newest first, which is
    what the ``-S name`` listing in ``get_snapshots`` suggests for
    timestamp-named snapshots - confirm.
    """
    if not policy or not snap_list:
        # Nothing to tag; this also avoids indexing into an empty list below.
        return snap_list
    # Parse each snapshot name once instead of once per interval.
    stamps: dict[TimeStampString, TimeStampObject] = {
        snap: parse(snap) for snap in snap_list
    }
    # The final key is the snapshot the "(oldest)" fallback applies to.
    oldest: TimeStampString = next(reversed(snap_list))
    for term, (slots, interval_of) in policy.items():
        count: RetentionSlots = 0
        # Reset per interval so one interval's last period string can never
        # be compared against the next interval's.
        last: IntervalString = None
        for snap, stamp in stamps.items():
            if count >= slots:
                break
            current: IntervalString = interval_of(stamp)
            if current == last:
                continue
            # A new period always advances `last`, even when its first
            # snapshot was already claimed by an earlier interval.
            last = current
            if snap_list[snap] is None:
                count += 1
                snap_list[snap] = f"{term} #{count}"
        if count < slots and snap_list[oldest] is None:
            snap_list[oldest] = f"{term} (oldest) #{count + 1}"
    return snap_list
def prune_snapshots(pool: ZFSPoolName, snap_list: SnapshotList) -> None:
    """Destroy every snapshot that carries no retention tag.

    Snapshots with a truthy tag are reported as kept.  All others are
    reported and removed with ``zfs destroy -r``, whose output is discarded.
    """
    print(f"Pruning snapshots on pool: {pool}")
    for name, label in snap_list.items():
        if not label:
            print(f"Pruning snapshot: {pool}@{name}")
            getoutput(f"zfs destroy -r {pool}@{name}")
            continue
        print(f"Keeping snapshot: {pool}@{name} [{label}]")
    print(f"Pruning complete on pool: {pool}")
def snapshot_lifecycle(pool: ZFSPoolName, spec: PolicySpec) -> None:
    """Apply the snapshot lifecycle policy on the specified pool.

    Builds the retention policy from ``spec``, takes a new snapshot named
    after the current UTC time, then tags the pool's snapshots and prunes
    the ones left untagged.  Exceptions raised by the helpers propagate.
    """
    # Validate the policy before touching the pool, so a malformed spec
    # fails without leaving a new snapshot behind.
    policy = make_policy(spec)
    # Spelled-out form of "%FT%TZ": the %F and %T shorthands are C library
    # extensions that are not available on every platform.
    timestamp = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ")
    take_snapshot(pool, timestamp)
    prune_snapshots(pool, tag_snapshots(policy, get_snapshots(pool)))
def main() -> None:
    """ZFS Autosnap entrypoint.

    Takes the pool name from ``argv[1]`` and the policy spec from
    ``argv[2]`` and runs the snapshot lifecycle.  On any failure a message
    is printed and the process exits with status 1.
    """
    try:
        snapshot_lifecycle(argv[1], argv[2])
    except (IndexError, KeyError):
        # Covers missing arguments; any IndexError or KeyError raised
        # further down is reported the same way.
        print("Usage: zfs_autosnap pool_name snapshot_policy")
        print("Specify policy in the form: #S,#M,#H,#Q,#d,#w,#m,#q,#y")
        print("Unneeded intervals may be omitted. Example: 7d,4w,12m")
        die(1)
    # pylint: disable=broad-exception-caught
    except Exception as err:
        print("Something odd happened.")
        # Say what failed instead of discarding the exception entirely.
        print(f"{type(err).__name__}: {err}")
        die(1)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
With inspiration from borg's
prune_cmd.py
Automated via systemd timer (example:
pool0
): Or, to make the schedule tunable per pool, edit the service like so:
Then
systemctl edit [email protected]
and add: Naturally, adjust the timer to the desired frequency (with a similar override of
OnCalendar
).