Created: February 27, 2023, 02:59
Gist: save Rudd-O/2d6f071abe343b9098e5f9a749fdc6ca to your computer, or open it in GitHub Desktop.
Description: a (very green, early-stage) ZFS driver for Qubes OS volume pools.
Note: this file may contain hidden or bidirectional Unicode text that could be interpreted or compiled differently than it appears; review it in an editor that reveals hidden Unicode characters.
""" | |
Driver for storing qube images in ZFS pool volumes. | |
""" | |
import dataclasses | |
import logging | |
import os | |
import subprocess | |
import time | |
import asyncio | |
import qubes | |
import qubes.storage | |
import qubes.utils | |
from typing import cast, Optional, TypedDict, Dict, List, Union | |
ZVOL_DIR = "/dev/zvol" | |
EXPORTED = ".exported" | |
REVISION_PREFIX = "qubes-" | |
_sudo, _dd, _zfs, _zpool = "sudo", "dd", "zfs", "zpool" | |
logging.getLogger(__name__).setLevel(logging.DEBUG) | |
class Vid(str):
    """A volume identifier of the form ``container/vm_name/volume_name``."""

    @classmethod
    def make(klass, container: str, vm_name: str, volume_name: str) -> "Vid":
        """Assemble a Vid from its three path components."""
        return Vid("/".join((container, vm_name, volume_name)))
def timestamp_to_revision(timestamp: Union[int, str, float]) -> str:
    """
    Convert a timestamp to a revision name.

    The timestamp is truncated to whole seconds.

    >>> timestamp_to_revision(123)
    'qubes-123'
    """
    # Fixed doctest: str repr uses single quotes, so the previous
    # expected output ("qubes-123") made the doctest fail.
    return REVISION_PREFIX + str(int(timestamp))
def dataset_in_root(dataset: str, root: str) -> bool:
    """Return True if *dataset* is *root* itself or nested below it."""
    # Appending "/" prevents a sibling like "tank2" matching root "tank".
    return dataset == root or (dataset + "/").startswith(root + "/")
def is_revision_dataset(fsname: str) -> bool:
    """True when the snapshot part of *fsname* names a Qubes revision."""
    snap_part = fsname.rpartition("@")[2]
    return snap_part.startswith(REVISION_PREFIX)
def timestamp_from_revision(fsname: str) -> int:
    """Extract the integer timestamp from a revision snapshot name."""
    snap_part = fsname.rpartition("@")[2]
    return int(snap_part[len(REVISION_PREFIX):])
async def dd(
    inpath: str,
    outpath: str,
    log: logging.Logger = logging.getLogger(__name__),
):
    """
    Sparsely copy *inpath* onto the pre-existing *outpath* using dd(1).

    The copy preserves sparseness, refuses to create the target
    (``nocreat``) and fsyncs on completion.  When either path is not
    directly accessible, the command is escalated through sudo.

    :raises qubes.storage.StoragePoolException: on nonzero exit status.
    """
    thecmd = [
        _dd,
        "if=" + inpath,
        "of=" + outpath,
        "conv=sparse,nocreat,fsync",
        "status=progress",
        "bs=1M",
    ]
    if not os.access(outpath, os.W_OK) or not os.access(inpath, os.R_OK):
        thecmd.insert(0, _sudo)
    log.debug("Invoked with arguments %r", thecmd)
    proc = await asyncio.create_subprocess_exec(*thecmd)
    returncode = await proc.wait()
    if returncode != 0:
        raise qubes.storage.StoragePoolException(
            "%s failed with error %s" % (thecmd, returncode)
        )
def _process_zfs_output(returncode, stdout, stderr, log) -> str: | |
"""Process output of ZFS, determine if the call was successful and | |
possibly log warnings.""" | |
# Filter out warning about intended over-provisioning. | |
# Upstream discussion about missing option to silence it: | |
# https://bugzilla.redhat.com/1347008 | |
err = "\n".join(line for line in stderr.decode().splitlines()) | |
if stdout: | |
log.debug("Result: %s", stdout.decode().rstrip()) | |
if returncode == 0 and err: | |
log.warning("Stderr: %s", err) | |
elif returncode != 0: | |
assert err, "Command exited unsuccessful, but printed nothing to stderr" | |
err = err.replace("%", "%%") | |
raise qubes.storage.StoragePoolException(err) | |
return stdout.decode() | |
def qubes_zfs(
    *cmd: str,
    log: logging.Logger = logging.getLogger(__name__),
) -> str:
    """
    Call :program:`zfs` to execute a ZFS operation synchronously.

    A leading ``"zpool"`` argument dispatches to zpool(8); everything
    else goes to zfs(8).  The command is prefixed with sudo when not
    already running as root.

    :returns: decoded stdout of the command.
    :raises qubes.storage.StoragePoolException: on failure.
    """
    # Force a parseable locale.  LC_ALL must come *after* the inherited
    # environment — the previous spread order ({"LC_ALL": ..., **os.environ})
    # let a user-set LC_ALL override it, breaking output parsing.
    environ = {**os.environ, "LC_ALL": "C.UTF-8"}
    if cmd and cmd[0] == "zpool":
        thecmd = [_zpool] + list(cmd)[1:]
    else:
        thecmd = [_zfs] + list(cmd)
    if os.getuid() != 0:
        thecmd = [_sudo] + thecmd
    log.debug("Invoked with arguments %r", thecmd)
    p = subprocess.Popen(
        thecmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        close_fds=True,
        env=environ,
    )
    out, err = p.communicate()
    return _process_zfs_output(p.returncode, out, err, log)
async def qubes_zfs_coro(
    *cmd: str,
    log: logging.Logger = logging.getLogger(__name__),
) -> str:
    """
    Call :program:`zfs` to execute a ZFS operation.

    Asynchronous variant of :func:`qubes_zfs`: the blocking call runs
    in the default executor so the event loop stays responsive.
    """
    def synced() -> str:
        return qubes_zfs(*cmd, log=log)

    # get_running_loop() is the non-deprecated spelling inside a
    # coroutine (get_event_loop() warns since Python 3.10).
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, synced)
class ZFSAccessor(object):
    """
    Low-level zfs(8)/zpool(8) operations scoped to datasets under *root*.

    Every dataset argument is asserted to live under the root container,
    so a buggy caller cannot touch unrelated datasets.

    Fixes relative to the previous revision:

    * ``is_volume_dirty_async`` awaited the coroutine *after* calling
      ``.strip()`` on it, which raised AttributeError at runtime.
    * ``resize_volume_async`` set the nonexistent property ``size``;
      the writable size property of a zvol is ``volsize``.
    * ``rollback_to_snapshot_async`` was referenced by
      ``ZFSVolume.revert`` but never defined; it is implemented here.
    """

    def __init__(self, root: str) -> None:
        # Root container dataset, e.g. "rpool/qubes".
        self.root = root

    async def volume_exists_async(
        self,
        volume: str,
        log: logging.Logger = logging.getLogger(__name__),
    ):
        """Return True if dataset *volume* exists."""
        assert dataset_in_root(volume, self.root)
        try:
            await qubes_zfs_coro("list", "-Hp", "-o", "name", volume, log=log)
            return True
        except qubes.storage.StoragePoolException:
            pass
        return False

    def volume_exists(
        self,
        volume: str,
        log: logging.Logger = logging.getLogger(__name__),
    ):
        """Synchronous variant of :meth:`volume_exists_async`."""
        assert dataset_in_root(volume, self.root)
        try:
            qubes_zfs("list", volume, log=log)
            return True
        except qubes.storage.StoragePoolException:
            pass
        return False

    async def remove_volume_async(
        self,
        volume: str,
        log: logging.Logger = logging.getLogger(__name__),
    ) -> None:
        """Recursively destroy *volume*, including snapshots beneath it."""
        assert dataset_in_root(volume, self.root)
        await qubes_zfs_coro(
            "destroy",
            "-r",
            volume,
            log=log,
        )

    async def snapshot_volume_async(
        self,
        volumesnapshotname: str,
        log: logging.Logger = logging.getLogger(__name__),
    ) -> None:
        """Create the snapshot given as ``dataset@snapname``."""
        assert "@" in volumesnapshotname
        dataset = volumesnapshotname.split("@")[0]
        assert dataset_in_root(dataset, self.root)
        await qubes_zfs_coro(
            "snapshot",
            volumesnapshotname,
            log=log,
        )

    async def rollback_to_snapshot_async(
        self,
        volume: str,
        snapshot: str,
        log: logging.Logger = logging.getLogger(__name__),
    ) -> None:
        """
        Roll *volume* back to *snapshot* (a bare snapshot name).

        Uses ``zfs rollback -r``, which also destroys any snapshots
        newer than the one being rolled back to.
        """
        assert dataset_in_root(volume, self.root)
        await qubes_zfs_coro(
            "rollback",
            "-r",
            "%s@%s" % (volume, snapshot),
            log=log,
        )

    async def clone_snapshot_to_volume_async(
        self,
        source: str,
        dest: str,
        log: logging.Logger = logging.getLogger(__name__),
    ) -> None:
        """Clone snapshot *source* (``dataset@snap``) into dataset *dest*."""
        assert "@" in source
        await qubes_zfs_coro(
            "clone",
            "-p",  # create intermediate parent datasets as needed
            source,
            dest,
            log=log,
        )

    async def rename_volume_async(
        self,
        source: str,
        dest: str,
        log: logging.Logger = logging.getLogger(__name__),
    ) -> None:
        """Rename dataset *source* to *dest*."""
        await qubes_zfs_coro(
            "rename",
            source,
            dest,
            log=log,
        )

    async def create_volume_async(
        self,
        volume: str,
        size: int,
        log: logging.Logger = logging.getLogger(__name__),
    ) -> None:
        """Create a sparse (thin) zvol of *size* bytes, with parents."""
        assert dataset_in_root(volume, self.root)
        await qubes_zfs_coro(
            "create",
            "-p",  # create parent datasets
            "-s",  # sparse (thin-provisioned) volume
            "-V",
            str(size),
            volume,
            log=log,
        )

    async def set_volume_readonly_async(
        self,
        volume: str,
        readonly: bool,
        log: logging.Logger = logging.getLogger(__name__),
    ) -> None:
        """Toggle the ``readonly`` property on *volume*."""
        assert dataset_in_root(volume, self.root)
        await qubes_zfs_coro(
            "set",
            "readonly=%s" % ("on" if readonly else "off"),
            volume,
            log=log,
        )

    async def resize_volume_async(
        self,
        volume: str,
        size: int,
        log: logging.Logger = logging.getLogger(__name__),
    ) -> None:
        """Set the size of zvol *volume* to *size* bytes."""
        assert dataset_in_root(volume, self.root)
        # The writable size property of a zvol is "volsize"; the
        # previous code set "size", which zfs(8) rejects.
        await qubes_zfs_coro(
            "set",
            "volsize=%s" % size,
            volume,
            log=log,
        )

    async def set_volume_dirty_async(
        self,
        volume: str,
        dirty: bool,
        log: logging.Logger = logging.getLogger(__name__),
    ) -> None:
        """Flag *volume* as in-use via the org.qubes:dirty user property."""
        assert dataset_in_root(volume, self.root)
        await qubes_zfs_coro(
            "set",
            "org.qubes:dirty=%s" % ("on" if dirty else "off"),
            volume,
            log=log,
        )

    def is_volume_dirty(
        self,
        volume: str,
        log: logging.Logger = logging.getLogger(__name__),
    ) -> bool:
        """Return True when org.qubes:dirty is "on" for *volume*."""
        assert dataset_in_root(volume, self.root)
        v = qubes_zfs(
            "list",
            "-Hp",
            "-o",
            "org.qubes:dirty",
            volume,
            log=log,
        ).strip()
        return v == "on"

    async def is_volume_dirty_async(
        self,
        volume: str,
        log: logging.Logger = logging.getLogger(__name__),
    ) -> bool:
        """Asynchronous variant of :meth:`is_volume_dirty`."""
        assert dataset_in_root(volume, self.root)
        # Await the coroutine first; the previous code called .strip()
        # on the coroutine object itself, raising AttributeError.
        v = await qubes_zfs_coro(
            "list",
            "-Hp",
            "-o",
            "org.qubes:dirty",
            volume,
            log=log,
        )
        return v.strip() == "on"

    def get_volume_snapshots(
        self,
        volume: str,
        log: logging.Logger = logging.getLogger(__name__),
    ) -> List[str]:
        """Return the full names of all snapshots of *volume*."""
        assert dataset_in_root(volume, self.root)
        out = qubes_zfs(
            "list",
            "-Hp",
            "-o",
            "name",
            "-t",
            "snapshot",
            volume,
            log=log,
        )
        return [line for line in out.splitlines() if line.strip()]

    def get_volume_usage(
        self,
        volume: str,
        log: logging.Logger = logging.getLogger(__name__),
    ) -> int:
        """Return space used by *volume* in bytes (ZFS ``used``)."""
        assert dataset_in_root(volume, self.root)
        return int(
            qubes_zfs(
                "list",
                "-Hp",
                "-o",
                "used",
                volume,
                log=log,
            ).strip()
        )

    def get_volume_creation(
        self,
        volume: str,
        log: logging.Logger = logging.getLogger(__name__),
    ) -> int:
        """Return creation time of *volume* as seconds since the epoch."""
        assert dataset_in_root(volume, self.root)
        return int(
            qubes_zfs(
                "list",
                "-Hp",
                "-o",
                "creation",
                volume,
                log=log,
            ).strip()
        )

    def get_volume_size(
        self,
        volume: str,
        log: logging.Logger = logging.getLogger(__name__),
    ) -> int:
        """Return the logical size (``volsize``) of zvol *volume*."""
        assert dataset_in_root(volume, self.root)
        return int(
            qubes_zfs(
                "list",
                "-Hp",
                "-o",
                "volsize",
                volume,
                log=log,
            ).strip()
        )

    def get_pool_available(
        self,
        log: logging.Logger = logging.getLogger(__name__),
    ) -> int:
        """Return free bytes in the zpool that hosts the root container."""
        return int(
            qubes_zfs(
                "zpool",
                "list",
                "-Hp",
                "-o",
                "free",
                self.root.split("/")[0],  # pool name is the first component
                log=log,
            ).strip()
        )

    def get_pool_size(
        self,
        log: logging.Logger = logging.getLogger(__name__),
    ) -> int:
        """Return total bytes of the zpool that hosts the root container."""
        return int(
            qubes_zfs(
                "zpool",
                "list",
                "-Hp",
                "-o",
                "size",
                self.root.split("/")[0],
                log=log,
            ).strip()
        )
class ZFSVolume(qubes.storage.Volume):
    """
    ZFS thin volume implementation.

    The volume is backed by a zvol whose dataset path is the volume's
    :class:`Vid` (``container/vm_name/volume_name``).  Revisions are
    ZFS snapshots named ``qubes-<timestamp>``.

    Fixes relative to the previous revision:

    * ``is_dirty`` compared the string ``"on"`` against the *boolean*
      returned by ``ZFSAccessor.is_volume_dirty``, so it always
      returned False; it now returns the accessor's result directly.
    * ``resize`` no longer raises TypeError when the current size is
      unknown (``self.size`` is None).
    * ``revert`` declares its optional parameter as ``Optional[str]``.
    """

    def __init__(
        self,
        name: str,
        pool: "ZFSPool",
        vid: Vid,
        revisions_to_keep: int = 1,
        rw: bool = False,
        save_on_stop: bool = False,
        size: int = 0,
        snap_on_start: bool = False,
        source: Optional[qubes.storage.Volume] = None,
        ephemeral: Optional[bool] = None,
        **kwargs,
    ):
        super().__init__(
            name=name,
            pool=pool,
            vid=vid,
            revisions_to_keep=revisions_to_keep,
            rw=rw,
            save_on_stop=save_on_stop,
            size=size,
            snap_on_start=snap_on_start,
            source=source,
            ephemeral=ephemeral,
            **kwargs,
        )
        # Narrow the type of self.pool for static checkers (no-op at runtime).
        self.pool = cast(ZFSPool, self.pool)
        self.log = logging.getLogger("%s.%s" % (__name__, self.pool.name))

    # Cached volsize; invalidated by operations that (may) change it.
    _cached_size: Optional[int] = None

    @property
    def path(self):
        # The vid doubles as the dataset path (relative to /dev/zvol).
        return self.vid

    @property
    def revisions(self) -> Dict[str, str]:
        """Map of revision name -> ISO-formatted creation date."""
        snapshots = self.pool.accessor.get_volume_snapshots(
            self.path,
            log=self.log,
        )
        revisions: Dict[str, str] = {}
        for snapshot in snapshots:
            if is_revision_dataset(snapshot):
                timestamp = timestamp_from_revision(snapshot)
                name = timestamp_to_revision(timestamp)
                revisions[name] = qubes.storage.isodate(timestamp)
        return revisions

    async def _purge_old_revisions(self):
        """Destroy revision snapshots beyond ``revisions_to_keep``."""
        self.log.debug("_purge_old_revisions %s", self.path)
        revisions = self.revisions
        if not revisions:
            return
        # Newest first; ISO dates sort chronologically as strings.
        revs = sorted(revisions.items(), key=lambda m: m[1], reverse=True)
        for snapshot, _ in revs[self.revisions_to_keep:]:
            await self.pool.accessor.remove_volume_async(
                "%s@%s" % (self.path, snapshot),
                log=self.log,
            )

    @property
    def size(self) -> Optional[int]:
        """Logical size in bytes, or None if it cannot be determined."""
        if self._cached_size is None:
            try:
                self._cached_size = self.pool.accessor.get_volume_size(
                    self.path,
                    log=self.log,
                )
            except qubes.storage.StoragePoolException:
                # Volume may not exist yet; leave the cache unset.
                pass
        return self._cached_size

    @property
    def usage(self) -> int:
        """Bytes actually consumed by the volume (0 if unknown)."""
        try:
            return self.pool.accessor.get_volume_usage(
                self.path,
                log=self.log,
            )
        except qubes.storage.StoragePoolException:
            return 0

    async def _clone_from(self, source: qubes.storage.Volume):
        """
        Replace this volume's contents with *source*'s latest committed
        state.

        ZFS sources are cloned from their newest revision snapshot;
        non-ZFS sources are copied block-wise with dd(1) into a freshly
        created zvol.
        """
        # FIXME use suffix in create empty or clone before destroying final
        self.log.debug("Cloning into %s from %s", self.path, source)
        self._cached_size = None
        if isinstance(source, ZFSVolume):
            # The source device is a ZFS one; simply find out what its
            # latest qubes snapshot is, then clone it from there.
            # FIXME: perhaps clone to a temp dataset before deleting
            # the target dataset.
            snaps = source.revisions
            assert snaps, "the source %s has no revisions" % (source.path,)
            snapshot = sorted(snaps.items(), key=lambda m: m[1])[-1][0]
            if await self.pool.accessor.volume_exists_async(
                self.path,
                log=self.log,
            ):
                self.log.debug(
                    "Volume %s exists, removing prior to ZFS clone",
                    self.path,
                )
                await self.pool.accessor.remove_volume_async(
                    self.path,
                    log=self.log,
                )
            src = "%s@%s" % (source.path, snapshot)
            self.log.debug(
                "Creating volume %s with cloning from %s",
                self.path,
                src,
            )
            await self.pool.accessor.clone_snapshot_to_volume_async(
                src,
                self.path,
                log=self.log,
            )
        else:
            # Source is not a ZFS one; create the dataset with the size
            # of the source (or larger if requested by user) and dd the
            # contents sparsely.
            self.log.debug("This is not a ZFS volume")
            assert self._size, "no size set for this volume"
            # FIXME optimize: if the volume exists but is smaller than
            # the source, simply grow the volume instead of nuking it
            # completely.
            self.log.debug("Sizes in play: %s %s", self._size, source.size)
            size = max([self._size, source.size])
            self.log.debug("Creating empty volume %s for cloning", self.path)
            if await self.pool.accessor.volume_exists_async(
                self.path,
                log=self.log,
            ):
                self.log.debug(
                    "Volume %s exists, removing prior to non-ZFS clone",
                    self.path,
                )
                await self.pool.accessor.remove_volume_async(
                    self.path,
                    log=self.log,
                )
            await self.pool.accessor.create_volume_async(
                self.path,
                size,
                log=self.log,
            )
            infile = await source.export()
            try:
                self.log.debug(
                    "Copying %s to %s",
                    infile,
                    os.path.join(ZVOL_DIR, self.path),
                )
                await dd(infile, os.path.join(ZVOL_DIR, self.path), self.log)
            finally:
                await source.export_end(infile)

    async def _create_empty(
        self,
        size: Optional[int] = None,
        suffix: str = "",
    ) -> None:
        """(Re)create an empty zvol at path+suffix of the given size."""
        self.log.debug(
            "_create_empty %s with size %s self._size %s",
            self.path,
            size,
            self._size,
        )
        self._cached_size = None
        if size is None:
            assert self._size
        if await self.pool.accessor.volume_exists_async(
            self.path + suffix,
            log=self.log,
        ):
            await self.pool.accessor.remove_volume_async(
                self.path + suffix,
                log=self.log,
            )
        await self.pool.accessor.create_volume_async(
            self.path + suffix,
            size if size is not None else self._size,
            log=self.log,
        )

    async def _remove_all_exported(self) -> bool:
        """Destroy any leftover export clones; True if something was removed."""
        self.log.debug("_remove_all_exported %s", self.path)
        exported = os.path.join(
            self.pool.container,
            EXPORTED,
            self.vid.replace("/", "_"),
        )
        if await self.pool.accessor.volume_exists_async(
            exported,
            log=self.log,
        ):
            await self.pool.accessor.remove_volume_async(
                exported,
                log=self.log,
            )
            return True
        return False

    async def _remove_unlocked(self, suffix: str = "") -> "ZFSVolume":
        """Destroy the dataset at path+suffix (and exports, if suffix == "")."""
        self.log.debug("Removing %s with suffix %s", self.path, suffix)
        if suffix == "":
            await self._remove_all_exported()
        if await self.pool.accessor.volume_exists_async(
            self.path + suffix,
            log=self.log,
        ):
            self._cached_size = None
            await self.pool.accessor.remove_volume_async(
                self.path + suffix,
                log=self.log,
            )
        return self

    async def _adopt(self, suffix: str):
        """Replace this volume with the dataset at path+suffix."""
        self.log.debug("Adopting %s from suffix %s", self.path, suffix)
        self._cached_size = None
        await self.remove()
        await self.pool.accessor.rename_volume_async(
            self.path + suffix,
            self.path,
            log=self.log,
        )

    @qubes.storage.Volume.locked
    async def remove(self, suffix: str = "") -> "ZFSVolume":
        """Destroy the volume (or a suffixed sibling dataset)."""
        return await self._remove_unlocked(suffix=suffix)

    @qubes.storage.Volume.locked
    async def create(self) -> "ZFSVolume":
        """Create the backing dataset for a save_on_stop volume."""
        self.log.debug("Creating %s", self.path)
        if self.snap_on_start and self.save_on_stop:
            assert 0, "snap_on_start true && save_on_stop true"
        if self.save_on_stop:
            if self.source:
                await self._clone_from(self.source)
            else:
                await self._create_empty()
        return self

    @qubes.storage.Volume.locked
    async def start(self):
        """Prepare the volume for domain start (see ZFSPool docstring)."""
        self.log.debug("Starting volume %s", self.path)
        snap_on_start, save_on_stop = (self.snap_on_start, self.save_on_stop)
        if not snap_on_start and not save_on_stop:
            # Volatile: recreate from scratch or from source every start.
            if self.source:
                self.log.debug(
                    "Cloning volatile %s from %s",
                    self.path,
                    self.source,
                )
                await self._clone_from(self.source)
            else:
                self.log.debug(
                    "Creating volatile %s empty",
                    self.path,
                )
                await self._create_empty()
        elif not snap_on_start and save_on_stop:
            # Private / persistent.  Dataset already created.
            self.log.debug("Dirtying up %s", self.path)
            await self.pool.accessor.set_volume_dirty_async(
                self.path,
                True,
                log=self.log,
            )
        elif snap_on_start and not save_on_stop:
            # Root / ephemeral.  Clone or create from source.
            if self.source:
                self.log.debug(
                    "Cloning ephemeral %s from %s",
                    self.path,
                    self.source,
                )
                await self._clone_from(self.source)
            else:
                self.log.debug(
                    "Creating ephemeral %s empty",
                    self.path,
                )
                await self._create_empty()
        elif snap_on_start and save_on_stop:
            assert 0, "snap_on_start && save_on_stop on %s" % self.path
        await self.pool.accessor.set_volume_readonly_async(
            self.path,
            not self.rw,
            log=self.log,
        )

    @qubes.storage.Volume.locked
    async def stop(self) -> "ZFSVolume":
        """Tear down or commit the volume on domain stop."""
        self.log.debug("Stopping volume %s", self.path)
        snap_on_start, save_on_stop = (self.snap_on_start, self.save_on_stop)
        if not snap_on_start and not save_on_stop:
            # Volatile.
            await self._remove_unlocked()
        elif not snap_on_start and save_on_stop:
            # Private / persistent.
            if await self.pool.accessor.is_volume_dirty_async(
                self.path,
                log=self.log,
            ):
                await self.pool.accessor.set_volume_dirty_async(
                    self.path,
                    False,
                    log=self.log,
                )
            # We snapshot in every case to allow for the ability to
            # export / clone the volume.  This volume is now clean and
            # therefore cleanly exportable.
            await self.pool.accessor.snapshot_volume_async(
                "%s@%s"
                % (
                    self.path,
                    timestamp_to_revision(time.time()),
                ),
                log=self.log,
            )
            await self._purge_old_revisions()
        elif snap_on_start and not save_on_stop:
            # Root / ephemeral; kept, will be recreated on next start.
            pass
        elif snap_on_start and save_on_stop:
            assert 0, "snap_on_start && save_on_stop on %s" % self.path
        return self

    async def export(self) -> str:
        """Return a device path (clone of the latest revision) to `open()`."""
        self.log.debug("Start of export of %s", self.path)
        snaps = self.revisions
        assert snaps, "%s has no revisions" % self.path
        latest_snap = sorted(snaps.items(), key=lambda m: m[1])[-1][0]
        exported = os.path.join(
            self.pool.container,
            EXPORTED,
            self.vid.replace("/", "_"),
            latest_snap,
        )
        await self.pool.accessor.clone_snapshot_to_volume_async(
            "%s@%s" % (self.path, latest_snap),
            exported,
            log=self.log,
        )
        return os.path.join(ZVOL_DIR, exported)

    async def export_end(self, exported_path: str) -> None:
        """Remove the export clone previously returned by :meth:`export`."""
        self.log.debug(
            "End of export of %s to path %s",
            self.path,
            exported_path,
        )
        snapname = os.path.basename(exported_path)
        exported = os.path.join(
            self.pool.container,
            EXPORTED,
            self.vid.replace("/", "_"),
            snapname,
        )
        await self.pool.accessor.remove_volume_async(
            exported,
            log=self.log,
        )

    def block_device(self):
        """Return :py:class:`qubes.storage.BlockDevice` for serialization in
        the libvirt XML template as <disk>.
        """
        return qubes.storage.BlockDevice(
            os.path.join(ZVOL_DIR, self.path),
            self.name,
            None,
            self.rw,
            self.domain,
            self.devtype,
        )

    @qubes.storage.Volume.locked
    async def import_volume(
        self,
        src_volume: qubes.storage.Volume,
    ) -> "ZFSVolume":
        """Import the committed contents of *src_volume* into this volume."""
        self.log.debug(
            "Importing volume %s from source %s",
            self.path,
            src_volume,
        )
        if not src_volume.save_on_stop:
            # Nothing committed to import.
            return self
        if self.is_dirty():
            raise qubes.storage.StoragePoolException(
                "Cannot import to dirty volume {} -"
                " start and stop a qube to cleanup".format(self.path)
            )
        self._cached_size = None
        await self._clone_from(src_volume)
        return self

    def abort_if_import_in_progress(self):
        """Raise if a partially-written import dataset exists."""
        if self.pool.accessor.volume_exists(
            self.path + "-import-data",
            log=self.log,
        ):
            raise qubes.storage.StoragePoolException(
                "Import operation in progress on {}".format(self.path)
            )

    @qubes.storage.Volume.locked
    async def import_data(self, size) -> str:
        """Create a temporary import dataset and return its device path."""
        self.log.debug("Importing data of size %s into %s", size, self.path)
        if self.is_dirty():
            raise qubes.storage.StoragePoolException(
                "Cannot import data to dirty volume {} -"
                " stop the qube using it first".format(self.path)
            )
        self.abort_if_import_in_progress()  # FIXME
        await self._create_empty(size, suffix="-import-data")
        return os.path.join(ZVOL_DIR, self.path + "-import-data")

    @qubes.storage.Volume.locked
    async def import_data_end(self, success):
        """Either commit imported data, or discard temporary volume."""
        self.log.debug("End of importing data into %s", self.path)
        if success:
            await self._adopt(suffix="-import-data")
        else:
            await self.remove(suffix="-import-data")

    def is_dirty(self) -> bool:
        """True when the volume is flagged in-use (started, not stopped)."""
        if self.save_on_stop:
            # accessor.is_volume_dirty already returns a bool; the old
            # code compared it to the string "on", which was always
            # False and made dirtiness checks no-ops.
            return self.pool.accessor.is_volume_dirty(
                self.path,
                log=self.log,
            )
        return False

    @qubes.storage.Volume.locked
    async def resize(self, size):
        """
        Expand the volume to *size* bytes.

        :raises qubes.storage.StoragePoolException: if *size* is
            smaller than the current size (ZFS volumes are not shrunk).
        """
        self.log.debug("Resizing %s to %s", self.path, size)
        current = self.size
        # current may be None when the dataset does not exist yet; the
        # old code then crashed with TypeError on the `<` comparison.
        if current is not None:
            if size == current:
                return
            if size < current:
                raise qubes.storage.StoragePoolException(
                    "Shrinking of ZFS volume %s is not possible" % (self.path,)
                )
        self._cached_size = None
        await self.pool.accessor.resize_volume_async(
            self.path,
            size,
            log=self.log,
        )

    def is_outdated(self):
        """True when the source has a revision newer than this volume."""
        self.log.debug("is_outdated %s", self.path)
        if not self.snap_on_start:
            return False
        if not self.is_dirty():
            return False
        if not self.source:
            return False
        assert isinstance(self.source, ZFSVolume)
        source_revs = self.source.revisions.items()
        assert source_revs
        ordered = sorted(source_revs, key=lambda m: m[1])
        last_source_rev_isodate = ordered[-1][1]
        this_volume_timestamp = self.pool.accessor.get_volume_creation(
            self.path,
            log=self.log,
        )
        this_isodate = qubes.storage.isodate(this_volume_timestamp)
        return last_source_rev_isodate > this_isodate

    @qubes.storage.Volume.locked
    async def revert(self, revision: Optional[str] = None):
        """Roll the volume back to *revision* (default: latest revision)."""
        self.log.debug("revert %s to %s", self.path, revision)
        if self.is_dirty():
            raise qubes.storage.StoragePoolException(
                "Cannot revert dirty volume {} -"
                " stop the qube first".format(
                    self.path,
                )
            )
        self.abort_if_import_in_progress()
        snaps = self.revisions
        assert snaps, "volume %s has no revisions" % (self.path,)
        if revision is None:
            snap = sorted(snaps.items(), key=lambda m: m[1])[-1][0]
        else:
            snap = revision
        await self.pool.accessor.rollback_to_snapshot_async(
            self.path,
            snap,
            log=self.log,
        )
        return self

    async def verify(self) -> bool:
        """Verify that the backing dataset exists (volatile always passes)."""
        self.log.debug("verify %s", self.path)
        if not self.snap_on_start and not self.save_on_stop:
            return True
        if await self.pool.accessor.volume_exists_async(
            self.path,
            log=self.log,
        ):
            return True
        raise qubes.storage.StoragePoolException(
            "volume {} missing".format(
                self.path,
            )
        )
@dataclasses.dataclass
class ZFSSnapshot:
    """A single ZFS snapshot of a volume."""

    # Snapshot name (the component after "@").
    name: str
    # Creation time, in seconds since the Unix epoch.
    timestamp: int
@dataclasses.dataclass
class ZFSVolumeStat:
    """Cached statistics for one volume."""

    # Identifier of the volume these stats belong to.
    vid: Vid
    # Logical size of the volume in bytes.
    size: int
    # Bytes actually consumed on disk.
    used: int
    # Known snapshots of the volume.
    snapshots: List[ZFSSnapshot] = dataclasses.field(default_factory=list)
@dataclasses.dataclass
class ZFSVolumeCache:
    """Cache of per-volume and pool-wide statistics.

    NOTE(review): not referenced elsewhere in this file — presumably
    reserved for a future caching layer; confirm before removing.
    """

    # Per-volume statistics keyed by Vid.
    cache: Dict[Vid, ZFSVolumeStat] = dataclasses.field(default_factory=dict)
    # Pool-wide byte counters.
    pool_used: int = 0
    pool_available: int = 0
    # Timestamp of the last refresh (seconds since the epoch).
    time: float = 0
class ZFSPoolConfig(TypedDict):
    """Shape of the dict returned by ``ZFSPool.config``."""

    name: str
    container: str
    driver: str
    revisions_to_keep: int
    ephemeral_volatile: bool
class ZFSPool(qubes.storage.Pool):
    """ZFS thin storage for Qubes OS.

    Volumes are stored as ZFS volumes, under a container dataset
    specified by the *container* argument.  Here is the general
    naming scheme for the volumes:

        {vm_name}/{volume_name}

    On VM startup, the volume contents are modified, depending on
    volume type, according to the table below:

    snap_on_start  save_on_stop  typical use
    False          False         volatile
        upon domain start:
            the volume is recursively destroyed and recreated
            to its specifications, or cloned from its source
        upon domain stop:
            the volume is removed completely
    False          True          private / full persistence
        upon domain start:
            the volume is used as-is, but a revision snapshot
            is created before starting the qube.
        upon domain stop:
            revision snapshots beyond revisions_to_keep (not
            including the volume itself) are removed.
    True           False         root / volatile
        upon domain start:
            the volume is recursively destroyed and recreated,
            cloning it from the last committed state of the
            corresponding source volume, and then applying
            the volume's storage specifications (size)
        upon domain stop:
            the volume is kept, knowing the next start it
            will be recreated; revision snapshots beyond
            revisions_to_keep (not including the volume itself)
            are removed.
    True           True          unsupported

    The format of the revision name is `qubes-{timestamp}`,
    corresponding to a volume snapshot name of `@qubes-{timestamp}`,
    where `timestamp` is in '%s' format (seconds since unix epoch).
    """  # pylint: disable=protected-access

    driver = "zfs"

    def __init__(
        self,
        *,
        name: str,
        revisions_to_keep: int = 1,
        container: str,
        ephemeral_volatile: bool = False,
    ):
        super().__init__(
            name=name,
            revisions_to_keep=revisions_to_keep,
            ephemeral_volatile=ephemeral_volatile,
        )
        # Root dataset under which every volume of this pool lives.
        self.container = container
        self._volume_objects_cache: Dict[Vid, ZFSVolume] = {}
        self.log = logging.getLogger("%s.%s" % (__name__, self.name))
        self.accessor = ZFSAccessor(self.container)

    def __repr__(self):
        return "<{} at {:#x} name={!r} container={!r}>".format(
            type(self).__name__, id(self), self.name, self.container
        )

    @property
    def config(self) -> ZFSPoolConfig:
        """Serializable configuration of this pool."""
        return ZFSPoolConfig(
            {
                "name": self.name,
                "container": self.container,
                "driver": self.driver,
                "revisions_to_keep": self.revisions_to_keep,
                "ephemeral_volatile": self.ephemeral_volatile,
            }
        )

    async def destroy(self):
        """
        Destroy this pool.

        In the current implementation we ignore this request.
        A full implementation would simply zfs destroy recursively.
        """
        pass  # TODO Should we remove an existing pool?

    def init_volume(self, vm, volume_config):
        """
        Initialize a :py:class:`qubes.storage.Volume` from `volume_config`.
        """
        c = volume_config
        if "vid" not in c:
            if vm and hasattr(vm, "name"):
                vm_name = vm.name
            else:
                # for the future if we have volumes not belonging to a VM
                vm_name = qubes.utils.random_string()
            vid = Vid.make(self.container, vm_name, c["name"])
        else:
            vid = c["vid"]
        revisions_to_keep = (
            self.revisions_to_keep
            if "revisions_to_keep" not in c
            else c["revisions_to_keep"]
        )
        volume = ZFSVolume(
            c["name"],
            self,
            vid,
            revisions_to_keep,
            c.get("rw", False),
            c.get("save_on_stop", False),
            c.get("size", 0),
            # Key fixed: the previous code looked up the misspelled
            # "snap_on_Start", silently discarding the configured value.
            c.get("snap_on_start", False),
            c.get("source", None),
            c.get("ephemeral", self.ephemeral_volatile),
        )
        self._volume_objects_cache[vid] = volume
        return volume

    async def __init_container(self):
        """Create the (unmountable) container dataset if it is missing."""
        try:
            await qubes_zfs_coro("list", self.container)
        except qubes.storage.StoragePoolException:
            await qubes_zfs_coro(
                "create",
                "-o",
                "mountpoint=none",
                "-p",
                self.container,
            )

    async def setup(self):
        """Prepare the pool for use."""
        await self.__init_container()

    def get_volume(self, vid: Vid):
        """Return a volume with given vid."""
        if vid in self._volume_objects_cache:
            return self._volume_objects_cache[vid]
        # don't cache this object, as it doesn't carry full configuration
        return ZFSVolume("unconfigured", self, vid)

    def list_volumes(self) -> List[ZFSVolume]:
        """Return a list of volumes managed by this pool."""
        return list(self._volume_objects_cache.values())

    @property
    def size(self) -> int:
        """Total size in bytes of the backing zpool."""
        return self.accessor.get_pool_size()

    @property
    def usage(self) -> int:
        """
        Usage of the backing zpool in percent (0-100).

        Synchronously queries the pool, like the LVM driver does.
        """
        # Hoisted: the previous code queried the pool size twice.
        size = self.accessor.get_pool_size()
        used = size - self.accessor.get_pool_available()
        return int(used / size * 100)

    @property
    def usage_details(self):
        """
        Return usage details of the pool, synchronously refreshed.
        """
        result = {}
        result["data_size"] = self.size
        result["data_usage"] = self.usage
        # ZFS has no separate metadata pool the way LVM thin does, so
        # no meaningful metadata numbers exist; report zeros.
        result["metadata_size"] = 0
        result["metadata_usage"] = 0
        return result
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.