RabbitMQ 模拟实现【三】:存储设计

文章目录

  • 数据库设计
    • SQLite
    • 配置数据库
    • 实现 数据库
    • 关于哈希表等复杂类的存储
    • 启动数据库
  • 文件设计
    • 消息持久化
    • 消息属性格式
    • 核心方法
    • 消息序列化
    • 消息文件回收
  • 统一硬盘存储管理
  • 内存存储管理
    • 线程安全
    • 数据结构实现

数据库设计

数据库主要存储交换机、队列、绑定

SQLite

此处考虑的是更轻量的数据库SQLite, 因为⼀个完整的 SQLite 数据库,只有⼀个单独的不到1M的可执⾏⽂件,在Java中使用SQLite,不需要额外安装,只需要引入依赖即可,同时采用 mybatis 来管理数据库,完成我们数据存储方面的需求
SQLite,只是一个本地的数据库,这个数据库相当于直接操作本地硬盘文件,
因此需要在配置文件中配置好数据库文件的路径

配置数据库

  1. 直接在pom.xml⽂件中引⼊
<!-- https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc -->
<dependency>
<groupId>org.xerial</groupId>
<artifactId>sqlite-jdbc</artifactId>
<version>3.42.0.0</version>
</dependency>
  1. 然后在 application.yml 配置⽂件中
spring:
datasource:
url: jdbc:sqlite:./data/meta.db
username:
password:
driver-class-name: org.sqlite.JDBC

实现 数据库

1.此处我们根据之前的需求分析,对 application.yml 添加如下配置:

mybatis:
mapper-locations: classpath:mapper/**Mapper.xml

2.创建一个对应的 interface 实现包括但不限于建表的方法来操作数据库,同时注入Spring 容器中
在这里插入图片描述3.创建 mapper⽬录和⽂件 MetaMapper.xml 并在 MetaMapper.xml 中利用 MyBits 实现 后续会用到的数据库 CRUD 功能
在这里插入图片描述

关于哈希表等复杂类的存储

  • 说明:转成 json 格式的字符串来表示,在数据库中直接利用 varchar 类型即可
  • 转换思想:
  • 比如 MyBatis 往数据库中写数据, 就会调用对象的 getter 方法,拿到属性的值,再往数据库中写.如果这个过程中,让 getArguments 得到的结果是 String 类型的,此时,就可以直接把这个数据写到数据库了
  • 比如 MyBatis 从数据库读数据的时候,就会调用对象的 setter 方法,把数据库中读到的结果设置到对象的属性中,如果这个过程中,让 setArguments,参数是一个 String,并且在setArquments 内部针对字符串解析,解析成一个 Map 对象
  • 具体实现
public String getArguments(){// Map 类型转换为 String(json)ObjectMapper objectMapper = new ObjectMapper();try {return objectMapper.writeValueAsString(arguments);} catch (JsonProcessingException e) {e.printStackTrace();}return "{}";
}public void setArguments(String arguments){ObjectMapper objectMapper = new ObjectMapper();try {this.arguments = objectMapper.readValue(arguments, new TypeReference<HashMap<String, Object>>() {});} catch (JsonProcessingException e) {e.printStackTrace();}
}

启动数据库

  • 在服务器(BrokerServer)启动的时候,能够做出以下逻辑判定:
  1. 如果数据库存在,表也都有了,不做任何操作
  2. 如果数据库不存在,则创建库,创建表,构造默认数据
  • 依据下列框图构造⼀个类 DataBaseManager 来管理数据库
    在这里插入图片描述
    我们需要用 依赖查找 得到这个类.故而需要给项目启动类 增添 一个 静态属性:容器上下文
    在这里插入图片描述
    在这里插入图片描述

后续,我们就可以通过
MqApplication.context = SpringApplication.run(MqApplication.class);
来直接直接拿到 Spring 对象

文件设计

文件这一块主要存储的是消息

消息持久化

消息是依托于队列的,因此存储的时候,就要把 消息 按照 队列 维度展开
在 data 中创建⼀些⼦⽬录,每个队列对应⼀个⼦⽬录,⼦⽬录名就是队列名
在这里插入图片描述

消息属性格式

使用两个文件:

  • queue_data.txt 保存消息的内容
  • queue.stat.txt 保存消息的统计内容(总消息 \t 有效消息
    在这里插入图片描述

核心方法

  • 垃圾回收
  • 统计文件读写
  • 创建消息目录和文件
  • 删除消息目录和文件
  • 消息序列化
  • 把消息写入文件中
  • 从文件中删除消息(逻辑删除)
  • 从硬盘中恢复数据到内存

消息序列化

我们知道在存储时,我们需要保存到文件,而文件只能存储字符串/二进制数据,无法直接存储消息对象,同时通过socket套接字在网络中传输时,也需要转为二进制,因此消息的序列化与反序列化尤为重要

tip:此处不使⽤ json 进⾏序列化,由于 Message,⾥⾯存储是⼆进制数据。⽽jason序列化得到的结果是⽂本数据,JSON格式中有很多特殊符号,:"{}这些符号会影响 json
格式的解析如果存文本,你的键值对中不会包含上述特殊符号,如果存二进制,那就不好说.万一某个二进制的字节正好就和 上述特殊符号 的ascii样了,此时就可能会引起 json 解析的格式错误~~

实现如下:

// 把一个对象序列化成一个字节数组public static byte[] toBytes(Object object) throws IOException {// 这个流对象相当于一个变长的字节数组// 可以把 object 序列化的数据逐渐写入该流对象,再转为 byte[]try(ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {try(ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)){// 把该对象序列化, 写入objectOutputStream中,因为其关联byteArrayOutputStream// 所以相当于写入了 byteArrayOutputStream 中objectOutputStream.writeObject(object);}return byteArrayOutputStream.toByteArray();}}// 把一个字节数组,反序列化成一个对象public static Object fromByte(byte[] data) throws IOException, ClassNotFoundException {Object object = null;try(ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)){try(ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)){object = objectInputStream.readObject();}}return object;}

消息文件回收

由于当前会不停的往消息⽂件中写⼊消息,并且删除消息只是逻辑删除,这就可能导致消息⽂件越来越⼤,并且包含⼤量⽆⽤的消息。我们需要实现垃圾文件的回收

  • 此处使⽤的是复制算法。如下:
    在这里插入图片描述
  • 此处就要⽤到我们每个队列⽬录中,所对应的另⼀个⽂件 queue_stat.txt了,使⽤这个⽂件来保存消息的统计信息
  • 只存⼀⾏数据,⽤ \t 分割, 左边是 queue_data.txt 中消息的总数⽬,右边是 queue_data.txt中有
    效的消息数⽬。 形如 2000\t1500, 代表该队列总共有2000条消息,其中有效消息为1500条
    所以此处我们就约定,当消息总数超过2000条,并且有效消息数⽬低于总消息数的50%,就处理⼀次垃圾回收GC
    具体实现代码:
// 检查是否需要进行GCpublic boolean checkGC(String queueName){// 判断Stat stat = readStat(queueName);if (stat.totalCount > 2000 && (double)stat.validCount / (double) stat.totalCount < 0.5){return true;}return false;}// GC操作使用复制算法,会创建一个新的文件出来,这里约定新文件的位置public String getQueueDataNewPath(String queueName){return getQueueDir(queueName) + "/queue_data_new.txt";}// 垃圾回收机制public void gc(MSGQueue queue) throws MqException, IOException, ClassNotFoundException {synchronized (queue){// 统计花费的时间long gcBeg = System.currentTimeMillis();// 1.创建新文件File queueDataNewFile = new File(getQueueDataNewPath(queue.getName()));if (queueDataNewFile.exists()){throw new MqException("[MessageFileManager] gc 时发现该队列queue_data_new 已经存在");}boolean ok = queueDataNewFile.createNewFile();if (!ok){throw new MqException("[MessageFileManager] 创建文件失败 queueName=" + queueDataNewFile.getAbsolutePath());}// 2.读取有效消息LinkedList<Message> messages = loadAllMessageFromQueue(queue.getName());// 3.将有效消息写入文件try(OutputStream outputStream = new FileOutputStream(queueDataNewFile)){try(DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){for(Message message : messages){byte[] buffer = BinaryTool.toBytes(message);dataOutputStream.writeInt(buffer.length);dataOutputStream.write(buffer);}}}// 4.删除旧的文件File queueDataOldFile = new File(getQueueDataPath(queue.getName()));ok = queueDataOldFile.delete();if (!ok){throw new MqException("[MessageFileManager] 删除旧的文件内容失败 queueDataOldFile="+ queueDataOldFile.getAbsolutePath());}// 5.重命名ok =  queueDataNewFile.renameTo(queueDataOldFile);if (!ok){throw new MqException("[MessageFileManager] 文件重命名失败 queueDataNewFile=" + queueDataNewFile.getAbsolutePath()+ ", queueDataOldFile=" + queueDataOldFile.getAbsolutePath());}//6.更新统计文件Stat stat = readStat(queue.getName());stat.totalCount = messages.size();stat.validCount = messages.size();writeStat(queue.getName(),stat);long gcEnd = System.currentTimeMillis();System.out.println("[MessageFileManager] gc 执行完毕! queueName=" + queue.getName()+ "time=" + (gcEnd - gcBeg) + "ms");}}

统一硬盘存储管理

上述我们存储在硬盘中的数据,分为了两个,⼀个是存放数据库中,⼀个是存放在⽂件中。
我们需要统⼀封装⼀个类对上⾯硬盘数据进⾏管理

package com.example.demo.mqsever.datacenter;import com.example.demo.common.MqException;
import com.example.demo.mqsever.core.Binding;
import com.example.demo.mqsever.core.Exchange;
import com.example.demo.mqsever.core.MSGQueue;
import com.example.demo.mqsever.core.Message;import java.io.IOException;
import java.util.LinkedList;
import java.util.List;// 整合 数据库:交换机、绑定、队列 + 数据文件:消息
public class DiskDataCenter {// 管理数据库的实例private DataBaseManager  dataBaseManager = new DataBaseManager();// 管理数据文件中的实例private MessageFileManager messageFileManager = new MessageFileManager();public void init(){dataBaseManager.init();messageFileManager.init();}// 交换机操作public void insertExchange(Exchange exchange){dataBaseManager.insertExchange(exchange);}public void deleteExchange(String exchangeName){dataBaseManager.deleteExchange(exchangeName);}public List<Exchange> selectAllExchanges(){return dataBaseManager.selectAllExchanges();}// 队列操作public void insertQueue(MSGQueue queue) throws IOException {dataBaseManager.insertQueue(queue);// 创建目录的同时,也要创建文件和目录messageFileManager.createQueueFiles(queue.getName());}public void deleteQueue(String queueName) throws IOException {dataBaseManager.deleteQueue(queueName);// 删除目录的同时,也要删除文件和目录messageFileManager.destroyQueueFiles(queueName);}public List<MSGQueue> selectAllQueues(){return dataBaseManager.selectAllQueue();}// 绑定操作public void insertBinding(Binding binding){dataBaseManager.insertBinding(binding);}public void deleteBinding(Binding binding){dataBaseManager.deleteBinding(binding);}public List<Binding> selectAllBindings(){return dataBaseManager.selectAllBindings();}// 消息操作public void sendMessage(MSGQueue queue, Message message) throws IOException, MqException {messageFileManager.sendMessage(queue,message);}public void deleteMessage(MSGQueue queue,Message message) throws IOException, ClassNotFoundException, MqException {messageFileManager.deleteMessage(queue,message);// 判断是否需要进行 GC 操作if (messageFileManager.checkGC(queue.getName())){messageFileManager.gc(queue);}}public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {return messageFileManager.loadAllMessageFromQueue(queueName);}}

内存存储管理

借助内存中的⼀些列数据结构 ,保存 交换机、队列、绑定、消息,⼴泛使⽤了 哈希表、链表、嵌套的数据结构等使
⽤内存管理上述的数据,对于MQ来说,内存存储数据为主;硬盘存储数据为辅(主要是为了持久化,重启之后,数据不丢失)

线程安全

此处为了保证线程安全,统一使用 线程安全的 ConcurrentHashMap.同时再编写相关代码的时候,要考虑:要不要加锁?锁加到哪⾥?

数据结构实现

    // key 是 exchangeName, value 是 Exchange 对象private ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>();// key 是 queueName, value 是 MSGQueue 对象private ConcurrentHashMap<String, MSGQueue> queueMap = new ConcurrentHashMap<>();// 第一个 key 是 exchangeName, 第二个 key 是 queueNameprivate ConcurrentHashMap<String, ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>();// key 是 messageId, value 是 Message 对象private ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>();// key 是 queueName, value 是一个 Message 的链表private ConcurrentHashMap<String, LinkedList<Message>> queueMessageMap = new ConcurrentHashMap<>();// 第一个 key 是 queueName, 第二个 key 是 messageIdprivate ConcurrentHashMap<String, ConcurrentHashMap<String, Message>> queueMessageWaitAckMap = new ConcurrentHashMap<>();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/743867.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

完整的通过git命令框和windows窗口将本地文件上传到gitee远程仓库流程步骤

1.下载git 这个网站搜索git官方&#xff0c;去下载就行了 2.打开git安装后的Git Bash命令框 3.在Git Bash命令框设置一下要远程链接的gitee账号 git config --global user.name “名字”Git config --global user.email “邮箱” 4.查看一下账号设置 git config --global -…

Chitosan-PEG-DSPE 壳聚糖修聚乙二醇磷脂 DSPE-PEG-Chitosan

产品简称&#xff1a;DSPE-PEG-Chitosan、Chitosan-PEG-DSPE、DSPE-PEG-CS、CS-PEG-DSPE 产品中文名称&#xff1a;壳聚糖-聚乙二醇-磷脂、磷脂-聚乙二醇-壳聚糖 分子量&#xff1a;可以根据要求定制 保存条件&#xff1a; -20干燥保存 有效期&#xff1a; 一年 纯度&…

创建SpringCloudGateWay

创建SpringCloudGateWay 本案例基于尚硅谷《谷粒商城》项目&#xff0c;视频27 创建测试API网关 1、创建module 2、引入依赖 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:x…

C++进阶--mep和set的模拟实现

红黑树链接入口 底层容器 模拟实现set和map时常用的底层容器是红黑树。 红黑树是一种自平衡的搜索二叉树&#xff0c;通过对节点进行颜色标记来保持平衡。 在模拟实现set和map时&#xff0c;可以使用红黑树来按照元素的大小自动排序&#xff0c;并且保持插入和删除操作的高效…

Set cancelled by MemoryScratchSinkOperator

Bug信息 Caused by: com.starrocks.connector.spark.exception.StarrocksInternalException: StarRocks server StarRocks BE{host=10.9.14.39, port=9060} internal failed, status code [CANCELLED] error message is [Set cancelled by MemoryScratchSinkOperator]Bug产生的…

微信名【无感】的同学,你还好吗?

今天遇到个选择了微信一对一服务的同学&#xff0c;问Python问题&#xff0c;问题比较简单。 回答完问题&#xff0c;我就说了一句&#xff1a;问题比较简单&#xff0c;随意打赏一个红包就行了。 然后我就被拉黑了&#xff0c;然后我的解答问题&#xff0c;收到了一堆投诉&…

深入解析Java中锁机制以及底层原理

一、概述 1.1 背景 概念&#xff1a;锁是多线程编程中的机制&#xff0c;用于控制对共享资源的访问。可以防止多个线程同时修改或读取共享资源&#xff0c;从而保证线程安全。 作用&#xff1a;锁用于实现线程间的互斥和协调&#xff0c;确保在多线程环境下对共享资源的访问顺…

Flutter开发入门——Widget和常用组件

1.什么是Widget&#xff1f; 在Flutter中几乎所有的对象都是一个 widget 。与原生开发中“控件”不同的是&#xff0c;Flutter 中的 widget 的概念更广泛&#xff0c;它不仅可以表示UI元素&#xff0c;也可以表示一些功能性的组件如&#xff1a;用于手势检测的 GestureDetecto…

spring中事务失效的场景有哪些?

异常捕获处理 在方法中已经将异常捕获处理掉并没有抛出。 事务只有捕捉到了抛出的异常才可以进行处理&#xff0c;如果有异常业务中直接捕获处理掉没有抛出&#xff0c;事务是无法感知到的。 解决&#xff1a;在catch块throw抛出异常。 抛出检查异常 spring默认只会回滚非检…

ChatGPT浪潮来袭!谁先掌握,谁将领先!

任正非在接受采访时说 今后职场上只有两种人&#xff0c; 一种是熟练使用AI的人&#xff0c; 另一种是创造AI工具的人。 虽然这个现实听起来有些夸张的残酷&#xff0c; 但这就是我们必须面对的事实 &#x1f4c6; 对于我们普通人来说&#xff0c;我们需要努力成为能够掌握…

基于STM32的智慧农业管理系统设计与实现

文章目录 一、前言1.1 项目介绍【1】项目功能【2】设计实现的功能【3】项目硬件模块组成 1.2 设计思路1.3 传感器功能介绍1.4 开发工具的选择 二、EMQX开源MQTT服务器框架三、购买ECS云服务器3.1 登录官网3.2 购买ECS服务器3.3 配置安全组3.4 安装FinalShell3.5 远程登录到云服…

xsslabs靶场通关(持续更新)

文章目录 前言一、level1思路实现 二、levle2思路 三、level3思路实现 四、level4思路实现 五、level5思路实现 六、level6思路实现 七、level7思路实现 八、level8思路实现 九、level9思路实现 前言 本篇文章将介绍在xsslabs这个靶场&#xff08;在不知道源码的前提下&#x…

Linux从0到1——Linux环境基础开发工具的使用(上)

Linux从0到1——Linux环境基础开发工具的使用&#xff08;上&#xff09; 1. Linux软件包管理器yum1.1 yum介绍1.2 用yum来下载软件1.3 更新yum源 2. Linux编辑器&#xff1a;vi/vim2.1 vim的基本概念2.2 vim的基本操作2.3 vim正常模式命令集2.4 vim底行模式命令集2.5 视图模式…

Java初阶数据结构队列的实现

1.队列的概念 1.队列就是相当于排队打饭 2.在排队的时候就有一个队头一个队尾。 3.从队尾进对头出 4.所以他的特点就是先进先出 所以我们可以用链表来实现 单链表实现要队尾进队头出{要有last 尾插头删} 双向链表实现效率高&#xff1a;不管从哪个地方当作队列都是可以的&…

OpenMP 编程模型

OpenMP 内存模型 共享内存模型&#xff1a; OpenMP 专为多处理器/核心、共享内存机器设计&#xff0c;底层架构可以是共享内存UMA或NUM OpenMP 执行模型 基于线程的并行&#xff1a; OpenMP 程序基于多线程来实现并行&#xff0c; 线程是操作系统可以调度的最小执行单元。 …

react 综合题-旧版

一、组件基础 1. React 事件机制 javascript 复制代码<div onClick{this.handleClick.bind(this)}>点我</div> React并不是将click事件绑定到了div的真实DOM上&#xff0c;而是在document处监听了所有的事件&#xff0c;当事件发生并且冒泡到document处的时候&a…

Facebook:连接世界的社交巨人

在当今数字化时代&#xff0c;Facebook作为全球最大的社交媒体平台之一&#xff0c;扮演着连接世界的重要角色。它不仅仅是一个社交网站&#xff0c;更是一个数字化的社交生态系统&#xff0c;影响着全球数十亿用户的生活和交流方式。本文将深入探讨Facebook的起源、用户规模和…

uniapp——第1篇:基于vue语法的、比原生开发屌的小程序开发

前提&#xff0c;建议先学会前端几大基础&#xff1a;HTML、CSS、JS、Ajax&#xff0c;还有一定要会Vue!&#xff08;Vue2\Vue3&#xff09;都要会&#xff01;&#xff01;&#xff01;不然不好懂 博主作为大二前端小白&#xff0c;刚刚接触前端微信小程序开发时选择的是基于“…

electron + vtkjs加载模型异常,界面显示类似于图片加载失败的图标

electron vtkjs加载模型显示异常&#xff0c;类似于图片加载失败的效果&#xff0c;如上图。 electron开发版本&#xff1a;13。 解决方法&#xff1a;升级electron版本号。 注意&#xff1a;win7最高兼容electron 22版本。

多维时序 | Matlab实现VMD-CNN-GRU变分模态分解结合卷积神经网络门控循环单元多变量时间序列预测

多维时序 | Matlab实现VMD-CNN-GRU变分模态分解结合卷积神经网络门控循环单元多变量时间序列预测 目录 多维时序 | Matlab实现VMD-CNN-GRU变分模态分解结合卷积神经网络门控循环单元多变量时间序列预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 1.Matlab实现VMD-CN…