Skip to content

Instantly share code, notes, and snippets.

@louiecaulfield
Created October 1, 2024 09:28
Show Gist options
  • Save louiecaulfield/8688a4dfe59d4f6ec30038be693f7ccf to your computer and use it in GitHub Desktop.
Save louiecaulfield/8688a4dfe59d4f6ec30038be693f7ccf to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import dbus
from dbus.mainloop.glib import DBusGMainLoop
import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstApp', '1.0')
gi.require_version('GstVideo', '1.0')
from gi.repository import GLib,Gst, GstApp, GstVideo
class GStreamer():
def __init__(self):
Gst.init(None)
self.pipeline = Gst.Pipeline.new('dynamic')
self.clock = self.pipeline.get_pipeline_clock()
bus = self.pipeline.get_bus()
bus.add_signal_watch()
bus.connect('message', self.on_message)
self.on_terminate : Callable[[], None] = None
def start(self):
self.pipeline.set_state(Gst.State.PLAYING)
def stop(self):
print("Terminating pipeline: " + str(self.pipeline))
if self.pipeline is not None:
self.pipeline.send_event(Gst.Event.new_eos())
def terminate(self):
self.stop()
if self.on_terminate:
self.on_terminate()
def on_message(self, bus: Gst.Bus, message: Gst.Message) -> Gst.FlowReturn:
mtype = message.type
if mtype == Gst.MessageType.EOS:
# Handle End of Stream
print("End of stream")
self.terminate()
elif mtype == Gst.MessageType.ERROR:
# Handle Errors
err, debug = message.parse_error()
print(err, debug)
self.terminate()
elif mtype == Gst.MessageType.WARNING:
# Handle warnings
err, debug = message.parse_warning()
print(err, debug)
return Gst.FlowReturn.OK
class PipeWireToWindow(GStreamer) :
def __init__(self, width : int, height : int, fps : int):
super().__init__()
self.format = f"video/x-raw,max-framerate={fps}/1,width={width},height={height}"
def connect(self, node_id : int):
src = Gst.ElementFactory.make_with_properties('pipewiresrc', ['path'], [str(node_id)])
self.pipeline.add(src)
capsfilter = Gst.ElementFactory.make('capsfilter')
caps = Gst.Caps.from_string(self.format)
capsfilter.set_property('caps', caps)
self.pipeline.add(capsfilter)
src.link(capsfilter)
convertor = Gst.ElementFactory.make('videoconvert')
self.pipeline.add(convertor)
capsfilter.link(convertor)
sink = Gst.ElementFactory.make('xvimagesink')
self.pipeline.add(sink)
convertor.link(sink)
class PipeWireToBuffer(GStreamer):
def __init__(self, width : int, height : int, fps : int):
super().__init__()
self.format = f"video/x-raw,format=RGB16,max-framerate={fps}/1,width={width},height={height}"
self.on_buffer : Callable[[bytes], None] = None
def connect(self, node_id : int):
src = Gst.ElementFactory.make_with_properties('pipewiresrc', ['path'], [str(node_id)])
self.pipeline.add(src)
convertor = Gst.ElementFactory.make('videoconvert')
self.pipeline.add(convertor)
src.link(convertor)
capsfilter = Gst.ElementFactory.make('capsfilter')
caps = Gst.Caps.from_string(self.format)
capsfilter.set_property('caps', caps)
self.pipeline.add(capsfilter)
convertor.link(capsfilter)
sink = Gst.ElementFactory.make_with_properties('appsink', ['emit-signals','drop','max-buffers'], [True, 1, 1])
self.pipeline.add(sink)
capsfilter.link(sink)
sink.connect("new-sample", self.sink_buffer)
def sink_buffer(self, sink: GstApp.AppSink) -> Gst.FlowReturn:
sample = sink.emit("pull-sample") # Gst.Sample
if not isinstance(sample, Gst.Sample):
return Gst.FlowReturn.ERROR
buffer = sample.get_buffer()
caps_format = sample.get_caps().get_structure(0) # Gst.Structure
frmt_str = caps_format.get_value('format')
video_format = GstVideo.VideoFormat.from_string(frmt_str)
w, h = caps_format.get_value('width'), caps_format.get_value('height')
success, map_info = buffer.map(Gst.MapFlags.READ)
if not success:
raise RuntimeError("Could not map buffer data")
self.on_buffer(map_info.data)
buffer.unmap(map_info)
return Gst.FlowReturn.OK
class ScreenCast():
def __init__(self, mainloop):
DBusGMainLoop(set_as_default=True)
self.bus = dbus.SessionBus()
self.screen_cast_iface = 'org.gnome.Mutter.ScreenCast'
self.screen_cast_session_iface = 'org.gnome.Mutter.ScreenCast.Session'
screen_cast = self.bus.get_object(self.screen_cast_iface, '/org/gnome/Mutter/ScreenCast')
session_path = screen_cast.CreateSession([], dbus_interface=self.screen_cast_iface)
self.session = self.bus.get_object(self.screen_cast_iface, session_path)
def recordVirtual(self):
self.stream_path = self.session.RecordVirtual(
dbus.Dictionary({'is-platform': dbus.Boolean(True, variant_level=1),
'cursor-mode': dbus.UInt32(1, variant_level=1)}, signature='sv'),
dbus_interface=self.screen_cast_session_iface)
def connect(self, callback):
assert(self.stream_path is not None)
stream = self.bus.get_object(self.screen_cast_iface, self.stream_path)
stream.connect_to_signal("PipeWireStreamAdded", callback)
def start(self):
self.session.Start(dbus_interface=self.screen_cast_session_iface)
def stop(self):
try:
print("Stopping dbus caster")
self.session.Stop(dbus_interface=self.screen_cast_session_iface)
except Exception as e:
print("Failed to stop:" + str(e))
if __name__=="__main__":
def terminate():
caster.stop()
window.stop()
loop.quit()
window = PipeWireToWindow(640, 480, 30)
window.on_terminate = terminate
def connect(node_id):
window.connect(node_id)
window.start()
loop = GLib.MainLoop()
caster = ScreenCast(loop)
caster.recordVirtual()
caster.connect(connect)
caster.start()
try:
loop.run()
finally:
terminate()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment