Skip to content

Instantly share code, notes, and snippets.

@voluntas
Last active November 9, 2025 02:08
Show Gist options
  • Select an option

  • Save voluntas/cb1c525ff4462044073205034ab7b3c2 to your computer and use it in GitHub Desktop.

Select an option

Save voluntas/cb1c525ff4462044073205034ab7b3c2 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
webcodecs-py は絶賛開発中です。
blend2d-py でダミー映像を生成し、webcodecs-py で AV1 エンコードして mp4-py で MP4 ファイルに出力するサンプル
必要な依存関係:
uv add blend2d-py mp4-py
使い方:
uv run python examples/blend2d_to_mp4.py
uv run python examples/blend2d_to_mp4.py --output output.mp4 --width 1920 --height 1080
"""
import argparse
import random
import sys
from typing import Optional
from blend2d import CompOp, Context, Image
from mp4 import Mp4FileMuxer, Mp4MuxSample, Mp4SampleEntryAv01
from webcodecs import (
EncodedVideoChunkType,
VideoEncoder,
VideoEncoderConfig,
VideoFrame,
VideoFrameBufferInit,
VideoPixelFormat,
)
class AnimatedCircle:
"""アニメーションする円クラス"""
def __init__(self, x, y, radius, vx, vy, r, g, b, alpha):
self.x = x
self.y = y
self.radius = radius
self.vx = vx
self.vy = vy
self.r = r
self.g = g
self.b = b
self.alpha = alpha
def update(self, screen_width, screen_height):
"""位置を更新し、画面端で跳ね返る"""
self.x += self.vx
self.y += self.vy
# 左右の壁で跳ね返る
if self.x - self.radius <= 0 or self.x + self.radius >= screen_width:
self.vx = -self.vx
self.x = max(self.radius, min(self.x, screen_width - self.radius))
# 上下の壁で跳ね返る
if self.y - self.radius <= 0 or self.y + self.radius >= screen_height:
self.vy = -self.vy
self.y = max(self.radius, min(self.y, screen_height - self.radius))
def draw(self, ctx):
"""円を描画"""
ctx.set_fill_style_rgba(self.r, self.g, self.b, self.alpha)
ctx.fill_circle(self.x, self.y, self.radius)
class MP4Writer:
"""MP4 ファイルへの書き込みを行うクラス"""
def __init__(self, filename: str, width: int, height: int, fps: int):
self.filename = filename
self.width = width
self.height = height
self.fps = fps
self.timescale = 30000 # MP4 タイムスケール
self.frame_duration = self.timescale // fps # 各フレームの duration
self.muxer: Optional[Mp4FileMuxer] = None
self.sample_entry: Optional[Mp4SampleEntryAv01] = None
self.frame_count = 0
def start(self):
"""Muxer を開始"""
self.muxer = Mp4FileMuxer(self.filename)
self.muxer.__enter__()
def write(self, chunk_data: bytes, is_keyframe: bool):
"""フレームを書き込み"""
if self.muxer is None:
raise RuntimeError("Muxer が開始されていません")
# 最初のキーフレームから config_obus を抽出して sample_entry を作成
if self.sample_entry is None:
if not is_keyframe:
raise RuntimeError("最初のフレームはキーフレームである必要があります")
# AV1 の config_obus を抽出(最初の OBU シーケンス)
config_obus = self._extract_config_obus(chunk_data)
self.sample_entry = Mp4SampleEntryAv01(
width=self.width,
height=self.height,
config_obus=config_obus,
seq_profile=0, # Main Profile
seq_level_idx_0=8, # Level 4.0
seq_tier_0=0, # Main tier
)
# MP4 サンプルを作成
sample = Mp4MuxSample(
track_kind="video",
sample_entry=self.sample_entry,
keyframe=is_keyframe,
timescale=self.timescale,
duration=self.frame_duration,
data=chunk_data,
)
# サンプルを追加
self.muxer.append_sample(sample)
self.frame_count += 1
def _extract_config_obus(self, chunk_data: bytes) -> bytes:
"""AV1 chunk から config OBUs を抽出する
最初のキーフレームから Sequence Header OBU を抽出します。
簡易実装として、chunk_data 全体を config_obus として使用します。
"""
# 簡易実装: 最初のキーフレームのデータをそのまま config_obus として使用
# 本来は OBU をパースして Sequence Header のみを抽出すべきですが、
# 多くの場合、最初のキーフレームに必要な情報が含まれています
return chunk_data
def stop(self):
"""Muxer を停止"""
if self.muxer is not None:
self.muxer.finalize()
self.muxer.__exit__(None, None, None)
self.muxer = None
def main():
parser = argparse.ArgumentParser(
description="blend2d-py でダミー映像を生成し、AV1 エンコードして MP4 ファイルに出力"
)
parser.add_argument("--width", type=int, default=640, help="映像の幅(デフォルト: 640)")
parser.add_argument("--height", type=int, default=480, help="映像の高さ(デフォルト: 480)")
parser.add_argument("--fps", type=int, default=30, help="フレームレート(デフォルト: 30)")
parser.add_argument(
"--duration", type=int, default=15, help="映像の長さ(秒)(デフォルト: 15)"
)
parser.add_argument(
"--bitrate", type=int, default=500000, help="ビットレート(デフォルト: 500000)"
)
parser.add_argument(
"--output", type=str, default="output.mp4", help="出力ファイル名(デフォルト: output.mp4)"
)
args = parser.parse_args()
width = args.width
height = args.height
fps = args.fps
total_frames = fps * args.duration
print("=== blend2d-py → webcodecs-py → mp4-py パイプライン ===")
print(f"解像度: {width}x{height}")
print(f"フレームレート: {fps} fps")
print(f"映像の長さ: {args.duration} 秒")
print(f"総フレーム数: {total_frames}")
print(f"ビットレート: {args.bitrate} bps")
print(f"出力ファイル: {args.output}")
print()
# アニメーションする円を作成
colors = [
(255, 0, 0), # 赤
(0, 255, 0), # 緑
(0, 0, 255), # 青
(255, 255, 0), # 黄色
(255, 0, 255), # マゼンタ
(0, 255, 255), # シアン
(255, 128, 0), # オレンジ
(128, 0, 255), # 紫
]
circles = []
for i, color in enumerate(colors):
x = random.randint(50, width - 50)
y = random.randint(50, height - 50)
radius = random.randint(20, 40)
vx = random.uniform(-4.0, 4.0)
vy = random.uniform(-4.0, 4.0)
alpha = random.randint(150, 220)
circles.append(AnimatedCircle(x, y, radius, vx, vy, *color, alpha))
# MP4 ライターを初期化
mp4_writer = MP4Writer(args.output, width, height, fps)
mp4_writer.start()
# エンコーダーを初期化
encoded_frame_count = 0
def on_output(chunk):
nonlocal encoded_frame_count
# MP4 ファイルに書き込み
frame_data = chunk.get_data()
is_keyframe = chunk.type == EncodedVideoChunkType.KEY
mp4_writer.write(frame_data, is_keyframe)
encoded_frame_count += 1
# エンコードされたフレームのサイズを表示
chunk_type = "Key" if is_keyframe else "Delta"
print(
f" フレーム {encoded_frame_count:4d}/{total_frames}: {chunk_type:5s} "
f"{chunk.byte_length:6d} bytes, timestamp={chunk.timestamp}"
)
def on_error(error):
print(f"エンコーダーエラー: {error}", file=sys.stderr)
encoder = VideoEncoder(on_output, on_error)
config: VideoEncoderConfig = {
"codec": "av01.0.04M.08",
"width": width,
"height": height,
"bitrate": args.bitrate,
"framerate": float(fps),
"bitrate_mode": "constant",
"latency_mode": "realtime",
}
encoder.configure(config)
print("エンコーダーを初期化しました")
print(f" コーデック: {config['codec']}")
print(f" ビットレート: {args.bitrate} bps ({args.bitrate / 1000:.0f} kbps)")
print()
# フレームを生成してエンコード
frame_count = 0
timestamp = 0
frame_duration = 1_000_000 // fps # マイクロ秒単位 (WebCodecs API 準拠)
print("フレームの生成とエンコードを開始します...")
print()
try:
for frame_count in range(total_frames):
# blend2d-py でフレームを生成
img = Image(width, height)
ctx = Context(img)
# 背景を黒で塗りつぶし
ctx.set_fill_style_rgba(0, 0, 0, 255)
ctx.fill_all()
# アルファブレンディングを有効化
ctx.set_comp_op(CompOp.SRC_OVER)
# 各円を更新して描画
for circle in circles:
circle.update(width, height)
circle.draw(ctx)
# フレーム番号を表示(オプション)
# テキスト描画は未実装なので、簡単な図形で代用
# 右上に小さな四角形を描画してフレームカウンターとする
indicator_size = 5 + (frame_count % 20)
ctx.set_fill_style_rgba(255, 255, 255, 200)
ctx.fill_circle(width - 30, 30, indicator_size)
ctx.end()
# NumPy 配列として取得
bgra = img.asarray()
# VideoFrame を作成(BGRA フォーマット)
init = VideoFrameBufferInit(
format=VideoPixelFormat.BGRA,
coded_width=width,
coded_height=height,
timestamp=timestamp,
)
bgra_frame = VideoFrame(bgra, init)
# BGRA → I420 変換(AV1 エンコーダーは I420 が必要)
i420_frame = bgra_frame.convert_to(VideoPixelFormat.I420)
bgra_frame.close()
# エンコード(最初のフレームと 3 秒ごとにキーフレームを強制)
keyframe = frame_count == 0 or frame_count % (fps * 3) == 0
encoder.encode(i420_frame, {"keyFrame": keyframe})
i420_frame.close()
timestamp += frame_duration
except Exception as e:
print(f"\nエラーが発生しました: {e}", file=sys.stderr)
return 1
print(f"\n合計 {frame_count + 1} フレームを生成しました")
# エンコーダーをフラッシュ
print("エンコーダーをフラッシュしています...")
encoder.flush()
encoder.close()
print(f"エンコードされたチャンク数: {encoded_frame_count}")
print()
# MP4 ライターを停止
print(f"MP4 ファイルを完了しています: {args.output}")
mp4_writer.stop()
print(f"ファイルを保存しました: {args.output}")
print()
print("=== 完了 ===")
return 0
if __name__ == "__main__":
sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment