Skip to content

Instantly share code, notes, and snippets.

@safijari
Last active July 20, 2024 17:52
Show Gist options
  • Save safijari/00f3126b4e5f27bfa7607587b7ac2703 to your computer and use it in GitHub Desktop.
Save safijari/00f3126b4e5f27bfa7607587b7ac2703 to your computer and use it in GitHub Desktop.
Recreating Valve Video Recording on the Steam Deck
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
class VideoProcessor:
def __init__(self):
Gst.init(None)
self.pipeline = Gst.Pipeline.new("video_processing_pipeline")
self.setup_pipeline()
def setup_pipeline(self):
# Create elements
self.src = Gst.ElementFactory.make("pipewiresrc", "source")
self.postproc = Gst.ElementFactory.make("vaapipostproc", "postproc")
self.queue = Gst.ElementFactory.make("queue", "queue")
self.encoder = Gst.ElementFactory.make("vaapih264enc", "encoder")
self.parser = Gst.ElementFactory.make("h264parse", "parser")
self.sink = Gst.ElementFactory.make("appsink", "sink")
# Set properties
self.src.set_property("do-timestamp", True)
self.sink.set_property("emit-signals", True)
self.sink.connect("new-sample", self.on_new_sample)
# Add elements to pipeline
elements = [self.src, self.postproc, self.queue, self.encoder, self.parser, self.sink]
for element in elements:
self.pipeline.add(element)
# Link elements
self.src.link(self.postproc)
self.postproc.link(self.queue)
self.queue.link(self.encoder)
self.encoder.link(self.parser)
self.parser.link(self.sink)
def on_new_sample(self, sink):
sample = sink.emit("pull-sample")
if sample:
buffer = sample.get_buffer()
caps = sample.get_caps()
# Process the buffer here
self.process_frame(buffer, caps)
return Gst.FlowReturn.OK
def process_frame(self, buffer, caps):
# This is where you can access and process the encoded frame
# For example, you can get the data and size of the buffer:
success, map_info = buffer.map(Gst.MapFlags.READ)
if success:
data = map_info.data
size = map_info.size
print(f"Received encoded frame: {size} bytes")
# Do something with the data here
buffer.unmap(map_info)
def start(self):
self.pipeline.set_state(Gst.State.PLAYING)
self.loop = GLib.MainLoop()
try:
self.loop.run()
except KeyboardInterrupt:
pass
finally:
self.stop()
def stop(self):
self.pipeline.set_state(Gst.State.NULL)
self.loop.quit()
if __name__ == "__main__":
processor = VideoProcessor()
processor.start()
import numpy as np
import cv2
import ffmpeg
import subprocess
import os
import time
def write_video_live(output_dir, base_name='output', frame_width=640, frame_height=480, fps=60):
if not os.path.exists(output_dir):
os.makedirs(output_dir)
output_init = os.path.join(output_dir, f'{base_name}_init.mp4')
output_segment = os.path.join(output_dir, f'{base_name}_%03d.m4s')
print(output_segment)
process = (
ffmpeg
.input('pipe:', format='rawvideo', pix_fmt='rgb24', s=f'{frame_width}x{frame_height}', framerate=fps)
.output("/tmp/test.mpd", format='dash', vcodec='libx264', pix_fmt='yuv420p', seg_duration=2, reset_timestamps=1, streaming=1,
video_bitrate="10000k", segment_format='m4s')
.global_args('-loglevel', 'error') # Suppress ffmpeg output to stderr
.overwrite_output()
.run_async(pipe_stdin=True)
)
for i in range(360):
frame = np.zeros((frame_height, frame_width, 3), "uint8")
frame[i:i+100, i:i+100, :] = 255
process.stdin.write(frame.tobytes())
time.sleep(1/60)
process.stdin.close()
process.wait()
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
import ffmpeg
import os
class VideoProcessor:
def __init__(self, output_dir, base_name='output', fps=60):
Gst.init(None)
self.pipeline = Gst.Pipeline.new("video_processing_pipeline")
self.output_dir = output_dir
self.base_name = base_name
self.fps = fps
self.setup_pipeline()
self.setup_ffmpeg()
def setup_pipeline(self):
# Create elements
self.src = Gst.ElementFactory.make("pipewiresrc", "source")
self.postproc = Gst.ElementFactory.make("vaapipostproc", "postproc")
self.queue = Gst.ElementFactory.make("queue", "queue")
self.encoder = Gst.ElementFactory.make("vaapih264enc", "encoder")
self.parser = Gst.ElementFactory.make("h264parse", "parser")
self.sink = Gst.ElementFactory.make("appsink", "sink")
# Set properties
self.src.set_property("do-timestamp", True)
self.sink.set_property("emit-signals", True)
self.sink.connect("new-sample", self.on_new_sample)
# Add elements to pipeline
elements = [self.src, self.postproc, self.queue, self.encoder, self.parser, self.sink]
for element in elements:
self.pipeline.add(element)
# Link elements
self.src.link(self.postproc)
self.postproc.link(self.queue)
self.queue.link(self.encoder)
self.encoder.link(self.parser)
self.parser.link(self.sink)
def setup_ffmpeg(self):
if not os.path.exists(self.output_dir):
os.makedirs(self.output_dir)
output_mpd = os.path.join(self.output_dir, f'{self.base_name}.mpd')
self.ffmpeg_process = (
ffmpeg
.input('pipe:', format='h264', framerate=self.fps)
.output(output_mpd, format='dash', vcodec='copy',
seg_duration=2, streaming=1, reset_timestamps=1,
movflags='+faststart', use_template=1, use_timeline=1)
.global_args('-loglevel', 'info')
.overwrite_output()
.run_async(pipe_stdin=True)
)
def on_new_sample(self, sink):
sample = sink.emit("pull-sample")
if sample:
buffer = sample.get_buffer()
self.process_frame(buffer)
return Gst.FlowReturn.OK
def process_frame(self, buffer):
success, map_info = buffer.map(Gst.MapFlags.READ)
if success:
data = map_info.data
self.ffmpeg_process.stdin.write(data)
buffer.unmap(map_info)
def start(self):
self.pipeline.set_state(Gst.State.PLAYING)
self.loop = GLib.MainLoop()
try:
self.loop.run()
except KeyboardInterrupt:
pass
finally:
self.stop()
def stop(self):
self.pipeline.set_state(Gst.State.NULL)
self.loop.quit()
if self.ffmpeg_process:
self.ffmpeg_process.stdin.close()
self.ffmpeg_process.wait()
if __name__ == "__main__":
output_dir = "/tmp/test"
processor = VideoProcessor(output_dir)
processor.start()
@safijari
Copy link
Author

To use this install decky-recorder and also run pip install ffmpeg-python --target /tmp/ffmpeg-python && rsync -rv /tmp/ffmpeg-python/ deck@steamdeck:/home/deck/ffmpeg-python on a host machine. Then to run do

PYTHONPATH=$PYTHONPATH:/home/deck/ffmpeg-python/ GST_PLUGIN_PATH=/home/deck/homebrew/plugins/decky-recorder/bin/gstreamer-1.0/ LD_LIBRARY_PATH=/home/deck/homebrew/plugins/decky-recorder/bin/ python script.py

@safijari
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment