Last active
October 12, 2021 03:48
-
-
Save hhsprings/02f775b046e6efed6dff55a586d439ac to your computer and use it in GitHub Desktop.
Shorthand for "ffmpeg ... -c:v libx265 ..."
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # -*- coding: utf-8 -*- | |
| from __future__ import unicode_literals | |
| import os | |
| import sys | |
| import re | |
| import subprocess | |
| import shutil | |
| import tempfile | |
| import logging | |
| from glob import glob | |
| _log = logging.getLogger() | |
| _spcodes = { | |
| "reset": "\x1b[39;49;00m", | |
| # R:1 G Y B M C W:7 | |
| "target": "\x1b[33m", #"\x1b[42;30m", | |
| "extb": "\x1b[44;37;1m", | |
| "exta": "\x1b[44;37;1m", | |
| "reduced": "\x1b[42;30m", | |
| } | |
| if sys.platform == "win32": | |
| try: | |
| import colorama | |
| colorama.init() | |
| except ImportError as e: | |
| _spcodes = {k: "" for k, _ in _spcodes.items()} | |
| if hasattr("", "decode"): | |
| _encode = lambda s: s.encode(sys.getfilesystemencoding()) | |
| else: | |
| _encode = lambda s: s | |
def _filter_args(*cmd):
    """
    Return the command line as a list without its empty entries.

    Every falsy item is dropped (``None`` as well as ``""``), and each
    remaining item is passed through ``_encode``.  Call it with a single
    sequence, e.g. ``_filter_args(["ffmpeg", None, "-y"])``.
    """
    kept = filter(None, *cmd)
    return [_encode(item) for item in kept]
def check_call(*popenargs, **kwargs):
    """
    Run a command, wait for it, and raise if it exits with non-zero status.

    This mirrors ``subprocess.check_call`` with these differences:

    * The command goes through ``_filter_args`` first, so empty entries
      are dropped and items are encoded where needed.
    * When ``psutil`` is importable the child is started with
      ``psutil.Popen`` and its priority is lowered (best effort).

    The command is taken from the ``args`` keyword if given, otherwise
    from the first positional argument.  ``timeout`` is forwarded to
    ``wait()``; all other keywords go to the ``Popen`` constructor.

    Returns 0 on success.
    Raises ``subprocess.CalledProcessError`` on a non-zero exit status.
    """
    try:
        import psutil
    except ImportError:
        popen_factory = subprocess.Popen
        idle_priority = None
    else:
        popen_factory = psutil.Popen
        # IDLE_PRIORITY_CLASS (== 2<<5) only exists on Windows; elsewhere
        # fall back to the weakest POSIX niceness.
        idle_priority = getattr(psutil, "IDLE_PRIORITY_CLASS", 19)

    timeout = kwargs.pop("timeout", None)
    # Pop ``args`` so it is not handed to Popen both positionally and as
    # a keyword (which raises TypeError).
    cmd = kwargs.pop("args", None)
    if cmd is None:
        cmd = popenargs[0]
    argv = _filter_args(cmd)

    with popen_factory(argv, **kwargs) as proc:
        try:
            if idle_priority is not None:
                try:
                    proc.nice(idle_priority)
                except Exception as exc:
                    # Failing to lower the priority must not abort the
                    # command itself.
                    _log.debug("cannot lower priority of %r: %r", argv, exc)
            retcode = proc.wait(timeout=timeout)
        except BaseException:  # Including KeyboardInterrupt.
            proc.kill()
            # No second wait(): leaving the ``with`` block does that.
            raise

    if retcode:
        raise subprocess.CalledProcessError(retcode, argv)
    return 0
def check_stderroutput(*popenargs, **kwargs):
    """
    Run a command and return what it wrote to standard error.

    ffmpeg and ffprobe write the information we want to standard error,
    which ``subprocess.check_output`` does not return.  This is the same
    idea as ``check_output``, but for standard error.  The command goes
    through ``_filter_args`` first, so empty entries are dropped and
    items are encoded where needed.

    The command is taken from the ``args`` keyword if given, otherwise
    from the first positional argument.  Remaining keywords go to
    ``subprocess.Popen``.

    Returns the captured standard error.
    Raises ``ValueError`` if a ``stderr`` keyword is passed, and
    ``subprocess.CalledProcessError`` (with the captured text in
    ``output``) on a non-zero exit status.
    """
    if 'stderr' in kwargs:
        raise ValueError(
            'stderr argument not allowed, it will be overridden.')
    # Pop ``args`` so it is not handed to Popen both positionally and as
    # a keyword (which raises TypeError).
    cmd = kwargs.pop("args", None)
    if cmd is None:
        cmd = popenargs[0]

    process = subprocess.Popen(
        _filter_args(cmd),
        stderr=subprocess.PIPE,
        **kwargs)
    try:
        _, stderr_output = process.communicate()
    except BaseException:  # Including KeyboardInterrupt.
        # Do not leave the child running when we are interrupted.
        process.kill()
        process.wait()
        raise

    retcode = process.returncode
    if retcode:
        raise subprocess.CalledProcessError(
            retcode, list(cmd), output=stderr_output)
    return stderr_output
# Container extensions (lower-case, with the leading dot) that may be
# chosen as the conversion target.
# FIXME: .avi, .wmv, ...?
SUPPORTED_CONTAINERS = [
    ".mkv",
    ".mp4",
    ".webm",
]
def _check_orig(ifn, enctxt):
    """
    Probe *ifn* with ffprobe and search its report for *enctxt*.

    *enctxt* is a bytes regular expression that is searched in what
    ffprobe wrote to standard error.

    Returns the match object when the pattern is found, else ``None``.
    Raises ``ValueError`` when the report contains ``Video: ansi,``;
    errors from ``check_stderroutput`` propagate unchanged.
    """
    report = check_stderroutput(["ffprobe", "-hide_banner", ifn])
    # NOTE(review): presumably ffprobe labels plain text files as an
    # "ansi" video stream -- confirm against ffprobe output.
    looks_like_text = re.search(br"Video: ansi,", report) is not None
    if looks_like_text:
        raise ValueError("{} is not video.".format(ifn))
    return re.search(enctxt, report)
def _ffmpeg_for_conv(ifn, ofn, orig_same, encoder, fps, supress_ffmpeg_banner_opt):
    """
    Build the ffmpeg command line for one file and run it.

    ifn, ofn -- input and output file names.
    orig_same -- truthy when the input already uses the target codec.
    encoder -- encoder name followed by its extra options.
    fps -- target frame rate, or a falsy value to leave it alone.
    supress_ffmpeg_banner_opt -- ``"-hide_banner"`` or ``""``.

    The video stream is copied instead of re-encoded only when the codec
    already matches, no frame rate change is requested, and *encoder*
    carries no ``-crf`` option.  Audio is always copied and global
    metadata is discarded (``-map_metadata -1``).
    """
    stream_copy = orig_same and not fps and "-crf" not in encoder
    video_opts = ["copy"] if stream_copy else list(encoder)
    fps_opts = ["-vf", "fps={}".format(fps)] if fps else []
    command = (
        ["ffmpeg", supress_ffmpeg_banner_opt, "-y", "-i", ifn, "-c:v"]
        + video_opts
        + fps_opts
        + ["-c:a", "copy", "-map_metadata", "-1", ofn]
    )
    check_call(command)
| def _get_argvideos(args): | |
| from glob import glob | |
| files = [] | |
| if args.videoargs_is_glob: | |
| for pat in args.video: | |
| files.extend(list(glob(pat))) | |
| else: | |
| files = args.video | |
| order = args.processing_order | |
| if order != "as_is": | |
| if order.startswith("small"): | |
| files.sort(key=lambda fn: os.stat(fn).st_size) | |
| else: | |
| files.sort(key=lambda fn: -os.stat(fn).st_size) | |
| return files | |
def _build_argparser():
    """Create the command-line parser of the converter."""
    import argparse
    ap = argparse.ArgumentParser()
    ap.add_argument("video", nargs="+")
    ap.add_argument(
        "--videoencoder",
        choices=["libx265", "libvpx-vp9"], default="libx265")
    ap.add_argument("--fps", type=float)
    ap.add_argument(
        "--crf_x265", type=float,
        help="if this is not specified, ffmpeg seems to use -crf 28 as default.")
    ap.add_argument("--crf_vp9", default="40")
    ap.add_argument(
        "--out_container",
        choices=SUPPORTED_CONTAINERS + ["KEEP"], default="KEEP")
    ap.add_argument("--browse_wd", help="ex. 'explorer'")
    ap.add_argument("--remove_original", action="store_true")
    ap.add_argument("--videoargs_is_glob", action="store_true")
    ap.add_argument("--donttouch_if_same_codec", action="store_true")
    ap.add_argument("--just_check_glob", action="store_true")
    ap.add_argument("--supress_ffmpeg_banner", action="store_true")
    ap.add_argument(
        "--processing_order",
        choices=[
            "as_is",
            "smallersize_first",
            "largersize_first",
        ], default="as_is")
    return ap


def _stash_path(tmpdir, media):
    """Return the path inside *tmpdir* where *media* is parked."""
    rel = os.path.normpath(media)
    escapes = (
        os.path.isabs(rel)
        or os.path.splitdrive(rel)[0]
        or rel == os.pardir
        or rel.startswith(os.pardir + os.sep))
    if escapes:
        # os.path.join() would discard tmpdir for an absolute path, and
        # a leading ".." would climb out of it: rebuild the path from
        # the absolute location without drive and leading separators.
        rel = os.path.splitdrive(os.path.abspath(media))[1]
        rel = rel.lstrip(os.sep + (os.altsep or ""))
    return os.path.join(tmpdir, rel)


def _restore_original(ifn, media, ofn, remove_output):
    """Move the parked original *ifn* back to *media* after a failure."""
    if remove_output and os.path.exists(ofn):
        os.remove(ofn)
    shutil.move(ifn, media)


def _processed_infoout(ifn, ofn):
    """Format the colorized "output (before -> after, ratio)" summary."""
    in_size = os.stat(ifn).st_size
    out_size = os.stat(ofn).st_size
    # Guard against an empty input file instead of dividing by zero.
    ratio = out_size / in_size * 100 if in_size else 0.0
    return "{}{}{} ('{}{}{}' {:3,d} bytes -> '{}{}{}' {:3,d} bytes ({}{:.2f}{} %))".format(
        _spcodes["target"],
        os.path.basename(ofn),
        _spcodes["reset"],
        _spcodes["extb"],
        os.path.splitext(ifn)[1],
        _spcodes["reset"],
        in_size,
        _spcodes["exta"],
        os.path.splitext(ofn)[1],
        _spcodes["reset"],
        out_size,
        _spcodes["reduced"],
        ratio,
        _spcodes["reset"])


def _main():
    """
    Command-line entry point: re-encode the given videos with ffmpeg.

    Each original is first parked in a fresh temporary directory, because
    the output may have the very same name as the input, and is then
    converted into the current directory.  With ``--remove_original`` the
    parked originals are deleted at the end; otherwise the temporary
    directory is reported so they can be recovered.  With
    ``--just_check_glob`` nothing is converted, moved or deleted; the
    selected files are only listed.
    """
    _LOGFMT = "%(asctime)s:%(levelname)5s:%(module)s(%(lineno)d):%(thread)x| %(message)s"
    logging.basicConfig(
        stream=sys.stderr, level=logging.INFO, format=_LOGFMT)
    args = _build_argparser().parse_args()
    dry_run = args.just_check_glob

    curdir = os.path.abspath(os.curdir)
    tmpdir = "" if dry_run else tempfile.mkdtemp()

    if args.browse_wd:
        # Best effort only: the browser's exit status is not meaningful
        # here, so failures are merely noted at debug level.
        for folder in [curdir] + ([tmpdir] if tmpdir else []):
            try:
                check_call([args.browse_wd, folder])
            except Exception as exc:
                _log.debug("cannot browse %r: %r", folder, exc)

    # Work on a copy so the module-level list is left untouched.
    containers = list(SUPPORTED_CONTAINERS)
    if args.videoencoder == "libx265":
        enc, enctxt = "h.265", br" Video: hevc "
        encoder = [args.videoencoder]
        # "is not None": an explicit --crf_x265 0 must be honored.
        if args.crf_x265 is not None:
            encoder.extend(["-crf", "{}".format(args.crf_x265)])
        containers.remove(".webm")
    else:
        enc, enctxt = "vp9", br" Video: vp9 "
        encoder = [args.videoencoder, "-crf", args.crf_vp9]
    encoder.extend(["-max_muxing_queue_size", "2048"])

    supress_ffmpeg_banner_opt = "-hide_banner" if args.supress_ffmpeg_banner else ""
    _done = []
    try:
        for media in _get_argvideos(args):
            try:
                orig_same = _check_orig(media, enctxt)
            except Exception as e:
                _log.error(e)
                continue
            if orig_same and args.donttouch_if_same_codec:
                _log.info("ignoring: %s", media)
                continue

            if dry_run:
                ifn, ofn = media, media
            else:
                ext = args.out_container
                if ext == "KEEP":
                    ext = os.path.splitext(media.lower())[1]
                if ext not in containers:
                    # Should we do either whitelist or blacklist test?
                    _log.info("ignoring: '%s' because '%s' does not support %s", media, ext, enc)
                    continue

                ifn = _stash_path(tmpdir, media)
                ofn = os.path.join(curdir, os.path.splitext(media)[0] + ext)
                stash_dir = os.path.dirname(ifn)
                if not os.path.exists(stash_dir):
                    os.makedirs(stash_dir)
                shutil.copyfile(media, ifn)
                os.remove(media)
                # An output file that already exists now is not ours to
                # delete if the conversion fails.
                output_preexisting = os.path.exists(ofn)
                try:
                    _ffmpeg_for_conv(
                        ifn, ofn,
                        orig_same, encoder, args.fps,
                        supress_ffmpeg_banner_opt)
                except BaseException as exc:
                    # Put the original back where it was rather than
                    # leaving it stranded in the temporary directory.
                    _restore_original(
                        ifn, media, ofn, not output_preexisting)
                    if not isinstance(exc, Exception):
                        raise  # e.g. KeyboardInterrupt: stop the batch
                    _log.error(
                        "conversion failed, original restored: %s (%r)",
                        media, exc)
                    continue

            _log.info("%s", _processed_infoout(ifn, ofn))
            _done.append((ifn, ofn))
    except KeyboardInterrupt:
        args.remove_original = False
    finally:
        if _done:
            _log.info("processed:\n %s", "\n ".join([
                _processed_infoout(ifn, ofn) for ifn, ofn in _done]))
        # In a dry run "ifn" is the untouched original and there is no
        # temporary directory: nothing may be deleted.
        if not dry_run:
            if args.remove_original:
                for ifn, _ in _done:
                    try:
                        os.remove(ifn)
                    except Exception as erm:
                        _log.warning("cannot remove %r: %r", ifn, erm)
            # Walk the tree so hidden files are seen and leftover empty
            # sub-directories do not count as kept originals.
            if any(names for _, _, names in os.walk(tmpdir)):
                _log.info("originals: %s", tmpdir)
            else:
                shutil.rmtree(tmpdir)


if __name__ == '__main__':
    _main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment