Created
May 22, 2025 03:57
-
-
Save twobob/2f6766e66555cf883ad47fb4baa97d12 to your computer and use it in GitHub Desktop.
Tries to make a perfect loop out of a video
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import os | |
import cv2 | |
import numpy as np | |
from skimage.metrics import structural_similarity as ssim | |
import threading | |
import time | |
import multiprocessing | |
import math | |
import queue | |
from moviepy import VideoFileClip | |
import urllib.request # For downloading files from URLs. | |
import urllib.error # For handling URL-related errors. | |
from PyQt6.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, | |
QPushButton, QLabel, QLineEdit, QFileDialog, QListWidget, | |
QProgressBar, QDoubleSpinBox, QSpinBox, QMessageBox, QHeaderView, | |
QComboBox, QCheckBox) | |
from PyQt6.QtCore import Qt, QThread, pyqtSignal, QTimer | |
from PyQt6.QtGui import QIcon, QCloseEvent, QImage, QPixmap | |
# --- Application Constants --- | |
DEFAULT_SIMILARITY_THRESHOLD = 88 # Default percentage for frame similarity. | |
DEFAULT_IGNORE_FRAMES = 0 # Default number of initial frames to ignore during analysis. | |
DEFAULT_MIN_LOOP_DURATION_SEC = 3.0 # Default minimum loop duration in seconds. | |
DEFAULT_MAX_SEARCH_WINDOW_SEC = 60.0 # Default maximum time window (in seconds) from a start frame to search for a matching end frame. | |
DEFAULT_COMPARE_WIDTH = 640 # Default width (in pixels) to which frames are resized for comparison. | |
DEFAULT_PREVIEW_LOOP_COUNT = 3 # Default number of times a selected loop will repeat in the preview. | |
DEFAULT_GPU_BATCH_SIZE = 64 # Default batch size for GPU-accelerated processing. | |
PRESET_FRACTIONS = { # Predefined fractions for setting comparison width relative to original video width. | |
"Full (1/1)": 1, | |
"Half (1/2)": 2, | |
"Quarter (1/4)": 4, | |
"Eighth (1/8)": 8, | |
} | |
CUSTOM_PRESET_TEXT = "Custom" # Text identifier for custom comparison width selection in the UI. | |
# --- Multiprocessing Worker Function --- | |
def process_frame_chunk_worker(args_bundle): | |
""" | |
Worker function designed for use with a multiprocessing.Pool. | |
It processes a specific chunk of video frames to find potential loops based on structural similarity. | |
@param args_bundle: A tuple containing all necessary arguments for processing: | |
- video_path (str): Filesystem path to the video file. | |
- i_start (int): The starting frame index for the outer loop of comparisons within this chunk. | |
- i_end (int): The ending frame index (exclusive) for the outer loop of comparisons. | |
- total_frames_static (int): Total number of frames in the entire video. | |
- fps_static (float): Frames per second of the video. | |
- min_loop_frames_static (int): Minimum duration of a valid loop, expressed in frames. | |
- max_search_window_frames_static (int): Maximum number of subsequent frames to search for a match. | |
- compare_width_static (int): Target width for resizing frames before comparison. | |
- compare_height_static (int): Target height for resizing frames before comparison. | |
- similarity_thresh_normalized_static (float): Normalized SSIM threshold (0.0-1.0) for considering frames similar. | |
- progress_queue (multiprocessing.Queue, optional): A queue to report progress (number of i-frames processed). | |
- stop_event (multiprocessing.Event, optional): An event to signal premature termination of processing. | |
@return: A list of dictionaries, where each dictionary contains information about a found loop. | |
Returns an empty list if no loops are found or an error occurs. | |
""" | |
video_path, i_start, i_end, total_frames_static, fps_static, \ | |
min_loop_frames_static, max_search_window_frames_static, \ | |
compare_width_static, compare_height_static, \ | |
similarity_thresh_normalized_static, progress_queue, stop_event = args_bundle | |
cap_worker = None | |
local_found_loops = [] | |
try: | |
cap_worker = cv2.VideoCapture(video_path) | |
if not cap_worker.isOpened(): | |
# Log error or handle as appropriate if video cannot be opened by worker | |
return [] | |
for i in range(i_start, i_end): | |
if stop_event and stop_event.is_set(): | |
break | |
cap_worker.set(cv2.CAP_PROP_POS_FRAMES, i) | |
ret_i, frame_i_full = cap_worker.read() | |
if not ret_i: continue | |
frame_i_resized = cv2.resize(frame_i_full, (compare_width_static, compare_height_static)) | |
frame_i_gray = cv2.cvtColor(frame_i_resized, cv2.COLOR_BGR2GRAY) | |
search_end_limit = min(i + max_search_window_frames_static + 1, total_frames_static) | |
for j in range(i + min_loop_frames_static + 1, search_end_limit): | |
if stop_event and stop_event.is_set(): | |
break | |
cap_worker.set(cv2.CAP_PROP_POS_FRAMES, j) | |
ret_j, frame_j_full = cap_worker.read() | |
if not ret_j: continue | |
frame_j_resized = cv2.resize(frame_j_full, (compare_width_static, compare_height_static)) | |
frame_j_gray = cv2.cvtColor(frame_j_resized, cv2.COLOR_BGR2GRAY) | |
similarity = ssim(frame_i_gray, frame_j_gray, data_range=frame_i_gray.max() - frame_i_gray.min()) | |
if similarity >= similarity_thresh_normalized_static: | |
loop_start_frame = i | |
loop_end_frame = j - 1 | |
duration_frames = loop_end_frame - loop_start_frame + 1 | |
duration_sec = duration_frames / fps_static | |
loop_info = { | |
"start_frame": loop_start_frame, "end_frame": loop_end_frame, | |
"start_time": loop_start_frame / fps_static, "end_time": (loop_end_frame + 1) / fps_static, | |
"duration_sec": duration_sec, "similarity": similarity | |
} | |
local_found_loops.append(loop_info) | |
if progress_queue: | |
progress_queue.put(1) # Report one 'i' iteration completed | |
return local_found_loops | |
except Exception: | |
# Consider logging the specific exception for debugging worker issues | |
return [] | |
finally: | |
if cap_worker: | |
cap_worker.release() | |
# --- Loop Finding Worker Thread --- | |
class LoopFinderThread(QThread): | |
""" | |
A QThread subclass responsible for the core video analysis to find loops. | |
It can operate in different processing modes (CPU single/multi-core, GPU). | |
Emits signals to update the UI with progress, results, status messages, and errors. | |
""" | |
progress_updated = pyqtSignal(int) # Emits current processing progress (percentage, 0-100). | |
status_updated = pyqtSignal(str) # Emits textual status updates for the UI. | |
results_found = pyqtSignal(list) # Emits a list of dictionaries, each representing a found loop. | |
error_occurred = pyqtSignal(str) # Emits error messages encountered during processing. | |
def __init__(self, video_path, similarity_thresh, ignore_frames, | |
min_loop_duration_sec, max_search_window_sec, compare_width, | |
processing_mode, gpu_batch_size): | |
""" | |
Initializes the LoopFinderThread with all necessary parameters for loop detection. | |
@param video_path (str): Filesystem path to the video file. | |
@param similarity_thresh (float): The SSIM similarity threshold (0-100) for frames to be considered a match. | |
@param ignore_frames (int): The number of initial frames in the video to skip during analysis. | |
@param min_loop_duration_sec (float): The minimum duration (in seconds) for a valid loop. | |
@param max_search_window_sec (float): The maximum time window (in seconds) from a starting frame | |
to search for a matching end frame. | |
@param compare_width (int): The width (in pixels) to which frames will be resized before comparison. | |
Height is calculated based on aspect ratio. | |
@param processing_mode (str): The selected processing mode ("CPU (Single Core)", "CPU (Multi-core)", or "GPU"). | |
@param gpu_batch_size (int): The batch size to use if GPU processing mode is selected. | |
""" | |
super().__init__() | |
self.video_path = video_path | |
self.similarity_thresh_normalized = similarity_thresh / 100.0 # Normalize to 0.0-1.0 for SSIM | |
self.ignore_frames = ignore_frames | |
self.min_loop_duration_sec = min_loop_duration_sec | |
self.max_search_window_sec = max_search_window_sec | |
self.compare_width = compare_width | |
self.processing_mode = processing_mode | |
self.gpu_batch_size = gpu_batch_size | |
self._is_running = True # Internal flag to control the thread's execution loop. | |
self._multiprocessing_stop_event = None # Event used to signal multiprocessing workers to stop. | |
def run(self): | |
""" | |
The main entry point for the thread's execution. | |
It selects and calls the appropriate processing method based on `self.processing_mode`. | |
""" | |
self._is_running = True | |
if self.processing_mode == "CPU (Multi-core)": | |
self.run_cpu_multicore() | |
elif self.processing_mode == "GPU": | |
self.run_gpu() | |
else: # Default to single core CPU | |
self.run_cpu_singlecore() | |
def run_cpu_singlecore(self): | |
""" | |
Performs loop detection using a single CPU core. | |
This method iterates through video frames, resizes them, converts to grayscale, | |
and compares them using SSIM to find matching pairs that form loops. | |
""" | |
self.status_updated.emit("Starting video analysis (Single Core CPU)...") | |
found_loops = [] | |
cap = None | |
try: | |
cap = cv2.VideoCapture(self.video_path) | |
if not cap.isOpened(): | |
self.error_occurred.emit("Error: Could not open video file.") | |
return | |
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
fps = cap.get(cv2.CAP_PROP_FPS) | |
if fps <= 0: | |
self.error_occurred.emit("Error: Could not determine video FPS.") | |
if cap: cap.release() | |
return | |
self.status_updated.emit(f"Video: {total_frames} frames, {fps:.2f} FPS") | |
min_loop_frames = int(self.min_loop_duration_sec * fps) | |
max_search_window_frames_for_cpu = int(self.max_search_window_sec * fps) | |
outer_loop_end_frame = total_frames - min_loop_frames | |
if self.ignore_frames >= outer_loop_end_frame: | |
self.status_updated.emit(f"Error: 'Ignore First N Frames' ({self.ignore_frames}) is too large or video too short for min loop duration.") | |
self.results_found.emit([]) | |
if cap: cap.release() | |
return | |
compare_height = -1 # Will be calculated based on aspect ratio | |
cap.set(cv2.CAP_PROP_POS_FRAMES, 0) # Rewind to read the first frame for dimensions | |
ret, first_frame_full = cap.read() | |
if ret: | |
h_full, w_full, _ = first_frame_full.shape | |
if w_full == 0: | |
self.error_occurred.emit("Error: Video width is zero.") | |
if cap: cap.release() | |
return | |
aspect_ratio = h_full / w_full | |
compare_height = int(self.compare_width * aspect_ratio) | |
if compare_height <= 0: | |
self.error_occurred.emit(f"Error: Calculated comparison height ({compare_height}) is invalid.") | |
if cap: cap.release() | |
return | |
else: | |
self.error_occurred.emit("Error: Could not read first frame.") | |
if cap: cap.release() | |
return | |
self.status_updated.emit(f"Comparing frames at {self.compare_width}x{compare_height} resolution.") | |
processed_i_frames_count = 0 | |
outer_loop_total_iterations = outer_loop_end_frame - self.ignore_frames | |
if outer_loop_total_iterations <= 0: outer_loop_total_iterations = 1 # Avoid division by zero for progress | |
for i_frame_index in range(self.ignore_frames, outer_loop_end_frame): | |
if not self._is_running: | |
self.status_updated.emit("Processing cancelled.") | |
if cap: cap.release() | |
return | |
cap.set(cv2.CAP_PROP_POS_FRAMES, i_frame_index) | |
ret_i, frame_i_full = cap.read() | |
if not ret_i: | |
self.status_updated.emit(f"Warning: Could not read frame_i {i_frame_index}") | |
continue | |
frame_i_resized = cv2.resize(frame_i_full, (self.compare_width, compare_height)) | |
frame_i_gray = cv2.cvtColor(frame_i_resized, cv2.COLOR_BGR2GRAY) | |
start_j_candidate = i_frame_index + min_loop_frames + 1 | |
end_j_candidate_limit = min(i_frame_index + 1 + max_search_window_frames_for_cpu, total_frames) | |
for j_candidate_frame_index in range(start_j_candidate , end_j_candidate_limit): | |
if not self._is_running: | |
self.status_updated.emit("Processing cancelled.") | |
if cap: cap.release() | |
return | |
cap.set(cv2.CAP_PROP_POS_FRAMES, j_candidate_frame_index) | |
ret_j, frame_j_full = cap.read() | |
if not ret_j: | |
continue | |
frame_j_resized = cv2.resize(frame_j_full, (self.compare_width, compare_height)) | |
frame_j_gray = cv2.cvtColor(frame_j_resized, cv2.COLOR_BGR2GRAY) | |
similarity = ssim(frame_i_gray, frame_j_gray, data_range=frame_i_gray.max() - frame_i_gray.min()) | |
if similarity >= self.similarity_thresh_normalized: | |
loop_start_frame = i_frame_index | |
loop_end_frame = j_candidate_frame_index - 1 | |
duration_frames = loop_end_frame - loop_start_frame + 1 | |
if duration_frames < min_loop_frames: continue # Ensure minimum duration | |
duration_sec = duration_frames / fps | |
loop_info = { | |
"start_frame": loop_start_frame, "end_frame": loop_end_frame, | |
"start_time": loop_start_frame / fps, "end_time": (loop_end_frame + 1) / fps, | |
"duration_sec": duration_sec, "similarity": similarity | |
} | |
found_loops.append(loop_info) | |
processed_i_frames_count +=1 | |
progress = int((processed_i_frames_count / outer_loop_total_iterations) * 100) | |
self.progress_updated.emit(progress) | |
self.status_updated.emit(f"Analysis complete. Found {len(found_loops)} potential loops.") | |
self.results_found.emit(found_loops) | |
except Exception as e: | |
self.error_occurred.emit(f"An error occurred during CPU analysis: {str(e)}") | |
finally: | |
if cap: cap.release() | |
self._is_running = False | |
def run_cpu_multicore(self): | |
""" | |
Performs loop detection using multiple CPU cores via `multiprocessing.Pool`. | |
Video frame processing is divided into chunks and distributed among worker processes. | |
""" | |
self.status_updated.emit("Starting video analysis (Multi-core CPU)...") | |
cap_main = None # Used for initial video property reading only | |
with multiprocessing.Manager() as manager: | |
progress_queue = manager.Queue() # For workers to report progress units | |
self._multiprocessing_stop_event = manager.Event() # To signal workers to stop | |
try: | |
cap_main = cv2.VideoCapture(self.video_path) | |
if not cap_main.isOpened(): | |
self.error_occurred.emit("Error: Could not open video file for multi-core setup.") | |
return | |
total_frames = int(cap_main.get(cv2.CAP_PROP_FRAME_COUNT)) | |
fps = cap_main.get(cv2.CAP_PROP_FPS) | |
if fps <= 0: | |
self.error_occurred.emit("Error: Could not determine video FPS for multi-core setup.") | |
if cap_main: cap_main.release() | |
return | |
min_loop_frames = int(self.min_loop_duration_sec * fps) | |
max_search_window_frames_for_cpu = int(self.max_search_window_sec * fps) | |
compare_height = -1 | |
cap_main.set(cv2.CAP_PROP_POS_FRAMES, 0) | |
ret, first_frame_full = cap_main.read() | |
if ret: | |
h_full, w_full, _ = first_frame_full.shape | |
if w_full == 0: | |
self.error_occurred.emit("Error: Video width is zero for multi-core setup.") | |
if cap_main: cap_main.release() | |
return | |
aspect_ratio = h_full / w_full | |
compare_height = int(self.compare_width * aspect_ratio) | |
if compare_height <= 0: | |
self.error_occurred.emit(f"Error: Calculated comparison height ({compare_height}) is invalid for multi-core.") | |
if cap_main: cap_main.release() | |
return | |
else: | |
self.error_occurred.emit("Error: Could not read first frame for multi-core setup.") | |
if cap_main: cap_main.release() | |
return | |
if cap_main: cap_main.release() # Release after getting properties | |
num_processes = max(1, multiprocessing.cpu_count() -1) # Leave one core for system/UI responsiveness | |
outer_loop_end_frame = total_frames - min_loop_frames | |
i_loop_start = self.ignore_frames | |
if i_loop_start >= outer_loop_end_frame: | |
self.status_updated.emit("No frames to process after considering ignore frames and min loop duration.") | |
self.results_found.emit([]) | |
return | |
total_i_iterations = outer_loop_end_frame - i_loop_start | |
chunk_size = math.ceil(total_i_iterations / num_processes) | |
if chunk_size == 0 and total_i_iterations > 0 : chunk_size = 1 # Ensure chunk_size is at least 1 if there's work | |
tasks = [] # List of argument bundles for worker processes | |
for k in range(num_processes): | |
chunk_i_start = i_loop_start + k * chunk_size | |
chunk_i_end = min(i_loop_start + (k + 1) * chunk_size, outer_loop_end_frame) | |
if chunk_i_start >= chunk_i_end: continue # Skip empty chunks | |
task_args = ( | |
self.video_path, chunk_i_start, chunk_i_end, total_frames, fps, | |
min_loop_frames, max_search_window_frames_for_cpu, | |
self.compare_width, compare_height, | |
self.similarity_thresh_normalized, progress_queue, self._multiprocessing_stop_event | |
) | |
tasks.append(task_args) | |
if not tasks: | |
self.status_updated.emit("No processing tasks generated for multi-core.") | |
self.results_found.emit([]) | |
return | |
all_found_loops = [] | |
# Start a separate thread to monitor progress from the multiprocessing queue | |
progress_monitor_thread = threading.Thread(target=self._monitor_mp_progress, | |
args=(progress_queue, total_i_iterations), daemon=True) | |
progress_monitor_thread.start() | |
with multiprocessing.Pool(processes=num_processes) as pool: | |
results_from_pool = pool.map(process_frame_chunk_worker, tasks) | |
for loop_list_from_worker in results_from_pool: | |
all_found_loops.extend(loop_list_from_worker) | |
if progress_queue: | |
progress_queue.put(None) # Signal progress monitor to finish | |
progress_monitor_thread.join(timeout=2.0) # Wait for monitor thread to complete | |
if self._multiprocessing_stop_event and self._multiprocessing_stop_event.is_set(): | |
self.status_updated.emit("Processing cancelled (Multi-core).") | |
else: | |
self.status_updated.emit(f"Analysis complete (Multi-core). Found {len(all_found_loops)} potential loops.") | |
self.results_found.emit(all_found_loops) | |
except Exception as e: | |
if self._multiprocessing_stop_event and not self._multiprocessing_stop_event.is_set(): | |
self._multiprocessing_stop_event.set() # Ensure workers are signalled to stop on error | |
self.error_occurred.emit(f"Multi-core processing error: {str(e)}") | |
finally: | |
if self._multiprocessing_stop_event and not self._multiprocessing_stop_event.is_set(): | |
self._multiprocessing_stop_event.set() # Ensure event is set on any exit path | |
self._is_running = False | |
def _monitor_mp_progress(self, progress_mp_queue: multiprocessing.Queue, total_iterations: int): | |
""" | |
Monitors a multiprocessing queue for progress updates from worker processes. | |
This method runs in a separate thread and emits `progress_updated` signal. | |
@param progress_mp_queue: The `multiprocessing.Queue` instance workers put progress items (count of processed i_frames) into. | |
@param total_iterations: The total number of 'i' iterations expected across all workers. | |
""" | |
processed_count = 0 | |
while self._is_running: # Check if the main thread is still running | |
try: | |
item = progress_mp_queue.get(timeout=0.1) # Timeout allows checking _is_running periodically | |
if item is None: # Sentinel value from main thread to stop monitoring | |
break | |
processed_count += item | |
if total_iterations > 0: | |
progress = int((processed_count / total_iterations) * 100) | |
self.progress_updated.emit(progress) | |
if processed_count >= total_iterations: # All expected work reported | |
break | |
except queue.Empty: | |
# Timeout occurred, loop again to check _is_running or get next item | |
continue | |
except Exception: | |
# Handle other potential queue errors if necessary (e.g., log) | |
break | |
if total_iterations > 0: # Ensure final progress is emitted if loop broke early | |
final_progress = int((processed_count / total_iterations) * 100) | |
self.progress_updated.emit(final_progress) | |
def run_gpu(self): | |
""" | |
Performs loop detection using GPU acceleration via PyTorch and TorchMetrics. | |
Frames are processed in batches to leverage GPU parallelism for SSIM calculation. | |
Requires PyTorch with CUDA support and TorchMetrics to be installed. | |
""" | |
self.status_updated.emit("Starting video analysis (GPU Batched)...") | |
try: | |
import torch | |
import torchmetrics | |
except ImportError: | |
self.error_occurred.emit("PyTorch or TorchMetrics not found. Please install them for GPU mode (e.g., pip install torch torchmetrics).") | |
return | |
if not torch.cuda.is_available(): | |
self.error_occurred.emit("CUDA not available. GPU mode cannot run.") | |
return | |
device = torch.device("cuda") | |
# `reduction='none'` to get SSIM score for each image pair in a batch | |
ssim_metric = torchmetrics.StructuralSimilarityIndexMeasure(data_range=1.0, reduction='none').to(device) | |
found_loops = [] | |
cap = None | |
try: | |
cap = cv2.VideoCapture(self.video_path) | |
if not cap.isOpened(): | |
self.error_occurred.emit("Error: Could not open video file for GPU mode.") | |
return | |
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
fps = cap.get(cv2.CAP_PROP_FPS) | |
if fps <= 0: | |
self.error_occurred.emit("Error: Could not determine video FPS for GPU mode.") | |
if cap: cap.release() | |
return | |
min_loop_frames = int(self.min_loop_duration_sec * fps) | |
max_j_frames_in_window = int(self.max_search_window_sec * fps) | |
compare_height = -1 | |
cap.set(cv2.CAP_PROP_POS_FRAMES, 0) | |
ret_fh, first_frame_fh = cap.read() | |
if ret_fh: | |
h_full, w_full, _ = first_frame_fh.shape | |
if w_full == 0: | |
self.error_occurred.emit("Error: Video width is zero for GPU."); | |
if cap: cap.release() | |
return | |
aspect_ratio = h_full / w_full | |
compare_height = int(self.compare_width * aspect_ratio) | |
if compare_height <= 0: | |
self.error_occurred.emit(f"Error: Invalid compare height for GPU ({compare_height})."); | |
if cap: cap.release() | |
return | |
else: | |
self.error_occurred.emit("Error: Could not read first frame for GPU."); | |
if cap: cap.release() | |
return | |
self.status_updated.emit(f"GPU: Comparing at {self.compare_width}x{compare_height}, Batch: {self.gpu_batch_size}, Device: {torch.cuda.get_device_name(0)}") | |
processed_i_frames_count = 0 | |
outer_loop_end_frame = total_frames - min_loop_frames | |
if self.ignore_frames >= outer_loop_end_frame: | |
self.status_updated.emit("Not enough frames to process with current 'ignore' and 'min loop duration' settings.") | |
self.results_found.emit([]) | |
if cap: cap.release() | |
return | |
outer_loop_total_iterations = outer_loop_end_frame - self.ignore_frames | |
if outer_loop_total_iterations <= 0: outer_loop_total_iterations = 1 | |
i_batch_size = self.gpu_batch_size | |
i_indices = list(range(self.ignore_frames, outer_loop_end_frame)) | |
# Process i_frames in batches | |
for i_batch_start in range(0, len(i_indices), i_batch_size): | |
if not self._is_running: break | |
current_i_batch_indices = i_indices[i_batch_start : i_batch_start + i_batch_size] | |
i_frames_np_list = [] # Store numpy arrays of i_frames for the current batch | |
actual_i_frame_indices_in_batch = [] # Store original video frame indices for this i_batch | |
for i_frame_idx_val in current_i_batch_indices: | |
cap.set(cv2.CAP_PROP_POS_FRAMES, i_frame_idx_val) | |
ret_i, frame_i_full = cap.read() | |
if not ret_i: continue | |
frame_i_resized_np = cv2.resize(frame_i_full, (self.compare_width, compare_height)) | |
frame_i_gray_np = cv2.cvtColor(frame_i_resized_np, cv2.COLOR_BGR2GRAY) | |
frame_i_norm_np = frame_i_gray_np.astype(np.float32) / 255.0 # Normalize for SSIM | |
i_frames_np_list.append(frame_i_norm_np) | |
actual_i_frame_indices_in_batch.append(i_frame_idx_val) | |
if not i_frames_np_list: # If no valid i_frames were read in this batch | |
processed_i_frames_count += len(current_i_batch_indices) # Still count them for progress | |
progress = int((processed_i_frames_count / outer_loop_total_iterations) * 100) | |
self.progress_updated.emit(progress) | |
continue | |
# Stack numpy arrays into a single tensor: [Batch, Height, Width] -> [Batch, Channels=1, Height, Width] | |
i_frames_batch_tensor = torch.from_numpy(np.stack(i_frames_np_list)).unsqueeze(1).to(device) | |
# For each i_frame in the current i_batch_tensor, compare with its corresponding j_frames | |
for batch_idx, original_i_frame_index in enumerate(actual_i_frame_indices_in_batch): | |
if not self._is_running: break | |
current_i_frame_tensor = i_frames_batch_tensor[batch_idx] # Shape: [1, H, W] | |
start_j_candidate = original_i_frame_index + min_loop_frames + 1 | |
end_j_candidate_limit = min(original_i_frame_index + 1 + max_j_frames_in_window, total_frames) | |
num_j_frames_to_read = end_j_candidate_limit - start_j_candidate | |
if num_j_frames_to_read <= 0: | |
processed_i_frames_count += 1 | |
continue | |
j_frames_np_list = [] | |
actual_j_frame_indices_in_batch = [] | |
cap.set(cv2.CAP_PROP_POS_FRAMES, start_j_candidate) # Set read position for this j-window | |
for _ in range(num_j_frames_to_read): | |
if not self._is_running: break | |
ret_j, frame_j_full = cap.read() | |
if not ret_j: break # Stop if cannot read more frames for this j-window | |
frame_j_resized_np = cv2.resize(frame_j_full, (self.compare_width, compare_height)) | |
frame_j_gray_np = cv2.cvtColor(frame_j_resized_np, cv2.COLOR_BGR2GRAY) | |
frame_j_norm_np = frame_j_gray_np.astype(np.float32) / 255.0 | |
j_frames_np_list.append(frame_j_norm_np) | |
actual_j_frame_indices_in_batch.append(start_j_candidate + len(j_frames_np_list) -1) | |
if not j_frames_np_list: | |
processed_i_frames_count += 1 | |
continue | |
j_frames_batch_tensor = torch.from_numpy(np.stack(j_frames_np_list)).unsqueeze(1).to(device) | |
# Expand the single i_frame_tensor to match the batch size of j_frames for comparison | |
expanded_i_frame_tensor = current_i_frame_tensor.expand_as(j_frames_batch_tensor) | |
similarity_scores_tensor = ssim_metric(expanded_i_frame_tensor, j_frames_batch_tensor) | |
similarity_scores_np = similarity_scores_tensor.cpu().numpy() | |
similarity_scores_np = np.atleast_1d(similarity_scores_np) # Ensure iterable | |
for score_idx, similarity_score in enumerate(similarity_scores_np): | |
if score_idx >= len(actual_j_frame_indices_in_batch): break | |
if similarity_score >= self.similarity_thresh_normalized: | |
loop_end_j_index = actual_j_frame_indices_in_batch[score_idx] | |
loop_start_frame = original_i_frame_index | |
loop_end_frame = loop_end_j_index - 1 | |
duration_frames = loop_end_frame - loop_start_frame + 1 | |
if duration_frames < min_loop_frames: continue | |
duration_sec = duration_frames / fps | |
loop_info = { | |
"start_frame": loop_start_frame, "end_frame": loop_end_frame, | |
"start_time": loop_start_frame / fps, "end_time": (loop_end_frame + 1) / fps, | |
"duration_sec": duration_sec, "similarity": float(similarity_score) | |
} | |
found_loops.append(loop_info) | |
processed_i_frames_count += 1 # Increment after processing one original i_frame and its j-window | |
# Update progress after each i_batch is processed | |
progress = int((processed_i_frames_count / outer_loop_total_iterations) * 100) | |
self.progress_updated.emit(progress) | |
if not self._is_running: | |
self.status_updated.emit("Processing cancelled (GPU).") | |
else: | |
self.status_updated.emit(f"Analysis complete (GPU). Found {len(found_loops)} potential loops.") | |
self.results_found.emit(found_loops) | |
except Exception as e: | |
self.error_occurred.emit(f"GPU processing error: {str(e)}") | |
finally: | |
if cap: cap.release() | |
self._is_running = False | |
def stop(self): | |
""" | |
Requests the thread to stop its current processing task. | |
Sets the internal `_is_running` flag to False and signals the | |
multiprocessing stop event if multi-core CPU mode was active. | |
""" | |
self._is_running = False | |
if self.processing_mode == "CPU (Multi-core)" and self._multiprocessing_stop_event: | |
self._multiprocessing_stop_event.set() | |
self.status_updated.emit("Attempting to stop processing...") | |
# --- Video Preview Worker Thread --- | |
class PreviewWorker(QThread): | |
""" | |
A QThread subclass dedicated to playing a selected video loop segment. | |
It reads frames from the video file at the appropriate rate and emits them | |
for display in the UI. Supports looping a specific number of times or indefinitely. | |
""" | |
frame_ready = pyqtSignal(np.ndarray) # Emits a BGR numpy array (frame) when it's ready for display. | |
preview_finished = pyqtSignal() # Emitted when the preview completes all its loops or is stopped. | |
preview_error = pyqtSignal(str) # Emitted if an error occurs during preview (e.g., cannot read frame). | |
def __init__(self, video_path, start_frame, end_frame, loop_count, loop_forever, parent=None): | |
""" | |
Initializes the PreviewWorker. | |
@param video_path (str): Filesystem path to the video file. | |
@param start_frame (int): The starting frame index of the loop segment to be previewed. | |
@param end_frame (int): The ending frame index of the loop segment. | |
@param loop_count (int): The number of times to repeat the loop (ignored if `loop_forever` is True). | |
@param loop_forever (bool): If True, the loop segment will repeat indefinitely until stopped. | |
@param parent (QObject, optional): The parent object for this QThread. | |
""" | |
super().__init__(parent) | |
self.video_path = video_path | |
self.start_frame = start_frame | |
self.end_frame = end_frame | |
self.loop_count = loop_count | |
self.loop_forever = loop_forever | |
self._is_running = True # Internal flag to control the thread's execution loop. | |
def run(self): | |
""" | |
The main execution method for the preview thread. | |
It reads frames for the specified loop segment, emits them via `frame_ready`, | |
and manages loop repetitions and frame timing based on video FPS. | |
""" | |
cap = None | |
ret = True # Flag to track if frame reading was successful within the loop | |
try: | |
cap = cv2.VideoCapture(self.video_path) | |
if not cap.isOpened(): | |
self.preview_error.emit("Preview Error: Could not open video file.") | |
return | |
fps = cap.get(cv2.CAP_PROP_FPS) | |
if fps <= 0: fps = 30.0 # Default FPS if detection fails or is invalid | |
frame_delay = 1.0 / fps # Time to wait between frames to approximate original speed | |
current_loop_iteration = 0 | |
while self._is_running: | |
if not self.loop_forever and current_loop_iteration >= self.loop_count: | |
break # Reached desired loop count | |
for frame_idx in range(self.start_frame, self.end_frame + 1): | |
if not self._is_running: break # Stop requested by user/system | |
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx) | |
ret, frame = cap.read() | |
if not ret: | |
self.preview_error.emit(f"Preview Error: Could not read frame {frame_idx}") | |
break # Error reading frame, exit inner loop | |
if frame is not None and frame.size > 0: | |
self.frame_ready.emit(frame) | |
time.sleep(frame_delay) # Pause to maintain video playback speed | |
if not self._is_running or not ret: break # Exit outer loop if stopped or error in inner loop | |
current_loop_iteration +=1 | |
except Exception as e: | |
self.preview_error.emit(f"Preview runtime error: {str(e)}") | |
finally: | |
if cap: | |
cap.release() | |
self.preview_finished.emit() # Always emit finished signal, regardless of how it exited | |
def stop(self): | |
"""Requests the preview worker thread to stop its execution.""" | |
self._is_running = False | |
# --- Main Application Window --- | |
class VideoLooperApp(QMainWindow): | |
""" | |
The main application window for the Video Looper. | |
This class encapsulates the UI, event handling, and coordination of | |
video processing and preview tasks. | |
""" | |
def __init__(self): | |
""" | |
Initializes the main application window, sets up the UI, | |
and initializes member variables. | |
""" | |
super().__init__() | |
self.setWindowTitle("Video Looper Pro") | |
self.setGeometry(100, 100, 800, 830) | |
# --- Application State Variables --- | |
self.video_path = None # Path to the currently loaded video file. | |
self.original_video_width = None # Width of the original loaded video. | |
self.original_video_height = None # Height of the original loaded video. | |
self.found_loops = [] # List to store dictionaries of found/dummy loops. | |
self.loop_finder_thread = None # Reference to the active LoopFinderThread instance. | |
self.preview_worker = None # Reference to the active PreviewWorker instance. | |
self.is_previewing = False # Boolean flag indicating if a preview is currently active. | |
# --- UI Construction --- | |
central_widget = QWidget() | |
self.setCentralWidget(central_widget) | |
main_layout = QVBoxLayout(central_widget) | |
self._setup_file_layout(main_layout) | |
self._setup_mode_gpu_layout(main_layout) | |
self._setup_parameters_layout(main_layout) | |
self._setup_action_layout(main_layout) | |
self._setup_results_list(main_layout) | |
self._setup_preview_area(main_layout) | |
self._setup_bottom_controls(main_layout) | |
self._load_default_video_on_startup() # Attempt to load/download a demo video | |
# Set initial UI state for GPU controls based on default processing mode | |
self._handle_processing_mode_change(self.processing_mode_combo.currentText()) | |
def _setup_file_layout(self, parent_layout): | |
""" | |
Sets up the UI elements for video file loading (label, path display, load button). | |
@param parent_layout (QVBoxLayout): The main layout to add this section to. | |
""" | |
file_layout = QHBoxLayout() | |
self.video_path_label = QLabel("No video selected.") | |
self.video_path_label.setWordWrap(True) | |
load_button = QPushButton("Load Video") | |
load_button.clicked.connect(self.load_video) | |
file_layout.addWidget(QLabel("Video File:")) | |
file_layout.addWidget(self.video_path_label, 1) # Label takes available horizontal space | |
file_layout.addWidget(load_button) | |
parent_layout.addLayout(file_layout) | |
def _setup_mode_gpu_layout(self, parent_layout): | |
""" | |
Sets up UI elements for selecting processing mode and GPU batch size. | |
@param parent_layout (QVBoxLayout): The main layout to add this section to. | |
""" | |
mode_gpu_layout = QHBoxLayout() | |
mode_gpu_layout.addWidget(QLabel("Processing Mode:")) | |
self.processing_mode_combo = QComboBox() | |
# GPU mode listed first as it's often preferred if available | |
self.processing_mode_combo.addItems(["GPU", "CPU (Single Core)", "CPU (Multi-core)"]) | |
self.processing_mode_combo.currentTextChanged.connect(self._handle_processing_mode_change) | |
mode_gpu_layout.addWidget(self.processing_mode_combo) | |
self.gpu_batch_size_label = QLabel("GPU Batch Size:") | |
self.gpu_batch_size_input = QSpinBox() | |
self.gpu_batch_size_input.setRange(16, 2048) # Min/Max allowable batch size | |
self.gpu_batch_size_input.setValue(DEFAULT_GPU_BATCH_SIZE) | |
self.gpu_batch_size_input.setToolTip("Batch size for GPU processing. Larger values may be faster but consume more VRAM.") | |
mode_gpu_layout.addWidget(self.gpu_batch_size_label) | |
mode_gpu_layout.addWidget(self.gpu_batch_size_input) | |
parent_layout.addLayout(mode_gpu_layout) | |
# GPU-specific controls are initially hidden, shown when GPU mode is selected. | |
self.gpu_batch_size_label.setVisible(False) | |
self.gpu_batch_size_input.setVisible(False) | |
def _setup_parameters_layout(self, parent_layout): | |
""" | |
Sets up the UI elements for various loop finding parameters (similarity, durations, compare width). | |
@param parent_layout (QVBoxLayout): The main layout to add this section to. | |
""" | |
params_layout = QHBoxLayout() | |
# Parameter Group 1: Similarity Threshold, Ignore First N Frames | |
params_group1_layout = QVBoxLayout() | |
self.similarity_thresh_input = QDoubleSpinBox() | |
self.similarity_thresh_input.setRange(0.0, 100.0) | |
self.similarity_thresh_input.setValue(DEFAULT_SIMILARITY_THRESHOLD) | |
self.similarity_thresh_input.setSuffix(" %") | |
sim_layout = QHBoxLayout() | |
sim_layout.addWidget(QLabel("Similarity Threshold:")) | |
sim_layout.addWidget(self.similarity_thresh_input) | |
params_group1_layout.addLayout(sim_layout) | |
self.ignore_frames_input = QSpinBox() | |
self.ignore_frames_input.setRange(0, 1000000) # Allow ignoring a large number of frames | |
self.ignore_frames_input.setValue(DEFAULT_IGNORE_FRAMES) | |
ignore_layout = QHBoxLayout() | |
ignore_layout.addWidget(QLabel("Ignore First N Frames:")) | |
ignore_layout.addWidget(self.ignore_frames_input) | |
params_group1_layout.addLayout(ignore_layout) | |
params_layout.addLayout(params_group1_layout) | |
# Parameter Group 2: Minimum Loop Duration, Maximum Search Window | |
params_group2_layout = QVBoxLayout() | |
self.min_loop_duration_input = QDoubleSpinBox() | |
self.min_loop_duration_input.setRange(0.1, 600.0) # Min/Max loop duration in seconds | |
self.min_loop_duration_input.setValue(DEFAULT_MIN_LOOP_DURATION_SEC) | |
self.min_loop_duration_input.setSuffix(" s") | |
min_dur_layout = QHBoxLayout() | |
min_dur_layout.addWidget(QLabel("Min Loop Duration:")) | |
min_dur_layout.addWidget(self.min_loop_duration_input) | |
params_group2_layout.addLayout(min_dur_layout) | |
self.max_search_window_input = QDoubleSpinBox() | |
self.max_search_window_input.setRange(0.1, 1800.0) # Min/Max search window in seconds | |
self.max_search_window_input.setValue(DEFAULT_MAX_SEARCH_WINDOW_SEC) | |
self.max_search_window_input.setSuffix(" s") | |
max_win_layout = QHBoxLayout() | |
max_win_layout.addWidget(QLabel("Max Search Window:")) | |
max_win_layout.addWidget(self.max_search_window_input) | |
params_group2_layout.addLayout(max_win_layout) | |
params_layout.addLayout(params_group2_layout) | |
# Parameter Group 3: Comparison Width (Presets and Manual Input) | |
params_group3_layout = QVBoxLayout() | |
comp_width_label_layout = QHBoxLayout() | |
comp_width_label_layout.addWidget(QLabel("Comparison Width:")) | |
self.compare_width_preset_combo = QComboBox() | |
self.compare_width_preset_combo.addItems(list(PRESET_FRACTIONS.keys()) + [CUSTOM_PRESET_TEXT]) | |
self.compare_width_preset_combo.setEnabled(False) # Enabled only when a video is loaded | |
self.compare_width_preset_combo.currentTextChanged.connect(self._handle_compare_width_preset_change) | |
comp_width_label_layout.addWidget(self.compare_width_preset_combo) | |
params_group3_layout.addLayout(comp_width_label_layout) | |
self.compare_width_input = QSpinBox() | |
self.compare_width_input.setRange(160, 3840) # Min/Max comparison width in pixels | |
self.compare_width_input.setStepType(QSpinBox.StepType.AdaptiveDecimalStepType) | |
self.compare_width_input.setValue(DEFAULT_COMPARE_WIDTH) | |
self.compare_width_input.setSuffix(" px") | |
self.compare_width_input.valueChanged.connect(self._handle_compare_width_spinbox_change) | |
self.compare_width_input.valueChanged.connect(self._update_gpu_batch_size_suggestion_on_compare_width_change) | |
params_group3_layout.addWidget(self.compare_width_input) | |
params_layout.addLayout(params_group3_layout) | |
parent_layout.addLayout(params_layout) | |
def _setup_action_layout(self, parent_layout): | |
""" | |
Sets up the main action buttons: 'Find Loops' and 'Cancel Processing'. | |
@param parent_layout (QVBoxLayout): The main layout to add this section to. | |
""" | |
action_layout = QHBoxLayout() | |
self.find_loops_button = QPushButton("Find Loops") | |
self.find_loops_button.clicked.connect(self.start_find_loops) | |
self.find_loops_button.setEnabled(False) # Enabled when video is loaded | |
self.cancel_button = QPushButton("Cancel Processing") | |
self.cancel_button.clicked.connect(self.cancel_processing) | |
self.cancel_button.setEnabled(False) # Enabled when processing is active | |
action_layout.addWidget(self.find_loops_button) | |
action_layout.addWidget(self.cancel_button) | |
parent_layout.addLayout(action_layout) | |
def _setup_results_list(self, parent_layout): | |
""" | |
Sets up the QListWidget for displaying found loop results. | |
@param parent_layout (QVBoxLayout): The main layout to add this section to. | |
""" | |
parent_layout.addWidget(QLabel("Found Loops (select one to save or preview):")) | |
self.results_list = QListWidget() | |
self.results_list.itemDoubleClicked.connect(self.save_selected_loop) # Double-click on item to save | |
parent_layout.addWidget(self.results_list) | |
def _setup_preview_area(self, parent_layout): | |
""" | |
Sets up the UI elements for video preview, including the display label and control buttons. | |
@param parent_layout (QVBoxLayout): The main layout to add this section to. | |
""" | |
preview_section_layout = QVBoxLayout() | |
preview_section_layout.addWidget(QLabel("Loop Preview:")) | |
self.preview_label = QLabel("Select a loop and click 'Preview Selected Loop'") | |
self.preview_label.setAlignment(Qt.AlignmentFlag.AlignCenter) | |
self.preview_label.setMinimumHeight(240) # Ensure a decent size for preview | |
self.preview_label.setMinimumWidth(320) | |
self.preview_label.setStyleSheet("color: gray; border: 1px solid lightgray; background-color: black;") | |
preview_section_layout.addWidget(self.preview_label) | |
preview_controls_layout = QHBoxLayout() | |
self.preview_button = QPushButton("Preview Selected Loop") | |
self.preview_button.clicked.connect(self.start_preview_thread) | |
self.preview_button.setEnabled(False) # Enabled when a loop is selected | |
preview_controls_layout.addWidget(self.preview_button) | |
self.stop_preview_button = QPushButton("Stop Preview") | |
self.stop_preview_button.clicked.connect(self.stop_preview) | |
self.stop_preview_button.setEnabled(False) # Enabled when preview is active | |
preview_controls_layout.addWidget(self.stop_preview_button) | |
preview_section_layout.addLayout(preview_controls_layout) | |
# Controls for preview loop count | |
loop_count_layout = QHBoxLayout() | |
loop_count_layout.addWidget(QLabel("Loop Count:")) | |
self.loop_count_input = QSpinBox() | |
self.loop_count_input.setRange(1, 100) | |
self.loop_count_input.setValue(DEFAULT_PREVIEW_LOOP_COUNT) | |
loop_count_layout.addWidget(self.loop_count_input) | |
self.loop_forever_checkbox = QCheckBox("Loop Forever") | |
self.loop_forever_checkbox.stateChanged.connect(self._toggle_loop_count_input_enabled) | |
loop_count_layout.addWidget(self.loop_forever_checkbox) | |
loop_count_layout.addStretch() # Push controls to the left | |
preview_section_layout.addLayout(loop_count_layout) | |
parent_layout.addLayout(preview_section_layout) | |
def _setup_bottom_controls(self, parent_layout): | |
""" | |
Sets up bottom UI elements: save button, progress bar, status label, and warning label. | |
@param parent_layout (QVBoxLayout): The main layout to add this section to. | |
""" | |
self.save_loop_button = QPushButton("Save Selected Loop") | |
self.save_loop_button.clicked.connect(self.save_selected_loop) | |
self.save_loop_button.setEnabled(False) # Enabled when a loop is selected | |
parent_layout.addWidget(self.save_loop_button) | |
self.progress_bar = QProgressBar() | |
parent_layout.addWidget(self.progress_bar) | |
self.status_label = QLabel("Status: Ready. Load a video to begin.") | |
self.status_label.setWordWrap(True) | |
parent_layout.addWidget(self.status_label) | |
warning_label = QLabel( | |
"<b>Warning:</b> Finding loops in long videos can be resource-intensive. " | |
"Adjust parameters for optimal performance." | |
) | |
warning_label.setStyleSheet("color: orange; font-style: italic;") | |
warning_label.setWordWrap(True) | |
parent_layout.addWidget(warning_label) | |
def _load_default_video_on_startup(self): | |
""" | |
Attempts to load a predefined default video ('demo.mp4') upon application startup. | |
If the video file is not found in the script's directory, it attempts to download it | |
from a predefined URL. Updates the status label with progress/outcome. | |
""" | |
default_video_name = "demo.mp4" | |
demo_video_url = "https://xn--1xap.com/demo.mp4" # URL for the demo video | |
script_dir = os.path.dirname(os.path.abspath(__file__)) | |
default_video_path = os.path.join(script_dir, default_video_name) | |
if not os.path.isfile(default_video_path): | |
self.status_label.setText(f"Status: Default video '{default_video_name}' not found. Attempting to download...") | |
QApplication.processEvents() # Ensure UI updates before download starts | |
try: | |
self.status_label.setText(f"Status: Downloading '{default_video_name}' from {demo_video_url}...") | |
QApplication.processEvents() | |
os.makedirs(script_dir, exist_ok=True) # Ensure directory exists | |
# Perform download with progress indication in status bar | |
with urllib.request.urlopen(demo_video_url) as response, open(default_video_path, 'wb') as out_file: | |
total_length_header = response.info().get('Content-Length') | |
if total_length_header: | |
total_length = int(total_length_header) | |
dl_bytes = 0 | |
block_size_bytes = 8192 # 8KB blocks | |
while True: | |
buffer = response.read(block_size_bytes) | |
if not buffer: break | |
dl_bytes += len(buffer) | |
out_file.write(buffer) | |
# Simple text-based progress bar for status label | |
progress_percent_done = int(50 * dl_bytes / total_length) | |
self.status_label.setText(f"Status: Downloading... [{'=' * progress_percent_done}{' ' * (50-progress_percent_done)}] {dl_bytes // 1024} KB / {total_length // 1024} KB") | |
QApplication.processEvents() | |
else: # No Content-Length header, download without progress percentage | |
out_file.write(response.read()) | |
self.status_label.setText(f"Status: Download complete. Loading '{default_video_name}'...") | |
QApplication.processEvents() | |
if not os.path.isfile(default_video_path): # Verify download success | |
self.status_label.setText(f"Status: Download failed to create file. Please load a video manually.") | |
return | |
except urllib.error.URLError as e: | |
self.status_label.setText(f"Status: Failed to download '{default_video_name}': Network error ({e.reason}). Please load a video manually.") | |
return | |
except IOError as e: # File system error during save | |
self.status_label.setText(f"Status: Failed to save '{default_video_name}': File error ({e.strerror}). Please load a video manually.") | |
return | |
except Exception as e: # Catch-all for other download/save errors | |
self.status_label.setText(f"Status: Error downloading '{default_video_name}': {str(e)}. Please load a video manually.") | |
# Attempt to clean up partially downloaded file if it exists and is small (e.g., < 1KB) | |
if os.path.exists(default_video_path) and os.path.getsize(default_video_path) < 1024: | |
try: os.remove(default_video_path) | |
except OSError: pass # Ignore if removal fails | |
return | |
if os.path.isfile(default_video_path): | |
self.load_video_at_startup(default_video_path) # Proceed to load the (now existing) video | |
else: | |
# This state implies download failed and was reported, or file was removed post-check. | |
self.status_label.setText(f"Status: Default video '{default_video_name}' still not found. Please load a video manually.") | |
# --- UI Event Handlers and Core Logic Methods --- | |
def _handle_processing_mode_change(self, mode_text): | |
""" | |
Handles changes in the processing mode selection (ComboBox). | |
Shows/hides GPU-specific controls (batch size) and triggers an update | |
for the GPU batch size suggestion if GPU mode is selected. | |
@param mode_text (str): The text of the currently selected processing mode. | |
""" | |
is_gpu_mode = (mode_text == "GPU") | |
self.gpu_batch_size_label.setVisible(is_gpu_mode) | |
self.gpu_batch_size_input.setVisible(is_gpu_mode) | |
if is_gpu_mode: | |
self._update_gpu_batch_size_suggestion() | |
def _update_gpu_batch_size_suggestion_on_compare_width_change(self): | |
""" | |
Callback for when the comparison width changes. | |
If GPU mode is active, it triggers an update to the GPU batch size suggestion. | |
""" | |
if self.processing_mode_combo.currentText() == "GPU": | |
self._update_gpu_batch_size_suggestion() | |
def _update_gpu_batch_size_suggestion(self): | |
""" | |
Suggests an appropriate GPU batch size based on available VRAM (if detectable via PyTorch) | |
and the current comparison width. This is a heuristic and may require manual adjustment by the user. | |
Updates the GPU batch size input field and its tooltip. | |
""" | |
if not (self.original_video_width and self.original_video_height): | |
# Video dimensions are needed for context, but suggestion can proceed with defaults if not available. | |
# Tooltip will indicate if suggestion is based on defaults or actual VRAM. | |
pass | |
try: | |
import torch | |
if not torch.cuda.is_available(): | |
self.gpu_batch_size_input.setToolTip("CUDA not available. Set batch size manually.") | |
# Potentially set a conservative default if CUDA isn't found but GPU mode is selected. | |
# self.gpu_batch_size_input.setValue(DEFAULT_GPU_BATCH_SIZE) | |
return | |
device_props = torch.cuda.get_device_properties(0) | |
total_vram_gb = device_props.total_memory / (1024**3) # VRAM in GB | |
# Heuristic: Base batch size on VRAM, then scale by comparison width. | |
# These are example tiers and multipliers, adjust based on typical frame sizes and VRAM usage. | |
if total_vram_gb > 10: # e.g., >10GB VRAM | |
suggested_batch = 128 * 16 # Base factor, can be adjusted | |
elif total_vram_gb > 6: # e.g., 6-10GB VRAM | |
suggested_batch = 96 * 16 | |
elif total_vram_gb > 3.5: # e.g., 3.5-6GB VRAM | |
suggested_batch = 64 * 16 | |
elif total_vram_gb > 1.5: # e.g., 1.5-3.5GB VRAM | |
suggested_batch = 32 * 16 | |
else: # e.g., <= 1.5GB VRAM | |
suggested_batch = 16 * 16 | |
current_compare_width = self.compare_width_input.value() | |
# Scale down suggested batch for higher comparison resolutions | |
if current_compare_width > 1280: # High comparison resolution (e.g., > 720p width) | |
suggested_batch = max(16, suggested_batch // 4) | |
elif current_compare_width > 720: # Medium-high comparison resolution (e.g., > 480p width) | |
suggested_batch = max(16, suggested_batch // 2) | |
# Clamp suggested batch to the spinbox's defined min/max range | |
suggested_batch = max(self.gpu_batch_size_input.minimum(), | |
min(self.gpu_batch_size_input.maximum(), suggested_batch)) | |
self.gpu_batch_size_input.setValue(suggested_batch) | |
self.gpu_batch_size_input.setToolTip(f"Suggested: {suggested_batch} (based on ~{total_vram_gb:.1f}GB VRAM and compare width {current_compare_width}px). Adjust as needed.") | |
except ImportError: | |
self.gpu_batch_size_input.setToolTip("PyTorch not found. Cannot estimate VRAM. Set batch size manually.") | |
# self.gpu_batch_size_input.setValue(DEFAULT_GPU_BATCH_SIZE) # Fallback to default | |
except Exception: # Catch other errors during VRAM detection (e.g., CUDA issues not caught by is_available) | |
self.gpu_batch_size_input.setToolTip("Could not estimate VRAM or suggest batch size. Set manually.") | |
# self.gpu_batch_size_input.setValue(DEFAULT_GPU_BATCH_SIZE) # Fallback to default | |
def _toggle_loop_count_input_enabled(self, state): | |
""" | |
Enables or disables the loop count QSpinBox based on the state of the 'Loop Forever' QCheckBox. | |
@param state (int): The Qt.CheckState of the 'Loop Forever' checkbox. | |
""" | |
if state == Qt.CheckState.Checked.value: | |
self.loop_count_input.setEnabled(False) # Disabled if looping forever | |
else: | |
self.loop_count_input.setEnabled(True) # Enabled if not looping forever | |
def _handle_compare_width_preset_change(self, preset_text): | |
""" | |
Handles changes in the comparison width preset ComboBox. | |
Updates the comparison width QSpinBox value based on the selected preset | |
and the original video width. | |
@param preset_text (str): The text of the selected preset. | |
""" | |
if not self.original_video_width or preset_text == CUSTOM_PRESET_TEXT: | |
# Do nothing if original video width is unknown or "Custom" is selected | |
return | |
if preset_text in PRESET_FRACTIONS: | |
divisor = PRESET_FRACTIONS[preset_text] | |
# Calculate target width, ensuring it's at least the minimum allowed by the spinbox (e.g., 160) | |
target_width = max(self.compare_width_input.minimum(), self.original_video_width // divisor) | |
# Block signals to prevent recursive updates if spinbox change also updates combobox | |
self.compare_width_input.blockSignals(True) | |
self.compare_width_input.setValue(target_width) | |
self.compare_width_input.blockSignals(False) | |
# Manually trigger GPU suggestion update as spinbox valueChanged was blocked | |
self._update_gpu_batch_size_suggestion_on_compare_width_change() | |
def _handle_compare_width_spinbox_change(self, new_width): | |
""" | |
Handles changes in the comparison width QSpinBox. | |
Updates the preset ComboBox to reflect the current width, selecting "Custom" | |
if the width doesn't match a predefined preset. | |
@param new_width (int): The new width value from the QSpinBox. | |
""" | |
if not self.original_video_width: | |
# If no video loaded, ensure preset is "Custom" if not already | |
if self.compare_width_preset_combo.currentText() != CUSTOM_PRESET_TEXT: | |
self.compare_width_preset_combo.blockSignals(True) | |
self.compare_width_preset_combo.setCurrentText(CUSTOM_PRESET_TEXT) | |
self.compare_width_preset_combo.blockSignals(False) | |
return | |
matching_preset = CUSTOM_PRESET_TEXT # Default to custom | |
for preset_name, divisor in PRESET_FRACTIONS.items(): | |
# Check if the new_width corresponds to this preset's fraction of original width | |
# Allow for slight rounding differences by checking if it's the clamped minimum or exact division | |
if self.original_video_width // divisor == new_width: | |
if self.original_video_width % divisor == 0 or \ | |
new_width == max(self.compare_width_input.minimum(), self.original_video_width // divisor): | |
matching_preset = preset_name | |
break | |
# Update combobox only if its current text doesn't match the determined preset | |
if self.compare_width_preset_combo.currentText() != matching_preset: | |
self.compare_width_preset_combo.blockSignals(True) | |
self.compare_width_preset_combo.setCurrentText(matching_preset) | |
self.compare_width_preset_combo.blockSignals(False) | |
# Note: GPU suggestion update is connected directly to valueChanged of compare_width_input | |
def _set_initial_compare_width_and_preset(self, video_width, video_height): | |
""" | |
Sets the initial suggested comparison width and corresponding preset in the UI | |
when a new video is loaded. Aims for a sensible default (e.g., half width or DEFAULT_COMPARE_WIDTH). | |
@param video_width (int): The width of the loaded video in pixels. | |
@param video_height (int): The height of the loaded video in pixels. | |
""" | |
self.original_video_width = video_width | |
self.original_video_height = video_height | |
self.compare_width_preset_combo.setEnabled(True) # Enable preset selection | |
# Determine a sensible initial suggested width. | |
# Try to match DEFAULT_COMPARE_WIDTH via a preset, or use a common fraction like 1/2 or 1/4. | |
initial_suggested_width = DEFAULT_COMPARE_WIDTH | |
closest_preset_text = CUSTOM_PRESET_TEXT | |
min_difference_to_default = float('inf') | |
# Attempt to find a preset that results in DEFAULT_COMPARE_WIDTH or is close to it. | |
for preset_text_key, div_val in PRESET_FRACTIONS.items(): | |
preset_calculated_width = max(self.compare_width_input.minimum(), video_width // div_val) | |
if preset_calculated_width == DEFAULT_COMPARE_WIDTH: # Prioritize exact match for default | |
closest_preset_text = preset_text_key | |
initial_suggested_width = preset_calculated_width | |
break | |
current_difference = abs(preset_calculated_width - DEFAULT_COMPARE_WIDTH) | |
if current_difference < min_difference_to_default : | |
min_difference_to_default = current_difference | |
closest_preset_text = preset_text_key | |
initial_suggested_width = preset_calculated_width | |
elif current_difference == min_difference_to_default and div_val < PRESET_FRACTIONS.get(closest_preset_text, float('inf')): | |
# Prefer larger fractions (smaller divisor) if difference is the same | |
closest_preset_text = preset_text_key | |
initial_suggested_width = preset_calculated_width | |
# Fallback logic if no preset is reasonably close to DEFAULT_COMPARE_WIDTH | |
# Heuristic threshold: if closest preset is still far from default, try common fractions. | |
if closest_preset_text == CUSTOM_PRESET_TEXT or abs(initial_suggested_width - DEFAULT_COMPARE_WIDTH) > 100 : | |
if video_width >= 1280 and video_width // 2 >= self.compare_width_input.minimum(): # Prefer 1/2 for HD+ videos | |
initial_suggested_width = max(self.compare_width_input.minimum(), video_width // 2) | |
closest_preset_text = "Half (1/2)" if "Half (1/2)" in PRESET_FRACTIONS else CUSTOM_PRESET_TEXT | |
elif video_width // 4 >= self.compare_width_input.minimum() : # Default to 1/4 for others if valid | |
initial_suggested_width = max(self.compare_width_input.minimum(), video_width // 4) | |
closest_preset_text = "Quarter (1/4)" if "Quarter (1/4)" in PRESET_FRACTIONS else CUSTOM_PRESET_TEXT | |
else: # Fallback to full width if video is very small or fractions are too small | |
initial_suggested_width = video_width | |
closest_preset_text = "Full (1/1)" if "Full (1/1)" in PRESET_FRACTIONS else CUSTOM_PRESET_TEXT | |
# Ensure the suggested width is within the spinbox's valid range | |
initial_suggested_width = max(self.compare_width_input.minimum(), | |
min(self.compare_width_input.maximum(), initial_suggested_width)) | |
# Update spinbox and combobox, blocking signals to prevent unwanted immediate re-triggering | |
self.compare_width_input.blockSignals(True) | |
self.compare_width_input.setValue(initial_suggested_width) | |
self.compare_width_input.blockSignals(False) | |
self.compare_width_preset_combo.blockSignals(True) | |
# Call the handler directly to sync combobox to the new spinbox value | |
self._handle_compare_width_spinbox_change(initial_suggested_width) | |
self.compare_width_preset_combo.blockSignals(False) | |
# Trigger GPU batch size suggestion based on this initial compare width | |
self._update_gpu_batch_size_suggestion_on_compare_width_change() | |
def _create_dummy_full_video_loop(self, video_path_for_dummy): | |
""" | |
Creates a 'dummy' loop entry that represents the entire duration of the video. | |
This allows users to preview or save the full video via the loop interface | |
before or without finding specific shorter loops. | |
@param video_path_for_dummy (str): The filesystem path to the video. | |
@return: A dictionary formatted like a found loop, representing the full video, | |
or None if video properties cannot be read. | |
""" | |
cap_temp = None | |
try: | |
cap_temp = cv2.VideoCapture(video_path_for_dummy) | |
if not cap_temp.isOpened(): return None | |
total_frames = int(cap_temp.get(cv2.CAP_PROP_FRAME_COUNT)) | |
fps = cap_temp.get(cv2.CAP_PROP_FPS) | |
if fps <= 0 or total_frames <= 0: return None # Invalid video properties | |
return { | |
"start_frame": 0, | |
"end_frame": total_frames - 1, | |
"start_time": 0.0, | |
"end_time": total_frames / fps, | |
"duration_sec": total_frames / fps, | |
"similarity": 1.0, # Represents a perfect match for itself | |
"is_dummy": True # Special flag to identify this as a full-video entry | |
} | |
except Exception: | |
# Log error if necessary | |
return None | |
finally: | |
if cap_temp: cap_temp.release() | |
def load_video_at_startup(self, file_name): | |
""" | |
Loads a video file specified by `file_name` when the application starts. | |
This is typically used for loading a default video. It updates the UI | |
to reflect the loaded video and prepares for processing. | |
@param file_name (str): The filesystem path to the video file to load. | |
""" | |
self.stop_preview() # Ensure any ongoing preview is stopped | |
self.video_path = file_name | |
self.video_path_label.setText(os.path.basename(file_name)) | |
self.results_list.clear() | |
self.found_loops = [] | |
self.original_video_width = None | |
self.original_video_height = None | |
self.compare_width_preset_combo.setEnabled(False) # Keep disabled until dimensions are read | |
try: | |
cap_temp = cv2.VideoCapture(file_name) | |
if cap_temp.isOpened(): | |
video_width = int(cap_temp.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
video_height = int(cap_temp.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
if video_width > 0 and video_height > 0: | |
self._set_initial_compare_width_and_preset(video_width, video_height) | |
dummy_loop = self._create_dummy_full_video_loop(file_name) | |
if dummy_loop: | |
self.found_loops.append(dummy_loop) | |
self.display_results(self.found_loops) # This will enable preview/save buttons | |
cap_temp.release() | |
except Exception as e: | |
self.update_status(f"Warning: Could not read video dimensions or create dummy loop: {e}") | |
self.status_label.setText(f"Loaded: {os.path.basename(file_name)}. Ready to find loops or preview full video.") | |
self.find_loops_button.setEnabled(True) | |
if not self.found_loops: # If dummy loop creation failed, ensure buttons are disabled | |
self.save_loop_button.setEnabled(False) | |
self.preview_button.setEnabled(False) | |
self.progress_bar.setValue(0) | |
# Ensure GPU UI state is correct based on current processing mode | |
self._handle_processing_mode_change(self.processing_mode_combo.currentText()) | |
def load_video(self): | |
""" | |
Opens a file dialog for the user to select a video file. | |
If a file is selected, it's loaded, UI is updated, and a dummy full-video loop | |
is created and added to the results list. | |
""" | |
self.stop_preview() # Stop any active preview | |
file_name, _ = QFileDialog.getOpenFileName(self, "Open Video File", "", | |
"Video Files (*.mp4 *.avi *.mov *.mkv);;All Files (*)") | |
if file_name: | |
self.video_path = file_name | |
self.video_path_label.setText(os.path.basename(file_name)) | |
self.results_list.clear() | |
self.found_loops = [] | |
self.original_video_width = None | |
self.original_video_height = None | |
self.compare_width_preset_combo.setEnabled(False) # Keep disabled until dimensions are read | |
try: | |
cap_temp = cv2.VideoCapture(file_name) | |
if cap_temp.isOpened(): | |
video_width = int(cap_temp.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
video_height = int(cap_temp.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
if video_width > 0 and video_height > 0: | |
self._set_initial_compare_width_and_preset(video_width, video_height) | |
dummy_loop = self._create_dummy_full_video_loop(file_name) | |
if dummy_loop: | |
self.found_loops.append(dummy_loop) | |
self.display_results(self.found_loops) # Enables preview/save buttons | |
cap_temp.release() | |
except Exception as e: | |
self.update_status(f"Warning: Could not read video dimensions or create dummy loop: {e}") | |
self.status_label.setText(f"Loaded: {os.path.basename(file_name)}. Ready to find loops or preview full video.") | |
self.find_loops_button.setEnabled(True) | |
if not self.found_loops: # If dummy loop creation failed | |
self.save_loop_button.setEnabled(False) | |
self.preview_button.setEnabled(False) | |
self.progress_bar.setValue(0) | |
else: # No file selected by user | |
# Reset relevant UI elements if no video is loaded | |
self.original_video_width = None | |
self.original_video_height = None | |
self.compare_width_preset_combo.setEnabled(False) | |
self.compare_width_input.setValue(DEFAULT_COMPARE_WIDTH) | |
self._handle_compare_width_spinbox_change(DEFAULT_COMPARE_WIDTH) # Sync combobox | |
# Ensure GPU UI state is correct based on current processing mode, even if no video loaded | |
self._handle_processing_mode_change(self.processing_mode_combo.currentText()) | |
def start_find_loops(self): | |
""" | |
Initiates the loop finding process. This involves: | |
1. Stopping any active preview. | |
2. Validating that a video is loaded. | |
3. Checking if processing is already in progress. | |
4. Gathering parameters from UI inputs. | |
5. Performing pre-checks for GPU mode (PyTorch and CUDA availability). | |
6. Updating UI to reflect processing state (disabling/enabling buttons). | |
7. Creating and starting the `LoopFinderThread` with the gathered parameters. | |
""" | |
self.stop_preview() | |
if not self.video_path: | |
QMessageBox.warning(self, "No Video", "Please load a video file first.") | |
return | |
if self.loop_finder_thread and self.loop_finder_thread.isRunning(): | |
QMessageBox.information(self, "Busy", "Processing is already in progress.") | |
return | |
# Gather parameters from UI elements | |
similarity_thresh = self.similarity_thresh_input.value() | |
ignore_frames = self.ignore_frames_input.value() | |
min_loop_duration_sec = self.min_loop_duration_input.value() | |
max_search_window_sec = self.max_search_window_input.value() | |
compare_width = self.compare_width_input.value() | |
processing_mode = self.processing_mode_combo.currentText() | |
gpu_batch_size = self.gpu_batch_size_input.value() | |
# Pre-flight checks for GPU mode | |
if processing_mode == "GPU": | |
try: | |
import torch | |
if not torch.cuda.is_available(): | |
QMessageBox.warning(self, "GPU Mode Error", "GPU (CUDA) not available. Please install PyTorch with CUDA support or select a CPU mode.") | |
return | |
except ImportError: | |
QMessageBox.warning(self, "GPU Mode Error", "PyTorch is not installed. Please install PyTorch with CUDA support or select a CPU mode.") | |
return | |
# Update UI to processing state | |
self.find_loops_button.setEnabled(False) | |
self.cancel_button.setEnabled(True) | |
self.progress_bar.setValue(0) | |
self.results_list.clear() # Clear previous results | |
self.found_loops = [] | |
self.save_loop_button.setEnabled(False) | |
self.preview_button.setEnabled(False) | |
# Initialize and start the loop finding thread | |
self.loop_finder_thread = LoopFinderThread( | |
self.video_path, similarity_thresh, ignore_frames, | |
min_loop_duration_sec, max_search_window_sec, compare_width, | |
processing_mode, gpu_batch_size | |
) | |
# Connect signals from the thread to UI update slots | |
self.loop_finder_thread.progress_updated.connect(self.progress_bar.setValue) | |
self.loop_finder_thread.status_updated.connect(self.update_status) | |
self.loop_finder_thread.results_found.connect(self.display_results) | |
self.loop_finder_thread.error_occurred.connect(self.handle_error) | |
self.loop_finder_thread.finished.connect(self.on_processing_finished) # For UI cleanup post-processing | |
self.loop_finder_thread.start() | |
def cancel_processing(self): | |
""" | |
Stops the currently running `LoopFinderThread` if one is active. | |
Also ensures any active preview is stopped. | |
""" | |
self.stop_preview() | |
if self.loop_finder_thread and self.loop_finder_thread.isRunning(): | |
self.loop_finder_thread.stop() | |
# The thread's `finished` signal will call `on_processing_finished` for UI updates. | |
def on_processing_finished(self): | |
""" | |
Slot called when the `LoopFinderThread` finishes its execution (either normally or due to cancellation/error). | |
Resets UI elements related to processing state (e.g., enables 'Find Loops' button, disables 'Cancel'). | |
""" | |
self.find_loops_button.setEnabled(True if self.video_path else False) # Re-enable if video loaded | |
self.cancel_button.setEnabled(False) # Disable cancel button | |
def update_status(self, message): | |
""" | |
Updates the application's status label with the provided message. | |
@param message (str): The status message to display. | |
""" | |
self.status_label.setText(f"Status: {message}") | |
def display_results(self, loops): | |
""" | |
Populates the results QListWidget with the found loops. | |
Each loop is formatted for display, showing start/end times, duration, similarity, and frame numbers. | |
Enables save and preview buttons if loops are found. | |
@param loops (list): A list of dictionaries, where each dictionary represents a found loop | |
(or a dummy full-video loop). | |
""" | |
self.found_loops = loops | |
self.results_list.clear() | |
if not self.found_loops: | |
self.results_list.addItem("No loops found matching criteria.") | |
self.save_loop_button.setEnabled(False) | |
self.preview_button.setEnabled(False) | |
return | |
for loop_info in self.found_loops: | |
if loop_info.get("is_dummy"): # Special formatting for the full-video dummy loop | |
item_text = (f"FULL VIDEO: {loop_info['start_time']:.2f}s - {loop_info['end_time']:.2f}s " | |
f"(Dur: {loop_info['duration_sec']:.2f}s) " | |
f"Frames: {loop_info['start_frame']}-{loop_info['end_frame']}") | |
else: # Standard formatting for found loops | |
item_text = (f"Loop: {loop_info['start_time']:.2f}s - {loop_info['end_time']:.2f}s " | |
f"(Dur: {loop_info['duration_sec']:.2f}s, Sim: {loop_info['similarity']:.3f}) " | |
f"Frames: {loop_info['start_frame']}-{loop_info['end_frame']}") | |
self.results_list.addItem(item_text) | |
# Enable buttons now that results are available | |
self.save_loop_button.setEnabled(True) | |
self.preview_button.setEnabled(True) | |
if self.results_list.count() > 0: | |
self.results_list.setCurrentRow(0) # Auto-select the first item in the list | |
def handle_error(self, error_message): | |
""" | |
Displays an error message to the user via a QMessageBox. | |
Also updates the status label and resets UI elements related to processing. | |
@param error_message (str): The error message to display. | |
""" | |
self.stop_preview() # Ensure preview is stopped on error | |
QMessageBox.critical(self, "Error", error_message) | |
self.status_label.setText(f"Status: Error - {error_message}") | |
# Reset UI to a non-processing state | |
self.find_loops_button.setEnabled(True if self.video_path else False) | |
self.cancel_button.setEnabled(False) | |
self.progress_bar.setValue(0) | |
def save_selected_loop(self): | |
""" | |
Saves the currently selected loop from the results list to a new video file. | |
Prompts the user for a save location and filename. Uses MoviePy for video subclip creation and writing. | |
""" | |
self.stop_preview() # Stop preview before saving | |
selected_items = self.results_list.selectedItems() | |
if not selected_items: | |
QMessageBox.warning(self, "No Loop Selected", "Please select a loop from the list to save.") | |
return | |
selected_index = self.results_list.row(selected_items[0]) | |
if selected_index < 0 or selected_index >= len(self.found_loops): | |
QMessageBox.warning(self, "Selection Error", "Invalid loop selected.") | |
return | |
if not self.video_path: # Should not happen if a loop is selectable, but defensive check | |
self.handle_error("Internal Error: Video path is not set. Cannot save loop.") | |
return | |
loop_to_save = self.found_loops[selected_index] | |
# Suggest a default save file name based on original video and loop frames | |
original_dir = os.path.dirname(self.video_path) | |
original_filename, original_ext = os.path.splitext(os.path.basename(self.video_path)) | |
default_save_name = os.path.join( | |
original_dir, | |
f"{original_filename}_loop_{loop_to_save['start_frame']}_{loop_to_save['end_frame']}{original_ext}" | |
) | |
save_file_name, _ = QFileDialog.getSaveFileName(self, "Save Loop As", default_save_name, | |
"Video Files (*.mp4 *.avi *.mov *.mkv);;All Files (*)") | |
if not save_file_name: | |
return # User cancelled the save dialog | |
self.status_label.setText(f"Status: Saving loop to {os.path.basename(save_file_name)}...") | |
QApplication.processEvents() # Update UI before potentially long save operation | |
video_clip = None | |
looped_subclip_instance = None | |
try: | |
video_clip = VideoFileClip(self.video_path) | |
# MoviePy slicing for subclip: clip[start_time:end_time] | |
looped_subclip_instance = video_clip[loop_to_save['start_time']:loop_to_save['end_time']] | |
if looped_subclip_instance is None: | |
self.handle_error("Error saving loop: Failed to create video subclip.") | |
return | |
if looped_subclip_instance.duration <= 0: | |
self.handle_error(f"Error saving loop: Subclip has zero or negative duration ({looped_subclip_instance.duration}s). Cannot save.") | |
return | |
# Write the subclip to the chosen file | |
looped_subclip_instance.write_videofile(save_file_name, | |
codec="libx264", # Common video codec | |
audio_codec="aac", # Common audio codec | |
threads=multiprocessing.cpu_count(), # Use available cores | |
logger=None, # Suppress MoviePy console logs | |
preset="medium") # FFmpeg preset for quality/speed balance | |
QMessageBox.information(self, "Success", f"Loop saved to {save_file_name}") | |
self.status_label.setText(f"Status: Loop saved to {os.path.basename(save_file_name)}") | |
except Exception as e: | |
self.handle_error(f"Error saving loop: {str(e)}") | |
finally: | |
# Ensure MoviePy clips are closed to release file handles and resources | |
if looped_subclip_instance: | |
try: looped_subclip_instance.close() | |
except Exception as sub_exc: print(f"Error closing subclip: {sub_exc}") # Log to console | |
if video_clip: | |
try: video_clip.close() | |
except Exception as main_exc: print(f"Error closing main video clip: {main_exc}") # Log to console | |
def update_preview_frame(self, frame_bgr): | |
""" | |
Slot that receives a BGR frame (as a NumPy array) from the `PreviewWorker`. | |
It resizes the frame to fit the `preview_label` while maintaining aspect ratio, | |
converts it to RGB, creates a QImage, and then a QPixmap to display it. | |
@param frame_bgr (np.ndarray): The BGR video frame to be displayed. | |
""" | |
if not self.is_previewing or frame_bgr is None or frame_bgr.size == 0: | |
return # Do nothing if not previewing or frame is invalid | |
try: | |
preview_h_target = self.preview_label.height() | |
preview_w_target = self.preview_label.width() | |
# Avoid processing if label is too small or not visible | |
if preview_h_target <= 10 or preview_w_target <= 10: return | |
frame_h, frame_w = frame_bgr.shape[:2] | |
if frame_h == 0 or frame_w == 0: return # Invalid frame dimensions | |
# Calculate new dimensions to fit label while maintaining aspect ratio | |
img_aspect = frame_w / frame_h | |
lbl_aspect = preview_w_target / preview_h_target | |
if img_aspect > lbl_aspect: # Image is wider than label aspect ratio | |
new_w = preview_w_target | |
new_h = int(new_w / img_aspect) | |
else: # Image is taller or has same aspect ratio as label | |
new_h = preview_h_target | |
new_w = int(new_h * img_aspect) | |
# Fallback if calculated dimensions are invalid (should not happen with valid inputs) | |
if new_w <= 0 or new_h <= 0: | |
resized_frame = cv2.resize(frame_bgr, (max(1, preview_w_target // 2), max(1, preview_h_target // 2))) | |
else: | |
resized_frame = cv2.resize(frame_bgr, (new_w, new_h)) | |
# Convert BGR (OpenCV default) to RGB (Qt default for QImage.Format_RGB888) | |
rgb_image = cv2.cvtColor(resized_frame, cv2.COLOR_BGR2RGB) | |
h, w, ch = rgb_image.shape | |
bytes_per_line = ch * w | |
# Create QImage. The .copy() is important for QImage to manage its own data lifetime, | |
# especially when the underlying numpy array might go out of scope or change. | |
qt_image = QImage(rgb_image.data, w, h, bytes_per_line, QImage.Format.Format_RGB888).copy() | |
self.preview_label.setPixmap(QPixmap.fromImage(qt_image)) | |
except Exception as e: | |
print(f"Error updating preview frame: {e}") # Log error to console | |
# Consider emitting a signal to `handle_error` or updating status label for user visibility | |
def handle_preview_finished(self): | |
""" | |
Slot called when the `PreviewWorker` signals that it has finished | |
(either completed all loops or was stopped). | |
Resets UI elements related to the preview state and cleans up the worker. | |
""" | |
if self.is_previewing: # If it wasn't stopped by a new preview starting or manual stop_preview call | |
self.update_status("Preview finished.") | |
self.is_previewing = False # Reset previewing flag | |
# Restore UI state to non-previewing | |
self.preview_button.setEnabled(bool(self.found_loops and self.results_list.count() > 0)) | |
self.stop_preview_button.setEnabled(False) | |
self.results_list.setEnabled(True) # Re-enable interaction with results list | |
# Re-enable find loops button if a video is loaded and no processing is active | |
find_loops_enabled = bool(self.video_path) and not (self.loop_finder_thread and self.loop_finder_thread.isRunning()) | |
self.find_loops_button.setEnabled(find_loops_enabled) | |
# Re-enable save button if loops are found and an item is selected | |
save_loop_enabled = bool(self.found_loops and self.results_list.currentItem()) | |
self.save_loop_button.setEnabled(save_loop_enabled) | |
if self.preview_worker: | |
self.preview_worker.deleteLater() # Schedule the worker object for deletion | |
self.preview_worker = None | |
def start_preview_thread(self): | |
""" | |
Starts the `PreviewWorker` thread for the currently selected loop in the results list. | |
Manages UI state changes to reflect that a preview is active (e.g., disabling other controls). | |
""" | |
selected_items = self.results_list.selectedItems() | |
if not selected_items: | |
QMessageBox.information(self, "No Selection", "Please select a loop from the list to preview.") | |
return | |
if not self.video_path: # Should not be possible if items are in list, but defensive | |
self.handle_error("No Video: Cannot preview without a loaded video.") | |
return | |
selected_index = self.results_list.row(selected_items[0]) | |
if selected_index < 0 or selected_index >= len(self.found_loops): # Invalid index | |
self.handle_error("Selection Error: Invalid loop selected for preview.") | |
return | |
# Stop any existing preview worker first to prevent multiple previews | |
if self.preview_worker and self.preview_worker.isRunning(): | |
self.preview_worker.stop() | |
# The old worker's `finished` signal will trigger `handle_preview_finished` for cleanup. | |
self.stop_preview() # Call to ensure UI is fully reset before starting a new preview | |
self.is_previewing = True | |
# Update UI to "previewing" state | |
self.preview_button.setEnabled(False) | |
self.stop_preview_button.setEnabled(True) | |
self.results_list.setEnabled(False) # Disable interaction with list during preview | |
self.find_loops_button.setEnabled(False) # Disable finding new loops during preview | |
self.save_loop_button.setEnabled(False) # Disable saving during preview | |
self.preview_label.setText("Previewing...") # Placeholder text while preview loads | |
self.update_status("Starting preview...") | |
loop_to_preview = self.found_loops[selected_index] | |
loop_count_val = self.loop_count_input.value() | |
loop_forever_val = self.loop_forever_checkbox.isChecked() | |
# Create and start the new preview worker | |
self.preview_worker = PreviewWorker( | |
video_path=self.video_path, | |
start_frame=int(loop_to_preview['start_frame']), | |
end_frame=int(loop_to_preview['end_frame']), | |
loop_count=loop_count_val, | |
loop_forever=loop_forever_val | |
) | |
self.preview_worker.frame_ready.connect(self.update_preview_frame) | |
self.preview_worker.preview_finished.connect(self.handle_preview_finished) | |
self.preview_worker.preview_error.connect(self.handle_error) # Connect error signal | |
self.preview_worker.start() | |
def stop_preview(self): | |
""" | |
Stops the currently active `PreviewWorker` (if any) and resets related UI elements | |
to their non-previewing state. | |
""" | |
was_actively_previewing = self.is_previewing # Store if a preview was genuinely active | |
self.is_previewing = False # Set flag immediately | |
if self.preview_worker and self.preview_worker.isRunning(): | |
self.preview_worker.stop() | |
# The worker's `preview_finished` signal (triggered by its run() method exiting) | |
# will call `handle_preview_finished` for full cleanup and UI reset. | |
# Immediately reset some UI elements for responsiveness, | |
# `handle_preview_finished` will do a more thorough reset. | |
self.preview_label.setText("Select a loop and click 'Preview Selected Loop'") | |
self.preview_label.setPixmap(QPixmap()) # Clear any existing image | |
can_preview_again = bool(self.found_loops and self.results_list.count() > 0) | |
self.preview_button.setEnabled(can_preview_again) | |
self.stop_preview_button.setEnabled(False) | |
self.results_list.setEnabled(True) | |
find_loops_enabled = bool(self.video_path) and not (self.loop_finder_thread and self.loop_finder_thread.isRunning()) | |
self.find_loops_button.setEnabled(find_loops_enabled) | |
save_loop_enabled = bool(self.found_loops and self.results_list.currentItem()) | |
self.save_loop_button.setEnabled(save_loop_enabled) | |
if was_actively_previewing: # Only update status if it was actually previewing and now stopped | |
self.update_status("Preview stopped.") | |
# If the worker was running, its `finished` signal handles `deleteLater`. | |
# If it wasn't running but an instance existed, `handle_preview_finished` (if called) would handle it. | |
# This method primarily focuses on stopping the thread and immediate UI feedback. | |
def closeEvent(self, a0: QCloseEvent | None): # Changed parameter name from 'event' to 'a0' | |
""" | |
Handles the application's close event (e.g., user clicks the 'X' button). | |
Ensures that any active worker threads (`LoopFinderThread`, `PreviewWorker`) | |
are stopped gracefully before the application exits. | |
@param a0 (QCloseEvent, optional): The close event object provided by Qt. | |
""" | |
self.stop_preview() # Stop preview worker first | |
if self.loop_finder_thread and self.loop_finder_thread.isRunning(): | |
self.loop_finder_thread.stop() | |
if not self.loop_finder_thread.wait(1000): # Wait up to 1 second for thread to finish | |
self.update_status("Warning: Loop finder thread did not stop gracefully.") | |
if self.preview_worker and self.preview_worker.isRunning(): | |
# `stop_preview` should have already initiated stop, but this is a final check. | |
self.preview_worker.stop() | |
if not self.preview_worker.wait(500): # Wait up to 0.5 seconds | |
self.update_status("Warning: Preview worker thread did not stop gracefully.") | |
if a0: # If event is provided (standard Qt behavior) | |
a0.accept() # Accept the close event, allowing the application to exit | |
# --- Application Entry Point --- | |
if __name__ == '__main__': | |
app = QApplication(sys.argv) | |
# Required for `multiprocessing` to work correctly when the application is | |
# packaged/frozen (e.g., using PyInstaller). | |
# `hasattr(sys, '_MEIPASS')` is another common check for PyInstaller environments. | |
if getattr(sys, 'frozen', False) or hasattr(sys, '_MEIPASS'): | |
multiprocessing.freeze_support() | |
looper_app = VideoLooperApp() | |
looper_app.show() | |
sys.exit(app.exec()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment