Skip to content

Instantly share code, notes, and snippets.

@ph33nx
Last active May 8, 2025 04:12
Show Gist options
  • Save ph33nx/b0b7a0ff64bd3f7c8961675db22b4716 to your computer and use it in GitHub Desktop.
Save ph33nx/b0b7a0ff64bd3f7c8961675db22b4716 to your computer and use it in GitHub Desktop.
Python script that converts videos to H.264/AAC MP4 so Jellyfin, Plex, Emby, Android TV and Chromecast devices stream without transcoding. Perfect for media servers, homelabs, NAS boxes and Proxmox LXC VMs. #ffmpeg #chromecast #jellyfin #transcode #remux #batch-encode #python #smart-tv #media-server #directplay
#!/usr/bin/env python3
"""
directplay.py
=============
Batch-optimise any video library so every file direct-plays on
Chromecast, smart-TVs and desktop/mobile browsers — no server-side
transcoding required.
Key features
------------
• Smart decision tree – tries lightning-fast remux first; falls back
to H.264 re-encode only when codecs or resolution exceed safe limits.
• Recursive scan – pass one or more root folders; sub-directories
handled automatically.
• Parallel workers (-j / --workers) – use all CPU cores by default or
limit to keep the box responsive.
• Quality knobs – --crf and --preset expose x264 controls; defaults
tuned for "Netflix-like" 1080 p quality.
• Safe by default – outputs <name>.mp4 in the same folder; originals
retained unless you add --delete-original true.
• Idempotent – re-runs skip files already compliant; perfect for
nightly cron/systemd-timer jobs.
Dependencies
------------
ffmpeg and ffprobe must be in your PATH.
Windows 10/11 : winget install Gyan.FFmpeg
macOS (brew) : brew install ffmpeg
Debian/Ubuntu : sudo apt install ffmpeg
Fedora/RHEL : sudo dnf install -y ffmpeg ffprobe
Python 3.8 or newer is required.
Quick start
-----------
# convert everything under /mnt/media with default settings (there is no dry-run mode)
python3 directplay.py /mnt/media
# real conversion, two workers, better quality, delete sources after success
python3 directplay.py -j 2 --crf 18 --preset slow \
--delete-original true /mnt/media /video/new
CLI flags
---------
positional:
roots One or more library root folders.
optional:
-j, --workers N Parallel encode workers (default = all CPUs)
--crf N x264 Constant Rate Factor (quality slider, default 19)
--preset P x264 preset: ultrafast … slow (default medium)
--delete-original true|false Remove source file on success (default false)
--exts .mkv,.mp4 Comma-separated list of file extensions to consider
License
-------
Apache-2.0 — use it, fork it, profit.
Author
------
© 2025 ph33nx https://github.com/ph33nx
"""
import platform, textwrap, sys
import argparse
import json
import os
import shutil
import subprocess
from concurrent.futures import ProcessPoolExecutor
from pathlib import Path
from typing import List
# ------------------------- pretty logging ------------------------------ #
class C:
    """ANSI escape sequences used to colour console output."""

    # Restores the terminal's default attributes; appended after every message.
    RESET = "\033[0m"

    # Foreground colours, listed in SGR-code order.
    RED = "\033[31m"
    GRN = "\033[32m"
    YEL = "\033[33m"
    CYN = "\033[36m"
def log(symbol: str, color: str, action: str, path: Path):
    """Print one coloured status line shaped like ``<symbol> <action>: "<path>"``.

    Args:
        symbol: Leading glyph (e.g. a tick or arrow).
        color: Escape sequence emitted before the text (see class ``C``).
        action: Short description of what is happening.
        path: File the message refers to.
    """
    message = f'{symbol} {action}: "{path}"'
    # Always reset afterwards so the colour does not bleed into later output.
    print(f"{color}{message}{C.RESET}")
# ---------------------------- constants ---------------------------------- #
# File extensions scanned when --exts is not given on the command line.
DEFAULT_EXTS = {".mkv", ".mp4", ".avi", ".mov"}
# Upper bound for the numeric "level" field of a probed H.264 stream
# (41 is how the original note spells level 4.1).
H264_LEVEL_THRESHOLD = 41 # High 4.1 is Chromecast safe
# Tallest picture accepted as-is; taller video is scaled down to this height.
MAX_HEIGHT = 1080 # Down-scale 4 K → 1080p to stay in 4.1
# Channel count targeted for the primary audio track.
AUDIO_CHANNELS = 2 # stereo AAC
# ------------------------------------------------------------------------- #
def run(cmd: List[str]) -> str:
    """Run *cmd* and return everything it wrote to stdout as text.

    The output is decoded explicitly as UTF-8 instead of relying on the
    locale's preferred encoding (which ``text=True`` would use).  ffprobe
    emits UTF-8 JSON, so on a non-UTF-8 locale (e.g. a Windows code page)
    stream titles with non-ASCII characters could otherwise raise
    ``UnicodeDecodeError``.  Undecodable bytes are replaced rather than
    aborting the whole run.

    Args:
        cmd: Program and arguments, passed to the OS without a shell.

    Returns:
        The decoded standard output of the command.

    Raises:
        subprocess.CalledProcessError: if the command exits with a non-zero status.
    """
    return subprocess.check_output(cmd, encoding="utf-8", errors="replace")
def ffprobe(path: Path) -> dict:
    """Probe *path* with ffprobe and return the parsed JSON stream listing.

    Raises:
        subprocess.CalledProcessError: propagated from ``run`` when ffprobe
            exits with a non-zero status.
    """
    probe_cmd = ["ffprobe", "-hide_banner"]
    probe_cmd += ["-loglevel", "error"]  # keep stderr quiet unless it really fails
    probe_cmd += ["-show_streams"]
    probe_cmd += ["-print_format", "json"]
    probe_cmd.append(str(path))

    raw_json = run(probe_cmd)
    return json.loads(raw_json)
# ---------------------- util: decision helpers --------------------------- #
def is_h264_ok(v: dict) -> bool:
    """Return True if video stream *v* is H.264 within the direct-play limits.

    Args:
        v: One entry of ffprobe's ``streams`` list describing a video stream.

    Returns:
        True when the codec is ``h264``, its level does not exceed
        ``H264_LEVEL_THRESHOLD`` and its picture height does not exceed
        ``MAX_HEIGHT``.  A missing level or height is treated as acceptable.
    """
    # .get(): ffprobe may omit codec_name for streams it cannot identify.
    if v.get("codec_name") != "h264":
        return False

    level = float(v.get("level", H264_LEVEL_THRESHOLD))

    # Judge by the display height.  "coded_height" is the padded decoder
    # surface: H.264 codes 16-pixel macroblocks, so a 1080-line picture is
    # typically reported as 1088 and would wrongly be rejected.  Fall back to
    # coded_height only when "height" is absent.
    height = int(v.get("height", v.get("coded_height", MAX_HEIGHT)))

    return level <= H264_LEVEL_THRESHOLD and height <= MAX_HEIGHT
def has_aac_stereo(streams: List[dict]) -> bool:
    """Return True if at least one audio stream in *streams* is AAC.

    Streams that lack ``codec_type`` or ``codec_name`` (ffprobe can omit
    them for streams it cannot identify) are simply ignored instead of
    raising ``KeyError``.

    NOTE(review): despite the name, the channel count is not inspected; a
    multichannel AAC track also satisfies this check.  Left unchanged on
    purpose so existing libraries are not suddenly re-encoded.

    Args:
        streams: ffprobe's ``streams`` list.
    """
    return any(
        s.get("codec_type") == "audio" and s.get("codec_name") == "aac"
        for s in streams
    )
def classify(job) -> str:
    """Decide what to do with ``job.src`` based on ``job.meta``.

    Args:
        job: Object exposing ``src`` (a Path) and ``meta`` (ffprobe output).

    Returns:
        ``"skip"``   - no video stream, or already an MP4 with acceptable codecs;
        ``"remux"``  - codecs are acceptable but the container is not ``.mp4``;
        ``"encode"`` - video and/or audio need re-encoding.
    """
    # .get(): tolerate probe output without the expected keys rather than
    # aborting the scan with a KeyError.
    streams = job.meta.get("streams", [])
    video = next((s for s in streams if s.get("codec_type") == "video"), None)
    if video is None:  # no video stream at all
        log("⚠", C.YEL, "No video stream; skipping", job.src)
        return "skip"

    # Evaluate the codec checks once; both outcomes below depend on them.
    if not (is_h264_ok(video) and has_aac_stereo(streams)):
        return "encode"

    # Codecs are fine: only the container decides between skip and remux.
    return "skip" if job.src.suffix.lower() == ".mp4" else "remux"
# ----------------------------- job type ---------------------------------- #
class Job:
    """One source file together with the action decided for it.

    Construction probes the file with ffprobe and classifies it.  When the
    probe fails, a stream-copy rewrite of the container is attempted for
    MP4-like and Matroska-like files; on success the rewritten file replaces
    the source and the probe is retried.

    Attributes set by ``__init__``:
        src:    Path of the source file.
        opts:   Options object handed in by the caller.
        meta:   ffprobe output, or None when the file could not be probed.
        action: "skip", "remux" or "encode".
        dst:    ``src`` with an ``.mp4`` suffix, or None when action is "skip".
    """

    def __init__(self, src: Path, opts):
        self.src = src
        self.opts = opts
        self.meta = None
        self.action = "skip"  # stays "skip" unless classification succeeds
        try:
            self._probe_and_classify()
        except subprocess.CalledProcessError:
            self._recover()
        except (KeyError, ValueError):
            # Malformed or unexpected probe output must not abort the batch.
            print(f"⚠️ Unreadable or non-video file, skipping {src}")
        self.dst = src.with_suffix(".mp4") if self.action != "skip" else None

    def _probe_and_classify(self):
        """Fill ``meta`` and ``action``; lets probe/parse errors propagate."""
        self.meta = ffprobe(self.src)
        self.action = classify(self)

    def _recover(self):
        """Try to rewrite an unprobeable container in place, then re-probe."""
        src = self.src
        suffix = src.suffix.lower()
        if suffix in {".mp4", ".m4v", ".mov"}:
            # Attempt auto-repair for MP4/MOV/M4V files with corrupted headers
            print(f"⚠️ Attempting to repair corrupted MP4 header: {src}")
            tmp = src.with_suffix(".tmp.mp4")
            cmd = [
                "ffmpeg",
                "-y",
                "-err_detect",
                "ignore_err",
                "-i",
                str(src),
                "-c",
                "copy",
                "-movflags",
                "+faststart",
                str(tmp),
            ]
            done_msg = f"✓ Successfully repaired: {src}"
            failed_msg = f"⚠️ Repair failed, skipping: {src}"
        elif suffix in {".mkv", ".webm"}:
            # For MKV/WebM, try a simple remux
            print(f"⚠️ Attempting to remux corrupted container: {src}")
            tmp = src.with_suffix(".tmp" + src.suffix)
            cmd = ["ffmpeg", "-y", "-i", str(src), "-c", "copy", str(tmp)]
            done_msg = f"✓ Successfully remuxed: {src}"
            failed_msg = f"⚠️ Remux failed, skipping: {src}"
        else:
            print(f"⚠️ Unreadable or non-video file, skipping {src}")
            return

        if not self._rewrite_in_place(cmd, tmp):
            print(failed_msg)
            return

        print(done_msg)
        try:
            self._probe_and_classify()  # try again
        except (subprocess.CalledProcessError, KeyError, ValueError):
            # The rewrite "succeeded" but the result is still unusable; skip
            # the file instead of letting the exception kill the whole scan.
            print(f"⚠️ Unreadable or non-video file, skipping {src}")
            self.action = "skip"

    def _rewrite_in_place(self, cmd: List[str], tmp: Path) -> bool:
        """Run *cmd* (which writes *tmp*) and swap *tmp* over ``src`` on success.

        Returns True when the source was replaced.  On failure any partial
        temp file is removed so it is not picked up by a later scan.
        """
        result = subprocess.run(cmd)
        try:
            usable = result.returncode == 0 and tmp.stat().st_size > 0
        except OSError:
            usable = False
        if usable:
            # replace() overwrites an existing target on every platform;
            # rename() raises on Windows when the target already exists.
            tmp.replace(self.src)
            return True
        try:
            tmp.unlink()
        except OSError:
            pass  # nothing was written, or it cannot be removed; not fatal
        return False
# --------------------------- worker logic -------------------------------- #
def looks_ok(path: Path) -> bool:
    """Return True only if *path* already satisfies the direct-play profile.

    That means: an ``.mp4`` container whose first video stream passes
    ``is_h264_ok`` and which carries audio accepted by ``has_aac_stereo``.
    Any probing or parsing problem counts as "not ok" so this predicate
    never raises for a bad file.
    """
    # Cheap check first: a wrong container can never pass, so skip the probe.
    if path.suffix.lower() != ".mp4":
        return False
    try:
        streams = ffprobe(path).get("streams", [])
        video = next((s for s in streams if s.get("codec_type") == "video"), None)
        return video is not None and is_h264_ok(video) and has_aac_stereo(streams)
    except (subprocess.CalledProcessError, KeyError, ValueError):
        # Unreadable file or unexpected metadata shape.
        return False
# ---------------------------------------------------------------------- #
# helper: detect obvious stubs (0-byte or < 1 MiB)
def is_stub(p: Path, min_size: int = 1024 * 1024) -> bool:
    """Return True if *p* is missing or smaller than *min_size* bytes.

    Args:
        p: File to inspect.
        min_size: Size in bytes below which a file counts as a stub.
            Defaults to 1 MiB.
    """
    try:
        return p.stat().st_size < min_size
    except FileNotFoundError:
        # A file that does not exist is treated like an empty placeholder.
        return True
# Text subtitle codecs (names as reported by ffprobe) converted to mov_text.
_TEXT_SUBTITLE_CODECS = frozenset({"subrip", "srt", "ass", "ssa"})


def _video_filter_chain(v: dict, cuda_frames: bool) -> str:
    """Build the -vf value: optional downscale plus conversion to yuv420p."""
    # Use the display height; coded_height is macroblock-padded (1088 for a
    # 1080-line picture) and would trigger a pointless rescale.
    height = int(v.get("height", v.get("coded_height", 0)))
    need_downscale = height > MAX_HEIGHT

    if cuda_frames:
        # Frames live on the GPU, so scaling/format conversion must too.
        if need_downscale:
            return f"scale_cuda=-2:{MAX_HEIGHT}:format=yuv420p"
        return "scale_cuda=format=yuv420p"

    chain = []
    if need_downscale:
        chain.append(f"scale=-2:{MAX_HEIGHT}")
    # High profile is 8-bit 4:2:0 only; without this, 10-bit or 4:4:4 sources
    # make the encoder reject "-profile:v high".
    chain.append("format=yuv420p")
    return ",".join(chain)


def _video_encoder_flags(opts) -> List[str]:
    """Return the encoder-specific video flags for libx264 or h264_nvenc."""
    if opts.encoder == "libx264":
        return ["-c:v", "libx264", "-preset", opts.preset, "-crf", str(opts.crf)]
    # h264_nvenc: constant-quality VBR driven by the same --crf value.
    return [
        "-c:v",
        "h264_nvenc",
        "-preset",
        opts.nv_preset,
        "-rc",
        "vbr",
        "-tune",
        "hq",
        "-cq",
        str(opts.crf),
        "-b:v",
        "0",
    ]


def _text_subtitle_args(streams: List[dict]) -> List[str]:
    """Map every text subtitle stream and request conversion to mov_text."""
    args: List[str] = []
    sub_pos = 0  # position among subtitle streams, as used by "0:s:N"
    for s in streams:
        if s.get("codec_type") != "subtitle":
            continue
        if s.get("codec_name") in _TEXT_SUBTITLE_CODECS:
            args += ["-map", f"0:s:{sub_pos}"]
        sub_pos += 1
    if args:
        args += ["-c:s", "mov_text"]
    return args


def build_encode_cmd(job: Job):
    """Build the ffmpeg command that re-encodes ``job.src`` to H.264/AAC MP4.

    Args:
        job: Job whose ``meta`` contains at least one video stream and whose
            ``opts`` provides ``encoder``, ``preset``, ``nv_preset`` and ``crf``.

    Returns:
        ``(cmd, tmp_out)``: the argument list and the temporary output path
        (``<dst stem>.tmp.mp4``) that ffmpeg will write.

    Raises:
        StopIteration: if ``job.meta`` contains no video stream.
    """
    streams = job.meta["streams"]
    v = next(s for s in streams if s.get("codec_type") == "video")

    # CUDA decoding (and therefore CUDA filters) is only used when the input
    # is H.264 and NVENC is the selected encoder.
    cuda_decode = job.opts.encoder == "h264_nvenc" and v.get("codec_name") == "h264"

    cmd = ["ffmpeg", "-hide_banner", "-loglevel", "error", "-xerror", "-y"]
    if cuda_decode:
        cmd += ["-hwaccel", "cuda", "-hwaccel_output_format", "cuda"]

    tmp_out = job.dst.with_suffix(".tmp.mp4")

    # ========== video ==========
    cmd += ["-i", str(job.src), "-map", "0:v:0"]
    cmd += _video_encoder_flags(job.opts)
    cmd += ["-profile:v", "high", "-level", "4.1"]
    cmd += ["-vf", _video_filter_chain(v, cuda_decode)]

    # ========== audio ==========
    # First audio stream is transcoded to AAC; up to two further streams are
    # copied if present.  The channel option must be addressed as "a:0":
    # a bare ":0" selects output stream index 0, which is the video stream,
    # so the downmix was silently never applied.
    cmd += [
        "-map",
        "0:a:0",
        "-c:a:0",
        "aac",
        "-ac:a:0",
        str(AUDIO_CHANNELS),
        "-q:a:0",
        "0.45",
        "-map",
        "0:a:1?",
        "-c:a:1",
        "copy",
        "-map",
        "0:a:2?",
        "-c:a:2",
        "copy",
    ]

    # ========== subtitles ==========
    # Text subtitles are picked from the probe data.  The "m:key:value" stream
    # specifier matches metadata tags, not codec names, so it selected nothing.
    cmd += _text_subtitle_args(streams)

    cmd.append(str(tmp_out))
    return cmd, tmp_out
def _same_file(a: Path, b: Path) -> bool:
    """Return True if *a* and *b* refer to the same existing file."""
    try:
        return a.samefile(b)
    except OSError:
        # One of them does not exist (yet); fall back to comparing the paths.
        return a == b


def _remove_quietly(path: Path) -> None:
    """Delete *path* if possible; a missing or locked file is not an error."""
    try:
        path.unlink()
    except OSError:
        pass


def process(job: Job):
    """Process a single Job.

    - skip if already compliant
    - remux or encode otherwise, always into a temporary file that is swapped
      over the destination only after ffmpeg succeeded
    - on any ffmpeg error, log, remove the partial temp file and return
    - with ``opts.delete_original`` the source is removed after a successful
      encode (not after a remux), and never when the output replaced it
    """
    # 1) If we don't need to touch it, skip early
    if job.action == "skip":
        log("✓", C.GRN, "Direct-play already supported", job.src)
        return

    # True when the output path is the source itself (an .mp4 needing work).
    in_place = _same_file(job.src, job.dst)

    # 2) If dst exists and is good, skip; if stub or bad, prepare to overwrite
    if job.dst.exists():
        if in_place:
            # Never run the stub clean-up here: it would delete the source.
            log("⚠", C.YEL, "Will overwrite non-compliant MP4 after encode", job.dst)
        elif looks_ok(job.dst):
            log("✓", C.GRN, "Output exists; skipping", job.dst)
            return
        elif is_stub(job.dst):
            log("⚠", C.YEL, "Stub file detected; will overwrite", job.dst)
            job.dst.unlink()
        else:
            log("⚠", C.YEL, "Will overwrite non-compliant MP4 after encode", job.dst)

    # 3) Build the ffmpeg command
    if job.action == "remux":
        tmp_out = job.dst.with_suffix(".tmp.mp4")
        cmd = [
            "ffmpeg",
            "-hide_banner",
            "-loglevel",
            "error",
            "-xerror",
            "-y",
            "-i",
            str(job.src),
            "-c",
            "copy",
            # MP4 cannot carry SRT/ASS as-is; a plain stream copy of such a
            # subtitle track makes the whole remux fail.
            "-c:s",
            "mov_text",
            "-map_metadata",
            "0",
            str(tmp_out),
        ]
    else:
        cmd, tmp_out = build_encode_cmd(job)
    log("→", C.CYN, job.action.upper(), job.src)

    # 4) Run ffmpeg, but don't let a non-zero exit kill the whole batch
    try:
        subprocess.run(cmd, check=True)
    except subprocess.CalledProcessError as e:
        # Remove the partial output so a later scan does not treat it as input.
        _remove_quietly(tmp_out)
        log("✗", C.RED, f"FFmpeg failed ({e.returncode}); skipping", job.src)
        return

    # 5) On success: atomic swap & optional delete
    if not tmp_out.exists():
        # Nothing was produced; keep the original untouched.
        log("✗", C.RED, "FFmpeg produced no output; skipping", job.src)
        return
    tmp_out.replace(job.dst)
    if job.opts.delete_original and job.action == "encode" and not in_place:
        job.src.unlink()
        log("✘", C.RED, "Deleted original", job.src)
# ----------------------------- main -------------------------------------- #
def gather_files(roots, exts) -> List[Path]:
    """Recursively collect candidate video files below *roots*.

    Args:
        roots: Iterable of directory paths to scan.
        exts: Collection of extensions including the dot (e.g. ``".mkv"``);
            matching is case-insensitive.

    Returns:
        Matching files, each listed once even when roots overlap, sorted by
        directory with ``.mp4`` files after the other types and then by name.
    """
    wanted = {e.lower() for e in exts}
    seen = set()
    files = []
    for root in roots:
        for p in Path(root).rglob("*"):
            # skip dot-files created by macOS (._foo, .DS_Store, etc.)
            if p.name.startswith(".") or p.suffix.lower() not in wanted:
                continue
            if not p.is_file():
                continue
            # Overlapping or repeated roots would otherwise queue the same
            # file twice and let two workers race on one temp file.
            identity = p.resolve()
            if identity in seen:
                continue
            seen.add(identity)
            files.append(p)
    # sort so MP4s are handled after MKV/AVI in each directory
    files.sort(key=lambda p: (p.parent, p.suffix.lower() == ".mp4", p.name))
    return files
# x264 preset name -> closest NVENC preset.
_NVENC_PRESETS = {
    "ultrafast": "p1",
    "superfast": "p2",
    "veryfast": "p3",
    "faster": "p3",
    "fast": "p4",
    "medium": "p4",
    "slow": "p5",
    "slower": "p6",
    "veryslow": "p7",
}


def _nvenc_usable() -> bool:
    """Return True if a one-frame test encode with h264_nvenc succeeds.

    Many ffmpeg builds list h264_nvenc among their encoders even on machines
    without a usable NVIDIA device, so the encoder list alone is not enough.
    """
    probe = [
        "ffmpeg",
        "-hide_banner",
        "-loglevel",
        "error",
        "-f",
        "lavfi",
        "-i",
        "color=c=black:s=256x256:r=25",
        "-frames:v",
        "1",
        "-c:v",
        "h264_nvenc",
        "-f",
        "null",
        "-",
    ]
    result = subprocess.run(
        probe,
        stdin=subprocess.DEVNULL,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    return result.returncode == 0


def main():
    """Command-line entry point: parse options, scan the roots, run the jobs."""
    p = argparse.ArgumentParser(description="Pre-encode / remux videos for Chromecast.")
    p.add_argument("roots", nargs="+", help="Root directories to scan.")
    p.add_argument(
        "-j",
        "--workers",
        type=int,
        default=os.cpu_count() or 1,
        help="Parallel workers (default = all CPUs).",
    )
    p.add_argument(
        "--crf",
        type=int,
        default=19,
        help="x264 Constant Rate Factor (lower = higher quality, default 19).",
    )
    p.add_argument(
        "--preset",
        default="medium",
        help="x264 preset: ultrafast…slow (default medium).",
    )
    p.add_argument(
        "--delete-original",
        choices=("true", "false"),
        default="false",
        help="Remove source file after successful processing.",
    )
    p.add_argument(
        "--exts",
        # sorted: set iteration order would make the --help text vary per run
        default=",".join(sorted(DEFAULT_EXTS)),
        help="Comma-separated list of file extensions to consider.",
    )
    opts = p.parse_args()
    if opts.workers < 1:
        # ProcessPoolExecutor would fail later with a less helpful ValueError.
        p.error("--workers must be at least 1")

    # -- Make sure ffmpeg AND ffprobe exist BEFORE calling them ------------
    if not all(shutil.which(tool) for tool in ("ffmpeg", "ffprobe")):
        os_hint = {
            "Windows": "winget install Gyan.FFmpeg",
            "Darwin": "brew install ffmpeg",
            "Linux": "sudo apt install ffmpeg # Debian/Ubuntu\n"
            " sudo dnf install ffmpeg # Fedora/RHEL",
        }.get(platform.system(), "Install ffmpeg from your package manager")
        raise SystemExit(
            "\n".join(
                [
                    "ffmpeg/ffprobe not found in PATH.",
                    "Quick install:",
                    os_hint,
                    f"Then re-run: {' '.join(map(str, sys.argv))}",
                ]
            )
        )

    # -- Detect hardware encoder once --------------------------
    # Being compiled in is not enough; confirm NVENC works on this machine.
    encoders = run(["ffmpeg", "-v", "quiet", "-encoders"])
    nvenc_ok = "h264_nvenc" in encoders and _nvenc_usable()
    opts.encoder = "h264_nvenc" if nvenc_ok else "libx264"

    # NVENC preset equivalent of --preset (unknown names fall back to p4)
    opts.nv_preset = _NVENC_PRESETS.get(opts.preset, "p4")
    opts.delete_original = opts.delete_original.lower() == "true"

    # Normalise --exts: tolerate spaces, upper case, missing dots, empty items.
    exts = set()
    for raw in opts.exts.split(","):
        ext = raw.strip().lower()
        if ext:
            exts.add(ext if ext.startswith(".") else f".{ext}")

    print("Scanning …")
    files = gather_files(opts.roots, exts)
    print(f"Found {len(files)} candidate files.\n")

    # Probing/classification happens here, in the parent process.
    jobs = [Job(f, opts) for f in files]

    with ProcessPoolExecutor(max_workers=opts.workers) as exe:
        pending = [(job, exe.submit(process, job)) for job in jobs]
        for job, future in pending:
            try:
                future.result()
            except Exception as exc:
                # Top-level boundary: report worker failures (previously lost
                # because the map() results were never read) and keep going.
                log("✗", C.RED, f"Worker error ({exc!r}); skipping", job.src)
    print("\nDone.")
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment