Scene threaded transcoder utility
# pip install av scenedetect opencv-python
from os import cpu_count, mkdir, path
from tempfile import mkdtemp
from subprocess import Popen, PIPE
from concurrent.futures import ThreadPoolExecutor
from av import open as av_open, format as av_format
from scenedetect.detectors import ContentDetector
from scenedetect.scene_manager import SceneManager
from scenedetect.video_manager import VideoManager
from scenedetect.frame_timecode import FrameTimecode

""" | |
Equivalent to: | |
av1an -i tgow.mkv -w 4 -enc rav1e -v "--speed 10 --quantizer 150 --low-latency --rdo-lookahead-frames 5 --threads 2 --tiles 2 -b 1000" --max_q 150 -xs 240 -tr 35 -cm vs_ffms2 --split_method pyscene -o av1an-tgow.mkv | |
TODO: Networking | |
Each worker requests for work and is given a chunk that it will work on | |
If the worker contains the source, it will use that (verify hash) to encode | |
Else, the server will send a segment from the source (lossless keyframe cut), that contains the chunk it will work on, | |
aswell as the start frame of that segment so an offset for the chunk can be calculated | |
This can be done very quickly using packet keyframe detect | |
1 instance per physical core with n+1 tiles/threads per virtual thread = 10% faster | |
Compared to 1 instance per thread | |
e.g. 4c/8t = 4 instances (2 tiles / threads each instance) | |
Use 1 instance for debugging | |
""" | |
INSTANCES = cpu_count() // 2  # set to 1 for debugging (see docstring)
SOURCE = "tgow.mkv"
TMP_DIR = "tmp"

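# File-like wrapper around a subprocess pipe; it exposes just the name and write
# attributes that av_open() touches when writing to the rav1e stdin pipe in encode_chunk()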
class FileAlias():
    def __init__(self, pipe):
        self.name = str(pipe.name)
        self.write = pipe.write

def main():
    # Count frames
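    # Demuxing packets (no decoding) keeps this pass fast; keyframe positions are
    # recorded by frame index for the chunking below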
    keyframes = []
    frames = 0
    with av_open(SOURCE) as container:
        for packet in container.demux(container.streams.video[0]):
            if packet.is_keyframe:
                keyframes.append(frames)
            if packet.dts is not None:
                frames += 1
    # Generate keyframe insertions
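    # PySceneDetect's content detector finds scene changes to use as chunk
    # boundaries (threshold=35 matches -tr 35 in the av1an command above)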
    video_manager = VideoManager([SOURCE])
    scene_manager = SceneManager()
    scene_manager.add_detector(ContentDetector(threshold=35, min_scene_len=60))
    video_manager.set_downscale_factor()
    video_manager.start()
    scene_manager.detect_scenes(frame_source=video_manager)
    scene_list = scene_manager.get_scene_list()
    scenes = [int(scene[0].get_frames()) for scene in scene_list]
    if not scenes or scenes[0] != 0:
        scenes.insert(0, 0)
    if scenes[-1] != frames:
        scenes.append(frames)
    # Add more keyframes if segments are too long
    max_scene_len = 240
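    # The 240-frame cap matches -xs 240 in the av1an command in the docstring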
    for i in range(len(scenes)-1):
        distance = scenes[i+1] - scenes[i]
        if distance > max_scene_len:
            to_insert = distance // max_scene_len + 1
            offset = distance // to_insert
            for j in range(1, to_insert):
                scenes.append(scenes[i] + offset * j)
    scenes.sort()
    # Find nearest keyframe for each start frame
    start_frames = scenes[:-1]
    kfi = 1
    nearest_keyframes = []
    max_kfi = len(keyframes) - 1
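    # start_frames is sorted, so a single forward pass over the keyframe list is enough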
    for start_frame in start_frames:
        while kfi <= max_kfi and keyframes[kfi] <= start_frame:
            kfi += 1
        nearest_keyframes.append(keyframes[kfi - 1])
    # Generate chunks based on the scene positions, make tempdir
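    # Each chunk is a tuple of (chunk_index, nearest_keyframe, start_frame, end_frame)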
    chunks = tuple(zip(range(len(scenes)-1), nearest_keyframes, start_frames, scenes[1:]))
    if not path.exists(TMP_DIR):
        mkdir(TMP_DIR)
    # Encode chunks
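    # Cap the worker count at the chunk count; a single instance runs inline
    # (plain map, no thread pool), which keeps debugging simple as noted in the docstring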
    safe_instances = min(INSTANCES, len(chunks))
    if safe_instances > 1:
        with ThreadPoolExecutor(max_workers=safe_instances) as executor:
            results = tuple(executor.map(encode_chunk, chunks))
    else:
        results = tuple(map(encode_chunk, chunks))
    # Create a concat file, https://github.com/PyAV-Org/PyAV/issues/469#issuecomment-457561862
    base_name = SOURCE.rsplit(".", 1)[0]
    concat_file_name = f"{TMP_DIR}/concat-{base_name}.txt"
    with open(concat_file_name, "w") as concat_file:
        concat_file.writelines(
            f"file 'enc-{base_name}-{chunk[0]}.ivf'\n"
            for chunk in chunks)
    # Concat files, https://github.com/PyAV-Org/PyAV/issues/47#issuecomment-394795283
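    # Stream-copy the per-chunk IVF packets into a single Matroska file (no re-encode)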
    with av_open(concat_file_name, format="concat") as concat_container:
        input_stream = concat_container.streams.video[0]
        with av_open(f"enc-{base_name}.mkv", "w") as output_container:
            output_container.add_stream(template=input_stream)
            for packet in concat_container.demux(input_stream):
                if packet.dts is not None:
                    output_container.mux(packet)

def encode_chunk(chunk: (int, int, int, int)):
    chunk_index, nearest_keyframe, start_frame, end_frame = chunk
    base_name = SOURCE.rsplit(".", 1)[0]
    output_name = f"{TMP_DIR}/enc-{base_name}-{chunk_index}.ivf"  # Should probably hash the output name from the params to avoid name clashes
    p = Popen(["rav1e", "-y", "-s", "10", "--quantizer", "150",  # Quantizer 255 is really bad
               "--low-latency", "--rdo-lookahead-frames", "5",
               "--threads", "2", "--tiles", "2",  # Use when instances = physical cores and threads = 2x cores; better SMT utilization
               "-b", "1000", "--output", output_name, "-"], stdin=PIPE)
    # Open source video
    with av_open(SOURCE) as container:
        source_stream = container.streams.video[0]
        # Create y4m encoder
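        # Raw frames are written as yuv4mpegpipe straight into rav1e's stdin through the FileAlias wrapper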
        with av_open(FileAlias(p.stdin), "w", format="yuv4mpegpipe", container_options={"strict": "unofficial"}) as enc_pipe:
            y4m_stream = enc_pipe.add_stream("wrapped_avframe")
            y4m_ctx = y4m_stream.codec_context
            src_ctx = source_stream.codec_context
            y4m_ctx.width = src_ctx.width
            y4m_ctx.height = src_ctx.height
            y4m_ctx.framerate = src_ctx.framerate
            y4m_ctx.rate = src_ctx.rate
            y4m_ctx.format = src_ctx.format
            y4m_ctx.pix_fmt = src_ctx.pix_fmt
            # Walk the packets up to the chunk location (packet-level skip, no container seek)
            frame_index = 0
            for packet in container.demux(source_stream):
                if frame_index >= end_frame:
                    break
                # Start decoding from the nearest key frame
                if frame_index >= nearest_keyframe:
                    for frame in packet.decode():
                        # Pipe to encoder
                        if frame_index >= start_frame:
                            raw_packets = y4m_stream.encode(frame)
                            enc_pipe.mux(raw_packets)
                if packet.dts is not None:
                    frame_index += 1
    # Wait for encoder to finish
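    # Closing stdin sends EOF so rav1e can flush its final packets before exiting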
    p.stdin.close()
    p.wait()

if __name__ == "__main__":
    from time import time
    start = time()
    main()
    print("Finished:", time() - start)