仿 RabbitMQ 实现的简易消息队列

文章目录

  • 项目介绍
  • 开放环境
  • 第三⽅库介绍
    • Protobuf
    • Muduo库
  • 需求分析
    • 核⼼概念
    • 实现内容
  • 消息队列系统整体框架
    • 服务端模块
      • 数据管理模块
      • 虚拟机数据管理模块
      • 交换路由模块
      • 消费者管理模块
      • 信道(通信通道)管理模块
      • 连接管理模块
    • 客户端模块
  • 公共模块
    • 日志类
    • 其他工具类
      • SQLite基础操作
      • 字符串操作
      • 唯一UUID生成
      • 文件基础操作
    • 消息类型和交换机类型的定义
  • 服务端
    • 持久化数据管理中⼼模块
      • 交换机数据管理
      • 队列数据管理模块
      • 绑定数据管理模块
      • 消息数据管理模块
    • 虚拟机管理模块
    • 交换路由模块
    • 信道管理模块
    • 连接管理模块
    • 消费者管理模块
    • ⽹络通信协议设计
    • Server服务器模块
  • 客户端
    • 订阅者模块
    • 信道管理模块
    • 连接管理模块
    • 异步线程池模块
  • 源码

项目介绍

曾经我们学习过阻塞队列(BlockingQueue)。当时我们说阻塞队列最⼤的⽤途,就是⽤来实现⽣产者
消费者模型。

优势:

  • 解耦合
  • ⽀持并发
  • ⽀持忙闲不均
  • 削峰填⾕

在实际的后端开发中,尤其是分布式系统⾥,跨主机之间使⽤⽣产者消费者模型,也是⾮常普遍的需求。
因此,我们通常会把阻塞队列封装成⼀个独⽴的服务器程序,并且赋予其更丰富的功能。这样的服务程
序我们就称为消息队列(MessageQueue,MQ)。

RabbitMQ是⼀个⾮常知名、功能强⼤且⼴泛使⽤的消息队列;

所以我们仿照其模拟实现一个简易的消息队列

开放环境

  • Linux(Ubuntu-22.04)
  • VSCode
  • g++/gdb
  • Makefile
  1. 使用语言C++;
  2. 序列化框架:Protobuf⼆进制序列化
  3. ⽹络通信:muduo库 + 自定义应用层协议
  4. 测试框架:Gtest
  5. 源数据信息数据库:SQLite3

第三⽅库介绍

该项目所要用到的第三方库

Protobuf

序列化&反序列化框架

特点:

  • 语⾔⽆关、平台⽆关:即ProtoBuf⽀持Java、C++、Python等多种语⾔,⽀持多个平台
  • ⾼效:即⽐XML更⼩、更快、更为简单
  • 扩展性、兼容性好:你可以更新数据结构,⽽不影响和破坏原有的旧程序

使用步骤

  1. 编写proto文件
    描述想定义的结构化对象
    对象有什么成员,每个成员有怎么样的属性
    在这里插入图片描述
  2. 编译proto文件
protoc --cpp_out=. xxx.proto

编译后会生成一系列接口代码

.cc 源文件:定义实现了结构化对象数据的访问、操作、序列化、反序列化接口.h头文件:定义了描述的数据结构对象类

根据我们的对象描述生成在.h中的代码
在这里插入图片描述

  1. 把编译生成的头文件包含进我们的代码中

在这里插入图片描述

Muduo库

Muduo由陈硕⼤佬开发,是⼀个基于⾮阻塞IO事件驱动的C++⾼并发TCP⽹络编程库。它是⼀款基于主从Reactor模型的⽹络库,其使⽤的线程模型是oneloopperthread,所谓oneloopperthread指的是

  • ⼀个线程只能有⼀个事件循环(EventLoop),⽤于响应计时器和IO事件
  • ⼀个⽂件描述符只能由⼀个线程进⾏读写,换句话说就是⼀个TCP连接必须归属于某个EventLoop
    管理

在这里插入图片描述

需求分析

核⼼概念

实现内容

  • broke服务器:消息队列服务器;
  • 消息发布客户端:向服务器发布消息;
  • 消息订阅客户端:从服务器订阅消息;

在这里插入图片描述

在这里插入图片描述

消息队列系统整体框架

在这里插入图片描述

服务端模块

数据管理模块

在这里插入图片描述

虚拟机数据管理模块

在这里插入图片描述

交换路由模块

在这里插入图片描述

消费者管理模块

在这里插入图片描述

信道(通信通道)管理模块

在这里插入图片描述

连接管理模块

在这里插入图片描述

客户端模块

在这里插入图片描述
上述整体思维导图
在这里插入图片描述

整个项目分为三大模块:mqcommon公共模块,mqserver消息队列服务端模块、
mqclient客户端模块。

公共模块

我们将常用的一些工具封装起来,放在mqcommon目录下

日志类

编写一个日志工具模块mq_logger.hpp,进行日志打印

#ifndef __M_LOG_H__
#define __M_LOG_H__
#include <iostream>
#include <ctime>#define DBG_LEVEL 0
#define INF_LEVEL 1
#define ERR_LEVEL 2
#define DEFAULT_LEVEL DBG_LEVEL
#define LOG(lev_str, level, format, ...) {\if (level >= DEFAULT_LEVEL) {\time_t t = time(nullptr);\struct tm* ptm = localtime(&t);\char time_str[32];\strftime(time_str, 31, "%H:%M:%S", ptm);\printf("[%s][%s][%s:%d]\t" format "\n", lev_str, time_str, __FILE__, __LINE__, ##__VA_ARGS__);\}\
}#define DLOG(format, ...) LOG("DBG", DBG_LEVEL, format, ##__VA_ARGS__)
#define ILOG(format, ...) LOG("INF", INF_LEVEL, format, ##__VA_ARGS__)
#define ELOG(format, ...) LOG("ERR", ERR_LEVEL, format, ##__VA_ARGS__)
#endif

其他工具类

mq_helper.hpp下主要实现了

  • SQLite基础操作
  • 字符串操作
  • 唯一UUID生成
  • 文件基础操作

SQLite基础操作

  • 创建/打开数据库文件
  • 执行 SQL 语句
  • 关闭数据库
class SqliteHelper
{
public:typedef int (*SqliteCallback)(void *, int, char **, char **);SqliteHelper(const std::string &dbfile) : _dbfile(dbfile), _handler(nullptr) {}bool open(int safe_leve = SQLITE_OPEN_FULLMUTEX){// int sqlite3_open_v2(const char *filename, sqlite3 **ppDb, int flags, const char *zVfs );int ret = sqlite3_open_v2(_dbfile.c_str(), &_handler, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | safe_leve, nullptr);if (ret != SQLITE_OK){ELOG("创建/打开sqlite数据库失败: %s", sqlite3_errmsg(_handler));return false;}return true;}bool exec(const std::string &sql, SqliteCallback cb, void *arg){// int sqlite3_exec(sqlite3*, char *sql, int (*callback)(void*,int,char**,char**), void* arg, char **err)int ret = sqlite3_exec(_handler, sql.c_str(), cb, arg, nullptr);if (ret != SQLITE_OK){ELOG("%s \n语句执行失败: %s", sql.c_str(), sqlite3_errmsg(_handler));return false;}return true;}void close(){if (_handler)sqlite3_close_v2(_handler);}private:std::string _dbfile;sqlite3 *_handler;
};

字符串操作

路由匹配中。交换机需要根据消息的binding_key和队列的routing_key进行匹配,但这些都是由一些特殊符号分开的,所以我需要取出来

class StrHelper
{
public:static size_t split(const std::string &str, const std::string &sep, std::vector<std::string> &result){size_t pos, idx = 0;while (idx < str.size()){pos = str.find(sep, idx);if (pos == std::string::npos){result.push_back(str.substr(idx));return result.size();}if (pos == idx){idx = pos + sep.size();continue;}result.push_back(str.substr(idx, pos - idx));idx = pos + sep.size();}return result.size();}
};

唯一UUID生成

对于每个消息,都需要生成唯一的ID进行标识
我们使用8个随机数字 + 8字节序号,共16字节数组 生成32位16进制字符

class UUIDHelper
{
public:static std::string uuid(){std::random_device rd;std::mt19937_64 gernator(rd());std::uniform_int_distribution<int> distribution(0, 255);std::stringstream ss;for (int i = 0; i < 8; i++){ss << std::setw(2) << std::setfill('0') << std::hex << distribution(gernator);if (i == 3 || i == 5 || i == 7){ss << "-";}}static std::atomic<size_t> seq(1);size_t num = seq.fetch_add(1);for (int i = 7; i >= 0; i--){ss << std::setw(2) << std::setfill('0') << std::hex << ((num >> (i * 8)) & 0xff);if (i == 6)ss << "-";}return ss.str();}
};

文件基础操作

包括文件的创建、删除、读、写;

class FileHelper
{
public:FileHelper(const std::string &filename) : _filename(filename) {}bool exists(){struct stat st;return (stat(_filename.c_str(), &st) == 0);}size_t size(){struct stat st;int ret = stat(_filename.c_str(), &st);if (ret < 0){return 0;}return st.st_size;}bool read(char *body, size_t offset, size_t len){// 1. 打开文件std::ifstream ifs(_filename, std::ios::binary | std::ios::in);if (ifs.is_open() == false){ELOG("%s 文件打开失败!", _filename.c_str());return false;}// 2. 跳转文件读写位置ifs.seekg(offset, std::ios::beg);// 3. 读取文件数据ifs.read(body, len);if (ifs.good() == false){ELOG("%s 文件读取数据失败!!", _filename.c_str());ifs.close();return false;}// 4. 关闭文件ifs.close();return true;}bool read(std::string &body){// 获取文件大小,根据文件大小调整body的空间size_t fsize = this->size();body.resize(fsize);return read(&body[0], 0, fsize);}bool write(const char *body, size_t offset, size_t len){// 1. 打开文件std::fstream fs(_filename, std::ios::binary | std::ios::in | std::ios::out);if (fs.is_open() == false){ELOG("%s 文件打开失败!", _filename.c_str());return false;}// 2. 跳转到文件指定位置fs.seekp(offset, std::ios::beg);// 3. 写入数据fs.write(body, len);if (fs.good() == false){ELOG("%s 文件写入数据失败!!", _filename.c_str());fs.close();return false;}// 4. 关闭文件fs.close();return true;}bool write(const std::string &body){return write(body.c_str(), 0, body.size());}bool rename(const std::string &nname){return (::rename(_filename.c_str(), nname.c_str()) == 0);}static std::string parentDirectory(const std::string &filename){// /aaa/bb/ccc/ddd/test.txtsize_t pos = filename.find_last_of("/");if (pos == std::string::npos){// test.txtreturn "./";}std::string path = filename.substr(0, pos);return path;}static bool createFile(const std::string &filename){std::fstream ofs(filename, std::ios::binary | std::ios::out);if (ofs.is_open() == false){ELOG("%s 文件打开失败!", filename.c_str());return false;}ofs.close();return true;}static bool removeFile(const std::string &filename){return (::remove(filename.c_str()) == 0);}static bool createDirectory(const std::string &path){//  aaa/bbb/ccc    cccc// 在多级路径创建中,我们需要从第一个父级目录开始创建size_t pos, idx = 0;while (idx < path.size()){pos = path.find("/", idx);if (pos == std::string::npos){return (mkdir(path.c_str(), 0775) == 0);}std::string subpath = path.substr(0, pos);int ret = mkdir(subpath.c_str(), 0775);if (ret != 0 && errno != EEXIST){ELOG("创建目录 %s 失败: %s", subpath.c_str(), strerror(errno));return false;}idx = pos + 1;}return true;}static bool removeDirectory(const std::string &path){// rm -rf path// system()std::string cmd = "rm -rf " + path;return (system(cmd.c_str()) != -1);}private:std::string _filename;
};

消息类型和交换机类型的定义

消息是需要进行持久化存储的,因此涉及到数据的序列化和反序列化,此处我们使用protobuf来生成;
消息本身要素

  • 消息属性:消息ID,消息投递方式,消息的routing_key
  • 消息有效载荷内容

消息额外内容

  • 消息的存储位置
  • 消息的长度
  • 消息是否有效

交换机类型

  • direct
  • fanout
  • topic

消息投递模式

  • undurable
  • durable

服务端

持久化数据管理中⼼模块

在数据管理模块中管理交换机,队列,队列绑定,消息等部分数据数据

交换机数据管理

  1. 定义交换机类

交换机类需要管理的数据

    // 1. 交换机名称std::string name;// 2. 交换机类型ExchangeType type;// 3. 交换机持久化标志bool durable;// 4. 是否自动删除标志bool auto_delete;// 5. 其他参数std::unordered_map<std::string, std::string> args;
// 1. 定义交换机类struct Exchange{using ptr = std::shared_ptr<Exchange>;// 1. 交换机名称std::string name;// 2. 交换机类型ExchangeType type;// 3. 交换机持久化标志bool durable;// 4. 是否自动删除标志bool auto_delete;// 5. 其他参数std::unordered_map<std::string, std::string> args;Exchange() {}// 构造函数,初始化交换机的各个属性Exchange(const std::string &ename, ExchangeType etype,bool edurable, bool eauto_delete, std::unordered_map<std::string, std::string> &eargs): name(ename), type(etype), durable(edurable),auto_delete(eauto_delete), args(eargs) {}// 解析字符串形式的参数,并将其存储到args中void setArgs(const std::string &str_args){// key=val&key=val&std::vector<std::string> sub_args;StrHelper::split(str_args, "&", sub_args);// 解析每个键值对for (const auto &sub_arg : sub_args){size_t pos = sub_arg.find('=');if (pos != std::string::npos){std::string key = sub_arg.substr(0, pos);std::string value = sub_arg.substr(pos + 1);args[key] = value;}}}// 将args中的内容序列化为字符串并返回std::string getArgs(){std::string result;for (const auto &pair : args){result += pair.first + "=" + pair.second + "&"; // 拼接键值对}return result;}};
  1. 定义交换机数据持久化管理类–数据存储在sqlite数据库中

通过在数据库中操作数据表的方式,来管理这些持久化的交换机

// 2. 定义交换机数据持久化管理类--数据存储在sqlite数据库中using ExchangeMap = std::unordered_map<std::string, Exchange::ptr>;class ExchangeMapper{public:// 构造函数,初始化数据库文件路径ExchangeMapper(const std::string &dbfile): _sql_helper(dbfile){std::string path = FileHelper::parentDirectory(dbfile);FileHelper::createDirectory(path);assert(_sql_helper.open());createTable();}// 创建数据库表void createTable(){
#define CREATE_TABLE "create table if not exists exchange_table(\name varchar(32) primary key, \type int, \durable int, \auto_delete int, \args varchar(128));"bool ret = _sql_helper.exec(CREATE_TABLE, nullptr, nullptr);if (ret == false){DLOG("创建交换机数据库表失败!!");abort(); // 直接异常退出程序}}// 删除数据库表void removeTable(){
#define DROP_TABLE "drop table if exists exchange_table;"bool ret = _sql_helper.exec(DROP_TABLE, nullptr, nullptr);if (ret == false){DLOG("删除交换机数据库表失败!!");abort(); // 直接异常退出程序}}// 插入一个交换机到数据库bool insert(Exchange::ptr &exp){std::stringstream ss;ss << "insert into exchange_table values(";ss << "'" << exp->name << "', ";ss << exp->type << ", ";ss << exp->durable << ", ";ss << exp->auto_delete << ", ";ss << "'" << exp->getArgs() << "');";return _sql_helper.exec(ss.str(), nullptr, nullptr);}// 从数据库中删除指定名称的交换机void remove(const std::string &name){std::stringstream ss;ss << "delete from exchange_table where name=";ss << "'" << name << "';";_sql_helper.exec(ss.str(), nullptr, nullptr);}// using ExchangeMap = std::unordered_map<std::string, Exchange::ptr>;ExchangeMap recovery(){ExchangeMap result;std::string sql = "select name, type, durable, auto_delete, args from exchange_table;";// 调用 _sql_helper 的 exec 方法执行 SQL 查询// selectCallback 是回调函数,用于处理查询结果// &result 是传递给回调函数的参数,用于存储查询结果_sql_helper.exec(sql, selectCallback, &result);return result;}private:// 静态回调函数,用于处理 SQL 查询的每一行结果// arg: 传递给回调函数的参数,这里是一个 ExchangeMap 指针,用于存储查询结果// numcol: 当前行的列数// row: 当前行的数据,是一个字符串数组,每一列对应一个字符串// fields: 列名数组(未使用)static int selectCallback(void *arg, int numcol, char **row, char **fields){// 将 arg 转换为 ExchangeMap 指针ExchangeMap *result = (ExchangeMap *)arg;// 创建一个新的 Exchange 对象,使用智能指针管理内存auto exp = std::make_shared<Exchange>();exp->name = row[0];exp->type = (zwbMQ::ExchangeType)std::stoi(row[1]);exp->durable = (bool)std::stoi(row[2]);exp->auto_delete = (bool)std::stoi(row[3]);// 如果第五列(args)不为空,则调用 setArgs 方法设置交换机的参数if (row[4])exp->setArgs(row[4]);// 将 Exchange 对象插入到 result 中,以交换机的名称作为键// (*result)[exp->name] = exp;result->insert(std::make_pair(exp->name, exp));return 0;}private:SqliteHelper _sql_helper;};
  1. 定义交换机数据内存管理类

对外提供的交换机管理接口类,用户可以使用这里的接口管理交换机

// 3. 定义交换机数据内存管理类
class ExchangeManager
{
public:using ptr = std::shared_ptr<ExchangeManager>;// 构造函数,初始化数据库文件路径ExchangeManager(const std::string &dbfile): _mapper(dbfile){// 初始化时从数据库加载所有交换机数据到内存_exchanges = _mapper.recovery();}// 声明一个交换机bool declareExchange(const std::string &name,ExchangeType type, bool durable, bool auto_delete,std::unordered_map<std::string, std::string> &args){std::unique_lock<std::mutex> lock(_mutex); // 加锁,保证线程安全// 检查交换机是否已存在if (_exchanges.find(name) != _exchanges.end()){return true; // 如果已存在,返回 true}// 创建新的交换机对象auto exp = std::make_shared<Exchange>(name, type, durable, auto_delete, args);if (durable == true){bool ret = _mapper.insert(exp);if (ret == false)return false;}// 将交换机插入到内存中_exchanges.insert(std::make_pair(name, exp));return true;}// 删除指定名称的交换机void deleteExchange(const std::string &name){std::lock_guard<std::mutex> lock(_mutex); // 加锁,保证线程安全// 检查交换机是否已存在if (_exchanges.find(name) == _exchanges.end()){return;}// 从数据库中删除交换机if (_exchanges.find(name)->second->durable == true)_mapper.remove(name);// 从内存中删除交换机_exchanges.erase(name);}// 获取指定名称的交换机对象Exchange::ptr selectExchange(const std::string &name){std::lock_guard<std::mutex> lock(_mutex); // 加锁,保证线程安全// 检查交换机是否已存在if (_exchanges.find(name) == _exchanges.end()){return Exchange::ptr();}return _exchanges.find(name)->second;}// 判断指定名称的交换机是否存在bool exists(const std::string &name){std::lock_guard<std::mutex> lock(_mutex); // 加锁,保证线程安全// 检查交换机是否已存在if (_exchanges.find(name) == _exchanges.end()){return false;}return true;}// 获取当前交换机的数量size_t size(){std::unique_lock<std::mutex> lock(_mutex);return _exchanges.size();}// 清理所有交换机数据void clear(){std::lock_guard<std::mutex> lock(_mutex); // 加锁,保证线程安全_mapper.removeTable();_exchanges.clear();}private:// 互斥锁,用于线程安全std::mutex _mutex;// 交换机数据持久化管理对象ExchangeMapper _mapper;// 内存中存储的交换机数据std::unordered_map<std::string, Exchange::ptr> _exchanges;
};

队列数据管理模块

  1. 定义队列结构体

队列要管理的主要数据

    std::string name;                                  // 队列名称bool durable;                                      // 是否持久化bool exclusive;                                    // 是否独占bool auto_delete;                                  // 是否自动删除std::unordered_map<std::string, std::string> args; // 其他参数
 // 定义队列结构体
struct MsgQueue
{using ptr = std::shared_ptr<MsgQueue>;  // 定义智能指针别名,方便使用 shared_ptr 管理 MsgQueue 对象std::string name;                                  // 队列名称bool durable;                                      // 是否持久化bool exclusive;                                    // 是否独占bool auto_delete;                                  // 是否自动删除std::unordered_map<std::string, std::string> args; // 其他参数MsgQueue() {}MsgQueue(const std::string &qname, bool qdurable, bool qexclusive, bool qauto_delete, std::unordered_map<std::string, std::string> qargs): name(qname), durable(qdurable), exclusive(qexclusive), auto_delete(qauto_delete), args(qargs){}// 解析字符串形式的参数,并将其存储到args中void setArgs(const std::string &str_args){// key=val&key=val&std::vector<std::string> sub_args;StrHelper::split(str_args, "&", sub_args);// 解析每个键值对for (const auto &sub_arg : sub_args){size_t pos = sub_arg.find('=');if (pos != std::string::npos){std::string key = sub_arg.substr(0, pos);std::string value = sub_arg.substr(pos + 1);args[key] = value;}}}// 将args中的内容序列化为字符串并返回std::string getArgs(){std::string result;for (const auto &pair : args){result += pair.first + "=" + pair.second + "&"; // 拼接键值对}return result;}
};
  1. 定义队列数据持久化管理类–数据存储在sqlite数据库中

通过在数据库中操作数据表的方式,来管理这些持久化的队列

// 2. 定义队列数据持久化管理类--数据存储在sqlite数据库中
class MsqQueueMapper
{
public:// 构造函数,接受一个数据库文件路径作为参数MsqQueueMapper(const std::string &dbfile): _sql_helper(dbfile) // 初始化 SQLite 帮助类{// 获取数据库文件的父目录路径std::string path = FileHelper::parentDirectory(dbfile);// 创建目录(如果不存在)FileHelper::createDirectory(path);// 打开数据库连接,并断言连接成功assert(_sql_helper.open());// 创建消息队列表createTable();}// 定义智能指针别名,方便使用 shared_ptr 管理 MsgQueue 对象using ptr = std::shared_ptr<MsgQueue>;// 队列名称std::string name;// 是否持久化bool durable;// 是否独占bool exclusive;// 是否自动删除bool auto_delete;// 其他参数std::unordered_map<std::string, std::string> args;// 创建消息队列表void createTable(){std::stringstream sql;sql << "create table if not exists queue_table(";sql << "name varchar(32) primary key,"; sql << "durable int,";                  sql << "exclusive int,";              sql << "auto_delete int,";           sql << "args varchar(128));";       assert(_sql_helper.exec(sql.str(), nullptr, nullptr));}// 删除消息队列表void removeTable(){std::string sql = "drop table if exists queue_table";assert(_sql_helper.exec(sql, nullptr, nullptr));}// 插入一条消息队列记录bool insert(MsgQueue::ptr &queue){std::stringstream sql;sql << "insert into queue_table values(";sql << "'" << queue->name << "',";       sql << queue->durable << ",";         sql << queue->exclusive << ",";        sql << queue->auto_delete << ",";       sql << "'" << queue->getArgs() << "');";return _sql_helper.exec(sql.str(), nullptr, nullptr);}// 删除指定名称的消息队列记录bool remove(const std::string &name){std::stringstream sql;sql << "delete from queue_table where name=";sql << "'" << name << "';";return _sql_helper.exec(sql.str(), nullptr, nullptr);}// 定义队列映射类型,键为队列名称,值为 MsgQueue 智能指针using QueueMap = std::unordered_map<std::string, MsgQueue::ptr>;// 从数据库中恢复所有消息队列记录QueueMap recovery(){QueueMap result;std::string sql = "select name, durable, exclusive, auto_delete, args from queue_table;";_sql_helper.exec(sql, selectCallback, &result);return result;}private:// 静态回调函数,用于处理 SQL 查询的每一行结果// arg: 传递给回调函数的参数,这里是一个 QueueMap 指针,用于存储查询结果// numcol: 当前行的列数// row: 当前行的数据,是一个字符串数组,每一列对应一个字符串// fields: 列名数组(未使用)static int selectCallback(void *arg, int numcol, char **row, char **fields){// 将 arg 转换为 QueueMap 指针QueueMap *result = (QueueMap *)arg;// 创建一个新的 MsgQueue 对象,使用智能指针管理内存MsgQueue::ptr mqp = std::make_shared<MsgQueue>();   mqp->name = row[0];                         // 队列名称mqp->durable = (bool)std::stoi(row[1]);     // 是否持久化mqp->exclusive = (bool)std::stoi(row[2]);   // 是否独占mqp->auto_delete = (bool)std::stoi(row[3]); // 是否自动删除if (row[4])mqp->setArgs(row[4]); // 其他参数// 将队列对象插入到结果映射中result->insert(std::make_pair(mqp->name, mqp));return 0;}SqliteHelper _sql_helper;
};
  1. 定义队列数据内存管理类

对外提供的队列管理接口类,用户可以使用这里的接口管理队列

// 3. 定义交换机数据内存管理类
// 3. 定义队列数据内存管理类class MsgQueueManager
{
public:using ptr = std::shared_ptr<MsgQueueManager>;// 构造函数,初始化 MsgQueueManager 对象// 参数 dbfile 是用于持久化消息队列的数据库文件路径MsgQueueManager(const std::string &dbfile): _mapper(dbfile) // 初始化 _mapper 对象,用于数据库操作{// 从数据库中恢复消息队列数据_msg_queues = _mapper.recovery();}// 声明一个新的消息队列// 返回 true 表示队列声明成功,false 表示失败bool declareQueue(const std::string &qname, bool qdurable, bool qexclusive,bool qauto_delete, const google::protobuf::Map<std::string, std::string> qargs){std::unique_lock<std::mutex> lock(_mutex); // 加锁,确保线程安全auto it = _msg_queues.find(qname);         // 查找队列是否已存在if (it != _msg_queues.end()){return true; // 如果队列已存在,直接返回 true}MsgQueue::ptr mqp = std::make_shared<MsgQueue>(); // 创建一个新的消息队列对象mqp->name = qname;                                // 设置队列名称mqp->durable = qdurable;                          // 设置队列是否持久化mqp->exclusive = qexclusive;                      // 设置队列是否独占mqp->auto_delete = qauto_delete;                  // 设置队列是否自动删除mqp->args = qargs;                                // 设置队列的其他参数if (qdurable == true){bool ret = _mapper.insert(mqp); // 如果队列是持久化的,将其插入数据库if (ret == false)return false; // 插入失败,返回 false}_msg_queues.insert(std::make_pair(qname, mqp)); // 将队列插入到内存中的队列映射中return true;                                    // 返回 true 表示队列声明成功}// 删除指定名称的消息队列// 参数 name 是要删除的队列名称void deleteQueue(const std::string &name){std::unique_lock<std::mutex> lock(_mutex); // 加锁,确保线程安全auto it = _msg_queues.find(name);          // 查找队列是否存在if (it == _msg_queues.end()){return; // 如果队列不存在,直接返回}if (it->second->durable == true)_mapper.remove(name); // 如果队列是持久化的,从数据库中删除_msg_queues.erase(name);  // 从内存中的队列映射中删除队列}// 根据名称查找并返回消息队列// 参数 name 是要查找的队列名称// 返回指向消息队列的智能指针,如果队列不存在,返回空的智能指针MsgQueue::ptr selectQueue(const std::string &name){std::unique_lock<std::mutex> lock(_mutex); // 加锁,确保线程安全auto it = _msg_queues.find(name);          // 查找队列是否存在if (it == _msg_queues.end()){return MsgQueue::ptr(); // 如果队列不存在,返回空的智能指针}return it->second; // 返回找到的队列}// 返回所有消息队列的映射// 返回一个包含所有消息队列的映射QueueMap allQueue(){std::unique_lock<std::mutex> lock(_mutex); // 加锁,确保线程安全return _msg_queues;                        // 返回内存中的队列映射}// 检查指定名称的消息队列是否存在// 参数 name 是要检查的队列名称// 返回 true 表示队列存在,false 表示队列不存在bool exists(const std::string &name){std::unique_lock<std::mutex> lock(_mutex); // 加锁,确保线程安全auto it = _msg_queues.find(name);          // 查找队列是否存在if (it == _msg_queues.end()){return false; // 如果队列不存在,返回 false}return true; // 返回 true 表示队列存在}// 返回当前消息队列的数量// 返回消息队列的数量size_t size(){std::unique_lock<std::mutex> lock(_mutex); // 加锁,确保线程安全return _msg_queues.size();                 // 返回队列映射的大小}// 清空所有消息队列// 删除数据库中的表,并清空内存中的队列映射void clear(){_mapper.removeTable(); // 删除数据库中的表_msg_queues.clear();   // 清空内存中的队列映射}private:std::mutex _mutex;      // 互斥锁,用于确保线程安全MsgQueueMapper _mapper; // 用于数据库操作的映射器对象QueueMap _msg_queues;   // 内存中的消息队列映射
};

绑定数据管理模块

实现了一个消息队列系统中的绑定管理功能

将消息队列(Queue)与交换机(Exchange)通过绑定键(Binding Key)关联起来

  1. Binding 结构体,表示绑定信息
// 定义Binding结构体,表示绑定信息
struct Binding
{using ptr = std::shared_ptr<Binding>; // 使用智能指针管理Binding对象std::string exchange_name;            // 交换机名称std::string msgqueue_name;            // 消息队列名称std::string binding_key;              // 绑定键Binding() {}Binding(const std::string &ename, const std::string &qname, const std::string &key): exchange_name(ename), msgqueue_name(qname), binding_key(key) {}
};
  1. BindingMapper 类,负责管理与数据库的交互,包括创建表、插入、删除和恢复绑定信息

数据库建表进行管理

// 定义队列名与绑定信息的映射关系
using MsgQueueBindingMap = std::unordered_map<std::string, Binding::ptr>;
每个队列自己的绑定信息。绑定在哪个交换机、binding_key是什么?

// 定义交换机名称与队列绑定信息的映射关系
using BindingMap = std::unordered_map<std::string, MsgQueueBindingMap>;
MsgQueuesBindingMap 和交换机名构建集合,每个交换机都对应着一个队列集合

// 定义队列名与绑定信息的映射关系
using MsgQueueBindingMap = std::unordered_map<std::string, Binding::ptr>;
// 定义交换机名称与队列绑定信息的映射关系
using BindingMap = std::unordered_map<std::string, MsgQueueBindingMap>;
// BindingMapper类负责管理与数据库的交互,包括创建表、插入、删除和恢复绑定信息
class BindingMapper
{
public:BindingMapper(const std::string &dbfile) : _sql_helper(dbfile){std::string path = FileHelper::parentDirectory(dbfile);FileHelper::createDirectory(path);_sql_helper.open();createTable();}// 创建绑定信息表void createTable(){std::stringstream sql;sql << "create table if not exists binding_table(";sql << "exchange_name varchar(32), ";sql << "msgqueue_name varchar(32), ";sql << "binding_key varchar(128));";assert(_sql_helper.exec(sql.str(), nullptr, nullptr));}// 删除绑定信息表void removeTable(){std::string sql = "drop table if exists binding_table;";_sql_helper.exec(sql, nullptr, nullptr);}// 插入绑定信息bool insert(Binding::ptr &binding){std::stringstream sql;sql << "insert into binding_table values(";sql << "'" << binding->exchange_name << "', ";sql << "'" << binding->msgqueue_name << "', ";sql << "'" << binding->binding_key << "');";return _sql_helper.exec(sql.str(), nullptr, nullptr);}// 删除指定交换机和队列的绑定信息void remove(const std::string &ename, const std::string &qname){std::stringstream sql;sql << "delete from binding_table where ";sql << "exchange_name='" << ename << "' and ";sql << "msgqueue_name='" << qname << "';";_sql_helper.exec(sql.str(), nullptr, nullptr);}// 删除指定交换机的所有绑定信息void removeExchangeBindings(const std::string &ename){std::stringstream sql;sql << "delete from binding_table where ";sql << "exchange_name='" << ename << "';";_sql_helper.exec(sql.str(), nullptr, nullptr);}// 删除指定队列的所有绑定信息void removeMsgQueueBindings(const std::string &qname){std::stringstream sql;sql << "delete from binding_table where ";sql << "msgqueue_name='" << qname << "';";_sql_helper.exec(sql.str(), nullptr, nullptr);}// 从数据库中恢复绑定信息BindingMap recovery(){BindingMap result;std::string sql = "select exchange_name, msgqueue_name, binding_key from binding_table;";_sql_helper.exec(sql, selectCallback, &result);return result;}private:// SQL查询回调函数,用于处理查询结果static int selectCallback(void *arg, int numcol, char **row, char **fields){BindingMap *result = (BindingMap *)arg;Binding::ptr bp = std::make_shared<Binding>(row[0], row[1], row[2]);MsgQueueBindingMap &qmap = (*result)[bp->exchange_name];qmap.insert(std::make_pair(bp->msgqueue_name, bp));return 0;}private:SqliteHelper _sql_helper;
};
  1. BindingManager 类,负责管理绑定信息,包括绑定、解绑、删除和查询等操作
// BindingManager类负责管理绑定信息,包括绑定、解绑、删除和查询等操作
class BindingManager
{
public:using ptr = std::shared_ptr<BindingManager>;BindingManager(const std::string &dbfile) : _mapper(dbfile){_bindings = _mapper.recovery();}// 绑定交换机和队列bool bind(const std::string &ename, const std::string &qname, const std::string &key, bool durable){std::unique_lock<std::mutex> lock(_mutex);auto it = _bindings.find(ename);if (it != _bindings.end() && it->second.find(qname) != it->second.end()){return true;}Binding::ptr bp = std::make_shared<Binding>(ename, qname, key);if (durable){bool ret = _mapper.insert(bp);if (ret == false)return false;}auto &qbmap = _bindings[ename];qbmap.insert(std::make_pair(qname, bp));return true;}// 解绑交换机和队列void unBind(const std::string &ename, const std::string &qname){std::unique_lock<std::mutex> lock(_mutex);auto eit = _bindings.find(ename);if (eit == _bindings.end()){return;}auto qit = eit->second.find(qname);if (qit == eit->second.end()){return;}_mapper.remove(ename, qname);_bindings[ename].erase(qname);}// 删除指定交换机的所有绑定信息void removeExchangeBindings(const std::string &ename){std::unique_lock<std::mutex> lock(_mutex);_mapper.removeExchangeBindings(ename);_bindings.erase(ename);}// 删除指定队列的所有绑定信息void removeMsgQueueBindings(const std::string &qname){std::unique_lock<std::mutex> lock(_mutex);_mapper.removeMsgQueueBindings(qname);for (auto start = _bindings.begin(); start != _bindings.end(); ++start){start->second.erase(qname);}}// 获取指定交换机的所有绑定信息MsgQueueBindingMap getExchangeBindings(const std::string &ename){std::unique_lock<std::mutex> lock(_mutex);auto eit = _bindings.find(ename);if (eit == _bindings.end()){return MsgQueueBindingMap();}return eit->second;}// 获取指定交换机和队列的绑定信息Binding::ptr getBinding(const std::string &ename, const std::string &qname){std::unique_lock<std::mutex> lock(_mutex);auto eit = _bindings.find(ename);if (eit == _bindings.end()){return Binding::ptr();}auto qit = eit->second.find(qname);if (qit == eit->second.end()){return Binding::ptr();}return qit->second;}// 检查指定交换机和队列的绑定信息是否存在bool exists(const std::string &ename, const std::string &qname){std::unique_lock<std::mutex> lock(_mutex);auto eit = _bindings.find(ename);if (eit == _bindings.end()){return false;}auto qit = eit->second.find(qname);if (qit == eit->second.end()){return false;}return true;}// 获取所有绑定信息的总数size_t size(){size_t total_size = 0;std::unique_lock<std::mutex> lock(_mutex);for (auto start = _bindings.begin(); start != _bindings.end(); ++start){total_size += start->second.size();}return total_size;}// 清空所有绑定信息void clear(){std::unique_lock<std::mutex> lock(_mutex);_mapper.removeTable();_bindings.clear();}private:std::mutex _mutex;     // 互斥锁,用于线程安全BindingMapper _mapper; // BindingMapper对象,用于数据库操作BindingMap _bindings;  // 存储所有绑定信息的映射
};

消息数据管理模块

实现了消息队列中的消息管理

消息属性

ID:消息的唯一标识
持久化标志:是否对消息进行持久化
routing_key:消息发布到交换机后,与其绑定的各个队列中的 binding_key 进行匹配,决定发布到哪个队列

消息定义在了proto文件

syntax = "proto3";  
package zwbMQ;    // 交换机类型
enum ExchangeType 
{UNKNOWTYPE = 0;  // 未知类型DIRECT = 1;      // 直接交换FANOUT = 2;      // 广播交换TOPIC = 3;       // 主题交换
};// 消息投递模式
enum DeliveryMode
{UNKNOWMODE = 0;  // 未知模式UNDURABLE = 1;   // 非持久化DURABLE = 2;     // 持久化
};// 消息的基本属性
message BasicProperties 
{string id = 1;               // 消息的唯一标识DeliveryMode delivery_mode = 2;  // 消息的投递模式string routing_key = 3;      // 路由键
};// 消息类型
message Message 
{// 消息的有效负载message Payload {BasicProperties properties = 1;  // 消息的基本属性string body = 2;                 // 消息体string valid = 3;                // 消息的有效性标识};Payload payload = 1;  // 消息的有效负载uint32 offset = 2;    // 消息的偏移量uint32 length = 3;    // 消息的长度
};

上述多出来的属性

  • 消息的有效负载:标志该消息是否属于有效消息,规定当持久化有效消息比例低于总体消息的 50% 时,会执行垃圾回收机制
  • 消息的偏移量:当前消息对于文件起始位置的偏移量
  • 消息长度:从偏移量位置取出该长度消息,解决黏包问题
  1. MessageMapper 类, 负责消息的持久化存储和恢复,包括消息的插入、删除、垃圾回收等操作

持久化存储格式
4字节长度|数据|4字节长度|数据

bool insert(const std::string &filename, MessagePtr &msg) 
{//新增数据都是添加在文件末尾的//1. 进行消息的序列化,获取到格式化后的消息std::string body = msg->payload().SerializeAsString();//2. 获取文件长度FileHelper helper(filename);size_t fsize = helper.size();size_t msg_size = body.size();//写入逻辑:1. 先写入4字节数据长度, 2, 再写入指定长度数据bool ret = helper.write((char*)&msg_size, fsize, sizeof(size_t));if (ret == false) {DLOG("向队列数据文件写入数据长度失败!");return false;}//3. 将数据写入文件的指定位置ret = helper.write(body.c_str(), fsize + sizeof(size_t), body.size());if (ret == false) {DLOG("向队列数据文件写入数据失败!");return false;}//4. 更新msg中的实际存储信息msg->set_offset(fsize + sizeof(size_t));msg->set_length(body.size());return true;
}

垃圾回收机制

当持久化消息的总量超过 2000 条且有效消息比例低于 50% 时,触发垃圾回收;
垃圾回收会将有效消息重新写入临时文件,删除原文件,并将临时文件重命名为原文件;

std::list<MessagePtr> gc() 
{bool ret;std::list<MessagePtr> result;ret = load(result);if (ret == false) {DLOG("加载有效数据失败!\n");return result;}//2. 将有效数据,进行序列化存储到临时文件中FileHelper::createFile(_tmpfile);for (auto &msg : result) {DLOG("向临时文件写入数据: %s", msg->payload().body().c_str());ret = insert(_tmpfile, msg);if (ret == false) {DLOG("向临时文件写入消息数据失败!!");return result;}}//3. 删除源文件ret = FileHelper::removeFile(_datafile);if (ret == false) {DLOG("删除源文件失败!");return result;}//4. 修改临时文件名,为源文件名称ret = FileHelper(_tmpfile).rename(_datafile);if (ret == false) {DLOG("修改临时文件名称失败!");return result;}//5. 返回新的有效数据return result;
}

createMsgFile():创建消息存储文件

bool createMsgFile() 
{if (FileHelper(_datafile).exists() == true) {return true;}bool ret = FileHelper::createFile(_datafile);if (ret == false) {DLOG("创建队列数据文件 %s 失败!", _datafile.c_str());return false;}return true;
}
  1. QueueMessage 类,管理单个消息队列的消息,包括消息的插入、删除、恢复、垃圾回收等操作
class QueueMessage{public:using ptr = std::shared_ptr<QueueMessage>;QueueMessage(std::string &basedir, const std::string &qname):_mapper(basedir, qname), _qname(qname), _valid_count(0), _total_count(0) {}bool recovery() {//恢复历史消息std::unique_lock<std::mutex> lock(_mutex);_msgs = _mapper.gc();for (auto &msg : _msgs) {_durable_msgs.insert(std::make_pair(msg->payload().properties().id(), msg));}_valid_count = _total_count = _msgs.size();return true;}bool insert(const BasicProperties *bp, const std::string &body, bool queue_is_durable) {//1. 构造消息对象MessagePtr msg = std::make_shared<Message>();msg->mutable_payload()->set_body(body);if (bp != nullptr) {DeliveryMode mode = queue_is_durable ? bp->delivery_mode() : DeliveryMode::UNDURABLE;msg->mutable_payload()->mutable_properties()->set_id(bp->id());msg->mutable_payload()->mutable_properties()->set_delivery_mode(mode);msg->mutable_payload()->mutable_properties()->set_routing_key(bp->routing_key());}else {DeliveryMode mode = queue_is_durable ? DeliveryMode::DURABLE : DeliveryMode::UNDURABLE;msg->mutable_payload()->mutable_properties()->set_id(UUIDHelper::uuid());msg->mutable_payload()->mutable_properties()->set_delivery_mode(mode);msg->mutable_payload()->mutable_properties()->set_routing_key("");}std::unique_lock<std::mutex> lock(_mutex);//2. 判断消息是否需要持久化if (msg->payload().properties().delivery_mode() == DeliveryMode::DURABLE) {msg->mutable_payload()->set_valid("1");//在持久化存储中表示数据有效//3. 进行持久化存储bool ret = _mapper.insert(msg);if (ret == false) {DLOG("持久化存储消息:%s 失败了!", body.c_str());return false;}_valid_count += 1; //持久化信息中的数据量+1_total_count += 1;_durable_msgs.insert(std::make_pair(msg->payload().properties().id(), msg));}//4. 内存的管理_msgs.push_back(msg);return true;}MessagePtr front(){std::unique_lock<std::mutex> lock(_mutex);if (_msgs.size() == 0) {return MessagePtr();}//获取一条队首消息:从_msgs中取出数据MessagePtr msg = _msgs.front();_msgs.pop_front();//将该消息对象,向待确认的hash表中添加一份,等到收到消息确认后进行删除_waitack_msgs.insert(std::make_pair(msg->payload().properties().id(), msg));return msg;}//每次删除消息后,判断是否需要垃圾回收bool remove(const std::string &msg_id) {std::unique_lock<std::mutex> lock(_mutex);//1. 从待确认队列中查找消息auto it = _waitack_msgs.find(msg_id);if (it == _waitack_msgs.end()) {DLOG("没有找到要删除的消息:%s!", msg_id.c_str());return true;}//2. 根据消息的持久化模式,决定是否删除持久化信息if (it->second->payload().properties().delivery_mode() == DeliveryMode::DURABLE) {//3. 删除持久化信息_mapper.remove(it->second);_durable_msgs.erase(msg_id);_valid_count -= 1;//持久化文件中有效消息数量 -1gc();  //内部判断是否需要垃圾回收,需要的话则回收一下}//4. 删除内存中的信息_waitack_msgs.erase(msg_id);//DLOG("确认消息后,删除消息的管理成功:%s", it->second->payload().body().c_str());return true;}size_t getable_count() {std::unique_lock<std::mutex> lock(_mutex);return _msgs.size();}size_t total_count() {std::unique_lock<std::mutex> lock(_mutex);return _total_count;}size_t durable_count() {std::unique_lock<std::mutex> lock(_mutex);return _durable_msgs.size();}size_t waitack_count() {std::unique_lock<std::mutex> lock(_mutex);return _waitack_msgs.size();}void clear() {std::unique_lock<std::mutex> lock(_mutex);_mapper.removeMsgFile();_msgs.clear();_durable_msgs.clear();_waitack_msgs.clear();_valid_count = 0;_total_count = 0;}private:bool GCCheck() {//持久化的消息总量大于2000, 且其中有效比例低于50%则需要持久化if (_total_count > 2000 && _valid_count * 10  / _total_count < 5) {return true;}return false;}void gc() {//1. 进行垃圾回收,获取到垃圾回收后,有效的消息信息链表if (GCCheck() == false) return;std::list<MessagePtr> msgs = _mapper.gc();for (auto &msg : msgs) {auto it = _durable_msgs.find(msg->payload().properties().id());if (it == _durable_msgs.end()) {DLOG("垃圾回收后,有一条持久化消息,在内存中没有进行管理!");_msgs.push_back(msg); ///做法:重新添加到推送链表的末尾_durable_msgs.insert(std::make_pair(msg->payload().properties().id(), msg));continue;}//2. 更新每一条消息的实际存储位置it->second->set_offset(msg->offset());it->second->set_length(msg->length());}//3. 更新当前的有效消息数量 & 总的持久化消息数量_valid_count = _total_count = msgs.size();}private:std::mutex _mutex;std::string _qname;size_t _valid_count;size_t _total_count;MessageMapper _mapper;std::list<MessagePtr> _msgs;//待推送消息std::unordered_map<std::string, MessagePtr> _durable_msgs;//持久化消息hashstd::unordered_map<std::string, MessagePtr> _waitack_msgs;//待确认消息hash
};
  1. MessageManager 类,对外提供功能接口
class MessageManager {public:using ptr = std::shared_ptr<MessageManager>;MessageManager(const std::string &basedir): _basedir(basedir){}void clear() {std::unique_lock<std::mutex> lock(_mutex);for (auto &qmsg : _queue_msgs) {qmsg.second->clear();}}void initQueueMessage(const std::string &qname) {QueueMessage::ptr qmp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it != _queue_msgs.end()) {return ;}qmp = std::make_shared<QueueMessage>(_basedir, qname);_queue_msgs.insert(std::make_pair(qname, qmp));}qmp->recovery();}void destroyQueueMessage(const std::string &qname) {QueueMessage::ptr qmp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it == _queue_msgs.end()) {return ;}qmp = it->second;_queue_msgs.erase(it);}qmp->clear();}bool insert(const std::string &qname, BasicProperties *bp, const std::string &body, bool queue_is_durable) {QueueMessage::ptr qmp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it == _queue_msgs.end()) {DLOG("向队列%s新增消息失败:没有找到消息管理句柄!", qname.c_str());return false;}qmp = it->second;}return qmp->insert(bp, body, queue_is_durable);}MessagePtr front(const std::string &qname) {QueueMessage::ptr qmp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it == _queue_msgs.end()) {DLOG("获取队列%s队首消息失败:没有找到消息管理句柄!", qname.c_str());return MessagePtr();}qmp = it->second;}return qmp->front();}void ack(const std::string &qname, const std::string &msg_id) {QueueMessage::ptr qmp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it == _queue_msgs.end()) {DLOG("确认队列%s消息%s失败:没有找到消息管理句柄!", qname.c_str(), msg_id.c_str());return ;}qmp = it->second;}qmp->remove(msg_id);return ;}size_t getable_count(const std::string &qname) {QueueMessage::ptr qmp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it == _queue_msgs.end()) {DLOG("获取队列%s待推送消息数量失败:没有找到消息管理句柄!", qname.c_str());return 0;}qmp = it->second;}return qmp->getable_count();}size_t total_count(const std::string &qname) {QueueMessage::ptr qmp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it == _queue_msgs.end()) {DLOG("获取队列%s总持久化消息数量失败:没有找到消息管理句柄!", qname.c_str());return 0;}qmp = it->second;}return qmp->total_count();}size_t durable_count(const std::string &qname) {QueueMessage::ptr qmp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it == _queue_msgs.end()) {DLOG("获取队列%s有效持久化消息数量失败:没有找到消息管理句柄!", qname.c_str());return 0;}qmp = it->second;}return qmp->durable_count();}size_t waitack_count(const std::string &qname) {QueueMessage::ptr qmp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it == _queue_msgs.end()) {DLOG("获取队列%s待确认消息数量失败:没有找到消息管理句柄!", qname.c_str());return 0;}qmp = it->second;}return qmp->waitack_count();}private:std::mutex _mutex;std::string _basedir;std::unordered_map<std::string, QueueMessage::ptr> _queue_msgs;
};

虚拟机管理模块

对数据管理模块的整合模块交换机、队列、绑定

VirtualHost 类通过组合多个管理器(ExchangeManager、MsgQueueManager、BindingManager、MessageManager)来实现功能

虚拟机管理信息

  • 交换机数据管理模块句柄
  • 队列数据管理模块句柄
  • 绑定数据管理模块句柄
  • 消息数据管理模块句

虚拟机对外操作

  • 提供虚拟机内交换机声明,交换机删除操作
  • 提供虚拟机内队列声明,队列删除操作
  • 提供虚拟机内交换机-队列绑定,解除绑定操作
  • 获取交换机相关绑定信息

虚拟机管理操作

  • 创建虚拟机
  • 查询虚拟机
    -删除虚拟机
namespace zwbMQ 
{class VirtualHost {public:using ptr = std::shared_ptr<VirtualHost>; // 使用智能指针管理 VirtualHost 对象// 构造函数VirtualHost(const std::string &hname, const std::string &basedir, const std::string &dbfile):_host_name(hname), // 初始化虚拟主机名称_emp(std::make_shared<ExchangeManager>(dbfile)), // 初始化交换机管理器_mqmp(std::make_shared<MsgQueueManager>(dbfile)), // 初始化队列管理器_bmp(std::make_shared<BindingManager>(dbfile)), // 初始化绑定管理器_mmp(std::make_shared<MessageManager>(basedir)) { // 初始化消息管理器// 获取所有队列信息,并通过队列名称恢复历史消息数据QueueMap qm = _mqmp->allQueue();for (auto &q : qm) {_mmp->initQueueMessage(q.first); // 初始化每个队列的消息管理}}// 声明交换机bool declareExchange(const std::string &name,ExchangeType type, bool durable, bool auto_delete,const google::protobuf::Map<std::string, std::string> &args) {return _emp->declareExchange(name, type, durable, auto_delete, args);}// 删除交换机void deleteExchange(const std::string &name) {// 删除交换机时,需要删除与之相关的绑定信息_bmp->removeExchangeBindings(name);return _emp->deleteExchange(name);}// 检查交换机是否存在bool existsExchange(const std::string &name) {return _emp->exists(name);}// 获取指定交换机Exchange::ptr selectExchange(const std::string &ename) {return _emp->selectExchange(ename);}// 声明队列bool declareQueue(const std::string &qname, bool qdurable, bool qexclusive,bool qauto_delete,const google::protobuf::Map<std::string, std::string> &qargs) {// 初始化队列的消息管理_mmp->initQueueMessage(qname);// 创建队列return _mqmp->declareQueue(qname, qdurable, qexclusive, qauto_delete, qargs);}// 删除队列void deleteQueue(const std::string &name) {// 删除队列时,需要删除队列的消息和绑定信息_mmp->destroyQueueMessage(name); // 删除队列的消息_bmp->removeMsgQueueBindings(name); // 删除队列的绑定信息return _mqmp->deleteQueue(name); // 删除队列}// 检查队列是否存在bool existsQueue(const std::string &name) {return _mqmp->exists(name);}// 获取所有队列QueueMap allQueue() {return _mqmp->allQueue();}// 绑定交换机和队列bool bind(const std::string &ename, const std::string &qname, const std::string &key) {// 检查交换机是否存在Exchange::ptr ep = _emp->selectExchange(ename);if (ep.get() == nullptr) {DLOG("进行队列绑定失败,交换机%s不存在!", ename.c_str());return false;}// 检查队列是否存在MsgQueue::ptr mqp = _mqmp->selectQueue(qname);if (mqp.get() == nullptr) {DLOG("进行队列绑定失败,队列%s不存在!", qname.c_str());return false;}// 绑定交换机和队列return _bmp->bind(ename, qname, key, ep->durable && mqp->durable);}// 解绑交换机和队列void unBind(const std::string &ename, const std::string &qname) {return _bmp->unBind(ename, qname);}// 获取指定交换机的所有绑定信息MsgQueueBindingMap exchangeBindings(const std::string &ename) {return _bmp->getExchangeBindings(ename);}// 检查绑定是否存在bool existsBinding(const std::string &ename, const std::string &qname) {return _bmp->exists(ename, qname);}// 发布消息到指定队列bool basicPublish(const std::string &qname, BasicProperties *bp, const std::string &body) {// 检查队列是否存在MsgQueue::ptr mqp = _mqmp->selectQueue(qname);if (mqp.get() == nullptr) {DLOG("发布消息失败,队列%s不存在!", qname.c_str());return false;}// 插入消息return _mmp->insert(qname, bp, body, mqp->durable);}// 从指定队列消费消息MessagePtr basicConsume(const std::string &qname) {return _mmp->front(qname);}// 确认消息void basicAck(const std::string &qname, const std::string &msgid) {return _mmp->ack(qname, msgid);}// 清空虚拟主机中的所有数据void clear() {_emp->clear(); // 清空交换机_mqmp->clear(); // 清空队列_bmp->clear(); // 清空绑定_mmp->clear(); // 清空消息}private:std::string _host_name; // 虚拟主机名称ExchangeManager::ptr _emp; // 交换机管理器MsgQueueManager::ptr _mqmp; // 队列管理器BindingManager::ptr _bmp; // 绑定管理器MessageManager::ptr _mmp; // 消息管理器};
}#endif

交换路由模块

先把消息发给某个虚拟机,再告知要发给虚拟机上的哪个交换机,由交换机决定把这些消息放到哪些队列中

当客⼾端发布⼀条消息到交换机后,这条消息,应该被⼊队到该交换机绑定的哪些队列中?交换路由
模块就是决定这件事情的。

绑定信息中有一个binding_key, 消息中有一个routing_key

  • ⼴播:将消息⼊队到该交换机的所有绑定队列中
  • 直接:将消息⼊队到绑定信息中binding_key与消息routing_key⼀致的队列中
  • 主题:将消息⼊队到绑定信息中binding_key与routing_key是匹配成功的队列中
namespace zwbMQ
{using namespace zwbMQ;class Router{public:static bool isLegalRoutingKey(const std::string &routing_key){// routing_key:只需要判断是否包含有非法字符即可, 合法字符( a~z, A~Z, 0~9, ., _)for (auto &ch : routing_key){if ((ch >= 'a' && ch <= 'z') ||(ch >= 'A' && ch <= 'Z') ||(ch >= '0' && ch <= '9') ||(ch == '_' || ch == '.')){continue;}return false;}return true;}static bool isLegalBindingKey(const std::string &binding_key){// 1. 判断是否包含有非法字符, 合法字符:a~z, A~Z, 0~9, ., _, *, #for (auto &ch : binding_key){if ((ch >= 'a' && ch <= 'z') ||(ch >= 'A' && ch <= 'Z') ||(ch >= '0' && ch <= '9') ||(ch == '_' || ch == '.') ||(ch == '*' || ch == '#')){continue;}return false;}// 2. *和#必须独立存在:  news.music#.*.#std::vector<std::string> sub_words;StrHelper::split(binding_key, ".", sub_words);for (std::string &word : sub_words){if (word.size() > 1 &&(word.find("*") != std::string::npos ||word.find("#") != std::string::npos)){return false;}}// 3. *和#不能连续出现for (int i = 1; i < sub_words.size(); i++){if (sub_words[i] == "#" && sub_words[i - 1] == "*"){return false;}if (sub_words[i] == "#" && sub_words[i - 1] == "#"){return false;}if (sub_words[i] == "*" && sub_words[i - 1] == "#"){return false;}}return true;}static bool route(ExchangeType type, const std::string &routing_key, const std::string &binding_key){if (type == ExchangeType::DIRECT){return (routing_key == binding_key);}else if (type == ExchangeType::FANOUT){return true;}// 主题交换:要进行模式匹配    news.#   &   news.music.pop// 1. 将binding_key与routing_key进行字符串分割,得到各个的单词数组std::vector<std::string> bkeys, rkeys;int n_bkey = StrHelper::split(binding_key, ".", bkeys);int n_rkey = StrHelper::split(routing_key, ".", rkeys);// 2. 定义标记数组,并初始化[0][0]位置为true,其他位置为falsestd::vector<std::vector<bool>> dp(n_bkey + 1, std::vector<bool>(n_rkey + 1, false));dp[0][0] = true;// 3. 如果binding_key以#起始,则将#对应行的第0列置为1.for (int i = 1; i <= bkeys.size(); i++){if (bkeys[i - 1] == "#"){dp[i][0] = true;continue;}break;}// 4. 使用routing_key中的每个单词与binding_key中的每个单词进行匹配并标记数组for (int i = 1; i <= n_bkey; i++){for (int j = 1; j <= n_rkey; j++){// 如果当前bkey是个*,或者两个单词相同,表示单词匹配成功,则从左上方继承结果if (bkeys[i - 1] == rkeys[j - 1] || bkeys[i - 1] == "*"){dp[i][j] = dp[i - 1][j - 1];}else if (bkeys[i - 1] == "#"){// 如果当前bkey是个#,则需要从左上,左边,上边继承结果dp[i][j] = dp[i - 1][j - 1] | dp[i][j - 1] | dp[i - 1][j];}}}return dp[n_bkey][n_rkey];}};
}

信道管理模块

Channel是针对Connection连接的⼀个更细粒度的通信信道,多个Channel可以使⽤同⼀个通信连接Connection

#ifndef __M_CHANNEL_H__
#define __M_CHANNEL_H__#include "muduo/net/TcpConnection.h"          // muduo库的TCP连接类
#include "muduo/proto/codec.h"                // Protobuf编解码器
#include "muduo/proto/dispatcher.h"           // 消息分发器
#include "../mqcommon/mq_logger.hpp"          // 日志模块
#include "../mqcommon/mq_helper.hpp"          // 工具函数
#include "../mqcommon/mq_msg.pb.h"            // 消息相关的Protobuf定义
#include "../mqcommon/mq_proto.pb.h"          // 协议相关的Protobuf定义
#include "../mqcommon/mq_threadpool.hpp"      // 线程池
#include "mq_consumer.hpp"                    // 消费者管理
#include "mq_host.hpp"                        // 虚拟主机管理
#include "mq_route.hpp"                       // 路由模块namespace zwbMQ 
{// 定义一系列智能指针类型别名,简化代码using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;using openChannelRequestPtr = std::shared_ptr<openChannelRequest>;using closeChannelRequestPtr = std::shared_ptr<closeChannelRequest>;using declareExchangeRequestPtr = std::shared_ptr<declareExchangeRequest>;using deleteExchangeRequestPtr = std::shared_ptr<deleteExchangeRequest>;using declareQueueRequestPtr = std::shared_ptr<declareQueueRequest>;using deleteQueueRequestPtr = std::shared_ptr<deleteQueueRequest>;using queueBindRequestPtr = std::shared_ptr<queueBindRequest>;using queueUnBindRequestPtr = std::shared_ptr<queueUnBindRequest>;using basicPublishRequestPtr = std::shared_ptr<basicPublishRequest>;using basicAckRequestPtr = std::shared_ptr<basicAckRequest>;using basicConsumeRequestPtr = std::shared_ptr<basicConsumeRequest>;using basicCancelRequestPtr = std::shared_ptr<basicCancelRequest>;// Channel类:管理与客户端的通信通道,处理消息的发布、订阅、确认等操作class Channel {public:using ptr = std::shared_ptr<Channel>; // 定义Channel的智能指针类型别名// 构造函数:初始化Channel的各个成员变量Channel(const std::string &id, const VirtualHost::ptr &host, const ConsumerManager::ptr &cmp, const ProtobufCodecPtr &codec, const muduo::net::TcpConnectionPtr &conn,const threadpool::ptr &pool): _cid(id), _conn(conn), _codec(codec), _cmp(cmp), _host(host), _pool(pool) {DLOG("new Channel: %p", this); // 日志记录Channel的创建}// 析构函数:销毁Channel时移除相关的消费者~Channel() {if (_consumer.get() != nullptr) {_cmp->remove(_consumer->tag, _consumer->qname); // 移除消费者}DLOG("del Channel: %p", this); // 日志记录Channel的销毁}// 声明交换机void declareExchange(const declareExchangeRequestPtr &req) {bool ret = _host->declareExchange(req->exchange_name(), req->exchange_type(), req->durable(), req->auto_delete(), req->args());return basicResponse(ret, req->rid(), req->cid()); // 发送响应}// 删除交换机void deleteExchange(const deleteExchangeRequestPtr &req) {_host->deleteExchange(req->exchange_name());return basicResponse(true, req->rid(), req->cid()); // 发送响应}// 声明队列void declareQueue(const declareQueueRequestPtr &req) {bool ret = _host->declareQueue(req->queue_name(),req->durable(), req->exclusive(),req->auto_delete(), req->args());if (ret == false) {return basicResponse(false, req->rid(), req->cid()); // 发送失败响应}_cmp->initQueueConsumer(req->queue_name()); // 初始化队列的消费者管理句柄return basicResponse(true, req->rid(), req->cid()); // 发送成功响应}// 删除队列void deleteQueue(const deleteQueueRequestPtr &req) {_cmp->destroyQueueConsumer(req->queue_name()); // 销毁队列的消费者管理句柄_host->deleteQueue(req->queue_name()); // 删除队列return basicResponse(true, req->rid(), req->cid()); // 发送响应}// 绑定队列到交换机void queueBind(const queueBindRequestPtr &req) {bool ret = _host->bind(req->exchange_name(), req->queue_name(), req->binding_key());return basicResponse(ret, req->rid(), req->cid()); // 发送响应}// 解绑队列与交换机void queueUnBind(const queueUnBindRequestPtr &req) {_host->unBind(req->exchange_name(), req->queue_name());return basicResponse(true, req->rid(), req->cid()); // 发送响应}// 发布消息void basicPublish(const basicPublishRequestPtr &req) {// 1. 判断交换机是否存在auto ep = _host->selectExchange(req->exchange_name());if (ep.get() == nullptr) {return basicResponse(false, req->rid(), req->cid()); // 发送失败响应}// 2. 进行交换路由,判断消息可以发布到交换机绑定的哪个队列中MsgQueueBindingMap mqbm = _host->exchangeBindings(req->exchange_name());BasicProperties *properties = nullptr;std::string routing_key;if (req->has_properties()) {properties = req->mutable_properties();routing_key = properties->routing_key();}for (auto &binding : mqbm) {if (Router::route(ep->type, routing_key, binding.second->binding_key)) {// 3. 将消息添加到队列中_host->basicPublish(binding.first, properties, req->body());// 4. 向线程池中添加一个消息消费任务auto task = std::bind(&Channel::consume, this, binding.first);_pool->push(task);}}return basicResponse(true, req->rid(), req->cid()); // 发送成功响应}// 确认消息void basicAck(const basicAckRequestPtr &req) {_host->basicAck(req->queue_name(), req->message_id());return basicResponse(true, req->rid(), req->cid()); // 发送响应}// 订阅队列消息void basicConsume(const basicConsumeRequestPtr &req) {// 1. 判断队列是否存在bool ret = _host->existsQueue(req->queue_name());if (ret == false) {return basicResponse(false, req->rid(), req->cid()); // 发送失败响应}// 2. 创建队列的消费者auto cb = std::bind(&Channel::callback, this, std::placeholders::_1,std::placeholders::_2, std::placeholders::_3);_consumer = _cmp->create(req->consumer_tag(), req->queue_name(), req->auto_ack(), cb);return basicResponse(true, req->rid(), req->cid()); // 发送成功响应}// 取消订阅void basicCancel(const basicCancelRequestPtr &req) {_cmp->remove(req->consumer_tag(), req->queue_name()); // 移除消费者return basicResponse(true, req->rid(), req->cid()); // 发送响应}private:// 回调函数:处理消费者接收到的消息,并将消息推送给客户端void callback(const std::string tag, const BasicProperties *bp, const std::string &body) {basicConsumeResponse resp;resp.set_cid(_cid);resp.set_body(body);resp.set_consumer_tag(tag);if (bp) {resp.mutable_properties()->set_id(bp->id());resp.mutable_properties()->set_delivery_mode(bp->delivery_mode());resp.mutable_properties()->set_routing_key(bp->routing_key());}_codec->send(_conn, resp); // 发送消息给客户端}// 消费消息:从队列中取出消息并推送给订阅者void consume(const std::string &qname) {// 1. 从队列中取出一条消息MessagePtr mp = _host->basicConsume(qname);if (mp.get() == nullptr) {DLOG("执行消费任务失败,%s 队列没有消息!", qname.c_str());return;}// 2. 从队列订阅者中取出一个订阅者Consumer::ptr cp = _cmp->choose(qname);if (cp.get() == nullptr){DLOG("执行消费任务失败,%s 队列没有消费者!", qname.c_str());return;}// 3. 调用订阅者对应的消息处理函数,实现消息的推送cp->callback(cp->tag, mp->mutable_payload()->mutable_properties(), mp->payload().body());// 4. 判断如果订阅者是自动确认,直接删除消息if (cp->auto_ack) _host->basicAck(qname, mp->payload().properties().id());}// 发送基本响应void basicResponse(bool ok, const std::string &rid, const std::string &cid) {basicCommonResponse resp;resp.set_rid(rid);resp.set_cid(cid);resp.set_ok(ok);_codec->send(_conn, resp); // 发送响应给客户端}private:std::string _cid;                      // Channel的唯一标识Consumer::ptr _consumer;               // 当前Channel的消费者muduo::net::TcpConnectionPtr _conn;    // TCP连接ProtobufCodecPtr _codec;               // Protobuf编解码器ConsumerManager::ptr _cmp;             // 消费者管理器VirtualHost::ptr _host;                // 虚拟主机threadpool::ptr _pool;                 // 线程池};// ChannelManager类:管理多个Channel实例class ChannelManager {public:using ptr = std::shared_ptr<ChannelManager>; // 定义ChannelManager的智能指针类型别名// 打开Channelbool openChannel(const std::string &id, const VirtualHost::ptr &host, const ConsumerManager::ptr &cmp, const ProtobufCodecPtr &codec, const muduo::net::TcpConnectionPtr &conn,const threadpool::ptr &pool) {std::unique_lock<std::mutex> lock(_mutex);auto it = _channels.find(id);if (it != _channels.end()) {DLOG("信道:%s 已经存在!", id.c_str()); // 日志记录return false;}auto channel = std::make_shared<Channel>(id, host, cmp, codec, conn, pool);_channels.insert(std::make_pair(id, channel)); // 添加Channelreturn true;}// 关闭Channelvoid closeChannel(const std::string &id) {std::unique_lock<std::mutex> lock(_mutex);_channels.erase(id); // 移除Channel}// 获取ChannelChannel::ptr getChannel(const std::string &id) {std::unique_lock<std::mutex> lock(_mutex);auto it = _channels.find(id);if (it == _channels.end()) {return Channel::ptr(); // 返回空指针}return it->second; // 返回Channel}private:std::mutex _mutex; // 互斥锁,保证线程安全std::unordered_map<std::string, Channel::ptr> _channels; // 存储Channel的映射表};
}#endif // __M_CHANNEL_H__

连接管理模块

对muduo库中的Connection进⾏⼆次封装管理,并额外提供项⽬所需操作

新增连接,删除连接,获取连接,打开信道,关闭信道

#include "mq_channel.hpp" // 包含Channel类的头文件namespace zwbMQ {// Connection类:管理与客户端的连接,负责创建和管理Channelclass Connection {public:using ptr = std::shared_ptr<Connection>; // 定义Connection的智能指针类型别名// 构造函数:初始化Connection的各个成员变量Connection(const VirtualHost::ptr &host, const ConsumerManager::ptr &cmp, const ProtobufCodecPtr &codec, const muduo::net::TcpConnectionPtr &conn,const threadpool::ptr &pool): _conn(conn), _codec(codec), _cmp(cmp), _host(host), _pool(pool),_channels(std::make_shared<ChannelManager>()) {} // 初始化ChannelManager// 打开Channelvoid openChannel(const openChannelRequestPtr &req) {// 1. 判断信道ID是否重复,创建信道bool ret = _channels->openChannel(req->cid(), _host, _cmp, _codec, _conn, _pool);if (ret == false) {DLOG("创建信道的时候,信道ID重复了"); // 日志记录return basicResponse(false, req->rid(), req->cid()); // 发送失败响应}DLOG("%s 信道创建成功!", req->cid().c_str()); // 日志记录// 3. 给客户端进行回复return basicResponse(true, req->rid(), req->cid()); // 发送成功响应}// 关闭Channelvoid closeChannel(const closeChannelRequestPtr &req) {_channels->closeChannel(req->cid()); // 关闭指定ID的Channelreturn basicResponse(true, req->rid(), req->cid()); // 发送响应}// 获取指定ID的ChannelChannel::ptr getChannel(const std::string &cid) {return _channels->getChannel(cid); // 返回Channel}private:// 发送基本响应void basicResponse(bool ok, const std::string &rid, const std::string &cid) {basicCommonResponse resp;resp.set_rid(rid); // 设置请求IDresp.set_cid(cid); // 设置Channel IDresp.set_ok(ok);   // 设置操作结果_codec->send(_conn, resp); // 发送响应给客户端}private:muduo::net::TcpConnectionPtr _conn;    // TCP连接ProtobufCodecPtr _codec;               // Protobuf编解码器ConsumerManager::ptr _cmp;             // 消费者管理器VirtualHost::ptr _host;                // 虚拟主机threadpool::ptr _pool;                 // 线程池ChannelManager::ptr _channels;         // Channel管理器};// ConnectionManager类:管理多个Connection实例class ConnectionManager {public:using ptr = std::shared_ptr<ConnectionManager>; // 定义ConnectionManager的智能指针类型别名// 构造函数ConnectionManager() {}// 创建新的Connectionvoid newConnection(const VirtualHost::ptr &host, const ConsumerManager::ptr &cmp, const ProtobufCodecPtr &codec, const muduo::net::TcpConnectionPtr &conn,const threadpool::ptr &pool) {std::unique_lock<std::mutex> lock(_mutex); // 加锁,保证线程安全auto it = _conns.find(conn);if (it != _conns.end()) {return; // 如果Connection已存在,直接返回}// 创建新的Connection并添加到管理器中Connection::ptr self_conn = std::make_shared<Connection>(host, cmp, codec, conn, pool);_conns.insert(std::make_pair(conn, self_conn));}// 删除Connectionvoid delConnection(const muduo::net::TcpConnectionPtr &conn) {std::unique_lock<std::mutex> lock(_mutex); // 加锁,保证线程安全_conns.erase(conn); // 移除指定Connection}// 获取指定TCP连接的ConnectionConnection::ptr getConnection(const muduo::net::TcpConnectionPtr &conn) {std::unique_lock<std::mutex> lock(_mutex); // 加锁,保证线程安全auto it = _conns.find(conn);if (it == _conns.end()) {return Connection::ptr(); // 如果Connection不存在,返回空指针}return it->second; // 返回Connection}private:std::mutex _mutex; // 互斥锁,保证线程安全std::unordered_map<muduo::net::TcpConnectionPtr, Connection::ptr> _conns; // 存储Connection的映射表};}

消费者管理模块

这里的消费者指的是订阅了某一条队列的消息,分为发布客户端和订阅客户端;
当消费者订阅该队列后,所连接的服务器就会将该队列的消息轮询的推送给订阅该队列的消费者;

消费者数据

struct Consumer 
{using ptr = std::shared_ptr<Consumer>;std::string tag;    // 消费者标识std::string qname;  // 消费者订阅的队列名称bool auto_ack;      // 自动确认标志ConsumerCallback callback;Consumer(){DLOG("new Consumer: %p", this);}Consumer(const std::string &ctag, const std::string &queue_name,  bool ack_flag, const ConsumerCallback &cb):tag(ctag), qname(queue_name), auto_ack(ack_flag), callback(std::move(cb)) {DLOG("new Consumer: %p", this);}~Consumer() {DLOG("del Consumer: %p", this);}
};
  • tag:消费者的唯一标识
  • qname:消费者订阅的队列名称
  • auto_ack:是否自动确认消息(若为true,消息被取走后,自动从待确认列表中删除;若为false,要等到客户端确认收到后,再进行删除)
  • callback:消息处理回调函数(队列有了消息后,通过该函数进行处理,服务端调用这个回调函数)

以队列为单元的消费者管理结构

  • 成员变量
    • _qname:队列名称
    • _mutex:互斥锁,用于线程安全
    • _rr_seq:轮转序号,用于轮转选择消费者
    • _consumers:存储消费者
  • 方法
    • create:新增消费者
    • remove:移除指定消费者
    • choose:轮转选择消费者
    • empty:判断是否没有消费者
    • exists:判断指定消费者是否存在
    • clear:清除所有消费
class QueueConsumer {public:using ptr = std::shared_ptr<QueueConsumer>;QueueConsumer(const std::string &qname) : _qname(qname), _rr_seq(0){}// 队列新增消费者Consumer::ptr create(const std::string &ctag, const std::string &queue_name,  bool ack_flag, const ConsumerCallback &cb) {//1. 加锁std::unique_lock<std::mutex> lock(_mutex);//2. 判断消费者是否重复for (auto &consumer : _consumers) {if (consumer->tag == ctag) {return Consumer::ptr();}}//3. 没有重复则新增--构造对象auto consumer = std::make_shared<Consumer>(ctag, queue_name, ack_flag, cb);//4. 添加管理后返回对象_consumers.push_back(consumer);return consumer;}// 队列移除消费者void remove(const std::string &ctag) {//1. 加锁std::unique_lock<std::mutex> lock(_mutex);//2. 遍历查找-删除for (auto it = _consumers.begin(); it != _consumers.end(); ++it) {if ((*it)->tag == ctag) {_consumers.erase(it);return ;}}return;}// 队列获取消费者:RR轮转获取Consumer::ptr choose() {//1. 加锁std::unique_lock<std::mutex> lock(_mutex);if (_consumers.size() == 0) {return Consumer::ptr();}//2. 获取当前轮转到的下标int idx = _rr_seq % _consumers.size();_rr_seq++;//3. 获取对象,返回return _consumers[idx];}// 是否为空bool empty() {std::unique_lock<std::mutex> lock(_mutex);return _consumers.size() == 0;}// 判断指定消费者是否存在bool exists(const std::string &ctag) {std::unique_lock<std::mutex> lock(_mutex);//2. 遍历查找for (auto it = _consumers.begin(); it != _consumers.end(); ++it) {if ((*it)->tag == ctag) {return true;}}return false;}// 清理所有消费者void clear() {std::unique_lock<std::mutex> lock(_mutex);_consumers.clear();_rr_seq = 0;}private:std::string _qname;std::mutex _mutex;uint64_t _rr_seq;//轮转序号std::vector<Consumer::ptr> _consumers;
};

管理的消费者操作

class ConsumerManager {public:using ptr = std::shared_ptr<ConsumerManager>;ConsumerManager(){}void initQueueConsumer(const std::string &qname) {//1. 加锁std::unique_lock<std::mutex> lock(_mutex);//2. 重复判断auto it = _qconsumers.find(qname);if (it != _qconsumers.end()) {return ;}//3. 新增auto qconsumers = std::make_shared<QueueConsumer>(qname);_qconsumers.insert(std::make_pair(qname, qconsumers));}void destroyQueueConsumer(const std::string &qname) {std::unique_lock<std::mutex> lock(_mutex);_qconsumers.erase(qname);}Consumer::ptr create(const std::string &ctag, const std::string &queue_name,  bool ack_flag, const ConsumerCallback &cb) {// 获取队列的消费者管理单元句柄,通过句柄完成新建QueueConsumer::ptr qcp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _qconsumers.find(queue_name);if (it == _qconsumers.end()) {DLOG("没有找到队列 %s 的消费者管理句柄!", queue_name.c_str());return Consumer::ptr();}qcp = it->second;}return qcp->create(ctag, queue_name, ack_flag, cb);}void remove(const std::string &ctag, const std::string &queue_name) {QueueConsumer::ptr qcp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _qconsumers.find(queue_name);if (it == _qconsumers.end()) {DLOG("没有找到队列 %s 的消费者管理句柄!", queue_name.c_str());return ;}qcp = it->second;}return qcp->remove(ctag);}Consumer::ptr choose(const std::string &queue_name) {QueueConsumer::ptr qcp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _qconsumers.find(queue_name);if (it == _qconsumers.end()) {DLOG("没有找到队列 %s 的消费者管理句柄!", queue_name.c_str());return Consumer::ptr();}qcp = it->second;}return qcp->choose();}bool empty(const std::string &queue_name) {QueueConsumer::ptr qcp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _qconsumers.find(queue_name);if (it == _qconsumers.end()) {DLOG("没有找到队列 %s 的消费者管理句柄!", queue_name.c_str());return false;}qcp = it->second;}return qcp->empty();}bool exists(const std::string &ctag, const std::string &queue_name) {QueueConsumer::ptr qcp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _qconsumers.find(queue_name);if (it == _qconsumers.end()) {DLOG("没有找到队列 %s 的消费者管理句柄!", queue_name.c_str());return false;}qcp = it->second;}return qcp->exists(ctag);}void clear() {std::unique_lock<std::mutex> lock(_mutex);_qconsumers.clear();}private:std::mutex _mutex;std::unordered_map<std::string, QueueConsumer::ptr> _qconsumers;
};

⽹络通信协议设计

借助于muduo库来进行实现,消息体是使⽤Protobuf进⾏序列化和反序列化,采用LV这种数据结构,先有一个定长的长度L,其描述了value有多长,来解决念包问题;

value中针对不同的请求有着不同的协议字段,而协议字段采用的是Protobuf进行序列化和反序列化

Server服务器模块

在这里插入图片描述

对于服务器来说,并没有什么实际的功能,
而是将之前代码的一个整合
有一个虚拟机管理句柄
有一个消费者管理句柄
有一个连接管理句柄
有一个工作线程池

 muduo::net::EventLoop _baseloop; // 主事件循环器,⽤于响应IO事件和定时器事件,主loop主要是为了响应监听描述符的IO事件muduo::net::TcpServer _server;//服务器对象ProtobufDispatcher _dispatcher;//请求分发器对象--要向其中注册请求处理函数ProtobufCodecPtr _codec;//protobuf协议处理器--针对收到的请求数据进行protobuf协议处理VirtualHost::ptr _virtual_host;ConsumerManager::ptr _consumer_manager;ConnectionManager::ptr _connection_manager;threadpool::ptr _threadpool;//异步⼯作线程池,主要⽤于队列消息的推送⼯作

借助muduo库实现一个TCP服务器对象,当有新连接到来后要如何进行读取和分发以及上面的io事件如何处理都是在这个模块来进行实现的,而对于新连接来说,会有一个dispatcher分发器,这个分发器的作用就是根据不同的消息类型进行合适的分发,装载到不同的处理函数当中,

namespace zwbMQ 
{// 定义数据库文件路径和虚拟主机名称#define DBFILE "/meta.db"#define HOSTNAME "MyVirtualHost"class Server {public:// 定义消息指针类型typedef std::shared_ptr<google::protobuf::Message> MessagePtr;// 构造函数,初始化服务器Server(int port, const std::string &basedir): _server(&_baseloop, muduo::net::InetAddress("0.0.0.0", port), "Server", muduo::net::TcpServer::kReusePort),_dispatcher(std::bind(&Server::onUnknownMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)),_codec(std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))),_virtual_host(std::make_shared<VirtualHost>(HOSTNAME, basedir, basedir + DBFILE)),_consumer_manager(std::make_shared<ConsumerManager>()),_connection_manager(std::make_shared<ConnectionManager>()),_threadpool(std::make_shared<threadpool>()) {// 初始化所有队列的消费者管理结构QueueMap qm = _virtual_host->allQueue();for (auto &q : qm) {_consumer_manager->initQueueConsumer(q.first);}// 注册各种请求的处理函数_dispatcher.registerMessageCallback<zwbMQ::openChannelRequest>(std::bind(&Server::onOpenChannel, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<zwbMQ::closeChannelRequest>(std::bind(&Server::onCloseChannel, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<zwbMQ::declareExchangeRequest>(std::bind(&Server::onDeclareExchange, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<zwbMQ::deleteExchangeRequest>(std::bind(&Server::onDeleteExchange, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<zwbMQ::declareQueueRequest>(std::bind(&Server::onDeclareQueue, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<zwbMQ::deleteQueueRequest>(std::bind(&Server::onDeleteQueue, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<zwbMQ::queueBindRequest>(std::bind(&Server::onQueueBind, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<zwbMQ::queueUnBindRequest>(std::bind(&Server::onQueueUnBind, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<zwbMQ::basicPublishRequest>(std::bind(&Server::onBasicPublish, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<zwbMQ::basicAckRequest>(std::bind(&Server::onBasicAck, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<zwbMQ::basicConsumeRequest>(std::bind(&Server::onBasicConsume, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<zwbMQ::basicCancelRequest>(std::bind(&Server::onBasicCancel, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));// 设置服务器的消息回调和连接回调_server.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codec.get(), std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_server.setConnectionCallback(std::bind(&Server::onConnection, this, std::placeholders::_1));}// 启动服务器void start() {_server.start();_baseloop.loop();}private:// 处理打开信道的请求void onOpenChannel(const muduo::net::TcpConnectionPtr& conn, const openChannelRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("打开信道时,没有找到连接对应的Connection对象!");conn->shutdown();return;}return mconn->openChannel(message);}// 处理关闭信道的请求void onCloseChannel(const muduo::net::TcpConnectionPtr& conn, const closeChannelRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("关闭信道时,没有找到连接对应的Connection对象!");conn->shutdown();return;}return mconn->closeChannel(message);}// 处理声明交换机的请求void onDeclareExchange(const muduo::net::TcpConnectionPtr& conn, const declareExchangeRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("声明交换机时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr) {DLOG("声明交换机时,没有找到信道!");return;}return cp->declareExchange(message);}// 处理删除交换机的请求void onDeleteExchange(const muduo::net::TcpConnectionPtr& conn, const deleteExchangeRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("删除交换机时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr) {DLOG("删除交换机时,没有找到信道!");return;}return cp->deleteExchange(message);}// 处理声明队列的请求void onDeclareQueue(const muduo::net::TcpConnectionPtr& conn, const declareQueueRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("声明队列时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr) {DLOG("声明队列时,没有找到信道!");return;}return cp->declareQueue(message);}// 处理删除队列的请求void onDeleteQueue(const muduo::net::TcpConnectionPtr& conn, const deleteQueueRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("删除队列时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr) {DLOG("删除队列时,没有找到信道!");return;}return cp->deleteQueue(message);}// 处理队列绑定的请求void onQueueBind(const muduo::net::TcpConnectionPtr& conn, const queueBindRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("队列绑定时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr) {DLOG("队列绑定时,没有找到信道!");return;}return cp->queueBind(message);}// 处理队列解绑的请求void onQueueUnBind(const muduo::net::TcpConnectionPtr& conn, const queueUnBindRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("队列解除绑定时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr) {DLOG("队列解除绑定时,没有找到信道!");return;}return cp->queueUnBind(message);}// 处理消息发布的请求void onBasicPublish(const muduo::net::TcpConnectionPtr& conn, const basicPublishRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("发布消息时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr) {DLOG("发布消息时,没有找到信道!");return;}return cp->basicPublish(message);}// 处理消息确认的请求void onBasicAck(const muduo::net::TcpConnectionPtr& conn, const basicAckRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("确认消息时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr) {DLOG("确认消息时,没有找到信道!");return;}return cp->basicAck(message);}// 处理队列消息订阅的请求void onBasicConsume(const muduo::net::TcpConnectionPtr& conn, const basicConsumeRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("队列消息订阅时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr) {DLOG("队列消息订阅时,没有找到信道!");return;}return cp->basicConsume(message);}// 处理队列消息取消订阅的请求void onBasicCancel(const muduo::net::TcpConnectionPtr& conn, const basicCancelRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("队列消息取消订阅时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr) {DLOG("队列消息取消订阅时,没有找到信道!");return;}return cp->basicCancel(message);}// 处理未知消息void onUnknownMessage(const muduo::net::TcpConnectionPtr& conn, const MessagePtr& message, muduo::Timestamp) {LOG_INFO << "onUnknownMessage: " << message->GetTypeName();conn->shutdown();}// 处理连接事件void onConnection(const muduo::net::TcpConnectionPtr &conn) {if (conn->connected()) {_connection_manager->newConnection(_virtual_host, _consumer_manager, _codec, conn, _threadpool);} else {_connection_manager->delConnection(conn);}}private:muduo::net::EventLoop _baseloop; // 事件循环muduo::net::TcpServer _server; // 服务器对象ProtobufDispatcher _dispatcher; // 请求分发器对象ProtobufCodecPtr _codec; // protobuf协议处理器VirtualHost::ptr _virtual_host; // 虚拟主机ConsumerManager::ptr _consumer_manager; // 消费者管理器ConnectionManager::ptr _connection_manager; // 连接管理器threadpool::ptr _threadpool; // 线程池};
}

客户端

客户端模块主要有以下的四大模块

  • 订阅者模块:表示这是一个消费者

  • 信道模块:包含一些常用接口,消息发布确认等等

  • 连接模块:它的作用是进行打开和关闭信道

  • 异步线程模块:客户端连接的IO事件监控,推送来的消息进行异步处理线程

订阅者模块

当消费者订阅了一个队列,那么当这个队列有了消息之后,就会自动推送给这个消费者

客户端的订阅者和服务端的消费者是一样的

namespace zwbMQ 
{using ConsumerCallback = std::function<void(const std::string, const BasicProperties *bp, const std::string)>;struct Consumer {using ptr = std::shared_ptr<Consumer>;std::string tag;    //消费者标识std::string qname;  //消费者订阅的队列名称bool auto_ack;      //自动确认标志ConsumerCallback callback;Consumer(){DLOG("new Consumer: %p", this);}Consumer(const std::string &ctag, const std::string &queue_name,  bool ack_flag, const ConsumerCallback &cb): tag(ctag), qname(queue_name), auto_ack(ack_flag), callback(std::move(cb)) {DLOG("new Consumer: %p", this);}~Consumer() {DLOG("del Consumer: %p", this);}};
}

信道管理模块

服务端中有信道,客户端也有,二者的机制基本一致,无论哪个信道最终的目的都是提供服务;

不同的是,服务端的信道是为服务器提供服务;客户端的信道是给用户提供

信道所管理的信息

std::string _cid;                                                    // Channel ID
muduo::net::TcpConnectionPtr _conn;                                  // TCP 连接
ProtobufCodecPtr _codec;                                             // Protobuf 编解码器
Consumer::ptr _consumer;                                             // 消费者对象
std::mutex _mutex;                                                   // 互斥锁
std::condition_variable _cv;                                         // 条件变量
std::unordered_map<std::string, basicCommonResponsePtr> _basic_resp; // 响应哈希表

信道的组织操作

namespace zwbMQ
{typedef std::shared_ptr<google::protobuf::Message> MessagePtr;using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;using basicConsumeResponsePtr = std::shared_ptr<basicConsumeResponse>;using basicCommonResponsePtr = std::shared_ptr<basicCommonResponse>;class Channel{public:using ptr = std::shared_ptr<Channel>;// 构造函数,初始化 Channel 对象Channel(const muduo::net::TcpConnectionPtr &conn, const ProtobufCodecPtr &codec);// 析构函数,调用 basicCancel 方法~Channel();// 返回 Channel 的 IDstd::string cid();// 打开 Channel,发送 openChannelRequest 请求并等待响应bool openChannel();// 关闭 Channel,发送 closeChannelRequest 请求并等待响应	void closeChannel();// 声明 Exchange,发送 declareExchangeRequest 请求并等待响应bool declareExchange(const std::string &name,ExchangeType type,bool durable,bool auto_delete,google::protobuf::Map<std::string, std::string> &args);// 删除 Exchange,发送 deleteExchangeRequest 请求并等待响应void deleteExchange(const std::string &name);// 声明 Queue,发送 declareQueueRequest 请求并等待响应bool declareQueue(const std::string &qname,bool qdurable,bool qexclusive,bool qauto_delete,google::protobuf::Map<std::string, std::string> &qargs);// 删除 Queue,发送 deleteQueueRequest 请求并等待响应void deleteQueue(const std::string &qname);// 绑定 Queue 和 Exchange,发送 queueBindRequest 请求并等待响应bool queueBind(const std::string &ename,const std::string &qname,const std::string &key);// 解绑 Queue 和 Exchange,发送 queueUnBindRequest 请求并等待响应void queueUnBind(const std::string &ename, const std::string &qname);// 发布消息,发送 basicPublishRequest 请求并等待响应void basicPublish(const std::string &ename,const BasicProperties *bp,const std::string &body);// 确认消息,发送 basicAckRequest 请求并等待响应void basicAck(const std::string &msgid);// 取消消费者订阅,发送 basicCancelRequest 请求并等待响应void basicCancel();// 订阅消息,发送 basicConsumeRequest 请求并等待响应bool basicConsume(const std::string &consumer_tag,const std::string &queue_name,bool auto_ack,const ConsumerCallback &cb);// 将基础响应放入哈希表中void putBasicResponse(const basicCommonResponsePtr &resp);// 处理收到的消息推送void consume(const basicConsumeResponsePtr &resp);private:// 等待响应,直到收到指定 rid 的响应basicCommonResponsePtr waitResponse(const std::string &rid);private:std::string _cid;muduo::net::TcpConnectionPtr _conn;ProtobufCodecPtr _codec;Consumer::ptr _consumer;std::mutex _mutex;std::condition_variable _cv;std::unordered_map<std::string, basicCommonResponsePtr> _basic_resp;};

信道的管理操作

 class ChannelManager{public:using ptr = std::shared_ptr<ChannelManager>;ChannelManager();Channel::ptr create(const muduo::net::TcpConnectionPtr &conn, const ProtobufCodecPtr &codec);// 移除指定 Channelvoid remove(const std::string &cid);// 获取指定 ChannelChannel::ptr get(const std::string &cid);private:std::mutex _mutex;                                       // 互斥锁std::unordered_map<std::string, Channel::ptr> _channels; // Channel 哈希表};

连接管理模块

先创建连接,通过连接创建信道,通过信道提供服务
这个模块就是针对于muduo库客户端进行的二次封装
给用户提供了一个创建Channel的接口,创建信道后,借助信道来提供服务

muduo::CountDownLatch _latch; // 用于实现同步的 CountDownLatch
muduo::net::TcpConnectionPtr _conn; // 客户端对应的连接
muduo::net::TcpClient _client; // 客户端
ProtobufDispatcher _dispatcher; // 请求分发器
ProtobufCodecPtr _codec; // 协议处理器AsyncWorker::ptr _worker; // 异步工作线程
ChannelManager::ptr _channel_manager; // 信道管理器
  • 构造函数,初始化连接
  • 打开一个信道
  • 关闭一个信道
  • 处理基础响应消息
  • 处理消费响应消息
  • 处理未知消息
  • 处理连接事件

异步线程池模块

  • EventLoopThread模块进行IO事件监控
  • 收到推送的消息,需要对推送过来的消息进行处理,要一个线程池来完成消息处理
  • 多个连接用一个EventLoopThread进行IO监控

源码

https://gitee.com/nuyoahc/message-queue-mq

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/69488.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CANoe查看CAN报文发送周期

在CANoe软件中&#xff0c;Analysis -> Select other options 下的 Toggle Grid 和 Toggle Samples 选项确实用于控制分析窗口中的显示方式和采样行为&#xff0c;从而更清晰地查看CAN报文周期。 Toggle Grid&#xff08;切换网格&#xff09; 功能&#xff1a;启用网格线…

【Go语言圣经】第八节:Goroutines和Channels

DeepSeek 说 Goroutines 和 Channels 最近非常流行询问DeepSeek某些相关概念或热点的解释&#xff0c;因此在开始系统性地学习《Go语言圣经》之前&#xff0c;我首先向DeepSeek进行了提问。具体的Prompt如下&#xff1a; 有关Golang当中的Goroutines和Channels&#xff0c;我现…

e2studio开发RA4M2(10)----定时器AGT输出PWM

e2studio开发RA4M2.10--定时器AGT输出PWM 概述视频教学样品申请硬件准备参考程序源码下载选择计时器新建工程工程模板保存工程路径芯片配置工程模板选择时钟设置SWD调试口设置GPIO口配置AGT定时器AGT定时器属性配置初始化AGT启动AGT PWM模块AGTIO 和 AGTO演示 概述 AGT模块是R…

使用PyCharm进行Django项目开发环境搭建

如果在PyCharm中创建Django项目 1. 打开PyCharm&#xff0c;选择新建项目 2.左侧选择Django&#xff0c;并设置项目名称 3.查看项目解释器初始配置 4.新建应用程序 执行以下操作之一&#xff1a; 转到工具| 运行manage.py任务或按CtrlAltR 在打开的manage.pystartapp控制台…

【Java基础】为什么不支持多重继承?方法重载和方法重写之间区别、Exception 和 Error 区别?

Hi~&#xff01;这里是奋斗的明志&#xff0c;很荣幸您能阅读我的文章&#xff0c;诚请评论指点&#xff0c;欢迎欢迎 ~~ &#x1f331;&#x1f331;个人主页&#xff1a;奋斗的明志 &#x1f331;&#x1f331;所属专栏&#xff1a;Java基础面经 &#x1f4da;本系列文章为个…

bladeX微服务框架如何修改nacos分组

nacos中注册的服务他的分组&#xff08;分组名称&#xff09;怎么修改 在org.springblade.common.launch // 指定注册IP PropsUtil.setProperty(props, "spring.cloud.nacos.discovery.ip", "127.0.0.1"); // 指定注册端口 PropsUtil.setProperty(props, &…

大数据项目2a:基于spark的电影推荐和分析系统设计与实现

1、项目目的 本项目的目的是设计并实现一个基于Spark的电影推荐系统&#xff0c;以应对大数据环境下电影推荐服务的挑战。通过整合电影、评分和用户数据集&#xff0c;并利用SparkSql框架进行高效处理&#xff0c;系统能够为用户提供个性化的电影推荐。项目采用多种先进技术&…

机器学习常用包matplotlib篇(四)绘图规范

前言 为了让 Matplotlib 绘图代码更规范、易读&#xff0c;且为后期图形完善预留空间&#xff0c;建议遵循一些规范绘图方法。&#x1f609; 1.管理图形对象 建议使用 plt.figure() 或者 plt.subplots() 管理完整的图形对象&#xff0c;而非直接用 plt.plot(...) 绘图。这样能…

LVGL4种输入设备详解(触摸、键盘、实体按键、编码器)

lvgl有触摸、键盘、实体按键、编码器四种输入设备 先来分析一下这四种输入设备有什么区别 &#xff08;1&#xff09;LV_INDEV_TYPE_POINTER 主要用于触摸屏 用到哪个输入设备保留哪个其他的也是&#xff0c;保留触摸屏输入的任务注册&#xff0c;其它几种种输入任务的注册&…

5G技术解析:从核心概念到关键技术

1. 引言 5G技术的迅猛发展正在重塑我们的生活方式和社会结构。它不仅仅是新一代的移动通信技术&#xff0c;更是一场深刻的技术革命。5G网络正在以其惊人的高速、低延迟和大带宽能力&#xff0c;为智能家居、自动驾驶、工业自动化、远程医疗等另一带来前所未有的可能性。 本文…

背包问题1

核心&#xff1a; // f[i][j] 表示只看前i个物品&#xff0c;总体积是j的情况下&#xff0c;总价值是多少 //res maxx(f[n][]0-v] //f[i][j]: //1 不选第i个物品 f[i][j] f[i-1][j] //2 选第i个物品 f[i][j] f[i-1][j-v[i]] w[i]

Redis | 十大数据类型

文章目录 十大数据类型概述key操作命令数据类型命令及落地运用redis字符串&#xff08;String&#xff09;redis列表&#xff08;List&#xff09;redis哈希表&#xff08;Hash&#xff09;redis集合&#xff08;Set&#xff09;redis有序集合&#xff08;ZSet / SortedSet&…

DeepSeek图解10页PDF

以前一直在关注国内外的一些AI工具&#xff0c;包括文本型、图像类的一些AI实践&#xff0c;最近DeepSeek突然爆火&#xff0c;从互联网收集一些资料与大家一起分享学习。 本章节分享的文件为网上流传的DeepSeek图解10页PDF&#xff0c;免费附件链接给出。 1 本地 1 本地部…

C# OpenCvSharp 部署MOWA:多合一图像扭曲模型

目录 说明 效果 项目 代码 下载 参考 C# OpenCvSharp 部署MOWA&#xff1a;多合一图像扭曲模型 说明 算法模型的paper名称是《MOWA: Multiple-in-One Image Warping Model》 ariv链接 https://arxiv.org/pdf/2404.10716 效果 Stitched Image 翻译成中文意思是&…

vite+vue3搭建前端项目并使用 Bulma 框架

vitevue3搭建前端项目并使用 Bulma 框架 bluma css框架参照。 https://bulma.org.cn/documentation/start/overview/ 1. 创建项目 npm init vitelatest ai-imageneration --template vue选择 vue 和 typescript 作为模板&#xff1a; 2. 安装依赖 npm install npm install…

Spring 6.2.2 @scope(“prototype“)原理

Spring Prototype 原理&#xff1f; 前置准备 创建一个MyService类 Scope("prototype") Service("myService") public class MyService {public String getMessage() {return "Hello, World!";} }创建一个main类&#xff0c;用于debug。 pr…

RabbitMQ 可靠性投递

文章目录 前言一、RabbitMQ自带机制1、生产者发送消息注意1.1、事务&#xff08;Transactions&#xff09;1.2、发布确认&#xff08;Publisher Confirms&#xff09;1.2.1、同步1.2.2、异步 2、消息路由机制2.1、使用备份交换机&#xff08;Alternate Exchanges&#xff09;2.…

【实用技能】如何借助3D文档控件Aspose.3D, 在Java中无缝制作 3D 球体

概述 创建 3D 球体是 3D 图形设计的一个基本方面。无论您是在开发游戏、模拟还是可视化&#xff0c;无缝创建 3D 球体模型的能力都至关重要。Aspose.3D通过提供强大的 3D 图形 SDK 在各个行业中发挥着重要作用。它允许开发人员轻松创建、操作和转换 3D 模型。此 SDK 对于希望将…

C语言基础系列【3】VSCode使用

前面我们提到过VSCode有多么的好用&#xff0c;本文主要介绍如何使用VSCode编译运行C语言代码。 安装 首先去官网&#xff08;https://code.visualstudio.com/&#xff09;下载安装包&#xff0c;点击Download for Windows 获取安装包后&#xff0c;一路点击Next就可以。 配…

windows安装WSL完整指南

本文首先介绍WSL&#xff0c;然后一步一步安装WSL及Ubuntu系统&#xff0c;最后讲解如何在两个系统之间访问和共享文件信息。通过学习该完整指南&#xff0c;能帮助你快速安装WSL&#xff0c;解决安装和使用过程中的常见问题。 理解WSL&#xff08;Windows Subsystem for Linux…