Skip to content

Instantly share code, notes, and snippets.

@marcan
Last active June 4, 2025 06:47
Show Gist options
  • Save marcan/26cc3ac7241f866dca38916215dd10ff to your computer and use it in GitHub Desktop.
Save marcan/26cc3ac7241f866dca38916215dd10ff to your computer and use it in GitHub Desktop.
Transcoder (layout migrator) for CephFS
# CephFS pool/layout migration tool ("transcoder")
#
# Loosely inspired by:
# https://git.sr.ht/~pjjw/cephfs-layout-tool/tree/master/item/cephfs_layout_tool/migrate_pools.py
# https://gist.github.com/ervwalter/5ff6632c930c27a1eb6b07c986d7439b
#
# MIT license (https://opensource.org/license/mit)
import argparse
import errno
import logging
import os
import shutil
import signal
import stat
import sys
import threading
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
# Held by a worker while it swaps the transcoded copy into place, so only one
# replacement runs at a time.
replace_lock = threading.Lock()
# Set by the SIGINT handler installed in main(); scanning and copy code checks
# it to stop early.
do_exit = threading.Event()
# Replaced in main() with a BoundedSemaphore sized to --threads; acquired
# before each job is submitted and released when the job's future completes.
thread_count = None
class CephLayout:
    """A CephFS layout parsed from the value of a ``ceph.*.layout`` xattr.

    The layout string is a whitespace-separated list of ``key=value`` pairs
    and must contain ``stripe_unit``, ``stripe_count``, ``object_size`` and
    ``pool``. The original string is kept in ``self.layout`` and is what
    equality, hashing and ``str()`` are based on.
    """

    # Fields compared (in this order) by diff().
    _FIELDS = ("stripe_unit", "stripe_count", "object_size", "pool")

    def __init__(self, layout):
        """Parse *layout*.

        Raises:
            KeyError: a required field is missing.
            ValueError: a token has no ``=`` or a numeric field is not an
                integer.
        """
        vals = dict(token.split("=", 1) for token in layout.split())
        self.stripe_unit = int(vals["stripe_unit"])
        self.stripe_count = int(vals["stripe_count"])
        self.object_size = int(vals["object_size"])
        self.pool = vals["pool"]
        self.layout = layout

    @classmethod
    def _from_xattr(cls, path, name):
        """Read xattr *name* from *path* (not following symlinks).

        Returns None when the attribute is not set (ENODATA). Any other
        OSError (missing path, permission denied, ...) is propagated instead
        of being mistaken for "no layout".
        """
        try:
            raw = os.getxattr(path, name, follow_symlinks=False)
        except OSError as e:
            if e.errno != errno.ENODATA:
                raise
            return None
        return cls(raw.decode("utf-8"))

    @classmethod
    def from_dir(cls, path):
        """Return the layout stored in ``ceph.dir.layout`` on *path*, or None if unset."""
        return cls._from_xattr(path, "ceph.dir.layout")

    @classmethod
    def from_file(cls, path):
        """Return the layout stored in ``ceph.file.layout`` on *path*, or None if unset."""
        return cls._from_xattr(path, "ceph.file.layout")

    def apply_file(self, path):
        """Write this layout to the ``ceph.file.layout`` xattr of *path*."""
        os.setxattr(path, "ceph.file.layout", self.layout.encode("utf-8"), follow_symlinks=False)

    def __str__(self):
        return self.layout

    def __repr__(self):
        return f"{type(self).__name__}({self.layout!r})"

    def __eq__(self, other):
        # Comparing against None or an unrelated type must yield False rather
        # than raise AttributeError.
        if not isinstance(other, CephLayout):
            return NotImplemented
        return self.layout == other.layout

    def __hash__(self):
        # Kept consistent with __eq__, which compares the raw layout string.
        return hash(self.layout)

    def diff(self, other):
        """Describe the fields that differ between this layout and *other*.

        Returns a space-separated string of ``field=[mine -> theirs]`` items,
        or an empty string when all fields match.
        """
        changes = []
        for field in self._FIELDS:
            mine = getattr(self, field)
            theirs = getattr(other, field)
            if mine != theirs:
                changes.append(f"{field}=[{mine} -> {theirs}]")
        return " ".join(changes)
def _discard_tmp_file(tmp_file):
    """Best-effort removal of a leftover temporary file after a failure."""
    try:
        os.unlink(tmp_file)
    except FileNotFoundError:
        # Already renamed into place or never created; nothing to clean up.
        pass
    except OSError as e:
        logging.warning(f"Could not remove temporary file {tmp_file}: {e}")


def process_file(args, filepaths, st, layout, file_layout):
    """Rewrite one inode with *layout* and replace all of its paths.

    The content of ``filepaths[0]`` is copied to a fresh file in
    ``args.tmpdir`` that has *layout* applied, metadata and ownership are
    copied over, and the copy is then renamed over ``filepaths[0]``. Every
    further path in *filepaths* (the other hard links) is replaced by a hard
    link to the new file. Each parent directory's atime/mtime is restored
    after its entry has been replaced.

    Args:
        args: parsed command line; ``tmpdir`` and ``dry_run`` are read.
        filepaths: all known paths of the inode; the first is the copy source.
        st: stat result taken when the file was scanned.
        layout: target layout (provides ``apply_file`` and ``object_size``).
        file_layout: the file's current layout, used only for logging.

    If anything fails, the temporary file is removed before the exception
    propagates, so failed jobs do not leave stale copies in ``tmpdir``.
    """
    if do_exit.is_set():
        return

    source = filepaths[0]
    tmp_file = os.path.join(args.tmpdir, uuid.uuid4().hex)
    if len(filepaths) == 1:
        logging.info(f"Transcoding {source} [{st.st_size} bytes]: {file_layout.diff(layout)}")
    else:
        logging.info(f"Transcoding {source} [{st.st_size} bytes] (+ {len(filepaths) - 1} hardlink(s)): {file_layout.diff(layout)} [{tmp_file}]")

    try:
        with open(tmp_file, "wb") as ofd:
            # The layout is applied right after creation, before any data is
            # written to the new file.
            layout.apply_file(tmp_file)
            with open(source, "rb") as ifd:
                shutil.copyfileobj(ifd, ofd, layout.object_size)
        shutil.copystat(source, tmp_file)
        os.chown(tmp_file, st.st_uid, st.st_gid)

        if args.dry_run or do_exit.is_set():
            os.unlink(tmp_file)
            return

        with replace_lock:
            try:
                # Block SIGINT in this thread while paths are being swapped.
                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGINT])
                st2 = os.stat(source, follow_symlinks=False)
                # Compare integer nanoseconds: the float st_mtime cannot
                # represent every nanosecond timestamp exactly.
                if st2.st_mtime_ns != st.st_mtime_ns:
                    logging.error(f"Failed to replace {source} (+ {len(filepaths) - 1} hardlink(s)): Source file changed")
                    os.unlink(tmp_file)
                    return
                for i, path in enumerate(filepaths):
                    parent_path = os.path.split(path)[0]
                    parent_st = os.stat(parent_path, follow_symlinks=False)
                    if i == 0:
                        logging.info(f"Renaming {tmp_file} -> {path}")
                        os.rename(tmp_file, path)
                    else:
                        logging.info(f"Linking {source} -> {path}")
                        # Link under the temporary name first, then rename
                        # over the old hard link.
                        os.link(source, tmp_file, follow_symlinks=False)
                        os.rename(tmp_file, path)
                    # Replacing the entry touched the directory; put its
                    # timestamps back.
                    os.utime(parent_path, ns=(parent_st.st_atime_ns, parent_st.st_mtime_ns), follow_symlinks=False)
            finally:
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGINT])
    except BaseException:
        _discard_tmp_file(tmp_file)
        raise
def handler(future):
    """Done-callback for submitted jobs: surface errors and free a job slot.

    ``future.result()`` re-raises any exception from the job. The semaphore
    slot taken when the job was submitted is released in ``finally`` so a
    failed job still frees its slot; otherwise each failure would leak one
    slot until submitting new jobs blocks forever.
    """
    try:
        future.result()
    finally:
        thread_count.release()
def process_dir(args, start_dir, hard_links, executor, mountpoints, dir_layouts):
    """Walk *start_dir* and queue every file whose layout differs from its directory's.

    Directories listed in *mountpoints* and the temporary directory are
    skipped together with everything below them. A directory without its own
    layout uses the layout recorded for its parent in *dir_layouts*, which is
    updated as the walk proceeds. Files with more than one link are collected
    in *hard_links* (keyed by ``(st_dev, st_ino)``) until as many paths as
    ``st_nlink`` have been seen; only then is the inode queued, and only if
    all containing directories had equal layouts.
    """
    for dirpath, dirnames, filenames in os.walk(start_dir, True):
        if do_exit.is_set():
            return

        if dirpath in mountpoints:
            logging.warning(f"Skipping {dirpath}: path is a mountpoint")
            # Emptying the list in place stops os.walk from descending.
            dirnames.clear()
            continue
        if dirpath == args.tmpdir:
            logging.info(f"Skipping {dirpath}: path is the temporary dir")
            dirnames.clear()
            continue

        # Known layout first, then the directory's own xattr, then the parent's.
        layout = dir_layouts.get(dirpath)
        if layout is None:
            layout = CephLayout.from_dir(dirpath)
            if layout is None:
                layout = dir_layouts[os.path.dirname(dirpath)]

        dirnames.sort()
        filenames.sort()
        dir_layouts[dirpath] = layout
        logging.debug(f"Scanning {dirpath} ({layout}): {len(dirnames)} dirs and {len(filenames)} files")

        def enqueue(paths, st, file_layout):
            """Hand one inode to the executor, waiting for a free job slot."""
            if do_exit.is_set():
                return
            thread_count.acquire()
            job = executor.submit(process_file, args, paths, st, layout, file_layout)
            job.add_done_callback(handler)

        for filename in filenames:
            if do_exit.is_set():
                return

            filepath = os.path.join(dirpath, filename)
            st = os.stat(filepath, follow_symlinks=False)
            if not stat.S_ISREG(st.st_mode):
                logging.debug(f"Skipping {filepath}: not a regular file")
                continue

            newest_allowed = time.time() - 86400 * args.min_age
            if st.st_mtime > newest_allowed:
                logging.info(f"Skipping {filepath}: modified too recently")
                continue

            file_layout = CephLayout.from_file(filepath)
            assert file_layout is not None
            if file_layout == layout:
                continue

            if st.st_nlink == 1:
                enqueue([filepath], st, file_layout)
                continue

            # Hard-linked inode: remember this path and the layout of the
            # directory it was found in.
            file_id = (st.st_dev, st.st_ino)
            seen_paths, seen_layouts = hard_links.setdefault(file_id, ([], []))
            seen_paths.append(filepath)
            seen_layouts.append(layout)

            if len(seen_paths) != st.st_nlink:
                logging.info(f"Deferring {filepath} due to hardlinks ({st.st_nlink - len(seen_paths)} link(s) left)")
                continue

            # All links located; the inode is no longer pending.
            del hard_links[file_id]
            if all(ly == seen_layouts[0] for ly in seen_layouts[1:]):
                enqueue(seen_paths, st, file_layout)
            else:
                logging.error("Hardlinked file has inconsistent directory layouts:")
                for fp, ly in zip(seen_paths, seen_layouts):
                    logging.error(f" [{ly}]: {fp}")
def _unescape_mount_path(path):
    """Decode the octal escapes used for mount paths in /proc/self/mounts."""
    # The backslash escape is decoded last so that an escaped backslash
    # followed by digits is not mistaken for another escape.
    for escaped, char in (("\\040", " "), ("\\011", "\t"), ("\\012", "\n"), ("\\134", "\\")):
        path = path.replace(escaped, char)
    return path


def process_files(args):
    """Scan every directory in ``args.dirs`` and transcode mismatching files.

    Creates ``args.tmpdir`` if needed (and stores its absolute path back on
    *args*), reads the current mountpoints so the scan does not descend into
    other mounts, then walks each start directory with a thread pool of
    ``args.threads`` workers. Hard-linked inodes for which not all links were
    found are reported at the end and left untouched.

    Raises:
        RuntimeError: a start directory and all of its parents have no
            ``ceph.dir.layout`` set.
    """
    args.tmpdir = os.path.abspath(args.tmpdir)
    os.makedirs(args.tmpdir, exist_ok=True)

    hard_links = {}
    dir_layouts = {}

    with open("/proc/self/mounts", "r") as mounts:
        mountpoints = {_unescape_mount_path(line.split()[1]) for line in mounts}

    with ThreadPoolExecutor(max_workers=args.threads) as executor:
        for start_dir in args.dirs:
            start_dir = os.path.abspath(start_dir)
            # A start directory that is itself a mountpoint was requested
            # explicitly, so it must not be skipped.
            mountpoints.discard(start_dir)

            # Use the directory's own layout, else the nearest parent's.
            layout = CephLayout.from_dir(start_dir)
            parent = start_dir
            while layout is None and parent != "/":
                parent = os.path.split(parent)[0]
                layout = CephLayout.from_dir(parent)
            if layout is None:
                raise RuntimeError(f"No CephFS directory layout found for {start_dir} or any of its parents")

            dir_layouts[start_dir] = layout
            logging.info(f"Starting at {start_dir} ({layout})")
            process_dir(args, start_dir, hard_links, executor, mountpoints, dir_layouts)
            if do_exit.is_set():
                break

    if hard_links and not do_exit.is_set():
        logging.warning("Some hard links could not be located. Refusing to transcode these inodes:")
        for (dev, inode), (paths, _layouts) in hard_links.items():
            st = os.stat(paths[0], follow_symlinks=False)
            logging.warning(f" Inode {dev}:{inode} ({len(paths)}/{st.st_nlink} links):")
            for path in paths:
                logging.warning(f" - {path}")
def main():
    """Command-line entry point: parse arguments, check the tmpdir, run the scan.

    Exits through ``parser.error`` when ``--threads`` is below 1 or when no
    CephFS layout can be found for the temporary directory or its parents.
    """
    global thread_count
    parser = argparse.ArgumentParser(
        description="Transcode cephfs files to their directory layout"
    )
    parser.add_argument("dirs", help="Directories to scan", nargs="+")
    parser.add_argument("--tmpdir", default="/data/tmp", help="Temporary directory to copy files to. Important: This directory should have its layout set to the *default* data pool for the FS, to avoid excess backtrace objects.")
    parser.add_argument("--debug", "-d", action="store_true")
    parser.add_argument("--min-age", "-m", default=1, type=int, help="Minimum age of file before transcoding, in days")
    parser.add_argument("--threads", "-t", default=4, type=int, help="Number of threads for data copying")
    parser.add_argument("--dry-run", "-n", action="store_true", help="Perform transcode but do not replace files")
    args = parser.parse_args()
    if args.threads < 1:
        parser.error("--threads must be at least 1")

    # Configure logging before anything that may want to report a problem.
    level = logging.DEBUG if args.debug else logging.INFO
    logging.basicConfig(level=level, format='%(asctime)s - %(levelname)s - %(message)s')

    thread_count = threading.BoundedSemaphore(args.threads)

    # The temporary directory must exist (and be absolute, so the walk up to
    # "/" terminates) before its layout xattr can be read.
    args.tmpdir = os.path.abspath(args.tmpdir)
    os.makedirs(args.tmpdir, exist_ok=True)

    layout = CephLayout.from_dir(args.tmpdir)
    parent = args.tmpdir
    while layout is None and parent != "/":
        parent = os.path.split(parent)[0]
        layout = CephLayout.from_dir(parent)
    if layout is None:
        parser.error(f"No CephFS directory layout found for temporary directory {args.tmpdir} or any of its parents")

    logging.warning(f"Temporary directory is {args.tmpdir} with pool {layout.pool}. "
                    f"This should be the *default* data pool for the FS (NOT the target pool for your files). "
                    f"If it is not, abort this script with ^C and configure it with `setfattr -n ceph.dir.layout.pool -v default_data_pool_name {args.tmpdir}`.")

    def signal_handler(sig, frame):
        # Only request a stop; the scan loop and workers check do_exit.
        logging.error("SIGINT received, exiting cleanly...")
        do_exit.set()

    signal.signal(signal.SIGINT, signal_handler)
    process_files(args)
# Run the command-line tool only when executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment