模拟实现消息队列项目(系列3) -- 服务器模块(硬盘管理)

目录

前言

1. 创建项目

2. 创建核心类

2.1 Exchange

2.2 MSQueue

2.3 Binding

2.4 Message

3. 数据库设计

3.1 SQLite 配置

3.2 Mapper层代码实现

3.2.1 创建表操作

3.2.2 交换机 队列 绑定的增加和删除

3.3 实现DataBaseManager

3.4 DataBaseManager单元测试

4. 消息存储设计

4.1 创建MessageFileManager类

4.2 MessageFileManager单元测试

5. 整合数据库和文件操作(DiskDataCenter)

结语


前言

        我们上一节,对我们的项目的需求和模块的划分进行了总结,接下来我们进入代码环节,这里还是在强调一遍,一个项目的最好的开始就是对项目的需求分析以及模块的划分规划好,有了整天的架构,我们再写代码对功能进行一一实现.这个环节是必须要有的.接下来,本章节是对于服务器模块中硬盘管理进行总结,主要是数据库管理和文件管理.最后会将整个项目的Gitee链接放在文章末尾,欢迎访问.


1. 创建项目

        本项目是基于Spring boot框架的,这里如何创建Spring Boot 项目这里就不进行过多的赘述了,可以从之前的博客进行学习.

Spring Boot的创建与使用icon-default.png?t=N6B9https://blog.csdn.net/weixin_46114074/article/details/131652160

2. 创建核心类

2.1 Exchange

在mqServer中创建core文件夹新建Exchange.class 和 ExchangeType.class(使用枚举表示交换机的类型)

package com.example.demo.mqserver.core;import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Data;import java.util.HashMap;
import java.util.Map;@Data
public class Exchange {// 1.交换机的名字,为身份标识,也就是唯一的private String name;// 2.交换机的类型// direct fanout topicprivate ExchangeType type = ExchangeType.DIRECT;// 3.交换机是否持久化储存private boolean durable = false;// 4.如果当前交换机次没人使用了(生产者),就会自动被删除(没有实现此功能)private boolean autoDelete = false;// 5.创建交换机的时候指定一些额外的选项(没有实现此功能)// 为了进行存储数据库,我们将map进行转换成Json格式的字符串在数据库中进行储存private Map<String, Object> arguments = new HashMap<>();/*** 这一组getter 和 setter 方法 用来和数据库进行交互的时候的使用* @return*/public String getArguments() {// 将arguments进行转换成Json格式ObjectMapper objectMapper = new ObjectMapper();try {return objectMapper.writeValueAsString(arguments);} catch (JsonProcessingException e) {e.printStackTrace();}return "{}";}public void setArguments(String argumentsJson) {ObjectMapper objectMapper = new ObjectMapper();// 将从数据库获取的argumentsJson转换成maptry {
//                                  转换的数据       转换成的类型,如果是简单对象就直接使用类对象即可,
//                                                 要是复杂的数据类型,就使用TypeReference匿名内部类,传入目标转换的类型this.arguments = objectMapper.readValue(argumentsJson,new TypeReference<HashMap<String,Object>>(){});} catch (JsonProcessingException e) {e.printStackTrace();}}// ---------------------------------------------------------------------------------------/*** 这一组getter 和 setter 方法供内部进行使用,更加简单的获取和设置键值对* @param key* @return*/public Object getArguments(String key) {return arguments.get(key);}public void setArguments(String key, Object value) {arguments.put(key,value);}// ---------------------------------------------------------------------------------------public void setArguments(Map<String,Object> arguments){this.arguments = arguments;}
}

2.2 MSQueue

 在mqServer中创建core文件夹新建MSQueue.class

package com.example.demo.mqserver.core;import com.example.demo.common.ConsumerEnv;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Data;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;/*** Created with IntelliJ IDEA.* Description:队列(存储消息的队列)* User: YAO* Date: 2023-07-26* Time: 16:43*/
@Data
public class MSQueue {// 1.队列的身份标识private String name;// 2.队列是否持久化private boolean durable = false;// 3.为true表示只可以被一个消费者使用,false表示大家都可以进行使用(没有实现此功能)private boolean exclusive = false;// 4.如果当前交换机次没人(消费者)使用了,就会自动被删除(没有实现此功能)private boolean autoDelete = false;// 5.创建交换机的时候指定一些额外的选项(没有实现此功能)private Map<String, Object> arguments = new HashMap<>();
}

2.3 Binding

 在mqServer中创建core文件夹新建Binding.class

package com.example.demo.mqserver.core;import lombok.Data;/*** Created with IntelliJ IDEA.* Description:绑定(交换机和队列之间的关联关系)* User: YAO* Date: 2023-07-26* Time: 16:43*/
@Data
public class Binding {// 1. 交换机的名字private String exchangeName;// 2. 队列的名字private String queueName;// 3. bindingKey(与routingKey进行匹配)private String bindingKey;// 4.binding这个定西依附于Exchange 和 queue 单独设计持久化就没有什么意义了
}

 2.4 Message

  在mqServer中创建core文件夹新建Message.class 和 BasicProperties.class(消息的属性)

package com.example.demo.mqserver.core;import lombok.Data;import java.io.Serializable;
import java.util.UUID;/*** Created with IntelliJ IDEA.* Description:消息(表示为要传递的消息)* 一, 主要包含两个部分: 属性 和 正文*      1. 属性部分 (BasicProperties)*      2. 正文属性 body** 二, 此处的message的对象要满足在网络中进行传输,并且进行写入文件中,所以我们要进行序列化和反序列化*      此处使用标准库自带的方法进行序列化,不使用Json,因为Json存储的是文本格式的数据,而我们的消息的body是二进制数据* User: YAO* Date: 2023-07-26* Time: 16:44*/
@Data
public class Message implements Serializable {// 版本号: 当程序员进行修改当前类的时候需要将当前版号进行变更private static final long serialVersionUID = 1L;// 1. 消息的属性(核心部分)private BasicProperties basicProperties = new BasicProperties();// 2. 消息的正文 (支持二进制数据)(核心属性)private byte[] body;// 辅助属性// 3. 一个文件中会存储很多消息,如何找到某个消息,在文件中的具体内容呢?// 使用两个偏移量进行表示// [offset,offend) 左闭右开// 不需要被序列化保存文件中(防止进行序列化)private transient long offsetBegin = 0;   // 消息数据的开头距离文件开头的位置偏移(字节)private transient long offsetEnd = 0;   // 消息数据的结尾距离文件开头的位置偏移(字节)// 4. 表示这条消息是否是有效信息(针对文件的删除使用逻辑删除)// 0x1 表示有效 0x0 表示无效private byte isValid = 0x1;// 5. 创建工厂方法,帮助我们进行封装创建message的过程public static Message createMessageWithId(String routingKey ,BasicProperties basicProperties,byte[] body){// 此方法会自动生成一个带有唯一的messageId的message对象Message message = new Message();if (basicProperties != null){message.setBasicProperties(basicProperties);}message.setMessageID("M-" + UUID.randomUUID());message.basicProperties.setRoutingKey(routingKey);message.setBody(body);// 此处将消息的主要的两个属性进行设置了,剩下的辅助属性在持久化之前进行设置return message;}public String getMessageID(){return basicProperties.getMessageId();}public void setMessageID(String messageId){basicProperties.setMessageId(messageId);}public String getRoutingKey(){return basicProperties.getRoutingKey();}public void setRoutingKey(String routingKey){basicProperties.setRoutingKey(routingKey);}public int getDeliverMode(){return basicProperties.getDeliverMode();}public void setDeliverMode(int mode){basicProperties.setDeliverMode(mode);}}

3. 数据库设计

        我们之前划分的模块是将交换机 队列 绑定这三个的信息存储在硬盘中,由于对于操作这三者不是很频繁,所以我们将其存储在数据库中,但是对于消息的存储,我们不能存储在数据库中,是因为我们频繁的进行操作消息,所以我们将其存储在硬盘的文件中.

3.1 SQLite 配置

此处我们使用的数据库是SQLite,是⼀个更轻量的数据库.我们可以在Maven的中央仓库进行导入该依赖,直接进行使用.

1. 引入环境依赖

<dependency><groupId>org.xerial</groupId><artifactId>sqlite-jdbc</artifactId><version>3.41.0.1</version>
</dependency>

2. 配置Spring Boot配置文件application.yaml文件

spring:datasource:
#   当前项目的路径下url: jdbc:sqlite:./data/meta.db
#   sqlite不需要设置进行设置用户名以及密码,这点不同于MySQL
#   sqlite不是客户端节后的程序,就只有自己一个人进行访问,所以不需要进行设置用户名以及密码username:password:driver-class-name: org.sqlite.JDBC
# 设置 Mybatis 的 xml 保存路径
mybatis:mapper-locations: classpath:mapper/**Mapper.xml
#  configuration: # 配置打印 MyBatis 执行的 SQL
#    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
# 配置打印 MyBatis 执行的 SQL
#logging:
#  level:
#    com:
#      example:
#        demo: debug

3.2 Mapper层代码实现

3.2.1 创建表操作

@Mapper
public interface MetaMapper {// 1. 提供三个核心的建表方法void createExchangeTable();void createQueueTable();void createBindingTable();
}

对应MyBatis的Sql创建

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.demo.mqserver.mapper.MetaMapper"><!--    1. 消息队列服务器的表创建--><update id="createExchangeTable">create table if not exists exchange(name varchar(50) primary key,type int,durable boolean,autoDelete boolean,arguments varchar(1024))</update><update id="createQueueTable">create table if not exists queue(name varchar(50) primary key,durable boolean,exclusive boolean,autoDelete boolean,arguments varchar(1024))</update><update id="createBindingTable">create table if not exists binding (exchangeName varchar(50),queueName varchar(50),bindingKey varchar(256))</update>
</mapper>

3.2.2 交换机 队列 绑定的增加和删除

@Mapper
public interface MetaMapper {List<Exchange> selectAllExchange();List<MSQueue> selectAllMSQueue();List<Binding> selectAllBinding();void insertExchange(Exchange exchange);void deleteExchange(String exchangeName);void insertQueue(MSQueue queue);void deleteQueue(String queueName);void insertBinding(Binding binding);void deleteBinding(Binding binding);
}

对应MyBatis的Sql创建 

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.demo.mqserver.mapper.MetaMapper"><!--    查找数据操作--><select id="selectAllExchange" resultType="com.example.demo.mqserver.core.Exchange">select * from exchange</select><select id="selectAllMSQueue" resultType="com.example.demo.mqserver.core.MSQueue">select * from queue</select><select id="selectAllBinding" resultType="com.example.demo.mqserver.core.Binding">select * from binding</select><!--   插入数据操作--><insert id="insertExchange" >insert into exchange values(#{name}, #{type}, #{durable}, #{autoDelete}, #{arguments});</insert><insert id="insertQueue">insert into queue values(#{name},#{durable},#{exclusive},#{autoDelete},#{arguments})</insert><insert id="insertBinding">insert into binding values(#{exchangeName},#{queueName},#{bindingKey})</insert><!--    删除数据操作--><delete id="deleteExchange">delete from exchange where name=#{exchangeName}</delete><delete id="deleteQueue">delete from queue where name=#{queueName}</delete><delete id="deleteBinding">delete from binding where exchangeName=#{exchangeName} and queueName=#{queueName}</delete>
</mapper>

3.3 实现DataBaseManager

路径 : mqserver.datacenter.DataBaseManager  对数据库的操作进行封装.

1. 初始化数据库的文件

2. 进行数据库建表操作

3. 插入默认的一条交换机数据

4. 提供删除数据库文件的操作,主要用于单元测试.

5. 封装其他数据库的操作(获取,插入,删除)

package com.example.demo.mqserver.datacenter;import com.example.demo.DemoApplication;
import com.example.demo.mqserver.core.Binding;
import com.example.demo.mqserver.core.Exchange;
import com.example.demo.mqserver.core.ExchangeType;
import com.example.demo.mqserver.core.MSQueue;
import com.example.demo.mqserver.mapper.MetaMapper;
import org.apache.ibatis.annotations.Mapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.io.File;
import java.util.List;/*** Created with IntelliJ IDEA.* Description:操作数据库的类* User: YAO* Date: 2023-07-27* Time: 11:36*/
public class DataBaseManager {// 从Spring中获取Bean对象(手动获取)private MetaMapper metaMapper;/*** 数据库初始化*/public void init(){// 手动获取MetaMapper对象metaMapper = DemoApplication.context.getBean(MetaMapper.class);// 1.建库建表if (!checkBDExists()){// 创建data目录File dataDir = new File("./data");dataDir.mkdirs();createTable();// 2.插入默认的数据createDefaultData();System.out.println("[DataBaseManager]: 数据库初始化完成");}else {System.out.println("[DataBaseManager]: 数据库已经存在");}}/*** 删除数据库*/public void deleteDB(){File file = new File("./data/meta.db");boolean ret = file.delete();File dataDir = new File("./data");ret = dataDir.delete();if (ret){System.out.println("[DataBaseManager]: 数据库已经删除");}else {System.out.println("[DataBaseManager]: 数据库删除失败");}}/*** 判断数据库是否存在* @return*/private boolean checkBDExists() {File file = new File("./data/meta.db");if (file.exists()){return true;}return false;}/*** 建表,建库操作不需要自己手动执行* 首次执行到这了的数据操作的时候,就会创建出meta.db文件*/private void createTable(){metaMapper.createBindingTable();metaMapper.createQueueTable();metaMapper.createExchangeTable();System.out.println("[DataBaseManager]: 创建完成");}/*** 给数据库表中添加默认的数据* 添加一个默认的交换机* RabbitMq中有这样一个设定,带有一个匿名的交换机,类型为direct*/private void createDefaultData() {// 构造一个默认的交换机Exchange exchange = new Exchange();exchange.setName("");exchange.setType(ExchangeType.DIRECT);exchange.setDurable(true);exchange.setAutoDelete(false);metaMapper.insertExchange(exchange);System.out.println("[DataBaseManager]: 创建初始数据完成");}// 封装其他数据库操作public void insertExchange(Exchange exchange){metaMapper.insertExchange(exchange);};public void deleteExchange(String exchangeName){metaMapper.deleteExchange(exchangeName);};public void insertQueue(MSQueue queue){metaMapper.insertQueue(queue);};public void deleteQueue(String queueName){metaMapper.deleteQueue(queueName);};public void insertBinding(Binding binding){metaMapper.insertBinding(binding);};public void deleteBinding(Binding binding){metaMapper.deleteBinding(binding);};List<Exchange> selectAllExchange(){return metaMapper.selectAllExchange();};List<MSQueue> selectAllMSQueue(){return metaMapper.selectAllMSQueue();};List<Binding> selectAllBinding(){return metaMapper.selectAllBinding();};
}

3.4 DataBaseManager单元测试

        我们使用Spring自带的单元测试,具体怎么生成单元测试,之前的文章也进行讲解过,就不再进行单独的讲解,其实很简单,只不过别忘了运行环境的注解@SpringBootTest.

package com.example.demo.mqserver.datacenter;import com.example.demo.DemoApplication;
import com.example.demo.mqserver.core.Binding;
import com.example.demo.mqserver.core.Exchange;
import com.example.demo.mqserver.core.ExchangeType;
import com.example.demo.mqserver.core.MSQueue;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.SpringApplication;import java.io.File;
import java.util.List;
import java.util.Map;import static org.junit.jupiter.api.Assertions.*;/*** Created with IntelliJ IDEA.* Description:测试用例(操作数据库)* User: YAO* Date: 2023-07-27* Time: 14:17*/
class DataBaseManagerTest {private DataBaseManager dataBaseManager = new DataBaseManager();/*** 执行测试前的准备工作*/@BeforeEachvoid setUp() {// 由于我们在init中,需要通过context中拿到MetaMapper实例DemoApplication.context = SpringApplication.run(DemoApplication.class);dataBaseManager.init(); // 初始化了MetaMapper}/*** 执行测试完成的后续工作*/@AfterEachvoid tearDown() {// 数据库给清空// 不能直接就进行删除操作 ===>> 先释放Context对象// 先将Context对象进行关闭,因为Context对象持有了MetaMapper的实例,MetaMapper实例又打开了meta.db数据库文件// meta.db被别人打开了,此时进行删除是执行失败的,在Windows是这样的在Linux中可以直接删除// 另一个方面,获取Context操作,会占用8080端口,如果不进行释放,那么下一个单元测试再重新获取Context对象时是获取不到的DemoApplication.context.close();dataBaseManager.deleteDB();}@Testvoid init() {// 在setUp中调用过,我们只需要检查数据库状态即可// 查交换机表 --> 1条数据// 查队列表 --> 0条数据// 查绑定表 --> 0条数据List<Exchange> exchangeList = dataBaseManager.selectAllExchange();List<MSQueue> msQueueList = dataBaseManager.selectAllMSQueue();List<Binding> bindingList = dataBaseManager.selectAllBinding();// 我们使用断言进行查看结果Assertions.assertEquals(1,exchangeList.size());Assertions.assertEquals("",exchangeList.get(0).getName());Assertions.assertEquals(ExchangeType.DIRECT,exchangeList.get(0).getType());Assertions.assertEquals(0,msQueueList.size());Assertions.assertEquals(0,msQueueList.size());}@Testvoid deleteDB() {}private Exchange createTestExchange(String exchangeName){Exchange exchange = new Exchange();exchange.setName(exchangeName);exchange.setType(ExchangeType.FANOUT);exchange.setAutoDelete(false);exchange.setDurable(true);exchange.setArguments("aa",1);exchange.setArguments("bb",2);return exchange;}@Testvoid insertExchange() {// 构造一个Exchange对象Exchange exchange = createTestExchange("testExchange");// 插入数据库dataBaseManager.insertExchange(exchange);// 查询结果List<Exchange> exchangeList = dataBaseManager.selectAllExchange();// 对查询结果进行验证Assertions.assertEquals(2,exchangeList.size());Assertions.assertEquals("testExchange",exchangeList.get(1).getName());Assertions.assertEquals(ExchangeType.FANOUT,exchangeList.get(1).getType());assertFalse(exchangeList.get(1).isAutoDelete());assertTrue(exchangeList.get(1).isDurable());Assertions.assertEquals(1,exchangeList.get(1).getArguments("aa"));Assertions.assertEquals(2,exchangeList.get(1).getArguments("bb"));}@Testvoid deleteExchange() {// 先构造叫交换机Exchange exchange = createTestExchange("TestExchange");dataBaseManager.insertExchange(exchange);List<Exchange> exchangeList = dataBaseManager.selectAllExchange();// 对查询结果进行验证Assertions.assertEquals(2,exchangeList.size());Assertions.assertEquals("TestExchange",exchangeList.get(1).getName());dataBaseManager.deleteExchange("TestExchange");List<Exchange> exchangeList2= dataBaseManager.selectAllExchange();Assertions.assertEquals(1,exchangeList2.size());Assertions.assertEquals("",exchangeList2.get(0).getName());}private MSQueue createTestMSQueue(String queueName){MSQueue queue = new MSQueue();queue.setName(queueName);queue.setDurable(true);queue.setAutoDelete(false);queue.setExclusive(false);queue.setArguments("aaa", 1);queue.setArguments("bbb", 2);return queue;}@Testvoid insertQueue() {MSQueue queue = createTestMSQueue("testQueue");dataBaseManager.insertQueue(queue);List<MSQueue> queueList = dataBaseManager.selectAllMSQueue();Assertions.assertEquals(1, queueList.size());MSQueue newQueue = queueList.get(0);Assertions.assertEquals("testQueue", newQueue.getName());assertTrue(newQueue.isDurable());assertFalse(newQueue.isAutoDelete());assertFalse(newQueue.isExclusive());Assertions.assertEquals(1, newQueue.getArguments("aaa"));Assertions.assertEquals(2, newQueue.getArguments("bbb"));}@Testvoid deleteQueue() {MSQueue queue = createTestMSQueue("testQueue");dataBaseManager.insertQueue(queue);List<MSQueue> queueList = dataBaseManager.selectAllMSQueue();Assertions.assertEquals(1, queueList.size());// 进行删除dataBaseManager.deleteQueue("testQueue");queueList = dataBaseManager.selectAllMSQueue();Assertions.assertEquals(0, queueList.size());}private Binding createTestBinding(String exchangeName, String queueName) {Binding binding = new Binding();binding.setExchangeName(exchangeName);binding.setQueueName(queueName);binding.setBindingKey("testBindingKey");return binding;}@Testpublic void testInsertBinding() {Binding binding = createTestBinding("testExchange", "testQueue");dataBaseManager.insertBinding(binding);List<Binding> bindingList = dataBaseManager.selectAllBinding();Assertions.assertEquals(1, bindingList.size());Assertions.assertEquals("testExchange", bindingList.get(0).getExchangeName());Assertions.assertEquals("testQueue", bindingList.get(0).getQueueName());Assertions.assertEquals("testBindingKey", bindingList.get(0).getBindingKey());}@Testpublic void testDeleteBinding() {Binding binding = createTestBinding("testExchange", "testQueue");dataBaseManager.insertBinding(binding);List<Binding> bindingList = dataBaseManager.selectAllBinding();Assertions.assertEquals(1, bindingList.size());// 删除Binding toDeleteBinding = createTestBinding("testExchange", "testQueue");dataBaseManager.deleteBinding(toDeleteBinding);bindingList = dataBaseManager.selectAllBinding();Assertions.assertEquals(0, bindingList.size());}
}

以上就完成了数据库相关的操作

4. 消息存储设计

 我们给每个队列分配一个目录,目录的名字为data+队列名 ./data/testQueue

 

 4.1 创建MessageFileManager类

路径 :  mqserver.database.MessageFileManager 

实现方法:

  • 1. 获取指定队列对应的消息文件所在的文件夹
  • 2. 获取指定队列的对应消息的数据文件路径
  • 3. 获取指定队列的对应消息的统计文件路径
  • 4. 创建队列对应的文件和目录
  • 5. 删除指定队列对应的消息的目录和文件
  • 6. 对于队列对应消息的文件进行判断是否存在
  • 7. 往队列对应文件中,添加消息
  • 8. 删除消息 逻辑删除,对消息的的isValid进行设置成0
  • 9. 加载指定名称的队列对应的所有消息存放在一个链表中
  • 10.实现文件消息的垃圾回收
package com.example.demo.mqserver.datacenter;import com.example.demo.common.BinaryTool;
import com.example.demo.common.MqException;
import com.example.demo.mqserver.core.MSQueue;
import com.example.demo.mqserver.core.Message;import java.io.*;
import java.util.LinkedList;
import java.util.Scanner;/*** Created with IntelliJ IDEA.* Description:对硬盘上的消息进行管理* User: YAO* Date: 2023-07-27* Time: 17:43*/
public class MessageFileManager {// 1. 约定消息文件所在的目录和文件名public void init(){// 暂时不需要进行初始化}/*** 1. 获取指定队列对应的消息文件所在的文件夹* @param queueName* @return*/private String getQueueDir(String queueName){return "./data/" + queueName;}/*** 2.获取指定队列的对应消息的数据文件路径* @param queueName* @return*/private String getQueueDataPath(String queueName){return getQueueDir(queueName) + "/queue_data.txt";}/*** 3.获取指定队列的对应消息的统计文件路径* @param queueName* @return*/private String getQueueStatPath(String queueName){return getQueueDir(queueName) + "/queue_stat.txt";}/*** 4. 定义一个静态内部类进行描述,消息的统计信息文件的属性** 对于简单的类,就直接设置为public的成员变量*/static public class Stat{// 1. 定义总消息的数量public int totalCount;// 2. 定义有效消息的数量public int validCount;}/*** 4.1  读取统计文件的信息* @param queueName* @return*/private Stat readStat(String queueName){// 由于当前的统计信息的文件类型是文本文件,我们可以直接使用Scanner进行读取Stat stat = new Stat();try(InputStream inputStream = new FileInputStream(getQueueStatPath(queueName))){Scanner scanner = new Scanner(inputStream);stat.totalCount = scanner.nextInt();stat.validCount = scanner.nextInt();return stat;} catch (IOException e) {e.printStackTrace();}return null;}/*** 4.2 往统计文件写入信息* @param queueName* @param stat* @return*/private void writeStat(String queueName, Stat stat) {// 使用 PrintWrite 来写文件.// OutputStream 打开文件, 默认情况下, 会直接把原文件清空. 此时相当于新的数据覆盖了旧的.try (OutputStream outputStream = new FileOutputStream(getQueueStatPath(queueName))) {PrintWriter printWriter = new PrintWriter(outputStream);printWriter.write(stat.totalCount + "\t" + stat.validCount);printWriter.flush();} catch (IOException e) {e.printStackTrace();}}/*** 5. 创建队列对应的文件和目录*/public void createQueueFiles(String queueName) throws IOException {// 1. 先创建队列对应的消息目录File baseDir = new File(getQueueDir(queueName));if (!baseDir.exists()){// 不存在就进行创建文件路径boolean ok = baseDir.mkdirs();if(!ok){throw new IOException("创建目录失败! baseDir=" + baseDir.getAbsolutePath());}}// 2. 创建消息数据文件File queueDataDir = new File(getQueueDataPath(queueName));if (!queueDataDir.exists()){// 不存在就进行创建消息数据文件boolean ok = queueDataDir.createNewFile();if(!ok){throw new IOException("创建消息数据文件失败! baseDir=" + queueDataDir.getAbsolutePath());}}// 3. 创建消息统计文件File queueStatDir = new File(getQueueStatPath(queueName));if (!queueStatDir.exists()){// 不存在就进行创建消息统计文件boolean ok = queueStatDir.createNewFile();if(!ok){throw new IOException("创建消息数据文件失败! baseDir=" + queueStatDir.getAbsolutePath());}}// 4. 给消息统计文件进行设置初始值  0\t0Stat stat = new Stat();stat.totalCount = 0;stat.validCount = 0;writeStat(queueName,stat);}/*** 6. 删除指定队列对应的消息的目录和文件* 当队列被删除的时候,对应的消息文件也要删除* @param queueName*/public void destroyQueueFiles(String queueName) throws IOException {// 1. 删除文件里面的内容File queueDataFile = new File(getQueueDataPath(queueName));boolean ok1 = queueDataFile.delete();File queueStatFile = new File(getQueueStatPath(queueName));boolean ok2 = queueStatFile.delete();// 2. 删除目录File baseDir = new File(getQueueDir(queueName));boolean ok3 = baseDir.delete();if (!ok1 || !ok2 || !ok3){throw new IOException("删除消息目录和文件失败, baseDir:"+baseDir.getAbsolutePath());}}/*** 7. 对于队列对应消息的文件进行判断是否存在**  后续生产者进行生产消息了,如果此时的消息设置的是持久化类型,就需要判断之前的存放消息的文件夹是否存在* @param queueName* @return*/public boolean checkFilesExits(String queueName){File queueDataFile = new File(getQueueDataPath(queueName));if (!queueDataFile.exists()){return false;}File queueStatFile = new File(getQueueStatPath(queueName));if (!queueStatFile.exists()){return false;}return true;}/*** 8. 往队列对应文件中,添加消息* @param queue   写入的队列* @param message 当前消息内容*/public void sendMessage(MSQueue queue, Message message) throws MqException, IOException {// 1. 先检查要写入队列对应的文件和目录是否存在if (!checkFilesExits(queue.getName())){// 自己定义异常进行抛出throw new MqException("[MessageFileManager] 队列对应文件不存在! queueName=" + queue.getName());}// 2. 将Message对象进行序列化,然后进行写入文件byte[] messageBinary = BinaryTool.toBytes(message);// 3. 获取当前二进制数据的长度,计算 Message的offsetBeg 和 offsetEnd// offsetBeg = 当前文件长度 + 4   (4个字节用来表示文件的长度)// offsetEnd = 当前文件长度 + 4 + 文件长度synchronized (queue){File queueDataFile = new File(getQueueDataPath(queue.getName()));message.setOffsetBegin(queueDataFile.length() + 4);message.setOffsetEnd(queueDataFile.length() + 4 + messageBinary.length);// 3. 将二进制消息数据进行写入文件try(OutputStream outputStream = new FileOutputStream(queueDataFile,true)){try(DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){// 使用DataOutputStream写可以将一个int写入的时候是四个字节// 写入消息长度(四个字节)dataOutputStream.writeInt(messageBinary.length);// 写入消息本体dataOutputStream.write(messageBinary);}}// 5. 更新消息统计文件Stat stat = readStat(queue.getName());stat.validCount += 1;stat.totalCount += 1;writeStat(queue.getName(),stat);}}/*** 9. 删除消息  逻辑删除,对消息的的isValid进行设置成0*   1. 将文件的二级制数据进行读取,还原成Message对象*   2. 将该Message对象的isValid属性进行设置为0*   3. 将修改过的Message对象进行转换成二进制数据重新写入到文件中*   此处这个参数的Message必须包含有效的offsetBeg 和 offsetEnd* @param queue* @param message*/public void deleteMessage(MSQueue queue, Message message) throws IOException, ClassNotFoundException {// 1. 将文件的二级制数据进行读取,还原成Message对象// 读取的是随机读取,指定位置进行读取  RandomAccessFile// 对其进行加锁synchronized (queue){try(RandomAccessFile randomAccessFile = new RandomAccessFile(getQueueDataPath(queue.getName()),"rw")){// 1.1先从文件中读取对应的Message数据byte[] bufferSrc = new byte[(int) (message.getOffsetEnd()-message.getOffsetBegin())];// 1.2将光标指定到消息数据的开始位置randomAccessFile.seek(message.getOffsetBegin());randomAccessFile.read(bufferSrc);Message diskMessage = (Message) BinaryTool.fromBytes(bufferSrc);// 2. 将该Message对象的isValid属性进行设置为0x0diskMessage.setIsValid((byte) 0x0);// 3. 将修改过的Message对象进行转换成二进制数据重新写入到文件中byte[] bufferSet = BinaryTool.toBytes(diskMessage);// 记得移动光标到消息的开始位置,光标会随着读写操作会发生移动randomAccessFile.seek(message.getOffsetBegin());randomAccessFile.write(bufferSet);// 以上操作对于文件来说就是一个字节进行了调整}// 4.最后进行更新统计文件Stat stat = readStat(queue.getName());if (stat.validCount > 0){stat.validCount -= 1;}writeStat(queue.getName(),stat);}}/*** 10. 加载指定名称的队列对应的所有消息存放在一个链表中*     调用时机: 当程序进行启动的时候进行调用()*     使用LinkedList: 后续为了进行头删除操作* @param queueName 队列名字* @return 指定队列的所有的消息*/public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {// 1. 创建消息链表LinkedList<Message> messages = new LinkedList<>();// 2. 读取消息文件(顺序读取)try(InputStream inputStream = new FileInputStream(getQueueDataPath(queueName))){try(DataInputStream dataInputStream = new DataInputStream(inputStream)){long currentOffset = 0;while (true){// 1. 读取文件长度int messageSize = dataInputStream.readInt();// 读到文件末尾,就会抛出异常// 2. 按照消息长度进行读消息byte[] buffer = new byte[messageSize];int actualSize = dataInputStream.read(buffer);if (messageSize != actualSize){// 如果不匹配,说明文件的问题,格式错乱了throw new MqException("[MessageFileManager] 文件格式有误! queueName=" + queueName);}// 3. 把读取二级制消息数据进行反序列化,添加到链表中Message message = (Message) BinaryTool.fromBytes(buffer);// 4. 判断消息对象是否是无效消息if (message.getIsValid() != 0x1){// 无效数据直接跳过,但是光标要进行更新currentOffset += (4 + messageSize);continue;}// 5. 有效数据,则需要把这个Message对象添加到链表,加入之前还需要填写offsetBeg 和 offsetEnd//  进行计算offset的时候,需要记录当前文件光标的位置message.setOffsetBegin(currentOffset + 4);message.setOffsetEnd(currentOffset + 4 + messageSize);// 更新光标的位置currentOffset += (4 + messageSize);messages.add(message);}}catch (EOFException e){// 这个catch 并不是处理异常,而是正常的业务逻辑,表示文章读到了末尾,会被ReadeInt抛出异常System.out.println("[MessageFileManager] 恢复Message数据完成");}}return messages;}/*** 11. 实现文件消息的垃圾回收*  实现思路: 使用垃圾回收的复制算法*  具体: 判断当文件中消息总数为2000的时候,并且有效消息不足50%的时候,触发垃圾回收*       此时就把当前文件的有效数据进行提取,单独的写入到新的文件中,删除旧文件,使用新文件进行代替*//*** 11.1 检测当前队列的消息文件是否要进行触发垃圾回收* @param queueName* @return*/public boolean checkGC(String queueName) throws MqException {if (!checkFilesExits(queueName)){throw new MqException("[MessageFileManager] 队列对应文件不存在! queueName=" + queueName);}// 判断总消息数量和有效数量Stat stat = readStat(queueName);if (stat.totalCount > 200 && (double)stat.validCount / (double)stat.totalCount < 0.5){return true;}return false;}/*** 11.2 创建存放有效数据的新的文件夹* @param queueName* @return*/private String getQueueDataNewPath(String queueName){return getQueueDir(queueName) + "/queue_data_new.txt";}/*** 11.3    进行垃圾回收  (复制算法)*   1. 创建新的文件*   2. 把之前有效的消息进行读取,写入到新的文件中*   3. 删除原来的文件*   4. 将新的文件进行重命名操作(修改成原来的文件)*   5. 更新消息的统计文件的数据**   注意: 进行垃圾回收的时候是对数据的大改动,所以不允许别的线程进行对消息文件进行改动,所以要进行加锁的操作** @param queue*/public void gc(MSQueue queue) throws IOException, MqException, ClassNotFoundException {synchronized (queue){// gc操作可能比较耗时,我们记录一下消耗的时间long gcBeg = System.currentTimeMillis();// 1. 创建新的文件File queueDataNewFile = new File(getQueueDataNewPath(queue.getName()));if (queueDataNewFile.exists()){// 正常情况下是不存在的,如果存在就代表上次gc没有完成throw  new MqException("[MessageFileManager] gc时发现该队列的queue_data_new 已经存在 queueName:" + queue.getName());}boolean ok = queueDataNewFile.createNewFile();if (!ok){throw  new MqException("[MessageFileManager] 队列对应的消息的queue_data_new文件创建失败 queueName:" + queue.getName());}// 2. 把之前有效的消息进行读取,写入到新的文件中LinkedList<Message> messages = loadAllMessageFromQueue(queue.getName());try(OutputStream outputStream = new FileOutputStream(queueDataNewFile)){try(DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){for (Message message : messages){byte[] buffer = BinaryTool.toBytes(message);// 先写四个字节的消息的长度dataOutputStream.writeInt(buffer.length);// 写入消息dataOutputStream.write(buffer);}}}// 3. 删除原来的文件File queueDataOldFile = new File(getQueueDataPath(queue.getName()));ok = queueDataOldFile.delete(); if (!ok){throw  new MqException("[MessageFileManager] 队列对应的消息的queue_data_old文件删除失败 queueName:" + queue.getName());}// 4. 将新的文件进行重命名操作(修改成原来的文件)ok = queueDataNewFile.renameTo(queueDataOldFile);if (!ok){throw  new MqException("[MessageFileManager] 队列对应的消息的queue_data_new文件重命名失败 queueName:" + queue.getName());}// 5. 更新消息的统计文件的数据Stat stat = readStat(queue.getName());stat.totalCount = messages.size();stat.validCount = messages.size();writeStat(queue.getName(),stat);long gcEnd = System.currentTimeMillis();System.out.println("[MessageFileManager] gc执行完毕, queueName=" + queue.getName() + ",time" + (gcEnd-gcBeg) + "ms");}}
}

4.2 MessageFileManager单元测试

package com.example.demo.mqserver.datacenter;import com.example.demo.DemoApplication;
import com.example.demo.common.MqException;
import com.example.demo.mqserver.core.MSQueue;
import com.example.demo.mqserver.core.Message;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.util.ReflectionTestUtils;import java.io.File;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;import static org.junit.jupiter.api.Assertions.*;/*** Created with IntelliJ IDEA.* Description:测试硬盘中消息管理的方法* User: YAO* Date: 2023-07-28* Time: 14:06*/
@SpringBootTest
class MessageFileManagerTest {private MessageFileManager messageFileManager = new MessageFileManager();public static final String queueName1 = "testQueue1";public static final String queueName2 = "testQueue2";@BeforeEachvoid setUp() throws IOException {messageFileManager.createQueueFiles(queueName1);messageFileManager.createQueueFiles(queueName2);}@AfterEachvoid tearDown() throws IOException {messageFileManager.destroyQueueFiles(queueName1);messageFileManager.destroyQueueFiles(queueName2);}@Testvoid createQueueFiles() {// 1.创建队列消息文件已经执行// 2.直接验证文件是否勋在File queueDataFile1 = new File("./data/" + queueName1 + "/queue_data.txt");assertTrue(queueDataFile1.isFile());File queueStatFile1 = new File("./data/" + queueName1 + "/queue_stat.txt");assertTrue(queueStatFile1.isFile());File queueDataFile2 = new File("./data/" + queueName2 + "/queue_data.txt");assertTrue(queueDataFile2.isFile());File queueStatFile2 = new File("./data/" + queueName2 + "/queue_stat.txt");assertTrue(queueStatFile2.isFile());}@Testvoid readAndWriteStat() {MessageFileManager.Stat stat = new MessageFileManager.Stat();stat.totalCount = 100;stat.validCount = 50;// 此处使用反射的机制进行调用writeStat 和 readStat// 此处使用Spring封装好的反射类ReflectionTestUtils.invokeMethod(messageFileManager,"writeStat",queueName1,stat);MessageFileManager.Stat newStat = ReflectionTestUtils.invokeMethod(messageFileManager,"readStat",queueName1);Assertions.assertEquals(100,newStat.totalCount);Assertions.assertEquals(50,newStat.validCount);}private MSQueue createTestQueue(String queueName) {MSQueue queue = new MSQueue();queue.setName(queueName);queue.setDurable(true);queue.setAutoDelete(false);queue.setExclusive(false);return queue;}private Message createTestMessage(String content) {Message message = Message.createMessageWithId("testRoutingKey", null, content.getBytes());return message;}@Testvoid sendMessage() throws IOException, MqException, ClassNotFoundException {//  构造消息 , 构造队列Message message = createTestMessage("testMessage");// 对应的目录和文件啥的都存在才行.MSQueue queue = createTestQueue(queueName1);// 调用发送消息方法messageFileManager.sendMessage(queue, message);// 检查 stat 文件.MessageFileManager.Stat stat = ReflectionTestUtils.invokeMethod(messageFileManager, "readStat", queueName1);Assertions.assertEquals(1, stat.totalCount);Assertions.assertEquals(1, stat.validCount);// 检查 data 文件LinkedList<Message> messages = messageFileManager.loadAllMessageFromQueue(queueName1);System.out.println(messages);Assertions.assertEquals(1, messages.size());Message curMessage = messages.get(0);Assertions.assertEquals(message.getMessageID(), curMessage.getMessageID());Assertions.assertEquals(message.getRoutingKey(), curMessage.getRoutingKey());Assertions.assertEquals(message.getDeliverMode(), curMessage.getDeliverMode());// 比较两个字节数组的内容是否相同, 不能直接使用 assertEquals 了.Assertions.assertArrayEquals(message.getBody(), curMessage.getBody());System.out.println("message: " + curMessage);}@Testvoid loadAllMessageFromQueue() throws IOException, MqException, ClassNotFoundException {// 往队列中插入100条数据,验证100条消息,验证加载之后是否跟之前是一致的List<Message> expectMessages = new LinkedList<>();MSQueue queue = createTestQueue(queueName1);for (int i = 0; i < 100; i++) {Message message = createTestMessage("testMessage" + i);messageFileManager.sendMessage(queue,message);expectMessages.add(message);}// 从硬盘中读取LinkedList<Message> actualMessages = messageFileManager.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(expectMessages.size(),actualMessages.size());for (int i = 0; i < expectMessages.size(); i++) {Message expectMessage = expectMessages.get(i);Message actualMessage = actualMessages.get(i);System.out.println("[" + i +"]" + "actualMessages:" + actualMessage);Assertions.assertEquals(expectMessage.getMessageID(), actualMessage.getMessageID());Assertions.assertEquals(expectMessage.getRoutingKey(), actualMessage.getRoutingKey());Assertions.assertEquals(expectMessage.getDeliverMode(), actualMessage.getDeliverMode());Assertions.assertArrayEquals(expectMessage.getBody(), actualMessage.getBody());Assertions.assertEquals(0x1, actualMessage.getIsValid());}}@Testvoid deleteMessage() throws IOException, MqException, ClassNotFoundException {// 创建队列, 写入 10 个消息. 删除其中的几个消息. 再把所有消息读取出来, 判定是否符合预期.MSQueue queue = createTestQueue(queueName1);List<Message> expectedMessages = new LinkedList<>();for (int i = 0; i < 10; i++) {Message message = createTestMessage("testMessage" + i);messageFileManager.sendMessage(queue, message);expectedMessages.add(message);}// 删除其中的三个消息messageFileManager.deleteMessage(queue, expectedMessages.get(7));messageFileManager.deleteMessage(queue, expectedMessages.get(8));messageFileManager.deleteMessage(queue, expectedMessages.get(9));// 对比这里的内容是否正确.LinkedList<Message> actualMessages = messageFileManager.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(7, actualMessages.size());for (int i = 0; i < actualMessages.size(); i++) {Message expectedMessage = expectedMessages.get(i);Message actualMessage = actualMessages.get(i);System.out.println("[" + i + "] actualMessage=" + actualMessage);Assertions.assertEquals(expectedMessage.getMessageID(), actualMessage.getMessageID());Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());Assertions.assertEquals(0x1, actualMessage.getIsValid());}}@Testvoid gc() throws IOException, MqException, ClassNotFoundException {// 先往队列中写 100 个消息. 获取到文件大小.// 再把 100 个消息中的一半, 都给删除掉(比如把下标为偶数的消息都删除)// 再手动调用 gc 方法, 检测得到的新的文件的大小是否比之前缩小了.MSQueue queue = createTestQueue(queueName1);List<Message> expectedMessages = new LinkedList<>();for (int i = 0; i < 100; i++) {Message message = createTestMessage("testMessage" + i);messageFileManager.sendMessage(queue, message);expectedMessages.add(message);}// 获取 gc 前的文件大小File beforeGCFile = new File("./data/" + queueName1 + "/queue_data.txt");long beforeGCLength = beforeGCFile.length();// 删除偶数下标的消息for (int i = 0; i < 100; i += 2) {messageFileManager.deleteMessage(queue, expectedMessages.get(i));}// 手动调用 gcmessageFileManager.gc(queue);// 重新读取文件, 验证新的文件的内容是不是和之前的内容匹配LinkedList<Message> actualMessages = messageFileManager.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(50, actualMessages.size());for (int i = 0; i < actualMessages.size(); i++) {// 把之前消息偶数下标的删了, 剩下的就是奇数下标的元素了.// actual 中的 0 对应 expected 的 1// actual 中的 1 对应 expected 的 3// actual 中的 2 对应 expected 的 5// actual 中的 i 对应 expected 的 2 * i + 1Message expectedMessage = expectedMessages.get(2 * i + 1);Message actualMessage = actualMessages.get(i);Assertions.assertEquals(expectedMessage.getMessageID(), actualMessage.getMessageID());Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());Assertions.assertEquals(0x1, actualMessage.getIsValid());}// 获取新的文件的大小File afterGCFile = new File("./data/" + queueName1 + "/queue_data.txt");long afterGCLength = afterGCFile.length();System.out.println("before: " + beforeGCLength);System.out.println("after: " + afterGCLength);Assertions.assertTrue(beforeGCLength > afterGCLength);// 验证gc之后的是stat文件的内容// 反射进行获取MessageFileManager.Stat newStat = ReflectionTestUtils.invokeMethod(messageFileManager,"readStat",queueName1);Assertions.assertEquals(50,newStat.validCount);Assertions.assertEquals(50,newStat.totalCount);}
}

单元测试全部通过: 

5. 整合数据库和文件操作(DiskDataCenter)

 管理所有硬盘上的数据(对数据库与和文件对的操作进行封装) 

1. 数据库: 交换机 绑定 队列 

2. 数据文件: 消息

上层逻辑要操作银盘,就进行调用通过这个类,(上层代码关心的是数据存储在数据库还是文件中)

package com.example.demo.mqserver.datacenter;import com.example.demo.common.MqException;
import com.example.demo.mqserver.core.Binding;
import com.example.demo.mqserver.core.Exchange;
import com.example.demo.mqserver.core.MSQueue;
import com.example.demo.mqserver.core.Message;
import lombok.Data;import java.io.IOException;
import java.util.LinkedList;
import java.util.List;/*** Created with IntelliJ IDEA.* Description:管理所有硬盘上的数据(对数据库与和文件对的操作进行封装)*      1. 数据库: 交换机 绑定 队列*      2. 数据文件: 消息*  上层逻辑要操作银盘,就进行调用通过这个类,(上层代码关心的是数据存储在数据库还是文件中)* User: YAO* Date: 2023-07-28* Time: 16:25*/
@Data
public class DiskDataCenter {// 管理数据库中数据private DataBaseManager dataBaseManager = new DataBaseManager();// 管理文件中的数据private MessageFileManager messageFileManager = new MessageFileManager();public void init(){// 初始化dataBaseManager所有的条件dataBaseManager.init();// 当前messageFileManager没有任何初始化的内容messageFileManager.init();}// 封装交换机, 队列, 绑定的操作-----------------------------------------------------------public void insertExchange(Exchange exchange){dataBaseManager.insertExchange(exchange);}public void deleteExchange(String exchangeName){dataBaseManager.deleteExchange(exchangeName);}public void insertQueue(MSQueue queue) throws IOException {// 创建队列的同时,不仅仅是把队列对象写到数据库,而且还需要创建出对应的目录和文件messageFileManager.createQueueFiles(queue.getName());dataBaseManager.insertQueue(queue);}public void deleteQueue(String queueName) throws IOException {// 删除队列的同时,不仅仅是把队列对象在数据库中进行删除,而且还要删除队列对应的目录和文件messageFileManager.destroyQueueFiles(queueName);dataBaseManager.deleteQueue(queueName);}public void insertBinding(Binding binding){dataBaseManager.insertBinding(binding);}public void deleteBinding(Binding binding){dataBaseManager.deleteBinding(binding);}public List<Exchange> selectAllExchange(){return dataBaseManager.selectAllExchange();}public List<MSQueue> selectAllMSQueue(){return dataBaseManager.selectAllMSQueue();}public List<Binding> selectAllBinding(){return dataBaseManager.selectAllBinding();}// 封装消息的操作-----------------------------------------------------------public void sendMessage(MSQueue queue, Message message) throws MqException, IOException {messageFileManager.sendMessage(queue,message);}public void deleteMessage(MSQueue queue, Message message) throws IOException, ClassNotFoundException, MqException {messageFileManager.deleteMessage(queue,message);// 进行判断是否进行gcif (messageFileManager.checkGC(queue.getName())){// 进行gcmessageFileManager.gc(queue);}}public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {return messageFileManager.loadAllMessageFromQueue(queueName);}
}

以上我们就完成了相关硬盘数据的封装,之后上层代码在调用的时候就不必考虑数据是存储在哪个地方.


结语

本文主要是对硬盘存储的数据进行封装以及相关API的实现,为以后虚拟主机操作交换机 队列 绑定 消息提供服务.

完整的项目代码已上传Gitee,欢迎大家访问.👇👇👇

模拟实现消息队列icon-default.png?t=N6B9https://gitee.com/yao-fa/advanced-java-ee/tree/master/My-mq

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/31513.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

线性代数(三) 线性方程组向量空间

前言 如何利用行列式&#xff0c;矩阵求解线性方程组。 线性方程组的相关概念 用矩阵方程表示 齐次线性方程组&#xff1a;Ax0&#xff1b;非齐次线性方程组&#xff1a;Axb. 可以理解 齐次线性方程组 是特殊的 非齐次线性方程组 如何判断线性方程组的解 其中R(A)表示矩阵A的…

git的简单介绍和使用

git学习 1. 概念git和svn的区别和优势1.1 区别1.2 git优势 2. git的三个状态和三个阶段2.1 三个状态&#xff1a;2.2 三个阶段&#xff1a; 3. 常用的git命令3.1 下面是最常用的命令3.2 git命令操作流程图如下&#xff1a; 4. 分支内容学习4.1 项目远程仓库4.2 项目本地仓库4.3…

恒盛策略:沪指冲高回落跌0.26%,酿酒、汽车等板块走弱,燃气股拉升

10日早盘&#xff0c;两市股指盘中冲高回落&#xff0c;半日成交约4200亿元&#xff0c;北向资金净卖出超20亿元。 到午间收盘&#xff0c;沪指跌0.26%报3235.9点&#xff0c;深成指跌0.54%&#xff0c;创业板指跌0.28%&#xff1b;两市算计成交4202亿元&#xff0c;北向资金净…

【Terraform学习】保护敏感变量(Terraform配置语言学习)

实验步骤 创建 EC2 IAM 角色 导航到IAM 在左侧菜单中&#xff0c;单击角色 。单击创建角色该按钮以创建新的 IAM 角色。 在创建角色部分&#xff0c;为角色选择可信实体类型&#xff1a; AWS 服务 使用案例:EC2 单击下一步 添加权限&#xff1a;现在&#xff0c;您可以看到…

小白到运维工程师自学之路 第七十一集 (kubernetes网络设置)

一、概述 Master 节点NotReady 的原因就是因为没有使用任何的网络插件&#xff0c;此时Node 和Master的连接还不正常。目前最流行的Kubernetes 网络插件有Flannel、Calico、Canal、Weave 这里选择使用flannel。 二、安装flannel 1、master下载kube-flannel.yml&#xff0c;所…

Chrome有些网站打不开,但是火狐可以打开

Chrome有些网站打不开&#xff0c;但是火狐可以打开 问题描述火狐成功界面谷歌报错界面局域网设置使用代理服务器访问成功 解决方案参考 问题描述 开了一个tizi&#xff0c;Chrome不能使用&#xff0c;火狐可以。之前装过插件Ghelper白嫖科学上网&#xff0c;那次之后好像浏览…

领航优配:沪指震荡涨0.47%,保险、券商板块强势,互联金融概念活跃

4日早盘&#xff0c;两市股指高开高走&#xff0c;沪指一度涨逾1%打破3300点&#xff0c;随后涨幅有所收窄&#xff1b;两市半日成交超6000亿元&#xff0c;北向资金小幅净流入。 截至午间收盘&#xff0c;沪指涨0.47%报3295.91点&#xff0c;深成指涨0.67%&#xff0c;创业板指…

【JAVA基础】- 同步非阻塞模式NIO详解

【JAVA基础】- 同步非阻塞模式NIO详解 文章目录 【JAVA基础】- 同步非阻塞模式NIO详解一、概述二、常用概念三、NIO的实现原理四、NIO代码实现客户端实现服务端实现 五、同步非阻塞NIO总结 一、概述 NIO&#xff08;Non-Blocking IO&#xff09;是同步非阻塞方式来处理IO数据。…

数据API服务管理功能 - 提升数据效率的关键工具

数据API服务管理功能 - 提升数据效率的关键工具 什么是数据API服务管理功能&#xff1f; 数据API服务管理功能是一种用于有效管理和控制数据API的工具。它为用户提供了方便的界面和功能&#xff0c;以简化数据访问、解析和处理的过程。通过使用数据API服务管理功能&#xff0…

优思学院|质量第一的目的是什么?

国外有一句很著名的话&#xff1a;Quality comes first, profit is its logical sequence&#xff0c;意思是&#xff1a;质量第一&#xff0c;利润是其合理的结果&#xff0c;这句话也是很多公司或者商店使用的标语。 简而言之&#xff0c;只要你把质量放在第一位&#xff0c…

yolo-nas对自定义数据集进行训练,测试详解 香烟数据集

yolov5格式的香烟数据集 https://download.csdn.net/download/qq_42864343/88110620?spm1001.2014.3001.5503 创建yolo-nas的运行环境 进入Pycharm的terminal&#xff0c;输入如下命令 conda create -n yolonas python3.8pip install super-gradients使用自定义数据训练Yo…

苍穹外卖系统07

哈喽&#xff01;大家好&#xff0c;我是旷世奇才李先生 文章持续更新&#xff0c;可以微信搜索【小奇JAVA面试】第一时间阅读&#xff0c;回复【资料】更有我为大家准备的福利哟&#xff0c;回复【项目】获取我为大家准备的项目 最近打算把我手里之前做的项目分享给大家&#…

AWS——04篇(AWS之Amazon S3(云中可扩展存储)-02——EC2访问S3存储桶)

AWS——04篇&#xff08;AWS之Amazon S3&#xff08;云中可扩展存储&#xff09;-02——EC2访问S3存储桶&#xff09; 1. 前言2. 创建EC2实例 S3存储桶3. 创建IAM角色4. 修改EC2的IAM 角色5. 连接EC2查看效果5.1 连接EC25.2 简单测试5.2.1 查看桶内存储情况5.2.2 复制本地文件…

如何将苹果彻底删除视频找回?试试这3种方法

如今是短视频时代&#xff0c;大家通常会使用苹果手机来拍摄视频&#xff0c;以此记录生活中的美好日常。但是大家都知道视频是十分占空间的&#xff0c;这也经常会出现iPhone内存不足&#xff0c;磁盘崩溃的问题。 当遇到iPhone内存不足的情况时&#xff0c;大家往往会选择清…

uni-app之app上传pdf类型文件

通过阅读官方文档发现&#xff0c;uni.chooseFile在app端不支持非媒体文件上传&#xff1b; 可以使用这个插件&#xff0c;验证过可以上传pdf&#xff1b;具体使用可以去看文档 插件地址 就是还是会出现相机&#xff0c;这个可能需要自己解决下 实现功能&#xff1a;上传只能上…

刷新缓冲区(标准IO)

标准IO是带缓冲的&#xff0c;输入和输出函数属于行缓冲&#xff0c;stdin、stdin、printf、scanf 1.换行符刷新 2.缓冲区满刷新 3.fflush函数强制刷新 4.程序正常结束

在线Word怎么转换成PDF?Word无法转换成PDF文档原因分析

不同的文件格式使用方法是不一样的&#xff0c;而且也需要使用不同的工具才可以打开编辑内容&#xff0c;针对不同的场合用户们难免会用到各种各样的文件格式&#xff0c;要想在不修改内容的前提下提高工作效率&#xff0c;那就需要用到文件格式转换&#xff0c;那么在线Word怎…

交换机的堆叠技术

目录 一、堆叠的优势 1、提高可靠性 2、简化组网 3、简化管理 4、强大的网络拓展 二、堆叠的方式 1、堆叠卡堆叠 2、业务口堆叠 3、堆叠卡和业务卡堆叠的优缺点 三、堆叠的原理 1、角色 2、单机堆叠 3、堆叠ID 4、堆叠的优先级 5、堆叠的建立过程 1&#xff09…

Windows下安装Sqoop

Windows下安装Sqoop 一、Sqoop简介二、Sqoop安装2.1、Sqoop官网下载2.2、Sqoop网盘下载2.3、Sqoop安装&#xff08;以version&#xff1a;1.4.7为例&#xff09;2.3.1、解压安装包到 D:\bigdata\sqoop\1.4.7 目录2.3.2、新增环境变量 SQOOP_HOME2.3.3、环境变量 Path 添加 %SQO…

Nginx负载均衡(重点)

正向代理 部署正向代理 server { listen 80; server_name localhost; #charset koi8-r; #access_log logs/host.access.log main; location / { root html; index index.html index.htm; proxy_pass http://20.0.0.60:80…