Skip to content

Instantly share code, notes, and snippets.

@AndreVallestero
Last active March 8, 2021 04:38
Show Gist options
  • Save AndreVallestero/84499470e42678a1ecb3b5fd3ab852dc to your computer and use it in GitHub Desktop.
Save AndreVallestero/84499470e42678a1ecb3b5fd3ab852dc to your computer and use it in GitHub Desktop.
Scene threaded transcoder utility
# pip install av scenedetect opencv-python
from os import cpu_count, mkdir, path
from tempfile import mkdtemp
from subprocess import Popen, PIPE
from concurrent.futures import ThreadPoolExecutor
from av import open as av_open, format as av_format
from scenedetect.detectors import ContentDetector
from scenedetect.scene_manager import SceneManager
from scenedetect.video_manager import VideoManager
from scenedetect.frame_timecode import FrameTimecode
"""
Equivalent to:
av1an -i tgow.mkv -w 4 -enc rav1e -v "--speed 10 --quantizer 150 --low-latency --rdo-lookahead-frames 5 --threads 2 --tiles 2 -b 1000" --max_q 150 -xs 240 -tr 35 -cm vs_ffms2 --split_method pyscene -o av1an-tgow.mkv
TODO: Networking
Each worker requests for work and is given a chunk that it will work on
If the worker contains the source, it will use that (verify hash) to encode
Else, the server will send a segment from the source (lossless keyframe cut), that contains the chunk it will work on,
as well as the start frame of that segment so an offset for the chunk can be calculated
This can be done very quickly using packet keyframe detect
1 instance per physical core with n+1 tiles/threads per virtual thread = 10% faster
Compared to 1 instance per thread
e.g. 4c/8t = 4 instances (2 tiles / threads each instance)
Use 1 instance for debugging
"""
# Number of parallel encoder instances: one per physical core (logical
# cores / 2, assuming SMT). os.cpu_count() may return None, so fall back to
# a single instance, and never drop below 1 on single-core machines.
# Set to 1 when debugging.
INSTANCES = max(1, (cpu_count() or 2) // 2)
# Input video to transcode.
SOURCE = "tgow.mkv"
# Working directory for per-chunk encodes and the concat list.
TMP_DIR = "tmp"
class FileAlias:
    """Minimal file-like adapter over a pipe.

    Exposes only the two attributes PyAV needs from an output "file":
    a string ``name`` and a ``write`` callable.
    """

    def __init__(self, pipe):
        # PyAV expects `.name` to be a str; pipe names can be ints (fds).
        self.name = str(pipe.name)
        self.write = pipe.write
def main():
    """Split SOURCE into scene-aligned chunks, encode each chunk with rav1e
    (in parallel when useful), then concatenate the results into one file."""
    keyframes, frames = _count_frames()
    scenes = _detect_scenes(frames)
    chunks = _build_chunks(scenes, keyframes)
    if not path.exists(TMP_DIR):
        mkdir(TMP_DIR)
    # Encode chunks; skip the thread pool when a single worker suffices.
    safe_instances = min(INSTANCES, len(chunks))
    if safe_instances > 1:
        with ThreadPoolExecutor(max_workers=safe_instances) as executor:
            results = tuple(executor.map(encode_chunk, chunks))
    else:
        results = tuple(map(encode_chunk, chunks))
    _concat_chunks(chunks)

def _count_frames():
    """Demux SOURCE once; return (keyframe frame-indices, total frame count)."""
    keyframes = []
    frames = 0
    with av_open(SOURCE) as container:
        for packet in container.demux(container.streams.video[0]):
            if packet.is_keyframe:
                keyframes.append(frames)
            # Packets without a dts (e.g. flush packets) carry no frame.
            if packet.dts is not None:
                frames += 1
    return keyframes, frames

def _detect_scenes(frames):
    """Run PySceneDetect on SOURCE; return sorted scene start frames that
    always begin at 0 and end at the total frame count *frames*."""
    video_manager = VideoManager([SOURCE])
    scene_manager = SceneManager()
    scene_manager.add_detector(ContentDetector(threshold=35, min_scene_len=60))
    video_manager.set_downscale_factor()
    video_manager.start()
    scene_manager.detect_scenes(frame_source=video_manager)
    scene_list = scene_manager.get_scene_list()
    scenes = [int(scene[0].get_frames()) for scene in scene_list]
    # Guard the no-scenes case and pin both ends of the scene list.
    if not scenes or scenes[0] != 0:
        scenes.insert(0, 0)
    if scenes[-1] != frames:
        scenes.append(frames)
    # Insert extra split points so no segment exceeds max_scene_len frames.
    # Appending to the end never shifts the indices the loop reads, so the
    # single sort after the loop is sufficient.
    max_scene_len = 240
    for i in range(len(scenes) - 1):
        distance = scenes[i + 1] - scenes[i]
        if distance > max_scene_len:
            to_insert = distance // max_scene_len + 1
            offset = distance // to_insert
            for j in range(1, to_insert):
                scenes.append(scenes[i] + offset * j)
    scenes.sort()
    return scenes

def _build_chunks(scenes, keyframes):
    """Pair each scene with the nearest preceding keyframe; return tuples of
    (chunk_index, nearest_keyframe, start_frame, end_frame)."""
    start_frames = scenes[:-1]
    nearest_keyframes = []
    kfi = 1
    max_kfi = len(keyframes) - 1
    for start_frame in start_frames:
        # Advance past every keyframe at or before start_frame; the previous
        # one is the nearest decodable entry point.
        while kfi <= max_kfi and keyframes[kfi] <= start_frame:
            kfi += 1
        nearest_keyframes.append(keyframes[kfi - 1])
    return tuple(zip(range(len(scenes) - 1), nearest_keyframes,
                     start_frames, scenes[1:]))

def _concat_chunks(chunks):
    """Write an ffmpeg concat list for the encoded chunks and mux them into
    a single output container without re-encoding."""
    base_name = SOURCE.rsplit(".", 1)[0]
    concat_file_name = f"{TMP_DIR}/concat-{base_name}.txt"
    # Relative paths in a concat list resolve against the list's directory,
    # so the chunk names need no TMP_DIR prefix.
    # https://github.com/PyAV-Org/PyAV/issues/469#issuecomment-457561862
    with open(concat_file_name, "w") as concat_file:
        concat_file.writelines(
            f"file 'enc-{base_name}-{chunk[0]}.ivf'\n"
            for chunk in chunks)
    # https://github.com/PyAV-Org/PyAV/issues/47#issuecomment-394795283
    with av_open(concat_file_name, format="concat") as concat_container:
        input_stream = concat_container.streams.video[0]
        with av_open(f"enc-{base_name}.mkv", "w") as output_container:
            output_container.add_stream(template=input_stream)
            for packet in concat_container.demux(input_stream):
                if packet.dts is not None:
                    output_container.mux(packet)
def encode_chunk(chunk: "tuple[int, int, int, int]"):
    """Encode one chunk of SOURCE with rav1e, writing an .ivf into TMP_DIR.

    *chunk* is (chunk_index, nearest_keyframe, start_frame, end_frame):
    decoding begins at nearest_keyframe (the closest decodable entry point),
    and only frames in [start_frame, end_frame) are piped to the encoder.
    """
    chunk_index, nearest_keyframe, start_frame, end_frame = chunk
    base_name = SOURCE.rsplit(".", 1)[0]
    # Should probably hash the output based on params to avoid name clashes
    output_name = f"{TMP_DIR}/enc-{base_name}-{chunk_index}.ivf"
    p = Popen(["rav1e", "-y", "-s", "10", "--quantizer", "150",  # Quantizer 255 is really bad
               "--low-latency", "--rdo-lookahead-frames", "5",
               "--threads", "2", "--tiles", "2",  # Use this when num instances = physical cores, threads = 2x cores, better smt optimization
               "-b", "1000", "--output", output_name, "-"], stdin=PIPE)
    try:
        # Open source video
        with av_open(SOURCE) as container:
            source_stream = container.streams.video[0]
            # Create a y4m muxer over the encoder's stdin; "strict unofficial"
            # lets yuv4mpegpipe carry wrapped_avframe raw video.
            with av_open(FileAlias(p.stdin), "w", format="yuv4mpegpipe",
                         container_options={"strict": "unofficial"}) as enc_pipe:
                y4m_stream = enc_pipe.add_stream("wrapped_avframe")
                # Mirror the source's geometry, timing and pixel format.
                y4m_ctx = y4m_stream.codec_context
                src_ctx = source_stream.codec_context
                y4m_ctx.width = src_ctx.width
                y4m_ctx.height = src_ctx.height
                y4m_ctx.framerate = src_ctx.framerate
                y4m_ctx.rate = src_ctx.rate
                y4m_ctx.format = src_ctx.format
                y4m_ctx.pix_fmt = src_ctx.pix_fmt
                # Demux from the start, counting frames until the chunk ends.
                frame_index = 0
                for packet in container.demux(source_stream):
                    if frame_index >= end_frame:
                        break
                    # Start decoding from the nearest key frame
                    if frame_index >= nearest_keyframe:
                        for frame in packet.decode():
                            # Pipe to encoder
                            if frame_index >= start_frame:
                                raw_packets = y4m_stream.encode(frame)
                                enc_pipe.mux(raw_packets)
                    if packet.dts is not None:
                        frame_index += 1
    finally:
        # Always close stdin so rav1e sees EOF, then reap the process —
        # even if decoding raised, so no zombie encoder is left behind.
        p.stdin.close()
        p.wait()
if __name__ == "__main__":
    # Crude wall-clock timing of the full transcode run.
    from time import time
    started_at = time()
    main()
    print("Finished:", time() - started_at)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment