-
-
Save planetrocky/e9944085d5e101408d8db9fd0c6aaa20 to your computer and use it in GitHub Desktop.
Shows the GOP structure of a video file using ffprobe per-frame output (-show_entries frame=...)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# | |
# pylint: disable=missing-module-docstring, missing-class-docstring, missing-function-docstring | |
# | |
# Shows GOP structure of video file. Useful for checking suitability for HLS and DASH packaging. | |
# Example: | |
# | |
# $ iframe-probe.py myvideo.mp4 | |
# GOP: IPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPP 60 CLOSED | |
# GOP: IPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPP 60 CLOSED | |
# GOP: IPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPP 60 CLOSED | |
# GOP: IPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPP 60 CLOSED | |
# GOP: IPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPP 60 CLOSED | |
# GOP: IPPPPPPPPPPPPPPPPP 18 CLOSED | |
# | |
# Key: | |
# 🟧 I: IDR Frame | |
# 🟨 i: i frame | |
# 🟩 P: p frame | |
# 🟦 B: b frame | |
import sys | |
import errno | |
import os | |
import json | |
import subprocess | |
import shutil | |
import argparse | |
import textwrap | |
import statistics | |
import numpy as np | |
import plotext | |
# Locate the FFmpeg binary once at start-up; the script refuses to run
# without it.
# NOTE(review): the program actually invoked later is ffprobe (and its path
# can be overridden on the command line) -- confirm that probing for
# "ffmpeg" here is the intended prerequisite check.
FFMPEG_PATH = shutil.which("ffmpeg")

if not FFMPEG_PATH:
    print('FFmpeg not found. FFmpeg must be installed and available in PATH.')
    raise SystemExit(1)
class BFrame:
    """A B frame of the analysed video stream.

    Attributes are plain data filled in by the parser:
    ``pts_time`` is the frame's presentation timestamp and ``json_data``
    is the raw ffprobe record the frame was built from.
    """

    def __init__(self):
        self.pts_time = 0
        self.json_data = {}

    def __format__(self, format_spec="ascii"):
        """Render the frame as ``"ascii"``, ``"emoji"`` or (anything else) ANSI.

        ``format(frame)`` and ``f"{frame}"`` pass an empty spec, so the empty
        string is treated like ``"ascii"``; otherwise the declared default
        would never apply and colour escapes would leak into plain output.
        """
        if format_spec in ("", "ascii"):
            return repr(self)
        if format_spec == "emoji":
            return "🟦"
        # Any other spec (e.g. "ansi") yields the coloured letter.
        return (
            "\033[38;5;63m"
            "B"
            "\033[0m"
        )

    def __repr__(self):
        return "B"

    def __str__(self):
        return repr(self)
class PFrame:
    """A P frame of the analysed video stream.

    Attributes are plain data filled in by the parser:
    ``pts_time`` is the frame's presentation timestamp and ``json_data``
    is the raw ffprobe record the frame was built from.
    """

    def __init__(self):
        self.pts_time = 0
        self.json_data = {}

    def __format__(self, format_spec="ascii"):
        """Render the frame as ``"ascii"``, ``"emoji"`` or (anything else) ANSI.

        ``format(frame)`` and ``f"{frame}"`` pass an empty spec, so the empty
        string is treated like ``"ascii"``; otherwise the declared default
        would never apply and colour escapes would leak into plain output.
        """
        if format_spec in ("", "ascii"):
            return repr(self)
        if format_spec == "emoji":
            return "🟩"
        # Any other spec (e.g. "ansi") yields the coloured letter.
        return (
            "\033[38;5;76m"
            "P"
            "\033[0m"
        )

    def __repr__(self):
        return "P"

    def __str__(self):
        return repr(self)
class IFrame:
    """An I frame of the analysed video stream.

    ``key_frame`` distinguishes the two flavours shown in the output key:
    ``True`` renders as ``I`` (IDR frame), ``False`` as ``i``.
    ``pts_time`` is the frame's presentation timestamp and ``json_data``
    is the raw ffprobe record the frame was built from.
    """

    def __init__(self):
        self.key_frame = False
        self.pts_time = 0
        self.json_data = {}

    def __format__(self, format_spec="ascii"):
        """Render the frame as ``"ascii"``, ``"emoji"`` or (anything else) ANSI.

        ``format(frame)`` and ``f"{frame}"`` pass an empty spec, so the empty
        string is treated like ``"ascii"``; otherwise the declared default
        would never apply and colour escapes would leak into plain output.
        """
        if format_spec in ("", "ascii"):
            return repr(self)
        if self.key_frame:
            if format_spec == "emoji":
                return "🟧"
            return (
                '\033[48;5;166;38;5;255;1m'
                'I'
                '\033[0m'
            )
        if format_spec == "emoji":
            return "🟨"
        return (
            '\033[48;5;130;38;5;255m'
            'i'
            '\033[0m'
        )

    def __repr__(self):
        if self.key_frame:
            return "I"
        return "i"

    def __str__(self):
        return repr(self)
class GOP:
    """An ordered group of frames, as collected through ``add_frame``.

    ``closed`` becomes ``True`` as soon as an ``IFrame`` flagged as a key
    frame has been added; ``frames`` keeps the frames in insertion order.
    """

    def __init__(self):
        self.closed = False
        self.frames = []

    def add_frame(self, frame):
        """Append *frame* and mark the GOP closed if it is a key I frame."""
        self.frames.append(frame)
        if isinstance(frame, IFrame) and frame.key_frame:
            self.closed = True

    def __format__(self, format_spec="ascii"):
        """Render the GOP line for the given output format.

        ``"ascii"`` (and the empty spec passed by ``format(gop)`` /
        ``f"{gop}"``) returns ``repr(self)``.  Any other spec produces the
        coloured line: label, OPEN/CLOSED tag, frame count, the timestamp
        of the first frame and every frame formatted with the same spec.
        """
        if format_spec in ("", "ascii"):
            return repr(self)
        frames_repr = ''.join(format(frame, format_spec) for frame in self.frames)
        if self.closed:
            gtype = '\033[48;5;22m' + ' CLOSED ' + '\033[0m'
        else:
            gtype = '\033[48;5;88m' + ' OPEN ' + '\033[0m'
        # An empty GOP has no first frame; show a zero timestamp rather
        # than failing with IndexError.
        first_pts_time = self.frames[0].pts_time if self.frames else 0.0
        return (
            f'\033[48;5;18m'
            f'GOP:'
            f'\033[0m'
            f' {gtype}'
            f' {len(self.frames):3d}'
            f' {first_pts_time:9.3f}'
            f' {frames_repr}'
        )

    def __repr__(self):
        frames_repr = ''.join(repr(frame) for frame in self.frames)
        gtype = 'CLOSED' if self.closed else ' OPEN '
        return (
            f'GOP:'
            f' {gtype}'
            f' {len(self.frames):3d}'
            f' {frames_repr}'
        )

    def __str__(self):
        return repr(self)
def process_ffmpeg_output(frames_json):
    """Group ffprobe frame records into a list of ``GOP`` objects.

    Args:
        frames_json: iterable of dicts, one per frame, carrying the
            ``media_type``, ``pict_type``, ``key_frame`` and ``pts_time``
            entries requested from ffprobe.

    Returns:
        A list of ``GOP`` objects in stream order.  The list always holds
        at least one GOP; it is empty of frames when no usable video frame
        was seen.  A new GOP is started at every I frame, except when the
        current GOP has no frames yet.

    Records that are not video, or whose picture type is not I, P or B
    (or is absent), are skipped instead of crashing on a ``None`` frame.
    A missing ``pts_time`` leaves the frame's timestamp at its default.
    """
    frame_classes = {'I': IFrame, 'P': PFrame, 'B': BFrame}
    gop = GOP()
    gops = [gop]
    for jframe in frames_json:
        if jframe.get("media_type") != "video":
            continue
        frame_class = frame_classes.get(jframe.get("pict_type"))
        if frame_class is None:
            # Unknown or unreported picture type: nothing to display.
            continue
        frame = frame_class()
        if frame_class is IFrame:
            if gop.frames:
                # GOP open and new iframe. Time to close GOP
                gop = GOP()
                gops.append(gop)
            if jframe.get("key_frame") == 1:
                frame.key_frame = True
        # The timestamp entry may be absent when ffprobe has no value for it.
        pts_time = jframe.get("pts_time")
        if pts_time:
            frame.pts_time = float(pts_time)
        frame.json_data = jframe
        gop.add_frame(frame)
    return gops
def print_gops(gops, print_format):
    """Write one line per GOP to stdout.

    The ``"ascii"`` format prints each GOP's ``repr``; every other format
    name is handed to ``format()`` as the format spec.
    """
    plain = print_format == "ascii"
    for entry in gops:
        line = repr(entry) if plain else format(entry, print_format)
        print(line)
def print_gop_pts_time(gops, print_format):
    """Print and return the timestamp gap between consecutive GOP starts.

    The start of each GOP is the ``pts_time`` of its first frame; the
    sequence is seeded with ``0.0`` so the first delta is the first GOP's
    own start time.

    Args:
        gops: GOP objects exposing a ``frames`` list whose items have a
            ``pts_time`` attribute.
        print_format: ``"ascii"`` for plain lines, anything else for
            ANSI-coloured lines.

    Returns:
        The numpy array produced by ``np.diff`` over the start times
        (empty when no GOP holds a frame).
    """
    # GOPs without frames have no start time; skipping them avoids an
    # IndexError when the input contained no video frames at all.
    pts_times = [float(0.0)]
    pts_times.extend(gop.frames[0].pts_time for gop in gops if gop.frames)
    pts_times_delta = np.diff(pts_times)
    for i, pts_time_delta in enumerate(pts_times_delta):
        if print_format == "ascii":
            print((
                f"GOP {i:3d}"
                f" PTS {pts_time_delta:8.3f}"
            ))
        else:
            print((
                f"\033[48;5;18m"
                f"GOP:"
                f"\033[0m"
                f" {i:3d}"
                f" \033[48;5;22m"
                f"PTS-delta:"
                f"\033[0m"
                f" {pts_time_delta:+6.4f}"
            ))
    return pts_times_delta
class UltimateHelpFormatter(argparse.RawTextHelpFormatter,
                            argparse.ArgumentDefaultsHelpFormatter):
    """Help formatter combining both argparse base formatters.

    It adds no behaviour of its own; it only mixes
    ``RawTextHelpFormatter`` with ``ArgumentDefaultsHelpFormatter`` so a
    single class can be passed as ``formatter_class``.
    """
def _build_arg_parser():
    """Create the command-line parser for the GOP probe."""
    parser = argparse.ArgumentParser(
        description=textwrap.dedent(
            """\
            dump GOP structure of video file
            key:
            🟧 or I: IDR frame
            🟨 or i: i frame
            🟩 or P: p frame
            🟦 or B: b frame
            """),
        formatter_class=UltimateHelpFormatter,
    )
    parser.add_argument(
        "filename",
        help="video file to parse",
    )
    parser.add_argument(
        "-e",
        "--ffprobe-exec",
        dest="ffprobe_exec",
        help="ffprobe executable. (default: %(default)s)",
        default="ffprobe",
    )
    parser.add_argument(
        "-d",
        "--duration",
        default=30,
        help="duration (default: %(default)s), set to 0 to scan entire file",
    )
    parser.add_argument(
        "-f",
        "--format",
        choices=["ascii", "ansi", "emoji"],
        default="ascii",
        help="output formatting",
    )
    parser.add_argument(
        "-p",
        "--pts_time",
        action="store_true",
        help="show gop pts time deltas",
    )
    parser.add_argument(
        "-v",
        "--verbose",
        action="store_true",
        help="verbose mode",
    )
    parser.add_argument(
        "-g",
        "--grid",
        action="store_true",
        help="add grid to plot",
    )
    return parser


def _ffprobe_command(ffprobe_exec, filename, read_duration=None):
    """Build the ffprobe argument list; *read_duration* limits the scan."""
    command = [
        ffprobe_exec,
        "-hide_banner",
        "-loglevel", "error",
        "-threads", "4",
        "-select_streams", "v:0",
        "-show_entries", "frame=media_type,key_frame,pict_type,pts,pts_time",
        "-print_format", "json",
    ]
    # @TODO convert to CSV to remove very slow JSON processing
    # -output_format csv=p=0
    if read_duration is not None:
        command += ["-read_intervals", f"%+{read_duration}"]
    command.append(filename)
    return command


def _print_statistics(units, values, exact_spec):
    """Print min/mean/max/mode/median/stdev of *values* under a heading."""
    # statistics.stdev needs at least two samples; a single GOP has no spread.
    spread = statistics.stdev(values) if len(values) > 1 else 0.0
    print(f"\033[48;5;18mStatistics (units={units}):\033[0m")
    print(f"\033[48;5;22mmin :\033[0m {min(values):{exact_spec}}")
    print(f"\033[48;5;22mmean :\033[0m {statistics.mean(values):+6.4f} (±{spread:6.4f})")
    print(f"\033[48;5;22mmax :\033[0m {max(values):{exact_spec}}")
    print(f"\033[48;5;22mmode :\033[0m {statistics.mode(values):{exact_spec}}")
    print(f"\033[48;5;22mmedian:\033[0m {statistics.median(values):+6.4f}")
    print(f"\033[48;5;22mstdev :\033[0m {spread:+6.4f}")


def main():
    """Command-line entry point: probe a video file and report its GOPs.

    Runs ffprobe on the given file, groups the reported frames into GOPs
    and either lists them (default) or, with ``--pts_time``, prints the
    GOP start-time deltas, summary statistics and a GOP-size histogram.

    Raises:
        FileNotFoundError: if the given file name is not an existing file.

    Exits with status 1 when no file name is given, when ffprobe cannot be
    run or fails, or when ffprobe reports no video frames; an unparsable
    ``--duration`` is reported through the argument parser.
    """
    parser = _build_arg_parser()
    args = parser.parse_args()

    if not args.filename:
        parser.print_help()
        print("\033[48;5;88m No filename provided! \033[0m")
        sys.exit(1)

    if not os.path.isfile(args.filename):
        raise FileNotFoundError(
            errno.ENOENT,
            os.strerror(errno.ENOENT),
            args.filename
        )

    # Accept fractional durations (e.g. "2.5") instead of crashing in int().
    try:
        limited = float(args.duration) > 0
    except ValueError:
        parser.error(f"invalid duration: {args.duration!r}")

    print_format = args.format.lower()

    if limited:
        print(f"Analyzing first \033[4m{args.duration}s\033[24m of file...")
    else:
        print("Analyzing entire file...")

    # Pass an argument list and no shell, so file names containing quotes,
    # spaces or shell metacharacters reach ffprobe unchanged.
    command = _ffprobe_command(
        args.ffprobe_exec,
        args.filename,
        args.duration if limited else None,
    )
    try:
        response_json = subprocess.check_output(command)
    except subprocess.CalledProcessError as e:
        output = e.output.decode(errors="replace")
        print(f'\033[31mERROR: ({e.returncode}) {output} {e}\033[0m')
        sys.exit(1)
    except OSError as e:
        # Without a shell, a missing or non-executable ffprobe surfaces here.
        print(f'\033[31mERROR: cannot run "{args.ffprobe_exec}": {e}\033[0m')
        sys.exit(1)

    frames_json = json.loads(response_json).get("frames", [])
    gops = process_ffmpeg_output(frames_json)

    # Everything below indexes the first frame of each GOP or takes
    # min/max of the results, so stop early when nothing was decoded.
    if not any(gop.frames for gop in gops):
        print("\033[31mERROR: no video frames found\033[0m")
        sys.exit(1)

    if not args.pts_time:
        if args.verbose:
            print("\033[4mGOP Frame Analysis:\033[24m\n")
        print_gops(gops, print_format)
        return

    if args.verbose:
        print("\033[4mGOP PTS Time Delta:\033[24m\n")
    # Plain floats keep the statistics module independent of numpy scalars.
    pts_times_delta = [float(d) for d in print_gop_pts_time(gops, print_format)]
    _print_statistics("seconds", pts_times_delta, "+6.4f")
    print()

    gop_sizes = [len(x.frames) for x in gops]
    if args.verbose:
        print(f"GOP sizes: {gop_sizes}")
        print(f"GOP lengths: {sorted(set(gop_sizes))}")
    _print_statistics("frames", gop_sizes, "+4d")

    width = plotext.terminal_width()
    # When every GOP has the same size the spread is 0; a histogram still
    # needs at least one bin.
    bins = max(1, min(width, max(gop_sizes) - min(gop_sizes)))
    if args.verbose:
        print(f"{width=} {bins=}")
    plotext.theme('pro')
    plotext.grid(args.grid, args.grid)
    plotext.plot_size(width, 20)
    plotext.hist(data=gop_sizes, bins=bins)
    if limited:
        plotext.title(f"Histogram Plot first {args.duration}s")
    else:
        plotext.title("Histogram Plot of entire file")
    plotext.xlabel("GOP size")
    plotext.show()
# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment