Skip to content

Instantly share code, notes, and snippets.

@tylercubell
Created February 7, 2019 20:08
Show Gist options
  • Save tylercubell/d922c36f487b8484a1e27557c83045b1 to your computer and use it in GitHub Desktop.
Save tylercubell/d922c36f487b8484a1e27557c83045b1 to your computer and use it in GitHub Desktop.
GStreamer Python Add/Remove Source Dynamically
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst
from threading import Thread, Event
Gst.init(None)
class Main:
def __init__(self):
self.pipeline = Gst.Pipeline.new("pipeline")
self.bus = self.pipeline.get_bus()
self.current_pattern_type = 0
self.videotestsrc = Gst.ElementFactory.make("videotestsrc", "videotestsrc")
self.videotestsrc.set_property("pattern", self.current_pattern_type)
self.videotestsrc_src_pad = self.videotestsrc.get_static_pad("src")
self.pipeline.add(self.videotestsrc)
self.decodebin = Gst.ElementFactory.make("decodebin", "decodebin")
self.pipeline.add(self.decodebin)
self.decodebin.connect("pad-added", self.decodebin_src_pad_created)
self.autovideosink = Gst.ElementFactory.make("autovideosink", "autovideosink")
self.pipeline.add(self.autovideosink)
self.videotestsrc.link(self.decodebin)
def decodebin_src_pad_created(self, element, pad):
self.decodebin.link(self.autovideosink)
def custom_message(self, name):
custom_structure = Gst.Structure.new_empty(name)
custom_message = Gst.Message.new_application(None, custom_structure)
self.bus.post(custom_message)
def change_source(self):
while not self.stop_event.is_set():
self.stop_event.wait(1)
if self.stop_event.is_set():
break
self.custom_message("start_switch")
def videotestsrc_probe_callback(self, pad, info):
if self.start_switch:
if not self.videotestsrc_do_switch_sent:
self.videotestsrc_drop = True
self.custom_message("do_switch")
return Gst.PadProbeReturn.DROP
else:
self.videotestsrc_do_switch_sent = False
self.videotestsrc_drop = False
pad.remove_probe(info.id)
return Gst.PadProbeReturn.OK
def decodebin_probe_callback(self, pad, info):
if self.start_switch:
if not self.decodebin_do_switch_sent:
self.decodebin_drop = True
self.custom_message("do_switch")
return Gst.PadProbeReturn.DROP
else:
self.decodebin_do_switch_sent = False
self.decodebin_drop = False
pad.remove_probe(info.id)
return Gst.PadProbeReturn.OK
def run(self):
self.pipeline.set_state(Gst.State.PLAYING)
self.stop_event = Event()
self.change_source_thread = Thread(target=self.change_source)
self.change_source_thread.start()
self.start_switch = False
self.videotestsrc_drop = False
self.decodebin_drop = False
while True:
try:
message = self.bus.timed_pop(Gst.SECOND)
if message == None:
pass
elif message.type == Gst.MessageType.APPLICATION:
if message.get_structure().get_name() == "start_switch":
self.start_switch = True
self.videotestsrc_do_switch_sent = False
self.decodebin_do_switch_sent = False
self.videotestsrc.get_static_pad("src").add_probe(Gst.PadProbeType.BLOCK, self.videotestsrc_probe_callback)
self.decodebin.get_static_pad("src_0").add_probe(Gst.PadProbeType.BLOCK, self.decodebin_probe_callback)
elif message.get_structure().get_name() == "do_switch":
if self.videotestsrc_drop and self.decodebin_drop:
# Remove old videotestsrc.
self.videotestsrc.set_state(Gst.State.NULL)
self.videotestsrc.unlink(self.decodebin)
self.pipeline.remove(self.videotestsrc)
del self.videotestsrc
# Create new videotestsrc.
self.videotestsrc = Gst.ElementFactory.make("videotestsrc", "videotestsrc")
self.current_pattern_type = int(not(self.current_pattern_type))
self.videotestsrc.set_property("pattern", self.current_pattern_type)
self.pipeline.add(self.videotestsrc)
self.videotestsrc.link(self.decodebin)
self.videotestsrc.set_state(Gst.State.PLAYING)
self.start_switch = False
elif message.type == Gst.MessageType.EOS:
break
elif message.type == Gst.MessageType.ERROR:
break
except KeyboardInterrupt:
break
self.stop_event.set()
self.pipeline.set_state(Gst.State.NULL)
start = Main()
start.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment