|
#!/usr/bin/env python3 |
|
""" |
|
In this example, we would run this as two separate processes. |
|
|
|
Usage: |
|
For Producer: |
|
python3 ipcpipeline_example.py --socket-path /tmp/gst-socket --role producer |
|
|
|
For Consumer: |
|
python3 ipcpipeline_example.py --socket-path /tmp/gst-socket --role consumer |
|
""" |
|
|
|
import os |
|
import sys |
|
import socket |
|
import signal |
|
import time |
|
import gi |
|
gi.require_version('Gst', '1.0') |
|
from gi.repository import Gst, GLib |
|
|
|
loop = None |
|
|
|
def master_bus_msg(bus, msg, pipeline): |
|
global loop |
|
if msg.type == Gst.MessageType.ERROR: |
|
err, dbg = msg.parse_error() |
|
print(f"ERROR: {err}", file=sys.stderr) |
|
if dbg: |
|
print(f"ERROR debug information: {dbg}", file=sys.stderr) |
|
pipeline.set_state(Gst.State.NULL) |
|
if loop is not None: |
|
loop.quit() |
|
elif msg.type == Gst.MessageType.WARNING: |
|
err, dbg = msg.parse_warning() |
|
print(f"WARNING: {err}", file=sys.stderr) |
|
if dbg: |
|
print(f"WARNING debug information: {dbg}", file=sys.stderr) |
|
elif msg.type == Gst.MessageType.ASYNC_DONE: |
|
pass |
|
elif msg.type == Gst.MessageType.EOS: |
|
pipeline.set_state(Gst.State.NULL) |
|
if loop is not None: |
|
loop.quit() |
|
return True |
|
|
|
def start_source(fdin, fdout): |
|
pipeline = Gst.Pipeline() |
|
bus = pipeline.get_bus() |
|
bus.add_signal_watch() |
|
bus.connect("message", master_bus_msg, pipeline) |
|
|
|
source = Gst.ElementFactory.make("videotestsrc", None) |
|
source.set_property("pattern", 18) |
|
source.set_property("num_buffers", 50) |
|
|
|
capsfilter = Gst.ElementFactory.make("capsfilter", None) |
|
caps = Gst.Caps.from_string("video/x-raw, width=640, height=480") |
|
capsfilter.set_property("caps", caps) |
|
|
|
ipcpipelinesink = Gst.ElementFactory.make("ipcpipelinesink", None) |
|
ipcpipelinesink.set_property("fdin", fdin.fileno()) |
|
ipcpipelinesink.set_property("fdout", fdout.fileno()) |
|
|
|
pipeline.add(source) |
|
pipeline.add(capsfilter) |
|
pipeline.add(ipcpipelinesink) |
|
|
|
source.link(capsfilter) |
|
capsfilter.link(ipcpipelinesink) |
|
|
|
|
|
print("parent pipeline created") |
|
pipeline.set_state(Gst.State.PLAYING) |
|
|
|
def start_sink(fdin, fdout): |
|
pipeline = Gst.ElementFactory.make("ipcslavepipeline", None) |
|
ipcpipelinesrc = Gst.ElementFactory.make("ipcpipelinesrc", None) |
|
ipcpipelinesrc.set_property("fdin", fdin.fileno()) |
|
ipcpipelinesrc.set_property("fdout", fdout.fileno()) |
|
navseek = Gst.ElementFactory.make("navseek", None) |
|
navseek.set_property("seek-offset", 1.0) |
|
sink = Gst.ElementFactory.make("autovideosink", None) |
|
pipeline.add(ipcpipelinesrc) |
|
pipeline.add(navseek) |
|
pipeline.add(sink) |
|
ipcpipelinesrc.link(navseek) |
|
navseek.link(sink) |
|
print("child pipeline created") |
|
# in the c version of this I don't have to set this element to playing since it tracks |
|
# the ipcpipelinesink state, but in python I do need to set the state to READY to kick off connecting |
|
# not sure why, if anyone can figure it out please let me know. |
|
pipeline.set_state(Gst.State.READY) |
|
bus = pipeline.get_bus() |
|
bus.timed_pop_filtered(Gst.CLOCK_TIME_NONE, Gst.MessageType.ERROR | Gst.MessageType.EOS) |
|
# we track the parent pipeline state & do not need to set the state to NULL |
|
# pipeline.set_state(Gst.State.NULL) |
|
|
|
def main(): |
|
parser = argparse.ArgumentParser(description='GStreamer IPC Pipeline Example') |
|
parser.add_argument('--socket-path', required=True, help='Path to the UNIX socket') |
|
parser.add_argument('--role', choices=['producer', 'consumer'], required=True, help='Role: producer or consumer') |
|
args = parser.parse_args() |
|
|
|
Gst.init(None) |
|
|
|
if args.role == 'producer': |
|
# Producer code |
|
# Create UNIX domain socket, bind, listen, accept connection |
|
# Use accepted socket's fd for fdin and fdout |
|
server_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) |
|
|
|
# If the socket path already exists, remove it |
|
if os.path.exists(args.socket_path): |
|
os.unlink(args.socket_path) |
|
|
|
# Bind to the socket path |
|
server_sock.bind(args.socket_path) |
|
|
|
# Listen for connections |
|
server_sock.listen(1) |
|
print(f"Producer: Listening on {args.socket_path}") |
|
|
|
# Accept a connection |
|
conn, _ = server_sock.accept() |
|
print("Producer: Accepted connection") |
|
|
|
# Set the connected socket to non-blocking mode if needed |
|
conn.setblocking(False) |
|
|
|
# Start the source pipeline, pass conn as fdin and fdout |
|
start_source(conn, conn) |
|
|
|
elif args.role == 'consumer': |
|
# Consumer code |
|
# Create UNIX domain socket, connect to the socket at the given path |
|
# Use connected socket's fd for fdin and fdout |
|
client_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) |
|
print(f"Consumer: Connecting to {args.socket_path}") |
|
|
|
# Connect to the socket |
|
client_sock.connect(args.socket_path) |
|
print("Consumer: Connected to producer") |
|
|
|
# Set the socket to non-blocking mode if needed |
|
client_sock.setblocking(False) |
|
|
|
# Start the sink pipeline, pass client_sock as fdin and fdout |
|
start_sink(client_sock, client_sock) |
|
|
|
run() |
|
|
|
if __name__ == '__main__': |
|
main() |