1.引言
如果看了gstreamer官方教程配置多线程出现编译不过的问题了,不妨进来看看这篇文章或许能解决一些编译问题。
GStreamer 本质上是多线程的,并且是完全线程安全的。大多数线程内部对应用程序是隐藏的,这应该使应用程序开发更容易。但是,在某些情况下,应用程序可能希望影响其中的某些部分。 GStreamer 允许应用程序在pipeline的某些部分强制使用多个线程,GStreamer 还可以在创建线程时通知您,以便您可以配置要使用的线程优先级或线程池等内容。
2.GStreamer 中的调度
GStreamer pipeline中的每个element决定如何调度它。element可以选择是基于推还是基于拉的方式调度它们的pad。例如,一个element可以选择启动一个线程以开始从sink pad拉取 或/并 开始从source pad推送。element也可以选择使用上游(upstream )或下游(downstream )线程分别以推和拉模式对数据进行处理。 GStreamer 对element选择如何调度没有任何限制。有关更多详细信息,请参阅插件编写指南。
在任何情况下都会发生的是,某些element将启动一个线程来处理它们的数据,称为“流线程”(“streaming threads”)。当element需要创建一个流线程时,流线程或叫做 GstTask 对象是从 GstTaskPool 创建的。在下一节中,我们将看到如何接收任务和池的通知。
3.在 GStreamer 中配置线程
STREAM_STATUS 消息发布在总线上以通知有关流线程的状态。你将从消息中获得以下信息:
当将要创建新线程时,您将收到 GST_STREAM_STATUS_TYPE_CREATE 类型的通知。然后可以在 GstTask 中配置 GstTaskPool。自定义任务池将为任务提供自定义线程以实现流线程。
-
如果要配置自定义任务池,则需要同步处理此消息。如果你在此消息返回时未在任务上配置任务池,则任务将使用其默认池。
-
进入或离开线程时。这是你可以配置线程优先级的时刻。当线程被销毁时,你也会收到通知。
-
当线程开始、暂停和停止时,你会收到消息。这可用于在 gui 应用程序中可视化流线程的状态。
4.如何提高线程的优先级
让我们看一下上面的pipeline。我们希望提高流线程的优先级。appsrc element将启动流线程来将appsrc中src pad上的数据推送给对端queue element的sink pad。改变优先级的流程是这样的:
-
当从 READY 到 PAUSED 状态时, appsrc 将需要一个流线程来将数据推送到 queue 中。它将发布一条STREAM_STATUS 消息,指示其对流线程的需求。
-
应用程序将使用同步总线处理程序对 STREAM_STATUS 消息做出反应。然后它将在消息内的 GstTask 上配置自定义GstTaskPool。自定义任务池负责创建线程。在这个例子中,我们将创建一个具有更高优先级的线程。
-
或者,由于同步消息是在线程上下文中调用的,您可以使用线程 ENTER/LEAVE 通知来更改当前线程的优先级或调度策略。
第一步,我们需要实现一个可以在任务上配置的自定义 GstTaskPool。下面是一个 GstTaskPool 子类的实现,它使用 pthreads 创建一个 SCHED_RR 实时线程。请注意,创建实时线程可能需要额外的权限。
#include <gst/gst.h>
#include <gst/gsttaskpool.h>#define TEST_TYPE_RT_POOL \(test_rt_pool_get_type())GType test_rt_pool_get_type (void);
GstTaskPool *test_rt_pool_new (void);typedef struct _GstTaskPool TestRTPool;
typedef struct _GstTaskPoolClass TestRTPoolClass;typedef struct
{pthread_t thread;
} TaskPoolRTId;G_DEFINE_TYPE (TestRTPool, test_rt_pool, GST_TYPE_TASK_POOL);static void
default_prepare (GstTaskPool * pool, GError ** error)
{/* we don't do anything here. We could construct a pool of threads here that* we could reuse later but we don't */
}static void
default_cleanup (GstTaskPool * pool)
{
}static gpointer
default_push (GstTaskPool * pool, GstTaskPoolFunction func, gpointer data,GError ** error)
{TestRTId *tid;gint res;pthread_attr_t attr;struct sched_param param;tid = g_slice_new0 (TestRTId);pthread_attr_init (&attr);if ((res = pthread_attr_setschedpolicy (&attr, SCHED_RR)) != 0)g_warning ("setschedpolicy: failure: %p", g_strerror (res));param.sched_priority = 50;if ((res = pthread_attr_setschedparam (&attr, ¶m)) != 0)g_warning ("setschedparam: failure: %p", g_strerror (res));if ((res = pthread_attr_setinheritsched (&attr, PTHREAD_EXPLICIT_SCHED)) != 0)g_warning ("setinheritsched: failure: %p", g_strerror (res));res = pthread_create (&tid->thread, &attr, (void *(*)(void *)) func, data);if (res != 0) {g_set_error (error, G_THREAD_ERROR, G_THREAD_ERROR_AGAIN,"Error creating thread: %s", g_strerror (res));g_slice_free (TestRTId, tid);tid = NULL;}return tid;
}static void
default_join (GstTaskPool * pool, gpointer id)
{TestRTId *tid = (TestRTId *) id;pthread_join (tid->thread, NULL);g_slice_free (TestRTId, tid);
}static void
test_rt_pool_class_init (TestRTPoolClass * klass)
{GstTaskPoolClass *gsttaskpool_class;gsttaskpool_class = (GstTaskPoolClass *) klass;gsttaskpool_class->prepare = default_prepare;gsttaskpool_class->cleanup = default_cleanup;gsttaskpool_class->push = default_push;gsttaskpool_class->join = default_join;
}static void
test_rt_pool_init (TestRTPool * pool)
{
}GstTaskPool *
test_rt_pool_new (void)
{GstTaskPool *pool;pool = g_object_new (TEST_TYPE_RT_POOL, NULL);return pool;
}
编写任务池时要实现的重要功能是“push”功能。 实现上应该启动一个调用给定函数的线程。 更复杂的实现可能希望在池中保留一些线程,因为创建和销毁线程并不总是最快的操作。
在下一步中,我们需要在 appsrc 需要时实际配置自定义任务池。 为此,我们使用同步处理程序拦截 STREAM_STATUS 消息。
static GMainLoop* loop;static void
on_stream_status (GstBus *bus,GstMessage *message,gpointer user_data)
{GstStreamStatusType type;GstElement *owner;const GValue *val;GstTask *task = NULL;gst_message_parse_stream_status (message, &type, &owner);val = gst_message_get_stream_status_object (message);/* see if we know how to deal with this object */if (G_VALUE_TYPE (val) == GST_TYPE_TASK) {task = g_value_get_object (val);}switch (type) {case GST_STREAM_STATUS_TYPE_CREATE:if (task) {GstTaskPool *pool;pool = test_rt_pool_new();gst_task_set_pool (task, pool);}break;default:break;}
}static void
on_error (GstBus *bus,GstMessage *message,gpointer user_data)
{g_message ("received ERROR");g_main_loop_quit (loop);
}static void
on_eos (GstBus *bus,GstMessage *message,gpointer user_data)
{g_main_loop_quit (loop);
}int
main (int argc, char *argv[])
{GstElement *bin, *AppSrc;GstBus *bus;GstStateChangeReturn ret;gst_init (&argc, &argv);/* create a new bin to hold the elements */bin = gst_pipeline_new ("pipeline");g_assert (bin);//pseudo-code/* create some elements */AppSrc = gst_element_factory_make ("appsrc", "appsrc");............./* add objects to the main pipeline */gst_bin_add_many (GST_BIN (bin),AppSrc ,..., ..., NULL);/* link the elements */gst_element_link_many(AppSrc,..., ...,null);loop = g_main_loop_new (NULL, FALSE);/* get the bus, we need to install a sync handler */bus = gst_pipeline_get_bus (GST_PIPELINE (bin));gst_bus_enable_sync_message_emission (bus);gst_bus_add_signal_watch (bus);g_signal_connect (bus, "sync-message::stream-status",(GCallback) on_stream_status, NULL);g_signal_connect (bus, "message::error",(GCallback) on_error, NULL);g_signal_connect (bus, "message::eos",(GCallback) on_eos, NULL);/* start playing */ret = gst_element_set_state (bin, GST_STATE_PLAYING);if (ret != GST_STATE_CHANGE_SUCCESS) {g_message ("failed to change state");return -1;}/* Run event loop listening for bus messages until EOS or ERROR */g_main_loop_run (loop);/* stop the bin */gst_element_set_state (bin, GST_STATE_NULL);gst_object_unref (bus);g_main_loop_unref (loop);return 0;
}
请注意,该程序可能需要 root 权限才能创建实时线程。 当无法创建线程时,状态更改函数将失败,我们在上面的应用程序中捕获了这一点。
当pipeline中有多个线程时,您将收到多个 STREAM_STATUS 消息。 您应该使用消息的所有者(可能是 pad 或启动线程的element)来确定该线程在应用程序上下文中的功能是什么。