从零手搓一个【消息队列】实现数据的硬盘管理和内存管理(线程安全)

文章目录

  • 一、硬盘管理
    • 1, 创建 DiskDataCenter 类
    • 2, init() 初始化
    • 3, 封装交换机
    • 4, 封装队列
    • 5, 关于绑定
    • 6, 关于消息
  • 二、内存管理
    • 1, 数据结构的设计
    • 2, 创建 MemoryDataCenter 类
    • 3, 关于交换机
    • 4, 关于队列
    • 5, 关于绑定
    • 6, 关于消息
    • 7, 恢复数据
  • 三、小结


创建 Spring Boot 项目, Spring Boot 2 系列版本, Java 8 , 引入 MyBatis, Lombok 依赖

提示:是正在努力进步的小菜鸟一只,如有大佬发现文章欠佳之处欢迎批评指点~ 废话不多说,直接上干货!

整体目录结构 :
在这里插入图片描述

本文主要实现 server 包


一、硬盘管理

硬盘管理包括第二篇文章中实现的 DataBaseManager 类, 实现了数据库存储(交换机, 队列, 绑定)和第三篇文章中实现的 MessageFileManager 类, 实现了文件存储(消息)

接下来使用 DiskDataCenter 这个类, 对上述两个类进一步的整合, 封装, 统一对上层提供数据库和文件操作的 API

在这里插入图片描述

整合的好处
上层代码(服务器中的 VirtualHost 在操作硬盘数据的时候, 直接调用 DiskDataCenter 类的 API 即可, 不需要关心硬盘上的数据来自数据库还是文件, 也不需要知道具体是怎么实现的)


1, 创建 DiskDataCenter 类

要有两个成员属性, 分别是 dataBaseManager 对象和 messageFileManager 对象, 我们要持有这两个类的 API, 才能进一步封装

public class DiskDataCenter {private DataBaseManager dataBaseManager = new DataBaseManager();private MessageFileManager messageFileManager = new MessageFileManager();
}

2, init() 初始化

dataBaseManager 中需要对数据库进行初始化, 所以在当前类提供一个初始化 API

    public void init() {dataBaseManager.init();}

3, 封装交换机

    public void insertExchange(Exchange exchange) {dataBaseManager.insertExchange(exchange);}public void deleteExchange(String exchangeName) {dataBaseManager.deleteExchange(exchangeName);}public List<Exchange> selectAllExchanges() {return dataBaseManager.selectAllExchanges();}

4, 封装队列

创建队列的时候要创建对应的目录( queue 目录)和文件( data 文件和 stat 文件), 同理, 删除的时候和也要删除对应的目录和文件

	public void insertQueue(MessageQueue queue) throws IOException {dataBaseManager.insertQueue(queue);messageFileManager.createQueueFiles(queue.getName());}public void deleteQueue(String queueName) throws IOException {dataBaseManager.deleteQueue(queueName);messageFileManager.deleteFiles(queueName);}public List<MessageQueue> selectAllQueue() {return dataBaseManager.selectAllQueues();}

5, 关于绑定

	public void insertBinding(Binding binding) {dataBaseManager.insertBinding(binding);}public void deleteBinding(Binding binding) {dataBaseManager.deleteBinding(binding);}public List<Binding> selectAllBindings() {return dataBaseManager.selectAllBindings();}

6, 关于消息

删除消息的时候顺便查一下 stat 文件, 如果需要GC, 就调用实现GC的方法(读一下 stat 文件并不是很耗时)

public void sendMessage(MessageQueue queue, Message message) throws IOException, MQException {messageFileManager.sendMessage(queue, message);}public void deleteMessage(MessageQueue queue, Message message) throws IOException, MQException {messageFileManager.deleteMessage(queue, message);// 这是逻辑删除, 而不是真正从文件中删掉这个数据, 并且需要判断是否需要进行垃圾回收if (messageFileManager.isNeedGC(queue.getName())) {messageFileManager.gc(queue);}}public LinkedList<Message> selectAllMessages(String queueName) throws MQException, IOException {return messageFileManager.loadAllMessageFromQueue(queueName);}

二、内存管理

硬盘上存储数据, 只是为了实现 “持久化” 这样的效果. 但是实际的消息存储/转发, 还是主要靠内存的结构

对于 MQ 来说, 内存部分是更关键的, 内存速度更快, 可以达成更⾼的并发

对于交换机, 队列, 绑定, 消息的查找, 直接在内存中找, 增加, 删除需要在管理内存的同时也管理硬盘
在这里插入图片描述


1, 数据结构的设计

需要在内存中存储交换机, 队列, 绑定, 交换机, 就需要使用合适的数据结构来辅助我们更方便的存储和管理数据, 并且还需要保证线程安全

  • 交换机
    使用 ConcurrentHashMap<String, Exchange>, key: exchangeName, value: exchange
  • 队列
    使用 ConcurrentHashMap<String, MessageQueue>, key: queueName, value: queue
  • 绑定
    使用(嵌套的) ConcurrentHashMap<String, ConcurrentHashMap<String, Binding>>, key: exchangeName, value: [key: queueName, value: binding]

生产者发布消息的时候, 要指定一个 exchangeName, 服务器需要拿着这个 exchangeName, 在绑定表里查有没有已经绑定过的队列, 如果有队列, 并且符合 bindingKey 和 routingKey 的匹配规则, 才能转发消息
绑定是从交换机和队列这两个维度建立的, 所以使用嵌套的 Map 存储绑定表

  • 消息
    使用 ConcurrentHashMap<String, Message>, key: messageId, value: message

  • 在队列里的 N 条消息
    使用 ConcurrentHashMap<String, LinkedList< Message>>, key: queueName, value: 该队列中存储消息的链表

  • 在队列里的 N 条未确认的消息( “待确认队列” )
    使用(嵌套的) ConcurrentHashMap<String, ConcurrentHashMap<String, Message>>, key: queueName, value: [key: messageId, value: message]

  1. 之前的文章说明过, 消费者消费消息采用"推"的方式, 即: 队列中有消息之后, 服务器主动推送给订阅了该队列的消费者
  2. 推送之后, 如果消费者是手动应答
  3. 在消费者还没应答之前, 服务器视为消费者还没成功消费消息, 需要备份这条消息, 所以这个嵌套的Map 相当于一个 “待确认队列”
  4. 消费者确认应答之后, 服务器再从这个 “待确认队列” 中删除该消息

在这里插入图片描述

2, 创建 MemoryDataCenter 类

public class MemoryDataCenter {private ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>();private ConcurrentHashMap<String, MessageQueue> queueMap = new ConcurrentHashMap<>();private ConcurrentHashMap<String, ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>();private ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>();private ConcurrentHashMap<String, LinkedList<Message>> messageInQueueMap = new ConcurrentHashMap<>();private ConcurrentHashMap<String, ConcurrentHashMap<String, Message>> messageInQueueNotAckMap = new ConcurrentHashMap<>();
}

3, 关于交换机

    public void addExchange(Exchange exchange) {exchangeMap.put(exchange.getName(), exchange);System.out.println("[MemoryDataCenter.addExchange()] " + exchange.getName() + "交换机添加成功");}public Exchange getExchange(String exchangeName) {return exchangeMap.get(exchangeName);}public void removeExchange(String exchangeName) {exchangeMap.remove(exchangeName);System.out.println("[MemoryDataCenter.removeExchange()] " + exchangeName + "交换机删除成功");}

4, 关于队列

	public void addQueue(MessageQueue queue) {queueMap.put(queue.getName(), queue);System.out.println("[MemoryDataCenter.addQueue()] " + queue.getName() + "队列添加成功");}public MessageQueue getQueue(String queueName) {return queueMap.get(queueName);}public void removeQueue(String queueName) {queueMap.remove(queueName);System.out.println("[MemoryDataCenter.removeQueue()] " + queueName + "队列删除成功");}

5, 关于绑定

  • 1, 添加绑定
    注意 bindingsMap 和 bindingMap 不同, bindingMap 表示 bindingsMap 的 value 值
    使用 bindingsMap.computeIfAbsent(key) 可以创建 bindingsMap 的 value 值( bindingMap )
    public void addBinding(Binding binding) throws MQException {String exchangeName = binding.getExchangeName();String queueName = binding.getQueueName();
//        ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getExchangeName());
//        if (bindingMap == null) {
//            bindingMap = new ConcurrentHashMap<>();
//            bindingsMap.put(binding.getExchangeName(), bindingMap);
//        }// 1, 先查是否存在exchange绑定的queue, 没有则创建ConcurrentHashMap<String, Binding> bindingMap =bindingsMap.computeIfAbsent(exchangeName, V -> new ConcurrentHashMap<>());// 2, 再查exchange和queue是否已经存在绑定, 没有创建synchronized (bindingMap) {if (bindingMap.get(exchangeName) != null) {// 如果 exchange-queue 这个绑定已经存在, 不能再插入throw new MQException("[MemoryDataCenter.addBinding()] exchangeName = " + queueName+ "-queueName = " + queueName + "的绑定已经存在, 添加失败");}bindingMap.put(queueName, binding);}System.out.println("[MemoryDataCenter.addBinding()] exchangeName = " + exchangeName+ "-queueName = " + queueName + "的绑定已经添加成功");}
  • 2, 获取绑定(根据交换机和队列获取绑定)
    public Binding getBinding(String exchangeName, String queueName) {ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(exchangeName);if (bindingMap == null) {return null;}return bindingMap.get(queueName);}
  • 3, 获取绑定(根据交换机, 获取该交换机的所有绑定关系)
    public ConcurrentHashMap<String, Binding> getBindings(String exchangeName){return bindingsMap.get(exchangeName);}
  • 4, 删除绑定
public void removeBinding(Binding binding) throws MQException {// 1, 先判断该交换机是否存在绑定String exchangeName = binding.getExchangeName();ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(exchangeName);if (bindingMap == null) {throw new MQException("[MemoryDataCenter.removeBinding()] exchangeName = " + exchangeName + "该交换机的绑定不存在, 删除失败");}// 2, 交换机不存在绑定才能删除bindingsMap.remove(binding.getExchangeName());System.out.println("[MemoryDataCenter.removeBinding()] exchangeName = " + exchangeName + "该交换机的绑定已删除成功");}

6, 关于消息

  • 关于 messageMap
    public void addMessage(Message message) {messageMap.put(message.getMessageId(), message);System.out.println("[MemoryDataCenter.addMessage()] 消息添加成功");}public Message getMessage(String messageId) {return messageMap.get(messageId);}public void removeMessage(String messageId){messageMap.remove(messageId);System.out.println("MemoryDataCenter.removeMessage()] 消息删除成功");}
  • 关于 messageInQueueMap
	public void sendMessage(MessageQueue queue, Message message) {LinkedList<Message> messageList = messageInQueueMap.computeIfAbsent(queue.getName(), V -> new LinkedList<>());synchronized (messageList) {messageList.add(message); // 尾插}addMessage(message);System.out.println("[MemoryDataCenter.sendMessage()] 消息发送成功");}/*** 从队列中取走消息, 但不代表消息在内存中没了, 在 messageMap 中还保留*/public Message pollMessage(MessageQueue queue) {LinkedList<Message> messageList = messageInQueueMap.get(queue.getName());if (messageList == null || messageList.size() == 0) {return null;}synchronized (messageList) {Message result = messageList.removeFirst(); // 头删System.out.println("[MemoryDataCenter.pollMessage()] 消息取出成功");return result;}}/*** 获取队列中的消息总数*/public int getMessageCount(MessageQueue queue) {LinkedList<Message> messageList = messageInQueueMap.get(queue.getName());if (messageList == null) {return 0;}return messageList.size();}
  • 关于 messageInQueueNotAckMap
	public void addMessageNotAck(String queueName, Message message) {ConcurrentHashMap<String, Message> messageNotAckMap = messageInQueueNotAckMap.computeIfAbsent(queueName,V -> new ConcurrentHashMap<>());messageNotAckMap.put(message.getMessageId(), message);System.out.println("[MemoryDataCenter.addMessageNotAck()] 未确认消息添加成功");}public void removeMessageNotAck(String queueName, String messageId) {ConcurrentHashMap<String, Message> messageNotAckMap = messageInQueueNotAckMap.get(queueName);if (messageNotAckMap == null) {return;}messageNotAckMap.remove(messageId);System.out.println("[MemoryDataCenter.removeMessageNotAck()] 未确认消息删除成功");}public Message getMessageNotAck(String queueName, String messageId) {ConcurrentHashMap<String, Message> messageNotAckMap = messageInQueueNotAckMap.get(queueName);if (messageNotAckMap == null) {return null;}return messageNotAckMap.get(messageId);}

7, 恢复数据

当服务器重启时, 需要把硬盘上的数据恢复到内存中

	public void recover(DiskDataCenter diskDataCenter) throws MQException, IOException {// 1, 清楚所有内存数据(防止残留)exchangeMap.clear();queueMap.clear();bindingsMap.clear();messageMap.clear();messageInQueueMap.clear();messageInQueueNotAckMap.clear();// 2. 从硬盘中获取数据并存储在内存// 2.1, 获取交换机数据List<Exchange> exchangeList =  diskDataCenter.selectAllExchanges();for (Exchange exchange : exchangeList) {exchangeMap.put(exchange.getName(), exchange);}// 2.2, 获取队列数据List<MessageQueue> queueList = diskDataCenter.selectAllQueue();for (MessageQueue queue : queueList) {queueMap.put(queue.getName(), queue);}// 2.3, 获取绑定数据List<Binding> bindingList = diskDataCenter.selectAllBindings();for (Binding binding : bindingList) {ConcurrentHashMap<String, Binding> bindingMap =bindingsMap.computeIfAbsent(binding.getExchangeName(), V -> new ConcurrentHashMap<>());bindingMap.put(binding.getQueueName(), binding);}// 2.4, 获取消息数据for (MessageQueue queue : queueList) {LinkedList<Message> messageList = diskDataCenter.selectAllMessages(queue.getName());messageInQueueMap.put(queue.getName(), messageList);for (Message message : messageList) {messageMap.put(message.getMessageId(), message);}}}

恢复消息数据, 只需要恢复 messageInQueueMap 和 messageMap , 不需要关心 messageInQueueNotAckMap 因为 messageInQueueNotAckMap 只是在内存中存储, 如果服务器重启, 这块数据丢失了就丢失了, 因为 “未确认的消息” 不会被服务器真正的删除, 重启之后, “未确认的消息” 会被重新加载回内存


三、小结

本文主实现了两点:

  • 1, 实现了对数据库+文件这两个模块的进一步整合, 封装, 为上层提供了硬盘数据管理的 API
  • 2, 实现了内存数据管理, 并考虑了线程安全
    • 2.1, 设计了消息在内存中存储使用的数据结构
    • 2.2, 实现了内存中的交换机, 队列, 绑定, 消息, 以及消息和队列的关联, 这些数据的增删查

至此, 服务器对于硬盘数据和内存数据都做好了管理, 下篇文章就可以设计 VirtualHost 了, 需要对硬盘数据和内存数据再做进一步的整合, 封装, 统一管理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/91974.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

26 docker前后端部署

[参考博客]((257条消息) DockerNginx部署前后端分离项目(SpringBootVue)的详细教程_在docker中安装nginx实现前后端分离_这里是杨杨吖的博客-CSDN博客) (DockerNginx部署前后端分离项目(SpringBootVue)) 安装docker # 1、yum 包更新到最新 yum update # 2、安装需要的软件包…

SEO的优化教程(百度SEO的介绍和优化)

百度SEO关键字介绍&#xff1a; 百度SEO关键字是指用户在搜索引擎上输入的词语&#xff0c;是搜索引擎了解网站内容和相关性的重要因素。百度SEO关键字可以分为短尾词、中尾词和长尾词&#xff0c;其中长尾词更具有针对性和精准性&#xff0c;更易于获得高质量的流量。蘑菇号-…

构建一个TypeScript环境的node项目

本文 我们用一种不太一样的方式来创建项目 这里 我们事先创建了一个文件夹作为项目目录 然后打开项目终端 输入 npm init然后 在新弹出的对话框中 大体就是 名字随便写一个 然后 后面的回车&#xff0c;到最后一个输入 yes 然后回车 这样 我们就有一个基础的 node项目结构了…

AGV小车、机械臂协同作业实战06-任务分配算法(图解蚁群算法)代码示例java

什么是蚁群算法&#xff1f; 蚁群系统(Ant System(AS)或Ant Colony System(ACS))是由意大利学者Dorigo、Maniezzo等人于20世纪90年代首先提出来的。他们在研究蚂蚁觅食的过程中&#xff0c;发现蚁群整体会体现一些智能的行为&#xff0c;例如蚁群可以在不同的环境下&#xff0c…

排序篇(四)----归并排序

排序篇(四)----归并排序 1.归并(递归) 基本思想&#xff1a; 归并排序&#xff08;MERGE-SORT&#xff09;是建立在归并操作上的一种有效的排序算法,该算法是采用分治法&#xff08;Divide andConquer&#xff09;的一个非常典型的应用。将已有序的子序列合并&#xff0c;得到…

Hive SQL初级练习(30题)

前言 Hive 的重要性不必多说&#xff0c;离线批处理的王者&#xff0c;Hive 用来做数据分析&#xff0c;SQL 基础必须十分牢固。 环境准备 建表语句 这里建4张表&#xff0c;下面的练习题都用这些数据。 -- 创建学生表 create table if not exists student_info(stu_id st…

yolov5分割+检测c++ qt 中部署,以opencv方式(详细代码(全)+复制可用)

1&#xff1a;版本说明&#xff1a; qt 5.12.10 opencv 4.5.3 &#xff08;yolov5模型部署要求opencv>4.5.0&#xff09; 2&#xff1a;检测的代码 yolo.h #pragma once #include<iostream> #include<cmath> #include<vector> #include <opencv2/…

毅速课堂:3D打印随形水路在小零件注塑中优势明显

小零件注塑中的冷却不均匀问题常常导致烧焦现象的发生。这主要是因为传统机加工方法无法制造出足够细小的水路&#xff0c;以适应小零件的复杂形状。而3D打印技术的引入&#xff0c;尤其是随形水路的设计&#xff0c;为解决这一问题提供了新的解决方案。 3D打印随形水路技术的优…

TS编译选项——编译TS文件同时对JS文件进行编译

一、允许对JS文件进行编译 我们在默认情况下编译TS项目时是不能编译js文件的&#xff0c;如下图中的hello.js文件并未编译到dist目录下&#xff08;这里配置了编译文件放到dist目录下&#xff09; 如果我们想要实现编译TS文件同时对JS文件进行编译&#xff0c;就需要在tsconfi…

使用U3D、pico开发VR(二)——添加手柄摇杆控制移动

一、将unity 与visual studio 相关联 1.Edit->Preference->External tool 选择相应的版本 二、手柄遥控人物转向和人物移动 1.添加Locomotion System组件 选择XR Origin&#xff1b; 2.添加Continuous Move Provider&#xff08;Action-based&#xff09;组件 1>…

编程每日一练(多语言实现)基础篇:求总数问题

文章目录 一、实例描述二、技术要点三、代码实现3.1 C 语言实现3.2 Python 语言实现3.3 Java 语言实现3.4 JavaScript 语言实现 一、实例描述 集邮爱好者把所有的邮票存放在三个集邮册中&#xff0c;在A册内存放全部的十分之二&#xff0c;在B册内存放不知道是全部的七分之几&…

MyBatis的一级缓存和二级缓存:原理和作用

MyBatis的一级缓存和二级缓存&#xff1a;原理和作用 引言 在数据库访问中&#xff0c;缓存是一种重要的性能优化手段&#xff0c;它可以减少数据库查询的次数&#xff0c;加快数据访问速度。MyBatis作为一款流行的Java持久层框架&#xff0c;提供了一级缓存和二级缓存来帮助…

基于Java的大学生就业招聘系统设计与实现(源码+lw+部署文档+讲解等)

文章目录 前言具体实现截图论文参考详细视频演示为什么选择我自己的网站自己的小程序&#xff08;小蔡coding&#xff09;有保障的售后福利 代码参考源码获取 前言 &#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师、全栈领域优质创作…

【数据结构】排序算法(一)—>插入排序、希尔排序、选择排序、堆排序

&#x1f440;樊梓慕&#xff1a;个人主页 &#x1f3a5;个人专栏&#xff1a;《C语言》《数据结构》《蓝桥杯试题》《LeetCode刷题笔记》《实训项目》 &#x1f31d;每一个不曾起舞的日子&#xff0c;都是对生命的辜负 目录 前言 1.直接插入排序 2.希尔排序 3.直接选择排…

OpenCV之分水岭算法(watershed)

Opencv 中 watershed函数原型&#xff1a; void watershed( InputArray image, InputOutputArray markers ); 第一个参数 image&#xff0c;必须是一个8bit 3通道彩色图像矩阵序列&#xff0c;第一个参数没什么要说的。关键是第二个参数 markers&#xff0c;Opencv官方文档的说…

全网最全Python系列教程(非常详细)---集合讲解(学Python入门必收藏)

&#x1f9e1;&#x1f9e1;&#x1f9e1;这篇是关于Python中集合的讲解&#xff0c;涉及到以下内容&#xff0c;欢迎点赞和收藏&#xff0c;你点赞和收藏是我更新的动力&#x1f9e1;&#x1f9e1;&#x1f9e1; 1、集合是什么&#xff1f; 2、集合应该怎么去定义&#xff1f…

搭建前端框架

在终端进入web目录&#xff0c;然后创建vuecrud工程 创建工程并引入ElementUI和axios手把手教学>传送门:VueCLI脚手架搭建

力扣 -- 718. 最长重复子数组

解题步骤&#xff1a; 参考代码&#xff1a; class Solution { public:int findLength(vector<int>& nums1, vector<int>& nums2) {int m nums1.size();int n nums2.size();//多开一行&#xff0c;多开一列vector<vector<int>> dp(m 1, ve…

Ghostscript 在 Linux 和 Windows 系统的应用与问题解决

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f984; 博客首页——&#x1f405;&#x1f43e;猫头虎的博客&#x1f390; &#x1f433; 《面试题大全专栏》 &#x1f995; 文章图文…

背诵不等于理解,深度解析大模型背后的知识储存与提取

自然语言模型的背诵 (memorization) 并不等于理解。即使模型能完整记住所有数据&#xff0c;也可能无法通过微调 (finetune) 提取这些知识&#xff0c;无法回答简单的问题。 随着模型规模的增大&#xff0c;人们开始探索大模型是如何掌握大量知识的。一种观点认为这归功于 “无…