浅谈如何自我实现一个消息队列服务器(8)——编写客户端部分

文章目录

  • 一、客户端部分涵盖3大核心类
    • 1.1、ConnectionFactory 类
    • 1.2、Connection 类
    • 1.3、Channel 类
  • 二、3级类结构优势
  • 三、客户端实现逻辑
    • 3.1、ConnectionFactory
      • 3.1.1、ConnectionFactory类所含字段
      • 3.1.2、ConnectionFactory类所含方法
    • 3.2、Connection
      • 3.2.1、Connection类所含字段
      • 3.2.2、Connection类所含方法
    • 3.3、Channel
      • 3.3.1、Channel类所含字段
      • 3.3.2、Channel类所含方法

一、客户端部分涵盖3大核心类

1.1、ConnectionFactory 类

该类称为 “连接工厂”,通过这个类来持有服务器的地址。(因为当客户端与服务器建立连接时,需要知道服务器的 ip 和 端口号)

该类的主要功能:能够创建出 TCP 连接。(即 Connection 对象)

1.2、Connection 类

1、Connection 表示一个 TCP 连接,那么该 Connection 里就会持有一个 Socket 对象,客户端通过 Socket 对象来与服务器进行网络通信。

2、Connection 还负责:读取响应、写入请求。(服务器处收到客户端的请求叫做: 读取请求,服务器返回响应给客户端叫做: 写入响应;客户端收到服务器的响应叫做: 读取响应,客户端发送请求给服务器叫做: 写入请求)

3、由于TCP的创建/销毁开销较大,因此决定进行 TCP 的复用:一个TCP连接里可以发送/接收多次请求/响应,即一个TCP连接中可以含有多个Channel子逻辑。因此 Connection 类还要管理多个 Channel 对象,每个 Channel 就是一个逻辑上的连接。

1.3、Channel 类

1、Channel 表示一个逻辑上的连接,一个 TCP 连接就是通过 channel 来进行复用的。

譬如说,一个客户端上有多个模块,这些模块都需要与服务器进行通信,那每个模块都可以拥有自己的 Channel 对象,但是这几个 Channel 对象,共用的是同一个 TCP 连接,但它们并不会互相干扰。Connection 类 和 Channel 的关系就像,一辆公车。同一辆公车上,可以搭载学生上学、工人上班、老板出门…

2、Channel 上还需要提供服务器上的一系列 核心API。

二、3级类结构优势

客户端部分采用了3级类结构:类ConnectionFactory、类Connection、类Channel
该结构优势:当客户端比较复杂时,譬如一个客户端含有多个不同的板块,这时候多个模块通过 Channel 各自与服务器进行通信,互不干扰,但是通过 Connection 复用同一个 TCP,这样的话,逻辑上解耦合了,也高内聚了,效率也提高了。

三、客户端实现逻辑

在 mqclient 包下 新建三个新类,分别是:ConnectionFactory、Connection、Channel。

3.1、ConnectionFactory

3.1.1、ConnectionFactory类所含字段

1、需持有服务器ip地址
2、需持有服务器端口号
3、(由于当前服务器还只是一个单机版,即只含有一个虚拟主机,因此此时不需要 虚拟主机名、用户名、密码 来确定访问服务器的哪个虚拟主机,但是后续项目扩展,这些字段都会用到)

3.1.2、ConnectionFactory类所含方法

1、getter、setter方法。

2、创建新的 TCP 连接的方法(newConnection(String host,int port)): 由于创建TCP连接时需要知道服务器的 ip地址 和 端口号,因此传参 host、port。

3.2、Connection

3.2.1、Connection类所含字段

1、TCP连接来进行网络通信,因此需含有 Socket 对象。
2、使用哈希表来管理 Connection 中的众多 channel。
3、Socket 对象后续需要杯基于,来进行一些读写数据的操作,读写操作就离不开 Socket 里面的流对象,因此IO流也是所需字段。
3、线程池,用来处理回调方法。

3.2.2、Connection类所含方法

1、Connection 的构造方法(携带ip地址和端口号俩参数):初始化Socket、初始化所有流对象、处理服务器返回的响应,尤其是服务器收到生产者的消息后,将消息推送给对应消费者消费的这部分响应。
      1.1、设置一个扫描线程循环扫描服务器返回给客户端的响应,当连接断开时,扫描结束。
      1.2、从 socket 里读到响应数据,然后对响应进行处理,但服务器返回的响应含有两种:(1)、普通响应。(2)、服务器推送给客户端(订阅者)的消息。针对不同的响应,使用方法 dispatchResponse() 实现。

2、客户端发送请求(public void writeRequest(Request request))

3、客户端读取/接收响应(public Response readResponse())

4、使用 createChannel() 方法 在Connection 中创建出 Channel
      4.1、通过 UUID算法 生成唯一标识的的 channelId
      4.2、构造 Channel 对象。
      4.3、将新构造出的 Channel 对象加入到 Connection 类中定义用来管理 Channel 的 channelMap 中。
      4.4、客户端这边的Connection类里创建了Channel对象,此时也需要告知服务器,服务器就将客户端新建的 Channel 对象加入到 服务器中定义的 sessions 哈希表中。那么我们需要在 Channel 类中创建一个createChannel()。根据createChannel()的返回值判断此次与服务器的通信是否顺利,不顺利,就把刚刚4.3步骤加入了的Channel对象从sessions表中删除。
      4.5、返回channel

5、使用方法 close() 关闭 connection,释放资源。

6、期望使用方法 disPatchResponse(Response response) 来处理服务器返回的响应,到底是针对请求的响应,还是推送的消息。
      6.1、首先判断响应类型是否是 服务器推送回来的消息,如果是:
            6.1.1、首先将响应中的payload解析出来(反序列化)成 SubscribeReturns 类,然后根据响应中的channelId从channelMap中找到对应的 Channel 对象,判断 channel 是否存在,如果channel 不存在,说明该消息对应的channel在客户端中不存在,直接抛异常。如果channel存在,就执行channel里的回调方法,但是并不在此扫描线程中执行回调方法,因为不知道回调函数要执行多长时间,而扫描线程一边需要判断服务器返回的响应类型,一边处理回调方法的话,实在忙不过来,因此,将回调方法直接投入线程池中,让线程池中的线程执行回调方法即可。
      6.2、如果响应是针对请求的响应,此时就将响应的payload解析出来,变成 BasicReturns ,根据 BasicReturns 里面的 channelId 在 channelMap里查询对应的 Channel 对象,如果 Channel 对象不存在,说明当前响应对应的客户端并不存在,直接抛异常,如果不为空,就将当前响应存入记录服务器返回响应的哈希表basicReturnsMap里,使用方法 putReturns(basicReturns)处理。

7、 期望使用方法 putReturns(basicReturns)来唤醒那些发送了请求,正在阻塞等待着服务器返回相应的响应的客户端线程。
      7.1、将当前响应添加至basicReturnsMap表中。
      7.2、使用 notofyAll() 唤醒阻塞等待的线程。由于不知道到底有多少个线程正在阻塞等待,因此使用 notifyAll() 全部唤醒。

3.3、Channel

3.3.1、Channel类所含字段

1、由于一个TCP复用,因此一个TCP里包含多个 Channel,此时就需要定义 Connection 对象作为成员变量之一。
2、channelId,每一个Channel的身份标识。
3、定义一个哈希表,用来存储服务器给服务器返回的响应。约定一个channel中只能有一个回调,如果队列想去订阅多个消息,就需要含有多个channel。

3.3.2、Channel类所含方法

1、构造方法,含有两个参数,channelId、Connection对象,参数1channelId表明当前channel的唯一身份标识,参数2Connection表明当前的channel对象属于哪个TCP连接里的。

2、public boolean createChannel(): 期望在这个方法里边,和服务器进行通信,告知服务器说,客户端这边创建了新的Channel,Connection 调用 Channel 里的 createChannel()来与服务器进行通信,通信完成后,服务器会返回true/false作为服务器的响应结果。
      2.1、与服务器进行通信,势必需要发送请求/接收响应…因此构造一个 BasicArguments 对象,设置 BasicArguments 对象里的属性,channelId即当前的,rid通过 私有方法获取到。
      2.2、构造请求对象,设置请求里的属性,payload 内容即 BasicArguments 对象序列化的结果。
      2.3、使用 Connection 对象里的发送请求,将请求发给服务器。请求发出后,等待服务器处理该请求,并给我们返回一个响应结果,但是由于这些都是需要时间的,因此服务器什么时候返回响应,不知道,客户端此处就需要进行阻塞等待服务器的响应。
      2.4、构造一个返回类 BasicReturns 接收服务器返回的响应。
      2.5、使用方法 waitResult(basicReturns.getRid()) 来阻塞等待服务器的响应。
      2.6、返回 服务器响应值。

3、public BasicReturns waitResult(basicReturns.getRid()):期待通过该方法阻塞等待获取到服务器的响应,或取的响应其 rid 必须是和 发送请求时的 rid 一致。构造一个 BasicReturns 对象,判断当前 从 basicReturnsMap 表中获取到的basicReturns 对象是否为空,为空,说明服务器未返回此响应,wait()阻塞等待,如果服务器的响应返回后,就会在 Connection类里收到,此时就会把消息投到对应channelMap中,同时调用 notify() 唤醒正在阻塞的线程。读取成功消息后,将此消息从 basicReturnsMap中删除。返回 basicReturns。

4、使用 UUID 算法生成到 rid:private String getRid()

5、关闭 channel(public boolean close()):期望该方法能够给服务器发送一个 type 为 0x2 的请求,进行关闭 channel 即可。
      5.1、构造一个 BasicArguments 对象,设置其对象里的相应属性。
      5.2、构造一个 Request 对象,设置 request 对象里的相应属性。
      5.3、通过 Connection 将请求发送到服务器里,同时阻塞等待服务器的响应,返回服务器响应值。

6、创建交换机(public boolean exchangeDeclare(String exchangeName,ExchangeType exchangeType,boolean durable,boolean autoDelete,Map<String , Object> arguments)):
      6.1、构造一个 ExchangeDeclareArguments 对象,设置该对象里的相应属性。
      6.2、构造一个 Request 对象,设置其对象里的相应属性。
      6.3、通过 Connection 将请求发送到服务器里,同时阻塞等待服务器的响应,返回服务器响应值。

7、销毁交换机(public boolean exchangeDelete(String exchangeName)):
      7.1、构造一个 ExchangeDeleteArguments 对象,设置该对象里的相应属性。
      7.2、构造一个 Request 对象,设置其对象里的相应属性。
      7.3、通过 Connection 将请求发送到服务器里,同时阻塞等待服务器的响应,返回服务器响应值。

8、创建队列(public boolean queueDeclare(String queueName,boolean durable,boolean autoDelete,boolean exclusive,Map<String , Object> arguments)):
      8.1、构造一个 queueDeclareArguments 对象,设置该对象里的相应属性。
      8.2、构造一个 Request 对象,设置其对象里的相应属性。
      8 .3、通过 Connection 将请求发送到服务器里,同时阻塞等待服务器的响应,返回服务器响应值。

9、删除队列(public boolean queueDelete(String queueName)):
      9.1、构造一个 queueDeleteArguments 对象,设置该对象里的相应属性。
      9.2、构造一个 Request 对象,设置其对象里的相应属性。
      9 .3、通过 Connection 将请求发送到服务器里,同时阻塞等待服务器的响应,返回服务器响应值。

10、创建绑定(public boolean queueBind(String exchangeName,String queueName,String bindingKey)):
      10.1、构造一个 queueBindArguments 对象,设置该对象里的相应属性。
      10.2、构造一个 Request 对象,设置其对象里的相应属性。
      10.3、通过 Connection 将请求发送到服务器里,同时阻塞等待服务器的响应,返回服务器响应值。

11、删除绑定(public boolean queueUnbind(String exchangeName,String queueName)):
      11.1、构造一个 queueUnbindArguments 对象,设置该对象里的相应属性。
      11.2、构造一个 Request 对象,设置其对象里的相应属性。
      11.3、通过 Connection 将请求发送到服务器里,同时阻塞等待服务器的响应,返回服务器响应值。

12:发布消息(public boolean basicPublish( String exchangeName,String routingKey,BasicProperties basicPropertoes,byte[] body)):
      12.1、构造一个 basicPublishArguments 对象,设置该对象里的相应属性。
      12.2、构造一个 Request 对象,设置其对象里的相应属性。
      12.3、通过 Connection 将请求发送到服务器里,同时阻塞等待服务器的响应,返回服务器响应值。

13、订阅消息(public boolean basicComsume(String queueName,boolean autoAck,Comsumer comsumer)):
      13.1、先对当前 comsumer 进行判断,如果 comsumer != null ,说明 channel 已经设置过回调了,不再重复设置。
      13.2、构造一个 basicComsumeArguments 对象,设置该对象里的相应属性。
      13.3、构造一个 Request 对象,设置其对象里的相应属性。
      13.4、通过 Connection 将请求发送到服务器里,同时阻塞等待服务器的响应,返回服务器响应值。

14、确认消息(public boolean basicAck(String queueName,String messageId)):
      13.1、构造一个 basicAckArguments 对象,设置该对象里的相应属性。
      13.2、构造一个 Request 对象,设置其对象里的相应属性。
      13.3、通过 Connection 将请求发送到服务器里,同时阻塞等待服务器的响应,返回服务器响应值。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/10221.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

proxySQL 安装与配置

# 官网下载 https://www.proxysql.com/documentation/installing-proxysql/ > cd ~/Downloads # 使用 dpkg 命令来安装下载的 .deb 包 > sudo dpkg -i proxysql_2.6.2-debian12_amd64.deb # 如果 dpkg 命令报告缺少依赖关系或安装过程中遇到问题&#xff0c;可以尝试运…

MySQL-双主复制

mysql 双主复制&#xff1a; 实现和MHA&#xff08;高可用架构&#xff09;相同的效果。 使用keepalived实现先双主热备&#xff0c;防止单点故障的主从结构。 两台mysql互做主从&#xff0c;各自安装keepalived实现热备&#xff0c;其他从服务器和热备的虚拟IP建立主从连接mys…

Linux下安装mysql8.0(以rpm包安装)

前言&#xff1a;原文在我的博客网站中&#xff0c;持续更新数通、系统方面的知识&#xff0c;欢迎来访&#xff01; Linux下安装mysql8.0&#xff08;以rpm包安装&#xff09;https://myweb.myskillstree.cn/125.html 目录 1、查操作系统信息 2、下载mysql 8.0.34的rpm包 …

MySQL中获取指定日期区间内所有日期

在数据库应用开发中&#xff0c;经常需要根据给定的日期范围获取区间内的每一天日期&#xff0c;这对于统计分析、报表生成、任务调度等领域至关重要。MySQL作为广泛应用的关系型数据库管理系统&#xff0c;提供了丰富的日期函数和查询技巧来实现这一需求。本文将深入探讨如何在…

【Anaconda 3 】Jupyter Notebook 的安装配置及使用

Jupyter Notebook 的安装配置及使用 一、引言 Jupyter Notebook 是一种交互式笔记本&#xff0c;它允许用户将代码、注释、方程式、可视化内容等整合到一个文档中&#xff0c;并支持多种编程语言&#xff0c;如 Python、R、Julia 等。它在数据科学、机器学习和教育领域中得到…

Goland GC

Goland GC 引用Go 1.3 mark and sweep 标记法Go 1.5 三色标记法屏障机制插入屏障删除写屏障总结 Go 1.8 混合写屏障(hybrid write barrier)机制总结 引用 https://zhuanlan.zhihu.com/p/675127867 Garbage Collection&#xff0c;缩写为GC&#xff0c;一种内存管理回收的机制…

条件平差——以水准网平差为例 (python详细过程版)

目录 一、原理概述二、案例分析三、代码实现四、结果展示本文由CSDN点云侠原创,原文链接。如果你不是在点云侠的博客中看到该文章,那么此处便是不要脸的爬虫与GPT。 一、原理概述 条件平差的函数模型和随机模型为: A V + W = 0

大模型时代,程序员如何卷?

最近在看电影《碟中谍7》&#xff0c;该片讲述了特工伊森亨特尝试与一个被称为智体的全能人工智能作战&#xff0c;其可以即时访问任何在线网络&#xff0c;他和他的团队成员试图找回控制人工智能智体所必需的两部分钥匙并将其摧毁的故事。 在剧中&#xff0c;智体是一个虚拟反…

文旅行业| 某景区导游培养和管理项目成功案例纪实

——整合导游资源并进行统一管理&#xff0c;构建完善的培养与管理机制&#xff0c;发挥景区导游价值 【客户行业】文旅行业&#xff1b;景区&#xff1b;文旅企业 【问题类型】人才培养&#xff1b;人员管理 【客户背景】 南方某5A级景区&#xff0c;作为国内极具代表性和特…

学习使用jQuery将光标移动到textarea的末尾

学习使用jQuery将光标移动到textarea的末尾 代码 代码 $(document).ready(function(){var textarea $(#your-qipa-id); // 替换为你的textarea IDtextarea.focus(); // 将焦点设置到textarea// 获取textarea的值的长度var len textarea.val().length;// 使用setSelectionRan…

【python】python中的argparse模块,教你如何自定义命令行参数

✨✨ 欢迎大家来到景天科技苑✨✨ &#x1f388;&#x1f388; 养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; &#x1f3c6; 作者简介&#xff1a;景天科技苑 &#x1f3c6;《头衔》&#xff1a;大厂架构师&#xff0c;华为云开发者社区专家博主&#xff0c;…

intellij idea中build project与build module以及rebuild module的区别与联系

IntelliJ IDEA是一个功能强大的Java集成开发环境(IDE)。它提供了几个与构建相关的操作,包括: Build Project: - 这将构建整个项目,包括其中的所有模块。 - 它将编译所有已修改的源文件,并重新生成输出文件(如.class文件)。 - 这通常用于确保整个项目的代码是最新的,并且可以正…

Openssl X509证书从HexStream中解析

整体思路 从hex 转换成字节流 然后从字节流中进行解析 You have access to the raw certificate in memory. In the case that you have access to the raw encoding of the certificate in memory, you can parse it as follows. This is useful if you have stored raw cer…

【Pip】pip 安装第三方包异常:[SSL:CERTIFICATE_VERIFY_FAILED]解决方案

pip 安装第三方包异常:[SSL:CERTIFICATE_VERIFY_FAILED] 大家好 我是寸铁&#x1f44a; 总结了一篇pip 安装第三方包异常:[SSL:CERTIFICATE_VERIFY_FAILED]✨ 喜欢的小伙伴可以点点关注 &#x1f49d; 报错 今天在安装第三方包时报错如下: 解决方案 本质上是需要指定信任的镜像…

开启异步线程的方法

1&#xff0c;开启异步线程&#xff0c;在启动类上加注解&#xff1a; 2&#xff0c;自定义线程池&#xff1a; Configuration public class PromotionConfig {Beanpublic Executor generateExchangeCodeExecutor() {ThreadPoolTaskExecutor executor new ThreadPoolTaskExec…

搜维尔科技:【案例分享】Xsens用于工业制造艺术创新设计平台

用户名称&#xff1a;北京理工大学 主要产品&#xff1a;Xsens MVN Awinda惯性动作捕捉系统 在设计与艺术学院的某实验室内&#xff0c;通过Xsens惯性动作捕捉&#xff0c;对人体动作进行捕捉&#xff0c;得到人体三维运动数据&#xff0c;将捕到的数据用于后续应用研究。…

蓝桥杯备战9.拼数

P1012 [NOIP1998 提高组] 拼数 - 洛谷 | 计算机科学教育新生态 (luogu.com.cn) 这道题作对了&#xff0c;但是题解看到更好的 我写的史 #include<bits/stdc.h> #define endl \n #define int long long using namespace std; const int N 2e710,M 1e310; int a[N],h[…

x_t格式介绍

x_t格式 X_T格式&#xff0c;通常被称为Parasolid文件格式&#xff0c;是一种用于3D CAD数据交换的文件格式。Parasolid本身是一个几何建模内核&#xff0c;由Siemens PLM Software开发和维护&#xff0c;广泛应用于许多主流CAD、CAM、CAE系统中&#xff0c;如SolidWorks、Sol…

小心电子合同这个坑:手写签名图片

一、引言 在数字化浪潮的推动下&#xff0c;电子合同因其便捷性和高效性受到广泛应用。然而&#xff0c;在使用电子合同的过程中&#xff0c;一个看似简单的签名方式——手写签名图片&#xff0c;却可能带来意想不到的法律风险。本文将详细解析这一陷阱&#xff0c;并为大家提…

单播、组播、广播

​​​​​​ 概念 单播&#xff08;Unicast&#xff09; 单播是网络中最常用、最基本的通信方式。在单播通信中&#xff0c;数据包从一个节点发送到特定的另一个节点。换句话说&#xff0c;发送端和接收端之间建立一对一的连接&#xff0c;然后进行数据传输。 例如&#x…