Last active
July 4, 2024 02:12
-
-
Save luuil/183e4d92275e3d6641b6728a988bb38b to your computer and use it in GitHub Desktop.
Merge multiple videos, images, and empty frames into a single grid video using OpenCV, PIL, and NumPy.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cv2 | |
import os | |
import numpy as np | |
from PIL import Image, ImageFont, ImageDraw | |
from typing import Tuple, Optional, Union, List | |
from dataclasses import dataclass, InitVar | |
import sys | |
from pathlib import Path | |
from enum import Enum | |
import logging | |
import skvideo.io | |
# Root logger setup: DEBUG and above, each record tagged with the time,
# the source file and line number, and the level name.
logging.basicConfig(
    format='[%(asctime)s] {%(pathname)s:%(lineno)d} %(levelname)s - %(message)s',
    datefmt='%H:%M:%S',
    level=logging.DEBUG,
)
@dataclass
class MyImageFont:
    """Font and colour used when drawing text onto a frame.

    Init-only arguments:
        path: Font file handed to ``ImageFont.truetype``. An empty value
            skips loading and leaves ``font`` as ``None``.
        size: Font size in pixels.

    Note:
        ``font`` is always (re)computed from ``path``/``size`` in
        ``__post_init__``; a value passed for ``font`` is overwritten.
    """
    # Loaded PIL font, or None when no path was given.
    font: Optional["ImageFont.FreeTypeFont"] = None
    # No trailing comma after the default: one would wrap the colour in an
    # outer 1-tuple, which is not a valid fill value.
    font_color: Union[Tuple[int, int, int], Tuple[int, int, int, int]] = (0, 0, 255, 0)  # bgr or bgra
    path: InitVar[str] = 'simsun.ttc'
    size: InitVar[int] = 30  # in pixels

    def __post_init__(self, path, size):
        if not path:
            self.font = None
            return
        try:
            self.font = ImageFont.truetype(path, size)
        except OSError:
            # The requested font file is not installed on every platform;
            # degrade to PIL's built-in font instead of aborting.
            logging.warning(f'Cannot load font {path!r}; falling back to the PIL default font.')
            self.font = ImageFont.load_default()
@dataclass
class MyImageText:
    """A piece of text together with the pixel position where it is drawn.

    The position can be given directly through ``xy`` or derived from an
    image size and a relative position.

    Init-only arguments:
        image_size: ``(width, height)`` of the target image. When it is left
            at ``(0, 0)`` the explicit ``xy`` value is kept unchanged.
        xy_ratio: Relative position; ``xy`` becomes
            ``(int(width * xy_ratio[0]), int(height * xy_ratio[1]))``.
    """
    content: str
    xy: Tuple[int, int] = (0, 0)  # Coordinates for placing text on the image
    image_size: InitVar[Tuple[int, int]] = (0, 0)  # only for init
    xy_ratio: InitVar[Tuple[float, float]] = (0., 0.)  # only for init

    def __post_init__(self, image_size, xy_ratio):
        w, h = image_size
        if not (w or h):
            # No image size supplied: do not clobber an explicitly passed xy.
            return
        w_scale, h_scale = xy_ratio
        self.xy = int(w * w_scale), int(h * h_scale)
def put_text(img: np.ndarray,
             text: MyImageText,
             font: MyImageFont
             ) -> np.ndarray:
    """Draw ``text`` onto a copy of ``img`` and return the result.

    The array goes through a PIL image so that ``ImageDraw`` can render the
    string with ``font.font`` in colour ``font.font_color`` at ``text.xy``.
    The input array itself is not written to.
    """
    canvas = Image.fromarray(img)
    ImageDraw.Draw(canvas).text(
        text.xy,
        text.content,
        font=font.font,
        fill=font.font_color,
    )
    return np.array(canvas)
def resize_with_pad(img: np.ndarray, size: Tuple[int, int], fill=(255, 255, 255)) -> np.ndarray:
    """Scale ``img`` to fit inside ``size`` and pad the rest with ``fill``.

    The aspect ratio is preserved: the image is scaled by one common factor
    so that it fits, then centred on a canvas of the requested size.

    Args:
        img: ``H x W`` or ``H x W x C`` image array.
        size: Target ``(width, height)``.
        fill: Padding colour (white by default). A scalar is used as is. A
            sequence is cut or extended with ``255`` to the channel count of
            the image, so a 3-value colour works for a 4-channel image and a
            4-value colour is not extended any further. For a single-channel
            image the first component is used.

    Returns:
        ``uint8`` array of shape ``(height, width[, C])``.
    """
    src_h, src_w = img.shape[:2]
    dst_w, dst_h = size
    scale = min(dst_h / src_h, dst_w / src_w)
    img = cv2.resize(img, (0, 0), fx=scale, fy=scale)
    new_h, new_w = img.shape[:2]

    # Match the padding value to the channel layout of the resized image.
    if np.ndim(fill) == 0:
        pad_value = fill
    elif img.ndim == 2:
        pad_value = fill[0]
    else:
        channels = img.shape[2]
        pad_value = (tuple(fill) + (255,) * channels)[:channels]

    canvas = np.full((dst_h, dst_w) + img.shape[2:], pad_value, dtype=np.uint8)

    # Centre the resized image on the canvas.
    x0 = (dst_w - new_w) // 2
    y0 = (dst_h - new_h) // 2
    canvas[y0:y0 + new_h, x0:x0 + new_w] = img
    return canvas
def is_image_file(path: str) -> bool:
    """Return True when ``path`` is an existing file with an image suffix.

    Recognised suffixes are ``.png``, ``.jpg`` and ``.jpeg``, compared
    case-insensitively so that e.g. ``photo.JPG`` is accepted too.
    An empty or ``None`` path yields False.
    """
    if not path:
        return False
    candidate = Path(path)
    return candidate.is_file() and candidate.suffix.lower() in ('.png', '.jpg', '.jpeg')
def is_video_file(path: str) -> bool:
    """Return True when ``path`` is an existing file with a video suffix.

    Recognised suffixes are ``.mp4``, ``.mkv`` and ``.mov``, compared
    case-insensitively (``.MP4``, ``.Mkv``, ... are all accepted).
    An empty or ``None`` path yields False.
    """
    if not path:
        return False
    candidate = Path(path)
    return candidate.is_file() and candidate.suffix.lower() in ('.mp4', '.mkv', '.mov')
class FileType(Enum):
    """Kind of source a ``FrameGenerator`` was opened on."""

    Empty = 0  # missing or non-existent path
    Image = 1  # still image file
    Video = 2  # any other existing file, opened as a video
class Layout(Enum):
    """Orientation of a frame: landscape or portrait."""

    Horizontal = 0  # landscape
    Vertical = 1    # portrait
def layout(size: Tuple[int, int]) -> Layout:
    """Classify a ``(width, height)`` pair by orientation.

    A square size counts as ``Layout.Horizontal``.
    """
    width, height = size
    if width >= height:
        return Layout.Horizontal
    return Layout.Vertical
def resolution(layout: Layout) -> Tuple[int, int]:
    """Return the ``(width, height)`` canvas size used for ``layout``.

    Returns:
        ``(1920, 1080)`` for ``Layout.Horizontal`` and ``(1080, 1920)`` for
        ``Layout.Vertical``.

    Raises:
        NotImplementedError: For any other value. (``NotImplemented`` is a
            comparison sentinel, not an exception, and cannot be raised.)
    """
    if layout == Layout.Horizontal:
        return (1920, 1080)
    if layout == Layout.Vertical:
        return (1080, 1920)
    raise NotImplementedError(f'{layout} not support!')
def crop_image(img: np.ndarray, size: Union[Tuple[int, int], Tuple[int,int,int,int], None]) -> np.ndarray:
    """Crop ``img`` to a centred window or to an explicit box.

    Args:
        img: Image array; its first two axes are height and width.
        size: One of

            * ``None`` - the image is returned untouched;
            * ``(w, h)`` - a centred window of that size;
            * ``(left, upper, right, lower)`` - an explicit box. When all
              four values lie in ``[0, 1]`` they are taken as fractions of
              the image width/height, otherwise as pixel coordinates.

    Returns:
        The cropped image as an array.

    Raises:
        NotImplementedError: If ``size`` has neither 2 nor 4 elements
            (same error type as ``crop2shape`` uses for this case).
    """
    if size is None:
        return img
    # Validate before doing any work; otherwise the box variables below
    # would simply be undefined for an unsupported length.
    if len(size) not in (2, 4):
        raise NotImplementedError("not support")

    height, width = img.shape[:2]
    if len(size) == 2:  # center crop
        new_width, new_height = size
        left = (width - new_width) / 2
        upper = (height - new_height) / 2
        right = (width + new_width) / 2
        lower = (height + new_height) / 2
    else:
        left, upper, right, lower = size
        if all(0 <= v <= 1.0 for v in size):  # box given as ratios
            left *= width
            upper *= height
            right *= width
            lower *= height

    # Crop the image
    #   left,upper ---
    #       |        |
    #       --- right,lower
    cropped = Image.fromarray(img).crop((left, upper, right, lower))
    return np.asarray(cropped)
class FrameGenerator(object):
    """Yield frames from a video file, a still image, or a blank placeholder.

    The kind of source is chosen from ``path``:

    * empty / non-existent path -> ``FileType.Empty``: frames filled with the
      ``fill`` colour given to :meth:`extract`;
    * image file (``is_image_file``) -> ``FileType.Image``: the same picture,
      letter-boxed to ``resolution(layout)``, repeated;
    * any other existing file -> ``FileType.Video``: frames decoded through
      ``cv2.VideoCapture``.

    Attributes:
        size: ``(width, height)`` of the source. For an empty source it is a
            ``sys.maxsize`` placeholder until :meth:`extract` is given a
            ``target_size``.
        fps: Frame rate (``30`` for image and empty sources).
        total_frames: Frame count (``sys.maxsize`` for image and empty
            sources, i.e. effectively endless).
    """

    def __init__(self, path, layout=Layout.Horizontal, default_size=None, frame_range=None,
                 text: str = None, text_xy=(0.1, 0.1), font: Optional[MyImageFont] = None, debug=False):
        """
        Args:
            path: Source path; may be ``None`` or missing (blank frames).
            layout: Orientation used to pick the canvas for an image source.
            default_size: Accepted for compatibility; not used at present.
            frame_range: Optional ``(start, end)`` list/tuple or ``range``,
                see :meth:`set_frames_range`. Only affects video sources.
            text: Optional title drawn on every frame.
            text_xy: Title position as fractions of frame width and height.
            font: Font for the title. When omitted, a default ``MyImageFont``
                is created, and only if there is a title to draw. (A default
                built in the signature would load the font file at import
                time, whether or not it is ever needed.)
            debug: Emit per-frame debug logging.
        """
        # Defined first so that release()/__del__ are safe even when the
        # rest of the constructor raises.
        self._vc = None
        if not path or not os.path.exists(path):
            logging.warning(f'Not exists: {path}.')
            self._ftype = FileType.Empty
            self.size = sys.maxsize, sys.maxsize
            self.fps = 30
            self.total_frames = sys.maxsize
        elif is_image_file(path):
            logging.info(f'Image frame: {path}.')
            self._ftype = FileType.Image
            self._img = cv2.imread(path, cv2.IMREAD_COLOR)
            if self._img is None:
                raise ValueError(f'Cannot read image: {path}')
            self._img = resize_with_pad(self._img, resolution(layout))
            self.size = self._img.shape[1], self._img.shape[0]
            self.fps = 30
            self.total_frames = sys.maxsize
        else:
            logging.info(f'Video frame: {path}.')
            self._ftype = FileType.Video
            self._p = path
            self._vc = cv2.VideoCapture(self._p)
            self.size = int(self._vc.get(cv2.CAP_PROP_FRAME_WIDTH)), int(self._vc.get(cv2.CAP_PROP_FRAME_HEIGHT))
            self.fps = int(self._vc.get(cv2.CAP_PROP_FPS))
            self.total_frames = int(self._vc.get(cv2.CAP_PROP_FRAME_COUNT))

        self._start = 0
        self._count = self.total_frames
        self._debug = debug
        if frame_range is not None:
            self.set_frames_range(frame_range)

        self._text = text
        self._text_xy = text_xy
        if font is None and text:
            font = MyImageFont()
        self._font = font

        if self._debug:
            logging.debug(f"video size( W x H ) : {self.size[0]} x {self.size[1]}")

    def __del__(self):
        self.release()

    def set_frames_range(self, frame_range=None):
        """Restrict a video source to frames ``[start, end)``.

        Args:
            frame_range: ``None`` for the whole video, a 2-element list/tuple
                ``(start, end)``, or a ``range`` (its ``start``/``stop`` are
                used, the step is ignored). An ``end`` of ``None``, ``-1`` or
                anything past the last frame means "to the end".

        Does nothing for image and empty sources.
        """
        if self._ftype != FileType.Video:
            return
        if frame_range is None:
            start, end = 0, self.total_frames
        else:
            assert isinstance(frame_range, (list, tuple, range))
            if isinstance(frame_range, range):
                # range.stop is already exclusive; indexing [-1] instead
                # would silently drop the last requested frame.
                start, end = frame_range.start, frame_range.stop
            else:
                assert len(frame_range) == 2
                start, end = frame_range
            if end is None or end == -1 or end >= self.total_frames:
                end = self.total_frames
            assert end >= start
        self._start = start
        self._count = end - start
        assert self._count <= self.total_frames
        self._vc.set(cv2.CAP_PROP_POS_FRAMES, self._start)

    def extract(self, path=None, bgr2rgb=False, target_size=None, fill=(0,0,0), crop_size=None):
        """Generate the frames of the source one by one.

        Per frame, in this order: optionally dump the raw frame as PNG into
        ``path``, convert BGR to RGB, crop, resize to ``target_size``, draw
        the title.

        Args:
            path: Directory for PNG dumps (created when missing), or ``None``.
            bgr2rgb: Convert every frame from BGR to RGB.
            target_size: Output ``(width, height)``; required for an empty
                source, which has no size of its own.
            fill: Colour of blank frames (empty source, or a failed read).
            crop_size: Passed to ``crop_image``; falsy means no crop.

        Yields:
            One ``np.ndarray`` frame at a time.
        """
        if path is not None:
            os.makedirs(path, exist_ok=True)
        if target_size is not None:
            assert len(target_size) == 2
            assert isinstance(target_size, (list, tuple))
            target_size = tuple(target_size)

        for i in range(self._count):
            if self._ftype == FileType.Video:
                success, frame = self._vc.read()
            elif self._ftype == FileType.Image:
                success, frame = True, self._img
            else:
                assert target_size, '`target_size` must be set to produce an empty frame when input video path is None/NotExists'
                self.size = target_size
                success, frame = False, None

            if not success:
                frame = np.zeros((self.size[1], self.size[0], 3), dtype=np.uint8)
                frame[:] = fill

            if self._debug:
                logging.debug(f"frame {self._start + i}")

            if path is not None:
                cv2.imwrite(os.path.join(path, f"{self._start + i}.png"), frame, [cv2.IMWRITE_PNG_COMPRESSION, 0])

            if bgr2rgb:
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            if crop_size:
                frame = crop_image(frame, crop_size)

            # Skip the resize when the frame already has the requested size.
            if target_size is not None and (frame.shape[1], frame.shape[0]) != target_size:
                frame = cv2.resize(frame, target_size)

            if self._text:
                # Position the title relative to the frame actually being
                # delivered, not the source size, which differs after a
                # crop or resize.
                frame_wh = frame.shape[1], frame.shape[0]
                text = MyImageText(self._text, image_size=frame_wh, xy_ratio=self._text_xy)
                frame = put_text(frame, text, self._font)

            yield frame

    def release(self):
        """Release the underlying ``cv2.VideoCapture``, if there is one."""
        vc = getattr(self, '_vc', None)
        if vc is not None:
            vc.release()
def create_image_grid(images, grid_size=None):
    """Tile a batch of equally sized images into one grid image.

    Images are placed row by row, left to right; unused cells stay zero.

    Args:
        images: Array-like of shape ``(N, H, W, C)`` or ``(N, H, W)``.
        grid_size: Optional ``(rows, cols)``. When omitted, a near-square
            grid with ``ceil(sqrt(N))`` columns is used.

    Returns:
        Array of shape ``(rows * H, cols * W, C)`` (or ``(rows * H, cols * W)``
        for a 3-D input) with the dtype of ``images``.

    Raises:
        ValueError: If ``grid_size`` has fewer cells than there are images.
    """
    images = np.asarray(images)
    assert images.ndim == 3 or images.ndim == 4
    # Index from the front so that both (N, H, W) and (N, H, W, C) work.
    num, img_h, img_w = images.shape[0], images.shape[1], images.shape[2]
    trailing_dims = list(images.shape[3:])  # [] or [C]

    if grid_size is not None:
        grid_h, grid_w = tuple(grid_size)
        if grid_h * grid_w < num:
            raise ValueError(f'grid_size {grid_h}x{grid_w} cannot hold {num} images')
    else:
        grid_w = max(int(np.ceil(np.sqrt(num))), 1)
        grid_h = max((num - 1) // grid_w + 1, 1)

    grid = np.zeros([grid_h * img_h, grid_w * img_w] + trailing_dims, dtype=images.dtype)
    for idx in range(num):
        row, col = divmod(idx, grid_w)
        y, x = row * img_h, col * img_w
        grid[y:y + img_h, x:x + img_w, ...] = images[idx]
    return grid
def crop2shape(size: Tuple[int, int], crop_size: Union[Tuple[int, int], Tuple[int,int,int,int]]) -> Tuple[int, int]:
    """Return the ``(width, height)`` that a crop specification produces.

    Args:
        size: ``(width, height)`` of the image being cropped.
        crop_size: ``(w, h)``, which is returned unchanged, or
            ``(left, upper, right, lower)``. If all four box values are
            within ``[0, 1]`` they are scaled by the image width/height first.

    Raises:
        NotImplementedError: If ``crop_size`` has neither 2 nor 4 elements.
    """
    width, height = size
    n_values = len(crop_size)
    if n_values == 2:
        return crop_size
    if n_values != 4:
        raise NotImplementedError("not support")

    left, upper, right, lower = crop_size
    given_as_ratio = all(0 <= v <= 1.0 for v in crop_size)
    if given_as_ratio:
        left, right = left * width, right * width
        upper, lower = upper * height, lower * height
    return int(right - left), int(lower - upper)
def merge_videos(videos_in: List[str],
                 video_out: str,
                 grid_size=None,
                 titles: List[str] = None,
                 title_position: Tuple[float, float] = (0.5, 0.),
                 font_size: int = 50,
                 font_color: Union[Tuple[int, int, int], Tuple[int, int, int, int]] = (0, 0, 255, 0),
                 frame_fill_color: Tuple[int, int, int] = (0,0,0),
                 max_frames: int = None,
                 crop_size: Union[Tuple[int, int], Tuple[int,int,int,int], None]=None) -> None:
    """Merge several videos / images / empty slots into one grid video.

    Args:
        videos_in: List/Tuple
            List of input paths. An entry that is ``None`` or does not exist
            becomes an empty (filled) cell. e.g.
                ('path/to/v1.mp4', 'path/to/v2.mp4', 'path/to/v3.mp4')
        video_out: String
            Path of the output video. Nothing is done if it already exists. e.g.
                'path/to/output.mp4'
        grid_size: List/Tuple.
            Rows and columns respectively. e.g.
                (1, 3)
        titles: List/Tuple
            Title drawn on each input, in input order. Shorter lists are
            padded with ``None`` (no title), longer ones are cut. e.g.
                ('v1', 'v2', 'v3')
        title_position: List/Tuple
            Position (width and height) where the title is displayed, as
            fractions in the range (0, 1). e.g. to display the text in the
            centre of the video, the position is
                (0.5, 0.5)
        font_size: int
            Size (in pixels) of the text. e.g.
                50
        font_color: Tuple
            Colour of the text, `b,g,r,a` or `b,g,r`. e.g.
                0,0,255,0  # red
                255,0,0  # blue
        frame_fill_color: Tuple
            Colour of an empty frame, `b,g,r`. e.g.
                0,0,255  # red
        max_frames: Int
            Maximum number of frames to merge; it is capped at the length of
            the shortest input video. e.g.
                200
            Default `None` means all frames of the shortest video (2 frames
            when the inputs contain no video).
        crop_size: Union[Tuple[int, int], Tuple[int,int,int,int], None]
            Crop size, 2 or 4 args e.g.
                (w,h) center crop OR (left, upper, right, lower)
            Default `None` means no crop.
    Returns:
        None
    """
    if os.path.exists(video_out):
        logging.warning(f'{video_out} already exists!')
        return

    if not any(videos_in):
        logging.error(f'All inputs are None: {videos_in}')
        return

    # One title slot per input; works for lists and tuples alike.
    texts = list(titles or [])[:len(videos_in)]
    texts += [None] * (len(videos_in) - len(texts))
    assert len(videos_in) == len(texts)

    # dirname is '' for a bare file name, and os.makedirs('') raises.
    dir_name = os.path.dirname(video_out)
    if dir_name:
        os.makedirs(dir_name, exist_ok=True)

    # The font is only needed when at least one title will be drawn.
    font = MyImageFont(size=font_size, font_color=font_color) if any(texts) else None

    video_handles = [
        (i, FrameGenerator(v, text=text, text_xy=title_position, font=font))
        for i, (v, text) in enumerate(zip(videos_in, texts))
        if is_video_file(v)
    ]
    handles = [e for _i, e in video_handles]
    video_writer = None
    try:
        if video_handles:
            first_video = video_handles[0][1]
            least_size = min(e.size for _i, e in video_handles)  # all with same size WH
            if crop_size is not None:
                least_size = crop2shape(least_size, crop_size)
            # Never ask for more frames than the shortest video can deliver.
            available = min(e.total_frames for _i, e in video_handles)
            least_frames = min(max_frames, available) if max_frames else available
            fps = first_video.fps  # use the fps of first video
            video_layout = layout(first_video.size)
        else:
            img_paths = [p for p in videos_in if is_image_file(p)]
            if not img_paths:
                logging.error(f'No video or image found among inputs: {videos_in}')
                return
            img = cv2.imread(img_paths[0], cv2.IMREAD_COLOR)
            if img is None:
                logging.error(f'Cannot read image: {img_paths[0]}')
                return
            least_size = img.shape[1], img.shape[0]  # all with same size WH
            if crop_size is not None:
                least_size = crop2shape(least_size, crop_size)
            video_layout = layout(least_size)
            least_frames = max_frames if max_frames else 2
            fps = 30  # no video to take a frame rate from

        logging.debug(f'Video info: {least_size}(size), {video_layout}, {least_frames}(nframes), {fps}(fps)')
        if least_frames <= 0:
            logging.error(f'Nothing to merge: {least_frames} frames available.')
            return
        if fps <= 0:
            fps = 30  # frame rate could not be read from the first video

        frame_handles = [
            (i, FrameGenerator(v, layout=video_layout, default_size=least_size, text=text,
                               text_xy=title_position, font=font))
            for i, (v, text) in enumerate(zip(videos_in, texts))
            if not is_video_file(v)
        ]
        # Restore the input order so that every source lands in its own cell.
        handles = [e for _i, e in sorted(video_handles + frame_handles, key=lambda item: item[0])]
        generators = [e.extract(target_size=least_size, fill=frame_fill_color, crop_size=crop_size)
                      for e in handles]

        # ref https://gist.github.com/docPhil99/a612c355cd31e69a0d3a6d2f87bfde8b
        # x264 preset options: ultrafast, superfast, veryfast, faster, fast,
        # medium (default preset), slow, slower, veryslow
        video_writer = skvideo.io.FFmpegWriter(
            video_out,
            inputdict={'-r': str(fps)},  # frame rate of the raw frames fed in
            outputdict={
                '-r': str(fps),
                '-vcodec': 'libx264',  # use the h.264 codec
                '-crf': '0',           # constant rate factor 0, which is lossless
                '-preset': 'slow',     # the slower the better compression, in principle;
                                       # other options: https://trac.ffmpeg.org/wiki/Encode/H.264
            })

        # Read one frame per source, tile them, write the grid. Every frame
        # that is read is also written (exactly `least_frames` of them).
        for n in range(least_frames):
            cur_frames = np.array([next(g) for g in generators])
            frames_grid = create_image_grid(cur_frames, grid_size=grid_size)  # HWC
            if n % 100 == 0:
                logging.info(f'{n}: {len(cur_frames)} frames merge into grid with size={frames_grid.shape}')
            video_writer.writeFrame(frames_grid[:, :, ::-1])  # write the frame as RGB not BGR
    finally:
        # Close the encoder and the captures even when merging fails midway.
        if video_writer is not None:
            video_writer.close()
        for e in handles:
            e.release()

    logging.info(f'Output video saved... {video_out}')
def compress_video(video_in: Path, video_out: Path, crf=18):
    """Re-encode ``video_in`` to ``video_out`` with libx264 via ffmpeg.

    Args:
        video_in: Source video.
        video_out: Destination; if it already exists nothing is done.
            Missing parent directories are created.
        crf: The range of the quantizer scale is 0-51: where 0 is lossless,
            23 is default, and 51 is worst possible. A lower value is a
            higher quality and a subjectively sane range is 18-28.

    The command is run without a shell, so paths containing spaces, quotes
    or shell metacharacters are passed through verbatim. A missing ffmpeg
    executable or a non-zero exit status is logged, not raised.
    """
    import subprocess  # local import: not among the module-level imports

    if video_out.exists():
        return
    video_out.parent.mkdir(parents=True, exist_ok=True)
    ffmpeg_cmd = ['ffmpeg', '-i', str(video_in), '-c:v', 'libx264', '-crf', str(crf), str(video_out)]
    logging.info(' '.join(ffmpeg_cmd))
    try:
        completed = subprocess.run(ffmpeg_cmd, check=False)
    except FileNotFoundError:
        logging.error('ffmpeg executable not found.')
        return
    if completed.returncode != 0:
        logging.error(f'ffmpeg exited with status {completed.returncode}.')
def copy_audio_ffmpeg(src: str, dst: str):
    """Copy the audio of ``src`` onto the video of ``dst`` using ffmpeg.

    Described by the original author as faster than moviepy. The result is
    written next to ``dst`` as ``<dst stem>_withaudio.mp4``; streams are
    copied without re-encoding (``-c copy``) and the output ends with the
    shorter input (``-shortest``).

    Args
        src - str. Video of which audio comes from.
        dst - str. Video of which audio goes to.

    Other commands:
        add an audio to video: ffmpeg -i video.mp4 -i audio.wav -c:v copy -c:a aac output.mp4

    The command is run without a shell, so paths with spaces or shell
    metacharacters are safe. A missing ffmpeg executable or a non-zero exit
    status is logged, not raised.
    """
    import subprocess  # local import: not among the module-level imports

    p_dst = Path(dst)
    out_path = f"{p_dst.parent/p_dst.stem}_withaudio.mp4"
    ffmpeg_cmd = ['ffmpeg', '-i', str(dst), '-i', str(src),
                  '-c', 'copy', '-map', '0:0', '-map', '1:1', '-shortest', out_path]
    logging.info(' '.join(ffmpeg_cmd))
    try:
        completed = subprocess.run(ffmpeg_cmd, check=False)
    except FileNotFoundError:
        logging.error('ffmpeg executable not found.')
        return
    if completed.returncode != 0:
        logging.error(f'ffmpeg exited with status {completed.returncode}.')
def extract_frames(src: str, dst: str):
    """Dump every frame of ``src`` as ``<frame index>.png`` into ``dst``.

    Args:
        src: Source path, opened through ``FrameGenerator``.
        dst: Output directory; created (including parents) when missing.

    NOTE(review): intended for video sources. An image source reports an
    effectively unbounded frame count, so this loop would not finish.
    """
    Path(dst).mkdir(parents=True, exist_ok=True)
    fg = FrameGenerator(src)
    try:
        logging.info(f'{src} to {dst}')
        logging.info(f'{src}: {fg.total_frames} frames, size {fg.size}')
        # The generator writes the PNG files as a side effect; the frames
        # themselves are not needed here.
        for _ in fg.extract(dst):
            pass
    finally:
        # Free the capture right away instead of waiting for garbage collection.
        fg.release()
def main_dir():
    """Batch demo: merge same-position videos of several directories.

    The ``*.mp4`` files of each source directory are sorted and zipped, so
    the n-th file of every directory ends up in the n-th merged video (named
    after the file from the first directory). Each merged video that exists
    afterwards is additionally compressed into a ``z`` sub-directory.
    """
    source_dirs = [
        Path(r"path/to/videos1"),
        Path(r"path/to/videos2"),
    ]
    merged_dir = Path(r"path/to/merge")

    files_per_dir = [sorted(d.glob('*.mp4')) for d in source_dirs]
    merged_dir.mkdir(exist_ok=True)

    for group in zip(*files_per_dir):
        inputs = [str(p) for p in group]
        logging.debug(', '.join(inputs))

        merged_path = merged_dir / group[0].name
        merge_videos(
            inputs,
            str(merged_path),
            grid_size=None,                        # (rows, cols); None picks a grid automatically
            titles=None,                           # e.g. [f.name for f in group]
            title_position=(0., 0),                # fractions of width / height
            font_size=50,
            font_color=(0, 0, 255),                # b,g,r -> red
            frame_fill_color=(0, 0, 0),            # b,g,r colour of empty frames
            max_frames=None,                       # None means all frames
            crop_size=(0.25, 0.25, 0.75, 0.5),     # (left, upper, right, lower) in ratio;
                                                   # also accepted: None, (w, h), pixel box
        )

        if not merged_path.exists():
            continue
        compressed_dir = merged_path.parent / 'z'
        compressed_dir.mkdir(exist_ok=True)
        compress_video(merged_path, compressed_dir / merged_path.name, crf=18)
def main():
    """Single-file demo: three videos and one empty cell in a 2 x 2 grid."""
    inputs = ['1.mp4', '2.mp4', None, '3.mp4']  # None -> cell filled with an empty frame
    labels = ['v1', 'v2', None, 'v3']
    destination = 'merged.mp4'

    merge_videos(
        inputs,
        destination,
        grid_size=(2, 2),                   # rows, cols
        titles=labels,
        title_position=(0.5, 0.),           # (0.5 * w, 0. * h), i.e. upper centre
        font_size=100,
        font_color=(255, 0, 0),
        frame_fill_color=(255, 255, 255),   # b,g,r colour of the empty frame
        max_frames=100,                     # merge the first 100 frames per video
        crop_size=None,                     # or (w, h) centre crop / (left, upper, right, lower)
    )
if __name__ == '__main__':
    # Script entry point: run the single-output demo.
    main()
    # Batch alternative over whole directories:
    # main_dir()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Uses FFmpeg to write the output video with minimal quality loss.