Skip to content

Instantly share code, notes, and snippets.

@memchr
Last active September 9, 2023 05:42
Show Gist options
  • Save memchr/5afb1554fad6aee41cb38b7884488c26 to your computer and use it in GitHub Desktop.
Save memchr/5afb1554fad6aee41cb38b7884488c26 to your computer and use it in GitHub Desktop.
btrfs subvolume creation and rebuild tool
#!/usr/bin/env python
import getopt
import shutil
import array
import sys
import os
import typing
from pathlib import Path
import subprocess
from fcntl import ioctl
# Name this program was invoked as; used to prefix messages and pick a help text.
PROG = Path(sys.argv[0]).name

# Help texts keyed by program name; usage() falls back to the "default" entry.
# The option letters documented here mirror the getopt specification in main():
# -o takes the mode and -m disables compression.
USAGE = {}
USAGE[
    "default"
] = f"""\
Usage: {PROG} [OPTION]... SUBVOLUME...
  or:  {PROG} -r [OPTION]... [SOURCE] DEST
Create or rebuild the btrfs SUBVOLUME(s), if they do not already exist.

Mandatory arguments to long options are mandatory for short options too.

The following options apply to all usages.
  -o, --mode=MODE       set file mode (as in chmod).
  -p, --parents         no error if existing and is subvolume, make parent
                        directories as needed with their file modes
                        unaffected by any -o option.
  -m, --no-compression  disable file compression.
  -C, --no-cow          disable copy-on-write.
  -r                    rebuild DEST in place, or from SOURCE when given.
  -h, --help            show this help.
  -v, --verbose         explain what is being done.

The following options only apply when ‘-r’ option is used.
  -l, --reflink         use reflink when possible instead of full copy.
  -d, --directory       create DEST as directory instead of btrfs subvolume.
  -f, --force           ignore existence of DEST.

As a special case, when ‘-r’ is used and SOURCE is specified, DEST will be
overwritten with content from SOURCE if DEST ends with "/." and exists.
"""
USAGE["mkvol"] = USAGE["default"]

# os.umask() can only be read by setting it, so set a dummy value and
# immediately restore the original.
UMASK = os.umask(0)
os.umask(UMASK)

# ioctl request numbers for reading / writing inode flags.
FS_IOC_GETFLAGS = 0x80086601
FS_IOC_SETFLAGS = 0x40086602
# Inode flag bits toggled by this tool: no copy-on-write / no compression.
FS_NOCOW_FL = 0x00800000
FS_NOCOMP_FL = 0x00000400
def main():
try:
opts, args = getopt.gnu_getopt(
sys.argv[1:],
"o:pmChv" + "rfld",
[
"mode=",
"parents",
"no-compression",
"no-cow",
"help",
"verbose",
# rebuild,
"rebuild",
"force",
"reflink",
"directory",
],
)
except getopt.GetoptError as e:
perror(e)
help_()
return 2
rebuild = False
vol_args: dict[str, typing.Any] = {}
create_args: dict[str, typing.Any] = {}
rebuild_args: dict[str, typing.Any] = {}
verbose = False
for opt, arg in opts:
match opt.lstrip("-"):
case "o" | "mode":
create_args["mode"] = int(arg, base=8)
case "p" | "parents":
create_args["create_parents"] = True
create_args["exist_ok"] = True
case "m" | "no-compression":
vol_args["compress"] = False
case "C" | "no-cow":
vol_args["cow"] = False
case "r":
rebuild = True
case "f" | "force":
rebuild_args["force"] = True
case "l" | "reflink":
rebuild_args["reflink"] = True
case "d" | "directory":
rebuild_args["as_dir"] = True
case "h" | "help":
usage()
return 0
case "v" | "verbose":
verbose = True
if len(args) == 0:
perror("missing operand")
help_()
return 2
if rebuild:
if len(args) == 1:
vol = Subvolume(args[0], **vol_args)
vol.rebuild(**create_args, **rebuild_args, verbose=verbose)
elif len(args) == 2:
vol = Subvolume(args[1], **vol_args)
vol.rebuild(
Path(args[0]),
**create_args,
**rebuild_args,
overwrite=args[1].endswith("/."),
verbose=verbose,
)
else:
if len(args) == 0:
perror("missing operand")
else:
perror("excessive operand")
help_()
return 2
else:
for path in args:
vol = Subvolume(path, **vol_args)
vol.create(**create_args, verbose=verbose)
class SubvolumeError(Exception):
    """Raised when a subvolume cannot be created or rebuilt."""
