消息队列(仿RabbitMQ)—— 生产消费模型

        本篇将实现一个3000多行的一个小项目,基于AMQP(高级消息队列协议)的消息队列,主要仿照 RabbitMQ 实现该代码,其本质也是生产消费模型的一个升级版本。实现的功能为:消息发布端将消息发送到服务器端,服务器端自动识别消息的类型,然后决定将消息发送给哪个消息接收端以及是否需要将消息保存起来。

        代码 github 链接:GitHub - jie200408/MyProject: Message queue based on the AMQP model implemented using cpp code

目录如下:

目录

基础架构

模型核心组件 -- 项目中主要存在的对象

Broker主要提供的功能 -- 提供的API接口

交换机类型 -- 特色交换机

持久化 -- 数据保存

网络通信 -- 客户端的网络通信API

消息应答 -- 消息的删除方式

客户端和服务端  -- 整体架构

服务器端

客户端

模块关系图

工具代码实现

协议.proto代码

线程池代码

日志打印代码

其余剩下的工具代码

服务端核心代码实现

交换机管理模块

队列管理模块

绑定管理模块

消息管理模块

虚拟机管理模块

路由匹配模块

消费者管理模块

信道管理模块

连接管理类

服务器代码

Broker服务端模块

客户端核心代码实现

消费者模块

多线程工作模块

信道管理模块

连接管理模块

客户端代码

测试

广播发送测试

直接发送测试

主题匹配测试

基础架构

模型核心组件 -- 项目中主要存在的对象

        既然该项目的底层本质思想是生产者消费者模型,那么该模型中主要存在的角色也就有:生产者(producer)、消费者(consumer)、中间人(broker)、消息的发布(publish)、消息的订阅(subscribe),生产者和消费者的数目不固定,如下:

        生产者的消息通过 Broker 将消息转发给订阅生产者的消费者。在这其中的 Broker 也是最为重要的部分,负责消息的存储,转发,路由等功能。

        而在 AMQP -- 高级消息队列协议模型中,也就是消息中转服务器 Broker,还存在一下概念:

        虚拟机(virtual host):Broker 中的逻辑集合(类似与 MySql 中的数据库),一个 Broker 由一个或者多个 virtual host 组成。

        交换机(exchange):生产者将消息发送到 Broker 中的虚拟机中的 exchange,然后根据特定的转发规则将消息转发给对应的消息队列 queue。

        队列(queue):存储消息的部分,由消费者来决定自己从哪个 queue 上获取消息。

        绑定(binding):exchange 和 queue 之间的关联关系,exchange 和 queue 之间的关系是多对多的(一个 exchange 可以和多个 queue 绑定 可以向多个 queue 发送消息,一个 queue 也可以被多个 exchange 绑定 可以接收多个 exchange 的消息)。

        消息(message):我们需要传递的内容。

        以上组件的关系如下图(图片中的文字较小,可放大看):

Broker主要提供的功能 -- 提供的API接口

        根据上文中的图,本篇中的消息队列中的生成消息和发布消息的逻辑主要为:生产者将生产的消息发送给交换机,交换机通过与队列的绑定以及相关的路由规则,将消息转发给对应的队列,然后队列发送给与之绑定的消费者。所以提供的主要功能有如下:

declareExchange:创建交换机
deleteExchange:删除交换机
declareQueue:创建队列
deleteQueue:删除队列
queueBind:绑定队列
queueUnBind:队列解绑
basicPublis:发布消息
basicAck:消息确认
basicCancel:取消订阅
basicConsume:订阅消息

交换机类型 -- 特色交换机

        对于交换机而言,不同的交换机有着不一样的功能,本篇将交换机实现为如下几种交换机类型:

Direct:直连交换机,当消息的routing_key等于队列的binding_key的时候才匹配转发消息
Fanout:广播交换机,将收到的消息转发到所有与之绑定的队列中
Topic:主题交换机,通过主题判定routing_key与binding_key是否匹配,然后决定如何转发

        理解以上三种方式可以理解为:direct 是直接将消息给某人,fanout则是将消息给所有人,topic将消息给满足条件的人。

持久化 -- 数据保存

        对于在网络中传输的信息以及声明的交换机和队列都会被我们存储起来,只要带有持久化标志,那么就会将其存储在 borker 服务器的本地磁盘中,等待下一次开机或者系统重启的时候,内容不会丢失,对于持久化的数据类型有:exchange、queue、binding、message。

网络通信 -- 客户端的网络通信API

        生产者和消费者本质都是客户端程序,Broker 则是作为服务端程序,服务器和客户端进行通信。在网络通信的过程中,客户端也需要提供对应的 api 来实现对服务器发起请求。需要的 api 接口如下:

newConnection:创建连接
delConnection:关闭连接
openChannel:创建信道
closeChannel:关闭信道
declareExchange:创建交换机
deleteExchange:删除交换机
declareQueue:创建队列
deleteQueue:删除队列
queueBind:绑定队列
queueUnBind:队列解绑
basicPublis:发布消息
basicAck:消息确认
basicCancel:取消订阅
basicConsume:订阅消息

        相比于 Broker 服务器提供的服务中,客户端还提供了 Connection 和 Channel 的操作,其中 Connection 对应的是一个 TCP 连接,Channel 则是 Connection 中的逻辑通道。

        一个 Connection 对应多个 Channel,一个 Channel 对应一个消费者或者生产者,Channel 之间互不干扰。也就是相当于一个 Connection 就管理了多个消费者和生产者之间的通信,达到了长连接的效果,避免频繁的创建和关闭 TCP 连接。

消息应答 -- 消息的删除方式

        消息被传输到消费者之后需要确保消费者真正的拿到了数据,也就是消息需要确保自己被收到,所以才需要消息应答,消息应答同时包括以下两种方式:

        1. 自动应答:消费者只要收到了消息,就算应答完毕,Broker 会直接的删除该消息

        2. 手动应答:消费者手动调用应答接口,Broker 收到请求应答之后,才能真正删除这个消息

        对于如上两种数据消费应答方式,其本质都是需要将被接收的消息给删除掉,不过收到应答的方式更适用于数据可靠性要求较高的场景。

客户端和服务端  -- 整体架构

服务器端

1. 数据管理模块

        在 Broker 服务器端,我们需要传输过来带有持久化标志的数据给保存起来,还有交换机、队列、绑定数据管理三个模块也需要进行数据管理,所以需要数据管理的如下:

交换机数据管理模块
队列数据管理模块
绑定数据管理模块
消息数据管理模块

        对于以上四个模块分别实现数据的管理,也就是增删查,还有持久化的存储。(其中关于消息数据管理模块就是将其存储在当前目录下的二进制文件中,而其余三个则是存储在本地数据库中)

2. 虚拟机数据管理模块

        虚拟机其实就是交换机、队列、绑定、消息四个整合的一个整体,所以对于虚拟机的数据管理模块就是将以上四个模块的合并整理。

3. 交换路由模块

        当一条消息发布到交换机之后,将会由交换机来决定将该消息放入到哪些队列中,也就是对我们的消息进行路由。其中路由主要由两个因素决定,一个是交换机的类型(直接交换、广播交换、主题交换),当直接交换的时候,只有当交换机和队列的 binding_key 和 消息的 routing_key  相等的时候才能发送,广播交换则是直接将消息传递给与交换机绑定的每一个队列中,主题交换则是 binding_key 和 routing_key 满足特定条件的时候才会发送。

 4. 消费者管理模块

        消费者指的是订阅一个队列消息的客户端,一旦当订阅的队列有了消息之后就会推送给这个客户端,在上文中提到的订阅消息的 api 接口也是指的是订阅某个队列,而不是某条消息。

5. 信道管理模块

        一旦当某个客户端想要关闭通信,关闭的不是连接,而是自己对应的通信通道,关闭信道之后我们就需要将客户端的订阅也给取消掉。

6. 连接管理模块

        当一个连接要关闭的时候,我们就应该把连接关联的信道全部关闭。

7. 服务器端broker

        该模块就是将以上所有的模块整合起来,统一的向外提供。

客户端

1. 消费者管理模块

        一个订阅客户端,当订阅一个队列消息的时候,就相当于创建了一个消费者

2. 信道管理模块

        客户端的信道和服务器端的信道是一一对应的,服务器端提供的服务,客户端都有,可以理解为:服务器端向客户端提供服务,客户端向用户提供服务。

3. 连接管理模块

        对于用户来说,所有的服务都是通过信道来完成的,信道在用户的角度就是一个通信信道(不是连接),所以所有的请求都是通过信道来完成的,连接的管理就包含客户端资源的整合。

4. 客户端(订阅客户端/发布客户端)

        订阅客户端和发布客户端都是使用该模块,该模块也是基于以上三个的整合。

模块关系图

        对于以上所有模块的整合关系,如下图(图片文字太小,需要将网页放大到 200% 才可以看清):

工具代码实现

        在实现服务端和客户端的代码前,我们需要先实现一些通用的工具代码,比如日志代码、线程池代码、数据库访问代码、文件访问代码、生成 uuid 代码、字符串分割代码,以及传输信息中的各种协议代码,我将其放入到了一个 MqCommon 目录下统一管理,如下:

        其中的有关传输协议的协议代码,是使用 protobuf 生成的代码,本篇也只会给出生成协议的代码的 .proto 文件,读者可以也可以使用配置的 protobuf 来生成对应的协议代码(本篇使用的是 protobuf 3.14.0 版本,若使用其他版本生成的 protobuf 代码很可能会运行不起来,博主亲自试验过)。

协议.proto代码

        对于 .proto 代码将会使用注释来解释某些变量,如下:

        msg.proto:

syntax = "proto3";package mq;// 交换机类型
enum ExchangeType {UNKNOWNTYPE = 0;// 未知模式DIRECT = 1;     // 直连模式FANOUT = 2;     // 广播模式TOPIC = 3;      // 主题模式
};// 消息传递模式,是否持久化
enum DeliveryMode {UNKNOWNMODE = 0;    // 未知模式UNDURABLE = 1;      // 非持久化DURABLE = 2;        // 持久化
};// 消息的属性
message BasicProperties {string id = 1;                  // 消息idDeliveryMode delivery_mode = 2; // 消息传递模式string routing_key = 3;         // 消息的路由模式
};// 消息的综合定义
message Message {// 消息载荷的定义message Payload {BasicProperties properties = 1; // 消息属性string body = 2;                // 消息正文string vaild = 3;               // 消息是否有效}// 消息载荷Payload payload = 1;// 消息的长度和消息的偏移量,便于解决粘包问题uint32 offset = 2;  uint32 length = 3;
}

        proto.proto:

syntax = "proto3";package mq;import "msg.proto";// 打开信道
message openChannelRequest {string rid = 1;     // 消息idstring cid = 2;     // 信道id
};// 关闭信道
message closeChannelRequest {string rid = 1;     // 消息idstring cid = 2;     // 信道id
};// 声明交换机
message declareExchangeRequest {string rid = 1;                 // 消息idstring cid = 2;                 // 信道idstring exchange_name = 3;       // 交换机名称ExchangeType exchange_type = 4; // 队列名称bool durable = 5;               // 持久化标志bool auto_delete = 6;           // 是否自动删除标志map<string, string> args = 7;   // 其他参数
};// 删除交换机
message deleteExchangeRequest {string rid = 1;                 // 消息idstring cid = 2;                 // 信道idstring exchange_name = 3;       // 交换机名称
};// 声明队列
message declareQueueRequest {string rid = 1;                 // 消息idstring cid = 2;                 // 信道idstring queue_name = 3;          // 队列名称bool exclusive = 4;             // 是否独占标志bool durable = 5;               // 是否持久化标志bool auto_delete = 6;           // 是否自动删除map<string, string> args = 7;   // 其他参数
};// 删除队列
message deleteQueueRequest {string rid = 1;                 // 消息idstring cid = 2;                 // 信道idstring queue_name = 3;          // 队列名称
};// 交换机-队列绑定
message queueBindRequest {string rid = 1;                 // 消息idstring cid = 2;                 // 信道idstring exchange_name = 3;       // 交换机名称string queue_name = 3;          // 队列名称string binding_key = 5;         // 绑定属性
};// 交换机-队列取消绑定
message queueUnBindRequest {string rid = 1;                 // 消息idstring cid = 2;                 // 信道idstring exchange_name = 3;       // 交换机名称string queue_name = 3;          // 队列名称
};// 消息的发布
message basicPublishRequest {string rid = 1;                 // 消息idstring cid = 2;                 // 信道idstring exchange_name = 3;       // 交换机名称BasicProperties properties = 4; // 消息属性string body = 5;                // 消息正文
};// 消息的确认
message basicAckRequest {string rid = 1;                 // 消息idstring cid = 2;                 // 信道idstring queue_name = 3;          // 队列名称string message_id = 4;          // 消息id
};// 订阅消息
message basicConsumeRequest {string rid = 1;                 // 消息idstring cid = 2;                 // 信道idstring queue_name = 3;          // 队列名称string consumer_tag = 4;        // 消费者标识bool auto_ack = 5;              // 自动确认标志
}// 订阅的取消
message basicCancelRequest {string rid = 1;                 // 消息idstring cid = 2;                 // 信道idstring queue_name = 3;          // 队列名称string consumer_tag = 4;        // 消费者标识
};// 消息的推送
message basicConsumeResponce {string cid = 2;                 // 信道idstring consumer_tag = 4;        // 消费者标识string body = 5;                // 消息正文BasicProperties properties = 4; // 消息属性
};// 最常见的响应
message basicCommonResponce {string rid = 1;                 // 消息idstring cid = 2;                 // 信道idbool ok = 3;                    // 收到的消息是否正常
};

线程池代码

        threadpool.hpp:

#ifndef __M_THRPOOL_H__
#define __M_THRPOOL_H__#include <iostream>
#include <vector>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <functional>
#include <memory>
#include <future>
#include <atomic>namespace mq {class threadpool {private:using func_t = std::function<void(void)>;// 线程入口函数,不断的从任务池中取出任务进行执行void entry() {// 线程的处理,循环一直处理while (!_stop) {std::vector<func_t> temp_taskpool;{std::unique_lock<std::mutex> lock(_mutex);// 等待任务池不为空,或者_stop被置为返回_cv.wait(lock, [this](){ return _stop || !_taskpool.empty(); });// 取出任务执行temp_taskpool.swap(_taskpool);}for (auto& task : temp_taskpool)task();}}public:using ptr = std::shared_ptr<threadpool>;threadpool(int thread_count = 1) : _stop(false) {for (int i = 0; i < thread_count; i++) _threads.emplace_back(&threadpool::entry, this);}// push函数中传入函数及其对应的参数(可变)// 返回一个future对象,由于future对象我们并不知道其类型是啥,所以需要返回值设置为auto// push内部将函数封装成一个异步任务(packaged_task),同时使用lambda生成一个可调用对象// 然后抛入到任务池中,由线程去执行template <typename F, typename ...Args>auto push(F&& func, Args&& ...args) -> std::future<decltype(func(args...))> {// 1. 将func封装成packaged_taskusing return_type = decltype(func(args...));auto functor = std::bind(std::forward<F>(func), std::forward<Args>(args)...);auto ptask = std::make_shared<std::packaged_task<return_type()>>(functor);std::future<return_type> fu = ptask->get_future();// 2. 将封装好的任务放入到task队列中{std::unique_lock<std::mutex> lock(_mutex);auto task = [ptask](){ (*ptask)(); };_taskpool.push_back(task);// 3. 唤醒一个线程去执行_cv.notify_one();}return fu;}// 等待所有的线程退出void stop() {_stop = true;// 将所有的线程唤醒_cv.notify_all();for (auto& th : _threads)th.join();}~threadpool() {if (_stop == false)stop();}private:std::atomic<bool> _stop;            // 是否停止标志std::vector<std::thread> _threads;  // 多个线程std::mutex _mutex;                  // 锁std::condition_variable _cv;        // 条件变量std::vector<func_t> _taskpool;      // 任务池};
}#endif

日志打印代码

        logger.hpp:

#ifndef __M_LOG_H__
#define __M_LOG_H__#include <iostream>
#include <ctime>namespace mq {#define DEBUG_LEVEL 0#define INFO_LEVEL 1#define ERROR_LEVEL 2#define DEFAULT_LEVEL DEBUG_LEVEL// [current time][current file][line in error]...#define LOG(log_level_str, log_level, format, ...) do {                                                  \if (log_level >= DEFAULT_LEVEL) {                                                                    \time_t t = time(nullptr);                                                                        \struct tm* ptm = localtime(&t);                                                                  \char timestr[32];                                                                                \strftime(timestr, 31, "%H:%M:%S", ptm);                                                          \printf("[%s][%s][%s:%d] " format "", log_level_str, timestr, __FILE__, __LINE__, ##__VA_ARGS__); \}                                                                                                    \} while(0)#define DLOG(format, ...) LOG("DEBUG", DEBUG_LEVEL, format, ##__VA_ARGS__)#define ILOG(format, ...) LOG("INFO", DEBUG_LEVEL, format, ##__VA_ARGS__)#define ELOG(format, ...) LOG("ERROR", DEBUG_LEVEL, format, ##__VA_ARGS__)
}#endif

其余剩下的工具代码

        helper.hpp:

#ifndef __M_HELPER_H__
#define __M_HELPER_H__#include <iostream>
#include <string>
#include <vector>
#include <random>
#include <iomanip>
#include <sstream>
#include <fstream>
#include <atomic>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <cerrno>
#include <sys/stat.h>
#include <sqlite3.h>
#include "logger.hpp"namespace mq {class SqliteHelper {private:// 对数据处理对饮的回调函数typedef int(*SqliteCallback)(void*, int, char**, char**);public:SqliteHelper(const std::string& dbfile): _dbfile(dbfile),_handler(nullptr){}bool open(int safe_lavel = SQLITE_OPEN_FULLMUTEX) {int ret = sqlite3_open_v2(_dbfile.c_str(), &_handler, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | safe_lavel, nullptr);if (ret != SQLITE_OK) {ELOG("创建/打开sqlite失败: %s\n", sqlite3_errmsg(_handler));return false;}return true;}bool exec(const std::string& sql, SqliteCallback cb, void* arg) {int ret = sqlite3_exec(_handler, sql.c_str(), cb, arg, nullptr);if (ret != SQLITE_OK) {ELOG("%s \n, 执行语句失败: %s\n", sql.c_str(), sqlite3_errmsg(_handler));return false;}return true;}void close() {if (_handler)sqlite3_close_v2(_handler);}private:std::string _dbfile;sqlite3* _handler;};class StrHelper {public:static size_t split(const std::string& str, const std::string& sep, std::vector<std::string>& result) {// new....music.#pop...// 该函数的作用就是将字符串按照某一特定的字符分割开size_t pos = 0, index = 0;while (index < str.size()) {pos = str.find(sep, index);if (pos == std::string::npos) {// 最后一次没有找到std::string tmp = str.substr(index);result.push_back(std::move(tmp));return result.size();}if (index == pos) {index = pos + sep.size();continue;}std::string tmp = str.substr(index, pos - index);result.push_back(std::move(tmp));index = pos + sep.size();}return result.size();}        };class UUIDHelper {public:static std::string uuid() {// uuid的数据格式为一个8-4-4-4-12的16进制字符串,如:7a91a05f-52a1-6a01-0000-000000000001std::random_device rd;// 使用一个机器随机数作为伪随机数的种子// 机器数:使用硬件生成的一个数据,生成效率较低std::mt19937_64 generator(rd());// 生成的数据范围为0~255std::uniform_int_distribution<int> distribution(0, 255);std::stringstream ss;for (int i = 0; i < 8; i++) {ss << std::setw(2) << std::setfill('0') << std::hex << distribution(generator);if (i == 3 || i == 5 || i == 7)ss << "-";}// std::cout << ss.str() << std::endl;static std::atomic<size_t> seq(1);size_t num = seq.fetch_add(1);for (int i = 7; i >= 0; i--) {ss << std::setw(2) << std::setfill('0') << std::hex << (num >> (i * 8));if (i == 6)ss << "-";}     return ss.str();   }};    class FileHelper {public:FileHelper(const std::string& filename): _filename(filename){}bool exists() {struct stat st;return (stat(_filename.c_str(), &st) == 0);}size_t size() {// 获取当前文件的大小struct stat st;if (!exists()) return 0;stat(_filename.c_str(), &st);return st.st_size;}// 从offset位置开始读取len长度的内容bool read(char* body, size_t offset, size_t len) {std::ifstream ifs(_filename.c_str(), std::ios::binary | std::ios::in);if (!ifs.is_open()) {ELOG("%s, 打开文件失败\n", _filename.c_str());return false;}ifs.seekg(offset, std::ios::beg);ifs.read(body, len);if (!ifs.good()) {ELOG("%s, 文件读取失败\n", _filename.c_str());ifs.close();return false;}ifs.close();return true;}// 读取所有数据到body中bool read(std::string& body) {size_t filesize = this->size();body.resize(filesize);return this->read(&body[0], 0, filesize);}// 从文件offset位置写入len长度的内容bool write(const char* body, size_t offset, size_t len) {std::fstream fs(_filename.c_str(), std::ios::binary | std::ios::in | std::ios::out);if (!fs.is_open()) {ELOG("%s, 打开文件失败\n", _filename.c_str());return false;}     fs.seekp(offset, std::ios::beg);fs.write(body, len);if (!fs.good()) {ELOG("%s, 文件写入失败\n", _filename.c_str());fs.close();return false;}           fs.close();return true;       }// 写入body的所有数据bool write(const std::string& body) {return this->write(body.c_str(), 0, body.size());}// 文件重命名bool rename(const std::string& new_name) {std::string old_name(_filename);int ret = ::rename(old_name.c_str(), new_name.c_str());if (ret == 0) {_filename = new_name;return true;} else {return false;}}static bool createFile(const std::string& filename) {// 只有当创建的文件具有写属性的时候才会创建,若只有读属性则不可以创建std::ofstream ofs(filename.c_str(), std::ios::binary | std::ios::out);if (!ofs.is_open()) {ELOG("%s, 文件创建失败:%s\n", filename.c_str(), strerror(errno));return false;}    ofs.close();return true;        }static bool removeFile(const std::string& filename) {return (::remove(filename.c_str()) == 0);}static bool createDirectory(const std::string& path) {// "aaa/ccc/sss/qwqw"size_t pos = 0, index = 0;std::string sep = "/";while (index < path.size()) {pos = path.find(sep, index);if (pos == std::string::npos) {int ret = ::mkdir(path.c_str(), 0775);if (ret != 0) {return false;} else {return true;}}std::string sub_path = path.substr(0, pos);int res = ::mkdir(sub_path.c_str(), 0775);if (res != 0 && errno != EEXIST) {ELOG("%s 创建目录失败:%s\n", sub_path.c_str(), strerror(errno));return false;}index = pos + sep.size();}return true;}static bool removeDirectory(const std::string& path) {// 使用指令删除目录std::string cmd = "rm -rf " + path;return (::system(cmd.c_str()) != 0);}static std::string parentDirectory(const std::string& file_path) {// "aaa/ccc/sss/qwqw"size_t pos = file_path.find_last_of("/");if (pos == std::string::npos)return "./";return file_path.substr(0, pos);}private:std::string _filename;};
}#endif

服务端核心代码实现

        这里加上将会实现本篇中最为重要的代码,实现了上文中提到关于服务端的所有代码

交换机管理模块

        对于交换机的管理模块,首先要知道我们要管理的有哪些数据,如下:

交换机数据管理:1. 交换机名称:唯一标识2. 交换机类型:决定消息的转发方式直接交换广播交换主题交换3. 持久化标志:决定当前交换机信息是否需要持久化存储起来4. 自动删除标志:指的是关联了当前交换机的所有客户端都退出,是否删除交换机5. 交换机的其他参数:通常记录一些关于交换机的各种属性,不过本篇并未实现其相关的一些功能交换机的持久化管理:存储到sqlite数据库中1. 创建数据库表2. 删除数据库表3. 向表中插入交换机4. 向表中移除交换机5. 从表中恢复所有持久化的交换机对交换机的管理操作:1. 创建交换机2. 删除交换机:删除的同时需要将绑定的队列也给删除掉    3. 获取指定名称交换机4. 获取当前交换机的数量

        代码如下,对于其中实现的细节,将在注释中给出

        exchange.hpp:

#ifndef __M_EXCHANGE_H__
#define __M_EXCHANGE_H__#include "../MqCommon/logger.hpp"
#include "../MqCommon/helper.hpp"
#include "../MqCommon/msg.pb.h"
#include <google/protobuf/map.h>
#include <iostream>
#include <unordered_map>
#include <mutex>
#include <memory>
#include <sstream>
#include <vector>namespace mq {// 交换机类struct Exchange {using ptr = std::shared_ptr<Exchange>;std::string name;                                      // 交换机名称ExchangeType type;                                     // 交换机类型bool durable;                                          // 是否继续持久化管理bool auto_delete;                                      // 是否自动删除google::protobuf::Map<std::string, std::string> args;  // 交换机的其他参数Exchange() {}Exchange(const std::string& ename, ExchangeType etype, bool edurable, bool eauto_delete, const google::protobuf::Map<std::string, std::string>& eargs): name(ename),type(etype),durable(edurable),auto_delete(eauto_delete),args(eargs){}// 设置args中的参数,类型为:"key=value&key=value....",相当于将数据反序列化void setArgs(const std::string& args_str) {if (args_str.empty()) return;std::vector<std::string> kvs;StrHelper::split(args_str, "&", kvs);for (auto& kv : kvs) {// 现在将key,value放入到map中size_t pos = kv.find('=');std::string key = kv.substr(0, pos);std::string value = kv.substr(pos + 1);args[key] = value;}}// 将数据序列化std::string getArgs() {std::string result;for (auto& it : args) {std::string kv = it.first + "=" + it.second + "&";result += kv;}return result;}};// 交换机持久化管理类,数据存储在sqlite数据库中class ExchangeMapper {private:// 对应的sql语句#define SQL_DELETE_EM "drop table if exists exchange_table;"#define SQL_INSERT_EM "insert into exchange_table values ('%s', %d, %d, %d, '%s');"#define SQL_REMOVE_EM "delete from exchange_table where name="#define SQL_CREATE_EM "create table if not exists exchange_table (  \name varchar(32) primary key,                                   \type int,                                                       \durable int,                                                    \auto_delete int,                                                \args varchar(128)                                               \);"#define SQL_SELECT_EM "select name, type, durable, auto_delete, args from exchange_table;"// 持久化的交换机恢复数据时的回调函数static int selectCallback(void* args, int numcol, char** row, char** fields) {ExchangeMap* exchange_map = static_cast<ExchangeMap*>(args);Exchange::ptr exchange = std::make_shared<Exchange>();exchange->name = row[0];exchange->type = (ExchangeType)std::stoi(row[1]);exchange->durable = (bool)std::stoi(row[2]);exchange->auto_delete = (bool)std::stoi(row[3]);if (row[4]) exchange->setArgs(row[4]);exchange_map->insert(std::make_pair(exchange->name, exchange));return 0;}public:using ExchangeMap = std::unordered_map<std::string, Exchange::ptr>;ExchangeMapper(const std::string& dbfile): _sql_helper(dbfile){// 先获取dbfile上级文件路径std::string parent_path = FileHelper::parentDirectory(dbfile);FileHelper::createDirectory(parent_path);// 将数据库打开assert(_sql_helper.open());createTable();}void createTable() {int ret = _sql_helper.exec(SQL_CREATE_EM, nullptr, nullptr);if (ret == false) {ELOG("表格创建失败\n");abort();}}void removeTable() {// 删除表格int ret = _sql_helper.exec(SQL_DELETE_EM, nullptr, nullptr);if (ret == false) {ELOG("表格删除失败\n");abort();}            }bool insert(const Exchange::ptr& exchange) {char buff[256];// 向sql语句中写入相关参数int n = snprintf(buff, sizeof(buff) - 1, SQL_INSERT_EM, (char*)exchange->name.c_str(),exchange->type,exchange->durable,exchange->auto_delete,(char*)exchange->getArgs().c_str());buff[n] = 0;std::string cmd(buff);cmd += ";";return _sql_helper.exec(cmd, nullptr, nullptr);}void remove(const std::string& name) {std::stringstream ss;ss << SQL_REMOVE_EM << "'" << name << "'" << ";";int ret = _sql_helper.exec(ss.str(), nullptr, nullptr);if (ret == false) ELOG("删除交换机数据失败\n");            }ExchangeMap recovery() {ExchangeMap result;int ret = _sql_helper.exec(SQL_SELECT_EM, selectCallback, (void*)(&result));return result;}private:SqliteHelper _sql_helper;};// 交换机管理类class ExchangeManager {public:using ptr = std::shared_ptr<ExchangeManager>;ExchangeManager(const std::string& dbfile): _mapper(dbfile){// 从底层数据库中直接恢复数据_exchanges = _mapper.recovery();}// 声明交换机,增加一个交换机bool declareExchange(const std::string& name, ExchangeType type, bool durable, bool auto_delete, const google::protobuf::Map<std::string, std::string>& args) {// 需要先加锁std::unique_lock<std::mutex> lock(_mutex);// 判断当前需要插入的数据是否已经存在,若存在则我们不需要插入auto it = _exchanges.find(name);if (it != _exchanges.end())return true;Exchange::ptr exchange = std::make_shared<Exchange>(name, type, durable, auto_delete, args);// 判断当前的数据是否需要持久化处理if (durable == true) {// 若需要持久化则直接交换机信息放入到持久化管理类中int ret = _mapper.insert(exchange);if (ret == false)return false;}_exchanges[name] = exchange;return true;}// 查找一个交换机Exchange::ptr selectExchange(const std::string& name) {std::unique_lock<std::mutex> lock(_mutex);auto it = _exchanges.find(name);// 当没有找到时直接返回nullif (it == _exchanges.end())return std::make_shared<Exchange>();return it->second;}// 删除交换机void deleteExchange(const std::string& name) {std::unique_lock<std::mutex> lock(_mutex);auto it = _exchanges.find(name);// 若当前的数据中不存在对应的交换机的时候,直接返回if (it == _exchanges.end())return;// 删除数据需要删除交换机存储和持久化交换机管理if (it->second->durable)_mapper.remove(it->first);_exchanges.erase(it->first);}bool exists(const std::string& name) {std::unique_lock<std::mutex> lock(_mutex);auto it = _exchanges.find(name);if (it == _exchanges.end())return false;else    return true;}size_t size() {std::unique_lock<std::mutex> lock(_mutex);return _exchanges.size();}void clear() {std::unique_lock<std::mutex> lock(_mutex);// 删除持久化数据管理中的表格_mapper.removeTable();_exchanges.clear();}private:std::mutex _mutex;                                          // 防止出现线程安全问题ExchangeMapper _mapper;                                     // 交换机持久化数据管理std::unordered_map<std::string, Exchange::ptr> _exchanges;  // 所有的交换机及其对应的名称};
}#endif

队列管理模块

        队列作为与交换机相绑定,与用户连接的信道直接相连模块,同样也是一个很重要的模块,其实现的思想大致与交换机一致。对于队列管理需要的变量如下:

队列数据管理类1. 队列名称2. 持久化标志3. 独占标志:标识当前队列被独占,只向一个消费者发送消息4. 自动删除标志5. 其他参数队列持久化管理类1. 创建队列数据库表2. 移除队列数据库表3. 向表中插入数据4. 向表中删除数据5. 从表中恢复所有持久化的数据队列管理类1. 声明队列:持久化标志为true还需将其存储到数据库表中2. 删除队列:删除的队列持久化标志为true还需从数据库表中删除3. 查询队列:根据队列名称获取指定的队列4. 获取所有队列:后面需要获取所有队列从而恢复队列中持久化的数据

        代码如下:

        queue.hpp:

#ifndef __M_QUEUE_H__
#define __M_QUEUE_H__#include "../MqCommon/logger.hpp"
#include "../MqCommon/helper.hpp"
#include "../MqCommon/msg.pb.h"
#include <google/protobuf/map.h>
#include <iostream>
#include <unordered_map>
#include <mutex>
#include <memory>
#include <sstream>
#include <vector>namespace mq {struct MsgQueue {std::string name;                                       // 消息队列的名称bool durable;                                           // 持久化标志bool exclusive;                                         // 独占标志bool auto_delete;                                       // 自动删除标志google::protobuf::Map<std::string, std::string> args;   // 其他参数,类型使用google中的map,与protobuf生成的文件相对应using ptr = std::shared_ptr<MsgQueue>;MsgQueue() {}MsgQueue(const std::string& msg_name, bool msg_durable, bool msg_exclusive, bool msg_auto_delete, const google::protobuf::Map<std::string, std::string>& msg_args): name(msg_name),durable(msg_durable),exclusive(msg_durable),auto_delete(msg_auto_delete),args(msg_args){}// 设置args中的参数,类型为:"key=value&key=value....",相当于将数据反序列化void setArgs(const std::string& args_str) {if (args_str.empty()) return;std::vector<std::string> kvs;StrHelper::split(args_str, "&", kvs);for (auto& kv : kvs) {// 现在将key,value放入到map中size_t pos = kv.find('=');std::string key = kv.substr(0, pos);std::string value = kv.substr(pos + 1);args[key] = value;}}// 将数据序列化std::string getArgs() {std::string result;for (auto& it : args) {std::string kv = it.first + "=" + it.second + "&";result += kv;}return result;}    };class MsgQueueMapper {private:// 对应的持久化sqk语句#define SQL_DELETE_MQM "drop table if exists queue_table;"#define SQL_INSERT_MQM "insert into queue_table values ('%s', %d, %d, %d, '%s');"#define SQL_REMOVE_MQM "delete from queue_table where name="#define SQL_CREATE_MQM "create table if not exists queue_table (        \name varchar(32) primary key,                                   \durable int,                                                    \exclusive int,                                                  \auto_delete int,                                                \args varchar(128)                                               \);"#define SQL_SELECT_MQM "select name, durable, exclusive, auto_delete, args from queue_table;"static int selectCallback(void* args, int numcol, char** row, char** fields) {MsgQueueMap* queue_map = static_cast<MsgQueueMap*>(args);MsgQueue::ptr queue = std::make_shared<MsgQueue>();queue->name = row[0];queue->durable = (bool)std::stoi(row[1]);queue->exclusive = (bool)std::stoi(row[2]);queue->auto_delete = (bool)std::stoi(row[3]);if (row[4]) queue->setArgs(row[4]);queue_map->insert(std::make_pair(queue->name, queue));return 0;}public:using MsgQueueMap = std::unordered_map<std::string, MsgQueue::ptr>;MsgQueueMapper(const std::string& dbname): _sql_helper(dbname){// 先获取dbfile上级文件路径std::string parent_path = FileHelper::parentDirectory(dbname);// 创建父级目录FileHelper::createDirectory(parent_path);assert(_sql_helper.open());createTable();}void createTable() {int ret = _sql_helper.exec(SQL_CREATE_MQM, nullptr, nullptr);if (ret == false) {ELOG("表格创建失败\n");abort();}            }void removeTable() {int ret = _sql_helper.exec(SQL_DELETE_MQM, nullptr, nullptr);if (ret == false) {ELOG("表格删除失败\n");abort();}                  }bool insert(MsgQueue::ptr& msgqueue) {char buff[256];int n = snprintf(buff, sizeof(buff) - 1, SQL_INSERT_MQM, (char*)msgqueue->name.c_str(),msgqueue->durable,msgqueue->exclusive,msgqueue->auto_delete,(char*)msgqueue->getArgs().c_str());buff[n] = 0;std::string sql_insert(buff);sql_insert += ";";return _sql_helper.exec(sql_insert, nullptr, nullptr);}void remove(const std::string& name) {std::stringstream sql_remove;sql_remove << SQL_REMOVE_MQM;sql_remove << "'" << name << "'";int ret = _sql_helper.exec(sql_remove.str(), nullptr, nullptr);if (ret == false)ELOG("删除消息队列数据失败\n");}// 恢复所有持久化的数据MsgQueueMap recovery() {MsgQueueMap result;int ret = _sql_helper.exec(SQL_SELECT_MQM, selectCallback, (void*)(&result));return result;}private:// sqlite数据管理句柄SqliteHelper _sql_helper;};class MsgQueueManager {public:using ptr = std::shared_ptr<MsgQueueManager>;MsgQueueManager(const std::string& dbfile): _mapper(dbfile){_msg_queues = _mapper.recovery();}size_t size() {std::unique_lock<std::mutex> lock(_mutex);return _msg_queues.size();}bool exists(const std::string& name) {auto it = _msg_queues.find(name);if (it == _msg_queues.end())return false;elsereturn true;}void clear() {_mapper.removeTable();_msg_queues.clear();}bool declareQueue(const std::string& msg_name, bool msg_durable, bool msg_exclusive, bool msg_auto_delete, const google::protobuf::Map<std::string, std::string>& msg_args) {std::unique_lock<std::mutex> lock(_mutex);auto it = _msg_queues.find(msg_name);// 若已经存在,则不用插入if (it != _msg_queues.end())return true;MsgQueue::ptr mqp = std::make_shared<MsgQueue>();mqp->name = msg_name;mqp->durable = msg_durable;mqp->auto_delete = msg_auto_delete;mqp->args = msg_args;mqp->exclusive = msg_exclusive;if (msg_durable == true) {int ret = _mapper.insert(mqp);if (ret == false)return false;}_msg_queues[mqp->name] = mqp;return true;}void deleteQueue(const std::string& name) {std::unique_lock<std::mutex> lock(_mutex);auto it = _msg_queues.find(name);// 若当前队列中已经没有了该队列if (it == _msg_queues.end())return;if (it->second->durable)_mapper.remove(name);_msg_queues.erase(it->first);}MsgQueue::ptr selectQueue(const std::string& name) {std::unique_lock<std::mutex> lock(_mutex);auto it = _msg_queues.find(name);// 若当前队列中已经没有了该队列if (it == _msg_queues.end())return MsgQueue::ptr();return _msg_queues[name];}MsgQueueMapper::MsgQueueMap allQueue() {// 直接返回队列的映射return _msg_queues;}private:std::mutex _mutex;MsgQueueMapper _mapper;MsgQueueMapper::MsgQueueMap _msg_queues;};
}#endif

绑定管理模块

        绑定是队列和交换机之间的关系,其主要主要作用是消息路由时与绑定关系进行匹配,决定是否将消息转发到队列中。

        绑定管理模块主要包括以下管理句柄和功能:

绑定数据管理类1. 交换机名称    2. 队列名称3. 绑定键值 --> binding_key绑定信息持久化管理类1. 创建/删除数据库表2. 插入/删除表中的绑定关系3. 移除交换机的绑定关系4. 移除队列的绑定关系5. 恢复数据库中所有的绑定关系绑定管理类1. 建立交换机和队列的绑定2. 移除交换机和队列的绑定3. 移除指定交换机的绑定4. 移除指定队列的绑定5. 获取指定的绑定6. 获取指定交换机的绑定信息

        绑定关系实现的代码如下:

        binding.hpp:

#ifndef __M_BINDING_H__
#define __M_BINDING_H__#include "../MqCommon/logger.hpp"
#include "../MqCommon/helper.hpp"
#include "../MqCommon/msg.pb.h"
#include <iostream>
#include <unordered_map>
#include <mutex>
#include <memory>
#include <sstream>
#include <vector>namespace mq { class Binding {public:using ptr = std::shared_ptr<Binding>;std::string exchange_name;      // 交换机名称std::string msgqueue_name;      // 队列名称std::string binding_key;        // 绑定键值Binding() {}Binding(const std::string& ename, const std::string& qname, const std::string key): exchange_name(ename),msgqueue_name(qname),binding_key(key){}};// 一个msgqueue可以找到对应的绑定信息using MsgQueueBindingMap = std::unordered_map<std::string, Binding::ptr>;// 一个交换机就可以找到对应的所有绑定的队列信息using BindingMap = std::unordered_map<std::string, MsgQueueBindingMap>;class BindingMapper {private:// 持久化数据管理sql语句#define SQL_DELETE_BM "drop table if exists binding_table;"#define SQL_INSERT_BM "insert into binding_table values ('%s', '%s', '%s');"#define SQL_REMOVE_BM "delete from binding_table where exchange_name='%s' and '%s';"#define SQL_REMOVE_EXCHANGE_BM "delete from binding_table where exchange_name='%s';"#define SQL_REMOVE_MSGQUEUE_BM "delete from binding_table where msgqueue_name='%s';"#define SQL_CREATE_BM "create table if not exists binding_table (      \exchange_name varchar(32),                                      \msgqueue_name varchar(32),                                      \binding_key varchar(128)                                        \);"#define SQL_SELECT_BM "select exchange_name, msgqueue_name, binding_key from binding_table;"// 持久化恢复数据持久化回调函数static int selectCallback(void* args, int numcol, char** row, char** fields) {BindingMap* result = static_cast<BindingMap*>(args);Binding::ptr bp = std::make_shared<Binding>(row[0], row[1], row[2]);MsgQueueBindingMap& mqbp = (*result)[bp->exchange_name];mqbp[bp->msgqueue_name] = bp;return 0;}public:BindingMapper(const std::string& dbfile): _sql_helper(dbfile){std::string parent_path = FileHelper::parentDirectory(dbfile);FileHelper::createDirectory(parent_path);assert(_sql_helper.open());createTable();}void createTable() {int ret = _sql_helper.exec(SQL_CREATE_BM, nullptr, nullptr);if (ret == false) {ELOG("表格创建失败\n");abort();}}void removeTable() {int ret = _sql_helper.exec(SQL_DELETE_BM, nullptr, nullptr);if (ret == false) {ELOG("表格删除失败\n");abort();}       }bool insert(Binding::ptr& binding) {char buff[256];int n = snprintf(buff, sizeof(buff) - 1, SQL_INSERT_BM, (char*)binding->exchange_name.c_str(), (char*)binding->msgqueue_name.c_str(), (char*)binding->binding_key.c_str());buff[n] = 0;std::string insert_sql(buff);return _sql_helper.exec(insert_sql, nullptr, nullptr);}void remove(const std::string& ename, const std::string& qname) {char buff[256];int n = snprintf(buff, sizeof(buff) - 1, SQL_REMOVE_BM, ename.c_str(), qname.c_str());buff[n] = 0;std::string remove_sql(buff);_sql_helper.exec(remove_sql, nullptr, nullptr);}// 移除绑定关系void removeExchangeBindings(const std::string& ename) {char buff[256];int n = snprintf(buff, sizeof(buff) - 1, SQL_REMOVE_EXCHANGE_BM, ename.c_str());buff[n] = 0;std::string remove_sql(buff);_sql_helper.exec(remove_sql, nullptr, nullptr);        }void removeMsgQueueBindings(const std::string& qname) {char buff[256];int n = snprintf(buff, sizeof(buff) - 1, SQL_REMOVE_MSGQUEUE_BM, qname.c_str());buff[n] = 0;std::string remove_sql(buff);_sql_helper.exec(remove_sql, nullptr, nullptr);  }BindingMap recovery() {BindingMap result;_sql_helper.exec(SQL_SELECT_BM, selectCallback, (void*)(&result));return result;}private:SqliteHelper _sql_helper;};class BindingManager {public:using ptr = std::shared_ptr<BindingManager>;BindingManager(const std::string& dbfile): _mapper(dbfile){_bindings = _mapper.recovery();}bool bind(const std::string& ename, const std::string& qname, const std::string& key, bool durable) {std::unique_lock<std::mutex> lock(_mutex);// 需要先检查是否已经绑定auto eit = _bindings.find(ename);if (eit != _bindings.end() && eit->second.find(qname) != eit->second.end())return true;// 创建对应的MsgQueueMapMsgQueueBindingMap& mqbp = _bindings[ename];Binding::ptr bp = std::make_shared<Binding>(ename, qname, key);if (durable) {bool ret = _mapper.insert(bp);if (ret == false)return false;}mqbp.insert(std::make_pair(qname, bp));return true;}void unBind(const std::string& ename, const std::string& qname) {std::unique_lock<std::mutex> lock(_mutex);// 先查找当前绑定中是否存在这两个信息auto eit = _bindings.find(ename);if (eit == _bindings.end())return;auto qit = eit->second.find(qname);if (qit == eit->second.end())return;// 现在删除MsgQueueBindingMap& mqbp = _bindings[ename];_mapper.remove(ename, qname);mqbp.erase(qname);}void removeExchangeBindings(const std::string& ename) {std::unique_lock<std::mutex> lock(_mutex);auto eit = _bindings.find(ename);if (eit == _bindings.end())return;// 现在遍历mqbp进行删除_mapper.removeExchangeBindings(ename);_bindings.erase(ename);}void removeMsgQueueBindings(const std::string& qname) {std::unique_lock<std::mutex> lock(_mutex);_mapper.removeMsgQueueBindings(qname);// 开始循环交换机映射的绑定for (auto& eit : _bindings) {// 在每个交换机中寻找与其绑定的消息队列MsgQueueBindingMap& mqbp = eit.second;auto qit = mqbp.find(qname);if (qit == mqbp.end())continue;// // 删除该对应的绑定信息// std::string ename = eit.first;// _mapper.remove(ename, qname);mqbp.erase(qname);}}Binding::ptr getBinding(const std::string& ename, const std::string& qname) {std::unique_lock<std::mutex> lock(_mutex);// 先查找当前绑定中是否存在这两个信息auto eit = _bindings.find(ename);if (eit == _bindings.end())return Binding::ptr();auto qit = eit->second.find(qname);if (qit == eit->second.end())return Binding::ptr();return qit->second;                  }MsgQueueBindingMap getExchangeBindings(const std::string& ename) {std::unique_lock<std::mutex> lock(_mutex);auto eit = _bindings.find(ename);if (eit == _bindings.end())return MsgQueueBindingMap();return eit->second;}size_t size() {std::unique_lock<std::mutex> lock(_mutex);size_t total = 0;for (auto eit : _bindings)total += eit.second.size();return total;}bool exists(const std::string& ename, const std::string& qname) {std::unique_lock<std::mutex> lock(_mutex);// 先查找当前绑定中是否存在这两个信息auto eit = _bindings.find(ename);if (eit == _bindings.end())return false;auto qit = eit->second.find(qname);if (qit == eit->second.end())return false;return true;           }void clear() {std::unique_lock<std::mutex> lock(_mutex);_mapper.removeTable();_bindings.clear();}private:std::mutex _mutex;BindingMapper _mapper;BindingMap _bindings;};
}#endif

消息管理模块

        消息的属性包含消息的 id(消息的唯一化标识),持久化标识,以及 routing_key(决定发布到交换机之后,根据绑定 binding_key 决定是否发布到指定队列中) 。

        数据发送的格式在之前的 protobuf 文件中已经给出,本篇对于数据的管理也是数据的存储以及数据的发送和接收的相关函数接口,对于发送信息管理需要存在的接口:

发送消息的持久化管理类:    1. 创建数据保存文件2. 移除数据文件3. 向问价中插入消息4. 从文件中删除数据5. 对数据进行垃圾回收:当文件中的无效数据达到某一比例的时候,将无效数据从文件中删除队列消息管理类:存储在队列中的数据1. 向队列中插入数据:同时还需判读持久化标志,是否需要将数据给持久化到文件中2. 根据消息id从队列中移除数据:本质是将数据给置为无效信息,然后继续垃圾回收3. 获取队首信息:获取队首信息的目的是拿出数据发送出去,然后等待确认4. 恢复所有的数据到当前队列中消息管理类:1. 初始化消息队列2. 移除消息队列3. 向队列中插入消息4. 确认消息:确认已经发送出去的消息,其本质就是将消息给删除5. 获取队列首部消息

        message.hpp:

#ifndef __M_MESSAGE_H__
#define __M_MESSAGE_H__#include "../MqCommon/logger.hpp"
#include "../MqCommon/helper.hpp"
#include "../MqCommon/msg.pb.h"
#include <iostream>
#include <unordered_map>
#include <mutex>
#include <memory>
#include <list>namespace mq {// 存储数据文件的后缀#define DATAFILE_SUBFIX ".mqd"#define TEMPFILE_SUBFIX ".mqd.tmp"using MessagePtr = std::shared_ptr<mq::Message>;class MessageMapper {private:bool insert(MessagePtr& msg, const std::string& filename) {FileHelper helper(filename);// 将数据序列化std::string body = msg->payload().SerializeAsString();size_t offset;offset = helper.size();// 先写入数据的长度,8字节长度,然后在写入消息size_t msg_size = body.size();bool ret = helper.write((char*)(&msg_size), offset, sizeof(size_t));if (ret == false) {ELOG("写入数据长度失败\n");return false;}// 将数据存放到适当的位置ret = helper.write(body.c_str(), offset + sizeof(size_t), body.size());if (ret == false) {ELOG("写入数据失败\n");return false;}// 更新数据中存储的位置,以及数据的长度msg->set_offset(offset + sizeof(size_t));msg->set_length(body.size());return true;}// 从文件中加载之前存储的数据bool load_data(std::list<MessagePtr>& result) {// 先将文件打开,然后逐一从文件中取出数据FileHelper helper(_datafile);size_t offset = 0;size_t filesize = helper.size();while (offset < filesize) {// 从文件中读出数据bool ret;size_t length = 0;ret = helper.read((char*)(&length), offset, sizeof(size_t));if (ret == false) {ELOG("读取数据长度失败\n");return false;}offset += sizeof(size_t);std::string body(length, '\0');ret = helper.read((char*)(&body[0]), offset, length);if (ret == false) {ELOG("读取数据失败\n");return false;}MessagePtr msgp = std::make_shared<Message>();// 对数据载荷进行反序列化msgp->mutable_payload()->ParseFromString(body);offset += length;if (msgp->payload().vaild() == "0") {DLOG("加载到无效数据: %s\n", msgp->payload().body().c_str());continue;}result.emplace_back(msgp);}return true;}bool createMsgFile() {// 只有当当前文件不存在的时候才创建,否则会导致文件被重新刷新丢失数据if (FileHelper(_datafile).exists() == true)return true;bool ret = FileHelper::createFile(_datafile);if (ret == false) {ELOG("创建 %s 文件失败\n", _datafile.c_str());return false;}return true;}public:MessageMapper(const std::string& databasedir, const std::string& qname): _qname(qname){std::string basedir(databasedir);if (basedir.back() != '/') basedir += '/';_datafile = basedir + _qname + DATAFILE_SUBFIX;_tempfile = basedir + _qname + TEMPFILE_SUBFIX;// 只有当当前目录不存在的时候才创建对应的目录// DLOG("当前目录为: %s\n", basedir.c_str());if (FileHelper(basedir).exists() == false)assert(FileHelper::createDirectory(basedir));assert(this->createMsgFile());}bool removeMsgFile() {bool ret = FileHelper::removeFile(_datafile);if (ret == false) {ELOG("移除 %s 文件失败\n", _datafile.c_str());return false;}return true;            }bool insert(MessagePtr& msg) {return this->insert(msg, _datafile);}bool remove(MessagePtr& msg) {// 将数据从文件中删除,删除数据就是将数据的有效位置设置为"0"msg->mutable_payload()->set_vaild("0");std::string body = msg->payload().SerializeAsString();if (body.size() != msg->length()) {ELOG("删除数据失败\n");return false;}// 现在将数据重新写入文件原来的位置FileHelper helper(_datafile);bool ret = helper.write(body.c_str(), msg->offset(), body.size());if (ret == false) {ELOG("覆盖原来数据失败\n");return false;}return true;}// 对我们的数据进行垃圾回收std::list<MessagePtr> gc() {// 1. 先加载数据,拿到有效的数据std::list<MessagePtr> result;bool ret = this->load_data(result);if (ret == false) {ELOG("加载数据失败\n");return result;}// 2. 将有效的数据存放到临时文件中// 先创建临时文件FileHelper::createFile(_tempfile);for (auto& msg : result) {ret = this->insert(msg, _tempfile);if (ret == false) {ELOG("插入数据失败\n");return result;}}// 3. 删除源文件ret = FileHelper::removeFile(_datafile);if (ret == false) {ELOG("删除源文件失败\n");return result;}// 4. 临时文件重命名FileHelper helper(_tempfile);ret = helper.rename(_datafile);if (ret == false) ELOG("重命名临时文件失败\n");return result;}private:std::string _qname;std::string _datafile;std::string _tempfile;};class QueueMessage {private:bool gcCheck() {// 当数据达到两千条,并且有效数据低于50%的时候需要进行垃圾回收if (_total_count > 2000 && _valid_count * 10 / _total_count < 5)return true;elsereturn false;}void gc() {if (this->gcCheck() == false) return;// 现在开始进行垃圾回收std::list<MessagePtr> msgs = _mapper.gc();for (auto& msg : msgs) {auto it = _durable_msgs.find(msg->payload().properties().id());if (it == _durable_msgs.end()) {ELOG("有一条消息没有被持久化管理\n");msgs.push_back(msg);_durable_msgs[msg->payload().properties().id()] = msg;continue;}// 需要重新设置消息的偏移量和长度it->second->set_offset(msg->offset());it->second->set_length(msg->length());}_valid_count = msgs.size();_total_count = msgs.size();}public:using ptr = std::shared_ptr<QueueMessage>;QueueMessage(const std::string& basedir, const std::string& qname): _qname(qname),_mapper(basedir, qname),_valid_count(0),_total_count(0){}void recovery() {// 恢复消息std::unique_lock<std::mutex> lock(_mutex);_msgs = _mapper.gc();for (auto& msg : _msgs) {_durable_msgs.insert(std::make_pair(msg->payload().properties().id(), msg));DLOG("恢复消息:%s\n", msg->payload().body().c_str());}_valid_count = _msgs.size();_total_count = _msgs.size();}bool insert(const BasicProperties* bp, const std::string& body, bool queue_durable) {// 1.先构造对应的消息MessagePtr msg = std::make_shared<Message>();if (bp == nullptr) {DeliveryMode mode = queue_durable ? DeliveryMode::DURABLE : DeliveryMode::UNDURABLE;msg->mutable_payload()->mutable_properties()->set_id(UUIDHelper::uuid());msg->mutable_payload()->mutable_properties()->set_delivery_mode(mode);msg->mutable_payload()->mutable_properties()->set_routing_key(std::string());} else {DeliveryMode mode = queue_durable ? bp->delivery_mode() : DeliveryMode::UNDURABLE;msg->mutable_payload()->mutable_properties()->set_id(bp->id());msg->mutable_payload()->mutable_properties()->set_delivery_mode(mode);msg->mutable_payload()->mutable_properties()->set_routing_key(bp->routing_key());                }msg->mutable_payload()->set_body(body);std::unique_lock<std::mutex> lock(_mutex);// 2.判断消息是否需要持久化if (msg->payload().properties().delivery_mode() == DeliveryMode::DURABLE) {// 将消息进行持久化处理msg->mutable_payload()->set_vaild("1");bool ret = _mapper.insert(msg);if (ret == false) {DLOG("持久化 %s 数据失败\n", body.c_str());return false;}// 持久化数据加一_valid_count++;_total_count++;_durable_msgs[msg->payload().properties().id()] = msg;}// DLOG("当前插入消息内容: %s\n", msg->payload().body().c_str());// 3.将消息加入到待推送链表中_msgs.push_back(msg);return true;}MessagePtr front() {// 拿出队首消息,然后发送出去std::unique_lock<std::mutex> lock(_mutex);if (_msgs.size() == 0)return MessagePtr();MessagePtr msg = _msgs.front();_msgs.pop_front();// 现在将拿出的队首消息加入到待确认消息_waitack_msgs[msg->payload().properties().id()] = msg;return msg;}bool remove(const std::string& msg_id) {// 删除数据是从待确认消息中进行删除std::unique_lock<std::mutex> lock(_mutex);// 1.先查找该数据是否存在auto it = _waitack_msgs.find(msg_id);if (it == _waitack_msgs.end()) {ILOG("没有找到需要删除的消息 %s\n", msg_id.c_str());return true;}// 2.判断拿到的消息是否存在持久化性质if (it->second->payload().properties().delivery_mode() == DeliveryMode::DURABLE) {// 还要将持久化的数据给删去_mapper.remove(it->second);_durable_msgs.erase(it->second->payload().properties().id());// 持久化文件中的有效数据减一_valid_count--;this->gc();}// 3.同时删除内存中的数据_waitack_msgs.erase(it->second->payload().properties().id());return true;}size_t getable_count() {// 可获取到的数据std::unique_lock<std::mutex> lock(_mutex);return _msgs.size();}size_t total_count() {std::unique_lock<std::mutex> lock(_mutex);return _total_count;}size_t waitack_count() {std::unique_lock<std::mutex> lock(_mutex);return _waitack_msgs.size();}size_t durable_count() {std::unique_lock<std::mutex> lock(_mutex);return _durable_msgs.size();}void clear() {std::unique_lock<std::mutex> lock(_mutex);_mapper.removeMsgFile();_msgs.clear();_durable_msgs.clear();_waitack_msgs.clear();_valid_count = 0;_total_count = 0;}private:std::mutex _mutex;std::string _qname;                                             // 队列名称MessageMapper _mapper;                                          // 持久化操作句柄size_t _valid_count;                                            // 有效消息数量size_t _total_count;                                            // 总消息数量std::list<MessagePtr> _msgs;                                    // 待推送消息链表std::unordered_map<std::string, MessagePtr> _durable_msgs;      // 待持久消息hashstd::unordered_map<std::string, MessagePtr> _waitack_msgs;      // 待确认消息hash};class MessageManager {public:using ptr = std::shared_ptr<MessageManager>; MessageManager(const std::string& basedir): _basedir(basedir){}// 初始化队列消息void initQueueMessage(const std::string& qname) {QueueMessage::ptr msgp;{std::unique_lock<std::mutex> lock(_mutex);// 查找当前hashmap中是否已经存在auto it = _queue_msgs.find(qname);if (it != _queue_msgs.end()) {// ILOG("当前队列消息已存在: %s\n", qname.c_str());return;}msgp = std::make_shared<QueueMessage>(_basedir, qname);_queue_msgs[qname] = msgp;}// 恢复内存中的数据msgp->recovery();}void destoryQueueMessahe(const std::string& qname) {QueueMessage::ptr msgp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it == _queue_msgs.end()) {ILOG("当前需要删除的队列不存在: %s\n", qname.c_str());return;}    msgp = it->second;_queue_msgs.erase(qname);            }msgp->clear();}bool insert(const std::string& qname, const BasicProperties* bp, const std::string& body, bool queue_durable) {QueueMessage::ptr msgp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it == _queue_msgs.end()) {ILOG("当前需要插入数据的队列不存在: %s\n", qname.c_str());return false;}    msgp = it->second;          }return msgp->insert(bp, body, queue_durable);}   void ack(const std::string& qname, const std::string& msg_id) {// 确认消息,就是将消息在队列中删除QueueMessage::ptr msgp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it == _queue_msgs.end()) {ILOG("当前需要删除消息的队列不存在: %s\n", qname.c_str());return;}  msgp = it->second;}msgp->remove(msg_id);}MessagePtr front(const std::string& qname) {QueueMessage::ptr msgp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it == _queue_msgs.end()) {ILOG("当前队列不存在: %s\n", qname.c_str());return MessagePtr();}msgp = it->second;}return msgp->front();}size_t getable_count(const std::string& qname) {QueueMessage::ptr msgp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it == _queue_msgs.end()) {ILOG("当前队列不存在: %s\n", qname.c_str());return 0;}msgp = it->second;                }return msgp->getable_count();}size_t total_count(const std::string& qname) {QueueMessage::ptr msgp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it == _queue_msgs.end()) {ILOG("当前队列不存在: %s\n", qname.c_str());return 0;}msgp = it->second;                }return msgp->total_count();}size_t waitack_count(const std::string& qname) {QueueMessage::ptr msgp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it == _queue_msgs.end()) {ILOG("当前队列不存在: %s\n", qname.c_str());return 0;}msgp = it->second;                }return msgp->waitack_count();            }size_t durable_count(const std::string& qname) {QueueMessage::ptr msgp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it == _queue_msgs.end()) {ILOG("当前队列不存在: %s\n", qname.c_str());return 0;}msgp = it->second;                }return msgp->durable_count();   }void clear() {std::unique_lock<std::mutex> lock(_mutex);for (auto& it : _queue_msgs)it.second->clear();}private:std::mutex _mutex;std::string _basedir;std::unordered_map<std::string, QueueMessage::ptr> _queue_msgs;};}#endif

虚拟机管理模块

        虚拟机本质就是以上四个模块的整合,将以上的四个封装起来统一向外提供消息发送,转发,路由的功能。所以对于虚拟机管理类需要的句柄以及更能为:

虚拟机管理句柄1. 虚拟机名称:虚拟机的唯一标识2. 交换机管理句柄3. 队列管理句柄4. 消息管理句柄5. 绑定管理句柄虚拟机管理类实现的功能:1. 获取所有的队列2. 声明交换机/队列3. 删除交换机/队列4. 获取交换机的绑定信息5. 消息发送6. 消息确认7. 获取队首信息8. 绑定队列和交换机9. 解绑队列和交换机

        host.hpp:

#ifndef __M_HOST_H__
#define __M_HOST_H__#include <google/protobuf/map.h>
#include "exchange.hpp"
#include "message.hpp"
#include "queue.hpp"
#include "binding.hpp"namespace mq {class VirtualHost {public:using ptr = std::shared_ptr<VirtualHost>;VirtualHost(const std::string& hname, const std::string& basedir, const std::string& dbfile): _host_name(hname),_emp(std::make_shared<ExchangeManager>(dbfile)),_mmp(std::make_shared<MessageManager>(basedir)),_bmp(std::make_shared<BindingManager>(dbfile)),_mqmp(std::make_shared<MsgQueueManager>(dbfile)){// 遍历队列恢复历史消息MsgQueueMapper::MsgQueueMap mqp = _mqmp->allQueue();for (auto& q : mqp)_mmp->initQueueMessage(q.first);}MsgQueueMapper::MsgQueueMap allQueue() {return _mqmp->allQueue();}bool declareExchange(const std::string& ename, ExchangeType etype, bool edurable, bool eauto_delete, const google::protobuf::Map<std::string, std::string>& eargs) {return _emp->declareExchange(ename, etype, edurable, eauto_delete, eargs);}void deleteExchange(const std::string& ename) {// 删除一个交换机,同时需要删除一个交换机的绑定消息_bmp->removeExchangeBindings(ename);_emp->deleteExchange(ename);}bool declareQueue(const std::string& msg_name, bool msg_durable, bool msg_exclusive, bool msg_auto_delete, const google::protobuf::Map<std::string, std::string>& msg_args) {// 声明一个队列,现在消息管理中将队列进行初始化_mmp->initQueueMessage(msg_name);return _mqmp->declareQueue(msg_name, msg_durable, msg_exclusive, msg_auto_delete, msg_args);}void deleteQueue(const std::string& qname) {// 删除一个队列,需要将和队列所有有关的数据都删除掉_bmp->removeMsgQueueBindings(qname);_mmp->destoryQueueMessahe(qname);_mqmp->deleteQueue(qname);}MsgQueueBindingMap exchangeBindings(const std::string& ename) {// 获取交换机的绑定信息return _bmp->getExchangeBindings(ename);}bool basicPublish(const std::string& qname, const BasicProperties* bp, const std::string& body) {// 增加一条消息MsgQueue::ptr mqp = _mqmp->selectQueue(qname);if (mqp.get() == nullptr) {DLOG("需要增加消息的队列不存在,队列: %s\n", qname.c_str());return false;}return _mmp->insert(qname, bp, body, mqp->durable);}void basicAck(const std::string& qname, const std::string& msg_id) {return _mmp->ack(qname, msg_id);}// 获取一个队首消息,用于消费MessagePtr basicConsume(const std::string& qname) {          return _mmp->front(qname);}bool bind(const std::string& ename, const std::string& qname, const std::string& key) {// 绑定前先找到对应的交换机和队列MsgQueue::ptr mqp = _mqmp->selectQueue(qname);if (mqp.get() == nullptr) {DLOG("当前队列不存在,队列: %s\n", qname.c_str());return false;}Exchange::ptr ep = _emp->selectExchange(ename);if (ep.get() == nullptr) {DLOG("当前交换机不存在,交换机: %s\n", ename.c_str());return false;}return _bmp->bind(ename, qname, key, ep->durable & mqp->durable);}void unBind(const std::string& ename, const std::string& qname) {   return _bmp->unBind(ename, qname);    }bool existsExchange(const std::string& ename) {return _emp->exists(ename);}bool existsQueue(const std::string& qname) {return _mqmp->exists(qname);}bool existsBinding(const std::string& ename, const std::string& qname) {return _bmp->exists(ename, qname);}Exchange::ptr selectExchange(const std::string& ename) {return _emp->selectExchange(ename);}void clear() {_emp->clear();_mmp->clear();_bmp->clear();_mqmp->clear();}private:std::string _host_name;ExchangeManager::ptr _emp;      // 交换机管理句柄MessageManager::ptr _mmp;       // 消息管理句柄BindingManager::ptr _bmp;       // 绑定管理句柄MsgQueueManager::ptr _mqmp;     // 队列管理句柄};}#endif

路由匹配模块

        路由匹配模块决定了一条消息是否能够发布到指定的队列中去。在交换机和队列中的 binding_key 就是队列接收的规则,在每条消息要发布的消息中,都有一个 routing_key,这是消息的发布规则,对于交换机的三种类型,对应的路由匹配规则如下:

1. 交换机类型为广播:直接将消息发布给交换机的所有绑定队列中2. 交换机类型为直接:routing_key == bind_key 的时候才发送消息3. 交换机类型为主题:只有 routing_key 匹配 binding_key 的时候才会发布消息出去binding_key:由数字、字母、下划线构成,并且使用 . 分成若干部分,并支持 *,# 通配符例如:new.music.# (表示交换机绑定的当前队列是一个用于发布音乐新闻的队列)routing_key:由字母、数字、下划线构成,例如:new.music.pop,表示当前发布的消息为一个流行音乐的新闻对于本篇中两者的主题匹配规则为:使用 routing_key 中的每个单词,与 binding_key 中的单词进行逐个匹配,若匹配到 . 表示当前匹配哪个单词都可以,若匹配到 # 表示此后的一个或者多个单词都可以匹配成功(但是 # . 两个符号不能同时相邻出现,两个 # 也不可以,但是两个 . 可以)

        对于以上主题匹配规则,使用的是动态规划算法实现的,如下:

        route.hpp:

#ifndef __M_ROUTE_H__
#define __M_ROUTE_H__#include "../MqCommon/msg.pb.h"
#include "../MqCommon/helper.hpp"
#include "../MqCommon/logger.hpp"
#include <iostream>
#include <string>
#include <vector>namespace mq {class Router {public:static bool isLegalRoutingKey(const std::string& routing_key) {// 判断当前的routingkey是否合法for (auto& ch : routing_key) {if ((ch >= 'a' && ch <= 'z') ||(ch >= 'A' && ch <= 'Z') ||(ch >= '0' && ch <= '9') ||(ch == '_' || ch == '.') )continue;return false;}return true;}static bool isLegalBindingKey(const std::string& binding_key) {// 1. 先判断是否存在非法字符for (auto& ch : binding_key) {if ((ch >= 'a' && ch <= 'z') ||(ch >= 'A' && ch <= 'Z') ||(ch >= '0' && ch <= '9') ||(ch == '_' || ch == '.') ||(ch == '*' || ch == '#'))continue;return false;}// 2. 判断* #是否和其他字符相连接std::vector<std::string> sub_words;StrHelper::split(binding_key, ".", sub_words);for (auto& word : sub_words) {if (word.size() > 1 && (word.find('#') != std::string::npos || word.find('*') != std::string::npos))return false;}// 3. 判断* #是否连接一起for (int i = 1; i < sub_words.size(); i++) {if (sub_words[i] == "#" && sub_words[i - 1] == "*")return false;if (sub_words[i] == "#" && sub_words[i - 1] == "#")return false;if (sub_words[i] == "*" && sub_words[i - 1] == "#")return false;                }return true;}static bool route(ExchangeType type, const std::string& routing_key, const std::string& binding_key) {if (type == ExchangeType::DIRECT)return routing_key == binding_key;else if (type == ExchangeType::FANOUT)return true;// 使用动态规划来判断当前是否匹配std::vector<std::string> bkeys, rkeys;int n_bkey = StrHelper::split(binding_key, ".", bkeys);int n_rkey = StrHelper::split(routing_key, ".", rkeys);std::vector<std::vector<bool>> dp(n_bkey + 1, std::vector<bool>(n_rkey + 1, false));dp[0][0] = true;if (n_bkey > 0 && bkeys[0] == "#")dp[1][0] = true;for (int i = 1; i <= n_bkey; i++)for (int j = 1; j <= n_rkey; j++) {if (bkeys[i - 1] == "*" || rkeys[j - 1] == bkeys[i - 1])dp[i][j] = dp[i - 1][j - 1];else if (bkeys[i - 1] == "#")dp[i][j] = dp[i - 1][j - 1] | dp[i][j - 1] | dp[i - 1][j];}return dp[n_bkey][n_rkey];}};}#endif

消费者管理模块

        对于客户端存在两种:消息发布客户端、消息订阅客户端。所以只有订阅了消息的客户端才能算是一个消费者。

        消费者存在的意义:当指定队列有了消息之后,需要将消息推送给消费者客户端(也就是需要推送消息的时候,可以直接的找打消费者的信息),有关消费者类及其管理类的实现如下:

消费者类:    1. 消费者标识:唯一的标识一个消费者2. 订阅队列名称:与消费者对应的队列3. 自动确认标志:当消息推送到该消费者之后,是否自动确认消息已经收到4. 消息处理回调函数:队列有了一条消息之后,通过哪个函数进行处理队列消费者类:以队列为单元进行管理1. 新增/删除消费者2. RR轮转获取一个消费者:一条消息只需要被一个客户端处理即可消费者管理类:封装以上队列消费者类1. 初始化队列消费者结构2. 删除队列消费者结构3. 向指定队列添加消费者4. 获取/删除指定队列消费者

        consumer.hpp:

#ifndef __M_CONSUMER_H__
#define __M_CONSUMER_H__#include "../MqCommon/msg.pb.h"
#include "../MqCommon/helper.hpp"
#include "../MqCommon/logger.hpp"
#include <iostream>
#include <string>
#include <vector>
#include <memory>
#include <functional>
#include <unordered_map>namespace mq {using ConsumerCallback = std::function<void(const std::string&, const BasicProperties* bp, const std::string&)>;struct Consumer {using ptr = std::shared_ptr<Consumer>;std::string tag;            // 消费者标识std::string qname;          // 消费者绑定队列名称bool auto_ack;              // 是否自动确认ConsumerCallback callback;  // 回调函数Consumer() {ILOG("new Consumer %p\n", this);}Consumer(const std::string& ctag, const std::string& queue_name, bool ack, const ConsumerCallback& cb): tag(ctag),qname(queue_name),auto_ack(ack),callback(cb){ILOG("new Consumer %p\n", this);}~Consumer() {ILOG("del Consumer %p\n", this);}};class QueueConsumer {public: using ptr = std::shared_ptr<QueueConsumer>;QueueConsumer(const std::string qname): _qname(qname),_rr_seq(0){}// 队列新增一个消费者对象Consumer::ptr create(const std::string& ctag, const std::string& queue_name, bool ack, const ConsumerCallback& cb) {std::unique_lock<std::mutex> lock(_mutex);// 先遍历查找当前是否已经存在for (auto& consumer : _consumers) {// 若已经存在则直接返回一个空的指针if (consumer->tag == ctag)return Consumer::ptr();}// 构造对象Consumer::ptr consumer = std::make_shared<Consumer>(ctag, queue_name, ack, cb);_consumers.push_back(consumer);return consumer;}// 从队列中移除一个消费者对象void remove(const std::string& ctag) {std::unique_lock<std::mutex> lock(_mutex);for (auto it = _consumers.begin(); it != _consumers.end(); ++it) {// 找到对应的元素然后删除if ((*it)->tag == ctag) {_consumers.erase(it);return;}}}Consumer::ptr choose() {std::unique_lock<std::mutex> lock(_mutex);// 若没有消费者则直接返回if (_consumers.empty())return Consumer::ptr();// 轮转选取出一个消费者消费消息int index = _rr_seq % _consumers.size();_rr_seq++;return _consumers[index];}bool exists(const std::string& ctag) {std::unique_lock<std::mutex> lock(_mutex);for (auto& consumer : _consumers) {if (consumer->tag == ctag)return true;}     return false;       }void clear() {std::unique_lock<std::mutex> lock(_mutex);_rr_seq = 0;_consumers.clear();}bool empty() {std::unique_lock<std::mutex> lock(_mutex);return _consumers.empty();}private:std::mutex _mutex;                  // 锁std::string _qname;                 // 队列名称uint64_t _rr_seq;                   // 轮转序号std::vector<Consumer::ptr> _consumers;   // 消费者管理};class ConsumerManager {public:using ptr = std::shared_ptr<ConsumerManager>;ConsumerManager() {}void initQueueConsumer(const std::string& qname) {std::unique_lock<std::mutex> lock(_mutex);// 查找当前管理的队列中是否已经存在auto it = _qconsumers.find(qname);if (it != _qconsumers.end())    return;QueueConsumer::ptr qcp = std::make_shared<QueueConsumer>(qname);_qconsumers[qname] = qcp;}void destroyQueueConsumer(const std::string& qname) {std::unique_lock<std::mutex> lock(_mutex);auto it = _qconsumers.find(qname);// 找不到直接退出if (it == _qconsumers.end())    return;  _qconsumers.erase(qname);          }Consumer::ptr create(const std::string& ctag, const std::string& qname, bool ack, const ConsumerCallback& cb) {QueueConsumer::ptr qcp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _qconsumers.find(qname);if (it == _qconsumers.end()) {DLOG("没有找到队列: %s 的管理句柄\n", qname.c_str());return Consumer::ptr();}qcp = it->second;}return qcp->create(ctag, qname, ack, cb);}void remove(const std::string& qname, const std::string& ctag) {QueueConsumer::ptr qcp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _qconsumers.find(qname);if (it == _qconsumers.end()) {DLOG("没有找到队列: %s 的管理句柄\n", qname.c_str());return;}qcp = it->second;}qcp->remove(ctag);           }Consumer::ptr choose(const std::string& qname) {QueueConsumer::ptr qcp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _qconsumers.find(qname);if (it == _qconsumers.end()) {DLOG("没有找到队列: %s 的管理句柄\n", qname.c_str());return Consumer::ptr();}qcp = it->second;}return qcp->choose();              }bool empty(const std::string& qname) {QueueConsumer::ptr qcp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _qconsumers.find(qname);if (it == _qconsumers.end()) {DLOG("没有找到队列: %s 的管理句柄\n", qname.c_str());return false;}qcp = it->second;}return qcp->empty();              }bool exists(const std::string& qname, const std::string& ctag) {QueueConsumer::ptr qcp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _qconsumers.find(qname);if (it == _qconsumers.end()) {DLOG("没有找到队列: %s 的管理句柄\n", qname.c_str());return false;}qcp = it->second;}return qcp->exists(ctag);   }void clear() {std::unique_lock<std::mutex> lock(_mutex);_qconsumers.clear();}~ConsumerManager() {}private:std::mutex _mutex;std::unordered_map<std::string, QueueConsumer::ptr> _qconsumers;};
}#endif

信道管理模块

        信道就是与客户端进行连接通信的主要模块,通信的同时将客户端发送来的数据分配给对应的模块,让其执行结束之后在将应答发送回客户端。一个通信信道就是进行网络哦通信的载体,而一个真正的通信连接,可以创建出多个通信通道,每一个信道之间,在用户眼中都是独立的,但是在底层它们使用的是同一个通信连接进行网络通信(为了充分的利用资源,不用多次与同一个服务器进行 TCP 连接,通信连接细化出了通信信道)

        对于接收到的请求是上文中使用 protobuf 生成的请求类,一个请求类中包含消息的所有信息。同时发送回去的应答也是基于 protobuf 生成的应答类。如下:

信道类:1. 信道唯一的标识2. 信道关联的消费者3. 信道关联的连接4. protobuf协议处理器5. 消费者管理句柄6. 虚拟机7. 线程池信道管理类:管理多个信道1. 打开信道2. 关闭信道3. 获取指定的信道        

        channel.hpp:

#ifndef __M_CHANNEL_H__
#define __M_CHANNEL_H__#include <muduo/net/TcpConnection.h>
#include "../MqThird/include/codec.h"
#include "../MqCommon/proto.pb.h"
#include "../MqCommon/msg.pb.h"
#include "../MqCommon/helper.hpp"
#include "../MqCommon/logger.hpp"
#include "../MqCommon/threadpool.hpp"
#include "consumer.hpp"
#include "host.hpp"
#include "route.hpp"namespace mq {using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;// 以下请求的智能指针全都是基于proto.pb.h中生成的信息管理类using openChannelRequestPtr = std::shared_ptr<openChannelRequest>;using closeChannelRequestPtr = std::shared_ptr<closeChannelRequest>;using declareExchangeRequestPtr = std::shared_ptr<declareExchangeRequest>;using deleteExchangeRequestPtr = std::shared_ptr<deleteExchangeRequest>;using declareQueueRequestPtr = std::shared_ptr<declareQueueRequest>;using deleteQueueRequestPtr = std::shared_ptr<deleteQueueRequest>;using queueBindRequestPtr = std::shared_ptr<queueBindRequest>;using queueUnBindRequestPtr = std::shared_ptr<queueUnBindRequest>;using basicPublishRequestPtr = std::shared_ptr<basicPublishRequest>;using basicAckRequestPtr = std::shared_ptr<basicAckRequest>;using basicCancelRequestPtr = std::shared_ptr<basicCancelRequest>;using basicConsumeResponcePtr = std::shared_ptr<basicConsumeResponce>;using basicConsumeRequestPtr = std::shared_ptr<basicConsumeRequest>;class Channel {private:// 基础响应,将响应发回给客户端void basicResponce(bool ok, const std::string& rid, const std::string& cid) {basicCommonResponce resp;// 设置响应的各个参数resp.set_cid(cid);resp.set_rid(rid);resp.set_ok(ok);_codec->send(_conn, resp);}// 使用这个作为回调函数进行消息消费void consume(const std::string& qname) {// 1. 取出一个消息MessagePtr mp = _host->basicConsume(qname);if (mp.get() == nullptr) {DLOG("消费消息失败,%s 队列没有可以消费的消息\n", qname.c_str());return;}// 2. 取出一个消费者Consumer::ptr cp = _cmp->choose(qname);if (cp.get() == nullptr) {DLOG("消费消息失败,%s 队列没有消费者\n", qname.c_str());return;}// 进行消息消费cp->callback(cp->tag, mp->mutable_payload()->mutable_properties(), mp->payload().body());// 若当前为自动删除,则直接将消息给删除了,否则需要之后手动删除if (cp->auto_ack)_host->basicAck(qname, mp->payload().properties().id());}// 消息处理回调函数void callback(const std::string& tag, const BasicProperties* bp, const std::string& body) {basicConsumeResponce resp;resp.set_body(body);resp.set_cid(_cid);resp.set_consumer_tag(tag);if (bp) {resp.mutable_properties()->set_id(bp->id());resp.mutable_properties()->set_routing_key(bp->routing_key());resp.mutable_properties()->set_delivery_mode(bp->delivery_mode());}_codec->send(_conn, resp);}public:using ptr = std::shared_ptr<Channel>;Channel(const std::string& id, const VirtualHost::ptr& host, const ConsumerManager::ptr& cmp, const ProtobufCodecPtr& codec, const muduo::net::TcpConnectionPtr conn, const threadpool::ptr& pool): _cid(id),_conn(conn),_codec(codec),_cmp(cmp),_host(host),_pool(pool){}// 交换机声明void declareExchange(const declareExchangeRequestPtr& req) {bool ret = _host->declareExchange(req->exchange_name(), req->exchange_type(), req->durable(), req->auto_delete(), req->args());basicResponce(ret, req->rid(), req->cid());}// 删除交换机void deleteExchange(const deleteExchangeRequestPtr& req) {_host->deleteExchange(req->exchange_name());basicResponce(true, req->rid(), req->cid());}// 队列声明void declareQueue(const declareQueueRequestPtr& req) {bool ret = _host->declareQueue(req->queue_name(), req->durable(), req->exclusive(), req->auto_delete(), req->args());if (ret == false) return basicResponce(ret, req->rid(), req->cid());_cmp->initQueueConsumer(req->queue_name());basicResponce(ret, req->rid(), req->cid());}// 删除队列void deleteQueue(const deleteQueueRequestPtr& req) {_host->deleteQueue(req->queue_name());_cmp->destroyQueueConsumer(req->queue_name());basicResponce(true, req->rid(), req->cid());}// 绑定void bind(const queueBindRequestPtr& req) {bool ret = _host->bind(req->exchange_name(), req->queue_name(), req->binding_key());basicResponce(ret, req->rid(), req->cid());}// 解绑void unBind(const queueUnBindRequestPtr& req) {_host->unBind(req->exchange_name(), req->queue_name());basicResponce(true, req->rid(), req->cid());}// 发布消息void basicPublish(const basicPublishRequestPtr& req) {// 取出一个交换机Exchange::ptr ep = _host->selectExchange(req->exchange_name());if (ep.get() == nullptr) return basicResponce(false, req->rid(), req->cid());// 根据获取的交换机找到对应的绑定信息BasicProperties* bp = nullptr;std::string routing_key;if (req->has_properties()) {bp = req->mutable_properties();routing_key = req->properties().routing_key();}MsgQueueBindingMap mqbm = _host->exchangeBindings(req->exchange_name());for (auto& binding : mqbm) {if (Router::route(ep->type, routing_key, binding.second->binding_key)) {// 将消息加入到队列中_host->basicPublish(binding.first, bp, req->body());auto task = std::bind(&Channel::consume, this, binding.first);_pool->push(task);}}basicResponce(true, req->rid(), req->cid()); }// 确认消息void basicAck(const basicAckRequestPtr& req) {_host->basicAck(req->queue_name(), req->message_id());basicResponce(true, req->rid(), req->cid()); }// 订阅消息void basicConsume(const basicConsumeRequestPtr& req) {// 判断当前队列是否存在bool ret = _host->existsQueue(req->queue_name());if (ret == false)return basicResponce(false, req->rid(), req->cid()); auto cb = std::bind(&Channel::callback, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);    _consumer = _cmp->create(req->consumer_tag(), req->queue_name(), req->auto_ack(), cb);if (_consumer.get() == nullptr)return basicResponce(false, req->rid(), req->cid()); basicResponce(true, req->rid(), req->cid()); }// 取消订阅void basicCancel(const basicCancelRequestPtr& req) {// 取消订阅就是将消费者从消费者管理句柄中删除_cmp->remove(req->queue_name(), req->consumer_tag());basicResponce(true, req->rid(), req->cid()); }~Channel() {if (_consumer.get() != nullptr)_cmp->remove(_consumer->qname, _consumer->tag);}private:std::string _cid;                          // 信道唯一标识Consumer::ptr _consumer;                   // 信道关联的消费者muduo::net::TcpConnectionPtr _conn;        // 信道关联的连接ProtobufCodecPtr _codec;                   // 协议处理器,protobuf协议处理句柄  ConsumerManager::ptr _cmp;                 // 消费者管理句柄VirtualHost::ptr _host;                    // 虚拟机threadpool::ptr _pool;                     // 线程池};class ChannelManager {public:using ptr = std::shared_ptr<ChannelManager>;ChannelManager() {ILOG("new Channel %p\n", this);}bool openChannel(const std::string& cid, const VirtualHost::ptr& host, const ConsumerManager::ptr& cmp, const ProtobufCodecPtr& codec, const muduo::net::TcpConnectionPtr conn, const threadpool::ptr& pool) {std::unique_lock<std::mutex> lock(_mutex);auto it = _channels.find(cid);if (it != _channels.end())return false;Channel::ptr channel = std::make_shared<Channel>(cid, host, cmp, codec, conn, pool);_channels[cid] = channel;return true;}void closeChannel(const std::string& cid) {std::unique_lock<std::mutex> lock(_mutex);_channels.erase(cid);}Channel::ptr getChannel(const std::string& cid) {std::unique_lock<std::mutex> lock(_mutex);auto it = _channels.find(cid);if (it == _channels.end())return Channel::ptr();return it->second;        }~ChannelManager() {ILOG("del Channel %p\n", this);}private:std::mutex _mutex;std::unordered_map<std::string, Channel::ptr> _channels;};
}#endif

连接管理类

        在网络通信连接中,我们使用的是 muduo 库来实现的底层通信,muduo 库本身就存在 Connection 连接的对象类,但是在我们的连接中,我们还有一个上层通信的概念,这个概念在 muduo 库中是没有的,所以我们还需要对 muduo 库中的 Connection 连接进行二次封装,形成我们自己的连接管理类,如下:

管理数据:1. muduo 库的连接通信2. 当前连接关联的信道管理句柄连接提供的操作1. 创建信道2. 关闭信道连接管理类:1. 新增连接/关闭连接2. 获取指定连接

        connectIon.hpp:

#ifndef __M_CONNECTION_H__
#define __M_CONNECTION_H__
#include "channel.hpp"namespace mq {class Connection {private:void basicResponce(bool ok, const std::string& rid, const std::string& cid) {basicCommonResponce resp;resp.set_cid(cid);resp.set_rid(rid);resp.set_ok(ok);_codec->send(_conn, resp);}public:using ptr = std::shared_ptr<Connection>;Connection(const VirtualHost::ptr& host, const ConsumerManager::ptr& cmp, const ProtobufCodecPtr& codec, const muduo::net::TcpConnectionPtr conn, const threadpool::ptr& pool) : _conn(conn),_codec(codec),_cmp(cmp),_host(host),_pool(pool),_channels(std::make_shared<ChannelManager>()){}void openChannel(const openChannelRequestPtr& req) {// 先检查是否存在bool ret = _channels->openChannel(req->cid(), _host, _cmp, _codec, _conn, _pool);if (ret == false) {DLOG("信道已经存在,信道ID重复\n");return this->basicResponce(false, req->rid(), req->cid());}ILOG("%s 信道创建成功\n", req->cid().c_str());this->basicResponce(true, req->rid(), req->cid());}void closeChannel(const closeChannelRequestPtr& req) {_channels->closeChannel(req->cid());this->basicResponce(true, req->rid(), req->cid());}Channel::ptr getChannel(const std::string& cid) {return _channels->getChannel(cid);}~Connection() {}private:// 一个连接模块处理多个信道,一个信道处理一个消费者muduo::net::TcpConnectionPtr _conn;        // 信道关联的连接ProtobufCodecPtr _codec;                   //协议处理器,protobuf协议处理句柄  ConsumerManager::ptr _cmp;                 // 消费者管理句柄VirtualHost::ptr _host;                    // 虚拟机threadpool::ptr _pool;                     // 线程池    ChannelManager::ptr _channels;             // 管理多个信道的句柄};class ConnectionManager {public:using ptr = std::shared_ptr<ConnectionManager>;ConnectionManager() {}void newConnection(const VirtualHost::ptr& host, const ConsumerManager::ptr& cmp, const ProtobufCodecPtr& codec, const muduo::net::TcpConnectionPtr conn, const threadpool::ptr& pool) {std::unique_lock<std::mutex> lock(_mutex);auto it = _conns.find(conn);if (it != _conns.end())return;Connection::ptr connection = std::make_shared<Connection>(host, cmp, codec, conn, pool);_conns[conn] = connection;}void delConnection(const muduo::net::TcpConnectionPtr conn) {std::unique_lock<std::mutex> lock(_mutex);_conns.erase(conn);}Connection::ptr getConnection(const muduo::net::TcpConnectionPtr conn) {std::unique_lock<std::mutex> lock(_mutex);auto it = _conns.find(conn);if (it == _conns.end())return Connection::ptr();return it->second;}private:// 用于管理多个连接std::mutex _mutex;std::unordered_map<muduo::net::TcpConnectionPtr, Connection::ptr> _conns;   // 记录muduo连接与server端连接的映射};
}#endif

服务器代码

#include "broker.hpp"int main() {mq::BrokerServer server(8085, "./data/");server.start();return 0;
}

Broker服务端模块

        这个模块是以上所有功能的整合,主要实现的功能也是将以上关于通信的结构全都注册到对应的 protobuf 分发器对象中,这样就可以当接收到特定的消息的时候就可以自动匹配对应的处理函数。需要的模块如下:

Broker类:1. 服务器对象:muduo库中的TcpServer对象2. 监视IO的接口3. 请求分发器对象4. 虚拟机5. 消费者管理模块6. 连接管理模块7. 线程池管理:多线程处理请求8. protobuf协议处理器实现的主要功能就是将需要提供的功能注册到服务器中,让其可以自动分配给对应的响应处理函数

        broker.hpp:

#ifndef __M_BROKER_H__
#define __M_BROKER_H__#include <muduo/base/Logging.h>
#include <muduo/net/EventLoop.h>
#include <muduo/base/Mutex.h>
#include <muduo/net/TcpServer.h>
#include "../MqThird/include/codec.h"
#include "../MqThird/include/dispatcher.h"
#include "../MqCommon/msg.pb.h"
#include "../MqCommon/proto.pb.h"
#include "../MqCommon/threadpool.hpp"
#include "host.hpp"
#include "consumer.hpp"
#include "connection.hpp"namespace mq {#define DEFAULT_DBFILE "/meta.db"#define HOST_NAME "MyVirtualHost"class BrokerServer {private:using MessagePtr = std::shared_ptr<google::protobuf::Message>;;void onConnection(const muduo::net::TcpConnectionPtr& conn) {if (conn->connected()) _connection_manager->newConnection(_virtual_host, _consumer_manager, _codec, conn, _threadpool);else _connection_manager->delConnection(conn);           }// 默认的处理函数void unUnKnownMessage(const muduo::net::TcpConnectionPtr& conn, const MessagePtr& message, muduo::Timestamp) {LOG_INFO << "unUnKnownMessage" << message->GetTypeName();conn->shutdown(); // 关闭连接}// 打开信道void onOpenChannel(const muduo::net::TcpConnectionPtr& conn, const openChannelRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("打开信道时,没有找到对应的连接\n");return;}mconn->openChannel(message);}void onCloseChannel(const muduo::net::TcpConnectionPtr& conn, const closeChannelRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("关闭信道时,没有找到对应的连接\n");return;}mconn->closeChannel(message);}void onDeclareExchange(const muduo::net::TcpConnectionPtr& conn, const declareExchangeRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("声明交换机时,没有找到对应的连接\n");conn->shutdown();return;}Channel::ptr channel = mconn->getChannel(message->cid());if (channel.get() == nullptr) {DLOG("声明交换机时,没有找到对应的信道\n");return;}          channel->declareExchange(message); }void onDeleteExchange(const muduo::net::TcpConnectionPtr& conn, const deleteExchangeRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("关闭交换机时,没有找到对应的连接\n");return;}Channel::ptr channel = mconn->getChannel(message->cid());if (channel.get() == nullptr) {DLOG("关闭交换机时,没有找到对应的信道\n");return;}          channel->deleteExchange(message);         }void onDeclareQueue(const muduo::net::TcpConnectionPtr& conn, const declareQueueRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("声明队列时,没有找到对应的连接\n");return;}Channel::ptr channel = mconn->getChannel(message->cid());if (channel.get() == nullptr) {DLOG("声明队列时,没有找到对应的信道\n");return;}          channel->declareQueue(message);         }void onDeleteQueue(const muduo::net::TcpConnectionPtr& conn, const deleteQueueRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("关闭队列时,没有找到对应的连接\n");return;}Channel::ptr channel = mconn->getChannel(message->cid());if (channel.get() == nullptr) {DLOG("关闭队列时,没有找到对应的信道\n");return;}          channel->deleteQueue(message);            }void onQueueBind(const muduo::net::TcpConnectionPtr& conn, const queueBindRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("绑定队列时,没有找到对应的连接\n");return;}Channel::ptr channel = mconn->getChannel(message->cid());if (channel.get() == nullptr) {DLOG("绑定队列时,没有找到对应的信道\n");return;}          channel->bind(message);           }void onQueueUnBind(const muduo::net::TcpConnectionPtr& conn, const queueUnBindRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("解绑队列时,没有找到对应的连接\n");return;}Channel::ptr channel = mconn->getChannel(message->cid());if (channel.get() == nullptr) {DLOG("解绑队列时,没有找到对应的信道\n");return;}          channel->unBind(message);          }void onBasicPublish(const muduo::net::TcpConnectionPtr& conn, const basicPublishRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("发布消息时,没有找到对应的连接\n");return;}Channel::ptr channel = mconn->getChannel(message->cid());if (channel.get() == nullptr) {DLOG("发布消息时,没有找到对应的信道\n");return;}          channel->basicPublish(message);             }void onBasicAck(const muduo::net::TcpConnectionPtr& conn, const basicAckRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("确认消息时,没有找到对应的连接\n");return;}Channel::ptr channel = mconn->getChannel(message->cid());if (channel.get() == nullptr) {DLOG("确认消息时,没有找到对应的信道\n");return;}          channel->basicAck(message);          }void onBasicConsume(const muduo::net::TcpConnectionPtr& conn, const basicConsumeRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("订阅消息时,没有找到对应的连接\n");return;}Channel::ptr channel = mconn->getChannel(message->cid());if (channel.get() == nullptr) {DLOG("订阅消息时,没有找到对应的信道\n");return;}          channel->basicConsume(message); }void onBasicCancel(const muduo::net::TcpConnectionPtr& conn, const basicCancelRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("取消订阅时,没有找到对应的连接\n");return;}Channel::ptr channel = mconn->getChannel(message->cid());if (channel.get() == nullptr) {DLOG("取消订阅时,没有找到对应的信道\n");return;}          channel->basicCancel(message);         }public:BrokerServer(uint16_t port, const std::string& basedir): _server(&_baseloop, muduo::net::InetAddress("0.0.0.0", port), "Server", muduo::net::TcpServer::kReusePort),_dispathcher(std::bind(&BrokerServer::unUnKnownMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)),_codec(std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispathcher, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))),_virtual_host(std::make_shared<VirtualHost>(HOST_NAME, basedir, basedir + DEFAULT_DBFILE)),_consumer_manager(std::make_shared<ConsumerManager>()),_connection_manager(std::make_shared<ConnectionManager>()),_threadpool(std::make_shared<threadpool>()){// 恢复历史队列消息MsgQueueMapper::MsgQueueMap qmap = _virtual_host->allQueue();for (auto& q : qmap)_consumer_manager->initQueueConsumer(q.first);// 现在注册业务处理请求函数_dispathcher.registerMessageCallback<openChannelRequest>(std::bind(&BrokerServer::onOpenChannel, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispathcher.registerMessageCallback<closeChannelRequest>(std::bind(&BrokerServer::onCloseChannel, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));         _dispathcher.registerMessageCallback<declareExchangeRequest>(std::bind(&BrokerServer::onDeclareExchange, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));   _dispathcher.registerMessageCallback<deleteExchangeRequest>(std::bind(&BrokerServer::onDeleteExchange, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); _dispathcher.registerMessageCallback<declareQueueRequest>(std::bind(&BrokerServer::onDeclareQueue, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));   _dispathcher.registerMessageCallback<deleteQueueRequest>(std::bind(&BrokerServer::onDeleteQueue, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); _dispathcher.registerMessageCallback<queueBindRequest>(std::bind(&BrokerServer::onQueueBind, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));   _dispathcher.registerMessageCallback<queueUnBindRequest>(std::bind(&BrokerServer::onQueueUnBind, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); _dispathcher.registerMessageCallback<basicPublishRequest>(std::bind(&BrokerServer::onBasicPublish, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));                                   _dispathcher.registerMessageCallback<basicAckRequest>(std::bind(&BrokerServer::onBasicAck, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); _dispathcher.registerMessageCallback<basicConsumeRequest>(std::bind(&BrokerServer::onBasicConsume, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));                 _dispathcher.registerMessageCallback<basicCancelRequest>(std::bind(&BrokerServer::onBasicCancel, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); _server.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codec.get(), std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_server.setConnectionCallback(std::bind(&BrokerServer::onConnection, this, std::placeholders::_1));}      void start() {// 服务器开始运行_server.start();// 开始IO监控_baseloop.loop();}private:muduo::net::TcpServer _server;              // 服务器对象muduo::net::EventLoop _baseloop;            // 主事件循环器,响应和监听IO事件ProtobufDispatcher _dispathcher;            // 请求分发器对象,需要向分发器中的注册处理函数ProtobufCodecPtr _codec;                    // protobuf 协议处理器,针对收到的请求数据进行protobuf协议处理 VirtualHost::ptr _virtual_host;             // 虚拟机ConsumerManager::ptr _consumer_manager;     // 消费者管理句柄ConnectionManager::ptr _connection_manager; // 连接管理句柄threadpool::ptr _threadpool;                // 线程池管理句柄};}#endif

客户端核心代码实现

消费者模块

        这里的消费者模块和客户端的消费者模块基本一致,不过没有相对于消费者那么多的接口,仅仅只有一个消费者类,如下:

        consumer.hpp:

#ifndef __M_CONSUMER_H__
#define __M_CONSUMER_H__#include "../MqCommon/msg.pb.h"
#include "../MqCommon/helper.hpp"
#include "../MqCommon/logger.hpp"
#include <iostream>
#include <string>
#include <vector>
#include <memory>
#include <functional>namespace mq {using ConsumerCallback = std::function<void(const std::string&, const BasicProperties* bp, const std::string&)>;struct Consumer {using ptr = std::shared_ptr<Consumer>;std::string tag;            // 消费者标识           ----> 标识订阅的消费者std::string qname;          // 消费者绑定队列名称   ---->  标识订阅者对应的队列bool auto_ack;              // 是否自动确认         ----> 对于队列发送过来的消息是否自动确认ConsumerCallback callback;  // 回调函数            ---->  获取消息之后处理的回调函数Consumer() {}Consumer(const std::string& ctag, const std::string& queue_name, bool ack, const ConsumerCallback& cb): tag(ctag),qname(queue_name),auto_ack(ack),callback(cb){}};}#endif 

多线程工作模块

        该模块就是整合线程池和监控IO事件的模块,如下:

        worker.hpp:

#ifndef __M_WORKER_H__
#define __M_WORKER_H__#include <muduo/net/EventLoopThread.h>
#include "../MqCommon/helper.hpp"
#include "../MqCommon/logger.hpp"
#include "../MqCommon/threadpool.hpp"namespace mq {class AsyncWorker {// 这一个对象可以用于多个连接使用public:using ptr = std::shared_ptr<AsyncWorker>;threadpool pool;                            // 事件处理线程池muduo::net::EventLoopThread loopthread;     // 用于循环监控io事件};
}#endif

信道管理模块

        信道管理和服务端的也很相似,实现的接口基本一致,不过相对于服务端提供的各种响应接口,客户端往往是接收来自服务端的推送之后,逐一的面向推送信息进行处理,然后手动准备应答,然后发送出去。如下:

        channel.hpp:

#ifndef __M_CHANNEL_H__
#define __M_CHANNEL_H__#include "../MqThird/include/codec.h"
#include "../MqCommon/proto.pb.h"
#include "../MqCommon/msg.pb.h"
#include "../MqCommon/helper.hpp"
#include "../MqCommon/logger.hpp"
#include "../MqCommon/threadpool.hpp"
#include "consumer.hpp"
#include <muduo/net/TcpConnection.h>
#include <iostream>
#include <mutex>
#include <condition_variable>
#include <unordered_map>namespace mq {using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;using basicConsumeResponcePtr = std::shared_ptr<basicConsumeResponce>;using basicCommonResponcePtr = std::shared_ptr<basicCommonResponce>;using MessagePtr = std::shared_ptr<google::protobuf::Message>;class Channel {private:// 基础响应basicCommonResponcePtr waitResponce(const std::string& rid) {std::unique_lock<std::mutex> lock(_mutex);_cond.wait(lock, [&rid, this]() {return _basic_resp.find(rid) != _basic_resp.end();});basicCommonResponcePtr resp = _basic_resp[rid];_basic_resp.erase(rid);return resp;}public:using ptr = std::shared_ptr<Channel>;Channel(const muduo::net::TcpConnectionPtr& conn, const ProtobufCodecPtr& codec): _channel_id(UUIDHelper::uuid()),_conn(conn),_codec(codec){}bool declareExchange(const std::string& ename, ExchangeType etype, bool edurable, bool eauto_delete, google::protobuf::Map<std::string, std::string>& eargs) {// 1. 构建对象std::string rid = UUIDHelper::uuid();declareExchangeRequest req;req.set_cid(_channel_id);req.set_rid(rid);req.set_durable(edurable);req.set_exchange_name(ename);req.set_exchange_type(etype);req.set_auto_delete(eauto_delete);req.mutable_args()->swap(eargs);// 2. 将构建的对象发送出去// protobuf底层设计了自己的发送和接收缓冲区,发送和接收是异步工作的// 所以只有对方确认收到(等待响应)之后我们才可以返回 _codec->send(_conn, req);// 3. 等待响应basicCommonResponcePtr resp = waitResponce(rid);// 4. 返回return resp->ok();}void deleteExchange(const std::string& ename) {std::string rid = UUIDHelper::uuid();deleteExchangeRequest req;req.set_cid(_channel_id);req.set_rid(rid);req.set_exchange_name(ename);_codec->send(_conn, req);waitResponce(rid);}bool declareQueue(const std::string& qname, bool qdurable, bool qexclusive, bool qauto_delete, google::protobuf::Map<std::string, std::string>& qargs) {std::string rid = UUIDHelper::uuid();declareQueueRequest req;req.set_cid(_channel_id);req.set_rid(rid);req.set_durable(qdurable);req.set_queue_name(qname);req.set_auto_delete(qauto_delete);req.mutable_args()->swap(qargs);    req.set_exclusive(qexclusive);      _codec->send(_conn, req);// 3. 等待响应basicCommonResponcePtr resp = waitResponce(rid);// 4. 返回return resp->ok();              }void deleteQueue(const std::string& qname) {std::string rid = UUIDHelper::uuid();deleteQueueRequest req;req.set_cid(_channel_id);req.set_rid(rid);    req.set_queue_name(qname);_codec->send(_conn, req);waitResponce(rid);}bool queueBind(const std::string& ename, const std::string& qname, const std::string& key) {std::string rid = UUIDHelper::uuid();queueBindRequest req;req.set_cid(_channel_id);req.set_rid(rid);    req.set_queue_name(qname);    req.set_exchange_name(ename);req.set_binding_key(key);   _codec->send(_conn, req);// 3. 等待响应basicCommonResponcePtr resp = waitResponce(rid);// 4. 返回return resp->ok();         }void queueUnBind(const std::string& ename, const std::string& qname) {std::string rid = UUIDHelper::uuid();queueUnBindRequest req;req.set_cid(_channel_id);req.set_rid(rid);    req.set_queue_name(qname);    req.set_exchange_name(ename);_codec->send(_conn, req);waitResponce(rid);          }void basicPublish(const std::string& ename, const BasicProperties* bp, const std::string& body) {// 发送消息给交换机,让交换机来自动匹配消息发给哪个队列std::string rid = UUIDHelper::uuid();basicPublishRequest req;req.set_cid(_channel_id);req.set_rid(rid);    req.set_exchange_name(ename);req.set_body(body);if (bp != nullptr) {req.mutable_properties()->set_id(bp->id());req.mutable_properties()->set_delivery_mode(bp->delivery_mode());req.mutable_properties()->set_routing_key(bp->routing_key());}_codec->send(_conn, req);waitResponce(rid);          }void basicAck(const std::string& msg_id) {if (_consumer.get() == nullptr) {DLOG("确认消息时,找不到对应的消费者信息\n");return;}std::string rid = UUIDHelper::uuid();basicAckRequest req;req.set_cid(_channel_id);req.set_rid(rid);           req.set_queue_name(_consumer->qname);req.set_message_id(msg_id);     _codec->send(_conn, req);waitResponce(rid);          }// 订阅消息bool basicConsume(const std::string& consumer_tag, const std::string& qname, bool auto_ack, const ConsumerCallback& cb) {if (_consumer.get() != nullptr) {DLOG("消费者已经存在,订阅失败\n");return false;}std::string rid = UUIDHelper::uuid();basicConsumeRequest req;req.set_rid(rid);req.set_cid(_channel_id);req.set_queue_name(qname);req.set_consumer_tag(consumer_tag);req.set_auto_ack(auto_ack);_codec->send(_conn, req);basicCommonResponcePtr resp = waitResponce(rid);if (resp->ok() == false) {DLOG("添加订阅失败\n");return false;}// 生成当前信道对应的消费者_consumer = std::make_shared<Consumer>(consumer_tag, qname, auto_ack, cb);return true;}void basicCancel() {if (_consumer.get() == nullptr)return;std::string rid = UUIDHelper::uuid();basicCancelRequest req;req.set_cid(_channel_id);req.set_rid(rid);req.set_consumer_tag(_consumer->tag);req.set_queue_name(_consumer->tag);_codec->send(_conn, req);waitResponce(rid);// 取消订阅,也就将当前的消费者重置          _consumer.reset();}std::string cid() {return _channel_id;}~Channel() {basicCancel();}public:// 收到消息之后,向对应响应消息队列中加入响应消息void putBasicResponce(const basicCommonResponcePtr& resp) {std::unique_lock<std::mutex> lock(_mutex);_basic_resp[resp->rid()] = resp;// 从外部接收到消息之后,唤醒之前等待的线程_cond.notify_all();}// 收到响应之后,需要找到对应的消费者去处理消息void consume(const basicConsumeResponcePtr& resp) {if (_consumer.get() == nullptr) {DLOG("处理消息时,订阅者为找到\n");return;}if (_consumer->tag != resp->consumer_tag()) {DLOG("处理消息时,订阅者和请求消息不对应\n");return;}_consumer->callback(resp->consumer_tag(), resp->mutable_properties(), resp->body());}bool openChannel() {std::string rid = UUIDHelper::uuid();openChannelRequest req;req.set_cid(_channel_id);req.set_rid(rid);_codec->send(_conn, req);basicCommonResponcePtr resp = waitResponce(rid);return resp->ok();}void closeChannel() {std::string rid = UUIDHelper::uuid();closeChannelRequest req;req.set_cid(_channel_id);req.set_rid(rid);_codec->send(_conn, req);  waitResponce(rid);         }private:std::string _channel_id;                                                // 信道idmuduo::net::TcpConnectionPtr _conn;                                     // 信道关联的网络通信对象ProtobufCodecPtr _codec;                                                // 协议处理对象Consumer::ptr _consumer;                                                // 信道关联的消费者std::mutex _mutex;                                                      // 锁:和条件变量共同维护响应和处理的先后顺序std::condition_variable _cond;                                          // 条件变量 std::unordered_map<std::string, basicCommonResponcePtr> _basic_resp;    // <req_id(rid), resp> 请求对应的响应信息队列};class ChannelManager {public:using ptr = std::shared_ptr<ChannelManager>;ChannelManager() {}// 创建信道Channel::ptr create(const muduo::net::TcpConnectionPtr& conn, const ProtobufCodecPtr& codec) {std::unique_lock<std::mutex> lock(_mutex);Channel::ptr channel = std::make_shared<Channel>(conn, codec);std::string cid = channel->cid();_channels[cid] = channel;return channel;}// 移除信道void remove(const std::string& cid) {std::unique_lock<std::mutex> lock(_mutex);_channels.erase(cid);}// 获取指定的队列Channel::ptr get(const std::string& cid) {std::unique_lock<std::mutex> lock(_mutex);auto it = _channels.find(cid);if (it == _channels.end())return Channel::ptr();return it->second;}private:std::mutex _mutex;std::unordered_map<std::string, Channel::ptr> _channels;};
}#endif

连接管理模块

        这里的连接管理模块,其实对应的就是服务器端中的 Broker 服务端代码,不过在客户端中弱化了客户端的概念,其实本质就是实现了客户端应该实现的相关功能,所以在该代码中,实现的也是主要 api 函数的分发,当响应来临时自动进行匹配,如下:

#ifndef __M_CONNECTION_H__
#define __M_CONNECTION_H__#include <muduo/base/Logging.h>
#include <muduo/base/Mutex.h>
#include <muduo/net/TcpClient.h>
#include <muduo/net/EventLoopThread.h>
#include <muduo/base/CountDownLatch.h>
#include <iostream>
#include "../MqThird/include/codec.h"
#include "../MqThird/include/dispatcher.h"
#include "channel.hpp"
#include "worker.hpp"namespace mq {class Connection {private:// 对于该连接模块,其本质就是属于客户端模块,对于客户端而言,其实际是和信道直接关联的// 不过在该代码中弱化了客户端的概念// 获取一个消息,然后将消息加入到信道中void onBasicResponce(const muduo::net::TcpConnectionPtr& conn, const basicCommonResponcePtr& message, muduo::Timestamp) {// 首先要先获取信道Channel::ptr channel = _channels->get(message->cid());if (channel.get() == nullptr) {DLOG("没有找到对应的信道\n");return;}// 向响应队列中加入消息channel->putBasicResponce(message);}// 将获取的消息放入多线程中进行处理void onConsumeResponce(const muduo::net::TcpConnectionPtr& conn, const basicConsumeResponcePtr& message, muduo::Timestamp) {Channel::ptr channel = _channels->get(message->cid());if (channel.get() == nullptr) {DLOG("没有找到对应的信道\n");return;}// 然后将消息处理任务当道线程中_worker->pool.push([channel, message](){channel->consume(message);});            }void onUnknownMessage(const muduo::net::TcpConnectionPtr& conn, const MessagePtr& message, muduo::Timestamp) {LOG_INFO << "onUnknownMessage: " << message->GetTypeName();conn->shutdown();}void onConnection(const muduo::net::TcpConnectionPtr&conn){if (conn->connected()) {_latch.countDown();//唤醒主线程中的阻塞_conn = conn;}else {//连接关闭时的操作_conn.reset();}}    void connect() {_client.connect();_latch.wait();      //阻塞等待,直到连接建立成功}public:using ptr = std::shared_ptr<Connection>;Connection(const std::string &sip, int sport, const AsyncWorker::ptr& worker): _worker(worker),_latch(1), _client(worker->loopthread.startLoop(), muduo::net::InetAddress(sip, sport), "Client"),_dispatcher(std::bind(&Connection::onUnknownMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)),_codec(std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))),_channels(std::make_shared<ChannelManager>()){_dispatcher.registerMessageCallback<basicCommonResponce>(std::bind(&Connection::onBasicResponce, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<basicConsumeResponce>(std::bind(&Connection::onConsumeResponce, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_client.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codec.get(),std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_client.setConnectionCallback(std::bind(&Connection::onConnection, this, std::placeholders::_1)); // 构造的时候就直接开始连接this->connect();     }Channel::ptr openChannel() {Channel::ptr channel = _channels->create(_conn, _codec);bool ret = channel->openChannel();if (ret == false) {DLOG("创建信道失败\n");return Channel::ptr();}return channel;}void closeChannel(const Channel::ptr& channel) {channel->closeChannel();_channels->remove(channel->cid());}private:muduo::CountDownLatch _latch;                           //实现同步的muduo::net::TcpConnectionPtr _conn;                     //客户端对应的连接muduo::net::TcpClient _client;                          //客户端ProtobufDispatcher _dispatcher;                         //请求分发器ProtobufCodecPtr _codec;                                //协议处理器   AsyncWorker::ptr _worker;                               // 任务处理线程 & IO事件监控线程ChannelManager::ptr _channels;                          // 信道管理 };}#endif

客户端代码

        客户端代码主要用来实现测试我们的整体代码,因为我们代码没有直接的前端交互,所以只能修改客户端的代码来不断的测试我们的代码。如下代码中分别有着主题消息匹配代码,直接匹配发送,广播匹配代码,如下:

        consume_client.cc:

#include "connection.hpp"// 收到消息之后的回调函数
void cb(const mq::Channel::ptr& channel, const std::string& consumer_tag, const mq::BasicProperties* bp, const std::string& body) {std::cout << consumer_tag << " 得到消息: " << body << std::endl;channel->basicAck(bp->id());
}int main(int argc ,char* argv[]) {if (argc != 2) {DLOG("please input the two args: ./consume_client queue1\n");return -1;}// 创建连接以及信道mq::AsyncWorker::ptr awp = std::make_shared<mq::AsyncWorker>();mq::Connection::ptr conn = std::make_shared<mq::Connection>("127.0.0.1", 8085, awp);    mq::Channel::ptr channel = conn->openChannel();// 主题匹配接收google::protobuf::Map<std::string, std::string> google_tmp;// // 直接匹配接收// channel->declareExchange("exchange1", mq::ExchangeType::DIRECT, true, false, google_tmp1);// // 广播匹配接收// channel->declareExchange("exchange1", mq::ExchangeType::FANOUT, true, false, google_tmp1);channel->declareExchange("exchange1", mq::ExchangeType::TOPIC, true, false, google_tmp);channel->declareQueue("queue1", true, false, false, google_tmp);channel->declareQueue("queue2", true, false, false, google_tmp);channel->queueBind("exchange1", "queue1", "queue1");channel->queueBind("exchange1", "queue2", "news.music.#");auto callback = std::bind(cb, channel, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);channel->basicConsume("consumer1", argv[1], false, callback);// 主线程在这循环等待while (true)std::this_thread::sleep_for(std::chrono::seconds(3));conn->closeChannel(channel);}

        publish_client.cc:

#include "connection.hpp"int main() {mq::AsyncWorker::ptr awp = std::make_shared<mq::AsyncWorker>();mq::Connection::ptr conn = std::make_shared<mq::Connection>("127.0.0.1", 8085, awp);mq::Channel::ptr channel = conn->openChannel();google::protobuf::Map<std::string, std::string> google_tmp1;google::protobuf::Map<std::string, std::string> google_tmp2;google::protobuf::Map<std::string, std::string> google_tmp3;// 主题匹配发送channel->declareExchange("exchange1", mq::ExchangeType::TOPIC, true, false, google_tmp1);// // 直接匹配发送// channel->declareExchange("exchange1", mq::ExchangeType::DIRECT, true, false, google_tmp1);// // 广播匹配发送// channel->declareExchange("exchange1", mq::ExchangeType::FANOUT, true, false, google_tmp1);channel->declareQueue("queue1", true, false, false, google_tmp2);channel->declareQueue("queue2", true, false, false, google_tmp3);channel->queueBind("exchange1", "queue1", "queue1");channel->queueBind("exchange1", "queue2", "news.music.#");// 循环发送信息for (int i = 0; i < 10; i++) {mq::BasicProperties bp;bp.set_id(mq::UUIDHelper::uuid());bp.set_routing_key("news.music.pop");bp.set_delivery_mode(mq::DeliveryMode::DURABLE);channel->basicPublish("exchange1", &bp, "hello world-" + std::to_string(i));}mq::BasicProperties bp;bp.set_id(mq::UUIDHelper::uuid());bp.set_routing_key("news.music.classic");bp.set_delivery_mode(mq::DeliveryMode::DURABLE);channel->basicPublish("exchange1", &bp, "hello world-" + std::to_string(10));bp.set_routing_key("news.sport.football");channel->basicPublish("exchange1", &bp, "hello world-" + std::to_string(11));conn->closeChannel(channel);return 0;
}

        根据如上的消息发布代码,我们需要使用 consume_client.cc 代码生成两个消费客户端,分别是 queue1 消费者和 queue2 消费者。

测试

        对于如上代码的测试,本篇都是运行服务器代码,然后运行两个消费客户端代码,最后运行发布消息客户端代码,分别有如下三种测试:广播测试、直接匹配测试、主题匹配测试。

广播发送测试

        服务端:

        消息接收端 queue1:

        消息接收端 queue2:

        消息发布端:

直接发送测试

        服务端:

        接收消息客户端 queue1:

        接收消息客户端 queue2:

        发布消息客户端:

        因为匹配规则匹配不到,所以消息接收客户端都不会接收到消息

主题匹配测试

        服务端:

        接收客户端 queue1:

        接收客户端 queue2:

        消息发布客户端:

        如上,只有能匹配的 queue2 客户端接收到了消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/57542.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何开启华为交换机 http

系列文章目录 提示&#xff1a;这里可以添加系列文章的所有文章的目录&#xff0c;目录需要自己手动添加 例如&#xff1a;第一章 Python 机器学习入门之pandas的使用 提示&#xff1a;写完文章后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目…

腾讯 C++ 客户端一面,居然遇见了一道简单题。它怎么用go、C++解决

腾讯是目前全国最强的互联网公司之一&#xff0c;它有很好的福利尤其是能给应届生不错的工资待遇。 也正因如此&#xff0c;想进入腾讯工作的难度和竞争的激烈程度非常之大。 虽然感觉腾讯像是更看重个人综合能力的一家公司&#xff0c;算法题的好坏占面评比相对小些 但是竞争…

二、Linux 系统命令

一、系统命令 # 清屏 (Ctrl L) $ clear# 退出登录 $ exit # 历史命令 $ history $ history | grep java -jar 1. 系统信息 # 查看版本&#xff0c;当前操作系统发行版信息 $ cat /etc/redhat-release CentOS Linux release 7.2.1511 (Core) # 查看操作系统位数 $ getco…

【2022工业3D异常检测文献】Patch+FPFH: 结合3D手工点云描述符和颜色特征的异常检测方法

AN EMPIRICAL INVESTIGATION OF 3D ANOMALY DETECTION AND SEGMENTATION 1、Background PatchCore 方法&#xff1a; PatchCore是一种基于2D图像的异常检测方法&#xff0c;它使用预训练的深度学习模型&#xff08;如在ImageNet上预训练的模型&#xff09;来提取图像的局部特…

Memory Bus in SOC

在 SoC架构设计中&#xff0c;Memory Bus 是一个关键的组成部分&#xff0c;它负责连接 SoC 中的各个模块&#xff08;如 CPU、GPU、DMA、外设等&#xff09;与外部存储器&#xff08;如 DDR、NAND、Flash 等&#xff09;&#xff0c;起到连接处理器和存储器之间的桥梁作用&…

Qt优秀开源项目之二十四:EXCEL读写利器QXlsx

QXlsx是基于Qt5/Qt6的Excel文件&#xff08;*.xlsx&#xff09;的读写库。 github地址&#xff1a;https://github.com/QtExcel/QXlsx QXlsx既可以编译成库&#xff0c;也可以直接引用源码QXlsx-master\QXlsx\QXlsx.pri QXls提供了非常丰富的Examples&#xff0c;比如&#xff…

LED电子看板减少人工记录的错误

在当今快节奏的生产和管理环境中&#xff0c;准确性和效率是企业追求的关键目标。而传统的人工记录方式&#xff0c;常常因人为因素而出现各种错误&#xff0c;影响着企业的决策和运营。然而&#xff0c;随着科技的不断进步&#xff0c;LED 电子看板的出现为解决这一难题提供了…

无法获得下列许可 SOLIDWORKS Standard。 无法从使用许可服务器内读取数据,(-16,10009,10054)

无法获得下列许可 SOLIDWORKS Standard。 无法从使用许可服务器内读取数据&#xff0c;(-16,10009,10054) 错误如图 打开xxclean 扩展功能 服务无法启动

10.23Python_matplotlib_乱码问题

中英文问题解决方案 在使用 Matplotlib 绘图时&#xff0c;经常会出现中文字体显示问题。以下是一些解决方案&#xff1a; Windows 系统解决方案 在代码开始处添加以下代码&#xff0c;以支持中文显示&#xff1a; import matplotlib.pyplot as plt plt.rcParams[font.sans…

联想与Meta合作基于Llama大模型推出面向PC的个人AI智能体——AI Now | LeetTalk Daily...

“LeetTalk Daily”&#xff0c;每日科技前沿&#xff0c;由LeetTools AI精心筛选&#xff0c;为您带来最新鲜、最具洞察力的科技新闻。 联想集团昨日在美国西雅图召开年度Tech World大会。联想CEO杨元庆在主题演讲中&#xff0c;与Meta创始人兼CEO马克扎克伯格一道宣布&#x…

《15分钟轻松学Go》教程目录

在AI快速发展的时代&#xff0c;学习Go语言依然很有用。Go语言擅长处理高并发任务&#xff0c;也就是说可以同时处理很多请求&#xff0c;这对于需要快速响应的AI服务非常重要。另外&#xff0c;Go适合用来处理和传输大量数据&#xff0c;非常适合机器学习模型的数据预处理。 …

C++笔记---哈希表

1. 哈希的概念 哈希(hash)又称散列&#xff0c;是一种组织数据的方式。从译名来看&#xff0c;有散乱排列的意思。 本质就是通过哈希函数把关键字Key跟存储位置建立一个映射关系&#xff0c;查找时通过这个哈希函数计算出Key存储的位置&#xff0c;进行快速查找。 STL中的un…

【Python数据库操作】使用SQLite和MySQL进行数据存储和查询!

【Python数据库操作】使用SQLite和MySQL进行数据存储和查询&#xff01; 在现代应用程序中&#xff0c;数据存储与管理是至关重要的。Python为开发者提供了多种与数据库进行交互的方式&#xff0c;其中SQLite和MySQL是最常用的两种数据库。本文将深入探讨如何使用Python进行SQ…

No.20 笔记 | WEB安全 - 任意文件操作详解 part 2

一、文件后缀名验证 &#xff08;一&#xff09;验证方式分类 基于白名单验证&#xff1a;只允许上传白名单中指定后缀名的文件。基于黑名单验证&#xff1a;只允许上传黑名单中未包含后缀名的文件。 &#xff08;二&#xff09;实验准备 修改 Apache 的 httpd - conf 文件…

uni-app写的微信小程序如何体积太大如何处理

方法一&#xff1a;对主包进行分包处理&#xff0c;将使用url: /pages/components/equipment/equipment跳转页面的全部拆分为分包&#xff0c;如url: /pagesS/components/equipment/equipment 在pages.json中添加 "subPackages": [{ "root"…

2024年五一杯数学建模C题煤矿深部开采冲击地压危险预测求解全过程论文及程序

2024年五一杯数学建模 C题 煤矿深部开采冲击地压危险预测 原题再现&#xff1a; “煤炭是中国的主要能源和重要的工业原料。然而&#xff0c;随着开采深度的增加&#xff0c;地应力增大&#xff0c;井下煤岩动力灾害风险越来越大&#xff0c;严重影响着煤矿的安全高效开采。在…

transient关键字详解

今天没打算写blog&#xff0c;在看一篇关于多线程环境下SimpleDateFormat线程不安全的问题&#xff0c;一般我们都知道多线程下这个是不安全&#xff0c;但是为什么不安全不太清楚&#xff0c;我在看的这篇文章讲的比较透彻&#xff0c;我根据文章中讲结合SimpleDateFormat源码…

[Unity Demo]从零开始制作空洞骑士Hollow Knight第十五集:制作更多地图,更多敌人,更多可交互对象

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、第一个代表性场景 1.制作更多敌人2.制作更多可交互对象二、第二个代表性场景 1.制作更多敌人2.制作更多可交互对象三、第三个代表性场景 1.制作更多敌人2.制…

苹果手机照片误删还能恢复吗?3款数据恢复工具推荐

照片是人们记录生活点滴与美好回忆的重要方式之一。而苹果手机则具备了较强的拍照功能&#xff0c;深受市场欢迎&#xff0c;但其也存在误删照片并难以恢复的难题。现在市面上也又很多照片恢复软件&#xff0c;其功能参差不齐。今天&#xff0c;小编为您找到了3款高效且可靠的苹…

猫咪掉毛上岸!一招解决清理难题——好用的宠物空气净化器

养宠前就知道猫咪有换毛季&#xff0c;我了解的是一年有两次&#xff0c;养宠后才知道&#xff0c;一次是半年...秋天风大的时候更加严重&#xff0c;直接就是一只“蒲公英”&#xff0c;随时散落一地的猫毛。早晚给它梳毛&#xff0c;每次都能收获巨大一张猫饼。 家里的地板上…