Created
November 6, 2023 19:14
-
-
Save socketpair/5432adadc9c7f50a01e5837a85cc6495 to your computer and use it in GitHub Desktop.
XFS snapshot
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import json | |
import logging | |
import os | |
from contextlib import contextmanager | |
from errno import EINVAL | |
from fcntl import LOCK_EX, flock, ioctl | |
from os import O_CLOEXEC, O_DIRECTORY, O_RDONLY, close, open as os_open | |
from pathlib import Path | |
from shutil import rmtree | |
from signal import SIGKILL | |
from subprocess import CalledProcessError, Popen, TimeoutExpired, call, check_call, check_output | |
from tempfile import TemporaryDirectory | |
from time import monotonic, sleep | |
# Module-wide logger used by every helper in this script.
log = logging.getLogger(__name__)

# Linux filesystem freeze/thaw ioctl request numbers (see <linux/fs.h>):
#   FIFREEZE = _IOWR('X', 119, int) == 0xC0045877
#   FITHAW   = _IOWR('X', 120, int) == 0xC0045878
FIFREEZE = 0xC004_5877
FITHAW = FIFREEZE + 1
@contextmanager
def _measure(operation: str):
    """Time the body of a ``with`` statement and log the duration at DEBUG level.

    The duration is logged whether the body finishes normally or raises;
    exceptions propagate unchanged.
    """
    t0 = monotonic()
    try:
        yield
    finally:
        elapsed = monotonic() - t0
        log.debug('Operation "%s" completed in %2.2f seconds.', operation, elapsed)
@contextmanager
def _frozen(path: Path):
    """Freeze the filesystem containing *path* for the duration of the ``with`` body.

    *path* is opened as a directory and FIFREEZE is issued on it. The context value
    is an ``unfreeze()`` callable that callers may invoke early. It is safe to call
    more than once, because a thaw of a filesystem that is not frozen (EINVAL) is
    ignored, so the thaw on exit then becomes a no-op.

    The filesystem is thawed on exit, and ALSO when FIFREEZE itself raised (see the
    KERNEL BUG note below). The one exception is EBUSY, which means the filesystem
    was already frozen by another party; thawing it would break that party's freeze.

    Raises:
        OSError: the directory cannot be opened, FIFREEZE fails, or FITHAW fails
            with anything other than EINVAL.
    """
    from errno import EBUSY  # local import: the module header only imports EINVAL

    fd = os_open(path, O_RDONLY | O_DIRECTORY | O_CLOEXEC)

    def unfreeze() -> None:
        log.debug('Unfreezing')
        try:
            ioctl(fd, FITHAW)
        except OSError as err:
            # EINVAL: the filesystem is not frozen (never frozen, or already thawed).
            if err.errno != EINVAL:
                raise

    try:
        log.debug('Freezing')
        # KERNEL BUG: if a signal (e.g. SIGINT) arrives here, the FS may remain frozen(!)
        # even though this call raised. That is why a thaw must be attempted even after
        # a failed freeze.
        try:
            ioctl(fd, FIFREEZE)
        except BaseException as err:
            if not (isinstance(err, OSError) and err.errno == EBUSY):
                unfreeze()
            raise
        try:
            yield unfreeze
        finally:
            unfreeze()
    finally:
        close(fd)
_RSYNC_ARGS = [ | |
'rsync', | |
'-a', | |
# '--checksum-choice=xxh128', # потому что '--only-write-batch' сбрасывается на SLOW MD5! | |
# Алгоритм случайного изменения контрольной суммы не работает. | |
'--inplace', | |
'--hard-links', | |
'--acls', | |
'--xattrs', | |
'--one-file-system', | |
'--delete', | |
'--numeric-ids', | |
'--preallocate', | |
'--trust-sender', | |
] | |
def _make_reflink_copy(source: Path, destination: Path) -> None:
    """Recreate *destination* as a reflink (``cp --reflink=always``) clone of *source*.

    Any existing *destination* tree is removed first. ``cp`` is then run up to 19 times,
    because *source* may change while it is being copied and ``cp`` can exit non-zero.
    If no attempt succeeds, only a warning is logged; the callers in this script follow
    this with an rsync pass from *source* into *destination*.
    """
    log.debug('Removing snapshot-copy as %s', destination)
    if destination.exists():
        with _measure('unlink destination'):
            rmtree(destination)
    destination.mkdir(parents=True)

    # -u: copy only files that are missing from, or newer than, their copy in
    # *destination*, so a retry does not redo what an earlier attempt already cloned.
    cp_cmd = [
        'cp', '-u', '-a', '--reflink=always', '--no-target-directory', '--one-file-system',
        source, destination,
    ]
    for attempt_no in range(1, 20):
        # *source* is live and may change under cp, so a non-zero exit status is possible.
        log.debug('Reflinking')
        with _measure('reflink copy'):
            status = call(cp_cmd)
        if status == 0:
            return
        log.info('Reflink failed. Attempt: %d. Retrying', attempt_no)
    log.warning('Reflink copy is not complete. High disk load ?')
def _xfs_hang_detected(pid: int) -> bool:
    """Return True if process *pid* appears stuck in XFS's release path on the frozen FS.

    Heuristic: once the process has a descriptor beyond stdio open (``fd/3`` exists),
    its kernel stack is checked for ``xfs_free_eofblocks``. A stack observed in such a hang:

        [<0>] percpu_rwsem_wait+0x116/0x140
        [<0>] xfs_trans_alloc+0x20c/0x220 [xfs]
        [<0>] xfs_free_eofblocks+0x83/0x120 [xfs]
        [<0>] xfs_release+0x143/0x180 [xfs]
        [<0>] __fput+0x8e/0x250
        [<0>] task_work_run+0x5a/0x90
        [<0>] exit_to_user_mode_prepare+0x1e6/0x1f0
        [<0>] syscall_exit_to_user_mode+0x1b/0x40
        [<0>] do_syscall_64+0x6b/0x90
        [<0>] entry_SYSCALL_64_after_hwframe+0x72/0xdc

    Any OSError while reading /proc is treated as "no hang detected". Examples are the
    process exiting concurrently (reading its stack may fail with ENOENT) or missing
    permission to read ``/proc/<pid>/stack``. The caller's freeze timeout still applies.
    """
    try:
        if not Path(f'/proc/{pid}/fd/3').exists():
            return False
        return 'xfs_free_eofblocks' in Path(f'/proc/{pid}/stack').read_text()
    except OSError:
        return False


def _atomic_freeze(source: Path, destination: Path, *, freeze_timeout: int, show_changes: bool) -> None:
    """Turn *destination* into a point-in-time copy of *source* using FIFREEZE.

    1. Freeze the filesystem of *source* and run rsync with ``--only-write-batch``. It
       compares *source* with *destination* (normally a fresh reflink copy) and records
       the differences into a batch file in a temporary directory.
    2. Thaw the filesystem (on exit from ``_frozen``).
    3. Replay the batch onto *destination* with ``--read-batch``.

    NOTE(review): the batch file is written under the default temporary directory. If
    that directory lives on the frozen filesystem, rsync will presumably block until the
    timeout; confirm the deployment's TMPDIR.

    Raises:
        RuntimeError: rsync ran longer than *freeze_timeout* seconds, or an XFS hang
            was detected.
        CalledProcessError: either rsync invocation exited with a non-zero status.
    """
    with TemporaryDirectory() as tmpdir:
        batch = Path(tmpdir) / 'batch'
        with _frozen(source) as unfreeze:
            with _measure('Rsync on frozen FS'):
                log.debug('Running rsync on frozen FS to create batch')
                with Popen(  # pylint: disable=subprocess-popen-preexec-fn
                    _RSYNC_ARGS
                    + [
                        *(['--itemize-changes'] if show_changes else []),
                        '--only-write-batch', batch,
                        '--',
                        f'{source}/',
                        f'{destination}/',
                    ],
                    # New session => rsync leads its own process group, so killpg() below
                    # also reaches any processes rsync forks.
                    start_new_session=True,
                ) as proc:
                    try:
                        deadline = monotonic() + freeze_timeout
                        while proc.returncode is None and monotonic() < deadline:
                            try:
                                # Short waits: proc.wait() may be interrupted by SIGINT, and
                                # the hang probe below has to run periodically.
                                proc.wait(0.1)
                            except TimeoutExpired:
                                if _xfs_hang_detected(proc.pid):
                                    log.debug('XFS hang detected')
                                    raise RuntimeError('Early DETECTED XFS HANG') from None
                        if proc.returncode is None:
                            log.debug('rsync timed out')
                            batch_size = batch.stat().st_size if batch.is_file() else 0
                            raise RuntimeError(f'Rsync works too long (more than {freeze_timeout} sec). Batch size is {batch_size}, Aborting.')
                        log.debug('rsync finished with code %d.', proc.returncode)
                    except BaseException:  # like subprocess.check_call(): also covers KeyboardInterrupt
                        log.debug('Killing rsync')
                        # Kill first, thaw second. If we thawed first, rsync could finish
                        # successfully BEFORE the KILL is sent. If rsync somehow got unstuck
                        # and exited on its own, the kill is harmless: the process is dead
                        # but NOT yet waited for, so its pid still exists.
                        try:
                            os.killpg(proc.pid, SIGKILL)
                        except ProcessLookupError:
                            # The group is already gone (rsync was reaped before the failure).
                            # Do not let this mask the original error or skip the thaw below.
                            pass
                        unfreeze()  # must happen BEFORE the .wait() done in Popen.__exit__()
                        raise
                log.debug('rsync finally waited')
        log.debug('Unfrozen')
        assert proc.returncode is not None
        if proc.returncode != 0:
            log.debug('rsync has failed')
            raise CalledProcessError(proc.returncode, proc.args)
        log.debug('Rsync success. Applying batch of size: %2.2f MB', batch.stat().st_size / 1_000_000)
        with _measure('apply patch'):
            check_call(
                _RSYNC_ARGS
                + [
                    '--read-batch', batch,
                    '--',
                    f'{destination}/',
                ],
            )
        log.debug('Patch applied')
def _atomic_freeze_wrapper(source: Path, destination: Path, *, freeze_timeout: int, freeze_attempts: int, show_changes: bool) -> None: | |
for attempt in range(1, freeze_attempts + 1): | |
try: | |
_make_reflink_copy(source, destination) | |
_atomic_freeze(source, destination, freeze_timeout=freeze_timeout, show_changes=show_changes) | |
return | |
except Exception as err: | |
log.debug('Freeze copy failure. Attempt: %s. Error: %s', attempt, err) | |
log.debug('Sleeping for %d secs...', freeze_timeout) | |
sleep(freeze_timeout) # give system time to recover after long freeze | |
raise RuntimeError('Failed to create atomic snapshot using FSFREEZE.') | |
_SNAP_LV_NAME = 'atomic_fs_copy' | |
_SNAP_LV_TAG = 'atomic_fs_copy' | |
def _atomic_lvsnap(source: Path, destination: Path, *, show_changes: bool) -> None: | |
match json.loads(check_output(['findmnt', '-o', 'maj:min,fstype,target', '--nofsroot', '--json', '--target', source])): | |
case {'filesystems': [{'maj:min': str() as device_number, 'fstype': str() as fs_type, 'target': str() as root_mount}]}: | |
dev_maj, dev_min = device_number.split(':') | |
case _: | |
raise ValueError('Failed to parse findmnt result') | |
# Actually, should work on any FS. | |
# if fs_type != 'xfs': | |
# raise RuntimeError(f'Filesystem, type is not XFS: {fs_type}. Something went wrong.') | |
match json.loads(check_output([ | |
'lvs', | |
'--select', f'lv_kernel_major={dev_maj} && lv_kernel_minor={dev_min}', | |
'-o', 'vg_name,lv_name', | |
'--reportformat=json', | |
])): | |
case {'report': [{'lv': [{'vg_name': str() as vg_name, 'lv_name': str() as src_lv_name}]}]}: | |
pass | |
case _: | |
raise ValueError('Failed to parse lvs result') | |
lvm_snap_mount_dir = Path('/run', _SNAP_LV_NAME) | |
if lvm_snap_mount_dir.is_mount(): | |
log.warning('Snapshot was mounted. unmounting.') | |
check_call(['umount', lvm_snap_mount_dir]) | |
if lvm_snap_mount_dir.exists(): | |
rmtree(lvm_snap_mount_dir) | |
lvm_snap_mount_dir.mkdir() | |
# Will not exit with error if there are no such LVMs. | |
log.debug('Removing temporary LVMs if any.') | |
check_call(['lvremove', '--autobackup', 'n', '-y', '--select', f'lv_tags={_SNAP_LV_TAG}']) | |
check_call([ | |
'lvcreate', | |
'--snapshot', | |
'--addtag', _SNAP_LV_TAG, | |
'--extents', '100%FREE', | |
'--name', _SNAP_LV_NAME, | |
'--autobackup', 'n', | |
f'{vg_name}/{src_lv_name}', | |
]) | |
try: | |
snapshot_blockdev = Path(f'/dev/mapper/{vg_name}-{_SNAP_LV_NAME}') | |
log.debug('mounting snapshot %s to %s', snapshot_blockdev, lvm_snap_mount_dir) | |
with _measure('snapshot mounting'): | |
check_call(['mount', '-t', fs_type, '-o', 'ro,nouuid,norecovery', snapshot_blockdev, lvm_snap_mount_dir]) | |
try: | |
src_snap_dir = lvm_snap_mount_dir / source.relative_to(root_mount) | |
if not src_snap_dir.exists(): | |
raise RuntimeError('No same src dir on LVM snap FS. Should never happen.') | |
log.debug('Calling rsync %s -> %s', src_snap_dir, destination) | |
with _measure('rsync from snapshot'): | |
check_call( | |
_RSYNC_ARGS | |
+ [ | |
*(['--itemize-changes'] if show_changes else []), | |
'--', | |
f'{src_snap_dir}/', # Закрывающий / в rsync для исходного каталога важен. | |
f'{destination}/', | |
], | |
) | |
finally: | |
log.debug('unmounting') | |
check_call(['umount', lvm_snap_mount_dir]) | |
finally: | |
log.debug('lvremove snapshot') | |
check_call(['lvremove', '--autobackup', 'n', '-y', f'{vg_name}/{_SNAP_LV_NAME}']) | |
def _prepare() -> argparse.Namespace: | |
parser = argparse.ArgumentParser(description="Atomic Copy folder") | |
parser.add_argument('--debug', action='store_true', help='Enable debug mode.') | |
parser.add_argument("--method", type=str, choices=['freeze', 'lvmsnap', 'hybrid'], help="Type of the operation") | |
parser.add_argument( | |
"--freeze-timeout", | |
type=int, | |
help="Maximal time under FS freeze in one iteration. Ror 'freeze' or 'hybrid' methods", | |
# ICS-30307 Максимальное время заморозки fs 5 секунд. При изменении учесть _CONNECTION_LOST_DEADLINE. | |
default=5, | |
metavar='SECONDS', | |
) | |
parser.add_argument( | |
"--freeze-attempts", | |
type=int, | |
help="Max attempts to create snapshot. For 'freeze' or 'hybrid' methods", | |
default=5, | |
metavar='NUMBER', | |
) | |
parser.add_argument('--show-changes', action='store_true', help='Show changes while rsync is working.') | |
parser.add_argument("source", type=Path, help="Source directory path") | |
parser.add_argument("destination", type=Path, help="Destination directory path") | |
args = parser.parse_args() | |
return args | |
def _validate_paths(source: Path, destination: Path) -> None:
    """Raise ValueError unless *source* is a dir that can safely be copied to *destination*."""
    if not source.is_dir():
        raise ValueError(f'Source path {source} does not exist or is not a dir.')
    # Neither path may contain the other: the copy (and the cleanup in main) deletes
    # the destination tree.
    if source.is_relative_to(destination):
        raise ValueError('Impossible combination of dirs')
    if destination.is_relative_to(source):
        raise ValueError('Impossible combination of dirs')
    # Reflinks require one filesystem. This check does not detect an overlayfs vs.
    # upperdir pair: the same fsid is reported for both.
    if os.statvfs(source).f_fsid != os.statvfs(destination.parent).f_fsid:
        raise ValueError('Source and destination are on different FS.')
    # The filesystem type cannot be checked here: os.statvfs() exposes no f_type, see
    # https://stackoverflow.com/questions/48319246/how-can-i-determine-filesystem-type-name-with-linux-api-for-c


def _run_copy(args: argparse.Namespace) -> None:
    """Copy ``args.source`` to ``args.destination`` using the method chosen by ``--method``."""
    if args.method != 'lvmsnap':
        log.info('Using fast FSFREEZE method.')
        try:
            _atomic_freeze_wrapper(
                args.source,
                args.destination,
                freeze_timeout=args.freeze_timeout,
                freeze_attempts=args.freeze_attempts,
                show_changes=args.show_changes,
            )
            return
        except Exception as exc:
            if args.method == 'freeze':
                raise
            log.warning('Fast FSFREEZE method failed: %s', exc)
    log.info('Using slower LVM snap method.')
    _make_reflink_copy(args.source, args.destination)
    _atomic_lvsnap(args.source, args.destination, show_changes=args.show_changes)


def main() -> None:
    """Command-line entry point: make an atomic copy of ``source`` in ``destination``.

    An exclusive flock() on this script file is held for the whole run, so concurrent
    invocations are serialized. The paths are validated before any copying starts. If
    the copy itself fails, the partially written destination is removed and the error
    is re-raised.
    """
    args = _prepare()
    logging.basicConfig()
    logging.getLogger().setLevel(logging.DEBUG if args.debug else logging.INFO)
    logging.raiseExceptions = False
    logging.captureWarnings(True)

    # The descriptor is intentionally never closed: the lock is held until the process exits.
    flock(os.open(__file__, O_RDONLY | O_CLOEXEC), LOCK_EX)

    args.source = args.source.resolve()
    args.destination = args.destination.resolve()
    # Validation runs OUTSIDE the cleanup scope below. For a rejected combination the
    # destination may be an ancestor of the source (or lie inside it), and deleting it
    # would destroy source data.
    _validate_paths(args.source, args.destination)

    try:
        _run_copy(args)
    except Exception:
        if args.destination.exists():
            with _measure('destination remove after lvm mount'):
                # pathlib.Path has no rmtree(); use shutil's.
                rmtree(args.destination)
        raise
# Script entry point: run the CLI only when executed directly, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment