Last active
March 16, 2023 18:40
-
-
Save pedrovhb/a8d8249a0008b3a0a80aff32f943d729 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import annotations | |
from dataclasses import dataclass | |
from typing import Tuple, List, Union | |
import cairo | |
from abc import ABC, abstractmethod | |
@dataclass | |
class HasPosition(ABC): | |
x: float | |
y: float | |
@dataclass(kw_only=True) | |
class Shape(HasPosition, ABC): | |
angle: float = 0 | |
scale: float = 1 | |
color: Tuple[float, float, float] = (0, 0, 0) | |
pivot_x: float = 0 | |
pivot_y: float = 0 | |
layer: int = 0 | |
@abstractmethod | |
def draw(self, ctx: cairo.Context): | |
pass | |
def translate(self, dx: float, dy: float): | |
self.x += dx | |
self.y += dy | |
def rotate(self, d_angle: float): | |
self.angle += d_angle | |
def set_scale(self, scale_x: float, scale_y: float = None): | |
if scale_y is None: | |
scale_y = scale_x | |
self.scale *= scale_x | |
self.scale *= scale_y | |
@dataclass | |
class Group(Shape): | |
elements: List[Union[Shape, 'Group']] = None | |
def __post_init__(self): | |
if self.elements is None: | |
self.elements = [] | |
def add_element(self, element: Union[Shape, 'Group']): | |
self.elements.append(element) | |
def draw(self, ctx: cairo.Context): | |
ctx.save() | |
ctx.translate(self.x, self.y) | |
ctx.rotate(self.angle) | |
ctx.scale(self.scale, self.scale) | |
for element in self.elements: | |
element.draw(ctx) | |
ctx.restore() | |
@dataclass | |
class Circle(Shape): | |
radius: float | |
def draw(self, ctx: cairo.Context): | |
ctx.save() | |
ctx.translate(self.x, self.y) | |
ctx.rotate(self.angle) | |
ctx.scale(self.scale, self.scale) | |
ctx.set_source_rgb(*self.color) | |
ctx.arc(self.pivot_x, self.pivot_y, self.radius, 0, 2 * 3.14159) | |
ctx.fill() | |
ctx.restore() | |
@dataclass | |
class Rectangle(Shape): | |
width: float | |
height: float | |
def draw(self, ctx: cairo.Context): | |
ctx.save() | |
ctx.translate(self.x, self.y) | |
ctx.rotate(self.angle) | |
ctx.scale(self.scale, self.scale) | |
ctx.set_source_rgb(*self.color) | |
ctx.rectangle(self.pivot_x, self.pivot_y, self.width, self.height) | |
ctx.fill() | |
ctx.restore() | |
@dataclass | |
class Arrow(Shape): | |
length: float | |
def draw(self, ctx: cairo.Context): | |
ctx.set_source_rgb(*self.color) | |
ctx.save() | |
ctx.translate(self.x, self.y) | |
ctx.rotate(self.angle) | |
ctx.scale(self.scale, self.scale) | |
ctx.move_to(0, 0) | |
ctx.line_to(self.length, 0) | |
ctx.line_to(self.length - 10, -5) | |
ctx.move_to(self.length, 0) | |
ctx.line_to(self.length - 10, 5) | |
ctx.set_line_width(2) | |
ctx.stroke() | |
ctx.restore() | |
class Canvas: | |
def __init__(self, width: int, height: int): | |
self.width = width | |
self.height = height | |
self.surface = cairo.ImageSurface(cairo.FORMAT_RGB24, width, height) | |
self.ctx = cairo.Context(self.surface) | |
self.shapes: List[Shape] = [] | |
def add_shape(self, shape: Shape): | |
self.shapes.append(shape) | |
def draw(self): | |
sorted_shapes = sorted(self.shapes, key=lambda shape: shape.layer) | |
for shape in sorted_shapes: | |
shape.draw(self.ctx) | |
def get_rgb24_blob(self) -> bytes: | |
self.draw() | |
self.surface.flush() | |
return self.surface.get_data() | |
# def chunker(): | |
# for i in range(len(data)//4): | |
# chunk = data[i*4:i*4+3] | |
# yield chunk | |
# return b"".join(chunker()) | |
def write_to_png(self) -> None: | |
self.draw() | |
self.surface.flush() | |
self.surface.write_to_png('shapes.png') | |
canvas = Canvas(640, 480) | |
circle = Circle(100, 100, 30, color=(0, 0, 1)) | |
rectangle = Rectangle(200, 200, 100, 50, color=(0.117, 0.117, 1)) | |
arrow = Arrow(300, 300, 50, angle=3.14159 * 0.5, color=(1, 0, 0)) | |
canvas.add_shape(circle) | |
canvas.add_shape(rectangle) | |
canvas.add_shape(arrow) | |
rgb24_blob = canvas.get_rgb24_blob() | |
# Save the blob to a file | |
with open('output.rgb24', 'wb') as f: | |
f.write(rgb24_blob) | |
canvas.write_to_png() | |
import time | |
# n_runs = 1000 | |
# t = time.perf_counter() | |
# | |
# with open("t.rgb", "wb") as fd: | |
# for i in range(n_runs): | |
# canvas = Canvas(640, 480) | |
# | |
# circle = Circle(100, i/500, 30, color=(0, 0, 1), layer=1) | |
# rectangle = Rectangle(200, 200, 100, 50, color=(0.117, 0.117, 1), layer=2) | |
# | |
# group = Group(i, 300) | |
# group.add_element(Circle(0, 0, 30, color=(0, 1, 1))) | |
# group.add_element(Circle(0, 100, 30, color=(1, 1, 0))) | |
# group.add_element(Circle(0, 200, 30, color=(0.5, 1, 0))) | |
# group2 = Group(300, 300) | |
# group2.add_element(Circle(0, 0, 30, color=(i/1000, 1, 1))) | |
# group2.add_element(Circle(0, 100, 20, color=(1, 1, 0))) | |
# group2.add_element(Circle(0, 200, 50, color=(0.5, 1, 0))) | |
# | |
# group2.rotate(-0.3) | |
# group2.set_scale(0.5*((10+i)/500)) | |
# | |
# canvas.add_shape(group) | |
# canvas.add_shape(group2) | |
# group.rotate(3) | |
# blob = canvas.get_rgb24_blob() | |
# fd.write(blob) | |
# print(i) | |
# t2 = time.perf_counter() | |
# print(f"{(t2 - t) / n_runs:.5f} secs per run") | |
# # then - ffmpeg -y -pix_fmt rgb0 -format rawvideo -s 640x480 -i t.rgb out.mp4 | |
# | |
# canvas.write_to_png() | |
import asyncio | |
import shlex | |
class FFmpegEncoder: | |
def __init__(self, output_file_path: str, width: int, height: int, pix_fmt: str = "rgb0"): | |
self.output_file_path = output_file_path | |
self.width = width | |
self.height = height | |
self.pix_fmt = pix_fmt | |
self.process = None | |
async def _initialize(self): | |
loop = asyncio.get_running_loop() | |
cmd = ( | |
f"ffmpeg -y -f rawvideo -pix_fmt {self.pix_fmt} -s {self.width}x{self.height} -i - " | |
f"-c:v libx264 -pix_fmt yuv420p {shlex.quote(self.output_file_path)}" | |
) | |
self.process = await asyncio.create_subprocess_exec( | |
*shlex.split(cmd), | |
stdin=asyncio.subprocess.PIPE, | |
stdout=asyncio.subprocess.DEVNULL, | |
stderr=asyncio.subprocess.DEVNULL, | |
# loop=loop, | |
) | |
async def encode_frame(self, frame: bytes): | |
if self.process is None: | |
await self._initialize() | |
if self.process.stdin: | |
self.process.stdin.write(frame) | |
await self.process.stdin.drain() | |
async def finish_encoding(self): | |
if self.process is None: | |
return | |
if self.process.stdin: | |
self.process.stdin.close() | |
await self.process.wait() | |
async def main(output_file_path: str): | |
# Initialize the FFmpegEncoder instance with the same width and height as the input video | |
encoder = FFmpegEncoder(output_file_path, 640, 480) | |
n_runs = 1000 | |
t = time.perf_counter() | |
queue = asyncio.Queue() | |
async def do_encode(): | |
while True: | |
frame = await queue.get() | |
if frame is None: | |
break | |
else: | |
await encoder.encode_frame(frame) | |
encode_task = asyncio.create_task(do_encode()) | |
for i in range(n_runs): | |
canvas = Canvas(640, 480) | |
circle = Circle(100, i/500, 30, color=(0, 0, 1), layer=1) | |
rectangle = Rectangle(200, 200, 100, 50, color=(0.117, 0.117, 1), layer=2) | |
group = Group(i, 300) | |
group.add_element(Circle(0, 0, 30, color=(0, 1, 1))) | |
group.add_element(Circle(0, 100, 30, color=(1, 1, 0))) | |
group.add_element(Circle(0, 200, 30, color=(0.5, 1, 0))) | |
group2 = Group(300, 300) | |
group2.add_element(Circle(0, 0, 30, color=(i/1000, 1, 1))) | |
group2.add_element(Circle(0, 100, 20, color=(1, 1, 0))) | |
group2.add_element(Circle(0, 200, 50, color=(0.5, 1, 0))) | |
group2.rotate(-0.3) | |
group2.set_scale(0.5*((10+i)/500)) | |
canvas.add_shape(group) | |
canvas.add_shape(group2) | |
group.rotate(3) | |
blob = canvas.get_rgb24_blob() | |
await queue.put(blob) | |
await queue.put(None) | |
await encode_task | |
t2 = time.perf_counter() | |
print(f"{(t2 - t) / n_runs:.5f} secs per run") | |
if __name__ == "__main__": | |
output_file_path = "file.mp4" | |
asyncio.run(main(output_file_path)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment