Skip to content

Instantly share code, notes, and snippets.

@gphg
Last active September 1, 2025 13:57
Show Gist options
  • Save gphg/3b8c5e7d13a6535613d1498e24570e9b to your computer and use it in GitHub Desktop.
Save gphg/3b8c5e7d13a6535613d1498e24570e9b to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import os
import sys
import json
import argparse
import numpy as np
from skimage.metrics import structural_similarity as ssim
from skimage.transform import resize
from moviepy import VideoFileClip
from concurrent.futures import ProcessPoolExecutor, as_completed
# --- Parameters You Can Tune ---
# These are the defaults for the command-line options defined at the bottom of
# this file; each can be overridden per run.

# The minimum similarity (SSIM) score, from 0 to 1, that a pair of frames must
# reach to be considered a match. Higher is stricter. 0.95 is a good starting point.
DEFAULT_SIMILARITY_THRESHOLD = 0.95

# The minimum duration of a loop in seconds. Prevents finding tiny, useless loops.
DEFAULT_MIN_LOOP_DURATION = 1.5

# To speed up the process, frames are downscaled before being compared.
# A value of 0.25 means each frame dimension is resized to 25% of the original.
DEFAULT_COMPARISON_RESIZE_FACTOR = 0.25

# The directory to save the output media (and the loop cache file) in.
DEFAULT_OUTPUT_DIRECTORY = "output_loops"

# Video quality settings (CRF) for the MP4 and WEBM encoders.
# Lower CRF values mean higher quality and larger files.
# Default (high quality): 23. Prototype mode: 28.
DEFAULT_CRF = 23
PROTO_CRF = 28
def _find_loops_in_chunk(video_path, frame_indices_chunk, comparison_resize_factor, similarity_threshold, min_loop_frames, frame_rate, worker_id, total_workers):
    """
    Worker function: find every suitable loop whose start frame lies in one chunk.

    Runs in its own process, so it opens its own handle on the video and decodes
    only the frames it needs. Each start frame in ``frame_indices_chunk`` is
    compared (SSIM on downscaled frames) against every later frame that is at
    least ``min_loop_frames`` away.

    Args:
        video_path: Path to the video file.
        frame_indices_chunk: Iterable of start-frame indices assigned to this worker.
        comparison_resize_factor: Factor applied to frame height and width before comparing.
        similarity_threshold: Minimum SSIM score for a frame pair to be reported.
        min_loop_frames: Minimum distance, in frames, between loop start and end.
        frame_rate: Frames per second used to convert frame indices to timestamps.
        worker_id: 1-based worker number, used only in log messages.
        total_workers: Total number of workers, used only in log messages.
    Returns:
        A list of dicts with 'start_time', 'end_time' (seconds) and 'score' for
        every matching pair found; an empty list if the video cannot be opened.
    """
    tag = f"Worker {worker_id}/{total_workers}"
    print(f"{tag}: Starting analysis...")
    try:
        clip = VideoFileClip(video_path)
    except Exception as e:
        print(f"{tag}: Error loading video file: {e}")
        return []

    def _downscale(frame):
        # Shrink a decoded frame by the comparison factor (height, width order).
        height, width = frame.shape[0], frame.shape[1]
        target_shape = (int(height * comparison_resize_factor), int(width * comparison_resize_factor))
        return resize(frame, target_shape, anti_aliasing=True)

    found_loops = []
    try:
        num_frames = int(clip.duration * frame_rate)
        for i in frame_indices_chunk:
            first_end = i + min_loop_frames
            if first_end >= num_frames:
                # No end frame is far enough away; skip the pointless decode.
                continue
            try:
                # Load and resize the starting frame once for all its comparisons.
                start_frame_resized = _downscale(clip.get_frame(i / frame_rate))
                # The dynamic range depends only on the start frame, so compute it once.
                data_range = start_frame_resized.max() - start_frame_resized.min()
                if not data_range > 0:
                    # A perfectly uniform (e.g. black) start frame has zero range, which
                    # makes SSIM degenerate. Fall back to 1.0, the full range of the float
                    # images produced by skimage's resize() (see its preserve_range docs).
                    data_range = 1.0
                # Now, compare it to all subsequent frames that are far enough away.
                for j in range(first_end, num_frames):
                    end_frame_resized = _downscale(clip.get_frame(j / frame_rate))
                    ssim_score = ssim(start_frame_resized, end_frame_resized, channel_axis=-1, win_size=3, data_range=data_range)
                    if ssim_score >= similarity_threshold:
                        found_loops.append({
                            'start_time': i / frame_rate,
                            'end_time': j / frame_rate,
                            # Plain float keeps the result picklable/JSON-friendly.
                            'score': float(ssim_score)
                        })
            except Exception as e:
                # Best effort: abandon this start frame and move on to the next one.
                print(f"{tag}: Error processing frames: {e}")
                continue
    finally:
        # Always release the reader, even if an unexpected error escapes.
        clip.close()
    print(f"{tag}: Analysis complete.")
    # Return every match from this chunk; the caller ranks them.
    return found_loops
def find_all_suitable_loops_multithreaded(video_path: str, similarity_threshold: float, min_loop_duration: float, comparison_resize_factor: float):
    """
    Analyzes a video using multiple worker processes to find all suitable start
    and end timestamps for a seamless loop.

    (The name says "multithreaded" for historical reasons; the work is actually
    distributed with a ProcessPoolExecutor.)

    Args:
        video_path: Path to the video file.
        similarity_threshold: The minimum similarity score for a frame to be considered a match.
        min_loop_duration: The minimum duration of a loop in seconds.
        comparison_resize_factor: The factor to downscale frames for comparison.
    Returns:
        A list of dictionaries ('start_time', 'end_time', 'score') containing all
        suitable loops, sorted by score from best to worst, or an empty list if
        the video cannot be read or no suitable loops are found.
    """
    print(f"Analyzing '{video_path}' for all suitable looping pairs using multiple processes...")
    clip = None
    try:
        clip = VideoFileClip(video_path)
        frame_rate = clip.fps
        num_frames = int(clip.duration * frame_rate)
    except Exception as e:
        print(f"Error loading video file to get frame count: {e}")
        return []
    finally:
        # Release the reader on both the success and the error path.
        if clip is not None:
            clip.close()

    min_loop_frames = int(min_loop_duration * frame_rate)
    # A start frame is only useful if at least one end frame lies
    # min_loop_frames after it, i.e. start + min_loop_frames < num_frames.
    last_start = max(0, min(num_frames, num_frames - min_loop_frames))
    candidate_starts = range(last_start)
    if not candidate_starts:
        print("Video is too short for the requested minimum loop duration; nothing to analyze.")
        return []

    all_found_loops = []
    print("Step 1 of 2: Distributing analysis work to parallel processes...")
    # Never start more workers than there are start frames to hand out.
    num_workers = min(os.cpu_count() or 1, len(candidate_starts))
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        # Deal the start frames out round-robin (worker w gets w, w + N, w + 2N, ...).
        # Every candidate is assigned exactly once, with no remainder dropped, and
        # since early start frames are compared against more later frames than late
        # ones, interleaving spreads that cost evenly across workers.
        futures = [
            executor.submit(
                _find_loops_in_chunk,
                video_path,
                list(candidate_starts[worker_index::num_workers]),
                comparison_resize_factor,
                similarity_threshold,
                min_loop_frames,
                frame_rate,
                worker_index + 1,  # Worker ID (1-based)
                num_workers,
            )
            for worker_index in range(num_workers)
        ]
        # Gather every worker's matches as they finish.
        for future in as_completed(futures):
            all_found_loops.extend(future.result())

    print("Step 2 of 2: Sorting and selecting the best loop from all results.")
    return sorted(all_found_loops, key=lambda x: x['score'], reverse=True)
def create_looping_media(video_path: str, start_time: float, end_time: float, output_dir: str, output_format: str = 'mp4', mute_audio: bool = False, prototype_mode: bool = False, crf: int = None, preset: str = None, scale: int = None, fps: int = None) -> bool:
    """
    Creates a looping video or GIF from a portion of the original video.

    The result is written to ``<output_dir>/<input base name>_loop.<output_format>``.

    Args:
        video_path: Path to the input video file.
        start_time: The timestamp (in seconds) where the loop should start.
        end_time: The timestamp (in seconds) where the loop should end.
        output_dir: The directory to save the output media (created if missing).
        output_format: The format of the output file ('mp4', 'webm', or 'gif').
        mute_audio: If True, the output video will have no audio.
        prototype_mode: If True, uses a lower quality setting for smaller files.
        crf: The Constant Rate Factor for video quality. Overrides prototype_mode if set.
        preset: Encoder speed/quality setting. Passed as libx264 ``-preset`` for mp4
            and as libvpx ``-deadline`` for webm. When omitted, a default valid for
            the chosen encoder is picked based on prototype_mode.
        scale: The desired output video width in pixels.
        fps: The desired output video frames per second.
    Returns:
        True if the file was created successfully, False otherwise.
    """
    if not os.path.exists(output_dir):
        os.makedirs(output_dir, exist_ok=True)

    if output_format not in ('mp4', 'webm', 'gif'):
        # Previously an unknown format fell through and reported success
        # without writing anything.
        print(f"Error creating media: unsupported output format '{output_format}'")
        return False

    base_name = os.path.splitext(os.path.basename(video_path))[0]
    output_path = os.path.join(output_dir, f"{base_name}_loop.{output_format}")

    source_clip = None
    try:
        source_clip = VideoFileClip(video_path)
        clip = source_clip
        # Apply scaling and FPS changes on a single pass to prevent quality loss.
        if scale:
            clip = clip.resized(width=scale)
        if fps:
            # moviepy v2 API (matching resized/subclipped); set_fps no longer exists.
            clip = clip.with_fps(fps)
        # Use .subclipped() to extract the specific loop segment
        loop_clip = clip.subclipped(start_time, end_time)

        if output_format == 'gif':
            # Export as a high-quality GIF (GIFs do not have audio)
            loop_clip.write_gif(output_path, fps=clip.fps)
        else:
            # Determine if audio should be included.
            include_audio = (clip.audio is not None) and not mute_audio
            # Each encoder has its own speed/quality switch and value set:
            # libx264 takes -preset (ultrafast ... placebo), libvpx-vp9 takes
            # -deadline (best / good / realtime).
            if output_format == 'mp4':
                codec = "libx264"
                speed_flag = '-preset'
                default_speed = 'veryfast' if prototype_mode else 'slow'
            else:
                codec = "libvpx-vp9"
                speed_flag = '-deadline'
                default_speed = 'realtime' if prototype_mode else 'good'
            speed_value = preset if preset is not None else default_speed
            # Set CRF value based on command-line arguments
            crf_value = crf if crf is not None else (PROTO_CRF if prototype_mode else DEFAULT_CRF)
            # NOTE(review): '-loop 0' is kept from the original; it looks like a GIF
            # muxer option and probably has no effect on mp4/webm -- confirm.
            ffmpeg_params = ['-loop', '0', '-g', '128', '-crf', str(crf_value), speed_flag, str(speed_value)]
            loop_clip.write_videofile(output_path, codec=codec, audio=include_audio,
                                      logger='bar', ffmpeg_params=ffmpeg_params)
        print(f"Successfully saved to '{output_path}'")
        return True
    except Exception as e:
        print(f"Error creating media: {e}")
        return False
    finally:
        # Release the underlying reader regardless of how we leave the function.
        if source_clip is not None:
            source_clip.close()
# --- Main Execution ---
# Command-line entry point: parse options, load or compute the list of candidate
# loops (cached as JSON next to the output), pick one, and render it.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="""
Finds a seamless loop in a video and creates a looping video or GIF.
This script compares every frame against all later frames that are at least the minimum loop duration away and builds the loop from the best-matching pair. For finding multiple, distinct loops in longer videos, a tool like PySceneDetect is recommended to first identify scene changes.
""")
    parser.add_argument("input_video", type=str, help="Path to the input video file.")
    parser.add_argument("--output_dir", type=str, default=DEFAULT_OUTPUT_DIRECTORY,
                        help=f"Directory to save the output media. Defaults to '{DEFAULT_OUTPUT_DIRECTORY}'.")
    parser.add_argument("--similarity_threshold", type=float, default=DEFAULT_SIMILARITY_THRESHOLD,
                        help=f"Minimum similarity score (0 to 1). Defaults to {DEFAULT_SIMILARITY_THRESHOLD}.")
    parser.add_argument("--min_loop_duration", type=float, default=DEFAULT_MIN_LOOP_DURATION,
                        help=f"Minimum duration of the loop in seconds. Defaults to {DEFAULT_MIN_LOOP_DURATION}.")
    parser.add_argument("--resize_factor", type=float, default=DEFAULT_COMPARISON_RESIZE_FACTOR,
                        help=f"Factor to downscale frames for faster comparison (e.g., 0.5 for 50%%). Defaults to {DEFAULT_COMPARISON_RESIZE_FACTOR}.")
    parser.add_argument("--format", type=str, choices=['mp4', 'webm', 'gif'], default='mp4',
                        help="Output media format. Choices are 'mp4', 'webm', or 'gif'. Defaults to 'mp4'.")
    parser.add_argument("--mute", action="store_true",
                        help="If set, the output video will not have audio. This option is ignored for GIF output.")
    parser.add_argument("--proto", action="store_true",
                        help="If set, uses a lower quality setting for smaller files, suitable for prototypes.")
    parser.add_argument("--crf", type=int,
                        help="Constant Rate Factor (CRF) for video quality. Lower values mean higher quality. If set, this value overrides the --proto flag.")
    parser.add_argument("--shortest", action="store_true",
                        help="Creates the shortest valid loop instead of the highest-scoring one.")
    parser.add_argument("--longest", action="store_true",
                        help="Creates the longest valid loop instead of the highest-scoring one.")
    parser.add_argument("--scale", type=int,
                        help="Sets the output video width in pixels, maintaining aspect ratio. Useful for reducing file size.")
    parser.add_argument("--fps", type=int,
                        help="Sets the output video frames per second. Useful for reducing file size.")
    args = parser.parse_args()

    if args.shortest and args.longest:
        print("Error: Cannot use both --shortest and --longest flags together.")
        sys.exit(1)

    # Create the output directory if it doesn't exist
    os.makedirs(args.output_dir, exist_ok=True)
    # NOTE: the cache is keyed on the input file name only, so loops found with
    # different threshold/duration/resize settings are reused until it is deleted.
    CACHE_FILE = os.path.join(args.output_dir, f"{os.path.basename(args.input_video)}.loop_cache")
    all_loops = []

    # Check for a cached list of loops
    if os.path.exists(CACHE_FILE):
        try:
            with open(CACHE_FILE, 'r') as f:
                cached_loops = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError as well as undecodable bytes.
            print(f"Error reading cache file, will re-analyze: {e}")
        else:
            required_keys = {'start_time', 'end_time', 'score'}
            cache_is_usable = isinstance(cached_loops, list) and all(
                isinstance(entry, dict) and required_keys <= entry.keys()
                for entry in cached_loops
            )
            if cache_is_usable:
                all_loops = cached_loops
                if all_loops:
                    print(f"Found {len(all_loops)} cached loops. Skipping analysis.")
            else:
                # Anything other than a list of loop records would crash the
                # selection step below, so discard it and analyze again.
                print("Cache file has an unexpected format, will re-analyze.")

    # If no cached data, find all suitable loops and save them
    if not all_loops:
        all_loops = find_all_suitable_loops_multithreaded(
            args.input_video,
            args.similarity_threshold,
            args.min_loop_duration,
            args.resize_factor
        )
        if all_loops:
            try:
                with open(CACHE_FILE, 'w') as f:
                    json.dump(all_loops, f, indent=4)
                print(f"Saved {len(all_loops)} loops to cache file: {CACHE_FILE}")
            except IOError as e:
                print(f"Warning: Could not save cache file: {e}")

    # Select the loop to create based on user arguments
    loop_to_create = None
    if all_loops:
        if args.shortest:
            loop_to_create = min(all_loops, key=lambda x: x['end_time'] - x['start_time'])
            print("Selected shortest loop.")
        elif args.longest:
            loop_to_create = max(all_loops, key=lambda x: x['end_time'] - x['start_time'])
            print("Selected longest loop.")
        else:
            # Default to the highest-scoring loop. max() returns the first of the
            # best entries, which is element 0 for the score-sorted list produced by
            # the analysis, and still picks correctly if a cache file was reordered.
            loop_to_create = max(all_loops, key=lambda x: x['score'])
            print("Selected highest-scoring loop.")

    if loop_to_create:
        creation_success = create_looping_media(
            args.input_video,
            loop_to_create['start_time'],
            loop_to_create['end_time'],
            args.output_dir,
            output_format=args.format,
            mute_audio=args.mute,
            prototype_mode=args.proto,
            crf=args.crf,
            scale=args.scale,
            fps=args.fps
        )
        # Exit status mirrors whether the media file was written.
        sys.exit(0 if creation_success else 1)
    else:
        print("Could not find a suitable loop point.")
        sys.exit(1)
@gphg
Copy link
Author

gphg commented Aug 31, 2025

All features of this Python script are expected to be identical to those of the single-threaded version: https://gist.github.com/gphg/3c0c2f55bbe279ea1bcab1f8e3c20a09

If either version changes, e.g. a new feature is added or a bug is fixed, the other will follow.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment