根据源码,模拟实现 RabbitMQ - 网络通讯设计,实现客户端Connection、Channel(完结)

目录

一、客户端代码实现

1.1、需求分析

1.2、具体实现

1)实现 ConnectionFactory

2)实现 Connection

3)实现 Channel

二、编写 Demo 

2.1、实例 

2.1、实例演示


一、客户端代码实现


1.1、需求分析

RabbitMQ 的客户端设定:一个客户端可以有多个模块,每个模块都可以和 broker server 之间建立 “逻辑上的连接” (channel),这几个模块的channel 彼此之间是互相不影响的,同时这几个 channel 又复用的同一个 TCP 连接,省去了频繁 建立/销毁 TCP 连接的开销(三次握手、四次挥手......).

这里,我们也按照这样的逻辑实现 消息队列 的客户端,主要涉及到以下三个核心类:

  1. ConnectionFactory:连接工厂,这个类持有服务器的地址,主要功能就是创建 Connection 对象.
  2. Connection:表示一个 TCP连接,持有 Socket 对象,用来 写入请求/读取响应,管理多个Channel 对象.
  3. Channel:表示一个逻辑上的连接,需要提供一系列的方法,去和服务器提供的核心 API 对应(客户端提供的这些方法的内部,就是写入了一个特定的请求,然后等待服务器响应).

1.2、具体实现

1)实现 ConnectionFactory

主要用来创建 Connection 对象.

public class ConnectionFactory {//broker server 的 ip 地址private String host;//broker server 的端口号private int port;//    //访问 broker server 的哪个虚拟主机
//    //这里暂时先不涉及
//    private String virtualHostName;
//    private String username;
//    private String password;public Connection newConnection() throws IOException {Connection connection = new Connection(host, port);return connection;}public String getHost() {return host;}public void setHost(String host) {this.host = host;}public int getPort() {return port;}public void setPort(int port) {this.port = port;}
}

2)实现 Connection

属性如下

    private Socket socket;//一个 socket 连接需要管理多个 channelprivate ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>();private InputStream inputStream;private OutputStream outputStream;// DataXXX 主要用来 读取/写入 特定格式数据(例如 readInt())private DataInputStream dataInputStream;private DataOutputStream dataOutputStream;//用来处理 0xc 的回调,这里开销可能会很大,不希望把 Connection 阻塞住,因此使用 线程池 来处理private ExecutorService callbackPool;

构造如下

这里不光需要初始化属性,还需要创建一个扫描线程,由这个线程负责不停的从 socket 中读取响应数据,把这个响应数据再交给对应的 channel 负责处理

    public Connection(String host, int port) throws IOException {socket = new Socket(host, port);inputStream = socket.getInputStream();outputStream = socket.getOutputStream();dataInputStream = new DataInputStream(inputStream);dataOutputStream = new DataOutputStream(outputStream);callbackPool = Executors.newFixedThreadPool(4);//创建一个扫描线程,由这个线程负责不停的从 socket 中读取响应数据,把这个响应数据再交给对应的 channel 负责处理Thread t = new Thread(() -> {try {while(!socket.isClosed()) {Response response = readResponse();dispatchResponse(response);}} catch (SocketException e) {//连接正常断开的,此时这个异常可以忽略System.out.println("[Connection] 连接正常断开!");} catch(IOException | ClassNotFoundException | MqException e) {System.out.println("[Connection] 连接异常断开!");e.printStackTrace();}});t.start();}

释放 Connection 相关资源

    public void close() {try {callbackPool.shutdown();channelMap.clear();inputStream.close();outputStream.close();socket.close();} catch (IOException e) {e.printStackTrace();}}

使用这个方法来区别,当前的响应是一个针对控制请求的响应,还是服务器推送过来的消息.

如果是服务器推送过来的消息,就响应表明是 0xc,也就是一个回调,通过线程池来进行处理;

如果只是一个普通的响应,就把这个结果放到 channel 的 哈希表中(随后 channel 会唤醒所有阻塞等待响应的线程,去 map 中拿数据).

    public void dispatchResponse(Response response) throws IOException, ClassNotFoundException, MqException {if(response.getType() == 0xc) {//服务器推送过来的消息数据SubScribeReturns subScribeReturns = (SubScribeReturns) BinaryTool.fromBytes(response.getPayload());//根据 channelId 找到对应的 channel 对象Channel channel = channelMap.get(subScribeReturns.getChannelId());if(channel == null) {throw new MqException("[Connection] 该消息对应的 channel 再客户端中不存在!channelId=" + channel.getChannelId());}//执行该 channel 对象内部的回调(这里的开销未知,有可能很大,同时不希望把这里阻塞住,所以使用线程池来执行)callbackPool.submit(() -> {try {channel.getConsumer().handlerDelivery(subScribeReturns.getConsumerTag(), subScribeReturns.getBasicProperties(),subScribeReturns.getBody());} catch(MqException | IOException e) {e.printStackTrace();}});} else {//当前响应是针对刚才的控制请求的响应BasicReturns basicReturns = (BasicReturns) BinaryTool.fromBytes(response.getPayload());//把这个结果放到 channel 的 哈希表中Channel channel = channelMap.get(basicReturns.getChannelId());if(channel == null) {throw new MqException("[Connection] 该消息对应的 channel 在客户端中不存在!channelId=" + channel.getChannelId());}channel.putReturns(basicReturns);}}

发送请求和读取响应

    /*** 发送请求* @param request* @throws IOException*/public void writeRequest(Request request) throws IOException {dataOutputStream.writeInt(request.getType());dataOutputStream.writeInt(request.getLength());dataOutputStream.write(request.getPayload());dataOutputStream.flush();System.out.println("[Connection] 发送请求!type=" + request.getType() + ", length=" + request.getLength());}/*** 读取响应*/public Response readResponse() throws IOException {Response response = new Response();response.setType(dataInputStream.readInt());response.setLength(dataInputStream.readInt());byte[] payload = new byte[response.getLength()];int n = dataInputStream.read(payload);if(n != response.getLength()) {throw new IOException("读取的响应格式不完整! n=" + n + ", responseLen=" + response.getLength());}response.setPayload(payload);System.out.println("[Connection] 收到响应!type=" + response.getType() + ", length=" + response.getLength());return response;}

在 Connection 中提供创建 Channel 的方法

    public Channel createChannel() throws IOException {String channelId = "C-" + UUID.randomUUID().toString();Channel channel = new Channel(channelId, this);//放到 Connection 管理的 channel 的 Map 集合中channelMap.put(channelId, channel);//同时也需要把 “创建channel” 这个消息告诉服务器boolean ok = channel.createChannel();if(!ok) {//如果创建失败,就说明这次创建 channel 操作不顺利//把刚才加入 hash 表的键值对再删了channelMap.remove(channelId);return null;}return channel;}

Ps:代码中使用了很多次 UUID ,这里我们和之前一样,使用加前缀的方式来进行区分.

3)实现 Channel

属性和构造如下

    private String channelId;// 当前这个 channel 是属于哪一个连接private Connection connection;//用来存储后续客户端收到的服务器响应,已经辨别是哪个响应(要对的上号) key 是 ridprivate ConcurrentHashMap<String, BasicReturns> basicReturnsMap = new ConcurrentHashMap<>();//如果当前 Channel 订阅了某个队列,就需要记录对应的回调是什么,当该队列消息返回回来的时候,调用回调//此处约定一个 Channel 只能有一个回调private Consumer consumer;public Channel(String channelId, Connection connection) {this.channelId = channelId;this.connection = connection;}public String getChannelId() {return channelId;}public void setChannelId(String channelId) {this.channelId = channelId;}public Connection getConnection() {return connection;}public void setConnection(Connection connection) {this.connection = connection;}public ConcurrentHashMap<String, BasicReturns> getBasicReturnsMap() {return basicReturnsMap;}public void setBasicReturnsMap(ConcurrentHashMap<String, BasicReturns> basicReturnsMap) {this.basicReturnsMap = basicReturnsMap;}public Consumer getConsumer() {return consumer;}public void setConsumer(Consumer consumer) {this.consumer = consumer;

实现 0x1 创建 channel

主要就是构造构造出 request,然后发送请求到 BrokerServer 服务器,阻塞等待服务器响应.

    /*** 0x1* 和服务器进行交互,告诉服务器,此处客户端已经创建了新的 channel 了* @return*/public boolean createChannel() throws IOException {//构造 payloadBasicArguments arguments = new BasicArguments();arguments.setChannelId(channelId);arguments.setRid(generateRid());byte[] payload = BinaryTool.toBytes(arguments);//发送请求Request request = new Request();request.setType(0x1);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);//等待服务器响应BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}/*** 生成 rid* @return*/public String generateRid() {return "R-" + UUID.randomUUID().toString();}/*** 阻塞等待服务器响应* @param rid* @return*/private BasicReturns waitResult(String rid) {BasicReturns basicReturns = null;while((basicReturns = basicReturnsMap.get(rid)) == null) {//查询结果为空,就说明咱们去菜鸟驿站要取的包裹还没到//此时就需要阻塞等待synchronized (this) {try {wait();} catch (InterruptedException e) {throw new RuntimeException(e);}}}basicReturnsMap.remove(rid);return basicReturns;}/*** 由 Connection 中的方法调用,区分为普通响应之后触发* 将响应放回到 channel 管理的 map 中,并唤醒所有线程* @param basicReturns*/public void putReturns(BasicReturns basicReturns) {basicReturnsMap.put(basicReturns.getRid(), basicReturns);synchronized (this) {//当前也不知道有多少线程再等待上述的这个响应//因此就把所有等待的线程唤醒notifyAll();}}

Ps:其他的 请求操作也和 0x1 的方式几乎一样,这里不一一展示了,主要说一下 0xa

0xa 消费者订阅队列消息,这里要先设置好回调到属性中,方便 Connection 通过这个属性来 处理回调

值得注意的一点, 我们约定 channelId 就是 consumerTag

    public boolean basicConsume(String queueName, boolean autoAck, Consumer consumer) throws IOException, MqException {//先设置回调if(this.consumer != null) {throw new MqException("该 channel 已经设置过消费消息回调了,不能重复!");}this.consumer = consumer;BasicConsumeArguments basicConsumeArguments = new BasicConsumeArguments();basicConsumeArguments.setRid(generateRid());basicConsumeArguments.setChannelId(channelId);basicConsumeArguments.setConsumerTag(channelId); // 注意:此处的 consumerTag 使用 channelId 来表示basicConsumeArguments.setQueueName(queueName);basicConsumeArguments.setAutoAck(autoAck);byte[] payload = BinaryTool.toBytes(basicConsumeArguments);Request request = new Request();request.setType(0xa);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(basicConsumeArguments.getRid());return basicReturns.isOk();}

二、编写 Demo 


2.1、实例 

到了这里基本就实现完成了一个 跨主机/服务器 之间的生产者消费者模型了(功能上可以满足日常开发对消息队列的使用),但是还具有很强的扩展性,可以继续参考 RabbitMQ,如果有想法的,或者是遇到不会的问题,可以私信我~

以下我来我来编写一个 demo,模拟 跨主机/服务器 之间的生产者消费者模型(这里为了方便,就在本机演示).

首先再 spring boot 项目的启动类中 创建 BrokerServer ,绑定端口号,然后启动

@SpringBootApplication
public class RabbitmqProjectApplication {public static ConfigurableApplicationContext context;public static void main(String[] args) throws IOException {context = SpringApplication.run(RabbitmqProjectApplication.class, args);BrokerServer brokerServer = new BrokerServer(9090);brokerServer.start();}}

编写消费者

public class DemoConsumer {public static void main(String[] args) throws IOException, MqException, InterruptedException {//建立连接ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(9090);Connection connection = factory.newConnection();Channel channel = connection.createChannel();//创建交换机和队列(这里和生产者创建交换机和队列不冲突,谁先启动,就按照谁的创建,即使已经存在交换机和队列,再创建也不会有什么副作用)channel.exchangeDeclare("demoExchange", ExchangeType.DIRECT, true, false, null);channel.queueDeclare("demoQueue", true, false, false, null);//消费者消费消息channel.basicConsume("demoQueue", true, new Consumer() {@Overridepublic void handlerDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("开销消费");System.out.println("consumerTag=" + consumerTag);System.out.println("body=" + new String(body));System.out.println("消费完毕");}});//由于消费者不知道生产者要生产多少,就在这里通过循环模拟一直等待while(true) {Thread.sleep(500);}}}

编写生产者

public class DemoProducer {public static void main(String[] args) throws IOException, InterruptedException {//建立连接ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(9090);Connection connection = factory.newConnection();Channel channel = connection.createChannel();//创建交换机和队列(这里和消费者创建交换机和队列不冲突,谁先启动,就按照谁的创建,即使已经存在交换机和队列,再创建也不会有什么副作用)channel.exchangeDeclare("demoExchange", ExchangeType.DIRECT, true, false, null);channel.queueDeclare("demoQueue", true, false, false, null);//生产消息byte[] body1 = "Im cyk1 !".getBytes();channel.basicPublish("demoExchange", "demoQueue", null, body1);Thread.sleep(500);//关闭连接channel.close();connection.close();}}

2.1、实例演示

启动 spring boot 项目(启动 BrokerServer)

运行消费者(消费者和生产者谁先后运行都可以)

 

运行生产者

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/50486.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Mybatis的动态SQL及关键属性和标识的区别(对SQL更灵活的使用)

&#xff08; 虽然文章中有大多文本内容&#xff0c;想了解更深需要耐心看完&#xff0c;必定大有受益 &#xff09; 目录 一、动态SQL ( 1 ) 是什么 ( 2 ) 作用 ( 3 ) 优点 ( 4 ) 特殊标签 ( 5 ) 演示 二、#和$的区别 2.1 #使用 ( 1 ) #占位符语法 ( 2 ) #优点 2.…

朴素贝叶斯==基于样本特征来预测样本属于的类别y

目录 朴素贝叶斯基于样本特征来预测样本属于的类别y 朴素贝叶斯算法的基本概念与核心思想 假设两个特征维度之间是相互独立的 拉普拉斯平滑增加出现次数保证0不出现 ​编辑 基于样本特征来预测样本属于的类别y 什么是拉普拉斯平滑 朴素贝叶斯基于样本特征来预测样本属于的…

软考高项(九)项目范围管理 ★重点集萃★

&#x1f451; 个人主页 &#x1f451; &#xff1a;&#x1f61c;&#x1f61c;&#x1f61c;Fish_Vast&#x1f61c;&#x1f61c;&#x1f61c; &#x1f41d; 个人格言 &#x1f41d; &#xff1a;&#x1f9d0;&#x1f9d0;&#x1f9d0;说到做到&#xff0c;言出必行&am…

C#反编译工具ILSPY

ILSPY ILSpy 是一个开源的.Net程序集浏览器和反编译工具。 Visual Studio 2022附带了默认情况下启用的F12反编译支持&#xff08;使用我们的引擎v7.1&#xff09;。 在Visual Studio 2019中&#xff0c;您必须手动启用F12支持。转到“工具”/“选项”/“文本编辑器”/C#/Adva…

c#设计模式-结构型模式 之 外观模式

概述 外观模式&#xff08;Facade Pattern&#xff09;又名门面模式&#xff0c;隐藏系统的复杂性&#xff0c;并向客户端提供了一个客户端可以访问系统的接口。这种类型的设计模式属于结构型模式&#xff0c;它向现有的系统添加一个接口&#xff0c;来隐藏系统的复杂性。该模式…

【洛谷算法题】P1000-超级玛丽游戏【入门1顺序结构】

&#x1f468;‍&#x1f4bb;博客主页&#xff1a;花无缺 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! 本文由 花无缺 原创 收录于专栏 【洛谷算法题】 文章目录 【洛谷算法题】P1000-超级玛丽游戏【入门1顺序结构】&#x1f30f;题目描述&#x1f30f;输入格…

Android 多渠道打包及VasDolly使用

目录 1.添加productFlavors的配置buildConfigFieldmanifestPlaceholdersresValue 2.设置apk文件的名称&#xff0c;便于识别3.添加vasdolly、添加gradle脚本&#xff08;windows&#xff09; 作用&#xff1a;一次性可以打多个apk包&#xff0c;名字、包名、logo等可以不相同。…

jstat(JVM Statistics Monitoring Tool):虚拟机统计信息监视工具

jstat&#xff08;JVM Statistics Monitoring Tool&#xff09;&#xff1a;虚拟机统计信息监视工具 用于监视虚拟机各种运行状态信息的命令行工具。 它可以显示本地或者远程虚拟机进程中的类加载、内存、垃圾收集、即时编译等运行时数据&#xff0c;在没有GUI图形界面、只提…

如何使用CSS实现一个水平居中和垂直居中的布局?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ 水平居中布局⭐ 垂直居中布局⭐ 写在最后 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 记得点击上方或者右侧链接订阅本专栏哦 几何带你启航前端之旅 欢迎来到前端入门之旅&#xff01;这个专栏是为那些对Web开发感兴趣…

共创无线物联网数字化新模式|协创数据×企企通采购与供应链管理平台项目成功上线

近日&#xff0c;全球无线物联网领先者『协创数据技术股份有限公司』&#xff08;以下简称“协创数据”&#xff09;SRM采购与供应链项目全面上线&#xff0c;并于近日与企企通召开成功召开项目上线总结会。 基于双方资源和优势&#xff0c;共同打造了物联网特色的数字化采购供…

齐聚众力,中国移动以“百川”定乾坤

近日&#xff0c;由工业和信息化部、宁夏回族自治区人民政府主办的2023中国算力大会在宁夏银川举办。中国移动党组书记、董事长杨杰参加开幕式&#xff0c;并在大会主论坛作题为《算网筑基锻引擎 数实融合创未来》的主旨演讲。在演讲中&#xff0c;杨杰表示&#xff1a;未来&am…

【Linux】线程篇Ⅱ:

线程Ⅱ &#x1f517;接上篇【线程篇Ⅰ】五、线程库 和 线程 id六、同步与互斥 &#x1f517;接上篇【线程篇Ⅰ】 &#x1f449;【Linux】线程篇Ⅰ&#xff1a;线程和task_struct 执行流的理解、相关接口命令、线程异常、线程的私有和共享 五、线程库 和 线程 id 对于 Linux …

Dockerfile快速搭建自己专属的LAMP环境,生成镜像lamp:v1.1,并推送到私有仓库

环境&#xff1a; CentOS 7 Linux 3.10.0-1160.el7.x86_64 具体要求如下&#xff1a; &#xff08;1&#xff09;基于centos:6基础镜像&#xff1b; &#xff08;2&#xff09;指定作者信息&#xff1b; &#xff08;3&#xff09;安装httpd、mysql、mysql-server、php、ph…

sql递归查询

一、postgresql 递归sql with recursive p as(select t1.* from t_org_test t1 where t1.id2union allselect t2.*from t_org_test t2 join p on t2.parent_idp.id) select id,name,parent_id from p; sql中with xxxx as () 是对一个查询子句做别名&#xff0c;同时数据库会对…

ubuntu卸载python导致无法进入图形页面

首先&#xff0c;进入tty界面&#xff0c;输入用户名和密码。如果忘记了用户名和密码&#xff0c;建议直接重装系统。修改tty的编码格式&#xff0c;输入 $LANG查看tty的编码格式&#xff0c;建议修改编码格式&#xff0c;避免中文或其他信息显示为■■■■■或◆◆◆◆◆ ex…

uCharts 运行微信小程序时,图表放在scroll-view中点击后不能正确获取点击的currentIndex一直为-1

图表在APP和H5中的点击位置是正常的,在微信小程序中会出现点击位置不对且有部分地方点击不到,最终我的解决方法如下。 1.查看包裹图表的元素中有没有元素开启了定位,可以去除定位属性试一试。 2.为微信平台的图表添加 isScrollView="true"属性。 解决方案: 添加 …

【快速傅里叶变换(fft)和逆快速傅里叶变换】生成雷达接收到的经过多普勒频移的脉冲雷达信号(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

【InsCode】InsCode打造的JavaSE与Linux命令互融的伪Linux文件系统小项目

&#x1f9d1;‍&#x1f4bb;作者名称&#xff1a;DaenCode &#x1f3a4;作者简介&#xff1a;啥技术都喜欢捣鼓捣鼓&#xff0c;喜欢分享技术、经验、生活。 &#x1f60e;人生感悟&#xff1a;尝尽人生百味&#xff0c;方知世间冷暖。 &#x1f4d6;所属专栏&#xff1a;Ja…

Ajax+Vue+ElementUI

文章目录 1.Ajax1.1 介绍1.2 Ajax快速入门1.3 案例-用户注册时&#xff0c;检测用户名是否数据库已经存在1.4 Axios1.4.1 Axios快速入门1.4.2 请求别名 1.5 JSON1.5.1 Json的基础语法1.5.2 FastJson的使用5.3.2 Fastjson 使用 2. Vue2.1 介绍2.2 Vue快速入门2.3 Vue常用指令和生…