Skip to content

Instantly share code, notes, and snippets.

@xram64
Last active March 14, 2025 00:07
Show Gist options
  • Select an option

  • Save xram64/d1f1c2acb7c1beb73361b373cac6d725 to your computer and use it in GitHub Desktop.

Select an option

Save xram64/d1f1c2acb7c1beb73361b373cac6d725 to your computer and use it in GitHub Desktop.
Script to batch resize and sort an image set, and combine frames into an MP4 using FFMPEG.
from collections.abc import Sequence
from enum import Enum
from os import path
from pathlib import Path
from sys import argv
from typing import Any, Optional

import imageio.v3 as iio
import imageio_ffmpeg as iio_ffmpeg
from numpy import ndarray, asarray
from PIL import Image
from tqdm import tqdm
#| ~~ ImageFramesToVideo ~~
#| A tool to batch resize and sort an image set, and combine the frames into an MP4 video file using FFMPEG.
#| Originally made to combine timelapse screenshots from Factorio.
#| xram | v1 (3/12/25)
# Usage: `python ImageFramesToVideo.py <input_dir_path> <output_path>`
# `framesByName` :: dict[str, ndarray] :: Dict mapping filenames to image arrays
# `frames` :: list[ndarray] :: Ordered list of image arrays
class SortMethod(Enum):
    """Strategies for ordering frames by their filenames (see `sortFrames`)."""

    # Plain string ordering of the filenames
    Alphabetical = 0
    # Ordering by the integer value of each filename with its extension removed
    Numerical = 1
# Default FFMPEG options.
# Each key is an option name that `writeVideoWithFFMPEG` forwards to `iio_ffmpeg.write_frames`.
# Every option starts out as None until the caller overrides it.
FFMPEG_OPTIONS = dict.fromkeys(
    (
        'pix_fmt_in',
        'pix_fmt_out',
        'fps',
        'quality',
        'bitrate',
        'codec',
        'macro_block_size',
        'ffmpeg_log_level',
        'ffmpeg_timeout',
        'input_params',
        'output_params',
        'audio_path',
        'audio_codec',
    ),
    None,
)
#region Functions
def getFrames(path_to_frames_dir: str, file_types: Sequence[str] = ('.png',)) -> dict[str, ndarray]:
    """
    Make a dict mapping filenames to image arrays for all image files in a directory.

    Only regular files directly inside the directory are considered (no recursion).
    Extension matching is exact, so it is case-sensitive.

    :param path_to_frames_dir: Path to directory containing image files.
    :param file_types: Sequence of file extensions (including '.') to include.
                       A single extension given as a plain string is also accepted.
    :returns: Dict mapping filenames (with extension) to `ndarray` images.
    """
    print('Loading frames...')

    # A bare string would make `in` do substring matching (and '' would match
    # every extension-less file), so wrap it into a one-element tuple.
    if isinstance(file_types, str):
        file_types = (file_types,)

    # Filter *before* handing the iterator to tqdm so that the 'Found' counter
    # only counts files that are actually loaded as frames.
    image_files = (
        entry
        for entry in Path(path_to_frames_dir).iterdir()
        if entry.is_file() and entry.suffix in file_types
    )

    # Save each image ndarray in the dict, indexed by filename (with extension)
    framesByName: dict[str, ndarray] = {}
    for file in tqdm(image_files, desc='Found', unit=' frames'):
        framesByName[file.name] = iio.imread(file)

    return framesByName
def sortFrames(framesByName: dict[str, ndarray], method: SortMethod = SortMethod.Alphabetical, descending: bool = False) -> Optional[list[ndarray]]:
    """
    Sort the frame dict by the filename of each image, and return a sorted list of only the frames.

    :param framesByName: Dict mapping filenames to `ndarray` images.
    :param method: Sorting method. Options are 'SortMethod.Alphabetical' or 'SortMethod.Numerical'.
                   Any other value leaves the frames in the dict's own order.
    :param descending: Flag to sort in descending order instead of ascending.
    :returns: Ordered list of `ndarray` images, or `None` (after printing an error) if
              numerical sorting was requested but a filename is not an integer.
    """
    print('Sorting frames...')

    if method == SortMethod.Numerical:
        # Sort string filenames by their int equivalents.
        # Ignores file extensions (including '.') and leading 0's (i.e. '00123'='123').
        try:
            frame_filenames = sorted(
                framesByName,
                key=lambda name: int(path.splitext(name)[0]),
                reverse=descending,
            )
        except ValueError:
            # Only a failed int() conversion is expected here; anything else
            # (e.g. KeyboardInterrupt) must not be swallowed.
            print("Error: Cannot sort numerically because not all file names are integers.")
            return None
    elif method == SortMethod.Alphabetical:
        # Sort the image filenames alphabetically (ascending by default)
        frame_filenames = sorted(framesByName, reverse=descending)
    else:
        # Unknown method: keep the dict's insertion order
        frame_filenames = list(framesByName)

    # Pull the image ndarrays from the dict in their sorted filename order
    return [framesByName[name] for name in frame_filenames]
def resizeFrames(frames: list[ndarray], target_size: tuple[int, int], resampling_method = Image.Resampling.BICUBIC) -> list[ndarray]:
    """
    Resize each image in a list of image arrays to uniform dimensions.

    :param frames: Ordered list of `ndarray` images.
    :param target_size: New image dimensions as a (width, height) tuple.
    :param resampling_method: Resampling filter to use, from `PIL.Image.Resampling`.
    :returns: Ordered list of the resized `ndarray` images (same order as `frames`).
    """
    print('Resizing frames...')

    # Round-trip each frame through PIL: ndarray -> Image -> resize -> ndarray
    return [
        asarray(Image.fromarray(frame).resize(target_size, resampling_method))
        for frame in tqdm(frames, unit=' frames')
    ]
def writeVideoWithFFMPEG(frames: list[ndarray], video_output_path: str, video_size: tuple[int, int], options: dict[str, Any] = FFMPEG_OPTIONS) -> dict[str, Any]:
    """
    Write processed frames to an MP4 video file using FFMPEG.

    :param frames: Ordered list of `ndarray` images.
    :param video_output_path: Path to output video file.
    :param video_size: Size of video as a (width, height) tuple.
    :param options: Optional dict of FFMPEG options (overrides defaults).
                    Keys that are not in `FFMPEG_OPTIONS` are ignored.
    :returns: Status info as a dict: `{'success': True}` on success, otherwise
              `{'success': False, 'fail_reason': <str>}`.
    """
    print('Writing to video file...')

    # Refuse to start ffmpeg for an empty frame list instead of reporting success
    if len(frames) == 0:
        return {'success': False, 'fail_reason': 'No frames to write.'}

    # Verify uniform size of video frames. Note `video_size` is (width)x(height), but frame dimensions are (height)x(width).
    video_size_ndarray_order = (video_size[1], video_size[0])
    if any(tuple(frame.shape[0:2]) != video_size_ndarray_order for frame in frames):
        return {'success': False, 'fail_reason': 'Frames are not all the same size.'}

    # Override default options (merge dicts), then keep only the known option names
    merged_options = {**FFMPEG_OPTIONS, **options}
    writer_kwargs = {name: merged_options[name] for name in FFMPEG_OPTIONS}

    # Initialize the ffmpeg generator
    video = iio_ffmpeg.write_frames(
        path=video_output_path,
        size=video_size,
        **writer_kwargs,
    )

    # Always close the generator, even if a frame fails to write, so the
    # ffmpeg writer is shut down rather than left dangling.
    try:
        # Seed the generator
        video.send(None)
        # Write frames to video file
        for frame in tqdm(frames, unit=' frames'):
            video.send(frame)
    finally:
        video.close()

    return {'success': True}
#region Main
if __name__ == '__main__':
    # Get paths from command line args.
    # Both paths are required, so argv must hold the script name plus two arguments.
    input_dir_path = None
    output_path = None
    if len(argv) >= 3:
        input_dir_path = argv[1]
        output_path = argv[2]

    # FFMPEG options:
    # - pix_fmt_in ['rgb24'] :: pixel format of incoming frames ('gray', 'gray8a', 'rgb24', or 'rgba')
    # - pix_fmt_out ['yuv420p'] :: pixel format of written frames
    # - fps [16] :: frames per second
    # - quality [5] :: quality level between 0 and 10 (ignored if bitrate is given)
    # - bitrate [None] :: video bitrate
    # - codec ['libx264'] :: video codec
    # - macro_block_size [16] :: macro block size
    # - ffmpeg_log_level ['warning'] :: logging level
    # - ffmpeg_timeout [None] :: timeout in seconds to wait for ffmpeg process to finish
    # - input_params [None] :: additional ffmpeg input command line parameters
    # - output_params [None] :: additional ffmpeg output command line parameters
    # - audio_path [None] :: input file path for encoding with an audio stream
    # - audio_codec [None] :: audio codec to use if `audio_path` is provided
    options = dict()

    # Set paths (if not provided in args)
    #input_dir_path = ""
    #output_path = ""

    # Stop with a usage message instead of failing later with IndexError/NameError
    if not input_dir_path or not output_path:
        raise SystemExit("Usage: python ImageFramesToVideo.py <input_dir_path> <output_path>")

    # Set options
    video_size = (1920, 1080)
    options['fps'] = 15
    options['macro_block_size'] = 8 # (to allow for 1920x1080 size)

    # Process frames and write video file
    framesByName = getFrames(input_dir_path)
    frames = sortFrames(framesByName, method=SortMethod.Numerical)
    if frames is None:
        # `sortFrames` has already printed the reason; nothing left to process
        raise SystemExit(1)
    frames = resizeFrames(frames, video_size, resampling_method=Image.Resampling.HAMMING)
    result = writeVideoWithFFMPEG(frames, output_path, video_size, options)

    if result['success']:
        print(f"Video successfully written to '{path.abspath(output_path)}'!")
    else:
        print(f"Error: {result['fail_reason']}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment