Last active
July 20, 2024 17:52
-
-
Save safijari/00f3126b4e5f27bfa7607587b7ac2703 to your computer and use it in GitHub Desktop.
Recreating Valve Video Recording on the Steam Deck
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import gi | |
gi.require_version('Gst', '1.0') | |
from gi.repository import Gst, GLib | |
class VideoProcessor:
    """Capture a PipeWire video stream, H.264-encode it and expose each frame.

    The pipeline built here is::

        pipewiresrc ! vaapipostproc ! queue ! vaapih264enc ! h264parse ! appsink

    Every encoded buffer that reaches the appsink is handed to
    :meth:`process_frame`. Call :meth:`start` to run until interrupted, until
    the stream ends, or until the pipeline reports an error.
    """

    def __init__(self):
        Gst.init(None)
        self.pipeline = Gst.Pipeline.new("video_processing_pipeline")
        # Created by start(); defined up front so stop() is safe at any time.
        self.loop = None
        self.setup_pipeline()

    @staticmethod
    def _make_element(factory_name, name):
        """Create a GStreamer element, failing loudly if it is unavailable."""
        element = Gst.ElementFactory.make(factory_name, name)
        if element is None:
            # make() returns None when no installed plugin provides the factory.
            raise RuntimeError(
                f"Could not create GStreamer element '{factory_name}'; "
                "is the plugin that provides it installed?"
            )
        return element

    def setup_pipeline(self):
        """Create, configure, add and link every element of the pipeline.

        Raises:
            RuntimeError: If an element cannot be created or two neighbouring
                elements refuse to link.
        """
        # Create elements
        self.src = self._make_element("pipewiresrc", "source")
        self.postproc = self._make_element("vaapipostproc", "postproc")
        self.queue = self._make_element("queue", "queue")
        self.encoder = self._make_element("vaapih264enc", "encoder")
        self.parser = self._make_element("h264parse", "parser")
        self.sink = self._make_element("appsink", "sink")

        # Set properties
        self.src.set_property("do-timestamp", True)
        # appsink only fires "new-sample" when signal emission is enabled.
        self.sink.set_property("emit-signals", True)
        self.sink.connect("new-sample", self.on_new_sample)

        # Add elements to pipeline
        elements = [self.src, self.postproc, self.queue, self.encoder, self.parser, self.sink]
        for element in elements:
            self.pipeline.add(element)

        # Link each element to its downstream neighbour, in order.
        for upstream, downstream in zip(elements, elements[1:]):
            if not upstream.link(downstream):
                raise RuntimeError(
                    f"Could not link '{upstream.get_name()}' to '{downstream.get_name()}'"
                )

        # Without a bus watch an error or end-of-stream would leave the main
        # loop spinning forever with a dead pipeline.
        bus = self.pipeline.get_bus()
        bus.add_signal_watch()
        bus.connect("message::error", self._on_bus_error)
        bus.connect("message::eos", self._on_bus_eos)

    def _on_bus_error(self, bus, message):
        """Report a pipeline error and leave the main loop."""
        err, debug = message.parse_error()
        print(f"Pipeline error from {message.src.get_name()}: {err.message}")
        if debug:
            print(f"Debug details: {debug}")
        if self.loop is not None:
            self.loop.quit()

    def _on_bus_eos(self, bus, message):
        """Leave the main loop once the stream has ended."""
        if self.loop is not None:
            self.loop.quit()

    def on_new_sample(self, sink):
        """appsink "new-sample" handler: pull the sample and process its buffer.

        Returns:
            Gst.FlowReturn.OK, so the pipeline keeps delivering samples.
        """
        sample = sink.emit("pull-sample")
        if sample is not None:
            # Process the buffer here
            self.process_frame(sample.get_buffer(), sample.get_caps())
        return Gst.FlowReturn.OK

    def process_frame(self, buffer, caps):
        """Handle one encoded frame.

        This is the hook where the encoded data can be consumed; as written it
        only reports the frame size.

        Args:
            buffer: The Gst.Buffer holding the encoded frame.
            caps: The Gst.Caps of the sample the buffer came from (unused here).
        """
        success, map_info = buffer.map(Gst.MapFlags.READ)
        if not success:
            return
        try:
            # map_info.data holds the frame payload, map_info.size its length.
            size = map_info.size
            print(f"Received encoded frame: {size} bytes")
            # Do something with the data here
        finally:
            # Always release the mapping, even if the processing above raises.
            buffer.unmap(map_info)

    def start(self):
        """Set the pipeline to PLAYING and block in a GLib main loop.

        Returns after Ctrl+C, end-of-stream or a pipeline error; the pipeline
        is always shut down through :meth:`stop` on the way out.

        Raises:
            RuntimeError: If the pipeline cannot be set to PLAYING.
        """
        # Create the loop first so bus handlers always have something to quit.
        self.loop = GLib.MainLoop()
        if self.pipeline.set_state(Gst.State.PLAYING) == Gst.StateChangeReturn.FAILURE:
            self.stop()
            raise RuntimeError("Unable to set the pipeline to the PLAYING state")
        try:
            self.loop.run()
        except KeyboardInterrupt:
            pass
        finally:
            self.stop()

    def stop(self):
        """Shut the pipeline down and quit the main loop if one was created."""
        self.pipeline.set_state(Gst.State.NULL)
        if self.loop is not None:
            self.loop.quit()
if __name__ == "__main__":
    # Script entry point: build the capture pipeline and run it until stopped.
    recorder = VideoProcessor()
    recorder.start()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
import cv2 | |
import ffmpeg | |
import subprocess | |
import os | |
import time | |
def write_video_live(output_dir, base_name='output', frame_width=640, frame_height=480, fps=60,
                     num_frames=360):
    """Pipe a synthetic test pattern into ffmpeg and mux it as live MPEG-DASH.

    The pattern is a white 100x100 square that moves one pixel diagonally per
    frame over a black background. Frames are written to ffmpeg's stdin as raw
    RGB24 at roughly real-time pace and encoded with libx264.

    Args:
        output_dir: Directory that receives the manifest; created if missing.
        base_name: File stem of the manifest, written as ``<base_name>.mpd``.
        frame_width: Frame width in pixels.
        frame_height: Frame height in pixels.
        fps: Frame rate declared to ffmpeg and used to pace the writes.
        num_frames: Number of frames to generate.

    Raises:
        ValueError: If ``frame_width``, ``frame_height`` or ``fps`` is not
            positive, or ``num_frames`` is negative.
    """
    # Validate before creating directories or spawning ffmpeg.
    if frame_width <= 0 or frame_height <= 0:
        raise ValueError("frame_width and frame_height must be positive")
    if fps <= 0:
        raise ValueError("fps must be positive")
    if num_frames < 0:
        raise ValueError("num_frames must not be negative")

    os.makedirs(output_dir, exist_ok=True)

    # Write the manifest into output_dir rather than a fixed /tmp path.
    # NOTE(review): ffmpeg's DASH muxer is expected to place the segments next
    # to the manifest - confirm against the ffmpeg version in use.
    output_mpd = os.path.join(output_dir, f'{base_name}.mpd')
    print(output_mpd)

    process = (
        ffmpeg
        .input('pipe:', format='rawvideo', pix_fmt='rgb24', s=f'{frame_width}x{frame_height}', framerate=fps)
        .output(output_mpd, format='dash', vcodec='libx264', pix_fmt='yuv420p', seg_duration=2, reset_timestamps=1, streaming=1,
                video_bitrate="10000k", segment_format='m4s')
        .global_args('-loglevel', 'error')  # Suppress ffmpeg output to stderr
        .overwrite_output()
        .run_async(pipe_stdin=True)
    )

    frame_interval = 1 / fps
    try:
        for i in range(num_frames):
            frame = np.zeros((frame_height, frame_width, 3), "uint8")
            # Slicing clips at the frame edge, so the square simply slides out
            # of view once i exceeds the frame size.
            frame[i:i+100, i:i+100, :] = 255
            process.stdin.write(frame.tobytes())
            # Pace the writes so the stream is produced at about real time.
            time.sleep(frame_interval)
    finally:
        # Closing stdin ends ffmpeg's input; always reap the child process,
        # even if a write failed part-way through.
        try:
            process.stdin.close()
        finally:
            process.wait()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import gi | |
gi.require_version('Gst', '1.0') | |
from gi.repository import Gst, GLib | |
import ffmpeg | |
import os | |
class VideoProcessor:
    """Capture PipeWire video, H.264-encode it and mux it to DASH via ffmpeg.

    The GStreamer pipeline built here is::

        pipewiresrc ! vaapipostproc ! queue ! vaapih264enc ! h264parse ! appsink

    Every encoded buffer that reaches the appsink is written to the stdin of an
    ffmpeg process, which copies the stream into ``<output_dir>/<base_name>.mpd``.
    """

    def __init__(self, output_dir, base_name='output', fps=60):
        """Build the capture pipeline and launch the ffmpeg muxer.

        Args:
            output_dir: Directory for the DASH manifest; created if missing.
            base_name: File stem of the manifest.
            fps: Frame rate declared to ffmpeg for the incoming H.264 stream.
        """
        Gst.init(None)
        self.pipeline = Gst.Pipeline.new("video_processing_pipeline")
        self.output_dir = output_dir
        self.base_name = base_name
        self.fps = fps
        # Defined up front so stop() is safe even if setup fails part-way
        # through or start() was never called.
        self.loop = None
        self.ffmpeg_process = None
        self.setup_pipeline()
        self.setup_ffmpeg()

    @staticmethod
    def _make_element(factory_name, name):
        """Create a GStreamer element, failing loudly if it is unavailable."""
        element = Gst.ElementFactory.make(factory_name, name)
        if element is None:
            # make() returns None when no installed plugin provides the factory.
            raise RuntimeError(
                f"Could not create GStreamer element '{factory_name}'; "
                "is the plugin that provides it installed?"
            )
        return element

    def setup_pipeline(self):
        """Create, configure, add and link every element of the pipeline.

        Raises:
            RuntimeError: If an element cannot be created or two neighbouring
                elements refuse to link.
        """
        # Create elements
        self.src = self._make_element("pipewiresrc", "source")
        self.postproc = self._make_element("vaapipostproc", "postproc")
        self.queue = self._make_element("queue", "queue")
        self.encoder = self._make_element("vaapih264enc", "encoder")
        self.parser = self._make_element("h264parse", "parser")
        self.sink = self._make_element("appsink", "sink")

        # Set properties
        self.src.set_property("do-timestamp", True)
        # appsink only fires "new-sample" when signal emission is enabled.
        self.sink.set_property("emit-signals", True)
        # ffmpeg reads the pipe with its raw 'h264' input format, which expects
        # an Annex-B byte-stream; pin the caps so h264parse delivers that.
        self.sink.set_property(
            "caps",
            Gst.Caps.from_string("video/x-h264,stream-format=byte-stream,alignment=au"),
        )
        self.sink.connect("new-sample", self.on_new_sample)

        # Add elements to pipeline
        elements = [self.src, self.postproc, self.queue, self.encoder, self.parser, self.sink]
        for element in elements:
            self.pipeline.add(element)

        # Link each element to its downstream neighbour, in order.
        for upstream, downstream in zip(elements, elements[1:]):
            if not upstream.link(downstream):
                raise RuntimeError(
                    f"Could not link '{upstream.get_name()}' to '{downstream.get_name()}'"
                )

        # Without a bus watch an error or end-of-stream would leave the main
        # loop spinning forever with a dead pipeline.
        bus = self.pipeline.get_bus()
        bus.add_signal_watch()
        bus.connect("message::error", self._on_bus_error)
        bus.connect("message::eos", self._on_bus_eos)

    def setup_ffmpeg(self):
        """Create the output directory and start ffmpeg reading H.264 on stdin."""
        os.makedirs(self.output_dir, exist_ok=True)
        output_mpd = os.path.join(self.output_dir, f'{self.base_name}.mpd')
        self.ffmpeg_process = (
            ffmpeg
            .input('pipe:', format='h264', framerate=self.fps)
            # vcodec='copy': the stream is already encoded, ffmpeg only muxes it.
            .output(output_mpd, format='dash', vcodec='copy',
                    seg_duration=2, streaming=1, reset_timestamps=1,
                    movflags='+faststart', use_template=1, use_timeline=1)
            .global_args('-loglevel', 'info')
            .overwrite_output()
            .run_async(pipe_stdin=True)
        )

    def _on_bus_error(self, bus, message):
        """Report a pipeline error and leave the main loop."""
        err, debug = message.parse_error()
        print(f"Pipeline error from {message.src.get_name()}: {err.message}")
        if debug:
            print(f"Debug details: {debug}")
        if self.loop is not None:
            self.loop.quit()

    def _on_bus_eos(self, bus, message):
        """Leave the main loop once the stream has ended."""
        if self.loop is not None:
            self.loop.quit()

    def on_new_sample(self, sink):
        """appsink "new-sample" handler: forward the encoded buffer to ffmpeg.

        Returns:
            Gst.FlowReturn.OK normally, or Gst.FlowReturn.ERROR once ffmpeg can
            no longer accept data, so the pipeline stops instead of raising on
            every subsequent frame.
        """
        sample = sink.emit("pull-sample")
        if sample is None:
            return Gst.FlowReturn.OK
        if not self.process_frame(sample.get_buffer()):
            return Gst.FlowReturn.ERROR
        return Gst.FlowReturn.OK

    def process_frame(self, buffer):
        """Write one encoded frame to ffmpeg's stdin.

        Args:
            buffer: The Gst.Buffer holding the encoded frame.

        Returns:
            False if ffmpeg's stdin could not be written (for example because
            ffmpeg has exited), True otherwise. A buffer that cannot be mapped
            is skipped and still yields True.
        """
        success, map_info = buffer.map(Gst.MapFlags.READ)
        if not success:
            print("Could not map encoded buffer for reading; frame skipped")
            return True
        try:
            self.ffmpeg_process.stdin.write(map_info.data)
        except (OSError, ValueError) as exc:
            # OSError covers a broken pipe; ValueError a write to a closed file.
            print(f"Could not write frame to ffmpeg: {exc}")
            return False
        finally:
            # Always release the mapping, even when the write fails.
            buffer.unmap(map_info)
        return True

    def start(self):
        """Set the pipeline to PLAYING and block in a GLib main loop.

        Returns after Ctrl+C, end-of-stream or a pipeline error; the pipeline
        and ffmpeg are always shut down through :meth:`stop` on the way out.

        Raises:
            RuntimeError: If the pipeline cannot be set to PLAYING.
        """
        # Create the loop first so bus handlers always have something to quit.
        self.loop = GLib.MainLoop()
        if self.pipeline.set_state(Gst.State.PLAYING) == Gst.StateChangeReturn.FAILURE:
            self.stop()
            raise RuntimeError("Unable to set the pipeline to the PLAYING state")
        try:
            self.loop.run()
        except KeyboardInterrupt:
            pass
        finally:
            self.stop()

    def stop(self):
        """Stop the pipeline, quit the main loop and let ffmpeg finish.

        Safe to call more than once and before :meth:`start`.
        """
        # Stop the pipeline first so no further frames are written to ffmpeg.
        self.pipeline.set_state(Gst.State.NULL)
        if self.loop is not None:
            self.loop.quit()
        # Detach the process so a repeated stop() does not close it twice.
        process, self.ffmpeg_process = self.ffmpeg_process, None
        if process:
            try:
                # Closing stdin ends ffmpeg's input so it can finish its output.
                process.stdin.close()
            except OSError:
                # ffmpeg has already exited; nothing left to flush.
                pass
            process.wait()
if __name__ == "__main__":
    # Script entry point: record into /tmp/test until the processor stops.
    recorder = VideoProcessor(output_dir="/tmp/test")
    recorder.start()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The protobuf definitions (protos) are here: https://github.com/SteamDatabase/Protobufs