Created
September 20, 2023 01:59
-
-
Save monyone/83f3d5d48d717a3531c23757c81f11a0 to your computer and use it in GitHub Desktop.
AV1 in MPEG-TS
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/sh
# Produce the sample input for the converter: a synthetic `testsrc` pattern
# (r=30 frames/s, d=10 seconds) in yuv420p, encoded with librav1e and written
# to av1.mp4. The movflags request a fragmented MP4 (an empty initial moov and
# one fragment per frame).
ffmpeg -f lavfi -i testsrc=r=30:d=10 -pix_fmt yuv420p -c:v librav1e -r 30 -movflags frag_every_frame+empty_moov av1.mp4
# Alternative kept for reference: the same command using libaom-av1 as the encoder.
# ffmpeg -f lavfi -i testsrc=r=30:d=10 -pix_fmt yuv420p -c:v libaom-av1 -r 30 -movflags frag_every_frame+empty_moov av1.mp4
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import argparse | |
import sys | |
from dataclasses import dataclass | |
import re | |
def findBox(data: memoryview | bytearray | bytes, box: str | list[str]) -> memoryview | bytearray | bytes | None: | |
begin = 0 | |
while begin < len(data): | |
size = int.from_bytes(data[begin:begin+4], byteorder='big') - 8 | |
name = data[begin+4:begin+8].decode('ascii') | |
content = data[begin+8:begin+8+size] | |
if type(box) == str and name == box: | |
return content | |
elif type(box) == list and name == box[0]: | |
if len(box) == 1: return content | |
else: return findBox(content, box[1:]) | |
begin += 8 + size | |
def parseTimescaleByMdhd(data: memoryview | bytearray | bytes) -> int | None: | |
version = data[0] | |
if version == 0: | |
return int.from_bytes(data[12:16], byteorder='big') | |
elif version == 1: | |
return int.from_bytes(data[20:24], byteorder='big') | |
else: | |
return None | |
def parseBaseMediaDecodeTimeByTfdt(data: memoryview | bytearray | bytes): | |
version = data[0] | |
if version == 0: | |
return int.from_bytes(data[4:8], byteorder='big') | |
elif version == 1: | |
return int.from_bytes(data[4:12], byteorder='big') | |
else: | |
return None | |
""" | |
def parseCompositionTimeOffsetByTrun(data: memoryview | bytearray | bytes) -> list[int]: | |
version = data[0] | |
flags = int.from_bytes(data[1:4], byteorder='big') | |
print(flags) | |
sample_count = int.from_bytes(data[4:8], byteorder='big') | |
begin = 8 | |
if (flags & 0x000001) != 0: begin += 4 | |
if (flags & 0x000004) != 0: begin += 4 | |
result: list[int] = [] | |
for _ in range(sample_count): | |
if (flags & 0x000100) != 0: begin += 4 | |
if (flags & 0x000200) != 0: begin += 4 | |
if (flags & 0x000400) != 0: begin += 4 | |
if (flags & 0x000800) != 0: | |
result.append(int.from_bytes(data[4:8], byteorder='big')) | |
begin += 4 | |
return result | |
""" | |
@dataclass
class AV1CodecConfigurationBox:
    """Fields unpacked from the payload of an 'av1C' box.

    The first four payload bytes are split into the bit fields below (the
    width of each field is given in its comment); everything after those four
    bytes is kept verbatim in ``configOBUs``. Field order matters: it is the
    positional order of the generated constructor.
    """
    # --- byte 0 ---
    marker: int # 1 bit
    version: int # 7 bit
    # --- byte 1 ---
    seq_profile: int # 3 bit
    seq_level_idx_0: int # 5 bit
    # --- byte 2 ---
    seq_level_tier_0: int # 1 bit
    high_bitdepth: int # 1 bit
    twelve_bit: int # 1 bit
    monochrome: int # 1 bit
    chroma_subsampling_x: int # 1 bit
    chroma_subsampling_y: int # 1 bit
    chroma_sample_position: int # 2 bit
    # --- byte 3 ---
    reserved: int # 3 bit (0)
    initial_presentation_delay_present: int # 1 bit
    initial_presentation_delay_minus_one: int # 4 bit
    # --- bytes 4.. ---
    configOBUs: memoryview | bytearray | bytes  # remaining payload, not parsed here
def parseAV1CodecConfigurationBox(data: memoryview | bytearray | bytes):
    """Unpack the payload of an 'av1C' box into an AV1CodecConfigurationBox.

    Args:
        data: The box payload; at least 4 bytes are required.

    Returns:
        An AV1CodecConfigurationBox whose bit fields come from bytes 0..3 and
        whose ``configOBUs`` is the slice ``data[4:]``.

    Raises:
        IndexError: If *data* is shorter than 4 bytes.
    """
    byte0, byte1, byte2, byte3 = data[0], data[1], data[2], data[3]
    return AV1CodecConfigurationBox(
        marker=(byte0 >> 7) & 0b1,
        version=byte0 & 0b01111111,
        seq_profile=(byte1 >> 5) & 0b111,
        seq_level_idx_0=byte1 & 0b00011111,
        seq_level_tier_0=(byte2 >> 7) & 0b1,
        high_bitdepth=(byte2 >> 6) & 0b1,
        twelve_bit=(byte2 >> 5) & 0b1,
        monochrome=(byte2 >> 4) & 0b1,
        chroma_subsampling_x=(byte2 >> 3) & 0b1,
        chroma_subsampling_y=(byte2 >> 2) & 0b1,
        chroma_sample_position=byte2 & 0b11,
        reserved=(byte3 >> 5) & 0b111,
        initial_presentation_delay_present=(byte3 >> 4) & 0b1,
        # 4-bit field: the low nibble of byte 3 (a 1-bit mask here would
        # silently truncate every delay value to 0 or 1).
        initial_presentation_delay_minus_one=byte3 & 0b00001111,
        configOBUs=data[4:],
    )
ESCAPE = re.compile('\0\0(\0|\1|\2|\3)'.encode('ascii')) | |
def escapeObus(data: memoryview | bytearray | bytes) -> bytes: | |
begin = 0 | |
av1_in_ts = bytearray() | |
while begin < len(data): | |
offset = 0 | |
forbidden_bit = (data[begin + offset] & 0b10000000) >> 7 | |
type = (data[begin + offset] & 0b01111000) >> 3 | |
extension_flag = (data[begin + offset] & 0b00000100) >> 2 | |
has_size_field = (data[begin + offset] & 0b00000010) >> 1 | |
reserved_1bit = (data[begin + offset] & 0b00000001) >> 0 | |
offset += 1 | |
if extension_flag: offset += 1 | |
size = len(data) - (begin + offset) | |
if has_size_field: | |
size, cnt = 0, 0 | |
while True: | |
val = data[begin + offset] | |
offset += 1 | |
size |= (val & 0b01111111) << (cnt * 7) | |
if (val & 0b10000000) == 0: break | |
av1_in_ts += b'\0\0\1' + re.sub(ESCAPE, b'\0\0\3\\1', data[begin + offset: begin + offset + size]) | |
begin += offset + size | |
return bytes(av1_in_ts) | |
PACKET_SIZE = 188 | |
HEADER_SIZE = 4 | |
def packetizeSection(section: memoryview | bytearray | bytes, pid: int, continuity_counter: int) -> list[bytes]: | |
result: list[bytes] = [] | |
begin = 0 | |
while (begin < len(section)): | |
next = min(len(section), begin + (PACKET_SIZE - HEADER_SIZE) - (1 if begin == 0 else 0)) | |
result.append(bytes( | |
([ | |
0x47, | |
(0 << 7) | ((1 if begin == 0 else 0) << 6) | (0 << 5) | ((pid & 0x1F00) >> 8), | |
(pid & 0x00FF), | |
(0 << 6) | (1 << 4) | (continuity_counter & 0x0F), | |
]) + | |
([0] if begin == 0 else []) + | |
list(section[begin:next]) + | |
([0xFF] * ((PACKET_SIZE - HEADER_SIZE) - ((next - begin) + (1 if begin == 0 else 0)))) | |
)) | |
continuity_counter = (continuity_counter + 1) & 0x0F | |
begin = next | |
return result | |
def packetizePES(pes: memoryview | bytearray | bytes, pid: int, continuity_counter: int) -> list[bytes]:
    """Split a PES packet into 188-byte transport stream packets.

    Full packets are payload-only. A packet whose chunk is shorter than the
    payload capacity (in practice the last one) gets an adaptation field that
    fills the gap: a length byte, then — if there is room — a zero flags byte
    and 0xFF stuffing, so the PES data ends exactly at the packet boundary.
    The first packet has payload_unit_start_indicator set. The continuity
    counter starts at *continuity_counter* and advances modulo 16 per packet.
    An empty *pes* produces an empty list.
    """
    capacity = PACKET_SIZE - HEADER_SIZE
    total = len(pes)
    packets: list[bytes] = []
    cursor = 0
    while cursor < total:
        stop = min(total, cursor + capacity)
        chunk = pes[cursor:stop]
        spare = capacity - len(chunk)  # bytes the adaptation field must occupy

        start_flag = 0x40 if cursor == 0 else 0x00
        field_control = 0x30 if spare > 0 else 0x10
        pieces = [bytes((
            0x47,
            start_flag | ((pid & 0x1F00) >> 8),
            pid & 0x00FF,
            field_control | (continuity_counter & 0x0F),
        ))]
        if spare > 0:
            pieces.append(bytes((spare - 1,)))      # adaptation_field_length
        if spare > 1:
            pieces.append(b'\x00')                   # adaptation field flags
        if spare > 2:
            pieces.append(b'\xFF' * (spare - 2))     # stuffing bytes
        pieces.append(bytes(chunk))
        packets.append(b''.join(pieces))

        continuity_counter = (continuity_counter + 1) & 0x0F
        cursor = stop
    return packets
def CRC32(section: memoryview | bytearray | bytes) -> int: | |
crc = 0xFFFFFFFF | |
for byte in section: | |
for index in range(7, -1, -1): | |
bit = (byte & (1 << index)) >> index | |
c = 1 if crc & 0x80000000 else 0 | |
crc <<= 1 | |
if c ^ bit: crc ^= 0x04c11db7 | |
crc &= 0xFFFFFFFF | |
return crc | |
def genPAT(pmt_pid: int):
    """Build a Program Association Table section announcing one program.

    The table lists program_number 1 with its program map on *pmt_pid*.
    It is a single section, so section_number and last_section_number are
    both 0, and all reserved bits are written as 1 as ISO/IEC 13818-1
    requires.

    Args:
        pmt_pid: PID of the Program Map Table (13 bits are used).

    Returns:
        A bytearray holding the complete section, including the trailing
        4-byte CRC computed by CRC32 over everything before it.
    """
    PAT = bytearray([
        0x00,        # table_id
        0b10110000,  # section_syntax_indicator=1, '0', reserved='11', section_length[11:8] (patched below)
        0b00000000,  # section_length[7:0] (patched below)
        0b00000000,  # transport_stream_id
        0b00000000,  # transport_stream_id
        0b11000001,  # reserved='11', version_number=0, current_next_indicator=1
        0b00000000,  # section_number
        0b00000000,  # last_section_number
        #
        0b00000000,  # program_number
        0b00000001,  # program_number
        0b11100000 | ((0x1F00 & pmt_pid) >> 8),  # reserved='111', program_map_pid[12:8]
        (0x00FF & pmt_pid) >> 0,                 # program_map_pid[7:0]
    ])
    # section_length counts every byte after the length field itself,
    # including the 4 CRC bytes that are appended below.
    section_length = len(PAT) - 3 + 4
    PAT[1] = (PAT[1] & 0xF0) | ((section_length & 0x0F00) >> 8)
    PAT[2] = section_length & 0x00FF
    PAT += CRC32(PAT).to_bytes(4, byteorder='big')
    return PAT
def genPMT(pcr_pid: int, av1_pid: int, config: AV1CodecConfigurationBox):
    """Build a Program Map Table section for program 1 with one AV1 stream.

    The elementary stream is declared with stream_type 0x06 on *av1_pid* and
    carries two descriptors: a registration descriptor (tag 0x05) with the
    format identifier 'AV01', and a descriptor with tag 0x80 whose four
    payload bytes are re-packed from *config*.

    The table is a single section, so section_number and last_section_number
    are both 0, and all reserved bits are written as 1 as ISO/IEC 13818-1
    requires.

    Args:
        pcr_pid: PID that carries the PCR (13 bits are used).
        av1_pid: PID of the AV1 elementary stream (13 bits are used).
        config: Parsed codec configuration used to fill the 0x80 descriptor.

    Returns:
        A bytearray holding the complete section, including the trailing
        4-byte CRC computed by CRC32 over everything before it.
    """
    PMT = bytearray([
        0x02,        # table_id
        0b10110000,  # section_syntax_indicator=1, '0', reserved='11', section_length[11:8] (patched below)
        0b00000000,  # section_length[7:0] (patched below)
        0b00000000,  # program_number
        0b00000001,  # program_number
        0b11000001,  # reserved='11', version_number=0, current_next_indicator=1
        0b00000000,  # section_number
        0b00000000,  # last_section_number
        #
        0b11100000 | ((0x1F00 & pcr_pid) >> 8),  # reserved='111', pcr_pid[12:8]
        (0x00FF & pcr_pid) >> 0,                 # pcr_pid[7:0]
        0b11110000,  # reserved='1111', program_info_length[11:8] (no descriptor loop)
        0b00000000,  # program_info_length[7:0] (no descriptor loop)
    ])
    registration_descriptor = bytes([
        0x05,  # descriptor_tag
        4,     # descriptor_length
    ]) + ('AV01'.encode('ascii'))
    av1_video_descriptor = bytes([
        0x80,  # descriptor_tag
        4,     # descriptor_length
        (config.marker << 7) | (config.version << 0),
        (config.seq_profile << 5) | (config.seq_level_idx_0 << 0),
        (config.seq_level_tier_0 << 7) | (config.high_bitdepth << 6) | (config.twelve_bit << 5) | (config.monochrome << 4) | (config.chroma_subsampling_x << 3) | (config.chroma_subsampling_y << 2) | (config.chroma_sample_position << 0),
        # 3 is hdr_wcg_idc (No indication made regarding HDR/WCG or SDR characteristics of the stream)
        (3 << 6) | (0 << 5) | (config.initial_presentation_delay_present << 4) | (config.initial_presentation_delay_minus_one << 0),
    ])
    descriptors = registration_descriptor + av1_video_descriptor

    # reserved='1111' in the top nibble, ES_info_length in the low 12 bits.
    es_info_length = (0xF000 | (len(descriptors) & 0x0FFF)).to_bytes(2, byteorder='big')
    PMT += bytes([
        0x06,                                    # stream_type
        0b11100000 | ((0x1F00 & av1_pid) >> 8),  # reserved='111', elementary_PID[12:8]
        (0x00FF & av1_pid) >> 0,                 # elementary_PID[7:0]
    ]) + es_info_length + descriptors

    # section_length counts every byte after the length field itself,
    # including the 4 CRC bytes that are appended below.
    section_length = len(PMT) - 3 + 4
    PMT[1] = (PMT[1] & 0xF0) | ((section_length & 0x0F00) >> 8)
    PMT[2] = section_length & 0x00FF
    PMT += CRC32(PMT).to_bytes(4, byteorder='big')
    return PMT
def genPCR(pcr_pid: int, pcr: int, continuity_counter: int):
    """Build one 188-byte transport stream packet that carries only a PCR.

    The packet consists of the 4-byte header followed by an adaptation field
    that fills the rest of the packet: PCR_flag set, the 6-byte PCR, then
    0xFF stuffing. adaptation_field_control is '10' (adaptation field only,
    no payload), and payload_unit_start_indicator is 0 because there is no
    payload.

    Args:
        pcr_pid: PID to put in the header (13 bits are used).
        pcr: program_clock_reference_base in 90 kHz ticks; the low 33 bits
            are used. The 9-bit extension is always written as 0.
        continuity_counter: Value for the 4-bit continuity counter field.
            NOTE(review): ISO/IEC 13818-1 says the counter does not advance
            on packets without payload — confirm how the caller steps it.

    Returns:
        A bytearray of exactly 188 bytes.
    """
    # A 188-byte packet minus the 4-byte header and the length byte itself.
    adaptation_field_length = 183
    base = pcr & 0x1FFFFFFFF

    PCR = bytearray([
        0x47,
        (0 << 7) | (0 << 6) | (0 << 5) | ((pcr_pid & 0x1F00) >> 8),
        (pcr_pid & 0x00FF),
        (0 << 6) | 0x20 | (continuity_counter & 0x0F),  # adaptation field only
        adaptation_field_length,
        0x10,                      # flags: PCR_flag
        (base >> 25) & 0xFF,       # base[32:25]
        (base >> 17) & 0xFF,       # base[24:17]
        (base >> 9) & 0xFF,        # base[16:9]
        (base >> 1) & 0xFF,        # base[8:1]
        ((base & 1) << 7) | 0x7E,  # base[0], 6 reserved bits ('1'), extension[8]=0
        0x00,                      # extension[7:0]
    ])
    # Flags byte (1) + PCR (6) are already written; the rest is stuffing.
    PCR += b'\xFF' * (adaptation_field_length - 7)
    return PCR
def genAV1(av1_in_ts: memoryview | bytearray | bytes, pts: int, dts: int): | |
AV1 = bytearray([ | |
0, 0, 1, | |
0xBD, | |
0, 0, | |
0b10000000, | |
0b11000000, | |
10, | |
# pts | |
0b00110001 | ((pts & 0x1C0000000) >> 29), | |
(pts & 0x3FC00000) >> 22, | |
0b00000001 | ((pts & 0x003F8000) >> 15), | |
(pts & 0x00007F80) >> 7, | |
0b00000001 | ((pts & 0x0000007F) << 1), | |
# dts | |
0b00010001 | ((dts & 0x1C0000000) >> 29), | |
(dts & 0x3FC00000) >> 22, | |
0b00000001 | ((dts & 0x003F8000) >> 15), | |
(dts & 0x00007F80) >> 7, | |
0b00000001 | ((dts & 0x0000007F) << 1) | |
]) + av1_in_ts | |
return AV1 | |
if __name__ == "__main__":
    # Reads a fragmented MP4 box by box and writes an MPEG-TS: PAT, PMT and a
    # PCR packet roughly every 0.1 s of media time, plus one PES packet per
    # 'mdat' box, timestamped with the decode time of the preceding 'moof'.
    parser = argparse.ArgumentParser(description=('AV1 to AnnexB'))
    parser.add_argument('-i', '--input', type=argparse.FileType('rb'), nargs='?', default=sys.stdin.buffer)
    parser.add_argument('-o', '--output', type=argparse.FileType('wb'), nargs='?', default=sys.stdout.buffer)
    args = parser.parse_args()

    timescale: int | None = None
    config: AV1CodecConfigurationBox | None = None
    dts: int | None = None
    PAT_CC, PMT_CC, AV1_CC, PCR_CC = 0, 0, 0, 0
    PMT_PID, AV1_PID, PCR_PID = 0x100, 0x101, 0x200
    EMIT_SECONDS = 0

    # Bytes of the sample entry that precede its child boxes (the sum spells
    # out the individual fields that are skipped).
    SAMPLE_ENTRY_HEADER = 4 + 4 + 4 + 12 + 2 + 2 + 4 + 4 + 4 + 2 + 1 + 31 + 2 + 2

    while True:
        box_header = args.input.read(8)
        if len(box_header) < 8:
            break  # end of input (or a truncated header)
        # Explicit byteorder: the default argument only exists on Python 3.11+.
        size = int.from_bytes(box_header[:4], byteorder='big') - 8
        if size < 0:
            break
        name = box_header[4:8].decode('ascii', errors='replace')
        box = args.input.read(size)

        if name == 'moov':
            mdhd = findBox(box, ['trak', 'mdia', 'mdhd'])
            stsd = findBox(box, ['trak', 'mdia', 'minf', 'stbl', 'stsd'])
            if mdhd is None or stsd is None:
                parser.error('moov box does not contain trak/mdia/mdhd and stsd')
            timescale = parseTimescaleByMdhd(mdhd)
            # Skip version/flags (4 bytes) and entry_count (4 bytes).
            av01 = findBox(stsd[8:], 'av01')
            if av01 is None:
                parser.error('no av01 sample entry found; the input is not AV1 video')
            av1C = findBox(av01[SAMPLE_ENTRY_HEADER:], 'av1C')
            if av1C is None:
                parser.error('av01 sample entry has no av1C box')
            config = parseAV1CodecConfigurationBox(av1C)
        elif name == 'moof':
            tfdt = findBox(box, ['traf', 'tfdt'])
            if tfdt is None:
                parser.error('moof box does not contain traf/tfdt')
            dts = parseBaseMediaDecodeTimeByTfdt(tfdt)
        elif name == 'mdat':
            if dts is None:
                continue
            if not timescale or config is None:
                parser.error('mdat encountered before a usable moov box')
            av1_in_ts = escapeObus(box)

            if (dts / timescale) >= EMIT_SECONDS:
                for packet in packetizeSection(genPAT(PMT_PID), 0, PAT_CC):
                    args.output.write(packet)
                    PAT_CC = (PAT_CC + 1) & 0x0F
                for packet in packetizeSection(genPMT(PCR_PID, AV1_PID, config), PMT_PID, PMT_CC):
                    args.output.write(packet)
                    PMT_CC = (PMT_CC + 1) & 0x0F
                args.output.write(genPCR(PCR_PID, int(EMIT_SECONDS * 90000), PCR_CC))
                PCR_CC = (PCR_CC + 1) & 0x0F
                EMIT_SECONDS += 0.1

            # PES timestamps tick at 90 kHz, like the PCR above; the tfdt value
            # is in units of the track timescale, so rescale before writing.
            timestamp = dts * 90000 // timescale
            for packet in packetizePES(genAV1(av1_in_ts, timestamp, timestamp), AV1_PID, AV1_CC):
                args.output.write(packet)
                AV1_CC = (AV1_CC + 1) & 0x0F
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment