Skip to content

Instantly share code, notes, and snippets.

@galeksandrp
Forked from marcan/cephfs_transcoder.py
Last active July 13, 2025 07:49
Show Gist options
  • Save galeksandrp/c43083d16cb83ecbd89b5e6d8204d610 to your computer and use it in GitHub Desktop.
Save galeksandrp/c43083d16cb83ecbd89b5e6d8204d610 to your computer and use it in GitHub Desktop.
Transcoder (layout migrator) for CephFS
#!/usr/bin/env python3
# CephFS pool/layout migration tool ("transcoder")
#
# Loosely inspired by:
# https://git.sr.ht/~pjjw/cephfs-layout-tool/tree/master/item/cephfs_layout_tool/migrate_pools.py
# https://gist.github.com/ervwalter/5ff6632c930c27a1eb6b07c986d7439b
#
# MIT license (https://opensource.org/license/mit)
import argparse
import errno
import logging
import os
import shutil
import signal
import stat
import sys
import threading
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
# Serializes the rename/link replacement phase so only one file is swapped
# into place at a time.
replace_lock = threading.Lock()
# Set on SIGINT to request a clean shutdown of scanning and workers.
do_exit = threading.Event()
# BoundedSemaphore limiting how far scanning can run ahead of the copy
# workers; created in main() once --threads is known.
thread_count = None
class CephLayout:
    """Parsed CephFS layout, as stored in the ceph.dir.layout /
    ceph.file.layout extended attributes."""

    def __init__(self, layout):
        """Parse a layout string of the form
        "stripe_unit=N stripe_count=N object_size=N pool=NAME".

        Raises KeyError/ValueError if a required field is missing or
        non-numeric.
        """
        vals = {}
        for s in layout.split():
            k, v = s.split("=", 1)
            vals[k] = v
        self.stripe_unit = int(vals["stripe_unit"])
        self.stripe_count = int(vals["stripe_count"])
        self.object_size = int(vals["object_size"])
        self.pool = vals["pool"]
        self.layout = layout

    @classmethod
    def from_dir(cls, path):
        """Return the directory's explicit layout, or None if the directory
        has no layout xattr of its own (it then inherits from an ancestor)."""
        try:
            return cls(os.getxattr(path, "ceph.dir.layout", follow_symlinks=False).decode("utf-8"))
        except OSError as e:
            # ENODATA means "no explicit layout here"; anything else is a
            # real error and must propagate.
            if e.errno != errno.ENODATA:
                raise
            return None

    @classmethod
    def from_file(cls, path):
        """Return the file's layout, or None if the xattr is absent."""
        try:
            return cls(os.getxattr(path, "ceph.file.layout", follow_symlinks=False).decode("utf-8"))
        except OSError as e:
            if e.errno != errno.ENODATA:
                raise
            return None

    def apply_file(self, path):
        """Set this layout on a file (must still be empty on CephFS)."""
        os.setxattr(path, "ceph.file.layout", self.layout.encode("utf-8"), follow_symlinks=False)

    def __str__(self):
        return self.layout

    def __eq__(self, other):
        if not isinstance(other, CephLayout):
            return NotImplemented
        return self.layout == other.layout

    def diff(self, other):
        """Return a human-readable summary of fields that differ between
        self and other, e.g. "pool=[a -> b]"; empty string if identical."""
        diff = []
        for i in ("stripe_unit", "stripe_count", "object_size", "pool"):
            a = getattr(self, i)
            b = getattr(other, i)
            if a != b:
                diff.append(f"{i}=[{a} -> {b}]")
        return " ".join(diff)
def process_file(args, filepaths, st, layout, file_layout):
    """Worker: copy one file into a temp file with the target layout, then
    atomically swap it (and all of its hardlinks) into place.

    filepaths contains every known path to the inode; st is the stat taken at
    scan time (used to detect concurrent modification); layout is the target
    directory layout; file_layout is the file's current layout (for logging).
    """
    if do_exit.is_set():
        return
    tmp_file = os.path.join(args.tmpdir, uuid.uuid4().hex)
    if len(filepaths) == 1:
        logging.info(f"Transcoding {filepaths[0]} [{st.st_size} bytes]: {file_layout.diff(layout)}")
    else:
        logging.info(f"Transcoding {filepaths[0]} [{st.st_size} bytes] (+ {len(filepaths) - 1} hardlink(s)): {file_layout.diff(layout)} [{tmp_file}]")
    try:
        with open(tmp_file, "wb") as ofd:
            # The layout must be applied while the file is still empty; a
            # CephFS file layout cannot be changed once data is written.
            layout.apply_file(tmp_file)
            with open(filepaths[0], "rb") as ifd:
                shutil.copyfileobj(ifd, ofd, layout.object_size)
        shutil.copystat(filepaths[0], tmp_file)
        os.chown(tmp_file, st.st_uid, st.st_gid)
    except BaseException:
        # Fix: don't leak a partially-written temp file if the copy (or
        # metadata propagation) fails for any reason.
        try:
            os.unlink(tmp_file)
        except OSError:
            pass
        raise
    if args.dry_run or do_exit.is_set():
        os.unlink(tmp_file)
        return
    with replace_lock:
        try:
            # Block SIGINT during the swap so an interrupt can't leave links
            # half-replaced.
            signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGINT])
            st2 = os.stat(filepaths[0], follow_symlinks=False)
            if st2.st_mtime != st.st_mtime:
                # Source changed while we copied; discard our copy.
                logging.error(f"Failed to replace {filepaths[0]} (+ {len(filepaths) - 1} hardlink(s)): Source file changed")
                os.unlink(tmp_file)
                return
            for i, path in enumerate(filepaths):
                parent_path = os.path.split(path)[0]
                parent_st = os.stat(parent_path, follow_symlinks=False)
                if i == 0:
                    logging.info(f"Renaming {tmp_file} -> {path}")
                    os.rename(tmp_file, path)
                else:
                    # Re-link the remaining paths via a temp name + rename so
                    # each path stays accessible at every instant.
                    logging.info(f"Linking {filepaths[0]} -> {path}")
                    os.link(filepaths[0], tmp_file, follow_symlinks=False)
                    os.rename(tmp_file, path)
                # Restore the parent directory's atime/mtime, which the
                # rename/link just touched.
                os.utime(parent_path, ns=(parent_st.st_atime_ns, parent_st.st_mtime_ns), follow_symlinks=False)
        finally:
            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGINT])
def handler(future):
    """Done-callback for transcode futures.

    Surfaces any worker exception and always releases the throttling
    semaphore. The original re-raised from the callback, which both leaked
    the semaphore slot (eventually deadlocking the scanner in
    thread_count.acquire()) and left the error to the executor's default
    stderr dump; instead, log it and request a clean shutdown.
    """
    try:
        future.result()
    except Exception:
        logging.exception("Transcode worker failed")
        do_exit.set()
    finally:
        thread_count.release()
def process_dir(args, start_dir, hard_links, executor, mountpoints, dir_layouts):
    """Walk start_dir top-down, submitting files whose layout differs from
    their directory's effective layout to the executor.

    hard_links maps (st_dev, st_ino) -> ([paths], [layouts]) and accumulates
    sightings of multi-link inodes across directories; an inode is only
    submitted once all st_nlink paths have been found, so links are never
    broken. mountpoints and args.tmpdir are pruned from the walk.
    dir_layouts caches each directory's effective layout for inheritance.
    """
    for dirpath, dirnames, filenames in os.walk(start_dir, True):
        if do_exit.is_set():
            return
        if dirpath in mountpoints:
            logging.warning(f"Skipping {dirpath}: path is a mountpoint")
            # Emptying dirnames in-place prunes this subtree from the walk.
            del dirnames[:]
            continue
        if dirpath == args.tmpdir:
            logging.info(f"Skipping {dirpath}: path is the temporary dir")
            del dirnames[:]
            continue
        # Effective layout: cached, else explicit on this dir, else inherited
        # from the parent (already visited, so present in dir_layouts).
        layout = dir_layouts.get(dirpath, None)
        if layout is None:
            layout = CephLayout.from_dir(dirpath)
            if layout is None:
                layout = dir_layouts[os.path.split(dirpath)[0]]
        # Sort for a deterministic, reproducible traversal order.
        dirnames.sort()
        filenames.sort()
        logging.debug(f"Scanning {dirpath} ({layout}): {len(dirnames)} dirs and {len(filenames)} files")
        dir_layouts[dirpath] = layout

        def submit(filepaths, st, file_layout):
            # Throttle: block until a worker slot frees up, so scanning
            # never runs unboundedly ahead of copying.
            if do_exit.is_set():
                return
            thread_count.acquire()
            future = executor.submit(process_file, args, filepaths, st, layout, file_layout)
            future.add_done_callback(handler)

        for filename in filenames:
            if do_exit.is_set():
                return
            filepath = os.path.join(dirpath, filename)
            st = os.stat(filepath, follow_symlinks=False)
            if not stat.S_ISREG(st.st_mode):
                # Symlinks, devices, sockets etc. have no file layout.
                logging.debug(f"Skipping {filepath}: not a regular file")
                continue
            if st.st_mtime > (time.time() - 86400 * args.min_age):
                logging.info(f"Skipping {filepath}: modified too recently")
                continue
            file_layout = CephLayout.from_file(filepath)
            assert file_layout is not None
            if file_layout == layout:
                # Already in the right layout/pool; nothing to do.
                continue
            if st.st_nlink == 1:
                submit([filepath], st, file_layout)
            else:
                # Multi-link inode: defer until every link has been seen.
                file_id = (st.st_dev, st.st_ino)
                if file_id not in hard_links:
                    hard_links[file_id] = ([filepath], [layout])
                else:
                    hard_links[file_id][0].append(filepath)
                    hard_links[file_id][1].append(layout)
                if len(hard_links[file_id][0]) == st.st_nlink:
                    # All links located; safe to transcode the inode now.
                    filepaths = hard_links[file_id][0]
                    layouts = hard_links[file_id][1]
                    del hard_links[file_id]
                    if not all(i == layouts[0] for i in layouts[1:]):
                        # Links live in dirs with conflicting target layouts;
                        # there is no single correct result, so refuse.
                        logging.error("Hardlinked file has inconsistent directory layouts:")
                        for fp, ly in zip(filepaths, layouts):
                            logging.error(f" [{ly}]: {fp}")
                    else:
                        submit(filepaths, st, file_layout)
                else:
                    logging.info(f"Deferring {filepath} due to hardlinks ({st.st_nlink - len(hard_links[file_id][0])} link(s) left)")
def process_files(args):
    """Set up the temp dir and thread pool, then scan every start directory.

    Creates args.tmpdir if missing, reads the system mount table so mounted
    subtrees are skipped (a start dir that is itself a mountpoint is still
    scanned), and after the pool drains reports any hard-linked inodes whose
    full link set was never found — those are deliberately left untouched.
    """
    args.tmpdir = os.path.abspath(args.tmpdir)
    if not os.path.exists(args.tmpdir):
        os.makedirs(args.tmpdir)
    hard_links = {}
    dir_layouts = {}
    # (Removed an unused `lock = threading.Lock()` local from the original.)
    mountpoints = set()
    with open("/proc/self/mounts", "r") as mounts:
        for line in mounts:
            # Field 2 of each mounts line is the mount target path.
            mountpoints.add(line.split()[1])
    with ThreadPoolExecutor(max_workers=args.threads) as executor:
        for start_dir in args.dirs:
            start_dir = os.path.abspath(start_dir)
            # The start dir itself may be a mountpoint; don't skip it.
            if start_dir in mountpoints:
                mountpoints.remove(start_dir)
            # Resolve the effective layout by walking up until a directory
            # with an explicit layout xattr is found.
            layout = CephLayout.from_dir(start_dir)
            parent = start_dir
            while layout is None and parent != "/":
                parent = os.path.split(parent)[0]
                layout = CephLayout.from_dir(parent)
            assert layout
            dir_layouts[start_dir] = layout
            logging.info(f"Starting at {start_dir} ({layout})")
            process_dir(args, start_dir, hard_links, executor, mountpoints, dir_layouts)
            if do_exit.is_set():
                break
    if hard_links and not do_exit.is_set():
        logging.warning("Some hard links could not be located. Refusing to transcode these inodes:")
        for file_id, v in hard_links.items():
            dev, inode = file_id
            st = os.stat(v[0][0], follow_symlinks=False)
            logging.warning(f" Inode {dev}:{inode} ({len(v[0])}/{st.st_nlink} links):")
            for path in v[0]:
                logging.warning(f" - {path}")
def main():
    """Parse arguments, verify the temp dir's pool, install the SIGINT
    handler, and run the transcoder."""
    global thread_count
    parser = argparse.ArgumentParser(
        description="Transcode cephfs files to their directory layout"
    )
    parser.add_argument("dirs", help="Directories to scan", nargs="+")
    parser.add_argument("--tmpdir", default="/data/tmp", help="Temporary directory to copy files to. Important: This directory should have its layout set to the *default* data pool for the FS, to avoid excess backtrace objects.")
    parser.add_argument("--debug", "-d", action="store_true")
    parser.add_argument("--min-age", "-m", default=1, type=int, help="Minimum age of file before transcoding, in days")
    parser.add_argument("--threads", "-t", default=4, type=int, help="Number of threads for data copying")
    parser.add_argument("--dry-run", "-n", action="store_true", help="Perform transcode but do not replace files")
    args = parser.parse_args()

    # Configure logging first so any errors during the layout probe below
    # are formatted properly (the original configured it afterwards).
    if args.debug:
        logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
    else:
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

    thread_count = threading.BoundedSemaphore(args.threads)

    # Determine the tmpdir's effective layout. Fixes vs the original:
    # - abspath first, so a relative --tmpdir can't loop forever (the old
    #   walk compared against "/" but os.path.split of a relative path
    #   never reaches it);
    # - skip components that don't exist yet (tmpdir is only created later
    #   in process_files), instead of crashing inside from_dir;
    # - fail with a clear message instead of AttributeError on layout.pool
    #   if no layout is found anywhere.
    parent = os.path.abspath(args.tmpdir)
    layout = CephLayout.from_dir(parent) if os.path.exists(parent) else None
    while layout is None and parent != "/":
        parent = os.path.split(parent)[0]
        if os.path.exists(parent):
            layout = CephLayout.from_dir(parent)
    if layout is None:
        logging.error(f"Could not determine the layout of {args.tmpdir}; is it on a CephFS mount?")
        sys.exit(1)

    logging.warning(f"Temporary directory is {args.tmpdir} with pool {layout.pool}. "
                    f"This should be the *default* data pool for the FS (NOT the target pool for your files). "
                    f"If it is not, abort this script with ^C and configure it with `setfattr -n ceph.dir.layout.pool -v default_data_pool_name {args.tmpdir}`.")

    def signal_handler(sig, frame):
        # Request a clean stop; in-flight copies finish and are discarded.
        logging.error("SIGINT received, exiting cleanly...")
        do_exit.set()
    signal.signal(signal.SIGINT, signal_handler)
    process_files(args)
# Script entry point: run only when executed directly, not when imported.
if __name__ == "__main__":
    main()
@galeksandrp
Copy link
Author

galeksandrp commented Jul 13, 2025

SOURCE: https://gist.github.com/ervwalter/5ff6632c930c27a1eb6b07c986d7439b?permalink_comment_id=5602194#gistcomment-5602194
SOURCE: https://www.reddit.com/r/ceph/comments/1l1ey87/cephfs_layoutpool_migration_script/

Hi all, just wanted to share a little tool I wrote (inspired by a couple others linked in the header). It's a script to automatically migrate CephFS files to the layout/pool they should have in their given directory. This is useful if you move files between directories with different layouts configured, or if you change the layout on an existing directory. The script will migrate files if any of the layout parameters differ (pool, striping config, or object size). It should be run as root.

Usage is simply:

python3 cephfs_transcoder.py --tmpdir <path to temporary directory> <starting directory 1> <starting directory 2>...

tmpdir just needs to be some temporary path in the same CephFS filesystem (that should have its layout explicitly or implicitly set to the default/primary data pool for technical reasons, the script takes care of creating files with the intended layout/pool).

By default it will copy data with 4 threads. The script can be safely interrupted with ^C at any time (may take some time to complete as it finishes copying data for any in-progress files, although it discards them). See --help for a couple extra options.

Compared to previous scripts for doing this it has these improvements:

  • Automatically migrates to the "intended" layout for any given directory, including object size (no need to specify intended layout)
  • Can handle any number of target layouts without requiring a pre-configured scratch directory (it sets the layout for every new file independently)
  • Ignores mountpoints and the scratch dir itself more correctly
  • Correctly ignores symlinks and special files
  • Does not have race conditions updating the parent dir
  • Handles hard links more safely, by only doing the conversion and re-linking once all links are found, so it never breaks links if interrupted (or if not all links are found). It also avoids race conditions when recreating links (links are never inaccessible)
  • Skips files that are recently modified
  • Keeps atime/mtime for parent dirs after the conversion
  • Clean exit if interrupted (without leaving garbage files or breaking any links)
  • Dry run mode
  • Avoids scanning/queuing of files running "ahead" of the copying (=avoids unbounded memory usage & issues with interruption)
  • Supports multiple target paths

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment