Skip to content

Instantly share code, notes, and snippets.

@twobob
Created May 22, 2025 03:57
Show Gist options
  • Save twobob/2f6766e66555cf883ad47fb4baa97d12 to your computer and use it in GitHub Desktop.
Save twobob/2f6766e66555cf883ad47fb4baa97d12 to your computer and use it in GitHub Desktop.
Tries to make a perfect loop out of a video
import sys
import os
import cv2
import numpy as np
from skimage.metrics import structural_similarity as ssim
import threading
import time
import multiprocessing
import math
import queue
from moviepy import VideoFileClip
import urllib.request # For downloading files from URLs.
import urllib.error # For handling URL-related errors.
from PyQt6.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout,
QPushButton, QLabel, QLineEdit, QFileDialog, QListWidget,
QProgressBar, QDoubleSpinBox, QSpinBox, QMessageBox, QHeaderView,
QComboBox, QCheckBox)
from PyQt6.QtCore import Qt, QThread, pyqtSignal, QTimer
from PyQt6.QtGui import QIcon, QCloseEvent, QImage, QPixmap
# --- Application Constants ---

# Frame similarity threshold, expressed as a percentage.
DEFAULT_SIMILARITY_THRESHOLD = 88
# Number of initial frames skipped during analysis.
DEFAULT_IGNORE_FRAMES = 0
# Shortest loop (in seconds) that is reported.
DEFAULT_MIN_LOOP_DURATION_SEC = 3.0
# Longest distance (in seconds) from a start frame that is searched for a matching end frame.
DEFAULT_MAX_SEARCH_WINDOW_SEC = 60.0
# Width (in pixels) frames are resized to before being compared.
DEFAULT_COMPARE_WIDTH = 640
# How many times a selected loop repeats in the preview.
DEFAULT_PREVIEW_LOOP_COUNT = 3
# Batch size used by the GPU-accelerated processing mode.
DEFAULT_GPU_BATCH_SIZE = 64

# Preset label -> divisor applied to the original video width to obtain the comparison width.
PRESET_FRACTIONS = {
    "Full (1/1)": 1,
    "Half (1/2)": 2,
    "Quarter (1/4)": 4,
    "Eighth (1/8)": 8,
}

# UI text that identifies a user-entered (non-preset) comparison width.
CUSTOM_PRESET_TEXT = "Custom"
# --- Multiprocessing Worker Function ---
def process_frame_chunk_worker(args_bundle):
    """
    Worker function designed for use with a multiprocessing.Pool.
    It scans one chunk of start frames ("i" frames) and, for each of them, compares it
    against later frames ("j" frames) using structural similarity (SSIM). Every pair whose
    SSIM reaches the threshold is reported as a loop running from frame i to frame j - 1.
    @param args_bundle: A tuple containing all necessary arguments for processing:
        - video_path (str): Filesystem path to the video file.
        - i_start (int): The starting frame index for the outer loop of comparisons within this chunk.
        - i_end (int): The ending frame index (exclusive) for the outer loop of comparisons.
        - total_frames_static (int): Total number of frames in the entire video.
        - fps_static (float): Frames per second of the video.
        - min_loop_frames_static (int): Minimum duration of a valid loop, expressed in frames.
        - max_search_window_frames_static (int): Maximum number of subsequent frames to search for a match.
        - compare_width_static (int): Target width for resizing frames before comparison.
        - compare_height_static (int): Target height for resizing frames before comparison.
        - similarity_thresh_normalized_static (float): Normalized SSIM threshold (0.0-1.0) for considering frames similar.
        - progress_queue (multiprocessing.Queue, optional): A queue that receives 1 for every i-frame handled.
        - stop_event (multiprocessing.Event, optional): An event to signal premature termination of processing.
    @return: A list of dictionaries, where each dictionary contains information about a found loop
        (keys: start_frame, end_frame, start_time, end_time, duration_sec, similarity).
        Returns an empty list if the video cannot be opened or an error occurs.
    """
    (video_path, i_start, i_end, total_frames_static, fps_static,
     min_loop_frames_static, max_search_window_frames_static,
     compare_width_static, compare_height_static,
     similarity_thresh_normalized_static, progress_queue, stop_event) = args_bundle

    compare_size = (compare_width_static, compare_height_static)
    # SSIM needs the span of *possible* pixel values. Frames are compared as 8-bit
    # grayscale, so that span is 255. Deriving it from one frame's actual min/max
    # yields 0 for uniform frames (NaN scores) and makes scores depend on frame content.
    gray_data_range = 255

    def stop_requested():
        return stop_event and stop_event.is_set()

    def read_gray(capture, frame_index):
        """Seek to frame_index and return it resized and in grayscale, or None if unreadable."""
        capture.set(cv2.CAP_PROP_POS_FRAMES, frame_index)
        ok, frame = capture.read()
        if not ok:
            return None
        return cv2.cvtColor(cv2.resize(frame, compare_size), cv2.COLOR_BGR2GRAY)

    cap_worker = None
    local_found_loops = []
    try:
        cap_worker = cv2.VideoCapture(video_path)
        if not cap_worker.isOpened():
            return []
        for i in range(i_start, i_end):
            if stop_requested():
                break
            frame_i_gray = read_gray(cap_worker, i)
            if frame_i_gray is not None:
                search_end_limit = min(i + max_search_window_frames_static + 1, total_frames_static)
                for j in range(i + min_loop_frames_static + 1, search_end_limit):
                    if stop_requested():
                        break
                    frame_j_gray = read_gray(cap_worker, j)
                    if frame_j_gray is None:
                        continue
                    similarity = ssim(frame_i_gray, frame_j_gray, data_range=gray_data_range)
                    if similarity >= similarity_thresh_normalized_static:
                        # Frame j repeats frame i, so the loop itself ends one frame earlier.
                        local_found_loops.append({
                            "start_frame": i, "end_frame": j - 1,
                            "start_time": i / fps_static, "end_time": j / fps_static,
                            "duration_sec": (j - i) / fps_static, "similarity": similarity,
                        })
            # Report every handled i-frame, readable or not, so the caller's
            # progress total can actually be reached.
            if progress_queue:
                progress_queue.put(1)
        return local_found_loops
    except Exception as exc:
        # Best effort: a failing chunk must not take down the whole pool, but leave a
        # trace on stderr so worker problems can be diagnosed.
        print(f"process_frame_chunk_worker failed for frames {i_start}-{i_end}: {exc!r}", file=sys.stderr)
        return []
    finally:
        if cap_worker:
            cap_worker.release()
# --- Loop Finding Worker Thread ---
class LoopFinderThread(QThread):
    """
    A QThread subclass responsible for the core video analysis to find loops.
    It can operate in different processing modes (CPU single/multi-core, GPU).
    Emits signals to update the UI with progress, results, status messages, and errors.

    A "loop" is a pair of frames (i, j) whose SSIM reaches the threshold; it is reported
    as running from frame i to frame j - 1 (frame j repeats frame i).
    """
    progress_updated = pyqtSignal(int)  # Emits current processing progress (percentage, 0-100).
    status_updated = pyqtSignal(str)  # Emits textual status updates for the UI.
    results_found = pyqtSignal(list)  # Emits a list of dictionaries, each representing a found loop.
    error_occurred = pyqtSignal(str)  # Emits error messages encountered during processing.

    def __init__(self, video_path, similarity_thresh, ignore_frames,
                 min_loop_duration_sec, max_search_window_sec, compare_width,
                 processing_mode, gpu_batch_size):
        """
        Initializes the LoopFinderThread with all necessary parameters for loop detection.
        @param video_path (str): Filesystem path to the video file.
        @param similarity_thresh (float): The SSIM similarity threshold (0-100) for frames to be considered a match.
        @param ignore_frames (int): The number of initial frames in the video to skip during analysis.
        @param min_loop_duration_sec (float): The minimum duration (in seconds) for a valid loop.
        @param max_search_window_sec (float): The maximum time window (in seconds) from a starting frame
            to search for a matching end frame.
        @param compare_width (int): The width (in pixels) to which frames will be resized before comparison.
            Height is calculated based on aspect ratio.
        @param processing_mode (str): The selected processing mode ("CPU (Single Core)", "CPU (Multi-core)", or "GPU").
        @param gpu_batch_size (int): The batch size to use if GPU processing mode is selected.
        """
        super().__init__()
        self.video_path = video_path
        self.similarity_thresh_normalized = similarity_thresh / 100.0  # Normalize to 0.0-1.0 for SSIM
        self.ignore_frames = ignore_frames
        self.min_loop_duration_sec = min_loop_duration_sec
        self.max_search_window_sec = max_search_window_sec
        self.compare_width = compare_width
        self.processing_mode = processing_mode
        self.gpu_batch_size = gpu_batch_size
        self._is_running = True  # Internal flag to control the thread's execution loop.
        self._multiprocessing_stop_event = None  # Event used to signal multiprocessing workers to stop.

    def run(self):
        """
        The main entry point for the thread's execution.
        It selects and calls the appropriate processing method based on `self.processing_mode`.
        """
        self._is_running = True
        if self.processing_mode == "CPU (Multi-core)":
            self.run_cpu_multicore()
        elif self.processing_mode == "GPU":
            self.run_gpu()
        else:  # Default to single core CPU
            self.run_cpu_singlecore()

    # --- Shared helpers ---

    @staticmethod
    def _to_gray(frame, compare_size):
        """Resize a BGR frame to compare_size (width, height) and convert it to grayscale."""
        return cv2.cvtColor(cv2.resize(frame, compare_size), cv2.COLOR_BGR2GRAY)

    @staticmethod
    def _read_gray_frame(cap, frame_index, compare_size):
        """Seek to frame_index and return it resized and in grayscale, or None if it cannot be read."""
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_index)
        ok, frame = cap.read()
        if not ok:
            return None
        return LoopFinderThread._to_gray(frame, compare_size)

    @staticmethod
    def _make_loop_info(start_frame, match_frame, fps, similarity):
        """
        Builds the result dictionary for one loop.
        @param start_frame (int): First frame of the loop.
        @param match_frame (int): Frame that matched start_frame; the loop ends one frame before it.
        @param fps (float): Frames per second, used to convert frame indices to seconds.
        @param similarity: SSIM score of the (start_frame, match_frame) pair.
        """
        return {
            "start_frame": start_frame, "end_frame": match_frame - 1,
            "start_time": start_frame / fps, "end_time": match_frame / fps,
            "duration_sec": (match_frame - start_frame) / fps, "similarity": similarity,
        }

    def _comparison_height(self, cap, first_frame_error, zero_width_error, bad_height_error):
        """
        Reads the first frame and derives the comparison height that keeps the video's
        aspect ratio at `self.compare_width`.
        On failure the matching message is emitted through `error_occurred`.
        @param cap: An opened cv2.VideoCapture.
        @param first_frame_error (str): Message used when the first frame cannot be read.
        @param zero_width_error (str): Message used when the frame width is zero.
        @param bad_height_error (str): Template with a `{height}` placeholder, used when the
            computed height is not positive.
        @return: The comparison height in pixels, or None if an error was emitted.
        """
        cap.set(cv2.CAP_PROP_POS_FRAMES, 0)  # Rewind to read the first frame for dimensions
        ok, first_frame = cap.read()
        if not ok:
            self.error_occurred.emit(first_frame_error)
            return None
        full_height, full_width = first_frame.shape[:2]
        if full_width == 0:
            self.error_occurred.emit(zero_width_error)
            return None
        compare_height = int(self.compare_width * (full_height / full_width))
        if compare_height <= 0:
            self.error_occurred.emit(bad_height_error.format(height=compare_height))
            return None
        return compare_height

    # --- Processing modes ---

    def run_cpu_singlecore(self):
        """
        Performs loop detection using a single CPU core.
        This method iterates through video frames, resizes them, converts to grayscale,
        and compares them using SSIM to find matching pairs that form loops.
        """
        self.status_updated.emit("Starting video analysis (Single Core CPU)...")
        found_loops = []
        cap = None
        try:
            cap = cv2.VideoCapture(self.video_path)
            if not cap.isOpened():
                self.error_occurred.emit("Error: Could not open video file.")
                return
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            fps = cap.get(cv2.CAP_PROP_FPS)
            if fps <= 0:
                self.error_occurred.emit("Error: Could not determine video FPS.")
                return
            self.status_updated.emit(f"Video: {total_frames} frames, {fps:.2f} FPS")
            min_loop_frames = int(self.min_loop_duration_sec * fps)
            max_search_window_frames = int(self.max_search_window_sec * fps)
            outer_loop_end_frame = total_frames - min_loop_frames
            if self.ignore_frames >= outer_loop_end_frame:
                self.status_updated.emit(f"Error: 'Ignore First N Frames' ({self.ignore_frames}) is too large or video too short for min loop duration.")
                self.results_found.emit([])
                return
            compare_height = self._comparison_height(
                cap,
                "Error: Could not read first frame.",
                "Error: Video width is zero.",
                "Error: Calculated comparison height ({height}) is invalid.",
            )
            if compare_height is None:
                return
            compare_size = (self.compare_width, compare_height)
            self.status_updated.emit(f"Comparing frames at {self.compare_width}x{compare_height} resolution.")
            # Positive thanks to the ignore_frames check above; max() only guards the division.
            total_iterations = max(1, outer_loop_end_frame - self.ignore_frames)
            start_frames = range(self.ignore_frames, outer_loop_end_frame)
            for handled_count, i_frame_index in enumerate(start_frames, start=1):
                if not self._is_running:
                    self.status_updated.emit("Processing cancelled.")
                    return
                frame_i_gray = self._read_gray_frame(cap, i_frame_index, compare_size)
                if frame_i_gray is None:
                    self.status_updated.emit(f"Warning: Could not read frame_i {i_frame_index}")
                else:
                    # j starts past the minimum loop length, so every match already
                    # satisfies the minimum duration.
                    first_j = i_frame_index + min_loop_frames + 1
                    last_j_exclusive = min(i_frame_index + 1 + max_search_window_frames, total_frames)
                    for j_frame_index in range(first_j, last_j_exclusive):
                        if not self._is_running:
                            self.status_updated.emit("Processing cancelled.")
                            return
                        frame_j_gray = self._read_gray_frame(cap, j_frame_index, compare_size)
                        if frame_j_gray is None:
                            continue
                        # data_range is the span of possible 8-bit values, not the span found
                        # in one frame (which is 0 for uniform frames and gives NaN scores).
                        similarity = ssim(frame_i_gray, frame_j_gray, data_range=255)
                        if similarity >= self.similarity_thresh_normalized:
                            found_loops.append(self._make_loop_info(i_frame_index, j_frame_index, fps, similarity))
                # Unreadable start frames count as handled too, so progress can reach 100%.
                self.progress_updated.emit(int((handled_count / total_iterations) * 100))
            self.status_updated.emit(f"Analysis complete. Found {len(found_loops)} potential loops.")
            self.results_found.emit(found_loops)
        except Exception as e:
            self.error_occurred.emit(f"An error occurred during CPU analysis: {str(e)}")
        finally:
            if cap:
                cap.release()
            self._is_running = False

    def run_cpu_multicore(self):
        """
        Performs loop detection using multiple CPU cores via `multiprocessing.Pool`.
        The range of start frames is divided into contiguous chunks, one task per chunk,
        and each task is handled by `process_frame_chunk_worker`.
        """
        self.status_updated.emit("Starting video analysis (Multi-core CPU)...")
        with multiprocessing.Manager() as manager:
            progress_queue = manager.Queue()  # For workers to report progress units
            self._multiprocessing_stop_event = manager.Event()  # To signal workers to stop
            try:
                # This capture is only used to read the video properties; each worker
                # opens its own capture.
                cap_main = cv2.VideoCapture(self.video_path)
                try:
                    if not cap_main.isOpened():
                        self.error_occurred.emit("Error: Could not open video file for multi-core setup.")
                        return
                    total_frames = int(cap_main.get(cv2.CAP_PROP_FRAME_COUNT))
                    fps = cap_main.get(cv2.CAP_PROP_FPS)
                    if fps <= 0:
                        self.error_occurred.emit("Error: Could not determine video FPS for multi-core setup.")
                        return
                    compare_height = self._comparison_height(
                        cap_main,
                        "Error: Could not read first frame for multi-core setup.",
                        "Error: Video width is zero for multi-core setup.",
                        "Error: Calculated comparison height ({height}) is invalid for multi-core.",
                    )
                    if compare_height is None:
                        return
                finally:
                    cap_main.release()  # Released on every path, including exceptions

                min_loop_frames = int(self.min_loop_duration_sec * fps)
                max_search_window_frames = int(self.max_search_window_sec * fps)
                num_processes = max(1, multiprocessing.cpu_count() - 1)  # Leave one core for system/UI responsiveness
                outer_loop_end_frame = total_frames - min_loop_frames
                i_loop_start = self.ignore_frames
                if i_loop_start >= outer_loop_end_frame:
                    self.status_updated.emit("No frames to process after considering ignore frames and min loop duration.")
                    self.results_found.emit([])
                    return
                total_i_iterations = outer_loop_end_frame - i_loop_start
                chunk_size = max(1, math.ceil(total_i_iterations / num_processes))
                # One argument bundle per contiguous chunk of start frames.
                tasks = [
                    (
                        self.video_path, chunk_i_start,
                        min(chunk_i_start + chunk_size, outer_loop_end_frame), total_frames, fps,
                        min_loop_frames, max_search_window_frames,
                        self.compare_width, compare_height,
                        self.similarity_thresh_normalized, progress_queue, self._multiprocessing_stop_event,
                    )
                    for chunk_i_start in range(i_loop_start, outer_loop_end_frame, chunk_size)
                ]
                if not tasks:
                    self.status_updated.emit("No processing tasks generated for multi-core.")
                    self.results_found.emit([])
                    return

                all_found_loops = []
                # Start a separate thread to monitor progress from the multiprocessing queue
                progress_monitor_thread = threading.Thread(
                    target=self._monitor_mp_progress,
                    args=(progress_queue, total_i_iterations), daemon=True)
                progress_monitor_thread.start()
                try:
                    with multiprocessing.Pool(processes=min(num_processes, len(tasks))) as pool:
                        results_from_pool = pool.map(process_frame_chunk_worker, tasks)
                    for loop_list_from_worker in results_from_pool:
                        all_found_loops.extend(loop_list_from_worker)
                finally:
                    # Always stop the monitor while the manager (and its queue) is still alive.
                    progress_queue.put(None)
                    progress_monitor_thread.join(timeout=2.0)

                if self._multiprocessing_stop_event.is_set():
                    self.status_updated.emit("Processing cancelled (Multi-core).")
                else:
                    self.status_updated.emit(f"Analysis complete (Multi-core). Found {len(all_found_loops)} potential loops.")
                    self.results_found.emit(all_found_loops)
            except Exception as e:
                if self._multiprocessing_stop_event and not self._multiprocessing_stop_event.is_set():
                    self._multiprocessing_stop_event.set()  # Ensure workers are signalled to stop on error
                self.error_occurred.emit(f"Multi-core processing error: {str(e)}")
            finally:
                if self._multiprocessing_stop_event and not self._multiprocessing_stop_event.is_set():
                    self._multiprocessing_stop_event.set()  # Ensure event is set on any exit path
                self._is_running = False

    def _monitor_mp_progress(self, progress_mp_queue: multiprocessing.Queue, total_iterations: int):
        """
        Monitors a multiprocessing queue for progress updates from worker processes.
        This method runs in a separate thread and emits `progress_updated` signal.
        @param progress_mp_queue: The queue workers put progress items (count of processed i_frames) into.
            A `None` item is the sentinel that ends monitoring.
        @param total_iterations: The total number of 'i' iterations expected across all workers.
        """
        processed_count = 0
        while self._is_running:
            try:
                # The timeout lets the loop re-check _is_running periodically.
                item = progress_mp_queue.get(timeout=0.1)
            except queue.Empty:
                continue
            except Exception:
                # Any other queue failure (e.g. the manager went away) ends monitoring.
                break
            if item is None:  # Sentinel value from main thread to stop monitoring
                break
            processed_count += item
            if total_iterations > 0:
                self.progress_updated.emit(int((processed_count / total_iterations) * 100))
            if processed_count >= total_iterations:  # All expected work reported
                break
        if total_iterations > 0:  # Ensure final progress is emitted if loop broke early
            self.progress_updated.emit(int((processed_count / total_iterations) * 100))

    def run_gpu(self):
        """
        Performs loop detection using GPU acceleration via PyTorch and TorchMetrics.
        Start frames are read in batches of `gpu_batch_size`; for each start frame the whole
        candidate window is read sequentially and scored against it in a single SSIM call.
        Requires PyTorch with CUDA support and TorchMetrics to be installed.
        """
        self.status_updated.emit("Starting video analysis (GPU Batched)...")
        try:
            import torch
            import torchmetrics
        except ImportError:
            self.error_occurred.emit("PyTorch or TorchMetrics not found. Please install them for GPU mode (e.g., pip install torch torchmetrics).")
            self._is_running = False
            return
        if not torch.cuda.is_available():
            self.error_occurred.emit("CUDA not available. GPU mode cannot run.")
            self._is_running = False
            return

        found_loops = []
        cap = None
        try:
            # Built inside the try so that a failure here is reported through
            # error_occurred instead of escaping run().
            device = torch.device("cuda")
            # `reduction='none'` to get SSIM score for each image pair in a batch
            ssim_metric = torchmetrics.StructuralSimilarityIndexMeasure(data_range=1.0, reduction='none').to(device)

            cap = cv2.VideoCapture(self.video_path)
            if not cap.isOpened():
                self.error_occurred.emit("Error: Could not open video file for GPU mode.")
                return
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            fps = cap.get(cv2.CAP_PROP_FPS)
            if fps <= 0:
                self.error_occurred.emit("Error: Could not determine video FPS for GPU mode.")
                return
            min_loop_frames = int(self.min_loop_duration_sec * fps)
            max_j_frames_in_window = int(self.max_search_window_sec * fps)
            compare_height = self._comparison_height(
                cap,
                "Error: Could not read first frame for GPU.",
                "Error: Video width is zero for GPU.",
                "Error: Invalid compare height for GPU ({height}).",
            )
            if compare_height is None:
                return
            compare_size = (self.compare_width, compare_height)
            self.status_updated.emit(f"GPU: Comparing at {self.compare_width}x{compare_height}, Batch: {self.gpu_batch_size}, Device: {torch.cuda.get_device_name(0)}")
            outer_loop_end_frame = total_frames - min_loop_frames
            if self.ignore_frames >= outer_loop_end_frame:
                self.status_updated.emit("Not enough frames to process with current 'ignore' and 'min loop duration' settings.")
                self.results_found.emit([])
                return
            total_iterations = max(1, outer_loop_end_frame - self.ignore_frames)
            handled_count = 0
            i_indices = range(self.ignore_frames, outer_loop_end_frame)

            # Process i_frames in batches
            for batch_start in range(0, len(i_indices), self.gpu_batch_size):
                if not self._is_running:
                    break
                batch_indices = i_indices[batch_start:batch_start + self.gpu_batch_size]
                i_frames = []  # Normalized float32 grayscale start frames
                i_frame_indices = []  # Video frame index of each entry in i_frames
                for i_index in batch_indices:
                    gray = self._read_gray_frame(cap, i_index, compare_size)
                    if gray is None:
                        continue
                    i_frames.append(gray.astype(np.float32) / 255.0)  # Normalize for SSIM
                    i_frame_indices.append(i_index)
                # Unreadable start frames still count toward progress.
                handled_count += len(batch_indices) - len(i_frame_indices)

                if i_frames:
                    # [Batch, Height, Width] -> [Batch, Channels=1, Height, Width]
                    i_batch = torch.from_numpy(np.stack(i_frames)).unsqueeze(1).to(device)
                    for batch_pos, i_index in enumerate(i_frame_indices):
                        if not self._is_running:
                            break
                        first_j = i_index + min_loop_frames + 1
                        last_j_exclusive = min(i_index + 1 + max_j_frames_in_window, total_frames)
                        j_frames = []
                        if last_j_exclusive > first_j:
                            # One seek, then sequential reads: j_frames[k] is video frame first_j + k.
                            cap.set(cv2.CAP_PROP_POS_FRAMES, first_j)
                            for _ in range(last_j_exclusive - first_j):
                                if not self._is_running:
                                    break
                                ok, frame_j = cap.read()
                                if not ok:
                                    break  # Stop if cannot read more frames for this j-window
                                j_frames.append(self._to_gray(frame_j, compare_size).astype(np.float32) / 255.0)
                        if j_frames:
                            j_batch = torch.from_numpy(np.stack(j_frames)).unsqueeze(1).to(device)
                            # Broadcast the single start frame ([1, H, W]) against every candidate.
                            scores_tensor = ssim_metric(i_batch[batch_pos].expand_as(j_batch), j_batch)
                            scores = np.atleast_1d(scores_tensor.cpu().numpy())  # Ensure iterable
                            # Calling the metric object also accumulates its internal state;
                            # reset it so memory does not grow across the run.
                            ssim_metric.reset()
                            for offset, score in enumerate(scores[:len(j_frames)]):
                                if score >= self.similarity_thresh_normalized:
                                    found_loops.append(
                                        self._make_loop_info(i_index, first_j + offset, fps, float(score)))
                        handled_count += 1
                # Update progress after each i_batch is processed
                self.progress_updated.emit(int((handled_count / total_iterations) * 100))

            if not self._is_running:
                self.status_updated.emit("Processing cancelled (GPU).")
            else:
                self.status_updated.emit(f"Analysis complete (GPU). Found {len(found_loops)} potential loops.")
                self.results_found.emit(found_loops)
        except Exception as e:
            self.error_occurred.emit(f"GPU processing error: {str(e)}")
        finally:
            if cap:
                cap.release()
            self._is_running = False

    def stop(self):
        """
        Requests the thread to stop its current processing task.
        Sets the internal `_is_running` flag to False and signals the
        multiprocessing stop event if multi-core CPU mode was active.
        """
        self._is_running = False
        if self.processing_mode == "CPU (Multi-core)" and self._multiprocessing_stop_event:
            self._multiprocessing_stop_event.set()
        self.status_updated.emit("Attempting to stop processing...")
# --- Video Preview Worker Thread ---
class PreviewWorker(QThread):
    """
    A QThread subclass dedicated to playing a selected video loop segment.
    It reads frames from the video file at the appropriate rate and emits them
    for display in the UI. Supports looping a specific number of times or indefinitely.
    """
    frame_ready = pyqtSignal(np.ndarray)  # Emits a BGR numpy array (frame) when it's ready for display.
    preview_finished = pyqtSignal()  # Emitted when the preview completes all its loops or is stopped.
    preview_error = pyqtSignal(str)  # Emitted if an error occurs during preview (e.g., cannot read frame).

    def __init__(self, video_path, start_frame, end_frame, loop_count, loop_forever, parent=None):
        """
        Initializes the PreviewWorker.
        @param video_path (str): Filesystem path to the video file.
        @param start_frame (int): The starting frame index of the loop segment to be previewed.
        @param end_frame (int): The ending frame index (inclusive) of the loop segment.
        @param loop_count (int): The number of times to repeat the loop (ignored if `loop_forever` is True).
        @param loop_forever (bool): If True, the loop segment will repeat indefinitely until stopped.
        @param parent (QObject, optional): The parent object for this QThread.
        """
        super().__init__(parent)
        self.video_path = video_path
        self.start_frame = start_frame
        self.end_frame = end_frame
        self.loop_count = loop_count
        self.loop_forever = loop_forever
        self._is_running = True  # Internal flag to control the thread's execution loop.

    def run(self):
        """
        The main execution method for the preview thread.
        Plays the segment `loop_count` times (or until stopped when `loop_forever` is set),
        emitting each frame via `frame_ready`. `preview_finished` is always emitted on exit.
        """
        cap = None
        try:
            if self.end_frame < self.start_frame:
                # An empty range would make a `loop_forever` preview spin without ever sleeping.
                self.preview_error.emit(
                    f"Preview Error: Invalid frame range {self.start_frame}-{self.end_frame}")
                return
            cap = cv2.VideoCapture(self.video_path)
            if not cap.isOpened():
                self.preview_error.emit("Preview Error: Could not open video file.")
                return
            fps = cap.get(cv2.CAP_PROP_FPS)
            if fps <= 0:
                fps = 30.0  # Default FPS if detection fails or is invalid
            frame_delay = 1.0 / fps  # Target time per frame to approximate original speed
            completed_loops = 0
            while self._is_running and (self.loop_forever or completed_loops < self.loop_count):
                if not self._play_segment_once(cap, frame_delay):
                    break  # Stopped or a frame could not be read
                completed_loops += 1
        except Exception as e:
            self.preview_error.emit(f"Preview runtime error: {str(e)}")
        finally:
            if cap:
                cap.release()
            self.preview_finished.emit()  # Always emit finished signal, regardless of how it exited

    def _play_segment_once(self, cap, frame_delay):
        """
        Plays frames start_frame..end_frame once.
        @param cap: An opened cv2.VideoCapture.
        @param frame_delay (float): Target seconds per frame.
        @return: True if the whole segment was played and the worker is still running;
            False if a stop was requested or a frame could not be read.
        """
        # Seek once, then read sequentially instead of seeking before every frame.
        cap.set(cv2.CAP_PROP_POS_FRAMES, self.start_frame)
        for frame_idx in range(self.start_frame, self.end_frame + 1):
            if not self._is_running:
                return False  # Stop requested by user/system
            frame_started = time.monotonic()
            ok, frame = cap.read()
            if not ok:
                self.preview_error.emit(f"Preview Error: Could not read frame {frame_idx}")
                return False
            if frame is not None and frame.size > 0:
                self.frame_ready.emit(frame)
            # Sleep only for what is left of the frame slot, so decode time does not
            # slow playback below the video's FPS.
            remaining = frame_delay - (time.monotonic() - frame_started)
            if remaining > 0:
                time.sleep(remaining)
        return self._is_running

    def stop(self):
        """Requests the preview worker thread to stop its execution."""
        self._is_running = False
# --- Main Application Window ---
class VideoLooperApp(QMainWindow):
"""
The main application window for the Video Looper.
This class encapsulates the UI, event handling, and coordination of
video processing and preview tasks.
"""
def __init__(self):
    """
    Builds the main window: sets the title and geometry, initializes the
    application state, assembles every UI section and applies the initial
    processing-mode state.
    """
    super().__init__()
    self.setWindowTitle("Video Looper Pro")
    self.setGeometry(100, 100, 800, 830)

    # --- Application State Variables ---
    self.video_path = None  # Path to the currently loaded video file.
    self.original_video_width = None  # Width of the original loaded video.
    self.original_video_height = None  # Height of the original loaded video.
    self.found_loops = []  # List to store dictionaries of found/dummy loops.
    self.loop_finder_thread = None  # Reference to the active LoopFinderThread instance.
    self.preview_worker = None  # Reference to the active PreviewWorker instance.
    self.is_previewing = False  # Boolean flag indicating if a preview is currently active.

    # --- UI Construction ---
    container = QWidget()
    self.setCentralWidget(container)
    root_layout = QVBoxLayout(container)
    # Sections are added top to bottom in this order.
    section_builders = (
        self._setup_file_layout,
        self._setup_mode_gpu_layout,
        self._setup_parameters_layout,
        self._setup_action_layout,
        self._setup_results_list,
        self._setup_preview_area,
        self._setup_bottom_controls,
    )
    for build_section in section_builders:
        build_section(root_layout)

    self._load_default_video_on_startup()  # Attempt to load/download a demo video
    # Set initial UI state for GPU controls based on default processing mode
    initial_mode = self.processing_mode_combo.currentText()
    self._handle_processing_mode_change(initial_mode)
def _setup_file_layout(self, parent_layout):
    """
    Builds the video-file row: a caption, the label showing the selected path
    and the "Load Video" button.
    @param parent_layout (QVBoxLayout): The main layout to add this section to.
    """
    self.video_path_label = QLabel("No video selected.")
    self.video_path_label.setWordWrap(True)

    browse_button = QPushButton("Load Video")
    browse_button.clicked.connect(self.load_video)

    row = QHBoxLayout()
    row.addWidget(QLabel("Video File:"))
    row.addWidget(self.video_path_label, 1)  # Stretch factor 1: the path label takes the spare width
    row.addWidget(browse_button)
    parent_layout.addLayout(row)
def _setup_mode_gpu_layout(self, parent_layout):
    """
    Builds the row holding the processing-mode selector and the GPU batch size input.
    @param parent_layout (QVBoxLayout): The main layout to add this section to.
    """
    row = QHBoxLayout()
    row.addWidget(QLabel("Processing Mode:"))

    mode_combo = QComboBox()
    # GPU mode listed first as it's often preferred if available
    mode_combo.addItems(["GPU", "CPU (Single Core)", "CPU (Multi-core)"])
    # Connected after the items are added, as in the original statement order.
    mode_combo.currentTextChanged.connect(self._handle_processing_mode_change)
    self.processing_mode_combo = mode_combo
    row.addWidget(mode_combo)

    batch_label = QLabel("GPU Batch Size:")
    batch_input = QSpinBox()
    batch_input.setRange(16, 2048)  # Min/Max allowable batch size
    batch_input.setValue(DEFAULT_GPU_BATCH_SIZE)
    batch_input.setToolTip("Batch size for GPU processing. Larger values may be faster but consume more VRAM.")
    self.gpu_batch_size_label = batch_label
    self.gpu_batch_size_input = batch_input
    row.addWidget(batch_label)
    row.addWidget(batch_input)

    parent_layout.addLayout(row)

    # GPU-specific controls are initially hidden, shown when GPU mode is selected.
    for gpu_widget in (batch_label, batch_input):
        gpu_widget.setVisible(False)
def _setup_parameters_layout(self, parent_layout):
    """
    Builds the three-column parameter panel: matching options, duration limits,
    and the comparison width (preset selector plus pixel spin box).
    @param parent_layout (QVBoxLayout): The main layout to add this section to.
    """
    def captioned_row(caption, editor):
        # One horizontal row made of a text caption followed by its editor widget.
        row = QHBoxLayout()
        row.addWidget(QLabel(caption))
        row.addWidget(editor)
        return row

    # --- Column 1: similarity threshold and number of leading frames to ignore ---
    similarity_spin = QDoubleSpinBox()
    similarity_spin.setRange(0.0, 100.0)
    similarity_spin.setValue(DEFAULT_SIMILARITY_THRESHOLD)
    similarity_spin.setSuffix(" %")
    self.similarity_thresh_input = similarity_spin

    ignore_spin = QSpinBox()
    ignore_spin.setRange(0, 1000000)
    ignore_spin.setValue(DEFAULT_IGNORE_FRAMES)
    self.ignore_frames_input = ignore_spin

    matching_column = QVBoxLayout()
    matching_column.addLayout(captioned_row("Similarity Threshold:", similarity_spin))
    matching_column.addLayout(captioned_row("Ignore First N Frames:", ignore_spin))

    # --- Column 2: minimum loop duration and maximum search window (seconds) ---
    min_duration_spin = QDoubleSpinBox()
    min_duration_spin.setRange(0.1, 600.0)
    min_duration_spin.setValue(DEFAULT_MIN_LOOP_DURATION_SEC)
    min_duration_spin.setSuffix(" s")
    self.min_loop_duration_input = min_duration_spin

    search_window_spin = QDoubleSpinBox()
    search_window_spin.setRange(0.1, 1800.0)
    search_window_spin.setValue(DEFAULT_MAX_SEARCH_WINDOW_SEC)
    search_window_spin.setSuffix(" s")
    self.max_search_window_input = search_window_spin

    duration_column = QVBoxLayout()
    duration_column.addLayout(captioned_row("Min Loop Duration:", min_duration_spin))
    duration_column.addLayout(captioned_row("Max Search Window:", search_window_spin))

    # --- Column 3: comparison width, chosen via preset or typed in pixels ---
    preset_combo = QComboBox()
    # Populate before connecting so filling the combo does not fire the handler.
    preset_combo.addItems(list(PRESET_FRACTIONS.keys()) + [CUSTOM_PRESET_TEXT])
    preset_combo.setEnabled(False)  # Enabled once a video's dimensions are known
    preset_combo.currentTextChanged.connect(self._handle_compare_width_preset_change)
    self.compare_width_preset_combo = preset_combo

    width_spin = QSpinBox()
    width_spin.setRange(160, 3840)  # Comparison width limits in pixels
    width_spin.setStepType(QSpinBox.StepType.AdaptiveDecimalStepType)
    # The initial value is set before the slots are connected, so it triggers no callbacks.
    width_spin.setValue(DEFAULT_COMPARE_WIDTH)
    width_spin.setSuffix(" px")
    width_spin.valueChanged.connect(self._handle_compare_width_spinbox_change)
    width_spin.valueChanged.connect(self._update_gpu_batch_size_suggestion_on_compare_width_change)
    self.compare_width_input = width_spin

    width_column = QVBoxLayout()
    width_column.addLayout(captioned_row("Comparison Width:", preset_combo))
    width_column.addWidget(width_spin)

    # --- Assemble the three columns side by side ---
    panel = QHBoxLayout()
    for column in (matching_column, duration_column, width_column):
        panel.addLayout(column)
    parent_layout.addLayout(panel)
def _setup_action_layout(self, parent_layout):
    """
    Builds the row with the 'Find Loops' and 'Cancel Processing' buttons.
    Both buttons start disabled.
    @param parent_layout (QVBoxLayout): The main layout to add this section to.
    """
    find_button = QPushButton("Find Loops")
    find_button.clicked.connect(self.start_find_loops)
    find_button.setEnabled(False)  # Turned on once a video has been loaded
    self.find_loops_button = find_button

    cancel_button = QPushButton("Cancel Processing")
    cancel_button.clicked.connect(self.cancel_processing)
    cancel_button.setEnabled(False)  # Turned on while a search is running
    self.cancel_button = cancel_button

    button_row = QHBoxLayout()
    button_row.addWidget(find_button)
    button_row.addWidget(cancel_button)
    parent_layout.addLayout(button_row)
def _setup_results_list(self, parent_layout):
    """
    Adds the caption and the list widget in which found loops are shown.
    Double-clicking an entry invokes save_selected_loop.
    @param parent_layout (QVBoxLayout): The main layout to add this section to.
    """
    caption = QLabel("Found Loops (select one to save or preview):")
    parent_layout.addWidget(caption)

    loop_list = QListWidget()
    loop_list.itemDoubleClicked.connect(self.save_selected_loop)
    self.results_list = loop_list
    parent_layout.addWidget(loop_list)
def _setup_preview_area(self, parent_layout):
    """
    Builds the preview section: the display label, the start/stop buttons,
    and the loop-count controls.
    @param parent_layout (QVBoxLayout): The main layout to add this section to.
    """
    section = QVBoxLayout()
    section.addWidget(QLabel("Loop Preview:"))

    # Display surface for preview frames; shows a hint until a preview is started.
    display = QLabel("Select a loop and click 'Preview Selected Loop'")
    display.setAlignment(Qt.AlignmentFlag.AlignCenter)
    display.setMinimumHeight(240)
    display.setMinimumWidth(320)
    display.setStyleSheet("color: gray; border: 1px solid lightgray; background-color: black;")
    self.preview_label = display
    section.addWidget(display)

    # Start / stop buttons, both disabled initially.
    start_button = QPushButton("Preview Selected Loop")
    start_button.clicked.connect(self.start_preview_thread)
    start_button.setEnabled(False)
    self.preview_button = start_button

    stop_button = QPushButton("Stop Preview")
    stop_button.clicked.connect(self.stop_preview)
    stop_button.setEnabled(False)
    self.stop_preview_button = stop_button

    button_row = QHBoxLayout()
    button_row.addWidget(start_button)
    button_row.addWidget(stop_button)
    section.addLayout(button_row)

    # Repeat count, with an optional 'Loop Forever' override.
    repeat_spin = QSpinBox()
    repeat_spin.setRange(1, 100)
    repeat_spin.setValue(DEFAULT_PREVIEW_LOOP_COUNT)
    self.loop_count_input = repeat_spin

    forever_box = QCheckBox("Loop Forever")
    forever_box.stateChanged.connect(self._toggle_loop_count_input_enabled)
    self.loop_forever_checkbox = forever_box

    repeat_row = QHBoxLayout()
    repeat_row.addWidget(QLabel("Loop Count:"))
    repeat_row.addWidget(repeat_spin)
    repeat_row.addWidget(forever_box)
    repeat_row.addStretch()  # Trailing stretch keeps the controls left-aligned
    section.addLayout(repeat_row)

    parent_layout.addLayout(section)
def _setup_bottom_controls(self, parent_layout):
    """
    Adds the widgets at the bottom of the window, in order: the save button,
    the progress bar, the status label, and a static performance warning.
    @param parent_layout (QVBoxLayout): The main layout to add this section to.
    """
    save_button = QPushButton("Save Selected Loop")
    save_button.clicked.connect(self.save_selected_loop)
    save_button.setEnabled(False)  # Turned on once the results list has entries
    self.save_loop_button = save_button

    self.progress_bar = QProgressBar()

    status = QLabel("Status: Ready. Load a video to begin.")
    status.setWordWrap(True)
    self.status_label = status

    # The warning is purely informational, so it is not stored on the instance.
    notice_text = (
        "<b>Warning:</b> Finding loops in long videos can be resource-intensive. "
        "Adjust parameters for optimal performance."
    )
    notice = QLabel(notice_text)
    notice.setStyleSheet("color: orange; font-style: italic;")
    notice.setWordWrap(True)

    for bottom_widget in (save_button, self.progress_bar, status, notice):
        parent_layout.addWidget(bottom_widget)
def _load_default_video_on_startup(self):
    """
    Attempts to load a predefined default video ('demo.mp4') upon application startup.
    If the file is not present next to this script, it is downloaded from a predefined URL.
    Progress and the outcome are reported through the status label.

    The download is streamed into a temporary '<name>.part' file and only renamed to the
    final name once it completed successfully. An interrupted or truncated download
    therefore never leaves a corrupt 'demo.mp4' behind that a later start would try to load.
    The temporary file is removed on every failure path.
    """
    default_video_name = "demo.mp4"
    demo_video_url = "https://xn--1xap.com/demo.mp4"  # URL for the demo video
    download_timeout_sec = 30  # Applies to connecting and to each blocking read, not to the whole transfer
    script_dir = os.path.dirname(os.path.abspath(__file__))
    default_video_path = os.path.join(script_dir, default_video_name)
    if not os.path.isfile(default_video_path):
        partial_path = default_video_path + ".part"
        download_succeeded = False
        self.status_label.setText(f"Status: Default video '{default_video_name}' not found. Attempting to download...")
        QApplication.processEvents()  # Ensure UI updates before download starts
        try:
            self.status_label.setText(f"Status: Downloading '{default_video_name}' from {demo_video_url}...")
            QApplication.processEvents()
            os.makedirs(script_dir, exist_ok=True)  # Ensure directory exists
            with urllib.request.urlopen(demo_video_url, timeout=download_timeout_sec) as response, \
                    open(partial_path, 'wb') as out_file:
                # A missing, empty or malformed Content-Length simply disables the progress bar.
                try:
                    total_length = int(response.info().get('Content-Length') or 0)
                except ValueError:
                    total_length = 0
                dl_bytes = 0
                block_size_bytes = 8192  # 8KB blocks
                bar_width = 50  # Characters in the text progress bar
                while True:
                    buffer = response.read(block_size_bytes)
                    if not buffer:
                        break
                    out_file.write(buffer)
                    dl_bytes += len(buffer)
                    if total_length > 0:
                        # Clamp so a server sending more than it announced cannot overflow the bar
                        filled = min(bar_width, bar_width * dl_bytes // total_length)
                        self.status_label.setText(f"Status: Downloading... [{'=' * filled}{' ' * (bar_width - filled)}] {dl_bytes // 1024} KB / {total_length // 1024} KB")
                    QApplication.processEvents()  # Keep the UI responsive while streaming
            if total_length > 0 and dl_bytes != total_length:
                # A chunked read does not raise on early EOF, so compare against the announced size.
                self.status_label.setText(f"Status: Failed to download '{default_video_name}': Network error (incomplete download). Please load a video manually.")
                return
            os.replace(partial_path, default_video_path)  # Publish the finished file in one step
            download_succeeded = True
            self.status_label.setText(f"Status: Download complete. Loading '{default_video_name}'...")
            QApplication.processEvents()
        except urllib.error.URLError as e:
            # Must precede OSError: URLError is a subclass of it.
            self.status_label.setText(f"Status: Failed to download '{default_video_name}': Network error ({e.reason}). Please load a video manually.")
            return
        except OSError as e:  # File system error during save, or a socket timeout
            self.status_label.setText(f"Status: Failed to save '{default_video_name}': File error ({e.strerror or e}). Please load a video manually.")
            return
        except Exception as e:  # Catch-all so a failed demo download never prevents startup
            self.status_label.setText(f"Status: Error downloading '{default_video_name}': {str(e)}. Please load a video manually.")
            return
        finally:
            if not download_succeeded:
                # Best-effort removal of the partial file; it may not have been created at all.
                try:
                    os.remove(partial_path)
                except OSError:
                    pass
    if os.path.isfile(default_video_path):
        self.load_video_at_startup(default_video_path)  # Proceed to load the (now existing) video
    else:
        # Only reachable if the file disappeared between the download and this check.
        self.status_label.setText(f"Status: Default video '{default_video_name}' still not found. Please load a video manually.")
# --- UI Event Handlers and Core Logic Methods ---
def _handle_processing_mode_change(self, mode_text):
    """
    Reacts to a change of the processing mode selection.
    The GPU batch size label and input are shown only for the "GPU" mode; when that
    mode is selected the batch size suggestion is refreshed as well.
    @param mode_text (str): The text of the currently selected processing mode.
    """
    gpu_selected = mode_text == "GPU"
    for gpu_widget in (self.gpu_batch_size_label, self.gpu_batch_size_input):
        gpu_widget.setVisible(gpu_selected)
    if not gpu_selected:
        return
    self._update_gpu_batch_size_suggestion()
def _update_gpu_batch_size_suggestion_on_compare_width_change(self):
    """
    Callback for comparison width changes.
    Refreshes the GPU batch size suggestion, but only while the "GPU" processing mode is selected.
    """
    if self.processing_mode_combo.currentText() != "GPU":
        return
    self._update_gpu_batch_size_suggestion()
def _update_gpu_batch_size_suggestion(self):
    """
    Suggests a GPU batch size from the total VRAM of CUDA device 0 (queried through PyTorch)
    and the current comparison width, then writes the suggestion into the batch size
    spin box and its tooltip. This is a heuristic; the user may still adjust the value.

    When PyTorch is missing, CUDA is unavailable, or the query fails, only the tooltip is
    updated and the current batch size is left untouched.
    """
    batch_input = self.gpu_batch_size_input
    try:
        import torch  # Optional dependency: imported lazily so CPU-only installs still work
        if not torch.cuda.is_available():
            batch_input.setToolTip("CUDA not available. Set batch size manually.")
            return
        total_vram_gb = torch.cuda.get_device_properties(0).total_memory / (1024**3)

        # (VRAM lower bound in GB, base factor), checked from the largest tier downwards.
        vram_tiers = ((10, 128), (6, 96), (3.5, 64), (1.5, 32))
        base_factor = next((factor for bound, factor in vram_tiers if total_vram_gb > bound), 16)
        suggested_batch = base_factor * 16

        # Larger comparison frames need more memory per item, so shrink the batch.
        current_compare_width = self.compare_width_input.value()
        if current_compare_width > 1280:
            suggested_batch //= 4
        elif current_compare_width > 720:
            suggested_batch //= 2

        # Keep the suggestion inside the range the spin box accepts.
        suggested_batch = min(batch_input.maximum(), suggested_batch)
        suggested_batch = max(batch_input.minimum(), suggested_batch)
        batch_input.setValue(suggested_batch)
        batch_input.setToolTip(f"Suggested: {suggested_batch} (based on ~{total_vram_gb:.1f}GB VRAM and compare width {current_compare_width}px). Adjust as needed.")
    except ImportError:
        batch_input.setToolTip("PyTorch not found. Cannot estimate VRAM. Set batch size manually.")
    except Exception:  # Any other failure while querying the device; the suggestion is best-effort
        batch_input.setToolTip("Could not estimate VRAM or suggest batch size. Set manually.")
def _toggle_loop_count_input_enabled(self, state):
    """
    Keeps the loop count spin box in sync with the 'Loop Forever' checkbox:
    the spin box is disabled while the box is checked and enabled otherwise.
    @param state (int): The Qt.CheckState of the 'Loop Forever' checkbox.
    """
    loop_forever = state == Qt.CheckState.Checked.value
    self.loop_count_input.setEnabled(not loop_forever)
def _handle_compare_width_preset_change(self, preset_text):
    """
    Applies a comparison width preset.
    The spin box is set to the original video width divided by the preset's divisor,
    but never below the spin box minimum. Nothing happens when no video width is known,
    when "Custom" is selected, or when the text is not a known preset.
    @param preset_text (str): The text of the selected preset.
    """
    if preset_text == CUSTOM_PRESET_TEXT or not self.original_video_width:
        return
    divisor = PRESET_FRACTIONS.get(preset_text)
    if divisor is None:
        return

    width_input = self.compare_width_input
    target_width = max(width_input.minimum(), self.original_video_width // divisor)

    # Silence valueChanged while writing, so the spin box handler does not
    # immediately re-derive (and possibly change) the preset that was just chosen.
    width_input.blockSignals(True)
    width_input.setValue(target_width)
    width_input.blockSignals(False)

    # valueChanged was suppressed above, so refresh the GPU suggestion explicitly.
    self._update_gpu_batch_size_suggestion_on_compare_width_change()
def _handle_compare_width_spinbox_change(self, new_width):
    """
    Handles changes in the comparison width QSpinBox.
    Updates the preset ComboBox to reflect the current width, selecting "Custom"
    if no video is loaded or the width doesn't correspond to a predefined preset.

    A preset matches when `new_width` equals the width that preset would produce:
    the original video width divided by the preset's divisor, raised to the spin box
    minimum if it falls below it. This is the same formula used when a preset is
    applied, so a width produced by clamping is recognised as that preset instead of
    being reported as "Custom". If several presets yield the same width, the first
    one in PRESET_FRACTIONS order wins.
    @param new_width (int): The new width value from the QSpinBox.
    """
    matching_preset = CUSTOM_PRESET_TEXT  # Used when no video is loaded or nothing matches
    if self.original_video_width:
        minimum_width = self.compare_width_input.minimum()
        for preset_name, divisor in PRESET_FRACTIONS.items():
            preset_width = max(minimum_width, self.original_video_width // divisor)
            if preset_width == new_width:
                matching_preset = preset_name
                break
    # Update the combobox only when needed, with its signals blocked so the preset
    # handler does not write back into the spin box.
    if self.compare_width_preset_combo.currentText() != matching_preset:
        self.compare_width_preset_combo.blockSignals(True)
        self.compare_width_preset_combo.setCurrentText(matching_preset)
        self.compare_width_preset_combo.blockSignals(False)
def _set_initial_compare_width_and_preset(self, video_width, video_height):
    """
    Records the dimensions of a newly loaded video and picks an initial comparison width.
    The preset whose resulting width is closest to DEFAULT_COMPARE_WIDTH is preferred;
    if even the closest one is more than 100 px away, a half / quarter / full width
    fallback is used instead. The preset combo is then synchronised from the chosen width.
    @param video_width (int): The width of the loaded video in pixels.
    @param video_height (int): The height of the loaded video in pixels.
    """
    self.original_video_width = video_width
    self.original_video_height = video_height
    self.compare_width_preset_combo.setEnabled(True)

    width_input = self.compare_width_input
    width_floor = width_input.minimum()

    # Pass 1: find the preset whose (floor-clamped) width is nearest to the default.
    chosen_width = DEFAULT_COMPARE_WIDTH
    chosen_preset = CUSTOM_PRESET_TEXT
    smallest_gap = float('inf')
    for name, divisor in PRESET_FRACTIONS.items():
        candidate = max(width_floor, video_width // divisor)
        if candidate == DEFAULT_COMPARE_WIDTH:
            # An exact hit on the default ends the search immediately.
            chosen_preset, chosen_width = name, candidate
            break
        gap = abs(candidate - DEFAULT_COMPARE_WIDTH)
        if gap < smallest_gap:
            smallest_gap = gap
            chosen_preset, chosen_width = name, candidate
        elif gap == smallest_gap and divisor < PRESET_FRACTIONS.get(chosen_preset, float('inf')):
            # On a tie, the smaller divisor (larger fraction of the frame) wins.
            chosen_preset, chosen_width = name, candidate

    # Pass 2: if nothing landed within 100 px of the default, fall back to a plain fraction.
    if chosen_preset == CUSTOM_PRESET_TEXT or abs(chosen_width - DEFAULT_COMPARE_WIDTH) > 100:
        half_width = video_width // 2
        quarter_width = video_width // 4
        if video_width >= 1280 and half_width >= width_floor:
            chosen_width = max(width_floor, half_width)
        elif quarter_width >= width_floor:
            chosen_width = max(width_floor, quarter_width)
        else:
            chosen_width = video_width  # Small video: compare at full width

    # Clamp into the range the spin box accepts.
    chosen_width = max(width_floor, min(width_input.maximum(), chosen_width))

    # Write the width without emitting valueChanged ...
    width_input.blockSignals(True)
    width_input.setValue(chosen_width)
    width_input.blockSignals(False)

    # ... then let the spin box handler derive the preset text shown in the combo.
    self.compare_width_preset_combo.blockSignals(True)
    self._handle_compare_width_spinbox_change(chosen_width)
    self.compare_width_preset_combo.blockSignals(False)

    # valueChanged was suppressed, so refresh the GPU batch size suggestion explicitly.
    self._update_gpu_batch_size_suggestion_on_compare_width_change()
def _create_dummy_full_video_loop(self, video_path_for_dummy):
    """
    Builds a placeholder 'loop' entry spanning the whole video, so the full clip can be
    previewed or saved through the same interface as a detected loop.
    @param video_path_for_dummy (str): The filesystem path to the video.
    @return: A dictionary shaped like a found loop with "is_dummy" set to True, or None
             if the video cannot be opened, reports a non-positive frame count or FPS,
             or reading its properties raises.
    """
    capture = None
    try:
        capture = cv2.VideoCapture(video_path_for_dummy)
        if not capture.isOpened():
            return None

        frame_count = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
        frame_rate = capture.get(cv2.CAP_PROP_FPS)
        if frame_rate <= 0 or frame_count <= 0:
            return None  # Unusable metadata

        full_duration = frame_count / frame_rate
        return {
            "start_frame": 0,
            "end_frame": frame_count - 1,
            "start_time": 0.0,
            "end_time": full_duration,
            "duration_sec": full_duration,
            "similarity": 1.0,  # Placeholder score for the full-video entry
            "is_dummy": True,  # Marks this entry as the full video rather than a detected loop
        }
    except Exception:
        # Any failure simply means no full-video entry is offered.
        return None
    finally:
        # Runs on every exit path above, including the early returns.
        if capture:
            capture.release()
def load_video_at_startup(self, file_name):
    """
    Loads the video at `file_name` and resets the UI for it.
    Clears previous results, reads the video dimensions to initialise the comparison
    width controls, and adds a full-video entry to the results list when possible.

    The capture handle is always released, even if reading the video raises. If that
    happens, the warning stays visible in the status label instead of being replaced
    by the regular "Loaded" message.
    @param file_name (str): The filesystem path to the video file to load.
    """
    self.stop_preview()  # Ensure any ongoing preview is stopped
    self.video_path = file_name
    self.video_path_label.setText(os.path.basename(file_name))
    self.results_list.clear()
    self.found_loops = []
    self.original_video_width = None
    self.original_video_height = None
    self.compare_width_preset_combo.setEnabled(False)  # Keep disabled until dimensions are read

    load_warning = None
    cap_temp = None
    try:
        cap_temp = cv2.VideoCapture(file_name)
        if cap_temp.isOpened():
            video_width = int(cap_temp.get(cv2.CAP_PROP_FRAME_WIDTH))
            video_height = int(cap_temp.get(cv2.CAP_PROP_FRAME_HEIGHT))
            if video_width > 0 and video_height > 0:
                self._set_initial_compare_width_and_preset(video_width, video_height)
                dummy_loop = self._create_dummy_full_video_loop(file_name)
                if dummy_loop:
                    self.found_loops.append(dummy_loop)
                    self.display_results(self.found_loops)  # This will enable preview/save buttons
    except Exception as e:
        # Loading is best-effort: remember the problem and keep the application usable.
        load_warning = f"Warning: Could not read video dimensions or create dummy loop: {e}"
    finally:
        if cap_temp is not None:
            cap_temp.release()

    if load_warning:
        self.update_status(load_warning)
    else:
        self.status_label.setText(f"Loaded: {os.path.basename(file_name)}. Ready to find loops or preview full video.")
    self.find_loops_button.setEnabled(True)
    if not self.found_loops:  # If dummy loop creation failed, ensure buttons are disabled
        self.save_loop_button.setEnabled(False)
        self.preview_button.setEnabled(False)
    self.progress_bar.setValue(0)
    # Ensure GPU UI state is correct based on current processing mode
    self._handle_processing_mode_change(self.processing_mode_combo.currentText())
def load_video(self):
    """
    Opens a file dialog for the user to select a video file.
    If a file is selected, loading is delegated to load_video_at_startup(), which resets
    the results, reads the video dimensions and adds the full-video entry.

    If the dialog is cancelled while a video is already loaded, nothing is changed: the
    loaded video, its stored dimensions and the comparison width settings stay as they are.
    Only when no video has been loaded yet are the comparison width controls reset to
    their defaults.
    """
    self.stop_preview()  # Stop any active preview
    file_name, _ = QFileDialog.getOpenFileName(self, "Open Video File", "",
                                               "Video Files (*.mp4 *.avi *.mov *.mkv);;All Files (*)")
    if file_name:
        # Same procedure as the startup load; it also refreshes the GPU controls at the end.
        self.load_video_at_startup(file_name)
        return

    if self.video_path:
        # Dialog cancelled: the previously loaded video is still active, so its
        # dimension-dependent state must not be discarded.
        return

    # No video loaded at all: put the comparison width controls in their default state.
    self.original_video_width = None
    self.original_video_height = None
    self.compare_width_preset_combo.setEnabled(False)
    self.compare_width_input.setValue(DEFAULT_COMPARE_WIDTH)
    self._handle_compare_width_spinbox_change(DEFAULT_COMPARE_WIDTH)  # Sync combobox
    # Ensure GPU UI state is correct based on current processing mode, even if no video loaded
    self._handle_processing_mode_change(self.processing_mode_combo.currentText())
def start_find_loops(self):
    """
    Starts a loop search on the loaded video.
    Steps: stop any preview, refuse to start without a video or while a search is
    already running, verify PyTorch/CUDA when the "GPU" mode is selected, switch the UI
    into its processing state, then create, wire up and start a LoopFinderThread using
    the current parameter values.
    """
    self.stop_preview()

    if not self.video_path:
        QMessageBox.warning(self, "No Video", "Please load a video file first.")
        return

    running_thread = self.loop_finder_thread
    if running_thread and running_thread.isRunning():
        QMessageBox.information(self, "Busy", "Processing is already in progress.")
        return

    # GPU mode needs an importable PyTorch with a usable CUDA device.
    processing_mode = self.processing_mode_combo.currentText()
    if processing_mode == "GPU":
        try:
            import torch
        except ImportError:
            QMessageBox.warning(self, "GPU Mode Error", "PyTorch is not installed. Please install PyTorch with CUDA support or select a CPU mode.")
            return
        if not torch.cuda.is_available():
            QMessageBox.warning(self, "GPU Mode Error", "GPU (CUDA) not available. Please install PyTorch with CUDA support or select a CPU mode.")
            return

    # Positional arguments for LoopFinderThread, read from the parameter widgets.
    worker_args = (
        self.video_path,
        self.similarity_thresh_input.value(),
        self.ignore_frames_input.value(),
        self.min_loop_duration_input.value(),
        self.max_search_window_input.value(),
        self.compare_width_input.value(),
        processing_mode,
        self.gpu_batch_size_input.value(),
    )

    # Switch the UI into the "processing" state and drop the previous results.
    self.find_loops_button.setEnabled(False)
    self.cancel_button.setEnabled(True)
    self.progress_bar.setValue(0)
    self.results_list.clear()
    self.found_loops = []
    self.save_loop_button.setEnabled(False)
    self.preview_button.setEnabled(False)

    worker = LoopFinderThread(*worker_args)
    self.loop_finder_thread = worker

    # Route the worker's signals to the matching UI slots.
    signal_routes = (
        (worker.progress_updated, self.progress_bar.setValue),
        (worker.status_updated, self.update_status),
        (worker.results_found, self.display_results),
        (worker.error_occurred, self.handle_error),
        (worker.finished, self.on_processing_finished),  # Restores the buttons afterwards
    )
    for signal, slot in signal_routes:
        signal.connect(slot)

    worker.start()
def cancel_processing(self):
    """
    Stops any active preview and asks a running loop finder thread to stop.
    The buttons are not touched here; on_processing_finished() is connected to the
    thread's `finished` signal for that.
    """
    self.stop_preview()
    worker = self.loop_finder_thread
    if not worker:
        return
    if worker.isRunning():
        worker.stop()
def on_processing_finished(self):
    """
    Slot connected to the loop finder thread's `finished` signal.
    Re-enables 'Find Loops' when a video path is set and disables 'Cancel Processing'.
    """
    video_available = bool(self.video_path)
    self.find_loops_button.setEnabled(video_available)
    self.cancel_button.setEnabled(False)
def update_status(self, message):
    """
    Shows `message` in the status label, prefixed with "Status: ".
    @param message (str): The status message to display.
    """
    status_text = f"Status: {message}"
    self.status_label.setText(status_text)
def display_results(self, loops):
    """
    Stores `loops` as the current results and rebuilds the results list from them.
    Each entry shows start/end time, duration and frame range; detected loops also show
    their similarity, while the full-video entry is labelled "FULL VIDEO". With an empty
    list a placeholder line is shown and the save/preview buttons are disabled; otherwise
    both buttons are enabled and the first entry is selected.
    @param loops (list): A list of dictionaries, where each dictionary represents a found loop
                         (or a dummy full-video loop).
    """
    self.found_loops = loops
    self.results_list.clear()
    have_results = bool(self.found_loops)

    if have_results:
        for entry in self.found_loops:
            is_full_video = entry.get("is_dummy")
            caption = "FULL VIDEO" if is_full_video else "Loop"
            time_part = f"{caption}: {entry['start_time']:.2f}s - {entry['end_time']:.2f}s "
            if is_full_video:
                detail_part = f"(Dur: {entry['duration_sec']:.2f}s) "
            else:
                detail_part = f"(Dur: {entry['duration_sec']:.2f}s, Sim: {entry['similarity']:.3f}) "
            frame_part = f"Frames: {entry['start_frame']}-{entry['end_frame']}"
            self.results_list.addItem(time_part + detail_part + frame_part)
    else:
        self.results_list.addItem("No loops found matching criteria.")

    # Saving and previewing only make sense when there is at least one real entry.
    self.save_loop_button.setEnabled(have_results)
    self.preview_button.setEnabled(have_results)

    if have_results and self.results_list.count() > 0:
        self.results_list.setCurrentRow(0)  # Pre-select the first entry
def handle_error(self, error_message):
    """
    Reports an error to the user and returns the UI to its idle (non-processing) state.
    Any running preview is stopped first, then a modal critical message box is shown,
    the status label is updated and the processing controls are reset.
    @param error_message (str): The error text to show in the dialog and status label.
    """
    self.stop_preview()  # A preview must not keep running behind the error dialog
    QMessageBox.critical(self, "Error", error_message)
    status_text = f"Status: Error - {error_message}"
    self.status_label.setText(status_text)

    # Back to idle: searching is possible again only if a video is loaded.
    has_video = bool(self.video_path)
    self.find_loops_button.setEnabled(has_video)
    self.cancel_button.setEnabled(False)
    self.progress_bar.setValue(0)
def save_selected_loop(self):
    """
    Saves the loop currently selected in the results list to a new video file.
    The user is prompted for a destination; the section between the loop's
    'start_time' and 'end_time' is then cut from the source video with MoviePy and
    encoded with libx264/aac. The write runs synchronously on the calling thread.
    If the chosen file name has no extension, ".mp4" is appended (the encoder needs an
    extension to select a container format); should that adjusted name already exist,
    the user is asked before it is overwritten.
    Errors are reported through `handle_error`.
    """
    self.stop_preview()  # Stop preview before saving
    selected_items = self.results_list.selectedItems()
    if not selected_items:
        QMessageBox.warning(self, "No Loop Selected", "Please select a loop from the list to save.")
        return
    selected_index = self.results_list.row(selected_items[0])
    if not 0 <= selected_index < len(self.found_loops):
        QMessageBox.warning(self, "Selection Error", "Invalid loop selected.")
        return
    if not self.video_path:  # Should not happen if a loop is selectable, but defensive check
        self.handle_error("Internal Error: Video path is not set. Cannot save loop.")
        return
    loop_to_save = self.found_loops[selected_index]

    # Suggest a default save name based on the original video and the loop's frame range.
    # A source path without an extension falls back to .mp4 so the suggestion is writable.
    fallback_ext = ".mp4"
    original_dir = os.path.dirname(self.video_path)
    original_filename, original_ext = os.path.splitext(os.path.basename(self.video_path))
    default_save_name = os.path.join(
        original_dir,
        f"{original_filename}_loop_{loop_to_save['start_frame']}_{loop_to_save['end_frame']}"
        f"{original_ext or fallback_ext}"
    )
    save_file_name, _ = QFileDialog.getSaveFileName(self, "Save Loop As", default_save_name,
                                                    "Video Files (*.mp4 *.avi *.mov *.mkv);;All Files (*)")
    if not save_file_name:
        return  # User cancelled the save dialog

    if not os.path.splitext(save_file_name)[1]:
        # Not every platform's file dialog appends an extension, and without one the
        # encoder cannot choose an output container, so the write would fail.
        save_file_name += fallback_ext
        if os.path.exists(save_file_name):
            # The dialog only confirmed the name as typed, not the adjusted one.
            answer = QMessageBox.question(
                self, "Overwrite File?",
                f"{os.path.basename(save_file_name)} already exists. Overwrite it?",
                QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No,
                QMessageBox.StandardButton.No)
            if answer != QMessageBox.StandardButton.Yes:
                return

    self.status_label.setText(f"Status: Saving loop to {os.path.basename(save_file_name)}...")
    QApplication.processEvents()  # Update UI before potentially long save operation
    video_clip = None
    looped_subclip_instance = None
    try:
        video_clip = VideoFileClip(self.video_path)
        # MoviePy slicing for subclip: clip[start_time:end_time]
        looped_subclip_instance = video_clip[loop_to_save['start_time']:loop_to_save['end_time']]
        if looped_subclip_instance.duration <= 0:
            self.handle_error(f"Error saving loop: Subclip has zero or negative duration ({looped_subclip_instance.duration}s). Cannot save.")
            return
        # Write the subclip to the chosen file
        looped_subclip_instance.write_videofile(save_file_name,
                                                codec="libx264",  # Common video codec
                                                audio_codec="aac",  # Common audio codec
                                                threads=multiprocessing.cpu_count(),  # Use available cores
                                                logger=None,  # Suppress MoviePy console logs
                                                preset="medium")  # FFmpeg preset for quality/speed balance
        QMessageBox.information(self, "Success", f"Loop saved to {save_file_name}")
        self.status_label.setText(f"Status: Loop saved to {os.path.basename(save_file_name)}")
    except Exception as e:
        # Top-level boundary for the save action: surface any failure to the user.
        self.handle_error(f"Error saving loop: {str(e)}")
    finally:
        # Always close the MoviePy clips to release file handles and resources.
        # Compare against None explicitly rather than relying on clip truthiness.
        if looped_subclip_instance is not None:
            try:
                looped_subclip_instance.close()
            except Exception as sub_exc:
                print(f"Error closing subclip: {sub_exc}")  # Log to console
        if video_clip is not None:
            try:
                video_clip.close()
            except Exception as main_exc:
                print(f"Error closing main video clip: {main_exc}")  # Log to console
def update_preview_frame(self, frame_bgr):
    """
    Slot fed by the preview worker's `frame_ready` signal: scales a BGR frame to fit
    inside `preview_label` (aspect ratio preserved), converts it to RGB and shows it
    as the label's pixmap. Frames are dropped when no preview is active, when the
    frame is empty, or when the label is 10 px or smaller in either dimension.
    Failures while scaling/converting are printed to the console, not raised.
    @param frame_bgr (np.ndarray): The BGR video frame to display.
    """
    if not self.is_previewing:
        return  # Late frame from a preview that has already been stopped
    if frame_bgr is None or frame_bgr.size == 0:
        return  # Nothing drawable

    try:
        label_h = self.preview_label.height()
        label_w = self.preview_label.width()
        if label_h <= 10 or label_w <= 10:
            return  # Label too small or not yet laid out

        src_h, src_w = frame_bgr.shape[:2]
        if src_h == 0 or src_w == 0:
            return  # Invalid frame dimensions

        # Fit along whichever axis is the limiting one for this frame/label pairing.
        frame_ratio = src_w / src_h
        label_ratio = label_w / label_h
        if frame_ratio > label_ratio:  # Frame is relatively wider: width-bound
            fit_w = label_w
            fit_h = int(fit_w / frame_ratio)
        else:  # Frame is relatively taller (or equal): height-bound
            fit_h = label_h
            fit_w = int(fit_h * frame_ratio)

        if fit_w > 0 and fit_h > 0:
            target_size = (fit_w, fit_h)
        else:
            # Truncation produced a zero dimension; fall back to half the label size.
            target_size = (max(1, label_w // 2), max(1, label_h // 2))
        scaled = cv2.resize(frame_bgr, target_size)

        # OpenCV delivers BGR; QImage.Format_RGB888 expects RGB.
        rgb_frame = cv2.cvtColor(scaled, cv2.COLOR_BGR2RGB)
        rows, cols, channels = rgb_frame.shape
        stride = channels * cols
        # .copy() detaches the QImage from the numpy buffer, which may be released
        # or reused once this method returns.
        image = QImage(rgb_frame.data, cols, rows, stride, QImage.Format.Format_RGB888).copy()
        self.preview_label.setPixmap(QPixmap.fromImage(image))
    except Exception as e:
        print(f"Error updating preview frame: {e}")  # Log error to console
def handle_preview_finished(self):
    """
    Slot connected to the preview worker's `preview_finished` signal (the connection
    is made in `start_preview_thread`). Restores the UI to its non-previewing state
    and releases the worker.
    The worker is only scheduled for deletion once its thread has actually exited:
    Qt aborts the program if a QThread object is destroyed while its thread is still
    running, and the thread may still be winding down when this slot runs. If it has
    not exited within 500 ms the reference is kept so that `stop_preview`/`closeEvent`
    can still stop and wait for it later.
    """
    if self.is_previewing:  # Not already reset by a manual stop_preview() call
        self.update_status("Preview finished.")
    self.is_previewing = False  # Reset previewing flag

    # Restore UI state to non-previewing
    self.preview_button.setEnabled(bool(self.found_loops and self.results_list.count() > 0))
    self.stop_preview_button.setEnabled(False)
    self.results_list.setEnabled(True)  # Re-enable interaction with results list
    # Re-enable find loops button if a video is loaded and no processing is active
    find_loops_enabled = bool(self.video_path) and not (self.loop_finder_thread and self.loop_finder_thread.isRunning())
    self.find_loops_button.setEnabled(find_loops_enabled)
    # Re-enable save button if loops are found and an item is selected
    save_loop_enabled = bool(self.found_loops and self.results_list.currentItem())
    self.save_loop_button.setEnabled(save_loop_enabled)

    worker = self.preview_worker
    if worker is None:
        return
    # wait() returns immediately when the thread has already finished.
    if worker.wait(500):
        worker.deleteLater()  # Schedule the worker object for deletion
        self.preview_worker = None
    else:
        # Keep the reference: dropping it now would destroy a running thread.
        print("Warning: Preview worker thread is still running; deferring its cleanup.")
def start_preview_thread(self):
    """
    Starts a `PreviewWorker` for the loop currently selected in the results list and
    switches the UI into its "previewing" state (other controls are disabled).
    A worker left over from an earlier preview is retired first: it is asked to stop,
    waited for (up to 500 ms, the same budget `closeEvent` uses), detached from this
    window's slots and scheduled for deletion. Replacing the reference to a QThread
    that is still running would destroy it mid-run, which Qt treats as fatal, so if
    the old worker does not stop in time no new preview is started.
    """
    selected_items = self.results_list.selectedItems()
    if not selected_items:
        QMessageBox.information(self, "No Selection", "Please select a loop from the list to preview.")
        return
    if not self.video_path:  # Should not be possible if items are in list, but defensive
        self.handle_error("No Video: Cannot preview without a loaded video.")
        return
    selected_index = self.results_list.row(selected_items[0])
    if selected_index < 0 or selected_index >= len(self.found_loops):  # Invalid index
        self.handle_error("Selection Error: Invalid loop selected for preview.")
        return

    # Retire any previous worker before its reference is replaced below.
    previous_worker = self.preview_worker
    if previous_worker is not None:
        if previous_worker.isRunning():
            previous_worker.stop()
            if not previous_worker.wait(500):
                # Still running: keep the reference alive and leave the UI idle.
                self.stop_preview()
                self.update_status("Previous preview is still stopping. Please try again.")
                return
        # Detach the retired worker so it can no longer drive this window's
        # preview state once the new preview has started.
        stale_connections = (
            (previous_worker.frame_ready, self.update_preview_frame),
            (previous_worker.preview_finished, self.handle_preview_finished),
            (previous_worker.preview_error, self.handle_error),
        )
        for signal, slot in stale_connections:
            try:
                signal.disconnect(slot)
            except (TypeError, RuntimeError):
                pass  # Connection no longer exists; nothing to detach
        previous_worker.deleteLater()
        self.preview_worker = None

    self.stop_preview()  # Ensure the UI is fully reset before starting a new preview
    self.is_previewing = True
    # Update UI to "previewing" state
    self.preview_button.setEnabled(False)
    self.stop_preview_button.setEnabled(True)
    self.results_list.setEnabled(False)  # Disable interaction with list during preview
    self.find_loops_button.setEnabled(False)  # Disable finding new loops during preview
    self.save_loop_button.setEnabled(False)  # Disable saving during preview
    self.preview_label.setText("Previewing...")  # Placeholder text while preview loads
    self.update_status("Starting preview...")

    loop_to_preview = self.found_loops[selected_index]
    loop_count_val = self.loop_count_input.value()
    loop_forever_val = self.loop_forever_checkbox.isChecked()
    # Create and start the new preview worker
    self.preview_worker = PreviewWorker(
        video_path=self.video_path,
        start_frame=int(loop_to_preview['start_frame']),
        end_frame=int(loop_to_preview['end_frame']),
        loop_count=loop_count_val,
        loop_forever=loop_forever_val
    )
    self.preview_worker.frame_ready.connect(self.update_preview_frame)
    self.preview_worker.preview_finished.connect(self.handle_preview_finished)
    self.preview_worker.preview_error.connect(self.handle_error)  # Connect error signal
    self.preview_worker.start()
def stop_preview(self):
    """
    Requests the active `PreviewWorker` (if any) to stop and immediately resets the
    preview-related UI to its non-previewing state: the preview area shows the
    placeholder text again and the buttons/list are re-enabled according to the
    current results, loaded video and loop-finder activity.
    This method does not wait for the worker or delete it; that cleanup is done by
    `handle_preview_finished` when the worker signals completion.
    """
    was_actively_previewing = self.is_previewing  # Remember whether a preview was genuinely active
    self.is_previewing = False  # Set flag immediately so late frames are ignored
    if self.preview_worker and self.preview_worker.isRunning():
        self.preview_worker.stop()

    # Clear the last frame FIRST, then set the placeholder text. A QLabel holds a
    # single piece of content: assigning a pixmap after the text (as an empty
    # QPixmap would) wipes the text again and leaves the preview area blank.
    self.preview_label.clear()
    self.preview_label.setText("Select a loop and click 'Preview Selected Loop'")

    can_preview_again = bool(self.found_loops and self.results_list.count() > 0)
    self.preview_button.setEnabled(can_preview_again)
    self.stop_preview_button.setEnabled(False)
    self.results_list.setEnabled(True)
    find_loops_enabled = bool(self.video_path) and not (self.loop_finder_thread and self.loop_finder_thread.isRunning())
    self.find_loops_button.setEnabled(find_loops_enabled)
    save_loop_enabled = bool(self.found_loops and self.results_list.currentItem())
    self.save_loop_button.setEnabled(save_loop_enabled)

    if was_actively_previewing:  # Only report a stop if something was actually playing
        self.update_status("Preview stopped.")
def closeEvent(self, a0: QCloseEvent | None):  # Parameter is named 'a0' rather than 'event'
    """
    Qt close handler (e.g. the user clicks the window's 'X' button).
    Stops the preview, then asks each worker thread that is still running to stop and
    waits a bounded time for it (1000 ms for the loop finder, 500 ms for the preview
    worker). A status warning is set for a thread that does not finish in time.
    The close event, when provided, is always accepted.
    @param a0 (QCloseEvent, optional): The close event object provided by Qt.
    """
    self.stop_preview()  # Initiates the preview worker's stop before anything else

    # (attribute holding the thread, wait budget in ms, warning if it overruns)
    shutdown_plan = (
        ("loop_finder_thread", 1000, "Warning: Loop finder thread did not stop gracefully."),
        ("preview_worker", 500, "Warning: Preview worker thread did not stop gracefully."),
    )
    for attr_name, timeout_ms, overrun_warning in shutdown_plan:
        worker = getattr(self, attr_name)
        if not worker or not worker.isRunning():
            continue
        # For the preview worker this repeats the stop request issued by
        # stop_preview() above, as a final check.
        worker.stop()
        if worker.wait(timeout_ms):
            continue
        self.update_status(overrun_warning)

    if a0:  # Event is provided in standard Qt usage
        a0.accept()  # Allow the window (and application) to close
# --- Application Entry Point ---
if __name__ == '__main__':
    # Must be the first statement of the main guard: in a frozen (e.g. PyInstaller)
    # build, multiprocessing child processes re-execute this module, and
    # freeze_support() has to take over before any application object such as
    # QApplication is created. It has no effect when the program is run normally by
    # the interpreter, so no frozen/_MEIPASS check is needed.
    multiprocessing.freeze_support()
    app = QApplication(sys.argv)
    looper_app = VideoLooperApp()
    looper_app.show()
    sys.exit(app.exec())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment