Skip to content

Instantly share code, notes, and snippets.

@LrWm3
Last active November 21, 2024 20:07
Show Gist options
  • Save LrWm3/7e2166c27da10436d2e97e6a71986a50 to your computer and use it in GitHub Desktop.
Save LrWm3/7e2166c27da10436d2e97e6a71986a50 to your computer and use it in GitHub Desktop.
ipcpipelinesink && ipcpipelinesrc in python

gstreamer ipcpipeline example

Demonstration of how ipcpipelinesink & ipcpipelinesrc can be used together.

Can I use ipcpipeline elements with gst-launch-1.0?

I am not sure how to do this, if you know how please provide an example as I would love know to do this without writing a custom bin.

When to use this over shmsink & shmsrc

This allows preserving source dts & pts buffers, shm approach transfers buffer data only.

I also found this plugin did not have any issues across process boundaries, whereas I did find that with shmsrc & shmsink I needed additional videoconvert operations to recover the video successfully.

When to use this over unixfdsink & unixfdsrc

Never, unixfdsink & unixfdsrc are strictly superior as far as I can tell. they are implemented with zero copy.

# sender
gst-launch-1.0 videotestsrc ! videoconvert ! capsfilter caps="video/x-raw, width=(int)320, height=(int)240, framerate=(fraction)30/1, multiview-mode=(string)mono, pixel-aspect-ratio=(fraction)1/1, interlace-mode=(string)progressive, format=(string)BGRx" ! unixfdsink socket-path=/tmp/foo

# receiver
gst-launch-1.0 unixfdsrc socket-path=/tmp/foo ! queue ! ximagesink -v

Caveats:

  • you may need to write custom serializers for your custom metadata if you take this approach.
  • things like trickplay and queries don't work from downstream
#!/usr/bin/env python3
"""
Rewrite of [ipcpipeline1.c](https://cgit.freedesktop.org/gstreamer/gst-plugins-bad/tree/tests/examples/ipcpipeline/ipcpipeline1.c) in python.
*
* Copyright (C) 2015-2017 YouView TV Ltd
* Author: Vincent Penquerc'h <[email protected]>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
"""
import os
import sys
import socket
import signal
import time
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
loop = None
def master_bus_msg(bus, msg, pipeline):
global loop
if msg.type == Gst.MessageType.ERROR:
err, dbg = msg.parse_error()
print(f"ERROR: {err}", file=sys.stderr)
if dbg:
print(f"ERROR debug information: {dbg}", file=sys.stderr)
pipeline.set_state(Gst.State.NULL)
if loop is not None:
loop.quit()
elif msg.type == Gst.MessageType.WARNING:
err, dbg = msg.parse_warning()
print(f"WARNING: {err}", file=sys.stderr)
if dbg:
print(f"WARNING debug information: {dbg}", file=sys.stderr)
elif msg.type == Gst.MessageType.ASYNC_DONE:
pass
elif msg.type == Gst.MessageType.EOS:
pipeline.set_state(Gst.State.NULL)
if loop is not None:
loop.quit()
return True
def start_source(fdin, fdout):
pipeline = Gst.Pipeline()
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", master_bus_msg, pipeline)
source = Gst.ElementFactory.make("videotestsrc", None)
source.set_property("pattern", 18)
source.set_property("num_buffers", 50)
capsfilter = Gst.ElementFactory.make("capsfilter", None)
caps = Gst.Caps.from_string("video/x-raw, width=640, height=480")
capsfilter.set_property("caps", caps)
ipcpipelinesink = Gst.ElementFactory.make("ipcpipelinesink", None)
ipcpipelinesink.set_property("fdin", fdin.fileno())
ipcpipelinesink.set_property("fdout", fdout.fileno())
pipeline.add(source)
pipeline.add(capsfilter)
pipeline.add(ipcpipelinesink)
source.link(capsfilter)
capsfilter.link(ipcpipelinesink)
print("parent pipeline created")
pipeline.set_state(Gst.State.PLAYING)
def start_sink(fdin, fdout):
pipeline = Gst.ElementFactory.make("ipcslavepipeline", None)
ipcpipelinesrc = Gst.ElementFactory.make("ipcpipelinesrc", None)
ipcpipelinesrc.set_property("fdin", fdin.fileno())
ipcpipelinesrc.set_property("fdout", fdout.fileno())
navseek = Gst.ElementFactory.make("navseek", None)
navseek.set_property("seek-offset", 1.0)
sink = Gst.ElementFactory.make("autovideosink", None)
pipeline.add(ipcpipelinesrc)
pipeline.add(navseek)
pipeline.add(sink)
ipcpipelinesrc.link(navseek)
navseek.link(sink)
print("child pipeline created")
# in the c version of this I don't have to set this element to playing since it tracks
# the ipcpipelinesink state, but in python I do need to set the state to READY to kick off connecting
# not sure why, if anyone can figure it out please let me know.
pipeline.set_state(Gst.State.READY)
bus = pipeline.get_bus()
bus.timed_pop_filtered(Gst.CLOCK_TIME_NONE, Gst.MessageType.ERROR | Gst.MessageType.EOS)
# we track the parent pipeline state & do not need to set the state to NULL
# pipeline.set_state(Gst.State.NULL)
def run(pid):
global loop
loop = GLib.MainLoop()
try:
loop.run()
except KeyboardInterrupt:
pass
if pid > 0:
os.kill(pid, signal.SIGTERM)
def main():
Gst.init(None)
sockets = socket.socketpair()
# Set sockets to non-blocking
for sock in sockets:
sock.setblocking(False)
pid = os.fork()
if pid < 0:
print("Error forking", file=sys.stderr)
sys.exit(1)
elif pid > 0:
# Parent process
os.environ["GST_DEBUG_FILE"] = "gstsrc.log"
start_source(sockets[1], sockets[1])
else:
# Child process
os.environ["GST_DEBUG"] = "5"
os.environ["GST_DEBUG_FILE"] = "gstsink.log"
start_sink(sockets[0], sockets[0])
run(pid)
if __name__ == '__main__':
main()
#!/usr/bin/env python3
"""
In this example, we would run this as two separate processes.
Usage:
For Producer:
python3 ipcpipeline_example.py --socket-path /tmp/gst-socket --role producer
For Consumer:
python3 ipcpipeline_example.py --socket-path /tmp/gst-socket --role consumer
"""
import os
import sys
import socket
import signal
import time
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
loop = None
def master_bus_msg(bus, msg, pipeline):
global loop
if msg.type == Gst.MessageType.ERROR:
err, dbg = msg.parse_error()
print(f"ERROR: {err}", file=sys.stderr)
if dbg:
print(f"ERROR debug information: {dbg}", file=sys.stderr)
pipeline.set_state(Gst.State.NULL)
if loop is not None:
loop.quit()
elif msg.type == Gst.MessageType.WARNING:
err, dbg = msg.parse_warning()
print(f"WARNING: {err}", file=sys.stderr)
if dbg:
print(f"WARNING debug information: {dbg}", file=sys.stderr)
elif msg.type == Gst.MessageType.ASYNC_DONE:
pass
elif msg.type == Gst.MessageType.EOS:
pipeline.set_state(Gst.State.NULL)
if loop is not None:
loop.quit()
return True
def start_source(fdin, fdout):
pipeline = Gst.Pipeline()
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", master_bus_msg, pipeline)
source = Gst.ElementFactory.make("videotestsrc", None)
source.set_property("pattern", 18)
source.set_property("num_buffers", 50)
capsfilter = Gst.ElementFactory.make("capsfilter", None)
caps = Gst.Caps.from_string("video/x-raw, width=640, height=480")
capsfilter.set_property("caps", caps)
ipcpipelinesink = Gst.ElementFactory.make("ipcpipelinesink", None)
ipcpipelinesink.set_property("fdin", fdin.fileno())
ipcpipelinesink.set_property("fdout", fdout.fileno())
pipeline.add(source)
pipeline.add(capsfilter)
pipeline.add(ipcpipelinesink)
source.link(capsfilter)
capsfilter.link(ipcpipelinesink)
print("parent pipeline created")
pipeline.set_state(Gst.State.PLAYING)
def start_sink(fdin, fdout):
pipeline = Gst.ElementFactory.make("ipcslavepipeline", None)
ipcpipelinesrc = Gst.ElementFactory.make("ipcpipelinesrc", None)
ipcpipelinesrc.set_property("fdin", fdin.fileno())
ipcpipelinesrc.set_property("fdout", fdout.fileno())
navseek = Gst.ElementFactory.make("navseek", None)
navseek.set_property("seek-offset", 1.0)
sink = Gst.ElementFactory.make("autovideosink", None)
pipeline.add(ipcpipelinesrc)
pipeline.add(navseek)
pipeline.add(sink)
ipcpipelinesrc.link(navseek)
navseek.link(sink)
print("child pipeline created")
# in the c version of this I don't have to set this element to playing since it tracks
# the ipcpipelinesink state, but in python I do need to set the state to READY to kick off connecting
# not sure why, if anyone can figure it out please let me know.
pipeline.set_state(Gst.State.READY)
bus = pipeline.get_bus()
bus.timed_pop_filtered(Gst.CLOCK_TIME_NONE, Gst.MessageType.ERROR | Gst.MessageType.EOS)
# we track the parent pipeline state & do not need to set the state to NULL
# pipeline.set_state(Gst.State.NULL)
def main():
parser = argparse.ArgumentParser(description='GStreamer IPC Pipeline Example')
parser.add_argument('--socket-path', required=True, help='Path to the UNIX socket')
parser.add_argument('--role', choices=['producer', 'consumer'], required=True, help='Role: producer or consumer')
args = parser.parse_args()
Gst.init(None)
if args.role == 'producer':
# Producer code
# Create UNIX domain socket, bind, listen, accept connection
# Use accepted socket's fd for fdin and fdout
server_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
# If the socket path already exists, remove it
if os.path.exists(args.socket_path):
os.unlink(args.socket_path)
# Bind to the socket path
server_sock.bind(args.socket_path)
# Listen for connections
server_sock.listen(1)
print(f"Producer: Listening on {args.socket_path}")
# Accept a connection
conn, _ = server_sock.accept()
print("Producer: Accepted connection")
# Set the connected socket to non-blocking mode if needed
conn.setblocking(False)
# Start the source pipeline, pass conn as fdin and fdout
start_source(conn, conn)
elif args.role == 'consumer':
# Consumer code
# Create UNIX domain socket, connect to the socket at the given path
# Use connected socket's fd for fdin and fdout
client_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
print(f"Consumer: Connecting to {args.socket_path}")
# Connect to the socket
client_sock.connect(args.socket_path)
print("Consumer: Connected to producer")
# Set the socket to non-blocking mode if needed
client_sock.setblocking(False)
# Start the sink pipeline, pass client_sock as fdin and fdout
start_sink(client_sock, client_sock)
run()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment