Skip to content

Instantly share code, notes, and snippets.

@datavudeja
Forked from Druark/convert_7z_v7.py
Created October 30, 2025 20:20
Show Gist options
  • Save datavudeja/e91c3cdaed1ead41f33c702bb601e850 to your computer and use it in GitHub Desktop.
Save datavudeja/e91c3cdaed1ead41f33c702bb601e850 to your computer and use it in GitHub Desktop.
Converts zip files and other archive types to 7z with optimal compression in the current directory.
import os
import sys
import subprocess
from pathlib import Path
import shutil
import stat
import time
from dataclasses import dataclass
import multiprocessing
# ANSI escape sequences used to colorize terminal output
RED = '\033[91m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
CYAN = '\033[96m'
RESET = '\033[0m'  # switches the terminal back to its default color

# Tuning constants
THREAD_MB_RATIO = 5  # archive megabytes per compression thread (see get_thread_count)
CPU_THREAD_OFFSET = 1  # CPU cores left unused when capping the thread count

# File suffixes that are treated as archives by default
COMPRESSED_EXTENSIONS = (
    '.zip', '.rar', '.7z', '.tar', '.gz',
    '.bz2', '.xz', '.iso', '.cab', '.arj',
)
# Helper functions for formatting
def format_size(bytes_size):
    """Render a byte count as a short human-readable string.

    Counts of at least 1024**3 bytes are shown in GB with one decimal place;
    anything smaller is shown in MB rounded to a whole number.
    """
    bytes_per_gb = 1024 ** 3
    bytes_per_mb = 1024 ** 2
    if bytes_size >= bytes_per_gb:
        gigabytes = bytes_size / bytes_per_gb
        return f"{gigabytes:.1f}GB"
    megabytes = bytes_size / bytes_per_mb
    return f"{megabytes:.0f}MB"
def safe_subprocess(cmd, **kwargs):
    """Call ``subprocess.run(cmd, **kwargs)`` without letting errors escape.

    Returns the object produced by ``subprocess.run``, or ``None`` when the
    call raised any ``Exception`` (for example because the program is missing).
    """
    try:
        completed = subprocess.run(cmd, **kwargs)
    except Exception:
        return None
    return completed
@dataclass
class ConversionResult:
    """Outcome of converting one archive, plus size statistics derived from it."""
    filename: str  # name of the source archive
    original_size: int  # size of the source archive in bytes
    new_size: int  # size of the produced archive in bytes
    success: bool  # whether the conversion completed

    @property
    def space_diff(self):
        """Bytes saved by the conversion; negative when the new file is larger."""
        return self.original_size - self.new_size

    @property
    def space_diff_percent(self):
        """Saved bytes as a percentage of the original size (0 if that size is not positive)."""
        if self.original_size > 0:
            return self.space_diff / self.original_size * 100
        return 0

    def __str__(self):
        """Return a colorized one-line summary of the size change."""
        delta = self.space_diff
        if delta < 0:
            sign = "+"
        else:
            sign = "-"
        color = GREEN if delta > 0 else YELLOW
        megabytes = abs(round(delta / (1024 ** 2), 2))
        percent = abs(round(self.space_diff_percent, 2))
        return f"{YELLOW}{self.filename}{RESET}: {color}{sign}{megabytes}MB ({percent}%){RESET}"
def safe_rmtree(path):
    """Remove a directory tree on a best-effort basis.

    Entries that cannot be removed are made writable and the failed operation
    is retried once. If anything is still left afterwards, the whole removal
    is retried after a one-second pause; if the tree still exists after that,
    a warning is printed. This function never raises for removal failures.

    Args:
        path: Directory to remove (``str`` or path-like).
    """
    def handle_error(func, failed_path, exc_info):
        # Called by shutil.rmtree for each operation that failed. Make the
        # entry writable and retry that one operation.
        try:
            os.chmod(failed_path, stat.S_IWUSR | stat.S_IWRITE)
            func(failed_path)
        except OSError:
            # Deliberately ignored: leftovers are detected and reported below.
            pass

    # ``onexc`` only exists on Python 3.12+; older versions take the handler
    # via ``onerror``. Passing an unknown keyword would raise TypeError and
    # leave the directory in place.
    handler_name = 'onexc' if sys.version_info >= (3, 12) else 'onerror'
    rmtree_kwargs = {handler_name: handle_error}

    last_error = None
    for attempt in range(2):
        try:
            shutil.rmtree(path, **rmtree_kwargs)
        except Exception as e:
            last_error = e
        if not os.path.lexists(path):
            return
        if attempt == 0:
            # Give whatever is holding the files a moment before retrying.
            time.sleep(1)

    reason = last_error if last_error is not None else "some entries could not be removed"
    print(f"{YELLOW}Warning: Could not fully clean up temp directory: {reason}{RESET}")
def find_7z_executable():
    """Locate a 7z command-line executable.

    The names ``7z``, ``7zz`` and ``7za`` are looked up on PATH with
    ``shutil.which`` (which also applies PATHEXT on Windows, so ``7z.exe`` is
    found). If none is on PATH, each name is launched directly as a last
    resort, in case the operating system can resolve it from a location that
    the PATH lookup does not cover.

    Returns:
        The full path found on PATH, or the bare command name if only the
        direct launch worked, or ``None`` if no executable was found.
    """
    candidates = ('7z', '7zz', '7za')

    # Preferred: resolve through PATH without spawning a helper process.
    for name in candidates:
        located = shutil.which(name)
        if located:
            return located

    # Fallback: try running the command directly.
    for name in candidates:
        if test_executable(name):
            return name
    return None
def test_executable(path):
"""Test if executable works"""
try:
subprocess.run([path, '--help'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=3)
return True
except:
return False
def check_7z():
    """Look up the 7z executable and report the outcome on stdout.

    Returns the value from ``find_7z_executable()`` when it is truthy.
    Otherwise prints an error followed by platform-specific installation
    hints and returns ``None``.
    """
    located = find_7z_executable()
    if not located:
        print(f"{RED}Error: 7z not found!")
        print(f"{YELLOW}Please install a 7z-compatible archiver:")
        # Installation hints keyed by sys.platform; other platforms get none.
        install_hints = {
            'win32': (
                "Recommended: NanaZip from Microsoft Store (modern 7-Zip fork)",
                "Alternative: Download 7-Zip from https://7-zip.org/",
            ),
            'linux': ("Run: sudo apt-get install p7zip-full",),
            'darwin': ("Run: brew install p7zip",),
        }
        for hint in install_hints.get(sys.platform, ()):
            print(hint)
        print(f"{RESET}")
        return None

    print(f"Found {CYAN}7z{RESET} at: {CYAN}{located}{RESET}")
    return located
def get_thread_count(file_size_bytes):
    """Choose how many compression threads to use for a file of the given size.

    One thread is allotted per ``THREAD_MB_RATIO`` megabytes of input, with a
    minimum of one and a ceiling of the CPU count minus ``CPU_THREAD_OFFSET``
    (that ceiling itself never drops below one).
    """
    usable_cpus = multiprocessing.cpu_count() - CPU_THREAD_OFFSET
    if usable_cpus < 1:
        usable_cpus = 1

    bytes_per_thread = THREAD_MB_RATIO * 1024 * 1024
    wanted = int(file_size_bytes / bytes_per_thread)
    if wanted < 1:
        wanted = 1

    return wanted if wanted <= usable_cpus else usable_cpus
def track_progress(process, operation, filename):
    """Show an elapsed-time progress line until *process* has exited.

    While the process is running, a line of the form
    ``<operation> <filename>: <seconds>s`` is rewritten in place about once
    per second. The line is blanked out once the process has finished.

    Args:
        process: A started ``subprocess.Popen`` object.
        operation: Label printed in front of the file name.
        filename: Name shown in the progress line.
    """
    start_time = time.time()
    while process.poll() is None:
        elapsed = int(time.time() - start_time)
        print(f"\r{operation} {YELLOW}{filename}{RESET}: {elapsed}s".ljust(80), end='', flush=True)
        try:
            # Wait with a timeout instead of sleeping so that we return as
            # soon as the process exits rather than up to a second later.
            process.wait(timeout=1)
        except subprocess.TimeoutExpired:
            pass
    # Clear the progress line when done
    print("\r".ljust(80), end='')
def _run_7z_step(command, operation, label):
    """Run one 7z command with a progress line; return ``(returncode, stderr_text)``."""
    # stdout is discarded instead of piped: nothing reads it, and an unread
    # pipe can fill up and block the child process. The context manager
    # closes the stderr pipe and reaps the process.
    with subprocess.Popen(command, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE) as proc:
        track_progress(proc, operation, label)
        stderr_text = proc.stderr.read().decode('utf-8', errors='ignore')
    return proc.returncode, stderr_text


def convert_single_archive(args):
    """Convert one archive to 7z by extracting it and recompressing its contents.

    All intermediate files (the extracted tree and the new archive while it is
    being written) live in a private temporary directory inside the target
    directory. The finished archive is moved into place only after compression
    succeeded, so a failure never damages or removes an existing output file.

    Args:
        args: A 9-tuple ``(directory, filename, compression_level,
            dictionary_size, word_size, keep_date, seven_z_path, force,
            verbose)``:
            directory -- folder containing the archive;
            filename -- name of the archive inside ``directory``;
            compression_level, dictionary_size, word_size -- passed to 7z as
            ``-mx``, ``-md`` (in MB) and ``-mfb``;
            keep_date -- copy the source's access/modification times to the
            output; seven_z_path -- 7z executable to run;
            force -- replace an existing output file;
            verbose -- print the compression parameters after conversion.

    Returns:
        ConversionResult: with the original and new sizes and
        ``success=True`` on success; with zero sizes and ``success=False``
        when the file was skipped or an error occurred (errors are printed,
        not raised).
    """
    import tempfile  # local import: only needed by this function

    (directory, filename, compression_level, dictionary_size, word_size,
     keep_date, seven_z_path, force, verbose) = args
    directory_path = Path(directory).absolute()
    orig_path = directory_path / filename
    # Path.stem drops exactly the last suffix whatever its length, so
    # "data.gz" becomes "data.7z" (slicing off four characters gave "dat.7z").
    output_name = f"{Path(filename).stem}.7z"
    output_path = directory_path / output_name

    # Skip when the output already exists, unless forced. Never overwrite the
    # source with its own conversion, even when forced.
    if output_path.exists() and (not force or output_path == orig_path):
        print(f"{CYAN}Skipping {YELLOW}{filename}{RESET} - output already exists{RESET}")
        return ConversionResult(filename, 0, 0, False)

    work_dir = None
    try:
        total_start_time = time.time()  # Track total time
        orig_stat = orig_path.stat()
        thread_count = get_thread_count(orig_stat.st_size)

        # A uniquely named scratch directory avoids clashing with (and
        # deleting) any existing directory.
        work_dir = Path(tempfile.mkdtemp(prefix="tmp_", dir=directory_path))
        extract_dir = work_dir / "extracted"
        extract_dir.mkdir()
        staged_output = work_dir / output_name

        # Extract with progress
        returncode, stderr_output = _run_7z_step(
            [seven_z_path, 'x', str(orig_path), f'-o{extract_dir}', '-y'],
            "Extracting", filename,
        )
        if returncode != 0:
            raise Exception(f"Extraction failed: {stderr_output}")
        if not any(extract_dir.iterdir()):
            raise Exception("No files extracted")

        # Compress with progress. The archive is written to a fresh path:
        # "7z a" on an existing archive would update it rather than replace it.
        returncode, stderr_output = _run_7z_step(
            [
                seven_z_path, 'a', '-t7z',
                f'-mx={compression_level}',
                f'-md={dictionary_size}m',
                f'-mfb={word_size}',
                f'-mmt={thread_count}',
                str(staged_output),
                f'{extract_dir}/*',
            ],
            "Compressing", output_name,
        )
        if returncode != 0:
            raise Exception(f"Compression failed: {stderr_output}")

        # Calculate stats
        output_size = staged_output.stat().st_size
        space_diff = orig_stat.st_size - output_size
        space_saved = space_diff > 0
        size_mb = abs(round(space_diff / (1024 ** 2), 2))
        # Guard against a zero-byte source file.
        if orig_stat.st_size:
            percent = abs(round((space_diff / orig_stat.st_size * 100), 2))
        else:
            percent = 0

        # Set file date if requested
        if keep_date:
            os.utime(staged_output, (orig_stat.st_atime, orig_stat.st_mtime))

        # Publish the finished archive; this replaces an old output when forced.
        os.replace(staged_output, output_path)

        # Show completion message with space info
        total_time = int(time.time() - total_start_time)
        space_info = f", {GREEN if space_saved else YELLOW}{'-' if space_saved else '+'}{size_mb}MB ({percent}%){RESET}"
        print(f"\r{GREEN}Compressed {YELLOW}{filename}{RESET} to {CYAN}7z{RESET} in {CYAN}{total_time}s{RESET}{space_info}")

        # Show detailed compression info only if verbose
        if verbose:
            print(
                f"[Level: {CYAN}{compression_level}{RESET}] "
                f"[Dict: {CYAN}{dictionary_size}MB{RESET}] "
                f"[Word: {CYAN}{word_size}{RESET}] "
                f"[Threads: {CYAN}{thread_count}{RESET}] "
                f"[Space {'saved' if space_saved else 'increased'}: "
                f"{GREEN if space_saved else YELLOW}{size_mb}MB ({percent}%){RESET}]"
            )
            print()  # Add newline only when verbose

        return ConversionResult(filename, orig_stat.st_size, output_size, True)
    except Exception as e:
        # Any partially written archive lives in work_dir and is removed below.
        print(f"\n{RED}Error processing {filename}: {e}{RESET}")
        return ConversionResult(filename, 0, 0, False)
    finally:
        if work_dir is not None and work_dir.exists():
            safe_rmtree(work_dir)
def print_summary_and_delete(successful_conversions):
    """Print the total size change and optionally delete the source archives.

    The user is asked whether the original archives should be deleted.
    Deletion only happens on an explicit ``y``/``yes``; an empty answer,
    anything else, or end-of-input keeps the files. Files are removed relative
    to the current working directory.

    Args:
        successful_conversions: Iterable of ``ConversionResult`` objects for
            the archives that were converted successfully.
    """
    total_space_saved = sum(result.space_diff for result in successful_conversions)
    space_color = GREEN if total_space_saved > 0 else YELLOW
    print(f"\nTotal space {'saved' if total_space_saved > 0 else 'increased'}: "
          f"{space_color}{format_size(abs(total_space_saved))}{RESET}")

    # The prompt shows "N" as the default because pressing Enter does not
    # delete anything: the destructive action requires an explicit yes.
    try:
        answer = input(f"\n{RED}Delete original archives? {RESET}(y/N): ")
    except EOFError:
        answer = ''  # non-interactive run: keep the originals
    if answer.strip().lower() not in ('y', 'yes'):
        return

    for result in successful_conversions:
        try:
            (Path('.') / result.filename).unlink()
            print(f"{YELLOW}Deleted:{RESET} {result.filename}")
        except OSError as e:
            print(f"{RED}Error deleting {result.filename}: {e}{RESET}")
def convert_archives(directory='.', compression_level=7, dictionary_size=128, word_size=64,
                     keep_date=False, extensions=None, force=False, verbose=False):
    """Convert every matching archive in *directory* to 7z format.

    Args:
        directory: Folder to scan (not recursive).
        compression_level: 7z compression level, 0-9.
        dictionary_size: Dictionary size in MB, 1-1536.
        word_size: Word size (fast bytes), 5-273.
        keep_date: Copy the source file's timestamps to the new archive.
        extensions: Iterable of file suffixes including the leading dot;
            matched case-insensitively. Defaults to ``COMPRESSED_EXTENSIONS``.
        force: Replace existing ``.7z`` outputs.
        verbose: Print the compression parameters for each file.

    Raises:
        SystemExit: If an argument is out of range or no 7z executable is found.
    """
    # Validate arguments
    if not (0 <= compression_level <= 9):
        sys.exit(f"{RED}Compression level must be 0-9{RESET}")
    if not (1 <= dictionary_size <= 1536):
        sys.exit(f"{RED}Dictionary size must be 1-1536 MB{RESET}")
    if not (5 <= word_size <= 273):
        sys.exit(f"{RED}Word size must be 5-273{RESET}")
    seven_z_path = check_7z()
    if not seven_z_path:
        sys.exit(1)

    # Suffixes are compared in lower case, so normalize the wanted set too.
    wanted_suffixes = {ext.lower() for ext in (extensions or COMPRESSED_EXTENSIONS)}
    directory_path = Path(directory).absolute()
    # Only regular files qualify (a folder named "backup.zip" is not an
    # archive); sorting makes the processing order deterministic.
    archive_files = sorted(
        f for f in directory_path.iterdir()
        if f.is_file() and f.suffix.lower() in wanted_suffixes
    )
    if not archive_files:
        print(f"\n{YELLOW}No archives found{RESET}\n")
        return

    # Filter files - existing 7z archives are never reconverted
    to_convert = [f for f in archive_files if f.suffix.lower() != '.7z']
    skipped_count = len(archive_files) - len(to_convert)
    if skipped_count > 0:
        print(f"Skipping {YELLOW}{skipped_count}{RESET} {CYAN}7z{RESET} files\n")
    if not to_convert:
        print(f"\n{YELLOW}No files need conversion{RESET}\n")
        return

    # Show files to convert (limit listing to 5 files)
    if len(to_convert) <= 5:
        for f in to_convert:
            print(f"{GREEN}Will convert {YELLOW}{f.name}{RESET}")
    else:
        print(f"{GREEN}Will convert {YELLOW}{len(to_convert)}{RESET} files")

    # Ask for confirmation before large jobs (10+ files or 1 GB+ in total)
    total_size = sum(f.stat().st_size for f in to_convert)
    if len(to_convert) >= 10 or total_size >= 1024**3:
        print(f"\nTotal size to process: {CYAN}{format_size(total_size)}{RESET}")
        answer = input(f"{YELLOW}Continue with conversion? {RESET}(Y/n): ")
        if answer.strip().lower() in ('n', 'no'):
            print("Conversion cancelled.")
            return

    print(f"\nStarting conversion of {CYAN}{len(to_convert)}{RESET} files...\n")

    # Convert files one at a time, keeping the successful results
    successful_conversions = []
    for f in to_convert:
        result = convert_single_archive(
            (directory, f.name, compression_level, dictionary_size, word_size,
             keep_date, seven_z_path, force, verbose)
        )
        if result and result.success:
            successful_conversions.append(result)

    if successful_conversions:
        print_summary_and_delete(successful_conversions)
def main():
    """Parse the command line and convert the archives in the current directory."""
    if sys.platform == 'win32':
        os.system('')  # Enable ANSI colors

    import argparse

    default_extensions = ','.join(ext[1:] for ext in COMPRESSED_EXTENSIONS)

    cli = argparse.ArgumentParser(description='Convert archives to 7z format')
    cli.add_argument('--compress', type=int, default=7, choices=range(10),
                     help='Compression level (0-9)')
    cli.add_argument('--dictionary', type=int, default=128,
                     help='Dictionary size in MB')
    cli.add_argument('--word-size', type=int, default=64,
                     help='Word size (number of fast bytes)')
    cli.add_argument('--keep-date', action='store_true',
                     help='Keep original file modified date')
    cli.add_argument('--force', action='store_true',
                     help='Overwrite existing .7z files')
    cli.add_argument('--verbose', '-v', action='store_true',
                     help='Show detailed compression parameters')
    cli.add_argument('--extensions', type=str, default=default_extensions,
                     help='Comma-separated list of file extensions to process (without dots)')
    options = cli.parse_args()

    # Turn "zip, rar" into ('.zip', '.rar'), dropping empty entries.
    suffixes = []
    for raw in options.extensions.split(','):
        cleaned = raw.strip()
        if cleaned:
            suffixes.append(f'.{cleaned}')

    convert_archives(
        directory='.',
        compression_level=options.compress,
        dictionary_size=options.dictionary,
        word_size=options.word_size,
        keep_date=options.keep_date,
        extensions=tuple(suffixes),
        force=options.force,
        verbose=options.verbose,
    )


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment