Skip to content

Instantly share code, notes, and snippets.

@levysantanna
Forked from ervwalter/migrate.py
Created February 5, 2025 17:36
Show Gist options
  • Save levysantanna/27b65ce53325857cb7ee0adb3e5cd271 to your computer and use it in GitHub Desktop.
Save levysantanna/27b65ce53325857cb7ee0adb3e5cd271 to your computer and use it in GitHub Desktop.
Migrate files in cephfs to a new file layout pool recursivly
import os
import shutil
import logging
import sys
from concurrent.futures import ThreadPoolExecutor
import threading
import uuid
import xattr
from pathlib import Path
start_directory = '.' # current directory
scratch_directory = '.scratch'
max_parallel_threads = 4
def has_ceph_pool_attr(file_path, pool_value):
""" Check if the file has the specified ceph pool attribute value using xattr. """
try:
attributes = xattr.xattr(file_path)
ceph_pool = attributes.get('ceph.file.layout.pool').decode('utf-8')
return ceph_pool == pool_value
except (IOError, KeyError):
# IOError for inaccessible files, KeyError if the attribute does not exist
return False
def process_file(file_path, scratch_dir, uid, gid, ceph_pool_value, hard_links, lock):
""" Process each file in a separate thread, appending a unique identifier to filenames to avoid overwrites. """
try:
if has_ceph_pool_attr(file_path, ceph_pool_value):
logging.debug(f"Skipping file with specified ceph pool attribute: {file_path}")
return
logging.info(f"Processing file: {file_path}")
# after replacing file, parent folder atime and mtime are modified
# keep them to replace them
parent_path = Path(file_path).parent.absolute()
parent_stat_info = os.stat(parent_path, follow_symlinks=False)
parent_mtime = parent_stat_info.st_mtime
parent_atime = parent_stat_info.st_atime
# Generate a unique identifier and append it to the filename
unique_suffix = uuid.uuid4().hex
scratch_file_name = f"{os.path.basename(file_path)}_{unique_suffix}"
scratch_file_path = os.path.join(scratch_dir, scratch_file_name)
stat_info = os.stat(file_path, follow_symlinks=False)
inode = stat_info.st_ino
nlink = stat_info.st_nlink
if nlink > 1 or inode in hard_links:
with lock:
if inode in hard_links:
os.remove(file_path)
os.link(hard_links[inode], file_path)
logging.info(f"Hard link recreated for file: {file_path}")
return
else:
logging.info(f"Hard link added to list for file: {file_path}")
hard_links[inode] = file_path
if os.path.islink(file_path):
link_target = os.readlink(file_path)
os.unlink(file_path)
os.symlink(link_target, file_path)
os.lchown(file_path, uid, gid)
else:
shutil.copy2(file_path, scratch_file_path)
shutil.copystat(file_path, scratch_file_path)
os.remove(file_path)
shutil.move(scratch_file_path, file_path)
os.chown(file_path, uid, gid)
# update parent atime and mtime
os.utime(parent_path, (parent_atime, parent_mtime))
except Exception as e:
logging.error(f"Error processing {file_path}: {e}")
def handler(future):
future.result()
return
def process_files(start_dir, scratch_dir, ceph_pool_value):
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
if not os.path.exists(scratch_dir):
os.makedirs(scratch_dir)
hard_links = {}
lock = threading.Lock()
with ThreadPoolExecutor(max_workers=max_parallel_threads) as executor:
for root, dirs, files in os.walk(start_dir):
dirs.sort()
files.sort()
for file in files:
file_path = os.path.join(root, file)
if scratch_dir in file_path:
continue
stat_info = os.stat(file_path, follow_symlinks=False)
uid = stat_info.st_uid
gid = stat_info.st_gid
future = executor.submit(process_file, file_path, scratch_dir, uid, gid, ceph_pool_value, hard_links, lock)
future.add_done_callback(handler)
if os.path.exists(scratch_dir):
shutil.rmtree(scratch_dir)
if __name__ == "__main__":
if len(sys.argv) != 2:
print("Usage: python script.py <ceph_pool_value>")
sys.exit(1)
ceph_pool_value = sys.argv[1]
process_files(start_directory, scratch_directory, ceph_pool_value)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment