-
-
Save datavudeja/0d87fd5d305338f1b5004e66d3bb80f9 to your computer and use it in GitHub Desktop.
Video Analysis Tool
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import os | |
import itertools | |
import argparse | |
import json | |
import logging | |
import multiprocessing | |
import pathlib | |
import subprocess | |
import sys | |
from typing import Any, Dict, Generator, Iterable, List, cast | |
import csv | |
import os.path | |
# Configure the root logger so that every message from DEBUG upwards is emitted.
logging.basicConfig(level=logging.DEBUG)

# File suffixes (compared against `pathlib.Path.suffix`) that are treated as
# video containers by the analysis functions in this file.
VIDEO_CONTAINER_EXTENSIONS: List[str] = [
    ".mp4",
    ".webm",
    ".ts",
    ".m4v",
    ".avi",
    ".flv",
]
def walk_for_files(folder: pathlib.Path) -> Generator[pathlib.Path, None, None]:
    """
    Recursively walk the directory tree from the given path and yield every file.

    Symbolic links to directories are followed (`followlinks=True`), and each
    yielded path is absolute with symlinks resolved.

    Args
    ---
    `folder`: the folder to walk
    """
    for src, _, files in os.walk(folder.resolve(), followlinks=True):
        for file_name in files:
            # `Path.resolve()` always returns a path (never None), so resolve
            # once and yield the result directly.
            yield pathlib.Path(src, file_name).resolve()
def get_frame_data(video_path: pathlib.Path) -> Dict[str, List[Any] | str] | None:
    """
    Call FFprobe to lift out all of the frame data from the video file.

    The per-frame dictionaries reported by ffprobe are reshaped into a single
    dictionary of columns: every key of the first video frame maps to the list
    of that key's values across all video frames. The extra key `"name"` holds
    the video path as a string.

    Returns None if the file doesn't have a known video suffix, if ffprobe
    exits with a non-zero status (its stderr is then written to a
    `vat-ffprobe-<file name>.log` file in the current directory), or if
    ffprobe reports no video frames.
    """
    if video_path.suffix not in VIDEO_CONTAINER_EXTENSIONS:
        # `logging.warn` is a deprecated alias; `logging.warning` is the supported call.
        logging.warning(
            f"{video_path} does not have one of the following suffixes: {VIDEO_CONTAINER_EXTENSIONS}. Ignoring"
        )
        return None

    ffprobe_command: List[str] = [
        "ffprobe",
        "-print_format",
        "json",
        "-show_frames",
        str(video_path),
    ]
    job = subprocess.run(ffprobe_command, capture_output=True, encoding="utf-8")
    if job.returncode != 0:
        logfile = f"vat-ffprobe-{video_path.name}.log"
        with open(logfile, "w") as fp:
            fp.write(job.stderr)
        logging.error(f"ffprobe command exited abnormally, stderr logged to {logfile}")
        return None

    frame_data = json.loads(job.stdout)

    # Filter *before* picking the template frame: the first reported frame may
    # belong to a non-video stream, and its values must not seed the columns.
    video_frames = [
        frame
        for frame in frame_data.get("frames", [])
        if frame.get("media_type") == "video"
    ]
    if not video_frames:
        logging.error(f"ffprobe reported no video frames for {video_path}. Ignoring")
        return None

    columns: Dict[str, List[Any]] = {k: [v] for k, v in video_frames[0].items()}
    for frame_entry in video_frames[1:]:
        for key, values in columns.items():
            # Only keys present in the first video frame are tracked; a frame
            # lacking one of them simply contributes nothing to that column.
            if key in frame_entry:
                values.append(frame_entry[key])

    reshaped: Dict[str, List[Any] | str] = dict(columns)
    reshaped["name"] = str(video_path)
    return reshaped
def get_percent_smoothness(timestamps: List[float]) -> float:
    """
    Given a list of timestamps, return the fraction (0.0 - 1.0) of gaps between
    consecutive timestamps that are too small to fit another frame.

    A gap "could fit another frame" when it is at least twice the mean gap;
    such gaps count against smoothness. Note that video which isn't perfectly
    smooth doesn't necessarily mean it's problematic, but less than 70% should
    be concerning.

    With fewer than two timestamps there are no gaps to judge, so the result
    is 1.0.

    Args
    ---
    `timestamps`: The timestamps we'll be processing, could be pts or dts.
    """
    ts_diffs = [later - earlier for earlier, later in zip(timestamps, timestamps[1:])]
    if not ts_diffs:
        # No gaps at all: avoid dividing by zero and report "nothing wrong".
        return 1.0
    expected_gap = sum(ts_diffs) / len(ts_diffs)
    oversized_gaps = sum(1 for diff in ts_diffs if diff >= expected_gap * 2)
    return 1.0 - (oversized_gaps / len(ts_diffs))
def is_monotonically_increasing(timestamps: List[float]) -> bool:
    """
    Report whether every timestamp is strictly greater than the one before it.

    Timestamps that go backwards or repeat could create playback issues or
    indicate corrupted video. An empty or single-element list counts as
    increasing.

    Args
    ---
    `timestamps`: The timestamps we'll be processing, could be pts or dts.
    """
    for earlier, later in zip(timestamps, timestamps[1:]):
        # Written as `not later > earlier` (rather than `later <= earlier`) so
        # that unordered values such as NaN are rejected as well.
        if not later > earlier:
            return False
    return True
def can_decode_successfully(video_path: pathlib.Path) -> bool | None:
    """
    Check if we can decode the `video_path` without the decoder having errors.

    Runs ffmpeg with `-xerror` and the null output format, and returns True
    when ffmpeg exits with status 0, False otherwise. If the file doesn't have
    a known video suffix, return None.
    """
    if video_path.suffix not in VIDEO_CONTAINER_EXTENSIONS:
        # `logging.warn` is a deprecated alias; `logging.warning` is the supported call.
        logging.warning(
            f"{video_path} does not have one of the following suffixes: {VIDEO_CONTAINER_EXTENSIONS}. Ignoring"
        )
        return None

    ffmpeg_command: List[str] = [
        "ffmpeg",
        "-xerror",
        "-i",
        str(video_path),
        "-vf",
        "vfrdet",
        "-f",
        "null",
        "-",
    ]
    job = subprocess.run(ffmpeg_command, capture_output=True, encoding="utf-8")
    return job.returncode == 0
def main(video_folder_path: pathlib.Path, output_csv: pathlib.Path | None) -> None:
    """
    Fetch all of the files in the folder and then run them through a series of tests.

    Exits the process with status -1 if `ffprobe` or `ffmpeg` cannot be found
    on the PATH.

    Args
    ---
    `video_folder_path`: the root of the directory tree we're walking for videos to test
    `output_csv`: the file path that we're going to write our results to, if desired. If None then print to stdout
    """
    # Local import: `shutil` is only needed for this one-off lookup.
    import shutil

    # `shutil.which` applies the platform's own PATH rules (separator,
    # executable bit, extensions) instead of walking every PATH directory.
    missing_tools = [
        tool for tool in ("ffprobe", "ffmpeg") if shutil.which(tool) is None
    ]
    if missing_tools:
        logging.critical(
            f"{', '.join(missing_tools)} binary not found in $PATH, please ensure FFmpeg is installed and reachable from your $PATH"
        )
        sys.exit(-1)

    # Fixed column order, so that the CSV header can be written even when no
    # video produced a result.
    fieldnames = ["file_name", "can_decode_successfully", "playback_smoothness"]

    analysis_results = []
    with multiprocessing.Pool() as p:
        files = walk_for_files(video_folder_path)
        for frame_data in p.imap_unordered(get_frame_data, files):
            if frame_data is None:
                continue
            file_name = cast(str, frame_data["name"])
            analysis_results.append(
                {
                    "file_name": file_name,
                    "can_decode_successfully": can_decode_successfully(
                        pathlib.Path(file_name)
                    ),
                    "playback_smoothness": get_percent_smoothness(
                        [
                            float(ts)
                            for ts in frame_data["best_effort_timestamp_time"]
                        ]
                    ),
                }
            )

    if output_csv is not None:
        # The csv module expects files opened with newline="" so that it
        # controls line endings itself.
        with open(output_csv.resolve(), "w", newline="") as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(analysis_results)
    else:
        for result in analysis_results:
            print(
                "\t".join(
                    [
                        f"File:{result['file_name']}",
                        f"Can Decode?:{'YES' if result['can_decode_successfully'] else 'NO'}",
                        f"Playback Smoothness: {result['playback_smoothness'] * 100.0:0.3f}%",
                    ]
                )
            )
if __name__ == "__main__":
    # Command-line entry point: parse the arguments, validate the input
    # folder, then hand over to `main`.
    # The first positional parameter of ArgumentParser is `prog`, so the
    # tool's title is passed as `description` instead.
    parser = argparse.ArgumentParser(description="Video Analysis Tool")
    parser.add_argument(
        "-i",
        "--input-folder",
        type=pathlib.Path,
        required=True,
        help="The directory to be walked for video files",
    )
    parser.add_argument(
        "-o",
        "--output",
        action="store",
        type=pathlib.Path,
        help="the output CSV file to be generated",
    )
    args = parser.parse_args()
    # Log rather than print: stdout carries the results when no CSV file is given.
    logging.debug(f"Parsed arguments: {args}")

    # check that input is a folder
    if not args.input_folder.is_dir():
        logging.critical(
            f"{args.input_folder} is either not a folder or it doesn't exist! Exiting..."
        )
        sys.exit(-1)

    if args.output is None:
        logging.info("CSV output file not provided, printing to stdout")

    main(args.input_folder, args.output)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment