Last active
          August 3, 2025 10:01 
        
      - 
      
- 
        Save TimelessP/478dbb85fa23974cab9115b1d041b927 to your computer and use it in GitHub Desktop. 
    Motion parallax masking using rotating random field layers mapped to shades of grey.
  
        
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | #!/usr/bin/env python3 | |
| """ | |
| Parallax Motion Masking Video Generator | |
| Creates videos where an image is only visible through the differential motion | |
| of random noise layers at different speeds based on the image's grey levels. | |
| """ | |
| import numpy as np | |
| import cv2 | |
| import subprocess | |
| import os | |
| import argparse | |
| import sys | |
| from PIL import Image | |
| from pathlib import Path | |
| # Set random seed for reproducibility | |
| np.random.seed(0) | |
class ParallaxMotionGenerator:
    """
    Generate parallax motion masking videos.

    The concept: an input image is quantized to a number of grey levels, and
    each grey level selects a different layer of two-tone random noise. Every
    layer scrolls horizontally at its own speed, so a single frame looks like
    noise while the moving video reveals the image through motion parallax.
    """

    # Grey values of the two-tone noise (dark grey / light grey, softer than pure black/white).
    _NOISE_VALUES = (64, 192)
    # Images spanning fewer grey values than this get a linear contrast stretch.
    _LOW_CONTRAST_SPAN = 100
    # Quantized levels are stored as uint8, so at most 256 distinct levels are representable.
    _MAX_GREY_LEVELS = 256
    # Encoders that are passed through without forcing a yuv420p pixel format.
    _LOSSLESS_ENCODERS = ('huffyuv', 'ffv1')
    # (encoder, description, extra ffmpeg arguments) tried in order for --for-x.
    _X_ENCODERS = (
        ('libopenh264', 'H.264 via OpenH264', ('-b:v', '8M', '-maxrate', '10M', '-bufsize', '16M')),
        ('mpeg4', 'MPEG-4', ('-q:v', '1')),
        ('mpeg2video', 'MPEG-2', ('-b:v', '8M')),
        ('mpeg1video', 'MPEG-1', ('-b:v', '8M')),
    )
    # (encoder, description, extra ffmpeg arguments) tried in order for high quality output.
    _HQ_ENCODERS = (
        ('mpeg4', 'MPEG-4', ('-q:v', '1')),
        ('mpeg2video', 'MPEG-2', ('-b:v', '10M', '-maxrate', '12M')),
        ('mpeg1video', 'MPEG-1', ('-b:v', '8M')),
        ('huffyuv', 'HuffYUV (lossless)', ()),
        ('ffv1', 'FFV1 (lossless)', ('-level', '3')),
    )

    def __init__(self, input_path, output_path, width=1920, height=1080,
                 fps=20, duration=10, grey_levels=32, motion_direction=-1, inverse=False, for_x=False):
        """
        Initialize the parallax motion generator.

        Args:
            input_path (str): Path to input image
            output_path (str): Path for output video
            width (int): Output video width (if height is None, width is used to calculate height proportionally)
            height (int): Output video height (if width is None, height is used to calculate width proportionally)
            fps (int): Frames per second
            duration (int): Video duration in seconds
            grey_levels (int): Number of grey levels to quantize to (1-256)
            motion_direction (int): -1 for leftward, 1 for rightward motion
            inverse (bool): If True, lighter pixels move less, darker pixels move more
            for_x (bool): If True, optimize encoding for Twitter/X upload

        Raises:
            FileNotFoundError: If input_path does not exist.
            ValueError: If a numeric setting is out of range.
        """
        self.input_path = Path(input_path)
        self.output_path = Path(output_path)

        # Validate inputs
        if not self.input_path.exists():
            raise FileNotFoundError(f"Input image not found: {input_path}")
        if fps <= 0 or duration <= 0:
            raise ValueError(f"fps and duration must be positive (got fps={fps}, duration={duration})")
        if not 1 <= grey_levels <= self._MAX_GREY_LEVELS:
            raise ValueError(f"grey_levels must be between 1 and {self._MAX_GREY_LEVELS} (got {grey_levels})")
        for label, value in (('width', width), ('height', height)):
            if value is not None and value <= 0:
                raise ValueError(f"{label} must be positive (got {value})")
        total_frames = int(duration * fps)
        if total_frames < 1:
            raise ValueError(f"duration * fps must yield at least one frame (got {duration} * {fps})")

        # Handle proportional sizing
        if width is None and height is None:
            # Both None - use defaults
            self.width = 1920
            self.height = 1080
        elif width is not None and height is not None:
            # Both specified - use as-is
            self.width = width
            self.height = height
        else:
            # Only one specified - derive the other from the image's aspect ratio.
            # The context manager closes the file handle once the size is read.
            with Image.open(self.input_path) as img:
                orig_width, orig_height = img.size
            orig_ratio = orig_width / orig_height
            if width is not None:
                self.width = width
                self.height = max(1, int(width / orig_ratio))
                print(f"π Proportional sizing: width={self.width} specified, calculated height={self.height}")
            else:
                self.height = height
                self.width = max(1, int(height * orig_ratio))
                print(f"π Proportional sizing: height={self.height} specified, calculated width={self.width}")

        self.fps = fps
        self.duration = duration
        self.total_frames = total_frames
        self.grey_levels = grey_levels
        self.motion_direction = motion_direction
        self.inverse = inverse
        self.for_x = for_x

        # Create output directory if it doesn't exist
        self.output_path.parent.mkdir(parents=True, exist_ok=True)

        print(f"π¬ Parallax Motion Generator initialized:")
        print(f" Input: {self.input_path}")
        print(f" Output: {self.output_path}")
        print(f" Dimensions: {self.width}x{self.height}")
        print(f" Duration: {self.duration}s @ {self.fps}fps ({self.total_frames} frames)")
        print(f" Grey levels: {self.grey_levels}")
        print(f" Motion: {'inverse' if self.inverse else 'normal'} ({'leftward' if self.motion_direction == -1 else 'rightward'})")
        print(f" Encoding: {'Twitter/X optimized (H.264)' if self.for_x else 'High quality (MPEG4)'}")

    @staticmethod
    def _stretch_contrast(grey_array):
        """Linearly rescale an 8-bit array so its values span 0-255; flat input is returned unchanged."""
        low = int(grey_array.min())
        high = int(grey_array.max())
        if high == low:
            # Nothing to stretch, and dividing by the zero span would produce NaNs.
            return grey_array.copy()
        # Work in float: multiplying the uint8 array by 255 directly wraps around.
        stretched = (grey_array.astype(np.float64) - low) * 255.0 / (high - low)
        return stretched.astype(np.uint8)

    @staticmethod
    def _quantize(grey_array, grey_levels):
        """Map 0-255 grey values onto the integer levels 0..grey_levels-1."""
        # First normalize to 0-1, then scale to 0-(grey_levels-1), then round
        normalized = grey_array.astype(np.float32) / 255.0
        return np.round(normalized * (grey_levels - 1)).astype(np.uint8)

    def load_and_prepare_image(self):
        """
        Load the input image, convert it to greyscale, letterbox it onto a
        black canvas of the output size, and quantize it.

        Returns:
            numpy.ndarray: (height, width) uint8 array of grey level indices,
            also stored as ``self.source_image``.
        """
        print("πΌοΈ Loading and preparing input image...")

        # convert() returns an independent image, so the file can be closed right away.
        with Image.open(self.input_path) as img:
            grey_img = img.convert('L')

        # Calculate proportional resize to fit within target dimensions
        orig_width, orig_height = grey_img.size
        target_ratio = self.width / self.height
        orig_ratio = orig_width / orig_height
        if orig_ratio > target_ratio:
            # Image is wider - fit to width
            new_width = self.width
            new_height = max(1, int(self.width / orig_ratio))
        else:
            # Image is taller - fit to height
            new_height = self.height
            new_width = max(1, int(self.height * orig_ratio))

        # Resize maintaining aspect ratio
        grey_img = grey_img.resize((new_width, new_height), Image.Resampling.LANCZOS)

        # Create a black canvas of target size and center the image
        canvas = Image.new('L', (self.width, self.height), 0)
        offset_x = (self.width - new_width) // 2
        offset_y = (self.height - new_height) // 2
        canvas.paste(grey_img, (offset_x, offset_y))
        print(f"π Original size: {orig_width}x{orig_height} (ratio: {orig_ratio:.2f})")
        print(f"π Resized to: {new_width}x{new_height}, centered in {self.width}x{self.height}")
        print(f"π Offset: ({offset_x}, {offset_y})")

        grey_array = np.array(canvas)
        print(f"π Original image: min={grey_array.min()}, max={grey_array.max()}, mean={grey_array.mean():.1f}")

        # Enhance contrast before quantization for low-contrast (but not uniform) images
        span = int(grey_array.max()) - int(grey_array.min())
        if 0 < span < self._LOW_CONTRAST_SPAN:
            print("π Enhancing contrast...")
            grey_array = self._stretch_contrast(grey_array)
            print(f"π After contrast enhancement: min={grey_array.min()}, max={grey_array.max()}, mean={grey_array.mean():.1f}")

        quantized = self._quantize(grey_array, self.grey_levels)

        unique_levels = np.unique(quantized)
        print(f"π Image quantized to {len(unique_levels)} unique grey levels: {unique_levels}")
        print(f"π Grey level distribution: min={quantized.min()}, max={quantized.max()}, mean={quantized.mean():.1f}")

        self.source_image = quantized
        return quantized

    def generate_random_layers(self):
        """
        Generate one two-tone random noise layer per grey level.

        Returns:
            list[numpy.ndarray]: ``grey_levels`` arrays of shape (height, width),
            also stored as ``self.random_layers``. Values are drawn from NumPy's
            global random state.
        """
        print("π² Generating random layers...")
        shape = (self.height, self.width)
        # Each pixel is dark or light grey with equal probability.
        layers = [
            np.random.choice(self._NOISE_VALUES, size=shape, p=[0.5, 0.5]).astype(np.uint8)
            for _ in range(self.grey_levels)
        ]
        self.random_layers = layers
        return layers

    def apply_rotation_mapping(self, layers, frame_time):
        """
        Scroll every layer horizontally (with wraparound) for a point in time.

        Layer ``g`` moves at ``g + 1`` pixels per second, or at
        ``grey_levels - g`` pixels per second when ``inverse`` is set, in the
        direction given by ``motion_direction``.

        Args:
            layers (list[numpy.ndarray]): One noise layer per grey level.
            frame_time (float): Time of the frame in seconds.

        Returns:
            list[numpy.ndarray]: New arrays, one shifted copy per grey level.
        """
        rotated_layers = []
        for grey_level in range(self.grey_levels):
            if self.inverse:
                # Inverse: lighter pixels (higher grey level) move less, darker pixels move more
                speed_factor = self.grey_levels - grey_level
            else:
                # Normal: darker pixels (lower grey level) move less, lighter pixels move more
                speed_factor = grey_level + 1
            pixels_per_second = self.motion_direction * speed_factor
            # int() truncates toward zero, so both directions advance on the same frames.
            total_shift = int(frame_time * pixels_per_second)
            # np.roll always returns a fresh array, including for a zero shift.
            rotated_layers.append(np.roll(layers[grey_level], total_shift % self.width, axis=1))
        return rotated_layers

    def compose_frame(self, source_image, rotated_layers, debug=False):
        """
        Compose a frame by taking each pixel from the layer its grey level selects.

        Args:
            source_image (numpy.ndarray): Grey level index per pixel; values are
                clipped into ``0..grey_levels-1``.
            rotated_layers (list[numpy.ndarray]): Shifted noise layers.
            debug (bool): If True, also count how many pixels map to each layer.

        Returns:
            tuple: ``(frame, layer_counts)`` where ``frame`` is a (height, width)
            uint8 array and ``layer_counts`` is an integer array of length
            ``grey_levels`` when ``debug`` is set, otherwise None.
        """
        region = (slice(0, self.height), slice(0, self.width))
        # Cast before clipping so the bounds never have to fit the source dtype.
        level_map = np.clip(np.asarray(source_image)[region].astype(np.intp), 0, self.grey_levels - 1)
        counts = np.bincount(level_map.ravel(), minlength=self.grey_levels)

        output_frame = np.zeros((self.height, self.width), dtype=np.uint8)
        # One vectorised masked copy per grey level that actually occurs,
        # instead of a Python-level loop over every pixel.
        for grey_level in np.flatnonzero(counts):
            mask = level_map == grey_level
            output_frame[mask] = np.asarray(rotated_layers[grey_level])[region][mask]

        return output_frame, (counts if debug else None)

    def _write_raw_frames(self, raw_path, source_image, random_layers):
        """Render every frame and append it to raw_path as packed bgr24 bytes."""
        # max(1, ...) keeps the modulo valid for clips shorter than ten frames.
        progress_step = max(1, self.total_frames // 10)
        with open(raw_path, 'wb') as raw_file:
            for frame_index in range(self.total_frames):
                t = frame_index / self.fps
                rotated_layers = self.apply_rotation_mapping(random_layers, t)
                frame, layer_counts = self.compose_frame(source_image, rotated_layers, debug=(frame_index == 0))

                # Debug: Print layer usage for first frame
                if layer_counts is not None:
                    print(f"π Frame 0 layer usage:")
                    for level, count in enumerate(layer_counts):
                        if count > 0:
                            print(f" Grey level {level}: {count:,} pixels")

                # Convert to BGR for video output (duplicate channels for greyscale)
                bgr_frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)
                raw_file.write(bgr_frame.astype(np.uint8).tobytes())

                if frame_index % progress_step == 0:
                    progress = (frame_index / self.total_frames) * 100
                    print(f"Progress: {progress:.1f}%")

    def _build_ffmpeg_cmd(self, raw_path, encoder, extra_args):
        """Return the ffmpeg argument list that encodes raw_path with encoder."""
        cmd = [
            'ffmpeg',
            '-y',  # Overwrite output file
            '-f', 'rawvideo',
            '-pix_fmt', 'bgr24',
            '-s', f'{self.width}x{self.height}',
            '-r', str(self.fps),
            '-i', str(raw_path),
            '-c:v', encoder,
        ]
        # Add pixel format only for non-lossless codecs
        if encoder not in self._LOSSLESS_ENCODERS:
            cmd.extend(['-pix_fmt', 'yuv420p'])
        cmd.extend(extra_args)
        cmd.extend(['-movflags', '+faststart', str(self.output_path)])
        return cmd

    def _try_encoders(self, raw_path, candidates):
        """Run ffmpeg with each candidate encoder in turn; return True on the first success."""
        for encoder, desc, extra_args in candidates:
            print(f"π Trying {desc} encoder ({encoder})...")
            cmd = self._build_ffmpeg_cmd(raw_path, encoder, extra_args)
            try:
                subprocess.run(cmd, check=True, capture_output=True, text=True)
            except subprocess.CalledProcessError as exc:
                print(f"β {desc} encoding failed")
                # Surface ffmpeg's final diagnostic instead of discarding it.
                stderr_lines = (exc.stderr or "").strip().splitlines()
                if stderr_lines:
                    print(f" {stderr_lines[-1]}")
                # Remove failed file if it exists
                if self.output_path.exists():
                    self.output_path.unlink()
                continue
            print(f"β {desc} encoding successful")
            return True
        return False

    def _validate_output(self):
        """Raise if the output file is missing or empty, then run a best-effort ffprobe check."""
        if not self.output_path.exists() or self.output_path.stat().st_size == 0:
            raise RuntimeError(f"Video encoding failed - output file is missing or empty: {self.output_path}")
        print(f"β Video saved as '{self.output_path}' ({self.output_path.stat().st_size:,} bytes)")

        try:
            result = subprocess.run([
                'ffprobe', '-v', 'quiet', '-print_format', 'json',
                '-show_format', '-show_streams', str(self.output_path)
            ], capture_output=True, text=True, check=True)
        except (subprocess.CalledProcessError, FileNotFoundError):
            # FileNotFoundError: ffprobe is not installed; the video itself is already written.
            print("β οΈ Warning: Could not validate video file with ffprobe")
            return

        probe_output = result.stdout.strip()
        if not probe_output or probe_output == '{}':
            print("β οΈ Warning: Video file may be corrupted (ffprobe returned empty result)")
        else:
            print("β Video file validation passed")

    def generate_video(self):
        """
        Generate the complete parallax motion video.

        Renders all frames to a temporary raw file in the current directory,
        encodes it with the first ffmpeg encoder that succeeds, validates the
        result, and always removes the temporary file. When ``for_x`` is set
        and none of its encoders work, the high quality encoder list is tried.

        Raises:
            RuntimeError: If every encoder fails or the output file is missing or empty.
        """
        print("π¬ Starting video generation...")
        source_image = self.load_and_prepare_image()
        random_layers = self.generate_random_layers()
        print(f"π¬ Generating video frames...")

        # Create temporary raw video file
        tmp_yuv = f'temp_parallax_video_{os.getpid()}.yuv'
        try:
            self._write_raw_frames(tmp_yuv, source_image, random_layers)
            print("π₯ Encoding video with FFmpeg...")

            encoded = False
            if self.for_x:
                print("π± Using available encoder for Twitter/X compatibility...")
                encoded = self._try_encoders(tmp_yuv, self._X_ENCODERS)
                if not encoded:
                    print("π All H.264-compatible encoders failed, falling back to MPEG4...")
            if not encoded:
                print("π₯ Using available encoder for high quality...")
                if not self._try_encoders(tmp_yuv, self._HQ_ENCODERS):
                    raise RuntimeError("All available video encoders failed")

            self._validate_output()
        finally:
            # Clean up temporary file
            if os.path.exists(tmp_yuv):
                os.remove(tmp_yuv)

    def get_info(self):
        """Return a dict describing the generator configuration."""
        return {
            'input_path': str(self.input_path),
            'output_path': str(self.output_path),
            'dimensions': f"{self.width}x{self.height}",
            'fps': self.fps,
            'duration': self.duration,
            'total_frames': self.total_frames,
            'grey_levels': self.grey_levels,
            'motion_direction': 'leftward' if self.motion_direction == -1 else 'rightward'
        }
| def main(): | |
| """Main function with argument parsing for command-line usage""" | |
| parser = argparse.ArgumentParser( | |
| description="Generate parallax motion masking videos from images", | |
| formatter_class=argparse.RawDescriptionHelpFormatter, | |
| epilog=""" | |
| Examples: | |
| python parallax.py --input yacht.jpeg --output yacht_parallax.mp4 | |
| python parallax.py --input image.png --output video.mp4 --output-width 640 --output-height 480 | |
| python parallax.py --input photo.jpg --output result.mp4 --fps 30 --duration 5 --inverse | |
| python parallax.py --input social.jpg --output social.mp4 --for-x | |
| """ | |
| ) | |
| # Required arguments | |
| parser.add_argument('--input', '-i', required=True, | |
| help='Input image file path') | |
| parser.add_argument('--output', '-o', required=True, | |
| help='Output video file path') | |
| # Optional arguments | |
| parser.add_argument('--output-width', type=int, default=None, | |
| help='Output video width (if only width specified, height calculated proportionally)') | |
| parser.add_argument('--output-height', type=int, default=None, | |
| help='Output video height (if only height specified, width calculated proportionally)') | |
| parser.add_argument('--fps', type=int, default=20, | |
| help='Frames per second (default: 20)') | |
| parser.add_argument('--duration', type=int, default=10, | |
| help='Video duration in seconds (default: 10)') | |
| parser.add_argument('--grey-levels', type=int, default=32, | |
| help='Number of grey levels to quantize to (default: 32)') | |
| parser.add_argument('--motion-direction', choices=['left', 'right'], default='right', | |
| help='Motion direction (default: right)') | |
| parser.add_argument('--inverse', action='store_true', | |
| help='Inverse motion: lighter pixels move less, darker pixels move more') | |
| parser.add_argument('--for-x', action='store_true', | |
| help='Optimize encoding for Twitter/X upload (uses H.264 instead of MPEG4)') | |
| args = parser.parse_args() | |
| # Convert motion direction | |
| motion_direction = -1 if args.motion_direction == 'left' else 1 | |
| try: | |
| # Create and run the generator | |
| generator = ParallaxMotionGenerator( | |
| input_path=args.input, | |
| output_path=args.output, | |
| width=args.output_width, | |
| height=args.output_height, | |
| fps=args.fps, | |
| duration=args.duration, | |
| grey_levels=args.grey_levels, | |
| motion_direction=motion_direction, | |
| inverse=args.inverse, | |
| for_x=args.for_x | |
| ) | |
| print("\nπ Starting parallax motion video generation...") | |
| print("π Concept: The input image will only be visible through") | |
| print(" the differential motion of random noise layers!") | |
| print() | |
| generator.generate_video() | |
| print() | |
| print("π― Done! The image should be visible through motion parallax.") | |
| print("πΈ Individual frames will show only random black/white noise,") | |
| print(" but the moving video reveals the image through layer motion!") | |
| except Exception as e: | |
| print(f"β Error: {e}", file=sys.stderr) | |
| sys.exit(1) | |
| if __name__ == "__main__": | |
| main() | 
  
    Sign up for free
    to join this conversation on GitHub.
    Already have an account?
    Sign in to comment