Java项目之MQ
- 七. 消息存储设计
- 设计思路
- 为什么要用文件存储
- 文件存储结构
- queue_data.txt ⽂件格式:
- queue_stat.txt ⽂件格式:
- 创建 MessageFileManager 类
- 定义一个内部类, 来表示该队列的统计信息 Stat
- 实现统计⽂件Stat读写(文本文件的读写)
- InputStream——Scanner
- OutputStream——PrintWriter
- 实现创建队列⽬录
- 实现删除队列⽬录
- 检查队列⽂件是否存在(发送消息时用到)
- 实现消息对象序列化/反序列化
- byte数组转成对象 对象转成byte数组
- ByteArrayOutputStream——ObjectOutputStream
- 实现写⼊消息⽂件【信息写入数据文件】
- 1. 检查消息文件是否存在
- 2. 把Message对象转成二进制数据
- 3. 队列上锁
- 4. 获取数据文件,根据数据文件长度设置beg,end
- 5. 写入文件
- 6. 更新stat
- 实现删除消息
- 根据beg 找到消息
- 实现消息加载
- 实现垃圾回收(GC)
- 测试 MessageFileManager
七. 消息存储设计
设计思路
为什么要用文件存储
消息需要在硬盘上存储. 但是并不直接放到数据库中, ⽽是直接使⽤⽂件存储.
原因如下:
- 对于消息的操作并不需要复杂的 增删改查 .
- 对于⽂件的操作效率⽐数据库会⾼很多
主流 MQ 的实现(包括 RabbitMQ), 都是把消息存储在⽂件中, ⽽不是数据库中.
文件存储结构
我们给每个队列分配⼀个⽬录. ⽬录的名字为 data + 队列名. 形如 ./data/testQueue
该⽬录中包含两个固定名字的⽂件
• queue_data.txt 消息数据⽂件, ⽤来保存消息内容.
• queue_stat.txt 消息统计⽂件, ⽤来保存消息统计信息.
queue_data.txt ⽂件格式:
使⽤⼆进制⽅式存储.
每个消息分成两个部分:
• 前四个字节, 表⽰ Message 对象的⻓度(字节数)
• 后⾯若⼲字节, 表⽰ Message 内容.
• 消息和消息之间⾸尾相连.
每个 Message 基于 Java 标准库的 ObjectInputStream / ObjectOutputStream 序列化.
Message 对象中的 offsetBeg 和 offsetEnd 正是⽤来描述每个消息体所在的位置
queue_stat.txt ⽂件格式:
使⽤⽂本⽅式存储.
⽂件中只包含⼀⾏, ⾥⾯包含两列(都是整数), 使⽤ \t 分割.
第⼀列表⽰当前总的消息数⽬. 第⼆列表⽰有效消息数⽬.
形如:
2000\t1500
创建 MessageFileManager 类
创建 mqserver.database.MessageFileManager
public class MessageFileManager {// 定义一个内部类, 来表示该队列的统计信息// 有限考虑使用 static, 静态内部类.static public class Stat {// 此处直接定义成 public, 就不再搞 get set 方法了.// 对于这样的简单的类, 就直接使用成员, 类似于 C 的结构体了.public int totalCount; // 总消息数量public int validCount; // 有效消息数量}public void init() {// 暂时不需要做啥额外的初始化工作, 以备后续扩展}// 预定消息文件所在的目录和文件名// 这个方法, 用来获取到指定队列对应的消息文件所在路径private String getQueueDir(String queueName) {return "./data/" + queueName;}// 这个方法用来获取该队列的消息数据文件路径// 注意, 二进制文件, 使用 txt 作为后缀, 不太合适. txt 一般表示文本. 此处咱们也就不改.// .bin / .datprivate String getQueueDataPath(String queueName) {return getQueueDir(queueName) + "/queue_data.txt";}// 这个方法用来获取该队列的消息统计文件路径private String getQueueStatPath(String queueName) {return getQueueDir(queueName) + "/queue_stat.txt";}
}
• 内部包含⼀个 Stat 类, ⽤来表⽰消息统计⽂件的内容.
• getQueueDir, getQueueDataPath, getQueueStatPath ⽤来表⽰这⼏个⽂件所在位置
定义一个内部类, 来表示该队列的统计信息 Stat
// 定义一个内部类, 来表示该队列的统计信息// 有限考虑使用 static, 静态内部类.static public class Stat {// 此处直接定义成 public, 就不再搞 get set 方法了.// 对于这样的简单的类, 就直接使用成员, 类似于 C 的结构体了.public int totalCount; // 总消息数量public int validCount; // 有效消息数量}
实现统计⽂件Stat读写(文本文件的读写)
InputStream——Scanner
OutputStream——PrintWriter
private Stat readStat(String queueName) {// 由于当前的消息统计文件是文本文件, 可以直接使用 Scanner 来读取文件内容Stat stat = new Stat();try (InputStream inputStream = new FileInputStream(getQueueStatPath(queueName))) {Scanner scanner = new Scanner(inputStream);stat.totalCount = scanner.nextInt();stat.validCount = scanner.nextInt();return stat;} catch (IOException e) {e.printStackTrace();}return null;}private void writeStat(String queueName, Stat stat) {// 使用 PrintWrite 来写文件.// OutputStream 打开文件, 默认情况下, 会直接把原文件清空. 此时相当于新的数据覆盖了旧的.try (OutputStream outputStream = new FileOutputStream(getQueueStatPath(queueName))) {PrintWriter printWriter = new PrintWriter(outputStream);printWriter.write(stat.totalCount + "\t" + stat.validCount);printWriter.flush();} catch (IOException e) {e.printStackTrace();}}
实现创建队列⽬录
每个队列都有⾃⼰的⽬录和配套的⽂件. 通过下列⽅法把⽬录和⽂件先准备好
// 创建队列对应的文件和目录
public void createQueueFiles(String queueName) throws IOException {// 1. 先创建队列对应的消息目录File baseDir = new File(getQueueDir(queueName));if (!baseDir.exists()) {// 不存在, 就创建这个目录boolean ok = baseDir.mkdirs();if (!ok) {throw new IOException("创建目录失败! baseDir=" + baseDir.getAbsolutePath());}}// 2. 创建队列数据文件File queueDataFile = new File(getQueueDataPath(queueName));if (!queueDataFile.exists()) {boolean ok = queueDataFile.createNewFile();if (!ok) {throw new IOException("创建文件失败! queueDataFile=" + queueDataFile.getAbsolutePath());}}// 3. 创建消息统计文件File queueStatFile = new File(getQueueStatPath(queueName));if (!queueStatFile.exists()) {boolean ok = queueStatFile.createNewFile();if (!ok) {throw new IOException("创建文件失败! queueStatFile=" + queueStatFile.getAbsolutePath());}}// 4. 给消息统计文件, 设定初始值. 0\t0Stat stat = new Stat();stat.totalCount = 0;stat.validCount = 0;writeStat(queueName, stat);
}
把上述约定的⽂件都创建出来, 并对消息统计⽂件进⾏初始化.
初始化 0\t0 这样的初始值.
实现删除队列⽬录
如果队列需要删除, 则队列对应的⽬录/⽂件也需要删除.
// 删除队列的目录和文件.// 队列也是可以被删除的. 当队列删除之后, 对应的消息文件啥的, 自然也要随之删除.public void destroyQueueFiles(String queueName) throws IOException {// 先删除里面的文件, 再删除目录.File queueDataFile = new File(getQueueDataPath(queueName));boolean ok1 = queueDataFile.delete();File queueStatFile = new File(getQueueStatPath(queueName));boolean ok2 = queueStatFile.delete();File baseDir = new File(getQueueDir(queueName));boolean ok3 = baseDir.delete();if (!ok1 || !ok2 || !ok3) {// 有任意一个删除失败, 都算整体删除失败.throw new IOException("删除队列目录和文件失败! baseDir=" + baseDir.getAbsolutePath());}}
如果队列需要删除, 则队列对应的⽬录/⽂件也需要删除
检查队列⽂件是否存在(发送消息时用到)
判定该队列的消息⽂件和统计⽂件是否存在. ⼀旦出现缺失, 则不能进⾏后续⼯作
// 检查队列的目录和文件是否存在.// 比如后续有生产者给 broker server 生产消息了, 这个消息就可能需要记录到文件上(取决于消息是否要持久化)public boolean checkFilesExits(String queueName) {// 判定队列的数据文件和统计文件是否都存在!!File queueDataFile = new File(getQueueDataPath(queueName));if (!queueDataFile.exists()) {return false;}File queueStatFile = new File(getQueueStatPath(queueName));if (!queueStatFile.exists()) {return false;}return true;}
实现消息对象序列化/反序列化
byte数组转成对象 对象转成byte数组
ByteArrayOutputStream——ObjectOutputStream
Message 对象需要转成⼆进制写⼊⽂件. 并且也需要把⽂件中的⼆进制读出来解析成 Message 对象. 此处针对这⾥的逻辑进⾏封装.
创建 common.BinaryTool
// 下列的逻辑, 并不仅仅是 Message, 其他的 Java 中的对象, 也是可以通过这样的逻辑进行序列化和反序列化的.
// 如果要想让这个对象能够序列化或者反序列化, 需要让这个类能够实现 Serializable 接口.
public class BinaryTool {// 把一个对象序列化成一个字节数组public static byte[] toBytes(Object object) throws IOException {// 这个流对象相当于一个变长的字节数组.// 就可以把 object 序列化的数据给逐渐的写入到 byteArrayOutputStream 中, 再统一转成 byte[]try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) {// 此处的 writeObject 就会把该对象进行序列化, 生成的二进制字节数据, 就会写入到// ObjectOutputStream 中.// 由于 ObjectOutputStream 又是关联到了 ByteArrayOutputStream, 最终结果就写入到 ByteArrayOutputStream 里了objectOutputStream.writeObject(object);}// 这个操作就是把 byteArrayOutputStream 中持有的二进制数据取出来, 转成 byte[]return byteArrayOutputStream.toByteArray();}}// 把一个字节数组, 反序列化成一个对象public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {Object object = null;try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)) {try (ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) {// 此处的 readObject, 就是从 data 这个 byte[] 中读取数据并进行反序列化.object = objectInputStream.readObject();}}return object;}
}
• 使⽤ ByteArrayInputStream / ByteArrayOutputStream 针对 byte[] 进⾏封装, ⽅便后续操作. (这
两个流对象是纯内存的, 不需要进⾏ close).
• 使⽤ ObjectInputStream / ObjectOutputStream 进⾏序列化 / 反序列化操作. 通过内部的
readObject / writeObject 即可完成对应操作.
• 此处涉及到的序列化对象, 需要实现 Serializable 接⼝. 这⼀点咱们的 Message 对象已经实现过了.
对于 serialVersionUID , 此处咱们暂时不需要. ⼤家可以⾃⾏了解 serialVersionUID 的⽤途
实现写⼊消息⽂件【信息写入数据文件】
1. 检查消息文件是否存在
2. 把Message对象转成二进制数据
3. 队列上锁
4. 获取数据文件,根据数据文件长度设置beg,end
5. 写入文件
6. 更新stat
// 这个方法用来把一个新的消息, 放到队列对应的文件中.// queue 表示要把消息写入的队列. message 则是要写的消息.public void sendMessage(MSGQueue queue, Message message) throws MqException, IOException {// 1. 检查一下当前要写入的队列对应的文件是否存在.if (!checkFilesExits(queue.getName())) {throw new MqException("[MessageFileManager] 队列对应的文件不存在! queueName=" + queue.getName());}// 2. 把 Message 对象, 进行序列化, 转成二进制的字节数组.byte[] messageBinary = BinaryTool.toBytes(message);synchronized (queue) {// 3. 先获取到当前的队列数据文件的长度, 用这个来计算出该 Message 对象的 offsetBeg 和 offsetEnd// 把新的 Message 数据, 写入到队列数据文件的末尾. 此时 Message 对象的 offsetBeg , 就是当前文件长度 + 4// offsetEnd 就是当前文件长度 + 4 + message 自身长度.File queueDataFile = new File(getQueueDataPath(queue.getName()));// 通过这个方法 queueDataFile.length() 就能获取到文件的长度. 单位字节.message.setOffsetBeg(queueDataFile.length() + 4);message.setOffsetEnd(queueDataFile.length() + 4 + messageBinary.length);// 4. 写入消息到数据文件, 注意, 是追加写入到数据文件末尾.try (OutputStream outputStream = new FileOutputStream(queueDataFile, true)) {try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {// 接下来要先写当前消息的长度, 占据 4 个字节的~~dataOutputStream.writeInt(messageBinary.length);// 写入消息本体dataOutputStream.write(messageBinary);}}// 5. 更新消息统计文件Stat stat = readStat(queue.getName());stat.totalCount += 1;stat.validCount += 1;writeStat(queue.getName(), stat);}}
- 虑线程安全, 按照队列维度进⾏加锁.
- 使⽤ DataOutputStream 进⾏⼆进制写操作. ⽐原⽣ OutputStream 要⽅便.
- 需要记录 Message 对象在⽂件中的偏移量. 后续的删除操作依赖这个偏移量定位到消息. offsetBeg是原有⽂件⼤⼩的基础上, 再 + 4. 4 个字节是存放消息⼤⼩的空间. (参考上⾯的图).
- 写完消息, 要同时更新统计信息.
创建 common.MqException , 作为⾃定义异常类. 后续业务上出现问题, 都统⼀抛出这个异常
实践中创建多个异常类, 分别表⽰不同异常种类是更好的做法. 此处我们只是偷懒了
public class MqException extends Exception {public MqException(String reason) {super(reason);}
}
实现删除消息
根据beg 找到消息
此处的删除只是 “逻辑删除”, 即把 Message 类中的 isValid 字段设置为 0.
这样删除速度⽐较快. 实际的彻底删除, 则通过我们⾃⼰实现的 GC 来解决
// 这个是删除消息的方法.// 这里的删除是逻辑删除, 也就是把硬盘上存储的这个数据里面的那个 isValid 属性, 设置成 0// 1. 先把文件中的这一段数据, 读出来, 还原回 Message 对象;// 2. 把 isValid 改成 0;// 3. 把上述数据重新写回到文件.// 此处这个参数中的 message 对象, 必须得包含有效的 offsetBeg 和 offsetEndpublic void deleteMessage(MSGQueue queue, Message message) throws IOException, ClassNotFoundException {synchronized (queue) {try (RandomAccessFile randomAccessFile = new RandomAccessFile(getQueueDataPath(queue.getName()), "rw")) {// 1. 先从文件中读取对应的 Message 数据.byte[] bufferSrc = new byte[(int) (message.getOffsetEnd() - message.getOffsetBeg())];randomAccessFile.seek(message.getOffsetBeg());randomAccessFile.read(bufferSrc);// 2. 把当前读出来的二进制数据, 转换回成 Message 对象Message diskMessage = (Message) BinaryTool.fromBytes(bufferSrc);// 3. 把 isValid 设置为无效.diskMessage.setIsValid((byte) 0x0);// 此处不需要给参数的这个 message 的 isValid 设为 0, 因为这个参数代表的是内存中管理的 Message 对象// 而这个对象马上也要被从内存中销毁了.// 4. 重新写入文件byte[] bufferDest = BinaryTool.toBytes(diskMessage);// 虽然上面已经 seek 过了, 但是上面 seek 完了之后, 进行了读操作, 这一读, 就导致, 文件光标往后移动, 移动到// 下一个消息的位置了. 因此要想让接下来的写入, 能够刚好写回到之前的位置, 就需要重新调整文件光标.randomAccessFile.seek(message.getOffsetBeg());randomAccessFile.write(bufferDest);// 通过上述这通折腾, 对于文件来说, 只是有一个字节发生改变而已了~~}// 不要忘了, 更新统计文件!! 把一个消息设为无效了, 此时有效消息个数就需要 - 1Stat stat = readStat(queue.getName());if (stat.validCount > 0) {stat.validCount -= 1;}writeStat(queue.getName(), stat);}}
• 使⽤ RandomAccessFile 来随机访问到⽂件的内容.
• 根据 Message 中的 offsetBeg 和 offsetEnd 定位到消息在⽂件中的位置. 通过randomAccessFile.seek 操作⽂件指针偏移过去. 再读取.
• 读出的结果解析成 Message 对象, 修改 isValid 字段, 再重新写回⽂件. 注意写的时候要重新设定⽂件指针的位置. ⽂件指针会随着上述的读操作产⽣改变.
• 最后, 要记得更新统计⽂件, 把合法消息 - 1.
实现消息加载
把消息内容从⽂件加载到内存中. 这个功能在服务器重启, 和垃圾回收的时候都很关键
// 使用这个方法, 从文件中, 读取出所有的消息内容, 加载到内存中(具体来说是放到一个链表里)// 这个方法, 准备在程序启动的时候, 进行调用.// 这里使用一个 LinkedList, 主要目的是为了后续进行头删操作.// 这个方法的参数, 只是一个 queueName 而不是 MSGQueue 对象. 因为这个方法不需要加锁, 只使用 queueName 就够了.// 由于该方法是在程序启动时调用, 此时服务器还不能处理请求呢~~ 不涉及多线程操作文件.public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {LinkedList<Message> messages = new LinkedList<>();try (InputStream inputStream = new FileInputStream(getQueueDataPath(queueName))) {try (DataInputStream dataInputStream = new DataInputStream(inputStream)) {// 这个变量记录当前文件光标.long currentOffset = 0;// 一个文件中包含了很多消息, 此处势必要循环读取.while (true) {// 1. 读取当前消息的长度, 这里的 readInt 可能会读到文件的末尾(EOF)// readInt 方法, 读到文件末尾, 会抛出 EOFException 异常. 这一点和之前的很多流对象不太一样.int messageSize = dataInputStream.readInt();// 2. 按照这个长度, 读取消息内容byte[] buffer = new byte[messageSize];int actualSize = dataInputStream.read(buffer);if (messageSize != actualSize) {// 如果不匹配, 说明文件有问题, 格式错乱了!!throw new MqException("[MessageFileManager] 文件格式错误! queueName=" + queueName);}// 3. 把这个读到的二进制数据, 反序列化回 Message 对象Message message = (Message) BinaryTool.fromBytes(buffer);// 4. 判定一下看看这个消息对象, 是不是无效对象.if (message.getIsValid() != 0x1) {// 无效数据, 直接跳过.// 虽然消息是无效数据, 但是 offset 不要忘记更新.currentOffset += (4 + messageSize);continue;}// 5. 有效数据, 则需要把这个 Message 对象加入到链表中. 加入之前还需要填写 offsetBeg 和 offsetEnd// 进行计算 offset 的时候, 需要知道当前文件光标的位置的. 由于当下使用的 DataInputStream 并不方便直接获取到文件光标位置// 因此就需要手动计算下文件光标.message.setOffsetBeg(currentOffset + 4);message.setOffsetEnd(currentOffset + 4 + messageSize);currentOffset += (4 + messageSize);messages.add(message);}} catch (EOFException e) {// 这个 catch 并非真是处理 "异常", 而是处理 "正常" 的业务逻辑. 文件读到末尾, 会被 readInt 抛出该异常.// 这个 catch 语句中也不需要做啥特殊的事情System.out.println("[MessageFileManager] 恢复 Message 数据完成!");}}return messages;}
• 使⽤ DataInputStream 读取数据. 先读 4 个字节为消息的⻓度, 然后再按照这个⻓度来读取实际消
息内容.
• 读取完毕之后, 转换成 Message 对象.
• 同时计算出该对象的 offsetBeg 和 offsetEnd.
• 最终把结果整理成链表, 返回出去.
• 注意, 对于 DataInputStream 来说, 如果读取到 EOF, 会抛出⼀个 EOFException , ⽽不是返回特定
值. 因此需要注意上述循环的结束条件.
实现垃圾回收(GC)
上述删除操作, 只是把消息在⽂件上标记成了⽆效. 并没有腾出硬盘空间. 最终⽂件⼤⼩可能会越积越
多. 因此需要定期的进⾏批量清除.
此处使⽤类似于复制算法. 当总消息数超过 2000, 并且有效消息数⽬少于 50% 的时候, 就触发 GC.
GC 的时候会把所有有效消息加载出来, 写⼊到⼀个新的消息⽂件中, 使⽤新⽂件, 代替旧⽂件即可.
// 使用这个方法, 从文件中, 读取出所有的消息内容, 加载到内存中(具体来说是放到一个链表里)// 这个方法, 准备在程序启动的时候, 进行调用.// 这里使用一个 LinkedList, 主要目的是为了后续进行头删操作.// 这个方法的参数, 只是一个 queueName 而不是 MSGQueue 对象. 因为这个方法不需要加锁, 只使用 queueName 就够了.// 由于该方法是在程序启动时调用, 此时服务器还不能处理请求呢~~ 不涉及多线程操作文件.public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {LinkedList<Message> messages = new LinkedList<>();try (InputStream inputStream = new FileInputStream(getQueueDataPath(queueName))) {try (DataInputStream dataInputStream = new DataInputStream(inputStream)) {// 这个变量记录当前文件光标.long currentOffset = 0;// 一个文件中包含了很多消息, 此处势必要循环读取.while (true) {// 1. 读取当前消息的长度, 这里的 readInt 可能会读到文件的末尾(EOF)// readInt 方法, 读到文件末尾, 会抛出 EOFException 异常. 这一点和之前的很多流对象不太一样.int messageSize = dataInputStream.readInt();// 2. 按照这个长度, 读取消息内容byte[] buffer = new byte[messageSize];int actualSize = dataInputStream.read(buffer);if (messageSize != actualSize) {// 如果不匹配, 说明文件有问题, 格式错乱了!!throw new MqException("[MessageFileManager] 文件格式错误! queueName=" + queueName);}// 3. 把这个读到的二进制数据, 反序列化回 Message 对象Message message = (Message) BinaryTool.fromBytes(buffer);// 4. 判定一下看看这个消息对象, 是不是无效对象.if (message.getIsValid() != 0x1) {// 无效数据, 直接跳过.// 虽然消息是无效数据, 但是 offset 不要忘记更新.currentOffset += (4 + messageSize);continue;}// 5. 有效数据, 则需要把这个 Message 对象加入到链表中. 加入之前还需要填写 offsetBeg 和 offsetEnd// 进行计算 offset 的时候, 需要知道当前文件光标的位置的. 由于当下使用的 DataInputStream 并不方便直接获取到文件光标位置// 因此就需要手动计算下文件光标.message.setOffsetBeg(currentOffset + 4);message.setOffsetEnd(currentOffset + 4 + messageSize);currentOffset += (4 + messageSize);messages.add(message);}} catch (EOFException e) {// 这个 catch 并非真是处理 "异常", 而是处理 "正常" 的业务逻辑. 文件读到末尾, 会被 readInt 抛出该异常.// 这个 catch 语句中也不需要做啥特殊的事情System.out.println("[MessageFileManager] 恢复 Message 数据完成!");}}return messages;}// 检查当前是否要针对该队列的消息数据文件进行 GCpublic boolean checkGC(String queueName) {// 判定是否要 GC, 是根据总消息数和有效消息数. 这两个值都是在 消息统计文件 中的.Stat stat = readStat(queueName);if (stat.totalCount > 2000 && (double)stat.validCount / (double)stat.totalCount < 0.5) {return true;}return false;}private String getQueueDataNewPath(String queueName) {return getQueueDir(queueName) + "/queue_data_new.txt";}// 通过这个方法, 真正执行消息数据文件的垃圾回收操作.// 使用复制算法来完成.// 创建一个新的文件, 名字就是 queue_data_new.txt// 把之前消息数据文件中的有效消息都读出来, 写到新的文件中.// 删除旧的文件, 再把新的文件改名回 queue_data.txt// 同时要记得更新消息统计文件.public void gc(MSGQueue queue) throws MqException, IOException, ClassNotFoundException {// 进行 gc 的时候, 是针对消息数据文件进行大洗牌. 在这个过程中, 其他线程不能针对该队列的消息文件做任何修改.synchronized (queue) {// 由于 gc 操作可能比较耗时, 此处统计一下执行消耗的时间.long gcBeg = System.currentTimeMillis();// 1. 创建一个新的文件File queueDataNewFile = new File(getQueueDataNewPath(queue.getName()));if (queueDataNewFile.exists()) {// 正常情况下, 这个文件不应该存在. 如果存在, 就是意外~~ 说明上次 gc 了一半, 程序意外崩溃了.throw new MqException("[MessageFileManager] gc 时发现该队列的 queue_data_new 已经存在! queueName=" + queue.getName());}boolean ok = queueDataNewFile.createNewFile();if (!ok) {throw new MqException("[MessageFileManager] 创建文件失败! queueDataNewFile=" + queueDataNewFile.getAbsolutePath());}// 2. 从旧的文件中, 读取出所有的有效消息对象了. (这个逻辑直接调用上述方法即可, 不必重新写了)LinkedList<Message> messages = loadAllMessageFromQueue(queue.getName());// 3. 把有效消息, 写入到新的文件中.try (OutputStream outputStream = new FileOutputStream(queueDataNewFile)) {try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {for (Message message : messages) {byte[] buffer = BinaryTool.toBytes(message);// 先写四个字节消息的长度dataOutputStream.writeInt(buffer.length);dataOutputStream.write(buffer);}}}// 4. 删除旧的数据文件, 并且把新的文件进行重命名File queueDataOldFile = new File(getQueueDataPath(queue.getName()));ok = queueDataOldFile.delete();if (!ok) {throw new MqException("[MessageFileManager] 删除旧的数据文件失败! queueDataOldFile=" + queueDataOldFile.getAbsolutePath());}// 把 queue_data_new.txt => queue_data.txtok = queueDataNewFile.renameTo(queueDataOldFile);if (!ok) {throw new MqException("[MessageFileManager] 文件重命名失败! queueDataNewFile=" + queueDataNewFile.getAbsolutePath()+ ", queueDataOldFile=" + queueDataOldFile.getAbsolutePath());}// 5. 更新统计文件Stat stat = readStat(queue.getName());stat.totalCount = messages.size();stat.validCount = messages.size();writeStat(queue.getName(), stat);long gcEnd = System.currentTimeMillis();System.out.println("[MessageFileManager] gc 执行完毕! queueName=" + queue.getName() + ", time="+ (gcEnd - gcBeg) + "ms");}}
如果⽂件很⼤, 消息⾮常多, 可能⽐较低效, 这种就需要把⽂件做拆分和合并了.
Rabbitmq 本体是这样实现的. 但是咱们此处为了实现简单, 就不做这个了.
测试 MessageFileManager
创建 MessageFileManagerTests 编写测试⽤例代码.
• 创建两个队列, ⽤来辅助测试.
• 使⽤ ReflectionTestUtils.invokeMethod 来调⽤私有⽅法.
package com.example.mq;import com.example.mq.common.MqException;
import com.example.mq.mqserver.core.MSGQueue;
import com.example.mq.mqserver.core.Message;
import com.example.mq.mqserver.datacenter.MessageFileManager;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.util.ReflectionTestUtils;
import sun.awt.image.ImageWatched;import java.io.File;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;@SpringBootTest
public class MessageFileManagerTests {private MessageFileManager messageFileManager = new MessageFileManager();private static final String queueName1 = "testQueue1";private static final String queueName2 = "testQueue2";// 这个方法是每个用例执行之前的准备工作@BeforeEachpublic void setUp() throws IOException {// 准备阶段, 创建出两个队列, 以备后用messageFileManager.createQueueFiles(queueName1);messageFileManager.createQueueFiles(queueName2);}// 这个方法就是每个用例执行完毕之后的收尾工作@AfterEachpublic void tearDown() throws IOException {// 收尾阶段, 就把刚才的队列给干掉.messageFileManager.destroyQueueFiles(queueName1);messageFileManager.destroyQueueFiles(queueName2);}@Testpublic void testCreateFiles() {// 创建队列文件已经在上面 setUp 阶段执行过了. 此处主要是验证看看文件是否存在.File queueDataFile1 = new File("./data/" + queueName1 + "/queue_data.txt");Assertions.assertEquals(true, queueDataFile1.isFile());File queueStatFile1 = new File("./data/" + queueName1 + "/queue_stat.txt");Assertions.assertEquals(true, queueStatFile1.isFile());File queueDataFile2 = new File("./data/" + queueName2 + "/queue_data.txt");Assertions.assertEquals(true, queueDataFile2.isFile());File queueStatFile2 = new File("./data/" + queueName2 + "/queue_stat.txt");Assertions.assertEquals(true, queueStatFile2.isFile());}@Testpublic void testReadWriteStat() {MessageFileManager.Stat stat = new MessageFileManager.Stat();stat.totalCount = 100;stat.validCount = 50;// 此处就需要使用反射的方式, 来调用 writeStat 和 readStat 了.// Java 原生的反射 API 其实非常难用~~// 此处使用 Spring 帮我们封装好的 反射 的工具类.ReflectionTestUtils.invokeMethod(messageFileManager, "writeStat", queueName1, stat);// 写入完毕之后, 再调用一下读取, 验证读取的结果和写入的数据是一致的.MessageFileManager.Stat newStat = ReflectionTestUtils.invokeMethod(messageFileManager, "readStat", queueName1);Assertions.assertEquals(100, newStat.totalCount);Assertions.assertEquals(50, newStat.validCount);System.out.println("测试 readStat 和 writeStat 完成!");}private MSGQueue createTestQueue(String queueName) {MSGQueue queue = new MSGQueue();queue.setName(queueName);queue.setDurable(true);queue.setAutoDelete(false);queue.setExclusive(false);return queue;}private Message createTestMessage(String content) {Message message = Message.createMessageWithId("testRoutingKey", null, content.getBytes());return message;}@Testpublic void testSendMessage() throws IOException, MqException, ClassNotFoundException {// 构造出消息, 并且构造出队列.Message message = createTestMessage("testMessage");// 此处创建的 queue 对象的 name, 不能随便写, 只能用 queueName1 和 queueName2. 需要保证这个队列对象// 对应的目录和文件啥的都存在才行.MSGQueue queue = createTestQueue(queueName1);// 调用发送消息方法messageFileManager.sendMessage(queue, message);// 检查 stat 文件.MessageFileManager.Stat stat = ReflectionTestUtils.invokeMethod(messageFileManager, "readStat", queueName1);Assertions.assertEquals(1, stat.totalCount);Assertions.assertEquals(1, stat.validCount);// 检查 data 文件LinkedList<Message> messages = messageFileManager.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(1, messages.size());Message curMessage = messages.get(0);Assertions.assertEquals(message.getMessageId(), curMessage.getMessageId());Assertions.assertEquals(message.getRoutingKey(), curMessage.getRoutingKey());Assertions.assertEquals(message.getDeliverMode(), curMessage.getDeliverMode());// 比较两个字节数组的内容是否相同, 不能直接使用 assertEquals 了.Assertions.assertArrayEquals(message.getBody(), curMessage.getBody());System.out.println("message: " + curMessage);}@Testpublic void testLoadAllMessageFromQueue() throws IOException, MqException, ClassNotFoundException {// 往队列中插入 100 条消息, 然后验证看看这 100 条消息从文件中读取之后, 是否和最初是一致的.MSGQueue queue = createTestQueue(queueName1);List<Message> expectedMessages = new LinkedList<>();for (int i = 0; i < 100; i++) {Message message = createTestMessage("testMessage" + i);messageFileManager.sendMessage(queue, message);expectedMessages.add(message);}// 读取所有消息LinkedList<Message> actualMessages = messageFileManager.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(expectedMessages.size(), actualMessages.size());for (int i = 0; i < expectedMessages.size(); i++) {Message expectedMessage = expectedMessages.get(i);Message actualMessage = actualMessages.get(i);System.out.println("[" + i + "] actualMessage=" + actualMessage);Assertions.assertEquals(expectedMessage.getMessageId(), actualMessage.getMessageId());Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());Assertions.assertEquals(0x1, actualMessage.getIsValid());}}@Testpublic void testDeleteMessage() throws IOException, MqException, ClassNotFoundException {// 创建队列, 写入 10 个消息. 删除其中的几个消息. 再把所有消息读取出来, 判定是否符合预期.MSGQueue queue = createTestQueue(queueName1);List<Message> expectedMessages = new LinkedList<>();for (int i = 0; i < 10; i++) {Message message = createTestMessage("testMessage" + i);messageFileManager.sendMessage(queue, message);expectedMessages.add(message);}// 删除其中的三个消息messageFileManager.deleteMessage(queue, expectedMessages.get(7));messageFileManager.deleteMessage(queue, expectedMessages.get(8));messageFileManager.deleteMessage(queue, expectedMessages.get(9));// 对比这里的内容是否正确.LinkedList<Message> actualMessages = messageFileManager.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(7, actualMessages.size());for (int i = 0; i < actualMessages.size(); i++) {Message expectedMessage = expectedMessages.get(i);Message actualMessage = actualMessages.get(i);System.out.println("[" + i + "] actualMessage=" + actualMessage);Assertions.assertEquals(expectedMessage.getMessageId(), actualMessage.getMessageId());Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());Assertions.assertEquals(0x1, actualMessage.getIsValid());}}@Testpublic void testGC() throws IOException, MqException, ClassNotFoundException {// 先往队列中写 100 个消息. 获取到文件大小.// 再把 100 个消息中的一半, 都给删除掉(比如把下标为偶数的消息都删除)// 再手动调用 gc 方法, 检测得到的新的文件的大小是否比之前缩小了.MSGQueue queue = createTestQueue(queueName1);List<Message> expectedMessages = new LinkedList<>();for (int i = 0; i < 100; i++) {Message message = createTestMessage("testMessage" + i);messageFileManager.sendMessage(queue, message);expectedMessages.add(message);}// 获取 gc 前的文件大小File beforeGCFile = new File("./data/" + queueName1 + "/queue_data.txt");long beforeGCLength = beforeGCFile.length();// 删除偶数下标的消息for (int i = 0; i < 100; i += 2) {messageFileManager.deleteMessage(queue, expectedMessages.get(i));}// 手动调用 gcmessageFileManager.gc(queue);// 重新读取文件, 验证新的文件的内容是不是和之前的内容匹配LinkedList<Message> actualMessages = messageFileManager.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(50, actualMessages.size());for (int i = 0; i < actualMessages.size(); i++) {// 把之前消息偶数下标的删了, 剩下的就是奇数下标的元素了.// actual 中的 0 对应 expected 的 1// actual 中的 1 对应 expected 的 3// actual 中的 2 对应 expected 的 5// actual 中的 i 对应 expected 的 2 * i + 1Message expectedMessage = expectedMessages.get(2 * i + 1);Message actualMessage = actualMessages.get(i);Assertions.assertEquals(expectedMessage.getMessageId(), actualMessage.getMessageId());Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());Assertions.assertEquals(0x1, actualMessage.getIsValid());}// 获取新的文件的大小File afterGCFile = new File("./data/" + queueName1 + "/queue_data.txt");long afterGCLength = afterGCFile.length();System.out.println("before: " + beforeGCLength);System.out.println("after: " + afterGCLength);Assertions.assertTrue(beforeGCLength > afterGCLength);}
}