class Subvolume:
    """A btrfs subvolume (or plain directory) addressed by its path.

    The copy-on-write and compression preferences given at construction are
    applied as inode flags by :meth:`setattr`: ``cow=False`` sets
    ``FS_NOCOW_FL`` and ``compress=False`` sets ``FS_NOCOMP_FL``.
    """

    def __init__(
        self,
        path: str | Path,
        cow=True,
        compress=True,
    ) -> None:
        """Remember the target path and the desired CoW / compression state."""
        self.path = Path(path)
        self._cow = cow
        self._compress = compress

    def create(
        self,
        mode: int = 0o777 ^ UMASK,
        exist_ok=False,
        create_parents=False,
        verbose=False,
    ):
        """Create the subvolume with ``btrfs subvolume create``.

        Args:
            mode: permission bits applied with chmod after creation.
            exist_ok: when true, an existing subvolume at the path is
                accepted and left untouched.
            create_parents: create missing parent directories first.
            verbose: echo the output of the ``btrfs`` command.

        Raises:
            SubvolumeError: the path exists and ``exist_ok`` is false, the
                path exists but is not a subvolume, or ``btrfs`` failed.
        """
        if self.exists():
            if not exist_ok:
                raise SubvolumeError(
                    f"cannot create subvolume {self.path}: File exists"
                )
            if not self.is_subvol():
                raise SubvolumeError(f"{self.path} is not a subvolume")
            return
        if create_parents:
            self.path.parent.mkdir(parents=True, exist_ok=True)
        proc = subprocess.run(
            ["btrfs", "subvolume", "create", str(self.path)], capture_output=True
        )
        if proc.returncode != 0:
            raise SubvolumeError(proc.stderr.decode())
        if verbose:
            print(f"btrfs: {proc.stdout.decode()}", end="")
        self.setattr()
        self.chmod(mode)

    def rebuild(
        self,
        source: typing.Optional[Path] = None,
        mode=0o777 ^ UMASK,
        exist_ok=False,
        create_parents=False,
        reflink=False,
        as_dir=False,
        overwrite=False,
        force=False,
        verbose=False,
    ):
        """Rebuild this path as a fresh subvolume (or directory).

        When ``source`` is None the volume itself is rebuilt: its content is
        copied into a new sibling named ``<name>.mkvol`` which then replaces
        the original. Otherwise the content of ``source`` is copied and the
        result is moved to the destination path.

        Args:
            source: directory to copy from, or None to rebuild in place.
            mode: permission bits for the new subvolume / directory.
            exist_ok: forwarded to :meth:`create` for the new subvolume.
            create_parents: create missing parent directories of the target.
            reflink: let ``cp`` use reflinks where possible.
            as_dir: build a plain directory instead of a subvolume.
            overwrite: replace an existing destination instead of placing the
                copy inside it under the source's name.
            force: allow replacing a destination that already exists.
            verbose: print what is being done.

        Raises:
            SubvolumeError: on any invalid source / destination combination.
            IOError: when the copy fails (raised by ``cp``).
        """
        path_is_cwd = str(self.path.resolve()) == os.getcwd()
        if source is not None:
            if path_is_cwd:
                # DEST is the current directory: target a child named after
                # SOURCE. Joining onto the path itself (rather than its
                # parent) keeps the target inside the cwd even when the cwd
                # was spelled as an absolute path or as "../name".
                self.path = self.path / source.name
            if self.path.exists() and not overwrite:
                # Like cp: an existing DEST receives SOURCE as a child.
                self.path = self.path / source.name

        if source is None:
            if path_is_cwd:
                raise SubvolumeError("cannot rebuild current working directory")
            if not self.path.is_dir():
                raise SubvolumeError(f"‘{self.path}’ is not a directory or subvolume")
        else:
            if not source.is_dir():
                raise SubvolumeError(
                    f"source ‘{source}’ is not a directory or subvolume"
                )
            if self.path.exists() and not force:
                raise SubvolumeError(f"cannot rebuild ‘{self.path}’: File exists")
            if source == self.path:
                raise SubvolumeError("source and destination cannot be the same")
            if source in self.path.parents:
                raise SubvolumeError("destination is in source")
            if self.path in source.parents:
                raise SubvolumeError("source is in destination")

        if verbose:
            if source is None:
                pinfo(f"rebuild {self.path}")
            else:
                pinfo(f"'{source}' -> '{self.path}'")

        # Build into a temporary sibling so the destination is only replaced
        # after the copy has succeeded.
        new = Subvolume(
            self.path.parent / (self.path.name + ".mkvol"),
            compress=self._compress,
            cow=self._cow,
        )
        if new.path.exists():
            shutil.rmtree(new.path)
        if as_dir:
            # Honour create_parents here as create() does for subvolumes;
            # parents get default permissions, only the leaf gets `mode`.
            new.path.mkdir(mode, parents=create_parents)
            new.setattr()
        else:
            new.create(
                mode=mode,
                exist_ok=exist_ok,
                create_parents=create_parents,
                verbose=verbose,
            )

        # copy from source to target
        origin = self.path if source is None else source
        cp(origin, new.path, reflink=reflink)
        fd = os.open(str(origin), os.O_RDONLY | os.O_NONBLOCK | os.O_NOFOLLOW)
        try:
            # NOTE(review): this reads the flags of the copy origin and writes
            # them back to that same descriptor; confirm whether the intent
            # was to apply them to the new volume instead.
            fsetattr(fd, fgetattr(fd))
        finally:
            # Always release the descriptor, even if the ioctl fails.
            os.close(fd)

        if self.path.exists():
            shutil.rmtree(self.path)
        shutil.move(new.path, self.path)

    def setattr(self):
        """Apply the configured CoW / compression flags to the path."""
        fd = os.open(str(self.path), os.O_RDONLY | os.O_NONBLOCK | os.O_NOFOLLOW)
        try:
            attr = fgetattr(fd)
            # Start from both bits cleared, then set the ones requested.
            attr[0] &= ~(FS_NOCOW_FL | FS_NOCOMP_FL)
            if not self._cow:
                attr[0] |= FS_NOCOW_FL
            if not self._compress:
                attr[0] |= FS_NOCOMP_FL
            fsetattr(fd, attr)
        finally:
            # Always release the descriptor, even if an ioctl fails.
            os.close(fd)

    def chmod(self, mode):
        """Set the permission bits of the path."""
        self.path.chmod(mode)

    def exists(self) -> bool:
        """Return whether anything exists at the path."""
        return self.path.exists()

    def is_subvol(self) -> bool:
        """Return whether the path looks like a btrfs subvolume root.

        The check relies on btrfs giving subvolume roots inode number 256;
        a missing path is reported as not a subvolume.
        """
        try:
            stat = self.path.stat()
        except FileNotFoundError:
            return False
        return stat.st_ino == 256
def fsetattr(fd: int, attr: array.array):
    """Write the inode flags in *attr* to the open descriptor *fd*.

    The flags are written twice: first a copy with the NOCOW and NOCOMP bits
    cleared, then *attr* itself.
    """
    mask = FS_NOCOW_FL | FS_NOCOMP_FL
    scratch = array.array("L", attr)
    scratch[0] = scratch[0] & ~mask
    # First pass clears NOCOW and NOCOMP, second pass installs the real flags.
    for flags in (scratch, attr):
        ioctl(fd, FS_IOC_SETFLAGS, flags, True)
def fgetattr(fd: int):
    """Return the inode flags of the open descriptor *fd*.

    The result is a one-element ``array.array("L")`` filled in by the
    ``FS_IOC_GETFLAGS`` ioctl.
    """
    flags = array.array("L", (0,))
    ioctl(fd, FS_IOC_GETFLAGS, flags, True)
    return flags
def cp(source: Path, dest: Path, reflink=False):
    """Copy the contents of *source* into *dest* using ``cp -a``.

    Args:
        source: directory whose contents (``source/.``) are copied.
        dest: destination directory.
        reflink: pass ``--reflink=auto`` when true, ``--reflink=never``
            otherwise.

    Raises:
        IOError: ``cp`` exited with a non-zero status. Its stderr is reported
            and *dest* is removed before raising.
    """
    reflink_mode = "auto" if reflink else "never"
    # Options come first and "--" ends option parsing, so a source or
    # destination whose name starts with "-" is never read as a cp option.
    cp_args = [
        "cp",
        "-a",
        f"--reflink={reflink_mode}",
        "--",
        str(source) + "/.",
        str(dest),
    ]
    proc = subprocess.run(cp_args, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
    if proc.returncode:
        # Undecodable bytes in cp's diagnostics must not mask the failure.
        perror(proc.stderr.decode(errors="replace"))
        if dest.exists():
            shutil.rmtree(dest)
        raise IOError("copy failed")
def usage():
    """Print the help text for this program name, or the default one."""
    text = USAGE[PROG] if PROG in USAGE else USAGE["default"]
    print(text, end="")
def help_():
    """Print a one-line hint pointing the user at ``--help``."""
    hint = f"Try {PROG} --help for more information."
    print(hint)
def perror(message, *args, **kwargs):
    """Print *message*, prefixed with the program name, to standard error.

    Extra positional and keyword arguments are forwarded to ``print``.
    """
    tagged = f"{PROG}: {message}"
    print(tagged, *args, file=sys.stderr, **kwargs)
def pinfo(message, *args, **kwargs):
    """Print *message*, prefixed with the program name, to standard output.

    Extra positional and keyword arguments are forwarded to ``print``.
    """
    tagged = f"{PROG}: {message}"
    print(tagged, *args, **kwargs)
if __name__ == "__main__":
    # Run the CLI and hand main()'s return value to the interpreter as the
    # process exit status.
    raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment