Java项目之消息队列(手写java模拟实现mq)【三、MQ的核心类-消息类的存储(用文件存储消息)】✔ ★

Java项目之MQ

  • 七. 消息存储设计
    • 设计思路
      • 为什么要用文件存储
      • 文件存储结构
        • queue_data.txt ⽂件格式:
        • queue_stat.txt ⽂件格式:
    • 创建 MessageFileManager 类
      • 定义一个内部类, 来表示该队列的统计信息 Stat
      • 实现统计⽂件Stat读写(文本文件的读写)
        • InputStream——Scanner
        • OutputStream——PrintWriter
      • 实现创建队列⽬录
      • 实现删除队列⽬录
      • 检查队列⽂件是否存在(发送消息时用到)
    • 实现消息对象序列化/反序列化
        • byte数组转成对象 对象转成byte数组
        • ByteArrayOutputStream——ObjectOutputStream
      • 实现写⼊消息⽂件【信息写入数据文件】
        • 1. 检查消息文件是否存在
        • 2. 把Message对象转成二进制数据
        • 3. 队列上锁
        • 4. 获取数据文件,根据数据文件长度设置beg,end
        • 5. 写入文件
        • 6. 更新stat
      • 实现删除消息
        • 根据beg 找到消息
      • 实现消息加载
      • 实现垃圾回收(GC)
    • 测试 MessageFileManager

七. 消息存储设计

设计思路

为什么要用文件存储

消息需要在硬盘上存储. 但是并不直接放到数据库中, ⽽是直接使⽤⽂件存储.
原因如下:

  1. 对于消息的操作并不需要复杂的 增删改查 .
  2. 对于⽂件的操作效率⽐数据库会⾼很多

主流 MQ 的实现(包括 RabbitMQ), 都是把消息存储在⽂件中, ⽽不是数据库中.

文件存储结构

我们给每个队列分配⼀个⽬录. ⽬录的名字为 data + 队列名. 形如 ./data/testQueue
该⽬录中包含两个固定名字的⽂件
• queue_data.txt 消息数据⽂件, ⽤来保存消息内容.
• queue_stat.txt 消息统计⽂件, ⽤来保存消息统计信息.

在这里插入图片描述

queue_data.txt ⽂件格式:

使⽤⼆进制⽅式存储.
每个消息分成两个部分:
• 前四个字节, 表⽰ Message 对象的⻓度(字节数)
• 后⾯若⼲字节, 表⽰ Message 内容.
• 消息和消息之间⾸尾相连.
每个 Message 基于 Java 标准库的 ObjectInputStream / ObjectOutputStream 序列化.
在这里插入图片描述
Message 对象中的 offsetBeg 和 offsetEnd 正是⽤来描述每个消息体所在的位置

queue_stat.txt ⽂件格式:

使⽤⽂本⽅式存储.
⽂件中只包含⼀⾏, ⾥⾯包含两列(都是整数), 使⽤ \t 分割.
第⼀列表⽰当前总的消息数⽬. 第⼆列表⽰有效消息数⽬.
形如:

2000\t1500

创建 MessageFileManager 类

创建 mqserver.database.MessageFileManager

public class MessageFileManager {// 定义一个内部类, 来表示该队列的统计信息// 有限考虑使用 static, 静态内部类.static public class Stat {// 此处直接定义成 public, 就不再搞 get set 方法了.// 对于这样的简单的类, 就直接使用成员, 类似于 C 的结构体了.public int totalCount;  // 总消息数量public int validCount;  // 有效消息数量}public void init() {// 暂时不需要做啥额外的初始化工作, 以备后续扩展}// 预定消息文件所在的目录和文件名// 这个方法, 用来获取到指定队列对应的消息文件所在路径private String getQueueDir(String queueName) {return "./data/" + queueName;}// 这个方法用来获取该队列的消息数据文件路径// 注意, 二进制文件, 使用 txt 作为后缀, 不太合适. txt 一般表示文本. 此处咱们也就不改.// .bin / .datprivate String getQueueDataPath(String queueName) {return getQueueDir(queueName) + "/queue_data.txt";}// 这个方法用来获取该队列的消息统计文件路径private String getQueueStatPath(String queueName) {return getQueueDir(queueName) + "/queue_stat.txt";}
}

• 内部包含⼀个 Stat 类, ⽤来表⽰消息统计⽂件的内容.
• getQueueDir, getQueueDataPath, getQueueStatPath ⽤来表⽰这⼏个⽂件所在位置

定义一个内部类, 来表示该队列的统计信息 Stat

    // 定义一个内部类, 来表示该队列的统计信息// 有限考虑使用 static, 静态内部类.static public class Stat {// 此处直接定义成 public, 就不再搞 get set 方法了.// 对于这样的简单的类, 就直接使用成员, 类似于 C 的结构体了.public int totalCount;  // 总消息数量public int validCount;  // 有效消息数量}

实现统计⽂件Stat读写(文本文件的读写)

InputStream——Scanner
OutputStream——PrintWriter
    private Stat readStat(String queueName) {// 由于当前的消息统计文件是文本文件, 可以直接使用 Scanner 来读取文件内容Stat stat = new Stat();try (InputStream inputStream = new FileInputStream(getQueueStatPath(queueName))) {Scanner scanner = new Scanner(inputStream);stat.totalCount = scanner.nextInt();stat.validCount = scanner.nextInt();return stat;} catch (IOException e) {e.printStackTrace();}return null;}private void writeStat(String queueName, Stat stat) {// 使用 PrintWrite 来写文件.// OutputStream 打开文件, 默认情况下, 会直接把原文件清空. 此时相当于新的数据覆盖了旧的.try (OutputStream outputStream = new FileOutputStream(getQueueStatPath(queueName))) {PrintWriter printWriter = new PrintWriter(outputStream);printWriter.write(stat.totalCount + "\t" + stat.validCount);printWriter.flush();} catch (IOException e) {e.printStackTrace();}}

实现创建队列⽬录

每个队列都有⾃⼰的⽬录和配套的⽂件. 通过下列⽅法把⽬录和⽂件先准备好

// 创建队列对应的文件和目录
public void createQueueFiles(String queueName) throws IOException {// 1. 先创建队列对应的消息目录File baseDir = new File(getQueueDir(queueName));if (!baseDir.exists()) {// 不存在, 就创建这个目录boolean ok = baseDir.mkdirs();if (!ok) {throw new IOException("创建目录失败! baseDir=" + baseDir.getAbsolutePath());}}// 2. 创建队列数据文件File queueDataFile = new File(getQueueDataPath(queueName));if (!queueDataFile.exists()) {boolean ok = queueDataFile.createNewFile();if (!ok) {throw new IOException("创建文件失败! queueDataFile=" + queueDataFile.getAbsolutePath());}}// 3. 创建消息统计文件File queueStatFile = new File(getQueueStatPath(queueName));if (!queueStatFile.exists()) {boolean ok = queueStatFile.createNewFile();if (!ok) {throw new IOException("创建文件失败! queueStatFile=" + queueStatFile.getAbsolutePath());}}// 4. 给消息统计文件, 设定初始值. 0\t0Stat stat = new Stat();stat.totalCount = 0;stat.validCount = 0;writeStat(queueName, stat);
}

把上述约定的⽂件都创建出来, 并对消息统计⽂件进⾏初始化.

初始化 0\t0 这样的初始值.

实现删除队列⽬录

如果队列需要删除, 则队列对应的⽬录/⽂件也需要删除.

    // 删除队列的目录和文件.// 队列也是可以被删除的. 当队列删除之后, 对应的消息文件啥的, 自然也要随之删除.public void destroyQueueFiles(String queueName) throws IOException {// 先删除里面的文件, 再删除目录.File queueDataFile = new File(getQueueDataPath(queueName));boolean ok1 = queueDataFile.delete();File queueStatFile = new File(getQueueStatPath(queueName));boolean ok2 = queueStatFile.delete();File baseDir = new File(getQueueDir(queueName));boolean ok3 = baseDir.delete();if (!ok1 || !ok2 || !ok3) {// 有任意一个删除失败, 都算整体删除失败.throw new IOException("删除队列目录和文件失败! baseDir=" + baseDir.getAbsolutePath());}}

如果队列需要删除, 则队列对应的⽬录/⽂件也需要删除

检查队列⽂件是否存在(发送消息时用到)

判定该队列的消息⽂件和统计⽂件是否存在. ⼀旦出现缺失, 则不能进⾏后续⼯作

    // 检查队列的目录和文件是否存在.// 比如后续有生产者给 broker server 生产消息了, 这个消息就可能需要记录到文件上(取决于消息是否要持久化)public boolean checkFilesExits(String queueName) {// 判定队列的数据文件和统计文件是否都存在!!File queueDataFile = new File(getQueueDataPath(queueName));if (!queueDataFile.exists()) {return false;}File queueStatFile = new File(getQueueStatPath(queueName));if (!queueStatFile.exists()) {return false;}return true;}

实现消息对象序列化/反序列化

byte数组转成对象 对象转成byte数组
ByteArrayOutputStream——ObjectOutputStream

Message 对象需要转成⼆进制写⼊⽂件. 并且也需要把⽂件中的⼆进制读出来解析成 Message 对象. 此处针对这⾥的逻辑进⾏封装.
创建 common.BinaryTool

// 下列的逻辑, 并不仅仅是 Message, 其他的 Java 中的对象, 也是可以通过这样的逻辑进行序列化和反序列化的.
// 如果要想让这个对象能够序列化或者反序列化, 需要让这个类能够实现 Serializable 接口.
public class BinaryTool {// 把一个对象序列化成一个字节数组public static byte[] toBytes(Object object) throws IOException {// 这个流对象相当于一个变长的字节数组.// 就可以把 object 序列化的数据给逐渐的写入到 byteArrayOutputStream 中, 再统一转成 byte[]try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) {// 此处的 writeObject 就会把该对象进行序列化, 生成的二进制字节数据, 就会写入到// ObjectOutputStream 中.// 由于 ObjectOutputStream 又是关联到了 ByteArrayOutputStream, 最终结果就写入到 ByteArrayOutputStream 里了objectOutputStream.writeObject(object);}// 这个操作就是把 byteArrayOutputStream 中持有的二进制数据取出来, 转成 byte[]return byteArrayOutputStream.toByteArray();}}// 把一个字节数组, 反序列化成一个对象public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {Object object = null;try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)) {try (ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) {// 此处的 readObject, 就是从 data 这个 byte[] 中读取数据并进行反序列化.object = objectInputStream.readObject();}}return object;}
}

• 使⽤ ByteArrayInputStream / ByteArrayOutputStream 针对 byte[] 进⾏封装, ⽅便后续操作. (这
两个流对象是纯内存的, 不需要进⾏ close).
• 使⽤ ObjectInputStream / ObjectOutputStream 进⾏序列化 / 反序列化操作. 通过内部的
readObject / writeObject 即可完成对应操作.
• 此处涉及到的序列化对象, 需要实现 Serializable 接⼝. 这⼀点咱们的 Message 对象已经实现过了.
对于 serialVersionUID , 此处咱们暂时不需要. ⼤家可以⾃⾏了解 serialVersionUID 的⽤途

实现写⼊消息⽂件【信息写入数据文件】

1. 检查消息文件是否存在
2. 把Message对象转成二进制数据
3. 队列上锁
4. 获取数据文件,根据数据文件长度设置beg,end
5. 写入文件
6. 更新stat
    // 这个方法用来把一个新的消息, 放到队列对应的文件中.// queue 表示要把消息写入的队列. message 则是要写的消息.public void sendMessage(MSGQueue queue, Message message) throws MqException, IOException {// 1. 检查一下当前要写入的队列对应的文件是否存在.if (!checkFilesExits(queue.getName())) {throw new MqException("[MessageFileManager] 队列对应的文件不存在! queueName=" + queue.getName());}// 2. 把 Message 对象, 进行序列化, 转成二进制的字节数组.byte[] messageBinary = BinaryTool.toBytes(message);synchronized (queue) {// 3. 先获取到当前的队列数据文件的长度, 用这个来计算出该 Message 对象的 offsetBeg 和 offsetEnd// 把新的 Message 数据, 写入到队列数据文件的末尾. 此时 Message 对象的 offsetBeg , 就是当前文件长度 + 4// offsetEnd 就是当前文件长度 + 4 + message 自身长度.File queueDataFile = new File(getQueueDataPath(queue.getName()));// 通过这个方法 queueDataFile.length() 就能获取到文件的长度. 单位字节.message.setOffsetBeg(queueDataFile.length() + 4);message.setOffsetEnd(queueDataFile.length() + 4 + messageBinary.length);// 4. 写入消息到数据文件, 注意, 是追加写入到数据文件末尾.try (OutputStream outputStream = new FileOutputStream(queueDataFile, true)) {try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {// 接下来要先写当前消息的长度, 占据 4 个字节的~~dataOutputStream.writeInt(messageBinary.length);// 写入消息本体dataOutputStream.write(messageBinary);}}// 5. 更新消息统计文件Stat stat = readStat(queue.getName());stat.totalCount += 1;stat.validCount += 1;writeStat(queue.getName(), stat);}}
  • 虑线程安全, 按照队列维度进⾏加锁.
  • 使⽤ DataOutputStream 进⾏⼆进制写操作. ⽐原⽣ OutputStream 要⽅便.
  • 需要记录 Message 对象在⽂件中的偏移量. 后续的删除操作依赖这个偏移量定位到消息. offsetBeg是原有⽂件⼤⼩的基础上, 再 + 4. 4 个字节是存放消息⼤⼩的空间. (参考上⾯的图).
  • 写完消息, 要同时更新统计信息.

创建 common.MqException , 作为⾃定义异常类. 后续业务上出现问题, 都统⼀抛出这个异常

实践中创建多个异常类, 分别表⽰不同异常种类是更好的做法. 此处我们只是偷懒了

public class MqException extends Exception {public MqException(String reason) {super(reason);}
}

实现删除消息

根据beg 找到消息

此处的删除只是 “逻辑删除”, 即把 Message 类中的 isValid 字段设置为 0.

这样删除速度⽐较快. 实际的彻底删除, 则通过我们⾃⼰实现的 GC 来解决

    // 这个是删除消息的方法.// 这里的删除是逻辑删除, 也就是把硬盘上存储的这个数据里面的那个 isValid 属性, 设置成 0// 1. 先把文件中的这一段数据, 读出来, 还原回 Message 对象;// 2. 把 isValid 改成 0;// 3. 把上述数据重新写回到文件.// 此处这个参数中的 message 对象, 必须得包含有效的 offsetBeg 和 offsetEndpublic void deleteMessage(MSGQueue queue, Message message) throws IOException, ClassNotFoundException {synchronized (queue) {try (RandomAccessFile randomAccessFile = new RandomAccessFile(getQueueDataPath(queue.getName()), "rw")) {// 1. 先从文件中读取对应的 Message 数据.byte[] bufferSrc = new byte[(int) (message.getOffsetEnd() - message.getOffsetBeg())];randomAccessFile.seek(message.getOffsetBeg());randomAccessFile.read(bufferSrc);// 2. 把当前读出来的二进制数据, 转换回成 Message 对象Message diskMessage = (Message) BinaryTool.fromBytes(bufferSrc);// 3. 把 isValid 设置为无效.diskMessage.setIsValid((byte) 0x0);// 此处不需要给参数的这个 message 的 isValid 设为 0, 因为这个参数代表的是内存中管理的 Message 对象// 而这个对象马上也要被从内存中销毁了.// 4. 重新写入文件byte[] bufferDest = BinaryTool.toBytes(diskMessage);// 虽然上面已经 seek 过了, 但是上面 seek 完了之后, 进行了读操作, 这一读, 就导致, 文件光标往后移动, 移动到// 下一个消息的位置了. 因此要想让接下来的写入, 能够刚好写回到之前的位置, 就需要重新调整文件光标.randomAccessFile.seek(message.getOffsetBeg());randomAccessFile.write(bufferDest);// 通过上述这通折腾, 对于文件来说, 只是有一个字节发生改变而已了~~}// 不要忘了, 更新统计文件!! 把一个消息设为无效了, 此时有效消息个数就需要 - 1Stat stat = readStat(queue.getName());if (stat.validCount > 0) {stat.validCount -= 1;}writeStat(queue.getName(), stat);}}

• 使⽤ RandomAccessFile 来随机访问到⽂件的内容.
• 根据 Message 中的 offsetBeg 和 offsetEnd 定位到消息在⽂件中的位置. 通过randomAccessFile.seek 操作⽂件指针偏移过去. 再读取.
• 读出的结果解析成 Message 对象, 修改 isValid 字段, 再重新写回⽂件. 注意写的时候要重新设定⽂件指针的位置. ⽂件指针会随着上述的读操作产⽣改变.
• 最后, 要记得更新统计⽂件, 把合法消息 - 1.

实现消息加载

把消息内容从⽂件加载到内存中. 这个功能在服务器重启, 和垃圾回收的时候都很关键

    // 使用这个方法, 从文件中, 读取出所有的消息内容, 加载到内存中(具体来说是放到一个链表里)// 这个方法, 准备在程序启动的时候, 进行调用.// 这里使用一个 LinkedList, 主要目的是为了后续进行头删操作.// 这个方法的参数, 只是一个 queueName 而不是 MSGQueue 对象. 因为这个方法不需要加锁, 只使用 queueName 就够了.// 由于该方法是在程序启动时调用, 此时服务器还不能处理请求呢~~ 不涉及多线程操作文件.public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {LinkedList<Message> messages = new LinkedList<>();try (InputStream inputStream = new FileInputStream(getQueueDataPath(queueName))) {try (DataInputStream dataInputStream = new DataInputStream(inputStream)) {// 这个变量记录当前文件光标.long currentOffset = 0;// 一个文件中包含了很多消息, 此处势必要循环读取.while (true) {// 1. 读取当前消息的长度, 这里的 readInt 可能会读到文件的末尾(EOF)//    readInt 方法, 读到文件末尾, 会抛出 EOFException 异常. 这一点和之前的很多流对象不太一样.int messageSize = dataInputStream.readInt();// 2. 按照这个长度, 读取消息内容byte[] buffer = new byte[messageSize];int actualSize = dataInputStream.read(buffer);if (messageSize != actualSize) {// 如果不匹配, 说明文件有问题, 格式错乱了!!throw new MqException("[MessageFileManager] 文件格式错误! queueName=" + queueName);}// 3. 把这个读到的二进制数据, 反序列化回 Message 对象Message message = (Message) BinaryTool.fromBytes(buffer);// 4. 判定一下看看这个消息对象, 是不是无效对象.if (message.getIsValid() != 0x1) {// 无效数据, 直接跳过.// 虽然消息是无效数据, 但是 offset 不要忘记更新.currentOffset += (4 + messageSize);continue;}// 5. 有效数据, 则需要把这个 Message 对象加入到链表中. 加入之前还需要填写 offsetBeg 和 offsetEnd//    进行计算 offset 的时候, 需要知道当前文件光标的位置的. 由于当下使用的 DataInputStream 并不方便直接获取到文件光标位置//    因此就需要手动计算下文件光标.message.setOffsetBeg(currentOffset + 4);message.setOffsetEnd(currentOffset + 4 + messageSize);currentOffset += (4 + messageSize);messages.add(message);}} catch (EOFException e) {// 这个 catch 并非真是处理 "异常", 而是处理 "正常" 的业务逻辑. 文件读到末尾, 会被 readInt 抛出该异常.// 这个 catch 语句中也不需要做啥特殊的事情System.out.println("[MessageFileManager] 恢复 Message 数据完成!");}}return messages;}

• 使⽤ DataInputStream 读取数据. 先读 4 个字节为消息的⻓度, 然后再按照这个⻓度来读取实际消
息内容.
• 读取完毕之后, 转换成 Message 对象.
• 同时计算出该对象的 offsetBeg 和 offsetEnd.
• 最终把结果整理成链表, 返回出去.
• 注意, 对于 DataInputStream 来说, 如果读取到 EOF, 会抛出⼀个 EOFException , ⽽不是返回特定
值. 因此需要注意上述循环的结束条件.

实现垃圾回收(GC)

上述删除操作, 只是把消息在⽂件上标记成了⽆效. 并没有腾出硬盘空间. 最终⽂件⼤⼩可能会越积越
多. 因此需要定期的进⾏批量清除.
此处使⽤类似于复制算法. 当总消息数超过 2000, 并且有效消息数⽬少于 50% 的时候, 就触发 GC.
GC 的时候会把所有有效消息加载出来, 写⼊到⼀个新的消息⽂件中, 使⽤新⽂件, 代替旧⽂件即可.

    // 使用这个方法, 从文件中, 读取出所有的消息内容, 加载到内存中(具体来说是放到一个链表里)// 这个方法, 准备在程序启动的时候, 进行调用.// 这里使用一个 LinkedList, 主要目的是为了后续进行头删操作.// 这个方法的参数, 只是一个 queueName 而不是 MSGQueue 对象. 因为这个方法不需要加锁, 只使用 queueName 就够了.// 由于该方法是在程序启动时调用, 此时服务器还不能处理请求呢~~ 不涉及多线程操作文件.public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {LinkedList<Message> messages = new LinkedList<>();try (InputStream inputStream = new FileInputStream(getQueueDataPath(queueName))) {try (DataInputStream dataInputStream = new DataInputStream(inputStream)) {// 这个变量记录当前文件光标.long currentOffset = 0;// 一个文件中包含了很多消息, 此处势必要循环读取.while (true) {// 1. 读取当前消息的长度, 这里的 readInt 可能会读到文件的末尾(EOF)//    readInt 方法, 读到文件末尾, 会抛出 EOFException 异常. 这一点和之前的很多流对象不太一样.int messageSize = dataInputStream.readInt();// 2. 按照这个长度, 读取消息内容byte[] buffer = new byte[messageSize];int actualSize = dataInputStream.read(buffer);if (messageSize != actualSize) {// 如果不匹配, 说明文件有问题, 格式错乱了!!throw new MqException("[MessageFileManager] 文件格式错误! queueName=" + queueName);}// 3. 把这个读到的二进制数据, 反序列化回 Message 对象Message message = (Message) BinaryTool.fromBytes(buffer);// 4. 判定一下看看这个消息对象, 是不是无效对象.if (message.getIsValid() != 0x1) {// 无效数据, 直接跳过.// 虽然消息是无效数据, 但是 offset 不要忘记更新.currentOffset += (4 + messageSize);continue;}// 5. 有效数据, 则需要把这个 Message 对象加入到链表中. 加入之前还需要填写 offsetBeg 和 offsetEnd//    进行计算 offset 的时候, 需要知道当前文件光标的位置的. 由于当下使用的 DataInputStream 并不方便直接获取到文件光标位置//    因此就需要手动计算下文件光标.message.setOffsetBeg(currentOffset + 4);message.setOffsetEnd(currentOffset + 4 + messageSize);currentOffset += (4 + messageSize);messages.add(message);}} catch (EOFException e) {// 这个 catch 并非真是处理 "异常", 而是处理 "正常" 的业务逻辑. 文件读到末尾, 会被 readInt 抛出该异常.// 这个 catch 语句中也不需要做啥特殊的事情System.out.println("[MessageFileManager] 恢复 Message 数据完成!");}}return messages;}// 检查当前是否要针对该队列的消息数据文件进行 GCpublic boolean checkGC(String queueName) {// 判定是否要 GC, 是根据总消息数和有效消息数. 这两个值都是在 消息统计文件 中的.Stat stat = readStat(queueName);if (stat.totalCount > 2000 && (double)stat.validCount / (double)stat.totalCount < 0.5) {return true;}return false;}private String getQueueDataNewPath(String queueName) {return getQueueDir(queueName) + "/queue_data_new.txt";}// 通过这个方法, 真正执行消息数据文件的垃圾回收操作.// 使用复制算法来完成.// 创建一个新的文件, 名字就是 queue_data_new.txt// 把之前消息数据文件中的有效消息都读出来, 写到新的文件中.// 删除旧的文件, 再把新的文件改名回 queue_data.txt// 同时要记得更新消息统计文件.public void gc(MSGQueue queue) throws MqException, IOException, ClassNotFoundException {// 进行 gc 的时候, 是针对消息数据文件进行大洗牌. 在这个过程中, 其他线程不能针对该队列的消息文件做任何修改.synchronized (queue) {// 由于 gc 操作可能比较耗时, 此处统计一下执行消耗的时间.long gcBeg = System.currentTimeMillis();// 1. 创建一个新的文件File queueDataNewFile = new File(getQueueDataNewPath(queue.getName()));if (queueDataNewFile.exists()) {// 正常情况下, 这个文件不应该存在. 如果存在, 就是意外~~ 说明上次 gc 了一半, 程序意外崩溃了.throw new MqException("[MessageFileManager] gc 时发现该队列的 queue_data_new 已经存在! queueName=" + queue.getName());}boolean ok = queueDataNewFile.createNewFile();if (!ok) {throw new MqException("[MessageFileManager] 创建文件失败! queueDataNewFile=" + queueDataNewFile.getAbsolutePath());}// 2. 从旧的文件中, 读取出所有的有效消息对象了. (这个逻辑直接调用上述方法即可, 不必重新写了)LinkedList<Message> messages = loadAllMessageFromQueue(queue.getName());// 3. 把有效消息, 写入到新的文件中.try (OutputStream outputStream = new FileOutputStream(queueDataNewFile)) {try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {for (Message message : messages) {byte[] buffer = BinaryTool.toBytes(message);// 先写四个字节消息的长度dataOutputStream.writeInt(buffer.length);dataOutputStream.write(buffer);}}}// 4. 删除旧的数据文件, 并且把新的文件进行重命名File queueDataOldFile = new File(getQueueDataPath(queue.getName()));ok = queueDataOldFile.delete();if (!ok) {throw new MqException("[MessageFileManager] 删除旧的数据文件失败! queueDataOldFile=" + queueDataOldFile.getAbsolutePath());}// 把 queue_data_new.txt => queue_data.txtok = queueDataNewFile.renameTo(queueDataOldFile);if (!ok) {throw new MqException("[MessageFileManager] 文件重命名失败! queueDataNewFile=" + queueDataNewFile.getAbsolutePath()+ ", queueDataOldFile=" + queueDataOldFile.getAbsolutePath());}// 5. 更新统计文件Stat stat = readStat(queue.getName());stat.totalCount = messages.size();stat.validCount = messages.size();writeStat(queue.getName(), stat);long gcEnd = System.currentTimeMillis();System.out.println("[MessageFileManager] gc 执行完毕! queueName=" + queue.getName() + ", time="+ (gcEnd - gcBeg) + "ms");}}

如果⽂件很⼤, 消息⾮常多, 可能⽐较低效, 这种就需要把⽂件做拆分和合并了.
Rabbitmq 本体是这样实现的. 但是咱们此处为了实现简单, 就不做这个了.

测试 MessageFileManager

创建 MessageFileManagerTests 编写测试⽤例代码.
• 创建两个队列, ⽤来辅助测试.
• 使⽤ ReflectionTestUtils.invokeMethod 来调⽤私有⽅法.

package com.example.mq;import com.example.mq.common.MqException;
import com.example.mq.mqserver.core.MSGQueue;
import com.example.mq.mqserver.core.Message;
import com.example.mq.mqserver.datacenter.MessageFileManager;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.util.ReflectionTestUtils;
import sun.awt.image.ImageWatched;import java.io.File;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;@SpringBootTest
public class MessageFileManagerTests {private MessageFileManager messageFileManager = new MessageFileManager();private static final String queueName1 = "testQueue1";private static final String queueName2 = "testQueue2";// 这个方法是每个用例执行之前的准备工作@BeforeEachpublic void setUp() throws IOException {// 准备阶段, 创建出两个队列, 以备后用messageFileManager.createQueueFiles(queueName1);messageFileManager.createQueueFiles(queueName2);}// 这个方法就是每个用例执行完毕之后的收尾工作@AfterEachpublic void tearDown() throws IOException {// 收尾阶段, 就把刚才的队列给干掉.messageFileManager.destroyQueueFiles(queueName1);messageFileManager.destroyQueueFiles(queueName2);}@Testpublic void testCreateFiles() {// 创建队列文件已经在上面 setUp 阶段执行过了. 此处主要是验证看看文件是否存在.File queueDataFile1 = new File("./data/" + queueName1 + "/queue_data.txt");Assertions.assertEquals(true, queueDataFile1.isFile());File queueStatFile1 = new File("./data/" + queueName1 + "/queue_stat.txt");Assertions.assertEquals(true, queueStatFile1.isFile());File queueDataFile2 = new File("./data/" + queueName2 + "/queue_data.txt");Assertions.assertEquals(true, queueDataFile2.isFile());File queueStatFile2 = new File("./data/" + queueName2 + "/queue_stat.txt");Assertions.assertEquals(true, queueStatFile2.isFile());}@Testpublic void testReadWriteStat() {MessageFileManager.Stat stat = new MessageFileManager.Stat();stat.totalCount = 100;stat.validCount = 50;// 此处就需要使用反射的方式, 来调用 writeStat 和 readStat 了.// Java 原生的反射 API 其实非常难用~~// 此处使用 Spring 帮我们封装好的 反射 的工具类.ReflectionTestUtils.invokeMethod(messageFileManager, "writeStat", queueName1, stat);// 写入完毕之后, 再调用一下读取, 验证读取的结果和写入的数据是一致的.MessageFileManager.Stat newStat = ReflectionTestUtils.invokeMethod(messageFileManager, "readStat", queueName1);Assertions.assertEquals(100, newStat.totalCount);Assertions.assertEquals(50, newStat.validCount);System.out.println("测试 readStat 和 writeStat 完成!");}private MSGQueue createTestQueue(String queueName) {MSGQueue queue = new MSGQueue();queue.setName(queueName);queue.setDurable(true);queue.setAutoDelete(false);queue.setExclusive(false);return queue;}private Message createTestMessage(String content) {Message message = Message.createMessageWithId("testRoutingKey", null, content.getBytes());return message;}@Testpublic void testSendMessage() throws IOException, MqException, ClassNotFoundException {// 构造出消息, 并且构造出队列.Message message = createTestMessage("testMessage");// 此处创建的 queue 对象的 name, 不能随便写, 只能用 queueName1 和 queueName2. 需要保证这个队列对象// 对应的目录和文件啥的都存在才行.MSGQueue queue = createTestQueue(queueName1);// 调用发送消息方法messageFileManager.sendMessage(queue, message);// 检查 stat 文件.MessageFileManager.Stat stat = ReflectionTestUtils.invokeMethod(messageFileManager, "readStat", queueName1);Assertions.assertEquals(1, stat.totalCount);Assertions.assertEquals(1, stat.validCount);// 检查 data 文件LinkedList<Message> messages = messageFileManager.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(1, messages.size());Message curMessage = messages.get(0);Assertions.assertEquals(message.getMessageId(), curMessage.getMessageId());Assertions.assertEquals(message.getRoutingKey(), curMessage.getRoutingKey());Assertions.assertEquals(message.getDeliverMode(), curMessage.getDeliverMode());// 比较两个字节数组的内容是否相同, 不能直接使用 assertEquals 了.Assertions.assertArrayEquals(message.getBody(), curMessage.getBody());System.out.println("message: " + curMessage);}@Testpublic void testLoadAllMessageFromQueue() throws IOException, MqException, ClassNotFoundException {// 往队列中插入 100 条消息, 然后验证看看这 100 条消息从文件中读取之后, 是否和最初是一致的.MSGQueue queue = createTestQueue(queueName1);List<Message> expectedMessages = new LinkedList<>();for (int i = 0; i < 100; i++) {Message message = createTestMessage("testMessage" + i);messageFileManager.sendMessage(queue, message);expectedMessages.add(message);}// 读取所有消息LinkedList<Message> actualMessages = messageFileManager.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(expectedMessages.size(), actualMessages.size());for (int i = 0; i < expectedMessages.size(); i++) {Message expectedMessage = expectedMessages.get(i);Message actualMessage = actualMessages.get(i);System.out.println("[" + i + "] actualMessage=" + actualMessage);Assertions.assertEquals(expectedMessage.getMessageId(), actualMessage.getMessageId());Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());Assertions.assertEquals(0x1, actualMessage.getIsValid());}}@Testpublic void testDeleteMessage() throws IOException, MqException, ClassNotFoundException {// 创建队列, 写入 10 个消息. 删除其中的几个消息. 再把所有消息读取出来, 判定是否符合预期.MSGQueue queue = createTestQueue(queueName1);List<Message> expectedMessages = new LinkedList<>();for (int i = 0; i < 10; i++) {Message message = createTestMessage("testMessage" + i);messageFileManager.sendMessage(queue, message);expectedMessages.add(message);}// 删除其中的三个消息messageFileManager.deleteMessage(queue, expectedMessages.get(7));messageFileManager.deleteMessage(queue, expectedMessages.get(8));messageFileManager.deleteMessage(queue, expectedMessages.get(9));// 对比这里的内容是否正确.LinkedList<Message> actualMessages = messageFileManager.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(7, actualMessages.size());for (int i = 0; i < actualMessages.size(); i++) {Message expectedMessage = expectedMessages.get(i);Message actualMessage = actualMessages.get(i);System.out.println("[" + i + "] actualMessage=" + actualMessage);Assertions.assertEquals(expectedMessage.getMessageId(), actualMessage.getMessageId());Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());Assertions.assertEquals(0x1, actualMessage.getIsValid());}}@Testpublic void testGC() throws IOException, MqException, ClassNotFoundException {// 先往队列中写 100 个消息. 获取到文件大小.// 再把 100 个消息中的一半, 都给删除掉(比如把下标为偶数的消息都删除)// 再手动调用 gc 方法, 检测得到的新的文件的大小是否比之前缩小了.MSGQueue queue = createTestQueue(queueName1);List<Message> expectedMessages = new LinkedList<>();for (int i = 0; i < 100; i++) {Message message = createTestMessage("testMessage" + i);messageFileManager.sendMessage(queue, message);expectedMessages.add(message);}// 获取 gc 前的文件大小File beforeGCFile = new File("./data/" + queueName1 + "/queue_data.txt");long beforeGCLength = beforeGCFile.length();// 删除偶数下标的消息for (int i = 0; i < 100; i += 2) {messageFileManager.deleteMessage(queue, expectedMessages.get(i));}// 手动调用 gcmessageFileManager.gc(queue);// 重新读取文件, 验证新的文件的内容是不是和之前的内容匹配LinkedList<Message> actualMessages = messageFileManager.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(50, actualMessages.size());for (int i = 0; i < actualMessages.size(); i++) {// 把之前消息偶数下标的删了, 剩下的就是奇数下标的元素了.// actual 中的 0 对应 expected 的 1// actual 中的 1 对应 expected 的 3// actual 中的 2 对应 expected 的 5// actual 中的 i 对应 expected 的 2 * i + 1Message expectedMessage = expectedMessages.get(2 * i + 1);Message actualMessage = actualMessages.get(i);Assertions.assertEquals(expectedMessage.getMessageId(), actualMessage.getMessageId());Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());Assertions.assertEquals(0x1, actualMessage.getIsValid());}// 获取新的文件的大小File afterGCFile = new File("./data/" + queueName1 + "/queue_data.txt");long afterGCLength = afterGCFile.length();System.out.println("before: " + beforeGCLength);System.out.println("after: " + afterGCLength);Assertions.assertTrue(beforeGCLength > afterGCLength);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/22713.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python爬虫入门教程(一)

上一篇文章讲了爬虫的工作原理&#xff0c;这篇文章以后就要重点开始讲编程序了。 简单爬虫的的两个步骤&#xff1a; 使用HTTPRequest工具模拟HTTP请求&#xff0c;接收到返回的文本。用于请求的包有: requests、urllib等。 对接收的文本进行筛选,获取想要的内容。用户筛选文…

JavaScript-内存分配,关键字const

内存空间 内存分为栈和堆 栈&#xff1a;由操作系统自动释放存放的变量值和函数值等。简单数据类型存放在栈中 栈会由低到高先入后出 堆&#xff1a;存储引用类型 &#xff08;数组&#xff0c;对象&#xff09; 对象会先将数据存放在堆里面&#xff0c;堆的地址放在栈里面 关键…

VMD-PSO-LSTM单维时序预测模型(单输入单输出)-附代码

VMD-PSO-LSTM单维时序预测模型&#xff08;单输入单输出&#xff09; 1&#xff09;首先对原始单维数据进行VMD分解&#xff0c;分解为K个模态分量和1个残差分量 2&#xff09;将各个模态分量输入模型&#xff0c;建立模型进行预测 3&#xff09;将各个预测结果相加得到最终…

clickhouse(十五、存储优化实践)

文章目录 背景问题定位优化方式排序键设计写入顺序压缩算法 DoubleDeltaLowCardinality避免使用Nullable 总结 背景 clickhouse集群容量告警&#xff0c;项目中某些表占据大量的存储空间&#xff0c;借此机会对ck的存储优化进行实践学习&#xff0c;并通过多种方式测试验证优化…

React(五)useEffect、useRef、useImperativeHandle、useLayoutEffect

(一)useEffect useEffect – React 中文文档 useEffect hook用于模拟以前的class组件的生命周期&#xff0c;但比原本的生命周期有着更强大的功能 1.类组件的生命周期 在类组件编程时&#xff0c;网络请求&#xff0c;订阅等操作都是在生命周期中完成 import React, { Com…

【前端】响应式布局笔记——flex

二、Flex Flex(FlexiableBox:弹性盒子&#xff0c;用于弹性布局&#xff0c;配合rem处理尺寸的适配问题)。 1、flex-direction:子元素在父元素盒子中的排列方式。 父级元素添加&#xff1a;flex-direction: row; 父级元素添加&#xff1a;flex-direction: row-reverse; 父…

家政预约小程序13我的订单

目录 1 我的订单页面布局2 全部订单页面3 完善订单状态4 查询订单信息总结 现在我们已经完成了家政预约小程序主体功能的开发&#xff0c;包含服务的查看&#xff0c;在线预约已经登录等功能。预约之后就需要家政公司的客服进行派单&#xff0c;由服务人员进行上门服务。在小程…

Hotcoin精彩亮相Consensus 2024 Austin,探索行业风向标

5 月 31 日&#xff0c;由CoinDesk主办的“Consensus 2024”大会在德克萨斯州的奥斯汀市正式落下帷幕。作为全球规模最大、最具影响力的加密货币、区块链、Web3盛会&#xff0c;本次Consensus 2024 Austin吸引来自 100 多个国家/地区的 15,000 多名与会者、6,800 家公司、850 多…

【C++】手动模拟String底层与深浅拷贝

在string类&#xff1a;版本、组件、构造、操作及应用和 C中string的一些超常用函数 (附习题)这两篇文章中我们已经了解到了string&#xff0c;现在让我们再来手动实现模拟一下吧~ 模拟实现string是为了更好的理解string函数的使用和深浅拷贝方面的知识~ 总体整理了两张思维导…

【Python编程】【Jupyter Notebook】启动时报错:no available port could be found

一、报错描述 在Jupyter Notebook中编写程序&#xff0c;无法运行&#xff0c;提示由于没有可供监听的端口&#xff0c;无法启动Jupyter服务器&#xff0c;如下图所示&#xff1a; 二、原因分析 通过报错信息&#xff0c;猜测大概是由于网络环境的原因。首先&#xff0c;关闭…

C语言(结构体)

Hi~&#xff01;这里是奋斗的小羊&#xff0c;很荣幸各位能阅读我的文章&#xff0c;诚请评论指点&#xff0c;欢迎欢迎~~ &#x1f4a5;个人主页&#xff1a;小羊在奋斗 &#x1f4a5;所属专栏&#xff1a;C语言 本系列文章为个人学习笔记&#xff0c;在这里撰写成文一…

AIGC实战!7个超热门的 Midjourney 关键词教程

一、剪纸风格 核心词&#xff1a; paper art&#xff08;剪纸艺术&#xff09; 关键技巧&#xff1a; 主体物&#xff1a;可以换成任意主角&#xff0c;Chinese illustration &#xff08;中国风插画&#xff09;&#xff1b;艺术风格&#xff1a;paper art &#xff08;剪纸…

ruoyi vue 集成积木报表真实记录

按官方文档集成即可 积木报表官方集成文档 集成问题 1.注意 idea 配置的 maven 需要设置成 本地配置&#xff0c;不可以使用 idea 自带的 maven,自带 maven 会导致私有源调用不到 后端代码 新建 base 模块 maven配置 <project xmlns"http://maven.apache.org/POM/…

微软云计算[3]之Windows Azure AppFabric

Windows Azure AppFabric AppFabric概述AppFabric关键技术服务总线访问控制高速缓存 AppFabric概述 AppFabric为本地应用和云中应用提供了分布式的基础架构服务 用户本地应用与云应用之间进行安全联接和信息传递 云应用和现有应用或服务之间的连接及跨语言、跨平台、跨不同标…

C++数据结构之:哈希表Hash

摘要&#xff1a; it人员无论是使用哪种高级语言开发东东&#xff0c;想要更高效有层次的开发程序的话都躲不开三件套&#xff1a;数据结构&#xff0c;算法和设计模式。数据结构是相互之间存在一种或多种特定关系的数据元素的集合&#xff0c;即带“结构”的数据元素的集合&am…

鸿蒙开发 之 ArkUI路由

1.页面路由 页面路由是指在应用程序中实现不同页面之间的跳转和数据传递 比如说你打开一个app&#xff0c;首先进入的是登陆页&#xff0c;首页&#xff0c;列表搜索页&#xff0c;详情页&#xff0c;你打开几个页面都会存储在页面栈里&#xff0c;页面栈的最大容量上限为32个&…

在Oracle VM virtual box 中复制 CentOS 7虚拟机更改IP地址的操作

最近玩Redis主从复制的时候&#xff0c;我装了一个虚拟机&#xff0c;但主从复制需要准备3个虚拟机&#xff0c;这个时候&#xff0c;我又不想一个一个去装&#xff0c;我看到Oracle VM virtual box提供了一个虚拟机复制操作&#xff0c;于是就用了一下这个功能&#xff0c;发现…

云原生时代:从 Jenkins 到 Argo Workflows,构建高效 CI Pipeline

作者&#xff1a;蔡靖 Argo Workflows Argo Workflows [ 1] 是用于在 Kubernetes 上编排 Job 的开源的云原生工作流引擎。可以轻松自动化和管理 Kubernetes 上的复杂工作流程。适用于各种场景&#xff0c;包括定时任务、机器学习、ETL 和数据分析、模型训练、数据流 pipline、…

【成品设计】基于STC15F104W的互补PWM输出器

《基于STC15F104W的互补PWM输出器》 1.所需器件&#xff1a; (1)单片机&#xff1a;STC15F104W。 ①最小系统板链接&#xff1a;【淘宝】https://m.tb.cn/h.5WnLl9X?tkqSGrdCWm0PW「STC15F104W STC15W204S单片机模块 系统板 核心板 学习板 开发板」点击链接直接打开 或者 淘宝…

HCIP-Datacom-ARST自选题库__多种协议简答【11道题】

1.BGP/MPLSIP VPN的典型组网场景如图所示&#xff0c;PE1和PE2通过LoopbackO建立MP-IBGP&#xff0c;PE1和PE2之间只传递VPN路由&#xff0c;其中PE1BGP进程的部分配置已在图中标出&#xff0c;则编号为0的命令不是必须的。(填写阿拉伯数字) 3 2.在如图所示的Hub&amp;Spok…