文章目录
- 一.概念辨析
- 1.什么是消费者?
- 2.服务端为什么要管理消费者?
- 3.怎么管理消费者?
- 4.需要管理生产者吗?
- 二.编写思路
- 1.定义消费者
- 2.定义队列消费者管理管理类
- 3.定义消费者管理类
- 三.代码实践
一.概念辨析
1.什么是消费者?
在服务端的视角,每收到一个订阅队列的请求,就多了一个消费者。在消费客户端同样如此,每订阅一个队列,就多了一个消费者。所以这里的消费者指的不是消费客户端,而是可以从指定队列中取走消息的角色。
一个消费客户端中可以有多个消费者,因为可以同时订阅多个队列。消费者和队列是绑定在一起的,不过要注意,一个消费者对应一个队列,但一个队列可以对应多个消费者,这样就可以负载均衡式选择消费者消费消息
2.服务端为什么要管理消费者?
服务端为什么要有消费者?
消费客户端通过信道订阅了一个队列,你需要把这个操作记录下来,以便后续知道消息要推送给哪个客户端的哪个信道。怎么记录呢?新增一个消费者。所以消费者就是一个表征订阅队列的角色
为什么要管理消费者?
当一个队列中新增消息后,就要选择一个订阅该队列的消费者进行消费,服务端肯定不止一个消费者,所以自然要管理起来
3.怎么管理消费者?
消费者与信道,队列都有关,信道关闭,信道内的消费者也要销毁。队列中有新消息,需要选择一个与关联的消费者推送消息。
所以既可以以队列为单元管理消费者,也可以连接为单元管理,选择哪个呢?
我选择的是以队列为单元管理,因为选择消费者推送消息这个动作很频繁,以队列为单元管理消费者提高消费者查找效率。至于连接,只需在其中记录关联的消费者,信道关闭使用消费者管理句柄删除消费者即可
4.需要管理生产者吗?
不需要,因为服务端不会像对消费者那样主动发送数据,只会对生产者发送响应,所以生产者从哪里给我发来请求,我再从哪里响应回去就行。
二.编写思路
1.定义消费者
成员变量:
- 唯一标识 id
- 订阅的队列名称
- 消息处理的回调函数:服务端的“消费”,指的是把消息从队列中取出来,然后构建响应,发送给消费者所在的消费客户端,这个构建响应并发送的过程由消费者提供的回调函数来完成,这个回调函数,哪个模块创建的消费者,哪个模块负责设置(其实就是后面讲的信道模块)
- 自动应答标志:所谓自动应答就是不需要消费客户端发送 ACK,而是服务端自己把消息发出去后就删除本地消息
2.定义队列消费者管理管理类
成员变量:
- 队列名称
- 消费者的 vector 数组
为什么要选数组,我们说需要为队列负载均衡地选择一个消费者进行消费,采用的方法就是下标轮转,所以要支持随机访问,数组就很合适- 轮转下标
成员方法:- 新增消费者
- 删除消费者
- 获取一个消费者
3.定义消费者管理类
成员变量:
- 队列消费者管理句柄数组
成员方法:- 构造函数:根据传入的队列名数组,初始化队列消费者管理句柄
- 初始化队列消费者管理句柄:新增队列时使用
- 向指定队列新增消费者
- 从指定队列删除消费者:
- 从指定队列获取一个消费者:负载均衡式消费
- 删除队列消费者管理句柄:删除队列时使用
三.代码实践
#pragma once
#include "../common/Log.hpp"
#include "../common/message.pb.h"
#include <functional>
#include <memory>
#include <atomic>
#include <mutex>
#include <vector>
#include <unordered_map>
namespace ns_consumer
{using namespace ns_log;class Consumer;class QueueConsumerManager;using ConsumerPtr = std::shared_ptr<Consumer>;using QueueConsumerManagerPtr = std::shared_ptr<QueueConsumerManager>;using MessagePtr = std::shared_ptr<ns_data::Message>;using ConsumerCallback_t = std::function<void(const std::string& qname, const std::string& consumerId, MessagePtr msgPtr)>;struct Consumer{std::string _id;std::string _qname;ConsumerCallback_t _callback;bool _autoAck;Consumer(const std::string id, const std::string &qname, ConsumerCallback_t callback, bool autoAck): _id(id),_qname(qname),_callback(callback),_autoAck(autoAck){LOG(DEBUG) << "创建消费者: " << _id << endl;}~Consumer(){LOG(DEBUG) << "析构消费者: " << _id << endl;}};class QueueConsumerManager{private:const std::string _qname;std::vector<ConsumerPtr> _consumers;size_t _rotateOrder;std::mutex _mtx;public:QueueConsumerManager(const std::string &qname): _qname(qname),_consumers(),_rotateOrder(0),_mtx(){}/************ 新增消费者* ****************/ConsumerPtr addConsumer(const std::string &id, const std::string &qname, ConsumerCallback_t callback, bool autoAck){std::unique_lock<std::mutex> lck(_mtx);// 判断消费者是否重复for (auto &consumerPtr : _consumers){if (consumerPtr->_id == id){return consumerPtr;}}ConsumerPtr ret = std::make_shared<Consumer>(id, qname, callback, autoAck);_consumers.push_back(ret);return ret;}/*************** 移除消费者* ***************/void removeConsumer(const std::string &cid){std::unique_lock<std::mutex> lck(_mtx);for (auto it = _consumers.begin(); it != _consumers.end(); ++it){if ((*it)->_id == cid){_consumers.erase(it);break;}}}/**************** 负载均衡地获取一个消费者* *************/ConsumerPtr chooseConsumer(){std::unique_lock<std::mutex> lck(_mtx);if (_consumers.size() == 0){return nullptr;}_rotateOrder %= _consumers.size();return _consumers[_rotateOrder++];}};class ConsumerManager{private:std::unordered_map<std::string, QueueConsumerManagerPtr> _qConsumerManagers;std::mutex _mtx;public:ConsumerManager(const std::vector<std::string> &qnames){for (const auto &qname : qnames){_qConsumerManagers[qname] = std::make_shared<QueueConsumerManager>(qname);}}/*************** 初始化队列消费者管理句柄--新增队列时调用* **************/void initQueueConsumerManager(const std::string &qname){std::unique_lock<std::mutex> lck(_mtx);if (_qConsumerManagers.count(qname)){return;}_qConsumerManagers[qname] = std::make_shared<QueueConsumerManager>(qname);}/******************** 销毁指定的队列消费者管理句柄--销毁队列时调用* ****************/void removeQueueConsumerManager(const std::string &qname){std::unique_lock<std::mutex> lck(_mtx);_qConsumerManagers.erase(qname);}/************** 给指定队列新增消费者* ************/ConsumerPtr addConsumer(const std::string &id, const std::string &qname, ConsumerCallback_t callback, bool autoAck){std::unique_lock<std::mutex> lck(_mtx);if (_qConsumerManagers.count(qname) == 0){LOG(WARNING) << "QueueConsumerManager not found, qname: " << qname << endl;return nullptr;}return _qConsumerManagers[qname]->addConsumer(id, qname, callback, autoAck);}/************ 删除指定队列的消费者* ****************/void removeConsumer(const std::string &qname, const std::string &cid){std::unique_lock<std::mutex> lck(_mtx);if (_qConsumerManagers.count(qname) == 0){LOG(WARNING) << "QueueConsumerManager not found, qname: " << qname << endl;return;}_qConsumerManagers[qname]->removeConsumer(cid);}/****************** 获取指定队列的一个消费者* **************/ConsumerPtr chooseConsumer(const std::string& qname){std::unique_lock<std::mutex> lck(_mtx);if (_qConsumerManagers.count(qname) == 0){LOG(WARNING) << "QueueConsumerManager not found, qname: " << qname << endl;return nullptr;}return _qConsumerManagers[qname]->chooseConsumer();}};
}