Last active
September 9, 2023 05:42
-
-
Save memchr/5afb1554fad6aee41cb38b7884488c26 to your computer and use it in GitHub Desktop.
btrfs subvolume creation and rebuild tool
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import getopt | |
import shutil | |
import array | |
import sys | |
import os | |
import typing | |
from pathlib import Path | |
import subprocess | |
from fcntl import ioctl | |
# Name this program was invoked as; used in help text and message prefixes.
PROG = Path(sys.argv[0]).name

# Help texts keyed by program name; "default" is the fallback entry.
# The option letters below mirror the getopt specification in main():
# -o/--mode takes the mode and -m/--no-compression disables compression.
USAGE = {}
USAGE[
    "default"
] = f"""\
Usage: {PROG} [OPTION]... SUBVOLUME...
  or:  {PROG} -r [OPTION]... [SOURCE] DEST
Create or rebuild the btrfs SUBVOLUME(s), if they do not already exist.

Mandatory arguments to long options are mandatory for short options too.
The following options apply to all usages.
  -o, --mode=MODE       set file mode (as in chmod),
  -p, --parents         no error if existing and is subvolume, make parent
                        directories as needed with their file modes unaffected by
                        any -o option.
  -m, --no-compression  disable file compression.
  -C, --no-cow          disable copy-on-write.
  -h, --help            show this help.
  -v, --verbose         explain what is being done.

The following options only apply when ‘-r’ option is used.
  -l, --reflink         use reflink when possible instead of full copy.
  -d, --directory       create DEST as directory instead of btrfs subvolume.
  -f, --force           ignore existence of DEST

As a special case, when ‘-r’ is used and SOURCE is specified, DEST will be
overwritten with content from SOURCE if DEST ends with "/." and exists.
"""
USAGE["mkvol"] = USAGE["default"]
# os.umask() only reports the previous mask as a side effect of installing a
# new one, so set a throwaway mask of 0 and immediately restore the old value.
UMASK = os.umask(0)
os.umask(UMASK)

# ioctl request codes passed to fcntl.ioctl() by fgetattr() / fsetattr().
# NOTE(review): these look like the 64-bit Linux FS_IOC_GETFLAGS/SETFLAGS
# encodings -- confirm before running on another ABI.
FS_IOC_GETFLAGS = 0x80086601
FS_IOC_SETFLAGS = 0x40086602

# Inode flag bits toggled by Subvolume.setattr().
FS_NOCOW_FL = 1 << 23  # 0x00800000
FS_NOCOMP_FL = 1 << 10  # 0x00000400
def main(): | |
try: | |
opts, args = getopt.gnu_getopt( | |
sys.argv[1:], | |
"o:pmChv" + "rfld", | |
[ | |
"mode=", | |
"parents", | |
"no-compression", | |
"no-cow", | |
"help", | |
"verbose", | |
# rebuild, | |
"rebuild", | |
"force", | |
"reflink", | |
"directory", | |
], | |
) | |
except getopt.GetoptError as e: | |
perror(e) | |
help_() | |
return 2 | |
rebuild = False | |
vol_args: dict[str, typing.Any] = {} | |
create_args: dict[str, typing.Any] = {} | |
rebuild_args: dict[str, typing.Any] = {} | |
verbose = False | |
for opt, arg in opts: | |
match opt.lstrip("-"): | |
case "o" | "mode": | |
create_args["mode"] = int(arg, base=8) | |
case "p" | "parents": | |
create_args["create_parents"] = True | |
create_args["exist_ok"] = True | |
case "m" | "no-compression": | |
vol_args["compress"] = False | |
case "C" | "no-cow": | |
vol_args["cow"] = False | |
case "r": | |
rebuild = True | |
case "f" | "force": | |
rebuild_args["force"] = True | |
case "l" | "reflink": | |
rebuild_args["reflink"] = True | |
case "d" | "directory": | |
rebuild_args["as_dir"] = True | |
case "h" | "help": | |
usage() | |
return 0 | |
case "v" | "verbose": | |
verbose = True | |
if len(args) == 0: | |
perror("missing operand") | |
help_() | |
return 2 | |
if rebuild: | |
if len(args) == 1: | |
vol = Subvolume(args[0], **vol_args) | |
vol.rebuild(**create_args, **rebuild_args, verbose=verbose) | |
elif len(args) == 2: | |
vol = Subvolume(args[1], **vol_args) | |
vol.rebuild( | |
Path(args[0]), | |
**create_args, | |
**rebuild_args, | |
overwrite=args[1].endswith("/."), | |
verbose=verbose, | |
) | |
else: | |
if len(args) == 0: | |
perror("missing operand") | |
else: | |
perror("excessive operand") | |
help_() | |
return 2 | |
else: | |
for path in args: | |
vol = Subvolume(path, **vol_args) | |
vol.create(**create_args, verbose=verbose) | |
class SubvolumeError(Exception):
    """Raised when a subvolume cannot be created or rebuilt."""
class Subvolume:
    """A btrfs subvolume (or plain directory) located at ``path``.

    ``cow`` and ``compress`` decide which inode flags :meth:`setattr` applies:
    ``cow=False`` sets FS_NOCOW_FL and ``compress=False`` sets FS_NOCOMP_FL;
    otherwise the respective bit is cleared.
    """

    def __init__(
        self,
        path: str | Path,
        cow=True,
        compress=True,
    ) -> None:
        self.path = Path(path)
        self._cow = cow
        self._compress = compress

    def create(
        self,
        mode: int = 0o777 ^ UMASK,
        exist_ok=False,
        create_parents=False,
        verbose=False,
    ):
        """Create the subvolume with ``btrfs subvolume create``.

        Args:
            mode: permission bits applied with chmod after creation.
            exist_ok: when the path already exists, return silently if it is
                a subvolume instead of raising.
            create_parents: create missing parent directories first.
            verbose: echo the output of the btrfs command.

        Raises:
            SubvolumeError: the path exists (and ``exist_ok`` is false or the
                path is not a subvolume), or the btrfs command failed.
        """
        if self.exists():
            if not exist_ok:
                raise SubvolumeError(
                    f"cannot create subvolume {self.path}: File exists"
                )
            if not self.is_subvol():
                raise SubvolumeError(f"{self.path} is not a subvolume")
            return
        if create_parents:
            self.path.parent.mkdir(parents=True, exist_ok=True)
        proc = subprocess.run(
            ["btrfs", "subvolume", "create", str(self.path)], capture_output=True
        )
        if proc.returncode != 0:
            raise SubvolumeError(proc.stderr.decode())
        if verbose:
            print(f"btrfs: {proc.stdout.decode()}", end="")
        self.setattr()
        self.chmod(mode)

    def rebuild(
        self,
        source: typing.Optional[Path] = None,
        mode=0o777 ^ UMASK,
        exist_ok=False,
        create_parents=False,
        reflink=False,
        as_dir=False,
        overwrite=False,
        force=False,
        verbose=False,
    ):
        """Rebuild this subvolume, optionally from the contents of ``source``.

        When source is None, rebuild the volume itself.  The content is copied
        into a fresh sibling named ``<name>.mkvol``, which then replaces
        ``self.path``.

        Args:
            source: directory to copy from; None rebuilds ``self.path``.
            mode, exist_ok, create_parents: forwarded to :meth:`create` for
                the fresh volume (``create_parents`` is honoured for plain
                directories too).
            reflink: let cp use reflinks when possible.
            as_dir: make the result a plain directory, not a subvolume.
            overwrite: use an existing destination as the target itself
                rather than creating ``<dest>/<source name>`` inside it.
            force: allow replacing an existing destination.
            verbose: report what is being done.

        Raises:
            SubvolumeError: an argument check failed.
            OSError: copying or a filesystem operation failed.
        """
        path_is_cwd = str(self.path.resolve()) == os.getcwd()
        if source is not None:
            # NOTE(review): when the destination is the working directory the
            # target becomes "<parent of dest>/<source name>"; for an absolute
            # spelling of the cwd that is a sibling of it -- confirm intent.
            if path_is_cwd:
                self.path = self.path.parent / source.name
            if self.path.exists() and not overwrite:
                self.path = self.path / source.name

        if source is None:
            if path_is_cwd:
                raise SubvolumeError("cannot rebuild current working directory")
            if not self.path.is_dir():
                raise SubvolumeError(f"‘{self.path}’ is not a directory or subvolume")
        else:
            if not source.is_dir():
                raise SubvolumeError(
                    f"source ‘{source}’ is not a directory or subvolume"
                )
            if self.path.exists() and not force:
                raise SubvolumeError(f"cannot rebuild ‘{self.path}’: File exists")
            # Compare resolved paths so that relative/absolute spellings or
            # symlinks of the same location cannot slip past these checks.
            real_source = source.resolve()
            real_dest = self.path.resolve()
            if real_source == real_dest:
                raise SubvolumeError("source and destination cannot be the same")
            if real_source in real_dest.parents:
                raise SubvolumeError("destination is in source")
            if real_dest in real_source.parents:
                raise SubvolumeError("source is in destination")

        if verbose:
            if source is None:
                pinfo(f"rebuild {self.path}")
            else:
                pinfo(f"'{source}' -> '{self.path}'")

        # Build the replacement next to the final location.
        new = Subvolume(
            self.path.parent / (self.path.name + ".mkvol"),
            compress=self._compress,
            cow=self._cow,
        )
        if new.path.exists():
            # Leftover from an earlier run.
            shutil.rmtree(new.path)
        if as_dir:
            if create_parents:
                # Mirror create(): -p also applies to plain directories.
                new.path.parent.mkdir(parents=True, exist_ok=True)
            new.path.mkdir(mode)
            new.setattr()
        else:
            new.create(
                mode=mode,
                exist_ok=exist_ok,
                create_parents=create_parents,
                verbose=verbose,
            )

        # copy from source to target
        origin = self.path if source is None else source
        cp(origin, new.path, reflink=reflink)
        # NOTE(review): this reads the flags of the *origin* directory and
        # writes them back to the same descriptor; kept as in the original --
        # confirm whether the new volume was the intended target.
        fd = os.open(str(origin), os.O_RDONLY | os.O_NONBLOCK | os.O_NOFOLLOW)
        try:
            fsetattr(fd, fgetattr(fd))
        finally:
            # Never leak the descriptor, even when the ioctl fails.
            os.close(fd)

        if self.path.exists():
            shutil.rmtree(self.path)
        shutil.move(new.path, self.path)

    def setattr(self):
        """Apply the NOCOW / NOCOMP inode flags selected at construction."""
        fd = os.open(str(self.path), os.O_RDONLY | os.O_NONBLOCK | os.O_NOFOLLOW)
        try:
            attr = fgetattr(fd)
            attr[0] &= ~(FS_NOCOW_FL | FS_NOCOMP_FL)
            if not self._cow:
                attr[0] |= FS_NOCOW_FL
            if not self._compress:
                attr[0] |= FS_NOCOMP_FL
            fsetattr(fd, attr)
        finally:
            # Close the descriptor even if reading or writing the flags fails.
            os.close(fd)

    def chmod(self, mode):
        """Set the permission bits of the path to ``mode``."""
        self.path.chmod(mode)

    def exists(self) -> bool:
        """Return True if the path exists."""
        return self.path.exists()

    def is_subvol(self) -> bool:
        """Return True if the path exists and its inode number is 256.

        Inode number 256 is used here as the marker of a subvolume root.
        """
        try:
            stat = self.path.stat()
        except FileNotFoundError:
            return False
        return stat.st_ino == 256
def fsetattr(fd: int, attr: array.array):
    """Write the inode flags in ``attr`` to ``fd`` via FS_IOC_SETFLAGS.

    Two requests are issued: the first with a copy of ``attr`` whose NOCOW
    and NOCOMP bits are cleared, the second with ``attr`` itself.
    """
    managed_bits = FS_NOCOW_FL | FS_NOCOMP_FL
    cleared = array.array("L", attr)
    cleared[0] = cleared[0] & ~managed_bits
    # clear NOCOW and NOCOMP bits
    ioctl(fd, FS_IOC_SETFLAGS, cleared, True)
    ioctl(fd, FS_IOC_SETFLAGS, attr, True)
def fgetattr(fd: int):
    """Return the inode flags of ``fd`` as a one-element ``array("L")``.

    The array is filled in place by the FS_IOC_GETFLAGS ioctl.
    """
    flags = array.array("L", (0,))
    ioctl(fd, FS_IOC_GETFLAGS, flags, True)
    return flags
def cp(source: Path, dest: Path, reflink=False):
    """Copy the contents of ``source`` into ``dest`` using ``cp -a``.

    Args:
        source: directory whose contents (``source/.``) are copied.
        dest: directory receiving the contents.
        reflink: pass ``--reflink=auto`` instead of ``--reflink=never``.

    Raises:
        IOError: cp exited with a non-zero status.  Its error output is
            reported through perror() and ``dest`` is removed if it exists.
    """
    reflink_mode = "auto" if reflink else "never"
    cp_args = [
        "cp",
        "-a",
        f"--reflink={reflink_mode}",
        # End of options: operands starting with "-" must not be parsed as
        # cp options.
        "--",
        str(source) + "/.",
        str(dest),
    ]
    proc = subprocess.run(cp_args, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
    if proc.returncode:
        # File names in cp's messages need not be valid UTF-8; perror() adds
        # its own newline, so drop the trailing one from cp's output.
        perror(proc.stderr.decode(errors="replace").rstrip("\n"))
        if dest.exists():
            shutil.rmtree(dest)
        raise IOError("copy failed")
def usage():
    """Print the help text for this program name, or the default one."""
    text = USAGE.get(PROG, USAGE["default"])
    print(text, end="")
def help_():
    """Print a one-line hint pointing the user at ``--help``."""
    hint = f"Try {PROG} --help for more information."
    print(hint)
def perror(message, *args, **kwargs):
    """Print ``message`` to stderr, prefixed with the program name.

    Extra positional and keyword arguments are forwarded to print().
    """
    prefixed = f"{PROG}: {message}"
    print(prefixed, *args, file=sys.stderr, **kwargs)
def pinfo(message, *args, **kwargs):
    """Print ``message`` prefixed with the program name.

    Extra positional and keyword arguments are forwarded to print().
    """
    prefixed = f"{PROG}: {message}"
    print(prefixed, *args, **kwargs)
# Script entry point: the value returned by main() becomes the exit status.
if __name__ == "__main__":
    exit_status = main()
    sys.exit(exit_status)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment