【RabbitMQ 项目】服务端:消费者管理模块

文章目录

  • 一.概念辨析
    • 1.什么是消费者?
    • 2.服务端为什么要管理消费者?
    • 3.怎么管理消费者?
    • 4.需要管理生产者吗?
  • 二.编写思路
    • 1.定义消费者
    • 2.定义队列消费者管理管理类
    • 3.定义消费者管理类
  • 三.代码实践

一.概念辨析

1.什么是消费者?

在服务端的视角,每收到一个订阅队列的请求,就多了一个消费者。在消费客户端同样如此,每订阅一个队列,就多了一个消费者。所以这里的消费者指的不是消费客户端,而是可以从指定队列中取走消息的角色。

一个消费客户端中可以有多个消费者,因为可以同时订阅多个队列。消费者和队列是绑定在一起的,不过要注意,一个消费者对应一个队列,但一个队列可以对应多个消费者,这样就可以负载均衡式选择消费者消费消息

2.服务端为什么要管理消费者?

服务端为什么要有消费者?
消费客户端通过信道订阅了一个队列,你需要把这个操作记录下来,以便后续知道消息要推送给哪个客户端的哪个信道。怎么记录呢?新增一个消费者。所以消费者就是一个表征订阅队列的角色
为什么要管理消费者?
当一个队列中新增消息后,就要选择一个订阅该队列的消费者进行消费,服务端肯定不止一个消费者,所以自然要管理起来

3.怎么管理消费者?

消费者与信道,队列都有关,信道关闭,信道内的消费者也要销毁。队列中有新消息,需要选择一个与关联的消费者推送消息。
所以既可以以队列为单元管理消费者,也可以连接为单元管理,选择哪个呢?
我选择的是以队列为单元管理,因为选择消费者推送消息这个动作很频繁,以队列为单元管理消费者提高消费者查找效率。至于连接,只需在其中记录关联的消费者,信道关闭使用消费者管理句柄删除消费者即可

4.需要管理生产者吗?

不需要,因为服务端不会像对消费者那样主动发送数据,只会对生产者发送响应,所以生产者从哪里给我发来请求,我再从哪里响应回去就行。

二.编写思路

1.定义消费者

成员变量:

  1. 唯一标识 id
  2. 订阅的队列名称
  3. 消息处理的回调函数:服务端的“消费”,指的是把消息从队列中取出来,然后构建响应,发送给消费者所在的消费客户端,这个构建响应并发送的过程由消费者提供的回调函数来完成,这个回调函数,哪个模块创建的消费者,哪个模块负责设置(其实就是后面讲的信道模块)
  4. 自动应答标志:所谓自动应答就是不需要消费客户端发送 ACK,而是服务端自己把消息发出去后就删除本地消息

2.定义队列消费者管理管理类

成员变量:

  1. 队列名称
  2. 消费者的 vector 数组
    为什么要选数组,我们说需要为队列负载均衡地选择一个消费者进行消费,采用的方法就是下标轮转,所以要支持随机访问,数组就很合适
  3. 轮转下标
    成员方法:
  4. 新增消费者
  5. 删除消费者
  6. 获取一个消费者

3.定义消费者管理类

成员变量:

  1. 队列消费者管理句柄数组
    成员方法:
  2. 构造函数:根据传入的队列名数组,初始化队列消费者管理句柄
  3. 初始化队列消费者管理句柄:新增队列时使用
  4. 向指定队列新增消费者
  5. 从指定队列删除消费者:
  6. 从指定队列获取一个消费者:负载均衡式消费
  7. 删除队列消费者管理句柄:删除队列时使用

三.代码实践

#pragma once
#include "../common/Log.hpp"
#include "../common/message.pb.h"
#include <functional>
#include <memory>
#include <atomic>
#include <mutex>
#include <vector>
#include <unordered_map>
namespace ns_consumer
{using namespace ns_log;class Consumer;class QueueConsumerManager;using ConsumerPtr = std::shared_ptr<Consumer>;using QueueConsumerManagerPtr = std::shared_ptr<QueueConsumerManager>;using MessagePtr = std::shared_ptr<ns_data::Message>;using ConsumerCallback_t = std::function<void(const std::string& qname, const std::string& consumerId, MessagePtr msgPtr)>;struct Consumer{std::string _id;std::string _qname;ConsumerCallback_t _callback;bool _autoAck;Consumer(const std::string id, const std::string &qname, ConsumerCallback_t callback, bool autoAck): _id(id),_qname(qname),_callback(callback),_autoAck(autoAck){LOG(DEBUG) << "创建消费者: " << _id << endl;}~Consumer(){LOG(DEBUG) << "析构消费者: " << _id << endl;}};class QueueConsumerManager{private:const std::string _qname;std::vector<ConsumerPtr> _consumers;size_t _rotateOrder;std::mutex _mtx;public:QueueConsumerManager(const std::string &qname): _qname(qname),_consumers(),_rotateOrder(0),_mtx(){}/************ 新增消费者* ****************/ConsumerPtr addConsumer(const std::string &id, const std::string &qname, ConsumerCallback_t callback, bool autoAck){std::unique_lock<std::mutex> lck(_mtx);// 判断消费者是否重复for (auto &consumerPtr : _consumers){if (consumerPtr->_id == id){return consumerPtr;}}ConsumerPtr ret = std::make_shared<Consumer>(id, qname, callback, autoAck);_consumers.push_back(ret);return ret;}/*************** 移除消费者* ***************/void removeConsumer(const std::string &cid){std::unique_lock<std::mutex> lck(_mtx);for (auto it = _consumers.begin(); it != _consumers.end(); ++it){if ((*it)->_id == cid){_consumers.erase(it);break;}}}/**************** 负载均衡地获取一个消费者* *************/ConsumerPtr chooseConsumer(){std::unique_lock<std::mutex> lck(_mtx);if (_consumers.size() == 0){return nullptr;}_rotateOrder %= _consumers.size();return _consumers[_rotateOrder++];}};class ConsumerManager{private:std::unordered_map<std::string, QueueConsumerManagerPtr> _qConsumerManagers;std::mutex _mtx;public:ConsumerManager(const std::vector<std::string> &qnames){for (const auto &qname : qnames){_qConsumerManagers[qname] = std::make_shared<QueueConsumerManager>(qname);}}/*************** 初始化队列消费者管理句柄--新增队列时调用* **************/void initQueueConsumerManager(const std::string &qname){std::unique_lock<std::mutex> lck(_mtx);if (_qConsumerManagers.count(qname)){return;}_qConsumerManagers[qname] = std::make_shared<QueueConsumerManager>(qname);}/******************** 销毁指定的队列消费者管理句柄--销毁队列时调用* ****************/void removeQueueConsumerManager(const std::string &qname){std::unique_lock<std::mutex> lck(_mtx);_qConsumerManagers.erase(qname);}/************** 给指定队列新增消费者* ************/ConsumerPtr addConsumer(const std::string &id, const std::string &qname, ConsumerCallback_t callback, bool autoAck){std::unique_lock<std::mutex> lck(_mtx);if (_qConsumerManagers.count(qname) == 0){LOG(WARNING) << "QueueConsumerManager not found, qname: " << qname << endl;return nullptr;}return _qConsumerManagers[qname]->addConsumer(id, qname, callback, autoAck);}/************ 删除指定队列的消费者* ****************/void removeConsumer(const std::string &qname, const std::string &cid){std::unique_lock<std::mutex> lck(_mtx);if (_qConsumerManagers.count(qname) == 0){LOG(WARNING) << "QueueConsumerManager not found, qname: " << qname << endl;return;}_qConsumerManagers[qname]->removeConsumer(cid);}/****************** 获取指定队列的一个消费者* **************/ConsumerPtr chooseConsumer(const std::string& qname){std::unique_lock<std::mutex> lck(_mtx);if (_qConsumerManagers.count(qname) == 0){LOG(WARNING) << "QueueConsumerManager not found, qname: " << qname << endl;return nullptr;}return _qConsumerManagers[qname]->chooseConsumer();}};
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/55277.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

html嵌入百度地图

html嵌入百度地图 key地址 https://lbsyun.baidu.com/apiconsole/key#/home &#xff0c;点进去注册应用、然后复制key换掉即可显示地图 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>百度地图搜索…

C++随心记 续一

C中的模板 在其它语言中如Java或者C#中可能叫做泛型&#xff0c;在C中为模板&#xff0c;泛型的限制通常比模板多。模板可以解决多次的代码重复问题&#xff0c;如以下场景 #include <iostream> #include <string>void print(int value) {std::cout << val…

MySQL存储和处理XML数据

文章目录 一、实战概述二、准备数据三、实战步骤1、创建数据库2、创建数据表3、插入XML数据4、查询XML数据5、修改XML数据6、删除XML数据7、注意事项四、实战小结一、实战概述 MySQL不直接支持XML数据类型,但可以通过TEXT或BLOB类型字段存储XML数据,利用ExtractValue和Update…

Spark Job 对象 详解

在 Apache Spark 中&#xff0c;Job 对象是执行逻辑的核心组件之一&#xff0c;它代表了对一系列数据操作&#xff08;如 transformations 和 actions&#xff09;的提交。理解 Job 的本质和它在 Spark 中的运行机制&#xff0c;有助于深入理解 Spark 的任务调度、执行模型和容…

滚雪球学Oracle[8.3讲]:区块链与Oracle

全文目录&#xff1a; 前言一、Oracle Blockchain的架构与应用1.1 Oracle Blockchain的基本架构1.1.1 Oracle Blockchain的核心组件 1.2 Oracle Blockchain的优势示例&#xff1a;Oracle Blockchain的实际应用 二、区块链与传统数据库的集成2.1 区块链与传统数据库的关系2.2 区…

PWM:控制信号的秘密武器

什么是PWM&#xff1f; PWM&#xff08;Pulse Width Modulation&#xff0c;脉宽调制&#xff09;是一种通过改变信号的占空比来控制电压输出的技术。简单来说&#xff0c;PWM信号由一系列高低电平组成&#xff0c;通过调节高电平持续的时间比例&#xff0c;可以控制信号的平均…

目前最好用的爬虫软件是那个?

作为一名数据工程师&#xff0c;三天两头要采集数据&#xff0c;用过十几种爬虫软件&#xff0c;也用过Python爬虫库&#xff0c;还是建议新手使用现成的软件比较方便。 这里推荐3款不错的自动化爬虫工具&#xff0c;八爪鱼、亮数据、Web Scraper 1. 八爪鱼爬虫 八爪鱼爬虫是一…

ARM base instruction -- ccmp

Conditional Compare (register) sets the value of the condition flags to the result of the comparison of two registers if the condition is TRUE, and an immediate value otherwise. 条件比较&#xff08;寄存器&#xff09;如果条件为真&#xff0c;则将条件标志的值…

Linux:深入理解冯诺依曼结构与操作系统

目录 1. 冯诺依曼体系结构 1.1 结构分析 1.2 存储结构分布图 2. 操作系统 2.1 概念 2.2 如何管理 2.3 什么是系统调用和库函数 1. 冯诺依曼体系结构 1.1 结构分析 不管是何种计算机&#xff0c;如个人笔记本电脑&#xff0c;服务器&#xff0c;都是遵循冯诺依曼结构。…

[leetcode]674_最长连续递增序列

给定一个未经排序的整数数组&#xff0c;找到最长且 连续递增的子序列&#xff0c;并返回该序列的长度。 连续递增的子序列 可以由两个下标 l 和 r&#xff08;l < r&#xff09;确定&#xff0c;如果对于每个 l < i < r&#xff0c;都有 nums[i] < nums[i 1] &am…

可视化图表与源代码显示配置项及页面的动态调整功能分析

可视化图表与源代码显示配置项及页面的动态调整功能分析 文章目录 可视化图表与源代码显示配置项及页面的动态调整功能分析1.分析图表源代码2.分析源代码显示功能**完整代码参考&#xff1a;** 3.分析源代码显示及动态调整**完整代码参考&#xff1a;** 4.分析代码编辑器及运行…

【AI知识点】泛化(Generalization)与过拟合(Overfitting)

泛化&#xff08;generalization&#xff09; 是机器学习中的一个核心概念&#xff0c;指的是模型在训练数据之外的新数据上表现得如何。换句话说&#xff0c;泛化能力衡量的是模型能否在未见过的样本上做出正确的预测或推断。 1. 泛化的背景 当我们训练机器学习模型时&#…

华为云LTS日志上报至观测云最佳实践

华为云LTS简介 华为云云日志服务&#xff08;Log Tank Service&#xff0c;简称 LTS&#xff09;&#xff0c;用于收集来自主机和云服务的日志数据&#xff0c;通过海量日志数据的分析与处理&#xff0c;可以将云服务和应用程序的可用性和性能最大化&#xff0c;为您提供实时、…

vue框架和uniapp框架区别

文章目录 vue框架和uniapp框架区别一、引言二、Vue.js 概述1、Vue.js 简介1.1、特点 2、适用场景 三、Uni-app 概述1、Uni-app 简介1.1、特点 2、适用场景 四、区别与比较1、跨平台能力2、开发体验3、性能优化4、社区和支持 五、总结 vue框架和uniapp框架区别 一、引言 在前端…

基于SSM的爱心慈善公益网站的设计与实现

文未可获取一份本项目的java源码和数据库参考。 选题意义 随着经济的不断进步&#xff0c;发展各种进行公益事业的渠道不断的出现&#xff0c;作为一个礼仪之邦&#xff0c;中华民族一直秉承先人的团结与友善精神&#xff0c;对社会和他人给予帮助关怀。但中国的公益事业相对…

【AIGC】2022-CVPR-利用潜在扩散模型进行高分辨率图像合成

2022-CVPR-High-Resolution Image Synthesis with Latent Diffusion Models 利用潜在扩散模型进行高分辨率图像合成摘要1. 引言2. 相关工作3. 方法3.1. 感知图像压缩3.2. 潜在扩散模型3.3. 调节机制 4. 实验4.1. 关于感知压缩权衡4.2. 利用潜在扩散生成图像4.3. 条件潜在扩散4.…

防sql注入的网站登录系统设计与实现

课程名称 网络安全 大作业名称 防sql注入的网站登录系统设计与实现 姓名 学号 班级 大 作 业 要 求 结合mysql数据库设计一个web登录页面密码需密文存放&#xff08;可以采用hash方式&#xff0c;建议用sha1或md5加盐&#xff09;采用服务器端的验证码&#…

基于Hive和Hadoop的招聘分析系统

本项目是一个基于大数据技术的招聘分析系统&#xff0c;旨在为用户提供全面的招聘信息和深入的职位市场分析。系统采用 Hadoop 平台进行大规模数据存储和处理&#xff0c;利用 MapReduce 进行数据分析和处理&#xff0c;通过 Sqoop 实现数据的导入导出&#xff0c;以 Spark 为核…

OpenCV视频I/O(4)视频采集类VideoCapture之获取异常处理模式函数getExceptionMode()的使用

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 getExceptionMode 函数是 OpenCV 中 VideoCapture 类的一个方法&#xff0c;用于获取异常处理模式。这个模式决定了在 VideoCapture 对象操作期间…

英集芯IP5911:集成锂电池充电管理和检测唤醒功能的低功耗8位MCU芯片

英集芯IP5911是一款集成锂电池充电管理、咪头检测唤醒、负载电阻插拔和阻值检测等功能的8bit MCU芯片。其封装采用QFN16&#xff0c;应用时仅需极少的外围器件&#xff0c;就能够有效减小整体方案的尺寸&#xff0c;降低BOM成本&#xff0c;为小型电子设备提供高集成度的解决方…