Last active
September 1, 2025 13:57
-
-
Save gphg/3b8c5e7d13a6535613d1498e24570e9b to your computer and use it in GitHub Desktop.
Multi-threaded version. Based on: https://gist.github.com/gphg/3c0c2f55bbe279ea1bcab1f8e3c20a09
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import os | |
import sys | |
import json | |
import argparse | |
import numpy as np | |
from skimage.metrics import structural_similarity as ssim | |
from skimage.transform import resize | |
from moviepy import VideoFileClip | |
from concurrent.futures import ProcessPoolExecutor, as_completed | |
# --- Parameters You Can Tune ---
# Each DEFAULT_* value below is the fallback for the command-line flag of the
# matching name; PROTO_CRF is selected by the --proto flag.

# The minimum similarity score (from 0 to 1) for a frame to be considered a match.
# Higher is stricter. 0.95 is a good starting point.
# Default for --similarity_threshold.
DEFAULT_SIMILARITY_THRESHOLD = 0.95

# The minimum duration of a loop in seconds. Prevents finding tiny, useless loops.
# Default for --min_loop_duration.
DEFAULT_MIN_LOOP_DURATION = 1.5

# To speed up the process, we can downscale frames before comparing them.
# A value of 0.25 means we resize to 25% of the original size.
# Default for --resize_factor.
DEFAULT_COMPARISON_RESIZE_FACTOR = 0.25

# The directory to save the output media (and the loop cache file).
# Default for --output_dir.
DEFAULT_OUTPUT_DIRECTORY = "output_loops"

# Video quality settings for MP4 and WEBM codecs.
# Lower CRF values mean higher quality and larger files.
# Default (high quality): 23. Prototype mode: 28.
# An explicit --crf value takes precedence over both.
DEFAULT_CRF = 23
PROTO_CRF = 28
def _resize_for_comparison(frame, comparison_resize_factor):
    """Return `frame` with its first two axes scaled by `comparison_resize_factor`."""
    target_shape = (
        int(frame.shape[0] * comparison_resize_factor),
        int(frame.shape[1] * comparison_resize_factor),
    )
    return resize(frame, target_shape, anti_aliasing=True)


def _find_loops_in_chunk(video_path, frame_indices_chunk, comparison_resize_factor, similarity_threshold, min_loop_frames, frame_rate, worker_id, total_workers):
    """
    Worker function for a single process: find every suitable loop whose start
    frame belongs to `frame_indices_chunk`.

    The worker opens its own handle on the video and, for each start frame `i`
    in the chunk, compares it with every frame from `i + min_loop_frames` up to
    the end of the video.

    Args:
        video_path: Path to the video file.
        frame_indices_chunk: Iterable of start-frame indices assigned to this worker.
        comparison_resize_factor: Factor used to downscale frames before comparing.
        similarity_threshold: Minimum SSIM score for a pair to be recorded.
        min_loop_frames: Minimum distance, in frames, between start and end frame.
        frame_rate: Frames per second, used to convert indices to timestamps.
        worker_id: 1-based identifier, used only in log messages.
        total_workers: Total number of workers, used only in log messages.

    Returns:
        A list of dicts with keys 'start_time', 'end_time' (seconds) and 'score'
        for ALL matching pairs found in this chunk (not only the best one).
        An empty list is returned when the video cannot be opened.
    """
    log_prefix = f"Worker {worker_id}/{total_workers}"
    print(f"{log_prefix}: Starting analysis...")
    try:
        clip = VideoFileClip(video_path)
    except Exception as e:
        print(f"{log_prefix}: Error loading video file: {e}")
        return []

    found_loops = []
    try:
        num_frames = int(clip.duration * frame_rate)
        for i in frame_indices_chunk:
            try:
                # Load and resize the starting frame for this comparison.
                start_frame_resized = _resize_for_comparison(
                    clip.get_frame(i / frame_rate), comparison_resize_factor
                )
                # The data range depends only on the start frame, so compute it
                # once per start frame instead of once per compared pair.
                data_range = start_frame_resized.max() - start_frame_resized.min()

                # Now, compare it to all subsequent frames that are far enough away.
                for j in range(i + min_loop_frames, num_frames):
                    end_frame_resized = _resize_for_comparison(
                        clip.get_frame(j / frame_rate), comparison_resize_factor
                    )
                    ssim_score = ssim(
                        start_frame_resized,
                        end_frame_resized,
                        channel_axis=-1,
                        win_size=3,
                        data_range=data_range,
                    )
                    if ssim_score >= similarity_threshold:
                        found_loops.append({
                            'start_time': i / frame_rate,
                            'end_time': j / frame_rate,
                            # Plain float so the result serializes the same way
                            # regardless of the numpy scalar type returned.
                            'score': float(ssim_score),
                        })
            except Exception as e:
                # Best effort: a failure abandons the remaining comparisons for
                # this start frame only; matches already recorded are kept.
                print(f"{log_prefix}: Error processing frames: {e}")
                continue
    finally:
        # Release the video handle even if something unexpected escapes the loop.
        clip.close()

    print(f"{log_prefix}: Analysis complete.")
    # Return every match from this chunk; the caller does the ranking.
    return found_loops
def _partition_start_frames(num_start_frames, num_workers):
    """Deal indices 0..num_start_frames-1 round-robin into at most `num_workers` non-empty lists."""
    return [
        list(range(offset, num_start_frames, num_workers))
        for offset in range(min(num_workers, num_start_frames))
    ]


def find_all_suitable_loops_multithreaded(video_path: str, similarity_threshold: float, min_loop_duration: float, comparison_resize_factor: float):
    """
    Analyzes a video in parallel to find all suitable start and end timestamps for a seamless loop.

    Despite the name, the work is distributed over worker *processes*
    (ProcessPoolExecutor), one per CPU reported by os.cpu_count().

    Args:
        video_path: Path to the video file.
        similarity_threshold: The minimum similarity score for a frame to be considered a match.
        min_loop_duration: The minimum duration of a loop in seconds.
        comparison_resize_factor: The factor to downscale frames for comparison.

    Returns:
        A list of dictionaries ('start_time', 'end_time', 'score') containing all
        suitable loops, sorted by score from highest to lowest, or an empty list
        if the video cannot be read or no suitable loops are found.
    """
    print(f"Analyzing '{video_path}' for all suitable looping pairs using multiple processes...")
    try:
        clip = VideoFileClip(video_path)
        try:
            frame_rate = clip.fps
            num_frames = int(clip.duration * frame_rate)
        finally:
            # Close the probe clip even when fps/duration cannot be read.
            clip.close()
    except Exception as e:
        print(f"Error loading video file to get frame count: {e}")
        return []

    min_loop_frames = int(min_loop_duration * frame_rate)

    print("Step 1 of 2: Distributing analysis work to parallel processes...")
    # Only start frames that still have an end frame at least `min_loop_frames`
    # later can yield a loop; the rest would be analysed for nothing.
    num_start_frames = max(0, min(num_frames, num_frames - min_loop_frames))
    # Each start frame is compared against every later frame, so early frames
    # cost more than late ones. Dealing the indices round-robin spreads that
    # cost evenly and guarantees every index is assigned exactly once (plain
    # `num_frames // num_workers` chunking drops the remainder frames).
    chunks = _partition_start_frames(num_start_frames, os.cpu_count() or 1)

    all_found_loops = []
    if chunks:
        total_workers = len(chunks)
        with ProcessPoolExecutor(max_workers=total_workers) as executor:
            futures = [
                executor.submit(
                    _find_loops_in_chunk,
                    video_path,
                    frame_indices_chunk,
                    comparison_resize_factor,
                    similarity_threshold,
                    min_loop_frames,
                    frame_rate,
                    worker_id,  # Worker ID (1-based)
                    total_workers,
                )
                for worker_id, frame_indices_chunk in enumerate(chunks, start=1)
            ]
            # Gather the matches of every worker as soon as it finishes.
            for future in as_completed(futures):
                all_found_loops.extend(future.result())

    print("Step 2 of 2: Sorting and selecting the best loop from all results.")
    return sorted(all_found_loops, key=lambda x: x['score'], reverse=True)
def create_looping_media(video_path: str, start_time: float, end_time: float, output_dir: str, output_format: str = 'mp4', mute_audio: bool = False, prototype_mode: bool = False, crf: int = None, preset: str = None, scale: int = None, fps: int = None):
    """
    Creates a looping video or GIF from a portion of the original video.

    The output is written to `<output_dir>/<input base name>_loop.<output_format>`.

    Args:
        video_path: Path to the input video file.
        start_time: The timestamp (in seconds) where the loop should start.
        end_time: The timestamp (in seconds) where the loop should end.
        output_dir: The directory to save the output media (created if missing).
        output_format: The format of the output file ('mp4', 'webm', or 'gif').
        mute_audio: If True, the output video will have no audio.
        prototype_mode: If True, uses a lower quality setting for smaller files.
        crf: The Constant Rate Factor for video quality. Overrides prototype_mode if set.
        preset: Value passed to ffmpeg's `-preset` option. Overrides the
            prototype_mode-based default if set.
        scale: The desired output video width in pixels.
        fps: The desired output video frames per second.

    Returns:
        True if the file was created successfully, False otherwise (including
        when `output_format` is not one of the supported formats).
    """
    base_name = os.path.splitext(os.path.basename(video_path))[0]
    output_path = os.path.join(output_dir, f"{base_name}_loop.{output_format}")

    source_clip = None
    try:
        # Inside the try so a directory problem is reported as a False return.
        os.makedirs(output_dir, exist_ok=True)

        source_clip = VideoFileClip(video_path)
        clip = source_clip
        # Apply scaling and FPS changes before cutting so the export is a single pass.
        if scale:
            clip = clip.resized(width=scale)
        if fps:
            # MoviePy v2 (the API this file uses: `resized`, `subclipped`)
            # names this method `with_fps`; `set_fps` is the v1 name.
            clip = clip.with_fps(fps)

        # Use .subclipped() to extract the specific loop segment
        loop_clip = clip.subclipped(start_time, end_time)

        if output_format in ('mp4', 'webm'):
            # Determine if audio should be included.
            include_audio = (clip.audio is not None) and not mute_audio
            codec = "libx264" if output_format == 'mp4' else "libvpx-vp9"

            # An explicit CRF wins; otherwise pick by prototype mode.
            crf_value = crf if crf is not None else (PROTO_CRF if prototype_mode else DEFAULT_CRF)

            if preset is not None:
                preset_value = preset
            elif codec == "libx264":
                # x264 only accepts its own preset names (ultrafast ... placebo);
                # 'best'/'realtime' make the encoder fail to initialise.
                preset_value = 'veryfast' if prototype_mode else 'slow'
            else:
                # NOTE(review): values kept as in the original for libvpx-vp9;
                # they look like `-deadline` values rather than presets — confirm.
                preset_value = 'realtime' if prototype_mode else 'best'

            ffmpeg_params = ['-loop', '0', '-g', '128', '-crf', str(crf_value), '-preset', str(preset_value)]
            loop_clip.write_videofile(output_path, codec=codec, audio=include_audio,
                                      logger='bar', ffmpeg_params=ffmpeg_params)
        elif output_format == 'gif':
            # Export as a GIF (GIFs do not have audio)
            loop_clip.write_gif(output_path, fps=clip.fps)
        else:
            # Previously an unknown format fell through and reported success
            # without writing anything.
            raise ValueError(f"Unsupported output format: '{output_format}'")

        print(f"Successfully saved to '{output_path}'")
        return True
    except Exception as e:
        print(f"Error creating media: {e}")
        return False
    finally:
        # Close the clip that owns the file reader, not a derived clip.
        if source_clip is not None:
            source_clip.close()
# --- Main Execution ---
def _load_cached_loops(cache_file):
    """Return the loop list stored in `cache_file`, or [] if it is missing or unusable."""
    if not os.path.exists(cache_file):
        return []
    try:
        with open(cache_file, 'r') as f:
            cached = json.load(f)
    except (OSError, ValueError) as e:
        # json.JSONDecodeError is a ValueError subclass.
        print(f"Error reading cache file, will re-analyze: {e}")
        return []

    required_keys = ('start_time', 'end_time', 'score')
    well_formed = isinstance(cached, list) and all(
        isinstance(entry, dict) and all(key in entry for key in required_keys)
        for entry in cached
    )
    if not well_formed:
        # A truthy non-list (e.g. a dict) used to be kept and crash later.
        print("Cache file has an unexpected format, will re-analyze.")
        return []
    if cached:
        print(f"Found {len(cached)} cached loops. Skipping analysis.")
    return cached


def _select_loop(all_loops, shortest, longest):
    """Pick the shortest, longest or (by default) highest-scoring loop; None if there are none."""
    if not all_loops:
        return None
    if shortest:
        print("Selected shortest loop.")
        return min(all_loops, key=lambda x: x['end_time'] - x['start_time'])
    if longest:
        print("Selected longest loop.")
        return max(all_loops, key=lambda x: x['end_time'] - x['start_time'])
    # Default to the highest-scoring loop. `max` returns the first best entry,
    # so a score-sorted list gives the same result as taking element 0, and a
    # cache that is not sorted is still handled correctly.
    print("Selected highest-scoring loop.")
    return max(all_loops, key=lambda x: x['score'])


def main():
    """Parse the command line, find (or load cached) loops, and export the selected one."""
    parser = argparse.ArgumentParser(description="""
    Finds a seamless loop in a video and creates a looping video or GIF.
    This script compares every frame against the later frames of the video to find matching start/end pairs and exports a single loop. For finding multiple, distinct loops in longer videos, a tool like PySceneDetect is recommended to first identify scene changes.
    """)
    parser.add_argument("input_video", type=str, help="Path to the input video file.")
    parser.add_argument("--output_dir", type=str, default=DEFAULT_OUTPUT_DIRECTORY,
                        help=f"Directory to save the output media. Defaults to '{DEFAULT_OUTPUT_DIRECTORY}'.")
    parser.add_argument("--similarity_threshold", type=float, default=DEFAULT_SIMILARITY_THRESHOLD,
                        help=f"Minimum similarity score (0 to 1). Defaults to {DEFAULT_SIMILARITY_THRESHOLD}.")
    parser.add_argument("--min_loop_duration", type=float, default=DEFAULT_MIN_LOOP_DURATION,
                        help=f"Minimum duration of the loop in seconds. Defaults to {DEFAULT_MIN_LOOP_DURATION}.")
    parser.add_argument("--resize_factor", type=float, default=DEFAULT_COMPARISON_RESIZE_FACTOR,
                        help=f"Factor to downscale frames for faster comparison (e.g., 0.5 for 50%%). Defaults to {DEFAULT_COMPARISON_RESIZE_FACTOR}.")
    parser.add_argument("--format", type=str, choices=['mp4', 'webm', 'gif'], default='mp4',
                        help="Output media format. Choices are 'mp4', 'webm', or 'gif'. Defaults to 'mp4'.")
    parser.add_argument("--mute", action="store_true",
                        help="If set, the output video will not have audio. This option is ignored for GIF output.")
    parser.add_argument("--proto", action="store_true",
                        help="If set, uses a lower quality setting for smaller files, suitable for prototypes.")
    parser.add_argument("--crf", type=int,
                        help="Constant Rate Factor (CRF) for video quality. Lower values mean higher quality. If set, this value overrides the --proto flag.")
    parser.add_argument("--shortest", action="store_true",
                        help="Creates the shortest valid loop instead of the highest-scoring one.")
    parser.add_argument("--longest", action="store_true",
                        help="Creates the longest valid loop instead of the highest-scoring one.")
    parser.add_argument("--scale", type=int,
                        help="Sets the output video width in pixels, maintaining aspect ratio. Useful for reducing file size.")
    parser.add_argument("--fps", type=int,
                        help="Sets the output video frames per second. Useful for reducing file size.")
    args = parser.parse_args()

    if args.shortest and args.longest:
        print("Error: Cannot use both --shortest and --longest flags together.")
        sys.exit(1)

    # Create the output directory if it doesn't exist
    os.makedirs(args.output_dir, exist_ok=True)

    # NOTE(review): the cache is keyed by the input file name only, so results
    # computed with a different threshold, minimum duration or resize factor
    # are reused as-is. Delete the cache file to force a fresh analysis.
    cache_file = os.path.join(args.output_dir, f"{os.path.basename(args.input_video)}.loop_cache")

    # Check for a cached list of loops
    all_loops = _load_cached_loops(cache_file)

    # If no cached data, find all suitable loops and save them
    if not all_loops:
        all_loops = find_all_suitable_loops_multithreaded(
            args.input_video,
            args.similarity_threshold,
            args.min_loop_duration,
            args.resize_factor
        )
        if all_loops:
            try:
                with open(cache_file, 'w') as f:
                    json.dump(all_loops, f, indent=4)
                print(f"Saved {len(all_loops)} loops to cache file: {cache_file}")
            except IOError as e:
                print(f"Warning: Could not save cache file: {e}")

    # Select the loop to create based on user arguments
    loop_to_create = _select_loop(all_loops, args.shortest, args.longest)
    if not loop_to_create:
        print("Could not find a suitable loop point.")
        sys.exit(1)

    creation_success = create_looping_media(
        args.input_video,
        loop_to_create['start_time'],
        loop_to_create['end_time'],
        args.output_dir,
        output_format=args.format,
        mute_audio=args.mute,
        prototype_mode=args.proto,
        crf=args.crf,
        scale=args.scale,
        fps=args.fps
    )
    sys.exit(0 if creation_success else 1)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
All features of this Python script are expected to be identical to those of the single-threaded version: https://gist.github.com/gphg/3c0c2f55bbe279ea1bcab1f8e3c20a09
If either version changes, e.g. a new feature is added or a bug is fixed, the other will follow.