Last active
February 8, 2024 21:11
-
-
Save hvanmegen/0b15adcf67b32507e0c339a763e84f51 to your computer and use it in GitHub Desktop.
cleanup script for ceph
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import os | |
import sys | |
import time | |
import argparse | |
import shutil | |
import math | |
try: | |
from tqdm import tqdm | |
except ImportError: | |
print("tqdm module not found. Please install it using 'pip install tqdm'") | |
sys.exit(1) | |
MAX_ERRORS = 5 | |
MAX_WAIT_TIME = 10 | |
NUM_DELETED = 0 | |
NOT_DELETED = 0 | |
SCRIPT_START_TIME = time.time() | |
NUM_ITEMS = 0 | |
def numerize(n): | |
if isinstance(n, int): | |
return '{:,}'.format(n) | |
elif isinstance(n, float): | |
return '{:,.2f}'.format(n) | |
else: | |
return str(n) | |
def get_terminal_width(): | |
terminal_size = shutil.get_terminal_size(fallback=(80, 20)) | |
return terminal_size.columns | |
def truncate_path(path: str, max_length: int = 80, filler: str = "...") -> str: | |
if len(path) <= max_length: | |
return path | |
fn = path.split("/")[-1] | |
while len(path) > max_length: | |
parts = path.lstrip("/").split("/")[:-1] | |
if len(parts) <= 1: | |
return f"/.../{fn}" | |
m = math.floor(len(parts) / 2)-1 | |
if parts[m] == filler: | |
del parts[m] | |
elif parts[m+1] == filler: | |
del parts[m+1] | |
parts[m] = filler | |
path = "/" + "/".join(parts) + f"/{fn}" | |
return path | |
def sort_by_depth(paths): | |
return sorted(paths, key=lambda x: (os.path.dirname(x).count(os.sep), x), reverse=False) | |
def retrieve_objects(path, limit, max_length): | |
objects = [] | |
progress_bar = tqdm(total=limit, disable=not VERBOSE, smoothing=0, leave=True, ascii=False, mininterval=0.2, maxinterval=2, bar_format="Indexing {l_bar} {bar} {r_bar}") | |
def _recursive_scan(cur_path): | |
nonlocal objects, progress_bar, limit | |
if len(objects) >= limit: | |
progress_bar.update(limit-len(objects)) | |
return | |
if VERBOSE: | |
truncated_cur_path = truncate_path(cur_path, max_length) | |
progress_bar.display(msg="Current path: \033[1m" + truncated_cur_path + "\033[0m", pos=1) | |
for entry in os.scandir(cur_path): | |
if len(objects) >= limit: | |
break | |
full_path = entry.path | |
objects.append(full_path) | |
progress_bar.update(1) | |
if entry.is_dir(follow_symlinks=False): | |
_recursive_scan(full_path) | |
if entry.is_symlink() and not entry.is_file() and not entry.is_dir(): | |
objects.append(full_path) | |
progress_bar.update(1) | |
_recursive_scan(path) | |
if VERBOSE: | |
progress_bar.display(msg=None, pos=1) | |
progress_bar.close() | |
return sort_by_depth(objects) | |
def remove_object(item): | |
try: | |
if os.access(item, os.W_OK): | |
try: | |
if os.path.isfile(item) or os.path.islink(item): | |
os.remove(item) | |
elif os.path.isdir(item): | |
os.rmdir(item) | |
return True | |
except Exception as e: | |
return False | |
else: | |
return False | |
except Exception as e: | |
return False | |
def main(args, repeat=False): | |
def deletion_action(args): | |
global NUM_DELETED | |
global NUM_ITEMS | |
global NOT_DELETED | |
global VERBOSE | |
dir_path, num_items, verbose = args.directory, args.number, args.verbose | |
VERBOSE = verbose | |
NUM_ITEMS = num_items | |
NOT_DELETED = 0 | |
items = [] | |
error_count = 0 | |
error_list = [] | |
wait_time = 2 | |
firstfiles = os.listdir(dir_path) | |
if len(firstfiles) == 0: | |
print("Nothing to delete!") | |
exit(0) | |
print(f"Scanning \033[1m{dir_path}\033[0m and compiling list of \033[1m{numerize(num_items)}\033[0m items to delete:") | |
max_path_length = get_terminal_width() - 20 # Assuming you want to leave some padding | |
items = retrieve_objects(dir_path, num_items, max_path_length) | |
len_items = len(items) | |
if len_items < num_items: | |
print(f"Location \033[1m{dir_path}\033[0m only contains \033[1m{numerize(len_items)}\033[0m items available for deletion; object limit adjusted.") | |
num_items = len_items | |
print(f"List of \033[1m{numerize(num_items)}\033[0m items compiled. Proceeding with deletion:") | |
with tqdm(total=num_items, disable=not verbose, smoothing=0, leave=True, ascii=False, mininterval=0.2, maxinterval=2, bar_format="Deleting {l_bar} {bar} {r_bar}") as progress_bar: | |
start_time = time.time() | |
for item in items: | |
try: | |
deletion_start = time.time() | |
if verbose: | |
truncated_item = truncate_path(item, max_path_length) | |
progress_bar.display(msg="Deleting: \033[1m" + truncated_item + "\033[0m", pos=1) | |
if remove_object(item): | |
NUM_DELETED += 1 | |
else: | |
NOT_DELETED += 1 | |
deletion_time = time.time() - deletion_start | |
if deletion_time > wait_time: | |
wait_time *= 2 if wait_time < MAX_WAIT_TIME // 2 else MAX_WAIT_TIME | |
time.sleep(wait_time) | |
if wait_time >= MAX_WAIT_TIME: | |
print(f"File deletion exceeded {MAX_WAIT_TIME}s. Exiting.") | |
sys.exit(1) | |
progress_bar.update(1) | |
except Exception as e: | |
error_count += 1 | |
if error_count > MAX_ERRORS: | |
timeused = round(time.time() - start_time, 2) | |
print(f"Error: {e}. Took {timeused} {'second' if (timeused == 1) else 'seconds'}.") | |
sys.exit(1) | |
time.sleep(wait_time * (2 ** (error_count - 1))) | |
if verbose: | |
progress_bar.refresh() | |
if verbose: | |
progress_bar.display(msg=None, pos=1) | |
if (NOT_DELETED >= 1): | |
if verbose: | |
print(f"Could not delete \033[1m{NOT_DELETED}\033[0m {'item' if (NOT_DELETED == 1) else 'items'}:") | |
for item in error_list: | |
item_type = "(dir)" if os.path.isdir(item) else "(file)" | |
print(f"- {item_type} {item}") | |
del error_list | |
NOT_DELETED = 0 | |
print(f"Deleted \033[1m{numerize(num_items)}\033[0m items in \033[1m{time.time() - SCRIPT_START_TIME:.2f}\033[0m seconds at \033[1m{num_items / (time.time() - SCRIPT_START_TIME):.2f}\033[0m it/s.\n") | |
while True: | |
deletion_action(args) | |
if not repeat: | |
break | |
if __name__ == "__main__": | |
parser = argparse.ArgumentParser(description="This script deletes limited number of files and directories from a given directory. It accepts a directory path, number of items to delete, and an optional verbose flag (-v) for detailed output. To keep repeating the deletion order, use --repeat (-r). (Henry van Megen <[email protected]>)") | |
parser.add_argument("directory") | |
parser.add_argument("number", type=int) | |
parser.add_argument("-v", "--verbose", action="store_true") | |
parser.add_argument("-r", "--repeat", action="store_true") | |
args = parser.parse_args() | |
try: | |
main(args, repeat=args.repeat) | |
except KeyboardInterrupt: | |
print("Process interrupted by user (CTRL-C). Exiting.") | |
if NUM_DELETED >= 1: | |
print(f"\nDeleted \033[1m{numerize(NUM_DELETED)}\033[0m items in \033[1m{time.time() - SCRIPT_START_TIME:.2f}\033[0m seconds at \033[1m{NUM_DELETED / (time.time() - SCRIPT_START_TIME):.2f}\033[0m it/s.\n") | |
sys.exit(1) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment