消息队列的设计
- 一、消息队列的背景知识
- 二、需求分析
- 核心概念
- ⼀个⽣产者, ⼀个消费者
- N 个⽣产者, N 个消费者
- Broker Server 中的相关概念
- 核⼼ API
- 交换机类型 (Exchange Type)
- 持久化
- ⽹络通信
- 消息应答
- 三、 模块划分
- 四、 项⽬创建
- 五、创建核心类
- 创建 Exchange
- 创建 MSGQUeue
- 创建 Binding
- 创建Message
- 六、 数据库设计
- 配置 SQLite
- 实现创建表
- 实现数据库基本操作
- 实现 DataBaseManager
- 测试 DataBaseManager
- 七、消息存储设计
- 设计思路
- 创建 MessageFileManager 类
- 实现统计⽂件读写
- 实现创建队列⽬录
- 实现删除队列⽬录
- 检查队列⽂件是否存在
- 实现消息对象序列化/反序列化
- 实现写⼊消息⽂件
- 实现删除消息
- 实现消息加载
- 实现垃圾回收(GC)
- 测试 MessageFileManager
- ⼋、 整合数据库和⽂件
- 创建 DiskDataCenter
- 封装 Exchange ⽅法
- 封装 Queue ⽅法
- 封装 Binding 方法
- 封装 Message ⽅法
- 九、 内存数据结构设计
- 创建 MemoryDataCenter
- 封装 Exchange ⽅法
- 封装 Queue ⽅法
- 封装 Binding ⽅法
- 封装 Message ⽅法
- 针对未确认的消息的处理
- 实现重启后恢复内存
- 测试 MemoryDataCenter
- ⼗、 虚拟主机设计
- 创建 VirtualHost
- 实现构造⽅法和 getter
- 创建交换机
- 删除交换机
- 创建队列
- 删除队列
- 创建绑定
- 删除绑定
- 发布消息
- 路由规则
- 订阅消息
- 消息确认
- 测试 VirtualHost
- ⼗⼀、 ⽹络通信协议设计
- 明确需求
- 设计应⽤层协议
- 定义 Request / Response
- 定义参数⽗类
- 定义返回值⽗类
- 定义其他参数类
- ⼗⼆、 实现 BrokerServer
- 创建 BrokerServer 类
- 启动/停⽌服务器
- 实现处理连接
- 实现 readRequest
- 实现 writeResponse
- 实现处理请求
- 实现 clearClosedSession
- ⼗三、 实现客⼾端
- 创建 ConnectionFactory
- Connection 和 Channel 的定义
- 封装请求响应读写操作
- 创建 channel
- 发送请求
- 关闭 channel
- 创建交换机
- 删除交换机
- 创建队列
- 删除队列
- 创建绑定
- 删除绑定
- 发送消息
- 订阅消息
- 确认消息
- 处理响应
- 关闭 Connection
- 测试代码
- 项目结果
一、消息队列的背景知识
- Java标准库中有提供阻塞队列的数据结构, 阻塞队列最重要的用途是实现生产者消费者模型;
- 生产者消费者模型存在诸多好处, 是后端开发的常用编程方式
- 解耦合
- 削峰填谷
- 在实际的后端开发中, 尤其是分布式系统里, 跨主机之间使用生产者消费者模型, 也是非常普遍的需求, 因此我们通常会把阻塞队列, 封装成一个独立的服务器程序, 并且赋予其更丰富的功能, 这样的程序就被称为 消息队列 (Message Queue, MQ)
- 市场上有许多消息队列, 如
- RabbitMQ
- Kafka
- …
- 这里仿照 RabbitMQ 来模拟实现一下消息队列
二、需求分析
核心概念
- ⽣产者 (Producer)
- 消费者 (Consumer)
- 中间⼈ (Broker)
- 发布 (Publish) : 生产者向中间人这里投递消息的过程
- 订阅 (Subscribe) : 哪些消费者要从这个中间人这里取数据, 这个注册的过程, 称为 “订阅”
- 消费 (Consume): 消费者从中间人这里取走消息后处理数据的动作
通过取快递来理解上述概念
- 商家就是生产者
- "我"就是消费者
- 菜鸟驿站就是中间人
- 首先可以是商家向菜鸟驿站发快递 (发布)
- 接着 ''我" 关注哪个商家发的快递 (订阅)
- 最后"我"从菜鸟驿站中取走快递后, 并使用快递里的商品 (消费)
⼀个⽣产者, ⼀个消费者
N 个⽣产者, N 个消费者
其中, Broker 是最核⼼的部分. 负责消息的存储和转发.
Broker Server 中的相关概念
- 虚拟主机 (VirtualHost): 类似于 MySQL 的 “database”, 是一个逻辑上的集合, 一个 BrokerServer 上可以存在多个 VirtualHost
- 交换机(Exchange): 生产者把消息先发到Broker的Exchange上, 在根据不同的规则, 把消息转发给不同的Queue
- 队列 (Queue): 真正用来存储消息的部分, 每个消费者决定自己从哪个Queue上读取消息
- 绑定(Binding): Exchange 和 Queue 之间的关联关系, Exchange 和 Queue可以理解成 “多对多” 关系, 使用一个关联表就可以把这两个概念联系起来
- 消息 (Message): 传递的内容
所谓的Exchange 和 Queue 可以理解成 “多对多” 关系, 和数据库中的 “多对多” 一样, 意思是:
一个Exchange 可以绑定多个 Queue (可以向多个 Queue 中转发消息)
一个 Queue 也可被多个Exchange 绑定(一个 Queue 中的消息可以来自于多个 Exchange)
关系结构图
这些概念, 既需要在内存中存储, 也需要在硬盘上存储.
- 内存存储: 方便使用
- 磁盘存储: 重启服务器后数据不丢失
核⼼ API
对于 Broker 来说, 要实现以下核心 API, 通过这些 API 来实现消息队列的基本功能
- 创建队列 (queueDeclare)
- 销毁队列 (queueDelete)
- 创建交换机 (exchangeDeclare)
- 销毁交换机 (exchangeDelete)
- 创建绑定 (queueBind)
- 解除绑定 (queueUnbind)
- 发布消息 (basicPublish)
- 订阅消息 (basicConsume)
- 确认消息 (basicAck)
另⼀⽅⾯, Producer 和 Consumer 则通过⽹络的⽅式, 远程调⽤这些 API, 实现 ⽣产者消费者模型.
交换机类型 (Exchange Type)
对于 RabbitMQ 来说, 主要支持四种交换机类型
- Direct
- Fanout
- Topic
- Header
其中 Header 这种⽅式⽐较复杂, ⽐较少⻅. 常⽤的是前三种交换机类型. 此处也主要实现这三种.
- Direct: 生产者发送消息时, 直接指定被该交换机绑定的队列名
- Fanout: 生产者发送的消息会被复制到交换机的所有队列中
- Topic: 绑定队列到交换机上时, 指定一个字符串为 bindingKey, 发送消息指定一个字符串为 routingKey, 当 routingKey 和 bindingKey 满足一定匹配条件的时候, 则把消息投递到指定队列中
这三个操作就像有发奖品一样
- Direct是发一个专属的奖品给特定的人, 只有指定的人才能领取
- Fanout 就是给每一个人都发一个安慰奖
- Topic是有奖竞猜, 出了一道题, 只有作答并正确的人才能领取到奖品
持久化
Exchange, Queue, Binding, Message 都有持久化需求.
当程序重启 / 主机重启, 保证上述内容不丢失.
⽹络通信
⽣产者和消费者都是客⼾端程序, broker 则是作为服务器. 通过⽹络进⾏通信.
在⽹络通信的过程中, 客⼾端部分要提供对应的 api, 来实现对服务器的操作
- 创建 Connection
- 关闭 Connection
- 创建 Channel
- 关闭 Channel
- 创建队列 (queueDeclare)
- 销毁队列 (queueDelete)
- 创建交换机 (exchangeDeclare)
- 销毁交换机 (exchangeDelete)
- 创建绑定 (queueBind)
- 解除绑定 (queueUnbind)
- 发布消息 (basicPublish)
- 订阅消息 (basicConsume)
- 确认消息 (basicAck)
可以看到, 在 broker 的基础上, 客⼾端还要增加 Connection 操作和 Channel 操作.
Connection 对应⼀个 TCP 连接
Channel 则是 Connection 中的逻辑通道
⼀个 Connection 中可以包含多个 Channel.
Channel 和 Channel 之间的数据是独⽴的. 不会相互⼲扰.
这样的设定主要是为了能够更好的复⽤ TCP 连接, 达到⻓连接的效果, 避免频繁的创建关闭 TCP 连接.
Connection 可以理解成⼀根⽹线. Channel 则是⽹线⾥具体的线缆.
消息应答
被消费的消息, 需要进行应答
应答模式分成两种
- 自动应答: 消费者只要消费了消息, 就算应答完毕了, Broker 直接删除这个消息
- 手动应答: 消费者手动调用应答接口, Broker 收到应答请求之后, 才真正删除这个消息
手动应答的目的, 是为了保证消息确实被消费者处理成功了, 在一些对于数据可靠性要求高的场景, 比较常见
三、 模块划分
可以看到, 像 交换机, 队列, 绑定, 消息, 这⼏个核⼼概念在内存和硬盘中都是存储了的.
其中内存为主, 是⽤来实现消息转发的关键; 硬盘为辅, 主要是保证服务器重启之后, 之前的信息都可以正常保持.
四、 项⽬创建
创建 SpringBoot 项⽬.
使⽤ SpringBoot 2 系列版本, Java 8.
依赖引⼊ Spring Web 、 MyBatis 和 lombok.
五、创建核心类
创建包 mqserver.mq
创建 Exchange
/*** Created with IntelliJ IDEA.* Description:这个类表示一个交换机** @author: zxj* @date: 2024-02-25* @time: 20:05:48*/
@Data
public class Exchange {// 此处使用 name 来作为交换机的身份标识 (唯一的)private String name;// 交换机类型: Direct, Fanout, Topicprivate ExchangeType type = ExchangeType.DIRECT;// 该交换机是否需要持久化存储, true 表示需要持久化存储, false 表示不必持久化.private Boolean durable = false;// RabbitMQ 有的字段, 相关功能待开发// 该属性表示 如果当前交换机没有人用了, 就会自动删除private Boolean autoDelete = false;// arguments 表示的是创建交换机时指定的一些额外的参数选项, 待开发private Map<String,Object> arguments = new HashMap<>();
}
package en.edu.zxj.mq.mqserver.core;/*** Created with IntelliJ IDEA.* Description:交换机类型* • Direct: ⽣产者发送消息时, 直接指定被该交换机绑定的队列名.* • Fanout: ⽣产者发送的消息会被复制到该交换机的所有队列中.* • Topic: 绑定队列到交换机上时, 指定⼀个字符串为 bindingKey. 发送消息指定⼀个字符串为 routingKey.* 当 routingKey 和 bindingKey 满⾜⼀定的匹配条件的时候, 则把消息投递到指定队列.** @author: zxj* @date: 2024-02-25* @time: 20:10:02*/
public enum ExchangeType {DIRECT(0),FANOUT(1),TOPIC(2);private Integer type;ExchangeType(Integer type) {this.type = type;}public Integer getType() {return type;}
}
name
: 交换机的名字, 相当于交换机的身份标识type
: 交换机的类型, 三种取值, DIRECT, FANOUT, TOPICdurable
: 交换机是否要持久化存储, true 为持久化, false 不持久化autoDelete
: 使用完毕后是否自动删除- arguments: 交换机的其他参数属性
创建 MSGQUeue
/*** Created with IntelliJ IDEA.* Description:消息队列,* 类名叫做 MSGQueue, ⽽不是 Queue, 是为了防⽌和标准库中的 Queue 混淆** @author: zxj* @date: 2024-02-25* @time: 20:19:52*/
@Data
public class MSGQueue {// 表示队列的身份标识.private String name;// 该消息队列是否需要持久化存储, true 表示需要持久化存储, false 表示不必持久化.private Boolean durable = false;// 以下为保留字段// exclusive 为 true, 表示这个队列只能被一个消费者使用(别人用不了), 如果为 false 则是大家都能使用private Boolean exclusive = false;// 该属性表示 如果当前交换机没有人用了, 就会自动删除private Boolean autoDelete = false;// arguments 表示的是创建交换机时指定的一些额外的参数选项, 待开发private Map<String, Object> arguments = new HashMap<>();
}
类名叫做MSGQUeue, 而不是 Queue, 是为了防止和标准库中的Queue混淆
name
: 队列名字, 相当于队列的身份标识durable
: 交换机是否要持久化存储, true 为持久化, false 不持久化exclusive
: 独占(排他), 队列只能被一个消费者使用autoDelete
: 使用完毕后是否自动删除- arguments: 消息队列的其他参数属性
创建 Binding
/*** Created with IntelliJ IDEA.* Description:表示队列和交换机之间的关联关系** @author: zxj* @date: 2024-02-25* @time: 20:24:23*/
@Data
public class Binding {// exchangeName 交换机名字private String exchangeName;// queueName 队列名字private String queueName;// bindingKey 只在交换机类型为 TOPIC 时才有效. ⽤于和消息中的 routingKey 进⾏匹配.private String bindingKey;
}
exchangeName
: 交换机名字queueName
: 队列名字bindingKey
: 只在交换机类型为 TOPIC 时才有效, 用于和消息中的 routingKey 进行匹配
创建Message
/*** Created with IntelliJ IDEA.* Description:表示一个要传递的消息* 此处的 Message 对象, 是需要能够在网络上传输, 并且也需要能写入到文件中.* 此时就需要针对 Message 进行序列化和反序列化.* 此处使用 标准库 自带的 序列化/反序列化 操作.** @author: zxj* @date: 2024-02-25* @time: 20:55:17*/
@Data
@Component
public class Message implements Serializable {// Message 核心属性private BasicProperties basicProperties = new BasicProperties();// 存储需要传输的消息, 使用字节的方式存储private byte[] body;// 辅助属性/*** 一个文件中会存储很多信息, 如何找到某个消息, 在文件中的具体位置呢?* 使用下列的两个偏移量进行表示, [offset, offsetEnd)* 这两个属性并不需要被反序列化保存到文件中, 此时信息一旦被写入文件之后, 所在的位置就固定了, 并不需要单独存储* 这两个属性存在的目的, 主要是为了让内存中的 Message 对象, 能够快速找到对应硬盘上的 Message 位置.**/private long offsetBeg = 0; // 消息数据开头距离文件开头的偏移位置 (单位是字节)private long offsetEnd = 0; // 消息数据结尾距离文件开头的偏移位置 (单位是字节)/*** 使用这个属性表示该消息在文件中是否是有效信息. (针对文件中的信息, 如果删除, 使用逻辑删除的方式)* 0x1 表示有效, 0x0 表示无效**/private byte isValid = 0x1;/*** @description: 工厂模式创建 Message 实例* 创建一个工厂方法, 让工厂方法帮我们封装一下创建 Message 对象的过程* 这个方法中创建的 Message 对象, 会自动生成唯一的 MessageId* 万一 routingKey 和 basicProperties 里的 routingKey 冲突, 以外面的为主**/public static Message createMessageWithId(String routingKey, BasicProperties basicProperties, byte[] body) {Message message = new Message();if (basicProperties != null) {message.setBasicProperties(basicProperties);}// 此处生成的 MessageId 以 M- 作为前缀.message.setMessageId("M-" + UUID.randomUUID());message.setRoutingKey(routingKey);message.setBody(body);/* 此处是把 body 和 basicProperties 先设置出来. 他俩是 Message 的核心内容.而 offsetBeg, offsetEnd, isValid, 则是消息持久化的时候才会用到. 在把消息写入文件之前再进行设定.此处只是在内存中创建一个 Message 对象.*/return message;}
}
/*** Created with IntelliJ IDEA.* Description:** @author: zxj* @date: 2024-02-25* @time: 21:18:11*/
@Component
@Data
public class BasicProperties implements Serializable {// 消息的唯一身份标识, 此处为了保证 id 的唯一性, 使用 UUID 来作为 messageIdprivate String messageId;/*** 是一个消息上带有的内容, 和 bindingKey 做匹配* 如果当前交换机类型是 DIRECT, 此时 routingKey 就表示要转发的队列名* 如果当前交换机类型是 FANOUT, 此时 routingKey 没有意义* 如果当前交换机类型是 TOPIC, 此时 routingKey 就要和 bindingKey做匹配, 符合要求的才能转发给对应的队列**/private String routingKey;// 这个属性表示消息是否要持久化: 1 表示不持久化, 2 表示持久化. (RabbitMQ 就是这样的)private Integer deliverMode = 1;
}
Message
需要实现Serializable
接口, 后续需要把Message写入文件以及进行网络传输basicProperties
: 是消息的属性信息, body 是消息体offsetBeg
和offsetEnd
表示消息在消息文件中所在的起始位置和结束位置, 这一块具体的设计后续再说; 使用transient
关键字避免属性被序列化isValid
用来标识消息在文件中是否有效, 这一块具体设计后续再说- createMessageWithId 相当于一个工厂方法, 用来创建一个 Message 实例, messageId 通过 UUID 的方式来生成
六、 数据库设计
对于 Exchange,MSGQUeue,Binding,我们使用数据库进行持久化保存
此处我们使用的数据库是 SQLite,是一个更轻量的数据库
SQLite 只是一个动态库,我们在 Java 中直接注入 SQLite 依赖即可直接使用,不必安装其他的软件
配置 SQLite
引入 pom.xml 依赖
<!--导入 sqlite 数据库--><dependency><groupId>org.xerial</groupId><artifactId>sqlite-jdbc</artifactId><version>3.45.1.0</version></dependency>
配置数据源 application.yml
spring:datasource:url: jdbc:sqlite:./data/meta.dbusername:password:driver-class-name: org.sqlite.JDBC
mybatis:configuration:map-underscore-to-camel-case: true #配置驼峰自动转换log-impl: org.apache.ibatis.logging.stdout.StdOutImpl #打印sql语句mapper-locations: classpath:mapper/**Mapper.xml
username 和 password 空着即可
此处我们约定, 把数据库文件放到 ./data/meta.db 中
SQLite 只是把数据单存的存储到一个文件中, 非常简单方便
实现创建表
@Mapper
public interface MetaMapper {// 建表操作void createExchangeTable();void createMSGQueueTable();void createBindingTable();
}
本身 MyBatis 针对 MySQL 和 Oracle 是可以执行多个 sql 语句的, 但是 SQLite 不行
MetaMapper.xml 中 具体实现 sql 语句
<update id="createExchangeTable">create table if not exists exchange_table(name varchar(64) primary key,type int comment '0 表示 Direct, 1 表示 Fanout, 2 表示 Topic',durable boolean,auto_delete boolean,arguments varchar(1024),`delete_flag` tinyint(4) DEFAULT '0',`create_time` datetime DEFAULT CURRENT_TIMESTAMP,`update_time` datetime DEFAULT CURRENT_TIMESTAMP);</update><update id="createMSGQueueTable">create table if not exists msg_queue_table(name varchar(64) primary key,durable boolean,exclusive boolean,auto_delete boolean,arguments varchar(1024),`delete_flag` tinyint(4) DEFAULT '0',`create_time` datetime DEFAULT CURRENT_TIMESTAMP,`update_time` datetime DEFAULT CURRENT_TIMESTAMP);</update><update id="createBindingTable">create table if not exists binding_table(exchange_name varchar(64),queue_name varchar(64),binding_key varchar(64),`delete_flag` tinyint(4) DEFAULT '0',`create_time` datetime DEFAULT CURRENT_TIMESTAMP,`update_time` datetime DEFAULT CURRENT_TIMESTAMP);</update>
实现数据库基本操作
给 mapper.MetaMapper 中添加
// 相关的增删改查操作Integer insertExchange(Exchange exchange);Integer deleteExchangeByName(String name);List<Exchange> selectAllExchanges();Integer insertMSGQueue(MSGQueue msgQueue);Integer deleteMSGQueueByName(String name);List<MSGQueue> selectAllMSGQueues();Integer insertBinding(Binding binding);Integer deleteBinding(String exchangeName, String queueName);List<Binding> selectAllBindings();
相关sql语句实现
<insert id="insertExchange">insert into exchange_table (name, type, durable, auto_delete, arguments)values (#{name}, #{type}, #{durable}, #{autoDelete}, #{arguments});</insert><insert id="insertMSGQueue">insert into msg_queue_table (name, durable, exclusive, auto_delete, arguments)values (#{name}, #{durable}, #{exclusive}, #{autoDelete}, #{arguments});</insert><insert id="insertBinding">insert into binding_table (exchange_name, queue_name, binding_key)values (#{exchangeName}, #{queueName}, #{bindingKey});</insert><update id="deleteExchangeByName">update exchange_tableset delete_flag = 1where name = #{nanme};</update><update id="deleteMSGQueueByName">update msg_queue_tableset delete_flag = 1where name = #{nanme};</update><update id="deleteBinding">update binding_tableset delete_flag = 1where exchange_name = #{exchangeName}and queue_name = #{queueName};</update><select id="selectAllExchanges" resultType="en.edu.zxj.mq.mqserver.core.Exchange">select name,type,durable,auto_delete,arguments,delete_flag,create_time,update_timefrom exchange_tablewhere delete_flag = 0;</select><select id="selectAllMSGQueues" resultType="en.edu.zxj.mq.mqserver.core.MSGQueue">select name,durable,exclusive,auto_delete,arguments,delete_flag,create_time,update_timefrom msg_queue_tablewhere delete_flag = 0;</select><select id="selectAllBindings" resultType="en.edu.zxj.mq.mqserver.core.Binding">select exchange_name,queue_name,binding_key,delete_flag,create_time,update_timefrom binding_tablewhere delete_flag = 0;</select>
实现 DataBaseManager
mqserver.datacenter.DatabaseManager
- 创建 DatabaseManager 类 – 通过这个类来封装对数据库的操作
/*** Created with IntelliJ IDEA.* Description:通过这个类来封装针对数据库的操作.** @author: zxj* @date: 2024-02-26* @time: 21:21:21*/
@Slf4j
public class DatabaseManager {// 由于 DataBaseManager 不是⼀个 Bean// 需要⼿动来获取实例private MetaMapper metaMapper;public void init() {metaMapper = MqApplication.context.getBean(MetaMapper.class);if (!checkDBExits()) {// 数据库不存在// 1. 先创建目录File file = new File("./data/");if (!file.exists()) {file.mkdirs();}// 2. 建表createTables();// 3. 插入默认的数据createDefaultData();log.info("创建数据库成功~");} else {log.info("数据库已经存在!");}}}
如果数据库已经存在了, 就不必建库建表了
针对 MqApplication, 需要新增⼀个 context 属性. 并初始化
@SpringBootApplication
public class MqApplication {public static ConfigurableApplicationContext context = null;public static void main(String[] args) {context = SpringApplication.run(MqApplication.class, args);}
}
- 实现 checkDBExists
private boolean checkDBExits() {File file = new File("./data/meta.db");return file.exists();}
- 实现 createTable
/*** 这个方法用来建表.* 建库操作并不需要手动执行. (不需要手动创建 meta.db 文件)* 首次执行这里的数据库操作的时候, 就会自动的创建出 meta.db 文件来 (MyBatis 帮我们完成的)**/private void createTables() {metaMapper.createMSGQueueTable();metaMapper.createBindingTable();metaMapper.createExchangeTable();log.info("建表成功");}
- 实现 createDefaultData
/*** @description: 给数据库表中, 添加默认的数据* 此处主要是添加一个默认的交换机* RabbitMQ 里有一个这样的设定: 带有一个 匿名 的交换机, 类型是 DIRECT**/
private void createDefaultData() {// 构造一个默认的交换机Exchange exchange = new Exchange();exchange.setName("");exchange.setType(ExchangeType.DIRECT);exchange.setDurable(false);exchange.setAutoDelete(false);metaMapper.insertExchange(exchange);log.info("创建默认的数据成功~");
}
默认数据主要是创建⼀个默认的交换机. 这个默认交换机没有名字, 并且是直接交换机.
- 封装其他的数据库操作
// 封装其他的数据库操作Integer insertExchange(Exchange exchange) {return metaMapper.insertExchange(exchange);}Integer deleteExchangeByName(String name) {return metaMapper.deleteExchangeByName(name);}List<Exchange> selectAllExchanges() {return metaMapper.selectAllExchanges();}Integer insertMSGQueue(MSGQueue msgQueue) {return metaMapper.insertMSGQueue(msgQueue);}Integer deleteMSGQueueByName(String name) {return metaMapper.deleteMSGQueueByName(name);}List<MSGQueue> selectAllMSGQueues() {return metaMapper.selectAllMSGQueues();}Integer insertBinding(Binding binding) {return metaMapper.insertBinding(binding);}Integer deleteBinding(String exchangeName, String queueName) {return metaMapper.deleteBinding(exchangeName, queueName);}List<Binding> selectAllBindings() {return metaMapper.selectAllBindings();}// 清理资源public void deleteDB() {File file = new File("./data/meta.db");if (file.delete()) {log.info("删除数据库文件成功~");} else {log.info("删除数据库文件失败~");}File dataDir = new File("./data/");// 使用 delete 删除目录的时候, 需要保证目录是空的.if (dataDir.delete()) {log.info("删除数据库目录成功~");} else {log.info("删除数据库目录失败~");}}
测试 DataBaseManager
使⽤ Spring ⾃带的单元测试, 针对上述代码进⾏测试验证.
在 test ⽬录中, 创建 DataBaseManagerTests
- 准备⼯作
@SpringBootTest
class DatabaseManagerTest {private final DatabaseManager databaseManager = new DatabaseManager();// 接下来需要编写多个方法, 每个方法都是一个/一组单元测试用例// 还需要做一个准备工作, 需要写两个方法, 分别用于今 "准备工作" 和 "收尾工作"// 使用这个方法, 来执行准备工作, 每个用例执行前, 都要调用这个方法@BeforeEachpublic void setUp() {// 由于在 init 中, 需要通过 context 对象拿到 metaMapper 实例的// 所以就需要先把 context 对象给搞出来, 给搞出来MqApplication.context = SpringApplication.run(MqApplication.class);databaseManager.init();}// 使用这个方法, 来执行收尾工作, 每个用例执行后, 都要调用这个方法@AfterEachpublic void tearDown() {/*这里需要进行操作, 就是把数据库给清空~ (把数据库文件, meta.db 直接删了就行了)注意, 此处不能直接就删除, 而需要先关闭上述 context 对象!此处的 context 对象, 持有了 MetaMapper 的实例, MetaMapper 实例又打开了 meta.db 数据库文件如果 meta.db 被别人打开了, 次数的删除文件操作是不会成功的 (Windows 系统的限制, Linux 没有这个问题)另一方面, 获取 context 操作, 会占用 8080 端口, 此处的 close 也是释放 8080*/MqApplication.context.close();databaseManager.deleteDB();}
}
@SpringBootTest
注解表示该类是一个测试类@BeforeEach
每个测试用例之前执行, 一般用来做准备工作, 此处进行数据库初始化, 以及针对 Spring 服务的初始化@AfterEach
每个测试用例之后执行, 一般用来做收尾工作, 此处需要先关闭 Spring 项目, 再删除数据库
- 编写测试用例
@Test
注解表示一个测试用例Assertions
是断言, 用来判定结果的- 每个用例执行之前, 都会先调用 setUp, 每次用例执行后, 都会调用 tearDown
- 确保每个用例执行的都是 “clean” 的, 也就是每个测试用例不会被上一个测试用例干扰
具体代码
@Testvoid init() {// 由于 init 方法, 已经在上面 setUp 方法中调用了, 直接在测试用例代码中, 检查当前的数据库状态即可// 直接从数据库中查询, 看数据是否符合预期.// 查交换机表, 里面应该有一个数据 (匿名的 exchange); 查消息队列表, 没有数据; 查绑定表, 没有数据List<Exchange> exchanges = databaseManager.selectAllExchanges();List<MSGQueue> msgQueues = databaseManager.selectAllMSGQueues();List<Binding> bindings = databaseManager.selectAllBindings();/*直接打印结果, 通过肉眼来检查结果, 可以但是不优雅, 不方便更好的方法是使用断言System.out.println(exchanges.size());assertEquals 判定结果是不是相等注意 assertEquals 两个参数的顺序, 虽然比较相等, 谁在前, 谁在后, 无所谓但是 assertEquals 的形参, 第一个形参叫做 expected (预期), 第二个形参叫做 actual (实际的)*/Assertions.assertEquals(1, exchanges.size());Assertions.assertEquals("", exchanges.get(0).getName());Assertions.assertEquals(ExchangeType.DIRECT, exchanges.get(0).getType());Assertions.assertEquals(0, msgQueues.size());Assertions.assertEquals(0, bindings.size());}@org.jetbrains.annotations.NotNullprivate Exchange createTestExchange(String exchangeName) {Exchange exchange = new Exchange();exchange.setName(exchangeName);exchange.setType(ExchangeType.FANOUT);exchange.setDurable(true);exchange.setAutoDelete(false);exchange.setArguments("11", "aa");exchange.setArguments("22", "bb");return exchange;}@Testvoid insertExchange() {// 构造一个 Exchange 对象, 插入到数据库中. 再查询出来, 看结果是否符合预期.String exchangeName = "exchangeTest";Exchange exchange = createTestExchange(exchangeName);databaseManager.insertExchange(exchange);// 插入完毕后, 查询结果List<Exchange> exchanges = databaseManager.selectAllExchanges();Exchange newExchange = exchanges.get(1);Assertions.assertEquals(2, exchanges.size());Assertions.assertEquals(ExchangeType.FANOUT, newExchange.getType());Assertions.assertEquals(true, newExchange.getDurable());Assertions.assertEquals(false, newExchange.getAutoDelete());Assertions.assertEquals("aa", newExchange.getArguments("11"));Assertions.assertEquals("bb", newExchange.getArguments("22"));}@Testvoid deleteExchangeByName() {// 先构造一个交换机, 插入数据库; 然后再按照名字删除即可!String exchangeName = "exchangeTest";Exchange exchange = createTestExchange(exchangeName);databaseManager.insertExchange(exchange);// 插入完毕后, 查询结果List<Exchange> exchanges = databaseManager.selectAllExchanges();Exchange newExchange = exchanges.get(1);Assertions.assertEquals(2, exchanges.size());Assertions.assertEquals(ExchangeType.FANOUT, newExchange.getType());// 删除交换机databaseManager.deleteExchangeByName(exchangeName);exchanges = databaseManager.selectAllExchanges();Assertions.assertEquals(1, exchanges.size());Assertions.assertEquals("", exchanges.get(0).getName());}@Testvoid selectAllExchange() {// 构造一个 Exchange 对象, 插入到数据库中. 再查询出来, 看结果是否符合预期.String exchangeName = "exchangeTest";Exchange exchange = createTestExchange(exchangeName);databaseManager.insertExchange(exchange);// 插入完毕后, 查询结果List<Exchange> exchanges = databaseManager.selectAllExchanges();Exchange newExchange = exchanges.get(1);Assertions.assertEquals(2, exchanges.size());Assertions.assertEquals(ExchangeType.FANOUT, newExchange.getType());Assertions.assertEquals(true, newExchange.getDurable());Assertions.assertEquals(false, newExchange.getAutoDelete());Assertions.assertEquals("aa", newExchange.getArguments("11"));Assertions.assertEquals("bb", newExchange.getArguments("22"));}private MSGQueue createTestMSGQueue(String msgQueueName) {MSGQueue msgQueue = new MSGQueue();msgQueue.setName(msgQueueName);msgQueue.setDurable(true);msgQueue.setAutoDelete(false);msgQueue.setExclusive(false);msgQueue.setArguments("a", 1);msgQueue.setArguments("b", 2);return msgQueue;}@Testvoid insertMSGQueue() {// 插入数据String msgQueueName = "testMSGQueueName";MSGQueue msgQueue = createTestMSGQueue(msgQueueName);databaseManager.insertMSGQueue(msgQueue);// 查询数据, 判断插入的数据是否正确List<MSGQueue> msgQueues = databaseManager.selectAllMSGQueues();MSGQueue msgQueueNew = msgQueues.get(0);Assertions.assertEquals(1, msgQueues.size());Assertions.assertEquals(msgQueueName, msgQueueNew.getName());Assertions.assertEquals(true, msgQueueNew.getDurable());Assertions.assertEquals(false, msgQueueNew.getAutoDelete());Assertions.assertEquals(false, msgQueueNew.getExclusive());Assertions.assertEquals(1, msgQueueNew.getArguments("a"));Assertions.assertEquals(2, msgQueueNew.getArguments("b"));}@Testvoid deleteMSGQueueByName() {// 插入数据String msgQueueName = "testMSGQueueName";MSGQueue msgQueue = createTestMSGQueue(msgQueueName);databaseManager.insertMSGQueue(msgQueue);// 查询数据, 判断插入的数据是否正确List<MSGQueue> msgQueues = databaseManager.selectAllMSGQueues();MSGQueue msgQueueNew = msgQueues.get(0);Assertions.assertEquals(1, msgQueues.size());Assertions.assertEquals(msgQueueName, msgQueueNew.getName());// 依据名字删除databaseManager.deleteMSGQueueByName(msgQueueName);msgQueues = databaseManager.selectAllMSGQueues();Assertions.assertEquals(0, msgQueues.size());}@Testvoid selectAllMSGQueue() {// 插入数据String msgQueueName = "testMSGQueueName";MSGQueue msgQueue = createTestMSGQueue(msgQueueName);databaseManager.insertMSGQueue(msgQueue);// 查询数据, 判断插入的数据是否正确List<MSGQueue> msgQueues = databaseManager.selectAllMSGQueues();MSGQueue msgQueueNew = msgQueues.get(0);Assertions.assertEquals(1, msgQueues.size());Assertions.assertEquals(msgQueueName, msgQueueNew.getName());}private @NotNull Binding createTestBinding(String exchangeName, String msgQueueName) {Binding binding = new Binding();binding.setExchangeName(exchangeName);binding.setQueueName(msgQueueName);binding.setBindingKey("Hello word");return binding;}@Testvoid insertBinding() {// 插入 binding 数据String exchangeName = "testBindingExchangeName";String msgQueueName = "testBindingMSGQueueName";Binding binding = createTestBinding(exchangeName, msgQueueName);databaseManager.insertBinding(binding);// 查询List<Binding> bindings = databaseManager.selectAllBindings();Binding bindingNew = bindings.get(0);Assertions.assertEquals(1, bindings.size());Assertions.assertEquals(exchangeName, bindingNew.getExchangeName());Assertions.assertEquals(msgQueueName, bindingNew.getQueueName());Assertions.assertEquals("Hello word", bindingNew.getBindingKey());}@Testvoid deleteBinding() {// 插入 binding 数据String exchangeName = "testBindingExchangeName";String msgQueueName = "testBindingMSGQueueName";Binding binding = createTestBinding(exchangeName, msgQueueName);databaseManager.insertBinding(binding);// 查询List<Binding> bindings = databaseManager.selectAllBindings();Binding bindingNew = bindings.get(0);Assertions.assertEquals(1, bindings.size());// 删除databaseManager.deleteBinding(exchangeName, msgQueueName);bindings = databaseManager.selectAllBindings();Assertions.assertEquals(0, bindings.size());}@Testvoid selectAllBinding() {// 插入 binding 数据String exchangeName = "testBindingExchangeName";String msgQueueName = "testBindingMSGQueueName";Binding binding = createTestBinding(exchangeName, msgQueueName);databaseManager.insertBinding(binding);// 查询List<Binding> bindings = databaseManager.selectAllBindings();Binding bindingNew = bindings.get(0);Assertions.assertEquals(1, bindings.size());}
七、消息存储设计
设计思路
消息需要再硬盘上存储, 但是并不是直接放到数据库中,而是直接使用文件存储。
原因如下:
- 对于消息的操作不需要复杂的 增删改查
- 对于文件的操作效率比数据库会高很多
主流 mq 的实现 (包括 RabbitMQ), 都是把消息存储在文件中, 而不是数据库中
我们给每个队列分配一个目录, 目录的名字为 data + 队列名, 形如 ./data/testQueue, 该目录中包含两个固定名字的文件
queue_data.txt
消息数据文件, 用来保存消息内容queue_stat.txt
消息统计文件, 用来保存消息统计信息
queue_data.txt
文件格式:
使用二进制方式存储.
每个消息分成两个部分:
- 前四个字节, 表示 Message 对象的长度(字节为单位)
- 后面若干个字节, 表示 Message 内容
- 消息和消息之间收尾相连
每个 Message 基于 Java 标准库的 ObjectInputStream / ObjectOutputStream 序列化
Message 对象中的 offsetBeg 和 offsetEnd 正是用来描述每个消息体所在的位置
queue_static.txt
文件格式:
使用文本方式存储
文件中只包含一行, 里面包含两列(都是整数), 使用 \t 分割.
第一列表示当前总的消息数目. 第二列表示有效消息数目.
形如:
2000\t1500
创建 MessageFileManager 类
创建 mqserver.datacenter.MessageFileManager
/*** Created with IntelliJ IDEA.* Description:消息持久化存储* 存储单位是以队列名字为单位存储的** @author: zxj* @date: 2024-02-28* @time: 13:43:23*/
@Slf4j
public class MessageFileManger {private final static String BASIC_DIR = "./data/";/*** @description: 内部类, 用于管理 queue_stat.txt 中的数据* 存储格式: totalCount \t validCount* 作用: 为了后面垃圾回收功能做准备的* 约定: 当 有效信息的占比低于 50% 时, 并且 总的消息数目大于 2000 时, 触发 GC 功能**/static public class Stat {// 总信息的存储数目public Integer totalCount;// 有效的信息数目public Integer validCount;// 最少消息数目private static final Integer atLeastCount = 2000;// 最低有效信息占比private static final Double minProportion = 0.5;}public void init() {// 暂时不需要任何初始化操作, 方便后续扩展}// 设定信息存储的目录和文件/*** @description: 用来获取指定队列信息存储的目录**/@Contract(pure = true)private @NotNull String getQueueDir(String queueName) {return BASIC_DIR + queueName;}/*** @description: 用来获取指定队列信息数据存储路径**/@Contract(pure = true)private @NotNull String getQueueDataPath(String queueName) {return getQueueDir(queueName) + "/queue_data.txt";}/*** @description: 用来获取指定队列信息记录存储路径**/@Contract(pure = true)private @NotNull String getQueueStatPath(String queueName) {return getQueueDir(queueName) + "/queue_stat.txt";}/*** @description: 用来获取指定队列新数据存储路径**/@Contract(pure = true)private @NotNull String getQueueDataNewPath(String queueName) {return getQueueDir(queueName) + "/queue_data_new.txt";}}
- 内部包含一个 Stat 类, 用来标识消息统计文件的内容
- getQueueDir, getQueueDataPath, getQueueStatPath, getQueueDataNewPath 用来表示这几个文件的位置
实现统计⽂件读写
这是后续操作的一些准备工作
/*** @description: 读取 queue_stat.txt 文件里面的内容**/private @NotNull Stat readStat(String queueName) throws IOException {Stat stat = new Stat();try (InputStream inputStream = Files.newInputStream(Paths.get(getQueueStatPath(queueName)))) {// 因为 queue_stat.txt 里面存储的内容是文本的, 所以可以使用 Scanner 来读取Scanner scanner = new Scanner(inputStream);stat.totalCount = scanner.nextInt();stat.validCount = scanner.nextInt();}return stat;}/*** @description: 向 queue_stat.txt 文件里面写入内容**/private void writeStat(String queueName, @NotNull Stat stat) throws IOException {// 使用 PrintWrite 来写文件.// OutputStream 打开文件, 默认情况下, 会直接把原文件清空. 此时相当于新的数据覆盖了旧的.try (OutputStream outputStream = Files.newOutputStream(Paths.get(getQueueStatPath(queueName)))) {PrintWriter printWriter = new PrintWriter(outputStream);printWriter.write(stat.totalCount + "\t" + stat.validCount);printWriter.flush();}}
直接使用 Scanner 和 Printer 写即可
实现创建队列⽬录
每个队列都是自己的目录和配置的文件, 通过下列方法把目录和文件先准备好
/*** @description: 创建相关目录信息, 并进行相关的初始化操作**/public void createQueueFiles(String queueName) throws IOException {// 1. 创建对应的目录File fileDir = new File(getQueueDir(queueName));if (!fileDir.exists()) {// 不存在对应目录if (!fileDir.mkdirs()) {throw new IOException("创建目录失败, fileDir: " + fileDir.getAbsoluteFile());}}// 2. 创建 queue_data.txt 文件File fileData = new File(getQueueDataPath(queueName));if (!fileData.exists()) {// 不存在对应文件if (!fileData.createNewFile()) {throw new IOException("创建目录失败, fileData: " + fileData.getAbsoluteFile());}}// 3. 创建 queue_stat.txt 文件File fileStat = new File(getQueueDataPath(queueName));if (!fileStat.exists()) {// 不存在对应文件if (!fileStat.createNewFile()) {throw new IOException("创建目录失败, fileStat: " + fileStat.getAbsoluteFile());}}// 4. 初始化 queue_stat.txt 文件Stat stat = new Stat();stat.totalCount = 0;stat.validCount = 0;writeStat(queueName, stat);}
把上述约定的文件都创建出来, 并对消息统计文件进行初始化
初始化 0\t0 这样的初始值
实现删除队列⽬录
如果队列需要删除, 则队列对应的⽬录/⽂件也需要删除
/*** @description: 销毁消息的目录文件**/public void destroyQueueFiles(String queueName) throws IOException {// 先删除文件, 在删除目录File fileData = new File(getQueueDataPath(queueName));boolean ok1 = fileData.delete();File fileStat = new File(getQueueStatPath(queueName));boolean ok2 = fileStat.delete();File fileDir = new File(getQueueDir(queueName));boolean ok3 = fileDir.delete();if (!ok1 || !ok2 || !ok3) {// 但凡有一个失败, 就算整体是失败的throw new IOException("删除指定文件和目录失败, dir: " + fileDir.getAbsoluteFile());}}
注意: File 类的 delete ⽅法只能删除空⽬录. 因此需要先把内部的⽂件先删除掉
检查队列⽂件是否存在
/*** @description: 判断 queueName 对应的文件是否存在* 比如后续生产者给 broker server 生产消息了, 这消息可能需要被记录到文件上(取决于该信息是否需要持久化)* @return:**/public boolean checkFilesExists(String queueName) {// 判断队里的数据文件和状态文件是否存在即可File fileData = new File(getQueueDataPath(queueName));File fileStat = new File(getQueueStatPath(queueName));return fileStat.exists() && fileData.exists();}
实现消息对象序列化/反序列化
Message 对象需要转成⼆进制写⼊⽂件. 并且也需要把⽂件中的⼆进制读出来解析成 Message 对象. 此处针对这⾥的逻辑进⾏封装.
创建 common.BinaryUtils
/*** Created with IntelliJ IDEA.* Description:操作二级制数据相关的工具类 -- 提供将 java 对象 反序列化和序列化** @author: zxj* @date: 2024-02-28* @time: 14:33:24*/
public class BinaryUtils {/*** @description: 反序列化, 将字节数组转化为 java 对象**/public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {Object object = null;try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)) {try (ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) {// 这里的 readObject 就是将字节数组 反序列为 java 对象object = objectInputStream.readObject();}}return object;}public static byte[] toBytes(Object object) throws IOException {// 这个流对象相当于一个变长的字节数据// 可以把 Object 序列化的数据逐渐的写入到 byteArrayOutputStream 中, 再统一转成 byte[]try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()){try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)){// 此处的 writeObject 就会把该对象进行序列化, 生成的二进制字节数据, 就会写入到 ObjectOutputStream中// 由于 ObjectOutputStream 又是关联到了 ByteArrayOutputStream 中, 最终结果就写入到 ByteArrayOutputStream 中了objectOutputStream.writeObject(object);}// 这个操作就是把 byteArrayOutputStream 中持有的二进制数据提取出现, 转化为 byte[]return byteArrayOutputStream.toByteArray();}}}
- 使用 ByteArrayInputStream / ByteArrayOutputStream 针对 byte[ ]进行封装, 方便后续操作 (这两个流对象是纯内存的, 不需要进行 close)
- 使用 ObjectInputStream / ObjectOutputStream 进行序列化和反序列化操作, 通过内部的 readObject / writeObject 即可完成对应操作
- 此处涉及到的序列化对象, 需要实现 Serializable 接口
实现写⼊消息⽂件
/*** @description: 增* 将 message 放到 msgQueue 对应的队列文件中* @param: [msgQueue 消息队列, message 需要存储的信息 - 内存中也会管理该对象]**/public void sendMessage(@NotNull MSGQueue msgQueue, Message message) throws MqException, IOException {// 1. 检查当前队列要写入的文件是否存在if (!checkFilesExists(msgQueue.getName())) {throw new MqException("[MessageFileManager] 队列所对应的文件不存在! queueName=" + msgQueue.getName());}// 2. 把 message 转化为 字节数组byte[] messageBinary = BinaryUtils.toBytes(message);// 将 messageBinary 写入到 msgQueue 所对应的队列文件中// 文件属于一个公共资源, 此时进行写操作, 存在线程安全的问题// 需要对对应的队列进行加锁, 确保同时向同一个队列中写入信息是线程安全的synchronized (msgQueue) {// 3. 设置 Message 对象中 offsetBeg 和 offsetEnd 字段// 3.1 获取此时对应文件的总长度, fileQueueData.length() 就可以获取File fileQueueData = new File(getQueueDataPath(msgQueue.getName()));// 3.2 计算// 把新的 message 写入到文件中, offsetBeg = 旧的总长度 + 4, offsetEnd = 旧的总长度 + messageBinary.length + 4message.setOffsetBeg(fileQueueData.length() + 4);message.setOffsetEnd(fileQueueData.length() + messageBinary.length + 4);// 4. 将 messageBinary 写入到文件的默认, 注意: 这里是追加写try (OutputStream outputStream = new FileOutputStream(getQueueDataPath(msgQueue.getName()), true)) {try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {// 4.1 先写入新 messageBinary 的长度 -- 固定占四个字节// 知识点: outputStream.write() 参数看似是 int 类型, 但是实际上只是写入一个字节的数据, dataOutputStream.writeInt() 就是写四个字节的数据dataOutputStream.writeInt(messageBinary.length);// 4.2 写入主体信息dataOutputStream.write(messageBinary);}}// 5. 更新信息统计文件的信息Stat stat = readStat(msgQueue.getName());stat.validCount += 1;stat.totalCount += 1;writeStat(msgQueue.getName(), stat);}}
- 考虑线程安全, 按照队列维度进行加锁
- 使用 DataOutputStream 进行二进制写操作, 比原生 OutputStream 要方便
- 需要记录 Message 对象在文件中的偏移量, 后续的删除操作依赖这个偏移量定位到信息, offsetBeg是原文件大小的基础上 + 4, 4个字节是存放消息大小的空间
- 写完消息, 要同时更新统计消息
创建 common.MqException , 作为⾃定义异常类. 后续业务上出现问题, 都统⼀抛出这个异常.
/*** Created with IntelliJ IDEA.* Description:自定义异常信息** @author: zxj* @date: 2024-02-28* @time: 14:30:32*/
public class MqException extends Exception {public MqException() {}public MqException(String message) {super(message);}
}
实现删除消息
此处的删除只是 “逻辑删除”, 即把 Message 类中的 isValid 字段设置为 0.
这样删除速度⽐较快. 实际的彻底删除, 则通过我们⾃⼰实现的 GC 来解决.
/*** @description: 删除 message* 这里的删除是逻辑删除, 也就是把硬盘上存储的这个数据里面的那个 isValid 属性, 设置为 0x0* 1. 先把这个文件中的这一段数据, 读出现来, 还原回 Message 对象* 2. 把 isValid 该成 0;* 3. 把上述数据重新写回文件* 此处这个参数中的 message 对象, 必须要包含有效的 offsetBeg 和 offsetEnd**/public void deleteMessage(@NotNull MSGQueue msgQueue, @NotNull Message message) throws IOException, ClassNotFoundException {// 修改文件, 存在线程安全问题synchronized (msgQueue) {try (RandomAccessFile randomAccessFile = new RandomAccessFile(getQueueDataPath(msgQueue.getName()), "rw")) {// 1. 先从文件中读取对应的 message 数据byte[] bufferSrc = new byte[(int) (message.getOffsetEnd() - message.getOffsetBeg())];randomAccessFile.seek(message.getOffsetBeg());randomAccessFile.read(bufferSrc);// 2. 将当前读出来的二进制数据, 转换成 Message 对象Message diskMessage = (Message) BinaryUtils.fromBytes(bufferSrc);// 3. 把 isValid 设置为无效diskMessage.setIsValid((byte) 0x0);// 此处不需要宰割参数这个 Message 的 isValid 设为 0, 因为这个参数代表的内容中管理的 Message 对象, 而这个对象也马上要被从内存中销毁了// 4. 重新写入文件byte[] buffDest = BinaryUtils.toBytes(diskMessage);// 虽然上面已经 seek 过了, 但是上面 seek 完了之后, 进行了读操作, 这一读, 就导致, 文件光标往后移动, 移动到下一个信息的位置了,// 因此想要接下来的写入, 能能够刚好写回到之前的位置, 就需要重新调整文件光标randomAccessFile.seek(message.getOffsetBeg());randomAccessFile.write(buffDest);// 通过上述这些操作, 对于文件来说, 只是有一个字节发生了改变了而已~}// 更新统计文件, 把一个消息设置为无效了, 此时有效信息个数就需要 -1Stat stat = readStat(msgQueue.getName());if (stat.validCount > 0) {stat.validCount -= 1;}writeStat(msgQueue.getName(), stat);}}
- 使用 RandomAccessFile 来随机访问到文件的内容
- 根据 Message 中的 offsetBeg 和 offsetEnd 定位到消息在文件中的位置, 通过 randomAccessFile.seek 操作文件指针偏移过去, 在读取
- 读出的结果解析成 Message 对象, 修改 isValid 字段, 再重新写回文件, 注意写的时候要重新设定文件指针的位置, 文件指针会随着上述的读操作产生改变
- 最好, 要记得更新统计文件, 把合法消息 -1
实现消息加载
把消息内容从⽂件加载到内存中. 这个功能在服务器重启, 和垃圾回收的时候都很关键.
/*** @description: 查* 使用这个方法, 从文件中, 读取所有的消息内容, 加载到内存中 (具体来说是放到一个链表里面)* 这个方法,使用一个 LinkedList, 主要目的是为了后续进行头删操作* 这个方法的参数, 只是一个 queueName 而不是 MSGQueue 对象, 因为这个方法不需要加锁,只使用 queueName 就够了* 由于该方法是在程序启动调用, 此时服务还不能处理请求, 不涉及多线程操作文件**/public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {LinkedList<Message> messages = new LinkedList<>();try (InputStream inputStream = Files.newInputStream(Paths.get(getQueueDataPath(queueName)))) {try (DataInputStream dataInputStream = new DataInputStream(inputStream)) {// 这个变量记录当前文件的光标long currentOffset = 0;while (true) {// 1. 先读取当前消息长度字段int messageSize = dataInputStream.readInt();// 2. 按照这个长度, 读取后续的数据长度byte[] buffSrc = new byte[messageSize];int actualSize = dataInputStream.read(buffSrc);if (messageSize != actualSize) {// 如果不匹配, 说明文件有问题, 格式错了throw new MqException("[MessageFileManager] 文件格式错误, queueName=" + queueName);}// 3. 把读到的二进制数据, 反序列化为 Message 对象Message message = (Message) BinaryUtils.fromBytes(buffSrc);// 4. 判定一下, 看看这个消息对象, 是不是无效对象if (message.getIsValid() != 0x1) {// 无效数据, 直接跳过// 虽然消息是无效数据, 但是 offset 仍要更新currentOffset += (4 + messageSize);continue;}// 5. 有效数据, 则需要将这个 Message 对象加入到链表中, 加入之前还需要添加 OffsetBeg 和 OffsetEnd// 进行计算 Offset 的时候, 需要当前文件光标的位置, 由于当下使用的 DataInputStream 并不方便直接获取文件光标位置// 因此需要手动计算下文件光标message.setOffsetBeg(currentOffset + 4);message.setOffsetEnd(currentOffset + 4 + messageSize);currentOffset += (4 + messageSize);messages.add(message);}} catch (EOFException e) {// 这个异常是表示读取到文件的末尾了, 这是 DataInputStream 中规定的// 不需要做什么特殊处理log.info("恢复磁盘文件数据完成");}}return messages;}
- 使用 DataInputStream 读取数据, 先读 4 个字节为消息懂得长度, 然后在按照这个长度来读取实际消息内容
- 读取完毕之后, 转化成 Message 对象
- 同时计算出该对象的 offsetBeg 和 offsetEnd
- 最终把结果整理成链表, 返回出去
- 注意, 对于DataInputStream 来说, 如果读到 EOF, 就会抛出一个 EOFException, 而不是返回特定值, 因此需要注意上述循环的结束条件
实现垃圾回收(GC)
上述删除操作, 只是把消息在文件上标记成了无效, 并没有腾出磁盘空间, 因此需要定期的进行批量删除.
此处使用类似于复制算法, 当总消息数超过 2000, 并且有效消息数目少于 50%的时候, 就触发 GC.
GC 的时候会把所有有效消息加载出来, 写入一个新的消息文件中, 使用新文件, 替代旧文件即可
/*** @description: 检查当前是否需要针对队列的消息数据文件进行 GC**/public boolean checkGC(String queueName) throws IOException {// 判断是否要 GC, 是根据总消息数和有效消息数, 这两个值都是在 消息统计文件中的Stat stat = readStat(queueName);return stat.totalCount > Stat.atLeastCount && (double) stat.validCount / (double) stat.totalCount < Stat.minProportion;}/*** @description: 垃圾回收, 防止存储过多垃圾信息* 通过这个方法, 真正执行消息数据文件的垃圾回收操作* 使用复制算法来完成* 创建一个新的文件, 名字就是 queue_data_new.txt* 把之前消息数据文件中的有效消息都读出来, 写到新的文件中* 删除旧的文件,在把新的文件改名回 queue_data.txt* 同时要更新消息统计文件**/public void gc(MSGQueue msgQueue) throws MqException, IOException, ClassNotFoundException {// 进行 gc 的过程, 是针对消息数据文件进行大洗牌, 这个过程中, 其他线程不能针对该队列的消息文件做任何修改synchronized (msgQueue) {// 由于 gc 操作可能比较耗时, 此处统计一下消耗时间long gcBeg = System.currentTimeMillis();// 1. 创建一个新的文件File queueDataNewFile = new File(getQueueDataNewPath(msgQueue.getName()));if (queueDataNewFile.exists()) {// 正常情况下, 这个文件不应该存在, 如果存在, 就是意外, 说明上次 gc 了一半, 程序意味崩溃了throw new MqException("[MessageFileManager] gc 时发现该队列的 queue_data_new 已经存在, queueName=" + msgQueue.getName());}boolean ok = queueDataNewFile.createNewFile();if (!ok) {throw new MqException("[MessageFileManager] 创建文件失败 ,queueDataNewFile=" + queueDataNewFile.getAbsoluteFile());}// 2. 从旧文件中, 读取出所有的有效消息对象LinkedList<Message> messages = loadAllMessageFromQueue(msgQueue.getName());// 3. 把有效信息写入到新的文件中try (OutputStream outputStream = Files.newOutputStream(Paths.get(getQueueDataNewPath(msgQueue.getName())))) {try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {for (Message message : messages) {byte[] buffer = BinaryUtils.toBytes(message);// 先写长度dataOutputStream.writeInt(buffer.length);// 在写整体数据dataOutputStream.write(buffer);}}}// 4. 删除旧的数据文件, 并把新的文件进行重命名File queueDataOldFile = new File(getQueueDataPath(msgQueue.getName()));ok = queueDataOldFile.delete();if (!ok) {throw new MqException("[MessageFileManager] 删除旧的数据文件失败, queueDataOldFile: " + queueDataOldFile.getAbsoluteFile());}// 把 queue_data_new.txt => queue_data.txtok = queueDataNewFile.renameTo(queueDataOldFile);if (!ok) {throw new MqException("[MessageFileManager] 文件重命名失败, queueDataOldFile: " + queueDataOldFile.getAbsoluteFile() + ", queueDataNewFile: " + queueDataNewFile.getAbsoluteFile());}// 5. 更新统计文件Stat stat = readStat(msgQueue.getName());stat.totalCount = messages.size();stat.validCount = messages.size();long gcEnd = System.currentTimeMillis();log.info("gc 执行消耗时间: {} ms", (gcEnd - gcBeg));}}
如果文件很大, 消息非常多, 可能比较低效, 这种就需要把文件做拆分和合并了
Rabbitmq 是这样实现了, 此处实现简答, 就不做了
测试 MessageFileManager
- 创建两个队列, 用来辅助测试
- 使用
ReflectionTestUtils.invokeMethod
来调用私有方法
package en.edu.zxj.mq.mqserver.datacenter;import en.edu.zxj.mq.common.MqException;
import en.edu.zxj.mq.mqserver.core.MSGQueue;
import en.edu.zxj.mq.mqserver.core.Message;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.util.ReflectionTestUtils;import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;import static org.junit.jupiter.api.Assertions.*;/*** Created with IntelliJ IDEA.* Description:** @author: zxj* @date: 2024-02-28* @time: 18:30:12*/
@SpringBootTest
class MessageFileMangerTest {private MessageFileManger messageFileManger = new MessageFileManger();private static final String queueName1 = "testQueue1";private static final String queueName2 = "testQueue2";/*** @description: 每个方法执行前的 准备工作**/@BeforeEachvoid setUp() throws IOException {// 创建两个队列messageFileManger.createQueueFiles(queueName1);messageFileManger.createQueueFiles(queueName2);}/*** @description: 每个方法执行前的 收尾工作工作**/@AfterEachvoid tearDown() throws IOException {// 删除两个队列文件messageFileManger.destroyQueueFiles(queueName1);messageFileManger.destroyQueueFiles(queueName2);}// @Test// void init() {// // 功能待开发// }@Testvoid createQueueFiles() {// 创建文件已经在上面 setUp 阶段执行过了, 此处主要是验证看看文件是否存在File queueDataFile1 = new File("./data/" + queueName1 + "/queue_data.txt");Assertions.assertEquals(true, queueDataFile1.isFile());File queueStatFile1 = new File("./data/" + queueName1 + "/queue_stat.txt");Assertions.assertEquals(true, queueStatFile1.isFile());File queueDataFile2 = new File("./data/" + queueName2 + "/queue_data.txt");Assertions.assertEquals(true, queueDataFile2.isFile());File queueStatFile2 = new File("./data/" + queueName2 + "/queue_stat.txt");Assertions.assertEquals(true, queueStatFile2.isFile());}@Testvoid destroyQueueFiles() throws IOException {// 删除文件, 看看是否存在, 不存在才对File queueDataFile1 = new File("./data/" + queueName1 + "/queue_data.txt");Assertions.assertEquals(true, queueDataFile1.isFile());File queueStatFile1 = new File("./data/" + queueName1 + "/queue_stat.txt");Assertions.assertEquals(true, queueStatFile1.isFile());// messageFileManger.destroyQueueFiles(queueName1);// Assertions.assertEquals(false,queueDataFile1.isFile());// Assertions.assertEquals(false,queueStatFile1.isFile());File queueDataFile2 = new File("./data/" + queueName2 + "/queue_data.txt");Assertions.assertEquals(true, queueDataFile2.isFile());File queueStatFile2 = new File("./data/" + queueName2 + "/queue_stat.txt");Assertions.assertEquals(true, queueStatFile2.isFile());}@Testvoid checkFilesExists() {// 在 setUp 阶段, 创建了两个队列, 此处只要判断接口返回的是否是 true 即可Assertions.assertEquals(true, messageFileManger.checkFilesExists(queueName1));Assertions.assertEquals(true, messageFileManger.checkFilesExists(queueName2));}private Message createTestMessage(String content) {return Message.createMessageWithId("testRoutingKey", null, content.getBytes());}private MSGQueue createTestMSGQueue(String queueName) {MSGQueue msgQueue = new MSGQueue();msgQueue.setName(queueName);msgQueue.setDurable(true);msgQueue.setAutoDelete(false);msgQueue.setExclusive(false);return msgQueue;}@Testvoid sendMessage() throws IOException, MqException, ClassNotFoundException {// 构造出消息, 并构造出队列Message message = createTestMessage("testSendMessage");// 此处创建的 queue 对象的 name, 不能随便填写, 只能使用 queueName1 和 queueName2,// 需要保证这个队列对象对应的目录和文件都存在才行MSGQueue queue = createTestMSGQueue(queueName1);// 调用发送信息方法messageFileManger.sendMessage(queue, message);// 检查 stat 文件MessageFileManger.Stat stat = ReflectionTestUtils.invokeMethod(messageFileManger, "readStat", queueName1);Assertions.assertEquals(1, stat.validCount);Assertions.assertEquals(1, stat.totalCount);// 检查 data 文件LinkedList<Message> messages = messageFileManger.loadAllMessageFromQueue(queueName1);Message newMessage = messages.get(0);Assertions.assertEquals(true, newMessage.equals(message));System.out.println("message: " + newMessage);}@Testvoid deleteMessage() throws IOException, MqException, ClassNotFoundException {// 此处创建的 queue 对象的 name, 不能随便填写, 只能使用 queueName1 和 queueName2,// 需要保证这个队列对象对应的目录和文件都存在才行MSGQueue queue = createTestMSGQueue(queueName1);// 构造 10 条数据, 在进行删除List<Message> exceptedMessages = new ArrayList<>();for (int i = 0; i < 10; i++) {Message message = createTestMessage("testMessage" + i);messageFileManger.sendMessage(queue,message);exceptedMessages.add(message);}// 删除数据messageFileManger.deleteMessage(queue,exceptedMessages.get(9));messageFileManger.deleteMessage(queue,exceptedMessages.get(8));// 判断数据是否正确LinkedList<Message> actualMessages = messageFileManger.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(8,actualMessages.size());for (int i = 0; i < 8; i++) {Message exceptedMessage = exceptedMessages.get(i);Message actualMessage = actualMessages.get(i);System.out.println(i + ": " + actualMessage);Assertions.assertEquals(exceptedMessage.getMessageId(),actualMessage.getMessageId());Assertions.assertArrayEquals(exceptedMessage.getBody(),actualMessage.getBody());Assertions.assertEquals(0x1,actualMessage.getIsValid());Assertions.assertEquals(exceptedMessage.getRoutingKey(),actualMessage.getRoutingKey());Assertions.assertEquals(exceptedMessage.getDeliverMode(),actualMessage.getDeliverMode());}}@Testvoid loadAllMessageFromQueue() throws IOException, MqException, ClassNotFoundException {// 此处创建的 queue 对象的 name, 不能随便填写, 只能使用 queueName1 和 queueName2,// 需要保证这个队列对象对应的目录和文件都存在才行MSGQueue queue = createTestMSGQueue(queueName1);// 构造 100 条数据List<Message> exceptedMessages = new ArrayList<>();for (int i = 0; i < 100; i++) {Message message = createTestMessage("testMessage" + i);messageFileManger.sendMessage(queue,message);exceptedMessages.add(message);}// 读取所有数据, 看释放和原来的数据相同// 判断数据是否正确LinkedList<Message> actualMessages = messageFileManger.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(100,actualMessages.size());for (int i = 0; i < 100; i++) {Message exceptedMessage = exceptedMessages.get(i);Message actualMessage = actualMessages.get(i);System.out.println(i + ": " + actualMessage);Assertions.assertEquals(exceptedMessage.getMessageId(),actualMessage.getMessageId());Assertions.assertArrayEquals(exceptedMessage.getBody(),actualMessage.getBody());Assertions.assertEquals(0x1,actualMessage.getIsValid());Assertions.assertEquals(exceptedMessage.getRoutingKey(),actualMessage.getRoutingKey());Assertions.assertEquals(exceptedMessage.getDeliverMode(),actualMessage.getDeliverMode());}}@Testvoid gc() throws IOException, MqException, ClassNotFoundException {// 此处创建的 queue 对象的 name, 不能随便填写, 只能使用 queueName1 和 queueName2,// 需要保证这个队列对象对应的目录和文件都存在才行MSGQueue queue = createTestMSGQueue(queueName1);// 构造 100 条数据List<Message> exceptedMessages = new ArrayList<>();for (int i = 0; i < 100; i++) {Message message = createTestMessage("testMessage" + i);messageFileManger.sendMessage(queue,message);exceptedMessages.add(message);}// 保存 gc 前文件的大小File file = new File("./data/" + queueName1 + "/queue_data.txt");long beforeGC = file.length();// 删除偶数下标的数据for (int i = 0; i < 100; i+=2) {messageFileManger.deleteMessage(queue,exceptedMessages.get(i));}// 手动调用 gcmessageFileManger.gc(queue);// 读取所有数据LinkedList<Message> actualMessages = messageFileManger.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(50,actualMessages.size());for (int i = 0; i < 50; i++) {Message exceptedMessage = exceptedMessages.get(i * 2 + 1);Message actualMessage = actualMessages.get(i);System.out.println(i + ": " + actualMessage);Assertions.assertEquals(exceptedMessage.getMessageId(),actualMessage.getMessageId());Assertions.assertArrayEquals(exceptedMessage.getBody(),actualMessage.getBody());Assertions.assertEquals(0x1,actualMessage.getIsValid());Assertions.assertEquals(exceptedMessage.getRoutingKey(),actualMessage.getRoutingKey());Assertions.assertEquals(exceptedMessage.getDeliverMode(),actualMessage.getDeliverMode());}File file1 = new File("./data/" + queueName1 + "/queue_data.txt");long afterGC = file1.length();Assertions.assertEquals(true,afterGC < beforeGC);}
}
⼋、 整合数据库和⽂件
上述代码中, 使⽤数据库存储了 Exchange, Queue, Binding, 使⽤⽂本⽂件存储了 Message.
接下来我们把两个部分整合起来, 统⼀进⾏磁盘管理.
创建 DiskDataCenter
使⽤ DiskDataCenter 来综合管理数据库和⽂本⽂件的内容.
DiskDataCenter 会持有 DataBaseManager 和 MessageFileManager 对象.
/*** Created with IntelliJ IDEA.* Description:封装访问磁盘数据操作: 数据库 + 文件* 1. 数据库: 交换机, 消息队列, 绑定* 2. 文件: 消息* 上层逻辑如果需要访问磁盘, 就直接调用这个类, 不需要知道下层访问的是数据库还是文件** @author: zxj* @date: 2024-02-28* @time: 21:28:00*/
public class DiskDataCenter {private DatabaseManager databaseManager = new DatabaseManager();private MessageFileManger messageFileManger = new MessageFileManger();public void init() {databaseManager.init();messageFileManger.init();}
}
封装 Exchange ⽅法
/** 封装交换机操作*/public Integer insertExchange(Exchange exchange) {return databaseManager.insertExchange(exchange);}public Integer deleteExchangeByName(String name) {return databaseManager.deleteExchangeByName(name);}public List<Exchange> selectAllExchanges() {return databaseManager.selectAllExchanges();}
封装 Queue ⽅法
/** 封装消息队列操作*/public Integer insertMSGQueue(MSGQueue msgQueue) throws IOException {Integer ret = databaseManager.insertMSGQueue(msgQueue);messageFileManger.createQueueFiles(msgQueue.getName());return ret;}public Integer deleteMSGQueueByName(String name) throws IOException {Integer ret = databaseManager.deleteMSGQueueByName(name);messageFileManger.destroyQueueFiles(name);return ret;}public List<MSGQueue> selectAllMSGQueues() {return databaseManager.selectAllMSGQueues();}
封装 Binding 方法
/** 封装绑定机操作*/public Integer insertBinding(Binding binding) {return databaseManager.insertBinding(binding);}public Integer deleteBinding(String exchangeName, String queueName) {return databaseManager.deleteBinding(exchangeName, queueName);}public List<Binding> selectAllBindings() {return databaseManager.selectAllBindings();}
封装 Message ⽅法
/** 封装消息操作*/public void createQueueFiles(String queueName) throws IOException {messageFileManger.createQueueFiles(queueName);}public void destroyQueueFiles(String queueName) throws IOException {messageFileManger.destroyQueueFiles(queueName);}public boolean checkFilesExists(String queueName) {return messageFileManger.checkFilesExists(queueName);}public void sendMessage(@NotNull MSGQueue msgQueue, Message message) throws MqException, IOException {messageFileManger.sendMessage(msgQueue, message);}public void deleteMessage(@NotNull MSGQueue msgQueue, @NotNull Message message) throws IOException, ClassNotFoundException, MqException {messageFileManger.deleteMessage(msgQueue, message);// 删除一条信息之后, 判断是否需要 gcif (messageFileManger.checkGC(msgQueue.getName())) {messageFileManger.gc(msgQueue);}}public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {return messageFileManger.loadAllMessageFromQueue(queueName);}
- 在 deleteMessage 的时候判定是否进⾏ GC.
⼩结
通过上述封装, 把数据库和硬盘⽂件两部分合并成⼀个整体. 上层代码在调⽤的时候则不再关⼼该数据是存储在哪个部分的.
九、 内存数据结构设计
硬盘上存储数据, 只是为了实现 “持久化” 这样的效果. 但是实际的消息存储/转发, 还是主要靠内存的结构.
对于 MQ 来说, 内存部分是更关键的, 内存速度更快, 可以达成更⾼的并发.
创建 MemoryDataCenter
创建 mqserver.datacenter.MemoryDataCenter
/*** Created with IntelliJ IDEA.* Description:内存数据管理类 -- 实际消息转发/存储的类* 该类后续提供的一些方法, 可能会在多线程环境下使用, 因此需要注意线程安全的问题** @author: zxj* @date: 2024-02-29* @time: 20:58:38*/
@Slf4j
public class MemoryDataCenter {// key: 为 exchangeName, value: Exchange 对象private final ConcurrentHashMap<String, Exchange> exchangesMap = new ConcurrentHashMap<>();// key: 为 messageId, value: Message 对象private final ConcurrentHashMap<String, Message> messagesMap = new ConcurrentHashMap<>();// key: 为 exchangeName, value: Exchange 对象private final ConcurrentHashMap<String, MSGQueue> msgQueuesMap = new ConcurrentHashMap<>();// key1: exchangeName, key2: msgQueueName, value: Binding 对象private final ConcurrentHashMap<String, ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>();}
- 使用四个哈希表, 管理 Exchange, Queue, Binding, Message
- 使用一个哈希表 + 链表管理队列 -> 消息之间的关系
- 使用一个哈希表 + 哈希表管理所有的未被确认的消息
为了保证消息被正确消费了, 会使用两种方式进行确认, 自动 Ack, 和 手动 Ack
其中自动 Ack 是指当消息被消费之后, 就会立即被销毁释放
其中手动 Ack 是指当消息被消费之后, 由消费者主动调用一个 basicAck方法, 进行主动确认, 服务器收到这个确认之后, 才能真正被销毁消息
此处的 “未确认消息” 就是指在手动Ack模式下, 该消息还没有被调用 basicAck, 此时消息不能被删除, 但是要和其他未消费的消息区分开, 于是另搞个结构
当后续basicAck到了, 就可以删除消息了
封装 Exchange ⽅法
/*** 封装 Exchange 操作**/public void insertExchange(Exchange exchange) {exchangesMap.put(exchange.getName(), exchange);log.info("新交换机添加成功! exchangeName: {}", exchange.getName());}public Exchange getExchange(String exchangeName) {return exchangesMap.get(exchangeName);}public void deleteExchange(String exchangeName) {exchangesMap.remove(exchangeName);log.info("删除交换机成功! exchangeName: {}", exchangeName);}
封装 Queue ⽅法
/*** 封装 MSGQueue 操作**/public void insertMSGQueue(MSGQueue msgQueue) {msgQueuesMap.put(msgQueue.getName(), msgQueue);log.info("新交换机添加成功! msgQueueName: {}", msgQueue.getName());}public MSGQueue getMSGQueue(String msgQueueName) {return msgQueuesMap.get(msgQueueName);}public void deleteMSGQueue(String msgQueueName) {msgQueuesMap.remove(msgQueueName);log.info("删除交换机成功! msgQueueName: {}", msgQueueName);}
封装 Binding ⽅法
/*** 封装 Binding 操作**/public void insetBinding(Binding binding) throws MqException {// 传统的创建 Map 的步骤, 因为不是原子性操作, 存在线程安全的问题, 为了保证线程安全, 可以加上 synchronized 加锁// ConcurrentHashMap<String, Binding> stringBindingConcurrentHashMap = bindingsMap.get(binding.getExchangeName()) ;// if (stringBindingConcurrentHashMap == null) {// stringBindingConcurrentHashMap = new ConcurrentHashMap<>();// bindingsMap.put(binding.getExchangeName(),stringBindingConcurrentHashMap);// }// ConcurrentHashMap 中有提供了 computeIfAbsent 方法, 简化了上述步骤, 并且是线程安全的 --// 先使用 exchangeName 查询一下, 如果存在就直接返回, 如果不存在就创建ConcurrentHashMap<String, Binding> stringBindingConcurrentHashMap = bindingsMap.computeIfAbsent(binding.getExchangeName(), (k) -> {return new ConcurrentHashMap<>();});synchronized (stringBindingConcurrentHashMap) {// 这里先查询在插入, 具有强的顺序关系, 数据存在二次覆盖, 存在线程安全的问题// 在根据 msgQueueName 查找, 如果存在, 就直接抛异常, 不存在才能插入if (stringBindingConcurrentHashMap.get(binding.getQueueName()) != null) {throw new MqException("[MemoryDataCenter] 绑定已经存在, exchangeName: " + binding.getExchangeName() +"; msgQueueName: " + binding.getQueueName());}stringBindingConcurrentHashMap.put(binding.getQueueName(), binding);}log.info("绑定添加成功成功, binding.exchangeName: {}, binding.queueName: {},", binding.getExchangeName(), binding.getQueueName());}// 获取绑定// 1. 依据 exchangeName 和 queueName 获取唯一的 bindingpublic Binding getBinding(String exchangeName, String queueName) {ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(exchangeName);if (bindingMap == null) {return null;}return bindingMap.get(queueName);}// 2. 依据 exchangeName 获取所有的 bindingpublic ConcurrentHashMap<String, Binding> getBindings(String exchangeName) {return bindingsMap.get(exchangeName);}public void deleteBinding(Binding binding) throws MqException {ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getExchangeName());if (bindingMap == null) {// 该交换机没有绑定任何队列, 报错throw new MqException("[MemoryDataCenter] 绑定不存在! binding: " + binding);}bindingsMap.remove(binding.getExchangeName());log.info("删除绑定成功! binding: {}", binding);}
封装 Message ⽅法
/*** 封装信息操作**/// 添加信息public void addMessage(Message message) {messagesMap.put(message.getMessageId(), message);log.info("添加信息成功! messageId: {}", message.getMessageId());}// 依据 Id 查询信息public Message getMessage(String messageId) {return messagesMap.get(messageId);}// 依据 Id 删除信息public void deleteMessage(String messageId) {messagesMap.remove(messageId);log.info("消息被删除! messageId: {}", messageId);}// 发送消息到指定队列public void sendMessage(MSGQueue queue, Message message) {// 把消息放到对应的队列数据结构中// 先根据队列的名字, 找到该队列对应的消息链表LinkedList<Message> messages = queueMessageMap.computeIfAbsent(queue.getName(), (k) -> {return new LinkedList<>();});// 把数据假如到 messages 里面synchronized (messages) {messages.add(message);}// 在这里把该消息也往消息中心中插入一下, 假设如果 message 已经在消息中心存在, 重复插入也没有关系// 主要就是相同 messageId, 对应的 message 的内容一定是一样的. (服务器不会对 Message 内容修改 basicProperties 和 body)addMessage(message);log.info("消息被投递到队列中! messageId = " + message.getMessageId());}// 从队列中取消息public Message pollMessage(String queueName) {// 根据队列名, 查找一下, 对应的队列的消息链表LinkedList<Message> messages = queueMessageMap.get(queueName);// 为空if (messages == null) {return null;}synchronized (messages) {// 队列中没有任何消息if (messages.isEmpty()) {return null;}// 链表中有元素, 就进行头删Message currentMessage = messages.remove(0);log.info("从消息从队列中取出! messageId: {}", currentMessage.getMessageId());return currentMessage;}}// 获取指定队列中消息的个数public int getMessageCount(String queueName) {// 根据队列名, 查找一下, 对应的队列的消息链表LinkedList<Message> messages = queueMessageMap.get(queueName);if (messages == null) {return 0;}synchronized (messages) {return messages.size();}}
针对未确认的消息的处理
// 添加未确认的消息public void addMessageWaitAck(String queueName, Message message) {ConcurrentHashMap<String, Message> messageHashMap = queueMessageWaitAck.computeIfAbsent(queueName, (k) -> {return new ConcurrentHashMap<>();});messageHashMap.put(message.getMessageId(), message);log.info("消息进入待确认队列! messageId: {}", message.getMessageId());}// 删除未确认的消息(消息已经确认了)public void removeMessageWaitAck(String queueName, String messageId) {ConcurrentHashMap<String, Message> messageHashMap = queueMessageWaitAck.computeIfAbsent(queueName, (k) -> {return new ConcurrentHashMap<>();});messageHashMap.remove(messageId);log.info("消息从待确认队列中删除! messageId: {}", messageId);}// 获取指定的未确认的信息public Message getMessageWaitAck(String queueName, String messageId) {ConcurrentHashMap<String, Message> messageConcurrentHashMap = queueMessageWaitAck.get(queueName);if (messageConcurrentHashMap == null) {return null;}return messageConcurrentHashMap.get(messageId);}
实现重启后恢复内存
// 这个方法就是从硬盘上读取数据, 把硬盘中之前持久化存储的各个维度的数据都恢复到内存中 -- 交换机, 消息队列, 绑定, 消息public void recovery(DiskDataCenter diskDataCenter) throws MqException, IOException, ClassNotFoundException {// 0. 清空之前的所有数据exchangesMap.clear();msgQueuesMap.clear();bindingsMap.clear();messagesMap.clear();queueMessageMap.clear();// 1. 恢复所有的交换机数据List<Exchange> exchanges = diskDataCenter.selectAllExchanges();for (Exchange exchange : exchanges) {exchangesMap.put(exchange.getName(), exchange);}log.info("恢复所有的 交换机 数据成功!");// 2. 恢复所有的队列数据List<MSGQueue> msgQueues = diskDataCenter.selectAllMSGQueues();for (MSGQueue msgQueue : msgQueues) {msgQueuesMap.put(msgQueue.getName(), msgQueue);}log.info("恢复所有的 队列 数据成功!");// 3. 恢复所有的绑定数据List<Binding> bindings = diskDataCenter.selectAllBindings();for (Binding binding : bindings) {ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(), (k) -> {return new ConcurrentHashMap<>();});bindingMap.put(binding.getQueueName(), binding);}log.info("恢复所有的 绑定 数据成功!");// 4. 恢复所有的消息队列// 遍历所有的队列, 根据每个队列的名字, 获取到所有的消息for (MSGQueue msgQueue : msgQueues) {LinkedList<Message> messages = diskDataCenter.loadAllMessageFromQueue(msgQueue.getName());queueMessageMap.put(msgQueue.getName(), messages);for (Message message : messages) {messagesMap.put(message.getMessageId(), message);}}log.info("恢复所有的 消息队列 成功!");log.info("从磁盘中恢复所有数据到内存成功");// 规定:// 针对 "未确认的消息" 这部分内存中的数据, 不需要从硬盘恢复, 之前考虑硬盘存储的时候, 也没有设定这一块// 这个消息在硬盘上存储的时候, 就是当做 "为被取走"}
测试 MemoryDataCenter
package en.edu.zxj.mq.mqserver.datacenter;import en.edu.zxj.mq.MqApplication;
import en.edu.zxj.mq.common.MqException;
import en.edu.zxj.mq.mqserver.core.*;
import org.apache.tomcat.util.http.fileupload.FileUtils;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.test.context.SpringBootTest;import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;/*** Created with IntelliJ IDEA.* Description:** @author: zxj* @date: 2024-02-29* @time: 23:07:51*/
@SpringBootTest
class MemoryDataCenterTest {private MemoryDataCenter memoryDataCenter;@BeforeEachpublic void setUp() {memoryDataCenter = new MemoryDataCenter();}@AfterEachpublic void tearDown() {memoryDataCenter = null;}/*** 创建测试 消息 对象**/private @NotNull Message createTestMessage(@NotNull String content) {return Message.createMessageWithId("testRoutingKey", null, content.getBytes());}/*** 创建 消息队列 对象**/private @NotNull MSGQueue createTestMSGQueue(String queueName) {MSGQueue msgQueue = new MSGQueue();msgQueue.setName(queueName);msgQueue.setDurable(true);msgQueue.setAutoDelete(false);msgQueue.setExclusive(false);return msgQueue;}/*** 创建 绑定 对象**/private @NotNull Binding createTestBinding(String exchangeName, String msgQueueName) {Binding binding = new Binding();binding.setExchangeName(exchangeName);binding.setQueueName(msgQueueName);binding.setBindingKey("Hello word");return binding;}/*** 创建交换机对象**/@org.jetbrains.annotations.NotNullprivate Exchange createTestExchange(String exchangeName) {Exchange exchange = new Exchange();exchange.setName(exchangeName);exchange.setType(ExchangeType.FANOUT);exchange.setDurable(true);exchange.setAutoDelete(false);exchange.setArguments("11", "aa");exchange.setArguments("22", "bb");return exchange;}@Testvoid testExchange() {// 对 交换机 相关操作统一测试Exchange exceptedExchange = createTestExchange("exchangeTest");// 1. 插入操作memoryDataCenter.insertExchange(exceptedExchange);// 2. 查找操作Exchange actualExchange = memoryDataCenter.getExchange(exceptedExchange.getName());// 此时 exceptedExchange 和 actualExchange 应该指向同一个空间才对Assertions.assertEquals(exceptedExchange, actualExchange);// 3. 删除操作memoryDataCenter.deleteExchange(exceptedExchange.getName());actualExchange = memoryDataCenter.getExchange(exceptedExchange.getName());// 判断是否为空Assertions.assertNull(actualExchange);}@Testvoid testMSGQueue() {// 对 消息队列 相关操作统一测试MSGQueue exceptedMSGQueue = createTestMSGQueue("testMSGQueue");// 1. 插入操作memoryDataCenter.insertMSGQueue(exceptedMSGQueue);// 2. 查找操作MSGQueue actualMSGQueue = memoryDataCenter.getMSGQueue(exceptedMSGQueue.getName());Assertions.assertEquals(exceptedMSGQueue, actualMSGQueue);// 3. 删除操作memoryDataCenter.deleteMSGQueue(exceptedMSGQueue.getName());actualMSGQueue = memoryDataCenter.getMSGQueue(exceptedMSGQueue.getName());// 判断是否为空Assertions.assertNull(actualMSGQueue);}@Testvoid testBinding() throws MqException {// 对 绑定 相关操作的统一测试Binding exceptedBinding = createTestBinding("testExchangeName", "testMSGQueueName");// 1. 插入操作memoryDataCenter.insetBinding(exceptedBinding);// 2. 查找操作Binding actrualBinding = memoryDataCenter.getBinding(exceptedBinding.getExchangeName(), exceptedBinding.getQueueName());Assertions.assertEquals(exceptedBinding, actrualBinding);ConcurrentHashMap<String, Binding> bindings = memoryDataCenter.getBindings(exceptedBinding.getExchangeName());Assertions.assertEquals(1, bindings.size());Assertions.assertEquals(exceptedBinding, bindings.get(exceptedBinding.getQueueName()));// 3. 删除操作memoryDataCenter.deleteBinding(exceptedBinding);actrualBinding = memoryDataCenter.getBinding(exceptedBinding.getExchangeName(), exceptedBinding.getQueueName());bindings = memoryDataCenter.getBindings(exceptedBinding.getExchangeName());// 判断是否为空Assertions.assertNull(actrualBinding);Assertions.assertNull(bindings);}@Testvoid testMessage() {// 测试 消息相关的增删查 操作Message exceptedMessage = createTestMessage("testMessage");// 1. 插入操作memoryDataCenter.addMessage(exceptedMessage);// 2. 查找操作Message actualMessage = memoryDataCenter.getMessage(exceptedMessage.getMessageId());Assertions.assertEquals(exceptedMessage, actualMessage);// 3. 删除操作memoryDataCenter.deleteMessage(exceptedMessage.getMessageId());actualMessage = memoryDataCenter.getMessage(exceptedMessage.getMessageId());Assertions.assertNull(actualMessage);}@Testvoid sendMessage() {// 1. 创建一个队列, 创建十条消息, 把这些消息都插入到队列中MSGQueue queue = createTestMSGQueue("testQueue");List<Message> exceptedMessages = new ArrayList<>();for (int i = 0; i < 10; i++) {Message message = createTestMessage("testMessage" + i);memoryDataCenter.sendMessage(queue, message);exceptedMessages.add(message);}// 2. 从这个队列中取出这些消息List<Message> actualMessages = new ArrayList<>();while (true) {Message message = memoryDataCenter.pollMessage(queue.getName());if (message == null) {break;}actualMessages.add(message);}// 3. 比较取出的消息和之前的消息是否是一致的Assertions.assertEquals(exceptedMessages.size(), actualMessages.size());for (int i = 0; i < actualMessages.size(); i++) {Assertions.assertEquals(exceptedMessages.get(i), actualMessages.get(i));}}@Testvoid testMessageWaitAck() {// 测试 消息相关的增删查 操作Message exceptedMessage = createTestMessage("testMessage");// 1. 插入操作memoryDataCenter.addMessageWaitAck("testQueueName", exceptedMessage);// 2. 查找操作Message actualMessage = memoryDataCenter.getMessageWaitAck("testQueueName", exceptedMessage.getMessageId());Assertions.assertEquals(exceptedMessage, actualMessage);// 3. 删除操作memoryDataCenter.removeMessageWaitAck("testQueueName", exceptedMessage.getMessageId());actualMessage = memoryDataCenter.getMessageWaitAck("testQueueName", exceptedMessage.getMessageId());Assertions.assertNull(actualMessage);}@Testvoid recovery() throws IOException, MqException, ClassNotFoundException {MqApplication.context = SpringApplication.run(MqApplication.class);// 1. 在硬盘上构造好数据DiskDataCenter diskDataCenter = new DiskDataCenter();diskDataCenter.init();// 构造交换机Exchange exceptedExchange = createTestExchange("testExchangeName");diskDataCenter.insertExchange(exceptedExchange);// 构造消息队列MSGQueue exceptedMSGQueue = createTestMSGQueue("testQueueName");diskDataCenter.insertMSGQueue(exceptedMSGQueue);// 构造绑定Binding exceptedBinding = createTestBinding("testExchangeName", "testMSGQueueName");diskDataCenter.insertBinding(exceptedBinding);// 构造消息Message exceptedMessage = createTestMessage("testContent");diskDataCenter.sendMessage(exceptedMSGQueue, exceptedMessage);// 2. 执行恢复操作memoryDataCenter.recovery(diskDataCenter);// 3. 对比结果Exchange actualExchangeName = memoryDataCenter.getExchange("testExchangeName");Assertions.assertEquals(exceptedExchange.getName(), actualExchangeName.getName());Assertions.assertEquals(exceptedExchange.getType(), actualExchangeName.getType());Assertions.assertEquals(exceptedExchange.getDurable(), actualExchangeName.getDurable());Assertions.assertEquals(exceptedExchange.getAutoDelete(), actualExchangeName.getAutoDelete());MSGQueue actualMSGQueue = memoryDataCenter.getMSGQueue("testQueueName");Assertions.assertEquals(exceptedMSGQueue.getName(), actualMSGQueue.getName());Assertions.assertEquals(exceptedMSGQueue.getExclusive(), actualMSGQueue.getExclusive());Assertions.assertEquals(exceptedMSGQueue.getAutoDelete(), actualMSGQueue.getAutoDelete());Assertions.assertEquals(exceptedMSGQueue.getDurable(), actualMSGQueue.getDurable());Binding actualBinding = memoryDataCenter.getBinding("testExchangeName", "testMSGQueueName");Assertions.assertEquals(exceptedBinding.getExchangeName(), actualBinding.getExchangeName());Assertions.assertEquals(exceptedBinding.getBindingKey(), actualBinding.getBindingKey());Assertions.assertEquals(exceptedBinding.getQueueName(), actualBinding.getQueueName());Message actualMessage = memoryDataCenter.pollMessage("testQueueName");Assertions.assertEquals(exceptedMessage.getMessageId(), actualMessage.getMessageId());Assertions.assertEquals(exceptedMessage.getRoutingKey(), actualMessage.getRoutingKey());Assertions.assertEquals(exceptedMessage.getIsValid(), actualMessage.getIsValid());Assertions.assertEquals(exceptedMessage.getDeliverMode(), actualMessage.getDeliverMode());Assertions.assertArrayEquals(exceptedMessage.getBody(), actualMessage.getBody());// 4. 清理磁盘MqApplication.context.close();File dataDir = new File("./data");FileUtils.deleteDirectory(dataDir);}
}
⼗、 虚拟主机设计
⾄此, 内存和硬盘的数据都已经组织完成. 接下来使⽤ “虚拟主机” 这个概念, 把这两部分的数据也串起来
并且实现⼀些 MQ 的关键 API.
创建 VirtualHost
创建 mqserver.VirtualHost
/*** Created with IntelliJ IDEA.* Description:通过这个类, 来标识虚拟主机* 每个虚拟主机下面都管理着自己的 交换机, 队列, 绑定, 消息, 数据* 同时提供 API 供上层调用* 针对 VirtualHost 这个类, 作为业务逻辑的整合者, 就需要对于代码中抛出的异常进行处理了** @author: zxj* @date: 2024-03-01* @time: 19:23:34*/
@Getter
@Slf4j
public class VirtualHost {private final String virtualHostName;private final DiskDataCenter diskDataCenter = new DiskDataCenter();private final MemoryDataCenter memoryDataCenter = new MemoryDataCenter();// Router 定义转发规则private final Router router = new Router();// ConsumerManager 实现消费逻辑private final ConsumerManager consumerManager = new ConsumerManager(this);
}
- 其中 Router ⽤来定义转发规则, ConsumerManager ⽤来实现消息消费. 后续介绍
实现构造⽅法和 getter
构造⽅法中会针对 DiskDataCenter 和 MemoryDataCenter 进⾏初始化.
同时会把硬盘的数据恢复到内存中.
public VirtualHost(String virtualHostName) {this.virtualHostName = virtualHostName;// 先初始化硬盘操作diskDataCenter.init();// 后初始化内存操作memoryDataCenter.init();// 从磁盘中恢复数据到内存中try {memoryDataCenter.recovery(diskDataCenter);} catch (Exception e) {log.error("从磁盘中恢复数据到内存失败!");e.printStackTrace();return;}log.info("初始化 VirtualHost 成功, VirtualHostName: {}", virtualHostName);}
创建交换机
- 约定, 交换机 / 队列的名字, 都加上 VirtualHostName 作为前置, 这样不同 VirtualHost 中就可以存在同名的交换机或者队列了
- exchangeDeclare 的语义是, 不存在就创建, 存在则直接返回, 因此不叫做"exchangeCreate"
- 先写硬盘, 后写内存, 因为硬盘失败概率更大, 如果硬盘写失败了,也就不必写内存了
/*** @description: 创建交换机, declare 表示存在就不创建, 因此不叫做 "exchangeCreate"* 此处的 autoDelete, arguments 其实并没有使用, 只是先预留出来. (RabbitMQ 是支持的)* 约定, 交换机/队列的名字, 都加上 VirtualHostName 作为前缀, 这样不同 VirtualHost 中就可以存在同名的交换机或者队列了* 先写磁盘, 后写内存, 因为写磁盘失败概率更大, 如果磁盘写失败了, 也就不必要写内存了* @param: [exchangeName, exchangeType, durable, autoDelete, arguments]* @return: 抛异常就返回 false, 正常执行逻辑就返回 true**/public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete, Map<String, Object> arguments) {// 真实的 exchangeName 要加上 virtualHostName 前缀exchangeName = virtualHostName + exchangeName;synchronized (lockerExchange) {try {// 1. 判断交换机是否存在Exchange exsitsExchange = memoryDataCenter.getExchange(exchangeName);if (exsitsExchange != null) {log.info("交换机已经存在, exchangeName: {}, exchangeType: {}", exchangeName, exchangeType);return true;}// 2. 构造 Exchange 对象Exchange exchange = new Exchange();exchange.setName(exchangeName);exchange.setType(exchangeType);exchange.setDurable(durable);exchange.setAutoDelete(autoDelete);exchange.setArguments(arguments);// 3. 先写入磁盘if (exchange.getDurable()) {diskDataCenter.insertExchange(exchange);}// 4. 后写入内存memoryDataCenter.insertExchange(exchange);log.info("交换机创建成功, exchangeName: {}, exchangeType: {}", exchangeName, exchangeType);return true;} catch (Exception e) {log.warn("创建交换机失败, exchangeName: {}, exchangeType: {}", exchangeName, exchangeType);e.printStackTrace();return false;}}}
删除交换机
/*** @description: 删除交换机**/public boolean exchangeDelete(String exchangeName) {// 真正存储的 exchangeNameexchangeName = virtualHostName + exchangeName;synchronized (lockerExchange) {try {// 1. 判断交换机是否存在Exchange exsitsExchange = memoryDataCenter.getExchange(exchangeName);if (exsitsExchange == null) {throw new MqException("交换机不存在, 无法删除! exchangeName; " + exchangeName);}// 2. 删除磁盘中的交换机diskDataCenter.deleteExchangeByName(exchangeName);// 3. 删除内存中的交换机memoryDataCenter.deleteExchange(exchangeName);log.info("删除交换机成功, exchangeName: {},", exchangeName);return true;} catch (Exception e) {log.warn("删除交换机失败, exchangeName: {},", exchangeName);e.printStackTrace();return false;}}}
创建队列
/*** @description: 创建队列**/public boolean queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) {// 真实的 queueNamequeueName = virtualHostName + queueName;synchronized (lockerQueue) {try {// 1. 判断队列是否存在MSGQueue existsQueue = memoryDataCenter.getMSGQueue(queueName);if (existsQueue != null) {log.info("队列已经存在, queueName: {}", queueName);return true;}// 2. 创建队列MSGQueue queue = new MSGQueue();queue.setName(queueName);queue.setDurable(durable);queue.setExclusive(exclusive);queue.setAutoDelete(autoDelete);queue.setArguments(arguments);// 3. 先存入磁盘if (queue.getDurable()) {diskDataCenter.insertMSGQueue(queue);}// 4. 后存入到内存memoryDataCenter.insertMSGQueue(queue);log.info("队列创建成功, queueName: {}", queueName);return true;} catch (Exception e) {log.warn("创建队列失败, queueName: {}", queueName);e.printStackTrace();return false;}}}
删除队列
/*** @description: 删除队列**/public boolean queueDelete(String queueName) {queueName = virtualHostName + queueName;synchronized (lockerQueue) {try {// 1. 判断队列是否存在MSGQueue existsQueue = memoryDataCenter.getMSGQueue(queueName);if (existsQueue == null) {throw new MqException("要删除的队列不存在, 无法删除! queueName=" + queueName);}// 2. 删除磁盘中的队列if (existsQueue.getDurable()) {diskDataCenter.deleteMSGQueueByName(queueName);}// 3. 删除内存中的队列memoryDataCenter.deleteMSGQueue(queueName);log.info("删除队列成功, queueName: {}", queueName);return true;} catch (Exception e) {log.warn("删除队列失败, queueName: {}", queueName);e.printStackTrace();return false;}}}
创建绑定
- bindingKey 是进⾏ topic 转发时的⼀个关键概念. 使⽤ router 类来检测是否是合法的 bindingKey.
- 后续再介绍
router.checkBindingKeyValid
的实现. 此处先留空
/*** @description: 添加绑定**/public boolean queueBind(String queueName, String exchangeName, String bindingKey) {// 加上 virtualHostName 前缀queueName = virtualHostName + queueName;exchangeName = virtualHostName + exchangeName;synchronized (lockerQueue) {synchronized (lockerExchange) {try {// 1. 判断绑定是否存在Binding existedBinding = memoryDataCenter.getBinding(exchangeName, queueName);if (existedBinding != null) {log.info("绑定存在, queueName: {}, exchangeName: {}, bindingKey: {}", queueName, exchangeName, bindingKey);return true;}// 2. 判断 bindingKey 是否合法if (!router.checkBindingKeyValid(bindingKey)) {throw new MqException("bindingKey 不合法! bindingKey: " + bindingKey);}// 3. 创建绑定Binding binding = new Binding();binding.setExchangeName(exchangeName);binding.setQueueName(queueName);binding.setBindingKey(bindingKey);// 4. 获取绑定对应的队列和交换机, 判断这两个是否存在, 都存在才能创建MSGQueue msgQueue = memoryDataCenter.getMSGQueue(queueName);if (msgQueue == null) {throw new MqException("队列不存在, queueName: " + queueName);}Exchange exchange = memoryDataCenter.getExchange(exchangeName);if (exchange == null) {throw new MqException("交换机不存在, exchangeName: " + exchangeName);}// 5. 写入磁盘if (msgQueue.getDurable() && exchange.getDurable()) {diskDataCenter.insertBinding(binding);}// 6. 写入内存memoryDataCenter.insetBinding(binding);log.info("添加绑定成功, queueName: {}, exchangeName: {}, bindingKey: {}", queueName, exchangeName, bindingKey);return true;} catch (Exception e) {log.warn("添加绑定失败, queueName: {}, exchangeName: {}, bindingKey: {}", queueName, exchangeName, bindingKey);e.printStackTrace();return false;}}}}
删除绑定
/*** @description: 删除绑定**/public boolean queueUnBind(String queueName, String exchangeName) {// 加上 virtualHostName 前缀queueName = virtualHostName + queueName;exchangeName = virtualHostName + exchangeName;synchronized (lockerExchange) {synchronized (lockerQueue) {try {// 1. 判断绑定是否存在Binding existedBinding = memoryDataCenter.getBinding(exchangeName, queueName);if (existedBinding == null) {throw new MqException("要删除的绑定不存在, 无法删除, exchangeName: " + exchangeName + " , queueName: " + queueName);}// 2. 无论绑定是否持久化了, 都试着删除一下磁盘中的数据, 影响不大diskDataCenter.deleteBinding(exchangeName, queueName);// 3. 删除内存memoryDataCenter.deleteBinding(existedBinding);log.info("删除绑定成功, queueName: {}, exchangeName: {}", queueName, exchangeName);return true;} catch (Exception e) {log.warn("删除绑定失败, queueName: {}, exchangeName: {}", queueName, exchangeName);e.printStackTrace();return false;}}}}
发布消息
- 发布消息其实是吧消息发送给指定的Exchange, 在根据 Exchange 和 Queue 的binding关系, 转发到对应队列中
- 发送消息需要指定 routingKey, 这个值的作用和 ExchangeType是相关的
- Direct: routingKey 就是对应的队列名字, 此时不需要binding关系, 也不需要bindingKey,就可以直接转发消息
- Fanout: routingKey 不起作用, bindingKey 也不起作用, 此时消息会转发给绑定到该交换机上的所有队列中
- Topic: routingKey 是一个特定的字符串, 会和bindingKey进行匹配, 如果匹配成功, 则发到对应的队列中, 具体规则后续介绍
- BasicProperties 是消息的元消息, body是消息本体
/*** @description: 发布消息* 发布消息其实就是把消息发送给指定的exchange, 再根据 Exchange 和 Queue 的 Binding 关系, 转发到对应队列中* 发送消息需要指定 routingKey, 这个值的作用和 ExchangeType 相关的* Direct: routingKey 就是对应的队列名字, 此时不需要 binding 关系, 也不需要 bindingKey, 就可以直接转发消息* Fanout: routingKey 不起作用, bindingKey 也不起作用, 此时消息会转发给绑定该交换机上的所有队列中* Topic: routingKey 是一个特定的字符串, 会和 bindingKey 按照一定规则进行匹配, 如果匹配成功, 则发送到对应的队列中, 具体规则在 Router 类中介绍* @param: [exchangeName, routingKey, basicProperties 消息的元消息, body 消息本体]* @return:**/public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) {try {// 1. 转换交换机的名字, 如果为 null, 就使用默认的交换机名字if (exchangeName == null) {exchangeName = "";}exchangeName = virtualHostName + exchangeName;// 2. 检验 routingKey 的合法性if (!router.checkRoutingKeyValid(routingKey)) {throw new MqException("routingKey 非法! routingKey: " + routingKey);}// 3. 查找交换机对象Exchange exchange = memoryDataCenter.getExchange(exchangeName);if (exchange == null) {throw new MqException("交换机不存在! exchangeName: " + exchangeName);}// 4. 依据交换机的类型来进行消息转发if (exchange.getType() == ExchangeType.DIRECT) {// 按照直接交换机的方式来转发消息// 此时 routingKey 作为队列的名字, 直接把消息写入指定的队里中, 此时可以无视绑定关系String queueName = virtualHostName + routingKey;// 5. 构造消息对象Message message = Message.createMessageWithId(routingKey, basicProperties, body);// 6. 找找队列名字对应的对象MSGQueue msgQueue = memoryDataCenter.getMSGQueue(queueName);if (msgQueue == null) {throw new MqException("队列不存在, queueName=" + queueName);}// 7. 队列存在, 直接给队列中写入消息 -- 执行一次方法就消费一次消息sendMessage(msgQueue, message);} else {// 按照 fanout 和 topic 的方式来转发// 5. 找到该交换机关联的所有绑定, 并遍历这些绑定对象ConcurrentHashMap<String, Binding> bindingsMap = memoryDataCenter.getBindings(exchangeName);for (Map.Entry<String, Binding> entry : bindingsMap.entrySet()) {// ① 获取到绑定对象, 判断对应的队列是否存在Binding binding = entry.getValue();MSGQueue msgQueue = memoryDataCenter.getMSGQueue(binding.getQueueName());if (msgQueue == null) {// 此处就不抛异常, 可能此处有多个这样的队列// 希望不要因为一个队列的失败, 影响到其他队列的消息的传输log.warn("basicPublish 发送消息是, 发现队列不存在! queueName: {}", binding.getQueueName());continue;}// ② 构造消息对象, 发送给每一个队列的对象都是一个新的复制体Message message = Message.createMessageWithId(routingKey, basicProperties, body);// ③ 判定这个消息是否能转发给改队列// 如果是 fanout, 所有绑定的队列都是要转发的// 如果是 topic, 还需要判定下, bindingKey 和 routingKey 是不是匹配if (!router.route(exchange.getType(), binding, message)) {continue;}// ④ 真正的转发消息给队列sendMessage(msgQueue, message);}}log.info("发送信息成功, exchangeName: {}, routingKey: {}", exchangeName, routingKey);return true;} catch (Exception e) {log.warn("发送信息失败, exchangeName: {}, routingKey: {}", exchangeName, routingKey);e.printStackTrace();return false;}}/*** @description: 发送一次消息**/private void sendMessage(MSGQueue msgQueue, Message message) throws IOException, MqException, InterruptedException {// 此处发送消息, 就是把消息写入到 硬盘 和 内存 上int deliverMode = message.getDeliverMode();// deliverMode 为 1, 不持久化, 为 2 表示持久化if (deliverMode == 2) {diskDataCenter.sendMessage(msgQueue, message);}// 写入内存memoryDataCenter.sendMessage(msgQueue, message);// 通知消费者可以消费消息, 就是让消费者从对应的内存中取出消息consumerManager.notifyConsume(msgQueue.getName());}
路由规则
实现 mqserver.core.Router
- 实现 route ⽅法
/*** @description: 路由选择* @param: [type 交换机类型, binding 绑定对象 -- 里面提取 routingKey, message 消息对象]* @return: 返回该交换机是否可以将该消息转发给绑定的队列中, true 表示可以, false 表示不可以**/public boolean route(ExchangeType type, Binding binding, Message message) throws MqException {// 根据不同的转发类型来进行不同的转发逻辑if (type == ExchangeType.FANOUT) {// 如果是 FANOUT 类型, 该交换机上所有绑定的队列都需要进行转发return true;} else if (type == ExchangeType.TOPIC) {// 如果是 TOPIC 类型, 规则复杂return routeTopic(binding,message);} else {// 其他情况是不应该存在的throw new MqException("[Router] 交换机类型违法! type: " + type);}}
- 实现 checkRoutingKeyValid
/*** @description: 判断 routingKey 是否合法* routingKey 组成规则如下:* 1. 组成: 数字, 字母, 下划线* 2. 使用符号 . 将 routingKey 划分成多个部分* 形如:* aaa.bbb.ccc* a.1.b* a**/public boolean checkRoutingKeyValid(String routingKey) {if (!StringUtils.hasLength(routingKey)) {// null or 空字符串, 合法的情况, 比如在使用 fanout 交换机的时候, routingKey 用不上, 就可以设为 ""return true;}for (int i = 0; i < routingKey.length(); i++) {char ch = routingKey.charAt(i);// 判断该字符是否是大写字母if (ch >= 'A' && ch <= 'Z') {continue;}// 判断字符是否是小写字母if (ch >= 'a' && ch <= 'z') {continue;}// 判断该字符阿拉伯数字if (ch >= '0' && ch <= '9') {continue;}// 判断字符是否 '_' 或 '.'if (ch == '.' || ch == '_') {continue;}// 走到这里, 都不是上述任何一种合法的情况, 就直接返回 falsereturn false;}return true;}
- 实现 checkBindingKeyValid
/*** @description: 判断 bindingKey 是否合法* bindingKey 组成规则如下:* 1. 组成: 数字, 字母, 下划线* 2. 使用符号 . 将 routingKey 划分成多个部分* 3. 支持两种特殊符号作为通配符 (* 和 # 必须是作为被 . 分割出来的独立的部分)* 1) *: * 可以匹配任何一个独立的部分* 2) #: # 监听匹配任何 0 个或者多个独立的部分* 形如:* aaa.bbb.ccc* a.1.b* a* #* a.*.b**/public boolean checkBindingKeyValid(String bindingKey) {if (!StringUtils.hasLength(bindingKey)) {// null or 空字符串, 合法的情况, 比如在使用 direct / fanout 交换机的时候, bindingKey 用不上, 就可以设为 ""return true;}for (int i = 0; i < bindingKey.length(); i++) {char ch = bindingKey.charAt(i);// 判断该字符是否是大写字母if (ch >= 'A' && ch <= 'Z') {continue;}// 判断字符是否是小写字母if (ch >= 'a' && ch <= 'z') {continue;}// 判断该字符阿拉伯数字if (ch >= '0' && ch <= '9') {continue;}// 判断字符是否 '_' 或 '.''if (ch == '.' || ch == '_') {continue;}// 判断字符是否为通配符 '*' 或 '#'if (ch == '*' || ch == '#') {continue;}// 走到这里, 都不是上述任何一种合法的情况, 就直接返回 falsereturn false;}// 检查 * 或者 # 是否是独立的部分// aaa.*.bbb 合法情况, aaa.a*.bbb 非法情况String[] words = bindingKey.split("\\.");for (String word : words) {// 检查 word 长度 > 1 并且包含了 * 或者 #, 就是非法的格式了if (word.length() > 1 && (word.contains("*") || word.contains("#"))) {return false;}}return true;}
- 实现 routeTopic
- 使用动态规划的方式来进行规则的匹配
private boolean routeTopic(@NotNull Binding binding, @NotNull Message message) {String bindingKey = binding.getBindingKey();String routingKey = message.getRoutingKey();String[] bindingStr = bindingKey.split("\\.");String[] routingStr = routingKey.split("\\.");return mate(bindingStr,routingStr);}private boolean mate(String @NotNull [] bindingStr, String @NotNull [] routingStr) {int m = bindingStr.length;int n = routingStr.length;boolean[][] dp = new boolean[m + 1][n + 1];dp[0][0] = true;for (int i = 0; i < m; i++) {if ("#".equals(bindingStr[i])) {dp[i+1][0] = true;} else {break;}}for (int i = 1; i <= m; i++) {String wordBinding = bindingStr[i - 1];for (int j = 1; j <= n; j++) {String wordRouting = routingStr[j - 1];if (!"#".equals(wordBinding) && !"*".equals(wordBinding)) {if (wordBinding.equals(wordRouting)) {dp[i][j] = dp[i - 1][j - 1];} else {dp[i][j] = false;}} else if ("*".equals(wordBinding)) {dp[i][j] = dp[i - 1][j - 1];} else {dp[i][j] = dp[i - 1][j] || dp[i][j - 1];}}}return dp[m][n];}
- 测试 Router 的匹配规则
测试代码如下:
package en.edu.zxj.mq.common;import en.edu.zxj.mq.mqserver.core.Binding;
import en.edu.zxj.mq.mqserver.core.ExchangeType;
import en.edu.zxj.mq.mqserver.core.Message;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import static org.junit.jupiter.api.Assertions.*;/*** Created with IntelliJ IDEA.* Description:** @author: zxj* @date: 2024-03-02* @time: 19:32:53*/
@SpringBootTest
class RouterTest {private Router router = new Router();private Binding binding = null;private Message message = null;@BeforeEachpublic void setUp() {binding = new Binding();message = new Message();}@AfterEachpublic void tearDown() {binding = null;message = null;}@Testvoid checkBindingKeyValid1() {boolean ok = router.checkBindingKeyValid("aaa.bbb.ccc");Assertions.assertTrue(ok);}@Testvoid checkBindingKeyValid2() {boolean ok = router.checkBindingKeyValid("1.a.c");Assertions.assertTrue(ok);}@Testvoid checkBindingKeyValid3() {boolean ok = router.checkBindingKeyValid("a");Assertions.assertTrue(ok);}@Testvoid checkBindingKeyValid4() {boolean ok = router.checkBindingKeyValid("");Assertions.assertTrue(ok);}@Testvoid checkBindingKeyValid5() {boolean ok = router.checkBindingKeyValid("a.*.ccc");Assertions.assertTrue(ok);}@Testvoid checkBindingKeyValid6() {boolean ok = router.checkBindingKeyValid("#");Assertions.assertTrue(ok);}@Testvoid checkBindingKeyValid7() {boolean ok = router.checkBindingKeyValid("aaa.bb*b.ccc");Assertions.assertFalse(ok);}@Testvoid checkBindingKeyValid8() {boolean ok = router.checkBindingKeyValid("123.bbb.ccc");Assertions.assertTrue(ok);}@Testvoid checkRoutingKeyValid1() {boolean ok = router.checkRoutingKeyValid("a.b.c");Assertions.assertTrue(ok);}@Testvoid checkRoutingKeyValid2() {boolean ok = router.checkRoutingKeyValid("a.b._c");Assertions.assertTrue(ok);}@Testvoid checkRoutingKeyValid3() {boolean ok = router.checkRoutingKeyValid("a.b.c@");Assertions.assertFalse(ok);}@Testvoid checkRoutingKeyValid4() {boolean ok = router.checkRoutingKeyValid("a");Assertions.assertTrue(ok);}@Testvoid checkRoutingKeyValid5() {boolean ok = router.checkRoutingKeyValid("a.1.b");Assertions.assertTrue(ok);}@Testvoid checkRoutingKeyValid6() {boolean ok = router.checkRoutingKeyValid("12222222223123123");Assertions.assertTrue(ok);}@Testvoid checkRoutingKeyValid7() {boolean ok = router.checkRoutingKeyValid("aaaa");Assertions.assertTrue(ok);}@Testvoid checkRoutingKeyValid8() {boolean ok = router.checkRoutingKeyValid("_____________._______________");Assertions.assertTrue(ok);}@Testvoid checkRoutingKeyValid9() {boolean ok = router.checkRoutingKeyValid("!!!!!!!!!!!!!!");Assertions.assertFalse(ok);}@Testvoid checkRoutingKeyValid10() {boolean ok = router.checkRoutingKeyValid("a.2._.!");Assertions.assertFalse(ok);}@Testvoid checkRoutingKeyValid11() {boolean ok = router.checkRoutingKeyValid("_a_.1_a.b");Assertions.assertTrue(ok);}@Testvoid checkRoutingKeyValid12() {boolean ok = router.checkRoutingKeyValid("a.b.c.12.7.234.4234.adf.___");Assertions.assertTrue(ok);}@Testvoid checkRoutingKeyValid13() {boolean ok = router.checkRoutingKeyValid("123.468a.sdfa.w");Assertions.assertTrue(ok);}@Testvoid route1() throws MqException {binding.setBindingKey("aaa");message.setRoutingKey("aaa");boolean ok = router.route(ExchangeType.FANOUT, binding, message);Assertions.assertTrue(ok);}@Testvoid route2() throws MqException {binding.setBindingKey("aaa.bbb");message.setRoutingKey("aaa.bbb");boolean ok = router.route(ExchangeType.TOPIC, binding, message);Assertions.assertTrue(ok);}@Testvoid route3() throws MqException {binding.setBindingKey("aaa");message.setRoutingKey("aaa");boolean ok = router.route(ExchangeType.TOPIC, binding, message);Assertions.assertTrue(ok);}@Testvoid route4() throws MqException {binding.setBindingKey("aaa.bbb");message.setRoutingKey("aaa.bbb.ccc");boolean ok = router.route(ExchangeType.TOPIC, binding, message);Assertions.assertFalse(ok);}@Testvoid route5() throws MqException {binding.setBindingKey("aaa.bbb");message.setRoutingKey("aaa.ccc");boolean ok = router.route(ExchangeType.TOPIC, binding, message);Assertions.assertFalse(ok);}@Testvoid route6() throws MqException {binding.setBindingKey("aaa.bbb.ccc");message.setRoutingKey("aaa.bbb.ccc");boolean ok = router.route(ExchangeType.TOPIC, binding, message);Assertions.assertTrue(ok);}@Testvoid route7() throws MqException {binding.setBindingKey("aaa.*");message.setRoutingKey("aaa.bbb");boolean ok = router.route(ExchangeType.TOPIC, binding, message);Assertions.assertTrue(ok);}@Testvoid route8() throws MqException {binding.setBindingKey("aaa.*.bbb");message.setRoutingKey("aaa.bbb.ccc");boolean ok = router.route(ExchangeType.TOPIC, binding, message);Assertions.assertFalse(ok);}@Testvoid route9() throws MqException {binding.setBindingKey("*.aaa.bbb");message.setRoutingKey("aaa.bbb");boolean ok = router.route(ExchangeType.TOPIC, binding, message);Assertions.assertFalse(ok);}@Testvoid route10() throws MqException {binding.setBindingKey("#");message.setRoutingKey("aaa.bbb.ccc");boolean ok = router.route(ExchangeType.TOPIC, binding, message);Assertions.assertTrue(ok);}@Testvoid route11() throws MqException {binding.setBindingKey("aaa.#");message.setRoutingKey("aaa.bbb");boolean ok = router.route(ExchangeType.TOPIC, binding, message);Assertions.assertTrue(ok);}@Testvoid route12() throws MqException {binding.setBindingKey("aaa.#");message.setRoutingKey("aaa.bbb.ccc");boolean ok = router.route(ExchangeType.TOPIC, binding, message);Assertions.assertTrue(ok);}@Testvoid route13() throws MqException {binding.setBindingKey("aaa.#.ccc");message.setRoutingKey("aaa.ccc");boolean ok = router.route(ExchangeType.TOPIC, binding, message);Assertions.assertTrue(ok);}@Testvoid route14() throws MqException {binding.setBindingKey("aaa.#.ccc");message.setRoutingKey("aaa.bbb.ccc");boolean ok = router.route(ExchangeType.TOPIC, binding, message);Assertions.assertTrue(ok);}@Testvoid route15() throws MqException {binding.setBindingKey("aaa.#.ccc");message.setRoutingKey("aaa.aaa.bbb.ccc");boolean ok = router.route(ExchangeType.TOPIC, binding, message);Assertions.assertTrue(ok);}@Testvoid route16() throws MqException {binding.setBindingKey("#.ccc");message.setRoutingKey("ccc");boolean ok = router.route(ExchangeType.TOPIC, binding, message);Assertions.assertTrue(ok);}@Testvoid route17() throws MqException {binding.setBindingKey("#.ccc");message.setRoutingKey("aaa.bbb.ccc");boolean ok = router.route(ExchangeType.TOPIC, binding, message);Assertions.assertTrue(ok);}@Testvoid route18() throws MqException {binding.setBindingKey("aaa.#.#.#");message.setRoutingKey("aaa");boolean ok = router.route(ExchangeType.TOPIC, binding, message);Assertions.assertTrue(ok);}@Testvoid route19() throws MqException {binding.setBindingKey("aaa.#.#.#.*");message.setRoutingKey("aaa");boolean ok = router.route(ExchangeType.TOPIC, binding, message);Assertions.assertFalse(ok);}@Testvoid route20() throws MqException {binding.setBindingKey("aaa.#.#.#.ccc");message.setRoutingKey("aaa.aaa.aaa.bbb.ccc");boolean ok = router.route(ExchangeType.TOPIC, binding, message);Assertions.assertTrue(ok);}}
订阅消息
- 添加一个订阅者
/*** @description: 订阅消息* 添加一个队列的订阅者, 当队列收到消息之后, 就要把消息推送给对应的订阅者* @param: [consumerTag: 消费者的身份标识,* queueName: 订阅的队列名字,* autoAck: 消息被消费后的应当方式, true 为自动应当, false 为手动应答* consumer: 是一个回调函数, 此处类型设定成函数是接口, 这样后续调用 basicConsume 并且传递实参的时候, 就可以写成 lambda 样子了]* @return: true 表示订阅成功, false 表示订阅失败**/public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {// 构造一个 ConsumerEnv 对象, 把这个对应的队列找到, 再把这个 Consumer 对象添加到该队列中.queueName = virtualHostName + queueName;try {consumerManager.addConsumer(consumerTag, queueName, autoAck, consumer);log.info("basicConsume 成功! consumerTag: {}, queueName: {}", consumerTag, queueName);return true;} catch (Exception e) {log.warn("basicConsume 失败! consumerTag: {}, queueName: {}", consumerTag, queueName);e.printStackTrace();return false;}}
Consumer 相当于⼀个回调函数. 放到 common.Consumer
中.
- 创建订阅者管理管理类
创建 mqserver.core.ConsumerManager
@Slf4j
public class ConsumerManager {// parent 用来记录虚拟主机private final VirtualHost parent;// 存放令牌的队列, 通过令牌来触发消费线程的消费操作// 使用一个阻塞队列来触发消息消费, 称为令牌队列, 每次有消息过来了, 都为队列中放一个令牌(也就是队列名), 让后消费者再去消费对应的队列信息// 作用: 令牌队列的设定, 避免搞出来太多线程, 否则就需要给每个队列都安排一个单独的线程了, 如果队列很多则开销就比较的了private final BlockingQueue<String> tokenQueue = new LinkedBlockingQueue<>();// 使用一个线程池用来执行消息的回调private final ExecutorService workerPool = Executors.newFixedThreadPool(4);// 扫描线程private Thread scannerThread = null;
}
- parent 用来记录虚拟主机
- 使用一个阻塞队列来触发信息消费,称为令牌队列, 每次有消息过来了, 都往队列中放一个令牌(也就是队列名), 然后消费者再去消费对应队列的消息.
- 使用一个线程池用来执行消息回调
这样令牌队列的设定避免搞出来太多线程,否则需要给每个队列到安排一个单独的线程了,如果队列很多开销就比较大了.
- 添加令牌接⼝
/*** @description: 通知消费者去消费消息**/public void notifyConsume(String name) throws InterruptedException {tokenQueue.put(name);}
- 实现添加订阅者
/*** @description: 添加订阅者* 新来的消费者需要消费掉之前的消息**/public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {// 找到对应的队列MSGQueue queue = parent.getMemoryDataCenter().getMSGQueue(queueName);if (queue == null) {throw new MqException("[ConsumerManager] 队列不存在, queueName: " + queueName);}ConsumerEnv consumerEnv = new ConsumerEnv(consumerTag, queueName, autoAck, consumer);synchronized (queue) {queue.addConsumerEnv(consumerEnv);// 如果当前队列中已经有了一些消息, 需要立即消费掉int n = parent.getMemoryDataCenter().getMessageCount(queueName);for (int i = 0; i < n; i++) {// 这个方法调用一次就消费一次消息consumeMessage(queue);}}}
创建 ConsumerEnv
, 这个类表⽰⼀个订阅者的执⾏环境.
@Data
@Slf4j
public class ConsumerEnv {private String consumerTag;private String queueName;private boolean autoAck;private Consumer consumer;public ConsumerEnv(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {this.consumerTag = consumerTag;this.queueName = queueName;this.autoAck = autoAck;this.consumer = consumer;}
}
给 MsgQueue
添加⼀个订阅者列表.
此处的 chooseConsumer 是实现⼀个轮询效果. 如果⼀个队列有多个订阅者, 将会按照轮询的⽅式轮流拿到消息.
- 实现扫描线程
在 ConsumerManager 中创建⼀个线程, 不停的尝试扫描令牌队列. 如果拿到了令牌, 就真正触发消费消息操作.
public ConsumerManager(VirtualHost virtualHost) {this.parent = virtualHost;scannerThread = new Thread(() -> {while (true) {try {// 1. 拿到令牌String queueName = tokenQueue.take();// 2. 根据令牌找到对应的队列MSGQueue msgQueue = parent.getMemoryDataCenter().getMSGQueue(queueName);if (msgQueue == null) {throw new MqException("获取令牌后, 发现队列为空! queueName: " + queueName);}// 3. 从队列中消费一次消息consumeMessage(msgQueue);} catch (Exception e) {e.printStackTrace();}}});scannerThread.start();}
- 实现消费消息
所谓的消费消息, 其实就是调⽤消息的回调. 并把消息删除掉.
/*** @description: 消费一次消息**/private void consumeMessage(MSGQueue queue) {// 1. 按照轮询的方式, 取出消费者ConsumerEnv luckyDog = queue.chooseConsumer();if (luckyDog == null) {// 暂时还没有消费者, 就暂时不消费return;}// 2. 从指定队列中取出一个元素Message message = parent.getMemoryDataCenter().pollMessage(queue.getName());if (message == null) {return;}// 3. 把消息带入到消费者的回调方法中, 丢给线程池执行workerPool.submit(() -> {try {// 1. 把消息放到待确认的集合中, 这个操作势必在执行回调之前parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(), message);// 2. 真正执行回调操作luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(), message.getBasicProperties(), message.getBody());// 3. 如果当前是 "自动应答", 就可以直接报消息给删除了// 如果当前是 "手动应答", 则先不处理, 交给后续消费调用 basicAck 方法来处理if (luckyDog.isAutoAck()) {// ① 先删除磁盘上的消息if (message.getDeliverMode() == 2) {parent.getDiskDataCenter().deleteMessage(queue, message);}// ② 删除上面的待确认集合中的消息parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());// ③ 删除内存中消息parent.getMemoryDataCenter().deleteMessage(message.getMessageId());log.info("消息被消费成功, queueName: {}", queue.getName());}} catch (Exception e) {e.printStackTrace();}});}
注意: ⼀个队列可能有 N 个消费者, 此处应该按照轮询的⽅式挑⼀个消费者进⾏消费.
⼩结
一、订阅者已经存在了, 才发送消息
- 这种直接获取队列的订阅者,从中按照轮询的方式挑一个消费者来调用回调即可
- 消息先发送到队列了,订阅者还没到,此时当订阅者到达,就快速把指定队列中的消息全部消费掉。
⼆. 关于消息不丢失的论证
每个消息在从内存队列中出队列时, 都会先进⼊ 待确认 中.
- 如果 autoAck 为 true
消息被消费完毕后(执⾏完消息回调之后), 再执⾏清除⼯作.
分别清除硬盘数据, 待确认队列, 消息中⼼ - 如果 autoAck 为 false
在回调内部, 进⾏清除⼯作.
分别清除硬盘数据, 待确认队列, 消息中⼼.
- 执⾏消息回调的时候抛出异常
此时消息仍然处在待确认队列中
此时可以⽤⼀个线程扫描待确认队列, 如果发现队列中的消息超时未确认, 则放⼊死信队列.
- 执⾏消息回调的时候服务器宕机
内存所有数据都没了, 但是消息在硬盘上仍然存在. 会在服务下次启动的时候, 加载回内存. 重新被消费到.
消息确认
下列⽅法只是⼿动应答的时候才会使⽤.
应答成功, 则把消息删除掉.
/*** @description: 消息确认**/public boolean basicAck(String queueName, String messageId) {queueName = virtualHostName + queueName;try {// 1. 获取到消息和队列Message message = memoryDataCenter.getMessage(messageId);if (message == null) {throw new MqException("要确认的消息不存在, messageId: " + messageId);}MSGQueue msgQueue = memoryDataCenter.getMSGQueue(queueName);if (msgQueue == null) {throw new MqException("要确认的队列不存在, queueName: " + queueName);}// 2. 删除硬盘上的数据if (message.getDeliverMode() == 2) {diskDataCenter.deleteMessage(msgQueue,message);}// 3. 删除消息中心中心的数据memoryDataCenter.deleteMessage(messageId);// 4. 删除待确认的集合中的数据memoryDataCenter.removeMessageWaitAck(queueName,messageId);log.info("basicAck 成功, 消息确认成功! queueName: {}, messageId: {}",queueName,messageId);return true;} catch (Exception e) {log.warn("basicAck 失败, 消息确认失败! queueName: {}, messageId: {}",queueName,messageId);e.printStackTrace();return false;}}
测试 VirtualHost
package en.edu.zxj.mq.mqserver;import en.edu.zxj.mq.MqApplication;
import en.edu.zxj.mq.common.Consumer;
import en.edu.zxj.mq.mqserver.core.BasicProperties;
import en.edu.zxj.mq.mqserver.core.ExchangeType;
import org.apache.tomcat.util.http.fileupload.FileUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.test.context.SpringBootTest;import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;import static org.junit.jupiter.api.Assertions.*;/*** Created with IntelliJ IDEA.* Description:** @author: zxj* @date: 2024-03-02* @time: 21:45:01*/
@SpringBootTest
class VirtualHostTest {private VirtualHost virtualHost = null;@BeforeEachpublic void setUp() {MqApplication.context = SpringApplication.run(MqApplication.class);virtualHost = new VirtualHost("default");}@AfterEachpublic void tearDown() throws IOException {MqApplication.context.close();virtualHost = null;// 把硬盘的目录删除掉File dataDir = new File("./data");FileUtils.deleteDirectory(dataDir);}@Testvoid exchangeDeclare() {boolean ok = virtualHost.exchangeDeclare("testExchangeName", ExchangeType.DIRECT, true, false, null);Assertions.assertTrue(ok);}@Testvoid exchangeDelete() {boolean ok = virtualHost.exchangeDeclare("testExchangeName", ExchangeType.DIRECT, true, false, null);Assertions.assertTrue(ok);ok = virtualHost.exchangeDelete("testExchangeName");Assertions.assertTrue(ok);ok = virtualHost.exchangeDelete("testExchangeName");Assertions.assertFalse(ok);}@Testvoid queueDeclare() {Map<String, Object> arguments = new HashMap<>();arguments.put("a", 1);arguments.put("b", 2);boolean ok = virtualHost.queueDeclare("testQueueName", true, false, false, arguments);Assertions.assertTrue(ok);}@Testvoid queueDelete() {Map<String, Object> arguments = new HashMap<>();arguments.put("a", 1);arguments.put("b", 2);boolean ok = virtualHost.queueDeclare("testQueueName", true, false, false, arguments);Assertions.assertTrue(ok);ok = virtualHost.queueDelete("testQueueName");Assertions.assertTrue(ok);}@Testvoid queueBind() {boolean ok = virtualHost.exchangeDeclare("testExchangeName", ExchangeType.DIRECT, true, false, null);Assertions.assertTrue(ok);Map<String, Object> arguments = new HashMap<>();arguments.put("a", 1);arguments.put("b", 2);ok = virtualHost.queueDeclare("testQueueName", true, false, false, arguments);Assertions.assertTrue(ok);ok = virtualHost.queueBind("testQueueName", "testExchangeName", "testBindingKey");Assertions.assertTrue(ok);}@Testvoid queueUnBind() {boolean ok = virtualHost.exchangeDeclare("testExchangeName", ExchangeType.DIRECT, true, false, null);Assertions.assertTrue(ok);Map<String, Object> arguments = new HashMap<>();arguments.put("a", 1);arguments.put("b", 2);ok = virtualHost.queueDeclare("testQueueName", true, false, false, arguments);Assertions.assertTrue(ok);ok = virtualHost.queueBind("testQueueName", "testExchangeName", "testBindingKey");Assertions.assertTrue(ok);ok = virtualHost.queueUnBind("testQueueName","testExchangeName");Assertions.assertTrue(ok);}@Testvoid basicPublish() {boolean ok = virtualHost.basicPublish("testExchangeName", "testRoutingKey", new BasicProperties(), "Hello word".getBytes());Assertions.assertFalse(ok);ok = virtualHost.exchangeDeclare("testExchangeName", ExchangeType.DIRECT, true, false, null);Assertions.assertTrue(ok);Map<String, Object> arguments = new HashMap<>();arguments.put("a", 1);arguments.put("b", 2);ok = virtualHost.queueDeclare("testQueueName", true, false, false, arguments);Assertions.assertTrue(ok);ok = virtualHost.queueBind("testQueueName", "testExchangeName", "testBindingKey");Assertions.assertTrue(ok);ok = virtualHost.basicPublish("testExchangeName", "testQueueName", new BasicProperties(), "Hello word".getBytes());Assertions.assertTrue(ok);}// 先订阅队列, 后发送消息@Testvoid basicConsume1() throws InterruptedException {boolean ok = virtualHost.queueDeclare("testQueueName", true, false, false, null);Assertions.assertTrue(ok);ok = virtualHost.exchangeDeclare("testExchangeName",ExchangeType.DIRECT,true,false,null);Assertions.assertTrue(ok);// 先订阅队列ok = virtualHost.basicConsume("testConsumerTag", "testQueueName", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {try {// 消费者自身设定的回调方法System.out.println("messageId: " + basicProperties.getMessageId());System.out.println("body = " + new String(body,0,body.length));Assertions.assertEquals("testQueueName",basicProperties.getRoutingKey());Assertions.assertEquals(1,basicProperties.getDeliverMode());Assertions.assertArrayEquals("hello".getBytes(),body);} catch (Error e) {//断言如果失败, 抛出的是 Error, 而不是 Exceptione.printStackTrace();}}});Assertions.assertTrue(ok);Thread.sleep(500);// 在发送消息ok = virtualHost.basicPublish("testExchangeName","testQueueName",null,"hello".getBytes());Assertions.assertTrue(ok);Thread.sleep(500);}// 先发送消息, 后订阅队列@Testvoid basicConsume2() throws InterruptedException {boolean ok = virtualHost.queueDeclare("testQueueName", true, false, false, null);Assertions.assertTrue(ok);ok = virtualHost.exchangeDeclare("testExchangeName",ExchangeType.DIRECT,true,false,null);Assertions.assertTrue(ok);// 先发送消息ok = virtualHost.basicPublish("testExchangeName","testQueueName",null,"hello".getBytes());Assertions.assertTrue(ok);Thread.sleep(500);// 后订阅队列ok = virtualHost.basicConsume("testConsumerTag", "testQueueName", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {try {// 消费者自身设定的回调方法System.out.println("messageId: " + basicProperties.getMessageId());System.out.println("body = " + new String(body,0,body.length));Assertions.assertEquals("testQueueName",basicProperties.getRoutingKey());Assertions.assertEquals(1,basicProperties.getDeliverMode());Assertions.assertArrayEquals("hello".getBytes(),body);} catch (Error e) {//断言如果失败, 抛出的是 Error, 而不是 Exceptione.printStackTrace();}}});Assertions.assertTrue(ok);Thread.sleep(500);}@Testvoid basicAck() throws InterruptedException {boolean ok = virtualHost.queueDeclare("testQueue", true,false, false, null);Assertions.assertTrue(ok);ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,true, false, null);Assertions.assertTrue(ok);// 先发送消息ok = virtualHost.basicPublish("testExchange", "testQueue", null,"hello".getBytes());Assertions.assertTrue(ok);// 再订阅队列 [要改的地方, 把 autoAck 改成 false]ok = virtualHost.basicConsume("testConsumerTag", "testQueue", false, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {// 消费者自身设定的回调方法.System.out.println("messageId=" + basicProperties.getMessageId());System.out.println("body=" + new String(body, 0, body.length));Assertions.assertEquals("testQueue", basicProperties.getRoutingKey());Assertions.assertEquals(1, basicProperties.getDeliverMode());Assertions.assertArrayEquals("hello".getBytes(), body);// [要改的地方, 新增手动调用 basicAck]boolean ok = virtualHost.basicAck("testQueue", basicProperties.getMessageId());Assertions.assertTrue(ok);}});Assertions.assertTrue(ok);Thread.sleep(500);}@Testpublic void testBasicConsumeFanout() throws InterruptedException {boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.FANOUT, false, false, null);Assertions.assertTrue(ok);ok = virtualHost.queueDeclare("testQueue1", false, false, false, null);Assertions.assertTrue(ok);ok = virtualHost.queueBind("testQueue1", "testExchange", "");Assertions.assertTrue(ok);ok = virtualHost.queueDeclare("testQueue2", false, false, false, null);Assertions.assertTrue(ok);ok = virtualHost.queueBind("testQueue2", "testExchange", "");Assertions.assertTrue(ok);// 往交换机中发布一个消息ok = virtualHost.basicPublish("testExchange", "", null, "hello".getBytes());Assertions.assertTrue(ok);Thread.sleep(500);// 两个消费者订阅上述的两个队列.ok = virtualHost.basicConsume("testConsumer1", "testQueue1", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {System.out.println("consumerTag=" + consumerTag);System.out.println("messageId=" + basicProperties.getMessageId());Assertions.assertArrayEquals("hello".getBytes(), body);}});Assertions.assertTrue(ok);ok = virtualHost.basicConsume("testConsumer2", "testQueue2", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {System.out.println("consumerTag=" + consumerTag);System.out.println("messageId=" + basicProperties.getMessageId());Assertions.assertArrayEquals("hello".getBytes(), body);}});Assertions.assertTrue(ok);Thread.sleep(500);}@Testpublic void testBasicConsumeTopic() throws InterruptedException {boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.TOPIC, false, false, null);Assertions.assertTrue(ok);ok = virtualHost.queueDeclare("testQueue", false, false, false, null);Assertions.assertTrue(ok);ok = virtualHost.queueBind("testQueue", "testExchange", "aaa.*.bbb");Assertions.assertTrue(ok);ok = virtualHost.basicPublish("testExchange", "aaa.ccc.bbb", null, "hello".getBytes());Assertions.assertTrue(ok);ok = virtualHost.basicConsume("testConsumer", "testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {System.out.println("consumerTag=" + consumerTag);System.out.println("messageId=" + basicProperties.getMessageId());Assertions.assertArrayEquals("hello".getBytes(), body);}});Assertions.assertTrue(ok);Thread.sleep(500);}
}
⼗⼀、 ⽹络通信协议设计
明确需求
接下来需要考虑客⼾端和服务器之间的通信. 回顾交互模型.
⽣产者和消费者都是客⼾端, 都需要通过⽹络和 Broker Server 进⾏通信.
此处我们使⽤ TCP 协议, 来作为通信的底层协议. 同时在这个基础上⾃定义应⽤层协议, 完成客⼾端对服务器这边功能的远程调⽤.
要调⽤的功能有:
- 创建 channel
- 关闭 channel
- 创建 exchange
- 删除 exchange
- 创建 queue
- 删除 queue
- 创建 binding
- 删除 binding
- 发送 message
- 订阅 message
- 发送 ack
- 返回 message (服务器 -> 客⼾端)
设计应⽤层协议
使⽤⼆进制的⽅式设定协议.
因为 Message 的消息体本⾝就是⼆进制的. 因此不太⽅便使⽤ json 等⽂本格式的协议.
请求:
响应:
其中 type 表⽰请求响应不同的功能. 取值如下:
- 0x1 创建 channel
- 0x2 关闭 channel
- 0x3 创建 exchange
- 0x4 销毁 exchange
- 0x5 创建 queue
- 0x6 销毁 queue
- 0x7 创建 binding
- 0x8 销毁 binding
- 0x9 发送 message
- 0xa 订阅 message
- 0xb 返回 ack
- 0xc 服务器给客⼾端推送的消息. (被订阅的消息) 响应独有的
其中 payload 部分, 会根据不同的 type, 存在不同的格式.
对于请求来说, payload 表⽰这次⽅法调⽤的各种参数信息.
对于响应来说, payload 表⽰这次⽅法调⽤的返回值.
定义 Request / Response
创建 common.Request
/*** Created with IntelliJ IDEA.* Description:定义了请求的格式* 一个完整的请求, 分成了三个部分* 1. type: 表示请求不同的功能, 调用不同的函数 -- 4 个字节* 2. length: 表示 payload 的长度 -- 4 个字节* 3. payload: 要传输的二进制数据 -- length 个字节** @author: zxj* @date: 2024-03-05* @time: 21:16:58*/
@Data
public class Request implements Serializable {private Integer type;private Integer length;private byte[] payload;@Overridepublic String toString() {return "Request{" +"type=" + type +", length=" + length +'}';}
}
创建 common.Response
/*** Created with IntelliJ IDEA.* Description: 定义一个完整的响应格式* 一个完整的响应, 分成了三个部分* 1. type: 表示响应不同的功能, 调用不同的函数 -- 4 个字节* 2. length: 表示 payload 的长度 -- 4 个字节* 3. payload: 要传输的二进制数据 -- length 个字节** @author: zxj* @date: 2024-03-05* @time: 21:16:46*/
@Data
public class Response implements Serializable {private Integer type;private Integer length;private byte[] payload;@Overridepublic String toString() {return "Response{" +"type=" + type +", length=" + length +'}';}
}
定义参数⽗类
构造⼀个类表⽰⽅法的参数, 作为 Request 的 payload.
不同的⽅法中, 参数形态各异, 但是有些信息是通⽤的, 使⽤⼀个⽗类表⽰出来. 具体每个⽅法的参数再通过继承的⽅式体现.
common.BasicArguments
/*** Created with IntelliJ IDEA.* Description:定义请求父类** @author: zxj* @date: 2024-03-05* @time: 21:31:01*/
@Data
public class BasicArguments implements Serializable {// 表示一次请求/响应的唯一 Id, 用来把响应和请求对应上// 此处的 rid 和 channelId 都是基于 UUID 来生成的, rid 用来标识一个请求-响应, 这一点在请求响应非常多的时候游泳protected String rid;protected String channelId;
}
- 此处的 rid 和 channelId 都是基于 UUID 来⽣成的. rid ⽤来标识⼀个请求-响应. 这⼀点在请求响应⽐较多的时候⾮常重要.
定义返回值⽗类
和参数同理, 也需要构造⼀个类表⽰返回值, 作为 Response 的 payload
common.BasicReturns
/*** Created with IntelliJ IDEA.* Description:定义返回的父类** @author: zxj* @date: 2024-03-05* @time: 21:43:23*/
@Data
public class BasicReturns implements Serializable {// 表示一次请求/响应的唯一 Id, 用来把响应和请求对应上protected String rid;// 用来标识一个 channelprotected String channelId;protected Boolean ok;
}
定义其他参数类
针对每个 VirtualHost 提供的⽅法, 都需要有⼀个类表⽰对应的参数.
- ExchangeDeclareArguments
/*** Created with IntelliJ IDEA.* Description:ExchangeDeclare 方法请求参数类** @author: zxj* @date: 2024-03-05* @time: 21:46:53*/
@Data
public class ExchangeDeclareArguments extends BasicArguments implements Serializable {private String exchangeName;private ExchangeType exchangeType;private Boolean durable;private Boolean autoDelete;private Map<String,Object> arguments;
}
⼀个创建交换机的请求, 形如:
- 可以把 ExchangeDeclareArguments 转成 byte[], 就得到了下列图⽚的结构.
- 按照 length ⻓度读取出 payload, 就可以把读到的⼆进制数据转换成ExchangeDeclareArguments 对象.
2) ExchangeDeleteArguments
@Data
public class ExchangeDeleteArguments extends BasicArguments implements Serializable {private String exchangeName;
}
- QueueDeclareArguments
@Data
public class QueueDeclareArguments extends BasicArguments implements Serializable {private String queueName;private boolean durable;private boolean exclusive;private boolean autoDelete;private Map<String, Object> arguments;
}
- QueueDeleteArguments
@Data
public class QueueDeleteArguments extends BasicArguments implements Serializable {private String queueName;
}
- QueueBindArguments
@Data
public class QueueBindArguments extends BasicArguments implements Serializable {private String queueName;private String exchangeName;private String bindingKey;
}
- QueueUnbindArguments
@Data
public class QueueUnBindArguments extends BasicArguments implements Serializable {private String queueName;private String exchangeName;
}
- BasicPublishArguments
@Data
public class BasicPublishArguments extends BasicArguments implements Serializable {private String exchangeName;private String routingKey;private BasicProperties basicProperties;private byte[] body;
}
- BasicConsumeArguments
@Data
public class BasicConsumeArguments extends BasicArguments implements Serializable {private String consumerTag;private String queueName;private boolean autoAck;
}
- SubScribeReturns
- 这个不是参数, 是返回值. 是服务器给消费者推送的订阅消息.
- consumerTag 其实是 channelId.
- basicProperties 和 body 共同构成了 Message.
/*** Created with IntelliJ IDEA.* Description:返回值, 是服务器给消费者推送的订阅消息.** @author: zxj* @date: 2024-03-05* @time: 22:54:08*/
@Data
public class SubScribeReturns extends BasicReturns implements Serializable {private String consumerTag;private BasicProperties basicProperties;private byte[] body;
}
⼗⼆、 实现 BrokerServer
创建 BrokerServer 类
@Data
@Slf4j
public class BrokerServer {// 当前程序只考虑一个虚拟主机的情况private VirtualHost virtualHost = new VirtualHost("default-virtualHost");// 使用这个 哈希表, 表示当前的所有会话(也就是说有哪些客户端正在和服务器进行通信)// key 为 channelId, value 为 channel 对应的 socket 对象private ConcurrentHashMap<String, Socket> sessions = new ConcurrentHashMap<>();private ServerSocket serverSocket;// 引入一个线程池, 来处理多个客户端的需求private ExecutorService executorService;// 引入一个 boolean 变量控制服务器是否继续运行private volatile boolean runnable = true;
}
- virtualHost 表⽰服务器持有的虚拟主机. 队列, 交换机, 绑定, 消息都是通过虚拟主机管理.
- sessions ⽤来管理所有的客⼾端的连接. 记录每个客⼾端的 socket
- serverSocket 是服务器⾃⾝的 socket
- executorService 这个线程池⽤来处理响应
- runnable 这个标志位⽤来控制服务器的运⾏停⽌.
启动/停⽌服务器
- 这⾥就是⼀个单纯的 TCP 服务器, 没啥特别的
- 实现停⽌操作, 主要是为了⽅便后续开展单元测试.
public BrokerServer(int port) throws IOException {serverSocket = new ServerSocket(port);}// begin: 单纯的 TCP 服务器模板public void start() {log.info("[BrokerServer] 服务器开始启动");executorService = Executors.newCachedThreadPool();try {while (runnable) {Socket clientSocket = serverSocket.accept();// 把这个处理连接的逻辑对给线程池executorService.submit(() -> {// 处理连接的统一方法processConnection(clientSocket);});}} catch (SocketException e) {log.info("[BrokerServer] 服务器停止运行!");} catch (IOException e) {log.error("[BrokerServer] 服务器出现异常!");e.printStackTrace();}}/*** @description: 一般来说, 停止服务器, 都是 kill 对应的进程就可以了* 此处还是搞一个单独的停止方法, 主要是用于后续的单元测试**/public void stop() throws IOException {runnable = false;// 把线程池中的人物都放弃了, 让线程都销毁executorService.shutdownNow();serverSocket.close();}// end: 单纯的 TCP 服务器模板
实现处理连接
- 对于 EOFException 和 SocketException , 我们视为客⼾端正常断开连接.
- 如果是客⼾端先 close, 后调⽤ DataInputStream 的 read, 则抛出 EOFException
- 如果是先调⽤ DataInputStream 的 read, 后客⼾端调⽤ close, 则抛出 SocketException
/*** @description: 服务方法* 通过这个方法, 来处里一个客户端的连接* 在这个连接中, 可能会涉及到多个请求和响应**/private void processConnection(@NotNull Socket clientSocket) {// 获取服务对象的 输入输出 流try (InputStream inputStream = clientSocket.getInputStream();OutputStream outputStream = clientSocket.getOutputStream()) {// 这里需要按照特定的格式来读取并解析, 此时就需要用到 DataInputStream 和 DataOutputStreamtry (DataInputStream dataInputStream = new DataInputStream(inputStream); DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {// 循环进行服务, 保持连接, 以便处理多个请求while (true) {// 1. 读取请求并解析Request request = readRequest(dataInputStream);log.info("接收到[client: {} : {}] 请求: {}",clientSocket.getInetAddress(),clientSocket.getPort(),request);// 2. 根据请求计算响应Response response = process(request, clientSocket);log.info("响应给[client: {} : {}] 数据: {}",clientSocket.getInetAddress(),clientSocket.getPort(),response);// 3. 把响应写回给客户端writeResponse(dataOutputStream, response);}}} catch (EOFException | SocketException e) {log.info("connection 关闭! 客户端地址: {} : {}", clientSocket.getInetAddress(), clientSocket.getPort());} catch (IOException | MqException | ClassNotFoundException e) {log.error("connection 出现异常 e: {}", e.toString());e.printStackTrace();} finally {try {// 当连接处理完了, 一定要关闭 socketclientSocket.close();// 一个 TCP 连接中, 可能包含多个 channel, 需要把当前这个 socket 对应的所有 channel 也顺便清理掉clearClosedSession(clientSocket);} catch (IOException e) {e.printStackTrace();}}}
实现 readRequest
/*** @description: 反序列化请求消息**/private @NotNull Request readRequest(@NotNull DataInputStream dataInputStream) throws IOException, MqException {Request request = new Request();request.setType(dataInputStream.readInt());request.setLength(dataInputStream.readInt());byte[] payload = new byte[request.getLength()];int n = dataInputStream.read(payload);if (n != request.getLength()) {throw new MqException("读取请求格式出错");}request.setPayload(payload);return request;}
实现 writeResponse
- 注意这⾥的 flush 操作很关键, 否则响应不⼀定能及时返回给客⼾端
/*** @description: 将 Response 对象中的内容先后写入 dataOutputStream 中**/private void writeResponse(@NotNull DataOutputStream dataOutputStream, Response response) throws IOException {log.info("{writeResponse}: 即将发送响应为: {}",response);dataOutputStream.writeInt(response.getType());dataOutputStream.writeInt(response.getLength());dataOutputStream.write(response.getPayload());// 刷新缓冲区十分重要dataOutputStream.flush();}
实现处理请求
- 先把请求转换成 BaseArguments , 获取到其中的 channelId 和 rid
- 再根据不同的 type, 分别处理不同的逻辑. (主要是调⽤ virtualHost 中不同的⽅法).
- 针对消息订阅操作, 则需要在存在消息的时候通过回调, 把响应结果写回给对应的客⼾端.
- 最后构造成统⼀的响应.
/*** @description: 依据 request 中的信息, 执行相关方法, 并构造 Response 对象返回**/private @NotNull Response process(@NotNull Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {// 1. 把 request 中的 payload 做一个初步的解析, 让父类来接受BasicArguments basicArguments = (BasicArguments) BinaryUtils.fromBytes(request.getPayload());log.info("request 中 payload 解析结果: rid = {}, channelId = {}", basicArguments.getRid(),basicArguments.getChannelId());// 2. 根据 type 的值, 来近一步来区分这一次请求时要干啥的boolean ok = true; // 各个方法的返回结果基本都是 booleanif (request.getType() == 0x1) {// 创建 channelsessions.put(basicArguments.getChannelId(), clientSocket);log.info("创建 channel 完成! channelId: {}", basicArguments.getChannelId());} else if (request.getType() == 0x2) {// 销毁 channelsessions.remove(basicArguments.getChannelId());log.info("销毁 channel 完成! channelId: {}", basicArguments.getChannelId());} else if (request.getType() == 0x3) {// 创建交换机, 此时的 payload 就是 ExchangeDeclareArguments 对象了ExchangeDeclareArguments exchangeDeclareArguments = (ExchangeDeclareArguments) basicArguments;ok = virtualHost.exchangeDeclare(exchangeDeclareArguments.getExchangeName(), exchangeDeclareArguments.getExchangeType(), exchangeDeclareArguments.getDurable(), exchangeDeclareArguments.getAutoDelete(), exchangeDeclareArguments.getArguments());} else if (request.getType() == 0x4) {// 销毁交换机ExchangeDeleteArguments exchangeDeleteArguments = (ExchangeDeleteArguments) basicArguments;ok = virtualHost.exchangeDelete(exchangeDeleteArguments.getExchangeName());} else if (request.getType() == 0x5) {// 创建队列QueueDeclareArguments queueDeclareArguments = (QueueDeclareArguments) basicArguments;ok = virtualHost.queueDeclare(queueDeclareArguments.getQueueName(), queueDeclareArguments.isDurable(), queueDeclareArguments.isExclusive(), queueDeclareArguments.isAutoDelete(), queueDeclareArguments.getArguments());} else if (request.getType() == 0x6) {// 销毁队列QueueDeleteArguments queueDeleteArguments = (QueueDeleteArguments) basicArguments;ok = virtualHost.queueDelete(queueDeleteArguments.getQueueName());} else if (request.getType() == 0x7) {// 创建绑定QueueBindArguments queueBindArguments = (QueueBindArguments) basicArguments;ok = virtualHost.queueBind(queueBindArguments.getQueueName(), queueBindArguments.getExchangeName(), queueBindArguments.getBindingKey());} else if (request.getType() == 0x8) {// 删除绑定QueueUnBindArguments queueUnBindArguments = (QueueUnBindArguments) basicArguments;ok = virtualHost.queueUnBind(queueUnBindArguments.getQueueName(), queueUnBindArguments.getExchangeName());} else if (request.getType() == 0x9) {// 发布消息BasicPublishArguments basicPublishArguments = (BasicPublishArguments) basicArguments;ok = virtualHost.basicPublish(basicPublishArguments.getExchangeName(), basicPublishArguments.getRoutingKey(), basicPublishArguments.getBasicProperties(), basicPublishArguments.getBody());} else if (request.getType() == 0xa) {// 订阅消息BasicConsumeArguments basicConsumeArguments = (BasicConsumeArguments) basicArguments;ok = virtualHost.basicConsume(basicConsumeArguments.getConsumerTag(), basicConsumeArguments.getQueueName(), basicConsumeArguments.isAutoAck(), new Consumer() {/*** 这个回调函数要做的工作, 就是把服务收到消息直接推送会给对应的消费者客户端即可, 在客户端进行对消息的消费**/@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {// 先知道当前这个收到的消息, 要发给哪个客户端// 此处 consumerTag, 其实就是 channelId (这里是规定的, 客户端填写该字段的时候, 就是以 channelId 来填写的),// 根据 channelId 去 sessions 中查询, 就可以得到对应的 socket 对象, 就可以往里面发送数据了// 1. 根据 channelId 找到 socket 对象Socket clientSocket = sessions.get(consumerTag);if (clientSocket == null || clientSocket.isClosed()) {throw new MqException("[BrokerServer] 订阅消息的客户端已经关闭!");}// 2. 构造响应数据SubScribeReturns subScribeReturns = new SubScribeReturns();subScribeReturns.setChannelId(consumerTag);subScribeReturns.setRid(""); // 由于这里只有响应, 没有请求, 不需要去对应, rid 暂时不需要subScribeReturns.setOk(true);subScribeReturns.setConsumerTag(consumerTag);subScribeReturns.setBasicProperties(basicProperties);subScribeReturns.setBody(body);byte[] payload = BinaryUtils.toBytes(subScribeReturns);Response response = new Response();// 0xc 表示服务器给消费者客户端推送消息数据response.setType(0xc);// response 的 payload 就是一个 SubScribeReturnsresponse.setLength(payload.length);response.setPayload(payload);// 3. 把数据写回给客户端// 注意! 此处的 dataOutputStream 不能close!// 如果把 dataOutputStream 关闭, 就会直接把 clientSocket 里的 OutputStream 也关闭了// 此时就无法继续往 socket 中写入后续数据了DataOutputStream dataOutputStream = new DataOutputStream(clientSocket.getOutputStream());writeResponse(dataOutputStream, response);}});} else if (request.getType() == 0xb) {// 确认应答, 消费者确认收到消息BasicAckArguments basicAckArguments = (BasicAckArguments) basicArguments;ok = virtualHost.basicAck(basicAckArguments.getQueueName(), basicAckArguments.getMessageId());} else {// 当前的 type 是非法的throw new MqException("未知的 type: " + request.getType());}// 3. 构造响应BasicReturns basicReturns = new BasicReturns();basicReturns.setChannelId(basicArguments.getChannelId());basicReturns.setRid(basicArguments.getRid());basicReturns.setOk(ok);byte[] payload = BinaryUtils.toBytes(basicReturns);Response response = new Response();response.setType(request.getType());response.setLength(payload.length);response.setPayload(payload);log.info("构造响应完成: {}", response);return response;}
实现 clearClosedSession
- 如果客⼾端只关闭了 Connection, 没关闭 Connection 中包含的 Channel, 也没关系, 在这⾥统⼀进⾏清理.
- 注意迭代器失效问题.
/*** @description: 用户关闭连接后, 清理对应的 channel 资源* 需要注意的是 迭代器失效的问题**/private void clearClosedSession(Socket clientSocket) {// 这里要做的事情, 主要就是遍历上述 sessions hash 表, 把该关闭的 socket 对应的键值对, 统统删除List<String> toDeleteChannelId = new ArrayList<>();for (Map.Entry<String, Socket> entry : sessions.entrySet()) {if (entry.getValue() == clientSocket) {// 不能在这里直接删除!!// 这属于使用集合类的一个大忌 -- 一边遍历, 一边删除!// sessions.remove(entry.getKey());toDeleteChannelId.add(entry.getKey());}}for (String channelId : toDeleteChannelId) {sessions.remove(channelId);}log.info("清理 session 完成! 被清理的 channelId: {}", toDeleteChannelId);}
⼗三、 实现客⼾端
创建包 mqclient
创建 ConnectionFactory
⽤来创建连接的⼯⼚类
/*** Created with IntelliJ IDEA.* Description:工厂类 -- 以工厂模式来创建 Connection 类** @author: zxj* @date: 2024-03-06* @time: 21:55:32*/
@Data
public class ConnectionFactory {// BrokerServer 的 IP 和 portprivate String host;private Integer port;// more ...// 建立一个 TCP 连接public Connection newConnection() throws IOException {Connection connection = new Connection(host,port);return connection;}
}
Connection 和 Channel 的定义
⼀个客⼾端可以创建多个 Connection.
⼀个 Connection 对应⼀个 socket, ⼀个 TCP 连接.
⼀个 Connection 可以包含多个 Channel
- Connection 的定义
@Data
@Slf4j
public class Connection {private Socket socket;private InputStream inputStream;private OutputStream outputStream;private DataInputStream dataInputStream;private DataOutputStream dataOutputStream;// 记录当前 Connection 包含的 Channelprivate ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>();// 执行消息回调的线程池private ExecutorService callbackPool = null;
}
- Socket 是客⼾端持有的套接字. InputStream OutputStream DataInputStream DataOutputStream 均为 socket 通信的接⼝.
- channelMap ⽤来管理该连接中所有的 Channel.
- callbackPool 是⽤来在客⼾端这边执⾏⽤⼾回调的线程池.
- Channel 的定义
@Data
public class Channel {// channelId 为 channel 的身份标识, 使用 UUID 标识private String channelId;// connection 为 channel 对应的连接private Connection connection;// key 为 rid, 即 requestId / responseId// basicReturnsMap 用来保存响应的返回值, 放到这个哈希表中方便和请求匹配private ConcurrentHashMap<String, BasicReturns> basicReturnsMap = new ConcurrentHashMap<>();// 订阅消息的回调 -- 为消费者的回调(用户注册的), 对应消息响应, 应该调用这个回调处理消息private Consumer consumer = null;public Channel(String channelId, Connection connection) {this.channelId = channelId;this.connection = connection;}
}
- channelId 为 channel 的⾝份标识, 使⽤ UUID 标识.
- Connection 为 channel 对应的连接.
- baseReturnsMap ⽤来保存响应的返回值. 放到这个哈希表中⽅便和请求匹配.
- consumer 为消费者的回调(⽤⼾注册的). 对于消息响应, 应该调⽤这个回调处理消息.
封装请求响应读写操作
在 Connection 中, 实现下列⽅法
/*** @description: 读取响应**/public Response readResponse() throws IOException, MqException {log.info("客户端: 开始等待读取消息");Response response = new Response();response.setType(dataInputStream.readInt());response.setLength(dataInputStream.readInt());byte[] payload = new byte[response.getLength()];int n = dataInputStream.read(payload);if (n != response.getLength()) {throw new MqException("读取的响应数据不完整");}response.setPayload(payload);log.info("收到响应: type: {}, length: {}", response.getType(), response.getLength());return response;}/*** @description: 写请求**/public void writeRequest(Request request) throws IOException {dataOutputStream.writeInt(request.getType());dataOutputStream.writeInt(request.getLength());dataOutputStream.write(request.getPayload());dataOutputStream.flush();log.info("发送请求:type: {}, length: {}", request.getType(), request.getLength());}
创建 channel
在 Connection 中, 定义下列⽅法来创建⼀个 channel
public Channel createChannel() throws IOException {// 使用 UUID 生产 channelId, 以 C- 开头String channelId = "C-" + UUID.randomUUID().toString();Channel channel = new Channel(channelId, this);// 这里需要先把 channel 键值放到 Map 中 进行管理channelMap.put(channelId, channel);// 同时也需要把 "创建 channel" 的这个消息也告诉服务器boolean ok = channel.createChannel();if (!ok) {// 服务器这里创建失败了, 整个这次创建 channel 操作不顺利// 把刚才已经加入 hash 表的键值对, 再删了channelMap.remove(channelId);return null;}return channel;}
发送请求
通过 Channel 提供请求的发送操作.
- 创建 channel
/*** @description: 在这个方法中, 和服务器进行交互, 告知服务器, 此处客户端创建了新的 channel 了**/public boolean createChannel() throws IOException {// 对于创建 Channel 操作来说, payload 就是一个 basicArguments 对象BasicArguments basicArguments = new BasicAckArguments();basicArguments.setChannelId(channelId);basicArguments.setRid(generateRid());byte[] payload = BinaryUtils.toBytes(basicArguments);Request request = new Request();request.setType(0x1);request.setLength(payload.length);request.setPayload(payload);// 构造出完整请求之后, 就可以发送这个请求connection.writeRequest(request);// 等待服务器的响应BasicReturns basicReturns = waitResult(basicArguments.getRid());return basicReturns.getOk();}
generateRid 的实现
private @NotNull String generateRid() {return "R-" + UUID.randomUUID().toString();}
waitResult 的实现
- 由于服务器的响应是异步的. 此处通过 waitResult 实现同步等待的效果
/** @description: 期望使用这个方法来阻塞等待服务器的响应**/private BasicReturns waitResult(String rid) {BasicReturns basicReturns = null;while ((basicReturns = basicReturnsMap.get(rid)) == null) {// 如果查询结果为 null, 说明包裹还没有回来// 此时就需要阻塞等待synchronized (this) {try {wait();} catch (InterruptedException e) {e.printStackTrace();}}}return basicReturns;}
关闭 channel
/*** @description: 关闭 channel, 给服务器发送一个 0x2 类型的请求**/public boolean close() throws IOException {BasicArguments basicArguments = new BasicAckArguments();basicArguments.setRid(generateRid());basicArguments.setChannelId(channelId);byte[] payload = BinaryUtils.toBytes(basicArguments);Request request = new Request();request.setType(0x2);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(basicArguments.getRid());return basicReturns.getOk();}
创建交换机
/*** @description: 创建交换机**/public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete,Map<String, Object> arguments) throws IOException {ExchangeDeclareArguments exchangeDeclareArguments = new ExchangeDeclareArguments();exchangeDeclareArguments.setRid(generateRid());exchangeDeclareArguments.setChannelId(channelId);exchangeDeclareArguments.setExchangeName(exchangeName);exchangeDeclareArguments.setExchangeType(exchangeType);exchangeDeclareArguments.setDurable(durable);exchangeDeclareArguments.setAutoDelete(autoDelete);exchangeDeclareArguments.setArguments(arguments);byte[] payload = BinaryUtils.toBytes(exchangeDeclareArguments);Request request = new Request();request.setType(0x3);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(exchangeDeclareArguments.getRid());return basicReturns.getOk();}
删除交换机
/*** @description: 删除交换机**/public boolean exchangeDelete(String exchangeName) throws IOException {ExchangeDeleteArguments exchangeDeleteArguments = new ExchangeDeleteArguments();exchangeDeleteArguments.setExchangeName(exchangeName);exchangeDeleteArguments.setChannelId(channelId);exchangeDeleteArguments.setRid(generateRid());byte[] payload = BinaryUtils.toBytes(exchangeDeleteArguments);Request request = new Request();request.setType(0x4);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(exchangeDeleteArguments.getRid());return basicReturns.getOk();}
创建队列
/*** @description: 创建队列**/public boolean queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException {QueueDeclareArguments queueDeclareArguments = new QueueDeclareArguments();queueDeclareArguments.setQueueName(queueName);queueDeclareArguments.setExclusive(exclusive);queueDeclareArguments.setDurable(durable);queueDeclareArguments.setAutoDelete(autoDelete);queueDeclareArguments.setArguments(arguments);queueDeclareArguments.setChannelId(channelId);queueDeclareArguments.setRid(generateRid());byte[] payload = BinaryUtils.toBytes(queueDeclareArguments);Request request = new Request();request.setType(0x5);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(queueDeclareArguments.getRid());return basicReturns.getOk();}
删除队列
/*** @description: 删除队列**/public boolean queueDelete(String queueName) throws IOException {QueueDeleteArguments queueDeleteArguments = new QueueDeleteArguments();queueDeleteArguments.setRid(generateRid());queueDeleteArguments.setChannelId(channelId);queueDeleteArguments.setQueueName(queueName);byte[] payload = BinaryUtils.toBytes(queueDeleteArguments);Request request = new Request();request.setType(0x6);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(queueDeleteArguments.getRid());return basicReturns.getOk();}
创建绑定
/*** @description: 创建绑定**/public boolean queueBind(String queueName, String exchangeName, String bindingKey) throws IOException {QueueBindArguments queueBindArguments = new QueueBindArguments();queueBindArguments.setBindingKey(bindingKey);queueBindArguments.setQueueName(queueName);queueBindArguments.setExchangeName(exchangeName);queueBindArguments.setRid(generateRid());queueBindArguments.setChannelId(channelId);byte[] payload = BinaryUtils.toBytes(queueBindArguments);Request request = new Request();request.setType(0x7);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(queueBindArguments.getRid());return basicReturns.getOk();}
删除绑定
/*** @description: 删除绑定**/public boolean queueUnbind(String queueName, String exchangeName) throws IOException {QueueUnBindArguments queueUnBindArguments = new QueueUnBindArguments();queueUnBindArguments.setExchangeName(exchangeName);queueUnBindArguments.setQueueName(queueName);queueUnBindArguments.setRid(generateRid());queueUnBindArguments.setChannelId(channelId);byte[] payload = BinaryUtils.toBytes(queueUnBindArguments);Request request = new Request();request.setType(0x8);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(queueUnBindArguments.getRid());return basicReturns.getOk();}
发送消息
/*** @description: 发送消息**/public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties,byte[] body) throws IOException {BasicPublishArguments basicPublishArguments = new BasicPublishArguments();basicPublishArguments.setBasicProperties(basicProperties);basicPublishArguments.setBody(body);basicPublishArguments.setExchangeName(exchangeName);basicPublishArguments.setRoutingKey(routingKey);basicPublishArguments.setRid(generateRid());basicPublishArguments.setChannelId(channelId);byte[] payload = BinaryUtils.toBytes(basicPublishArguments);Request request = new Request();request.setType(0x9);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(basicPublishArguments.getRid());return basicReturns.getOk();}
订阅消息
/*** @description: 订阅消息**/public boolean basicConsume(String queueName,boolean autoAck,Consumer consumer) throws MqException, IOException {// 先设置回调, 一个channel 只能设置一个回调方法if (this.consumer != null) {throw new MqException("该 channel 已经设置过消费消息的回调了, 不能重复设置!");}this.consumer = consumer;BasicConsumeArguments basicConsumeArguments = new BasicConsumeArguments();basicConsumeArguments.setRid(generateRid());basicConsumeArguments.setChannelId(channelId);basicConsumeArguments.setConsumerTag(channelId); // 此处 consumerTag 也使用 channelId 来标识basicConsumeArguments.setAutoAck(autoAck);basicConsumeArguments.setQueueName(queueName);byte[] payload = BinaryUtils.toBytes(basicConsumeArguments);Request request = new Request();request.setType(0xa);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(basicConsumeArguments.getRid());return basicReturns.getOk();}
确认消息
/*** @description: 确认消息**/public boolean basicAck(String queueName,String messageId) throws IOException {BasicAckArguments basicAckArguments = new BasicAckArguments();basicAckArguments.setMessageId(messageId);basicAckArguments.setQueueName(queueName);basicAckArguments.setRid(generateRid());basicAckArguments.setChannelId(channelId);byte[] payload = BinaryUtils.toBytes(basicAckArguments);Request request = new Request();request.setType(0xb);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(basicAckArguments.getRid());return basicReturns.getOk();}
⼩结
上述发送请求的操作, 逻辑基本⼀致. 构造参数 + 构造请求 + 发送 + 等待结果
处理响应
- 创建扫描线程
创建⼀个扫描线程, ⽤来不停的读取 socket 中的响应数据
注意: ⼀个 Connection 中可能包含多个 channel, 需要把响应分别放到对应的 channel 中.
public Connection(String host, Integer port) throws IOException {socket = new Socket(host, port);inputStream = socket.getInputStream();outputStream = socket.getOutputStream();dataInputStream = new DataInputStream(inputStream);dataOutputStream = new DataOutputStream(outputStream);callbackPool = Executors.newFixedThreadPool(4);// 创建一个扫描线程, 由这个线程负责不停的从 socket 中读取响应数据, 把这个响应数据再交给对应的 Channel 负责处理Thread t = new Thread(() -> {try {while (!socket.isClosed()) {Response response = readResponse();dispatchResponse(response);}} catch (SocketException e) {// 连接正常断开的. 此时这个异常直接忽略.System.out.println("[Connection] 连接正常断开!");} catch (IOException | ClassNotFoundException | MqException e) {// System.out.println("[Connection] 连接异常断开!");log.error("连接异常断开! e: {}", e);e.printStackTrace();}});t.start();}
- 实现响应的分发
给 Connection 创建 dispatchResponse ⽅法
- 针对服务器返回的控制响应和消息响应, 分别处理.
- 如果是订阅数据, 则调⽤ channel 中的回调.
- 如果是控制消息, 直接放到结果集合中.
/*** @description: 使用这个方法来分别处理, 当前的响应是针对控制请求的响应, 还是服务器推送的消息**/private void dispatchResponse(Response response) throws IOException, ClassNotFoundException, MqException {if (response.getType() == 0xc) {// 服务器推送来的消息数据SubScribeReturns subScribeReturns = (SubScribeReturns) BinaryUtils.fromBytes(response.getPayload());// 根据 ChannelId 找到对应的 Channel 对象Channel channel = channelMap.get(subScribeReturns.getChannelId());if (channel == null) {throw new MqException("该消息对应的 Channel 在客户端中不存在, channelId: " + subScribeReturns.getChannelId());}// 执行该 channel 对象内部的回调callbackPool.submit(() -> {try {channel.getConsumer().handleDelivery(subScribeReturns.getConsumerTag(), subScribeReturns.getBasicProperties(),subScribeReturns.getBody());} catch (MqException | IOException e) {e.printStackTrace();}});} else {// 当前相应是针对刚才的控制请求的响应BasicReturns basicReturns = (BasicReturns) BinaryUtils.fromBytes(response.getPayload());// 根据 ChannelId 找到对应的 Channel 对象Channel channel = channelMap.get(basicReturns.getChannelId());if (channel == null) {throw new MqException("该消息对应的 Channel 在客户端中不存在, channelId: " + basicReturns.getChannelId());}channel.putReturns(basicReturns);}}
- 实现 channel.putReturns
/*** @description: 存入 basicReturns**/public void putReturns(BasicReturns basicReturns) {basicReturnsMap.put(basicReturns.getRid(), basicReturns);synchronized (this) {// 当前也不知道有多少个线程在等待上述的这个响应// 把所有的等待的线程都唤醒notifyAll();}}
关闭 Connection
public void close() {// 关闭 Connection, 释放相关资源try {callbackPool.shutdownNow();channelMap.clear();inputStream.close();outputStream.close();socket.close();} catch (Exception e) {log.error("关闭资源出现异常");e.printStackTrace();}}
测试代码
package en.edu.zxj.mq.mqclient;import en.edu.zxj.mq.MqApplication;
import en.edu.zxj.mq.common.Consumer;
import en.edu.zxj.mq.common.MqException;
import en.edu.zxj.mq.mqserver.BrokerServer;
import en.edu.zxj.mq.mqserver.core.BasicProperties;
import en.edu.zxj.mq.mqserver.core.ExchangeType;
import org.apache.tomcat.util.http.fileupload.FileUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.test.context.SpringBootTest;import java.io.File;
import java.io.IOException;/*** Created with IntelliJ IDEA.* Description:** @author: zxj* @date: 2024-03-07* @time: 10:55:32*/
@SpringBootTest
class MqClientTest {private ConnectionFactory factory = null;private Thread t = null;private BrokerServer brokerServer = null;@BeforeEachpublic void setUp() throws IOException {// 1. 先启动服务器MqApplication.context = SpringApplication.run(MqApplication.class);brokerServer = new BrokerServer(9090);t = new Thread(() -> {brokerServer.start();});t.start();// 2. 配置 ConnectionFactoryfactory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(9090);}@AfterEachpublic void tearDown() throws IOException, InterruptedException {// 停止服务器brokerServer.stop();MqApplication.context.close();t.join();// 删除必要的文件File file = new File("./data");FileUtils.deleteDirectory(file);factory = null;}@Testpublic void testConnection() throws IOException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);}@Testpublic void testChannel() throws IOException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);Channel channel = connection.createChannel();Assertions.assertNotNull(channel);}@Testpublic void testExchange() throws IOException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);Channel channel = connection.createChannel();Assertions.assertNotNull(channel);boolean ok = channel.exchangeDeclare("testExchangeName", ExchangeType.DIRECT,true,false,null);Assertions.assertTrue(ok);ok = channel.exchangeDelete("testExchangeName");Assertions.assertTrue(ok);// 该关闭的关闭channel.close();connection.close();}@Testpublic void testQueue() throws IOException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);Channel channel = connection.createChannel();Assertions.assertNotNull(channel);boolean ok = channel.queueDeclare("testQueue",true,false,false,null);Assertions.assertTrue(ok);ok = channel.queueDelete("testQueue");Assertions.assertTrue(ok);// 该关闭的关闭channel.close();connection.close();}@Testpublic void testBinding() throws IOException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);Channel channel = connection.createChannel();Assertions.assertNotNull(channel);boolean ok = channel.queueDeclare("testQueue",true,false,false,null);Assertions.assertTrue(ok);ok = channel.exchangeDeclare("testExchangeName", ExchangeType.DIRECT,true,false,null);Assertions.assertTrue(ok);ok = channel.queueBind("testQueue","testExchangeName","testBindingKey");Assertions.assertTrue(ok);ok = channel.queueUnbind("testQueue","testExchangeName");Assertions.assertTrue(ok);ok = channel.exchangeDelete("testExchangeName");Assertions.assertTrue(ok);ok = channel.queueDelete("testQueue");Assertions.assertTrue(ok);// 该关闭的关闭channel.close();connection.close();}@Testpublic void testMessage() throws IOException, MqException, InterruptedException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);Channel channel = connection.createChannel();Assertions.assertNotNull(channel);boolean ok = channel.queueDeclare("testQueue",true,false,false,null);Assertions.assertTrue(ok);ok = channel.exchangeDeclare("testExchangeName", ExchangeType.DIRECT,true,false,null);Assertions.assertTrue(ok);byte[] requestBody = "hello".getBytes();ok = channel.basicPublish("testExchangeName","testQueue",null,requestBody);Assertions.assertTrue(ok);ok = channel.basicConsume("testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[消费数据] 开始!");System.out.println("consumerTag=" + consumerTag);System.out.println("basicProperties=" + basicProperties);Assertions.assertArrayEquals(requestBody,body);System.out.println("[消费数据] 结束!");}});Assertions.assertTrue(ok);Thread.sleep(500);ok = channel.exchangeDelete("testExchangeName");Assertions.assertTrue(ok);ok = channel.queueDelete("testQueue");Assertions.assertTrue(ok);// 该关闭的关闭channel.close();connection.close();}}
项目结果
演示
首先启动 BrokerServer 类
@SpringBootApplication
public class MqApplication {public static ConfigurableApplicationContext context = null;public static void main(String[] args) throws IOException {context = SpringApplication.run(MqApplication.class, args);BrokerServer brokerServer = new BrokerServer(9090);brokerServer.start();}}
接着分别启动消费者和生产者客户端, 不分先后启动顺序
此时消费者就会收到消息并进行处理