A simple Python script to recursively find and extract all archives in a directory (default: the current directory).
#!/usr/bin/env python3
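# Usage (illustrative examples; "extract_archives.py" is a hypothetical filename,
# save the script under any name you like):
#   python3 extract_archives.py                 # scan the current directory
#   python3 extract_archives.py -s 50 -p ~/dl   # archives over 50 MB, progress bar
#   python3 extract_archives.py -e -r           # confirm each extraction/removal
# Note: .rar and .7z extraction shells out to the `unrar` and `7z` binaries,
# which must be on PATH; the remaining formats use shutil.unpack_archive.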
import os
import shutil
import argparse
import subprocess
from pathlib import Path
import multiprocessing
import logging
from logging import handlers
from datetime import datetime
from contextlib import redirect_stderr, redirect_stdout
import json
import time

# Constants for supported archive extensions
SUPPORTED_EXTENSIONS = {"7z", "zip", "tar", "gz", "xz", "rar"}

# ANSI color codes for terminal output
GREEN = "\033[32m"
MAGENTA = "\033[35m"
GRAY = "\033[90m"
RESET = "\033[0m"
BOLD = "\033[1m"

class JsonFormatter(logging.Formatter):
    """Custom logging formatter to output JSON-formatted logs."""

    def format(self, record):
        # Directly format the log record as a JSON object
        log_record = {
            "timestamp": self.formatTime(record, self.datefmt),
            "level": record.levelname,
            "message": record.msg,  # Log record is expected to be a JSON-serializable dict
            "function": record.funcName,
            "line": record.lineno,
        }
        if record.exc_info:
            log_record["exception"] = self.formatException(record.exc_info)
        return json.dumps(log_record)

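# An emitted log line looks roughly like this (illustrative values):
#   {"timestamp": "2024-08-27 15:41:00,123", "level": "INFO",
#    "message": {"action": "extract", "file": "a.zip", "status": "success",
#    "output_folder": "a"}, "function": "extract_archive", "line": 120}
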
def setup_logging() -> None:
    """Sets up JSON logging to a file with the current timestamp."""
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    log_filename = f"archive.{timestamp}.log"
    handler = handlers.RotatingFileHandler(
        log_filename, maxBytes=10485760, backupCount=5
    )
    handler.setFormatter(JsonFormatter())
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    logger.addHandler(handler)

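# Logs are written to the current working directory under a name such as
# "archive.20240827_154100.log" (illustrative), rotating at 10 MB with up to
# 5 backup files kept.
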
def log_configuration(args) -> None:
    """Logs the configuration used to run the script."""
    config = {
        "action": "configuration",
        "directory": args.directory,
        "min_size_mb": args.size,
        "confirm_extract": args.confirm_extract,
        "confirm_remove": args.confirm_remove,
        "skip_remove": args.skip_remove,
        "parallel_jobs": args.jobs,
        "max_depth": args.level,
        "progress_bar": args.progress_bar,
        "throttle": args.throttle,
    }
    logging.info(config)  # Pass the config dictionary directly

def display_progress_bar(progress: int, total: int, width: int = 40) -> None:
    """Displays a Typer-style progress bar with the count at the start and the percentage at the end."""
    percent = int(progress * 100 / total) if total > 0 else 0
    fill = int(width * progress / total) if total > 0 else 0
    filled_bar = "━" * fill
    empty_bar = "━" * (width - fill - 1)
    glyph = "╸" if fill < width else ""  # half-block tip while the bar is incomplete
    total_len = len(str(total))
    progress_total = f"{progress:{total_len}}/{total}"
    filled_color = MAGENTA if percent < 100 else GREEN
    # Print the progress bar in place (carriage return, no trailing newline)
    display = f"\rExtracting {progress_total} {filled_color}{filled_bar}{glyph}{GRAY}{empty_bar}{RESET} {BOLD}{percent}%{RESET}"
    print(display, end="", flush=True)

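# Rendered output looks roughly like this (colors omitted):
#   Extracting  3/10 ━━━━━━━━━━━━╸━━━━━━━━━━━━━━━━━━━━━━━━━━━ 30%
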
def prompt_yes_no(question: str) -> bool:
    """Prompts the user for a yes/no response (empty input defaults to yes)."""
    while True:
        answer = input(f"{question} (Y/n): ").strip().lower()
        if answer in {"y", "yes", ""}:
            return True
        if answer in {"n", "no"}:
            return False
        print("Invalid input. Please enter 'Y' or 'n'.")

def create_unique_foldername(filepath: Path) -> Path:
    """Creates a unique folder name based on the file path."""
    foldername = filepath.with_suffix("")
    counter = 0
    new_foldername = foldername
    while new_foldername.exists():
        counter += 1
        new_foldername = foldername.parent / f"{foldername.name} ({counter})"
    return new_foldername

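# e.g. "photos.zip" -> "photos", then "photos (1)", "photos (2)", ... if taken.
# Only the final suffix is stripped, so "data.tar.gz" becomes a "data.tar" folder.
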
def extract_archive(filepath: Path, foldername: Path) -> None:
    """Extracts an archive based on its file extension."""
    try:
        if filepath.suffix[1:] == "rar":
            # unrar only treats the last argument as a destination directory
            # when it ends with a path separator
            subprocess.run(
                ["unrar", "x", "-y", str(filepath), f"{foldername}{os.sep}"],
                check=True,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.STDOUT,
            )
        elif filepath.suffix[1:] == "7z":
            subprocess.run(
                ["7z", "x", "-y", str(filepath), f"-o{foldername}"],
                check=True,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.STDOUT,
            )
        else:
            # zip and tar archives (including .tar.gz / .tar.xz) are handled
            # by the standard library
            shutil.unpack_archive(str(filepath), str(foldername))
        logging.info(
            {
                "action": "extract",
                "file": str(filepath),
                "status": "success",
                "output_folder": str(foldername),
            }
        )
    except Exception as e:
        logging.error(
            {
                "action": "extract",
                "file": str(filepath),
                "status": "error",
                "error": str(e),
            }
        )

def extract_file(filepath: Path, confirm_remove: bool, skip_remove: bool) -> None:
    """Extracts an archive using the appropriate tool and handles removal."""
    new_foldername = create_unique_foldername(filepath)
    new_foldername.mkdir(parents=True, exist_ok=True)
    extract_archive(filepath, new_foldername)
    if not skip_remove and (
        not confirm_remove
        or prompt_yes_no(f"Do you want to remove the original archive {filepath}?")
    ):
        os.remove(filepath)
        logging.info({"action": "remove", "file": str(filepath), "status": "success"})
    else:
        logging.info({"action": "remove", "file": str(filepath), "status": "skipped"})
    logging.info(
        {
            "action": "extraction_complete",
            "file": str(filepath),
            "output_folder": str(new_foldername),
            "status": "success",
        }
    )

def process_file(
    filepath: Path,
    min_size: int,
    confirm_extract: bool,
    confirm_remove: bool,
    skip_remove: bool,
    throttle: float,
) -> int:
    """Processes a single file to check if it should be extracted."""
    filesize = filepath.stat().st_size
    if filepath.suffix[1:] in SUPPORTED_EXTENSIONS and filesize > min_size:
        logging.info(
            {
                "action": "found_large_archive",
                "file": str(filepath),
                "size_mb": filesize / (1024 * 1024),
            }
        )
        if not confirm_extract or prompt_yes_no("Do you want to extract this archive?"):
            extract_file(filepath, confirm_remove, skip_remove)
    else:
        logging.info(
            {
                "action": "skip_file",
                "file": str(filepath),
                "reason": (
                    "unsupported_format"
                    if filepath.suffix[1:] not in SUPPORTED_EXTENSIONS
                    else "small_size"
                ),
            }
        )
    if throttle > 0:
        time.sleep(throttle)
    return 1

def find_files(
    directory: Path, current_depth: int, max_depth: int, files: list
) -> None:
    """Recursively finds files up to a specified directory depth level."""
    if current_depth > max_depth:
        return
    for entry in directory.iterdir():
        if entry.is_file():
            files.append(entry)
        elif entry.is_dir():
            find_files(entry, current_depth + 1, max_depth, files)

def log_final_result(total_files: int, processed_files: int) -> None:
    """Logs the final result of the script execution."""
    result = {
        "action": "final_result",
        "total_files": total_files,
        "processed_files": processed_files,
    }
    logging.info(result)

def initialize_file_processing(args) -> tuple:
    """Initialize logging and find files to process."""
    setup_logging()
    log_configuration(args)
    files = []
    find_files(Path(args.directory), 0, args.level, files)
    total_files = len(files)
    return files, total_files

def process_file_parallel_init(pool_args):
    """Unpacks an argument tuple so process_file can be used with Pool.imap."""
    return process_file(*pool_args)

def process_files_serially(files: list, args) -> int:
    """Process files serially."""
    processed_files = 0
    for filepath in files:
        processed_files += process_file(
            filepath,
            args.size * 1024 * 1024,
            args.confirm_extract,
            args.confirm_remove,
            args.skip_remove,
            args.throttle,
        )
        if args.progress_bar:
            display_progress_bar(processed_files, len(files))
    return processed_files

def process_files_parallel(files: list, args) -> int:
    """Process files using multiprocessing."""
    processed_files = 0
    pool_args = [
        (
            filepath,
            args.size * 1024 * 1024,
            args.confirm_extract,
            args.confirm_remove,
            args.skip_remove,
            args.throttle,
        )
        for filepath in files
    ]
    with multiprocessing.Pool(args.jobs) as pool:
        # imap yields results lazily and in input order, so the progress bar
        # can advance as workers finish instead of all at once at the end
        results = pool.imap(process_file_parallel_init, pool_args)
        for result in results:
            processed_files += result
            if args.progress_bar:
                display_progress_bar(processed_files, len(files))
    return processed_files

def run_main(args) -> None:
    """Runs the main process of extracting files."""
    files, total_files = initialize_file_processing(args)
    if args.confirm_extract or args.confirm_remove:
        args.jobs = 1  # Force serial processing if user confirmation is needed
    if args.jobs > 1:
        processed_files = process_files_parallel(files, args)
    else:
        processed_files = process_files_serially(files, args)
    if args.progress_bar:
        print()  # Move to the next line after the progress bar
    # Log final result
    log_final_result(total_files, processed_files)

if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Recursively find and extract archives."
    )
    parser.add_argument(
        "-s",
        "--size",
        type=int,
        default=10,
        help="Minimum size in MB to process (default: 10 MB)",
    )
    parser.add_argument(
        "-e",
        "--confirm-extract",
        action="store_true",
        help="Confirm extraction for each archive (default: False)",
    )
    parser.add_argument(
        "-r",
        "--confirm-remove",
        action="store_true",
        help="Confirm removal of original archive after extraction (default: False)",
    )
    parser.add_argument(
        "--skip-remove",
        action="store_true",
        help="Skip deleting the original archive after extraction",
    )
    parser.add_argument(
        "-j",
        "--jobs",
        type=int,
        default=multiprocessing.cpu_count(),
        help="Number of parallel jobs (default: number of CPU cores)",
    )
    parser.add_argument(
        "-l",
        "--level",
        type=int,
        # The default is deliberately unbounded; explicit values are parsed as int
        default=float("inf"),
        help="Maximum depth level to search for files (default: infinite)",
    )
    parser.add_argument(
        "-p",
        "--progress-bar",
        action="store_true",
        help="Display progress bar",
    )
    parser.add_argument(
        "--throttle",
        type=float,
        default=0,
        help="Throttle duration in seconds between processing each file (default: 0)",
    )
    parser.add_argument(
        "directory",
        nargs="?",
        default=".",
        help="Directory to start searching from (default: current directory)",
    )
    args = parser.parse_args()
    # Redirect stderr to hide unwanted error output; keep stdout open whenever it
    # is needed for the progress bar or for interactive yes/no prompts
    with open(os.devnull, "w") as fnull:
        if args.progress_bar or args.confirm_extract or args.confirm_remove:
            with redirect_stderr(fnull):
                run_main(args)
        else:
            with redirect_stdout(fnull), redirect_stderr(fnull):
                run_main(args)