从 0 到 1 ,手把手教你编写《消息队列》项目(Java实现) —— 核心类持久化存储

文章目录

  • 一、持久化存储的方式与路径
  • 二、公共模块
    • 序列化 / 反序列化
    • 异常规定
  • 三、持久化存储
    • 数据库数据管理
    • 文件数据管理
      • 读写规定
      • 新增 /删除规定
      • 内存中 Message 的规定
      • 存储规定
      • 代码编写
    • 硬盘数据管理


一、持久化存储的方式与路径

交换机,队列,绑定关系,这些我们使用数据库来管理,

Message 消息 并不会涉及到复杂的增删改查操作.且 消息 的数量可能会非常多,数据库的访问效率并不高

因此在Message持久化的存储,我们不存储在数据库中,我们直接存储到文件中.

  • 我们来规定一下数据存储的文件及路径

在这里插入图片描述
在这里插入图片描述


二、公共模块

序列化 / 反序列化

上面提到了,消息存储到文件中,那么存储到文件中,就要将 Message 对象 序列化成二进制数据,再存储到文件中,而读取消息时,就要将二进制数据反序列化成 Message 对象.
因此咱们先去写公共模块的 序列化方法与反序列化方法 .
创建一个 common 包,再创建一个 BinaryTool类 来写序列化 / 反序列化方法
在这里插入图片描述

public class BinaryTool {// 要用 Java 标准库提供的流对象完成 序列化/反序列化 操作,一定要继承 Serializable 接口,不然运行时,会抛出异常// 把一个对象序列化成一个字节数组public static byte[] toBytes(Object object) throws IOException {// ByteArrayOutputStream 这个流对象是相当于一个中间过渡,解决无法确定数组长度的问题try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {// ObjectOutputStream 这个流对象是 Java 标准库提供的序列化的流对象,将过渡流对象绑定到序列化流对象中,最后直接转换成数组返回try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) {// 将对象进行序列化后,存储到绑定的流对象中 即 过渡流对象objectOutputStream.writeObject(object);// 序列化完成后,存储到了绑定的流对象中,然后再通过绑定的流对象转换成数组返回}return byteArrayOutputStream.toByteArray();}}// 把一个字节数组反序列化成一个对象public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {Object object = null;// ByteArrayInputStream 这个流对象直接绑定 data 数组,相当于直接从 data 数组中取数据try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)) {// ObjectInputStream 这个流对象是 Java 标准库提供的反序列化的流对象,绑定中间流对象,从中间流对象取数据try (ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) {// readObject方法,从中间流对象取数据,中间流对象从 data 数组取数据,=》 将 data 数组 反序列化成对象object = objectInputStream.readObject();}}return object;}
}

异常规定

对文件进行读写操作,可能会出现意料之外的情况,因此我们自定义一个异常,来针对咱们这个程序中出现意料之外的情况时.
在这里插入图片描述

public class MqException extends Exception{public MqException(String message) {super(message);}
}

三、持久化存储

数据库数据管理

MySQL数据库本身就比较重量,
此处为了使用方便,我们使用更轻量的数据库 SQLite,
在Java中使用SQLite,不需要额外安装,只需要引入依赖即可.(记得去maven找到与你jdk相匹配的版本)

在这里插入图片描述


SQLite,只是一个本地的数据库,这个数据库相当于直接操作本地硬盘文件,
因此需要在配置文件中配置好数据库文件的路径
使用 yaml格式的配置文件
在这里插入图片描述


创建一个 mapper包,通过里面的接口来操作数据库
在这里插入图片描述

/*** 使用这个类来操作数据库*/
@Mapper // 注入到容器中
public interface MetaMapper {// 提供三个核心建表方法,create 可以直接使用 update代替@Update("create table if not exists exchange (name varchar(50) primary key,type int,durable boolean,autoDelete boolean,arguments varchar(1024))")void createExchangeTable();@Update("create table if not exists queue(name varchar(50) primary key,durable boolean,exclusive boolean,autoDelete boolean,arguments varchar(1024))")void createQueueTable();@Update("create table if not exists binding(exchangeName varchar(50),queueName varchar(50),bindingKey varchar(256))")void createBindingTable();// 针对上述三个表,进行 增加删除查找 操作@Insert("insert into exchange values (#{name},#{type},#{durable},#{autoDelete},#{arguments})")void insertExchange(Exchange exchange);@Delete("delete from exchange where name = #{exchangeName}")void deleteExchange(String exchangeName);@Select("select * from exchange")List<Exchange> selectAllExchanges();@Insert("insert into queue values (#{name},#{durable},#{exclusive},#{autoDelete},#{arguments})")void insertQueue(MSGQueue queue);@Delete("delete from queue where name = #{queueName}")void deleteQueue(String queueName);@Select("select * from queue")List<MSGQueue> selectAllQueues();@Insert("insert into binding values (#{exchangeName},#{queueName},#{bindingKey})")void insertBinding(Binding binding);@Delete("delete from binding where exchangeName = #{exchangeName} and queueName = #{queueName}")void deleteBinding(Binding binding);@Select("select * from binding")List<Binding> selectAllBindings();
}

我们再创建一个 datacenter包,通过里面的类来 整合对硬盘与内存上数据的管理 ,
首先来整合数据库操作,
在这里插入图片描述

注意了!此时我并不想将 DataBaseManager 这个类的控制权交给容器,
因此我就不给这个类加 类注解,所以在这里也没办法用 @Autowired依赖注入得到这个类,
因此我们需要用 依赖查找 得到这个类.故而需要给项目启动类 增添 一个 静态属性:容器上下文 ,
在这里插入图片描述
我直接贴出代码,代码中有详细注释!

/*** 通过这个类,来整合数据库操作*/
public class DataBaseManager {private MetaMapper metaMapper;// 数据库的初始化方法public void init() {// 通过启动类中的 上下文对象,进行依赖查找给 metaMapper 赋值metaMapper = MessageQueueApplication.context.getBean(MetaMapper.class);if (!checkDBExists()) {// 如果数据库不存在,则建库建表,构造默认数据// 创建 data 目录File dataDir = new File("./data");dataDir.mkdirs();createTable();createDefaultData();System.out.println("[DataBaseManger]数据库初始化完成");} else {// 如果数据库存在(有表有数据),不做任何操作System.out.println("[DataBaseManger]数据库已存在");}}// 删除数据库文件public void deleteDB() {File file = new File("./data/meta.db");if (file.delete()) {System.out.println("[DataBaseManager] 删除数据库文件成功");} else {System.out.println("[DataBaseManager] 删除数据库文件失败");}// 删除数据库目录(一定要先删除文件,目录是空的才能删除成功)File dataDir = new File("./data");if (dataDir.delete()) {System.out.println("[DataBaseManager] 删除数据库目录成功");} else {System.out.println("[DataBaseManager] 删除数据库目录失败");}}// 判断数据库是否存在private boolean checkDBExists() {File file = new File("./data/meta/.db");return file.exists();}// 建表方法// 建库操作并不需要手动执行,(不需要手动创建 meta.db 文件)// 首次执行这里的数据库操作时,就会自动创建 meta.db 文件(MyBatis 完成的)private void createTable() {metaMapper.createBindingTable();metaMapper.createExchangeTable();metaMapper.createQueueTable();System.out.println("[DataBaseManger]创建表完成");}// 给数据库表中,添加默认数据// 此处主要是添加一个默认的交换机// RabbiMQ 里有一个这样的设定,带有一个 匿名 的交换机,类型是 DIRECTprivate void createDefaultData() {Exchange exchange = new Exchange();exchange.setName("");exchange.setType(ExchangeType.DIRECT);exchange.setDurable(true);exchange.setAutoDelete(false);metaMapper.insertExchange(exchange);System.out.println("[DataBaseManger]插入初始数据完成");}// 将其他数据库操作,也在这个类中封装成方法public void insertExchange(Exchange exchange) {metaMapper.insertExchange(exchange);}public void deleteExchange(String exchangeName) {metaMapper.deleteExchange(exchangeName);}public List<Exchange> selectAllExchanges() {return metaMapper.selectAllExchanges();}public void insertQueue(MSGQueue queue) {metaMapper.insertQueue(queue);}public void deleteQueue(String queueName) {metaMapper.deleteQueue(queueName);}public List<MSGQueue> selectAllQueues() {return metaMapper.selectAllQueues();}public void insertBinding(Binding binding) {metaMapper.insertBinding(binding);}public void deleteBinding(Binding binding) {metaMapper.deleteBinding(binding);}public List<Binding> selectAllBindings() {return metaMapper.selectAllBindings();}
}

文件数据管理

读写规定

在上面已经规定好了文件的存储路径,每个队列的消息都放在 ./data/队列名 下.

大家想,文件里所有的消息都是二进制数据,因此文件中都是二进制数据,没办法找到消息之间有效的界限.

故而我们规定,写入文件的 Message对象 格式如下
在这里插入图片描述

在每个 Message 对象 的二进制数据前,加一个 大小为 四个字节 的整数 来描述 Message二进制数据的长度.
先读取 四个字节 获得 Message 的 长度,再读取该长度个 字节.
这样就可以使每个 Message 的二进制数据有一个可读界限.

在写入的时候只需写入下面的这两个 属性,只有这三个属性,才是 Message 对象的 核心属性,其他的属性都是为管理内存中的 Message而设置的,不需要写入到文件中.
在这里插入图片描述

新增 /删除规定

对于 消息来说,是需要频繁的新增和删除的,
当生产者 生产新的消息,就需要新增,
当消费者 消费了一个消息,就需要删除这个消息.

对于新增来说,比较简单,可以直接写到文件末尾,

但是对于删除来说,就比较复杂了,因为要删除的消息 不一定就是开头或末尾的消息,
文件类似于一个顺序表的结构,如果删除中间的消息,就需要将后面的消息向前搬运,

因此我们使用 逻辑删除 ,即使用 isValid 这个属性来表示该消息 是否有效.
在这里插入图片描述


内存中 Message 的规定

上面说到了使用 逻辑删除 来让一个消息失效,那么在要如何在文件中找到这个要删除的消息呢?

所以我们设置了 下面这两个属性
在这里插入图片描述
根据这两个属性,去读取对应的字节.
在这里插入图片描述


存储规定

针对逻辑删除,
我们可以队列文件中,再创建两个文件,
queue_data.txt(消息信息文件) 用来存储所有消息的本体,
queue_stat.txt(消息统计文件) 用来存储一行数据,这一行数据只有两个数字,用 "/t"分隔
所有消息的数量,
有效消息的数量.
在这里插入图片描述


代码编写

咱们再创建一个类,来集中管理消息.
在这里插入图片描述

/*** 通过这个类,来针对硬盘上的消息管理*/
public class MessageFileManager {public void init() {// 暂时不需要做啥额外的初始化工作, 以备后续扩展}// 定义一个内部类,来表示该队列的统计信息// 优先考虑使用 static,静态内部类static public class Stat {// 此处直接定义成 public,就不再搞 get  set 方法了// 对于这样简单的类,就直接使用成员,类似于 C 的结构体了public int totalCount; // 总消息数量public int validCount; // 有效消息数量}// 约定消息文件所在的目录和文件名// 这个方法用来获取指定队列对应的消息文件所在路径private String getQueueDir(String queueName) {return "./data/" + queueName;}// 这个方法用来获取该队列的消息数据文件路径// 注意,二进制文件,使用 txt 作为后缀,不太合适,txt 一般表示文本(.bin或.dat 较为合适)private String getQueueDataPath(String queueName) {return getQueueDir(queueName) + "/queue_data.txt";}// 这个方法用来获取该队列的消息统计文件路径private String getQueueStatPath(String queueName) {return getQueueDir(queueName) + "/queue_stat.txt";}// 读取该队列的消息统计文件private Stat readStat(String queueName) {Stat stat = new Stat();try (InputStream inputStream = new FileInputStream(getQueueStatPath(queueName))) {Scanner scanner = new Scanner(inputStream);stat.totalCount = scanner.nextInt();stat.validCount = scanner.nextInt();return stat;} catch (IOException e) {e.printStackTrace();}return null;}// 向该队列的消息统计文件中写入数据private void writeStat(String queueName,Stat stat) {// 使用 PrintWrite 来写文件// OutputStream 打开文件,默认情况下,会直接把原文件清空,此时相当于新的数据覆盖了旧的try (OutputStream outputStream = new FileOutputStream(getQueueStatPath(queueName))) {PrintWriter printWriter = new PrintWriter(outputStream);printWriter.write(stat.totalCount + "\t" + stat.validCount);printWriter.flush();} catch (IOException e) {e.printStackTrace();}}// 创建队列对应的文件和目录public void createQueueFiles(String queueName) throws IOException {// 1.先创建队列对应的消息目录File baseDir = new File(getQueueDir(queueName));if (!baseDir.exists()) {// 不存在则创建boolean ok = baseDir.mkdirs();if (!ok) {throw new IOException("创建队列对应的消息目录失败!baseDir=" + baseDir.getAbsolutePath());}}// 2.创建队列对应的 data 文件File queueDataFile = new File(getQueueDataPath(queueName));if (!queueDataFile.exists()) {// 不存在则创建boolean ok = queueDataFile.createNewFile();if (!ok) {throw new IOException("创建该队列的消息数据文件失败!queueDataFile=" + queueDataFile.getAbsolutePath());}}// 3.创建队列对应的 stat 文件File queueStatFile = new File(getQueueStatPath(queueName));if (!queueStatFile.exists()) {boolean ok = queueStatFile.createNewFile();if (!ok) {throw new IOException("创建该队列的消息统计文件失败!queueStatFile="+queueStatFile.getAbsolutePath());}}// 4.给消息统计文件设置初始值Stat stat = new Stat();stat.totalCount = 0;stat.validCount = 0;writeStat(queueName,stat);}// 删除队列的目录和文件// 队列可以被删除,当队列删除后,对应的消息文件等,自然也要随之删除public void destroyQueueFiles(String queueName) throws IOException {// 先删除文件再删除目录File queueDataFile = new File(getQueueDataPath(queueName));boolean ok1 = queueDataFile.delete();File queueStatFile = new File(getQueueStatPath(queueName));boolean ok2 = queueStatFile.delete();File baseDir = new File(getQueueDir(queueName));boolean ok3 = baseDir.delete();if (!ok1 || !ok2 || !ok3) {// 有任意一个删除失败,都算整体失败,throw new IOException("删除队列目录和文件失败!baseDir="+getQueueDir(queueName));}}// 检查文件的目录和文件是否存在// 后续有生产者给 broker server 生产消息了,这个消息就可能需要记录到文件上(取决是否要持久化)// 在记录之前就要先检查文件是否存在public boolean checkFilesExits(String queueName) {// 判断队列和消息文件是否都存在!File queueDataFile = new File(getQueueDataPath(queueName));if (!queueDataFile.exists()) {return false;}File queueStatFile = new File(getQueueStatPath(queueName));if (!queueStatFile.exists()) {return false;}return true;}// 将新消息写入到对应的队列的数据文件并更新统计文件的方法// queue 表示要对应的队列,message 表示要写入的消息public void sendMessage(MSGQueue queue, Message message) throws MqException, IOException {// 1.检查一下当前要写入的队列对应的文件是否存在if (!checkFilesExits(queue.getName())) {throw new MqException("[MessageFileManager] 队列对应的文件不存在!queueName=" + queue.getName());}// 2.把要写入的 message 序列化byte[] messageBinary = BinaryTool.toBytes(message);// 写入消息时,可能会与其他线程对该队列的操作产生线程安全问题,因此针对这个 队列加锁// (如其他线程此时也向该队列写入消息,那么此时offsetBeg与offsetEnd就可能是不准确的,那么后续操作就也可能会出现BUG)synchronized (queue) {// 3.先获取当前队列的文件长度,计算 message 的 offsetBeg 和 offsetEnd// 把新的Message数据写入到队列数据文件的末尾,// 此时 offsetBeg = 文件长度 + 4,offsetEnd = 文件长度 + 4 + message长度File queueDataFile = new File(getQueueDataPath(queue.getName()));message.setOffsetBeg(queueDataFile.length() + 4);message.setOffsetEnd(queueDataFile.length() + 4 + messageBinary.length);// 4.写入消息到数据文件,注意,是写入到文件末尾                       第二个属性,一定要写true,不然会直接覆盖掉之前的数据try (OutputStream outputStream = new FileOutputStream(queueDataFile,true)) {try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){// 先写入当前消息的长度,占据 4 个字节,writeInt()方法,写入的整形占四个字节dataOutputStream.writeInt(messageBinary.length);// 将数据本体写入到队列数据文件中outputStream.write(messageBinary);}}// 5.更新消息统计文件Stat stat = readStat(queue.getName());stat.totalCount += 1;stat.validCount += 1;writeStat(queue.getName(),stat);}}// 删除消息的方法public void deleteMessage(MSGQueue queue,Message message) throws IOException, ClassNotFoundException {// 此处的删除指的是逻辑上的删除,即将 isValid 由 0x1 改为0x0// 1.先把文件中对应的 [offsetBeg,offsetEnd) 的 消息读出来// 2.然后修改 isValid 为 0x0// 3.再将消息写回对应的 [offsetBeg,offsetEnd) 位置// 删除消息时,可能会与其他线程对该队列的操作产生线程安全问题,因此针对这个 队列加锁// (如在删除消息时,进行了gc操作,那么就很有可能导致,这个被逻辑删除的消息,覆盖其他消息,且也有可能会覆盖正确的统计文件)synchronized (queue) {// RandomAccessFile 这个流,可以自由移动光标(读取位置),                                 第二个参数 "rw"代表读写try (RandomAccessFile randomAccessFile = new RandomAccessFile(getQueueDataPath(queue.getName()), "rw")) {// 1.先从文件中读取 messagebyte[] bufferSrc = new byte[(int) (message.getOffsetEnd() - message.getOffsetBeg())];// 移动光标randomAccessFile.seek(message.getOffsetBeg());randomAccessFile.read(bufferSrc);// 2.把读取出来的数据 反序列化 成 MessageMessage diskMessage = (Message) BinaryTool.fromBytes(bufferSrc);// 3.把 isValid 设置成无效diskMessage.setIsValid((byte) 0x0);// 4.将修改完成后的 message 序列化成二进制数据byte[] bufferDest = BinaryTool.toBytes(diskMessage);// 5.重新移动光标randomAccessFile.seek(message.getOffsetBeg());randomAccessFile.write(bufferDest);}// 更新 统计文件 stat,逻辑删除了一个消息,有效消息数量 validCount 需要 -1Stat stat = readStat(queue.getName());if (stat.validCount > 0) {stat.validCount -= 1;}writeStat(queue.getName(), stat);}}// 使用这个方法, 从文件中, 读取出所有的消息内容, 加载到内存中(具体来说是放到一个链表里)// 这个方法, 准备在程序启动/重启时, 进行调用.// 这里使用一个 LinkedList, 主要目的是为了后续进行头删操作.// 这个方法的参数, 只是一个 queueName 而不是 MSGQueue 对象. 因为这个方法不需要加锁, 只使用 queueName 就够了.// 由于该方法是在程序启动时调用, 此时服务器还不能处理请求呢~~ 不涉及多线程操作文件.public LinkedList<Message> loadAllMessageFromQueue (String queueName) throws IOException, ClassNotFoundException, MqException {LinkedList<Message> messages = new LinkedList<>();try (InputStream inputStream = new FileInputStream(getQueueDataPath(queueName))){// DataInputStream 提供了 readInt()方法,会读取四个字节的然后转化为整数,普通的 read()方法只会读取一个字节就转化为整数了try (DataInputStream dataInputStream = new DataInputStream(inputStream)){// 记录光标位置// 由于当下使用的 DataInputStream 并不方便直接获取到文件光标位置因此就需要手动记录下文件光标.long currentOffset = 0;while (true) {// 1.读取每个消息前用来记录消息长度的 4 个字节大小的 int// readInt()方法,会读取 4 个字节的大小 的intint messageSize = dataInputStream.readInt();currentOffset += 4;// 2.按照 messageSize 读取对应的字节数byte[] buffer = new byte[messageSize];int actualSize = dataInputStream.read(buffer);if (messageSize != actualSize) {// 如果不匹配, 说明文件有问题, 格式错乱了!!throw new MqException("[MessageFileManager] 文件格式错误! queueName=" + queueName);}// 3.将读取的二进制数据 反序列化成 对象Message message = (Message) BinaryTool.fromBytes(buffer);// 4.判断该数据是否有效if (message.getIsValid() == 0x1) {// 设置 message 的 offsetBeg 与 offsetEndmessage.setOffsetBeg(currentOffset);currentOffset += messageSize;message.setOffsetEnd(currentOffset);// 5.填加到链表中messages.add(message);} else {// 数据无效也要更新光标位置currentOffset += messageSize;}}} catch (EOFException e) {// 这个 catch 并非真是处理 "异常", 而是处理 "正常" 的业务逻辑. 文件读到末尾, 会被 readInt 抛出该异常.// 这个 catch 语句中也不需要做啥特殊的事情System.out.println("[MessageFileManager] 恢复 Message 数据完成!");}}return messages;}// 检查当前是否需要针对该队列的消息数据文件进行 GC(无效消息回收)public boolean checkGC(String queueName) {Stat stat = readStat(queueName);if (stat.totalCount > 2000 && (double)(stat.validCount / stat.totalCount) < 0.5) {return true;}return false;}// 获取 GC 复制算法的 新文件路径private String getQueueDataNewPath (String queueName) {return getQueueDir(queueName) + "/queue_data_new.txt";}// 通过这个方法, 真正执行消息数据文件的垃圾回收操作.// 使用复制算法来完成.// 创建一个新的文件, 名字就是 queue_data_new.txt// 把之前消息数据文件中的有效消息都读出来, 写到新的文件中.// 删除旧的文件, 再把新的文件改名回 queue_data.txt// 同时要记得更新消息统计文件.public void gc(MSGQueue queue) throws IOException, MqException, ClassNotFoundException {// 进行 gc 的时候,是针对消息数据文件进行大洗牌,在这个过程中,其他线程不能针对该队列的消息文件做任何修改synchronized (queue) {// 由于 gc 操作可能比较耗时, 此处统计一下执行消耗的时间.long gcBeg = System.currentTimeMillis();// 1.创建新数据文件File queueDataNewFile = new File(getQueueDataNewPath(queue.getName()));boolean ok = queueDataNewFile.exists();if (ok) {throw new MqException("[MessageFileManager] gc 时发现该队列的 queue_data_new 已经存在! queueName="+ queue.getName());}ok = queueDataNewFile.createNewFile();if (!ok) {throw new MqException("[MessageFileManager] gc 时创建新文件失败! queueDataNewFile=" + queueDataNewFile.getAbsolutePath());}// 2.获取原数据文件中的有效消息,并写入到新数据文件中LinkedList<Message> messages = loadAllMessageFromQueue(queue.getName());try (OutputStream outputStream = new FileOutputStream(queueDataNewFile)) {try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {for (Message message : messages) {byte[] buffer = BinaryTool.toBytes(message);// 先写入记录 消息长度 的占 4 个字节的 intdataOutputStream.writeInt(buffer.length);dataOutputStream.write(buffer);}}}// 3.删除原数据文件File queueDataOldFile = new File(getQueueDataPath(queue.getName()));ok = queueDataOldFile.delete();if (!ok) {throw new MqException("[MessageFileManager] 删除旧的数据文件失败! queueDataOldFile=" + queueDataOldFile.getAbsolutePath());}// 4.将新数据文件重命名ok = queueDataNewFile.renameTo(queueDataOldFile);if (!ok) {throw new MqException("[MessageFileManager] 文件重命名失败! queueDataNewFile=" + queueDataNewFile.getAbsolutePath()+ ", queueDataOldFile=" + queueDataOldFile.getAbsolutePath());}// 5.更新统计文件Stat stat = readStat(queue.getName());stat.totalCount = messages.size();stat.validCount = messages.size();writeStat(queue.getName(),stat);long gcEnd = System.currentTimeMillis();System.out.println("[MessageFileManager] gc 执行完毕! queueName=" + queue.getName() + ", time="+ (gcEnd - gcBeg) + "ms");}}
}

硬盘数据管理

咱们再创建一个类, 集中管理 数据库数据与文件数据 .
在这里插入图片描述

/*** 使用这个类来管理所有硬盘上的数据* 1. 数据库: 交换机,绑定,队列* 2. 数据文件: 消息* 上层逻辑如果需要操作硬盘,统一都通过这个类来使用,(上层代码不关心当前数据是存储在数据库还是文件中的)*/
public class DiskDataCenter {// 这个实例用来管理数据库private DataBaseManager dataBaseManager = new DataBaseManager();// 这个实践用来管理数据文件中的数据private MessageFileManager messageFileManager = new MessageFileManager();public void init() {// 针对上述两个实例进行初始化dataBaseManager.init();// 当前 messageFileManager.init() 方法,是空的,只是先放在这里,方便后续扩展messageFileManager.init();}// 封装交换机操作public void insertExchange(Exchange exchange) {dataBaseManager.insertExchange(exchange);}public void deleteExchange(String exchangeName) {dataBaseManager.deleteExchange(exchangeName);}public List<Exchange> selectAllExchanges() {return dataBaseManager.selectAllExchanges();}// 封装队列操作public void insertQueue(MSGQueue queue) throws IOException {dataBaseManager.insertQueue(queue);// 创建队列的同时, 不仅仅是把队列对象写到数据库中, 还需要创建出对应的目录和文件messageFileManager.createQueueFiles(queue.getName());}public void deleteQueue(String queueName) throws IOException {dataBaseManager.deleteQueue(queueName);// 删除队列的同时, 不仅仅是把队列从数据库中删除, 还需要删除对应的目录和文件messageFileManager.destroyQueueFiles(queueName);}public List<MSGQueue> selectAllQueues() {return dataBaseManager.selectAllQueues();}// 封装绑定操作public void insertBinding(Binding binding) {dataBaseManager.insertBinding(binding);}public void deleteBinding(Binding binding) {dataBaseManager.deleteBinding(binding);}public List<Binding> selectAllBindings() {return dataBaseManager.selectAllBindings();}// 封装消息方法public void sendMessage(MSGQueue queue, Message message) throws IOException, MqException {messageFileManager.sendMessage(queue,message);}public void deleteMessage(MSGQueue queue,Message message) throws IOException, ClassNotFoundException, MqException {messageFileManager.deleteMessage(queue,message);if (messageFileManager.checkGC(queue.getName())) {messageFileManager.gc(queue);}}public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {return messageFileManager.loadAllMessageFromQueue(queueName);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/93540.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

四、浏览器渲染过程,DOM,CSSDOM,渲染,布局,绘制详细介绍

知识点&#xff1a; 1、为什么不能先执行 js文件&#xff1f;&#xff1f; 我们不能先执行JS文件&#xff0c;必须等到CSSOM构建完成了才能执行JS文件&#xff0c;因为前面已经说过渲染树是需要DOM和CSSOM构建完成了以后才能构建&#xff0c;而且JS是可以操控CSS样式的&#…

springboot的配置文件(properties和yml/yaml)

springboot的配置文件有两种格式分别是properties和yml/yaml 创建配置文件 在创建springboot项目时候&#xff0c;会默认生成application.properties这种格式 书写风格 端口 application.propertis server.port8080 application.yml server:port: 8080 连接数据库 applica…

<Xcode> Xcode IOS无开发者账号打包和分发

关于flutter我们前边聊到的初入门、数据解析、适配、安卓打包、ios端的开发和黑苹果环境部署&#xff0c;但是对于苹果的打包和分发&#xff0c;我只是给大家了一个链接&#xff0c;作为一个顶级好男人&#xff0c;我认为这样是对大家的不负责任&#xff0c;那么这篇就主要是针…

【计算机网络黑皮书】应用层

【事先声明】 这是对于中科大的计算机网络的网课的学习笔记&#xff0c;感谢郑烇老师的无偿分享 书籍是《计算机网络&#xff08;自顶向下方法 第6版&#xff09;》 需要的可以私信我&#xff0c;无偿分享&#xff0c;课程简介下也有 课程连接 目录 应用层网络应用的原理应用架…

作业 day4

完成父子进程通信

Socket通信

优质博文IT-BLOG-CN 一、简介 Socket套接字&#xff1a;描述了计算机的IP地址和端口&#xff0c;运行在计算机中的程序之间采用socket进行数据通信。通信的两端都有socket&#xff0c;它是一个通道&#xff0c;数据在两个socket之间进行传输。socket把复杂的TCP/IP协议族隐藏在…

Linux基本指令(二)

&#x1f493;博主个人主页:不是笨小孩&#x1f440; ⏩专栏分类:数据结构与算法&#x1f440; C&#x1f440; 刷题专栏&#x1f440; C语言&#x1f440; &#x1f69a;代码仓库:笨小孩的代码库&#x1f440; ⏩社区&#xff1a;不是笨小孩&#x1f440; &#x1f339;欢迎大…

[spring] spring core - 配置注入及其他内容补充

[spring] spring core - 配置注入及其他内容补充 上篇 [sping] spring core - 依赖注入 这里主要补一些 core 相关内容补充&#xff0c;同时添加了 java config bean 的方法 java config bean 是除了 XML、java 注解之外另一给实现 DI 的方法 java config bean 这个方法不…

Tomcat报404问题的原因分析

1.未配置环境变量 按照需求重新配置即可。 2.IIs访问权限问题 注意:这个问题有的博主也写了,但是这个问题可有可无,意思是正常情况下,有没有都是可以访问滴放心 3.端口占用问题 端口占用可能会出现这个问题,因为tomcat的默认端口号是8080,如果在是运行tomcat时计算机的…

司空见惯 - 奈尔宝的NTTP

联合国对21世纪人才定义的标准&#xff0c;包括六种核心技能&#xff0c;即批判性思维&#xff08;critical thinking)、人际交往&#xff08;communication)、与人合作&#xff08;collaboration)、创造性&#xff08;creativity)、信息素养&#xff08;information literacy)…

【匠心打造】从0打造uniapp 可视化拖拽设计 c_o 第十篇

一、click one for uniapp置顶&#xff1a; 全部免费开源 (你商业用途也没关系&#xff0c;不过可以告诉我公司名或者项目名&#xff0c;放在官网上好看点。哈哈-_-) 二、写在之前 距离上一篇更新已经大约4个月了&#xff0c;公司的事情&#xff0c;自己的一些琐事一直没时间…

Excel·VBA分列、字符串拆分

看到一篇博客《VBA&#xff0c;用VBA进行分列&#xff08;拆分列&#xff09;的2种方法》&#xff0c;使用VBA对字符串进行拆分 目录 Excel分列功能将字符串拆分为二维数组&#xff0c;Split函数举例 将字符串拆分为一维数组&#xff0c;正则表达式举例 Excel分列功能 Sub 测…

在线OJ项目核心思路

文章目录 在线OJ项目核心思路1. 项目介绍2.预备知识理解多进程编程为啥采用多进程而不使用多线程?标准输入&标准输出&标准错误 3.项目实现题目API实现相关实体类定义新增/修改题目获取题目列表 编译运行编译运行流程 4.统一功能处理 在线OJ项目核心思路 1. 项目介绍 …

决策树C4.5算法的技术深度剖析、实战解读

目录 一、简介决策树&#xff08;Decision Tree&#xff09;例子&#xff1a; 信息熵&#xff08;Information Entropy&#xff09;与信息增益&#xff08;Information Gain&#xff09;例子&#xff1a; 信息增益比&#xff08;Gain Ratio&#xff09;例子&#xff1a; 二、算…

跟着顶级科研报告IPCC学绘图:温度折线/柱图/条带/双y轴

复现IPCC气候变化过程图 引言 升温条带Warming stripes&#xff08;有时称为气候条带&#xff0c;目前尚无合适且统一的中文释义&#xff09;是数据可视化图形&#xff0c;使用一系列按时间顺序排列的彩色条纹来视觉化描绘长期温度趋势。 在IPCC报告中经常使用这一方案 IPCC是…

【PostgreSQL】【存储管理】表和元组的组织方式

外存管理负责处理数据库与外存介质(PostgreSQL8.4.1版本中只支持磁盘的管理操作)的交互过程。在PostgreSQL中&#xff0c;外存管理由SMGR(主要代码在smgr.c中)提供了对外存的统一接口。SMGR负责统管各种介质管理器&#xff0c;会根据上层的请求选择一个具体的介质管理器进行操作…

凉鞋的 Godot 笔记 105. 第一个通识:编辑-测试 循环

105. 第一个通识&#xff1a;编辑-测试 循环 在这一篇&#xff0c;我们简单聊聊此教程中所涉及的一个非常重要的概念&#xff1a;循环。 我们在做任何事情都离不开某种循环&#xff0c;比如每天的 24 小时循环&#xff0c;一日三餐循环&#xff0c;清醒-睡觉循环。 在学习一…

首发Orin N芯片,腾势追赶「智驾第一梯队」

张祥威 编辑 | 德新 英伟达最新一代芯片—— Orin N&#xff0c;腾势拿下 首发。 9月26日&#xff0c;腾势N7推出「高快智驾包」。官方描述中&#xff0c;这一选装将“基于新一代NIVIDIA DRIVE ORIN的 高性能平台”&#xff0c;可以实现高速NOA。 此前&#xff0c;腾势的…

从零手搓一个【消息队列】实现虚拟主机的核心功能

文章目录 一、虚拟主机设计二、实现虚拟主机1, 创建 VirtualHost 类2, VirtualHost() 构造方法3, exchangeDeclare() 创建交换机4, exchageDelete() 删除交换机5, queueDeclare() 创建队列6, queueDelete() 删除队列7, queueBind() 创建绑定8, queueUnBind() 删除绑定9, basicP…

vscode 注释插件koroFileHeader

https://blog.51cto.com/u_15785499/5664323 https://blog.csdn.net/weixin_67697081/article/details/129004675