Last active
May 8, 2025 04:12
-
-
Save ph33nx/b0b7a0ff64bd3f7c8961675db22b4716 to your computer and use it in GitHub Desktop.
Python script that converts videos to H.264/AAC MP4 so Jellyfin, Plex, Emby, Android TV and Chromecast devices stream without transcoding. Perfect for media servers, homelabs, NAS boxes and Proxmox LXC VMs. #ffmpeg #chromecast #jellyfin #transcode #remux #batch-encode #python #smart-tv #media-server #directplay
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
directplay.py | |
============= | |
Batch-optimise any video library so every file direct-plays on | |
Chromecast, smart-TVs and desktop/mobile browsers — no server-side | |
transcoding required. | |
Key features | |
------------ | |
• Smart decision tree – tries lightning-fast remux first; falls back | |
to H.264 re-encode only when codecs or resolution exceed safe limits. | |
• Recursive scan – pass one or more root folders; sub-directories | |
handled automatically. | |
• Parallel workers (-j / --workers) – use all CPU cores by default or | |
limit to keep the box responsive. | |
• Quality knobs – --crf and --preset expose x264 controls; defaults | |
tuned for "Netflix-like" 1080 p quality. | |
• Safe by default – outputs <name>.mp4 in the same folder; originals
  are retained unless you add --delete-original true (which removes only
  re-encoded sources; a source already named <name>.mp4 is replaced in place).
• Idempotent – re-runs skip files already compliant; perfect for | |
nightly cron/systemd-timer jobs. | |
Dependencies | |
------------ | |
ffmpeg and ffprobe must be in your PATH. | |
Windows 10/11 : winget install Gyan.FFmpeg | |
macOS (brew) : brew install ffmpeg | |
Debian/Ubuntu : sudo apt install ffmpeg | |
Fedora/RHEL   : sudo dnf install -y ffmpeg   (ffprobe ships in the same package)
Python 3.8 or newer is required. | |
Quick start | |
----------- | |
# convert with default settings (there is no dry-run mode; sources are kept)
python3 directplay.py /mnt/media | |
# real conversion, two workers, better quality, delete sources after success | |
python3 directplay.py -j 2 --crf 18 --preset slow \ | |
--delete-original true /mnt/media /video/new | |
CLI flags | |
--------- | |
positional: | |
roots One or more library root folders. | |
optional: | |
-j, --workers N Parallel encode workers (default = all CPUs) | |
--crf N x264 Constant Rate Factor (quality slider, default 19) | |
--preset P x264 preset: ultrafast … slow (default medium) | |
--delete-original true|false Remove source file on success (default false) | |
--exts .mkv,.mp4 Comma-separated list of file extensions to consider | |
License | |
------- | |
Apache-2.0 — use it, fork it, profit. | |
Author | |
------ | |
© 2025 ph33nx https://github.com/ph33nx | |
""" | |
import platform, textwrap, sys | |
import argparse | |
import json | |
import os | |
import shutil | |
import subprocess | |
from concurrent.futures import ProcessPoolExecutor | |
from pathlib import Path | |
from typing import List | |
# ------------------------- pretty logging ------------------------------ # | |
class C:
    """ANSI escape sequences used to colour the console log lines."""

    # Foreground colours, one per log severity used in this script.
    RED = "\033[31m"
    YEL = "\033[33m"
    GRN = "\033[32m"
    CYN = "\033[36m"

    # Appended after every coloured line to restore the terminal default.
    RESET = "\033[0m"
def log(symbol: str, color: str, action: str, path: Path):
    """Print one coloured status line of the form ``<symbol> <action>: "<path>"``.

    ``color`` is an escape sequence placed before the text; the reset
    sequence from ``C.RESET`` is appended after it.
    """
    line = f'{symbol} {action}: "{path}"'
    print(color + line + C.RESET)
# ---------------------------- constants ---------------------------------- # | |
# Extensions offered as the default value of the --exts command-line option.
DEFAULT_EXTS = {".mkv", ".mp4", ".avi", ".mov"}
# Upper bound compared against the `level` field reported by ffprobe
# (41 is how level 4.1 is written there).
H264_LEVEL_THRESHOLD = 41  # High 4.1 is Chromecast safe
# Tallest frame accepted without re-encoding; taller video is scaled down to this.
MAX_HEIGHT = 1080  # Down-scale 4 K → 1080p to stay in 4.1
# Channel count of the AAC track the direct-play profile aims for.
AUDIO_CHANNELS = 2  # stereo AAC
# ------------------------------------------------------------------------- # | |
def run(cmd: List[str]) -> str:
    """Execute *cmd* and return everything it wrote to stdout, decoded as text.

    Raises:
        subprocess.CalledProcessError: if the command exits with a non-zero status.
    """
    finished = subprocess.run(cmd, stdout=subprocess.PIPE, text=True, check=True)
    return finished.stdout
def ffprobe(path: Path) -> dict:
    """Probe *path* with the ``ffprobe`` executable and return its parsed JSON report.

    The command requests stream information (``-show_streams``) in JSON
    form, so the result carries a ``"streams"`` list when probing succeeds.
    A non-zero exit from ffprobe surfaces as ``subprocess.CalledProcessError``
    (raised by ``run``).
    """
    probe_cmd = ["ffprobe", "-hide_banner"]
    probe_cmd += ["-loglevel", "error"]  # keep stderr quiet except for real errors
    probe_cmd += ["-show_streams"]
    probe_cmd += ["-print_format", "json"]
    probe_cmd.append(str(path))
    raw_report = run(probe_cmd)
    return json.loads(raw_report)
# ---------------------- util: decision helpers --------------------------- # | |
def is_h264_ok(v: dict, max_level=None, max_height=None) -> bool:
    """Tell whether video stream *v* can be played directly without re-encoding.

    Args:
        v: One entry of ffprobe's ``streams`` list describing a video stream.
        max_level: Highest acceptable H.264 level as ffprobe reports it
            (41 means 4.1). Defaults to ``H264_LEVEL_THRESHOLD``.
        max_height: Tallest acceptable picture in pixels. Defaults to
            ``MAX_HEIGHT``.

    Returns:
        True when the codec is H.264 and both level and height are within
        the limits. Missing ``level``/height fields are treated as within
        limits (as before); values that cannot be parsed as numbers make
        the stream non-compliant instead of raising.
    """
    level_cap = H264_LEVEL_THRESHOLD if max_level is None else max_level
    height_cap = MAX_HEIGHT if max_height is None else max_height

    if v.get("codec_name") != "h264":
        return False

    # Prefer the display height: ffprobe's coded_height can include codec
    # padding (e.g. 1088 for a 1080-line stream), which would wrongly flag
    # ordinary 1080p files as too tall.
    height = v.get("height", v.get("coded_height", height_cap))
    try:
        return float(v.get("level", level_cap)) <= level_cap and int(height) <= height_cap
    except (TypeError, ValueError):
        return False
def has_aac_stereo(streams: List[dict], max_channels=None) -> bool:
    """Tell whether *streams* contains an AAC audio track with at most stereo channels.

    Args:
        streams: ffprobe's ``streams`` list for one file.
        max_channels: Largest acceptable channel count. Defaults to
            ``AUDIO_CHANNELS``.

    Returns:
        True if any audio stream is AAC and its ``channels`` value does not
        exceed the limit. A stream without a ``channels`` field is accepted
        (the codec check alone decides); a non-numeric value is rejected.
    """
    limit = AUDIO_CHANNELS if max_channels is None else max_channels
    for s in streams:
        # .get() so streams lacking these keys are simply not a match.
        if s.get("codec_type") != "audio" or s.get("codec_name") != "aac":
            continue
        try:
            channels = int(s.get("channels", limit))
        except (TypeError, ValueError):
            continue
        # Previously any AAC track passed, so multichannel AAC was never
        # down-mixed even though the profile asks for stereo.
        if channels <= limit:
            return True
    return False
def classify(job) -> str:
    """
    Pick the action for `job.src` from its probed streams (`job.meta`).

    Returns one of:
      "skip"   – no video stream, or already an .mp4 with acceptable codecs
      "remux"  – codecs acceptable, only the container needs changing
      "encode" – video and/or audio must be re-encoded
    """
    streams = job.meta["streams"]

    # First video stream, if any.
    video = None
    for stream in streams:
        if stream["codec_type"] == "video":
            video = stream
            break

    if video is None:
        log("⚠", C.YEL, "No video stream; skipping", job.src)
        return "skip"

    if not (is_h264_ok(video) and has_aac_stereo(streams)):
        return "encode"

    # Codecs are fine; the container alone decides between skip and remux.
    if job.src.suffix.lower() == ".mp4":
        return "skip"
    return "remux"
# ----------------------------- job type ---------------------------------- # | |
class Job:
    """One source file together with the action chosen for it.

    Attributes set by the constructor:
        src:    path of the source file.
        opts:   the parsed command-line options, passed through unchanged.
        meta:   ffprobe report for ``src``, or None when probing never succeeded.
        action: "skip", "remux" or "encode".
        dst:    ``src`` with its suffix replaced by ".mp4", or None for "skip".

    Construction probes the file. If ffprobe fails, a stream-copy rewrite of
    the file is attempted for MP4-family and Matroska/WebM sources and the
    probe is retried once. NOTE: a successful rewrite replaces the source
    file in place.
    """

    def __init__(self, src: Path, opts):
        self.src = src
        self.opts = opts
        self.meta = None
        self.action = "skip"
        try:
            self._probe_and_classify()
        except subprocess.CalledProcessError:
            self._recover_unreadable()
        self.dst = src.with_suffix(".mp4") if self.action != "skip" else None

    def _probe_and_classify(self):
        """Probe the source and store both the report and the chosen action."""
        self.meta = ffprobe(self.src)
        self.action = classify(self)

    def _recover_unreadable(self):
        """Try a container rewrite for a file ffprobe rejected, then probe again."""
        src = self.src
        suffix = src.suffix.lower()

        if suffix in {".mp4", ".m4v", ".mov"}:
            # Attempt auto-repair for MP4/MOV/M4V files with corrupted headers
            print(f"⚠️ Attempting to repair corrupted MP4 header: {src}")
            tmp = src.with_suffix(".tmp.mp4")
            cmd = [
                "ffmpeg",
                "-y",
                "-err_detect",
                "ignore_err",
                "-i",
                str(src),
                "-c",
                "copy",
                "-movflags",
                "+faststart",
                str(tmp),
            ]
            done_msg = "✓ Successfully repaired"
            failed_msg = "⚠️ Repair failed, skipping"
        elif suffix in {".mkv", ".webm"}:
            # For MKV/WebM, try a simple remux
            print(f"⚠️ Attempting to remux corrupted container: {src}")
            tmp = src.with_suffix(".tmp" + src.suffix)
            cmd = ["ffmpeg", "-y", "-i", str(src), "-c", "copy", str(tmp)]
            done_msg = "✓ Successfully remuxed"
            failed_msg = "⚠️ Remux failed, skipping"
        else:
            print(f"⚠️ Unreadable or non-video file, skipping {src}")
            self.action = "skip"
            return

        if not self._rewrite_in_place(cmd, tmp):
            print(f"{failed_msg}: {src}")
            self.action = "skip"
            return

        print(f"{done_msg}: {src}")
        try:
            self._probe_and_classify()  # try again
        except subprocess.CalledProcessError:
            # The rewrite produced a file, but ffprobe still rejects it; skip
            # it rather than let the exception abort the whole scan.
            print(f"⚠️ Still unreadable after rewrite, skipping: {src}")
            self.action = "skip"

    def _rewrite_in_place(self, cmd, tmp: Path) -> bool:
        """Run *cmd* (which writes *tmp*) and swap *tmp* over the source on success.

        Returns False, with *tmp* removed, when the command fails, produces
        an empty or missing file, or the swap itself fails.
        """
        try:
            if subprocess.run(cmd).returncode == 0 and tmp.stat().st_size > 0:
                # replace() overwrites an existing target on every platform;
                # rename() raises on Windows when the target exists.
                tmp.replace(self.src)
                return True
        except OSError:
            pass  # ffmpeg not runnable, tmp never created, or swap refused
        try:
            tmp.unlink(missing_ok=True)  # do not leave partial work files behind
        except OSError:
            pass
        return False
# --------------------------- worker logic -------------------------------- # | |
def looks_ok(path: Path) -> bool:
    """
    Report whether `path` already satisfies the direct-play profile:
    an .mp4 container holding a video stream accepted by `is_h264_ok`
    and audio accepted by `has_aac_stereo`. A file ffprobe cannot read
    counts as not OK.
    """
    try:
        streams = ffprobe(path)["streams"]
    except subprocess.CalledProcessError:
        return False

    video = next((s for s in streams if s["codec_type"] == "video"), None)
    if video is None:
        return False

    # All three checks are evaluated before combining, as in the original.
    checks = (
        path.suffix.lower() == ".mp4",
        is_h264_ok(video),
        has_aac_stereo(streams),
    )
    return all(checks)
# ---------------------------------------------------------------------- # | |
# helper: detect obvious stubs (0-byte or < 1 MiB) | |
def is_stub(p: Path) -> bool:
    """Return True when *p* is missing or smaller than 1 MiB."""
    one_mib = 1 << 20
    try:
        size = p.stat().st_size
    except FileNotFoundError:
        # A file that is not there is treated like an empty stub.
        return True
    return size < one_mib
def build_encode_cmd(job: Job):
    """Build the ffmpeg command line that re-encodes ``job.src``.

    Args:
        job: A Job whose ``meta`` holds an ffprobe report with at least one
            video stream, whose ``dst`` is set, and whose ``opts`` carries
            ``encoder``, ``preset``, ``nv_preset`` and ``crf``.

    Returns:
        ``(cmd, tmp_out)``: the argument list and the temporary output path
        (``job.dst`` with a ".tmp.mp4" suffix) the command writes to.

    Output layout: first video stream as H.264 High@4.1; first audio stream
    as stereo AAC; second and third audio streams copied when present; text
    subtitles (SubRip/ASS) converted to mov_text.
    """
    streams = job.meta["streams"]
    v = next(s for s in streams if s["codec_type"] == "video")

    # Use the display height; ffprobe's coded_height can include codec
    # padding (e.g. 1088 for 1080p) and would trigger a pointless rescale.
    need_downscale = int(v.get("height", v.get("coded_height", 0))) > MAX_HEIGHT

    # Only use CUDA decode + the CUDA scaler if input is H.264
    # (AV1 on hardware will fail).
    use_cuda = job.opts.encoder == "h264_nvenc" and v.get("codec_name") == "h264"

    vfilters = []
    if use_cuda:
        if need_downscale:
            vfilters.append(f"scale_cuda=-2:{MAX_HEIGHT}:format=yuv420p")
        else:
            vfilters.append("scale_cuda=format=yuv420p")
    else:
        # either CPU-only path or non-H.264 input → use normal scale
        if need_downscale:
            vfilters.append(f"scale=-2:{MAX_HEIGHT}")
        # Force 8-bit 4:2:0: 10-bit sources otherwise conflict with
        # "-profile:v high" (and h264_nvenc has no 10-bit mode).
        vfilters.append("format=yuv420p")
    vf = ",".join(vfilters)

    # -------- encoder-specific branch --------
    if job.opts.encoder == "libx264":
        video_flags = [
            "-c:v",
            "libx264",
            "-preset",
            job.opts.preset,
            "-crf",
            str(job.opts.crf),
        ]
    else:  # h264_nvenc
        video_flags = [
            "-c:v",
            "h264_nvenc",
            "-preset",
            job.opts.nv_preset,
            "-rc",
            "vbr",
            "-tune",
            "hq",
            # pixel format is handled by the filter chain above
            "-cq",
            str(job.opts.crf),
            "-b:v",
            "0",
        ]

    cmd = [
        "ffmpeg",
        "-hide_banner",
        "-loglevel",
        "error",
        "-xerror",
        "-y",
    ]
    if use_cuda:
        cmd.extend(["-hwaccel", "cuda", "-hwaccel_output_format", "cuda"])

    tmp_out = job.dst.with_suffix(".tmp.mp4")
    cmd.extend(
        [
            "-i",
            str(job.src),
            # ========== video ==========
            "-map",
            "0:v:0",
            *video_flags,
            "-profile:v",
            "high",
            "-level",
            "4.1",
            "-vf",
            vf,
            # ========== audio ==========
            # First track → stereo AAC. The trailing "?" keeps silent
            # videos from failing on a missing stream.
            "-map",
            "0:a:0?",
            "-c:a:0",
            "aac",
            # "-ac:a:0", not "-ac:0": a bare index addresses output stream 0,
            # which is the video stream, so the downmix never applied.
            "-ac:a:0",
            "2",
            "-q:a:0",
            "0.45",
            # Up to two further audio tracks are copied untouched.
            "-map",
            "0:a:1?",
            "-c:a:1",
            "copy",
            "-map",
            "0:a:2?",
            "-c:a:2",
            "copy",
        ]
    )

    # ========== subtitles ==========
    # take only text subtitles (SRT/ASS) and convert to MP4-native mov_text.
    # They are selected by absolute stream index from the probe report: the
    # former "m:codec_name:..." specifier matches a metadata tag, not the
    # codec, so it never selected anything.
    text_codecs = ("subrip", "srt", "ass", "ssa")
    text_subs = [
        s
        for s in streams
        if s.get("codec_type") == "subtitle"
        and s.get("codec_name") in text_codecs
        and "index" in s
    ]
    for s in text_subs:
        cmd.extend(["-map", f"0:{s['index']}"])
    if text_subs:
        cmd.extend(["-c:s", "mov_text"])

    cmd.append(str(tmp_out))
    return cmd, tmp_out
def _discard(path: Path):
    """Best-effort removal of a partial work file; never raises."""
    try:
        path.unlink(missing_ok=True)
    except OSError:
        pass


def process(job: Job):
    """
    Process a single Job:
    - skip if already compliant
    - remux or encode otherwise, always via a temporary file that is
      swapped over the destination only after ffmpeg succeeded
    - on any ffmpeg error, log & return (i.e. skip the bad file)
    """
    # 1) If we don't need to touch it, skip early
    if job.action == "skip":
        log("✓", C.GRN, "Direct-play already supported", job.src)
        return

    # 2) If dst exists and is good, skip; if stub or bad, prepare to overwrite.
    #    When the source itself is the destination (an .mp4 that needs
    #    re-encoding) none of this applies: treating it as a "stub" would
    #    delete the only copy before ffmpeg has read it.
    if job.dst != job.src and job.dst.exists():
        if looks_ok(job.dst):
            log("✓", C.GRN, "Output exists; skipping", job.dst)
            return
        elif is_stub(job.dst):
            log("⚠", C.YEL, "Stub file detected; will overwrite", job.dst)
            job.dst.unlink()
        else:
            log("⚠", C.YEL, "Will overwrite non-compliant MP4 after encode", job.dst)

    # 3) Build the ffmpeg command
    if job.action == "remux":
        tmp_out = job.dst.with_suffix(".tmp.mp4")
        cmd = [
            "ffmpeg",
            "-hide_banner",
            "-loglevel",
            "error",
            "-xerror",
            "-y",
            "-i",
            str(job.src),
            "-c",
            "copy",
            # Text subtitles cannot be stream-copied into MP4; convert the
            # auto-selected subtitle track (if any) to mov_text instead.
            "-c:s",
            "mov_text",
            "-map_metadata",
            "0",
            str(tmp_out),
        ]
    else:
        cmd, tmp_out = build_encode_cmd(job)

    log("→", C.CYN, job.action.upper(), job.src)

    # 4) Run ffmpeg, but don't let a non-zero exit kill the whole batch
    try:
        subprocess.run(cmd, check=True)
    except subprocess.CalledProcessError as e:
        # Log the failure, drop the partial output and skip to the next file
        _discard(tmp_out)
        log("✗", C.RED, f"FFmpeg failed ({e.returncode}); skipping", job.src)
        return

    # 5) On success: atomic swap & optional delete
    if not tmp_out.exists():
        # Never delete the original when there is nothing to replace it.
        log("✗", C.RED, "FFmpeg produced no output; skipping", job.src)
        return
    tmp_out.replace(job.dst)

    if job.opts.delete_original and job.action == "encode" and job.src != job.dst:
        job.src.unlink()
        log("✘", C.RED, "Deleted original", job.src)
# ----------------------------- main -------------------------------------- # | |
def gather_files(roots, exts) -> List[Path]:
    """Collect candidate media files below every directory in *roots*.

    Args:
        roots: Iterable of directory paths, searched recursively.
        exts: Iterable of file extensions including the leading dot;
            matching is case-insensitive.

    Returns:
        Matching files sorted per directory with non-MP4 files first.
        Dot-files and "<name>.tmp.<ext>" work files are excluded, and a file
        reachable through several overlapping roots is listed only once.
    """
    wanted = {e.lower() for e in exts}
    seen = set()
    files = []
    for root in roots:
        for p in Path(root).rglob("*"):
            # skip dot-files created by macOS (._foo, .DS_Store, etc.)
            if p.name.startswith(".") or p.suffix.lower() not in wanted:
                continue
            # skip leftovers of interrupted runs (foo.tmp.mp4, foo.tmp.mkv)
            if Path(p.stem).suffix.lower() == ".tmp":
                continue
            if not p.is_file():
                continue
            # Overlapping roots (e.g. /media and /media/tv) would otherwise
            # queue the same file twice.
            key = p.resolve()
            if key in seen:
                continue
            seen.add(key)
            files.append(p)
    # sort so MP4s are handled after MKV/AVI in each directory
    files.sort(key=lambda p: (p.parent, p.suffix.lower() == ".mp4", p.name))
    return files
def main():
    """Command-line entry point: parse options, scan the roots, process every file.

    Exits via SystemExit with an install hint when ffmpeg or ffprobe is
    missing from PATH. A failure while preparing or processing one file is
    logged and does not stop the remaining files.
    """
    p = argparse.ArgumentParser(description="Pre-encode / remux videos for Chromecast.")
    p.add_argument("roots", nargs="+", help="Root directories to scan.")
    p.add_argument(
        "-j",
        "--workers",
        type=int,
        default=os.cpu_count() or 1,
        help="Parallel workers (default = all CPUs).",
    )
    p.add_argument(
        "--crf",
        type=int,
        default=19,
        help="x264 Constant Rate Factor (lower = higher quality, default 19).",
    )
    p.add_argument(
        "--preset",
        default="medium",
        help="x264 preset: ultrafast…slow (default medium).",
    )
    p.add_argument(
        "--delete-original",
        choices=("true", "false"),
        default="false",
        help="Remove source file after successful processing.",
    )
    p.add_argument(
        "--exts",
        default=",".join(DEFAULT_EXTS),
        help="Comma-separated list of file extensions to consider.",
    )
    opts = p.parse_args()

    # -- Make sure ffmpeg AND ffprobe exist BEFORE calling them -------------
    missing = [tool for tool in ("ffmpeg", "ffprobe") if not shutil.which(tool)]
    if missing:
        os_hint = {
            "Windows": "winget install Gyan.FFmpeg",
            "Darwin": "brew install ffmpeg",
            "Linux": "sudo apt install ffmpeg  # Debian/Ubuntu\n"
            "sudo dnf install ffmpeg  # Fedora/RHEL",
        }.get(platform.system(), "Install ffmpeg from your package manager")
        msg = "\n".join(
            [
                f"{'/'.join(missing)} not found in PATH.",
                "Quick install:",
                textwrap.indent(os_hint, "    "),
                f"Then re-run: {' '.join(map(str, sys.argv))}",
            ]
        )
        raise SystemExit(msg)

    # -- Detect hardware encoder once --------------------------
    encoders = run(["ffmpeg", "-v", "quiet", "-encoders"])
    opts.encoder = "h264_nvenc" if "h264_nvenc" in encoders else "libx264"

    # quick helper for preset mapping when NVENC is active
    def _nv_map(x):
        map_ = {
            "ultrafast": "p1",
            "superfast": "p2",
            "veryfast": "p3",
            "faster": "p3",
            "fast": "p4",
            "medium": "p4",
            "slow": "p5",
            "slower": "p6",
            "veryslow": "p7",
        }
        return map_.get(x, "p4")

    # Add the NVENC preset mapping to the options
    opts.nv_preset = _nv_map(opts.preset)
    opts.delete_original = opts.delete_original.lower() == "true"

    # Normalise --exts: tolerate spaces, upper case, missing dots and empty
    # items ("MKV, .mp4," → {".mkv", ".mp4"}). File suffixes are compared in
    # lower case, so an un-normalised ".MKV" would never match anything.
    exts = set()
    for raw in opts.exts.split(","):
        ext = raw.strip().lower()
        if ext:
            exts.add(ext if ext.startswith(".") else f".{ext}")

    print("Scanning …")
    files = gather_files(opts.roots, exts)
    print(f"Found {len(files)} candidate files.\n")

    # Probing happens here in the parent process; one unreadable file must
    # not abort the whole run.
    jobs = []
    for f in files:
        try:
            jobs.append(Job(f, opts))
        except Exception as exc:  # top-level boundary: log and move on
            log("✗", C.RED, f"Could not analyse ({exc!r}); skipping", f)

    # ProcessPoolExecutor rejects max_workers <= 0.
    workers = max(1, opts.workers)
    with ProcessPoolExecutor(max_workers=workers) as exe:
        submitted = [(job, exe.submit(process, job)) for job in jobs]
        # Collect every result so that an exception raised inside a worker
        # is reported instead of being silently dropped.
        for job, future in submitted:
            try:
                future.result()
            except Exception as exc:  # top-level boundary: log and move on
                log("✗", C.RED, f"Worker failed ({exc!r}); skipping", job.src)

    print("\nDone.")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment