-
-
Save gphg/3c0c2f55bbe279ea1bcab1f8e3c20a09 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python | |
import os | |
import sys | |
import json | |
import argparse | |
import numpy as np | |
from skimage.metrics import structural_similarity as ssim | |
from skimage.transform import resize | |
from moviepy import VideoFileClip | |
from tqdm import tqdm | |
# --- Parameters You Can Tune --- | |
# The minimum similarity score (from 0 to 1) for a frame to be considered a match. | |
# Higher is stricter. 0.95 is a good starting point. | |
DEFAULT_SIMILARITY_THRESHOLD = 0.95 | |
# The minimum duration of a loop in seconds. Prevents finding tiny, useless loops. | |
DEFAULT_MIN_LOOP_DURATION = 1.5 | |
# To speed up the process, we can downscale frames before comparing them. | |
# A value of 0.25 means we resize to 25% of the original size. | |
DEFAULT_COMPARISON_RESIZE_FACTOR = 0.25 | |
# The directory to save the output media. | |
DEFAULT_OUTPUT_DIRECTORY = "output_loops" | |
# Video quality settings for MP4 and WEBM codecs. | |
# Lower CRF values mean higher quality and larger files. | |
# Default (high quality): 23. Prototype mode: 28. | |
DEFAULT_CRF = 23 | |
PROTO_CRF = 28 | |
# The number of frames to load into memory at once for processing. | |
DEFAULT_FRAME_BUFFER_SIZE = 500 | |
def find_all_suitable_loops(video_path: str, similarity_threshold: float, min_loop_duration: float, comparison_resize_factor: float, frame_buffer_size: int): | |
""" | |
Analyzes a video to find all suitable start and end timestamps for a seamless loop. | |
This version uses a frame buffer to be memory-efficient while maintaining performance. | |
Args: | |
video_path: Path to the video file. | |
similarity_threshold: The minimum similarity score for a frame to be considered a match. | |
min_loop_duration: The minimum duration of a loop in seconds. | |
comparison_resize_factor: The factor to downscale frames for comparison. | |
frame_buffer_size: The number of frames to load into memory at once. | |
Returns: | |
A list of dictionaries containing all suitable loops, or an empty list if no suitable loops are found. | |
""" | |
print(f"Analyzing '{video_path}' for all suitable looping pairs...") | |
try: | |
clip = VideoFileClip(video_path) | |
frame_rate = clip.fps | |
num_frames = int(clip.duration * frame_rate) | |
except Exception as e: | |
print(f"Error loading video file: {e}") | |
return [] | |
min_loop_frames = int(min_loop_duration * frame_rate) | |
all_found_loops = [] | |
print("Step 1 of 2: Processing video with frame buffer...") | |
resized_frames = [] | |
# Process the entire video in chunks | |
for i in tqdm(range(0, num_frames, frame_buffer_size), desc="Buffering frames"): | |
end_index = min(i + frame_buffer_size, num_frames) | |
frames_to_process = [clip.get_frame(t / frame_rate) for t in range(i, end_index)] | |
resized_chunk = [resize(frame, (int(frame.shape[0] * comparison_resize_factor), int(frame.shape[1] * comparison_resize_factor)), anti_aliasing=True) for frame in frames_to_process] | |
resized_frames.extend(resized_chunk) | |
clip.close() | |
print("Step 2 of 2: Comparing buffered frames to find suitable loops.") | |
with tqdm(total=len(resized_frames) * (len(resized_frames) - min_loop_frames), desc="Comparing frames") as pbar: | |
for i in range(len(resized_frames)): | |
for j in range(i + min_loop_frames, len(resized_frames)): | |
frame1 = resized_frames[i] | |
frame2 = resized_frames[j] | |
# Calculate SSIM score | |
data_range = frame1.max() - frame1.min() | |
ssim_score = ssim(frame1, frame2, channel_axis=-1, win_size=3, data_range=data_range) | |
if ssim_score >= similarity_threshold: | |
all_found_loops.append({ | |
'start_time': i / frame_rate, | |
'end_time': j / frame_rate, | |
'score': ssim_score | |
}) | |
pbar.update(1) | |
print(f"Analysis complete. Found {len(all_found_loops)} suitable loops.") | |
return sorted(all_found_loops, key=lambda x: x['score'], reverse=True) | |
def create_looping_media(video_path: str, start_time: float, end_time: float, output_dir: str, output_format: str = 'mp4', mute_audio: bool = False, prototype_mode: bool = False, crf: int = None, preset: str = None, scale: int = None, fps: int = None): | |
""" | |
Creates a looping video or GIF from a portion of the original video. | |
Args: | |
video_path: Path to the input video file. | |
start_time: The timestamp (in seconds) where the loop should start. | |
end_time: The timestamp (in seconds) where the loop should end. | |
output_dir: The directory to save the output media. | |
output_format: The format of the output file ('mp4', 'webm', or 'gif'). | |
mute_audio: If True, the output video will have no audio. | |
prototype_mode: If True, uses a lower quality setting for smaller files. | |
crf: The Constant Rate Factor for video quality. Overrides prototype_mode if set. | |
scale: The desired output video width in pixels. | |
fps: The desired output video frames per second. | |
Returns: | |
True if the file was created successfully, False otherwise. | |
""" | |
if not os.path.exists(output_dir): | |
os.makedirs(output_dir) | |
base_name = os.path.splitext(os.path.basename(video_path))[0] | |
output_path = os.path.join(output_dir, f"{base_name}_loop.{output_format}") | |
try: | |
clip = VideoFileClip(video_path) | |
# Apply scaling and FPS changes on a single pass to prevent quality loss. | |
if scale: | |
clip = clip.resized(width=scale) | |
if fps: | |
clip = clip.set_fps(fps) | |
# Use .subclipped() to extract the specific loop segment | |
loop_clip = clip.subclipped(start_time, end_time) | |
if output_format in ['mp4', 'webm']: | |
# Determine if audio should be included. | |
include_audio = (clip.audio is not None) and not mute_audio | |
codec = "libx264" if output_format == 'mp4' else "libvpx-vp9" | |
# Set CRF value based on command-line arguments | |
crf_value = crf if crf is not None else (PROTO_CRF if prototype_mode else DEFAULT_CRF) | |
preset_value = preset if preset is not None else ('fast' if prototype_mode else 'veryslow') | |
ffmpeg_params = ['-g', '128', '-crf', str(crf_value), '-preset', preset_value] | |
if output_format == 'webm': | |
ffmpeg_params = ['-crf', str(crf_value), '-b:v', '0', '-cpu-used', '0', '-g', '128'] | |
loop_clip.write_videofile(output_path, codec=codec, audio=include_audio, | |
logger='bar', ffmpeg_params=ffmpeg_params) | |
elif output_format == 'gif': | |
# Export as a high-quality GIF (GIFs do not have audio) | |
loop_clip.write_gif(output_path, fps=clip.fps) | |
print(f"Successfully saved to '{output_path}'") | |
return True | |
except Exception as e: | |
print(f"Error creating media: {e}") | |
return False | |
finally: | |
if 'clip' in locals() and clip.reader: | |
clip.close() | |
# --- Main Execution --- | |
if __name__ == "__main__": | |
parser = argparse.ArgumentParser(description=""" | |
Finds a seamless loop in a video and creates a looping video or GIF. | |
""") | |
parser.add_argument("input_video", type=str, help="Path to the input video file.") | |
parser.add_argument("--output_dir", type=str, default=DEFAULT_OUTPUT_DIRECTORY, | |
help=f"Directory to save the output media. Defaults to '{DEFAULT_OUTPUT_DIRECTORY}'.") | |
parser.add_argument("--similarity_threshold", type=float, default=DEFAULT_SIMILARITY_THRESHOLD, | |
help=f"Minimum similarity score (0 to 1). Defaults to {DEFAULT_SIMILARITY_THRESHOLD}.") | |
parser.add_argument("--min_loop_duration", type=float, default=DEFAULT_MIN_LOOP_DURATION, | |
help=f"Minimum duration of the loop in seconds. Defaults to {DEFAULT_MIN_LOOP_DURATION}.") | |
parser.add_argument("--resize_factor", type=float, default=DEFAULT_COMPARISON_RESIZE_FACTOR, | |
help=f"Factor to downscale frames for faster comparison (e.g., 0.5 for 50%%). Defaults to {DEFAULT_COMPARISON_RESIZE_FACTOR}.") | |
parser.add_argument("--format", type=str, choices=['mp4', 'webm', 'gif'], default='mp4', | |
help="Output media format. Choices are 'mp4', 'webm', or 'gif'. Defaults to 'mp4'.") | |
parser.add_argument("--mute", action="store_true", | |
help="If set, the output video will not have audio. This option is ignored for GIF output.") | |
parser.add_argument("--proto", action="store_true", | |
help="If set, uses a lower quality setting for smaller files, suitable for prototypes.") | |
parser.add_argument("--crf", type=int, | |
help=f"Constant Rate Factor (CRF) for video quality. Lower values mean higher quality. If set, this value overrides the --proto flag.") | |
parser.add_argument("--shortest", action="store_true", | |
help="Creates the shortest valid loop instead of the highest-scoring one.") | |
parser.add_argument("--longest", action="store_true", | |
help="Creates the longest valid loop instead of the highest-scoring one.") | |
parser.add_argument("--scale", type=int, | |
help="Sets the output video width in pixels, maintaining aspect ratio. Useful for reducing file size.") | |
parser.add_argument("--fps", type=int, | |
help="Sets the output video frames per second. Useful for reducing file size.") | |
parser.add_argument("--buffer_size", type=int, default=DEFAULT_FRAME_BUFFER_SIZE, | |
help=f"Number of frames to buffer for in-memory processing. A larger number is faster but uses more RAM. Defaults to {DEFAULT_FRAME_BUFFER_SIZE}.") | |
args = parser.parse_args() | |
if args.shortest and args.longest: | |
print("Error: Cannot use both --shortest and --longest flags together.") | |
sys.exit(1) | |
# Create the output directory if it doesn't exist | |
os.makedirs(args.output_dir, exist_ok=True) | |
CACHE_FILE = os.path.join(args.output_dir, f"{os.path.basename(args.input_video)}_cached_loops.json") | |
all_loops = [] | |
# Check for a cached list of loops | |
if os.path.exists(CACHE_FILE): | |
try: | |
with open(CACHE_FILE, 'r') as f: | |
all_loops = json.load(f) | |
if isinstance(all_loops, list) and all_loops: | |
print(f"Found {len(all_loops)} cached loops. Skipping analysis.") | |
except (IOError, json.JSONDecodeError) as e: | |
print(f"Error reading cache file, will re-analyze: {e}") | |
# If no cached data, find all suitable loops and save them | |
if not all_loops: | |
all_loops = find_all_suitable_loops( | |
args.input_video, | |
args.similarity_threshold, | |
args.min_loop_duration, | |
args.resize_factor, | |
args.buffer_size | |
) | |
if all_loops: | |
try: | |
with open(CACHE_FILE, 'w') as f: | |
json.dump(all_loops, f, indent=4) | |
print(f"Saved {len(all_loops)} loops to cache file: {CACHE_FILE}") | |
except IOError as e: | |
print(f"Warning: Could not save cache file: {e}") | |
# Select the loop to create based on user arguments | |
loop_to_create = None | |
if all_loops: | |
if args.shortest: | |
loop_to_create = min(all_loops, key=lambda x: x['end_time'] - x['start_time']) | |
print("Selected shortest loop.") | |
elif args.longest: | |
loop_to_create = max(all_loops, key=lambda x: x['end_time'] - x['start_time']) | |
print("Selected longest loop.") | |
else: | |
# Default to the highest-scoring loop | |
loop_to_create = all_loops[0] | |
print("Selected highest-scoring loop.") | |
if loop_to_create: | |
creation_success = create_looping_media( | |
args.input_video, | |
loop_to_create['start_time'], | |
loop_to_create['end_time'], | |
args.output_dir, | |
output_format=args.format, | |
mute_audio=args.mute, | |
prototype_mode=args.proto, | |
crf=args.crf, | |
scale=args.scale, | |
fps=args.fps | |
) | |
if creation_success: | |
sys.exit(0) | |
else: | |
sys.exit(1) | |
else: | |
print("Could not find a suitable loop point.") | |
sys.exit(1) |
Multi-threading version (experimental):
https://gist.github.com/gphg/3b8c5e7d13a6535613d1498e24570e9b
A thing to be wary: you will eventually get a loop with a black screen at the end or the start of frame, yet it is at highest score; in my case it was "score": 1
. To fix this is imply delete an object with the highest score on the video's .loop_cache
with a text editor (it is on the top list).
Also, if you input like really big video, even if it is 30 seconds, it crashes due to memory error. This is a known issue.
Renamed .loop_cache
to _cached_loops.json
.
The multi-threaded experimental version is broken and slower: DO NOT USE IT. You this find_loop.py
instead.
The latest update speed up the loop detection by staging the incoming frames to be compared into memory. The memory usage can be set using --buffer_size
.
Loop detection is time consuming, because it depends on video duration and its fps rate. This only be done once: it creates cache file contains metadata of the loop. You can delete the cache file to redo the detection.
If video has long duration, like surprisingly long (more than two minutes). I suggest you to split the video manually at specific duration. Otherwise, use this program to get the job be done automatically: https://github.com/Breakthrough/PySceneDetect