Skip to content

Instantly share code, notes, and snippets.

@willprice
Created December 3, 2021 11:51
Show Gist options
  • Save willprice/9ef9b26945d86ca0328f00284fcd4562 to your computer and use it in GitHub Desktop.
Save willprice/9ef9b26945d86ca0328f00284fcd4562 to your computer and use it in GitHub Desktop.
import argparse
import io
import re
import sys
from pathlib import Path
import random
import subprocess
import shutil
import tempfile
from typing import Iterable, List, Optional
parser = argparse.ArgumentParser(
description="Take a video and create a shuffled version of it",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument("video_in", type=Path)
parser.add_argument("video_out", type=Path)
parser.add_argument("--tmp-dir", type=Path, help="Path to temporary directory to dump frames to")
parser.add_argument("--seed", default=None, type=int, help="Seed for controlling randomness")
FRAME_FILENAME_TEMPLATE = 'frame_%010d.jpg'
def main(argv=None):
args = parser.parse_args(argv)
if args.seed is not None:
random.seed(args.seed)
check_programs_available()
shuffle_video(args.video_in, args.video_out, root_tmp_dir=args.tmp_dir)
def check_programs_available():
if shutil.which("ffmpeg") is None:
print("ffmpeg could not be found, please install it and make sure it is "
"available on your PATH.")
sys.exit(1)
def dump_frames(video_in: Path, dir: Path):
subprocess.check_call([
"ffmpeg", "-i", str(video_in.absolute()),
"-qscale", "3",
str(dir / FRAME_FILENAME_TEMPLATE)
])
def list_frames(dir: Path) -> List[Path]:
return [
Path(p) for p in natsorted([
str(path) for path in dir.iterdir()
if path.suffix.lower() == '.jpg'
])
]
def stitch_frames(shuffled_frame_paths: List[Path], video_out: Path, frame_rate: float):
process = subprocess.Popen([
"ffmpeg", "-y", "-r", str(frame_rate), "-f", "image2pipe", "-i", "-",
"-c:v", "libx264", "-vf", "format=yuv420p",
"-movflags", "faststart", str(video_out)
], stdin=subprocess.PIPE)
for file in shuffled_frame_paths:
with file.open('rb') as f:
process.stdin.write(f.read())
process.stdin.flush()
process.stdin.close()
process.wait()
def sniff_framerate(video: Path) -> float:
fractional_framerate = subprocess.check_output([
"ffprobe", "-v", "0", "-of", "csv=p=0", "-select_streams", "v:0",
"-show_entries", "stream=r_frame_rate", str(video.absolute()),
]).decode('utf8').strip()
return eval(fractional_framerate)
def shuffle_video(video_in: Path, video_out: Path, root_tmp_dir: Optional[Path] = None):
with tempfile.TemporaryDirectory(dir=root_tmp_dir) as tmp_dir:
tmp_dir = Path(tmp_dir)
framerate = sniff_framerate(video_in)
dump_frames(video_in, tmp_dir)
ordered_frame_paths = list_frames(tmp_dir)
shuffled_frame_paths = random.sample(ordered_frame_paths, len(ordered_frame_paths))
stitch_frames(shuffled_frame_paths, video_out, framerate)
def natsorted(xs):
def atoi(text):
return int(text) if text.isdigit() else text
def natural_keys(text):
# http://nedbatchelder.com/blog/200712/human_sorting.html
# (See Toothy's implementation in the comments)
return [atoi(c) for c in re.split(r'(\d+)', text)]
return sorted(xs, key=natural_keys)
if __name__ == '__main__':
main()
@willprice
Copy link
Author

A little program to take a video file and generate a new one after shuffling the frames.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment