Skip to content

Instantly share code, notes, and snippets.

@joanpau
Last active October 2, 2015 01:30
Show Gist options
  • Save joanpau/75256d447af6b7171057 to your computer and use it in GitHub Desktop.
Save joanpau/75256d447af6b7171057 to your computer and use it in GitHub Desktop.
Test growing and pruning branches dynamically in a gstreamer pipeline.
/**
* Test a dynamic pipeline made of:
* - source elements (videotestsrc, capsfilter, and tee)
* - sink elements (queue, and appsink)
*
* After building the pipeline and playing it for a while,
* pause it and remove the sink elements.
* Then add identical elements and try to play again.
* The pipeline fails to change the state to PLAYING.
*
* Steps reproducing the failure:
* [1] (READY ) build the pipeline,
* go to playing
* [2] (PLAYING) grab some frames from the appsink
* go to paused
* [3] (PAUSED ) remove all none source elements
* [4] (PAUSED ) add new elements identical to those removed
* go to playing *THIS ONE FAILS*
* [5] (FAILURE)
*
* When simulating a live-source (videotestsrc property is-live=true)
* a not negotiated error occurs.
* Otherwise (videotestsrc property is-live=false)
* getting the state just before the second transition to PLAYING
* returns ASYNC with current=PAUSED and pending=PAUSED (what does it mean?).
* The transition from PAUSED to PLAYING never completes.
*
* If we fall back to READY instead of PAUSED to remove and add the elements
* (that is READY -> PLAYING -> READY (remove and add elements) -> PLAYING)
* it works fine.
*/
#include <stdio.h>
#include <stdlib.h>
#include <gst/gst.h>
#include <gst/app/gstappsink.h>
#include <unistd.h>
static const gchar * MEDIA_TYPE = "video/x-raw";
static const gchar * FORMAT =gst_object_unref(peer); "RGB";
static const gint RATE_NUM = 15;
static const gint RATE_DEN = 2;
static const gint WIDTH = 1280;
static const gint HEIGHT = 960;
static const gint DEPTH = 3;
typedef struct {
GCond cond;
GMutex mutex;
gboolean eos_at_sink;
gboolean blocked;
GstState current;
GstState pending;
GstElement * pipeline;
GstElement * source;
GstElement * filter;
GstElement * tee;
GstElement * queue;gst_object_unref(peer);
GstElement * sink;
} PipeElements;
static GstStateChangeReturn
change_state(GstElement * pipeline, GstState state,
GstState * current, GstState * pending);
static void
save_frames(GstAppSink * sink, gsize frames, const gchar * pattern);
static GstPadProbeReturn
send_eos_to_peer
(GstPad * pad, GstPadProbeInfo * info, gpointer user_data);
static void
wait_eos_at_sink
(PipeElements * elements);
static GstBusSyncReply
handle_bus_message
(GstBus * bus, GstMessage * message, gpointer user_data);
int main(int argc, char** argv) {
const gchar * errmsg;
gsize frames;
PipeElements * elements;
GstCaps * caps;
GstPad * srcpad, * snkpad;
GstBus * bus;
GstStateChangeReturn ret;
gst_init (&argc, &argv);
frames = argc > 1 ? atoi(argv[1]) : 0;
elements = g_try_new(PipeElements, 1);
if (! elements) {
errmsg = "could not create pipeline elements struct";
goto error_message;
}
elements->blocked = FALSE;
elements->eos_at_sink = FALSE;
g_mutex_init(&elements->mutex);
g_cond_init(&elements->cond);
elements->pipeline = gst_pipeline_new(NULL);
elements->source = gst_element_factory_make("videotestsrc", NULL);
elements->filter = gst_element_factory_make("capsfilter", NULL);
elements->tee = gst_element_factory_make("tee", NULL);
elements->queue = gst_element_factory_make("queue", NULL);
elements->sink = gst_element_factory_make("appsink", NULL);
if (! (elements->pipeline && elements->source && elements->filter &&
elements->tee && elements->queue && elements->sink)) {
errmsg = "could not create pipeline elements";
if (elements->pipeline) gst_object_unref(elements->pipeline);
if (elements->source) gst_object_unref(elements->source);
if (elements->filter) gst_object_unref(elements->filter);
if (elements->tee) gst_object_unref(elements->tee);
if (elements->queue) ggst_object_unref(peer);st_object_unref(elements->queue);
if (elements->sink) gst_object_unref(elements->sink);
goto error_elements;
}
g_object_set(elements->pipeline, "message-forward", TRUE, NULL);
g_object_set(elements->source, "is-live", TRUE, "pattern", 18, NULL);
caps = gst_caps_new_simple(
MEDIA_TYPE, "format", G_TYPE_STRING, FORMAT,
"width", G_TYPE_INT, WIDTH, "height", G_TYPE_INT, HEIGHT,
"framerate", GST_TYPE_FRACTION, RATE_NUM, RATE_DEN, NULL);
g_object_set(elements->filter, "caps", caps, NULL);
gst_caps_unref(caps);
gst_bin_add_many(GST_BIN(elements->pipeline),
elements->source, elements->filter, elements->tee,
elements->queue, elements->sink, NULL);
srcpad = gst_element_get_request_pad(elements->tee, "src_%u");
snkpad = gst_element_get_static_pad(elements->queue, "sink");
gst_object_unref(srcpad);
gst_object_unref(snkpad);
if (! gst_element_link_many(elements->source, elements->filter, elements->tee,
elements->queue, elements->sink, NULL)) {
errmsg = "could not link gst_object_unref(peer);pipeline elements";
goto error_pipeline;
}
bus = gst_pipeline_get_bus (GST_PIPELINE (elements->pipeline));
gst_bus_set_sync_handler(bus, handle_bus_message, elements, NULL);
gst_object_unref(bus);
ret = change_state(elements->pipeline, GST_STATE_PLAYING,
&elements->current, &elements->pending);
if (elements->current != GST_STATE_PLAYING) {
errmsg = "could not change pipeline state to PLAYING";
goto error_pipeline;
}
save_frames(GST_APP_SINK(elements->sink), frames, "frame_1_%02d.ppm");
snkpad = gst_element_get_static_pad(elements->queue, "sink");
srcpad = gst_pad_get_peer(snkpad);
gst_pad_add_probe(srcpad, GST_PAD_PROBE_TYPE_IDLE,
send_eos_to_peer, elements, NULL);
gst_object_unref(snkpad);gst_object_unref(peer);
gst_object_unref(srcpad);
wait_eos_at_sink(elements);
ret = change_state(elements->pipeline, GST_STATE_PAUSED,
&elements->current, &elements->pending);
if (! (ret == GST_STATE_CHANGE_NO_PREROLL ||
ret == GST_STATE_CHANGE_SUCCESS)) {
errmsg = "could not change pipeline state to PAUSED";
goto error_pipeline;
}
snkpad = gst_element_get_static_pad(elements->queue, "sink");
srcpad = gst_pad_get_peer(snkpad);
gst_pad_unlink(srcpad, snkpad);
gst_element_release_request_pad(elements->tee, srcpad);
gst_object_unref(snkpad);
gst_object_unref(srcpad);
gst_bin_remove_many(GST_BIN(elements->pipeline),
gst_object_ref(elements->queue),
gst_object_ref(elements->sink), NULL);
gst_element_set_state(elements->queue, GST_STATE_NULL);
gst_element_set_state(elements->sink, GST_STATE_NULL);
gst_object_unref(elements->qugst_object_unref(peer);eue);
gst_object_unref(elements->sink);
elements->queue = gst_element_factory_make("queue", NULL);
elements->sink = gst_element_factory_make("appsink", NULL);
if (! (elements->queue && elements->sink)) {
errmsg = "could not create terminal pipeline elements";
if (elements->queue) gst_object_unref(elements->queue);
if (elements->sink) gst_object_unref(elements->sink);
goto error_pipeline;
}
gst_bin_add_many(GST_BIN(elements->pipeline),
elements->queue, elements->sink, NULL);
srcpad = gst_element_get_request_pad(elements->tee, "src_%u");
snkpad = gst_element_get_static_pad(elements->queue, "sink");
gst_object_unref(srcpad);
gst_object_unref(snkpad);
if (! gst_element_link_many(elements->tee, elements->queue,
elements->sink, NULL)) {
errmsg = "could not link pipeline terminal elements";
goto error_pipeline;
}
ret = change_state(elements->pipeline, GST_STATE_PLAYING,
&elements->current, &elements->pending);
if (elements->current != GST_STATE_PLAYING) {
errmsg = "could not change pipeline state to PLAYING";
goto error_pipeline;
}
save_frames(GST_APP_SINK(elements->sink), frames, "frame_2_%02d.ppm");
gst_element_set_state(elements->pipeline, GST_STATE_NULL);
gst_object_unref(elements->pipeline);
g_cond_clear(&elements->cond);
g_mutex_clear(&elements->mutex);
g_free(elements);
return (EXIT_SUCCESS);
error_pipeline:
gst_element_set_state(elements->pipeline, GST_STATE_NULL);
gst_object_unref(elements->pipeline);
error_elements:
g_cond_clear(&elements->cond);
g_mutex_clear(&elements->mutex);
g_free(elements);
error_message:
g_printerr("%s\n", errmsg);
return (EXIT_FAILURE);
}
static GstBusSyncReply
handle_bus_message
(GstBus * bus, GstMessage * message, gpointer user_data)
{
GError * error;
gchar * debug;
const GstStructure * structure;
GstMessage * forwarded;
PipeElements * elements;
elements = user_data;
switch (GST_MESSAGE_TYPE (message)) {
case GST_MESSAGE_ELEMENT:
/*
* In real code this would be done with a derived bin,
* overriding its message handling virtual function
* to intercept the EOS message from the sink.
* We do it here for simplicity.
*/
structure = gst_message_get_structure (message);
if (gst_structure_has_name (structure, "GstBinForwarded")) {
gst_structure_get (structure, "message", GST_TYPE_MESSAGE, &forwarded, NULL);
if (GST_MESSAGE_TYPE (forwarded) == GST_MESSAGE_EOS) {
g_print("EOS from element %s\n",
GST_MESSAGE_SRC_NAME (forwarded));
if (GST_MESSAGE_SRC(forwarded) == GST_OBJECT(elements->sink)) {
g_mutex_lock(&elements->mutex);
elements->eos_at_sink = TRUE;
g_cond_signal(&elements->cond);
g_mutex_unlock(&elements->mutex);
}
}
gst_message_unref (forwarded);
}
return GST_BUS_DROP;
case GST_MESSAGE_ERROR:
gst_message_parse_error(message, &error, &debug);
g_print("%12s: %s (%s)\n", "ERROR",
error->message, debug ? debug : "no debug info");
g_error_free (error);
g_free (debug);
gst_debug_bin_to_dot_file(GST_BIN(elements->pipeline),
GST_DEBUG_GRAPH_SHOW_ALL, "error");
return GST_BUS_PASS;
case GST_MESSAGE_WARNING:
gst_message_parse_warning(message, &error, &debug);
g_print("%12s: %s (%s)\n", "WARNING",
error->message, debug ? debug : "no debug info");
g_error_free (error);
g_free (debug);
gst_debug_bin_to_dot_file(GST_BIN(elements->pipeline),
GST_DEBUG_GRAPH_SHOW_ALL, "warning");
return GST_BUS_PASS;
case GST_MESSAGE_INFO:
gst_message_parse_info(message, &error, &debug);
g_print("%12s: %s (%s)\n", "INFO",
error->message, debug ? debug : "no debug info");
g_error_free (error);
g_free (debug);
return GST_BUS_PASS;
case GST_MESSAGE_EOS:
g_print("%12s: %s (%s)\n",
"EOS", "EOS message received", "no debug info");
return GST_BUS_PASS;
default:
return GST_BUS_PASS;
}
}
static GstPadProbeReturn
send_eos_to_peer
(GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
{
PipeElements * elements;
GstPad * peer;
elements = user_data;
if (g_atomic_int_compare_and_exchange(&elements->blocked, FALSE, TRUE)) {
peer = gst_pad_get_peer(pad);
gst_pad_send_event(peer, gst_event_new_eos());
gst_object_unref(peer);
}
return GST_PAD_PROBE_OK;
}
static void
wait_eos_at_sink
(PipeElements * elements)
{
g_mutex_lock(&elements->mutex);
while (! elements->eos_at_sink)
g_cond_wait(&elements->cond, &elements->mutex);
g_mutex_unlock(&elements->mutex);
}
static GstStateChangeReturn
change_state(GstElement * pipeline, GstState state,
GstState * current, GstState * pending)
{
GstStateChangeReturn ret;
GstClockTime timeout;
ret = gst_element_set_state(pipeline, state);
timeout = 500 * GST_MSECOND * (ret == GST_STATE_CHANGE_ASYNC ? 1 : 0);
ret = gst_element_get_state(pipeline, current, pending, timeout);
g_print("pipeline state change to %8s (return, current, pending): %s %s %s\n",
gst_element_state_get_name(state),
gst_element_state_change_return_get_name(ret),
gst_element_state_get_name(*current),
gst_element_state_get_name(*pending));
return ret;
}
static void
save_frames(GstAppSink * sink, gsize frames, const gchar * pattern)
{
GstSample * sample;
GstBuffer * buffer;
gpointer bytes;
gsize n, size, copied;
gchar * name;
FILE * file;
for (n = 0; n < frames; n++) {
sample = gst_app_sink_pull_sample(GST_APP_SINK(sink));
buffer = gst_sample_get_buffer(sample);
size = gst_buffer_get_size(buffer);
gst_buffer_extract_dup(buffer, 0, size, &bytes, &copied);
gst_sample_unref(sample);
name = g_strdup_printf(pattern, n);
file = fopen(name, "wb");
printf("Save frame to file: %s\n", name);
if (file == NULL) {
perror("Can't create output file");
} else {
fprintf(file, "P6\n%u %u\n255\n", WIDTH, HEIGHT);
fwrite(bytes, 1, WIDTH*HEIGHT*DEPTH, file);
fclose(file);
}
g_free(name);
g_free(bytes);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment