Python script that helps you clean up duplicate folders with names like "foldername", "foldername (1)", "foldername (2)", etc. It scans the specified directory, groups sibling folders that share a base name, presents a summary of each group, and asks you to confirm an action (delete duplicates, merge then delete, or skip) for each one.
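For reference, typical invocations might look like the following (the filename clean_duplicate_folders.py is illustrative; the flags come from the script's argument parser below):

python3 clean_duplicate_folders.py ~/Downloads -r -n    # recursive dry run: preview actions without changing anything
python3 clean_duplicate_folders.py /path/to/photos -r -l 2 -c 2    # recurse two levels deep, merge-and-delete every group without prompting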
#!/usr/bin/env python3
import os
import re
import argparse
import shutil
import logging
from logging import handlers
from datetime import datetime
from pathlib import Path
import sqlite3
import json
from collections import defaultdict

class JsonFormatter(logging.Formatter):
    """Custom logging formatter to output JSON-formatted logs."""

    def format(self, record):
        log_record = {
            "timestamp": self.formatTime(record, self.datefmt),
            "level": record.levelname,
            "message": record.msg,  # Expected to be a dict
            "function": record.funcName,
            "line": record.lineno,
        }
        if record.exc_info:
            log_record["exception"] = self.formatException(record.exc_info)
        return json.dumps(log_record)
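
# For illustration only: a call such as logging.info({"action": "delete",
# "status": "success", "directory": "/data/Photos (1)"}) is serialized by
# JsonFormatter along these lines (timestamp and line number are made up):
# {"timestamp": "2024-09-19 21:11:00,123", "level": "INFO",
#  "message": {"action": "delete", "status": "success", "directory": "/data/Photos (1)"},
#  "function": "delete_duplicates", "line": 222}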

def setup_logging():
    """Sets up JSON logging to a file with the current timestamp."""
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    log_filename = f"clean_duplicate_folders.{timestamp}.log"
    handler = handlers.RotatingFileHandler(log_filename, maxBytes=10485760, backupCount=5)
    handler.setFormatter(JsonFormatter())
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    logger.addHandler(handler)

def parse_arguments():
    parser = argparse.ArgumentParser(description='Clean duplicate sibling folders with "(n)" suffixes.')
    parser.add_argument('directory', nargs='?', default='.', help='Directory to start from (default: current directory)')
    parser.add_argument('-r', '--recursive', action='store_true', help='Recursively process subdirectories')
    parser.add_argument('-n', '--dry-run', action='store_true', help='Perform a dry run (do not make any changes)')
    parser.add_argument('-l', '--level', type=int, default=None, help='Maximum depth level for recursion (default: unlimited)')
    parser.add_argument('-c', '--default-choice', type=int, choices=[1, 2, 3],
                        help='Default choice to apply to all groups (1: delete duplicates, 2: merge and delete duplicates, 3: skip)')
    return parser.parse_args()

def log_configuration(args):
    """Logs the configuration used to run the script."""
    config = {
        "action": "configuration",
        "directory": args.directory,
        "recursive": args.recursive,
        "dry_run": args.dry_run,
        "level": args.level,
        "default_choice": args.default_choice,
    }
    logging.info(config)

def initialize_database(db_path='filesystem_index.db'):
    """Initializes the SQLite database and creates tables if they don't exist."""
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS directories (
            id INTEGER PRIMARY KEY,
            path TEXT UNIQUE,
            parent_path TEXT,
            mtime REAL
        )
    ''')
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS files (
            id INTEGER PRIMARY KEY,
            directory_path TEXT,
            name TEXT,
            size INTEGER,
            mtime REAL
        )
    ''')
    conn.commit()
    return conn

def prompt_use_existing_index():
    """Prompts the user to decide whether to use the existing index."""
    while True:
        choice = input("An index file was found. Do you want to use the existing index? (Y/n): ").strip().lower()
        if choice in {'y', 'yes', ''}:
            return True
        elif choice in {'n', 'no'}:
            return False
        print("Invalid input. Please enter 'Y' or 'n'.")

def collect_directories(conn, directory, recursive, level=None):
    """Scans the filesystem and populates the database with directory and file information."""
    dir_count = 0  # Initialize the directory counter
    file_count = 0  # Initialize the file counter
    cursor = conn.cursor()
    cursor.execute('DELETE FROM directories')
    cursor.execute('DELETE FROM files')
    conn.commit()

    def scan_dir(current_dir, current_level):
        nonlocal dir_count, file_count
        try:
            with os.scandir(current_dir) as entries:
                dir_entries = []
                file_entries = []
                for entry in entries:
                    full_path = Path(entry.path)
                    if entry.is_dir(follow_symlinks=False):
                        mtime = entry.stat(follow_symlinks=False).st_mtime
                        dir_entries.append((str(full_path), str(current_dir), mtime))
                        dir_count += 1
                        # Update live output
                        print(f"Scanning directories: {dir_count:,}, files: {file_count:,}", end='\r', flush=True)
                        if recursive and (level is None or current_level < level):
                            scan_dir(full_path, current_level + 1)
                    elif entry.is_file(follow_symlinks=False):
                        stat = entry.stat(follow_symlinks=False)
                        file_entries.append((str(current_dir), entry.name, stat.st_size, stat.st_mtime))
                        file_count += 1
                        # Update live output
                        print(f"Scanning directories: {dir_count:,}, files: {file_count:,}", end='\r', flush=True)
                # Insert directory and file entries in batches
                if dir_entries:
                    cursor.executemany('INSERT OR IGNORE INTO directories (path, parent_path, mtime) VALUES (?, ?, ?)', dir_entries)
                if file_entries:
                    cursor.executemany('INSERT OR IGNORE INTO files (directory_path, name, size, mtime) VALUES (?, ?, ?, ?)', file_entries)
                conn.commit()
        except PermissionError as e:
            print(f"Permission denied: {current_dir}")
            logging.error({
                "action": "scan_error",
                "directory": str(current_dir),
                "error": str(e)
            })

    scan_dir(Path(directory), 1)
    print()  # Move to the next line after scanning is complete
    # Log the total number of directories and files scanned
    logging.info({
        "action": "scanning_complete",
        "total_directories_scanned": dir_count,
        "total_files_scanned": file_count
    })

def load_directories_from_index(conn):
    """Loads directory paths from the database."""
    cursor = conn.cursor()
    cursor.execute('SELECT path FROM directories')
    return [Path(row[0]) for row in cursor.fetchall()]

def group_directories(conn):
    """Groups duplicate sibling directories based on their base names and parent directories."""
    cursor = conn.cursor()
    pattern = re.compile(r'^(.*?)(?: \((\d+)\))?$')
    cursor.execute('SELECT path, parent_path FROM directories')
    groups = defaultdict(list)
    for path_str, parent_path_str in cursor.fetchall():
        dir_path = Path(path_str)
        parent_dir = Path(parent_path_str)
        match = pattern.match(dir_path.name)
        if match:
            base_name = match.group(1)
            group_key = (str(parent_dir), base_name)
            groups[group_key].append(dir_path)
    # Only keep groups with more than one directory
    duplicate_groups = {k: v for k, v in groups.items() if len(v) > 1}
    return duplicate_groups
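
# For illustration: given sibling directories "/data/Photos", "/data/Photos (1)"
# and "/data/Photos (2)", group_directories() would return (paths in scan order)
# {("/data", "Photos"): [Path("/data/Photos"), Path("/data/Photos (1)"), Path("/data/Photos (2)")]}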

def get_directory_size(conn, dir_path):
    """Calculates the total size and number of files directly inside a directory
    using the database (subdirectory contents are not counted)."""
    cursor = conn.cursor()
    cursor.execute('SELECT size FROM files WHERE directory_path = ?', (str(dir_path),))
    sizes = cursor.fetchall()
    total_size = sum(size[0] for size in sizes)
    num_files = len(sizes)
    return total_size, num_files

def summarize_group(group_key, dir_paths, conn):
    parent_dir, base_name = group_key
    print(f"\nFound duplicate directories in '{parent_dir}': '{base_name}'")
    for dir_path in sorted(dir_paths):
        size, num_files = get_directory_size(conn, dir_path)
        print(f"  - {dir_path.name} : {num_files:,} files, {size:,} bytes")

def prompt_user_action(default_choice=None):
    if default_choice:
        print(f"\nApplying default choice: {default_choice}")
        return str(default_choice)
    print("\nSelect an action:")
    print("1) Delete duplicate folders (keep only the base folder)")
    print("2) Merge contents into base folder, then delete duplicates")
    print("3) Skip (do nothing)")
    while True:
        choice = input("Enter your choice (1/2/3): ").strip()
        if choice in {'1', '2', '3'}:
            return choice
        print("Invalid input. Please enter 1, 2, or 3.")

def delete_duplicates(base_dir, duplicate_dirs, dry_run):
    for dup_dir in duplicate_dirs:
        if dry_run:
            print(f"Dry run: would delete {dup_dir}")
            logging.info({
                "action": "delete",
                "status": "dry_run",
                "directory": str(dup_dir)
            })
        else:
            try:
                print(f"Deleting {dup_dir}")
                shutil.rmtree(dup_dir)
                logging.info({
                    "action": "delete",
                    "status": "success",
                    "directory": str(dup_dir)
                })
            except Exception as e:
                print(f"Error deleting {dup_dir}: {e}")
                logging.error({
                    "action": "delete",
                    "status": "error",
                    "directory": str(dup_dir),
                    "error": str(e)
                })

def merge_contents(base_dir, duplicate_dirs, dry_run):
    # Note: the merge is shallow; top-level items are moved as-is, and an item
    # whose name already exists in the base folder is skipped, not merged.
    for dup_dir in duplicate_dirs:
        for item in os.listdir(dup_dir):
            src = dup_dir / item
            dst = base_dir / item
            if dst.exists():
                print(f"Conflict: {dst} already exists.")
                print(f"Skipping {src}")
                logging.info({
                    "action": "merge",
                    "status": "conflict",
                    "source": str(src),
                    "destination": str(dst)
                })
            elif dry_run:
                print(f"Dry run: would move {src} to {dst}")
                logging.info({
                    "action": "move",
                    "status": "dry_run",
                    "source": str(src),
                    "destination": str(dst)
                })
            else:
                try:
                    print(f"Moving {src} to {dst}")
                    shutil.move(str(src), str(dst))
                    logging.info({
                        "action": "move",
                        "status": "success",
                        "source": str(src),
                        "destination": str(dst)
                    })
                except Exception as e:
                    print(f"Error moving {src} to {dst}: {e}")
                    logging.error({
                        "action": "move",
                        "status": "error",
                        "source": str(src),
                        "destination": str(dst),
                        "error": str(e)
                    })
        # Delete the duplicate directory, but only once it is empty; deleting it
        # while skipped conflicts remain inside would lose their contents.
        if dry_run:
            print(f"Dry run: would delete {dup_dir}")
            logging.info({
                "action": "delete",
                "status": "dry_run",
                "directory": str(dup_dir)
            })
        elif any(dup_dir.iterdir()):
            print(f"Not deleting {dup_dir}: it still contains unmerged items.")
            logging.warning({
                "action": "delete",
                "status": "skipped_not_empty",
                "directory": str(dup_dir)
            })
        else:
            try:
                print(f"Deleting {dup_dir}")
                shutil.rmtree(dup_dir)
                logging.info({
                    "action": "delete",
                    "status": "success",
                    "directory": str(dup_dir)
                })
            except Exception as e:
                print(f"Error deleting {dup_dir}: {e}")
                logging.error({
                    "action": "delete",
                    "status": "error",
                    "directory": str(dup_dir),
                    "error": str(e)
                })

def process_group(group_key, dir_paths, dry_run, default_choice=None):
    parent_dir, base_name = group_key
    # Separate base directory and duplicates
    base_dir = None
    suffix_pattern = re.compile(r'.* \(\d+\)$')
    for dir_path in dir_paths:
        if not suffix_pattern.match(dir_path.name):
            base_dir = dir_path
            break
    if base_dir is None:
        # No base directory without suffix, pick the one with the lowest suffix number
        def get_suffix_num(dir_name):
            match = re.match(r'.* \((\d+)\)$', dir_name)
            return int(match.group(1)) if match else float('inf')
        base_dir = min(dir_paths, key=lambda d: get_suffix_num(d.name))
    duplicate_dirs = [d for d in dir_paths if d != base_dir]
    summarize_group(group_key, dir_paths, conn)
    logging.info({
        "action": "found_duplicate_group",
        "parent_directory": parent_dir,
        "base_name": base_name,
        "directories": [str(d) for d in dir_paths],
        "base_directory": str(base_dir),
        "duplicate_directories": [str(d) for d in duplicate_dirs]
    })
    action = prompt_user_action(default_choice)
    if action == '1':
        logging.info({
            "action": "process_group",
            "method": "delete_duplicates",
            "group": f"{parent_dir}/{base_name}"
        })
        delete_duplicates(base_dir, duplicate_dirs, dry_run)
    elif action == '2':
        logging.info({
            "action": "process_group",
            "method": "merge_contents",
            "group": f"{parent_dir}/{base_name}"
        })
        merge_contents(base_dir, duplicate_dirs, dry_run)
    elif action == '3':
        print("Skipping this group.")
        logging.info({
            "action": "process_group",
            "method": "skip",
            "group": f"{parent_dir}/{base_name}"
        })

def main():
    global conn  # Make conn accessible in process_group
    setup_logging()
    args = parse_arguments()
    log_configuration(args)
    db_path = 'filesystem_index.db'
    index_exists = os.path.exists(db_path)
    conn = initialize_database(db_path)
    if index_exists:
        use_existing = prompt_use_existing_index()
        if not use_existing:
            print("Rescanning the filesystem and rebuilding the index...")
            collect_directories(conn, args.directory, args.recursive, args.level)
        else:
            print(f"Using existing index from {db_path}")
            logging.info({"action": "using_existing_index"})
    else:
        print("No index file found. Scanning the filesystem...")
        collect_directories(conn, args.directory, args.recursive, args.level)
    directories = load_directories_from_index(conn)
    total_directories = len(directories)
    print(f"Total directories indexed: {total_directories:,}")
    logging.info({
        "action": "directories_indexed",
        "total_directories": total_directories
    })
    groups = group_directories(conn)
    if not groups:
        print("No duplicate directories found.")
        logging.info({"action": "no_duplicates_found"})
        return
    for group_key, dir_paths in groups.items():
        process_group(group_key, dir_paths, args.dry_run, args.default_choice)
    logging.info({"action": "script_complete"})
    conn.close()


if __name__ == '__main__':
    main()