Last active
          October 30, 2025 20:20 
        
      - 
      
- 
        Save Druark/19505f2cfefe0e667d9c1f624ee39f87 to your computer and use it in GitHub Desktop. 
    Converts zips and other archive types, to 7z with optimal compression in the current directory. 
  
        
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | import os | |
| import sys | |
| import subprocess | |
| from pathlib import Path | |
| import shutil | |
| import stat | |
| import time | |
| from dataclasses import dataclass | |
| import multiprocessing | |
| # ANSI colors | |
| RED, GREEN, YELLOW, CYAN, RESET = '\033[91m', '\033[92m', '\033[93m', '\033[96m', '\033[0m' | |
| # Constants | |
| THREAD_MB_RATIO = 5 | |
| CPU_THREAD_OFFSET = 1 | |
| COMPRESSED_EXTENSIONS = ('.zip', '.rar', '.7z', '.tar', '.gz', '.bz2', '.xz', '.iso', '.cab', '.arj') | |
| # Helper functions for formatting | |
| def format_size(bytes_size): | |
| """Format bytes to MB or GB""" | |
| if bytes_size >= 1024**3: | |
| return f"{bytes_size / (1024**3):.1f}GB" | |
| else: | |
| return f"{bytes_size / (1024**2):.0f}MB" | |
| def safe_subprocess(cmd, **kwargs): | |
| """Run subprocess with error handling""" | |
| try: | |
| return subprocess.run(cmd, **kwargs) | |
| except Exception: | |
| return None | |
| @dataclass | |
| class ConversionResult: | |
| """Store conversion results with calculated properties""" | |
| filename: str | |
| original_size: int | |
| new_size: int | |
| success: bool | |
| @property | |
| def space_diff(self): | |
| return self.original_size - self.new_size | |
| @property | |
| def space_diff_percent(self): | |
| return (self.space_diff / self.original_size * 100) if self.original_size > 0 else 0 | |
| def __str__(self): | |
| sign = "+" if self.space_diff < 0 else "-" | |
| size_mb = abs(round(self.space_diff / (1024 ** 2), 2)) | |
| percent = abs(round(self.space_diff_percent, 2)) | |
| color = GREEN if self.space_diff > 0 else YELLOW | |
| return f"{YELLOW}{self.filename}{RESET}: {color}{sign}{size_mb}MB ({percent}%){RESET}" | |
| def safe_rmtree(path): | |
| """Safely remove directory tree, handling permission errors""" | |
| def handle_error(func, path, exc_info): | |
| try: | |
| os.chmod(path, stat.S_IWUSR | stat.S_IWRITE) | |
| func(path) | |
| except Exception: | |
| pass | |
| try: | |
| shutil.rmtree(path, onexc=handle_error) | |
| except Exception: | |
| time.sleep(1) | |
| try: | |
| shutil.rmtree(path, onexc=handle_error) | |
| except Exception as e: | |
| print(f"{YELLOW}Warning: Could not fully clean up temp directory: {e}{RESET}") | |
| def find_7z_executable(): | |
| """Find 7z executable""" | |
| # Try system PATH first | |
| cmd = 'where' if sys.platform == 'win32' else 'which' | |
| result = safe_subprocess([cmd, '7z.exe' if sys.platform == 'win32' else '7z'], | |
| stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) | |
| if result and result.returncode == 0: | |
| return result.stdout.strip().split('\n')[0] | |
| # Try just running '7z' directly | |
| if test_executable('7z.exe' if sys.platform == 'win32' else '7z'): | |
| return '7z.exe' if sys.platform == 'win32' else '7z' | |
| return None | |
| def test_executable(path): | |
| """Test if executable works""" | |
| try: | |
| subprocess.run([path, '--help'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=3) | |
| return True | |
| except: | |
| return False | |
| def check_7z(): | |
| """Verify 7z installation and return path""" | |
| seven_z_path = find_7z_executable() | |
| if seven_z_path: | |
| print(f"Found {CYAN}7z{RESET} at: {CYAN}{seven_z_path}{RESET}") | |
| return seven_z_path | |
| print(f"{RED}Error: 7z not found!") | |
| print(f"{YELLOW}Please install a 7z-compatible archiver:") | |
| if sys.platform == 'win32': | |
| print("Recommended: NanaZip from Microsoft Store (modern 7-Zip fork)") | |
| print("Alternative: Download 7-Zip from https://7-zip.org/") | |
| elif sys.platform == 'linux': | |
| print("Run: sudo apt-get install p7zip-full") | |
| elif sys.platform == 'darwin': | |
| print("Run: brew install p7zip") | |
| print(f"{RESET}") | |
| return None | |
| def get_thread_count(file_size_bytes): | |
| """Calculate optimal thread count""" | |
| cpu_max = max(1, multiprocessing.cpu_count() - CPU_THREAD_OFFSET) | |
| base_threads = max(1, int(file_size_bytes / (THREAD_MB_RATIO * 1024 * 1024))) | |
| return min(base_threads, cpu_max) | |
| def track_progress(process, operation, filename): | |
| """Track subprocess progress with simple elapsed time""" | |
| start_time = time.time() | |
| while process.poll() is None: | |
| elapsed = int(time.time() - start_time) | |
| print(f"\r{operation} {YELLOW}{filename}{RESET}: {elapsed}s".ljust(80), end='', flush=True) | |
| time.sleep(1) | |
| # Clear the progress line when done | |
| print(f"\r".ljust(80), end='') | |
| def convert_single_archive(args): | |
| """Convert single archive to 7z with progress tracking""" | |
| directory, filename, compression_level, dictionary_size, word_size, keep_date, seven_z_path, force, verbose = args | |
| directory_path = Path(directory).absolute() | |
| temp_hash = str(abs(hash(filename)))[:8] | |
| extract_dir = directory_path / f"tmp_{temp_hash}" | |
| orig_path = directory_path / filename | |
| output_path = directory_path / f"{filename[:-4]}.7z" | |
| # Check if output already exists | |
| if output_path.exists() and not force: | |
| print(f"{CYAN}Skipping {YELLOW}{filename}{RESET} - output already exists{RESET}") | |
| return ConversionResult(filename, 0, 0, False) | |
| try: | |
| total_start_time = time.time() # Track total time | |
| orig_stat = orig_path.stat() | |
| thread_count = get_thread_count(orig_stat.st_size) | |
| # Setup extraction directory | |
| if extract_dir.exists(): | |
| safe_rmtree(extract_dir) | |
| extract_dir.mkdir(parents=True) | |
| # Extract with progress | |
| extract_proc = subprocess.Popen( | |
| [seven_z_path, 'x', str(orig_path), f'-o{str(extract_dir)}', '-y'], | |
| stdout=subprocess.PIPE, stderr=subprocess.PIPE | |
| ) | |
| track_progress(extract_proc, "Extracting", filename) | |
| if extract_proc.returncode != 0: | |
| stderr_output = extract_proc.stderr.read().decode('utf-8', errors='ignore') | |
| raise Exception(f"Extraction failed: {stderr_output}") | |
| if not any(extract_dir.iterdir()): | |
| raise Exception("No files extracted") | |
| # Compress with progress | |
| compress_proc = subprocess.Popen([ | |
| seven_z_path, 'a', '-t7z', | |
| f'-mx={compression_level}', | |
| f'-md={dictionary_size}m', | |
| f'-mfb={word_size}', | |
| f'-mmt={thread_count}', | |
| str(output_path), | |
| f'{extract_dir}/*' | |
| ], stdout=subprocess.PIPE, stderr=subprocess.PIPE) | |
| track_progress(compress_proc, "Compressing", f"{filename[:-4]}.7z") | |
| if compress_proc.returncode != 0: | |
| stderr_output = compress_proc.stderr.read().decode('utf-8', errors='ignore') | |
| raise Exception(f"Compression failed: {stderr_output}") | |
| # Calculate stats | |
| output_size = output_path.stat().st_size | |
| space_diff = orig_stat.st_size - output_size | |
| space_saved = space_diff > 0 | |
| size_mb = abs(round(space_diff / (1024 ** 2), 2)) | |
| percent = abs(round((space_diff / orig_stat.st_size * 100), 2)) | |
| # Show completion message with space info | |
| total_time = int(time.time() - total_start_time) | |
| space_info = f", {GREEN if space_saved else YELLOW}{'-' if space_saved else '+'}{size_mb}MB ({percent}%){RESET}" | |
| print(f"\r{GREEN}Compressed {YELLOW}{filename}{RESET} to {CYAN}7z{RESET} in {CYAN}{total_time}s{RESET}{space_info}") | |
| # Show detailed compression info only if verbose | |
| if verbose: | |
| print( | |
| f"[Level: {CYAN}{compression_level}{RESET}] " | |
| f"[Dict: {CYAN}{dictionary_size}MB{RESET}] " | |
| f"[Word: {CYAN}{word_size}{RESET}] " | |
| f"[Threads: {CYAN}{thread_count}{RESET}] " | |
| f"[Space {'saved' if space_saved else 'increased'}: " | |
| f"{GREEN if space_saved else YELLOW}{size_mb}MB ({percent}%){RESET}]" | |
| ) | |
| print() # Add newline only when verbose | |
| # Set file date if requested | |
| if keep_date: | |
| orig_atime, orig_mtime = orig_stat.st_atime, orig_stat.st_mtime | |
| os.utime(output_path, (orig_atime, orig_mtime)) | |
| return ConversionResult(filename, orig_stat.st_size, output_size, True) | |
| except Exception as e: | |
| print(f"\n{RED}Error processing {filename}: {e}{RESET}") | |
| if output_path.exists(): | |
| output_path.unlink() | |
| return ConversionResult(filename, 0, 0, False) | |
| finally: | |
| if extract_dir.exists(): | |
| safe_rmtree(extract_dir) | |
| def print_summary_and_delete(successful_conversions): | |
| """Print conversion summary and handle deletion""" | |
| total_space_saved = sum(result.space_diff for result in successful_conversions) | |
| space_color = GREEN if total_space_saved > 0 else YELLOW | |
| print(f"\nTotal space {'saved' if total_space_saved > 0 else 'increased'}: " | |
| f"{space_color}{format_size(abs(total_space_saved))}{RESET}") | |
| if input(f"\n{RED}Delete original archives? {RESET}(Y/n): ").lower() == 'y': | |
| for result in successful_conversions: | |
| try: | |
| (Path('.') / result.filename).unlink() | |
| print(f"{YELLOW}Deleted:{RESET} {result.filename}") | |
| except Exception as e: | |
| print(f"{RED}Error deleting {result.filename}: {e}{RESET}") | |
| def convert_archives(directory='.', compression_level=7, dictionary_size=128, word_size=64, | |
| keep_date=False, extensions=None, force=False, verbose=False): | |
| """Convert multiple archives to 7z format""" | |
| # Validate arguments | |
| if not (0 <= compression_level <= 9): sys.exit(f"{RED}Compression level must be 0-9{RESET}") | |
| if not (1 <= dictionary_size <= 1536): sys.exit(f"{RED}Dictionary size must be 1-1536 MB{RESET}") | |
| if not (5 <= word_size <= 273): sys.exit(f"{RED}Word size must be 5-273{RESET}") | |
| if not (seven_z_path := check_7z()): sys.exit(1) | |
| extensions = extensions or COMPRESSED_EXTENSIONS | |
| directory_path = Path(directory).absolute() | |
| archive_files = [f for f in directory_path.iterdir() if f.suffix.lower() in extensions] | |
| if not archive_files: | |
| print(f"\n{YELLOW}No archives found{RESET}\n") | |
| return | |
| # Filter files - show what will be converted | |
| to_convert = [] | |
| skipped_count = 0 | |
| for f in archive_files: | |
| if f.suffix.lower() == '.7z': | |
| skipped_count += 1 | |
| else: | |
| to_convert.append(f) | |
| if skipped_count > 0: | |
| print(f"Skipping {YELLOW}{skipped_count}{RESET} {CYAN}7z{RESET} files\n") | |
| # Show files to convert (limit listing to 5 files) | |
| if len(to_convert) <= 5: | |
| for f in to_convert: | |
| print(f"{GREEN}Will convert {YELLOW}{f.name}{RESET}") | |
| else: | |
| print(f"{GREEN}Will convert {YELLOW}{len(to_convert)}{RESET} files") | |
| # Calculate total size and ask for confirmation if needed | |
| total_size = sum(f.stat().st_size for f in to_convert) | |
| if len(to_convert) >= 10 or total_size >= 1024**3: | |
| print(f"\nTotal size to process: {CYAN}{format_size(total_size)}{RESET}") | |
| if input(f"{YELLOW}Continue with conversion? {RESET}(Y/n): ").lower() == 'n': | |
| print("Conversion cancelled.") | |
| return | |
| if not to_convert: | |
| print(f"\n{YELLOW}No files need conversion{RESET}\n") | |
| return | |
| print(f"\nStarting conversion of {CYAN}{len(to_convert)}{RESET} files...\n") | |
| # Convert files | |
| successful_conversions = [] | |
| for f in to_convert: | |
| result = convert_single_archive((directory, f.name, compression_level, dictionary_size, word_size, keep_date, seven_z_path, force, verbose)) | |
| if result and result.success: | |
| successful_conversions.append(result) | |
| if successful_conversions: | |
| print_summary_and_delete(successful_conversions) | |
| def main(): | |
| """Main entry point""" | |
| if sys.platform == 'win32': | |
| os.system('') # Enable ANSI colors | |
| import argparse | |
| parser = argparse.ArgumentParser(description='Convert archives to 7z format') | |
| parser.add_argument('--compress', type=int, default=7, choices=range(10), | |
| help='Compression level (0-9)') | |
| parser.add_argument('--dictionary', type=int, default=128, | |
| help='Dictionary size in MB') | |
| parser.add_argument('--word-size', type=int, default=64, | |
| help='Word size (number of fast bytes)') | |
| parser.add_argument('--keep-date', action='store_true', | |
| help='Keep original file modified date') | |
| parser.add_argument('--force', action='store_true', | |
| help='Overwrite existing .7z files') | |
| parser.add_argument('--verbose', '-v', action='store_true', | |
| help='Show detailed compression parameters') | |
| parser.add_argument('--extensions', type=str, | |
| default=','.join(ext[1:] for ext in COMPRESSED_EXTENSIONS), | |
| help='Comma-separated list of file extensions to process (without dots)') | |
| args = parser.parse_args() | |
| custom_exts = tuple(f'.{ext.strip()}' for ext in args.extensions.split(',') if ext.strip()) | |
| convert_archives('.', args.compress, args.dictionary, args.word_size, args.keep_date, | |
| custom_exts, args.force, args.verbose) | |
| if __name__ == "__main__": | |
| main() | 
  
    Sign up for free
    to join this conversation on GitHub.
    Already have an account?
    Sign in to comment