Last active
March 14, 2025 00:07
-
-
Save xram64/d1f1c2acb7c1beb73361b373cac6d725 to your computer and use it in GitHub Desktop.
Script to batch resize and sort an image set, and combine frames into an MP4 using FFMPEG.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from os import path | |
| from sys import argv | |
| from pathlib import Path | |
| from enum import Enum | |
| from tqdm import tqdm | |
| from numpy import ndarray, asarray | |
| from PIL import Image | |
| import imageio.v3 as iio | |
| import imageio_ffmpeg as iio_ffmpeg | |
| #| ~~ ImageFramesToVideo ~~ | |
| #| A tool to batch resize and sort an image set, and combine the frames into an MP4 video file using FFMPEG. | |
| #| Originally made to combine timelapse screenshots from Factorio. | |
| #| xram | v1 (3/12/25) | |
| # Usage: `python ImageFramesToVideo.py <input_dir_path> <output_path>` | |
| # `framesByName` :: dict[str, ndarray] :: Dict mapping filenames to image arrays | |
| # `frames` :: list[ndarray] :: Ordered list of image arrays | |
| class SortMethod(Enum): | |
| Alphabetical = 0 | |
| Numerical = 1 | |
| # Define FFMPEG options | |
| FFMPEG_OPTIONS = { | |
| 'pix_fmt_in': None, | |
| 'pix_fmt_out': None, | |
| 'fps': None, | |
| 'quality': None, | |
| 'bitrate': None, | |
| 'codec': None, | |
| 'macro_block_size': None, | |
| 'ffmpeg_log_level': None, | |
| 'ffmpeg_timeout': None, | |
| 'input_params': None, | |
| 'output_params': None, | |
| 'audio_path': None, | |
| 'audio_codec': None, | |
| } | |
| #region Functions | |
| def getFrames(path_to_frames_dir: str, file_types: str = ['.png']) -> dict[str, ndarray]: | |
| """ | |
| Make a dict mapping filenames to image arrays for all image files in a directory. | |
| :param path_to_frames_dir: Path to directory containing image files. | |
| :param file_types: List of file extensions (including '.') to include. | |
| :returns: Dict mapping filenames to `ndarray` images. | |
| """ | |
| print('Loading frames...') | |
| framesByName: dict[str, ndarray] = dict() | |
| # Iterate through all image files in folder | |
| for file in tqdm(Path(path_to_frames_dir).iterdir(), desc='Found', unit=' frames'): | |
| # Only collect files of the specified type(s) | |
| if (not file.is_file()) or (not path.splitext(file)[1] in file_types): | |
| continue | |
| # Save image ndarray in dict, indexed by filename (with extension) | |
| framesByName[path.basename(file)] = iio.imread(file) | |
| return framesByName | |
| def sortFrames(framesByName: dict[str, ndarray], method = SortMethod.Alphabetical, descending: bool = False) -> list[ndarray]: | |
| """ | |
| Sort the frame dict by the filename of each image, and return a sorted list of only the frames. | |
| :param framesByName: Dict mapping filenames to `ndarray` images. | |
| :param method: Sorting method. Options are 'SortMethod.Alphabetical' or 'SortMethod.Numerical'. | |
| :param descending: Flag to sort in descending order instead of ascending. | |
| :returns: Ordered list of `ndarray` images. | |
| """ | |
| print('Sorting frames...') | |
| frame_filenames = list(framesByName.keys()) | |
| # Sort a list of the image filenames alphabetically (ascending by default) | |
| if method == SortMethod.Alphabetical: | |
| frame_filenames.sort(reverse=descending) | |
| # Sort a list of the image filenames numerically (ascending by default) | |
| if method == SortMethod.Numerical: | |
| # Sort string filenames by their int equivalents. | |
| # Ignores file extensions (including '.') and leading 0's (i.e. '00123'='123'). | |
| try: | |
| frame_filenames.sort(key=lambda x: int(path.splitext(x)[0]), reverse=descending) | |
| except: | |
| print("Error: Cannot sort numerically because not all file names are integers.") | |
| return None | |
| # Construct a list of image ndarrays by pulling them from the dict in their sorted filename order | |
| sorted_frames: list[ndarray] = [] | |
| for sorted_name in frame_filenames: | |
| sorted_frames.append(framesByName[sorted_name]) | |
| return sorted_frames | |
| def resizeFrames(frames: list[ndarray], target_size: tuple[int], resampling_method = Image.Resampling.BICUBIC) -> list[ndarray]: | |
| """ | |
| Resize each image in a list of image arrays to uniform dimensions. | |
| :param frames: Ordered list of `ndarray` images. | |
| :param target_size: New image dimensions as a (width, height) tuple. | |
| :param resampling_method: Resampling filter to use, from `PIL.Image.Resampling`. | |
| :returns: Ordered list of the resized `ndarray` images. | |
| """ | |
| print('Resizing frames...') | |
| resized_frames: list[ndarray] = [] | |
| # Resize each frame | |
| for frame in tqdm(frames, unit=' frames'): | |
| resized_frame = Image.fromarray(frame).resize(target_size, resampling_method) | |
| resized_frame = asarray(resized_frame) # Convert image data back into ndarray | |
| resized_frames.append(resized_frame) | |
| return resized_frames | |
| def writeVideoWithFFMPEG(frames: list[ndarray], video_output_path: str, video_size: tuple[int], options: dict[str, any] = FFMPEG_OPTIONS) -> dict[str, any]: | |
| """ | |
| Write processed frames to an MP4 video file using FFMPEG. | |
| :param frames: Ordered list of `ndarray` images. | |
| :param video_output_path: Path to output video file. | |
| :param video_size: Size of video as a (width, height) tuple. | |
| :param options: Optional dict of FFMPEG options (overrides defaults). | |
| :returns: Status info as a dict. | |
| """ | |
| print('Writing to video file...') | |
| # Verify uniform size of video frames. Note `video_size` is (width)x(height), but frame dimensions are (height)x(width). | |
| video_size_ndarray_order = (video_size[1], video_size[0]) | |
| for frame in frames: | |
| if not frame.shape[0:2] == video_size_ndarray_order: | |
| return {'success': False, 'fail_reason': 'Frames are not all the same size.'} | |
| # Override default options (merge dicts) | |
| options = {**FFMPEG_OPTIONS, **options} | |
| # Initialize the ffmpeg generator | |
| video = iio_ffmpeg.write_frames( | |
| path=video_output_path, | |
| size=video_size, | |
| pix_fmt_in=options['pix_fmt_in'], | |
| pix_fmt_out=options['pix_fmt_out'], | |
| fps=options['fps'], | |
| quality=options['quality'], | |
| bitrate=options['bitrate'], | |
| codec=options['codec'], | |
| macro_block_size=options['macro_block_size'], | |
| ffmpeg_log_level=options['ffmpeg_log_level'], | |
| ffmpeg_timeout=options['ffmpeg_timeout'], | |
| input_params=options['input_params'], | |
| output_params=options['output_params'], | |
| audio_path=options['audio_path'], | |
| audio_codec=options['audio_codec'], | |
| ) | |
| # Seed the generator | |
| video.send(None) | |
| # Write frames to video file | |
| for frame in tqdm(frames, unit=' frames'): | |
| video.send(frame) | |
| video.close() | |
| return {'success': True} | |
| #region Main | |
| if __name__ == '__main__': | |
| # Get paths from command line args | |
| if len(argv) >= 2: | |
| input_dir_path = argv[1] | |
| output_path = argv[2] | |
| # FFMPEG options: | |
| # - pix_fmt_in ['rbg24'] :: pixel format of incoming frames ('gray', 'gray8a', 'rgb24', or 'rgba') | |
| # - pix_fmt_out ['yuv420p'] :: pixel format of written frames | |
| # - fps [16] :: frames per second | |
| # - quality [5] :: quality level between 0 and 10 (ignored if bitrate is given) | |
| # - bitrate [None] :: video bitrate | |
| # - codec ['libx264'] :: video codec | |
| # - macro_block_size [16] :: macro block size | |
| # - ffmpeg_log_level ['warning'] :: logging level | |
| # - ffmpeg_timeout [None] :: timeout in seconds to wait for ffmpeg process to finish | |
| # - input_params [None] :: additional ffmpeg input command line parameters | |
| # - output_params [None] :: additional ffmpeg output command line parameters | |
| # - audio_path [None] :: input file path for encoding with an audio stream | |
| # - audio_codec [None] :: audio codec to use if `audio_path` is provided | |
| options = dict() | |
| # Set paths (if not provided in args) | |
| #input_dir_path = "" | |
| #output_path = "" | |
| # Set options | |
| video_size = (1920, 1080) | |
| options['fps'] = 15 | |
| options['macro_block_size'] = 8 # (to allow for 1920x1080 size) | |
| # Process frames and write video file | |
| framesByName = getFrames(input_dir_path) | |
| frames = sortFrames(framesByName, method=SortMethod.Numerical) | |
| frames = resizeFrames(frames, video_size, resampling_method=Image.Resampling.HAMMING) | |
| result = writeVideoWithFFMPEG(frames, output_path, video_size, options) | |
| if result['success']: | |
| print(f"Video successfully written to '{path.abspath(output_path)}'!") | |
| else: | |
| print(f"Error: {result['fail_reason']}") | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment