ConsumerProducer库:高效处理任务队列,提升系统多线程调度性能

ConsumerProducer库概述

ConsumerProducer库是一个用于多线程任务处理的C++库。它提供了一种机制,允许用户定义任务的优先级和处理方式,并通过多线程方式高效地处理任务队列中的任务。

  • 代码符合Misra C++标准;
  • 模块提供设置线程优先级、处理线程个数及任务队列个数的功能;
  • 模块提供低优先级队列和高优先级队列管理功能。
  • 生产者添加任务的时候会根据优先级添加到低优先级队列还是高优先级队列,消费者获取任务的时候优先获取高优先级队列中的任务进行处理。
  • 模块具有统计任务总的等待时间消耗,处理时间消耗、丢弃时间消耗信息的功能。

仓库路径:https://gitee.com/liudegui/consumer-producer

ConsumerProducerConfig结构体

首先,让我们来看一下ConsumerProducerConfig结构体。这个结构体定义了任务处理的配置参数,如输入队列的名称、优先级、工作线程数量等。通过配置这些参数,用户可以根据实际需求灵活地调整任务处理的行为。

ConsumerProducer类

ConsumerProducer类是ConsumerProducer库的核心组件,它提供了任务的添加、处理和管理功能。该类包含了任务队列、线程管理等重要功能,下面我们将详细介绍其主要成员和功能。

MyJob结构体

MyJob结构体表示一个任务,包含了任务的相关信息,如任务指针、时间戳、任务ID等。通过MyJob结构体,用户可以轻松地管理和操作任务。

MyCpQueue类

MyCpQueue类实现了一个任务队列,用于存储任务并支持任务的添加和弹出操作。该类采用循环队列的方式实现,保证了任务的高效处理。

ConsumerProducer类成员函数

ConsumerProducer类提供了一系列成员函数,用于任务的添加、处理和管理。这些函数包括添加任务、启动处理线程、暂停任务处理等,为用户提供了丰富的操作接口。

函数名称功能
ConsumerProducer:: ConsumerProducer构造函数
ConsumerProducer::add_job添加job到队列
ConsumerProducer:: add_job_wait_done添加job并等待任务完成
ConsumerProducer:: shutdown关闭线程
ConsumerProducer:: queue_length获取队列中存在的job数量
ConsumerProducer:: max_queue_length获取队列可以存放的最大job数量
ConsumerProducer:: get_threads获取线程池中存放线程容器的首地址
ConsumerProducer:: dropped_job_count获取丢弃的job数量
ConsumerProducer:: blocked_job_count获取被阻塞的job数量
ConsumerProducer:: print_stats打印线程池状态信息

核心代码


struct ConsumerProducerConfig {std::string in_name;int32_t in_priority;uint32_t in_worker_num;uint32_t in_queue_size;uint32_t in_prefer_queue_size;uint32_t in_hi_cp_queue_size;uint32_t in_cp_cpu_set_size;const cpu_set_t *in_cp_cpuset;bool in_allow_log;
};
template <typename Worker>
class ConsumerProducer {public:/*** @brief ConsumerProducer process callback function* @param job       MyJob to process* @param prefer    True if the job is preferred, otherwise false*/using ConsumeFunc = std::function<int32_t(std::shared_ptr<Worker> job, bool prefer)>;private:enum MyJobPriority : int32_t {PRIORITY_LOW = 0,PRIORITY_HIGH = 1,PRIORITY_MAX = 2,};class MyJob {private:std::shared_ptr<Worker> my_job_ptr_;int64_t job_timestamp_;uint64_t job_job_id_;bool job_not_discardable_;public:MyJob() : my_job_ptr_(nullptr), job_timestamp_(0), job_job_id_(0), job_not_discardable_(false) {}MyJob(std::shared_ptr<Worker> job_task, int64_t in_timestamp, uint64_t job_id, bool not_discardable): my_job_ptr_(job_task),job_timestamp_(in_timestamp),job_job_id_(job_id),job_not_discardable_(not_discardable) {}public:inline bool is_not_discardable() const {return job_not_discardable_;}inline uint64_t get_job_id() const {return job_job_id_;}inline int64_t get_timestamp() const {return job_timestamp_;}std::shared_ptr<Worker> get_my_job() const {return my_job_ptr_;}};class MyCpQueue {private:std::string cpqueue_name_;uint32_t cpqueue_head_{0};uint32_t cpqueue_tail_{0};bool cpqueue_full_{false};bool cpqueue_empty_{true};uint32_t cpqueue_queue_size_{0};uint32_t cpqueue_count_{0};std::vector<MyJob> jobs_;bool cp_queue_allow_log_{false};public:MyCpQueue(const std::string &in_name, uint32_t in_queue_size, bool in_allow_log) {cpqueue_name_ = in_name;cpqueue_queue_size_ = in_queue_size;jobs_.resize(in_queue_size);cp_queue_allow_log_ = in_allow_log;}~MyCpQueue() {if (cp_queue_allow_log_) {if (cpqueue_count_ > 0) {MY_LOG_ERROR("%s cpqueue_count_ = %d", cpqueue_name_.c_str(), cpqueue_count_);}}}bool is_full() const {return cpqueue_full_;}bool cpq_is_empty() const {return cpqueue_empty_;}uint32_t cp_queue_queue_length() const {return cpqueue_count_;}void cpq_add_job(const MyJob &in_job) {jobs_[cpqueue_tail_] = in_job;cpqueue_tail_++;if (cpqueue_tail_ == cpqueue_queue_size_) {cpqueue_tail_ = 0;}if (cpqueue_tail_ == cpqueue_head_) {cpqueue_full_ = true;}cpqueue_empty_ = false;cpqueue_count_++;if (cp_queue_allow_log_) {if (cpqueue_count_ > cpqueue_queue_size_) {MY_LOG_PANIC("%s cpqueue_count_ = %u cpqueue_queue_size_ = %u", cpqueue_name_.c_str(), cpqueue_count_,cpqueue_queue_size_);}}return;}uint32_t pop(MyJob &out_job, bool peek_only = false) {uint32_t ret_value = cpqueue_count_;if (cpqueue_empty_) {if (cp_queue_allow_log_) {if (cpqueue_count_ > 0) {MY_LOG_PANIC("%s cpqueue_count_ = %u", cpqueue_name_.c_str(), cpqueue_count_);}}ret_value = 0;} else {if (cp_queue_allow_log_) {if (cpqueue_count_ == 0) {MY_LOG_PANIC("%s cpqueue_count_ = %u", cpqueue_name_.c_str(), cpqueue_count_);}} else {}out_job = jobs_[cpqueue_head_];if (peek_only == false) {cpqueue_head_++;if (cpqueue_head_ == cpqueue_queue_size_) {cpqueue_head_ = 0;}cpqueue_count_--;if (cpqueue_head_ == cpqueue_tail_) {cpqueue_empty_ = true;if (cp_queue_allow_log_) {if (cpqueue_count_ != 0) {MY_LOG_PANIC("%s cpqueue_count_ = %u", cpqueue_name_.c_str(), cpqueue_count_);}}}}}cpqueue_full_ = false;return ret_value;}inline uint32_t peek(MyJob &out_job) {return pop(out_job, true);}};private:std::string cp_name_;int32_t cp_priority_;MyCpQueue cp_queue_;std::atomic_bool blocking_;std::atomic_bool paused_;uint32_t cp_prefer_queue_size_;MyCpQueue cp_hi_queue_;uint32_t cp_worker_num_;std::vector<std::thread> threads_;ConsumeFunc cp_consume_func_;std::mutex cp_mutex_;std::condition_variable cp_not_full_;std::condition_variable cp_not_empty_;std::condition_variable cp_job_done_cond_;std::atomic_int cp_started_;std::atomic_bool cp_shutdown_;uint32_t cp_cpusetsize_;const cpu_set_t *cp_cpuset_ = nullptr;bool cp_allow_log_;int64_t start_time_;int64_t total_wait_time_[PRIORITY_MAX];int64_t total_process_time_[PRIORITY_MAX];int64_t total_drop_time_[PRIORITY_MAX];uint64_t added_job_[PRIORITY_MAX];uint64_t finished_job_[PRIORITY_MAX];uint64_t blocked_job_[PRIORITY_MAX];uint64_t dropped_job_[PRIORITY_MAX];uint32_t cp_job_id_[PRIORITY_MAX];uint64_t finished_job_id_[PRIORITY_MAX];uint32_t max_queue_length_;int64_t last_active_time_;int64_t last_elapse_time_;int32_t cp_pid_{0};private:/*** @brief Get the job id of corresponding priority* @param arry_idx Index of the priority* @return Return the job id of corresponding priority*/uint32_t assign_job_id_(uint64_t arry_idx) {return ++cp_job_id_[arry_idx];}/*** @brief Mark the last finished job id of corresponding priority* @param arry_idx Index of the priority* @param id  MyJob id to mark*/void update_done_job_id_(uint64_t arry_idx, uint64_t id_in) {finished_job_id_[arry_idx] = id_in;}/*** @brief Get a job from high priority queue or low priority queue* @param job      Buffer to store the job* @param priorty  Buffer to store the priority of the job* @return Return the queue size where the job is from*/uint32_t get_job_(MyJob &out_job, MyJobPriority &priority_out) {bool exit_flag = false;uint32_t ret_value = 0;while (exit_flag == false) {std::unique_lock<std::mutex> lck(cp_mutex_);if (cp_queue_.cpq_is_empty() && cp_hi_queue_.cpq_is_empty() && (cp_shutdown_.load() == false)) {cp_not_empty_.wait(lck);}ret_value = cp_hi_queue_.pop(out_job);if (ret_value == 0) {priority_out = PRIORITY_LOW;ret_value = cp_queue_.pop(out_job);} else {priority_out = PRIORITY_HIGH;}lck.unlock();if (ret_value != 0) {cp_not_full_.notify_all();exit_flag = true;} else {if (cp_shutdown_) {cp_not_empty_.notify_all();exit_flag = true;ret_value = 0;} else {// do nothing}}}return ret_value;}/*** @brief Get the total number of added jobs* @return Return the total number of added jobs*/uint64_t added_job_total_() const {uint64_t ret_value = 0;for (int32_t arry_index = 0; arry_index < PRIORITY_MAX; arry_index++) {ret_value += added_job_[arry_index];}return ret_value;}/*** @brief Get the total number of finished jobs* @return Return the total number of finished jobs*/uint64_t finished_job_total_() const {uint64_t ret_value = 0;for (int32_t arry_index = 0; arry_index < PRIORITY_MAX; arry_index++) {ret_value += finished_job_[arry_index];}return ret_value;}/*** @brief Get the total number of dropped jobs* @return Return the total number of dropped jobs*/uint64_t dropped_job_total_() const {uint64_t ret_value = 0;for (uint32_t arry_index = 0; arry_index < PRIORITY_MAX; arry_index++) {ret_value += dropped_job_[arry_index];}return ret_value;}public:static void *consumer_thread_func_(ConsumerProducer *in_context);public:/*** @brief ConsumerProducer  ructor.*        MyJob in high priority queue will be processed first, and will be marked as preferred.*        MyJob in low priority queue will be marked as preferred*        if the size of low priority queue is less or equal than prefer_queue_size,*        others will be marked as not preferred.* @param name              Name of the ConsumerProducer* @param priority          MyJobPriority of the threads* @param worker_num        Number of threads to create* @param consume_func      Process function of the threads* @param consume_context   Context of the process function* @param queue_size        Size of the queue for jobs* @param prefer_queue_size Size of the prefered jobs in the low priority queue* @param hi_cp_queue_size  Size of the queue for high priority jobs* @param cpusetsize        Size of the CPU set* @param cpuset            CPU set to bind the threads* @param cp_allow_log_     Allow logging*/explicit ConsumerProducer(const ConsumerProducerConfig &in_config, const ConsumeFunc &in_consume_func);~ConsumerProducer() {if (cp_allow_log_) {if ((!cp_shutdown_.load()) || paused_.load()) {MY_LOG_ERROR("%s shutdown=%d paused=%d started=%d", cp_name_.c_str(), cp_shutdown_.load(), paused_.load(),cp_started_.load());}}}/*** @brief Add job to the queue* @param in                MyJob to add_job* @param high_priority     True if the job is high priority, otherwise false.* @param job_id_out        Buffer to store job id of the added job* @param not_discardable   True if the job is not discardable, otherwise false.* @return  Return 0: normal enqueue*                 1: block enqueue*                 2: give up enqueue*                 3: discard head and enqueue*/int32_t add_job_do_(std::shared_ptr<Worker> job, bool high_priority, uint64_t *const job_id_out,bool not_discardable);/*** @brief Create threads and start processing jobs*/void start_process() {cp_started_++;cp_shutdown_ = false;// creates threadsthreads_.reserve(cp_worker_num_);for (uint64_t arry_index = 0; arry_index < cp_worker_num_; arry_index++) {threads_.emplace_back(consumer_thread_func_, this);}std::this_thread::sleep_for(std::chrono::milliseconds(10));}/*** @brief Add job to the queue* @param in_job          MyJob to add_job* @param high_priority   True if the job is high priority, otherwise false.*                        High priority job will be added to high priority queue,*                        otherwise low priority queue.*                        If the high priority queue is full, the high priority job will wait until*                        the queue is not full.* @param not_discardable True if the job is not discardable, otherwise false.*                        If the low priority queue is full, and not_discardable is true,*                        the low priority job will wait until the queue is not full.*                        If the low priority queue is full, and not_discardable is false,*                        then will comsume the oldest job if possible, otherwise will cosume the*                        current job immediately.* @return Return 0: normal enqueue*                1: block enqueue*                2: give up enqueue*                3: discard head and enqueue*/inline void add_job(std::shared_ptr<Worker> in_job, bool high_priority = false, bool not_discardable = false);/*** @brief Add job to the quque and wait until the job is processed* @param in_job        MyJob to add_job* @param high_priority True if the job is high priority, otherwise false.* @return Return 0: normal enqueue*                1: block enqueue*                2: give up enqueue*                3: discard head and enqueue*/int32_t add_job_wait_done(std::shared_ptr<Worker> in_job, bool high_priority = false);/*** @brief Shutdown the threads and wait until all jobs are stopped*/void shutdown_threads();/*** @brief Pause adding jobs to the queue, and wait until all jobs are processed or dropped*/void flush_and_pause();/*** @brief Resume adding jobs to the queue*/void resume();/*** @brief Get the normal queue length* @return Return the normal queue length*/inline uint64_t queue_length() {return cp_queue_.cp_queue_queue_length();}/*** @brief Get the normal queue max length* @return Return the normal queue max length*/inline uint64_t max_queue_length() {return max_queue_length_;}/*** @brief  Get the total number of dropped jobs* @return Return the total number of dropped jobs*/inline uint64_t dropped_job_count();/*** @brief  Get the total number of blocked jobs* @return Return the total number of blocked jobs*/inline uint64_t blocked_job_count();/*** @brief Print current status*/void print_stats(void);/*** @brief Get current status string* @param str_buf       Buffer to store the status string*/void get_stats_string(std::string &output_buffer);
};

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/11013.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于STM32的IIC通信

IIC通信 • I2C&#xff08;Inter IC Bus&#xff09;是由Philips公司开发的一种通用数据总线 • 两根通信线&#xff1a;SCL&#xff08;串行时钟线&#xff09;、SDA&#xff08;串行数据线&#xff09; • 同步&#xff0c;半双工 • 带数据应答 • 支持总线挂载多…

英语学习笔记11——It this your shirt?

It this your shirt? 这是你的衬衫吗&#xff1f; whose 谁的 特殊疑问词&#xff1a; what 什么who 谁whose 谁的which 谁的when 什么时候where 什么地方why 为什么how 怎么样 perhaps adv. 大概 【注意拼写】 catch v. 抓住 口语&#xff1a;Catch! 接着&#xff01;v.…

Boss让你设计架构图,你懵逼了,解救你的参考图来啦。

架构图是指用于描述系统或软件的结构和组成部分之间关系的图形表示。 它是一种高层次的图示&#xff0c;用于展示系统的组件、模块、接口和数据流等&#xff0c;以及它们之间的相互作用和依赖关系。架构图通常被用于可视化系统的整体设计和组织结构&#xff0c;帮助人们理解系…

HTML学习笔记汇总

整理一些常见问题的Links&#xff0c;不定期更新。 Html生成自定义函数的图形&#xff08;2024/5/10&#xff09;-CSDN博客 HTML中插入图片&#xff08;2024/5/10&#xff09;-CSDN博客 Html给网页加icon图标_html icon-CSDN博客

信息系统项目管理师(高项)--学习笔记【第5章:信息系统工程】

目录 第5章 信息系统工程5.1 软件工程5.1.1架构设计5.1.2需求分析5.1.3软件设计5.1.4软件实现5.1.5部署交付5.1.6过程管理 5.2 数据工程5.2.1数据建模5.2.2数据标准化5.2.3数据运维5.2.4数据开发利用5.2.5数据库安全 5.3 系统集成5.3.1集成基础5.3.2网络集成5.3.3数据集成5.3.4…

函数式接口-闭包与柯里化

闭包 定义 示例 注意 这个外部变量 x 必须是effective final 你可以生命他是final&#xff0c;你不声明也会默认他是final的&#xff0c;并且具有final的特性&#xff0c;不可变一旦x可变&#xff0c;他就不是final&#xff0c;就无法形成闭包&#xff0c;也无法与函数对象一…

Linux程序依赖动态链接库目录管理和案例分析

Linux程序运行时查找依赖的动态链接库路径 编译时指定的-rpath&#xff1a;如果程序在编译时使用了-Wl,-rpath,链接器选项&#xff0c;那么程序在运行时也会在这些指定的目录中搜索库。环境变量LD_LIBRARY_PATH指定的目录&#xff1a;这是一个环境变量&#xff0c;可以包含一系…

docker八大架构之应用数据分离架构

数据分离架构 什么是数据分离架构&#xff1f; 数据分离架构是指应用服务&#xff08;应用层&#xff09;和数据库服务&#xff08;数据层&#xff09;使用不同的服务器来进行操作&#xff0c;如下边的两个图所示。当访问到应用层后&#xff0c;如果需要获取数据会进行访问另…

prometheus、mysqld_exporter、node_export、Grafana安装配置

工具简介 Prometheus&#xff08;普罗米修斯&#xff09;&#xff1a;是一个开源的服务监控系统和时间序列数据库 mysqld_exporter&#xff1a; 用于监控 mysql 服务器的开源工具&#xff0c;它是由 Prometheus 社区维护的一个官方 Exporter。该工具通过连接到mysql 服务器并执…

MySQL中,关于日期类型的那些事儿,你知道哪些?

在MySQL数据库中&#xff0c;除了前面我们聊到的数字类型和字符串类型&#xff0c;还有一个常用的数据类型&#xff1a;日期类型。在我们业务表中&#xff0c;基本上每个业务表都有日期类型&#xff0c;用于记录创建时间和修改时间。比如我们的用户表&#xff0c;一般除了要记录…

Metasploit Framework渗透测试相关思考题?

1. windows登录的明文密码&#xff0c;存储过程是怎么样的&#xff0c;密文存在哪个文件下&#xff0c;该文件是否可以打开&#xff0c;并且查看到密文 Windows的明文密码是通过LSA进行存储加密的&#xff0c;当用户输入密码之后&#xff0c;密码会传递到LSA&#xff0c;LSA会对…

Linux流程控制

if语句 基本格式 if condition thencommand1 fi 写成一行 if [ $(ps -ef | grep -c "ssh") -gt 1 ]; then echo "true"; fi if-else语句 格式 if condition thencommand1 command2...commandN elsecommand fi if else- if else if condition1 th…

OpenCV 描述子总结

1.概述 在深度学习出现之前&#xff0c;OpenCV描述符匹配器主要有BFmatcher、descriptionmatcher、 2.理论对比 3.代码实现 #include <iostream> #include <opencv2/opencv.hpp>int main(int argc, char** argv) {if(argc ! 2) {std::cerr << "Usage:…

【Java EasyExcel】自定义处理器

工具类 public class ExcelUtils {public static void setValidation(Sheet sheet, DataValidationHelper helper,DataValidationConstraint constraint,CellRangeAddressList addressList,String msgHead, String msgContext) {DataValidation dataValidation helper.create…

vue3 <script setup> 形式父子组件传值

一、父子组件传值 在Vue 3中&#xff0c;选项式api风格下父子组件传值可以通过props进行父组件向子组件传递数据&#xff0c;通过自定义事件&#xff08;$emit&#xff09;实现子组件向父组件传递数据。但是组合式api风格下有所不同。 1、父组件向子组件传值 父组件通过:par…

【pkuseg】由于网络策略组织下载请求,因此直接在github中下载细分领域模型medicine

【pkuseg】由于网络策略组织下载请求&#xff0c;因此直接在github中下载细分领域模型medicine 写在最前面解决方案pkuseg是什么&#xff1f;报错原因报错详情 &#x1f308;你好呀&#xff01;我是 是Yu欸 &#x1f30c; 2024每日百字篆刻时光&#xff0c;感谢你的陪伴与支持…

黏土滤镜制作方法:探索黏土特效制作的魅力

在数字时代&#xff0c;图像处理已经成为我们生活的一部分&#xff0c;无论是社交媒体上的照片分享&#xff0c;还是专业设计领域的创作&#xff0c;都离不开对图像的精心处理。而黏土滤镜&#xff0c;作为一种独特而富有艺术感的图像处理效果&#xff0c;受到了越来越多人的喜…

时序分解 | Matlab实现LMD局域均值分解

时序分解 | Matlab实现LMD局域均值分解 目录 时序分解 | Matlab实现LMD局域均值分解效果一览基本介绍程序设计参考资料 效果一览 基本介绍 时序分解 | Matlab实现LMD局域均值分解 Matlab语言 1.算法新颖小众&#xff0c;用的人很少&#xff0c;包含分解图 2.直接替换数据即可用…

带头单链表 C++实现

节点定义 带头单链表&#xff1a;我们只需要一个结点指针指向整个链表的第一个节点&#xff0c;这样我们就可以通过next指针访问整个链表内的所有节点 template<class T> struct ListNode {T _val;ListNode* _next;ListNode(const T &val):_val(val),_next(nullptr){…

【C++】继承(菱形继承的深入理解)

在本篇博客中&#xff0c;作者将会带领你深入的理解C中的继承。 注意&#xff01;&#xff01;&#xff01;本篇博客是在32位机器下进行讲解的&#xff0c;64位下会有所不同&#xff0c;但大同小异。 一. 继承的概念及定义 继承的概念 什么是继承&#xff1f;为什么要有继承&…