Skip to content

Instantly share code, notes, and snippets.

@nwaughachukwuma
Created November 9, 2024 13:43
Show Gist options
  • Save nwaughachukwuma/ad42141a8c0ab37bdf95ad15abad0b20 to your computer and use it in GitHub Desktop.
Save nwaughachukwuma/ad42141a8c0ab37bdf95ad15abad0b20 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import logging
import os
import shutil
import subprocess
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from fractions import Fraction
from pathlib import Path
# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
@dataclass
class VideoVariant:
resolution: str
bitrate: str
output_name: str
profile: str
level: str
width: int
height: int
@property
def bandwidth(self) -> int:
"""Calculate total bandwidth including audio"""
video_bitrate = int(self.bitrate.rstrip("k")) * 1000
audio_bitrate = 128000 # 128k audio
return video_bitrate + audio_bitrate
class FFmpegNotFoundError(Exception):
"""Raised when FFmpeg or FFprobe are not found"""
pass
class VideoProcessor:
def __init__(self, input_file: Path, output_dir: Path | None = None):
self.input_file = Path(input_file)
self.output_dir = output_dir or Path.cwd()
self.basename = self.input_file.stem
self.output_path = self.output_dir / self.basename
# Define video variants (resolution, bitrate, name, profile, level)
self.variants = [
VideoVariant("1280x720", "1200k", "720p", "main", "3.1", 1280, 720),
VideoVariant("1920x1080", "2500k", "1080p", "high", "4.2", 1920, 1080),
VideoVariant("3840x2160", "8000k", "2160p", "high", "5.1", 3840, 2160),
]
@staticmethod
def check_dependencies() -> None:
"""Verify FFmpeg and FFprobe are installed"""
for cmd in ["ffmpeg", "ffprobe"]:
if not shutil.which(cmd):
raise FFmpegNotFoundError(f"{cmd} is not installed or not in PATH")
def validate_input(self) -> None:
"""Validate input file exists and is accessible"""
if not self.input_file.is_file():
raise FileNotFoundError(f"Input file '{self.input_file}' does not exist")
def setup_output_directory(self) -> None:
"""Setup output directory, removing if it exists"""
if self.output_path.exists():
logger.warning(f"Removing existing output directory: {self.output_path}")
shutil.rmtree(self.output_path)
self.output_path.mkdir(parents=True, exist_ok=True)
def get_frame_rate(self) -> int:
"""Get frame rate from input video"""
cmd = [
"ffprobe",
"-v",
"0",
"-of",
"default=noprint_wrappers=1:nokey=1",
"-select_streams",
"v:0",
"-show_entries",
"stream=avg_frame_rate",
str(self.input_file),
]
try:
output = subprocess.check_output(cmd, stderr=subprocess.PIPE).decode().strip()
num, denom = map(int, output.split("/"))
return int(Fraction(num, denom))
except (subprocess.CalledProcessError, ValueError) as e:
logger.error(f"Failed to get frame rate: {e}")
return 30 # Default to 30 fps if detection fails
def process_variant(self, variant: VideoVariant, gop_size: int) -> bool:
"""Process a single video variant"""
output_playlist = self.output_path / f"{variant.output_name}.m3u8"
segment_pattern = self.output_path / f"{variant.output_name}_%03d.ts"
cmd = [
"ffmpeg",
"-y",
"-i",
str(self.input_file),
"-c:v",
"libx264",
"-preset",
"ultrafast",
"-profile:v",
variant.profile,
"-level:v",
variant.level,
"-b:v",
variant.bitrate,
"-threads",
"0",
"-s",
variant.resolution,
"-c:a",
"aac",
"-b:a",
"128k",
"-ac",
"2",
"-g",
str(gop_size),
"-keyint_min",
str(gop_size),
"-sc_threshold",
"0",
"-force_key_frames",
"expr:gte(t,n_forced*4)",
"-hls_time",
"4",
"-hls_list_size",
"0",
"-hls_flags",
"independent_segments",
"-hls_segment_filename",
str(segment_pattern),
str(output_playlist),
]
try:
subprocess.run(cmd, check=True, capture_output=True)
logger.info(f"Successfully processed {variant.output_name}")
return True
except subprocess.CalledProcessError as e:
logger.error(f"Failed to process {variant.output_name}: {e.stderr.decode()}")
return False
def generate_master_playlist(self) -> None:
"""Generate the master HLS playlist"""
master_playlist_path = self.output_path / "playlist.m3u8"
with open(master_playlist_path, "w") as f:
f.write("#EXTM3U\n")
f.write("#EXT-X-VERSION:3\n")
for variant in self.variants:
f.write("\n")
f.write(f"#EXT-X-STREAM-INF:BANDWIDTH={variant.bandwidth}," f"RESOLUTION={variant.resolution}\n")
f.write(f"{variant.output_name}.m3u8\n")
logger.info(f"Generated master playlist: {master_playlist_path}")
def process(self) -> None:
"""Main processing function"""
try:
# Initial setup and validation
self.check_dependencies()
self.validate_input()
self.setup_output_directory()
# Get video parameters
frame_rate = self.get_frame_rate()
gop_size = frame_rate * 4 # 4-second GOP
# Process variants in parallel
max_workers = int(max((os.cpu_count() or 1) / 2, 1))
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(self.process_variant, variant, gop_size) for variant in self.variants]
# Wait for all processes to complete
if not all(future.result() for future in futures):
raise RuntimeError("One or more variants failed to process")
# Generate master playlist
self.generate_master_playlist()
logger.info(f"Processing completed successfully. Output directory: {self.output_path}")
except Exception as e:
logger.error(f"Processing failed: {str(e)}")
raise
@nwaughachukwuma
Copy link
Author

What did you try? You need to run the code.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment