Last active
June 19, 2026 17:50
-
-
Save gphg/117f2403f8bb91b6e00e11db97b6ffb7 to your computer and use it in GitHub Desktop.
A wrapper utility to detect and extract video scenes from MP4, MKV, and WebM. https://www.scenedetect.com/docs/latest/
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python | |
| # /// script | |
| # dependencies = [ | |
| # "scenedetect[opencv]", | |
| # ] | |
| # /// | |
| """ | |
| SceneCutter: A wrapper utility to detect and extract video scenes from MP4, MKV, and WebM. | |
| This module provides an easy-to-use interface around PySceneDetect (scenedetect) library, | |
| handling scene detection, caching, and video splitting with customizable FFmpeg encoding. | |
| Can be used as a module: | |
| from scene_cutter import SceneCutterWrapper | |
| wrapper = SceneCutterWrapper("input.mp4", "output/") | |
| scenes = wrapper.detect_scenes() | |
| wrapper.process_video(scenes) | |
| Or as a CLI script: | |
| python scene_cutter.py input_video.mp4 -o output/ -M adaptive -t 2.5 | |
| Exit Codes: | |
| 0: Success | |
| 1: Invalid input (file not found, invalid arguments) | |
| 2: Detection error | |
| 3: Processing error | |
| 130: Interrupted by user (e.g., Ctrl+C); user errors such as an empty scene selection exit with 1 | |
| """ | |
| import os | |
| import sys | |
| import csv | |
| import argparse | |
| from typing import List, Tuple, Optional | |
| from scenedetect import ( | |
| detect, | |
| ContentDetector, | |
| AdaptiveDetector, | |
| split_video_ffmpeg, | |
| VideoManager, | |
| FrameTimecode, | |
| ) | |
| # ============================================================================ | |
| # CUSTOM EXCEPTIONS | |
| # ============================================================================ | |
class SceneCutterError(Exception):
    """Root exception type for errors raised by SceneCutter."""
class SceneCutterInputError(SceneCutterError):
    """Signals that user-supplied input failed validation."""
class SceneCutterDetectionError(SceneCutterError):
    """Signals that scene detection could not be completed."""
class SceneCutterProcessingError(SceneCutterError):
    """Signals that processing (splitting/exporting) the video failed."""
| # ============================================================================ | |
| # SCENE CACHING LAYER | |
| # ============================================================================ | |
class SceneCache:
    """
    Manages persistent caching of detected scenes in CSV format.

    Each cache is a CSV file named ``{file_base}_scenes_{method}.csv`` inside
    ``output_dir``. It holds one row per scene with the columns
    "Scene Number", "Start Time", "End Time", "Start Frame" and "End Frame".
    Only the two time columns are read back when loading.

    Attributes:
        output_dir: Directory where cache files are stored
        file_base: Base filename (without extension) used for cache naming
    """

    def __init__(self, output_dir: str, file_base: str):
        """
        Initialize the scene cache manager.

        Args:
            output_dir: Directory to store cache files
            file_base: Base filename for cache files (e.g., "video")
        """
        self.output_dir = output_dir
        self.file_base = file_base

    def get_cache_path(self, method: str) -> str:
        """
        Get the full path to a cache file for a given detection method.

        Args:
            method: Detection method name (e.g., 'content', 'adaptive')

        Returns:
            Full path to the cache CSV file
        """
        return os.path.join(self.output_dir, f"{self.file_base}_scenes_{method}.csv")

    def load(self, method: str, framerate: float) -> Optional[List[Tuple[FrameTimecode, FrameTimecode]]]:
        """
        Load previously detected scenes from cache.

        Args:
            method: Detection method name to identify the cache file
            framerate: Video framerate (needed to reconstruct FrameTimecode objects)

        Returns:
            List of (start, end) FrameTimecode tuples, or None if the cache file
            does not exist

        Raises:
            SceneCutterError: If the cache file exists but is corrupted or unreadable
        """
        cache_file = self.get_cache_path(method)
        try:
            # Explicit encoding keeps the cache readable regardless of locale.
            with open(cache_file, "r", newline="", encoding="utf-8") as f:
                scenes = self._read_csv(f, framerate)
        except FileNotFoundError:
            # A missing cache is the normal "not cached yet" case, not an error.
            return None
        except Exception as e:
            raise SceneCutterError(f"Could not load cache from {cache_file}: {e}") from e
        print(f"[INFO] Loaded {len(scenes)} scenes from cache: {cache_file}")
        return scenes

    def save(self, scenes: List[Tuple[FrameTimecode, FrameTimecode]], method: str):
        """
        Save detected scenes to cache for future reuse.

        Args:
            scenes: List of (start, end) FrameTimecode tuples
            method: Detection method name (used in filename)

        Raises:
            SceneCutterError: If unable to write cache file
        """
        cache_file = self.get_cache_path(method)
        try:
            with open(cache_file, "w", newline="", encoding="utf-8") as f:
                writer = csv.writer(f)
                writer.writerow(["Scene Number", "Start Time", "End Time", "Start Frame", "End Frame"])
                # Scene numbers in the file are 1-based.
                for number, (start, end) in enumerate(scenes, start=1):
                    writer.writerow(
                        [
                            number,
                            start.get_timecode(),
                            end.get_timecode(),
                            start.get_frames(),
                            end.get_frames(),
                        ]
                    )
        except Exception as e:
            raise SceneCutterError(f"Could not save cache to {cache_file}: {e}") from e
        print(f"[INFO] Scenes saved to cache: {cache_file}")

    @staticmethod
    def _read_csv(file_handle, framerate: float) -> List[Tuple[FrameTimecode, FrameTimecode]]:
        """
        Internal helper to read scene data from CSV file.

        Args:
            file_handle: Open file object (or any iterable of CSV lines) to read from
            framerate: Video framerate for FrameTimecode reconstruction

        Returns:
            List of (start, end) FrameTimecode tuples; empty if the file has no data rows

        Raises:
            SceneCutterError: If CSV format is invalid
        """
        scenes = []
        try:
            for row in csv.DictReader(file_handle):
                try:
                    start = FrameTimecode(row["Start Time"], fps=framerate)
                    end = FrameTimecode(row["End Time"], fps=framerate)
                except (KeyError, ValueError) as e:
                    # Missing column or unparsable timecode: the file is not a valid cache.
                    raise SceneCutterError(f"Invalid cache format: {e}") from e
                scenes.append((start, end))
        except SceneCutterError:
            raise
        except Exception as e:
            raise SceneCutterError(f"Error reading cache CSV: {e}") from e
        return scenes
| # ============================================================================ | |
| # SCENE DETECTION LAYER | |
| # ============================================================================ | |
class SceneDetector:
    """
    Factory for creating and configuring PySceneDetect detector instances.

    Centralizes detector instantiation, default thresholds and parameter
    validation. Supported methods:
    - 'content': builds a ContentDetector
    - 'adaptive': builds an AdaptiveDetector
    """

    # Default sensitivity thresholds for each detection method
    DEFAULT_THRESHOLDS = {
        "content": 27.0,  # Higher = less sensitive (fewer false positives)
        "adaptive": 3.0,  # Lower = more sensitive
    }

    @staticmethod
    def _resolve_threshold(method: str, threshold: Optional[float]) -> float:
        """Return the explicit threshold, or the method's default when it is None."""
        # Test against None explicitly: an explicit 0.0 must reach validation
        # instead of being silently replaced by the default.
        if threshold is None:
            return SceneDetector.DEFAULT_THRESHOLDS[method]
        return threshold

    @staticmethod
    def build_detector(
        method: str,
        threshold: Optional[float],
        min_scene_len_frames: Optional[int],
    ):
        """
        Create a configured detector instance based on method and parameters.

        Args:
            method: Detection method ('content' or 'adaptive')
            threshold: Sensitivity threshold. If None, uses method default.
            min_scene_len_frames: Minimum scene length in frames, or None for library default

        Returns:
            Configured ContentDetector or AdaptiveDetector instance

        Raises:
            SceneCutterInputError: If method is not recognized or parameters are invalid
            SceneCutterDetectionError: If the detector constructor itself fails
        """
        if method not in SceneDetector.DEFAULT_THRESHOLDS:
            raise SceneCutterInputError(
                f"Unknown detection method: {method}. Must be one of: "
                f"{', '.join(SceneDetector.DEFAULT_THRESHOLDS.keys())}"
            )

        threshold = SceneDetector._resolve_threshold(method, threshold)
        if threshold <= 0:
            raise SceneCutterInputError(f"Threshold must be positive, got: {threshold}")

        kwargs = {}
        if min_scene_len_frames is not None:
            if min_scene_len_frames <= 0:
                raise SceneCutterInputError(
                    f"Minimum scene length must be positive, got: {min_scene_len_frames} frames"
                )
            kwargs["min_scene_len"] = min_scene_len_frames

        try:
            if method == "content":
                return ContentDetector(threshold=threshold, **kwargs)
            # The only other accepted method is 'adaptive'; its threshold
            # keyword has a different name.
            return AdaptiveDetector(adaptive_threshold=threshold, **kwargs)
        except Exception as e:
            raise SceneCutterDetectionError(f"Failed to create detector: {e}") from e

    @staticmethod
    def parse_min_length(min_len_input: Optional[str], framerate: float) -> Optional[int]:
        """
        Parse user-provided minimum scene length to frame count.

        Args:
            min_len_input: Timecode string (e.g., '15', '0.6s', '00:00:01')
            framerate: Video framerate for conversion

        Returns:
            Frame count, or None if the input was None or empty

        Raises:
            SceneCutterInputError: If timecode string is malformed
        """
        if not min_len_input:
            return None
        try:
            return FrameTimecode(min_len_input, fps=framerate).get_frames()
        except ValueError as e:
            raise SceneCutterInputError(f"Invalid timecode format '{min_len_input}': {e}") from e
| # ============================================================================ | |
| # SCENE SELECTION LAYER | |
| # ============================================================================ | |
class SceneSelector:
    """
    Handles parsing and filtering of user-provided scene selections.

    Supports flexible selection syntax (scene numbers are 1-based):
    - Empty string or None: all scenes
    - Single indices: "1,3,5"
    - Ranges: "1-3,5-7"
    - Mixed: "1,3-5,7"

    Supports two extraction modes:
    - 'selective': Extract selected scenes as-is
    - 'sequence': Extract from each selected scene's start to the next
      selected scene's start (the last one runs to the end of the last scene)
    """

    @staticmethod
    def parse_selection(selection_str: Optional[str], total_scenes: int) -> List[int]:
        """
        Parse user selection string into 0-indexed scene indices.

        Args:
            selection_str: Selection string (e.g., "1,3-5,7"); empty/None selects everything
            total_scenes: Total number of detected scenes

        Returns:
            Sorted list of unique 0-indexed scene indices

        Raises:
            SceneCutterInputError: If selection syntax is invalid, an index is out
                of range, or nothing ends up selected
        """
        if not selection_str:
            return list(range(total_scenes))
        if total_scenes == 0:
            raise SceneCutterInputError("No scenes available to select from")

        selected_indices = set()
        for part in selection_str.split(","):
            part = part.strip()
            if not part:
                # Tolerate stray commas such as "1,,3" or a trailing comma.
                continue
            if "-" in part:
                SceneSelector._parse_range(part, total_scenes, selected_indices)
            else:
                SceneSelector._parse_single(part, total_scenes, selected_indices)

        if not selected_indices:
            raise SceneCutterInputError(f"No valid scenes selected from: {selection_str}")
        return sorted(selected_indices)

    @staticmethod
    def _parse_range(range_str: str, total_scenes: int, selected_indices: set):
        """
        Parse a range specification (e.g., "3-5") and add to selected indices.

        The end of the range is clamped to ``total_scenes``; a range that starts
        beyond the last scene is rejected, matching how single indices are handled.

        Args:
            range_str: Range string in format "start-end" (1-indexed, inclusive)
            total_scenes: Total number of available scenes (for bounds checking)
            selected_indices: Set to accumulate results (modified in-place)

        Raises:
            SceneCutterInputError: If range format is invalid or out of range
        """
        parts = range_str.split("-")
        if len(parts) != 2:
            raise SceneCutterInputError(f"Invalid range format: {range_str}")
        try:
            start, end = int(parts[0]), int(parts[1])
        except ValueError as e:
            raise SceneCutterInputError(f"Invalid range format: {range_str}") from e

        if start < 1 or end < 1:
            raise SceneCutterInputError(f"Range indices must be >= 1, got: {range_str}")
        if start > end:
            raise SceneCutterInputError(f"Invalid range {start}-{end}: start > end")
        if start > total_scenes:
            raise SceneCutterInputError(
                f"Scene range {start}-{end} out of range (total: {total_scenes})"
            )

        # User input is 1-indexed and inclusive; convert to 0-indexed.
        selected_indices.update(range(start - 1, min(end, total_scenes)))

    @staticmethod
    def _parse_single(idx_str: str, total_scenes: int, selected_indices: set):
        """
        Parse a single scene index and add to selected indices.

        Args:
            idx_str: Scene index as string (1-indexed in user input)
            total_scenes: Total number of available scenes (for bounds checking)
            selected_indices: Set to accumulate results (modified in-place)

        Raises:
            SceneCutterInputError: If index format is invalid or out of range
        """
        try:
            idx = int(idx_str)
        except ValueError as e:
            raise SceneCutterInputError(f"Invalid scene index: {idx_str}") from e
        if idx < 1:
            raise SceneCutterInputError(f"Scene index must be >= 1, got: {idx}")
        if idx > total_scenes:
            raise SceneCutterInputError(f"Scene index {idx} out of range (total: {total_scenes})")
        selected_indices.add(idx - 1)  # Convert from 1-indexed to 0-indexed

    @staticmethod
    def filter_scenes(
        scenes: List[Tuple[FrameTimecode, FrameTimecode]],
        indices: List[int],
        scene_mode: str,
    ) -> List[Tuple[FrameTimecode, FrameTimecode]]:
        """
        Filter and arrange scenes based on extraction mode.

        Args:
            scenes: Full list of detected scenes (each is a (start, end) tuple)
            indices: List of selected 0-indexed scene indices
            scene_mode: Extraction mode ('selective' or 'sequence')

        Returns:
            Filtered/rearranged list of scene tuples ready for export

        Raises:
            SceneCutterInputError: If scene_mode is invalid, no indices are given,
                or an index does not refer to an existing scene
            SceneCutterProcessingError: If ``scenes`` is empty

        Mode details:
            - 'selective': Returns scenes at the specified indices, in the given order
            - 'sequence': Indices are de-duplicated and sorted; each clip runs from
              a selected scene's start to the next selected scene's start, and the
              final clip ends at the end of the last scene in ``scenes``
        """
        if scene_mode not in ("selective", "sequence"):
            raise SceneCutterInputError(f"Invalid scene_mode: {scene_mode}")
        if not scenes:
            raise SceneCutterProcessingError("No scenes available to filter")
        if not indices:
            raise SceneCutterInputError("No scene indices provided")

        invalid = [i for i in indices if not 0 <= i < len(scenes)]
        if invalid:
            raise SceneCutterInputError(
                f"Scene indices out of range (total: {len(scenes)}): {invalid}"
            )

        if scene_mode == "selective":
            return [scenes[i] for i in indices]

        # Sequence mode: sort first so that every clip has start <= end even
        # when the caller passes indices out of order.
        ordered = sorted(set(indices))
        video_end = scenes[-1][1]
        clip_ends = [scenes[i][0] for i in ordered[1:]] + [video_end]
        return [(scenes[i][0], end) for i, end in zip(ordered, clip_ends)]
| # ============================================================================ | |
| # FFMPEG CONFIGURATION LAYER | |
| # ============================================================================ | |
class FFmpegConfig:
    """
    Encapsulates FFmpeg encoding parameters and codec selection.

    This class centralizes logic for:
    - Container-aware codec selection (WebM vs everything else)
    - Mode-specific encoding presets (copy, prototype, normal)
    - Optional video scaling

    Attributes:
        extension: Lower-cased file extension (determines container: .webm, .mp4, .mkv, etc.)
        mode: Encoding mode affecting quality/speed tradeoff
        scale: Optional scaling specification (e.g., "1280:720", "1280:-1", "50%")
    """

    VALID_MODES = {"copy", "prototype", "normal"}

    def __init__(self, extension: str, mode: str = "normal", scale: Optional[str] = None):
        """
        Initialize FFmpeg configuration.

        Args:
            extension: File extension (e.g., ".webm", ".mp4")
            mode: Encoding mode ('copy', 'prototype', 'normal')
            scale: Optional scaling (e.g., "1280:-2" to maintain aspect ratio)

        Raises:
            SceneCutterInputError: If mode is invalid
        """
        if mode not in self.VALID_MODES:
            # Sorted so the message is stable (set iteration order is not).
            raise SceneCutterInputError(
                f"Invalid encoding mode: {mode}. Must be one of: "
                f"{', '.join(sorted(self.VALID_MODES))}"
            )
        self.extension = extension.lower()
        self.mode = mode
        self.scale = scale

    def get_codecs(self) -> Tuple[str, str]:
        """
        Determine appropriate video and audio codecs based on container.

        Returns:
            Tuple of (video_codec, audio_codec): VP9/Opus for ``.webm``,
            H.264/AAC for every other extension
        """
        if self.extension == ".webm":
            return "libvpx-vp9", "libopus"
        return "libx264", "aac"

    def build_args(self) -> List[str]:
        """
        Build complete FFmpeg argument list for video processing.

        Combines:
        1. Common arguments (stream mapping, metadata, subtitle handling)
        2. Mode-specific encoding parameters
        3. Optional scaling filter (ignored in 'copy' mode)

        Returns:
            List of FFmpeg command-line arguments

        Raises:
            SceneCutterInputError: If a percentage scale is not a positive number
        """
        # -map 0: Include all streams (video, audio, subtitles)
        # -map_metadata 0: Preserve metadata from input
        # -c:s copy: Copy subtitle streams without re-encoding
        args = ["-map", "0", "-map_metadata", "0", "-c:s", "copy"]

        if self.mode == "copy":
            # Stream copy cannot apply filters, so any scale is ignored here.
            return args + ["-c:v", "copy", "-c:a", "copy"]

        v_codec, a_codec = self.get_codecs()
        args += ["-c:v", v_codec]
        if self.mode == "prototype":
            # Fast preview: speed-oriented encoder settings + high CRF (lower quality)
            args += self._speed_args(v_codec) + ["-crf", "32"]
        else:  # 'normal'
            args += ["-crf", "22"]
        args += ["-c:a", a_codec]

        scale_spec = self.scale.strip() if self.scale else ""
        if scale_spec:
            args += ["-vf", f"scale={self._normalize_scale(scale_spec)}"]
        return args

    @staticmethod
    def _speed_args(v_codec: str) -> List[str]:
        """Return the encoder-specific arguments that trade quality for speed."""
        if v_codec == "libvpx-vp9":
            # -preset is an x264 option; libvpx-vp9 is tuned for speed through
            # -deadline / -cpu-used instead.
            return ["-deadline", "realtime", "-cpu-used", "8"]
        return ["-preset", "ultrafast"]

    @staticmethod
    def _normalize_scale(scale: str) -> str:
        """
        Turn a user scale specification into a value for FFmpeg's scale filter.

        - "N%" becomes a width/height expression scaled by N/100 and truncated
          to even dimensions.
        - A dimension that is exactly "-1" (optionally written as "key=-1")
          becomes "-2" so the computed size is divisible by 2.
        - Anything else (e.g. "-16", "iw-1", "1280x720") is passed through untouched.
        """
        spec = scale.strip()
        if spec.endswith("%"):
            try:
                percent = float(spec[:-1])
            except ValueError as e:
                raise SceneCutterInputError(f"Invalid scale percentage: {scale}") from e
            # The chained comparison also rejects NaN and infinity.
            if not 0 < percent < float("inf"):
                raise SceneCutterInputError(f"Scale percentage must be positive, got: {scale}")
            factor = f"{percent / 100:g}"
            return f"trunc(iw*{factor}/2)*2:trunc(ih*{factor}/2)*2"

        def fix_dimension(part: str) -> str:
            # Only rewrite a value that is exactly -1; a blind substring replace
            # would corrupt values such as -16 or expressions such as iw-1.
            key, sep, value = part.rpartition("=")
            return f"{key}{sep}-2" if value.strip() == "-1" else part

        return ":".join(fix_dimension(part) for part in spec.split(":"))
| # ============================================================================ | |
| # VIDEO PROCESSING LAYER | |
| # ============================================================================ | |
class VideoProcessor:
    """
    Handles actual video splitting and export via FFmpeg.

    This class manages the interface to PySceneDetect's split_video_ffmpeg function,
    translating an FFmpegConfig into the argument string that function expects.

    Attributes:
        input_video: Path to source video file
        output_dir: Directory for output clips
        extension: File extension (determines codec/container)
    """

    def __init__(self, input_video: str, output_dir: str, extension: str):
        """
        Initialize video processor.

        Args:
            input_video: Path to input video
            output_dir: Directory for output clips
            extension: Original file extension (.mp4, .webm, etc.)
        """
        self.input_video = input_video
        self.output_dir = output_dir
        self.extension = extension

    def split_video(
        self,
        scenes: List[Tuple[FrameTimecode, FrameTimecode]],
        file_base: str,
        mode: str,
        scale: Optional[str] = None,
    ):
        """
        Split video at scene boundaries and export clips.

        Args:
            scenes: List of (start, end) FrameTimecode tuples to export
            file_base: Base name for output files (scenes will be numbered)
            mode: Encoding mode ('copy', 'prototype', 'normal')
            scale: Optional scaling (e.g., "1280:720")

        The output files are named from the template
        ``$VIDEO_NAME-Clip-$SCENE_NUMBER{extension}`` with ``$VIDEO_NAME`` set to
        ``file_base``.

        Raises:
            SceneCutterInputError: If the encoding mode is invalid
            SceneCutterProcessingError: If video splitting fails or FFmpeg
                reports a non-zero exit status
        """
        # Built outside the try block so a configuration error keeps its own
        # type instead of being re-labelled as a processing failure.
        ffmpeg_config = FFmpegConfig(self.extension, mode, scale)

        output_template = os.path.join(
            self.output_dir, f"$VIDEO_NAME-Clip-$SCENE_NUMBER{self.extension}"
        )
        try:
            # split_video_ffmpeg takes the override as one space-separated string.
            arg_override = " ".join(ffmpeg_config.build_args())
            return_code = split_video_ffmpeg(
                self.input_video,
                scenes,
                output_file_template=output_template,
                # Falls back to the library's own default name when file_base is empty.
                video_name=file_base or None,
                show_progress=True,
                arg_override=arg_override,
            )
        except SceneCutterError:
            raise
        except Exception as e:
            raise SceneCutterProcessingError(f"Failed to split video: {e}") from e

        # A falsy value (0, or None from versions that return nothing) is success.
        if return_code:
            raise SceneCutterProcessingError(
                f"Failed to split video: FFmpeg exited with code {return_code}"
            )
| # ============================================================================ | |
| # MAIN ORCHESTRATOR | |
| # ============================================================================ | |
class SceneCutterWrapper:
    """
    Main orchestrator combining all components for scene detection and cutting.

    This class provides the high-level API, coordinating:
    1. Scene detection (with caching)
    2. Video processing configuration
    3. Scene selection and filtering

    Attributes:
        input_video: Path to input video file
        output_dir: Directory for output files
        filename: Base name of the input file (with extension)
        file_base: Filename without extension
        extension: File extension
        cache: SceneCache instance for detection result persistence
        processor: VideoProcessor instance for splitting
    """

    def __init__(self, input_video: str, output_dir: str):
        """
        Initialize the scene cutter.

        Args:
            input_video: Path to source video file
            output_dir: Directory where output clips and caches will be stored.
                Created if it doesn't exist.

        Raises:
            SceneCutterInputError: If input video doesn't exist or the output
                directory cannot be created
        """
        if not os.path.isfile(input_video):
            raise SceneCutterInputError(f"Input video file not found: {input_video}")

        self.input_video = input_video
        self.output_dir = output_dir

        # Split filename into base and extension for cache/output naming
        self.filename = os.path.basename(input_video)
        self.file_base, self.extension = os.path.splitext(self.filename)

        try:
            # exist_ok avoids the check-then-create race and also reports an
            # output path that exists but is not a directory.
            os.makedirs(self.output_dir, exist_ok=True)
        except (OSError, TypeError, ValueError) as e:
            raise SceneCutterInputError(
                f"Cannot create output directory {self.output_dir}: {e}"
            ) from e

        self.cache = SceneCache(output_dir, self.file_base)
        self.processor = VideoProcessor(input_video, output_dir, self.extension)

    @staticmethod
    def _cache_key(method: str, threshold: Optional[float], min_frames: Optional[int]) -> str:
        """Build the cache identifier for one combination of detection settings."""
        # Default settings keep the plain method name so caches written by
        # earlier default runs are still found.
        if threshold is None and min_frames is None:
            return method
        threshold_part = "default" if threshold is None else f"{threshold:g}"
        min_len_part = "default" if min_frames is None else str(min_frames)
        return f"{method}_t{threshold_part}_m{min_len_part}"

    def detect_scenes(
        self,
        method: str = "content",
        threshold: Optional[float] = None,
        min_len_input: Optional[str] = None,
    ) -> List[Tuple[FrameTimecode, FrameTimecode]]:
        """
        Detect scenes in the video with caching support.

        Algorithm:
        1. Get video framerate
        2. Validate the parameters and build the detector
        3. Return cached results for exactly these settings, if any
        4. Otherwise, run detection and cache results

        The cache is keyed by method, threshold and minimum scene length, so
        changing a setting never returns results detected with another one.

        Args:
            method: Detection method ('content' or 'adaptive')
            threshold: Sensitivity threshold (None = use method default)
            min_len_input: Minimum scene length (timecode string like "15" or "0.5s")

        Returns:
            List of detected scenes as (start, end) FrameTimecode tuples

        Raises:
            SceneCutterDetectionError: If detection fails or finds no scenes
            SceneCutterInputError: If parameters are invalid
        """
        if threshold is not None and threshold <= 0:
            raise SceneCutterInputError(f"Threshold must be positive, got: {threshold}")

        try:
            # NOTE(review): VideoManager appears to be deprecated in recent
            # PySceneDetect releases - confirm before upgrading the dependency.
            video = VideoManager([self.input_video])
            try:
                framerate = video.get_framerate()
            finally:
                # Release the video even when reading the framerate fails.
                video.release()
        except Exception as e:
            raise SceneCutterDetectionError(f"Failed to read video file: {e}") from e

        try:
            min_frames = SceneDetector.parse_min_length(min_len_input, framerate)
            detector = SceneDetector.build_detector(method, threshold, min_frames)
        except SceneCutterError:
            raise
        except Exception as e:
            raise SceneCutterDetectionError(f"Scene detection failed: {e}") from e

        cache_key = self._cache_key(method, threshold, min_frames)
        try:
            cached = self.cache.load(cache_key, framerate)
            if cached:
                return cached
        except SceneCutterError as e:
            # A broken cache is not fatal: fall through and detect again.
            print(f"[WARN] Cache loading failed: {e}", file=sys.stderr)

        threshold_display = (
            threshold
            if threshold is not None
            else SceneDetector.DEFAULT_THRESHOLDS.get(method, 27.0)
        )
        min_len_display = f"{min_frames} frames" if min_frames is not None else "library default"
        print(
            f"[INFO] Detecting scenes using {method} "
            f"(threshold: {threshold_display}, min_len: {min_len_display})..."
        )

        try:
            scenes = detect(self.input_video, detector)
        except Exception as e:
            raise SceneCutterDetectionError(f"Scene detection failed: {e}") from e
        if not scenes:
            raise SceneCutterDetectionError("No scenes detected in video")

        try:
            self.cache.save(scenes, cache_key)
        except SceneCutterError as e:
            # Detection succeeded; failing to persist it only costs time later.
            print(f"[WARN] Failed to save cache: {e}", file=sys.stderr)
        return scenes

    def process_video(
        self,
        scenes: List[Tuple[FrameTimecode, FrameTimecode]],
        selection: Optional[str] = None,
        mode: str = "normal",
        scale: Optional[str] = None,
        scene_mode: str = "selective",
        dry_run: bool = False,
    ):
        """
        Process detected scenes and export video clips.

        Args:
            scenes: List of detected scenes (from detect_scenes)
            selection: Scene selection string (e.g., "1,3-5") or None for all
            mode: Encoding mode ('copy', 'prototype', 'normal')
            scale: Optional scaling specification (e.g., "1280:720")
            scene_mode: Extraction mode ('selective' or 'sequence')
            dry_run: If True, print the clips that would be exported and return

        Raises:
            SceneCutterInputError: If selection or parameters are invalid
            SceneCutterProcessingError: If video splitting fails

        Mode descriptions:
            - 'copy': Stream copy (fastest, no re-encoding)
            - 'prototype': Fast preview (low quality, fast)
            - 'normal': Balanced quality/speed

        Scene mode descriptions:
            - 'selective': Export scenes at selected indices
            - 'sequence': Export from each selection to the next
        """
        if not scenes:
            raise SceneCutterInputError("No scenes provided to process")

        try:
            indices = SceneSelector.parse_selection(selection, len(scenes))
            final_scenes = SceneSelector.filter_scenes(scenes, indices, scene_mode)
            print(
                f"[INFO] Processing {len(final_scenes)} clips "
                f"(Mode: {mode}, Container: {self.extension})..."
            )

            if dry_run:
                for number, (start, end) in enumerate(final_scenes, start=1):
                    print(f"  Clip {number}: {start.get_timecode()} -> {end.get_timecode()}")
                return

            self.processor.split_video(final_scenes, self.file_base, mode, scale)
            print("[INFO] Video processing completed successfully")
        except SceneCutterError:
            raise
        except Exception as e:
            raise SceneCutterProcessingError(f"Video processing failed: {e}") from e
| # ============================================================================ | |
| # CLI INTERFACE | |
| # ============================================================================ | |
def _build_arg_parser() -> argparse.ArgumentParser:
    """Assemble the argparse parser that defines the command-line options."""
    cli = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        description="A utility to detect and extract video scenes from MP4, MKV, and WebM.",
    )

    # Positional argument
    cli.add_argument("input", help="Path to the source video file.")

    # Options controlling how scenes are detected
    detection = cli.add_argument_group("Detection Settings")
    detection.add_argument(
        "-M", "--method",
        choices=["content", "adaptive"],
        default="content",
        help="Scene detection algorithm. Content: gradual changes; Adaptive: fast transitions.",
    )
    detection.add_argument(
        "-t", "--threshold",
        type=float,
        default=None,
        help="Detection sensitivity. Defaults: Adaptive=3.0, Content=27.0. Higher=fewer detections.",
    )
    detection.add_argument(
        "-m", "--min-scene-len",
        default=None,
        help="Minimum scene length. Examples: 15 for frames, 0.6s for seconds.",
    )

    # Options controlling what is exported and how
    output = cli.add_argument_group("Processing and Output Settings")
    output.add_argument("-o", "--output", default="output", help="Directory for output clips.")
    output.add_argument(
        "-s", "--select",
        type=str,
        metavar="RANGE",
        help="Scenes to export. Examples: 1, 1,3,5, 1-3, 1,3-5.",
    )
    output.add_argument(
        "-e", "--mode",
        choices=["normal", "prototype", "copy"],
        default="normal",
        help="Encoding mode. copy=fastest, prototype=preview, normal=balanced.",
    )
    output.add_argument(
        "-S", "--scene-mode",
        choices=["selective", "sequence"],
        default="selective",
        help="selective=export selected scenes, sequence=export N to N+1.",
    )
    output.add_argument(
        "-z", "--scale",
        type=str,
        metavar="W:H",
        help="Video scaling. Examples: 1280x720, 50%%. Use -2 for auto aspect ratio.",
    )
    output.add_argument(
        "--no-split",
        action="store_true",
        help="Detect and cache only, skip splitting.",
    )
    output.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be exported without actually exporting.",
    )
    return cli


def main(argv: Optional[List[str]] = None) -> int:
    """
    Run the scene detection / cutting workflow from command-line arguments.

    Args:
        argv: Argument list to parse; None lets argparse read sys.argv[1:]

    Returns:
        Process exit code: 0 on success, 1 for input or other SceneCutter
        errors and unexpected exceptions, 2 for detection errors, 3 for
        processing errors, 130 when interrupted. If argparse exits (usage
        error or --help), its integer exit code is returned.
    """
    cli = _build_arg_parser()
    try:
        options = cli.parse_args(argv)
    except SystemExit as exit_request:
        # argparse terminates through sys.exit(); turn that into a return value.
        status = exit_request.code
        return status if isinstance(status, int) else 1

    try:
        cutter = SceneCutterWrapper(options.input, options.output)
        print(f"[INFO] Initialized for video: {options.input}")

        detected = cutter.detect_scenes(
            method=options.method,
            threshold=options.threshold,
            min_len_input=options.min_scene_len,
        )
        print(f"[INFO] Detected {len(detected)} scenes")

        # --no-split stops after detection (and caching).
        if options.no_split:
            return 0

        cutter.process_video(
            detected,
            selection=options.select,
            mode=options.mode,
            scale=options.scale,
            scene_mode=options.scene_mode,
            dry_run=options.dry_run,
        )
        return 0
    except SceneCutterError as failure:
        # Most specific category first; anything else in the hierarchy is a generic error.
        if isinstance(failure, SceneCutterInputError):
            label, status = "Input error: ", 1
        elif isinstance(failure, SceneCutterDetectionError):
            label, status = "Detection error: ", 2
        elif isinstance(failure, SceneCutterProcessingError):
            label, status = "Processing error: ", 3
        else:
            label, status = "", 1
        print(f"[ERROR] {label}{failure}", file=sys.stderr)
        return status
    except KeyboardInterrupt:
        print("\n[WARN] Interrupted by user", file=sys.stderr)
        return 130
    except Exception as failure:
        print(f"[ERROR] Unexpected error: {failure}", file=sys.stderr)
        import traceback

        traceback.print_exc(file=sys.stderr)
        return 1
# Script entry point: run the CLI and use its return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
My machine is locked at: