Skip to content

Instantly share code, notes, and snippets.

@rgon
Created October 9, 2025 11:13
Show Gist options
  • Save rgon/4e9291e2325c78fad066858e826f2f47 to your computer and use it in GitHub Desktop.
Save rgon/4e9291e2325c78fad066858e826f2f47 to your computer and use it in GitHub Desktop.
Gstreamer ipcpipelinesink example

Gstreamer IPC Pipeline Example

This example runs a videotestsrt in one process, passes it with ipcpipelinesink/src via Unix file descriptors and plays it back on the main process.

Spike-example to test viability of a larger multimedia project, since I didn't find any online.

Learnings:

  • Unix file descriptors allow zero-copy data passing between processes. Pipes are file descriptors. stdout/err are as well
  • ipcpipelinesrc allows inter-process gst pipeline data. You cannot use appsink amongst processes
  • It requires two (!) FDs for bidirectional messaging: video data out, control/transport data in
  • File descriptors can be passed to child processes by clearing the FD_CLOEXEC flag. By default, std::io::pipe creates FDs with the CLOEXEC flag cleared for safety,so they aren't available with that API

Why nix::unistd::pipe and not std::io::pipe

We could use std::io::pipe with the traits std::os::fd::{AsRawFd} instead of nix::unistd::pipe, but for some reason, these file descriptors inherit CLOEXEC, while the nix::unistd::pipe ones don't.

In this case, we just get the error:

ERROR: from element /GstPipeline:pipeline0/GstIpcPipelineSink:ipcpipelinesink0: Could not write to resource.

This could be solved by importing nix = { version = "0.30.1", features = ["fs"] } or libc, and setting the flags with fcntl:

use nix::fcntl::{fcntl, FcntlArg, FdFlag};
// ...
    // Clear the CLOEXEC flag on the FDs that the child will use
    // This is necessary because std::io::pipe() sets CLOEXEC by default
    fcntl(&data_write_fd, FcntlArg::F_SETFD(FdFlag::empty()))?;
    fcntl(&ctrl_read_fd, FcntlArg::F_SETFD(FdFlag::empty()))?;

Written in Rust, although the concepts will translate to other languages.

[package]
name = "gstreamerIPCdemo"
version = "0.1.0"
edition = "2024"
[dependencies]
gstreamer = "0.24.2"
nix = "0.30.1"
extern crate gstreamer as gst;
use std::process::Command;
use std::os::unix::io::AsRawFd;
use std::thread;
use std::time::Duration;
use gst::prelude::*;
use nix::unistd;
fn main() -> Result<(), Box<dyn std::error::Error>> {
gst::init()?;
// Create TWO pipes for bidirectional communication:
// 1. Data pipe: child writes data, parent reads data
let (data_read_fd, data_write_fd) = unistd::pipe()?;
// 2. Control pipe: parent writes control/acks, child reads control/acks
let (ctrl_read_fd, ctrl_write_fd) = unistd::pipe()?;
// Get raw FD values
let data_read_raw = data_read_fd.as_raw_fd();
let data_write_raw = data_write_fd.as_raw_fd();
let ctrl_read_raw = ctrl_read_fd.as_raw_fd();
let ctrl_write_raw = ctrl_write_fd.as_raw_fd();
println!("Data pipe: read={}, write={}", data_read_raw, data_write_raw);
println!("Control pipe: read={}, write={}", ctrl_read_raw, ctrl_write_raw);
// --- Spawn the Child Process (Producer) ---
let mut child =
Command::new("gst-launch-1.0")
.arg("-v")
.arg("videotestsrc")
.arg("is-live=true")
.arg("!")
.arg("video/x-raw,width=320,height=240,framerate=30/1")
.arg("!")
.arg("ipcpipelinesink")
.arg(format!("fdin={}", ctrl_read_raw)) // Child reads control messages
.arg(format!("fdout={}", data_write_raw)) // Child writes data
.stderr(std::process::Stdio::inherit())
.spawn()?
;
println!("Child process spawned with PID: {}", child.id());
// Give child time to initialize
thread::sleep(Duration::from_millis(200));
// --- Run the Parent Pipeline (Consumer) ---
let parent_pipeline_desc = format!(
"ipcpipelinesrc fdin={} fdout={} ! videoconvert ! autovideosink",
data_read_raw, // Parent reads data
ctrl_write_raw // Parent writes control messages
);
println!("Parent pipeline: {}", parent_pipeline_desc);
let pipeline = gst::parse::launch(&parent_pipeline_desc)?;
match pipeline.set_state(gst::State::Playing) {
Ok(_) => println!("Parent pipeline PLAYING"),
Err(e) => {
eprintln!("Failed to start parent: {:?}", e);
return Err(Box::new(e));
}
}
// Close unused FDs in parent (child's write end of control pipe, child's read end of data pipe)
// Actually, we need to keep them open until GStreamer dups them, then close our copies
thread::sleep(Duration::from_millis(100));
drop(data_write_fd);
drop(ctrl_read_fd);
println!("Parent closed unused FDs");
// --- 5. Handle pipeline messages ---
let bus = pipeline.bus().expect("Pipeline without bus");
// Run the pipeline for 10 seconds
for msg in bus.iter_timed(gst::ClockTime::from_seconds(10)) {
use gst::MessageView;
match msg.view() {
MessageView::Eos(..) => break,
MessageView::Error(err) => {
eprintln!(
"Error from element {}: {} ({})",
err.src().map(|s| s.path_string()).unwrap_or_else(|| "NONE".into()),
err.error(),
err.debug().unwrap_or_else(|| "".into())
);
break;
}
_ => (),
}
}
// Cleanup
pipeline.set_state(gst::State::Null)?;
match child.wait() {
Ok(status) => println!("Child process exited with status: {}", status),
Err(e) => eprintln!("Failed to wait on child process: {}", e),
}
Ok(())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment