Skip to content

Instantly share code, notes, and snippets.

@luuil
Last active July 4, 2024 02:12
Show Gist options
  • Save luuil/183e4d92275e3d6641b6728a988bb38b to your computer and use it in GitHub Desktop.
Save luuil/183e4d92275e3d6641b6728a988bb38b to your computer and use it in GitHub Desktop.
Merge multiple videos, images, or empty frames into a single grid video using OpenCV, PIL, and NumPy
import cv2
import os
import numpy as np
from PIL import Image, ImageFont, ImageDraw
from typing import Tuple, Optional, Union, List
from dataclasses import dataclass, InitVar
import sys
from pathlib import Path
from enum import Enum
import logging
import skvideo.io
# Configure the root logger once at import time: DEBUG verbosity, with each
# record tagged by time, source file/line and level.
logging.basicConfig(
    level=logging.DEBUG,
    format='[%(asctime)s] {%(pathname)s:%(lineno)d} %(levelname)s - %(message)s',
    datefmt='%H:%M:%S',
)
@dataclass
class MyImageFont:
    """Font and colour used when drawing text onto a frame.

    ``path`` and ``size`` are init-only: they are consumed by
    ``__post_init__`` to load a TrueType font and are not stored as fields.
    """

    # Loaded font object. The annotation is quoted so it is not evaluated when
    # the class is created.
    font: Optional["ImageFont.FreeTypeFont"] = None
    # Text colour, bgr or bgra. (The original default had a trailing comma,
    # which wrapped it into a 1-tuple containing the colour tuple.)
    font_color: Union[Tuple[int, int, int], Tuple[int, int, int, int]] = (0, 0, 255, 0)
    path: InitVar[str] = 'simsun.ttc'  # font file handed to ImageFont.truetype
    size: InitVar[int] = 30  # in pixels

    def __post_init__(self, path, size):
        # Only load from disk when the caller did not supply a font object;
        # previously an explicitly passed ``font`` was silently overwritten.
        if self.font is None and path:
            self.font = ImageFont.truetype(path, size)
@dataclass
class MyImageText:
    """A string plus the pixel position at which it should be drawn.

    ``xy`` is always recomputed in ``__post_init__`` as
    ``image_size * xy_ratio`` (truncated to int), so a value passed for
    ``xy`` is replaced.
    """

    content: str
    xy: Tuple[int, int] = (0, 0)  # pixel coordinates of the text on the image
    image_size: InitVar[Tuple[int, int]] = (0, 0)  # (width, height), init only
    xy_ratio: InitVar[Tuple[float, float]] = (0.0, 0.0)  # fractions of width/height, init only

    def __post_init__(self, image_size, xy_ratio):
        width, height = image_size
        ratio_x, ratio_y = xy_ratio
        self.xy = (int(width * ratio_x), int(height * ratio_y))
def put_text(img: np.ndarray,
             text: MyImageText,
             font: MyImageFont
             ) -> np.ndarray:
    """Render ``text`` onto ``img`` via Pillow and return the drawn image as a NumPy array.

    Args:
        img: Image array accepted by ``Image.fromarray``.
        text: Supplies the string (``content``) and its position (``xy``).
        font: Supplies the Pillow font object and the fill colour.
    """
    canvas = Image.fromarray(img)
    ImageDraw.Draw(canvas).text(
        text.xy,
        text.content,
        font=font.font,
        fill=font.font_color,
    )
    return np.array(canvas)
def resize_with_pad(img: np.ndarray, size: Tuple[int, int], fill=(255, 255, 255)) -> np.ndarray:
    """Scale ``img`` to fit inside ``size`` and centre it on a ``fill``-coloured canvas.

    Args:
        img: Image array with shape (height, width, channels).
        size: Target (width, height) of the returned image.
        fill: Padding colour; a 255 alpha value is appended when the image
            does not have exactly 3 channels.

    Returns:
        A uint8 array of shape (height, width, channels) matching ``size``.
    """
    src_h, src_w, _ = img.shape
    target_w, target_h = size

    # A single scale factor for both axes keeps the aspect ratio.
    scale = min(target_h / src_h, target_w / src_w)
    scaled = cv2.resize(img, (0, 0), fx=scale, fy=scale)
    scaled_h, scaled_w, channels = scaled.shape

    pad_color = fill if channels == 3 else (*fill, 255)
    canvas = np.full((target_h, target_w, channels), pad_color, dtype=np.uint8)

    # Paste the scaled image in the middle of the canvas.
    left = (target_w - scaled_w) // 2
    top = (target_h - scaled_h) // 2
    canvas[top:top + scaled_h, left:left + scaled_w] = scaled
    return canvas
def is_image_file(path: str) -> bool:
    """Return True if ``path`` is an existing file with a supported image extension.

    Supported extensions are ``.png``, ``.jpg`` and ``.jpeg``. The comparison
    is case-insensitive, so ``photo.PNG`` and ``photo.Jpg`` are accepted too.
    Empty or ``None`` paths return False.
    """
    if not path:
        return False
    p = Path(path)
    return p.is_file() and p.suffix.lower() in ('.png', '.jpg', '.jpeg')
def is_video_file(path: str) -> bool:
    """Return True if ``path`` is an existing file with a supported video extension.

    Supported extensions are ``.mp4``, ``.mkv`` and ``.mov``. The comparison
    is case-insensitive, which covers the previously listed ``.MP4``/``.MOV``
    as well as variants such as ``.MKV`` or ``.Mp4``. Empty or ``None`` paths
    return False.
    """
    if not path:
        return False
    p = Path(path)
    return p.is_file() and p.suffix.lower() in ('.mp4', '.mkv', '.mov')
class FileType(Enum):
    """Kind of input a grid cell is fed from."""

    Empty = 0  # no usable input
    Image = 1  # a still image
    Video = 2  # a video file
class Layout(Enum):
    """Orientation of a frame."""

    Horizontal = 0  # width >= height
    Vertical = 1    # height > width
def layout(size: Tuple[int, int]) -> Layout:
    """Classify a (width, height) pair; square sizes count as horizontal."""
    width, height = size
    if width >= height:
        return Layout.Horizontal
    return Layout.Vertical
def resolution(layout: Layout) -> Tuple[int, int]:
    """Return the (width, height) used for the given layout.

    Args:
        layout: ``Layout.Horizontal`` or ``Layout.Vertical``.

    Returns:
        ``(1920, 1080)`` for horizontal, ``(1080, 1920)`` for vertical.

    Raises:
        NotImplementedError: For any other value. The original raised
            ``NotImplemented(...)``, which is a non-callable constant and
            therefore produced a ``TypeError`` instead.
    """
    if layout == Layout.Horizontal:
        return (1920, 1080)
    if layout == Layout.Vertical:
        return (1080, 1920)
    raise NotImplementedError(f'{layout} not support!')
def crop_image(img: np.ndarray, size: Union[Tuple[int, int], Tuple[int,int,int,int], None]) -> np.ndarray:
    """Crop ``img`` either around its centre or to an explicit box.

    Args:
        img: Image array whose first two axes are (height, width).
        size: ``None`` returns ``img`` unchanged. A 2-tuple ``(w, h)`` is a
            centre crop. A 4-tuple ``(left, upper, right, lower)`` is an
            explicit box; when all four values lie in ``[0, 1]`` they are
            treated as fractions of the image width/height.

    Returns:
        The cropped image as a NumPy array.

    Raises:
        ValueError: If ``size`` has a length other than 2 or 4 (previously
            this fell through to an ``UnboundLocalError``).
    """
    if size is None:
        return img

    # Same (width, height) that PIL would report for this array.
    height, width = img.shape[:2]

    if len(size) == 2:  # center crop
        new_width, new_height = size
        left = (width - new_width) / 2
        upper = (height - new_height) / 2
        right = (width + new_width) / 2
        lower = (height + new_height) / 2
    elif len(size) == 4:
        left, upper, right, lower = size
        if all(0 <= v <= 1.0 for v in size):  # box given as ratios
            left *= width
            upper *= height
            right *= width
            lower *= height
    else:
        raise ValueError(f'crop size must have 2 or 4 values, got {len(size)}: {size}')

    # Crop the image
    #   left,upper ---
    #       |        |
    #       --- right,lower
    im = Image.fromarray(img)
    im = im.crop((left, upper, right, lower))
    return np.asarray(im)
class FrameGenerator(object):
    """Uniform frame source over a video file, a still image, or no input at all.

    The kind of source is decided from ``path``:

    * missing / non-existent path -> empty source; ``extract`` yields frames
      filled with a solid colour,
    * image file (per ``is_image_file``) -> the image, padded to the
      resolution of ``layout``, is yielded repeatedly,
    * anything else -> opened with ``cv2.VideoCapture``.

    Public attributes: ``size`` (width, height), ``fps`` and ``total_frames``.
    For empty and image sources ``total_frames`` is ``sys.maxsize`` and
    ``fps`` is 30.
    """

    def __init__(self, path, layout=Layout.Horizontal, default_size=None, frame_range=None, text: str=None, text_xy=(0.1, 0.1), font: Optional[MyImageFont]=None, debug=False):
        """
        Args:
            path: Input file path, or None / non-existent for an empty source.
            layout: Layout whose resolution an image input is padded to.
            default_size: Currently unused; kept for interface compatibility.
            frame_range: Optional frame range, see ``set_frames_range``.
            text: Optional title drawn on every yielded frame.
            text_xy: Title position as (x, y) fractions of the frame size.
            font: Font used for the title. When None, a default
                ``MyImageFont()`` is created lazily the first time text is
                drawn (the old default built one at import time, which loaded
                a font file even when no text was requested).
            debug: Emit per-frame debug logging.

        Raises:
            ValueError: If ``path`` looks like an image but cannot be decoded.
        """
        # Set first so release()/__del__ stay safe if initialisation fails part-way.
        self._vc = None
        self._img = None

        if not path or not os.path.exists(path):
            logging.warning(f'Not exists: {path}.')
            self._ftype = FileType.Empty
            self.size = sys.maxsize, sys.maxsize
            self.fps = 30
            self.total_frames = sys.maxsize
        elif is_image_file(path):
            logging.info(f'Image frame: {path}.')
            image = cv2.imread(path, cv2.IMREAD_COLOR)
            if image is None:
                # cv2.imread signals failure by returning None.
                raise ValueError(f'Cannot decode image: {path}')
            self._ftype = FileType.Image
            self._img = resize_with_pad(image, resolution(layout))
            self.size = self._img.shape[1], self._img.shape[0]
            self.fps = 30
            self.total_frames = sys.maxsize
        else:
            logging.info(f'Video frame: {path}.')
            self._ftype = FileType.Video
            self._p = path
            self._vc = cv2.VideoCapture(self._p)
            if not self._vc.isOpened():
                logging.warning(f'Cannot open as video: {path}.')
            self.size = int(self._vc.get(cv2.CAP_PROP_FRAME_WIDTH)), int(self._vc.get(cv2.CAP_PROP_FRAME_HEIGHT))
            self.fps = int(self._vc.get(cv2.CAP_PROP_FPS))
            self.total_frames = int(self._vc.get(cv2.CAP_PROP_FRAME_COUNT))

        self._start = 0
        self._count = self.total_frames
        self._debug = debug
        if frame_range is not None:
            self.set_frames_range(frame_range)
        self._text = text
        self._text_xy = text_xy
        self._font = font
        if self._debug:
            logging.debug(f"video size( W x H ) : {self.size[0]} x {self.size[1]}")

    def __del__(self):
        self.release()

    def set_frames_range(self, frame_range=None):
        """Restrict extraction to a frame range (video sources only).

        Args:
            frame_range: None for the whole video, or a list/tuple of exactly
                two values ``(start, end)``, or a ``range`` (only its first
                and last elements are used). An ``end`` of None, -1, or
                anything at or past the last frame means "to the end".

        Raises:
            TypeError: If ``frame_range`` is not a list, tuple or range.
            ValueError: If the range is malformed or exceeds the video.
        """
        if self._ftype != FileType.Video:
            return
        if frame_range is None:
            self._start = 0
            self._count = self.total_frames
        else:
            if not isinstance(frame_range, (list, tuple, range)):
                raise TypeError(f'frame_range must be a list, tuple or range, got {type(frame_range).__name__}')
            if isinstance(frame_range, (list, tuple)) and len(frame_range) != 2:
                raise ValueError(f'frame_range must hold exactly (start, end), got {frame_range}')
            start, end = frame_range[0], frame_range[-1]
            if end is None or end == -1 or end >= self.total_frames:
                end = self.total_frames
            if end < start:
                raise ValueError(f'frame_range end ({end}) is before start ({start})')
            self._start = start
            self._count = end - start
        if self._count > self.total_frames:
            raise ValueError(f'frame_range covers {self._count} frames but the video has {self.total_frames}')
        self._vc.set(cv2.CAP_PROP_POS_FRAMES, self._start)

    def extract(self, path=None, bgr2rgb=False, target_size=None, fill=(0,0,0), crop_size=None):
        """Yield frames one by one, optionally dumping, cropping, resizing and titling them.

        Args:
            path: If given, every raw frame is also written there as
                ``<frame index>.png``; the directory is created if needed.
            bgr2rgb: Convert each frame with ``cv2.COLOR_BGR2RGB``.
            target_size: Optional (width, height) every frame is resized to.
                Required for an empty source, where it defines the frame size.
            fill: Colour of frames synthesised when no frame can be read.
            crop_size: Optional crop applied before resizing, see ``crop_image``.

        Yields:
            One frame array per iteration.

        Raises:
            ValueError: If ``target_size`` is missing for an empty source or
                is not a 2-element list/tuple.
        """
        if target_size is not None:
            if not isinstance(target_size, (list, tuple)) or len(target_size) != 2:
                raise ValueError(f'target_size must be a (width, height) list/tuple, got {target_size}')
        if self._ftype == FileType.Empty:
            if not target_size:
                raise ValueError('`target_size` must be set to produce an empty frame when input video path is None/NotExists')
            self.size = target_size
        if path is not None:
            os.makedirs(path, exist_ok=True)

        for i in range(0, self._count):
            if self._ftype == FileType.Video:
                success, frame = self._vc.read()
            elif self._ftype == FileType.Image:
                success, frame = True, self._img
            else:
                success, frame = False, None
            if not success:
                # No frame available: synthesise a solid-colour one.
                frame = np.zeros((self.size[1], self.size[0], 3), dtype=np.uint8)
                frame[:] = fill
            if self._debug:
                logging.debug(f"frame {self._start + i}")
            if path is not None:
                cv2.imwrite(os.path.join(path, f"{self._start + i}.png"), frame, [cv2.IMWRITE_PNG_COMPRESSION, 0])
            if bgr2rgb:
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            if crop_size:
                frame = crop_image(frame, crop_size)
            if target_size is not None:
                frame = cv2.resize(frame, tuple(target_size))
            if self._text:
                if self._font is None:
                    self._font = MyImageFont()
                # Position the title relative to the frame actually emitted
                # (after crop/resize), not the source size, so it cannot land
                # outside the frame.
                frame_size = frame.shape[1], frame.shape[0]
                text = MyImageText(self._text, image_size=frame_size, xy_ratio=self._text_xy)
                frame = put_text(frame, text, self._font)
            yield frame

    def release(self):
        """Release the underlying video capture, if any."""
        vc = getattr(self, '_vc', None)
        if vc is not None:
            vc.release()
def create_image_grid(images, grid_size=None):
    """Tile a batch of equally sized images into one grid image.

    Args:
        images: Array of shape (N, H, W) or (N, H, W, C).
        grid_size: Optional ``(rows, cols)``. When None, a near-square grid is
            chosen: ``cols = ceil(sqrt(N))`` and just enough rows to fit.

    Returns:
        Array of shape (rows*H, cols*W) or (rows*H, cols*W, C) with the same
        dtype as ``images``. Images are placed row by row; unused cells stay
        zero.

    Raises:
        ValueError: If ``images`` is not 3- or 4-dimensional, or if
            ``grid_size`` has fewer cells than there are images.
    """
    images = np.asarray(images)
    if images.ndim not in (3, 4):
        raise ValueError(f'images must have 3 or 4 dimensions, got {images.ndim}')

    # Leading axes are always (N, H, W); indexing from the front keeps the
    # 3-D (no channel axis) case correct, which negative indexing did not.
    num, img_h, img_w = images.shape[:3]

    if grid_size is not None:
        grid_h, grid_w = tuple(grid_size)
        if num > grid_h * grid_w:
            raise ValueError(f'grid_size {grid_h}x{grid_w} cannot hold {num} images')
    else:
        grid_w = max(int(np.ceil(np.sqrt(num))), 1)
        grid_h = max((num - 1) // grid_w + 1, 1)

    grid = np.zeros((grid_h * img_h, grid_w * img_w) + images.shape[3:], dtype=images.dtype)
    for idx in range(num):
        row, col = divmod(idx, grid_w)
        y, x = row * img_h, col * img_w
        grid[y:y + img_h, x:x + img_w, ...] = images[idx]
    return grid
def crop2shape(size: Tuple[int, int], crop_size: Union[Tuple[int, int], Tuple[int,int,int,int]]) -> Tuple[int, int]:
    """Compute the (width, height) that cropping with ``crop_size`` produces.

    Args:
        size: (width, height) of the image being cropped.
        crop_size: Either ``(w, h)``, returned as-is, or a box
            ``(left, upper, right, lower)``; a box whose values all lie in
            ``[0, 1]`` is interpreted as fractions of ``size``.

    Raises:
        NotImplementedError: If ``crop_size`` has neither 2 nor 4 values.
    """
    width, height = size
    n_values = len(crop_size)
    if n_values == 2:
        return crop_size
    if n_values != 4:
        raise NotImplementedError("not support")

    x0, y0, x1, y1 = crop_size
    given_as_ratio = all(0 <= v <= 1.0 for v in crop_size)
    if given_as_ratio:
        x0, x1 = x0 * width, x1 * width
        y0, y1 = y0 * height, y1 * height
    return int(x1 - x0), int(y1 - y0)
def merge_videos(videos_in: List[str],
                 video_out: str,
                 grid_size=None,
                 titles: Optional[List[str]] = None,
                 title_position: Tuple[float, float] = (0.5, 0.),
                 font_size: int = 50,
                 font_color: Union[Tuple[int, int, int], Tuple[int, int, int, int]] = (0, 0, 255, 0),
                 frame_fill_color: Tuple[int, int, int] = (0,0,0),
                 max_frames: int = None,
                 crop_size: Union[Tuple[int, int], Tuple[int,int,int,int], None]=None) -> None:
    """Merge several videos / images / empty cells into one grid video.

    Nothing is written when ``video_out`` already exists, when every input is
    empty, or when no frame can be produced.

    Args:
        videos_in: List/Tuple
            List of input paths; ``None`` entries become empty cells. e.g.
                ('path/to/v1.mp4', 'path/to/v2.mp4', None, 'path/to/v3.mp4')
        video_out: String
            Path of output video. e.g.
                'path/to/output.mp4'
        grid_size: List/Tuple.
            Row and Column respectively. e.g.
                (1, 3)
            Default `None`, lets `create_image_grid` choose.
        titles: List/Tuple
            The title of each input, drawn in its grid cell. Shorter
            sequences are padded with `None`, longer ones truncated. e.g.
                ('v1', 'v2', 'v3')
        title_position: List/Tuple
            The position (width and height) where the title is displayed, as
            fractions in the range (0, 1). e.g. for the center of the cell
                (0.5, 0.5)
        font_size: int
            Size (in pixels) of the text. e.g.
                50
        font_color: Tuple
            Color of the text, `b,g,r,a` or `b,g,r`. e.g.
                0,0,255,0 # red
                255,0,0 # blue
        frame_fill_color: Tuple
            Color of the empty frame, `b,g,r`. e.g.
                0,0,255 # red
        max_frames: Int
            Maximum number of frames to merge, capped at the length of the
            shortest input video. e.g.
                200
            Default `None`, means all frames of the shortest video (2 frames
            when there are only images).
        crop_size: Union[Tuple[int, int], Tuple[int,int,int,int], None]
            Crop size, 2 or 4 args e.g.
                (w,h) center crop OR (left, upper, right, lower)
            Default `None`, means no crop.
    Returns:
        None
    """
    if os.path.exists(video_out):
        logging.warning(f'{video_out} already exists!')
        return
    if not any(videos_in):
        logging.error(f'All inputs are None: {videos_in}')
        return

    # One title slot per input; list() so tuple titles work as documented.
    texts = list(titles)[:len(videos_in)] if titles is not None else []
    texts += [None] * (len(videos_in) - len(texts))

    # dirname is '' for a bare file name, and os.makedirs('') raises.
    dir_name = os.path.dirname(video_out)
    if dir_name:
        os.makedirs(dir_name, exist_ok=True)

    # Load the font once, and only if there is a title to draw.
    font = MyImageFont(size=font_size, font_color=font_color) if any(texts) else None

    video_handles = [(i, FrameGenerator(v, text=text, text_xy=title_position, font=font))
                     for i, (v, text) in enumerate(zip(videos_in, texts)) if is_video_file(v)]
    frame_handles = []
    try:
        if video_handles:
            least_size = min(e.size for _i, e in video_handles)  # all with same size WH
            if crop_size is not None:
                least_size = crop2shape(least_size, crop_size)
            # Never ask for more frames than the shortest video can deliver.
            shortest = min(e.total_frames for _i, e in video_handles)
            least_frames = min(max_frames, shortest) if max_frames else shortest
            fps = video_handles[0][1].fps  # use the fps of first video
            video_layout = layout(video_handles[0][1].size)
        else:
            img_paths = [p for p in videos_in if is_image_file(p)]
            if not img_paths:
                logging.error(f'No readable video or image among inputs: {videos_in}')
                return
            img = cv2.imread(img_paths[0], cv2.IMREAD_COLOR)
            if img is None:
                logging.error(f'Cannot decode image: {img_paths[0]}')
                return
            least_size = img.shape[1], img.shape[0]  # all with same size WH
            if crop_size is not None:
                least_size = crop2shape(least_size, crop_size)
            video_layout = layout(least_size)
            least_frames = max_frames if max_frames else 2
            fps = 30
        if fps <= 0:
            fps = 30  # capture could not report a frame rate
        logging.debug(f'Video info: {least_size}(size), {video_layout}, {least_frames}(nframes), {fps}(fps)')

        frame_handles = [(i, FrameGenerator(v, layout=video_layout, default_size=least_size, text=text, text_xy=title_position, font=font))
                         for i, (v, text) in enumerate(zip(videos_in, texts)) if not is_video_file(v)]
        handles = sorted(video_handles + frame_handles, key=lambda item: item[0])  # keep input order
        generators = [e.extract(target_size=least_size, fill=frame_fill_color, crop_size=crop_size) for _i, e in handles]

        if least_frames < 1:
            logging.warning(f'Nothing to write: {least_frames} frames available.')
            return

        # ref https://gist.github.com/docPhil99/a612c355cd31e69a0d3a6d2f87bfde8b
        # preset options: ultrafast, superfast, veryfast, faster, fast,
        # medium (default preset), slow, slower, veryslow
        # other options see https://trac.ffmpeg.org/wiki/Encode/H.264
        video_writer = skvideo.io.FFmpegWriter(
            video_out,
            inputdict={'-r': str(fps)},  # frame rate of the raw frames fed to ffmpeg
            outputdict={
                '-vcodec': 'libx264',  # use the h.264 codec
                '-crf': '0',           # constant rate factor 0, which is lossless
                '-preset': 'slow',     # the slower the better compression, in principle
            })
        try:
            # Read, compose and write one grid per iteration so exactly
            # `least_frames` frames are written (the last one used to be dropped).
            for n in range(least_frames):
                cur_frames = np.array([next(g) for g in generators])
                frames_grid = create_image_grid(cur_frames, grid_size=grid_size)  # HWC
                if n % 100 == 0:
                    logging.info(f'{n}: {len(cur_frames)} frames merge into grid with size={frames_grid.shape}')
                video_writer.writeFrame(frames_grid[:, :, ::-1])  # write the frame as RGB not BGR
        finally:
            video_writer.close()
        logging.info(f'Output video saved... {video_out}')
    finally:
        for _i, handle in video_handles + frame_handles:
            handle.release()
def compress_video(video_in: Path, video_out: Path, crf=18):
    """Re-encode ``video_in`` to ``video_out`` with libx264 at the given CRF.

    Does nothing when ``video_out`` already exists. Missing parent
    directories of ``video_out`` are created. A missing ffmpeg executable or
    a non-zero ffmpeg exit status is logged rather than raised.

    Args:
        video_in: Source video path.
        video_out: Destination video path.
        crf: The range of the quantizer scale is 0-51: where 0 is lossless,
            23 is default, and 51 is worst possible. A lower value is a
            higher quality and a subjectively sane range is 18-28.
    """
    import subprocess  # local import: not part of this module's import block

    if video_out.exists():
        return
    video_out.parent.mkdir(parents=True, exist_ok=True)

    # Argument list instead of a shell string, so paths containing quotes or
    # shell metacharacters are passed through verbatim.
    cmd = ['ffmpeg', '-i', str(video_in), '-c:v', 'libx264', '-crf', str(crf), str(video_out)]
    logging.info(' '.join(cmd))
    try:
        result = subprocess.run(cmd, check=False)
    except FileNotFoundError:
        logging.error(f'ffmpeg executable not found; cannot compress {video_in}')
        return
    if result.returncode != 0:
        logging.warning(f'ffmpeg exited with status {result.returncode} for {video_in}')
def copy_audio_ffmpeg(src: str, dst: str):
    """Mux a stream from ``src`` into ``dst`` with ffmpeg (faster than moviepy).

    The result is written next to ``dst`` as ``<dst stem>_withaudio.mp4``;
    streams are copied, not re-encoded. A missing ffmpeg executable or a
    non-zero exit status is logged rather than raised.

    Args:
        src: str. Video of which audio comes from.
        dst: str. Video of which audio goes to.

    Other commands:
        add an audio to video: ffmpeg -i video.mp4 -i audio.wav -c:v copy -c:a aac output.mp4
    """
    import subprocess  # local import: not part of this module's import block

    p_dst = Path(dst)
    out_path = f"{p_dst.parent/p_dst.stem}_withaudio.mp4"
    # Argument list instead of a shell string, so paths with spaces work.
    # NOTE(review): `-map 0:0 -map 1:1` picks streams by index, presumably
    # video = stream 0 of dst and audio = stream 1 of src - confirm for your inputs.
    cmd = ['ffmpeg', '-i', str(dst), '-i', str(src), '-c', 'copy',
           '-map', '0:0', '-map', '1:1', '-shortest', out_path]
    logging.info(' '.join(cmd))
    try:
        result = subprocess.run(cmd, check=False)
    except FileNotFoundError:
        logging.error(f'ffmpeg executable not found; cannot copy audio into {dst}')
        return
    if result.returncode != 0:
        logging.warning(f'ffmpeg exited with status {result.returncode} for {dst}')
def extract_frames(src: str, dst: str):
    """Dump every frame of ``src`` into directory ``dst`` as PNG files.

    Args:
        src: Path of the input handed to ``FrameGenerator``.
        dst: Output directory; created (including parents) if missing.
    """
    Path(dst).mkdir(parents=True, exist_ok=True)
    fg = FrameGenerator(src)
    try:
        logging.info(f'{src} to {dst}')
        logging.info(f'{src}: {fg.total_frames} frames, size {fg.size}')
        # extract() writes each frame as a side effect; the yielded frames
        # themselves are not needed here.
        for _ in fg.extract(dst):
            pass
    finally:
        # Release the capture promptly instead of waiting for garbage collection.
        fg.release()
def main_dir():
    """Batch demo: merge same-ranked ``*.mp4`` files from several directories.

    The sorted file lists of the input directories are zipped together; each
    group is merged into one grid video named after the first file of the
    group, and an existing result is then compressed into a ``z`` sub-folder.
    """
    source_dirs = [
        Path(r"path/to/videos1"),
        Path(r"path/to/videos2"),
    ]
    merged_dir = Path(r"path/to/merge")

    files_per_dir = [sorted(folder.glob('*.mp4')) for folder in source_dirs]
    merged_dir.mkdir(exist_ok=True)

    for group in zip(*files_per_dir):
        inputs = [str(f) for f in group]
        logging.debug(', '.join(inputs))
        target = merged_dir / group[0].name

        merge_videos(
            inputs,
            str(target),
            grid_size=None,                       # (rows, cols); None picks automatically
            titles=None,                          # e.g. [f.name for f in group]
            title_position=(0., 0),               # (x, y) as fractions of the frame: top-left
            font_size=50,
            font_color=(0,0,255),                 # b,g,r -> red
            frame_fill_color=(0,0,0),             # b,g,r colour of empty frames
            max_frames=None,                      # None means all frames; e.g. 200 for the first 200
            # crop_size alternatives: None (no crop), (w, h) centre crop such as (800,800),
            # or an absolute box such as (0,300,720,1000)
            crop_size=(0.25,0.25,0.75,0.5),       # (left,upper,right,lower) in ratio
        )

        if target.exists():
            compressed_dir = target.parent / 'z'
            compressed_dir.mkdir(exist_ok=True)
            compress_video(target, compressed_dir / target.name, crf=18)
def main():
    """Single demo: merge three videos and one empty cell into a 2x2 grid."""
    inputs = ['1.mp4', '2.mp4', None, '3.mp4']  # None -> fill with an empty frame
    captions = ['v1', 'v2', None, 'v3']
    destination = 'merged.mp4'

    merge_videos(
        inputs,
        destination,
        grid_size=(2, 2),                  # rows, cols
        titles=captions,
        title_position=(0.5, 0.),          # (0.5 * w, 0. * h), i.e. upper center
        font_size=100,
        font_color=(255,0,0),              # b,g,r -> blue
        frame_fill_color=(255, 255, 255),  # b,g,r colour of the empty frame -> white
        max_frames=100,                    # merge first 100 frames per video
        crop_size=None,                    # or (w,h) center crop / (left,upper,right,lower)
    )
if __name__ == '__main__':
    # Run the single-grid demo by default; call main_dir() instead for batch mode.
    main()
    # main_dir()
@luuil
Copy link
Author

luuil commented Oct 15, 2021

What this script can do:

image

@luuil
Copy link
Author

luuil commented Jul 4, 2024

The output video is now written with FFmpeg to minimize quality loss.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment