Skip to content

Instantly share code, notes, and snippets.

@basperheim
Last active April 8, 2025 00:23
Show Gist options
  • Save basperheim/aad8920c85df9d7a3f081a2f3e41fc46 to your computer and use it in GitHub Desktop.
Save basperheim/aad8920c85df9d7a3f081a2f3e41fc46 to your computer and use it in GitHub Desktop.
Compress MP3s Using Python
from concurrent.futures import ThreadPoolExecutor, as_completed
import subprocess
import os
import platform
from pathlib import Path
DEBUG = False
MAX_FILES = 200
TARGET_BITRATE = 128 # in kbps
def notify_done():
system = platform.system()
if system == "Darwin":
os.system("osascript -e 'display notification \"All files processed!\" with title \"MP3 Compression\"'")
print("\a")
elif system == "Windows":
import winsound
winsound.MessageBeep()
elif system == "Linux":
os.system('notify-send "MP3 Compression" "All files processed!"')
print("\a")
else:
print("✔️ All files processed. (No system alert for this OS)")
def get_bitrate(filepath: Path) -> tuple[Path, int | None]:
try:
cmd = ["ffprobe", "-v", "error", "-show_entries", "format=bit_rate", "-of", "default=noprint_wrappers=1:nokey=1", str(filepath)]
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
bitrate_str = result.stdout.strip()
if bitrate_str.isdigit():
return filepath, int(bitrate_str) // 1000 # Convert bps → kbps
except Exception as e:
print(f"❌ Failed to parse bitrate for {filepath.name}: {e}")
return filepath, None
def compress_mp3s():
root_dir = Path(".")
mp3_files = list(root_dir.rglob("*.mp3"))
if not mp3_files:
print("No MP3 files found.")
return
files_to_compress: list[Path] = []
print("🔎 Scanning files (in parallel)...")
with ThreadPoolExecutor(max_workers=os.cpu_count() or 4) as executor:
futures = [executor.submit(get_bitrate, file) for file in mp3_files]
for future in as_completed(futures):
file, bitrate = future.result()
if bitrate is not None and bitrate > TARGET_BITRATE:
files_to_compress.append(file)
print(f"🎧 {file.name} → {bitrate} kbps")
if len(files_to_compress) >= MAX_FILES:
break
if not files_to_compress:
print(f"✅ All files are already {TARGET_BITRATE}kbps or lower.")
return
print(f"\n🔧 Preparing to compress {len(files_to_compress)} file(s)...")
for file in files_to_compress:
safe_name = file.stem.replace(" ", "-") + f"-{TARGET_BITRATE}kbps.mp3"
output_file = file.with_name(safe_name)
cmd = [
"ffmpeg",
"-hide_banner",
"-loglevel", "error",
"-y",
"-i", str(file.resolve()),
"-b:a", f"{TARGET_BITRATE}k",
str(output_file.resolve())
]
print(f"\n🛠️ Convert command for {file.name}:")
print(" ".join(cmd))
if not DEBUG:
try:
subprocess.run(cmd, check=True)
print(f"✅ Compressed: {output_file.name}")
_, converted_bitrate = get_bitrate(output_file)
if converted_bitrate and converted_bitrate <= TARGET_BITRATE:
print(f"🔍 Verified: {output_file.name} ({converted_bitrate} kbps)")
file.unlink()
print(f"🗑️ Deleted original: {file.name}")
else:
print(f"⚠️ Verification failed for {output_file.name}. Keeping original.")
except subprocess.CalledProcessError as e:
print(f"❌ Failed to compress {file.name}: {e}")
else:
print("⚠️ DEBUG mode: Not executing conversion.")
compress_mp3s()
notify_done()
notify_done()
notify_done()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment