Created
May 4, 2020 00:13
-
-
Save fedej/7f848d20205efbff4db4a0fc78eae7ba to your computer and use it in GitHub Desktop.
asyncio ffmpeg-python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
from ffmpeg._run import Error, compile | |
from ffmpeg._utils import convert_kwargs_to_cmd_line_args | |
from asyncio import create_subprocess_exec, subprocess, wait_for | |
async def probe(filename, cmd='ffprobe', timeout=None, **kwargs): | |
args = [cmd, '-show_format', '-show_streams', '-of', 'json'] | |
args += convert_kwargs_to_cmd_line_args(kwargs) | |
args += [filename] | |
p = await create_subprocess_exec(*args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | |
coro = wait_for(p.communicate(), timeout) if timeout else p.communicate() | |
out, err = await coro | |
if p.returncode != 0: | |
raise Error('ffprobe', out, err) | |
return json.loads(out.decode('utf-8')) | |
async def run(stream_spec, | |
cmd='ffmpeg', | |
pipe_stdin=False, | |
pipe_stdout=False, | |
pipe_stderr=False, | |
input=None, | |
quiet=False, | |
overwrite_output=False,): | |
args = compile(stream_spec, cmd, overwrite_output=overwrite_output) | |
stdin_stream = subprocess.PIPE if pipe_stdin else None | |
stdout_stream = subprocess.PIPE if pipe_stdout or quiet else None | |
stderr_stream = subprocess.PIPE if pipe_stderr or quiet else None | |
p = await create_subprocess_exec( | |
*args, stdin=stdin_stream, stdout=stdout_stream, stderr=stderr_stream | |
) | |
out, err = await p.communicate(input) | |
if p.returncode != 0: | |
raise Error('ffmpeg', out, err) | |
return out, err | |
__all__ = ['probe', 'run'] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment