【JavaEE Spring 项目】消息队列的设计

消息队列的设计

  • 一、消息队列的背景知识
  • 二、需求分析
    • 核心概念
    • ⼀个⽣产者, ⼀个消费者
    • N 个⽣产者, N 个消费者
    • Broker Server 中的相关概念
    • 核⼼ API
    • 交换机类型 (Exchange Type)
    • 持久化
    • ⽹络通信
    • 消息应答
  • 三、 模块划分
  • 四、 项⽬创建
  • 五、创建核心类
    • 创建 Exchange
    • 创建 MSGQUeue
    • 创建 Binding
    • 创建Message
  • 六、 数据库设计
    • 配置 SQLite
    • 实现创建表
    • 实现数据库基本操作
    • 实现 DataBaseManager
    • 测试 DataBaseManager
  • 七、消息存储设计
    • 设计思路
    • 创建 MessageFileManager 类
    • 实现统计⽂件读写
    • 实现创建队列⽬录
    • 实现删除队列⽬录
    • 检查队列⽂件是否存在
    • 实现消息对象序列化/反序列化
    • 实现写⼊消息⽂件
    • 实现删除消息
    • 实现消息加载
    • 实现垃圾回收(GC)
    • 测试 MessageFileManager
  • ⼋、 整合数据库和⽂件
    • 创建 DiskDataCenter
    • 封装 Exchange ⽅法
    • 封装 Queue ⽅法
    • 封装 Binding 方法
    • 封装 Message ⽅法
  • 九、 内存数据结构设计
    • 创建 MemoryDataCenter
    • 封装 Exchange ⽅法
    • 封装 Queue ⽅法
    • 封装 Binding ⽅法
    • 封装 Message ⽅法
    • 针对未确认的消息的处理
    • 实现重启后恢复内存
    • 测试 MemoryDataCenter
  • ⼗、 虚拟主机设计
    • 创建 VirtualHost
    • 实现构造⽅法和 getter
    • 创建交换机
    • 删除交换机
    • 创建队列
    • 删除队列
    • 创建绑定
    • 删除绑定
    • 发布消息
    • 路由规则
    • 订阅消息
    • 消息确认
    • 测试 VirtualHost
  • ⼗⼀、 ⽹络通信协议设计
    • 明确需求
    • 设计应⽤层协议
    • 定义 Request / Response
    • 定义参数⽗类
    • 定义返回值⽗类
    • 定义其他参数类
  • ⼗⼆、 实现 BrokerServer
    • 创建 BrokerServer 类
    • 启动/停⽌服务器
    • 实现处理连接
    • 实现 readRequest
    • 实现 writeResponse
    • 实现处理请求
    • 实现 clearClosedSession
  • ⼗三、 实现客⼾端
    • 创建 ConnectionFactory
    • Connection 和 Channel 的定义
    • 封装请求响应读写操作
    • 创建 channel
    • 发送请求
    • 关闭 channel
    • 创建交换机
    • 删除交换机
    • 创建队列
    • 删除队列
    • 创建绑定
    • 删除绑定
    • 发送消息
    • 订阅消息
    • 确认消息
    • 处理响应
    • 关闭 Connection
    • 测试代码
  • 项目结果

一、消息队列的背景知识

  • Java标准库中有提供阻塞队列的数据结构, 阻塞队列最重要的用途是实现生产者消费者模型;
  • 生产者消费者模型存在诸多好处, 是后端开发的常用编程方式
    • 解耦合
    • 削峰填谷
  • 在实际的后端开发中, 尤其是分布式系统里, 跨主机之间使用生产者消费者模型, 也是非常普遍的需求, 因此我们通常会把阻塞队列, 封装成一个独立的服务器程序, 并且赋予其更丰富的功能, 这样的程序就被称为 消息队列 (Message Queue, MQ)
  • 市场上有许多消息队列, 如
    • RabbitMQ
    • Kafka
  • 这里仿照 RabbitMQ 来模拟实现一下消息队列

二、需求分析

核心概念

  • ⽣产者 (Producer)
  • 消费者 (Consumer)
  • 中间⼈ (Broker)
  • 发布 (Publish) : 生产者向中间人这里投递消息的过程
  • 订阅 (Subscribe) : 哪些消费者要从这个中间人这里取数据, 这个注册的过程, 称为 “订阅”
  • 消费 (Consume): 消费者从中间人这里取走消息后处理数据的动作

通过取快递来理解上述概念

  1. 商家就是生产者
  2. "我"就是消费者
  3. 菜鸟驿站就是中间人
  4. 首先可以是商家向菜鸟驿站发快递 (发布)
  5. 接着 ''我" 关注哪个商家发的快递 (订阅)
  6. 最后"我"从菜鸟驿站中取走快递后, 并使用快递里的商品 (消费)

⼀个⽣产者, ⼀个消费者

在这里插入图片描述

N 个⽣产者, N 个消费者

在这里插入图片描述
其中, Broker 是最核⼼的部分. 负责消息的存储和转发.

Broker Server 中的相关概念

  • 虚拟主机 (VirtualHost): 类似于 MySQL 的 “database”, 是一个逻辑上的集合, 一个 BrokerServer 上可以存在多个 VirtualHost
  • 交换机(Exchange): 生产者把消息先发到Broker的Exchange上, 在根据不同的规则, 把消息转发给不同的Queue
  • 队列 (Queue): 真正用来存储消息的部分, 每个消费者决定自己从哪个Queue上读取消息
  • 绑定(Binding): Exchange 和 Queue 之间的关联关系, Exchange 和 Queue可以理解成 “多对多” 关系, 使用一个关联表就可以把这两个概念联系起来
  • 消息 (Message): 传递的内容

所谓的Exchange 和 Queue 可以理解成 “多对多” 关系, 和数据库中的 “多对多” 一样, 意思是:
一个Exchange 可以绑定多个 Queue (可以向多个 Queue 中转发消息)
一个 Queue 也可被多个Exchange 绑定(一个 Queue 中的消息可以来自于多个 Exchange)

关系结构图

在这里插入图片描述
这些概念, 既需要在内存中存储, 也需要在硬盘上存储.

  • 内存存储: 方便使用
  • 磁盘存储: 重启服务器后数据不丢失

核⼼ API

对于 Broker 来说, 要实现以下核心 API, 通过这些 API 来实现消息队列的基本功能

  1. 创建队列 (queueDeclare)
  2. 销毁队列 (queueDelete)
  3. 创建交换机 (exchangeDeclare)
  4. 销毁交换机 (exchangeDelete)
  5. 创建绑定 (queueBind)
  6. 解除绑定 (queueUnbind)
  7. 发布消息 (basicPublish)
  8. 订阅消息 (basicConsume)
  9. 确认消息 (basicAck)

另⼀⽅⾯, Producer 和 Consumer 则通过⽹络的⽅式, 远程调⽤这些 API, 实现 ⽣产者消费者模型.

交换机类型 (Exchange Type)

对于 RabbitMQ 来说, 主要支持四种交换机类型

  • Direct
  • Fanout
  • Topic
  • Header

其中 Header 这种⽅式⽐较复杂, ⽐较少⻅. 常⽤的是前三种交换机类型. 此处也主要实现这三种.

  • Direct: 生产者发送消息时, 直接指定被该交换机绑定的队列名
  • Fanout: 生产者发送的消息会被复制到交换机的所有队列中
  • Topic: 绑定队列到交换机上时, 指定一个字符串为 bindingKey, 发送消息指定一个字符串为 routingKey, 当 routingKey 和 bindingKey 满足一定匹配条件的时候, 则把消息投递到指定队列中

这三个操作就像有发奖品一样

  • Direct是发一个专属的奖品给特定的人, 只有指定的人才能领取
  • Fanout 就是给每一个人都发一个安慰奖
  • Topic是有奖竞猜, 出了一道题, 只有作答并正确的人才能领取到奖品

持久化

Exchange, Queue, Binding, Message 都有持久化需求.

当程序重启 / 主机重启, 保证上述内容不丢失.

⽹络通信

⽣产者和消费者都是客⼾端程序, broker 则是作为服务器. 通过⽹络进⾏通信.

在⽹络通信的过程中, 客⼾端部分要提供对应的 api, 来实现对服务器的操作

  1. 创建 Connection
  2. 关闭 Connection
  3. 创建 Channel
  4. 关闭 Channel
  5. 创建队列 (queueDeclare)
  6. 销毁队列 (queueDelete)
  7. 创建交换机 (exchangeDeclare)
  8. 销毁交换机 (exchangeDelete)
  9. 创建绑定 (queueBind)
  10. 解除绑定 (queueUnbind)
  11. 发布消息 (basicPublish)
  12. 订阅消息 (basicConsume)
  13. 确认消息 (basicAck)

可以看到, 在 broker 的基础上, 客⼾端还要增加 Connection 操作和 Channel 操作.

Connection 对应⼀个 TCP 连接

Channel 则是 Connection 中的逻辑通道

⼀个 Connection 中可以包含多个 Channel.

Channel 和 Channel 之间的数据是独⽴的. 不会相互⼲扰.

这样的设定主要是为了能够更好的复⽤ TCP 连接, 达到⻓连接的效果, 避免频繁的创建关闭 TCP 连接.

Connection 可以理解成⼀根⽹线. Channel 则是⽹线⾥具体的线缆.
在这里插入图片描述

消息应答

被消费的消息, 需要进行应答

应答模式分成两种

  • 自动应答: 消费者只要消费了消息, 就算应答完毕了, Broker 直接删除这个消息
  • 手动应答: 消费者手动调用应答接口, Broker 收到应答请求之后, 才真正删除这个消息

手动应答的目的, 是为了保证消息确实被消费者处理成功了, 在一些对于数据可靠性要求高的场景, 比较常见

三、 模块划分

在这里插入图片描述
可以看到, 像 交换机, 队列, 绑定, 消息, 这⼏个核⼼概念在内存和硬盘中都是存储了的.

其中内存为主, 是⽤来实现消息转发的关键; 硬盘为辅, 主要是保证服务器重启之后, 之前的信息都可以正常保持.

四、 项⽬创建

创建 SpringBoot 项⽬.
使⽤ SpringBoot 2 系列版本, Java 8.
依赖引⼊ Spring Web 、 MyBatis 和 lombok.

五、创建核心类

创建包 mqserver.mq

创建 Exchange

/*** Created with IntelliJ IDEA.* Description:这个类表示一个交换机** @author: zxj* @date: 2024-02-25* @time: 20:05:48*/
@Data
public class Exchange {// 此处使用 name 来作为交换机的身份标识 (唯一的)private String name;// 交换机类型: Direct, Fanout, Topicprivate ExchangeType type = ExchangeType.DIRECT;// 该交换机是否需要持久化存储, true 表示需要持久化存储, false 表示不必持久化.private Boolean durable = false;// RabbitMQ 有的字段, 相关功能待开发// 该属性表示 如果当前交换机没有人用了, 就会自动删除private Boolean autoDelete = false;// arguments 表示的是创建交换机时指定的一些额外的参数选项, 待开发private Map<String,Object> arguments = new HashMap<>();
}
package en.edu.zxj.mq.mqserver.core;/*** Created with IntelliJ IDEA.* Description:交换机类型*     • Direct: ⽣产者发送消息时, 直接指定被该交换机绑定的队列名.*     • Fanout: ⽣产者发送的消息会被复制到该交换机的所有队列中.*     • Topic: 绑定队列到交换机上时, 指定⼀个字符串为 bindingKey. 发送消息指定⼀个字符串为 routingKey.*              当 routingKey 和 bindingKey 满⾜⼀定的匹配条件的时候, 则把消息投递到指定队列.** @author: zxj* @date: 2024-02-25* @time: 20:10:02*/
public enum ExchangeType {DIRECT(0),FANOUT(1),TOPIC(2);private Integer type;ExchangeType(Integer type) {this.type = type;}public Integer getType() {return type;}
}
  • name: 交换机的名字, 相当于交换机的身份标识
  • type: 交换机的类型, 三种取值, DIRECT, FANOUT, TOPIC
  • durable: 交换机是否要持久化存储, true 为持久化, false 不持久化
  • autoDelete: 使用完毕后是否自动删除
  • arguments: 交换机的其他参数属性

创建 MSGQUeue

/*** Created with IntelliJ IDEA.* Description:消息队列,* 类名叫做 MSGQueue, ⽽不是 Queue, 是为了防⽌和标准库中的 Queue 混淆** @author: zxj* @date: 2024-02-25* @time: 20:19:52*/
@Data
public class MSGQueue {// 表示队列的身份标识.private String name;// 该消息队列是否需要持久化存储, true 表示需要持久化存储, false 表示不必持久化.private Boolean durable = false;// 以下为保留字段// exclusive 为 true, 表示这个队列只能被一个消费者使用(别人用不了), 如果为 false 则是大家都能使用private Boolean exclusive = false;// 该属性表示 如果当前交换机没有人用了, 就会自动删除private Boolean autoDelete = false;// arguments 表示的是创建交换机时指定的一些额外的参数选项, 待开发private Map<String, Object> arguments = new HashMap<>();
}    

类名叫做MSGQUeue, 而不是 Queue, 是为了防止和标准库中的Queue混淆

  • name: 队列名字, 相当于队列的身份标识
  • durable: 交换机是否要持久化存储, true 为持久化, false 不持久化
  • exclusive: 独占(排他), 队列只能被一个消费者使用
  • autoDelete: 使用完毕后是否自动删除
  • arguments: 消息队列的其他参数属性

创建 Binding

/*** Created with IntelliJ IDEA.* Description:表示队列和交换机之间的关联关系** @author: zxj* @date: 2024-02-25* @time: 20:24:23*/
@Data
public class Binding {// exchangeName 交换机名字private String exchangeName;// queueName 队列名字private String queueName;// bindingKey 只在交换机类型为 TOPIC 时才有效. ⽤于和消息中的 routingKey 进⾏匹配.private String bindingKey;
}
  • exchangeName: 交换机名字
  • queueName: 队列名字
  • bindingKey: 只在交换机类型为 TOPIC 时才有效, 用于和消息中的 routingKey 进行匹配

创建Message

/*** Created with IntelliJ IDEA.* Description:表示一个要传递的消息* 此处的 Message 对象, 是需要能够在网络上传输, 并且也需要能写入到文件中.* 此时就需要针对 Message 进行序列化和反序列化.* 此处使用 标准库 自带的 序列化/反序列化 操作.** @author: zxj* @date: 2024-02-25* @time: 20:55:17*/
@Data
@Component
public class Message implements Serializable {// Message 核心属性private BasicProperties basicProperties = new BasicProperties();// 存储需要传输的消息, 使用字节的方式存储private byte[] body;// 辅助属性/*** 一个文件中会存储很多信息, 如何找到某个消息, 在文件中的具体位置呢?* 使用下列的两个偏移量进行表示, [offset, offsetEnd)* 这两个属性并不需要被反序列化保存到文件中, 此时信息一旦被写入文件之后, 所在的位置就固定了, 并不需要单独存储* 这两个属性存在的目的, 主要是为了让内存中的 Message 对象, 能够快速找到对应硬盘上的 Message 位置.**/private long offsetBeg = 0; // 消息数据开头距离文件开头的偏移位置 (单位是字节)private long offsetEnd = 0; // 消息数据结尾距离文件开头的偏移位置 (单位是字节)/*** 使用这个属性表示该消息在文件中是否是有效信息. (针对文件中的信息, 如果删除, 使用逻辑删除的方式)* 0x1 表示有效, 0x0 表示无效**/private byte isValid = 0x1;/*** @description: 工厂模式创建 Message 实例* 创建一个工厂方法, 让工厂方法帮我们封装一下创建 Message 对象的过程* 这个方法中创建的 Message 对象, 会自动生成唯一的 MessageId* 万一 routingKey 和 basicProperties 里的 routingKey 冲突, 以外面的为主**/public static Message createMessageWithId(String routingKey, BasicProperties basicProperties, byte[] body) {Message message = new Message();if (basicProperties != null) {message.setBasicProperties(basicProperties);}// 此处生成的 MessageId 以 M- 作为前缀.message.setMessageId("M-" + UUID.randomUUID());message.setRoutingKey(routingKey);message.setBody(body);/* 此处是把 body 和 basicProperties 先设置出来. 他俩是 Message 的核心内容.而 offsetBeg, offsetEnd, isValid, 则是消息持久化的时候才会用到. 在把消息写入文件之前再进行设定.此处只是在内存中创建一个 Message 对象.*/return message;}
}
/*** Created with IntelliJ IDEA.* Description:** @author: zxj* @date: 2024-02-25* @time: 21:18:11*/
@Component
@Data
public class BasicProperties implements Serializable {// 消息的唯一身份标识, 此处为了保证 id 的唯一性, 使用 UUID 来作为 messageIdprivate String messageId;/*** 是一个消息上带有的内容, 和 bindingKey 做匹配* 如果当前交换机类型是 DIRECT, 此时 routingKey 就表示要转发的队列名* 如果当前交换机类型是 FANOUT, 此时 routingKey 没有意义* 如果当前交换机类型是 TOPIC, 此时 routingKey 就要和 bindingKey做匹配, 符合要求的才能转发给对应的队列**/private String routingKey;// 这个属性表示消息是否要持久化: 1 表示不持久化, 2 表示持久化. (RabbitMQ 就是这样的)private Integer deliverMode = 1;
}
  • Message 需要实现 Serializable 接口, 后续需要把Message写入文件以及进行网络传输
  • basicProperties: 是消息的属性信息, body 是消息体
  • offsetBegoffsetEnd 表示消息在消息文件中所在的起始位置和结束位置, 这一块具体的设计后续再说; 使用 transient 关键字避免属性被序列化
  • isValid 用来标识消息在文件中是否有效, 这一块具体设计后续再说
  • createMessageWithId 相当于一个工厂方法, 用来创建一个 Message 实例, messageId 通过 UUID 的方式来生成

六、 数据库设计

对于 Exchange,MSGQUeue,Binding,我们使用数据库进行持久化保存

此处我们使用的数据库是 SQLite,是一个更轻量的数据库

SQLite 只是一个动态库,我们在 Java 中直接注入 SQLite 依赖即可直接使用,不必安装其他的软件

配置 SQLite

引入 pom.xml 依赖

 <!--导入 sqlite 数据库--><dependency><groupId>org.xerial</groupId><artifactId>sqlite-jdbc</artifactId><version>3.45.1.0</version></dependency>

配置数据源 application.yml

spring:datasource:url: jdbc:sqlite:./data/meta.dbusername:password:driver-class-name: org.sqlite.JDBC
mybatis:configuration:map-underscore-to-camel-case: true #配置驼峰自动转换log-impl: org.apache.ibatis.logging.stdout.StdOutImpl #打印sql语句mapper-locations: classpath:mapper/**Mapper.xml

username 和 password 空着即可

此处我们约定, 把数据库文件放到 ./data/meta.db 中

SQLite 只是把数据单存的存储到一个文件中, 非常简单方便

实现创建表

@Mapper
public interface MetaMapper {// 建表操作void createExchangeTable();void createMSGQueueTable();void createBindingTable();
}

本身 MyBatis 针对 MySQL 和 Oracle 是可以执行多个 sql 语句的, 但是 SQLite 不行

MetaMapper.xml 中 具体实现 sql 语句

<update id="createExchangeTable">create table if not exists exchange_table(name          varchar(64) primary key,type          int comment '0 表示 Direct, 1 表示 Fanout, 2 表示 Topic',durable       boolean,auto_delete   boolean,arguments     varchar(1024),`delete_flag` tinyint(4) DEFAULT '0',`create_time` datetime   DEFAULT CURRENT_TIMESTAMP,`update_time` datetime   DEFAULT CURRENT_TIMESTAMP);</update><update id="createMSGQueueTable">create table if not exists msg_queue_table(name          varchar(64) primary key,durable       boolean,exclusive     boolean,auto_delete   boolean,arguments     varchar(1024),`delete_flag` tinyint(4) DEFAULT '0',`create_time` datetime   DEFAULT CURRENT_TIMESTAMP,`update_time` datetime   DEFAULT CURRENT_TIMESTAMP);</update><update id="createBindingTable">create table if not exists binding_table(exchange_name varchar(64),queue_name    varchar(64),binding_key   varchar(64),`delete_flag` tinyint(4) DEFAULT '0',`create_time` datetime   DEFAULT CURRENT_TIMESTAMP,`update_time` datetime   DEFAULT CURRENT_TIMESTAMP);</update>

实现数据库基本操作

给 mapper.MetaMapper 中添加

// 相关的增删改查操作Integer insertExchange(Exchange exchange);Integer deleteExchangeByName(String name);List<Exchange> selectAllExchanges();Integer insertMSGQueue(MSGQueue msgQueue);Integer deleteMSGQueueByName(String name);List<MSGQueue> selectAllMSGQueues();Integer insertBinding(Binding binding);Integer deleteBinding(String exchangeName, String queueName);List<Binding> selectAllBindings();

相关sql语句实现

    <insert id="insertExchange">insert into exchange_table (name, type, durable, auto_delete, arguments)values (#{name}, #{type}, #{durable}, #{autoDelete}, #{arguments});</insert><insert id="insertMSGQueue">insert into msg_queue_table (name, durable, exclusive, auto_delete, arguments)values (#{name}, #{durable}, #{exclusive}, #{autoDelete}, #{arguments});</insert><insert id="insertBinding">insert into binding_table (exchange_name, queue_name, binding_key)values (#{exchangeName}, #{queueName}, #{bindingKey});</insert><update id="deleteExchangeByName">update exchange_tableset delete_flag = 1where name = #{nanme};</update><update id="deleteMSGQueueByName">update msg_queue_tableset delete_flag = 1where name = #{nanme};</update><update id="deleteBinding">update binding_tableset delete_flag = 1where exchange_name = #{exchangeName}and queue_name = #{queueName};</update><select id="selectAllExchanges" resultType="en.edu.zxj.mq.mqserver.core.Exchange">select name,type,durable,auto_delete,arguments,delete_flag,create_time,update_timefrom exchange_tablewhere delete_flag = 0;</select><select id="selectAllMSGQueues" resultType="en.edu.zxj.mq.mqserver.core.MSGQueue">select name,durable,exclusive,auto_delete,arguments,delete_flag,create_time,update_timefrom msg_queue_tablewhere delete_flag = 0;</select><select id="selectAllBindings" resultType="en.edu.zxj.mq.mqserver.core.Binding">select exchange_name,queue_name,binding_key,delete_flag,create_time,update_timefrom binding_tablewhere delete_flag = 0;</select>

实现 DataBaseManager

mqserver.datacenter.DatabaseManager

  1. 创建 DatabaseManager 类 – 通过这个类来封装对数据库的操作

/*** Created with IntelliJ IDEA.* Description:通过这个类来封装针对数据库的操作.** @author: zxj* @date: 2024-02-26* @time: 21:21:21*/
@Slf4j
public class DatabaseManager {// 由于 DataBaseManager 不是⼀个 Bean// 需要⼿动来获取实例private MetaMapper metaMapper;public void init() {metaMapper = MqApplication.context.getBean(MetaMapper.class);if (!checkDBExits()) {// 数据库不存在// 1. 先创建目录File file = new File("./data/");if (!file.exists()) {file.mkdirs();}// 2. 建表createTables();// 3. 插入默认的数据createDefaultData();log.info("创建数据库成功~");} else {log.info("数据库已经存在!");}}}

如果数据库已经存在了, 就不必建库建表了

针对 MqApplication, 需要新增⼀个 context 属性. 并初始化

@SpringBootApplication
public class MqApplication {public static ConfigurableApplicationContext context = null;public static void main(String[] args) {context = SpringApplication.run(MqApplication.class, args);}
}
  1. 实现 checkDBExists
 private boolean checkDBExits() {File file = new File("./data/meta.db");return file.exists();}
  1. 实现 createTable
 /*** 这个方法用来建表.* 建库操作并不需要手动执行. (不需要手动创建 meta.db 文件)* 首次执行这里的数据库操作的时候, 就会自动的创建出 meta.db 文件来 (MyBatis 帮我们完成的)**/private void createTables() {metaMapper.createMSGQueueTable();metaMapper.createBindingTable();metaMapper.createExchangeTable();log.info("建表成功");}
  1. 实现 createDefaultData
/*** @description: 给数据库表中, 添加默认的数据* 此处主要是添加一个默认的交换机* RabbitMQ 里有一个这样的设定: 带有一个 匿名 的交换机, 类型是 DIRECT**/
private void createDefaultData() {// 构造一个默认的交换机Exchange exchange = new Exchange();exchange.setName("");exchange.setType(ExchangeType.DIRECT);exchange.setDurable(false);exchange.setAutoDelete(false);metaMapper.insertExchange(exchange);log.info("创建默认的数据成功~");
}

默认数据主要是创建⼀个默认的交换机. 这个默认交换机没有名字, 并且是直接交换机.

  1. 封装其他的数据库操作
// 封装其他的数据库操作Integer insertExchange(Exchange exchange) {return metaMapper.insertExchange(exchange);}Integer deleteExchangeByName(String name) {return metaMapper.deleteExchangeByName(name);}List<Exchange> selectAllExchanges() {return metaMapper.selectAllExchanges();}Integer insertMSGQueue(MSGQueue msgQueue) {return metaMapper.insertMSGQueue(msgQueue);}Integer deleteMSGQueueByName(String name) {return metaMapper.deleteMSGQueueByName(name);}List<MSGQueue> selectAllMSGQueues() {return metaMapper.selectAllMSGQueues();}Integer insertBinding(Binding binding) {return metaMapper.insertBinding(binding);}Integer deleteBinding(String exchangeName, String queueName) {return metaMapper.deleteBinding(exchangeName, queueName);}List<Binding> selectAllBindings() {return metaMapper.selectAllBindings();}// 清理资源public void deleteDB() {File file = new File("./data/meta.db");if (file.delete()) {log.info("删除数据库文件成功~");} else {log.info("删除数据库文件失败~");}File dataDir = new File("./data/");// 使用 delete 删除目录的时候, 需要保证目录是空的.if (dataDir.delete()) {log.info("删除数据库目录成功~");} else {log.info("删除数据库目录失败~");}}

测试 DataBaseManager

使⽤ Spring ⾃带的单元测试, 针对上述代码进⾏测试验证.

在 test ⽬录中, 创建 DataBaseManagerTests

  1. 准备⼯作

@SpringBootTest
class DatabaseManagerTest {private final DatabaseManager databaseManager = new DatabaseManager();// 接下来需要编写多个方法, 每个方法都是一个/一组单元测试用例// 还需要做一个准备工作, 需要写两个方法, 分别用于今 "准备工作" 和 "收尾工作"// 使用这个方法, 来执行准备工作, 每个用例执行前, 都要调用这个方法@BeforeEachpublic void setUp() {// 由于在 init 中, 需要通过 context 对象拿到 metaMapper 实例的// 所以就需要先把 context 对象给搞出来, 给搞出来MqApplication.context = SpringApplication.run(MqApplication.class);databaseManager.init();}// 使用这个方法, 来执行收尾工作, 每个用例执行后, 都要调用这个方法@AfterEachpublic void tearDown() {/*这里需要进行操作, 就是把数据库给清空~ (把数据库文件, meta.db 直接删了就行了)注意, 此处不能直接就删除, 而需要先关闭上述 context 对象!此处的 context 对象, 持有了 MetaMapper 的实例, MetaMapper 实例又打开了 meta.db 数据库文件如果 meta.db 被别人打开了, 次数的删除文件操作是不会成功的 (Windows 系统的限制, Linux 没有这个问题)另一方面, 获取 context 操作, 会占用 8080 端口, 此处的 close 也是释放 8080*/MqApplication.context.close();databaseManager.deleteDB();}
}
  • @SpringBootTest 注解表示该类是一个测试类
  • @BeforeEach 每个测试用例之前执行, 一般用来做准备工作, 此处进行数据库初始化, 以及针对 Spring 服务的初始化
  • @AfterEach 每个测试用例之后执行, 一般用来做收尾工作, 此处需要先关闭 Spring 项目, 再删除数据库
  1. 编写测试用例
  • @Test 注解表示一个测试用例
  • Assertions 是断言, 用来判定结果的
  • 每个用例执行之前, 都会先调用 setUp, 每次用例执行后, 都会调用 tearDown
  • 确保每个用例执行的都是 “clean” 的, 也就是每个测试用例不会被上一个测试用例干扰

具体代码

@Testvoid init() {// 由于 init 方法, 已经在上面 setUp 方法中调用了, 直接在测试用例代码中, 检查当前的数据库状态即可// 直接从数据库中查询, 看数据是否符合预期.// 查交换机表, 里面应该有一个数据 (匿名的 exchange); 查消息队列表, 没有数据; 查绑定表, 没有数据List<Exchange> exchanges = databaseManager.selectAllExchanges();List<MSGQueue> msgQueues = databaseManager.selectAllMSGQueues();List<Binding> bindings = databaseManager.selectAllBindings();/*直接打印结果, 通过肉眼来检查结果, 可以但是不优雅, 不方便更好的方法是使用断言System.out.println(exchanges.size());assertEquals 判定结果是不是相等注意 assertEquals 两个参数的顺序, 虽然比较相等, 谁在前, 谁在后, 无所谓但是 assertEquals 的形参, 第一个形参叫做 expected (预期), 第二个形参叫做 actual (实际的)*/Assertions.assertEquals(1, exchanges.size());Assertions.assertEquals("", exchanges.get(0).getName());Assertions.assertEquals(ExchangeType.DIRECT, exchanges.get(0).getType());Assertions.assertEquals(0, msgQueues.size());Assertions.assertEquals(0, bindings.size());}@org.jetbrains.annotations.NotNullprivate Exchange createTestExchange(String exchangeName) {Exchange exchange = new Exchange();exchange.setName(exchangeName);exchange.setType(ExchangeType.FANOUT);exchange.setDurable(true);exchange.setAutoDelete(false);exchange.setArguments("11", "aa");exchange.setArguments("22", "bb");return exchange;}@Testvoid insertExchange() {// 构造一个 Exchange 对象, 插入到数据库中. 再查询出来, 看结果是否符合预期.String exchangeName = "exchangeTest";Exchange exchange = createTestExchange(exchangeName);databaseManager.insertExchange(exchange);// 插入完毕后, 查询结果List<Exchange> exchanges = databaseManager.selectAllExchanges();Exchange newExchange = exchanges.get(1);Assertions.assertEquals(2, exchanges.size());Assertions.assertEquals(ExchangeType.FANOUT, newExchange.getType());Assertions.assertEquals(true, newExchange.getDurable());Assertions.assertEquals(false, newExchange.getAutoDelete());Assertions.assertEquals("aa", newExchange.getArguments("11"));Assertions.assertEquals("bb", newExchange.getArguments("22"));}@Testvoid deleteExchangeByName() {// 先构造一个交换机, 插入数据库; 然后再按照名字删除即可!String exchangeName = "exchangeTest";Exchange exchange = createTestExchange(exchangeName);databaseManager.insertExchange(exchange);// 插入完毕后, 查询结果List<Exchange> exchanges = databaseManager.selectAllExchanges();Exchange newExchange = exchanges.get(1);Assertions.assertEquals(2, exchanges.size());Assertions.assertEquals(ExchangeType.FANOUT, newExchange.getType());// 删除交换机databaseManager.deleteExchangeByName(exchangeName);exchanges = databaseManager.selectAllExchanges();Assertions.assertEquals(1, exchanges.size());Assertions.assertEquals("", exchanges.get(0).getName());}@Testvoid selectAllExchange() {// 构造一个 Exchange 对象, 插入到数据库中. 再查询出来, 看结果是否符合预期.String exchangeName = "exchangeTest";Exchange exchange = createTestExchange(exchangeName);databaseManager.insertExchange(exchange);// 插入完毕后, 查询结果List<Exchange> exchanges = databaseManager.selectAllExchanges();Exchange newExchange = exchanges.get(1);Assertions.assertEquals(2, exchanges.size());Assertions.assertEquals(ExchangeType.FANOUT, newExchange.getType());Assertions.assertEquals(true, newExchange.getDurable());Assertions.assertEquals(false, newExchange.getAutoDelete());Assertions.assertEquals("aa", newExchange.getArguments("11"));Assertions.assertEquals("bb", newExchange.getArguments("22"));}private MSGQueue createTestMSGQueue(String msgQueueName) {MSGQueue msgQueue = new MSGQueue();msgQueue.setName(msgQueueName);msgQueue.setDurable(true);msgQueue.setAutoDelete(false);msgQueue.setExclusive(false);msgQueue.setArguments("a", 1);msgQueue.setArguments("b", 2);return msgQueue;}@Testvoid insertMSGQueue() {// 插入数据String msgQueueName = "testMSGQueueName";MSGQueue msgQueue = createTestMSGQueue(msgQueueName);databaseManager.insertMSGQueue(msgQueue);// 查询数据, 判断插入的数据是否正确List<MSGQueue> msgQueues = databaseManager.selectAllMSGQueues();MSGQueue msgQueueNew = msgQueues.get(0);Assertions.assertEquals(1, msgQueues.size());Assertions.assertEquals(msgQueueName, msgQueueNew.getName());Assertions.assertEquals(true, msgQueueNew.getDurable());Assertions.assertEquals(false, msgQueueNew.getAutoDelete());Assertions.assertEquals(false, msgQueueNew.getExclusive());Assertions.assertEquals(1, msgQueueNew.getArguments("a"));Assertions.assertEquals(2, msgQueueNew.getArguments("b"));}@Testvoid deleteMSGQueueByName() {// 插入数据String msgQueueName = "testMSGQueueName";MSGQueue msgQueue = createTestMSGQueue(msgQueueName);databaseManager.insertMSGQueue(msgQueue);// 查询数据, 判断插入的数据是否正确List<MSGQueue> msgQueues = databaseManager.selectAllMSGQueues();MSGQueue msgQueueNew = msgQueues.get(0);Assertions.assertEquals(1, msgQueues.size());Assertions.assertEquals(msgQueueName, msgQueueNew.getName());// 依据名字删除databaseManager.deleteMSGQueueByName(msgQueueName);msgQueues = databaseManager.selectAllMSGQueues();Assertions.assertEquals(0, msgQueues.size());}@Testvoid selectAllMSGQueue() {// 插入数据String msgQueueName = "testMSGQueueName";MSGQueue msgQueue = createTestMSGQueue(msgQueueName);databaseManager.insertMSGQueue(msgQueue);// 查询数据, 判断插入的数据是否正确List<MSGQueue> msgQueues = databaseManager.selectAllMSGQueues();MSGQueue msgQueueNew = msgQueues.get(0);Assertions.assertEquals(1, msgQueues.size());Assertions.assertEquals(msgQueueName, msgQueueNew.getName());}private @NotNull Binding createTestBinding(String exchangeName, String msgQueueName) {Binding binding = new Binding();binding.setExchangeName(exchangeName);binding.setQueueName(msgQueueName);binding.setBindingKey("Hello word");return binding;}@Testvoid insertBinding() {// 插入 binding 数据String exchangeName = "testBindingExchangeName";String msgQueueName = "testBindingMSGQueueName";Binding binding = createTestBinding(exchangeName, msgQueueName);databaseManager.insertBinding(binding);// 查询List<Binding> bindings = databaseManager.selectAllBindings();Binding bindingNew = bindings.get(0);Assertions.assertEquals(1, bindings.size());Assertions.assertEquals(exchangeName, bindingNew.getExchangeName());Assertions.assertEquals(msgQueueName, bindingNew.getQueueName());Assertions.assertEquals("Hello word", bindingNew.getBindingKey());}@Testvoid deleteBinding() {// 插入 binding 数据String exchangeName = "testBindingExchangeName";String msgQueueName = "testBindingMSGQueueName";Binding binding = createTestBinding(exchangeName, msgQueueName);databaseManager.insertBinding(binding);// 查询List<Binding> bindings = databaseManager.selectAllBindings();Binding bindingNew = bindings.get(0);Assertions.assertEquals(1, bindings.size());// 删除databaseManager.deleteBinding(exchangeName, msgQueueName);bindings = databaseManager.selectAllBindings();Assertions.assertEquals(0, bindings.size());}@Testvoid selectAllBinding() {// 插入 binding 数据String exchangeName = "testBindingExchangeName";String msgQueueName = "testBindingMSGQueueName";Binding binding = createTestBinding(exchangeName, msgQueueName);databaseManager.insertBinding(binding);// 查询List<Binding> bindings = databaseManager.selectAllBindings();Binding bindingNew = bindings.get(0);Assertions.assertEquals(1, bindings.size());}

七、消息存储设计

设计思路

消息需要再硬盘上存储, 但是并不是直接放到数据库中,而是直接使用文件存储。

原因如下:

  1. 对于消息的操作不需要复杂的 增删改查
  2. 对于文件的操作效率比数据库会高很多

主流 mq 的实现 (包括 RabbitMQ), 都是把消息存储在文件中, 而不是数据库中

我们给每个队列分配一个目录, 目录的名字为 data + 队列名, 形如 ./data/testQueue, 该目录中包含两个固定名字的文件

  • queue_data.txt 消息数据文件, 用来保存消息内容
  • queue_stat.txt 消息统计文件, 用来保存消息统计信息

queue_data.txt 文件格式:

使用二进制方式存储.

每个消息分成两个部分:

  • 前四个字节, 表示 Message 对象的长度(字节为单位)
  • 后面若干个字节, 表示 Message 内容
  • 消息和消息之间收尾相连

每个 Message 基于 Java 标准库的 ObjectInputStream / ObjectOutputStream 序列化

在这里插入图片描述

Message 对象中的 offsetBeg 和 offsetEnd 正是用来描述每个消息体所在的位置

queue_static.txt 文件格式:
使用文本方式存储

文件中只包含一行, 里面包含两列(都是整数), 使用 \t 分割.

第一列表示当前总的消息数目. 第二列表示有效消息数目.

形如:

2000\t1500

创建 MessageFileManager 类

创建 mqserver.datacenter.MessageFileManager


/*** Created with IntelliJ IDEA.* Description:消息持久化存储* 存储单位是以队列名字为单位存储的** @author: zxj* @date: 2024-02-28* @time: 13:43:23*/
@Slf4j
public class MessageFileManger {private final static String BASIC_DIR = "./data/";/*** @description: 内部类, 用于管理 queue_stat.txt 中的数据* 存储格式: totalCount \t validCount* 作用: 为了后面垃圾回收功能做准备的* 约定: 当 有效信息的占比低于 50% 时, 并且 总的消息数目大于 2000 时, 触发 GC 功能**/static public class Stat {// 总信息的存储数目public Integer totalCount;// 有效的信息数目public Integer validCount;// 最少消息数目private static final Integer atLeastCount = 2000;// 最低有效信息占比private static final Double minProportion = 0.5;}public void init() {// 暂时不需要任何初始化操作, 方便后续扩展}// 设定信息存储的目录和文件/*** @description: 用来获取指定队列信息存储的目录**/@Contract(pure = true)private @NotNull String getQueueDir(String queueName) {return BASIC_DIR + queueName;}/*** @description: 用来获取指定队列信息数据存储路径**/@Contract(pure = true)private @NotNull String getQueueDataPath(String queueName) {return getQueueDir(queueName) + "/queue_data.txt";}/*** @description: 用来获取指定队列信息记录存储路径**/@Contract(pure = true)private @NotNull String getQueueStatPath(String queueName) {return getQueueDir(queueName) + "/queue_stat.txt";}/*** @description: 用来获取指定队列新数据存储路径**/@Contract(pure = true)private @NotNull String getQueueDataNewPath(String queueName) {return getQueueDir(queueName) + "/queue_data_new.txt";}}
  • 内部包含一个 Stat 类, 用来标识消息统计文件的内容
  • getQueueDir, getQueueDataPath, getQueueStatPath, getQueueDataNewPath 用来表示这几个文件的位置

实现统计⽂件读写

这是后续操作的一些准备工作

/*** @description: 读取 queue_stat.txt 文件里面的内容**/private @NotNull Stat readStat(String queueName) throws IOException {Stat stat = new Stat();try (InputStream inputStream = Files.newInputStream(Paths.get(getQueueStatPath(queueName)))) {// 因为 queue_stat.txt 里面存储的内容是文本的, 所以可以使用 Scanner 来读取Scanner scanner = new Scanner(inputStream);stat.totalCount = scanner.nextInt();stat.validCount = scanner.nextInt();}return stat;}/*** @description: 向 queue_stat.txt 文件里面写入内容**/private void writeStat(String queueName, @NotNull Stat stat) throws IOException {// 使用 PrintWrite 来写文件.// OutputStream 打开文件, 默认情况下, 会直接把原文件清空. 此时相当于新的数据覆盖了旧的.try (OutputStream outputStream = Files.newOutputStream(Paths.get(getQueueStatPath(queueName)))) {PrintWriter printWriter = new PrintWriter(outputStream);printWriter.write(stat.totalCount + "\t" + stat.validCount);printWriter.flush();}}

直接使用 Scanner 和 Printer 写即可

实现创建队列⽬录

每个队列都是自己的目录和配置的文件, 通过下列方法把目录和文件先准备好

/*** @description: 创建相关目录信息, 并进行相关的初始化操作**/public void createQueueFiles(String queueName) throws IOException {// 1. 创建对应的目录File fileDir = new File(getQueueDir(queueName));if (!fileDir.exists()) {// 不存在对应目录if (!fileDir.mkdirs()) {throw new IOException("创建目录失败, fileDir: " + fileDir.getAbsoluteFile());}}// 2. 创建 queue_data.txt 文件File fileData = new File(getQueueDataPath(queueName));if (!fileData.exists()) {// 不存在对应文件if (!fileData.createNewFile()) {throw new IOException("创建目录失败, fileData: " + fileData.getAbsoluteFile());}}// 3. 创建 queue_stat.txt 文件File fileStat = new File(getQueueDataPath(queueName));if (!fileStat.exists()) {// 不存在对应文件if (!fileStat.createNewFile()) {throw new IOException("创建目录失败, fileStat: " + fileStat.getAbsoluteFile());}}// 4. 初始化 queue_stat.txt 文件Stat stat = new Stat();stat.totalCount = 0;stat.validCount = 0;writeStat(queueName, stat);}

把上述约定的文件都创建出来, 并对消息统计文件进行初始化

初始化 0\t0 这样的初始值

实现删除队列⽬录

如果队列需要删除, 则队列对应的⽬录/⽂件也需要删除

    /*** @description: 销毁消息的目录文件**/public void destroyQueueFiles(String queueName) throws IOException {// 先删除文件, 在删除目录File fileData = new File(getQueueDataPath(queueName));boolean ok1 = fileData.delete();File fileStat = new File(getQueueStatPath(queueName));boolean ok2 = fileStat.delete();File fileDir = new File(getQueueDir(queueName));boolean ok3 = fileDir.delete();if (!ok1 || !ok2 || !ok3) {// 但凡有一个失败, 就算整体是失败的throw new IOException("删除指定文件和目录失败, dir: " + fileDir.getAbsoluteFile());}}

注意: File 类的 delete ⽅法只能删除空⽬录. 因此需要先把内部的⽂件先删除掉

检查队列⽂件是否存在

    /*** @description: 判断 queueName 对应的文件是否存在* 比如后续生产者给 broker server 生产消息了, 这消息可能需要被记录到文件上(取决于该信息是否需要持久化)* @return:**/public boolean checkFilesExists(String queueName) {// 判断队里的数据文件和状态文件是否存在即可File fileData = new File(getQueueDataPath(queueName));File fileStat = new File(getQueueStatPath(queueName));return fileStat.exists() && fileData.exists();}

实现消息对象序列化/反序列化

Message 对象需要转成⼆进制写⼊⽂件. 并且也需要把⽂件中的⼆进制读出来解析成 Message 对象. 此处针对这⾥的逻辑进⾏封装.

创建 common.BinaryUtils


/*** Created with IntelliJ IDEA.* Description:操作二级制数据相关的工具类 -- 提供将 java 对象 反序列化和序列化** @author: zxj* @date: 2024-02-28* @time: 14:33:24*/
public class BinaryUtils {/*** @description: 反序列化, 将字节数组转化为 java 对象**/public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {Object object = null;try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)) {try (ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) {// 这里的 readObject 就是将字节数组 反序列为 java 对象object = objectInputStream.readObject();}}return object;}public static byte[] toBytes(Object object) throws IOException {// 这个流对象相当于一个变长的字节数据// 可以把 Object 序列化的数据逐渐的写入到 byteArrayOutputStream 中, 再统一转成 byte[]try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()){try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)){// 此处的 writeObject 就会把该对象进行序列化, 生成的二进制字节数据, 就会写入到 ObjectOutputStream中// 由于 ObjectOutputStream 又是关联到了 ByteArrayOutputStream 中, 最终结果就写入到 ByteArrayOutputStream 中了objectOutputStream.writeObject(object);}// 这个操作就是把 byteArrayOutputStream 中持有的二进制数据提取出现, 转化为 byte[]return byteArrayOutputStream.toByteArray();}}}
  • 使用 ByteArrayInputStream / ByteArrayOutputStream 针对 byte[ ]进行封装, 方便后续操作 (这两个流对象是纯内存的, 不需要进行 close)
  • 使用 ObjectInputStream / ObjectOutputStream 进行序列化和反序列化操作, 通过内部的 readObject / writeObject 即可完成对应操作
  • 此处涉及到的序列化对象, 需要实现 Serializable 接口

实现写⼊消息⽂件

/*** @description: 增* 将 message 放到 msgQueue 对应的队列文件中* @param: [msgQueue 消息队列, message 需要存储的信息 - 内存中也会管理该对象]**/public void sendMessage(@NotNull MSGQueue msgQueue, Message message) throws MqException, IOException {// 1. 检查当前队列要写入的文件是否存在if (!checkFilesExists(msgQueue.getName())) {throw new MqException("[MessageFileManager] 队列所对应的文件不存在! queueName=" + msgQueue.getName());}// 2. 把 message 转化为 字节数组byte[] messageBinary = BinaryUtils.toBytes(message);// 将 messageBinary 写入到 msgQueue 所对应的队列文件中// 文件属于一个公共资源, 此时进行写操作, 存在线程安全的问题// 需要对对应的队列进行加锁, 确保同时向同一个队列中写入信息是线程安全的synchronized (msgQueue) {// 3. 设置 Message 对象中 offsetBeg 和 offsetEnd 字段// 3.1 获取此时对应文件的总长度, fileQueueData.length() 就可以获取File fileQueueData = new File(getQueueDataPath(msgQueue.getName()));// 3.2 计算// 把新的 message 写入到文件中, offsetBeg = 旧的总长度 + 4, offsetEnd = 旧的总长度 + messageBinary.length + 4message.setOffsetBeg(fileQueueData.length() + 4);message.setOffsetEnd(fileQueueData.length() + messageBinary.length + 4);// 4. 将 messageBinary 写入到文件的默认, 注意: 这里是追加写try (OutputStream outputStream = new FileOutputStream(getQueueDataPath(msgQueue.getName()), true)) {try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {// 4.1 先写入新 messageBinary 的长度 -- 固定占四个字节// 知识点: outputStream.write() 参数看似是 int 类型, 但是实际上只是写入一个字节的数据, dataOutputStream.writeInt() 就是写四个字节的数据dataOutputStream.writeInt(messageBinary.length);// 4.2 写入主体信息dataOutputStream.write(messageBinary);}}// 5. 更新信息统计文件的信息Stat stat = readStat(msgQueue.getName());stat.validCount += 1;stat.totalCount += 1;writeStat(msgQueue.getName(), stat);}}
  • 考虑线程安全, 按照队列维度进行加锁
  • 使用 DataOutputStream 进行二进制写操作, 比原生 OutputStream 要方便
  • 需要记录 Message 对象在文件中的偏移量, 后续的删除操作依赖这个偏移量定位到信息, offsetBeg是原文件大小的基础上 + 4, 4个字节是存放消息大小的空间
  • 写完消息, 要同时更新统计消息

创建 common.MqException , 作为⾃定义异常类. 后续业务上出现问题, 都统⼀抛出这个异常.

/*** Created with IntelliJ IDEA.* Description:自定义异常信息** @author: zxj* @date: 2024-02-28* @time: 14:30:32*/
public class MqException extends Exception {public MqException() {}public MqException(String message) {super(message);}
}

实现删除消息

此处的删除只是 “逻辑删除”, 即把 Message 类中的 isValid 字段设置为 0.

这样删除速度⽐较快. 实际的彻底删除, 则通过我们⾃⼰实现的 GC 来解决.

/*** @description: 删除 message* 这里的删除是逻辑删除, 也就是把硬盘上存储的这个数据里面的那个 isValid 属性, 设置为 0x0* 1. 先把这个文件中的这一段数据, 读出现来, 还原回 Message 对象* 2. 把 isValid 该成 0;* 3. 把上述数据重新写回文件* 此处这个参数中的 message 对象, 必须要包含有效的 offsetBeg 和 offsetEnd**/public void deleteMessage(@NotNull MSGQueue msgQueue, @NotNull Message message) throws IOException, ClassNotFoundException {// 修改文件, 存在线程安全问题synchronized (msgQueue) {try (RandomAccessFile randomAccessFile = new RandomAccessFile(getQueueDataPath(msgQueue.getName()), "rw")) {// 1. 先从文件中读取对应的 message 数据byte[] bufferSrc = new byte[(int) (message.getOffsetEnd() - message.getOffsetBeg())];randomAccessFile.seek(message.getOffsetBeg());randomAccessFile.read(bufferSrc);// 2. 将当前读出来的二进制数据, 转换成 Message 对象Message diskMessage = (Message) BinaryUtils.fromBytes(bufferSrc);// 3. 把 isValid 设置为无效diskMessage.setIsValid((byte) 0x0);// 此处不需要宰割参数这个 Message 的 isValid 设为 0, 因为这个参数代表的内容中管理的 Message 对象, 而这个对象也马上要被从内存中销毁了// 4. 重新写入文件byte[] buffDest = BinaryUtils.toBytes(diskMessage);// 虽然上面已经 seek 过了, 但是上面 seek 完了之后, 进行了读操作, 这一读, 就导致, 文件光标往后移动, 移动到下一个信息的位置了,// 因此想要接下来的写入, 能能够刚好写回到之前的位置, 就需要重新调整文件光标randomAccessFile.seek(message.getOffsetBeg());randomAccessFile.write(buffDest);// 通过上述这些操作, 对于文件来说, 只是有一个字节发生了改变了而已~}// 更新统计文件, 把一个消息设置为无效了, 此时有效信息个数就需要 -1Stat stat = readStat(msgQueue.getName());if (stat.validCount > 0) {stat.validCount -= 1;}writeStat(msgQueue.getName(), stat);}}
  • 使用 RandomAccessFile 来随机访问到文件的内容
  • 根据 Message 中的 offsetBeg 和 offsetEnd 定位到消息在文件中的位置, 通过 randomAccessFile.seek 操作文件指针偏移过去, 在读取
  • 读出的结果解析成 Message 对象, 修改 isValid 字段, 再重新写回文件, 注意写的时候要重新设定文件指针的位置, 文件指针会随着上述的读操作产生改变
  • 最好, 要记得更新统计文件, 把合法消息 -1

实现消息加载

把消息内容从⽂件加载到内存中. 这个功能在服务器重启, 和垃圾回收的时候都很关键.

    /*** @description: 查* 使用这个方法, 从文件中, 读取所有的消息内容, 加载到内存中 (具体来说是放到一个链表里面)* 这个方法,使用一个 LinkedList, 主要目的是为了后续进行头删操作* 这个方法的参数, 只是一个 queueName 而不是 MSGQueue 对象, 因为这个方法不需要加锁,只使用 queueName 就够了* 由于该方法是在程序启动调用, 此时服务还不能处理请求, 不涉及多线程操作文件**/public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {LinkedList<Message> messages = new LinkedList<>();try (InputStream inputStream = Files.newInputStream(Paths.get(getQueueDataPath(queueName)))) {try (DataInputStream dataInputStream = new DataInputStream(inputStream)) {// 这个变量记录当前文件的光标long currentOffset = 0;while (true) {// 1. 先读取当前消息长度字段int messageSize = dataInputStream.readInt();// 2. 按照这个长度, 读取后续的数据长度byte[] buffSrc = new byte[messageSize];int actualSize = dataInputStream.read(buffSrc);if (messageSize != actualSize) {// 如果不匹配, 说明文件有问题, 格式错了throw new MqException("[MessageFileManager] 文件格式错误, queueName=" + queueName);}// 3. 把读到的二进制数据, 反序列化为 Message 对象Message message = (Message) BinaryUtils.fromBytes(buffSrc);// 4. 判定一下, 看看这个消息对象, 是不是无效对象if (message.getIsValid() != 0x1) {// 无效数据, 直接跳过// 虽然消息是无效数据, 但是 offset 仍要更新currentOffset += (4 + messageSize);continue;}// 5. 有效数据, 则需要将这个 Message 对象加入到链表中, 加入之前还需要添加 OffsetBeg 和 OffsetEnd//    进行计算 Offset 的时候, 需要当前文件光标的位置, 由于当下使用的 DataInputStream 并不方便直接获取文件光标位置//    因此需要手动计算下文件光标message.setOffsetBeg(currentOffset + 4);message.setOffsetEnd(currentOffset + 4 + messageSize);currentOffset += (4 + messageSize);messages.add(message);}} catch (EOFException e) {// 这个异常是表示读取到文件的末尾了, 这是 DataInputStream 中规定的// 不需要做什么特殊处理log.info("恢复磁盘文件数据完成");}}return messages;}
  • 使用 DataInputStream 读取数据, 先读 4 个字节为消息懂得长度, 然后在按照这个长度来读取实际消息内容
  • 读取完毕之后, 转化成 Message 对象
  • 同时计算出该对象的 offsetBeg 和 offsetEnd
  • 最终把结果整理成链表, 返回出去
  • 注意, 对于DataInputStream 来说, 如果读到 EOF, 就会抛出一个 EOFException, 而不是返回特定值, 因此需要注意上述循环的结束条件

实现垃圾回收(GC)

上述删除操作, 只是把消息在文件上标记成了无效, 并没有腾出磁盘空间, 因此需要定期的进行批量删除.

此处使用类似于复制算法, 当总消息数超过 2000, 并且有效消息数目少于 50%的时候, 就触发 GC.

GC 的时候会把所有有效消息加载出来, 写入一个新的消息文件中, 使用新文件, 替代旧文件即可

    /*** @description: 检查当前是否需要针对队列的消息数据文件进行 GC**/public boolean checkGC(String queueName) throws IOException {// 判断是否要 GC, 是根据总消息数和有效消息数, 这两个值都是在 消息统计文件中的Stat stat = readStat(queueName);return stat.totalCount > Stat.atLeastCount && (double) stat.validCount / (double) stat.totalCount < Stat.minProportion;}/*** @description: 垃圾回收, 防止存储过多垃圾信息* 通过这个方法, 真正执行消息数据文件的垃圾回收操作* 使用复制算法来完成* 创建一个新的文件, 名字就是 queue_data_new.txt* 把之前消息数据文件中的有效消息都读出来, 写到新的文件中* 删除旧的文件,在把新的文件改名回 queue_data.txt* 同时要更新消息统计文件**/public void gc(MSGQueue msgQueue) throws MqException, IOException, ClassNotFoundException {// 进行 gc 的过程, 是针对消息数据文件进行大洗牌, 这个过程中, 其他线程不能针对该队列的消息文件做任何修改synchronized (msgQueue) {// 由于 gc 操作可能比较耗时, 此处统计一下消耗时间long gcBeg = System.currentTimeMillis();// 1. 创建一个新的文件File queueDataNewFile = new File(getQueueDataNewPath(msgQueue.getName()));if (queueDataNewFile.exists()) {// 正常情况下, 这个文件不应该存在, 如果存在, 就是意外, 说明上次 gc 了一半, 程序意味崩溃了throw new MqException("[MessageFileManager] gc 时发现该队列的 queue_data_new 已经存在, queueName=" + msgQueue.getName());}boolean ok = queueDataNewFile.createNewFile();if (!ok) {throw new MqException("[MessageFileManager] 创建文件失败 ,queueDataNewFile=" + queueDataNewFile.getAbsoluteFile());}// 2. 从旧文件中, 读取出所有的有效消息对象LinkedList<Message> messages = loadAllMessageFromQueue(msgQueue.getName());// 3. 把有效信息写入到新的文件中try (OutputStream outputStream = Files.newOutputStream(Paths.get(getQueueDataNewPath(msgQueue.getName())))) {try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {for (Message message : messages) {byte[] buffer = BinaryUtils.toBytes(message);// 先写长度dataOutputStream.writeInt(buffer.length);// 在写整体数据dataOutputStream.write(buffer);}}}// 4. 删除旧的数据文件, 并把新的文件进行重命名File queueDataOldFile = new File(getQueueDataPath(msgQueue.getName()));ok = queueDataOldFile.delete();if (!ok) {throw new MqException("[MessageFileManager] 删除旧的数据文件失败, queueDataOldFile: " + queueDataOldFile.getAbsoluteFile());}// 把 queue_data_new.txt => queue_data.txtok = queueDataNewFile.renameTo(queueDataOldFile);if (!ok) {throw new MqException("[MessageFileManager] 文件重命名失败, queueDataOldFile: " + queueDataOldFile.getAbsoluteFile() + ", queueDataNewFile: " + queueDataNewFile.getAbsoluteFile());}// 5. 更新统计文件Stat stat = readStat(msgQueue.getName());stat.totalCount = messages.size();stat.validCount = messages.size();long gcEnd = System.currentTimeMillis();log.info("gc 执行消耗时间: {} ms", (gcEnd - gcBeg));}}

如果文件很大, 消息非常多, 可能比较低效, 这种就需要把文件做拆分和合并了
Rabbitmq 是这样实现了, 此处实现简答, 就不做了

测试 MessageFileManager

  • 创建两个队列, 用来辅助测试
  • 使用 ReflectionTestUtils.invokeMethod 来调用私有方法
package en.edu.zxj.mq.mqserver.datacenter;import en.edu.zxj.mq.common.MqException;
import en.edu.zxj.mq.mqserver.core.MSGQueue;
import en.edu.zxj.mq.mqserver.core.Message;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.util.ReflectionTestUtils;import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;import static org.junit.jupiter.api.Assertions.*;/*** Created with IntelliJ IDEA.* Description:** @author: zxj* @date: 2024-02-28* @time: 18:30:12*/
@SpringBootTest
class MessageFileMangerTest {private MessageFileManger messageFileManger = new MessageFileManger();private static final String queueName1 = "testQueue1";private static final String queueName2 = "testQueue2";/*** @description: 每个方法执行前的 准备工作**/@BeforeEachvoid setUp() throws IOException {// 创建两个队列messageFileManger.createQueueFiles(queueName1);messageFileManger.createQueueFiles(queueName2);}/*** @description: 每个方法执行前的 收尾工作工作**/@AfterEachvoid tearDown() throws IOException {// 删除两个队列文件messageFileManger.destroyQueueFiles(queueName1);messageFileManger.destroyQueueFiles(queueName2);}// @Test// void init() {//    // 功能待开发// }@Testvoid createQueueFiles() {// 创建文件已经在上面 setUp 阶段执行过了, 此处主要是验证看看文件是否存在File queueDataFile1 = new File("./data/" + queueName1 + "/queue_data.txt");Assertions.assertEquals(true, queueDataFile1.isFile());File queueStatFile1 = new File("./data/" + queueName1 + "/queue_stat.txt");Assertions.assertEquals(true, queueStatFile1.isFile());File queueDataFile2 = new File("./data/" + queueName2 + "/queue_data.txt");Assertions.assertEquals(true, queueDataFile2.isFile());File queueStatFile2 = new File("./data/" + queueName2 + "/queue_stat.txt");Assertions.assertEquals(true, queueStatFile2.isFile());}@Testvoid destroyQueueFiles() throws IOException {// 删除文件, 看看是否存在, 不存在才对File queueDataFile1 = new File("./data/" + queueName1 + "/queue_data.txt");Assertions.assertEquals(true, queueDataFile1.isFile());File queueStatFile1 = new File("./data/" + queueName1 + "/queue_stat.txt");Assertions.assertEquals(true, queueStatFile1.isFile());// messageFileManger.destroyQueueFiles(queueName1);// Assertions.assertEquals(false,queueDataFile1.isFile());// Assertions.assertEquals(false,queueStatFile1.isFile());File queueDataFile2 = new File("./data/" + queueName2 + "/queue_data.txt");Assertions.assertEquals(true, queueDataFile2.isFile());File queueStatFile2 = new File("./data/" + queueName2 + "/queue_stat.txt");Assertions.assertEquals(true, queueStatFile2.isFile());}@Testvoid checkFilesExists() {// 在 setUp 阶段, 创建了两个队列, 此处只要判断接口返回的是否是 true 即可Assertions.assertEquals(true, messageFileManger.checkFilesExists(queueName1));Assertions.assertEquals(true, messageFileManger.checkFilesExists(queueName2));}private Message createTestMessage(String content) {return Message.createMessageWithId("testRoutingKey", null, content.getBytes());}private MSGQueue createTestMSGQueue(String queueName) {MSGQueue msgQueue = new MSGQueue();msgQueue.setName(queueName);msgQueue.setDurable(true);msgQueue.setAutoDelete(false);msgQueue.setExclusive(false);return msgQueue;}@Testvoid sendMessage() throws IOException, MqException, ClassNotFoundException {// 构造出消息, 并构造出队列Message message = createTestMessage("testSendMessage");// 此处创建的 queue 对象的 name, 不能随便填写, 只能使用 queueName1 和 queueName2,// 需要保证这个队列对象对应的目录和文件都存在才行MSGQueue queue = createTestMSGQueue(queueName1);// 调用发送信息方法messageFileManger.sendMessage(queue, message);// 检查 stat 文件MessageFileManger.Stat stat = ReflectionTestUtils.invokeMethod(messageFileManger, "readStat", queueName1);Assertions.assertEquals(1, stat.validCount);Assertions.assertEquals(1, stat.totalCount);// 检查 data 文件LinkedList<Message> messages = messageFileManger.loadAllMessageFromQueue(queueName1);Message newMessage = messages.get(0);Assertions.assertEquals(true, newMessage.equals(message));System.out.println("message: " + newMessage);}@Testvoid deleteMessage() throws IOException, MqException, ClassNotFoundException {// 此处创建的 queue 对象的 name, 不能随便填写, 只能使用 queueName1 和 queueName2,// 需要保证这个队列对象对应的目录和文件都存在才行MSGQueue queue = createTestMSGQueue(queueName1);// 构造 10 条数据, 在进行删除List<Message> exceptedMessages = new ArrayList<>();for (int i = 0; i < 10; i++) {Message message = createTestMessage("testMessage" + i);messageFileManger.sendMessage(queue,message);exceptedMessages.add(message);}// 删除数据messageFileManger.deleteMessage(queue,exceptedMessages.get(9));messageFileManger.deleteMessage(queue,exceptedMessages.get(8));// 判断数据是否正确LinkedList<Message> actualMessages = messageFileManger.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(8,actualMessages.size());for (int i = 0; i < 8; i++) {Message exceptedMessage = exceptedMessages.get(i);Message actualMessage = actualMessages.get(i);System.out.println(i + ":  " + actualMessage);Assertions.assertEquals(exceptedMessage.getMessageId(),actualMessage.getMessageId());Assertions.assertArrayEquals(exceptedMessage.getBody(),actualMessage.getBody());Assertions.assertEquals(0x1,actualMessage.getIsValid());Assertions.assertEquals(exceptedMessage.getRoutingKey(),actualMessage.getRoutingKey());Assertions.assertEquals(exceptedMessage.getDeliverMode(),actualMessage.getDeliverMode());}}@Testvoid loadAllMessageFromQueue() throws IOException, MqException, ClassNotFoundException {// 此处创建的 queue 对象的 name, 不能随便填写, 只能使用 queueName1 和 queueName2,// 需要保证这个队列对象对应的目录和文件都存在才行MSGQueue queue = createTestMSGQueue(queueName1);// 构造 100 条数据List<Message> exceptedMessages = new ArrayList<>();for (int i = 0; i < 100; i++) {Message message = createTestMessage("testMessage" + i);messageFileManger.sendMessage(queue,message);exceptedMessages.add(message);}// 读取所有数据, 看释放和原来的数据相同// 判断数据是否正确LinkedList<Message> actualMessages = messageFileManger.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(100,actualMessages.size());for (int i = 0; i < 100; i++) {Message exceptedMessage = exceptedMessages.get(i);Message actualMessage = actualMessages.get(i);System.out.println(i + ":  " + actualMessage);Assertions.assertEquals(exceptedMessage.getMessageId(),actualMessage.getMessageId());Assertions.assertArrayEquals(exceptedMessage.getBody(),actualMessage.getBody());Assertions.assertEquals(0x1,actualMessage.getIsValid());Assertions.assertEquals(exceptedMessage.getRoutingKey(),actualMessage.getRoutingKey());Assertions.assertEquals(exceptedMessage.getDeliverMode(),actualMessage.getDeliverMode());}}@Testvoid gc() throws IOException, MqException, ClassNotFoundException {// 此处创建的 queue 对象的 name, 不能随便填写, 只能使用 queueName1 和 queueName2,// 需要保证这个队列对象对应的目录和文件都存在才行MSGQueue queue = createTestMSGQueue(queueName1);// 构造 100 条数据List<Message> exceptedMessages = new ArrayList<>();for (int i = 0; i < 100; i++) {Message message = createTestMessage("testMessage" + i);messageFileManger.sendMessage(queue,message);exceptedMessages.add(message);}// 保存 gc 前文件的大小File file = new File("./data/" + queueName1 + "/queue_data.txt");long beforeGC = file.length();// 删除偶数下标的数据for (int i = 0; i < 100; i+=2) {messageFileManger.deleteMessage(queue,exceptedMessages.get(i));}// 手动调用 gcmessageFileManger.gc(queue);// 读取所有数据LinkedList<Message> actualMessages = messageFileManger.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(50,actualMessages.size());for (int i = 0; i < 50; i++) {Message exceptedMessage = exceptedMessages.get(i * 2 + 1);Message actualMessage = actualMessages.get(i);System.out.println(i + ":  " + actualMessage);Assertions.assertEquals(exceptedMessage.getMessageId(),actualMessage.getMessageId());Assertions.assertArrayEquals(exceptedMessage.getBody(),actualMessage.getBody());Assertions.assertEquals(0x1,actualMessage.getIsValid());Assertions.assertEquals(exceptedMessage.getRoutingKey(),actualMessage.getRoutingKey());Assertions.assertEquals(exceptedMessage.getDeliverMode(),actualMessage.getDeliverMode());}File file1 = new File("./data/" + queueName1 + "/queue_data.txt");long afterGC = file1.length();Assertions.assertEquals(true,afterGC < beforeGC);}
}

⼋、 整合数据库和⽂件

上述代码中, 使⽤数据库存储了 Exchange, Queue, Binding, 使⽤⽂本⽂件存储了 Message.

接下来我们把两个部分整合起来, 统⼀进⾏磁盘管理.

创建 DiskDataCenter

使⽤ DiskDataCenter 来综合管理数据库和⽂本⽂件的内容.

DiskDataCenter 会持有 DataBaseManager 和 MessageFileManager 对象.

/*** Created with IntelliJ IDEA.* Description:封装访问磁盘数据操作: 数据库 + 文件* 1. 数据库: 交换机, 消息队列, 绑定* 2. 文件: 消息* 上层逻辑如果需要访问磁盘, 就直接调用这个类, 不需要知道下层访问的是数据库还是文件** @author: zxj* @date: 2024-02-28* @time: 21:28:00*/
public class DiskDataCenter {private DatabaseManager databaseManager = new DatabaseManager();private MessageFileManger messageFileManger = new MessageFileManger();public void init() {databaseManager.init();messageFileManger.init();}
}

封装 Exchange ⽅法

    /** 封装交换机操作*/public Integer insertExchange(Exchange exchange) {return databaseManager.insertExchange(exchange);}public Integer deleteExchangeByName(String name) {return databaseManager.deleteExchangeByName(name);}public List<Exchange> selectAllExchanges() {return databaseManager.selectAllExchanges();}

封装 Queue ⽅法

    /** 封装消息队列操作*/public Integer insertMSGQueue(MSGQueue msgQueue) throws IOException {Integer ret = databaseManager.insertMSGQueue(msgQueue);messageFileManger.createQueueFiles(msgQueue.getName());return ret;}public Integer deleteMSGQueueByName(String name) throws IOException {Integer ret = databaseManager.deleteMSGQueueByName(name);messageFileManger.destroyQueueFiles(name);return ret;}public List<MSGQueue> selectAllMSGQueues() {return databaseManager.selectAllMSGQueues();}

封装 Binding 方法

/** 封装绑定机操作*/public Integer insertBinding(Binding binding) {return databaseManager.insertBinding(binding);}public Integer deleteBinding(String exchangeName, String queueName) {return databaseManager.deleteBinding(exchangeName, queueName);}public List<Binding> selectAllBindings() {return databaseManager.selectAllBindings();}

封装 Message ⽅法

/**   封装消息操作*/public void createQueueFiles(String queueName) throws IOException {messageFileManger.createQueueFiles(queueName);}public void destroyQueueFiles(String queueName) throws IOException {messageFileManger.destroyQueueFiles(queueName);}public boolean checkFilesExists(String queueName) {return messageFileManger.checkFilesExists(queueName);}public void sendMessage(@NotNull MSGQueue msgQueue, Message message) throws MqException, IOException {messageFileManger.sendMessage(msgQueue, message);}public void deleteMessage(@NotNull MSGQueue msgQueue, @NotNull Message message) throws IOException, ClassNotFoundException, MqException {messageFileManger.deleteMessage(msgQueue, message);// 删除一条信息之后, 判断是否需要 gcif (messageFileManger.checkGC(msgQueue.getName())) {messageFileManger.gc(msgQueue);}}public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {return messageFileManger.loadAllMessageFromQueue(queueName);}
  • 在 deleteMessage 的时候判定是否进⾏ GC.

⼩结

通过上述封装, 把数据库和硬盘⽂件两部分合并成⼀个整体. 上层代码在调⽤的时候则不再关⼼该数据是存储在哪个部分的.

九、 内存数据结构设计

硬盘上存储数据, 只是为了实现 “持久化” 这样的效果. 但是实际的消息存储/转发, 还是主要靠内存的结构.

对于 MQ 来说, 内存部分是更关键的, 内存速度更快, 可以达成更⾼的并发.

创建 MemoryDataCenter

创建 mqserver.datacenter.MemoryDataCenter

/*** Created with IntelliJ IDEA.* Description:内存数据管理类 -- 实际消息转发/存储的类* 该类后续提供的一些方法, 可能会在多线程环境下使用, 因此需要注意线程安全的问题** @author: zxj* @date: 2024-02-29* @time: 20:58:38*/
@Slf4j
public class MemoryDataCenter {// key: 为 exchangeName, value: Exchange 对象private final ConcurrentHashMap<String, Exchange> exchangesMap = new ConcurrentHashMap<>();// key: 为 messageId, value: Message 对象private final ConcurrentHashMap<String, Message> messagesMap = new ConcurrentHashMap<>();// key: 为 exchangeName, value: Exchange 对象private final ConcurrentHashMap<String, MSGQueue> msgQueuesMap = new ConcurrentHashMap<>();// key1: exchangeName, key2: msgQueueName, value: Binding 对象private final ConcurrentHashMap<String, ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>();}
  • 使用四个哈希表, 管理 Exchange, Queue, Binding, Message
  • 使用一个哈希表 + 链表管理队列 -> 消息之间的关系
  • 使用一个哈希表 + 哈希表管理所有的未被确认的消息

为了保证消息被正确消费了, 会使用两种方式进行确认, 自动 Ack, 和 手动 Ack
其中自动 Ack 是指当消息被消费之后, 就会立即被销毁释放
其中手动 Ack 是指当消息被消费之后, 由消费者主动调用一个 basicAck方法, 进行主动确认, 服务器收到这个确认之后, 才能真正被销毁消息
此处的 “未确认消息” 就是指在手动Ack模式下, 该消息还没有被调用 basicAck, 此时消息不能被删除, 但是要和其他未消费的消息区分开, 于是另搞个结构
当后续basicAck到了, 就可以删除消息了

封装 Exchange ⽅法

/*** 封装 Exchange 操作**/public void insertExchange(Exchange exchange) {exchangesMap.put(exchange.getName(), exchange);log.info("新交换机添加成功! exchangeName: {}", exchange.getName());}public Exchange getExchange(String exchangeName) {return exchangesMap.get(exchangeName);}public void deleteExchange(String exchangeName) {exchangesMap.remove(exchangeName);log.info("删除交换机成功! exchangeName: {}", exchangeName);}

封装 Queue ⽅法

/*** 封装 MSGQueue 操作**/public void insertMSGQueue(MSGQueue msgQueue) {msgQueuesMap.put(msgQueue.getName(), msgQueue);log.info("新交换机添加成功! msgQueueName: {}", msgQueue.getName());}public MSGQueue getMSGQueue(String msgQueueName) {return msgQueuesMap.get(msgQueueName);}public void deleteMSGQueue(String msgQueueName) {msgQueuesMap.remove(msgQueueName);log.info("删除交换机成功! msgQueueName: {}", msgQueueName);}

封装 Binding ⽅法

/*** 封装 Binding 操作**/public void insetBinding(Binding binding) throws MqException {// 传统的创建 Map 的步骤, 因为不是原子性操作, 存在线程安全的问题, 为了保证线程安全, 可以加上 synchronized 加锁// ConcurrentHashMap<String, Binding> stringBindingConcurrentHashMap = bindingsMap.get(binding.getExchangeName()) ;// if (stringBindingConcurrentHashMap == null) {//     stringBindingConcurrentHashMap = new ConcurrentHashMap<>();//     bindingsMap.put(binding.getExchangeName(),stringBindingConcurrentHashMap);// }// ConcurrentHashMap 中有提供了 computeIfAbsent 方法, 简化了上述步骤, 并且是线程安全的 --// 先使用 exchangeName 查询一下, 如果存在就直接返回, 如果不存在就创建ConcurrentHashMap<String, Binding> stringBindingConcurrentHashMap = bindingsMap.computeIfAbsent(binding.getExchangeName(), (k) -> {return new ConcurrentHashMap<>();});synchronized (stringBindingConcurrentHashMap) {// 这里先查询在插入, 具有强的顺序关系, 数据存在二次覆盖,  存在线程安全的问题// 在根据 msgQueueName 查找, 如果存在, 就直接抛异常, 不存在才能插入if (stringBindingConcurrentHashMap.get(binding.getQueueName()) != null) {throw new MqException("[MemoryDataCenter] 绑定已经存在, exchangeName: " + binding.getExchangeName() +"; msgQueueName: " + binding.getQueueName());}stringBindingConcurrentHashMap.put(binding.getQueueName(), binding);}log.info("绑定添加成功成功, binding.exchangeName: {}, binding.queueName: {},", binding.getExchangeName(), binding.getQueueName());}// 获取绑定// 1. 依据 exchangeName 和 queueName 获取唯一的 bindingpublic Binding getBinding(String exchangeName, String queueName) {ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(exchangeName);if (bindingMap == null) {return null;}return bindingMap.get(queueName);}// 2. 依据 exchangeName 获取所有的 bindingpublic ConcurrentHashMap<String, Binding> getBindings(String exchangeName) {return bindingsMap.get(exchangeName);}public void deleteBinding(Binding binding) throws MqException {ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getExchangeName());if (bindingMap == null) {// 该交换机没有绑定任何队列, 报错throw new MqException("[MemoryDataCenter] 绑定不存在! binding: " + binding);}bindingsMap.remove(binding.getExchangeName());log.info("删除绑定成功! binding: {}", binding);}

封装 Message ⽅法

/*** 封装信息操作**/// 添加信息public void addMessage(Message message) {messagesMap.put(message.getMessageId(), message);log.info("添加信息成功! messageId: {}", message.getMessageId());}// 依据 Id 查询信息public Message getMessage(String messageId) {return messagesMap.get(messageId);}// 依据 Id 删除信息public void deleteMessage(String messageId) {messagesMap.remove(messageId);log.info("消息被删除! messageId: {}", messageId);}// 发送消息到指定队列public void sendMessage(MSGQueue queue, Message message) {// 把消息放到对应的队列数据结构中// 先根据队列的名字, 找到该队列对应的消息链表LinkedList<Message> messages = queueMessageMap.computeIfAbsent(queue.getName(), (k) -> {return new LinkedList<>();});// 把数据假如到 messages 里面synchronized (messages) {messages.add(message);}// 在这里把该消息也往消息中心中插入一下, 假设如果 message 已经在消息中心存在, 重复插入也没有关系// 主要就是相同 messageId, 对应的 message 的内容一定是一样的. (服务器不会对 Message 内容修改 basicProperties 和 body)addMessage(message);log.info("消息被投递到队列中! messageId = " + message.getMessageId());}// 从队列中取消息public Message pollMessage(String queueName) {// 根据队列名, 查找一下, 对应的队列的消息链表LinkedList<Message> messages = queueMessageMap.get(queueName);// 为空if (messages == null) {return null;}synchronized (messages) {// 队列中没有任何消息if (messages.isEmpty()) {return null;}// 链表中有元素, 就进行头删Message currentMessage = messages.remove(0);log.info("从消息从队列中取出! messageId: {}", currentMessage.getMessageId());return currentMessage;}}// 获取指定队列中消息的个数public int getMessageCount(String queueName) {// 根据队列名, 查找一下, 对应的队列的消息链表LinkedList<Message> messages = queueMessageMap.get(queueName);if (messages == null) {return 0;}synchronized (messages) {return messages.size();}}

针对未确认的消息的处理

// 添加未确认的消息public void addMessageWaitAck(String queueName, Message message) {ConcurrentHashMap<String, Message> messageHashMap = queueMessageWaitAck.computeIfAbsent(queueName, (k) -> {return new ConcurrentHashMap<>();});messageHashMap.put(message.getMessageId(), message);log.info("消息进入待确认队列! messageId: {}", message.getMessageId());}// 删除未确认的消息(消息已经确认了)public void removeMessageWaitAck(String queueName, String messageId) {ConcurrentHashMap<String, Message> messageHashMap = queueMessageWaitAck.computeIfAbsent(queueName, (k) -> {return new ConcurrentHashMap<>();});messageHashMap.remove(messageId);log.info("消息从待确认队列中删除! messageId: {}", messageId);}// 获取指定的未确认的信息public Message getMessageWaitAck(String queueName, String messageId) {ConcurrentHashMap<String, Message> messageConcurrentHashMap = queueMessageWaitAck.get(queueName);if (messageConcurrentHashMap == null) {return null;}return messageConcurrentHashMap.get(messageId);}

实现重启后恢复内存

// 这个方法就是从硬盘上读取数据, 把硬盘中之前持久化存储的各个维度的数据都恢复到内存中 -- 交换机, 消息队列, 绑定, 消息public void recovery(DiskDataCenter diskDataCenter) throws MqException, IOException, ClassNotFoundException {// 0. 清空之前的所有数据exchangesMap.clear();msgQueuesMap.clear();bindingsMap.clear();messagesMap.clear();queueMessageMap.clear();// 1. 恢复所有的交换机数据List<Exchange> exchanges = diskDataCenter.selectAllExchanges();for (Exchange exchange : exchanges) {exchangesMap.put(exchange.getName(), exchange);}log.info("恢复所有的 交换机 数据成功!");// 2. 恢复所有的队列数据List<MSGQueue> msgQueues = diskDataCenter.selectAllMSGQueues();for (MSGQueue msgQueue : msgQueues) {msgQueuesMap.put(msgQueue.getName(), msgQueue);}log.info("恢复所有的 队列 数据成功!");// 3. 恢复所有的绑定数据List<Binding> bindings = diskDataCenter.selectAllBindings();for (Binding binding : bindings) {ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(), (k) -> {return new ConcurrentHashMap<>();});bindingMap.put(binding.getQueueName(), binding);}log.info("恢复所有的 绑定 数据成功!");// 4. 恢复所有的消息队列//    遍历所有的队列, 根据每个队列的名字, 获取到所有的消息for (MSGQueue msgQueue : msgQueues) {LinkedList<Message> messages = diskDataCenter.loadAllMessageFromQueue(msgQueue.getName());queueMessageMap.put(msgQueue.getName(), messages);for (Message message : messages) {messagesMap.put(message.getMessageId(), message);}}log.info("恢复所有的 消息队列 成功!");log.info("从磁盘中恢复所有数据到内存成功");// 规定:// 针对 "未确认的消息" 这部分内存中的数据, 不需要从硬盘恢复, 之前考虑硬盘存储的时候, 也没有设定这一块// 这个消息在硬盘上存储的时候, 就是当做 "为被取走"}

测试 MemoryDataCenter

package en.edu.zxj.mq.mqserver.datacenter;import en.edu.zxj.mq.MqApplication;
import en.edu.zxj.mq.common.MqException;
import en.edu.zxj.mq.mqserver.core.*;
import org.apache.tomcat.util.http.fileupload.FileUtils;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.test.context.SpringBootTest;import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;/*** Created with IntelliJ IDEA.* Description:** @author: zxj* @date: 2024-02-29* @time: 23:07:51*/
@SpringBootTest
class MemoryDataCenterTest {private MemoryDataCenter memoryDataCenter;@BeforeEachpublic void setUp() {memoryDataCenter = new MemoryDataCenter();}@AfterEachpublic void tearDown() {memoryDataCenter = null;}/*** 创建测试 消息 对象**/private @NotNull Message createTestMessage(@NotNull String content) {return Message.createMessageWithId("testRoutingKey", null, content.getBytes());}/*** 创建 消息队列 对象**/private @NotNull MSGQueue createTestMSGQueue(String queueName) {MSGQueue msgQueue = new MSGQueue();msgQueue.setName(queueName);msgQueue.setDurable(true);msgQueue.setAutoDelete(false);msgQueue.setExclusive(false);return msgQueue;}/*** 创建 绑定 对象**/private @NotNull Binding createTestBinding(String exchangeName, String msgQueueName) {Binding binding = new Binding();binding.setExchangeName(exchangeName);binding.setQueueName(msgQueueName);binding.setBindingKey("Hello word");return binding;}/*** 创建交换机对象**/@org.jetbrains.annotations.NotNullprivate Exchange createTestExchange(String exchangeName) {Exchange exchange = new Exchange();exchange.setName(exchangeName);exchange.setType(ExchangeType.FANOUT);exchange.setDurable(true);exchange.setAutoDelete(false);exchange.setArguments("11", "aa");exchange.setArguments("22", "bb");return exchange;}@Testvoid testExchange() {// 对 交换机 相关操作统一测试Exchange exceptedExchange = createTestExchange("exchangeTest");// 1. 插入操作memoryDataCenter.insertExchange(exceptedExchange);// 2. 查找操作Exchange actualExchange = memoryDataCenter.getExchange(exceptedExchange.getName());// 此时 exceptedExchange 和 actualExchange 应该指向同一个空间才对Assertions.assertEquals(exceptedExchange, actualExchange);// 3. 删除操作memoryDataCenter.deleteExchange(exceptedExchange.getName());actualExchange = memoryDataCenter.getExchange(exceptedExchange.getName());// 判断是否为空Assertions.assertNull(actualExchange);}@Testvoid testMSGQueue() {// 对 消息队列 相关操作统一测试MSGQueue exceptedMSGQueue = createTestMSGQueue("testMSGQueue");// 1. 插入操作memoryDataCenter.insertMSGQueue(exceptedMSGQueue);// 2. 查找操作MSGQueue actualMSGQueue = memoryDataCenter.getMSGQueue(exceptedMSGQueue.getName());Assertions.assertEquals(exceptedMSGQueue, actualMSGQueue);// 3. 删除操作memoryDataCenter.deleteMSGQueue(exceptedMSGQueue.getName());actualMSGQueue = memoryDataCenter.getMSGQueue(exceptedMSGQueue.getName());// 判断是否为空Assertions.assertNull(actualMSGQueue);}@Testvoid testBinding() throws MqException {// 对 绑定 相关操作的统一测试Binding exceptedBinding = createTestBinding("testExchangeName", "testMSGQueueName");// 1. 插入操作memoryDataCenter.insetBinding(exceptedBinding);// 2. 查找操作Binding actrualBinding = memoryDataCenter.getBinding(exceptedBinding.getExchangeName(), exceptedBinding.getQueueName());Assertions.assertEquals(exceptedBinding, actrualBinding);ConcurrentHashMap<String, Binding> bindings = memoryDataCenter.getBindings(exceptedBinding.getExchangeName());Assertions.assertEquals(1, bindings.size());Assertions.assertEquals(exceptedBinding, bindings.get(exceptedBinding.getQueueName()));// 3. 删除操作memoryDataCenter.deleteBinding(exceptedBinding);actrualBinding = memoryDataCenter.getBinding(exceptedBinding.getExchangeName(), exceptedBinding.getQueueName());bindings = memoryDataCenter.getBindings(exceptedBinding.getExchangeName());// 判断是否为空Assertions.assertNull(actrualBinding);Assertions.assertNull(bindings);}@Testvoid testMessage() {// 测试 消息相关的增删查 操作Message exceptedMessage = createTestMessage("testMessage");// 1. 插入操作memoryDataCenter.addMessage(exceptedMessage);// 2. 查找操作Message actualMessage = memoryDataCenter.getMessage(exceptedMessage.getMessageId());Assertions.assertEquals(exceptedMessage, actualMessage);// 3. 删除操作memoryDataCenter.deleteMessage(exceptedMessage.getMessageId());actualMessage = memoryDataCenter.getMessage(exceptedMessage.getMessageId());Assertions.assertNull(actualMessage);}@Testvoid sendMessage() {// 1. 创建一个队列, 创建十条消息, 把这些消息都插入到队列中MSGQueue queue = createTestMSGQueue("testQueue");List<Message> exceptedMessages = new ArrayList<>();for (int i = 0; i < 10; i++) {Message message = createTestMessage("testMessage" + i);memoryDataCenter.sendMessage(queue, message);exceptedMessages.add(message);}// 2. 从这个队列中取出这些消息List<Message> actualMessages = new ArrayList<>();while (true) {Message message = memoryDataCenter.pollMessage(queue.getName());if (message == null) {break;}actualMessages.add(message);}// 3. 比较取出的消息和之前的消息是否是一致的Assertions.assertEquals(exceptedMessages.size(), actualMessages.size());for (int i = 0; i < actualMessages.size(); i++) {Assertions.assertEquals(exceptedMessages.get(i), actualMessages.get(i));}}@Testvoid testMessageWaitAck() {// 测试 消息相关的增删查 操作Message exceptedMessage = createTestMessage("testMessage");// 1. 插入操作memoryDataCenter.addMessageWaitAck("testQueueName", exceptedMessage);// 2. 查找操作Message actualMessage = memoryDataCenter.getMessageWaitAck("testQueueName", exceptedMessage.getMessageId());Assertions.assertEquals(exceptedMessage, actualMessage);// 3. 删除操作memoryDataCenter.removeMessageWaitAck("testQueueName", exceptedMessage.getMessageId());actualMessage = memoryDataCenter.getMessageWaitAck("testQueueName", exceptedMessage.getMessageId());Assertions.assertNull(actualMessage);}@Testvoid recovery() throws IOException, MqException, ClassNotFoundException {MqApplication.context = SpringApplication.run(MqApplication.class);// 1. 在硬盘上构造好数据DiskDataCenter diskDataCenter = new DiskDataCenter();diskDataCenter.init();// 构造交换机Exchange exceptedExchange = createTestExchange("testExchangeName");diskDataCenter.insertExchange(exceptedExchange);// 构造消息队列MSGQueue exceptedMSGQueue = createTestMSGQueue("testQueueName");diskDataCenter.insertMSGQueue(exceptedMSGQueue);// 构造绑定Binding exceptedBinding = createTestBinding("testExchangeName", "testMSGQueueName");diskDataCenter.insertBinding(exceptedBinding);// 构造消息Message exceptedMessage = createTestMessage("testContent");diskDataCenter.sendMessage(exceptedMSGQueue, exceptedMessage);// 2. 执行恢复操作memoryDataCenter.recovery(diskDataCenter);// 3. 对比结果Exchange actualExchangeName = memoryDataCenter.getExchange("testExchangeName");Assertions.assertEquals(exceptedExchange.getName(), actualExchangeName.getName());Assertions.assertEquals(exceptedExchange.getType(), actualExchangeName.getType());Assertions.assertEquals(exceptedExchange.getDurable(), actualExchangeName.getDurable());Assertions.assertEquals(exceptedExchange.getAutoDelete(), actualExchangeName.getAutoDelete());MSGQueue actualMSGQueue = memoryDataCenter.getMSGQueue("testQueueName");Assertions.assertEquals(exceptedMSGQueue.getName(), actualMSGQueue.getName());Assertions.assertEquals(exceptedMSGQueue.getExclusive(), actualMSGQueue.getExclusive());Assertions.assertEquals(exceptedMSGQueue.getAutoDelete(), actualMSGQueue.getAutoDelete());Assertions.assertEquals(exceptedMSGQueue.getDurable(), actualMSGQueue.getDurable());Binding actualBinding = memoryDataCenter.getBinding("testExchangeName", "testMSGQueueName");Assertions.assertEquals(exceptedBinding.getExchangeName(), actualBinding.getExchangeName());Assertions.assertEquals(exceptedBinding.getBindingKey(), actualBinding.getBindingKey());Assertions.assertEquals(exceptedBinding.getQueueName(), actualBinding.getQueueName());Message actualMessage = memoryDataCenter.pollMessage("testQueueName");Assertions.assertEquals(exceptedMessage.getMessageId(), actualMessage.getMessageId());Assertions.assertEquals(exceptedMessage.getRoutingKey(), actualMessage.getRoutingKey());Assertions.assertEquals(exceptedMessage.getIsValid(), actualMessage.getIsValid());Assertions.assertEquals(exceptedMessage.getDeliverMode(), actualMessage.getDeliverMode());Assertions.assertArrayEquals(exceptedMessage.getBody(), actualMessage.getBody());// 4. 清理磁盘MqApplication.context.close();File dataDir = new File("./data");FileUtils.deleteDirectory(dataDir);}
}

⼗、 虚拟主机设计

⾄此, 内存和硬盘的数据都已经组织完成. 接下来使⽤ “虚拟主机” 这个概念, 把这两部分的数据也串起来

并且实现⼀些 MQ 的关键 API.

创建 VirtualHost

创建 mqserver.VirtualHost

/*** Created with IntelliJ IDEA.* Description:通过这个类, 来标识虚拟主机* 每个虚拟主机下面都管理着自己的 交换机, 队列, 绑定, 消息, 数据* 同时提供 API 供上层调用* 针对 VirtualHost 这个类, 作为业务逻辑的整合者, 就需要对于代码中抛出的异常进行处理了** @author: zxj* @date: 2024-03-01* @time: 19:23:34*/
@Getter
@Slf4j
public class VirtualHost {private final String virtualHostName;private final DiskDataCenter diskDataCenter = new DiskDataCenter();private final MemoryDataCenter memoryDataCenter = new MemoryDataCenter();// Router 定义转发规则private final Router router = new Router();// ConsumerManager 实现消费逻辑private final ConsumerManager consumerManager = new ConsumerManager(this);
}
  • 其中 Router ⽤来定义转发规则, ConsumerManager ⽤来实现消息消费. 后续介绍

实现构造⽅法和 getter

构造⽅法中会针对 DiskDataCenter 和 MemoryDataCenter 进⾏初始化.

同时会把硬盘的数据恢复到内存中.

    public VirtualHost(String virtualHostName) {this.virtualHostName = virtualHostName;// 先初始化硬盘操作diskDataCenter.init();// 后初始化内存操作memoryDataCenter.init();// 从磁盘中恢复数据到内存中try {memoryDataCenter.recovery(diskDataCenter);} catch (Exception e) {log.error("从磁盘中恢复数据到内存失败!");e.printStackTrace();return;}log.info("初始化 VirtualHost 成功, VirtualHostName: {}", virtualHostName);}

创建交换机

  • 约定, 交换机 / 队列的名字, 都加上 VirtualHostName 作为前置, 这样不同 VirtualHost 中就可以存在同名的交换机或者队列了
  • exchangeDeclare 的语义是, 不存在就创建, 存在则直接返回, 因此不叫做"exchangeCreate"
  • 先写硬盘, 后写内存, 因为硬盘失败概率更大, 如果硬盘写失败了,也就不必写内存了
/*** @description: 创建交换机, declare 表示存在就不创建, 因此不叫做 "exchangeCreate"* 此处的 autoDelete, arguments 其实并没有使用, 只是先预留出来. (RabbitMQ 是支持的)* 约定, 交换机/队列的名字, 都加上 VirtualHostName 作为前缀, 这样不同 VirtualHost 中就可以存在同名的交换机或者队列了* 先写磁盘, 后写内存, 因为写磁盘失败概率更大, 如果磁盘写失败了, 也就不必要写内存了* @param: [exchangeName, exchangeType, durable, autoDelete, arguments]* @return: 抛异常就返回 false, 正常执行逻辑就返回 true**/public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete, Map<String, Object> arguments) {// 真实的 exchangeName 要加上 virtualHostName 前缀exchangeName = virtualHostName + exchangeName;synchronized (lockerExchange) {try {// 1. 判断交换机是否存在Exchange exsitsExchange = memoryDataCenter.getExchange(exchangeName);if (exsitsExchange != null) {log.info("交换机已经存在, exchangeName: {}, exchangeType: {}", exchangeName, exchangeType);return true;}// 2. 构造 Exchange 对象Exchange exchange = new Exchange();exchange.setName(exchangeName);exchange.setType(exchangeType);exchange.setDurable(durable);exchange.setAutoDelete(autoDelete);exchange.setArguments(arguments);// 3. 先写入磁盘if (exchange.getDurable()) {diskDataCenter.insertExchange(exchange);}// 4. 后写入内存memoryDataCenter.insertExchange(exchange);log.info("交换机创建成功, exchangeName: {}, exchangeType: {}", exchangeName, exchangeType);return true;} catch (Exception e) {log.warn("创建交换机失败, exchangeName: {}, exchangeType: {}", exchangeName, exchangeType);e.printStackTrace();return false;}}}

删除交换机

/*** @description: 删除交换机**/public boolean exchangeDelete(String exchangeName) {// 真正存储的 exchangeNameexchangeName = virtualHostName + exchangeName;synchronized (lockerExchange) {try {// 1. 判断交换机是否存在Exchange exsitsExchange = memoryDataCenter.getExchange(exchangeName);if (exsitsExchange == null) {throw new MqException("交换机不存在, 无法删除! exchangeName; " + exchangeName);}// 2. 删除磁盘中的交换机diskDataCenter.deleteExchangeByName(exchangeName);// 3. 删除内存中的交换机memoryDataCenter.deleteExchange(exchangeName);log.info("删除交换机成功, exchangeName: {},", exchangeName);return true;} catch (Exception e) {log.warn("删除交换机失败, exchangeName: {},", exchangeName);e.printStackTrace();return false;}}}

创建队列

/*** @description: 创建队列**/public boolean queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) {// 真实的 queueNamequeueName = virtualHostName + queueName;synchronized (lockerQueue) {try {// 1. 判断队列是否存在MSGQueue existsQueue = memoryDataCenter.getMSGQueue(queueName);if (existsQueue != null) {log.info("队列已经存在, queueName: {}", queueName);return true;}// 2. 创建队列MSGQueue queue = new MSGQueue();queue.setName(queueName);queue.setDurable(durable);queue.setExclusive(exclusive);queue.setAutoDelete(autoDelete);queue.setArguments(arguments);// 3. 先存入磁盘if (queue.getDurable()) {diskDataCenter.insertMSGQueue(queue);}// 4. 后存入到内存memoryDataCenter.insertMSGQueue(queue);log.info("队列创建成功, queueName: {}", queueName);return true;} catch (Exception e) {log.warn("创建队列失败, queueName: {}", queueName);e.printStackTrace();return false;}}}

删除队列

/*** @description: 删除队列**/public boolean queueDelete(String queueName) {queueName = virtualHostName + queueName;synchronized (lockerQueue) {try {// 1. 判断队列是否存在MSGQueue existsQueue = memoryDataCenter.getMSGQueue(queueName);if (existsQueue == null) {throw new MqException("要删除的队列不存在, 无法删除! queueName=" + queueName);}// 2. 删除磁盘中的队列if (existsQueue.getDurable()) {diskDataCenter.deleteMSGQueueByName(queueName);}// 3. 删除内存中的队列memoryDataCenter.deleteMSGQueue(queueName);log.info("删除队列成功, queueName: {}", queueName);return true;} catch (Exception e) {log.warn("删除队列失败, queueName: {}", queueName);e.printStackTrace();return false;}}}

创建绑定

  • bindingKey 是进⾏ topic 转发时的⼀个关键概念. 使⽤ router 类来检测是否是合法的 bindingKey.
  • 后续再介绍 router.checkBindingKeyValid 的实现. 此处先留空
/*** @description: 添加绑定**/public boolean queueBind(String queueName, String exchangeName, String bindingKey) {// 加上 virtualHostName 前缀queueName = virtualHostName + queueName;exchangeName = virtualHostName + exchangeName;synchronized (lockerQueue) {synchronized (lockerExchange) {try {// 1. 判断绑定是否存在Binding existedBinding = memoryDataCenter.getBinding(exchangeName, queueName);if (existedBinding != null) {log.info("绑定存在, queueName: {}, exchangeName: {}, bindingKey: {}", queueName, exchangeName, bindingKey);return true;}// 2. 判断 bindingKey 是否合法if (!router.checkBindingKeyValid(bindingKey)) {throw new MqException("bindingKey 不合法! bindingKey: " + bindingKey);}// 3. 创建绑定Binding binding = new Binding();binding.setExchangeName(exchangeName);binding.setQueueName(queueName);binding.setBindingKey(bindingKey);// 4. 获取绑定对应的队列和交换机, 判断这两个是否存在, 都存在才能创建MSGQueue msgQueue = memoryDataCenter.getMSGQueue(queueName);if (msgQueue == null) {throw new MqException("队列不存在, queueName: " + queueName);}Exchange exchange = memoryDataCenter.getExchange(exchangeName);if (exchange == null) {throw new MqException("交换机不存在, exchangeName: " + exchangeName);}// 5. 写入磁盘if (msgQueue.getDurable() && exchange.getDurable()) {diskDataCenter.insertBinding(binding);}// 6. 写入内存memoryDataCenter.insetBinding(binding);log.info("添加绑定成功, queueName: {}, exchangeName: {}, bindingKey: {}", queueName, exchangeName, bindingKey);return true;} catch (Exception e) {log.warn("添加绑定失败, queueName: {}, exchangeName: {}, bindingKey: {}", queueName, exchangeName, bindingKey);e.printStackTrace();return false;}}}}

删除绑定

/*** @description: 删除绑定**/public boolean queueUnBind(String queueName, String exchangeName) {// 加上 virtualHostName 前缀queueName = virtualHostName + queueName;exchangeName = virtualHostName + exchangeName;synchronized (lockerExchange) {synchronized (lockerQueue) {try {// 1. 判断绑定是否存在Binding existedBinding = memoryDataCenter.getBinding(exchangeName, queueName);if (existedBinding == null) {throw new MqException("要删除的绑定不存在, 无法删除, exchangeName: " + exchangeName + " , queueName: " + queueName);}// 2. 无论绑定是否持久化了, 都试着删除一下磁盘中的数据, 影响不大diskDataCenter.deleteBinding(exchangeName, queueName);// 3. 删除内存memoryDataCenter.deleteBinding(existedBinding);log.info("删除绑定成功, queueName: {}, exchangeName: {}", queueName, exchangeName);return true;} catch (Exception e) {log.warn("删除绑定失败, queueName: {}, exchangeName: {}", queueName, exchangeName);e.printStackTrace();return false;}}}}

发布消息

  • 发布消息其实是吧消息发送给指定的Exchange, 在根据 Exchange 和 Queue 的binding关系, 转发到对应队列中
  • 发送消息需要指定 routingKey, 这个值的作用和 ExchangeType是相关的
    • Direct: routingKey 就是对应的队列名字, 此时不需要binding关系, 也不需要bindingKey,就可以直接转发消息
    • Fanout: routingKey 不起作用, bindingKey 也不起作用, 此时消息会转发给绑定到该交换机上的所有队列中
    • Topic: routingKey 是一个特定的字符串, 会和bindingKey进行匹配, 如果匹配成功, 则发到对应的队列中, 具体规则后续介绍
  • BasicProperties 是消息的元消息, body是消息本体
/*** @description: 发布消息* 发布消息其实就是把消息发送给指定的exchange, 再根据 Exchange 和 Queue 的 Binding 关系, 转发到对应队列中* 发送消息需要指定 routingKey, 这个值的作用和 ExchangeType 相关的* Direct: routingKey 就是对应的队列名字, 此时不需要 binding 关系, 也不需要 bindingKey, 就可以直接转发消息* Fanout: routingKey 不起作用, bindingKey 也不起作用, 此时消息会转发给绑定该交换机上的所有队列中* Topic: routingKey 是一个特定的字符串, 会和 bindingKey 按照一定规则进行匹配, 如果匹配成功, 则发送到对应的队列中, 具体规则在 Router 类中介绍* @param: [exchangeName, routingKey, basicProperties 消息的元消息, body 消息本体]* @return:**/public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) {try {// 1. 转换交换机的名字, 如果为 null, 就使用默认的交换机名字if (exchangeName == null) {exchangeName = "";}exchangeName = virtualHostName + exchangeName;// 2. 检验 routingKey 的合法性if (!router.checkRoutingKeyValid(routingKey)) {throw new MqException("routingKey 非法! routingKey: " + routingKey);}// 3. 查找交换机对象Exchange exchange = memoryDataCenter.getExchange(exchangeName);if (exchange == null) {throw new MqException("交换机不存在! exchangeName: " + exchangeName);}// 4. 依据交换机的类型来进行消息转发if (exchange.getType() == ExchangeType.DIRECT) {// 按照直接交换机的方式来转发消息// 此时 routingKey 作为队列的名字, 直接把消息写入指定的队里中, 此时可以无视绑定关系String queueName = virtualHostName + routingKey;// 5. 构造消息对象Message message = Message.createMessageWithId(routingKey, basicProperties, body);// 6. 找找队列名字对应的对象MSGQueue msgQueue = memoryDataCenter.getMSGQueue(queueName);if (msgQueue == null) {throw new MqException("队列不存在, queueName=" + queueName);}// 7. 队列存在, 直接给队列中写入消息 -- 执行一次方法就消费一次消息sendMessage(msgQueue, message);} else {// 按照 fanout 和 topic 的方式来转发// 5. 找到该交换机关联的所有绑定, 并遍历这些绑定对象ConcurrentHashMap<String, Binding> bindingsMap = memoryDataCenter.getBindings(exchangeName);for (Map.Entry<String, Binding> entry : bindingsMap.entrySet()) {// ① 获取到绑定对象, 判断对应的队列是否存在Binding binding = entry.getValue();MSGQueue msgQueue = memoryDataCenter.getMSGQueue(binding.getQueueName());if (msgQueue == null) {// 此处就不抛异常, 可能此处有多个这样的队列// 希望不要因为一个队列的失败, 影响到其他队列的消息的传输log.warn("basicPublish 发送消息是, 发现队列不存在! queueName: {}", binding.getQueueName());continue;}// ② 构造消息对象, 发送给每一个队列的对象都是一个新的复制体Message message = Message.createMessageWithId(routingKey, basicProperties, body);// ③ 判定这个消息是否能转发给改队列//    如果是 fanout, 所有绑定的队列都是要转发的//    如果是 topic, 还需要判定下, bindingKey 和 routingKey 是不是匹配if (!router.route(exchange.getType(), binding, message)) {continue;}// ④ 真正的转发消息给队列sendMessage(msgQueue, message);}}log.info("发送信息成功, exchangeName: {}, routingKey: {}", exchangeName, routingKey);return true;} catch (Exception e) {log.warn("发送信息失败, exchangeName: {}, routingKey: {}", exchangeName, routingKey);e.printStackTrace();return false;}}/*** @description: 发送一次消息**/private void sendMessage(MSGQueue msgQueue, Message message) throws IOException, MqException, InterruptedException {// 此处发送消息, 就是把消息写入到 硬盘 和 内存 上int deliverMode = message.getDeliverMode();// deliverMode 为 1, 不持久化, 为 2 表示持久化if (deliverMode == 2) {diskDataCenter.sendMessage(msgQueue, message);}// 写入内存memoryDataCenter.sendMessage(msgQueue, message);// 通知消费者可以消费消息, 就是让消费者从对应的内存中取出消息consumerManager.notifyConsume(msgQueue.getName());}

路由规则

实现 mqserver.core.Router

  1. 实现 route ⽅法
    /*** @description: 路由选择* @param: [type 交换机类型, binding 绑定对象 -- 里面提取 routingKey, message 消息对象]* @return: 返回该交换机是否可以将该消息转发给绑定的队列中, true 表示可以, false 表示不可以**/public boolean route(ExchangeType type, Binding binding, Message message) throws MqException {// 根据不同的转发类型来进行不同的转发逻辑if (type == ExchangeType.FANOUT) {// 如果是 FANOUT 类型, 该交换机上所有绑定的队列都需要进行转发return true;} else if (type == ExchangeType.TOPIC) {// 如果是 TOPIC 类型, 规则复杂return routeTopic(binding,message);} else {// 其他情况是不应该存在的throw new MqException("[Router] 交换机类型违法! type: " + type);}}
  1. 实现 checkRoutingKeyValid
/*** @description: 判断 routingKey 是否合法* routingKey 组成规则如下:*      1. 组成: 数字, 字母, 下划线*      2. 使用符号 . 将 routingKey 划分成多个部分* 形如:*      aaa.bbb.ccc*      a.1.b*      a**/public boolean checkRoutingKeyValid(String routingKey) {if (!StringUtils.hasLength(routingKey)) {// null or 空字符串, 合法的情况, 比如在使用 fanout 交换机的时候, routingKey 用不上, 就可以设为 ""return true;}for (int i = 0; i < routingKey.length(); i++) {char ch = routingKey.charAt(i);// 判断该字符是否是大写字母if (ch >= 'A' && ch <= 'Z') {continue;}// 判断字符是否是小写字母if (ch >= 'a' && ch <= 'z') {continue;}// 判断该字符阿拉伯数字if (ch >= '0' && ch <= '9') {continue;}// 判断字符是否 '_' 或 '.'if (ch == '.' || ch == '_') {continue;}// 走到这里, 都不是上述任何一种合法的情况, 就直接返回 falsereturn false;}return true;}
  1. 实现 checkBindingKeyValid

在这里插入图片描述

/*** @description: 判断 bindingKey 是否合法* bindingKey 组成规则如下:*      1. 组成: 数字, 字母, 下划线*      2. 使用符号 . 将 routingKey 划分成多个部分*      3. 支持两种特殊符号作为通配符 (* 和 # 必须是作为被 . 分割出来的独立的部分)*          1) *:  * 可以匹配任何一个独立的部分*          2) #:  # 监听匹配任何 0 个或者多个独立的部分* 形如:*      aaa.bbb.ccc*      a.1.b*      a*      #*      a.*.b**/public boolean checkBindingKeyValid(String bindingKey) {if (!StringUtils.hasLength(bindingKey)) {// null or 空字符串, 合法的情况, 比如在使用 direct / fanout 交换机的时候, bindingKey 用不上, 就可以设为 ""return true;}for (int i = 0; i < bindingKey.length(); i++) {char ch = bindingKey.charAt(i);// 判断该字符是否是大写字母if (ch >= 'A' && ch <= 'Z') {continue;}// 判断字符是否是小写字母if (ch >= 'a' && ch <= 'z') {continue;}// 判断该字符阿拉伯数字if (ch >= '0' && ch <= '9') {continue;}// 判断字符是否 '_' 或 '.''if (ch == '.' || ch == '_') {continue;}// 判断字符是否为通配符 '*' 或 '#'if (ch == '*' || ch == '#') {continue;}// 走到这里, 都不是上述任何一种合法的情况, 就直接返回 falsereturn false;}// 检查 * 或者 # 是否是独立的部分// aaa.*.bbb 合法情况, aaa.a*.bbb 非法情况String[] words = bindingKey.split("\\.");for (String word : words) {// 检查 word 长度 > 1 并且包含了 * 或者 #, 就是非法的格式了if (word.length() > 1 && (word.contains("*") || word.contains("#"))) {return false;}}return true;}
  1. 实现 routeTopic
  • 使用动态规划的方式来进行规则的匹配
private boolean routeTopic(@NotNull Binding binding, @NotNull Message message) {String bindingKey = binding.getBindingKey();String routingKey = message.getRoutingKey();String[] bindingStr = bindingKey.split("\\.");String[] routingStr = routingKey.split("\\.");return mate(bindingStr,routingStr);}private boolean mate(String @NotNull [] bindingStr, String @NotNull [] routingStr) {int m = bindingStr.length;int n = routingStr.length;boolean[][] dp = new boolean[m + 1][n + 1];dp[0][0] = true;for (int i = 0; i < m; i++) {if ("#".equals(bindingStr[i])) {dp[i+1][0] = true;} else {break;}}for (int i = 1; i <= m; i++) {String wordBinding = bindingStr[i - 1];for (int j = 1; j <= n; j++) {String wordRouting = routingStr[j - 1];if (!"#".equals(wordBinding) && !"*".equals(wordBinding)) {if (wordBinding.equals(wordRouting)) {dp[i][j] = dp[i - 1][j - 1];} else {dp[i][j] = false;}} else if ("*".equals(wordBinding)) {dp[i][j] = dp[i - 1][j - 1];} else {dp[i][j] = dp[i - 1][j] || dp[i][j - 1];}}}return dp[m][n];}
  1. 测试 Router 的匹配规则
    测试代码如下:
package en.edu.zxj.mq.common;import en.edu.zxj.mq.mqserver.core.Binding;
import en.edu.zxj.mq.mqserver.core.ExchangeType;
import en.edu.zxj.mq.mqserver.core.Message;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import static org.junit.jupiter.api.Assertions.*;/*** Created with IntelliJ IDEA.* Description:** @author: zxj* @date: 2024-03-02* @time: 19:32:53*/
@SpringBootTest
class RouterTest {private Router router = new Router();private Binding binding = null;private Message message = null;@BeforeEachpublic void setUp() {binding = new Binding();message = new Message();}@AfterEachpublic void tearDown() {binding = null;message = null;}@Testvoid checkBindingKeyValid1() {boolean ok = router.checkBindingKeyValid("aaa.bbb.ccc");Assertions.assertTrue(ok);}@Testvoid checkBindingKeyValid2() {boolean ok = router.checkBindingKeyValid("1.a.c");Assertions.assertTrue(ok);}@Testvoid checkBindingKeyValid3() {boolean ok = router.checkBindingKeyValid("a");Assertions.assertTrue(ok);}@Testvoid checkBindingKeyValid4() {boolean ok = router.checkBindingKeyValid("");Assertions.assertTrue(ok);}@Testvoid checkBindingKeyValid5() {boolean ok = router.checkBindingKeyValid("a.*.ccc");Assertions.assertTrue(ok);}@Testvoid checkBindingKeyValid6() {boolean ok = router.checkBindingKeyValid("#");Assertions.assertTrue(ok);}@Testvoid checkBindingKeyValid7() {boolean ok = router.checkBindingKeyValid("aaa.bb*b.ccc");Assertions.assertFalse(ok);}@Testvoid checkBindingKeyValid8() {boolean ok = router.checkBindingKeyValid("123.bbb.ccc");Assertions.assertTrue(ok);}@Testvoid checkRoutingKeyValid1() {boolean ok = router.checkRoutingKeyValid("a.b.c");Assertions.assertTrue(ok);}@Testvoid checkRoutingKeyValid2() {boolean ok = router.checkRoutingKeyValid("a.b._c");Assertions.assertTrue(ok);}@Testvoid checkRoutingKeyValid3() {boolean ok = router.checkRoutingKeyValid("a.b.c@");Assertions.assertFalse(ok);}@Testvoid checkRoutingKeyValid4() {boolean ok = router.checkRoutingKeyValid("a");Assertions.assertTrue(ok);}@Testvoid checkRoutingKeyValid5() {boolean ok = router.checkRoutingKeyValid("a.1.b");Assertions.assertTrue(ok);}@Testvoid checkRoutingKeyValid6() {boolean ok = router.checkRoutingKeyValid("12222222223123123");Assertions.assertTrue(ok);}@Testvoid checkRoutingKeyValid7() {boolean ok = router.checkRoutingKeyValid("aaaa");Assertions.assertTrue(ok);}@Testvoid checkRoutingKeyValid8() {boolean ok = router.checkRoutingKeyValid("_____________._______________");Assertions.assertTrue(ok);}@Testvoid checkRoutingKeyValid9() {boolean ok = router.checkRoutingKeyValid("!!!!!!!!!!!!!!");Assertions.assertFalse(ok);}@Testvoid checkRoutingKeyValid10() {boolean ok = router.checkRoutingKeyValid("a.2._.!");Assertions.assertFalse(ok);}@Testvoid checkRoutingKeyValid11() {boolean ok = router.checkRoutingKeyValid("_a_.1_a.b");Assertions.assertTrue(ok);}@Testvoid checkRoutingKeyValid12() {boolean ok = router.checkRoutingKeyValid("a.b.c.12.7.234.4234.adf.___");Assertions.assertTrue(ok);}@Testvoid checkRoutingKeyValid13() {boolean ok = router.checkRoutingKeyValid("123.468a.sdfa.w");Assertions.assertTrue(ok);}@Testvoid route1() throws MqException {binding.setBindingKey("aaa");message.setRoutingKey("aaa");boolean ok = router.route(ExchangeType.FANOUT, binding, message);Assertions.assertTrue(ok);}@Testvoid route2() throws MqException {binding.setBindingKey("aaa.bbb");message.setRoutingKey("aaa.bbb");boolean ok = router.route(ExchangeType.TOPIC, binding, message);Assertions.assertTrue(ok);}@Testvoid route3() throws MqException {binding.setBindingKey("aaa");message.setRoutingKey("aaa");boolean ok = router.route(ExchangeType.TOPIC, binding, message);Assertions.assertTrue(ok);}@Testvoid route4() throws MqException {binding.setBindingKey("aaa.bbb");message.setRoutingKey("aaa.bbb.ccc");boolean ok = router.route(ExchangeType.TOPIC, binding, message);Assertions.assertFalse(ok);}@Testvoid route5() throws MqException {binding.setBindingKey("aaa.bbb");message.setRoutingKey("aaa.ccc");boolean ok = router.route(ExchangeType.TOPIC, binding, message);Assertions.assertFalse(ok);}@Testvoid route6() throws MqException {binding.setBindingKey("aaa.bbb.ccc");message.setRoutingKey("aaa.bbb.ccc");boolean ok = router.route(ExchangeType.TOPIC, binding, message);Assertions.assertTrue(ok);}@Testvoid route7() throws MqException {binding.setBindingKey("aaa.*");message.setRoutingKey("aaa.bbb");boolean ok = router.route(ExchangeType.TOPIC, binding, message);Assertions.assertTrue(ok);}@Testvoid route8() throws MqException {binding.setBindingKey("aaa.*.bbb");message.setRoutingKey("aaa.bbb.ccc");boolean ok = router.route(ExchangeType.TOPIC, binding, message);Assertions.assertFalse(ok);}@Testvoid route9() throws MqException {binding.setBindingKey("*.aaa.bbb");message.setRoutingKey("aaa.bbb");boolean ok = router.route(ExchangeType.TOPIC, binding, message);Assertions.assertFalse(ok);}@Testvoid route10() throws MqException {binding.setBindingKey("#");message.setRoutingKey("aaa.bbb.ccc");boolean ok = router.route(ExchangeType.TOPIC, binding, message);Assertions.assertTrue(ok);}@Testvoid route11() throws MqException {binding.setBindingKey("aaa.#");message.setRoutingKey("aaa.bbb");boolean ok = router.route(ExchangeType.TOPIC, binding, message);Assertions.assertTrue(ok);}@Testvoid route12() throws MqException {binding.setBindingKey("aaa.#");message.setRoutingKey("aaa.bbb.ccc");boolean ok = router.route(ExchangeType.TOPIC, binding, message);Assertions.assertTrue(ok);}@Testvoid route13() throws MqException {binding.setBindingKey("aaa.#.ccc");message.setRoutingKey("aaa.ccc");boolean ok = router.route(ExchangeType.TOPIC, binding, message);Assertions.assertTrue(ok);}@Testvoid route14() throws MqException {binding.setBindingKey("aaa.#.ccc");message.setRoutingKey("aaa.bbb.ccc");boolean ok = router.route(ExchangeType.TOPIC, binding, message);Assertions.assertTrue(ok);}@Testvoid route15() throws MqException {binding.setBindingKey("aaa.#.ccc");message.setRoutingKey("aaa.aaa.bbb.ccc");boolean ok = router.route(ExchangeType.TOPIC, binding, message);Assertions.assertTrue(ok);}@Testvoid route16() throws MqException {binding.setBindingKey("#.ccc");message.setRoutingKey("ccc");boolean ok = router.route(ExchangeType.TOPIC, binding, message);Assertions.assertTrue(ok);}@Testvoid route17() throws MqException {binding.setBindingKey("#.ccc");message.setRoutingKey("aaa.bbb.ccc");boolean ok = router.route(ExchangeType.TOPIC, binding, message);Assertions.assertTrue(ok);}@Testvoid route18() throws MqException {binding.setBindingKey("aaa.#.#.#");message.setRoutingKey("aaa");boolean ok = router.route(ExchangeType.TOPIC, binding, message);Assertions.assertTrue(ok);}@Testvoid route19() throws MqException {binding.setBindingKey("aaa.#.#.#.*");message.setRoutingKey("aaa");boolean ok = router.route(ExchangeType.TOPIC, binding, message);Assertions.assertFalse(ok);}@Testvoid route20() throws MqException {binding.setBindingKey("aaa.#.#.#.ccc");message.setRoutingKey("aaa.aaa.aaa.bbb.ccc");boolean ok = router.route(ExchangeType.TOPIC, binding, message);Assertions.assertTrue(ok);}}

订阅消息

  1. 添加一个订阅者
/*** @description: 订阅消息* 添加一个队列的订阅者, 当队列收到消息之后, 就要把消息推送给对应的订阅者* @param: [consumerTag: 消费者的身份标识,* queueName: 订阅的队列名字,* autoAck: 消息被消费后的应当方式, true 为自动应当, false 为手动应答* consumer: 是一个回调函数, 此处类型设定成函数是接口, 这样后续调用 basicConsume 并且传递实参的时候, 就可以写成 lambda 样子了]* @return: true 表示订阅成功, false 表示订阅失败**/public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {// 构造一个 ConsumerEnv 对象, 把这个对应的队列找到, 再把这个 Consumer 对象添加到该队列中.queueName = virtualHostName + queueName;try {consumerManager.addConsumer(consumerTag, queueName, autoAck, consumer);log.info("basicConsume 成功! consumerTag: {}, queueName: {}", consumerTag, queueName);return true;} catch (Exception e) {log.warn("basicConsume 失败! consumerTag: {}, queueName: {}", consumerTag, queueName);e.printStackTrace();return false;}}

Consumer 相当于⼀个回调函数. 放到 common.Consumer 中.
在这里插入图片描述

  1. 创建订阅者管理管理类

创建 mqserver.core.ConsumerManager

@Slf4j
public class ConsumerManager {// parent 用来记录虚拟主机private final VirtualHost parent;// 存放令牌的队列, 通过令牌来触发消费线程的消费操作// 使用一个阻塞队列来触发消息消费, 称为令牌队列, 每次有消息过来了, 都为队列中放一个令牌(也就是队列名), 让后消费者再去消费对应的队列信息// 作用: 令牌队列的设定, 避免搞出来太多线程, 否则就需要给每个队列都安排一个单独的线程了, 如果队列很多则开销就比较的了private final BlockingQueue<String> tokenQueue = new LinkedBlockingQueue<>();// 使用一个线程池用来执行消息的回调private final ExecutorService workerPool = Executors.newFixedThreadPool(4);// 扫描线程private Thread scannerThread = null;
}
  • parent 用来记录虚拟主机
  • 使用一个阻塞队列来触发信息消费,称为令牌队列, 每次有消息过来了, 都往队列中放一个令牌(也就是队列名), 然后消费者再去消费对应队列的消息.
  • 使用一个线程池用来执行消息回调

这样令牌队列的设定避免搞出来太多线程,否则需要给每个队列到安排一个单独的线程了,如果队列很多开销就比较大了.

  1. 添加令牌接⼝
    /*** @description: 通知消费者去消费消息**/public void notifyConsume(String name) throws InterruptedException {tokenQueue.put(name);}
  1. 实现添加订阅者
/*** @description: 添加订阅者* 新来的消费者需要消费掉之前的消息**/public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {// 找到对应的队列MSGQueue queue = parent.getMemoryDataCenter().getMSGQueue(queueName);if (queue == null) {throw new MqException("[ConsumerManager] 队列不存在, queueName: " + queueName);}ConsumerEnv consumerEnv = new ConsumerEnv(consumerTag, queueName, autoAck, consumer);synchronized (queue) {queue.addConsumerEnv(consumerEnv);// 如果当前队列中已经有了一些消息, 需要立即消费掉int n = parent.getMemoryDataCenter().getMessageCount(queueName);for (int i = 0; i < n; i++) {// 这个方法调用一次就消费一次消息consumeMessage(queue);}}}

创建 ConsumerEnv , 这个类表⽰⼀个订阅者的执⾏环境.

@Data
@Slf4j
public class ConsumerEnv {private String consumerTag;private String queueName;private boolean autoAck;private Consumer consumer;public ConsumerEnv(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {this.consumerTag = consumerTag;this.queueName = queueName;this.autoAck = autoAck;this.consumer = consumer;}
}

MsgQueue 添加⼀个订阅者列表.

在这里插入图片描述

此处的 chooseConsumer 是实现⼀个轮询效果. 如果⼀个队列有多个订阅者, 将会按照轮询的⽅式轮流拿到消息.

  1. 实现扫描线程

在 ConsumerManager 中创建⼀个线程, 不停的尝试扫描令牌队列. 如果拿到了令牌, 就真正触发消费消息操作.

    public ConsumerManager(VirtualHost virtualHost) {this.parent = virtualHost;scannerThread = new Thread(() -> {while (true) {try {// 1. 拿到令牌String queueName = tokenQueue.take();// 2. 根据令牌找到对应的队列MSGQueue msgQueue = parent.getMemoryDataCenter().getMSGQueue(queueName);if (msgQueue == null) {throw new MqException("获取令牌后, 发现队列为空! queueName: " + queueName);}// 3. 从队列中消费一次消息consumeMessage(msgQueue);} catch (Exception e) {e.printStackTrace();}}});scannerThread.start();}
  1. 实现消费消息
    所谓的消费消息, 其实就是调⽤消息的回调. 并把消息删除掉.
/*** @description: 消费一次消息**/private void consumeMessage(MSGQueue queue) {// 1. 按照轮询的方式, 取出消费者ConsumerEnv luckyDog = queue.chooseConsumer();if (luckyDog == null) {// 暂时还没有消费者, 就暂时不消费return;}// 2. 从指定队列中取出一个元素Message message = parent.getMemoryDataCenter().pollMessage(queue.getName());if (message == null) {return;}// 3. 把消息带入到消费者的回调方法中, 丢给线程池执行workerPool.submit(() -> {try {// 1. 把消息放到待确认的集合中, 这个操作势必在执行回调之前parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(), message);// 2. 真正执行回调操作luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(), message.getBasicProperties(), message.getBody());// 3. 如果当前是 "自动应答", 就可以直接报消息给删除了//    如果当前是 "手动应答", 则先不处理, 交给后续消费调用 basicAck 方法来处理if (luckyDog.isAutoAck()) {// ① 先删除磁盘上的消息if (message.getDeliverMode() == 2) {parent.getDiskDataCenter().deleteMessage(queue, message);}// ② 删除上面的待确认集合中的消息parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());// ③ 删除内存中消息parent.getMemoryDataCenter().deleteMessage(message.getMessageId());log.info("消息被消费成功, queueName: {}", queue.getName());}} catch (Exception e) {e.printStackTrace();}});}

注意: ⼀个队列可能有 N 个消费者, 此处应该按照轮询的⽅式挑⼀个消费者进⾏消费.

⼩结

一、订阅者已经存在了, 才发送消息

  1. 这种直接获取队列的订阅者,从中按照轮询的方式挑一个消费者来调用回调即可
  2. 消息先发送到队列了,订阅者还没到,此时当订阅者到达,就快速把指定队列中的消息全部消费掉。

⼆. 关于消息不丢失的论证

每个消息在从内存队列中出队列时, 都会先进⼊ 待确认 中.

  • 如果 autoAck 为 true
    消息被消费完毕后(执⾏完消息回调之后), 再执⾏清除⼯作.
    分别清除硬盘数据, 待确认队列, 消息中⼼
  • 如果 autoAck 为 false
    在回调内部, 进⾏清除⼯作.
    分别清除硬盘数据, 待确认队列, 消息中⼼.
  1. 执⾏消息回调的时候抛出异常

此时消息仍然处在待确认队列中
此时可以⽤⼀个线程扫描待确认队列, 如果发现队列中的消息超时未确认, 则放⼊死信队列.

  1. 执⾏消息回调的时候服务器宕机
    内存所有数据都没了, 但是消息在硬盘上仍然存在. 会在服务下次启动的时候, 加载回内存. 重新被消费到.

消息确认

下列⽅法只是⼿动应答的时候才会使⽤.

应答成功, 则把消息删除掉.

/*** @description: 消息确认**/public boolean basicAck(String queueName, String messageId) {queueName = virtualHostName + queueName;try {// 1. 获取到消息和队列Message message = memoryDataCenter.getMessage(messageId);if (message == null) {throw new MqException("要确认的消息不存在, messageId: " + messageId);}MSGQueue msgQueue = memoryDataCenter.getMSGQueue(queueName);if (msgQueue == null) {throw new MqException("要确认的队列不存在, queueName: " + queueName);}// 2. 删除硬盘上的数据if (message.getDeliverMode() == 2) {diskDataCenter.deleteMessage(msgQueue,message);}// 3. 删除消息中心中心的数据memoryDataCenter.deleteMessage(messageId);// 4. 删除待确认的集合中的数据memoryDataCenter.removeMessageWaitAck(queueName,messageId);log.info("basicAck 成功, 消息确认成功! queueName: {}, messageId: {}",queueName,messageId);return true;} catch (Exception e) {log.warn("basicAck 失败, 消息确认失败! queueName: {}, messageId: {}",queueName,messageId);e.printStackTrace();return false;}}

测试 VirtualHost

package en.edu.zxj.mq.mqserver;import en.edu.zxj.mq.MqApplication;
import en.edu.zxj.mq.common.Consumer;
import en.edu.zxj.mq.mqserver.core.BasicProperties;
import en.edu.zxj.mq.mqserver.core.ExchangeType;
import org.apache.tomcat.util.http.fileupload.FileUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.test.context.SpringBootTest;import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;import static org.junit.jupiter.api.Assertions.*;/*** Created with IntelliJ IDEA.* Description:** @author: zxj* @date: 2024-03-02* @time: 21:45:01*/
@SpringBootTest
class VirtualHostTest {private VirtualHost virtualHost = null;@BeforeEachpublic void setUp() {MqApplication.context = SpringApplication.run(MqApplication.class);virtualHost = new VirtualHost("default");}@AfterEachpublic void tearDown() throws IOException {MqApplication.context.close();virtualHost = null;// 把硬盘的目录删除掉File dataDir = new File("./data");FileUtils.deleteDirectory(dataDir);}@Testvoid exchangeDeclare() {boolean ok = virtualHost.exchangeDeclare("testExchangeName", ExchangeType.DIRECT, true, false, null);Assertions.assertTrue(ok);}@Testvoid exchangeDelete() {boolean ok = virtualHost.exchangeDeclare("testExchangeName", ExchangeType.DIRECT, true, false, null);Assertions.assertTrue(ok);ok = virtualHost.exchangeDelete("testExchangeName");Assertions.assertTrue(ok);ok = virtualHost.exchangeDelete("testExchangeName");Assertions.assertFalse(ok);}@Testvoid queueDeclare() {Map<String, Object> arguments = new HashMap<>();arguments.put("a", 1);arguments.put("b", 2);boolean ok = virtualHost.queueDeclare("testQueueName", true, false, false, arguments);Assertions.assertTrue(ok);}@Testvoid queueDelete() {Map<String, Object> arguments = new HashMap<>();arguments.put("a", 1);arguments.put("b", 2);boolean ok = virtualHost.queueDeclare("testQueueName", true, false, false, arguments);Assertions.assertTrue(ok);ok = virtualHost.queueDelete("testQueueName");Assertions.assertTrue(ok);}@Testvoid queueBind() {boolean ok = virtualHost.exchangeDeclare("testExchangeName", ExchangeType.DIRECT, true, false, null);Assertions.assertTrue(ok);Map<String, Object> arguments = new HashMap<>();arguments.put("a", 1);arguments.put("b", 2);ok = virtualHost.queueDeclare("testQueueName", true, false, false, arguments);Assertions.assertTrue(ok);ok = virtualHost.queueBind("testQueueName", "testExchangeName", "testBindingKey");Assertions.assertTrue(ok);}@Testvoid queueUnBind() {boolean ok = virtualHost.exchangeDeclare("testExchangeName", ExchangeType.DIRECT, true, false, null);Assertions.assertTrue(ok);Map<String, Object> arguments = new HashMap<>();arguments.put("a", 1);arguments.put("b", 2);ok = virtualHost.queueDeclare("testQueueName", true, false, false, arguments);Assertions.assertTrue(ok);ok = virtualHost.queueBind("testQueueName", "testExchangeName", "testBindingKey");Assertions.assertTrue(ok);ok = virtualHost.queueUnBind("testQueueName","testExchangeName");Assertions.assertTrue(ok);}@Testvoid basicPublish() {boolean ok = virtualHost.basicPublish("testExchangeName", "testRoutingKey", new BasicProperties(), "Hello word".getBytes());Assertions.assertFalse(ok);ok = virtualHost.exchangeDeclare("testExchangeName", ExchangeType.DIRECT, true, false, null);Assertions.assertTrue(ok);Map<String, Object> arguments = new HashMap<>();arguments.put("a", 1);arguments.put("b", 2);ok = virtualHost.queueDeclare("testQueueName", true, false, false, arguments);Assertions.assertTrue(ok);ok = virtualHost.queueBind("testQueueName", "testExchangeName", "testBindingKey");Assertions.assertTrue(ok);ok = virtualHost.basicPublish("testExchangeName", "testQueueName", new BasicProperties(), "Hello word".getBytes());Assertions.assertTrue(ok);}// 先订阅队列, 后发送消息@Testvoid basicConsume1() throws InterruptedException {boolean ok = virtualHost.queueDeclare("testQueueName", true, false, false, null);Assertions.assertTrue(ok);ok = virtualHost.exchangeDeclare("testExchangeName",ExchangeType.DIRECT,true,false,null);Assertions.assertTrue(ok);// 先订阅队列ok = virtualHost.basicConsume("testConsumerTag", "testQueueName", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {try {// 消费者自身设定的回调方法System.out.println("messageId: " + basicProperties.getMessageId());System.out.println("body = " + new String(body,0,body.length));Assertions.assertEquals("testQueueName",basicProperties.getRoutingKey());Assertions.assertEquals(1,basicProperties.getDeliverMode());Assertions.assertArrayEquals("hello".getBytes(),body);} catch (Error e) {//断言如果失败, 抛出的是 Error, 而不是 Exceptione.printStackTrace();}}});Assertions.assertTrue(ok);Thread.sleep(500);// 在发送消息ok  = virtualHost.basicPublish("testExchangeName","testQueueName",null,"hello".getBytes());Assertions.assertTrue(ok);Thread.sleep(500);}// 先发送消息, 后订阅队列@Testvoid basicConsume2() throws InterruptedException {boolean ok = virtualHost.queueDeclare("testQueueName", true, false, false, null);Assertions.assertTrue(ok);ok = virtualHost.exchangeDeclare("testExchangeName",ExchangeType.DIRECT,true,false,null);Assertions.assertTrue(ok);// 先发送消息ok  = virtualHost.basicPublish("testExchangeName","testQueueName",null,"hello".getBytes());Assertions.assertTrue(ok);Thread.sleep(500);// 后订阅队列ok = virtualHost.basicConsume("testConsumerTag", "testQueueName", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {try {// 消费者自身设定的回调方法System.out.println("messageId: " + basicProperties.getMessageId());System.out.println("body = " + new String(body,0,body.length));Assertions.assertEquals("testQueueName",basicProperties.getRoutingKey());Assertions.assertEquals(1,basicProperties.getDeliverMode());Assertions.assertArrayEquals("hello".getBytes(),body);} catch (Error e) {//断言如果失败, 抛出的是 Error, 而不是 Exceptione.printStackTrace();}}});Assertions.assertTrue(ok);Thread.sleep(500);}@Testvoid basicAck() throws InterruptedException {boolean ok = virtualHost.queueDeclare("testQueue", true,false, false, null);Assertions.assertTrue(ok);ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,true, false, null);Assertions.assertTrue(ok);// 先发送消息ok = virtualHost.basicPublish("testExchange", "testQueue", null,"hello".getBytes());Assertions.assertTrue(ok);// 再订阅队列 [要改的地方, 把 autoAck 改成 false]ok = virtualHost.basicConsume("testConsumerTag", "testQueue", false, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {// 消费者自身设定的回调方法.System.out.println("messageId=" + basicProperties.getMessageId());System.out.println("body=" + new String(body, 0, body.length));Assertions.assertEquals("testQueue", basicProperties.getRoutingKey());Assertions.assertEquals(1, basicProperties.getDeliverMode());Assertions.assertArrayEquals("hello".getBytes(), body);// [要改的地方, 新增手动调用 basicAck]boolean ok = virtualHost.basicAck("testQueue", basicProperties.getMessageId());Assertions.assertTrue(ok);}});Assertions.assertTrue(ok);Thread.sleep(500);}@Testpublic void testBasicConsumeFanout() throws InterruptedException {boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.FANOUT, false, false, null);Assertions.assertTrue(ok);ok = virtualHost.queueDeclare("testQueue1", false, false, false, null);Assertions.assertTrue(ok);ok = virtualHost.queueBind("testQueue1", "testExchange", "");Assertions.assertTrue(ok);ok = virtualHost.queueDeclare("testQueue2", false, false, false, null);Assertions.assertTrue(ok);ok = virtualHost.queueBind("testQueue2", "testExchange", "");Assertions.assertTrue(ok);// 往交换机中发布一个消息ok = virtualHost.basicPublish("testExchange", "", null, "hello".getBytes());Assertions.assertTrue(ok);Thread.sleep(500);// 两个消费者订阅上述的两个队列.ok = virtualHost.basicConsume("testConsumer1", "testQueue1", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {System.out.println("consumerTag=" + consumerTag);System.out.println("messageId=" + basicProperties.getMessageId());Assertions.assertArrayEquals("hello".getBytes(), body);}});Assertions.assertTrue(ok);ok = virtualHost.basicConsume("testConsumer2", "testQueue2", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {System.out.println("consumerTag=" + consumerTag);System.out.println("messageId=" + basicProperties.getMessageId());Assertions.assertArrayEquals("hello".getBytes(), body);}});Assertions.assertTrue(ok);Thread.sleep(500);}@Testpublic void testBasicConsumeTopic() throws InterruptedException {boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.TOPIC, false, false, null);Assertions.assertTrue(ok);ok = virtualHost.queueDeclare("testQueue", false, false, false, null);Assertions.assertTrue(ok);ok = virtualHost.queueBind("testQueue", "testExchange", "aaa.*.bbb");Assertions.assertTrue(ok);ok = virtualHost.basicPublish("testExchange", "aaa.ccc.bbb", null, "hello".getBytes());Assertions.assertTrue(ok);ok = virtualHost.basicConsume("testConsumer", "testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {System.out.println("consumerTag=" + consumerTag);System.out.println("messageId=" + basicProperties.getMessageId());Assertions.assertArrayEquals("hello".getBytes(), body);}});Assertions.assertTrue(ok);Thread.sleep(500);}
}

⼗⼀、 ⽹络通信协议设计

明确需求

接下来需要考虑客⼾端和服务器之间的通信. 回顾交互模型.
在这里插入图片描述

⽣产者和消费者都是客⼾端, 都需要通过⽹络和 Broker Server 进⾏通信.

此处我们使⽤ TCP 协议, 来作为通信的底层协议. 同时在这个基础上⾃定义应⽤层协议, 完成客⼾端对服务器这边功能的远程调⽤.

要调⽤的功能有:

  • 创建 channel
  • 关闭 channel
  • 创建 exchange
  • 删除 exchange
  • 创建 queue
  • 删除 queue
  • 创建 binding
  • 删除 binding
  • 发送 message
  • 订阅 message
  • 发送 ack
  • 返回 message (服务器 -> 客⼾端)

设计应⽤层协议

使⽤⼆进制的⽅式设定协议.

因为 Message 的消息体本⾝就是⼆进制的. 因此不太⽅便使⽤ json 等⽂本格式的协议.

请求:

在这里插入图片描述

响应:
在这里插入图片描述
其中 type 表⽰请求响应不同的功能. 取值如下:

  • 0x1 创建 channel
  • 0x2 关闭 channel
  • 0x3 创建 exchange
  • 0x4 销毁 exchange
  • 0x5 创建 queue
  • 0x6 销毁 queue
  • 0x7 创建 binding
  • 0x8 销毁 binding
  • 0x9 发送 message
  • 0xa 订阅 message
  • 0xb 返回 ack
  • 0xc 服务器给客⼾端推送的消息. (被订阅的消息) 响应独有的

其中 payload 部分, 会根据不同的 type, 存在不同的格式.

对于请求来说, payload 表⽰这次⽅法调⽤的各种参数信息.

对于响应来说, payload 表⽰这次⽅法调⽤的返回值.

定义 Request / Response

创建 common.Request

/*** Created with IntelliJ IDEA.* Description:定义了请求的格式* 一个完整的请求, 分成了三个部分* 1. type: 表示请求不同的功能, 调用不同的函数 -- 4 个字节* 2. length: 表示 payload 的长度  -- 4 个字节* 3. payload: 要传输的二进制数据  -- length 个字节** @author: zxj* @date: 2024-03-05* @time: 21:16:58*/
@Data
public class Request implements Serializable {private Integer type;private Integer length;private byte[] payload;@Overridepublic String toString() {return "Request{" +"type=" + type +", length=" + length +'}';}
}

创建 common.Response

/*** Created with IntelliJ IDEA.* Description: 定义一个完整的响应格式* 一个完整的响应, 分成了三个部分* 1. type: 表示响应不同的功能, 调用不同的函数 -- 4 个字节* 2. length: 表示 payload 的长度  -- 4 个字节* 3. payload: 要传输的二进制数据  -- length 个字节** @author: zxj* @date: 2024-03-05* @time: 21:16:46*/
@Data
public class Response implements Serializable {private Integer type;private Integer length;private byte[] payload;@Overridepublic String toString() {return "Response{" +"type=" + type +", length=" + length +'}';}
}

定义参数⽗类

构造⼀个类表⽰⽅法的参数, 作为 Request 的 payload.

不同的⽅法中, 参数形态各异, 但是有些信息是通⽤的, 使⽤⼀个⽗类表⽰出来. 具体每个⽅法的参数再通过继承的⽅式体现.

common.BasicArguments

/*** Created with IntelliJ IDEA.* Description:定义请求父类** @author: zxj* @date: 2024-03-05* @time: 21:31:01*/
@Data
public class BasicArguments implements Serializable {// 表示一次请求/响应的唯一 Id, 用来把响应和请求对应上// 此处的 rid 和 channelId 都是基于 UUID 来生成的, rid 用来标识一个请求-响应, 这一点在请求响应非常多的时候游泳protected String rid;protected String channelId;
}
  • 此处的 rid 和 channelId 都是基于 UUID 来⽣成的. rid ⽤来标识⼀个请求-响应. 这⼀点在请求响应⽐较多的时候⾮常重要.

定义返回值⽗类

和参数同理, 也需要构造⼀个类表⽰返回值, 作为 Response 的 payload

common.BasicReturns

/*** Created with IntelliJ IDEA.* Description:定义返回的父类** @author: zxj* @date: 2024-03-05* @time: 21:43:23*/
@Data
public class BasicReturns implements Serializable {// 表示一次请求/响应的唯一 Id, 用来把响应和请求对应上protected String rid;// 用来标识一个 channelprotected String channelId;protected Boolean ok;
}

定义其他参数类

针对每个 VirtualHost 提供的⽅法, 都需要有⼀个类表⽰对应的参数.

  1. ExchangeDeclareArguments
/*** Created with IntelliJ IDEA.* Description:ExchangeDeclare 方法请求参数类** @author: zxj* @date: 2024-03-05* @time: 21:46:53*/
@Data
public class ExchangeDeclareArguments extends BasicArguments implements Serializable {private String exchangeName;private ExchangeType exchangeType;private Boolean durable;private Boolean autoDelete;private Map<String,Object> arguments;
}

⼀个创建交换机的请求, 形如:

  • 可以把 ExchangeDeclareArguments 转成 byte[], 就得到了下列图⽚的结构.
  • 按照 length ⻓度读取出 payload, 就可以把读到的⼆进制数据转换成ExchangeDeclareArguments 对象.

在这里插入图片描述
2) ExchangeDeleteArguments

@Data
public class ExchangeDeleteArguments extends BasicArguments implements Serializable {private String exchangeName;
}
  1. QueueDeclareArguments
@Data
public class QueueDeclareArguments extends BasicArguments implements Serializable {private String queueName;private boolean durable;private boolean exclusive;private boolean autoDelete;private Map<String, Object> arguments;
}
  1. QueueDeleteArguments
@Data
public class QueueDeleteArguments extends BasicArguments implements Serializable {private String queueName;
}
  1. QueueBindArguments
@Data
public class QueueBindArguments extends BasicArguments implements Serializable {private String queueName;private String exchangeName;private String bindingKey;
}
  1. QueueUnbindArguments
@Data
public class QueueUnBindArguments extends BasicArguments implements Serializable {private String queueName;private String exchangeName;
}
  1. BasicPublishArguments
@Data
public class BasicPublishArguments extends BasicArguments implements Serializable {private String exchangeName;private String routingKey;private BasicProperties basicProperties;private byte[] body;
}
  1. BasicConsumeArguments
@Data
public class BasicConsumeArguments extends BasicArguments implements Serializable {private String consumerTag;private String queueName;private boolean autoAck;
}
  1. SubScribeReturns
  • 这个不是参数, 是返回值. 是服务器给消费者推送的订阅消息.
  • consumerTag 其实是 channelId.
  • basicProperties 和 body 共同构成了 Message.
/*** Created with IntelliJ IDEA.* Description:返回值, 是服务器给消费者推送的订阅消息.** @author: zxj* @date: 2024-03-05* @time: 22:54:08*/
@Data
public class SubScribeReturns extends BasicReturns implements Serializable {private String consumerTag;private BasicProperties basicProperties;private byte[] body;
}

⼗⼆、 实现 BrokerServer

创建 BrokerServer 类

@Data
@Slf4j
public class BrokerServer {// 当前程序只考虑一个虚拟主机的情况private VirtualHost virtualHost = new VirtualHost("default-virtualHost");// 使用这个 哈希表, 表示当前的所有会话(也就是说有哪些客户端正在和服务器进行通信)// key 为 channelId, value 为 channel 对应的 socket 对象private ConcurrentHashMap<String, Socket> sessions = new ConcurrentHashMap<>();private ServerSocket serverSocket;// 引入一个线程池, 来处理多个客户端的需求private ExecutorService executorService;// 引入一个 boolean 变量控制服务器是否继续运行private volatile boolean runnable = true;
}
  • virtualHost 表⽰服务器持有的虚拟主机. 队列, 交换机, 绑定, 消息都是通过虚拟主机管理.
  • sessions ⽤来管理所有的客⼾端的连接. 记录每个客⼾端的 socket
  • serverSocket 是服务器⾃⾝的 socket
  • executorService 这个线程池⽤来处理响应
  • runnable 这个标志位⽤来控制服务器的运⾏停⽌.

启动/停⽌服务器

  • 这⾥就是⼀个单纯的 TCP 服务器, 没啥特别的
  • 实现停⽌操作, 主要是为了⽅便后续开展单元测试.
public BrokerServer(int port) throws IOException {serverSocket = new ServerSocket(port);}// begin: 单纯的 TCP 服务器模板public void start() {log.info("[BrokerServer] 服务器开始启动");executorService = Executors.newCachedThreadPool();try {while (runnable) {Socket clientSocket = serverSocket.accept();// 把这个处理连接的逻辑对给线程池executorService.submit(() -> {// 处理连接的统一方法processConnection(clientSocket);});}} catch (SocketException e) {log.info("[BrokerServer] 服务器停止运行!");} catch (IOException e) {log.error("[BrokerServer] 服务器出现异常!");e.printStackTrace();}}/*** @description: 一般来说, 停止服务器, 都是 kill 对应的进程就可以了* 此处还是搞一个单独的停止方法, 主要是用于后续的单元测试**/public void stop() throws IOException {runnable = false;// 把线程池中的人物都放弃了, 让线程都销毁executorService.shutdownNow();serverSocket.close();}// end: 单纯的 TCP 服务器模板

实现处理连接

  • 对于 EOFException 和 SocketException , 我们视为客⼾端正常断开连接.
    • 如果是客⼾端先 close, 后调⽤ DataInputStream 的 read, 则抛出 EOFException
    • 如果是先调⽤ DataInputStream 的 read, 后客⼾端调⽤ close, 则抛出 SocketException
    /*** @description: 服务方法* 通过这个方法, 来处里一个客户端的连接* 在这个连接中, 可能会涉及到多个请求和响应**/private void processConnection(@NotNull Socket clientSocket) {// 获取服务对象的 输入输出 流try (InputStream inputStream = clientSocket.getInputStream();OutputStream outputStream = clientSocket.getOutputStream()) {// 这里需要按照特定的格式来读取并解析, 此时就需要用到 DataInputStream 和 DataOutputStreamtry (DataInputStream dataInputStream = new DataInputStream(inputStream); DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {// 循环进行服务, 保持连接, 以便处理多个请求while (true) {// 1. 读取请求并解析Request request = readRequest(dataInputStream);log.info("接收到[client: {} : {}] 请求: {}",clientSocket.getInetAddress(),clientSocket.getPort(),request);// 2. 根据请求计算响应Response response = process(request, clientSocket);log.info("响应给[client: {} : {}] 数据: {}",clientSocket.getInetAddress(),clientSocket.getPort(),response);// 3. 把响应写回给客户端writeResponse(dataOutputStream, response);}}} catch (EOFException | SocketException e) {log.info("connection 关闭! 客户端地址: {} : {}", clientSocket.getInetAddress(), clientSocket.getPort());} catch (IOException | MqException | ClassNotFoundException e) {log.error("connection 出现异常 e: {}", e.toString());e.printStackTrace();} finally {try {// 当连接处理完了, 一定要关闭 socketclientSocket.close();// 一个 TCP 连接中, 可能包含多个 channel, 需要把当前这个 socket 对应的所有 channel 也顺便清理掉clearClosedSession(clientSocket);} catch (IOException e) {e.printStackTrace();}}}

实现 readRequest

/*** @description: 反序列化请求消息**/private @NotNull Request readRequest(@NotNull DataInputStream dataInputStream) throws IOException, MqException {Request request = new Request();request.setType(dataInputStream.readInt());request.setLength(dataInputStream.readInt());byte[] payload = new byte[request.getLength()];int n = dataInputStream.read(payload);if (n != request.getLength()) {throw new MqException("读取请求格式出错");}request.setPayload(payload);return request;}

实现 writeResponse

  • 注意这⾥的 flush 操作很关键, 否则响应不⼀定能及时返回给客⼾端
    /*** @description: 将 Response 对象中的内容先后写入 dataOutputStream 中**/private void writeResponse(@NotNull DataOutputStream dataOutputStream, Response response) throws IOException {log.info("{writeResponse}: 即将发送响应为: {}",response);dataOutputStream.writeInt(response.getType());dataOutputStream.writeInt(response.getLength());dataOutputStream.write(response.getPayload());// 刷新缓冲区十分重要dataOutputStream.flush();}

实现处理请求

  • 先把请求转换成 BaseArguments , 获取到其中的 channelId 和 rid
  • 再根据不同的 type, 分别处理不同的逻辑. (主要是调⽤ virtualHost 中不同的⽅法).
  • 针对消息订阅操作, 则需要在存在消息的时候通过回调, 把响应结果写回给对应的客⼾端.
  • 最后构造成统⼀的响应.
/*** @description: 依据 request 中的信息, 执行相关方法, 并构造 Response 对象返回**/private @NotNull Response process(@NotNull Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {// 1. 把 request 中的 payload 做一个初步的解析, 让父类来接受BasicArguments basicArguments = (BasicArguments) BinaryUtils.fromBytes(request.getPayload());log.info("request 中 payload 解析结果:  rid = {}, channelId = {}", basicArguments.getRid(),basicArguments.getChannelId());// 2. 根据 type 的值, 来近一步来区分这一次请求时要干啥的boolean ok = true; // 各个方法的返回结果基本都是 booleanif (request.getType() == 0x1) {// 创建 channelsessions.put(basicArguments.getChannelId(), clientSocket);log.info("创建 channel 完成! channelId: {}", basicArguments.getChannelId());} else if (request.getType() == 0x2) {// 销毁 channelsessions.remove(basicArguments.getChannelId());log.info("销毁 channel 完成! channelId: {}", basicArguments.getChannelId());} else if (request.getType() == 0x3) {// 创建交换机, 此时的 payload 就是 ExchangeDeclareArguments 对象了ExchangeDeclareArguments exchangeDeclareArguments = (ExchangeDeclareArguments) basicArguments;ok = virtualHost.exchangeDeclare(exchangeDeclareArguments.getExchangeName(), exchangeDeclareArguments.getExchangeType(), exchangeDeclareArguments.getDurable(), exchangeDeclareArguments.getAutoDelete(), exchangeDeclareArguments.getArguments());} else if (request.getType() == 0x4) {// 销毁交换机ExchangeDeleteArguments exchangeDeleteArguments = (ExchangeDeleteArguments) basicArguments;ok = virtualHost.exchangeDelete(exchangeDeleteArguments.getExchangeName());} else if (request.getType() == 0x5) {// 创建队列QueueDeclareArguments queueDeclareArguments = (QueueDeclareArguments) basicArguments;ok = virtualHost.queueDeclare(queueDeclareArguments.getQueueName(), queueDeclareArguments.isDurable(), queueDeclareArguments.isExclusive(), queueDeclareArguments.isAutoDelete(), queueDeclareArguments.getArguments());} else if (request.getType() == 0x6) {// 销毁队列QueueDeleteArguments queueDeleteArguments = (QueueDeleteArguments) basicArguments;ok = virtualHost.queueDelete(queueDeleteArguments.getQueueName());} else if (request.getType() == 0x7) {// 创建绑定QueueBindArguments queueBindArguments = (QueueBindArguments) basicArguments;ok = virtualHost.queueBind(queueBindArguments.getQueueName(), queueBindArguments.getExchangeName(), queueBindArguments.getBindingKey());} else if (request.getType() == 0x8) {// 删除绑定QueueUnBindArguments queueUnBindArguments = (QueueUnBindArguments) basicArguments;ok = virtualHost.queueUnBind(queueUnBindArguments.getQueueName(), queueUnBindArguments.getExchangeName());} else if (request.getType() == 0x9) {// 发布消息BasicPublishArguments basicPublishArguments = (BasicPublishArguments) basicArguments;ok = virtualHost.basicPublish(basicPublishArguments.getExchangeName(), basicPublishArguments.getRoutingKey(), basicPublishArguments.getBasicProperties(), basicPublishArguments.getBody());} else if (request.getType() == 0xa) {// 订阅消息BasicConsumeArguments basicConsumeArguments = (BasicConsumeArguments) basicArguments;ok = virtualHost.basicConsume(basicConsumeArguments.getConsumerTag(), basicConsumeArguments.getQueueName(), basicConsumeArguments.isAutoAck(), new Consumer() {/*** 这个回调函数要做的工作, 就是把服务收到消息直接推送会给对应的消费者客户端即可, 在客户端进行对消息的消费**/@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {// 先知道当前这个收到的消息, 要发给哪个客户端// 此处 consumerTag, 其实就是 channelId (这里是规定的, 客户端填写该字段的时候, 就是以 channelId 来填写的),// 根据 channelId 去 sessions 中查询, 就可以得到对应的 socket 对象, 就可以往里面发送数据了// 1. 根据 channelId 找到 socket 对象Socket clientSocket = sessions.get(consumerTag);if (clientSocket == null || clientSocket.isClosed()) {throw new MqException("[BrokerServer] 订阅消息的客户端已经关闭!");}// 2. 构造响应数据SubScribeReturns subScribeReturns = new SubScribeReturns();subScribeReturns.setChannelId(consumerTag);subScribeReturns.setRid(""); // 由于这里只有响应, 没有请求, 不需要去对应, rid 暂时不需要subScribeReturns.setOk(true);subScribeReturns.setConsumerTag(consumerTag);subScribeReturns.setBasicProperties(basicProperties);subScribeReturns.setBody(body);byte[] payload = BinaryUtils.toBytes(subScribeReturns);Response response = new Response();// 0xc 表示服务器给消费者客户端推送消息数据response.setType(0xc);// response 的 payload 就是一个 SubScribeReturnsresponse.setLength(payload.length);response.setPayload(payload);// 3. 把数据写回给客户端//      注意! 此处的 dataOutputStream 不能close!//      如果把 dataOutputStream 关闭, 就会直接把 clientSocket 里的 OutputStream 也关闭了//      此时就无法继续往 socket 中写入后续数据了DataOutputStream dataOutputStream = new DataOutputStream(clientSocket.getOutputStream());writeResponse(dataOutputStream, response);}});} else if (request.getType() == 0xb) {// 确认应答, 消费者确认收到消息BasicAckArguments basicAckArguments = (BasicAckArguments) basicArguments;ok = virtualHost.basicAck(basicAckArguments.getQueueName(), basicAckArguments.getMessageId());} else {// 当前的 type 是非法的throw new MqException("未知的 type: " + request.getType());}// 3. 构造响应BasicReturns basicReturns = new BasicReturns();basicReturns.setChannelId(basicArguments.getChannelId());basicReturns.setRid(basicArguments.getRid());basicReturns.setOk(ok);byte[] payload = BinaryUtils.toBytes(basicReturns);Response response = new Response();response.setType(request.getType());response.setLength(payload.length);response.setPayload(payload);log.info("构造响应完成: {}", response);return response;}

实现 clearClosedSession

  • 如果客⼾端只关闭了 Connection, 没关闭 Connection 中包含的 Channel, 也没关系, 在这⾥统⼀进⾏清理.
  • 注意迭代器失效问题.
/*** @description: 用户关闭连接后, 清理对应的 channel 资源* 需要注意的是 迭代器失效的问题**/private void clearClosedSession(Socket clientSocket) {// 这里要做的事情, 主要就是遍历上述 sessions hash 表, 把该关闭的 socket 对应的键值对, 统统删除List<String> toDeleteChannelId = new ArrayList<>();for (Map.Entry<String, Socket> entry : sessions.entrySet()) {if (entry.getValue() == clientSocket) {// 不能在这里直接删除!!// 这属于使用集合类的一个大忌 -- 一边遍历, 一边删除!// sessions.remove(entry.getKey());toDeleteChannelId.add(entry.getKey());}}for (String channelId : toDeleteChannelId) {sessions.remove(channelId);}log.info("清理 session 完成! 被清理的 channelId: {}", toDeleteChannelId);}

⼗三、 实现客⼾端

创建包 mqclient

创建 ConnectionFactory

⽤来创建连接的⼯⼚类

/*** Created with IntelliJ IDEA.* Description:工厂类 -- 以工厂模式来创建 Connection 类** @author: zxj* @date: 2024-03-06* @time: 21:55:32*/
@Data
public class ConnectionFactory {// BrokerServer 的 IP 和 portprivate String host;private Integer port;// more ...// 建立一个 TCP 连接public Connection newConnection() throws IOException {Connection connection = new Connection(host,port);return connection;}
}

Connection 和 Channel 的定义

⼀个客⼾端可以创建多个 Connection.

⼀个 Connection 对应⼀个 socket, ⼀个 TCP 连接.

⼀个 Connection 可以包含多个 Channel

  1. Connection 的定义
@Data
@Slf4j
public class Connection {private Socket socket;private InputStream inputStream;private OutputStream outputStream;private DataInputStream dataInputStream;private DataOutputStream dataOutputStream;// 记录当前 Connection 包含的 Channelprivate ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>();// 执行消息回调的线程池private ExecutorService callbackPool = null;
}    
  • Socket 是客⼾端持有的套接字. InputStream OutputStream DataInputStream DataOutputStream 均为 socket 通信的接⼝.
  • channelMap ⽤来管理该连接中所有的 Channel.
  • callbackPool 是⽤来在客⼾端这边执⾏⽤⼾回调的线程池.
  1. Channel 的定义
@Data
public class Channel {// channelId 为 channel 的身份标识, 使用 UUID 标识private String channelId;// connection 为 channel 对应的连接private Connection connection;// key 为 rid, 即 requestId / responseId// basicReturnsMap 用来保存响应的返回值, 放到这个哈希表中方便和请求匹配private ConcurrentHashMap<String, BasicReturns> basicReturnsMap = new ConcurrentHashMap<>();// 订阅消息的回调 -- 为消费者的回调(用户注册的), 对应消息响应, 应该调用这个回调处理消息private Consumer consumer = null;public Channel(String channelId, Connection connection) {this.channelId = channelId;this.connection = connection;}
}    
  • channelId 为 channel 的⾝份标识, 使⽤ UUID 标识.
  • Connection 为 channel 对应的连接.
  • baseReturnsMap ⽤来保存响应的返回值. 放到这个哈希表中⽅便和请求匹配.
  • consumer 为消费者的回调(⽤⼾注册的). 对于消息响应, 应该调⽤这个回调处理消息.

封装请求响应读写操作

在 Connection 中, 实现下列⽅法

/*** @description: 读取响应**/public Response readResponse() throws IOException, MqException {log.info("客户端: 开始等待读取消息");Response response = new Response();response.setType(dataInputStream.readInt());response.setLength(dataInputStream.readInt());byte[] payload = new byte[response.getLength()];int n = dataInputStream.read(payload);if (n != response.getLength()) {throw new MqException("读取的响应数据不完整");}response.setPayload(payload);log.info("收到响应: type: {}, length: {}", response.getType(), response.getLength());return response;}/*** @description: 写请求**/public void writeRequest(Request request) throws IOException {dataOutputStream.writeInt(request.getType());dataOutputStream.writeInt(request.getLength());dataOutputStream.write(request.getPayload());dataOutputStream.flush();log.info("发送请求:type: {}, length: {}", request.getType(), request.getLength());}

创建 channel

在 Connection 中, 定义下列⽅法来创建⼀个 channel

public Channel createChannel() throws IOException {// 使用 UUID 生产 channelId, 以 C- 开头String channelId = "C-" + UUID.randomUUID().toString();Channel channel = new Channel(channelId, this);// 这里需要先把 channel 键值放到 Map 中 进行管理channelMap.put(channelId, channel);// 同时也需要把 "创建 channel" 的这个消息也告诉服务器boolean ok = channel.createChannel();if (!ok) {// 服务器这里创建失败了, 整个这次创建 channel 操作不顺利// 把刚才已经加入 hash 表的键值对, 再删了channelMap.remove(channelId);return null;}return channel;}

发送请求

通过 Channel 提供请求的发送操作.

  1. 创建 channel
/*** @description: 在这个方法中, 和服务器进行交互, 告知服务器, 此处客户端创建了新的 channel 了**/public boolean createChannel() throws IOException {// 对于创建 Channel 操作来说, payload 就是一个 basicArguments 对象BasicArguments basicArguments = new BasicAckArguments();basicArguments.setChannelId(channelId);basicArguments.setRid(generateRid());byte[] payload = BinaryUtils.toBytes(basicArguments);Request request = new Request();request.setType(0x1);request.setLength(payload.length);request.setPayload(payload);// 构造出完整请求之后, 就可以发送这个请求connection.writeRequest(request);// 等待服务器的响应BasicReturns basicReturns = waitResult(basicArguments.getRid());return basicReturns.getOk();}

generateRid 的实现

    private @NotNull String generateRid() {return "R-" + UUID.randomUUID().toString();}

waitResult 的实现

  • 由于服务器的响应是异步的. 此处通过 waitResult 实现同步等待的效果
/** @description: 期望使用这个方法来阻塞等待服务器的响应**/private BasicReturns waitResult(String rid) {BasicReturns basicReturns = null;while ((basicReturns = basicReturnsMap.get(rid)) == null) {// 如果查询结果为 null, 说明包裹还没有回来// 此时就需要阻塞等待synchronized (this) {try {wait();} catch (InterruptedException e) {e.printStackTrace();}}}return basicReturns;}

关闭 channel

/*** @description: 关闭 channel, 给服务器发送一个 0x2 类型的请求**/public boolean close() throws IOException {BasicArguments basicArguments = new BasicAckArguments();basicArguments.setRid(generateRid());basicArguments.setChannelId(channelId);byte[] payload = BinaryUtils.toBytes(basicArguments);Request request = new Request();request.setType(0x2);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(basicArguments.getRid());return basicReturns.getOk();}

创建交换机

/*** @description: 创建交换机**/public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete,Map<String, Object> arguments) throws IOException {ExchangeDeclareArguments exchangeDeclareArguments = new ExchangeDeclareArguments();exchangeDeclareArguments.setRid(generateRid());exchangeDeclareArguments.setChannelId(channelId);exchangeDeclareArguments.setExchangeName(exchangeName);exchangeDeclareArguments.setExchangeType(exchangeType);exchangeDeclareArguments.setDurable(durable);exchangeDeclareArguments.setAutoDelete(autoDelete);exchangeDeclareArguments.setArguments(arguments);byte[] payload = BinaryUtils.toBytes(exchangeDeclareArguments);Request request = new Request();request.setType(0x3);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(exchangeDeclareArguments.getRid());return basicReturns.getOk();}

删除交换机

/*** @description: 删除交换机**/public boolean exchangeDelete(String exchangeName) throws IOException {ExchangeDeleteArguments exchangeDeleteArguments = new ExchangeDeleteArguments();exchangeDeleteArguments.setExchangeName(exchangeName);exchangeDeleteArguments.setChannelId(channelId);exchangeDeleteArguments.setRid(generateRid());byte[] payload = BinaryUtils.toBytes(exchangeDeleteArguments);Request request = new Request();request.setType(0x4);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(exchangeDeleteArguments.getRid());return basicReturns.getOk();}

创建队列

/*** @description: 创建队列**/public boolean queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException {QueueDeclareArguments queueDeclareArguments = new QueueDeclareArguments();queueDeclareArguments.setQueueName(queueName);queueDeclareArguments.setExclusive(exclusive);queueDeclareArguments.setDurable(durable);queueDeclareArguments.setAutoDelete(autoDelete);queueDeclareArguments.setArguments(arguments);queueDeclareArguments.setChannelId(channelId);queueDeclareArguments.setRid(generateRid());byte[] payload = BinaryUtils.toBytes(queueDeclareArguments);Request request = new Request();request.setType(0x5);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(queueDeclareArguments.getRid());return basicReturns.getOk();}

删除队列

/*** @description: 删除队列**/public boolean queueDelete(String queueName) throws IOException {QueueDeleteArguments queueDeleteArguments = new QueueDeleteArguments();queueDeleteArguments.setRid(generateRid());queueDeleteArguments.setChannelId(channelId);queueDeleteArguments.setQueueName(queueName);byte[] payload = BinaryUtils.toBytes(queueDeleteArguments);Request request = new Request();request.setType(0x6);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(queueDeleteArguments.getRid());return basicReturns.getOk();}

创建绑定

/*** @description: 创建绑定**/public boolean queueBind(String queueName, String exchangeName, String bindingKey) throws IOException {QueueBindArguments queueBindArguments = new QueueBindArguments();queueBindArguments.setBindingKey(bindingKey);queueBindArguments.setQueueName(queueName);queueBindArguments.setExchangeName(exchangeName);queueBindArguments.setRid(generateRid());queueBindArguments.setChannelId(channelId);byte[] payload = BinaryUtils.toBytes(queueBindArguments);Request request = new Request();request.setType(0x7);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(queueBindArguments.getRid());return basicReturns.getOk();}

删除绑定

/*** @description: 删除绑定**/public boolean queueUnbind(String queueName, String exchangeName) throws IOException {QueueUnBindArguments queueUnBindArguments = new QueueUnBindArguments();queueUnBindArguments.setExchangeName(exchangeName);queueUnBindArguments.setQueueName(queueName);queueUnBindArguments.setRid(generateRid());queueUnBindArguments.setChannelId(channelId);byte[] payload = BinaryUtils.toBytes(queueUnBindArguments);Request request = new Request();request.setType(0x8);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(queueUnBindArguments.getRid());return basicReturns.getOk();}

发送消息

/*** @description: 发送消息**/public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties,byte[] body) throws IOException {BasicPublishArguments basicPublishArguments = new BasicPublishArguments();basicPublishArguments.setBasicProperties(basicProperties);basicPublishArguments.setBody(body);basicPublishArguments.setExchangeName(exchangeName);basicPublishArguments.setRoutingKey(routingKey);basicPublishArguments.setRid(generateRid());basicPublishArguments.setChannelId(channelId);byte[] payload = BinaryUtils.toBytes(basicPublishArguments);Request request = new Request();request.setType(0x9);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(basicPublishArguments.getRid());return basicReturns.getOk();}

订阅消息

/*** @description: 订阅消息**/public boolean basicConsume(String queueName,boolean autoAck,Consumer consumer) throws MqException, IOException {// 先设置回调, 一个channel 只能设置一个回调方法if (this.consumer != null) {throw new MqException("该 channel 已经设置过消费消息的回调了, 不能重复设置!");}this.consumer = consumer;BasicConsumeArguments basicConsumeArguments = new BasicConsumeArguments();basicConsumeArguments.setRid(generateRid());basicConsumeArguments.setChannelId(channelId);basicConsumeArguments.setConsumerTag(channelId); // 此处 consumerTag 也使用 channelId 来标识basicConsumeArguments.setAutoAck(autoAck);basicConsumeArguments.setQueueName(queueName);byte[] payload = BinaryUtils.toBytes(basicConsumeArguments);Request request = new Request();request.setType(0xa);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(basicConsumeArguments.getRid());return basicReturns.getOk();}

确认消息

/*** @description: 确认消息**/public boolean basicAck(String queueName,String messageId) throws IOException {BasicAckArguments basicAckArguments = new BasicAckArguments();basicAckArguments.setMessageId(messageId);basicAckArguments.setQueueName(queueName);basicAckArguments.setRid(generateRid());basicAckArguments.setChannelId(channelId);byte[] payload = BinaryUtils.toBytes(basicAckArguments);Request request = new Request();request.setType(0xb);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(basicAckArguments.getRid());return basicReturns.getOk();}

⼩结

上述发送请求的操作, 逻辑基本⼀致. 构造参数 + 构造请求 + 发送 + 等待结果

处理响应

  1. 创建扫描线程

创建⼀个扫描线程, ⽤来不停的读取 socket 中的响应数据

注意: ⼀个 Connection 中可能包含多个 channel, 需要把响应分别放到对应的 channel 中.

public Connection(String host, Integer port) throws IOException {socket = new Socket(host, port);inputStream = socket.getInputStream();outputStream = socket.getOutputStream();dataInputStream = new DataInputStream(inputStream);dataOutputStream = new DataOutputStream(outputStream);callbackPool = Executors.newFixedThreadPool(4);// 创建一个扫描线程, 由这个线程负责不停的从 socket 中读取响应数据, 把这个响应数据再交给对应的 Channel 负责处理Thread t = new Thread(() -> {try {while (!socket.isClosed()) {Response response = readResponse();dispatchResponse(response);}} catch (SocketException e) {// 连接正常断开的. 此时这个异常直接忽略.System.out.println("[Connection] 连接正常断开!");} catch (IOException | ClassNotFoundException | MqException e) {// System.out.println("[Connection] 连接异常断开!");log.error("连接异常断开! e: {}", e);e.printStackTrace();}});t.start();}
  1. 实现响应的分发

给 Connection 创建 dispatchResponse ⽅法

  • 针对服务器返回的控制响应和消息响应, 分别处理.
    • 如果是订阅数据, 则调⽤ channel 中的回调.
    • 如果是控制消息, 直接放到结果集合中.
/*** @description: 使用这个方法来分别处理, 当前的响应是针对控制请求的响应, 还是服务器推送的消息**/private void dispatchResponse(Response response) throws IOException, ClassNotFoundException, MqException {if (response.getType() == 0xc) {// 服务器推送来的消息数据SubScribeReturns subScribeReturns = (SubScribeReturns) BinaryUtils.fromBytes(response.getPayload());// 根据 ChannelId 找到对应的 Channel 对象Channel channel = channelMap.get(subScribeReturns.getChannelId());if (channel == null) {throw new MqException("该消息对应的 Channel 在客户端中不存在, channelId: " + subScribeReturns.getChannelId());}// 执行该 channel 对象内部的回调callbackPool.submit(() -> {try {channel.getConsumer().handleDelivery(subScribeReturns.getConsumerTag(), subScribeReturns.getBasicProperties(),subScribeReturns.getBody());} catch (MqException | IOException e) {e.printStackTrace();}});} else {// 当前相应是针对刚才的控制请求的响应BasicReturns basicReturns = (BasicReturns) BinaryUtils.fromBytes(response.getPayload());// 根据 ChannelId 找到对应的 Channel 对象Channel channel = channelMap.get(basicReturns.getChannelId());if (channel == null) {throw new MqException("该消息对应的 Channel 在客户端中不存在, channelId: " + basicReturns.getChannelId());}channel.putReturns(basicReturns);}}
  1. 实现 channel.putReturns
    /*** @description: 存入 basicReturns**/public void putReturns(BasicReturns basicReturns) {basicReturnsMap.put(basicReturns.getRid(), basicReturns);synchronized (this) {// 当前也不知道有多少个线程在等待上述的这个响应// 把所有的等待的线程都唤醒notifyAll();}}

关闭 Connection

    public void close() {// 关闭 Connection, 释放相关资源try {callbackPool.shutdownNow();channelMap.clear();inputStream.close();outputStream.close();socket.close();} catch (Exception e) {log.error("关闭资源出现异常");e.printStackTrace();}}

测试代码

package en.edu.zxj.mq.mqclient;import en.edu.zxj.mq.MqApplication;
import en.edu.zxj.mq.common.Consumer;
import en.edu.zxj.mq.common.MqException;
import en.edu.zxj.mq.mqserver.BrokerServer;
import en.edu.zxj.mq.mqserver.core.BasicProperties;
import en.edu.zxj.mq.mqserver.core.ExchangeType;
import org.apache.tomcat.util.http.fileupload.FileUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.test.context.SpringBootTest;import java.io.File;
import java.io.IOException;/*** Created with IntelliJ IDEA.* Description:** @author: zxj* @date: 2024-03-07* @time: 10:55:32*/
@SpringBootTest
class MqClientTest {private ConnectionFactory factory = null;private Thread t = null;private BrokerServer brokerServer = null;@BeforeEachpublic void setUp() throws IOException {// 1. 先启动服务器MqApplication.context = SpringApplication.run(MqApplication.class);brokerServer = new BrokerServer(9090);t = new Thread(() -> {brokerServer.start();});t.start();// 2. 配置 ConnectionFactoryfactory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(9090);}@AfterEachpublic void tearDown() throws IOException, InterruptedException {// 停止服务器brokerServer.stop();MqApplication.context.close();t.join();// 删除必要的文件File file = new File("./data");FileUtils.deleteDirectory(file);factory = null;}@Testpublic void testConnection() throws IOException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);}@Testpublic void testChannel() throws IOException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);Channel channel = connection.createChannel();Assertions.assertNotNull(channel);}@Testpublic void testExchange() throws IOException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);Channel channel = connection.createChannel();Assertions.assertNotNull(channel);boolean ok = channel.exchangeDeclare("testExchangeName", ExchangeType.DIRECT,true,false,null);Assertions.assertTrue(ok);ok = channel.exchangeDelete("testExchangeName");Assertions.assertTrue(ok);// 该关闭的关闭channel.close();connection.close();}@Testpublic void testQueue() throws IOException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);Channel channel = connection.createChannel();Assertions.assertNotNull(channel);boolean ok = channel.queueDeclare("testQueue",true,false,false,null);Assertions.assertTrue(ok);ok = channel.queueDelete("testQueue");Assertions.assertTrue(ok);// 该关闭的关闭channel.close();connection.close();}@Testpublic void testBinding() throws IOException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);Channel channel = connection.createChannel();Assertions.assertNotNull(channel);boolean ok = channel.queueDeclare("testQueue",true,false,false,null);Assertions.assertTrue(ok);ok = channel.exchangeDeclare("testExchangeName", ExchangeType.DIRECT,true,false,null);Assertions.assertTrue(ok);ok = channel.queueBind("testQueue","testExchangeName","testBindingKey");Assertions.assertTrue(ok);ok = channel.queueUnbind("testQueue","testExchangeName");Assertions.assertTrue(ok);ok = channel.exchangeDelete("testExchangeName");Assertions.assertTrue(ok);ok = channel.queueDelete("testQueue");Assertions.assertTrue(ok);// 该关闭的关闭channel.close();connection.close();}@Testpublic void testMessage() throws IOException, MqException, InterruptedException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);Channel channel = connection.createChannel();Assertions.assertNotNull(channel);boolean ok = channel.queueDeclare("testQueue",true,false,false,null);Assertions.assertTrue(ok);ok = channel.exchangeDeclare("testExchangeName", ExchangeType.DIRECT,true,false,null);Assertions.assertTrue(ok);byte[] requestBody = "hello".getBytes();ok = channel.basicPublish("testExchangeName","testQueue",null,requestBody);Assertions.assertTrue(ok);ok = channel.basicConsume("testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[消费数据] 开始!");System.out.println("consumerTag=" + consumerTag);System.out.println("basicProperties=" + basicProperties);Assertions.assertArrayEquals(requestBody,body);System.out.println("[消费数据] 结束!");}});Assertions.assertTrue(ok);Thread.sleep(500);ok = channel.exchangeDelete("testExchangeName");Assertions.assertTrue(ok);ok = channel.queueDelete("testQueue");Assertions.assertTrue(ok);// 该关闭的关闭channel.close();connection.close();}}

项目结果

演示
首先启动 BrokerServer 类

@SpringBootApplication
public class MqApplication {public static ConfigurableApplicationContext context = null;public static void main(String[] args) throws IOException {context = SpringApplication.run(MqApplication.class, args);BrokerServer brokerServer = new BrokerServer(9090);brokerServer.start();}}

在这里插入图片描述

接着分别启动消费者和生产者客户端, 不分先后启动顺序

在这里插入图片描述
在这里插入图片描述
此时消费者就会收到消息并进行处理

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/740608.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【位运算】【脑筋急转弯】2749. 得到整数零需要执行的最少操作数

作者推荐 视频算法专题 本文涉及知识点 2749. 得到整数零需要执行的最少操作数 给你两个整数&#xff1a;num1 和 num2 。 在一步操作中&#xff0c;你需要从范围 [0, 60] 中选出一个整数 i &#xff0c;并从 num1 减去 2i num2 。 请你计算&#xff0c;要想使 num1 等于 …

对GIS与游戏引擎(UE4 或 U3D)结合的看法

GIS与游戏引擎结合&#xff0c;这在6年前就已经很多公司在进行探索了&#xff0c;经过这几年的发展&#xff0c;结合当前的政策&#xff0c;从以下几方面说一下我的看法&#xff1a; 1.GIS客户都是特殊单位及领域。2018年后&#xff0c;国内已经对国产化有明确要求了&#xff0…

Docker进阶:深入了解容器数据卷

Docker进阶&#xff1a;深入了解容器数据卷 一、前言二、容器数据卷的作用三、容器数据卷的使用方法四、实战--使用docker部署前端项目&#xff08;数据卷挂载&#xff09;4.1 重要&#xff1a;准备工作&#xff0c;先在本地创建挂载目录4.2 启动一个临时的nginx容器&#xff0…

pytorch(九)卷积神经网络

文章目录 卷积神经网络全连接神经网络与卷积神经网络的区别概念性知识mnist数据集(卷积神经网络) GoogLeNetInception 残差网络ResNet残差块结构 稠密连接网络网络结构 卷积神经网络 全连接神经网络与卷积神经网络的区别 全连接神经网络是一种最为基础的前馈神经网络&#xf…

ChatGPT Prompt 的原理总结

ChatGPT Prompt 的原理总结 ChatGPT Prompt 是 OpenAI 开发的大型语言模型 ChatGPT 的一种使用方式。通过 Prompt&#xff0c;用户可以引导 ChatGPT 生成特定内容&#xff0c;例如回答问题、写故事、写代码等等。 Prompt 的原理 Prompt 本质上是一段文本&#xff0c;它告诉 C…

处理json异常问题,由于发送kafka消息是一个字符串,等到消费时json字符串会有多个““引号,故需要先处理json再转对象

发送一个正确的json对象 发送kafka消息也是一个json传&#xff0c;也没问题 等到消费kafka时&#xff0c;也能接收到一个json字符串但是会多一个 " 引号&#xff0c; 就会导致json转对象失败所以需要先去除 开通和结尾的 " 引号 去除后的json 就是一个正常的json&…

河北专升本(C语言编程题)

一&#xff1a;基础算法原理 1. 冒泡排序 原理&#xff1a;从左到右&#xff0c;相邻元素进行比较。每次比较一轮&#xff0c;就会找到序列中最大的一个或最小的一个。这个数就会从序列的最右边冒出来。 以从小到大排序为例&#xff0c;第一轮比较后&#xff0c;所有数中最大的…

Linux进程概念(2)

一、进程状态 Linux的进程状态实际上就是 struct task_struct 结构体中的一个变量 1.1状态汇总 其中&#xff0c;Linux 状态是用数组储存的&#xff0c;如下&#xff1a; static const char * const task_state_array[] { "R (running)", // 0 …

【Web世界探险家】打开Web世界的大门

&#x1f4da;博客主页&#xff1a;爱敲代码的小杨. ✨专栏&#xff1a;《Java SE语法》 | 《数据结构与算法》 | 《C生万物》 |《MySQL探索之旅》 ❤️感谢大家点赞&#x1f44d;&#x1f3fb;收藏⭐评论✍&#x1f3fb;&#xff0c;您的三连就是我持续更新的动力❤️ &…

唯众物联网+地理科学交付云南师范大学地理学部教学实验室项目

近日&#xff0c;云南师范大学地理学部教学实验室建设项目顺利交付。该项目的成功落地&#xff0c;标志着物联网技术与地理科学教育的深度融合&#xff0c;为云南师范大学的地理教学提供了全新的教学平台与资源。该项目以物联网技术为核心&#xff0c;结合地理科学的特点&#…

蓝桥杯算法错题记录-基础篇

文章目录 本文还在跟新&#xff0c;最新跟新时间3/11&#xff01;&#xff01;&#xff01; 格式一定要符合要求&#xff0c;&#xff08;输入&#xff0c;输出格式&#xff09;1. nextInt () next() nextLine() 的注意事项2 .数的幂 a^2等3.得到最大长度&#xff08;最大...&a…

卡尔曼滤波器笔记——最详细

笔记来源— 卡尔曼滤波算法原理及代码实现&#xff01;https://www.bilibili.com/video/BV1WZ4y1F7VN/?spm_id_from333.337.search-card.all.click&vd_source8d55784dc9c7530bc9e3fa220380be56 简单介绍一下 现在我们就是不知道是距离多少&#xff0c;就需要用到这个卡尔…

如何将应用一键部署至多个环境?丨Walrus教程

在 Walrus 平台上&#xff0c;运维团队在资源定义&#xff08;Resource Definition&#xff09;中声明提供的资源类型&#xff0c;通过设置匹配规则&#xff0c;将不同的资源部署模板应用到不同类型的环境、项目等。与此同时&#xff0c;研发人员无需关注底层具体实现方式&…

CSS元素显示模式

CSS元素显示模式 定义&#xff1a;元素显示模式是指元素&#xff08;即标签&#xff09;以什么方式进行显示。 HTML元素分为块元素和行内元素 块元素 常见块元素 &#xff08;下列仅举出部分&#xff09; <h1>~<h6>、<p>、<div>、<ul>、<…

【数据分享】2008-2022年全国范围逐月NO2栅格数据(免费获取)

空气质量数据是在我们日常研究中经常使用的数据&#xff01;之前我们给大家分享了2000-2022年全国范围逐月的PM2.5栅格数据、2013-2022年全国范围逐月SO2栅格数据、2013-2022年全国范围逐月CO栅格数据和2000-2022年全国范围逐月PM10栅格数据&#xff08;可查看之前的文章获悉详…

properties文件和yml文件的区别以及文件优先级

properties文件和yml文件的区别 yml是按照缩进关系&#xff0c;而properties用"."来表示关系springboot默认生成的是properties文件当properties文件和yml文件都存在时&#xff0c;properties文件的优先级更高。 properties文件的样式 yml文件的样式 文件优先级 r…

Shell常用脚本:hadoop集群启动、停止、重启脚本

脚本内容以我搭建的hadoop集群为例&#xff0c;你们自用的时候自行根据你们的情况进行修改即可 hadoop-cluster-manager.sh #!/bin/bash # 1. 调用此脚本前&#xff0c;请使用ssh-keygen -t rsa、ssh-copy-id -f 目标机器这两个命令使得目标机器是免密登录的 # 2. ssh远程执行…

第五篇【传奇开心果系列】Python的自动化办公库技术点案例示例:深度解读Pandas在教育数据和研究数据处理领域的应用

传奇开心果博文系列 系列博文目录Python的自动化办公库技术点案例示例系列 博文目录前言一、Pandas 在教育和学术研究中的常见应用介绍二、数据清洗和预处理示例代码三、数据分析和统计示例代码四、数据可视化示例代码五、时间序列分析示例代码六、数据导入和导出示例代码七、数…

基于YOLOv8/YOLOv7/YOLOv6/YOLOv5的障碍物检测系统(深度学习代码+UI界面+训练数据集)

摘要&#xff1a;开发障碍物检测系统对于道路安全性具有关键作用。本篇博客详细介绍了如何运用深度学习构建一个障碍物检测系统&#xff0c;并提供了完整的实现代码。该系统基于强大的YOLOv8算法&#xff0c;并对比了YOLOv7、YOLOv6、YOLOv5&#xff0c;展示了不同模型间的性能…

JavaParser的快速介绍

开发的工作主要是写代码&#xff0c; 有考虑过使用代码写代码&#xff0c; 使用代码分析和改进代码吗&#xff1f; JavaParser 就可以帮你用来处理Java 代码的这些功能。 Java Parser 的介绍 Java Parser是一个用于解析和分析Java源代码的开源工具。它提供了一个API接口&…