rabbitMq------客户端模块

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录

  • 前言
  • 消费者模块
  • 信道管理模块
    • 管理的字段
    • 提供的接口
  • 信道内存管理
  • 连接管理类


前言

在RabbitMQ中,提供服务的是信道,因此在客⼾端的实现中,弱化了Client客⼾端的概念。在RabbitMQ中并不会向⽤⼾展⽰⽹络通信的概念出来,⽽是以⼀种提供服务的形式来体现。实现思想类似于普通的功能接⼝封装,⼀个接⼝实现⼀个功能,接⼝内部完成向客⼾端请求的过程,但是对外并不需要体现出客⼾端与服务端通信的概念,⽤⼾需要什么服务就调⽤什么接⼝就⾏。


消费者模块

客户端这块对于消费者模块是不需要管理的,当进行消息订阅的时候,就会创建出一个消费者,而这消费者的作用就是:
• 描述当前信道订阅了哪个队列的消息。
• 描述了收到消息后该如何对这条消息进⾏处理。
• 描述收到消息后是否需要进⾏确认回复。

using ConsumerCallBack = std::function<void(const std::string&,BasicProperties *,const std::string &)>;struct Consumer{using ptr = std::shared_ptr<Consumer>;std::string _qname; //消费者订阅的队列名称std::string _ctag;  //消费者标识bool _auto_ack;  //自动应答标志ConsumerCallBack _cb;   }

信道管理模块

同样的客户端也有信道,其功能与服务端几乎一致,只不过客户端的信道是为用户提供服务的,而服务器的信道是为客户端的请求提供服务的。也可以理解是⽤⼾通过客⼾端channel的接⼝调⽤来向服务端发送对应请求,获取请求的服务。

管理的字段

客户端的信道需要管理的字段中有一个哈希表,是请求id和通用响应结构的映射。
因为在muduo库中发送和接收都是异步的,例如我们声明一个交换机,这个请求可能还在发送缓冲区,并没有发送,我们此时如果去给这个交换机推送消息就会出问题。因此需要我们⾃⼰在收到响应后,通过判断是否是等待的指定响应来进⾏同步。

class Channel{private:std::string _cid;muduo::net::TcpConnectionPtr _conn;ProtobufCodecPtr _codec; Consumer::ptr _consumer;//由于muduo库的发送和接收都是异步的,例如我们声明一个交换机,这个请求可能还在发送缓冲区,并没有发送,我们此时如果去给这个交换机推送消息就会出问题。因此需要我们⾃⼰在收到响应后,通过判断是否是等待的指定响应来进⾏同步std::mutex _mutex;std::condition_variable _cv;std::unordered_map<std::string,basicCommonResponsePtr> _basic_resp;}

提供的接口

客户端的channel和服务端的接口都是几乎一致的,客户端的接口中是组织请求,向服务端发起请求,服务端的接口是接收请求进行业务处理。

但客户端中的channel提供了打开信道和关闭信道这俩个接口,他们是向服务端发起请求,在服务器上创建信道。

//这两个接口是向服务端发送请求,在服务端创建对应信道bool openChannel(){//组织请求openChannelRequest req;std::string rid = UUIDHelper::uuid();req.set_rid(rid);req.set_cid(_cid);//发送_codec->send(_conn,req);basicCommonResponsePtr resp = waitResponse(rid);return resp->ok();}void closeChannel(){//组织请求closeChannelRequest req;std::string rid = UUIDHelper::uuid();req.set_rid(rid);req.set_cid(_cid);//发送_codec->send(_conn,req);waitResponse(rid);return;}

其中每个接口中都需要调用waitResponse,用来同步等待响应。

basicCommonResponsePtr waitResponse(std::string &rid){std::unique_lock<std::mutex> lock(_mutex);_cv.wait(lock,[&rid,this](){return _basic_resp.find(rid) != _basic_resp.end();});basicCommonResponsePtr basic_resp = _basic_resp[rid];_basic_resp.erase(rid);return basic_resp;}

信道内存管理

对信道的一个总的管理类。
有一个哈希表,是信道ID和信道对象的映射。
他提供了三个接口,创建信道/删除信道/获取指定信道。

class ChannelManager{private:std::mutex _mutex;std::unordered_map<std::string,Channel::ptr> _channels;}

连接管理类

在客⼾端这边,RabbitMQ弱化了客⼾端的概念,因为⽤⼾所需的服务都是通过信道来提供的,因此操作思想转换为先创建连接,通过连接创建信道,通过信道提供服务这⼀流程。
这个模块同样是针对muduo库客⼾端连接的⼆次封装,向⽤⼾提供创建channel信道的接⼝,创建信道后,可以通过信道来获取指定服务。

class Connection {public:using ptr = std::shared_ptr<Connection>;
private:muduo::CountDownLatch _latch;//实现同步的muduo::net::TcpConnectionPtr _conn;//客户端对应的连接muduo::net::TcpClient _client;//客户端ProtobufDispatcher _dispatcher;//请求分发器ProtobufCodecPtr _codec;//协议处理器AsyncWorker::ptr _worker;ChannelManager::ptr _channel_manager;}

在客户端这边,会收到两种响应,一种是基础响应,一种是消息推送响应。
我们需要注册对于这两种类型响应的回调函数。

基础响应,在基础响应中有一个cid字段,根据cid获取指定的信道对象,
调用信道对象中的putBasicResponse接口。
也就是往信道的hashMap中添加响应值。

void basicResponse(const muduo::net::TcpConnectionPtr& conn, const basicCommonResponsePtr& message, muduo::Timestamp) {//1. 找到信道Channel::ptr channel = _channel_manager->get(message->cid());if (channel.get() == nullptr) {DLOG("未找到信道信息!");return;}//2. 将得到的响应对象,添加到信道的基础响应hash_map中channel->putBasicResponse(message);
}

消息推送,在收到消息推送的响应后,需要更具响应中的rid,获取指定的信道对象,然后封装一个任务,这个任务就是调用信道中的consume接口,这个接口就是执行消费者中设置的回调函数。我们把这个任务放入到线程池中。

void consumeResponse(const muduo::net::TcpConnectionPtr& conn, const basicConsumerResponsePtr& message, muduo::Timestamp){
//1. 找到信道Channel::ptr channel = _channel_manager->get(message->cid());if (channel.get() == nullptr) {DLOG("未找到信道信息!");return;}//2. 封装异步任务(消息处理任务),抛入线程池_worker->pool.push([channel, message](){channel->consume(message);});
}

连接管理提供了两个接口,打开信道和关闭信道。这是给用户提供的,用来创建一个信道,通过信道进行服务的调用。

 Channel::ptr openChannel() {Channel::ptr channel = _channel_manager->create(_conn, _codec);bool ret = channel->openChannel();if (ret == false) {DLOG("打开信道失败!");return Channel::ptr();}return channel;
}void closeChannel(const Channel::ptr &channel) {channel->closeChannel();_channel_manager->remove(channel->cid());
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/880953.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

WPF之UI进阶--控件样式与样式模板及词典

WPF的优势之一就是能够更加容易快捷的对窗体和控件的外面进行改造&#xff0c;换句话说&#xff0c;那就是UI设计个性化更加容易。主要是借助了样式、模板及词典来实现的。那么本篇博文就一一对他们进行介绍。 文章目录 一、样式1: 定义样式2: 使用Setter设置属性关于Property和…

C或C++判断指针是否指向同一块内存

有时需要判断指针是否指同一块内存&#xff0c;例如设计字符串时&#xff1a; &#xff08;1&#xff09;insert函数 &#xff08;2) replace函数 &#xff08;3&#xff09;assign函数 难点是迭代器&#xff0c;判断是否同一个迭代器时&#xff0c;需要你在设计迭代器时加…

Kubernetes-环境篇-01-mac开发环境搭建

1、brew安装 参考知乎文章&#xff1a;https://zhuanlan.zhihu.com/p/111014448 苹果电脑 常规安装脚本&#xff08;推荐 完全体 几分钟安装完成&#xff09; /bin/zsh -c "$(curl -fsSL https://gitee.com/cunkai/HomebrewCN/raw/master/Homebrew.sh)"苹果电脑 极…

Rstudio:强大的R语言集成开发环境(IDE)

Rstudio 应该是 R 语言使用的标配&#xff0c;尽管 Rstudio 的母公司 Posit 推出了新一代的集成开发环境 Positron&#xff0c;但其还处于开发阶段。作为用户不妨让其成熟后再使用&#xff0c;现阶段还是 Rstudio 更稳定。 如果你在生物信息学或统计学领域工作&#xff0c;R语言…

Python | Leetcode Python题解之第455题分发饼干

题目&#xff1a; 题解&#xff1a; class Solution:def findContentChildren(self, g: List[int], s: List[int]) -> int:g.sort()s.sort()m, n len(g), len(s)i j count 0while i < m and j < n:while j < n and g[i] > s[j]:j 1if j < n:count 1i …

uni-app - - - - -vue3使用i18n配置国际化语言

uni-app - - - - -使用i18n配置国际化语言 1. 安装vue-i18n2. 配置文件2.1 创建如下文件2.2 文件配置2.3 main文件导入i18n 3. 页面内使用3.1 template内直接使用3.2 变量接收使用 1. 安装vue-i18n npm install vue-i18n --save2. 配置文件 2.1 创建如下文件 locales文件夹里…

水泵模块(5V STM32)

目录 一、介绍 二、传感器原理 1.尺寸介绍 2.继电器控制水泵电路原理图 三、程序设计 main.c文件 bump.h文件 bump.c文件 四、实验效果 五、资料获取 项目分享 一、介绍 水泵模块(bump)通常是指用于液体输送系统的组件&#xff0c;它负责将水或其他流体从低处提…

四.网络层(上)

目录 4.1网络层功能概述 4.2 SDN基本概念 4.3 路由算法与路由协议 4.3.1什么是路由协议&#xff1f; 4.3.2什么是路由算法&#xff1f; 4.3.3路由算法分类 (1)静态路由算法 (2)动态路由算法 ①全局性 OSPF协议与链路状态算法 ②分散性 RIP协议与距离向量算法 4.3.…

【C语言】内存函数的使用和模拟实现

文章目录 一、memcpy的使用和模拟实现二、memmove的使用和模拟实现三、memset的使用四、memcmp的使用 一、memcpy的使用和模拟实现 在之前我们学习了使用和模拟实现strncpy函数&#xff0c;它是一个字符串函数&#xff0c;用来按照给定的字节个数来拷贝字符串&#xff0c;那么问…

【本地免费】SimpleTex 图像识别latex公式

文章目录 相关教程相关文献安装教程 由于mathpix开始收费了&#xff0c;于是本文将介绍一款目前本地免费的SimpleTex工具 相关教程 【超详细安装教程】LaTeX-OCR 图像识别latex公式&#xff08;开源免费&#xff09;_latex图片识别-CSDN博客 相关文献 SimpleTex主页——致力…

数据结构双向链表和循环链表

目录 一、循环链表二、双向链表三、循环双向链表 一、循环链表 循环链表就是首尾相接的的链表&#xff0c;就是尾节点的指针域指向头节点使整个链表形成一个循环&#xff0c;这就弥补了以前单链表无法在后面某个节点找到前面的节点&#xff0c;可以从任意一个节点找到目标节点…

5.3 克拉默法则、逆矩阵和体积

本节是使用代数而不是消元法来求解 A x b A\boldsymbol x\boldsymbol b Axb 和 A − 1 A^{-1} A−1。所有的公式都会除以 det ⁡ A \det A detA&#xff0c; A − 1 A^{-1} A−1 和 A − 1 b A^{-1}\boldsymbol b A−1b 中的每个元素都是一个行列式除以 A A A 的行列式。…

C(十一)scanf、getchar(第三弹)

问题引入&#xff1a;如何实现输入一串密码&#xff0c;如&#xff1a;“123 xxxx” &#xff0c;然后读取并确认&#xff0c;是 -- Y&#xff1b;否 -- N。 自然的&#xff0c;我们想到用scanf&#xff0c;但是在使用过程中你是否遇到跟我一样的困惑呢&#xff1f;如下&…

如何高效删除 MySQL 日志表中的历史数据?实战指南

在处理高并发的物联网平台或者其他日志密集型应用时&#xff0c;数据库中的日志表往往会迅速增长&#xff0c;数据量庞大到数百GB甚至更高&#xff0c;严重影响数据库性能。如何有效管理这些庞大的日志数据&#xff0c;特别是在不影响在线业务的情况下&#xff0c;成为了一项技…

【LeetCode HOT 100】详细题解之二叉树篇

【LeetCode HOT 100】详细题解之二叉树篇 94 二叉树的中序遍历方法一&#xff1a;递归方法二&#xff1a;迭代 104 二叉树的最大深度方法一&#xff1a;递归方法二&#xff1a;迭代 226 翻转二叉树方法一&#xff1a;递归方法二&#xff1a;迭代 101 对称二叉树方法一&#xff…

小程序-使用npm包

目录 Vant Weapp 安装 Vant 组件库 使用 Vant 组件 定制全局主题样式 API Promise化 1. 基于回调函数的异步 API 的缺点 2. 什么是 API Promise 化 3. 实现 API Promise 化 4.调用 Promise 化之后的异步 API 小程序对 npm 的支持与限制 目前&#xff0c;小程序中已经…

Java 之深入理解 String、StringBuilder、StringBuffer

前言 由于发现 String、StringBuilder、StringBuffer 面试的时候会经常问到&#xff0c;这里就顺便总结一下&#xff1a;本文重点会以这三个字符串类的性能、线程安全、存储结构这三个方面进行分析 ✨上期回顾&#xff1a;Java 哈希表 ✨目录 前言 String 介绍 String 的不可变…

全局安装cnpm并设置其使用淘宝镜像的仓库地址(地址最新版)

npm、cnpm和pnpm基本概念 首先介绍一下npm和cnpm是什么&#xff0c;顺便说一下pnpm。 npm npm&#xff08;Node Package Manager&#xff09;是Node.js的默认包管理器&#xff0c;用于安装、管理和分享JavaScript代码包。它是全球最大的开源库生态系统之一&#xff0c;提供了数…

如何使用ssm实现基于HTML的中国传统面食介绍网站的搭建+vue

TOC ssm758基于HTML的中国传统面食介绍网站的搭建vue 第1章 绪论 1.1选题动因 当前的网络技术&#xff0c;软件技术等都具备成熟的理论基础&#xff0c;市场上也出现各种技术开发的软件&#xff0c;这些软件都被用于各个领域&#xff0c;包括生活和工作的领域。随着电脑和笔…

微服务SpringGateway解析部署使用全流程

官网地址&#xff1a; Spring Cloud Gateway 目录 1、SpringGateway简介 1、什么是网关 2、为什么用网关【为了转发】 2、应用&#xff1a; 1.启动nacos 2.创建网关项目 3.网关配置1 4.网关配置2【了解】 5.过滤器配置【了解】 1、SpringGateway简介 核心功能有三个&…