Created
December 3, 2021 11:51
-
-
Save willprice/9ef9b26945d86ca0328f00284fcd4562 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse
import io
import random
import re
import shutil
import subprocess
import sys
import tempfile
from fractions import Fraction
from pathlib import Path
from typing import Iterable, List, Optional
# Command-line interface. Both positional arguments and --tmp-dir are
# converted to ``Path`` objects by argparse.
parser = argparse.ArgumentParser(
    description="Take a video and create a shuffled version of it",
    formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument("video_in", type=Path, help="Path to the video to read")
parser.add_argument(
    "video_out", type=Path, help="Path to write the shuffled video to"
)
parser.add_argument(
    "--tmp-dir",
    type=Path,
    help="Path to temporary directory to dump frames to",
)
parser.add_argument(
    "--seed",
    default=None,
    type=int,
    help="Seed for controlling randomness",
)

# printf-style pattern handed to ffmpeg, which fills in a zero-padded frame
# number for each extracted frame.
FRAME_FILENAME_TEMPLATE = 'frame_%010d.jpg'
def main(argv=None):
    """Parse the command line and write a frame-shuffled copy of a video.

    Args:
        argv: Argument strings to parse. ``None`` makes argparse fall back to
            the process's own command line.
    """
    args = parser.parse_args(argv)
    # Seed only on request, so runs without --seed keep differing.
    if args.seed is not None:
        random.seed(args.seed)
    check_programs_available()
    shuffle_video(args.video_in, args.video_out, root_tmp_dir=args.tmp_dir)
def check_programs_available():
    """Exit with status 1 unless the required external programs are on PATH.

    This script runs both ``ffmpeg`` (to extract and encode frames) and
    ``ffprobe`` (to read the frame rate), so both are checked up front. The
    first missing program is reported on stderr.
    """
    for program in ("ffmpeg", "ffprobe"):
        if shutil.which(program) is None:
            print(
                "{} could not be found, please install it and make sure it is "
                "available on your PATH.".format(program),
                file=sys.stderr,
            )
            sys.exit(1)
def dump_frames(video_in: Path, dir: Path):
    """Extract the frames of ``video_in`` into ``dir`` as numbered JPEG files.

    Args:
        video_in: Video file to read; passed to ffmpeg as an absolute path.
        dir: Directory that receives the frames, named according to
            ``FRAME_FILENAME_TEMPLATE``.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    subprocess.check_call([
        "ffmpeg", "-i", str(video_in.absolute()),
        # A bare ``-qscale`` is ambiguous to ffmpeg and makes it print a
        # warning; the ``:v`` stream specifier names the video stream.
        "-qscale:v", "3",
        str(dir / FRAME_FILENAME_TEMPLATE),
    ])
def list_frames(dir: Path) -> List[Path]:
    """Return the ``.jpg`` files directly inside ``dir`` in natural order.

    Args:
        dir: Directory to scan (not recursive). The suffix comparison is
            case-insensitive.

    Returns:
        The matching paths, ordered by ``natsorted`` applied to their string
        form so that numbered frames come back in numeric order.
    """
    jpeg_names = [
        str(path) for path in dir.iterdir()
        if path.suffix.lower() == '.jpg'
    ]
    return [Path(name) for name in natsorted(jpeg_names)]
def stitch_frames(shuffled_frame_paths: List[Path], video_out: Path, frame_rate: float):
    """Encode the given frame images, in order, into an H.264 video.

    The image files are streamed to ffmpeg's standard input one after another
    (``image2pipe``), so the frames appear in the output in list order.

    Args:
        shuffled_frame_paths: Image files to encode, in output order.
        video_out: Destination video file; overwritten if present (``-y``).
        frame_rate: Frame rate passed to ffmpeg via ``-r`` ahead of the input.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    command = [
        "ffmpeg", "-y", "-r", str(frame_rate), "-f", "image2pipe", "-i", "-",
        "-c:v", "libx264", "-vf", "format=yuv420p",
        "-movflags", "faststart", str(video_out),
    ]
    process = subprocess.Popen(command, stdin=subprocess.PIPE)
    try:
        # Leaving the ``with`` closes stdin even on error, so ffmpeg always
        # sees end-of-input and the wait below cannot hang.
        with process.stdin as ffmpeg_stdin:
            for frame_path in shuffled_frame_paths:
                ffmpeg_stdin.write(frame_path.read_bytes())
    except BrokenPipeError:
        # ffmpeg went away before reading everything; its exit status,
        # checked below, is the more useful error to report.
        pass
    finally:
        returncode = process.wait()
    if returncode != 0:
        raise subprocess.CalledProcessError(returncode, command)
def sniff_framerate(video: Path) -> float:
    """Return the frame rate of the first video stream of ``video``.

    ffprobe is asked for the stream's ``r_frame_rate``, which it prints as a
    fraction such as ``30000/1001``. The text is parsed with ``Fraction``
    rather than ``eval`` so that output from an external program is never
    executed as Python code.

    Args:
        video: Video file to inspect; passed to ffprobe as an absolute path.

    Returns:
        The frame rate as a float.

    Raises:
        subprocess.CalledProcessError: If ffprobe exits with a non-zero status.
        ValueError: If ffprobe prints nothing or something that is not a
            number or fraction.
        ZeroDivisionError: If the reported fraction has a zero denominator.
    """
    probe_output = subprocess.check_output([
        "ffprobe", "-v", "0", "-of", "csv=p=0", "-select_streams", "v:0",
        "-show_entries", "stream=r_frame_rate", str(video.absolute()),
    ]).decode('utf8')
    # Only the first field matters; tolerate extra lines or a stray CSV
    # separator around it.
    fields = probe_output.replace(",", " ").split()
    if not fields:
        raise ValueError(
            "ffprobe reported no frame rate for {}".format(video)
        )
    return float(Fraction(fields[0]))
def shuffle_video(video_in: Path, video_out: Path, root_tmp_dir: Optional[Path] = None):
    """Write to ``video_out`` a copy of ``video_in`` with its frames shuffled.

    The frames are dumped as images into a temporary directory, reordered
    with the module-level ``random`` generator, and re-encoded at the source
    frame rate. The temporary directory is removed when the function exits.

    Args:
        video_in: Video file to read.
        video_out: Video file to write.
        root_tmp_dir: Directory in which the temporary frame directory is
            created; ``None`` uses the ``tempfile`` default location.
    """
    with tempfile.TemporaryDirectory(dir=root_tmp_dir) as tmp_dir_name:
        frames_dir = Path(tmp_dir_name)
        framerate = sniff_framerate(video_in)
        dump_frames(video_in, frames_dir)
        ordered_frame_paths = list_frames(frames_dir)
        # Sampling the full length yields a shuffled copy. Kept as
        # random.sample (not random.shuffle) so a given seed keeps producing
        # the same frame order.
        shuffled_frame_paths = random.sample(
            ordered_frame_paths, len(ordered_frame_paths)
        )
        stitch_frames(shuffled_frame_paths, video_out, framerate)
def natsorted(xs):
    """Return the strings in ``xs`` as a new list in natural ("human") order.

    Runs of digits are compared by numeric value rather than character by
    character, so ``"frame_2"`` sorts before ``"frame_10"``.

    Args:
        xs: Iterable of strings.

    Returns:
        A new sorted list; items with equal keys keep their input order.
    """
    def natural_keys(text):
        # http://nedbatchelder.com/blog/200712/human_sorting.html
        # (See Toothy's implementation in the comments)
        # Splitting on a capturing group alternates text and digit runs, so
        # odd positions are exactly the ``\d+`` matches. Deciding by position
        # instead of ``str.isdigit()`` avoids calling ``int()`` on characters
        # such as "²" that count as digits but are not valid integers.
        chunks = re.split(r'(\d+)', text)
        return [
            int(chunk) if index % 2 else chunk
            for index, chunk in enumerate(chunks)
        ]
    return sorted(xs, key=natural_keys)
# Run the command-line interface only when executed as a script.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
A little program to take a video file and generate a new one after shuffling the frames.