Created
January 31, 2022 13:47
-
-
Save pszemraj/39b683b601462b2a3950f2df48a39a18 to your computer and use it in GitHub Desktop.
splits all video files in a directory into chunks of a given duration.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
slice_video_dir.py - splits all video files in a directory into chunks of a given duration. | |
credit to https://gist.github.com/agalera/8cd63429b06e21d1420c6030c6b45c53 | |
""" | |
import argparse | |
import glob | |
import os | |
from moviepy.video.io.VideoFileClip import VideoFileClip | |
from pathlib import Path | |
from joblib import Parallel, delayed | |
from tqdm.auto import tqdm | |
clip_start = 0 | |
num = 0 | |
def write_videofile(video_path, clip_start, duration, output_dir): | |
""" | |
write_videofile - writes a video file to the output directory | |
Parameters | |
---------- | |
video_path : str, path to the video file, including filename | |
clip_start : float, start time of the clip | |
duration : float, duration of the clip | |
output_dir : str, path to the output directory | |
""" | |
_target_video_path = Path(video_path) | |
clip_end = clip_start + duration | |
seq_id = int(clip_start // duration) | |
output_header = _target_video_path.stem + f"_{clip_start}_{clip_end}_ID={seq_id}" | |
try: | |
clip = VideoFileClip(str(_target_video_path.resolve())).subclip(clip_start, clip_end) | |
clip.write_videofile(f"{output_dir}/{output_header}.mp4", | |
threads=1, preset='ultrafast', codec='h264') | |
except Exception as e: | |
print(f"Error: {e}", clip_start, clip_end) | |
def get_parser(): | |
""" | |
get_parser - a helper function for the argparse module | |
""" | |
parser = argparse.ArgumentParser( | |
description="submit a question, GPT model responds" | |
) | |
parser.add_argument( | |
"-i", | |
"--input-dir", | |
required=True, | |
type=str, | |
help="path to directory containing input files", | |
) | |
parser.add_argument( | |
"-o", | |
"--output-dir", | |
required=False, | |
default=None, | |
type=str, | |
help="path to directory to write output files (new folder created). Defaults to input-dir", | |
) | |
parser.add_argument( | |
"-cl", | |
"--clip-length", | |
required=False, | |
default=0.5, | |
type=float, | |
help="length of each clip in seconds", | |
) | |
parser.add_argument( | |
"-r", | |
"--recursive", | |
required=False, | |
default=False, | |
action="store_true", | |
help="whether to load files recursively from the input directory", | |
) | |
return parser | |
if __name__ == "__main__": | |
parser = get_parser() | |
args = parser.parse_args() | |
duration = args.clip_length | |
input_dir = Path(args.input_dir) | |
output_dir = input_dir if args.output_dir is None else Path(args.output_dir) | |
output_dir = output_dir / "sliced-clips" | |
output_dir.mkdir(exist_ok=True) | |
cpu_count = os.cpu_count() | |
# load all video files in the input directory, including all common file extensions | |
extensions = ["*.mp4", "*.mov", "*.avi", "*.mkv", "*.flv", "*.wmv", "*.webm"] | |
video_files = [] | |
for ext in extensions: | |
video_files.extend(glob.glob(str(input_dir / ext))) | |
for video_file in tqdm(video_files, total=len(video_files), desc="Processing files"): | |
clip = VideoFileClip(video_file) # load the video file | |
vid_outputs = output_dir / Path(video_file).stem | |
vid_outputs.mkdir(exist_ok=True) | |
video_length = clip.duration # get the length of the video file | |
clip_starts = [clip_start + i * duration for i in range(int(video_length / duration))] # generate list of clip starts | |
# joblib parallel | |
Parallel(n_jobs=cpu_count)(delayed(write_videofile)(video_file, clip_start, duration, vid_outputs) for clip_start in clip_starts) | |
print("Done!") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment