Skip to content

Instantly share code, notes, and snippets.

@nwaughachukwuma
Created November 9, 2024 13:43
Show Gist options
  • Save nwaughachukwuma/ad42141a8c0ab37bdf95ad15abad0b20 to your computer and use it in GitHub Desktop.
Save nwaughachukwuma/ad42141a8c0ab37bdf95ad15abad0b20 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import logging
import os
import shutil
import subprocess
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from fractions import Fraction
from pathlib import Path
# Module-wide logging: timestamped, level-tagged records at INFO and above
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
@dataclass
class VideoVariant:
    """Encoding parameters for one rendition of the HLS ladder"""

    # Output frame size as "WIDTHxHEIGHT"
    resolution: str
    # Target video bitrate in FFmpeg notation, e.g. "1200k", "2.5M" or "800000"
    bitrate: str
    # Base name used for this rendition's playlist and segment files
    output_name: str
    # H.264 profile name (e.g. "main", "high")
    profile: str
    # H.264 level (e.g. "3.1")
    level: str
    # Frame width in pixels; expected to agree with `resolution`
    width: int
    # Frame height in pixels; expected to agree with `resolution`
    height: int

    @property
    def bandwidth(self) -> int:
        """Calculate total bandwidth (bits per second) including audio

        The video bitrate may be a plain number of bits per second or carry one
        of the decimal suffixes ``k``/``K`` (10^3), ``M`` (10^6) or ``G`` (10^9).
        Fractional values such as ``"2.5M"`` are supported.

        Raises:
            ValueError: if ``bitrate`` is not a number with an optional
                supported suffix.
        """
        multipliers = {"k": 1_000, "K": 1_000, "M": 1_000_000, "G": 1_000_000_000}
        text = self.bitrate.strip()
        multiplier = multipliers.get(text[-1:], 1)
        if multiplier != 1:
            text = text[:-1]
        # Fraction parses integers and decimals exactly, avoiding float rounding
        video_bitrate = int(Fraction(text) * multiplier)
        audio_bitrate = 128_000  # 128k audio
        return video_bitrate + audio_bitrate
class FFmpegNotFoundError(Exception):
    """Raised when the ffmpeg or ffprobe executable cannot be found on PATH"""
class VideoProcessor:
def __init__(self, input_file: Path, output_dir: Path | None = None):
self.input_file = Path(input_file)
self.output_dir = output_dir or Path.cwd()
self.basename = self.input_file.stem
self.output_path = self.output_dir / self.basename
# Define video variants (resolution, bitrate, name, profile, level)
self.variants = [
VideoVariant("1280x720", "1200k", "720p", "main", "3.1", 1280, 720),
VideoVariant("1920x1080", "2500k", "1080p", "high", "4.2", 1920, 1080),
VideoVariant("3840x2160", "8000k", "2160p", "high", "5.1", 3840, 2160),
]
@staticmethod
def check_dependencies() -> None:
"""Verify FFmpeg and FFprobe are installed"""
for cmd in ["ffmpeg", "ffprobe"]:
if not shutil.which(cmd):
raise FFmpegNotFoundError(f"{cmd} is not installed or not in PATH")
def validate_input(self) -> None:
"""Validate input file exists and is accessible"""
if not self.input_file.is_file():
raise FileNotFoundError(f"Input file '{self.input_file}' does not exist")
def setup_output_directory(self) -> None:
"""Setup output directory, removing if it exists"""
if self.output_path.exists():
logger.warning(f"Removing existing output directory: {self.output_path}")
shutil.rmtree(self.output_path)
self.output_path.mkdir(parents=True, exist_ok=True)
def get_frame_rate(self) -> int:
"""Get frame rate from input video"""
cmd = [
"ffprobe",
"-v",
"0",
"-of",
"default=noprint_wrappers=1:nokey=1",
"-select_streams",
"v:0",
"-show_entries",
"stream=avg_frame_rate",
str(self.input_file),
]
try:
output = subprocess.check_output(cmd, stderr=subprocess.PIPE).decode().strip()
num, denom = map(int, output.split("/"))
return int(Fraction(num, denom))
except (subprocess.CalledProcessError, ValueError) as e:
logger.error(f"Failed to get frame rate: {e}")
return 30 # Default to 30 fps if detection fails
def process_variant(self, variant: VideoVariant, gop_size: int) -> bool:
"""Process a single video variant"""
output_playlist = self.output_path / f"{variant.output_name}.m3u8"
segment_pattern = self.output_path / f"{variant.output_name}_%03d.ts"
cmd = [
"ffmpeg",
"-y",
"-i",
str(self.input_file),
"-c:v",
"libx264",
"-preset",
"ultrafast",
"-profile:v",
variant.profile,
"-level:v",
variant.level,
"-b:v",
variant.bitrate,
"-threads",
"0",
"-s",
variant.resolution,
"-c:a",
"aac",
"-b:a",
"128k",
"-ac",
"2",
"-g",
str(gop_size),
"-keyint_min",
str(gop_size),
"-sc_threshold",
"0",
"-force_key_frames",
"expr:gte(t,n_forced*4)",
"-hls_time",
"4",
"-hls_list_size",
"0",
"-hls_flags",
"independent_segments",
"-hls_segment_filename",
str(segment_pattern),
str(output_playlist),
]
try:
subprocess.run(cmd, check=True, capture_output=True)
logger.info(f"Successfully processed {variant.output_name}")
return True
except subprocess.CalledProcessError as e:
logger.error(f"Failed to process {variant.output_name}: {e.stderr.decode()}")
return False
def generate_master_playlist(self) -> None:
"""Generate the master HLS playlist"""
master_playlist_path = self.output_path / "playlist.m3u8"
with open(master_playlist_path, "w") as f:
f.write("#EXTM3U\n")
f.write("#EXT-X-VERSION:3\n")
for variant in self.variants:
f.write("\n")
f.write(f"#EXT-X-STREAM-INF:BANDWIDTH={variant.bandwidth}," f"RESOLUTION={variant.resolution}\n")
f.write(f"{variant.output_name}.m3u8\n")
logger.info(f"Generated master playlist: {master_playlist_path}")
def process(self) -> None:
"""Main processing function"""
try:
# Initial setup and validation
self.check_dependencies()
self.validate_input()
self.setup_output_directory()
# Get video parameters
frame_rate = self.get_frame_rate()
gop_size = frame_rate * 4 # 4-second GOP
# Process variants in parallel
max_workers = int(max((os.cpu_count() or 1) / 2, 1))
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(self.process_variant, variant, gop_size) for variant in self.variants]
# Wait for all processes to complete
if not all(future.result() for future in futures):
raise RuntimeError("One or more variants failed to process")
# Generate master playlist
self.generate_master_playlist()
logger.info(f"Processing completed successfully. Output directory: {self.output_path}")
except Exception as e:
logger.error(f"Processing failed: {str(e)}")
raise
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment