【RabbitMQ 项目】服务端:服务器模块

文章目录

  • 一.编写思路
  • 二.代码实践
  • 三.服务端模块关系总结

一.编写思路

成员变量:

  1. muduo 库中的 TCP 服务器
  2. EventLoop 对象:用于主线程循环监控连接事件
  3. 协议处理句柄
  4. 分发器:用于初始化协议处理器,便于把不同请求派发给不同的业务处理函数
  5. 连接管理句柄
  6. 虚拟机句柄
  7. 消费者管理句柄
  8. 线程池管理句柄
    成员方法:
    向外提供的只有 2 个方法:
  9. start:启动服务
  10. 构造函数:
  • 完成各项成员的初始化,
  • 注册 TCP 服务器的两个回调函数:
    OnMessage:从接收缓冲区把数据读到用户缓冲区后的回调函数
    OnConnection:Tcp 连接建立或断开时的回调函数
  • 给分发器注册业务处理函数(私有成员方法,共 12 个)
    信道打开与与关闭;交换机,队列,绑定添加与删除,订阅与取消订阅,发布与确认消息
    私有成员(业务处理函数):
    如果是创建或关闭信道,直接用连接管理句柄新增或删除信道,然后构建响应返回
    如果是其他请求,先用连接管理句柄找到信道(请求中携带了信道 id),再使用信道提供的服务

二.代码实践

BrokerServer.hpp:

#pragma once
#include "muduo/protobuf/codec.h"
#include "muduo/protobuf/dispatcher.h"
#include "muduo/base/Logging.h"
#include "muduo/base/Mutex.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/TcpServer.h"#include "VirtualHost.hpp"
#include "Connection.hpp"
#include "Consumer.hpp"
#include <functional>
#include <stdio.h>
#include <unistd.h>namespace ns_server
{using ConnectionManagerPtr = std::shared_ptr<ns_connection::ConnectionManager>;using VirtualHostPtr = std::shared_ptr<ns_data::VirtualHost>;using ConsumerManagerPtr = std::shared_ptr<ns_consumer::ConsumerManager>;using ThreadPoolPtr = std::shared_ptr<ns_tp::ThreadPool>;using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;/************* 定义协议的结构化数据的智能指针(在分发器中注册时需要的格式)* *************/using OpenChannelRequestPtr = std::shared_ptr<ns_protocol::OpenChannelRequest>;using CloseChannelRequestPtr = std::shared_ptr<ns_protocol::CloseChannelRequest>;using DeclareExchangeRequestPtr = std::shared_ptr<ns_protocol::DeclareExchangeRequest>;using DeleteExchangeRequestPtr = std::shared_ptr<ns_protocol::DeleteExchangeRequest>;using DeclareMsgQueueRequestPtr = std::shared_ptr<ns_protocol::DeclareMsgQueueRequest>;using DeleteMsgQueueRequestPtr = std::shared_ptr<ns_protocol::DeleteMsgQueueRequest>;using BindRequestPtr = std::shared_ptr<ns_protocol::BindRequest>;using UnbindRequestPtr = std::shared_ptr<ns_protocol::UnbindRequest>;using PublishMessageRequestPtr = std::shared_ptr<ns_protocol::PublishMessageRequest>;using SubscribeQueueRequestPtr = std::shared_ptr<ns_protocol::SubscribeQueueRequest>;using CancelSubscribeRequestPtr = std::shared_ptr<ns_protocol::CancelSubscribeRequest>;using AckRequestPtr = std::shared_ptr<ns_protocol::AckRequest>;class BrokerServer{public:private:muduo::net::EventLoop _baseLoop;muduo::net::TcpServer _server;ProtobufDispatcher _dispatcher;ProtobufCodecPtr _codecPtr;VirtualHostPtr _vhPtr;ConsumerManagerPtr _consumerManagerPtr;ConnectionManagerPtr _connManagerPtr;ThreadPoolPtr _threadPoolPtr;public:BrokerServer(int serverPort, const std::string &dbName, const std::string &msgDir): _baseLoop(),_server(&_baseLoop, muduo::net::InetAddress("0.0.0.0", serverPort), "TcpServer", muduo::net::TcpServer::kReusePort),_dispatcher(std::bind(&BrokerServer::onUnknownMessage, this, std::placeholders::_1,std::placeholders::_2, std::placeholders::_3)){// 初始化成员_codecPtr = std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage,&_dispatcher, std::placeholders::_1,std::placeholders::_2, std::placeholders::_3));_vhPtr = std::make_shared<ns_data::VirtualHost>(dbName, msgDir);_threadPoolPtr = std::make_shared<ns_tp::ThreadPool>();_threadPoolPtr->start();std::vector<std::string> qnames;_vhPtr->getAllQueueName(&qnames);_consumerManagerPtr = std::make_shared<ns_consumer::ConsumerManager>(qnames);_connManagerPtr = std::make_shared<ns_connection::ConnectionManager>();// 给_server注册两个回调函数_server.setConnectionCallback(std::bind(&BrokerServer::onConnection, this, std::placeholders::_1));_server.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codecPtr.get(), std::placeholders::_1,std::placeholders::_2, std::placeholders::_3));// 给分发器注册业务处理函数_dispatcher.registerMessageCallback<ns_protocol::OpenChannelRequest>(std::bind(&BrokerServer::onOpenChannel,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ns_protocol::CloseChannelRequest>(std::bind(&BrokerServer::onCloseChannel,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ns_protocol::DeclareExchangeRequest>(std::bind(&BrokerServer::onDeclareExchange,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ns_protocol::DeleteExchangeRequest>(std::bind(&BrokerServer::onDeleteExchange,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ns_protocol::DeclareMsgQueueRequest>(std::bind(&BrokerServer::onDeclareMsgQueue,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ns_protocol::DeleteMsgQueueRequest>(std::bind(&BrokerServer::onDeleteMsgQueue,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ns_protocol::BindRequest>(std::bind(&BrokerServer::onBind,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ns_protocol::UnbindRequest>(std::bind(&BrokerServer::onUnbind,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ns_protocol::SubscribeQueueRequest>(std::bind(&BrokerServer::onSubScribe,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ns_protocol::CancelSubscribeRequest>(std::bind(&BrokerServer::onCancelSubScribe,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ns_protocol::PublishMessageRequest>(std::bind(&BrokerServer::onPublishMessage,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ns_protocol::AckRequest>(std::bind(&BrokerServer::onAck,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));}void start(){// 开启监听状态_server.start();// 开始循环监控事件_baseLoop.loop();}private:// 给TcpServer设置的回调函数void onConnection(const muduo::net::TcpConnectionPtr &connPtr){if (connPtr->connected()){_connManagerPtr->newConnection(connPtr, _codecPtr, _vhPtr, _consumerManagerPtr, _threadPoolPtr);}else{_connManagerPtr->deleteConnection(connPtr);}}// 业务处理函数void onUnknownMessage(const muduo::net::TcpConnectionPtr &connPtr, MessagePtr msgPtr, muduo::Timestamp time){cout << "未知消息" << endl;connPtr->shutdown();}/************* 信道创建与删除* ***************/void onOpenChannel(const muduo::net::TcpConnectionPtr &connPtr, const OpenChannelRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if (myConnPtr == nullptr){LOG(WARNING) << "打开信道时, 未找到Connection" << endl;return;}myConnPtr->openChannel(*reqPtr);LOG(DEBUG) << "create new channel, channelId: " << reqPtr->channel_id() << endl;}void onCloseChannel(const muduo::net::TcpConnectionPtr &connPtr, const CloseChannelRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if (myConnPtr == nullptr){LOG(WARNING) << "关闭信道时, 未找到Connection" << endl;return;}myConnPtr->closeChannel(*reqPtr);LOG(DEBUG) << "close channel, channelId: " << reqPtr->channel_id() << endl;}/********** 交换机声明与删除* ********/void onDeclareExchange(const muduo::net::TcpConnectionPtr &connPtr, const DeclareExchangeRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if (myConnPtr == nullptr){LOG(WARNING) << "声明交换机时, 未找到Connection" << endl;return;}auto channelPtr = myConnPtr->getChannel(reqPtr->channel_id());if (channelPtr == nullptr){LOG(WARNING) << "没有找到信道" << endl;return;}channelPtr->declareExchange(*reqPtr);LOG(DEBUG) << "声明交换机, exchangeName: " << reqPtr->exchange_name() << endl;}void onDeleteExchange(const muduo::net::TcpConnectionPtr &connPtr, const DeleteExchangeRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if (myConnPtr == nullptr){LOG(WARNING) << "删除交换机时, 未找到Connection" << endl;return;}auto channelPtr = myConnPtr->getChannel(reqPtr->channel_id());if (channelPtr == nullptr){LOG(WARNING) << "没有找到信道" << endl;return;}channelPtr->deleteExchange(*reqPtr);LOG(DEBUG) << "删除信道, exchangeName: " << reqPtr->exchange_name() << endl;}/************* 队列声明与删除* ***************/void onDeclareMsgQueue(const muduo::net::TcpConnectionPtr &connPtr, const DeclareMsgQueueRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if (myConnPtr == nullptr){LOG(WARNING) << "声明队列时, 未找到Connection" << endl;return;}auto channelPtr = myConnPtr->getChannel(reqPtr->channel_id());if (channelPtr == nullptr){LOG(WARNING) << "没有找到信道" << endl;return;}channelPtr->declareMsgQueue(*reqPtr);LOG(DEBUG) << "声明队列, queueName: " << reqPtr->queue_name() << endl;}void onDeleteMsgQueue(const muduo::net::TcpConnectionPtr &connPtr, const DeleteMsgQueueRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if (myConnPtr == nullptr){LOG(WARNING) << "删除队列时, 未找到Connection" << endl;return;}auto channelPtr = myConnPtr->getChannel(reqPtr->channel_id());if (channelPtr == nullptr){LOG(WARNING) << "没有找到信道" << endl;return;}channelPtr->deleteMsgQueue(*reqPtr);LOG(DEBUG) << "删除队列, queueName: " << reqPtr->queue_name() << endl;}/*********** 绑定与解绑* ***********/void onBind(const muduo::net::TcpConnectionPtr &connPtr, const BindRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if (myConnPtr == nullptr){LOG(WARNING) << "添加绑定时, 未找到Connection" << endl;return;}auto channelPtr = myConnPtr->getChannel(reqPtr->channel_id());if (channelPtr == nullptr){LOG(WARNING) << "没有找到信道" << endl;return;}channelPtr->bind(*reqPtr);LOG(DEBUG) << "绑定: " << reqPtr->ename() << "->" << reqPtr->qname() << ": " << reqPtr->binding_key() << endl;}void onUnbind(const muduo::net::TcpConnectionPtr &connPtr, const UnbindRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if (myConnPtr == nullptr){LOG(WARNING) << "删除绑定时, 未找到Connection" << endl;return;}auto channelPtr = myConnPtr->getChannel(reqPtr->channel_id());if (channelPtr == nullptr){LOG(WARNING) << "没有找到信道" << endl;return;}channelPtr->unbind(*reqPtr);LOG(DEBUG) << "解绑: " << reqPtr->ename() << "->" << reqPtr->qname() << endl;}/************** 订阅与取消订阅* ************/void onSubScribe(const muduo::net::TcpConnectionPtr &connPtr, const SubscribeQueueRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if (myConnPtr == nullptr){LOG(WARNING) << "订阅队列时, 未找到Connection" << endl;return;}auto channelPtr = myConnPtr->getChannel(reqPtr->channel_id());if (channelPtr == nullptr){LOG(WARNING) << "没有找到信道" << endl;return;}channelPtr->subscribeQueue(*reqPtr);LOG(DEBUG) << "订阅队列" << ", qname: " << reqPtr->qname() << endl;}void onCancelSubScribe(const muduo::net::TcpConnectionPtr &connPtr, const CancelSubscribeRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if (myConnPtr == nullptr){LOG(WARNING) << "取消订阅队列时, 未找到Connection" << endl;return;}auto channelPtr = myConnPtr->getChannel(reqPtr->channel_id());if (channelPtr == nullptr){LOG(WARNING) << "没有找到信道" << endl;return;}channelPtr->cancelSubscribe(*reqPtr);LOG(DEBUG) << "取消订阅队列" << ", qname: " << reqPtr->qname() << endl;}/********* 发布与应答* **************/void onPublishMessage(const muduo::net::TcpConnectionPtr &connPtr, const PublishMessageRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if (myConnPtr == nullptr){LOG(WARNING) << "发布消息时, 未找到Connection" << endl;return;}auto channelPtr = myConnPtr->getChannel(reqPtr->channel_id());if (channelPtr == nullptr){LOG(WARNING) << "没有找到信道" << endl;return;}channelPtr->publishMessage(*reqPtr);LOG(DEBUG) << "publish message: " << reqPtr->msg().saved_info().body() << endl;}void onAck(const muduo::net::TcpConnectionPtr &connPtr, const AckRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if (myConnPtr == nullptr){LOG(WARNING) << "确认消息时, 未找到Connection" << endl;return;}auto channelPtr = myConnPtr->getChannel(reqPtr->channel_id());if (channelPtr == nullptr){LOG(WARNING) << "没有找到信道" << endl;return;}channelPtr->ackMessage(*reqPtr);LOG(DEBUG) << "应答消息, msgId: " << reqPtr->msg_id() << endl;}void sendCommonResponse(const muduo::net::TcpConnectionPtr &connPtr, const std::string &channelId,const std::string &responseId, bool ok){ns_protocol::CommomResponse resp;resp.set_channel_id(channelId);resp.set_response_id(responseId);resp.set_ok(ok);_codecPtr->send(connPtr, resp);}};
}

三.服务端模块关系总结

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/54761.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Golang | Leetcode Golang题解之第433题最小基因变化

题目&#xff1a; 题解&#xff1a; func diffOne(s, t string) (diff bool) {for i : range s {if s[i] ! t[i] {if diff {return false}diff true}}return }func minMutation(start, end string, bank []string) int {if start end {return 0}m : len(bank)adj : make([][…

OpenHarmony标准系统mipi摄像头适配

OpenHarmony标准系统mipi摄像头适配 本文档以rk3568为例&#xff0c;讲述如何在OpenHarmony 标准系统rk设备上适配mipi摄像头。 开发环境 OpenHarmony标准系统4.1rrk3568设备摄像头ov5648,ov8858 文档约定&#xff1a;4.1r_3568为OpenHarmony标准系统源码根目录 1.适配准备:得…

树莓派pico上手

0 介绍 不同于作为单板计算机的树莓派5&#xff0c;树莓派 pico 是一款低成本、高性能的微控制器板&#xff0c;具有灵活的数字接口。主要功能包括&#xff1a; 英国树莓派公司设计的 RP2040 微控制器芯片双核 Arm Cortex M0 处理器&#xff0c;弹性的时钟频率高达 133 MHz26…

Spring AOP的应用

目录 1、maven坐标配置与xml头配置 2、代理方式的选择与配置 3、AOP的三种配置方式 3.1、XML模式 3.1.1 创建目标类和方法 3.1.2 创建切面 3.1.3 切面xml配置与表达式说明 3.1.4 单测 3.2 纯注解模式 3.2.1 开启注解相关配置 3.2.2 创建目标类和方法 3.2.3 创建切面…

FGPA实验——触摸按键

本文系列都基于正点原子新起点开发板 FPGA系列 1&#xff0c;verlog基本语法&#xff08;随时更新&#xff09; 2&#xff0c;流水灯&#xff08;待定&#xff09; 3&#xff0c;FGPA实验——触摸按键 一、触摸操作原理实现 分类&#xff1a;电阻式&#xff08;不耐用&…

二叉树进阶

目录 1. 二叉搜索树实现 1.1 二叉搜索树概念 2.2 二叉搜索树操作 ​编辑 ​编辑 2.3 二叉搜索树的实现 2.3.0 Destroy() 析构 2.3.1 Insert&#xff08;&#xff09;插入 2.3.2 InOrder&#xff08;&#xff09; 打印搜索二叉树 ​编辑​编辑 2.3.3 Find() 查找 …

el-table表格点击该行任意位置时也勾选上其前面的复选框

需求&#xff1a;当双击表格某一行任意位置时&#xff0c;自动勾选上其前面的复选框 1、在el-table 组件的每一行添加row-dblclick事件&#xff0c;用于双击点击 <el-table:data"tableData"ref"tableRef"selection-change"handleSelectionChange&q…

如何在Chrome最新浏览器中调用ActiveX控件?

小编最近登陆工商银行网上银行&#xff0c;发现工商银行的个人网银网页&#xff0c;由于使用了ActiveX安全控件&#xff0c;导致不能用高版本Chrome浏览器打开&#xff0c;目前只有使用IE或基于IE内核的浏览器才能正常登录网上银行&#xff0c;而IE已经彻底停止更新了&#xff…

AI绘图网页版工具

https://chat.bushao.info/?inVitecodeCHBEPQQOOM 一款AI绘图工具&#xff0c;很好玩&#xff0c;推荐&#xff1b; 我自己根据文本生成的图&#xff0c;感觉还不错。

ROC、TPR、FPR的含义

1、ROC&#xff08;Receiver Operating Characteristic&#xff09; ROC&#xff08;Receiver Operating Characteristic&#xff09;曲线是一种用于评估分类模型性能的工具。它通过绘制真阳性率&#xff08;True Positive Rate, TPR&#xff09;与假阳性率&#xff08;False…

仪表放大器AD620

AD623 是一款低功耗、高精度的仪表放大器&#xff0c;而不是轨到轨运算放大器。它的输入电压范围并不覆盖整个电源电压&#xff08;轨到轨&#xff09;&#xff0c;但在单电源供电下可以处理接近地电位的输入信号。 AD620 和 AD623 都是仪表放大器&#xff0c;但它们在一些关键…

vscode【实用插件】Notes 便捷做笔记

安装 在 vscode 插件市场的搜索 Notes点 安装 安装成功后&#xff0c;vscode 左侧栏会出现 使用 初次使用 需先选择一个本地目录 重启 vscode 后&#xff0c;得到 切换笔记目录 新建笔记 快捷键为 Alt N 默认会创建 .md 文件 配合插件 Markdown Preview Enhanced 预览 .md…

2024中国新能源汽车零部件交易会,开源网安展示了什么?

近日&#xff0c;2024中国新能源汽车零部件交易会在十堰国际会展中心举行。开源网安车联网安全实验室携车联网安全相关产品及解决方案亮相本次交易会&#xff0c;保障智能网联汽车“车、路、云、网、图、边”安全&#xff0c;推动智能网联汽车技术突破与产业化发展。 中国新能源…

【深度学习】(7)--神经网络之保存最优模型

文章目录 保存最优模型一、两种保存方法1. 保存模型参数2. 保存完整模型 二、迭代模型 总结 保存最优模型 我们在迭代模型训练时&#xff0c;随着次数初始的增多&#xff0c;模型的准确率会逐渐的上升&#xff0c;但是同时也随着迭代次数越来越多&#xff0c;由于模型会开始学…

【从0开始自动驾驶】用python做一个简单的自动驾驶仿真可视化界面

【从0开始自动驾驶】用python做一个简单的自动驾驶仿真可视化界面 废话几句废话不多说&#xff0c;直接上源码目录结构init.pysimulator.pysimple_simulator_app.pyvehicle_config.json 废话几句 自动驾驶开发离不开仿真软件成品仿真软件种类多https://zhuanlan.zhihu.com/p/3…

【CSS】鼠标 、轮廓线 、 滤镜 、 堆叠层级

cursor 鼠标outline 轮廓线filter 滤镜z-index 堆叠层级 cursor 鼠标 值说明值说明crosshair十字准线s-resize向下改变大小pointer \ hand手形e-resize向右改变大小wait表或沙漏w-resize向左改变大小help问号或气球ne-resize向上右改变大小no-drop无法释放nw-resize向上左改变…

蓝桥杯1.小蓝的漆房

样例输入 2 5 2 1 1 2 2 1 6 2 1 2 2 3 3 3样例输出 1 2 import math import os import sys tint(input())#执行的次数 for j in range(t):n,kmap(int,input().split())#n为房间数 k为一次能涂的个数alist(map(int,input().split()))#以列表的形式存放房间的颜色maxvaluemath…

处理RabbitMQ连接和认证问题

在使用RabbitMQ进行消息队列管理时&#xff0c;我们可能会遇到各种连接和认证问题。本文将介绍如何诊断和解决这些问题&#xff0c;并通过使用RabbitMQ的管理端进行登录验证来确保配置正确。 1. 问题概述 在最近的一次部署中&#xff0c;我们遇到了两个主要问题&#xff1a; …

IPSec隧道协议学习(一)

前情回顾 前面介绍的GRE隧道协议&#xff0c;可以字LAN之间通过Internet建立隧道&#xff0c;实现网络间资源共享&#xff0c;但是GRE隧道协议不能实现加密功能&#xff0c;传输的数据不受加密保护&#xff0c;为了实现在隧道间传输数据包收到加密保护&#xff0c;需要使用IPS…

GitLab发送邮件功能详解:如何配置自动化?

GitLab发送邮件的设置指南&#xff1f;怎么优化GitLab发送邮件&#xff1f; GitLab作为一个强大的代码管理平台&#xff0c;不仅提供了代码托管、CI/CD等功能&#xff0c;还集成了发送邮件的功能&#xff0c;使得开发团队能够及时获取项目动态。AokSend将详细介绍如何配置GitL…