文章目录
- 数据库设计
- SQLite
- 配置数据库
- 实现 数据库
- 关于哈希表等复杂类的存储
- 启动数据库
- 文件设计
- 消息持久化
- 消息属性格式
- 核心方法
- 消息序列化
- 消息文件回收
- 统一硬盘存储管理
- 内存存储管理
- 线程安全
- 数据结构实现
数据库设计
数据库主要存储交换机、队列、绑定
SQLite
此处考虑的是更轻量的数据库SQLite, 因为⼀个完整的 SQLite 数据库,只有⼀个单独的不到1M的可执⾏⽂件,在Java中使用SQLite,不需要额外安装,只需要引入依赖即可,同时采用 mybatis 来管理数据库,完成我们数据存储方面的需求
SQLite,只是一个本地的数据库,这个数据库相当于直接操作本地硬盘文件,
因此需要在配置文件中配置好数据库文件的路径
配置数据库
- 直接在pom.xml⽂件中引⼊
<!-- https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc -->
<dependency>
<groupId>org.xerial</groupId>
<artifactId>sqlite-jdbc</artifactId>
<version>3.42.0.0</version>
</dependency>
- 然后在 application.yml 配置⽂件中
spring:
datasource:
url: jdbc:sqlite:./data/meta.db
username:
password:
driver-class-name: org.sqlite.JDBC
实现 数据库
1.此处我们根据之前的需求分析,对 application.yml 添加如下配置:
mybatis:
mapper-locations: classpath:mapper/**Mapper.xml
2.创建一个对应的 interface 实现包括但不限于建表的方法来操作数据库,同时注入Spring 容器中
3.创建 mapper⽬录和⽂件 MetaMapper.xml 并在 MetaMapper.xml 中利用 MyBits 实现 后续会用到的数据库 CRUD 功能
关于哈希表等复杂类的存储
- 说明:转成 json 格式的字符串来表示,在数据库中直接利用 varchar 类型即可
- 转换思想:
- 比如 MyBatis 往数据库中写数据, 就会调用对象的 getter 方法,拿到属性的值,再往数据库中写.如果这个过程中,让 getArguments 得到的结果是 String 类型的,此时,就可以直接把这个数据写到数据库了
- 比如 MyBatis 从数据库读数据的时候,就会调用对象的 setter 方法,把数据库中读到的结果设置到对象的属性中,如果这个过程中,让 setArguments,参数是一个 String,并且在setArquments 内部针对字符串解析,解析成一个 Map 对象
- 具体实现
public String getArguments(){// Map 类型转换为 String(json)ObjectMapper objectMapper = new ObjectMapper();try {return objectMapper.writeValueAsString(arguments);} catch (JsonProcessingException e) {e.printStackTrace();}return "{}";
}public void setArguments(String arguments){ObjectMapper objectMapper = new ObjectMapper();try {this.arguments = objectMapper.readValue(arguments, new TypeReference<HashMap<String, Object>>() {});} catch (JsonProcessingException e) {e.printStackTrace();}
}
启动数据库
- 在服务器(BrokerServer)启动的时候,能够做出以下逻辑判定:
- 如果数据库存在,表也都有了,不做任何操作
- 如果数据库不存在,则创建库,创建表,构造默认数据
- 依据下列框图构造⼀个类 DataBaseManager 来管理数据库
我们需要用 依赖查找 得到这个类.故而需要给项目启动类 增添 一个 静态属性:容器上下文
后续,我们就可以通过
MqApplication.context = SpringApplication.run(MqApplication.class);
来直接直接拿到 Spring 对象
文件设计
文件这一块主要存储的是消息
消息持久化
消息是依托于队列的,因此存储的时候,就要把 消息 按照 队列 维度展开
在 data 中创建⼀些⼦⽬录,每个队列对应⼀个⼦⽬录,⼦⽬录名就是队列名
消息属性格式
使用两个文件:
- queue_data.txt 保存消息的内容
- queue.stat.txt 保存消息的统计内容(总消息 \t 有效消息
核心方法
- 垃圾回收
- 统计文件读写
- 创建消息目录和文件
- 删除消息目录和文件
- 消息序列化
- 把消息写入文件中
- 从文件中删除消息(逻辑删除)
- 从硬盘中恢复数据到内存
消息序列化
我们知道在存储时,我们需要保存到文件,而文件只能存储字符串/二进制数据,无法直接存储消息对象,同时通过socket套接字在网络中传输时,也需要转为二进制,因此消息的序列化与反序列化尤为重要
tip:此处不使⽤ json 进⾏序列化,由于 Message,⾥⾯存储是⼆进制数据。⽽jason序列化得到的结果是⽂本数据,JSON格式中有很多特殊符号,:"{}这些符号会影响 json
格式的解析如果存文本,你的键值对中不会包含上述特殊符号,如果存二进制,那就不好说.万一某个二进制的字节正好就和 上述特殊符号 的ascii样了,此时就可能会引起 json 解析的格式错误~~
实现如下:
// 把一个对象序列化成一个字节数组public static byte[] toBytes(Object object) throws IOException {// 这个流对象相当于一个变长的字节数组// 可以把 object 序列化的数据逐渐写入该流对象,再转为 byte[]try(ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {try(ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)){// 把该对象序列化, 写入objectOutputStream中,因为其关联byteArrayOutputStream// 所以相当于写入了 byteArrayOutputStream 中objectOutputStream.writeObject(object);}return byteArrayOutputStream.toByteArray();}}// 把一个字节数组,反序列化成一个对象public static Object fromByte(byte[] data) throws IOException, ClassNotFoundException {Object object = null;try(ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)){try(ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)){object = objectInputStream.readObject();}}return object;}
消息文件回收
由于当前会不停的往消息⽂件中写⼊消息,并且删除消息只是逻辑删除,这就可能导致消息⽂件越来越⼤,并且包含⼤量⽆⽤的消息。我们需要实现垃圾文件的回收
- 此处使⽤的是复制算法。如下:
- 此处就要⽤到我们每个队列⽬录中,所对应的另⼀个⽂件 queue_stat.txt了,使⽤这个⽂件来保存消息的统计信息
- 只存⼀⾏数据,⽤ \t 分割, 左边是 queue_data.txt 中消息的总数⽬,右边是 queue_data.txt中有
效的消息数⽬。 形如 2000\t1500, 代表该队列总共有2000条消息,其中有效消息为1500条
所以此处我们就约定,当消息总数超过2000条,并且有效消息数⽬低于总消息数的50%,就处理⼀次垃圾回收GC
具体实现代码:
// 检查是否需要进行GCpublic boolean checkGC(String queueName){// 判断Stat stat = readStat(queueName);if (stat.totalCount > 2000 && (double)stat.validCount / (double) stat.totalCount < 0.5){return true;}return false;}// GC操作使用复制算法,会创建一个新的文件出来,这里约定新文件的位置public String getQueueDataNewPath(String queueName){return getQueueDir(queueName) + "/queue_data_new.txt";}// 垃圾回收机制public void gc(MSGQueue queue) throws MqException, IOException, ClassNotFoundException {synchronized (queue){// 统计花费的时间long gcBeg = System.currentTimeMillis();// 1.创建新文件File queueDataNewFile = new File(getQueueDataNewPath(queue.getName()));if (queueDataNewFile.exists()){throw new MqException("[MessageFileManager] gc 时发现该队列queue_data_new 已经存在");}boolean ok = queueDataNewFile.createNewFile();if (!ok){throw new MqException("[MessageFileManager] 创建文件失败 queueName=" + queueDataNewFile.getAbsolutePath());}// 2.读取有效消息LinkedList<Message> messages = loadAllMessageFromQueue(queue.getName());// 3.将有效消息写入文件try(OutputStream outputStream = new FileOutputStream(queueDataNewFile)){try(DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){for(Message message : messages){byte[] buffer = BinaryTool.toBytes(message);dataOutputStream.writeInt(buffer.length);dataOutputStream.write(buffer);}}}// 4.删除旧的文件File queueDataOldFile = new File(getQueueDataPath(queue.getName()));ok = queueDataOldFile.delete();if (!ok){throw new MqException("[MessageFileManager] 删除旧的文件内容失败 queueDataOldFile="+ queueDataOldFile.getAbsolutePath());}// 5.重命名ok = queueDataNewFile.renameTo(queueDataOldFile);if (!ok){throw new MqException("[MessageFileManager] 文件重命名失败 queueDataNewFile=" + queueDataNewFile.getAbsolutePath()+ ", queueDataOldFile=" + queueDataOldFile.getAbsolutePath());}//6.更新统计文件Stat stat = readStat(queue.getName());stat.totalCount = messages.size();stat.validCount = messages.size();writeStat(queue.getName(),stat);long gcEnd = System.currentTimeMillis();System.out.println("[MessageFileManager] gc 执行完毕! queueName=" + queue.getName()+ "time=" + (gcEnd - gcBeg) + "ms");}}
统一硬盘存储管理
上述我们存储在硬盘中的数据,分为了两个,⼀个是存放数据库中,⼀个是存放在⽂件中。
我们需要统⼀封装⼀个类对上⾯硬盘数据进⾏管理
package com.example.demo.mqsever.datacenter;import com.example.demo.common.MqException;
import com.example.demo.mqsever.core.Binding;
import com.example.demo.mqsever.core.Exchange;
import com.example.demo.mqsever.core.MSGQueue;
import com.example.demo.mqsever.core.Message;import java.io.IOException;
import java.util.LinkedList;
import java.util.List;// 整合 数据库:交换机、绑定、队列 + 数据文件:消息
public class DiskDataCenter {// 管理数据库的实例private DataBaseManager dataBaseManager = new DataBaseManager();// 管理数据文件中的实例private MessageFileManager messageFileManager = new MessageFileManager();public void init(){dataBaseManager.init();messageFileManager.init();}// 交换机操作public void insertExchange(Exchange exchange){dataBaseManager.insertExchange(exchange);}public void deleteExchange(String exchangeName){dataBaseManager.deleteExchange(exchangeName);}public List<Exchange> selectAllExchanges(){return dataBaseManager.selectAllExchanges();}// 队列操作public void insertQueue(MSGQueue queue) throws IOException {dataBaseManager.insertQueue(queue);// 创建目录的同时,也要创建文件和目录messageFileManager.createQueueFiles(queue.getName());}public void deleteQueue(String queueName) throws IOException {dataBaseManager.deleteQueue(queueName);// 删除目录的同时,也要删除文件和目录messageFileManager.destroyQueueFiles(queueName);}public List<MSGQueue> selectAllQueues(){return dataBaseManager.selectAllQueue();}// 绑定操作public void insertBinding(Binding binding){dataBaseManager.insertBinding(binding);}public void deleteBinding(Binding binding){dataBaseManager.deleteBinding(binding);}public List<Binding> selectAllBindings(){return dataBaseManager.selectAllBindings();}// 消息操作public void sendMessage(MSGQueue queue, Message message) throws IOException, MqException {messageFileManager.sendMessage(queue,message);}public void deleteMessage(MSGQueue queue,Message message) throws IOException, ClassNotFoundException, MqException {messageFileManager.deleteMessage(queue,message);// 判断是否需要进行 GC 操作if (messageFileManager.checkGC(queue.getName())){messageFileManager.gc(queue);}}public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {return messageFileManager.loadAllMessageFromQueue(queueName);}}
内存存储管理
借助内存中的⼀些列数据结构 ,保存 交换机、队列、绑定、消息,⼴泛使⽤了 哈希表、链表、嵌套的数据结构等使
⽤内存管理上述的数据,对于MQ来说,内存存储数据为主;硬盘存储数据为辅(主要是为了持久化,重启之后,数据不丢失)
线程安全
此处为了保证线程安全,统一使用 线程安全的 ConcurrentHashMap.同时再编写相关代码的时候,要考虑:要不要加锁?锁加到哪⾥?
数据结构实现
// key 是 exchangeName, value 是 Exchange 对象private ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>();// key 是 queueName, value 是 MSGQueue 对象private ConcurrentHashMap<String, MSGQueue> queueMap = new ConcurrentHashMap<>();// 第一个 key 是 exchangeName, 第二个 key 是 queueNameprivate ConcurrentHashMap<String, ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>();// key 是 messageId, value 是 Message 对象private ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>();// key 是 queueName, value 是一个 Message 的链表private ConcurrentHashMap<String, LinkedList<Message>> queueMessageMap = new ConcurrentHashMap<>();// 第一个 key 是 queueName, 第二个 key 是 messageIdprivate ConcurrentHashMap<String, ConcurrentHashMap<String, Message>> queueMessageWaitAckMap = new ConcurrentHashMap<>();