-
-
Save nwaughachukwuma/ad42141a8c0ab37bdf95ad15abad0b20 to your computer and use it in GitHub Desktop.
Python version of https://gist.github.com/stenuto/9ff19ce89f07c7419a8d0976736ebe12
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import logging | |
import os | |
import shutil | |
import subprocess | |
from concurrent.futures import ThreadPoolExecutor | |
from dataclasses import dataclass | |
from fractions import Fraction | |
from pathlib import Path | |
# Configure logging | |
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") | |
logger = logging.getLogger(__name__) | |
@dataclass | |
class VideoVariant: | |
resolution: str | |
bitrate: str | |
output_name: str | |
profile: str | |
level: str | |
width: int | |
height: int | |
@property | |
def bandwidth(self) -> int: | |
"""Calculate total bandwidth including audio""" | |
video_bitrate = int(self.bitrate.rstrip("k")) * 1000 | |
audio_bitrate = 128000 # 128k audio | |
return video_bitrate + audio_bitrate | |
class FFmpegNotFoundError(Exception): | |
"""Raised when FFmpeg or FFprobe are not found""" | |
pass | |
class VideoProcessor: | |
def __init__(self, input_file: Path, output_dir: Path | None = None): | |
self.input_file = Path(input_file) | |
self.output_dir = output_dir or Path.cwd() | |
self.basename = self.input_file.stem | |
self.output_path = self.output_dir / self.basename | |
# Define video variants (resolution, bitrate, name, profile, level) | |
self.variants = [ | |
VideoVariant("1280x720", "1200k", "720p", "main", "3.1", 1280, 720), | |
VideoVariant("1920x1080", "2500k", "1080p", "high", "4.2", 1920, 1080), | |
VideoVariant("3840x2160", "8000k", "2160p", "high", "5.1", 3840, 2160), | |
] | |
@staticmethod | |
def check_dependencies() -> None: | |
"""Verify FFmpeg and FFprobe are installed""" | |
for cmd in ["ffmpeg", "ffprobe"]: | |
if not shutil.which(cmd): | |
raise FFmpegNotFoundError(f"{cmd} is not installed or not in PATH") | |
def validate_input(self) -> None: | |
"""Validate input file exists and is accessible""" | |
if not self.input_file.is_file(): | |
raise FileNotFoundError(f"Input file '{self.input_file}' does not exist") | |
def setup_output_directory(self) -> None: | |
"""Setup output directory, removing if it exists""" | |
if self.output_path.exists(): | |
logger.warning(f"Removing existing output directory: {self.output_path}") | |
shutil.rmtree(self.output_path) | |
self.output_path.mkdir(parents=True, exist_ok=True) | |
def get_frame_rate(self) -> int: | |
"""Get frame rate from input video""" | |
cmd = [ | |
"ffprobe", | |
"-v", | |
"0", | |
"-of", | |
"default=noprint_wrappers=1:nokey=1", | |
"-select_streams", | |
"v:0", | |
"-show_entries", | |
"stream=avg_frame_rate", | |
str(self.input_file), | |
] | |
try: | |
output = subprocess.check_output(cmd, stderr=subprocess.PIPE).decode().strip() | |
num, denom = map(int, output.split("/")) | |
return int(Fraction(num, denom)) | |
except (subprocess.CalledProcessError, ValueError) as e: | |
logger.error(f"Failed to get frame rate: {e}") | |
return 30 # Default to 30 fps if detection fails | |
def process_variant(self, variant: VideoVariant, gop_size: int) -> bool: | |
"""Process a single video variant""" | |
output_playlist = self.output_path / f"{variant.output_name}.m3u8" | |
segment_pattern = self.output_path / f"{variant.output_name}_%03d.ts" | |
cmd = [ | |
"ffmpeg", | |
"-y", | |
"-i", | |
str(self.input_file), | |
"-c:v", | |
"libx264", | |
"-preset", | |
"ultrafast", | |
"-profile:v", | |
variant.profile, | |
"-level:v", | |
variant.level, | |
"-b:v", | |
variant.bitrate, | |
"-threads", | |
"0", | |
"-s", | |
variant.resolution, | |
"-c:a", | |
"aac", | |
"-b:a", | |
"128k", | |
"-ac", | |
"2", | |
"-g", | |
str(gop_size), | |
"-keyint_min", | |
str(gop_size), | |
"-sc_threshold", | |
"0", | |
"-force_key_frames", | |
"expr:gte(t,n_forced*4)", | |
"-hls_time", | |
"4", | |
"-hls_list_size", | |
"0", | |
"-hls_flags", | |
"independent_segments", | |
"-hls_segment_filename", | |
str(segment_pattern), | |
str(output_playlist), | |
] | |
try: | |
subprocess.run(cmd, check=True, capture_output=True) | |
logger.info(f"Successfully processed {variant.output_name}") | |
return True | |
except subprocess.CalledProcessError as e: | |
logger.error(f"Failed to process {variant.output_name}: {e.stderr.decode()}") | |
return False | |
def generate_master_playlist(self) -> None: | |
"""Generate the master HLS playlist""" | |
master_playlist_path = self.output_path / "playlist.m3u8" | |
with open(master_playlist_path, "w") as f: | |
f.write("#EXTM3U\n") | |
f.write("#EXT-X-VERSION:3\n") | |
for variant in self.variants: | |
f.write("\n") | |
f.write(f"#EXT-X-STREAM-INF:BANDWIDTH={variant.bandwidth}," f"RESOLUTION={variant.resolution}\n") | |
f.write(f"{variant.output_name}.m3u8\n") | |
logger.info(f"Generated master playlist: {master_playlist_path}") | |
def process(self) -> None: | |
"""Main processing function""" | |
try: | |
# Initial setup and validation | |
self.check_dependencies() | |
self.validate_input() | |
self.setup_output_directory() | |
# Get video parameters | |
frame_rate = self.get_frame_rate() | |
gop_size = frame_rate * 4 # 4-second GOP | |
# Process variants in parallel | |
max_workers = int(max((os.cpu_count() or 1) / 2, 1)) | |
with ThreadPoolExecutor(max_workers=max_workers) as executor: | |
futures = [executor.submit(self.process_variant, variant, gop_size) for variant in self.variants] | |
# Wait for all processes to complete | |
if not all(future.result() for future in futures): | |
raise RuntimeError("One or more variants failed to process") | |
# Generate master playlist | |
self.generate_master_playlist() | |
logger.info(f"Processing completed successfully. Output directory: {self.output_path}") | |
except Exception as e: | |
logger.error(f"Processing failed: {str(e)}") | |
raise |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment