Last active
June 4, 2025 06:47
-
-
Save marcan/26cc3ac7241f866dca38916215dd10ff to your computer and use it in GitHub Desktop.
Transcoder (layout migrator) for CephFS
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# CephFS pool/layout migration tool ("transcoder") | |
# | |
# Loosely inspired by: | |
# https://git.sr.ht/~pjjw/cephfs-layout-tool/tree/master/item/cephfs_layout_tool/migrate_pools.py | |
# https://gist.github.com/ervwalter/5ff6632c930c27a1eb6b07c986d7439b | |
# | |
# MIT license (https://opensource.org/license/mit) | |
import os, stat, time, signal, shutil, logging, sys | |
from concurrent.futures import ThreadPoolExecutor | |
import threading, uuid, argparse | |
# Serializes the replace/rename phase so only one worker swaps files at a time.
replace_lock = threading.Lock()
# Set by the SIGINT handler to request a clean shutdown; polled throughout.
do_exit = threading.Event()
# BoundedSemaphore limiting in-flight copy jobs; initialized in main().
thread_count = None
class CephLayout:
    """Parsed representation of a CephFS layout xattr string.

    A layout string looks like:
        "stripe_unit=4194304 stripe_count=1 object_size=4194304 pool=cephfs_data"
    Equality and hashing are based on the exact original string.
    """

    def __init__(self, layout):
        # Parse the space-separated key=value pairs of the layout xattr.
        vals = {}
        for s in layout.split():
            k, v = s.split("=", 1)
            vals[k] = v
        self.stripe_unit = int(vals["stripe_unit"])
        self.stripe_count = int(vals["stripe_count"])
        self.object_size = int(vals["object_size"])
        self.pool = vals["pool"]
        self.layout = layout

    @classmethod
    def from_dir(cls, path):
        """Return the directory's explicit layout, or None if it has none set.

        Directories without an explicit layout inherit one from an ancestor;
        in that case the xattr read fails with ENODATA.
        """
        try:
            return cls(os.getxattr(path, "ceph.dir.layout", follow_symlinks=False).decode("utf-8"))
        except OSError as e:
            # 61 == ENODATA on Linux: no explicit layout on this directory.
            # Re-raise anything else instead of silently returning None
            # (the original assert would be stripped under `python -O`).
            if e.errno != 61:
                raise
            return None

    @classmethod
    def from_file(cls, path):
        """Return the file's layout, or None if the xattr is absent."""
        try:
            return cls(os.getxattr(path, "ceph.file.layout", follow_symlinks=False).decode("utf-8"))
        except OSError as e:
            # 61 == ENODATA on Linux; re-raise any other failure.
            if e.errno != 61:
                raise
            return None

    def apply_file(self, path):
        """Set this layout on *path* (the file must still be empty)."""
        os.setxattr(path, "ceph.file.layout", self.layout.encode("utf-8"), follow_symlinks=False)

    def __str__(self):
        return self.layout

    def __eq__(self, other):
        return self.layout == other.layout

    def __hash__(self):
        # Keep instances hashable despite the custom __eq__.
        return hash(self.layout)

    def diff(self, other):
        """Return a human-readable summary of fields that differ from *other*."""
        diff = []
        for i in ("stripe_unit", "stripe_count", "object_size", "pool"):
            a = getattr(self, i)
            b = getattr(other, i)
            if a != b:
                diff.append(f"{i}=[{a} -> {b}]")
        return " ".join(diff)
def process_file(args, filepaths, st, layout, file_layout):
    """Transcode one inode (all of *filepaths* are hardlinks to it) to *layout*.

    Runs on a worker thread. Copies the file into args.tmpdir with the target
    layout applied, then atomically renames the copy over every link.

    args:        parsed command-line options (tmpdir, dry_run, ...)
    filepaths:   all known paths of this inode; filepaths[0] is used as source
    st:          lstat of the source taken at scan time (for change detection)
    layout:      target CephLayout to apply
    file_layout: the file's current CephLayout (used only for logging the diff)
    """
    if do_exit.is_set():
        return
    # Unique scratch name in the tmp dir (which should sit on the default pool).
    tmp_file = os.path.join(args.tmpdir, uuid.uuid4().hex)
    if len(filepaths) == 1:
        logging.info(f"Transcoding {filepaths[0]} [{st.st_size} bytes]: {file_layout.diff(layout)}")
    else:
        logging.info(f"Transcoding {filepaths[0]} [{st.st_size} bytes] (+ {len(filepaths) - 1} hardlink(s)): {file_layout.diff(layout)} [{tmp_file}]")
    with open(tmp_file, "wb") as ofd:
        # A file's layout can only be changed while it is empty, so apply the
        # target layout before any data is written.
        layout.apply_file(tmp_file)
        with open(filepaths[0], "rb") as ifd:
            shutil.copyfileobj(ifd, ofd, layout.object_size)
    # Preserve mode/timestamps and ownership on the replacement copy.
    shutil.copystat(filepaths[0], tmp_file)
    os.chown(tmp_file, st.st_uid, st.st_gid)
    if args.dry_run or do_exit.is_set():
        os.unlink(tmp_file)
        return
    with replace_lock:
        try:
            # Block SIGINT so the replacement cannot be interrupted halfway,
            # which would leave some links pointing at the old inode.
            signal.pthread_sigmask(signal.SIG_BLOCK,[signal.SIGINT])
            st2 = os.stat(filepaths[0], follow_symlinks=False)
            if st2.st_mtime != st.st_mtime:
                # Source changed since we copied it: our copy is stale; bail.
                logging.error(f"Failed to replace {filepaths[0]} (+ {len(filepaths) - 1} hardlink(s)): Source file changed")
                os.unlink(tmp_file)
                return
            for i, path in enumerate(filepaths):
                parent_path = os.path.split(path)[0]
                # Capture the parent dir's timestamps so the rename below does
                # not appear to modify the directory.
                parent_st = os.stat(parent_path, follow_symlinks=False)
                if i == 0:
                    logging.info(f"Renaming {tmp_file} -> {path}")
                    os.rename(tmp_file, path)
                else:
                    # For each remaining hardlink: link the (already renamed)
                    # new inode to the tmp name, then atomically rename it
                    # over the old link.
                    logging.info(f"Linking {filepaths[0]} -> {path}")
                    os.link(filepaths[0], tmp_file, follow_symlinks=False)
                    os.rename(tmp_file, path)
                os.utime(parent_path, ns=(parent_st.st_atime_ns, parent_st.st_mtime_ns), follow_symlinks=False)
        finally:
            signal.pthread_sigmask(signal.SIG_UNBLOCK,[signal.SIGINT])
def handler(future):
    """Completion callback for transcode jobs: surface errors, free a slot.

    The semaphore slot is released even when the worker raised; otherwise a
    failing job would leak its slot and the scanner would eventually block
    forever waiting on thread_count.acquire().
    """
    try:
        future.result()
    except Exception:
        # Log the worker's traceback; keep processing the remaining files.
        logging.exception("Transcode worker failed")
    finally:
        thread_count.release()
def process_dir(args, start_dir, hard_links, executor, mountpoints, dir_layouts):
    """Recursively scan *start_dir* and queue files whose layout mismatches.

    args:        parsed command-line options (tmpdir, min_age, ...)
    hard_links:  shared dict (st_dev, st_ino) -> ([paths], [layouts]); used to
                 defer multi-link inodes until every link has been seen
    executor:    ThreadPoolExecutor running process_file jobs
    mountpoints: absolute paths that must not be descended into
    dir_layouts: cache of the effective CephLayout per directory
    """
    for dirpath, dirnames, filenames in os.walk(start_dir, True):
        if do_exit.is_set():
            return
        if dirpath in mountpoints:
            logging.warning(f"Skipping {dirpath}: path is a mountpoint")
            # Prune the walk: don't descend into a different filesystem.
            del dirnames[:]
            continue
        if dirpath == args.tmpdir:
            logging.info(f"Skipping {dirpath}: path is the temporary dir")
            del dirnames[:]
            continue
        # Effective layout: explicit on this dir if set, otherwise inherited
        # from the parent (which os.walk always visits and caches first).
        layout = dir_layouts.get(dirpath, None)
        if layout is None:
            layout = CephLayout.from_dir(dirpath)
            if layout is None:
                layout = dir_layouts[os.path.split(dirpath)[0]]
        # Deterministic traversal order makes runs and logs reproducible.
        dirnames.sort()
        filenames.sort()
        logging.debug(f"Scanning {dirpath} ({layout}): {len(dirnames)} dirs and {len(filenames)} files")
        dir_layouts[dirpath] = layout
        def submit(filepaths, st, file_layout):
            # Throttle submission: block until a worker slot is free
            # (the slot is released by handler() when the job finishes).
            if do_exit.is_set():
                return
            thread_count.acquire()
            future = executor.submit(process_file, args, filepaths, st, layout, file_layout)
            future.add_done_callback(handler)
        for filename in filenames:
            if do_exit.is_set():
                return
            filepath = os.path.join(dirpath, filename)
            st = os.stat(filepath, follow_symlinks=False)
            if not stat.S_ISREG(st.st_mode):
                logging.debug(f"Skipping {filepath}: not a regular file")
                continue
            # Skip recently-modified files (min_age is in days).
            if st.st_mtime > (time.time() - 86400 * args.min_age):
                logging.info(f"Skipping {filepath}: modified too recently")
                continue
            file_layout = CephLayout.from_file(filepath)
            assert file_layout is not None
            if file_layout == layout:
                # Already in the target layout; nothing to do.
                continue
            if st.st_nlink == 1:
                submit([filepath], st, file_layout)
            else:
                # Hardlinked inode: defer until every link has been seen, so
                # all of them can be replaced with the same new inode.
                file_id = (st.st_dev, st.st_ino)
                if file_id not in hard_links:
                    hard_links[file_id] = ([filepath], [layout])
                else:
                    hard_links[file_id][0].append(filepath)
                    hard_links[file_id][1].append(layout)
                if len(hard_links[file_id][0]) == st.st_nlink:
                    filepaths = hard_links[file_id][0]
                    layouts = hard_links[file_id][1]
                    del hard_links[file_id]
                    if not all(i == layouts[0] for i in layouts[1:]):
                        # The links live under directories with different
                        # target layouts: ambiguous, so refuse to transcode.
                        logging.error("Hardlinked file has inconsistent directory layouts:")
                        for fp, ly in zip(filepaths, layouts):
                            logging.error(f" [{ly}]: {fp}")
                    else:
                        submit(filepaths, st, file_layout)
                else:
                    logging.info(f"Deferring {filepath} due to hardlinks ({st.st_nlink - len(hard_links[file_id][0])} link(s) left)")
def process_files(args):
    """Walk all requested directories and transcode mismatched files.

    Builds the set of foreign mountpoints to skip, resolves the starting
    layout for each root (walking up to the nearest ancestor if the root
    itself has no explicit layout), then scans. Afterwards, reports any
    hardlinked inodes for which not every link was located — those are never
    transcoded, since replacing only some links would split the inode.
    """
    args.tmpdir = os.path.abspath(args.tmpdir)
    if not os.path.exists(args.tmpdir):
        os.makedirs(args.tmpdir)
    hard_links = {}
    dir_layouts = {}
    # Paths with something mounted on them; descending would leave CephFS.
    mountpoints = set()
    with open("/proc/self/mounts", "r") as mounts:
        for line in mounts:
            mountpoints.add(line.split()[1])
    with ThreadPoolExecutor(max_workers=args.threads) as executor:
        for start_dir in args.dirs:
            start_dir = os.path.abspath(start_dir)
            # The start dir itself may be the CephFS mountpoint; don't skip it.
            if start_dir in mountpoints:
                mountpoints.remove(start_dir)
            # Effective layout of the root: its own, or the nearest ancestor's.
            layout = CephLayout.from_dir(start_dir)
            parent = start_dir
            while layout is None and parent != "/":
                parent = os.path.split(parent)[0]
                layout = CephLayout.from_dir(parent)
            assert layout
            dir_layouts[start_dir] = layout
            logging.info(f"Starting at {start_dir} ({layout})")
            process_dir(args, start_dir, hard_links, executor, mountpoints, dir_layouts)
            if do_exit.is_set():
                break
    if hard_links and not do_exit.is_set():
        logging.warning("Some hard links could not be located. Refusing to transcode these inodes:")
        for file_id, v in hard_links.items():
            dev, inode = file_id
            st = os.stat(v[0][0], follow_symlinks=False)
            logging.warning(f" Inode {dev}:{inode} ({len(v[0])}/{st.st_nlink} links):")
            for path in v[0]:
                logging.warning(f" - {path}")
def main():
    """Parse arguments, sanity-check the tmpdir layout, and run the scan."""
    global thread_count
    parser = argparse.ArgumentParser(
        description="Transcode cephfs files to their directory layout"
    )
    parser.add_argument("dirs", help="Directories to scan", nargs="+")
    parser.add_argument("--tmpdir", default="/data/tmp", help="Temporary directory to copy files to. Important: This directory should have its layout set to the *default* data pool for the FS, to avoid excess backtrace objects.")
    parser.add_argument("--debug", "-d", action="store_true")
    parser.add_argument("--min-age", "-m", default=1, type=int, help="Minimum age of file before transcoding, in days")
    parser.add_argument("--threads", "-t", default=4, type=int, help="Number of threads for data copying")
    parser.add_argument("--dry-run", "-n", action="store_true", help="Perform transcode but do not replace files")
    args = parser.parse_args()
    # Configure logging first so everything below can report problems.
    if args.debug:
        logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
    else:
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    # Bound the number of in-flight copy jobs (see process_dir/handler).
    thread_count = threading.BoundedSemaphore(args.threads)
    # Resolve the tmpdir's effective layout, walking up to the nearest
    # ancestor with an explicit layout. Normalize and create it first so the
    # xattr lookup cannot fail with ENOENT (process_files creates it anyway).
    args.tmpdir = os.path.abspath(args.tmpdir)
    os.makedirs(args.tmpdir, exist_ok=True)
    layout = CephLayout.from_dir(args.tmpdir)
    parent = args.tmpdir
    while layout is None and parent != "/":
        parent = os.path.split(parent)[0]
        layout = CephLayout.from_dir(parent)
    if layout is None:
        # No layout anywhere up the tree: tmpdir is not on CephFS.
        logging.error(f"Could not determine a layout for {args.tmpdir}; is it on CephFS?")
        sys.exit(1)
    logging.warning(f"Temporary directory is {args.tmpdir} with pool {layout.pool}. "
                    f"This should be the *default* data pool for the FS (NOT the target pool for your files). "
                    f"If it is not, abort this script with ^C and configure it with `setfattr -n ceph.dir.layout.pool -v default_data_pool_name {args.tmpdir}`.")
    def signal_handler(sig, frame):
        # Request a clean shutdown; workers and the scanner poll do_exit.
        logging.error("SIGINT received, exiting cleanly...")
        do_exit.set()
    signal.signal(signal.SIGINT, signal_handler)
    process_files(args)
# Allow use both as a script and as an importable module.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment