@marcan
Last active January 29, 2026 16:54
Transcoder (layout migrator) for CephFS
# CephFS pool/layout migration tool ("transcoder")
#
# Loosely inspired by:
# https://git.sr.ht/~pjjw/cephfs-layout-tool/tree/master/item/cephfs_layout_tool/migrate_pools.py
# https://gist.github.com/ervwalter/5ff6632c930c27a1eb6b07c986d7439b
#
# MIT license (https://opensource.org/license/mit)
import os, stat, time, signal, shutil, logging, sys
from concurrent.futures import ThreadPoolExecutor
import threading, uuid, argparse
replace_lock = threading.Lock()
do_exit = threading.Event()
thread_count = None

class CephLayout:
    def __init__(self, layout):
        vals = {}
        for s in layout.split():
            k, v = s.split("=", 1)
            vals[k] = v
        self.stripe_unit = int(vals["stripe_unit"])
        self.stripe_count = int(vals["stripe_count"])
        self.object_size = int(vals["object_size"])
        self.pool = vals["pool"]
        self.layout = layout

    @classmethod
    def from_dir(cls, path):
        try:
            return CephLayout(os.getxattr(path, "ceph.dir.layout", follow_symlinks=False).decode("utf-8"))
        except OSError as e:
            # errno 61 (ENODATA): the directory has no explicit layout xattr
            assert e.errno == 61
            return None

    @classmethod
    def from_file(cls, path):
        try:
            return CephLayout(os.getxattr(path, "ceph.file.layout", follow_symlinks=False).decode("utf-8"))
        except OSError as e:
            # errno 61 (ENODATA): no layout xattr present
            assert e.errno == 61
            return None

    def apply_file(self, path):
        os.setxattr(path, "ceph.file.layout", self.layout.encode("utf-8"), follow_symlinks=False)

    def __str__(self):
        return self.layout

    def __eq__(self, other):
        return self.layout == other.layout

    def diff(self, other):
        diff = []
        for i in ("stripe_unit", "stripe_count", "object_size", "pool"):
            a = getattr(self, i)
            b = getattr(other, i)
            if a != b:
                diff.append(f"{i}=[{a} -> {b}]")
        return " ".join(diff)

def process_file(args, filepaths, st, layout, file_layout):
    if do_exit.is_set():
        return
    tmp_file = os.path.join(args.tmpdir, uuid.uuid4().hex)
    if len(filepaths) == 1:
        logging.info(f"Transcoding {filepaths[0]} [{st.st_size} bytes]: {file_layout.diff(layout)}")
    else:
        logging.info(f"Transcoding {filepaths[0]} [{st.st_size} bytes] (+ {len(filepaths) - 1} hardlink(s)): {file_layout.diff(layout)} [{tmp_file}]")
    # Copy the data into a fresh file in the temporary dir, applying the target
    # layout while the file is still empty (layouts cannot be changed once a file has data).
    with open(tmp_file, "wb") as ofd:
        layout.apply_file(tmp_file)
        with open(filepaths[0], "rb") as ifd:
            shutil.copyfileobj(ifd, ofd, layout.object_size)
    shutil.copystat(filepaths[0], tmp_file)
    os.chown(tmp_file, st.st_uid, st.st_gid)
    if args.dry_run or do_exit.is_set():
        os.unlink(tmp_file)
        return
    with replace_lock:
        try:
            # Keep SIGINT masked while files are being swapped in, so the
            # replacement is not interrupted partway through.
            signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGINT])
            st2 = os.stat(filepaths[0], follow_symlinks=False)
            if st2.st_mtime != st.st_mtime:
                logging.error(f"Failed to replace {filepaths[0]} (+ {len(filepaths) - 1} hardlink(s)): Source file changed")
                os.unlink(tmp_file)
                return
            for i, path in enumerate(filepaths):
                parent_path = os.path.split(path)[0]
                parent_st = os.stat(parent_path, follow_symlinks=False)
                if i == 0:
                    logging.info(f"Renaming {tmp_file} -> {path}")
                    os.rename(tmp_file, path)
                else:
                    logging.info(f"Linking {filepaths[0]} -> {path}")
                    os.link(filepaths[0], tmp_file, follow_symlinks=False)
                    os.rename(tmp_file, path)
                # Preserve the parent directory's timestamps across the rename
                os.utime(parent_path, ns=(parent_st.st_atime_ns, parent_st.st_mtime_ns), follow_symlinks=False)
        finally:
            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGINT])

# Done callback for worker futures: re-raise any exception from the task and
# release a slot on the submission semaphore.
def handler(future):
    future.result()
    thread_count.release()

def process_dir(args, start_dir, hard_links, executor, mountpoints, dir_layouts):
    for dirpath, dirnames, filenames in os.walk(start_dir, True):
        if do_exit.is_set():
            return
        if dirpath in mountpoints:
            logging.warning(f"Skipping {dirpath}: path is a mountpoint")
            del dirnames[:]
            continue
        if dirpath == args.tmpdir:
            logging.info(f"Skipping {dirpath}: path is the temporary dir")
            del dirnames[:]
            continue
        # Effective layout for this directory: explicit xattr if set, otherwise
        # inherited from the parent (already cached in dir_layouts).
        layout = dir_layouts.get(dirpath, None)
        if layout is None:
            layout = CephLayout.from_dir(dirpath)
            if layout is None:
                layout = dir_layouts[os.path.split(dirpath)[0]]
        dirnames.sort()
        filenames.sort()
        logging.debug(f"Scanning {dirpath} ({layout}): {len(dirnames)} dirs and {len(filenames)} files")
        dir_layouts[dirpath] = layout

        def submit(filepaths, st, file_layout):
            if do_exit.is_set():
                return
            thread_count.acquire()
            future = executor.submit(process_file, args, filepaths, st, layout, file_layout)
            future.add_done_callback(handler)

        for filename in filenames:
            if do_exit.is_set():
                return
            filepath = os.path.join(dirpath, filename)
            st = os.stat(filepath, follow_symlinks=False)
            if not stat.S_ISREG(st.st_mode):
                logging.debug(f"Skipping {filepath}: not a regular file")
                continue
            if st.st_mtime > (time.time() - 86400 * args.min_age):
                logging.info(f"Skipping {filepath}: modified too recently")
                continue
            file_layout = CephLayout.from_file(filepath)
            assert file_layout is not None
            if file_layout == layout:
                continue
            if st.st_nlink == 1:
                submit([filepath], st, file_layout)
            else:
                # Hardlinked files are deferred until every link has been seen,
                # so all names can be swapped over to the new inode in one pass.
                file_id = (st.st_dev, st.st_ino)
                if file_id not in hard_links:
                    hard_links[file_id] = ([filepath], [layout])
                else:
                    hard_links[file_id][0].append(filepath)
                    hard_links[file_id][1].append(layout)
                if len(hard_links[file_id][0]) == st.st_nlink:
                    filepaths = hard_links[file_id][0]
                    layouts = hard_links[file_id][1]
                    del hard_links[file_id]
                    if not all(i == layouts[0] for i in layouts[1:]):
                        logging.error("Hardlinked file has inconsistent directory layouts:")
                        for fp, ly in zip(filepaths, layouts):
                            logging.error(f"  [{ly}]: {fp}")
                    else:
                        submit(filepaths, st, file_layout)
                else:
                    logging.info(f"Deferring {filepath} due to hardlinks ({st.st_nlink - len(hard_links[file_id][0])} link(s) left)")

def process_files(args):
    args.tmpdir = os.path.abspath(args.tmpdir)
    if not os.path.exists(args.tmpdir):
        os.makedirs(args.tmpdir)
    hard_links = {}
    dir_layouts = {}
    lock = threading.Lock()
    mountpoints = set()
    for line in open("/proc/self/mounts", "r"):
        mountpoints.add(line.split()[1])
    with ThreadPoolExecutor(max_workers=args.threads) as executor:
        for start_dir in args.dirs:
            start_dir = os.path.abspath(start_dir)
            if start_dir in mountpoints:
                mountpoints.remove(start_dir)
            layout = CephLayout.from_dir(start_dir)
            parent = start_dir
            while layout is None and parent != "/":
                parent = os.path.split(parent)[0]
                layout = CephLayout.from_dir(parent)
            assert layout
            dir_layouts[start_dir] = layout
            logging.info(f"Starting at {start_dir} ({layout})")
            process_dir(args, start_dir, hard_links, executor, mountpoints, dir_layouts)
            if do_exit.is_set():
                break
    if hard_links and not do_exit.is_set():
        logging.warning(f"Some hard links could not be located. Refusing to transcode these inodes:")
        for file_id, v in hard_links.items():
            dev, inode = file_id
            st = os.stat(v[0][0], follow_symlinks=False)
            logging.warning(f"  Inode {dev}:{inode} ({len(v[0])}/{st.st_nlink} links):")
            for path in v[0]:
                logging.warning(f"   - {path}")

def main():
    global thread_count
    parser = argparse.ArgumentParser(
        description="Transcode cephfs files to their directory layout"
    )
    parser.add_argument("dirs", help="Directories to scan", nargs="+")
    parser.add_argument("--tmpdir", default="/data/tmp", help="Temporary directory to copy files to. Important: This directory should have its layout set to the *default* data pool for the FS, to avoid excess backtrace objects.")
    parser.add_argument("--debug", "-d", action="store_true")
    parser.add_argument("--min-age", "-m", default=1, type=int, help="Minimum age of file before transcoding, in days")
    parser.add_argument("--threads", "-t", default=4, type=int, help="Number of threads for data copying")
    parser.add_argument("--dry-run", "-n", action="store_true", help="Perform transcode but do not replace files")
    args = parser.parse_args()
    thread_count = threading.BoundedSemaphore(args.threads)
    layout = CephLayout.from_dir(args.tmpdir)
    parent = args.tmpdir
    while layout is None and parent != "/":
        parent = os.path.split(parent)[0]
        layout = CephLayout.from_dir(parent)
    if args.debug:
        logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
    else:
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    logging.warning(f"Temporary directory is {args.tmpdir} with pool {layout.pool}. "
                    f"This should be the *default* data pool for the FS (NOT the target pool for your files). "
                    f"If it is not, abort this script with ^C and configure it with `setfattr -n ceph.dir.layout.pool -v default_data_pool_name {args.tmpdir}`.")

    def signal_handler(sig, frame):
        logging.error("SIGINT received, exiting cleanly...")
        do_exit.set()

    signal.signal(signal.SIGINT, signal_handler)
    process_files(args)

if __name__ == "__main__":
    main()
inDane commented Aug 25, 2025

@marcan
@marcan
Thanks for this script! I've got one question though:
Does this delete the old objects/files from the primary pool? I.e. when transcoding from Pool A (primary) to Pool B (as set in the xattr), does Pool A eventually get smaller?

I'm asking because I don't see my primary Pool A shrinking, but Pool B is expanding.

marcan commented Aug 26, 2025

CephFS always has a head object in the primary pool for every file. If your files are smaller than the object size, then in general, the object count in the primary pool will remain unchanged. However, since the head objects are zero-sized, the data usage of the primary pool will shrink, as those objects only have metadata. If the files are on average larger than the object size, then you will see a reduction to one object per file.

If you are not seeing the data usage of the primary pool shrink, perhaps you have snapshots?
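
For anyone who wants to check this on a single file, here is a minimal sketch, assuming the usual CephFS RADOS object naming of `<inode hex>.<block index>` (the head object is index 00000000) and a working `rados` CLI; the path below is illustrative:

import os

path = "/my/dir/projects/some_large_file.bin"  # illustrative path
st = os.stat(path)
head_obj = f"{st.st_ino:x}.00000000"  # name of this file's head object

# The file's layout xattr shows the pool its *data* objects are written to...
print(os.getxattr(path, "ceph.file.layout").decode())

# ...while the (now zero-sized) head object stays in the original/default pool.
# It can be inspected with, e.g.:
#   rados -p <default_data_pool> stat <head object name>
print(f"rados -p <default_data_pool> stat {head_obj}")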

inDane commented Aug 26, 2025

Thanks for your answer. Turns out I was just too impatient; Pool A is slowly shrinking! 👍

inDane commented Jan 29, 2026

Hey, me again.

Do you have an idea what's happening here?

# Running cephfs_transcoder.py finishes instantly.
root@HOST:~# python3 ~/cephfs_transcoder.py --tmpdir /my/dir/tmp_dir3 -n /my/dir/projects/archiving/TNJump_hybrid-vs-shortreadcomp.tar.gz
2026-01-29 16:45:13,193 - WARNING - Temporary directory is /my/dir/tmp_dir3 with pool cephfs.cephfs_01.data. This should be the *default* data pool for the FS (NOT the target pool for your files). If it is not, abort this script with ^C and configure it with `setfattr -n ceph.dir.layout.pool -v default_data_pool_name /my/dir/tmp_dir3`.
2026-01-29 16:45:13,200 - INFO - Starting at /my/dir/projects/archiving/TNJump_hybrid-vs-shortreadcomp.tar.gz (stripe_unit=4194304 stripe_count=1 object_size=4194304 pool=cephfs.cephfs_01.data.ec_01)

# File is still on default pool after running transcoder.py
root@HOST:~# getfattr -n ceph.file.layout.json --only-values ./TNJump_hybrid-vs-shortreadcomp.tar.gz 
{"stripe_unit": 4194304, "stripe_count": 1, "object_size": 4194304, "pool_name": "cephfs.cephfs_01.data", "pool_id": 13, "pool_namespace": "", "inheritance": "@default"}

# TEMP dir is default data 3xreplica pool.
root@HOST:~# getfattr -n ceph.dir.layout /my/dir/tmp_dir3
getfattr: Removing leading '/' from absolute path names
# file: /my/dir/tmp_dir3
ceph.dir.layout="stripe_unit=4194304 stripe_count=1 object_size=4194304 pool=cephfs.cephfs_01.data"

# parent dir is ec_01-datapool.
root@HOST:~# getfattr -n ceph.dir.layout.json --only-values ./
{"stripe_unit": 4194304, "stripe_count": 1, "object_size": 4194304, "pool_name": "cephfs.cephfs_01.data.ec_01", "pool_id": 18, "pool_namespace": "", "inheritance": "@inherited"}

Copying the file with "cp" creates a file that resides on the ec_01 data pool... so the inheritance should work, but there is something up here...

EDIT: The files inherit from '@default'. That is probably the problem... but how can I change that?
