GStreamer中如何自定义配置线程优先级

1.引言

如果看了gstreamer官方教程配置多线程出现编译不过的问题了,不妨进来看看这篇文章或许能解决一些编译问题。

GStreamer 本质上是多线程的,并且是完全线程安全的。大多数线程内部对应用程序是隐藏的,这应该使应用程序开发更容易。但是,在某些情况下,应用程序可能希望影响其中的某些部分。 GStreamer 允许应用程序在pipeline的某些部分强制使用多个线程,GStreamer 还可以在创建线程时通知您,以便您可以配置要使用的线程优先级或线程池等内容。

2.GStreamer 中的调度

GStreamer pipeline中的每个element决定如何调度它。element可以选择是基于推还是基于拉的方式调度它们的pad。例如,一个element可以选择启动一个线程以开始从sink pad拉取 或/并 开始从source pad推送。element也可以选择使用上游(upstream )或下游(downstream )线程分别以推和拉模式对数据进行处理。 GStreamer 对element选择如何调度没有任何限制。有关更多详细信息,请参阅插件编写指南。

在任何情况下都会发生的是,某些element将启动一个线程来处理它们的数据,称为“流线程”(“streaming threads”)。当element需要创建一个流线程时,流线程或叫做 GstTask 对象是从 GstTaskPool 创建的。在下一节中,我们将看到如何接收任务和池的通知。

3.在 GStreamer 中配置线程

STREAM_STATUS 消息发布在总线上以通知有关流线程的状态。你将从消息中获得以下信息:

当将要创建新线程时,您将收到 GST_STREAM_STATUS_TYPE_CREATE 类型的通知。然后可以在 GstTask 中配置 GstTaskPool。自定义任务池将为任务提供自定义线程以实现流线程。

  • 如果要配置自定义任务池,则需要同步处理此消息。如果你在此消息返回时未在任务上配置任务池,则任务将使用其默认池。

  • 进入或离开线程时。这是你可以配置线程优先级的时刻。当线程被销毁时,你也会收到通知。

  • 当线程开始、暂停和停止时,你会收到消息。这可用于在 gui 应用程序中可视化流线程的状态。

4.如何提高线程的优先级

请添加图片描述
让我们看一下上面的pipeline。我们希望提高流线程的优先级。appsrc element将启动流线程来将appsrc中src pad上的数据推送给对端queue element的sink pad。改变优先级的流程是这样的:

  • 当从 READY 到 PAUSED 状态时, appsrc 将需要一个流线程来将数据推送到 queue 中。它将发布一条STREAM_STATUS 消息,指示其对流线程的需求。

  • 应用程序将使用同步总线处理程序对 STREAM_STATUS 消息做出反应。然后它将在消息内的 GstTask 上配置自定义GstTaskPool。自定义任务池负责创建线程。在这个例子中,我们将创建一个具有更高优先级的线程。

  • 或者,由于同步消息是在线程上下文中调用的,您可以使用线程 ENTER/LEAVE 通知来更改当前线程的优先级或调度策略。

第一步,我们需要实现一个可以在任务上配置的自定义 GstTaskPool。下面是一个 GstTaskPool 子类的实现,它使用 pthreads 创建一个 SCHED_RR 实时线程。请注意,创建实时线程可能需要额外的权限。

#include <gst/gst.h>
#include <gst/gsttaskpool.h>#define TEST_TYPE_RT_POOL \(test_rt_pool_get_type())GType test_rt_pool_get_type (void);
GstTaskPool *test_rt_pool_new (void);typedef struct _GstTaskPool      TestRTPool;
typedef struct _GstTaskPoolClass TestRTPoolClass;typedef struct
{pthread_t thread;
} TaskPoolRTId;G_DEFINE_TYPE (TestRTPool, test_rt_pool, GST_TYPE_TASK_POOL);static void
default_prepare (GstTaskPool * pool, GError ** error)
{/* we don't do anything here. We could construct a pool of threads here that* we could reuse later but we don't */
}static void
default_cleanup (GstTaskPool * pool)
{
}static gpointer
default_push (GstTaskPool * pool, GstTaskPoolFunction func, gpointer data,GError ** error)
{TestRTId *tid;gint res;pthread_attr_t attr;struct sched_param param;tid = g_slice_new0 (TestRTId);pthread_attr_init (&attr);if ((res = pthread_attr_setschedpolicy (&attr, SCHED_RR)) != 0)g_warning ("setschedpolicy: failure: %p", g_strerror (res));param.sched_priority = 50;if ((res = pthread_attr_setschedparam (&attr, &param)) != 0)g_warning ("setschedparam: failure: %p", g_strerror (res));if ((res = pthread_attr_setinheritsched (&attr, PTHREAD_EXPLICIT_SCHED)) != 0)g_warning ("setinheritsched: failure: %p", g_strerror (res));res = pthread_create (&tid->thread, &attr, (void *(*)(void *)) func, data);if (res != 0) {g_set_error (error, G_THREAD_ERROR, G_THREAD_ERROR_AGAIN,"Error creating thread: %s", g_strerror (res));g_slice_free (TestRTId, tid);tid = NULL;}return tid;
}static void
default_join (GstTaskPool * pool, gpointer id)
{TestRTId *tid = (TestRTId *) id;pthread_join (tid->thread, NULL);g_slice_free (TestRTId, tid);
}static void
test_rt_pool_class_init (TestRTPoolClass * klass)
{GstTaskPoolClass *gsttaskpool_class;gsttaskpool_class = (GstTaskPoolClass *) klass;gsttaskpool_class->prepare = default_prepare;gsttaskpool_class->cleanup = default_cleanup;gsttaskpool_class->push = default_push;gsttaskpool_class->join = default_join;
}static void
test_rt_pool_init (TestRTPool * pool)
{
}GstTaskPool *
test_rt_pool_new (void)
{GstTaskPool *pool;pool = g_object_new (TEST_TYPE_RT_POOL, NULL);return pool;
}

编写任务池时要实现的重要功能是“push”功能。 实现上应该启动一个调用给定函数的线程。 更复杂的实现可能希望在池中保留一些线程,因为创建和销毁线程并不总是最快的操作。

在下一步中,我们需要在 appsrc 需要时实际配置自定义任务池。 为此,我们使用同步处理程序拦截 STREAM_STATUS 消息。

static GMainLoop* loop;static void
on_stream_status (GstBus     *bus,GstMessage *message,gpointer    user_data)
{GstStreamStatusType type;GstElement *owner;const GValue *val;GstTask *task = NULL;gst_message_parse_stream_status (message, &type, &owner);val = gst_message_get_stream_status_object (message);/* see if we know how to deal with this object */if (G_VALUE_TYPE (val) == GST_TYPE_TASK) {task = g_value_get_object (val);}switch (type) {case GST_STREAM_STATUS_TYPE_CREATE:if (task) {GstTaskPool *pool;pool = test_rt_pool_new();gst_task_set_pool (task, pool);}break;default:break;}
}static void
on_error (GstBus     *bus,GstMessage *message,gpointer    user_data)
{g_message ("received ERROR");g_main_loop_quit (loop);
}static void
on_eos (GstBus     *bus,GstMessage *message,gpointer    user_data)
{g_main_loop_quit (loop);
}int
main (int argc, char *argv[])
{GstElement *bin, *AppSrc;GstBus *bus;GstStateChangeReturn ret;gst_init (&argc, &argv);/* create a new bin to hold the elements */bin = gst_pipeline_new ("pipeline");g_assert (bin);//pseudo-code/* create some elements */AppSrc = gst_element_factory_make ("appsrc", "appsrc");............./* add objects to the main pipeline */gst_bin_add_many (GST_BIN (bin),AppSrc ,..., ..., NULL);/* link the elements */gst_element_link_many(AppSrc,..., ...,null);loop = g_main_loop_new (NULL, FALSE);/* get the bus, we need to install a sync handler */bus = gst_pipeline_get_bus (GST_PIPELINE (bin));gst_bus_enable_sync_message_emission (bus);gst_bus_add_signal_watch (bus);g_signal_connect (bus, "sync-message::stream-status",(GCallback) on_stream_status, NULL);g_signal_connect (bus, "message::error",(GCallback) on_error, NULL);g_signal_connect (bus, "message::eos",(GCallback) on_eos, NULL);/* start playing */ret = gst_element_set_state (bin, GST_STATE_PLAYING);if (ret != GST_STATE_CHANGE_SUCCESS) {g_message ("failed to change state");return -1;}/* Run event loop listening for bus messages until EOS or ERROR */g_main_loop_run (loop);/* stop the bin */gst_element_set_state (bin, GST_STATE_NULL);gst_object_unref (bus);g_main_loop_unref (loop);return 0;
}

请注意,该程序可能需要 root 权限才能创建实时线程。 当无法创建线程时,状态更改函数将失败,我们在上面的应用程序中捕获了这一点。

当pipeline中有多个线程时,您将收到多个 STREAM_STATUS 消息。 您应该使用消息的所有者(可能是 pad 或启动线程的element)来确定该线程在应用程序上下文中的功能是什么。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/11893.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

RabbitMQ--死信队列

目录 一、死信队列介绍 1.死信 2.死信的来源 2.1 TTL 2.2 死信的来源 3.死信队列 4.死信队列的用途 二、死信队列的实现 1.导入依赖 pom.xml 2.application.properties 3.配置类 4.生产者 5.业务消费者&#xff08;正常消费者&#xff09; 6.死信队列消费者 一、…

Linux系统安全整改实践指南

在当前信息化高速发展的时代&#xff0c;Linux操作系统凭借其开源、稳定和高效的特点&#xff0c;在服务器市场占据着举足轻重的地位。然而&#xff0c;随着网络威胁的日益复杂化&#xff0c;确保Linux系统的安全性成为了一项至关重要的任务。本文旨在提供一套全面的Linux系统安…

leetcode 1319.连通网络的操作次数

思路&#xff1a;DFS&#xff08;连通块&#xff09; 其实一开始的时候&#xff0c;并不知道这道题的精髓在哪&#xff0c;总想着&#xff0c;啊&#xff1f;这怎么用图论的思想做啊&#xff1f; 细细思考之后&#xff0c;这道题还是比较有意思的&#xff0c;需要有一定的数据…

# Mysql 数据库区分大小写吗?

Mysql 数据库区分大小写吗&#xff1f; 1、MySQL 数据库在区分大小写方面有特定的行为&#xff0c;这取决于多个因素&#xff0c;包括操作系统、配置参数以及使用的字符集。 2、数据库名和表名&#xff1a; 在 Linux 系统中&#xff0c;数据库和表名是严格区分大小写的。 而…

【前端性能优化】深入解析重绘和回流,构建高性能Web界面

&#x1f525; 个人主页&#xff1a;空白诗 文章目录 &#x1f3af; 引言&#xff1a;探索Web性能的基石&#x1f3d7;️ 基础概念&#xff1a;什么是重绘和回流&#xff1f;&#x1f4cc; 回流&#xff08;Reflow&#xff09;&#x1f4cc; 重绘&#xff08;Repaint&#xff0…

蓝桥杯国赛每日一题:交换瓶子(图论,环,贪心)

题目描述&#xff1a; 有 N 个瓶子&#xff0c;编号 1∼N&#xff0c;放在架子上。 比如有 5 个瓶子&#xff1a; 2 1 3 5 4要求每次拿起 2 个瓶子&#xff0c;交换它们的位置。 经过若干次后&#xff0c;使得瓶子的序号为&#xff1a; 1 2 3 4 5对于这么简单的情况&#…

使用Flask部署Web应用:从入门到精通

文章目录 第一部分&#xff1a;准备工作第二部分&#xff1a;部署Flask应用到AWS部署到AWS Lambda 第三部分&#xff1a;部署Flask应用到腾讯云服务器部署到腾讯云服务器 第四部分&#xff1a;优化和扩展结论 在现代软件开发中&#xff0c;Web应用的部署是一个至关重要的环节。…

使用Flask-SocketIO构建实时Web应用

文章目录 准备工作编写代码编写HTML模板运行应用 随着互联网的发展&#xff0c;实时性成为了许多Web应用的重要需求之一。传统的HTTP协议虽然可以实现实时通信&#xff0c;但是其长轮询等机制效率低下&#xff0c;无法满足高并发、低延迟的需求。为了解决这一问题&#xff0c;诞…

python常见数据的存取

python数据的存取 python数据的存取数据的保存3.1.1 保存list3.1.2 保存Dict3.1.3 保存Set3.1.4 保存Dataframe3.1.5 保存Matrix 3.2 数据的读取3.2.1 读取txt文件中的数据3.2.2 读取excel文件中的数据3.2.3 读取csv文件中的数据3.2.4 读取stata文件中的数据3.2.5 读取R文件中的…

计算机发展史故事【14】

大象踢踏舞 如果要把电脑50 年的历史划分为两个不同的阶段&#xff0c;那么&#xff0c;1981 年无疑是个分界线。就在那一年&#xff0c;IBM 公司推出个人电脑PC 机&#xff0c;使人类社会大步跨进个人电脑新时代。今天&#xff0c;全世界正在使用的PC 机已达到2 亿台&#xf…

视频拼接融合产品的产品与架构设计(三)内存和显存单元数据迁移

上一篇文章 视频拼接融合产品的产品与架构设计(二) 这一篇沉下先来&#xff0c;彻底放弃了界面&#xff0c;界面最终的体现是最后要做的&#xff0c;现在要做的是产品的架构&#xff0c;使用链式架构方式迁移数据。同时增加插件口&#xff0c;方便编程序。 插件架构 为了视频…

Android 开机过程画面

Android 开机画面流程 Android 开机动画加载流程涉及bootloader、内核、Android 核心进程、Android文件系统 Bootloader(引导加载程序):当设备启动时,首先由 Bootloader 加载。Bootloader 位于设备的固化存储器中,其主要功能是初始化硬件并启动操作系统。 内核加载:Boo…

Kivy 项目51斩百词 3 屏幕页面转换

MRWord/pages/indexpage/index.py class IndexPage(GridLayout):# 初始化def __init__(self, **kwargs):super().__init__(**kwargs)staticmethoddef index_to_upload():App.get_running_app().screen_manager.current "Upload"定义了一个名为 IndexPage 的类&…

短剧奔向小程序,流量生意如何开启?

随着移动互联网的飞速发展&#xff0c;小程序作为一种轻量级、易传播的应用形态&#xff0c;逐渐在各个领域展现出其独特的商业价值。而最近爆火的短剧小视频作为一种受众广泛的娱乐形式&#xff0c;与小程序结合后&#xff0c;不仅为观众提供了更为便捷的观看体验&#xff0c;…

代码随想录算法训练营第五十三天|LeetCode1143.最长公共子序列、LeetCode1035.不相交的线、LeetCode53.最大子序和

LeetCode 1143 最长公共子序列 题目链接&#xff1a;1143. 最长公共子序列 - 力扣&#xff08;LeetCode&#xff09; 【解题思路】 1.确定dp数组含义 dp[i][j] &#xff1a;长度为[0,i-1]的字符串和长度为[0,j-1]的字符串的最长公共子序列为dp[i][j] 2.确定递推公式 text1[i…

Linux线程(三)死锁与线程同步

目录 一、什么是死锁 死锁的四个必要条件 如何避免死锁 避免死锁算法 二、Linux线程同步 三 、条件变量 1、条件变量基本原理 2、条件变量的使用 3、条件变量使用示例 为什么 pthread_cond_wait 需要互斥量? 一、什么是死锁 死锁是计算机科学中的一个概念&#xff0c;…

Python-VBA函数之旅-type函数

目录 一、type函数的常见应用场景 二、type函数使用注意事项 三、如何用好type函数&#xff1f; 1、type函数&#xff1a; 1-1、Python&#xff1a; 1-2、VBA&#xff1a; 2、推荐阅读&#xff1a; 个人主页&#xff1a; https://myelsa1024.blog.csdn.net/ 一、type函…

设计一个游戏的基本博弈框架

设计一个游戏的基本博弈框架&#xff0c;玩家通过操作改变某个数值&#xff0c;这个数值的变动会引发一系列实时变化&#xff0c;并且当这些数值累计到特定阈值时&#xff0c;会导致游戏中出现其他变化&#xff0c;可以分为以下几个步骤&#xff1a; 1. 确定游戏类型和主题 首…

【rk3568】linux与amp内存分配

关于AMP问题&#xff1a; 1、内存分配&#xff1a;linux端与rtos端内存要分割开。 2、在device/rockchip/rk3568/BoardConfig-rk3568-evb1-ddr4-v10.mk中会定义内存地址需要注意在linux端也需要保留rtos使用的的内存地方&#xff0c;否则可能rtos用的的内存会被linux端使用到…