仿 RabbitMQ 的消息队列3(实战项目)

七. 消息存储设计

上一篇博客已经将消息统计文件的读写代码实现了,下一步我们将实现创建队列文件和目录。

实现创建队列文件和目录

初始化 0\t0 这样的初始值.

//创建队列对应的文件和目录:public void createQueueFile(String queueName) throws IOException, MqException {//先创建对应的目录:File file = new File(getQueueDir(queueName));if(!file.exists()){boolean ok = file.mkdirs();if(!ok) throw new IOException("创建队列目录失败。baseDir:"+file.getAbsolutePath());}else{throw new MqException("[createQueueFile] 队列对应的目录已经被创建过了,创建失败");}//下面开始创建 数据文件:File dataFile = new File(getQueueDataDir(queueName));if(!dataFile.exists()){boolean ok = dataFile.createNewFile();if(!ok) throw new IOException("创建数据文件失败。queuedataDir:"+dataFile.getAbsolutePath());}else{throw new MqException("[createQueueFile] 队列对应的数据文件已经被创建过了,创建失败");}//创建 统计文件:File statFile = new File(getQueueStatDir(queueName));if(!statFile.exists()){boolean ok = statFile.createNewFile();if(!ok) throw new IOException("创建统计文件失败。queuestatDir:"+statFile.getAbsolutePath());}else{throw new MqException("[createQueueFile] 队列对应的统计文件已经被创建过了,创建失败");}//给消息统计文件设定初始值 0\t0, (消息数量:0,有效消息数量:0)// 目的:不用在今后使用的时候对空文件做一些特殊的判定Stat stat = new Stat();stat.totalCount = 0;stat.validCount = 0;//再写入:writeStat(queueName,stat);}

实现删除文件或目录

注意:File 类的 delete ⽅法只能删除空⽬录. 因此需要先把内部的⽂件先删除掉,如果还存在多余文件,就会删除失败。

//删除队列的文件或目录://队列也是可以删除的,当队列删除后,对应的消息文件啥的,也要随之删除。public void deleteQueueFile(String queueName) throws IOException{//先删除 数据文件:File queueDataFile = new File(getQueueDataDir(queueName));boolean ok1 = queueDataFile.delete();//再删除 统计文件:File queueStatFile = new File(getQueueStatDir(queueName));boolean ok2 = queueStatFile.delete();//再删除目录;File file = new File(getQueueDir(queueName));boolean ok3 = file.delete();if(!(ok1 && ok2 && ok3)){//任意一个删除失败,就失败,抛出异常:throw new IOException("删除队列文件或目录失败");}}

检查队列⽂件是否存在

判定该队列的消息⽂件和统计⽂件是否存在. ⼀旦出现缺失, 则不能进⾏后续⼯作.

//检查队列的 文件或目录 是否存在: 目的:判断是否队列之前被 别人用过。//用处1:如果后续有生产者给 broker server 生产消息了,这个消息可能需要记录到文件上,此时需要判断文件是否存在(持久化的应用)。public boolean checkFilesExists(String queueName){//如果队列的 数据文件和统计文件都存在,才存在:File queueDataFiles = new File(getQueueDataDir(queueName));if(!queueDataFiles.exists()) return false;File queueStatFiles = new File(getQueueStatDir(queueName));if(!queueStatFiles.exists()) return false;//都存在,则返回true;return true;}

实现消息对象序列化/反序列化

先创建工具类BinaryTool用与序列化/反序列化。
在这里插入图片描述

  • 使⽤ ByteArrayInputStream / ByteArrayOutputStream 针对 byte[] 进⾏封装, ⽅便后续操作. (这两个流对象是纯内存的, 不需要进⾏ close).
  • 使⽤ ObjectInputStream / ObjectOutputStream 进⾏序列化 / 反序列化操作. 通过内部的readObject / writeObject 即可完成对应操作.
/*** 这个类用来序列化 与反序列化* 此处我们采用的是java标注库里的 ObjectOutputStream 和ObjectInputStream 两个流对象,但是序列化的对象必须要实现Serializable接口、** 由于将序列化,反序列化当做一个工具,很多数据都可能用到,所以我们将它的方法搞成静态的**/
public class BinaryTool {//序列化:public static byte[] toBytes(Object object) throws IOException {//由于在try里面写流对象能自动关闭省去我们不少事,所以,直接写在try()里//这里 使用ByteArrayOutputStream是因为 未知的byte数组的长度,这个类能自动记录。try(ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()){//将byteArrayOutputStream 传入ObjectOutputStream 就相当于将他们相互关联了,当objectOutputStream调用writeObject方法//时会将这个对象写入关联的byteArrayOutputStream里,然后直接调用byteArrayOutputStream里的方法,将序列化的数据转换成直接数组就行了//其实这个 ObjectOutputStream 不仅可以关联数组,还可以是文件,网络。关联了文件就将对象序列化到文件里,关联了网络,就是网络数据的传输socket。try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)){objectOutputStream.writeObject(object);}//这个操作就是把byteArrayOutputStream中持有的二进制数据取出来,转成byte[]return byteArrayOutputStream.toByteArray();}}//反序列化:public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)){try(ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)){return objectInputStream.readObject();}}}}

实现写入消息文件

  • 考虑线程安全, 按照队列维度进⾏加锁.
  • 使⽤ DataOutputStream 进⾏⼆进制写操作. ⽐原⽣ OutputStream 要⽅便.
  • 需要记录 Message 对象在⽂件中的偏移量. 后续的删除操作依赖这个偏移量定位到消息,这也是message里的偏移量初始化的时候,就是发送消息的时候。offsetBeg是原有⽂件⼤⼩的基础上, 再 + 4. 4 个字节是存放消息大小的空间.
  • 写完消息, 要同时更新统计信息.
 //该方法将传来的一个新的消息放到对应的文件当中:新增消息public void sendMessage(MESGQueue queue, Message message) throws IOException, MqException {//先检查文件是否存在:如果不存在怎抛出异常,这个异常可以自定义。if(!checkFilesExists(queue.getName())){throw new MqException("[MessageFileManager对应的文件不存在]!queueName"+queue.getName());}//先进行序列化:byte[] binaryMessage = BinaryTool.toBytes(message);//为了解决线程安全问题,我们引入锁,如果此时的锁对象 是同一个队列,那就阻塞等待。synchronized (queue){//先将数据文件new出来,看看此时文件里已经写入的数据长度,方便我们后续计算offsetbegin和offsetendFile file = new File(getQueueDataDir(queue.getName()));//在写入消息的时候才对message里的offsetbegin和offsetend 进行赋值:message.setOffsetBeg(file.length()+4);message.setOffsetEnd(file.length()+4+binaryMessage.length);//由于我们的写入是追加写入,所以不要忘记 truetry (OutputStream outputStream = new FileOutputStream(file,true)){try(DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){//先写入4个字节的消息长度:dataOutputStream.writeInt(binaryMessage.length);//再写入offsetbegin 和 offsetenddataOutputStream.write(binaryMessage);}}//此时已经将消息数据文件写完了,不要忘记消息统计文件:Stat stat = readStat(queue.getName());stat.validCount++;stat.totalCount++;writeStat(queue.getName(),stat);}}

创建异常类MqException

作为⾃定义异常类. 后续业务上出现问题, 都统⼀抛出这个异常
在这里插入图片描述

public class MqException extends Exception{public MqException(String reason){super(reason);}
}

实现删除消息

此处的删除只是 “逻辑删除”, 即把 Message 类中的 isValid 字段设置为 0.

  • 使⽤ RandomAccessFile 来随机访问到⽂件的内容.(随机访问其实没什么玄乎的,就像数组一样,能通过下标快速访问某个元素,这就是随机访问的原理。内存是支持随机访问的)。
  • 根据 Message 中的 offsetBeg 和 offsetEnd 定位到消息在⽂件中的位置. 通过randomAccessFile.seek 操作⽂件指针偏移过去. 再读取.
  • 读出的结果解析成 Message 对象, 修改 isValid 字段, 再重新写回⽂件. 注意写的时候要重新设定文件指针的位置. ⽂件指针会随着上述的读操作产⽣改变,所以要重新seek,将光标移动到开始。
  • 最后, 要记得更新统计⽂件, 把合法消息 - 1.
 //删除消息:主要的操作歩奏:// 1,将消息读出来//2,将消息里的isVail 改成0x0//3,将消息放回文件中public void deleteMessage(MESGQueue queue,Message message) throws IOException, ClassNotFoundException {//由于删除消息的时候也可能收到线程安全问题,所以我们要加锁:synchronized (queue){//先将消息读出来://由于我们正常使用的FileInputStream,只能从头开始读。而此时的场景更倾向于 随机读取,所以我们使用到了RandomAccessFile进行随机读取//注意这个RandomAccessFile类的第二个参数:rw可读可写。try(RandomAccessFile randomAccessFile = new RandomAccessFile(getQueueDataDir(queue.getName()),"rw")){//先准备一个byte数组用来放 读出来的二进制数据:byte[] bufferSrc = new byte[(int) (message.getOffsetEnd() - message.getOffsetBeg())];//先将光标刷新到 offsetbeginrandomAccessFile.seek(message.getOffsetBeg());//将message二进制数据读出来randomAccessFile.read(bufferSrc);//转换成message对象:Message message1 = (Message) BinaryTool.fromBytes(bufferSrc);//将message里的isVail 改成0x0message1.setIsVail((byte) 0x0);//将新的message1 转成二进制:byte[] bufferDest = BinaryTool.toBytes(message1);//由于上一次读文件光标已经发生了变化,所以此时还要调整光标到offsetbeginrandomAccessFile.seek(message.getOffsetBeg());//将数据写入文件:randomAccessFile.write(bufferDest);//此时已经将数据文件里的vail改成无效,那我们需不需要将这个内存中的 message对象里的vail也改成无效呢?//可以是可以,但是没有必要:想象一下我们将一个文件标记成无效的场景是不是我们此时要删掉这个文件的时候,//此时我们都要删掉这个文件了,当然要连同文件里的数据和内存中的数据都删了呀,文件里的数据可能需要一些歩奏,//但是在内存中删一个对象实在太容易了,今后会有内存中的删除消息操作。这就相当于让一个将死之人多活几秒,但他终究逃不过死亡//这就是message里的vail 其实不需要改动的原因。}//不要忘了统计文件也要更新, 由于我们此时已经将数据文件里的一个消息改成无效的,所以此时统计文件里的有效消息就要--了Stat stat = readStat(queue.getName());if(stat.validCount >0){stat.validCount --;}//再将更新后的统计信息 写入文件writeStat(queue.getName(),stat);}}

实现消息加载

这个功能在服务器重启, 和垃圾回收的时候都很关键

  • 使⽤ DataInputStream 读取数据. 先读 4 个字节为消息的⻓度, 然后再按照这个⻓度来读取实际消息内容.
  • 读取完毕之后, 转换成 Message 对象.
  • 同时计算出该对象的 offsetBeg 和 offsetEnd.
  • 最终把结果整理成链表, 返回出去.
  • 注意, 对于 DataInputStream 来说, 如果读取到 EOF, 会抛出⼀个 EOFException , ⽽不是返回特定值. 因此需要注意上述循环的结束条件.
//从消息数据文件当中读出所有消息://由于是服务器刚启动的时候才会调用这个方法,此时的队列还不能处理各种请求,所以不需要考虑线程安全问题。public LinkedList<Message> loadAllMessagesFromQueueDataFile(String queueName) throws IOException, ClassNotFoundException, MqException {//先new出来一个linkedList来放所有消息:使用链表是因为要进行头删和尾删等操作:LinkedList<Message> messages = new LinkedList<>();//创建流对象:try(InputStream inputStream = new FileInputStream(getQueueDataDir(queueName))){//与上面的DataOutputStream对应,此时用的是DataInputStreamtry(DataInputStream dataInputStream = new DataInputStream(inputStream)){//由于要读的消息可能不止一条,所以用一个while循环://但是如果我们直接这样写会一直重复的读一个消息,而DataInputStream不能控制光标的移动,所以要定义一个量来//记录我们读到哪里了,另外,这个量也为后续message对象的offsetbegin和offsetend的初始化提供便利long currentOffset = 0;//写完大概得逻辑以后不知道不会不会有疑问,这个while的条件可是true啊,这不死循环了嘛,//其实这也是无奈之举,主要原因是dataInputStream.readInt()读到文件的末尾并不会返回-1,EOF啥的,而是//直接抛出 EOFException异常,直接结束循环,因此我们只用在外层catch住这个异常就行了,这是一个很特别的预料之内的循环结束方式while(true){//先读4个字节,求出数据的长度:int messageLen = dataInputStream.readInt();//创建一个刚好能装messageLen长度的字节数组:byte[] messageBinary = new byte[messageLen];//读出消息数据:并且用变量接收,判断读出的数据是否和预期的数据长度一致,若不一致,说明格式不正确,错乱了则抛出异常int realMessageLen = dataInputStream.read(messageBinary);if(realMessageLen != messageLen){throw new MqException("[MessageFileManager] 文件格式错误!!!queueName:"+queueName);}//将数组反序列化成message对象Message message = (Message) BinaryTool.fromBytes(messageBinary);//如果读到的消息是无效的,则跳过这个无效消息,更新currentoffset:if(message.getIsVail() == 0x0){currentOffset+=(4+messageLen);continue;}//再将message里的offsetbegin和offsetend给初始化:message.setOffsetBeg(currentOffset+4);message.setOffsetEnd(currentOffset+4+messageLen);//正常读完后,别忘了,将currentoffset更新currentOffset+=(4+messageLen);//再将消息加入到链表当中messages.add(message);}}catch (EOFException e){System.out.println("[MessageFileManager] 恢复Message 数据完成!!!");}}return messages;}

实现垃圾回收(GC)

  • 上述删除操作, 只是把消息在⽂件上标记成了⽆效. 并没有腾出硬盘空间. 最终⽂件⼤⼩可能会越积越多. 因此需要定期的进⾏批量清除.
  • 此处使⽤类似于复制算法. 当总消息数超过 2000, 并且有效消息数⽬少于 50% 的时候, 就触发 GC。GC 的时候会把所有有效消息加载出来, 写⼊到⼀个新的消息⽂件中, 使⽤新⽂件, 代替旧⽂件即可.
 public void gc(MESGQueue queue) throws MqException, IOException, ClassNotFoundException {//根据以前的写代码经验,次GC过程可能有线程安全问题,所以我们直接加锁://其实这也是为什么形参传入的是一个队列,而不是队列的名字的其中一个原因。synchronized (queue){//由于GC的执行时间可能很慢,我们手动的将时间计算出来,如果将来服务器运行半天无响应了,如果是GC的问题//我们也能知道long gcBegin = System.currentTimeMillis();//先创建新的文件:File newQueueFile = new File(getQueueDataNewPath(queue.getName()));//如果文件已经存在了,可能上一次gc有残留,这是不正常的,所以抛出异常if(newQueueFile.exists()){throw new MqException("[MessageFilemanager] 队列的queue_new_data.txt已经存在!!queueName:"+queue.getName());}//如果执行到这,说明文件不存在,则创建新文件:boolean ok = newQueueFile.createNewFile();//如果创建文件失败,则抛出异常:if(!ok){throw new MqException("[MessageFileManager] 创建文件失败!!newQueueDataFile:"+newQueueFile.getAbsolutePath());}//先创建一个链表用来存储从原来的文件中取出来的message对象:此处可以用到之前的方法://取出原来文件里的所有有效文件:LinkedList<Message> messages = loadAllMessagesFromQueueDataFile(queue.getName());//new出相应的流对象用来写入新文件://这里我写错了,将queue.getName()传入了,但是明明是一个不存在的路径,他竟然还能正常写?底层也不抛出异常,//我真是又惊讶,又惊吓。//之后我又去查了查资料:原来是FileOutputStream的问题啊,FileOutputStream太nb了,//如果传入的字符串对应的路径不存在,FileOutputStream会自动给你创建一个文件用于写入,这这这也太贴心了吧,//不过我还是希望他能直接抛异常,毕竟我找bug也找了这么久了,况且天知道他会把我的数据写到哪里://其实也知道:如果是绝对路径,他会自动创建路径下的文件;如果是相对路径,他会在当前工作空间创建一个文件。//找了一圈以后发下,在我的mq路径下,就存在一个queuetest1的文件,里面正好是之前我写入的数据,呜呜呜,要哭了。
//            try(OutputStream outputStream = new FileOutputStream(queue.getName())){try(OutputStream outputStream = new FileOutputStream(newQueueFile)){try(DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){//循环读取messages,将对象重新写入新文件:for(Message m : messages){//先将消息序列化:一个字节数组:byte[] buffer = BinaryTool.toBytes(m);//将二进制数组写入新的文件:注意遵循之前的约定:dataOutputStream.writeInt(buffer.length);dataOutputStream.write(buffer);}}}//删除旧文件,这里以前传入的旧文件的路径写错了,直接传成了名字,所以写代码一定要细心啊。
//            File oldQueueFile = new File(queue.getName());File oldQueueFile = new File(getQueueDataDir(queue.getName()));boolean ok2 = oldQueueFile.delete();System.out.println("[ok2]oldQueueFile 文件删除:"+ok2);//如果删除失败,可能是没有权限之类的,抛出异常:if(!ok2){throw new MqException("MessageFileManager 删除旧文件失败!! oldDataQueueFile:"+oldQueueFile.getAbsolutePath());}//重命名新文件:boolean ok3 = newQueueFile.renameTo(oldQueueFile);//如果重命名失败,抛出异常:if(!ok3) {throw new MqException("[MessageFileManager] 新文件重命名失败!!oldDataQueueFile:"+oldQueueFile.getAbsolutePath()+" , newDataQueueFile="+newQueueFile.getAbsolutePath());}//不要忘记更新统计文件:Stat stat = readStat(queue.getName());stat.totalCount = messages.size();stat.validCount = messages.size();writeStat(queue.getName(),stat);long gcEnd = System.currentTimeMillis();System.out.println("[MessageFileManager] GC执行完毕!!! 执行的时间:"+(gcEnd - gcBegin)+"ms");}}

测试MessageFileManager

创建MessageFileManagerTest类用于测试:
在这里插入图片描述

测试前的准备:

  • 创建两个队列, ⽤来辅助测试.
  • 使⽤ ReflectionTestUtils.invokeMethod 来调⽤私有⽅法(这就是传说中的反射,注意它的参数,用法)。
  •     ReflectionTestUtils.invokeMethod(messageFileManager,"writeStat",queueName1,stat);
    

这个反射的参数:
第一个参数:类的实例。
第二个参数:你想调用的方法
后面的参数就是不定参数了(数量不确定),能确定的是:后面的参数的就是你想调用的方法的参数。


@SpringBootTest
public class MessageFileManagerTest {private MessageFileManager messageFileManager = new MessageFileManager();private static final String queueName1 = "queuetest1";private static final String queueName2 = "queuetest2";@BeforeEachpublic void setUp() throws IOException, MqException {//由于我们要测试的是队列,所以准备工作就是先创建队列文件:messageFileManager.createQueueFile(queueName1);messageFileManager.createQueueFile(queueName2);}//这个@AfterEach注解我试过了,即使测试方法执行过程中抛出了异常,这个方法还是在每次执行完测试单元以后该执行他还是执行他,//无关乎异常,真nb@AfterEachpublic void tearDown() throws IOException {//首尾工作,将刚才创建的队列文件删掉:messageFileManager.deleteQueueFile(queueName1);messageFileManager.deleteQueueFile(queueName2);}}

测试代码:

@Testpublic void testCreateFile(){//其实就测试创建的队列文件是否存在://由于我们在MessageFileManager里的get路径方法是 private修饰的,所以不能直接调用get路径方法,只能手动写上//检验 队列数据文件是否存在:File queueDataFile1 = new File("./data/"+queueName1+"/queue_data.txt");//此处用的方法是isFile 而不是exists,因为要判定这是个文件,并不是只是存在就行,存在了也可能是个目录。Assertions.assertEquals(true,queueDataFile1.isFile());//检验 队列统计文件是否存在:File queueStatFile1 = new File("./data/"+queueName1+"/queue_stat.txt");Assertions.assertEquals(true,queueStatFile1.isFile());//检验 队列数据文件是否存在:File queueDataFile2 = new File("./data/"+queueName2+"/queue_data.txt");Assertions.assertEquals(true,queueDataFile2.isFile());//检验 队列统计文件是否存在:File queueStatFile2 = new File("./data/"+queueName2+"/queue_stat.txt");Assertions.assertEquals(true,queueStatFile2.isFile());System.out.println("[CreateFileText] 测试创建队列文件成功!!!");}@Testpublic void testReadAndWriteStat(){//先创建出stat类,由于他是内部类,所以要类名. 调用出来:MessageFileManager.Stat stat = new MessageFileManager.Stat();stat.totalCount = 200;stat.validCount = 100;//此时写入stat 到统计文件当中:但是如果直接用messageFileManager. 由于writeStat是private修饰,所以肯定调用不出来,//此时就要用spring带的 反射方法了:
//        messageFileManager.//用反射将 stat写入统计文件:ReflectionTestUtils.invokeMethod(messageFileManager,"writeStat",queueName1,stat);//用反射将 写入的统计文件读出来:MessageFileManager.Stat statNew = ReflectionTestUtils.invokeMethod(messageFileManager,"readStat",queueName1);//判断读出来的stat和我们设定的stat是否一样Assertions.assertEquals(200,statNew.totalCount);Assertions.assertEquals(100,statNew.validCount);System.out.println("[testReadAndWriteStat] 测试成功!!!");}//要想测试发送消息,首先要有队列和消息吧,所以,我们先写创建队列和消息的方法:private MESGQueue createQueue(){MESGQueue queue = new MESGQueue();//这里的队列名字不能随便取,因为随便取的队列名字也没有对应的文件啊,要用就要用已经创建了文件的队列名,//考虑到这个队列要与文件交互,而我们只创建了queuename1和queuename2两个名字对应的文件,所以只能用这两个名字的一个。queue.setName(queueName1);queue.setDurable(true);queue.setExclusive(false);queue.setAutoDelete(false);HashMap<String, Object> hashMap = new HashMap<>();hashMap.put("aaa", "111");hashMap.put("bbb", "222");queue.setArguments(hashMap);return queue;}private Message createMessage(String context){//此时能用到我们之前在message里写的创建 message的工厂类了:Message message = Message.createMessageWithId("testRoutingKey",null,context.getBytes());return message;}@Testpublic void testSendMessage() throws IOException, MqException, ClassNotFoundException {//先创建队列与消息:MESGQueue queue = createQueue();Message message = createMessage("abcdefghijklmnopqrstuvwxyz");//发送消息:messageFileManager.sendMessage(queue,message);//验证stat文件:MessageFileManager.Stat stat = ReflectionTestUtils.invokeMethod(messageFileManager,"readStat",queueName1);Assertions.assertEquals(1,stat.totalCount);Assertions.assertEquals(1,stat.validCount);//验证data文件:使用loadAllMessagesFromQueueDataFile读取文件内容:LinkedList<Message> messages = messageFileManager.loadAllMessagesFromQueueDataFile(queueName1);//验证:Assertions.assertEquals(1,messages.size());Message message1 = messages.get(0);//判断这个message1和我们之前的消息message是否一样:Assertions.assertEquals(message.getMessageId(),message1.getMessageId());Assertions.assertEquals(message.getRoutingKey(),message1.getRoutingKey());Assertions.assertEquals(message.getDeliverMode(),message1.getDeliverMode());Assertions.assertArrayEquals(message.getBody(),message1.getBody());System.out.println("[testSendMessage] 测试成功!!!");}//虽然上一个testSendMessage 已经间接测试过这个方法,但是为了求稳,再测试一遍@Testpublic void testLoadAllMessagesFromQueueDataFile() throws IOException, MqException, ClassNotFoundException {//我们需要准备200条数据用来加载://先创建一个队列用来存放消息:注意这个方法使用的是queueName1创建的队列MESGQueue queue = createQueue();//先创建一个链表用来保存消息,和后面新加载的消息作对比:LinkedList<Message> expectedMessages = new LinkedList<>();//使用for循环创建消息:for(int i =0;i<200;i++){Message message = createMessage("testMessage"+i);//将消息写入文件:messageFileManager.sendMessage(queue,message);//记录消息:expectedMessages.add(message);}//调用loadAllMessagesFromQueueDataFile取出所有消息:LinkedList<Message> realMessages = messageFileManager.loadAllMessagesFromQueueDataFile(queueName1);//先验证队列的数目是否一致:Assertions.assertEquals(200,realMessages.size());//验证基本属性:for(int i= 0;i<realMessages.size();i++){Message realMessage = realMessages.get(i);Message expectMessage = expectedMessages.get(i);System.out.println("["+i+"]"+realMessage.toString());Assertions.assertEquals(realMessage.getMessageId(),expectMessage.getMessageId());Assertions.assertEquals(realMessage.getIsVail(),expectMessage.getIsVail());Assertions.assertEquals(realMessage.getDeliverMode(),expectMessage.getDeliverMode());Assertions.assertEquals(realMessage.getRoutingKey(),expectMessage.getRoutingKey());Assertions.assertArrayEquals(realMessage.getBody(),expectMessage.getBody());}System.out.println("[testLoadAllMessagesFromQueueDataFile] 测试成功!!!");}//测试删除消息:@Testpublic void testDeleteMessage() throws IOException, MqException, ClassNotFoundException {//先创建一个队列:MESGQueue queue = createQueue();//创建一个链表用来保存预期消息:LinkedList<Message> expectedMessages = new LinkedList<>();//再将20条消息都写入队列:for(int i =0;i<20;i++){Message message = createMessage("testMessage"+i);//将消息写入队列文件:messageFileManager.sendMessage(queue,message);//记录消息:expectedMessages.add(message);}//这里 就以删除前三条消息为例:messageFileManager.deleteMessage(queue,expectedMessages.get(0));messageFileManager.deleteMessage(queue,expectedMessages.get(1));messageFileManager.deleteMessage(queue,expectedMessages.get(2));//读出消息,对比:LinkedList<Message> realMessages = messageFileManager.loadAllMessagesFromQueueDataFile(queueName1);//先判断个数:Assertions.assertEquals(17,realMessages.size());for(int i =3;i<20;i++){Message realMessage = realMessages.get(i-3);Message expectMessage = expectedMessages.get(i);System.out.println("["+i+"]"+realMessage.toString());Assertions.assertEquals(realMessage.getMessageId(),expectMessage.getMessageId());Assertions.assertEquals(realMessage.getIsVail(),expectMessage.getIsVail());Assertions.assertEquals(realMessage.getDeliverMode(),expectMessage.getDeliverMode());Assertions.assertEquals(realMessage.getRoutingKey(),expectMessage.getRoutingKey());Assertions.assertArrayEquals(realMessage.getBody(),expectMessage.getBody());}System.out.println("[testDeleteMessage] 测试成功!!!");}//测试GC,这里的GC其实只是测试,不用管消息总数是否大于2000或有效消息占比不到50%,因为那是业务上的判定,会有专门的类来进一步封装//而此处我们只进行测试GC这个方法//计划将100条消息都存入队列,然后将奇数下标的消息都删除,然后执行GC,验证现在的文件是否比原来的文件小:@Testpublic void testGC() throws IOException, MqException, ClassNotFoundException {//创建一个队列:MESGQueue queue = createQueue();//创建一个链表用来记录消息:LinkedList<Message> expectedMessages = new LinkedList<>();//先发送100条消息:for(int i =0;i<100;i++){Message message = createMessage("testMessage"+i);//发送到队列:messageFileManager.sendMessage(queue,message);//记录expectedMessages.add(message);}//删除奇数下标的消息:for(int i =1;i<100;i+=2){messageFileManager.deleteMessage(queue,expectedMessages.get(i));}//先记录执行GG之前的文件大小:File oldFile = new File("./data/"+queueName1+"/queue_data.txt");long oldFileLength = oldFile.length();//执行GCmessageFileManager.gc(queue);//记录执行完GC之后的文件大小:File newFile = new File("./data/"+queueName1+"/queue_data.txt");long newFileLength = newFile.length();//取出真实的消息:LinkedList<Message> realMessages = messageFileManager.loadAllMessagesFromQueueDataFile(queueName1);//先验证消息数量是否对的上:Assertions.assertEquals(50,realMessages.size());//挨个验证消息:for(int i = 0;i<50;i++){Message realMessage = realMessages.get(i);Message expectMessage = expectedMessages.get(i*2);System.out.println("["+i+"]"+realMessage.toString());Assertions.assertEquals(realMessage.getMessageId(),expectMessage.getMessageId());Assertions.assertEquals(realMessage.getIsVail(),expectMessage.getIsVail());Assertions.assertEquals(realMessage.getDeliverMode(),expectMessage.getDeliverMode());Assertions.assertEquals(realMessage.getRoutingKey(),expectMessage.getRoutingKey());Assertions.assertArrayEquals(realMessage.getBody(),expectMessage.getBody());}//验证文件大小://这个验证的原理其实是://删除一个文件并不是直接删除,而是逻辑删除,通过标记统计文件里的vail来标识的,此时的数据文件即使有很多无效的文件,但是他的大小依旧是total//而非vail有效文件的大小。但是如果进行了文件的GC迁移,此时的新文件的大小就是旧文件的vail有效文件的大小了。所以,新文件会小于旧文件的大小。System.out.println("[oldFileLength]:"+oldFileLength);System.out.println("[newFileLength]:"+newFileLength);Assertions.assertTrue(newFileLength<oldFileLength);System.out.println("[testGC] 测试成功!!!");}

测试结果:没问题

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/66898.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

HTTP 配置与应用(局域网)

想做一个自己学习的有关的csdn账号&#xff0c;努力奋斗......会更新我计算机网络实验课程的所有内容&#xff0c;还有其他的学习知识^_^&#xff0c;为自己巩固一下所学知识&#xff0c;下次更新HTTP 配置与应用&#xff08;不同网段&#xff09;。 我是一个萌新小白&#xf…

大模型应用与部署 技术方案

大模型应用与部署 技术方案 一、引言 人工智能蓬勃发展,Qwen 大模型在自然语言处理领域地位关键,其架构优势尽显,能处理文本创作等多类复杂任务,提供优质交互。Milvus 向量数据库则是向量数据存储检索利器,有高效索引算法(如 IVF_FLAT、HNSWLIB 等)助力大规模数据集相似…

【Prometheus】Prometheus如何监控Haproxy

✨✨ 欢迎大家来到景天科技苑✨✨ &#x1f388;&#x1f388; 养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; &#x1f3c6; 作者简介&#xff1a;景天科技苑 &#x1f3c6;《头衔》&#xff1a;大厂架构师&#xff0c;华为云开发者社区专家博主&#xff0c;…

Go语言中的值类型和引用类型特点

一、值类型 值类型的数据直接包含值&#xff0c;当它们被赋值给一个新的变量或者作为参数传递给函数时&#xff0c;实际上是创建了原值的一个副本。这意味着对新变量的修改不会影响原始变量的值。 Go中的值类型包括&#xff1a; 基础类型&#xff1a;int&#xff0c;float64…

GPT 结束语设计 以nanogpt为例

GPT 结束语设计 以nanogpt为例 目录 GPT 结束语设计 以nanogpt为例 1、简述 2、分词设计 3、结束语断点 1、简述 在手搓gpt的时候&#xff0c;可能会遇到一些性能问题&#xff0c;即关于是否需要全部输出或者怎么节约资源。 在输出语句被max_new_tokens 限制&#xff0c…

深入MapReduce——计算模型设计

引入 通过引入篇&#xff0c;我们可以总结&#xff0c;MapReduce针对海量数据计算核心痛点的解法如下&#xff1a; 统一编程模型&#xff0c;降低用户使用门槛分而治之&#xff0c;利用了并行处理提高计算效率移动计算&#xff0c;减少硬件瓶颈的限制 优秀的设计&#xff0c…

macOS安装Gradle环境

文章目录 说明安装JDK安装Gradle 说明 gradle8.5最高支持jdk21&#xff0c;如果使用jdk22建议使用gradle8.8以上版本 安装JDK mac系统安装最新&#xff08;截止2024.9.13&#xff09;Oracle JDK操作记录 安装Gradle 下载Gradle&#xff0c;解压将其存放到资源java/env目录…

五国十五校联合巨献!仿人机器人运动与操控:控制、规划与学习的最新突破与挑战

作者&#xff1a; Zhaoyuan Gu, Junheng Li, Wenlan Shen, Wenhao Yu, Zhaoming Xie, Stephen McCrory, Xianyi Cheng, Abdulaziz Shamsah, Robert Griffin, C. Karen Liu, Abderrahmane Kheddar, Xue Bin Peng, Yuke Zhu, Guanya Shi, Quan Nguyen, Gordon Cheng, Huijun Gao,…

【BQ3568HM开发板】如何在OpenHarmony上通过校园网的上网认证

引言 前面已经对BQ3568HM开发板进行了初步测试&#xff0c;后面我要实现MQTT的工作&#xff0c;但是遇到一个问题&#xff0c;就是开发板无法通过校园网的认证操作。未认证的话会&#xff0c;学校使用的深澜软件系统会屏蔽所有除了认证用的流量。好在我们学校使用的认证系统和…

(Java版本)基于JAVA的网络通讯系统设计与实现-毕业设计

源码 论文 下载地址&#xff1a; ​​​​c​​​​​​c基于JAVA的网络通讯系统设计与实现(源码系统论文&#xff09;https://download.csdn.net/download/weixin_39682092/90299782https://download.csdn.net/download/weixin_39682092/90299782 第1章 绪论 1.1 课题选择的…

SpringCloud微服务Gateway网关简单集成Sentinel

Sentinel是阿里巴巴开源的一款面向分布式服务架构的轻量级流量控制、熔断降级组件。Sentinel以流量为切入点&#xff0c;从流量控制、熔断降级、系统负载保护等多个维度来帮助保护服务的稳定性。 官方文档&#xff1a;https://sentinelguard.io/zh-cn/docs/introduction.html …

vscode环境中用仓颉语言开发时调出覆盖率的方法

在vscode中仓颉语言想得到在idea中利用junit和jacoco的覆盖率&#xff0c;需要如下几个步骤&#xff1a; 1.在vscode中搭建仓颉语言开发环境&#xff1b; 2.在源代码中右键运行[cangjie]coverage. 思路1&#xff1a;编写了测试代码的情况&#xff08;包管理工具&#xff09; …

pikachu靶场-敏感信息泄露概述

敏感信息泄露概述 由于后台人员的疏忽或者不当的设计&#xff0c;导致不应该被前端用户看到的数据被轻易的访问到。 比如&#xff1a; ---通过访问url下的目录&#xff0c;可以直接列出目录下的文件列表; ---输入错误的url参数后报错信息里面包含操作系统、中间件、开发语言的版…

安卓动态设置Unity图形API

命令行方式 Unity图像api设置为自动,安卓动态设置Vulkan、OpenGLES Unity设置 安卓设置 创建自定义活动并将其设置为应用程序入口点。 在自定义活动中,覆盖字符串UnityPlayerActivity。updateunitycommandlineararguments (String cmdLine)方法。 在该方法中,将cmdLine…

低代码可视化-转盘小游戏可视化-代码生成器

转盘小程序是一种互动工具&#xff0c;它通过模拟真实的转盘抽奖或决策体验&#xff0c;为用户提供了一种有趣且公平的选择方式。以下是对转盘小程序的详细介绍&#xff1a; 转盘小程序的应用场景 日常决策&#xff1a;转盘小程序可以帮助用户解决日常生活中的选择困难问题&a…

MongoDB文档查询

一、实验目的 1. 理解MongoDB文档数据库的基本概念和特性。 2. 掌握在MongoDB中创建集合和插入文档数据的方法。 3. 学习使用MongoDB进行文档查询操作&#xff0c;包括查询、过滤和排序等。 二、实验环境准备 1. JAVA环境准备&#xff1a;确保Java Development Kit (J…

速通Docker === 使用最佳实践总结

目录 主要使用步骤 1. 命令 2. 网络 3. 存储 卷存储&#xff08;Volumes&#xff09; 目录挂载&#xff08;Bind Mounts&#xff09; 比较 4. 环境变量 5. 端口 示例&#xff1a;启动 MySQL 容器 解释&#xff1a; 总结 Docker 是一个开源的应用容器引擎&#xff0…

Postgresql源码(140)理解PG的编译流程(make、Makefile、Makefile.global.in)

PG16 PG中使用的makefile看起来代码比较多&#xff0c;但是实际逻辑比较简单&#xff0c;这里做一些抽象总结。 总结 Makefile.global.in的$(recurse)宏自动生成了target&#xff0c;可以方便的进入内存目录进行编译。 all: all-common-recurse all-common-recurse: submak…

c语言中的数组(上)

数组的概念 数组是⼀组相同类型元素的集合&#xff1b; 数组中存放的是1个或者多个数据&#xff0c;但是数组元素个数不能为0。 数组中存放的多个数据&#xff0c;类型是相同的。 数组分为⼀维数组和多维数组&#xff0c;多维数组⼀般⽐较多⻅的是⼆维数组。 数组创建 在C语言…

戴尔电脑设置u盘启动_戴尔电脑设置u盘启动多种方法

最近有很多网友问&#xff0c;戴尔台式机怎么设置u盘启动&#xff0c;特别是近两年的戴尔台式机比较复杂&#xff0c;有些网友不知道怎么设置&#xff0c;其实设置u盘启动有两种方法&#xff0c;下面小编教大家戴尔电脑设置u盘启动方法。 戴尔电脑设置u盘启动方法一、戴尔进入b…