【消息队列开发】 实现消息持久化

文章目录

  • 🍃前言
  • 🍀消息存储格式设计
    • 🚩queue_data文件设计
    • 🚩queue_stat文件设计
    • 🚩拓展
  • 🎄实现统计文件(queue_stat)的读写
  • ⭕总结

🍃前言

本次开发目标,实现消息持久化
在这里插入图片描述

🍀消息存储格式设计

在前面最开始博主的设计就为将消息存储在硬盘上,那么我们应该以怎样的格式存储在硬盘上面呢?

我们的消息是需要依附于队列的,因此我们在存储的时候,可以将消息按照队列的维度展开。

在前面数据创建的时候,我们有了一个data目录(meta.db就在这个目录中)

我们可以在data中创建一些子目录,子目录的名字就是队列名。
在这里插入图片描述
然后我们再每个队列的子目录下,再分配两个文件,来存储消息

  1. 第一个文件:queue_data这里保存消息的内容
  2. 第二个文件:queue_stat 这里用来保存消息的统计信息(具体统计什么信息,后买你会说到)

🚩queue_data文件设计

关于queue_data.txt文件的设计

我是这样设计的,由于queue_data文件是一个二进制格式的文件。

所以我做出一下约定,这个文件包含若干个消息,每个消息都已二进制的方式进行存储。那么每个消息约定有以下几部分组成

在这里插入图片描述
而相应的二进制数据模块又由以下几部分组成

在这里插入图片描述

起初在设计消息类的时候,还涉及两个元素offsetBeg,offsetEnd,没有让它们序列化
在这里插入图片描述
此时呢,这两个属性就不会跟着Message进入硬盘中。而是在内存中进行存储,方便随时找到内存中的Message对象,就能找到对应的Message对象了

如此以来我们的queue_data文件设计就完成了

🚩queue_stat文件设计

那么queue_stat文件是用来干嘛的呢?

再上面的queue_data文件中有一个属性叫isValid,代表的是 是否持久化,也就是需要进行删除

对于Broker Server来说消息是需要新增,也需要删除的.

生产者生产-一个消息过来,就得新增这个消息

消费者把这个消息消费掉,这个消息就得删除.

新增和删除,对于内存中来说,好办 (直接使用一一些集合类)

但是在文件。上就麻烦了。新增消息可以直接把新的消息追加到文件末尾

删除消息不好搞文件可以视为是一个"顺序表"这样的结构如果直接

删除中间元素,就需要涉及到类似于"顺序表搬运",这样的操作,效率是非常低的

因此,使用这种搬运的方式删除,是不合适的

因此我们使用逻辑删除,是比较合适的

  • isValid为1,有效
  • isValid为0,无效

但是呢,随着时间的推移,这些文件可能越来越大,并且可能大部分都是无效消息,针对这种情况,就需要考虑对当前文件进行垃圾回收

关于垃圾回收,博主这里使用的复制算法

我们只需要直接遍历原有的消息数据文件,把所有的有效的数据拷贝到一个新的文件中,再把之前的整个旧文件都删除就好了

那我们什么时候执行垃圾回收呢?

此处我们这里做出这样的约定,当总数目超过2000(这个可以随意定义),并且有效消息的数目低于总消息数目的50%(随意定义),就触发垃圾回收

如此以来,我们queue_stat文件的作用,就体现出来,

  • 用来记录总消息数目与有效消息数目

queue_stat文件定义为文本格式,只存一行数据,一行有两列:

  • 第一列 是总的消息数目
  • 第二列 是有效消息数目
  • 两者之间使用 \t 分割
  • 形如2000\t1888

如此以来,queue_stat文件也就此完成了

🚩拓展

如果整个队列中,消息特别特别多,而且都是有效消息

此时就会导致整个消息的数据库文件特别大,后续针对这个文件的操作,成本也会上移

假如这个文件非常大(10G),触发一次GC,整体耗时就会非常高

虽然博主这里没有解决该问题,但是可以提供以下思路

  1. 需要专门的数据结构,来存储当前队列中有多少个数据文件,每个文件大小是多少,消息数目是多少,无效消息是多少.
  2. 设计策略,什么时候触发文件的拆分.什么时候触发文件的合并

🎄实现统计文件(queue_stat)的读写

统计文件读写来说相对较为简单,所以这里就不进行讲解了,相关注解已包含再代码中,实现代码如下:

// 定义一个内部类, 来表示该队列的统计信息
// 有限考虑使用 static, 静态内部类.
static public class Stat {// 此处直接定义成 public, 就不再搞 get set 方法了.// 对于这样的简单的类, 就直接使用成员, 类似于 C 的结构体了.public int totalCount;  // 总消息数量public int validCount;  // 有效消息数量
}// 预定消息文件所在的目录和文件名
// 这个方法, 用来获取到指定队列对应的消息文件所在路
private String getQueueDir(String queuename) {return ".data" + queuename;
}
// 这个方法用来获取该队列的消息数据文件路径
// 注意, 二进制文件, 使用 txt 作为后缀, 不太合适. txt 一般表示文本. 此处咱们也就不改.
// .bin / .dat
private String getQueueDataPath(String queueName) {return getQueueDir(queueName) + "/queue_data.txt";
}// 这个方法用来获取该队列的消息统计文件路径
private String getQueueStatPath(String queueName) {return getQueueDir(queueName) + "/queue_stat.txt";
}
private Stat readStat(String queuename) {// 由于当前的消息统计文件是文本文件, 可以直接使用 Scanner 来读取文件内容Stat stat = new Stat();try(InputStream inputStream = new FileInputStream(getQueueStatPath(queuename))) {Scanner scanner = new Scanner(inputStream);stat.totalCount = scanner.nextInt();stat.validCount = scanner.nextInt();return stat;} catch (IOException e) {e.printStackTrace();}return null;
}
private void writeStat(String queueName, Stat stat) {// 使用 PrintWrite 来写文件.// OutputStream 打开文件, 默认情况下, 会直接把原文件清空. 此时相当于新的数据覆盖了旧的.try(OutputStream outputStream = new FileOutputStream(getQueueStatPath(queueName))) {PrintWriter printWriter = new PrintWriter(outputStream);printWriter.print(stat.totalCount + "/t" + stat.validCount);printWriter.flush();} catch (IOException e) {e.printStackTrace();}
}
// 创建队列对应的文件和目录
public void createQueueFiles(String queueName) throws IOException {// 1. 先创建队列对应的消息目录File baseDir = new File(getQueueDir(queueName));if (!baseDir.exists()) {// 不存在, 就创建这个目录boolean ok = baseDir.mkdirs();if (!ok) {throw new IOException("创建目录失败! baseDir=" + baseDir.getAbsolutePath());}}// 2. 创建队列数据文件File queueDataFile = new File(getQueueDataPath(queueName));if (!queueDataFile.exists()) {boolean ok = queueDataFile.createNewFile();if (!ok) {throw new IOException("创建文件失败! queueDataFile=" + queueDataFile.getAbsolutePath());}}// 3. 创建消息统计文件File queueStatFile = new File(getQueueStatPath(queueName));if (!queueStatFile.exists()) {boolean ok = queueStatFile.createNewFile();if (!ok) {throw new IOException("创建文件失败! queueStatFile=" + queueStatFile.getAbsolutePath());}}// 4. 给消息统计文件, 设定初始值. 0\t0Stat stat = new Stat();stat.totalCount = 0;stat.validCount = 0;writeStat(queueName, stat);
}

除此之外呢,博主还提供了两个方法,作用分别为

  • 删除队列的文件和目录
  • 检查队列的目录和文件是否存在.

实现如下:

public void destroyQueueFiles(String queueName) throws IOException {// 先删除里面的文件, 再删除目录.File queueDataFile = new File(getQueueDataPath(queueName));boolean ok1 = queueDataFile.delete();File queueStatFile = new File(getQueueStatPath(queueName));boolean ok2 = queueStatFile.delete();File baseDir = new File(getQueueDir(queueName));boolean ok3 = baseDir.delete();if (!ok1 || !ok2 || !ok3) {// 有任意一个删除失败, 都算整体删除失败.throw new IOException("删除队列目录和文件失败! baseDir=" + baseDir.getAbsolutePath());}
}// 检查队列的目录和文件是否存在.
// 比如后续有生产者给 broker server 生产消息了, 这个消息就可能需要记录到文件上(取决于消息是否要持久化)
public boolean checkFilesExits(String queueName) {// 判定队列的数据文件和统计文件是否都存在!!File queueDataFile = new File(getQueueDataPath(queueName));if (!queueDataFile.exists()) {return false;}File queueStatFile = new File(getQueueStatPath(queueName));if (!queueStatFile.exists()) {return false;}return true;
}

⭕总结

关于《【消息队列开发】 实现消息持久化》就讲解到这儿,感谢大家的支持,欢迎各位留言交流以及批评指正,如果文章对您有帮助或者觉得作者写的还不错可以点一下关注,点赞,收藏支持一下

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/744179.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2024阿里云域名优惠口令大全(3月更新)

2024年阿里云域名优惠口令,com域名续费优惠口令“com批量注册更享优惠”,cn域名续费优惠口令“cn注册多个价格更优”,cn域名注册优惠口令“互联网上的中国标识”,阿里云优惠口令是域名专属的优惠码,可用于域名注册、续…

C# MES通信从入门到精通(1)——串口传输文件

前言: 在上位机软件开发领域,有一些工厂的mes系统需要我们通过串口发送文件的方式把一些图片或者检测数据csv文件等发送给服务器,这种方式是一些比较旧的工厂采用的方式,但是这种方式也是存在的,本文就是讲解如何使用串口发送文件详情见下文。 1、串口发送文件思路 将需…

【刷题节】美团2024年春招第一场笔试【技术】

1.小美的平衡矩阵 import java.util.Scanner;public class Main {public static void main(String[] args) {Scanner scanner new Scanner(System.in);int n scanner.nextInt();int[][] nums new int[n][n], sum new int[n][n];char[] chars;for (int i 0; i < n; i) {…

介绍Oracle的SQL调化健康检查脚本(SQLHC)

概述 Oracle提供了一个SQL调优健康检查脚本&#xff08;SQLHC&#xff09;&#xff0c;用于检查需要优化的SQL的运行环境&#xff0c;生成报告以便帮助DBA找到SQL性能不佳的原因。SQLHC是SQLT的一个子集&#xff08;我后续的文章会介绍SQLT&#xff09;&#xff0c;但SQLHC与S…

迁移学习怎么用

如果想实现一个计算机视觉应用&#xff0c;而不想从零开始训练权重&#xff0c;比方从随机初始化开始训练&#xff0c;更快的方式是下载已经训练好权重的网络结构&#xff0c;把这个作为预训练&#xff0c;迁移到你感兴趣的新任务上。ImageNet、PASCAL等等数据库已经公开在线。…

C#,数值计算,希尔伯特矩阵(Hilbert Matrix)的算法与源代码

Hilbert, David (1862-1943) 1 希尔伯特(Hilbert) 德国数学家,在《几何学基础》中提出了第一套严格的几何公理(1899年)。他还证明了自己的系统是自洽的。他发明了一条简单的空间填充曲线,即埃里克魏斯汀的数学世界,即希尔伯特曲线,埃里克魏斯汀的数学世界,并证明了不…

C/C++程序设计实验报告2 | 循环结构实验

本文整理自博主学校大一&#xff08;2021级&#xff09;C/C专业课的课程实验报告&#xff0c;适合学弟妹或C语言初学者入门C语言学习、练习。 编译器&#xff1a;gcc 10.3.0 ---- 注&#xff1a; 1.虽然课程名为C程序设计&#xff0c;但实际上当时校内该课的内容大部分其实都是…

ElasticSearch学习篇10_Lucene数据存储之BKD动态磁盘树

前言 基础的数据结构如二叉树衍生的的平衡二叉搜索树通过左旋右旋调整树的平衡维护数据&#xff0c;靠着二分算法能满足一维度数据的logN时间复杂度的近似搜索。对于大规模多维度数据近似搜索&#xff0c;Lucene采用一种BKD结构&#xff0c;该结构能很好的空间利用率和性能。 …

查找jdk的安装

方式1&#xff1a;which或者where java which java -- linux where java --windows 方式2: echo 使用echo 打印配置的java home环境变量 echo $JAVA_HOME$ --linux echo %JAVA_HOME% --windows 方式3&#xff1a;使用ls -lrt -a &#xff1a;显示所有文件即目录…

沃通SSL证书证券行业应用案例

金融证券行业作为现代经济体系中的重要组成部分&#xff0c;其安全性直接关系到国家经济的稳定和广大投资者的利益。沃通SSL证书基于密码技术保护传输数据的机密性、完整性&#xff0c;通过权威身份认证确保服务器身份真实性&#xff0c;已持续为众多知名证券行业客户提供服务&…

微信小程序之vue按钮切换内容变化

效果图如下&#xff1b; 上代码 <template><view class"content"><view class"searchDiv"><view class"paytab"><view class"buttab" v-for"(t,index) in tabList" :key"index" clic…

Python小设计

1. 五个PPT上的界面打印【print、input函数】 &#xff08;1&#xff09;英雄商城登陆界面 print(英雄联盟商城登录界面 ~ * ~ * ~ * ~ * ~ * ~ * ~ * ~ * ~ * ~ * ~ * ~ * ~ * ~ * ~1. 用户登录2. 新用户注册3. 退出系统 ~ * ~ * ~ * ~ * ~ * ~ * ~ * ~ * ~ * ~ * ~ * ~ * ~…

【Numpy】基础学习:一文了解np.expand_dims的作用、用法

【Numpy】基础学习&#xff1a;一文了解np.expand_dims的作用、用法 &#x1f308; 个人主页&#xff1a;高斯小哥 &#x1f525; 高质量专栏&#xff1a;Matplotlib之旅&#xff1a;零基础精通数据可视化、Python基础【高质量合集】、PyTorch零基础入门教程&#x1f448; 希望…

MySQL用法---MySQL Workbench创建数据库和表

1. 连接数据库 打开软件&#xff0c;点击左下角卡片&#xff0c;输入设置的数据库密码&#xff0c;勾选单选框 2. 了解主页面的组成部分 3. 创建数据库 先点击工具栏的创建按钮 再输入数据库名称 点击 Apply 创建 4. 创建数据表 展开数据库&#xff0c;在Tables上右键&…

docker学习(十四)docker搭建私服

docker私服搭建&#xff0c;配置域名访问&#xff0c;设置访问密码 启动registry docker run -d \-p 5000:5000 \-v /opt/data/registry:/var/lib/registry \registrydocker pull hello-world docker tag hello-world 127.0.0.1:5000/hello-world docker push 127.0.0.1:5000…

【2024-完整版】python爬虫 批量查询自己所有CSDN文章的质量分:附整个实现流程

【2024】批量查询CSDN文章质量分 写在最前面一、分析获取步骤二、获取文章列表1. 前期准备2. 获取文章的接口3. 接口测试&#xff08;更新重点&#xff09; 三、查询质量分1. 前期准备2. 获取文章的接口3. 接口测试 四、python代码实现1. 分步实现2. 批量获取文章信息3. 从exce…

WPF —— Calendar日历控件详解

1&#xff1a; Calendar的简介 日历控件用于创建可视日历&#xff0c;让用户选择日期并在选择日期时触发事件。 DisplayMode 用来调整日历显示模式&#xff0c;分为Month、Year 和Decade 三种。如下是None 2&#xff1a;Calendar控件常用的属性 SelectionMode 选中日历的类…

深入理解指针——C语言

目录 1. 内存和地址 2. 指针变量和地址 3. 指针变量类型的意义 4. const修饰指针 5. 指针运算 6. 野指针 7. assert断言 8. 指针的使用和传址调用 9. 数组名的理解 10. 使用指针访问数组 11. 一维数组传参的本质 12. 冒泡排序 13. 二级指针 14. 指针数组 15. 指…

如何在Windows系统安装Node.js环境并制作html页面发布公网远程访问?

文章目录 前言1.安装Node.js环境2.创建node.js服务3. 访问node.js 服务4.内网穿透4.1 安装配置cpolar内网穿透4.2 创建隧道映射本地端口 5.固定公网地址 前言 Node.js 是能够在服务器端运行 JavaScript 的开放源代码、跨平台运行环境。Node.js 由 OpenJS Foundation&#xff0…

创建一个Django项目

安装python、pip、Django 找不到python命令&#xff0c;请将C:\Python33、C:\Python33\Lib、C:\Python33\libs三个地址加入环境变量。 get-pip.py下载&#xff1a;https://bootstrap.pypa.io/ django版本需要与python版本对应&#xff0c;并将C:\Python33\Scripts加入环境变…