Last active
May 1, 2019 06:17
-
-
Save ageis/322acd831078058d5938ec3c79a6895a to your computer and use it in GitHub Desktop.
Extract JPEG frames from video while applying optional magnification/cropping/sharpening/contrast enhancement; specify the duration/section or use keyframes/one frame per second, etc.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
# coding: utf-8 | |
from __future__ import unicode_literals | |
from __future__ import print_function | |
import argparse | |
import re | |
import json | |
import os | |
import signal | |
import sys | |
import numpy as np | |
import subprocess | |
from imutils.video import FileVideoStream | |
from imutils.video import FPS | |
from pick import pick | |
import imutils | |
import time | |
import cv2 | |
from MediaInfo import MediaInfo | |
import traceback | |
if sys.version_info < (3, 0): | |
import Queue as queue | |
input = raw_input | |
else: | |
import queue | |
unicode = str | |
import chardet | |
import sh | |
import tqdm | |
class MediaInfo:
    """Collect basic container/video metadata via the ``mediainfo`` or ``ffprobe`` CLI.

    Keyword Args:
        filename: Path of the media file to inspect (``""`` when omitted).
        cmd: Path of the ``mediainfo`` or ``ffprobe`` executable.  When omitted,
            the directories listed in ``PATH`` are searched in order and the
            first one containing either tool wins (``mediainfo`` is preferred
            when both live in the same directory).  ``""`` if none is found.

    Attributes:
        filename: The path given at construction time.
        cmd: The executable that ``getInfo`` will run.
        info: Dict filled in by ``getInfo``; empty until then.
    """

    # Tool names, in order of preference inside a single PATH directory.
    _TOOLS = ("mediainfo", "ffprobe")

    def __init__(self, **kwargs):
        self.filename = kwargs.get("filename")
        self.cmd = kwargs.get("cmd")
        self.info = dict()
        if self.filename is None:
            self.filename = ""
        if self.cmd is None:
            self.cmd = self._findTool()

    @classmethod
    def _findTool(cls):
        """Return the first ``mediainfo``/``ffprobe`` found on ``PATH``, else ``""``."""
        for cmdpath in os.environ.get("PATH", "").split(os.pathsep):
            if not cmdpath:
                continue
            for tool in cls._TOOLS:
                candidate = os.path.join(cmdpath, tool)
                if os.path.isfile(candidate):
                    return candidate
        return ""

    @staticmethod
    def _runTool(args, cwd=None):
        """Run *args* without a shell and return its decoded stdout, or None on failure."""
        try:
            outputBytes = subprocess.check_output(args, cwd=cwd)
        except (subprocess.CalledProcessError, OSError):
            return None
        return outputBytes.decode("utf-8", "replace")

    def getInfo(self):
        """Run the configured tool and return the parsed metadata dict.

        Returns:
            ``None`` when either the media file or the executable does not
            exist; otherwise ``self.info`` (left empty when the tool is neither
            ``ffprobe`` nor ``mediainfo``, or when it fails).
        """
        if not os.path.exists(self.filename) or not os.path.exists(self.cmd):
            return None
        cmdName = os.path.basename(self.cmd)
        if cmdName == "ffprobe":
            self._ffprobeGetInfo()
        elif cmdName == "mediainfo":
            self._mediainfoGetInfo()
        return self.info

    def _ffprobeGetInfo(self):
        """Query ffprobe for JSON stream/format data and store the parsed result."""
        # An argument list (no shell) keeps filenames with spaces or shell
        # metacharacters intact and cannot be used for command injection.
        outputText = self._runTool([
            self.cmd,
            "-v", "error",
            "-loglevel", "quiet",
            "-select_streams", "v:0",
            "-show_format",
            "-show_streams",
            "-show_error",
            "-print_format", "json",
            "-i", self.filename,
        ])
        if outputText is None:
            return
        self.info = self._ffprobeGetInfoJson(outputText)

    def _ffprobeGetInfoJson(self, sourceString):
        """Parse ffprobe's JSON output into the flat metadata dict.

        Returns an empty dict when *sourceString* is not a JSON object.  The
        ``duration`` key is always present for valid JSON (``None`` when the
        ``format`` section is missing); the ``video*`` keys are only present
        when a stream with ``codec_type == "video"`` exists.
        """
        mediaInfo = dict()
        try:
            infoDict = json.loads(sourceString)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            return mediaInfo
        if not isinstance(infoDict, dict):
            return mediaInfo
        mediaInfo["duration"] = (infoDict.get("format") or {}).get("duration")
        # Use the stream object itself: its "index" field is the index inside
        # the container, not its position in this (possibly filtered) list.
        videoStream = None
        for item in infoDict.get("streams") or []:
            if item.get("codec_type") == "video":
                videoStream = item
                break
        if videoStream is not None:
            mediaInfo["haveVideo"] = True
            mediaInfo["videoDuration"] = videoStream.get("duration")
            mediaInfo["videoWidth"] = videoStream.get("width")
            mediaInfo["videoHeight"] = videoStream.get("height")
            mediaInfo["videoFrameRate"] = videoStream.get("r_frame_rate")
            # nb_read_frames is only reported when ffprobe is asked to count
            # frames; fall back to the container-declared nb_frames if present.
            mediaInfo["videoFrameCount"] = (
                videoStream.get("nb_read_frames") or videoStream.get("nb_frames")
            )
        return mediaInfo

    def _mediainfoGetInfo(self):
        """Run ``mediainfo -f`` from the file's directory and store the parsed result."""
        workDir = os.path.abspath(os.path.dirname(self.filename))
        baseName = os.path.basename(self.filename)
        # cwd= runs the tool inside the file's directory without changing the
        # working directory of this process.
        outputText = self._runTool([self.cmd, "-f", baseName], cwd=workDir)
        if outputText is None:
            return
        self.info = self._mediainfoGetInfoRegex(outputText)

    def _mediainfoGetInfoRegex(self, sourceString):
        """Extract fields from mediainfo's text report with regular expressions.

        Only fields that are actually found are added to the result.  Duration
        values are divided by 1000 before being stored as strings (assumes the
        first ``Duration`` line of ``-f`` output is in milliseconds -- TODO
        confirm against the installed mediainfo version).
        """
        mediaInfo = dict()
        general = re.search(r"(^General\n.*?\n\n)", sourceString, re.S)
        if general:
            generalInfo = general.group(0)
            container = re.search(r"Format\s*:\s*([\w\-\\/. ]+)\n", generalInfo, re.S)
            fileSize = re.search(r"File size\s*:\s*(\d+)\.?\d*\n", generalInfo, re.S)
            duration = re.search(r"Duration\s*:\s*(\d+)\.?\d*\n", generalInfo, re.S)
            if container:
                mediaInfo["container"] = container.group(1)
            if fileSize:
                mediaInfo["fileSize"] = fileSize.group(1)
            if duration:
                mediaInfo["duration"] = str(float(duration.group(1)) / 1000)
        video = re.search(r"(\nVideo[\s#\d]*\n.*?\n\n)", sourceString, re.S)
        if video:
            mediaInfo["haveVideo"] = True
            videoInfo = video.group(0)
            videoDuration = re.search(r"Duration\s*:\s*(\d+)\.?\d*\n", videoInfo, re.S)
            videoWidth = re.search(r"Width\s*:\s*(\d+)\n", videoInfo, re.S)
            videoHeight = re.search(r"Height\s*:\s*(\d+)\n", videoInfo, re.S)
            videoFrameRate = re.search(r"Frame rate\s*:\s*([\d\.]+)\n", videoInfo, re.S)
            videoFrameCount = re.search(r"Frame count\s*:\s*(\d+)\.?\d*\n", videoInfo, re.S)
            if videoDuration:
                mediaInfo["videoDuration"] = str(float(videoDuration.group(1)) / 1000)
            if videoWidth:
                mediaInfo["videoWidth"] = int(videoWidth.group(1))
            if videoHeight:
                mediaInfo["videoHeight"] = int(videoHeight.group(1))
            if videoFrameRate:
                mediaInfo["videoFrameRate"] = videoFrameRate.group(1)
            if videoFrameCount:
                mediaInfo["videoFrameCount"] = videoFrameCount.group(1)
        return mediaInfo
class ProgressNotifier(object):
    """Callable sink for ffmpeg's stderr that drives a tqdm progress bar.

    An instance is called once per chunk of stderr text.  Chunks are
    accumulated into lines; each completed line is scanned for the input
    duration, the source file name, the stream frame rate and the current
    ``time=`` position, which is then reflected in the progress bar.

    Use as a context manager so that the bar is closed on exit.

    Args:
        file: Stream the progress bar is written to (defaults to ``sys.stderr``).
    """

    _DURATION_RX = re.compile(r"Duration: (\d{2}):(\d{2}):(\d{2})\.\d{2}")
    _PROGRESS_RX = re.compile(r"time=(\d{2}):(\d{2}):(\d{2})\.\d{2}")
    _SOURCE_RX = re.compile(r"from '(.*)':")
    # Any number of integer digits with an optional fraction ("5", "29.97",
    # "120").  The lookahead rejects the "123 fps=" counter that appears in
    # progress lines, where the number in front of "fps" is the frame count.
    _FPS_RX = re.compile(r"(\d+(?:\.\d+)?) fps(?!=)")

    @staticmethod
    def _seconds(hours, minutes, seconds):
        """Convert hour/minute/second fields (strings or ints) to whole seconds."""
        return (int(hours) * 60 + int(minutes)) * 60 + int(seconds)

    def __init__(self, file=None):
        self.lines = []       # every completed line seen so far
        self.line_acc = []    # characters of the line currently being read
        self.duration = None  # input duration in seconds, once known
        self.source = None    # base name of the input file, once known
        self.started = False
        self.pbar = None      # created lazily on the first progress line
        self.fps = None       # rounded stream frame rate, once known
        self.file = file or sys.stderr

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if self.pbar is not None:
            self.pbar.close()

    def __call__(self, char, stdin):
        """Consume one chunk of stderr output.

        Args:
            char: Text (or bytes, which are decoded) read from the process.
            stdin: Queue feeding the process' stdin; used to forward the
                user's answer when a ``[y/N]`` prompt is detected.
        """
        if not isinstance(char, unicode):
            # chardet may report no encoding at all for short inputs.
            encoding = chardet.detect(char)["encoding"] or "utf-8"
            char = unicode(char, encoding, "replace")
        if char in "\r\n":
            line = self.newline()
            if self.duration is None:
                self.duration = self.get_duration(line)
            if self.source is None:
                self.source = self.get_source(line)
            if self.fps is None:
                self.fps = self.get_fps(line)
            self.progress(line)
        else:
            self.line_acc.append(char)
            if self.line_acc[-6:] == list("[y/N] "):
                # Interactive prompt: show it, then relay the user's reply.
                print("".join(self.line_acc), end="")
                stdin.put(input() + "\n")
                self.newline()

    def newline(self):
        """Finish the current line, record it in ``lines`` and return it."""
        line = "".join(self.line_acc)
        self.lines.append(line)
        self.line_acc = []
        return line

    def get_fps(self, line):
        """Return the rounded frame rate announced in *line*, or None."""
        search = self._FPS_RX.search(line)
        if search is not None:
            return round(float(search.group(1)))
        return None

    def get_duration(self, line):
        """Return the ``Duration:`` value in *line* as whole seconds, or None."""
        search = self._DURATION_RX.search(line)
        if search is not None:
            return self._seconds(*search.groups())
        return None

    def get_source(self, line):
        """Return the base name of the input file named in *line*, or None."""
        search = self._SOURCE_RX.search(line)
        if search is not None:
            return os.path.basename(search.group(1))
        return None

    def progress(self, line):
        """Advance the progress bar if *line* carries a ``time=`` position."""
        search = self._PROGRESS_RX.search(line)
        if search is None:
            return
        total = self.duration
        current = self._seconds(*search.groups())
        unit = " seconds"
        if self.fps is not None:
            unit = " frames"
            current *= self.fps
            # The duration may be unknown; the bar then runs without a total
            # instead of failing on None * fps.
            if total is not None:
                total *= self.fps
        if self.pbar is None:
            self.pbar = tqdm.tqdm(
                desc=self.source,
                file=self.file,
                total=total,
                dynamic_ncols=True,
                unit=unit,
                ncols=0,
            )
        self.pbar.update(current - self.pbar.n)
def calculate_fps(filename): | |
if not os.path.exists(filename): | |
sys.stderr.write("ERROR: filename %r was not found!" % (filename)) | |
sys.exit(1) | |
fps_results = [] | |
fvs = FileVideoStream(filename).start() | |
timer = FPS().start() | |
timeout = time.time() + 1 | |
while fvs.more(): | |
fvs.read() | |
timer.update() | |
if time.time() >= timeout: | |
timer.stop() | |
fvs.stop() | |
fps_results.append({'OpenCV file video stream': int(np.round(timer.fps()))}) | |
break | |
vid = cv2.VideoCapture(filename) | |
cv2.namedWindow = os.path.basename(sys.argv[0]) | |
if not vid.isOpened(): | |
sys.stderr.write("ERROR: Cannot read video file %r" % (filename)) | |
sys.exit(1) | |
opencv_fps = vid.get(cv2.CAP_PROP_FPS) | |
fps_results.append({'OpenCV video capture properties': int(opencv_fps)}) | |
# tracker = cv2.TrackerBoosting_create() | |
# bbox = cv2.selectROI(frame, False) | |
# tracker.init(frame, bbox) | |
timer = FPS().start() | |
timeout = time.time() + 1 | |
while vid.isOpened(): | |
frame = vid.read() | |
# ticks = cv2.getTickCount() | |
timer.update() | |
# frames = cv2.getTickFrequency() / (cv2.getTickCount() - ticks) | |
cv2.waitKey(1) | |
if time.time() >= timeout: | |
timer.stop() | |
vid.release() | |
cv2.destroyAllWindows() | |
break | |
fps_results.append({'OpenCV video capture approximation': int(np.round(timer.fps()))}) | |
media_info = query_mediainfo(filename=filename, method='/usr/bin/mediainfo') | |
ffprobe_info = query_mediainfo(filename=filename, method='/usr/bin/ffprobe') | |
#fps_results.append({'MediaInfo': str(int(media_info['videoFrameRate'].split('.')[0]))}) | |
#fps_results.append({'ffprobe': str(int(ffprobe_info['videoFrameRate'].split('/')[0]))}) | |
fps_rates = [] | |
for measurement in fps_results: | |
for num in measurement.values(): | |
fps_rates.append(int(num)) | |
custom_string = 'Select this option to input custom FPS value.' | |
auto_string = 'Select this option to automatically use the average of all framerates.' | |
fps_results.append(custom_string) | |
fps_results.append(auto_string) | |
title = 'Please choose the correct frames per second (FPS) from these sources:' | |
selected = pick(fps_results, title, multi_select=False, min_selection_count=1) | |
print(selected[1]) | |
#sys.exit(0) | |
if selected[1] == 5: | |
fps_rate = input('Frames per second: ') | |
return fps_rate | |
elif selected[1] == 6: | |
avg_fps = int(np.round(float(sum(fps_rates))/len(fps_rates))) | |
else: | |
return selected[0] | |
def recurse_keys(df, indent=' '):
    """Print the keys of a nested mapping, one per line.

    Each key is printed prefixed by *indent*; the keys of any value that is a
    ``dict`` are printed right after their parent with one extra space of
    indentation.

    Args:
        df: Mapping whose keys are listed.
        indent: Prefix used for the keys at the current nesting level.
    """
    for name, value in df.items():
        print(indent + str(name))
        if not isinstance(value, dict):
            continue
        recurse_keys(value, indent + ' ')
def get_frame_rate(filename):
    """Return the average frame rate of the first video stream, rounded to an int.

    ffprobe reports ``avg_frame_rate`` as a fraction such as ``30000/1001``;
    the fraction is evaluated (not just its numerator) before rounding, so the
    example yields 30.

    Args:
        filename: Path of the video file.

    Returns:
        The rounded frame rate as an ``int``.

    Raises:
        ValueError: ffprobe printed no usable frame rate.
        subprocess.CalledProcessError: ffprobe exited with an error.

    Exits with status 1 (after writing to stderr) when the file does not exist.
    """
    if not os.path.exists(filename):
        sys.stderr.write("ERROR: filename %r was not found!" % (filename,))
        sys.exit(1)
    out = subprocess.check_output(
        [
            'ffprobe',
            '-v', 'error',
            '-select_streams', 'v:0',
            '-show_entries', 'stream=avg_frame_rate',
            # Bare value only: no section wrappers, no "key=" prefix.
            '-of', 'default=noprint_wrappers=1:nokey=1',
            filename,
        ],
        encoding='utf8',
    )
    for line in str(out).splitlines():
        numerator, _, denominator = line.strip().partition('/')
        try:
            rate = float(numerator) / float(denominator) if denominator else float(numerator)
        except (ValueError, ZeroDivisionError):
            # Blank line, "N/A" or "0/0": try the next line, if any.
            continue
        if rate > 0:
            return int(round(rate))
    raise ValueError("ffprobe reported no usable frame rate for %r: %r" % (filename, out))
def get_dimensions(filename):
    """Return the frame size of a video as reported by OpenCV.

    Args:
        filename: Path of the video file.

    Returns:
        A ``(height, width)`` tuple of ints -- note the order: height first.

    Exits with status 1 (after writing to stderr) when the file does not exist.
    """
    if not os.path.exists(filename):
        sys.stderr.write("ERROR: filename %r was not found!" % (filename,))
        sys.exit(1)
    vid = cv2.VideoCapture(filename)
    try:
        height = int(vid.get(cv2.CAP_PROP_FRAME_HEIGHT))
        width = int(vid.get(cv2.CAP_PROP_FRAME_WIDTH))
    finally:
        # Release the capture handle instead of leaving it to garbage collection.
        vid.release()
    return (height, width)
def query_mediainfo(**kwargs):
    """Build a ``MediaInfo`` for a file/tool pair and return its parsed metadata.

    Keyword Args:
        filename: Path of the media file to inspect.
        method: Path of the ``mediainfo`` or ``ffprobe`` executable to use.

    Returns:
        Whatever ``MediaInfo.getInfo()`` returns for that pair.

    If constructing ``MediaInfo`` fails, the traceback is printed to stdout and
    the process exits with status 1.
    """
    fn = kwargs.get('filename')
    method = kwargs.get('method')
    try:
        info = MediaInfo(filename=fn, cmd=method)
    except Exception:
        # Not a bare "except:", so Ctrl-C and SystemExit still propagate.
        traceback.print_exc(file=sys.stdout)
        sys.exit(1)
    return info.getInfo()
def test_dimension_string(input):
    """Return True when *input* is exactly a ``W:H:X:Y`` crop specification.

    The whole string must consist of four colon-separated non-negative
    integers (e.g. ``800:600:0:0``).  Anything before or after is rejected so
    that no extra text can ride along into the ffmpeg filter graph.

    Args:
        input: Candidate crop string.

    Returns:
        ``True`` or ``False``.
    """
    return re.match(r"\d+:\d+:\d+:\d+\Z", input) is not None


# The "test_" prefix is part of this validator's public name; flag it so
# pytest-style collectors do not mistake it for a test case.
test_dimension_string.__test__ = False
def is_duration_string(input):
    """Return True when *input* is a valid ``HH:MM:SS`` or ``HH:MM:SS.fff`` time.

    The whole string must match; the optional fraction may have one to six
    digits.  Field ranges are checked by ``time.strptime`` (so e.g. minute 61
    or hour 25 is rejected).

    Args:
        input: Candidate time/duration string.

    Returns:
        ``True`` or ``False``.
    """
    shape = re.match(r"\d{2}:\d{2}:\d{2}(\.\d{1,6})?\Z", input)
    if shape is None:
        return False
    layout = '%H:%M:%S.%f' if shape.group(1) else '%H:%M:%S'
    try:
        time.strptime(input, layout)
    except ValueError:
        return False
    return True
def main(argv=None, stream=sys.stderr):
    """Parse the command line, build the ffmpeg invocation and run it.

    Args:
        argv: Argument list to parse; ``sys.argv[1:]`` when omitted or empty.
        stream: Stream that receives the progress bar and status messages.

    Returns:
        Process exit status: 0 on success, ffmpeg's exit code when it fails,
        ``128 + SIGINT`` on Ctrl-C, 1 on any other error.  Invalid arguments,
        ``--dimensions`` and ``--help`` terminate through ``sys.exit`` instead.
    """
    argv = argv or sys.argv[1:]
    parser = argparse.ArgumentParser(description='Extract JPEG frames from input video')
    parser.add_argument('--filename', required=True, dest='filename', type=str, help='path to video file')
    parser.add_argument('--destination', required=False, dest='destination', default=os.getcwd(), type=str, help='directory to output images to')
    parser.add_argument('--start', required=False, dest='start', default=None, type=str, help='time to start at')
    parser.add_argument('--duration', required=False, dest='duration', default=None, type=str, help='total duration to cover, beginning from specified start time')
    parser.add_argument('--sharpen', required=False, dest='sharpen', default=True, action='store_true', help='whether to sharpen video slightly')
    # --sharpen defaults to on, so without a negative flag it could never be
    # switched off.
    parser.add_argument('--no-sharpen', required=False, dest='sharpen', default=True, action='store_false', help='disable the default slight sharpening')
    parser.add_argument('--contrast', required=False, dest='contrast', default=False, action='store_true', help='whether to add contrast')
    parser.add_argument('--crop', required=False, dest='crop', default=None, type=str, help='dimension to crop e.g. 800:600:0:0')
    parser.add_argument('--zoom', required=False, dest='zoom', default=False, action='store_true', help='double the scale')
    parser.add_argument('--keyframes', required=False, dest='keyframes', default=False, action='store_true', help='whether to grab only key frames')
    parser.add_argument('--verbose', required=False, dest='verbose', default=False, action='store_true', help='enable verbose/debug output from ffmpeg')
    parser.add_argument('--dimensions', required=False, dest='dimensions', default=False, action='store_true', help='determine and output width and height of video file and then exit—for determining crop amount')
    parser.add_argument('--fps', required=False, dest='fps', default=None, type=int, help='specify custom FPS (sometimes our calculation is incorrect)')
    parser.add_argument('--calculate', required=False, dest='calculate', default=False, action='store_true', help='query OpenCV, mediainfo and/or ffprobe for FPS rate')
    parser.add_argument('--onepersec', required=False, dest='onepersec', default=False, action='store_true', help='extract only one frame per second')
    # Parse the list this function was given, not whatever is in sys.argv.
    args = parser.parse_args(argv)

    if args.dimensions and args.filename:
        # get_dimensions() returns (height, width), in that order.
        height, width = get_dimensions(args.filename)
        sys.stdout.write("%r x %r" % (width, height))
        sys.exit(0)

    if args.calculate and args.filename:
        calculated_fps = calculate_fps(args.filename)
        args.fps = str(int(calculated_fps))

    if args.destination == os.getcwd():
        print('Using current working directory as default destination: ' + os.getcwd())

    ffmpeg_opts = []
    filters = []

    if args.verbose:
        ffmpeg_opts.extend(['-loglevel', 'debug', '-v', 'verbose'])

    # Seek/duration options precede -i so that they apply to the input.
    if args.start or args.duration:
        ffmpeg_opts.append('-accurate_seek')
    if args.start is not None:
        if not is_duration_string(args.start):
            sys.stderr.write("ERROR: %r is not a correctly formatted time duration!" % (args.start,))
            sys.exit(1)
        ffmpeg_opts.extend(['-ss', args.start])
    if args.duration is not None:
        if not is_duration_string(args.duration):
            sys.stderr.write("ERROR: %r is not a correctly formatted time duration!" % (args.duration,))
            sys.exit(1)
        ffmpeg_opts.extend(['-t', args.duration])

    if not args.fps:
        fps = str(get_frame_rate(args.filename))
    else:
        fps = str(args.fps)

    ffmpeg_opts.extend(['-i', args.filename])

    if args.onepersec:
        ffmpeg_opts.extend(['-r', '1', '-framerate', fps])
    else:
        filters.append('fps=' + fps)

    if args.keyframes:
        # The backslash keeps the comma from being read as a filter separator.
        filters.append(r'select=eq(pict_type\,I)')
        ffmpeg_opts.extend(['-vsync', 'vfr'])

    if args.crop:
        if not test_dimension_string(args.crop):
            sys.stderr.write("ERROR: %r is not a correctly formatted cropping string!" % (args.crop,))
            sys.exit(1)
        filters.append('crop=' + args.crop)

    if args.zoom:
        filters.append('scale=iw*2:ih*2')
    if args.sharpen:
        filters.append('unsharp=5:5:1.0:5:5:0.0')
    if args.contrast:
        filters.append('eq=contrast=1.0:gamma=0.95:brightness=0:saturation=1.0')

    # Never pass an empty filter graph to ffmpeg.
    if filters:
        ffmpeg_opts.extend(['-filter:v', ",".join(filters)])

    if not os.path.isdir(args.destination):
        os.makedirs(args.destination)

    # Output pattern: <destination>/<input stem><frame number>.jpg
    stem = os.path.splitext(os.path.basename(args.filename))[0]
    dest = os.path.join(args.destination, stem + '%d.jpg')
    print(ffmpeg_opts)

    if {"-h", "-help", "--help"}.intersection(argv):
        sh.ffmpeg(help=True, _fg=True)
        return 0

    notifier = None
    try:
        with ProgressNotifier(file=stream) as notifier:
            sh.ffmpeg(
                *ffmpeg_opts,
                '-q:v',
                '1',
                dest,
                _in=queue.Queue(),
                _err=notifier,
                _out_bufsize=0,
                _err_bufsize=0,
                _in_bufsize=0,
                _no_out=False,
                _no_pipe=True,
                _tty_in=True
            )
    except sh.ErrorReturnCode as err:
        print(err)
        print(err.stderr)
        # ffmpeg may have died before emitting a single line.
        if notifier is not None and notifier.lines:
            print(notifier.lines[-1], file=stream)
        return err.exit_code
    except KeyboardInterrupt:
        print("Exiting.", file=stream)
        return signal.SIGINT + 128  # POSIX standard
    except Exception as e:
        print(e)
        # Report failure to the shell instead of falling through to status 0.
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment