从零手搓一个【消息队列】实现数据的硬盘管理和内存管理(线程安全)

文章目录

  • 一、硬盘管理
    • 1, 创建 DiskDataCenter 类
    • 2, init() 初始化
    • 3, 封装交换机
    • 4, 封装队列
    • 5, 关于绑定
    • 6, 关于消息
  • 二、内存管理
    • 1, 数据结构的设计
    • 2, 创建 MemoryDataCenter 类
    • 3, 关于交换机
    • 4, 关于队列
    • 5, 关于绑定
    • 6, 关于消息
    • 7, 恢复数据
  • 三、小结


创建 Spring Boot 项目, Spring Boot 2 系列版本, Java 8 , 引入 MyBatis, Lombok 依赖

提示:是正在努力进步的小菜鸟一只,如有大佬发现文章欠佳之处欢迎批评指点~ 废话不多说,直接上干货!

整体目录结构 :
在这里插入图片描述

本文主要实现 server 包


一、硬盘管理

硬盘管理包括第二篇文章中实现的 DataBaseManager 类, 实现了数据库存储(交换机, 队列, 绑定)和第三篇文章中实现的 MessageFileManager 类, 实现了文件存储(消息)

接下来使用 DiskDataCenter 这个类, 对上述两个类进一步的整合, 封装, 统一对上层提供数据库和文件操作的 API

在这里插入图片描述

整合的好处
上层代码(服务器中的 VirtualHost 在操作硬盘数据的时候, 直接调用 DiskDataCenter 类的 API 即可, 不需要关心硬盘上的数据来自数据库还是文件, 也不需要知道具体是怎么实现的)


1, 创建 DiskDataCenter 类

要有两个成员属性, 分别是 dataBaseManager 对象和 messageFileManager 对象, 我们要持有这两个类的 API, 才能进一步封装

public class DiskDataCenter {private DataBaseManager dataBaseManager = new DataBaseManager();private MessageFileManager messageFileManager = new MessageFileManager();
}

2, init() 初始化

dataBaseManager 中需要对数据库进行初始化, 所以在当前类提供一个初始化 API

    public void init() {dataBaseManager.init();}

3, 封装交换机

    public void insertExchange(Exchange exchange) {dataBaseManager.insertExchange(exchange);}public void deleteExchange(String exchangeName) {dataBaseManager.deleteExchange(exchangeName);}public List<Exchange> selectAllExchanges() {return dataBaseManager.selectAllExchanges();}

4, 封装队列

创建队列的时候要创建对应的目录( queue 目录)和文件( data 文件和 stat 文件), 同理, 删除的时候和也要删除对应的目录和文件

	public void insertQueue(MessageQueue queue) throws IOException {dataBaseManager.insertQueue(queue);messageFileManager.createQueueFiles(queue.getName());}public void deleteQueue(String queueName) throws IOException {dataBaseManager.deleteQueue(queueName);messageFileManager.deleteFiles(queueName);}public List<MessageQueue> selectAllQueue() {return dataBaseManager.selectAllQueues();}

5, 关于绑定

	public void insertBinding(Binding binding) {dataBaseManager.insertBinding(binding);}public void deleteBinding(Binding binding) {dataBaseManager.deleteBinding(binding);}public List<Binding> selectAllBindings() {return dataBaseManager.selectAllBindings();}

6, 关于消息

删除消息的时候顺便查一下 stat 文件, 如果需要GC, 就调用实现GC的方法(读一下 stat 文件并不是很耗时)

public void sendMessage(MessageQueue queue, Message message) throws IOException, MQException {messageFileManager.sendMessage(queue, message);}public void deleteMessage(MessageQueue queue, Message message) throws IOException, MQException {messageFileManager.deleteMessage(queue, message);// 这是逻辑删除, 而不是真正从文件中删掉这个数据, 并且需要判断是否需要进行垃圾回收if (messageFileManager.isNeedGC(queue.getName())) {messageFileManager.gc(queue);}}public LinkedList<Message> selectAllMessages(String queueName) throws MQException, IOException {return messageFileManager.loadAllMessageFromQueue(queueName);}

二、内存管理

硬盘上存储数据, 只是为了实现 “持久化” 这样的效果. 但是实际的消息存储/转发, 还是主要靠内存的结构

对于 MQ 来说, 内存部分是更关键的, 内存速度更快, 可以达成更⾼的并发

对于交换机, 队列, 绑定, 消息的查找, 直接在内存中找, 增加, 删除需要在管理内存的同时也管理硬盘
在这里插入图片描述


1, 数据结构的设计

需要在内存中存储交换机, 队列, 绑定, 交换机, 就需要使用合适的数据结构来辅助我们更方便的存储和管理数据, 并且还需要保证线程安全

  • 交换机
    使用 ConcurrentHashMap<String, Exchange>, key: exchangeName, value: exchange
  • 队列
    使用 ConcurrentHashMap<String, MessageQueue>, key: queueName, value: queue
  • 绑定
    使用(嵌套的) ConcurrentHashMap<String, ConcurrentHashMap<String, Binding>>, key: exchangeName, value: [key: queueName, value: binding]

生产者发布消息的时候, 要指定一个 exchangeName, 服务器需要拿着这个 exchangeName, 在绑定表里查有没有已经绑定过的队列, 如果有队列, 并且符合 bindingKey 和 routingKey 的匹配规则, 才能转发消息
绑定是从交换机和队列这两个维度建立的, 所以使用嵌套的 Map 存储绑定表

  • 消息
    使用 ConcurrentHashMap<String, Message>, key: messageId, value: message

  • 在队列里的 N 条消息
    使用 ConcurrentHashMap<String, LinkedList< Message>>, key: queueName, value: 该队列中存储消息的链表

  • 在队列里的 N 条未确认的消息( “待确认队列” )
    使用(嵌套的) ConcurrentHashMap<String, ConcurrentHashMap<String, Message>>, key: queueName, value: [key: messageId, value: message]

  1. 之前的文章说明过, 消费者消费消息采用"推"的方式, 即: 队列中有消息之后, 服务器主动推送给订阅了该队列的消费者
  2. 推送之后, 如果消费者是手动应答
  3. 在消费者还没应答之前, 服务器视为消费者还没成功消费消息, 需要备份这条消息, 所以这个嵌套的Map 相当于一个 “待确认队列”
  4. 消费者确认应答之后, 服务器再从这个 “待确认队列” 中删除该消息

在这里插入图片描述

2, 创建 MemoryDataCenter 类

public class MemoryDataCenter {private ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>();private ConcurrentHashMap<String, MessageQueue> queueMap = new ConcurrentHashMap<>();private ConcurrentHashMap<String, ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>();private ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>();private ConcurrentHashMap<String, LinkedList<Message>> messageInQueueMap = new ConcurrentHashMap<>();private ConcurrentHashMap<String, ConcurrentHashMap<String, Message>> messageInQueueNotAckMap = new ConcurrentHashMap<>();
}

3, 关于交换机

    public void addExchange(Exchange exchange) {exchangeMap.put(exchange.getName(), exchange);System.out.println("[MemoryDataCenter.addExchange()] " + exchange.getName() + "交换机添加成功");}public Exchange getExchange(String exchangeName) {return exchangeMap.get(exchangeName);}public void removeExchange(String exchangeName) {exchangeMap.remove(exchangeName);System.out.println("[MemoryDataCenter.removeExchange()] " + exchangeName + "交换机删除成功");}

4, 关于队列

	public void addQueue(MessageQueue queue) {queueMap.put(queue.getName(), queue);System.out.println("[MemoryDataCenter.addQueue()] " + queue.getName() + "队列添加成功");}public MessageQueue getQueue(String queueName) {return queueMap.get(queueName);}public void removeQueue(String queueName) {queueMap.remove(queueName);System.out.println("[MemoryDataCenter.removeQueue()] " + queueName + "队列删除成功");}

5, 关于绑定

  • 1, 添加绑定
    注意 bindingsMap 和 bindingMap 不同, bindingMap 表示 bindingsMap 的 value 值
    使用 bindingsMap.computeIfAbsent(key) 可以创建 bindingsMap 的 value 值( bindingMap )
    public void addBinding(Binding binding) throws MQException {String exchangeName = binding.getExchangeName();String queueName = binding.getQueueName();
//        ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getExchangeName());
//        if (bindingMap == null) {
//            bindingMap = new ConcurrentHashMap<>();
//            bindingsMap.put(binding.getExchangeName(), bindingMap);
//        }// 1, 先查是否存在exchange绑定的queue, 没有则创建ConcurrentHashMap<String, Binding> bindingMap =bindingsMap.computeIfAbsent(exchangeName, V -> new ConcurrentHashMap<>());// 2, 再查exchange和queue是否已经存在绑定, 没有创建synchronized (bindingMap) {if (bindingMap.get(exchangeName) != null) {// 如果 exchange-queue 这个绑定已经存在, 不能再插入throw new MQException("[MemoryDataCenter.addBinding()] exchangeName = " + queueName+ "-queueName = " + queueName + "的绑定已经存在, 添加失败");}bindingMap.put(queueName, binding);}System.out.println("[MemoryDataCenter.addBinding()] exchangeName = " + exchangeName+ "-queueName = " + queueName + "的绑定已经添加成功");}
  • 2, 获取绑定(根据交换机和队列获取绑定)
    public Binding getBinding(String exchangeName, String queueName) {ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(exchangeName);if (bindingMap == null) {return null;}return bindingMap.get(queueName);}
  • 3, 获取绑定(根据交换机, 获取该交换机的所有绑定关系)
    public ConcurrentHashMap<String, Binding> getBindings(String exchangeName){return bindingsMap.get(exchangeName);}
  • 4, 删除绑定
public void removeBinding(Binding binding) throws MQException {// 1, 先判断该交换机是否存在绑定String exchangeName = binding.getExchangeName();ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(exchangeName);if (bindingMap == null) {throw new MQException("[MemoryDataCenter.removeBinding()] exchangeName = " + exchangeName + "该交换机的绑定不存在, 删除失败");}// 2, 交换机不存在绑定才能删除bindingsMap.remove(binding.getExchangeName());System.out.println("[MemoryDataCenter.removeBinding()] exchangeName = " + exchangeName + "该交换机的绑定已删除成功");}

6, 关于消息

  • 关于 messageMap
    public void addMessage(Message message) {messageMap.put(message.getMessageId(), message);System.out.println("[MemoryDataCenter.addMessage()] 消息添加成功");}public Message getMessage(String messageId) {return messageMap.get(messageId);}public void removeMessage(String messageId){messageMap.remove(messageId);System.out.println("MemoryDataCenter.removeMessage()] 消息删除成功");}
  • 关于 messageInQueueMap
	public void sendMessage(MessageQueue queue, Message message) {LinkedList<Message> messageList = messageInQueueMap.computeIfAbsent(queue.getName(), V -> new LinkedList<>());synchronized (messageList) {messageList.add(message); // 尾插}addMessage(message);System.out.println("[MemoryDataCenter.sendMessage()] 消息发送成功");}/*** 从队列中取走消息, 但不代表消息在内存中没了, 在 messageMap 中还保留*/public Message pollMessage(MessageQueue queue) {LinkedList<Message> messageList = messageInQueueMap.get(queue.getName());if (messageList == null || messageList.size() == 0) {return null;}synchronized (messageList) {Message result = messageList.removeFirst(); // 头删System.out.println("[MemoryDataCenter.pollMessage()] 消息取出成功");return result;}}/*** 获取队列中的消息总数*/public int getMessageCount(MessageQueue queue) {LinkedList<Message> messageList = messageInQueueMap.get(queue.getName());if (messageList == null) {return 0;}return messageList.size();}
  • 关于 messageInQueueNotAckMap
	public void addMessageNotAck(String queueName, Message message) {ConcurrentHashMap<String, Message> messageNotAckMap = messageInQueueNotAckMap.computeIfAbsent(queueName,V -> new ConcurrentHashMap<>());messageNotAckMap.put(message.getMessageId(), message);System.out.println("[MemoryDataCenter.addMessageNotAck()] 未确认消息添加成功");}public void removeMessageNotAck(String queueName, String messageId) {ConcurrentHashMap<String, Message> messageNotAckMap = messageInQueueNotAckMap.get(queueName);if (messageNotAckMap == null) {return;}messageNotAckMap.remove(messageId);System.out.println("[MemoryDataCenter.removeMessageNotAck()] 未确认消息删除成功");}public Message getMessageNotAck(String queueName, String messageId) {ConcurrentHashMap<String, Message> messageNotAckMap = messageInQueueNotAckMap.get(queueName);if (messageNotAckMap == null) {return null;}return messageNotAckMap.get(messageId);}

7, 恢复数据

当服务器重启时, 需要把硬盘上的数据恢复到内存中

	public void recover(DiskDataCenter diskDataCenter) throws MQException, IOException {// 1, 清楚所有内存数据(防止残留)exchangeMap.clear();queueMap.clear();bindingsMap.clear();messageMap.clear();messageInQueueMap.clear();messageInQueueNotAckMap.clear();// 2. 从硬盘中获取数据并存储在内存// 2.1, 获取交换机数据List<Exchange> exchangeList =  diskDataCenter.selectAllExchanges();for (Exchange exchange : exchangeList) {exchangeMap.put(exchange.getName(), exchange);}// 2.2, 获取队列数据List<MessageQueue> queueList = diskDataCenter.selectAllQueue();for (MessageQueue queue : queueList) {queueMap.put(queue.getName(), queue);}// 2.3, 获取绑定数据List<Binding> bindingList = diskDataCenter.selectAllBindings();for (Binding binding : bindingList) {ConcurrentHashMap<String, Binding> bindingMap =bindingsMap.computeIfAbsent(binding.getExchangeName(), V -> new ConcurrentHashMap<>());bindingMap.put(binding.getQueueName(), binding);}// 2.4, 获取消息数据for (MessageQueue queue : queueList) {LinkedList<Message> messageList = diskDataCenter.selectAllMessages(queue.getName());messageInQueueMap.put(queue.getName(), messageList);for (Message message : messageList) {messageMap.put(message.getMessageId(), message);}}}

恢复消息数据, 只需要恢复 messageInQueueMap 和 messageMap , 不需要关心 messageInQueueNotAckMap 因为 messageInQueueNotAckMap 只是在内存中存储, 如果服务器重启, 这块数据丢失了就丢失了, 因为 “未确认的消息” 不会被服务器真正的删除, 重启之后, “未确认的消息” 会被重新加载回内存


三、小结

本文主实现了两点:

  • 1, 实现了对数据库+文件这两个模块的进一步整合, 封装, 为上层提供了硬盘数据管理的 API
  • 2, 实现了内存数据管理, 并考虑了线程安全
    • 2.1, 设计了消息在内存中存储使用的数据结构
    • 2.2, 实现了内存中的交换机, 队列, 绑定, 消息, 以及消息和队列的关联, 这些数据的增删查

至此, 服务器对于硬盘数据和内存数据都做好了管理, 下篇文章就可以设计 VirtualHost 了, 需要对硬盘数据和内存数据再做进一步的整合, 封装, 统一管理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/91974.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

26 docker前后端部署

[参考博客]((257条消息) DockerNginx部署前后端分离项目(SpringBootVue)的详细教程_在docker中安装nginx实现前后端分离_这里是杨杨吖的博客-CSDN博客) (DockerNginx部署前后端分离项目(SpringBootVue)) 安装docker # 1、yum 包更新到最新 yum update # 2、安装需要的软件包…

SEO的优化教程(百度SEO的介绍和优化)

百度SEO关键字介绍&#xff1a; 百度SEO关键字是指用户在搜索引擎上输入的词语&#xff0c;是搜索引擎了解网站内容和相关性的重要因素。百度SEO关键字可以分为短尾词、中尾词和长尾词&#xff0c;其中长尾词更具有针对性和精准性&#xff0c;更易于获得高质量的流量。蘑菇号-…

构建一个TypeScript环境的node项目

本文 我们用一种不太一样的方式来创建项目 这里 我们事先创建了一个文件夹作为项目目录 然后打开项目终端 输入 npm init然后 在新弹出的对话框中 大体就是 名字随便写一个 然后 后面的回车&#xff0c;到最后一个输入 yes 然后回车 这样 我们就有一个基础的 node项目结构了…

AGV小车、机械臂协同作业实战06-任务分配算法(图解蚁群算法)代码示例java

什么是蚁群算法&#xff1f; 蚁群系统(Ant System(AS)或Ant Colony System(ACS))是由意大利学者Dorigo、Maniezzo等人于20世纪90年代首先提出来的。他们在研究蚂蚁觅食的过程中&#xff0c;发现蚁群整体会体现一些智能的行为&#xff0c;例如蚁群可以在不同的环境下&#xff0c…

排序篇(四)----归并排序

排序篇(四)----归并排序 1.归并(递归) 基本思想&#xff1a; 归并排序&#xff08;MERGE-SORT&#xff09;是建立在归并操作上的一种有效的排序算法,该算法是采用分治法&#xff08;Divide andConquer&#xff09;的一个非常典型的应用。将已有序的子序列合并&#xff0c;得到…

Hive SQL初级练习(30题)

前言 Hive 的重要性不必多说&#xff0c;离线批处理的王者&#xff0c;Hive 用来做数据分析&#xff0c;SQL 基础必须十分牢固。 环境准备 建表语句 这里建4张表&#xff0c;下面的练习题都用这些数据。 -- 创建学生表 create table if not exists student_info(stu_id st…

rabbimq之java.net.SocketException: Connection reset与MissedHeartbeatException分析

一、前言 在android前端中接入了rabbitmq消息队列来处理业务&#xff0c;在手机网络环境错综复杂&#xff0c;网络信号不稳定&#xff0c;可能导致mq的频繁断开与连接&#xff0c;在日志中&#xff0c;发现有很多这样的日志&#xff0c;java.net.SocketException: Connection …

yolov5分割+检测c++ qt 中部署,以opencv方式(详细代码(全)+复制可用)

1&#xff1a;版本说明&#xff1a; qt 5.12.10 opencv 4.5.3 &#xff08;yolov5模型部署要求opencv>4.5.0&#xff09; 2&#xff1a;检测的代码 yolo.h #pragma once #include<iostream> #include<cmath> #include<vector> #include <opencv2/…

【QandA C++】内存分段和内存分页等重点知识汇总

目录 内存分段 内存分页 内存分段 程序是由若干个逻辑分段组成的&#xff0c;如可由代码分段、数据分段、栈段、堆段组成。不同的段是有不同的属性的&#xff0c;所以就用分段的形式把这些段分离出来。 分段机制下&#xff0c;虚拟地址和物理地址是如何映射的&#xff1f; …

毅速课堂:3D打印随形水路在小零件注塑中优势明显

小零件注塑中的冷却不均匀问题常常导致烧焦现象的发生。这主要是因为传统机加工方法无法制造出足够细小的水路&#xff0c;以适应小零件的复杂形状。而3D打印技术的引入&#xff0c;尤其是随形水路的设计&#xff0c;为解决这一问题提供了新的解决方案。 3D打印随形水路技术的优…

TS编译选项——编译TS文件同时对JS文件进行编译

一、允许对JS文件进行编译 我们在默认情况下编译TS项目时是不能编译js文件的&#xff0c;如下图中的hello.js文件并未编译到dist目录下&#xff08;这里配置了编译文件放到dist目录下&#xff09; 如果我们想要实现编译TS文件同时对JS文件进行编译&#xff0c;就需要在tsconfi…

列出使用Typescript的一些优点?

使用Typescript有以下优点&#xff1a; 类型安全&#xff1a;Typescript是一种静态类型语言&#xff0c;它要求在编码阶段明确定义变量和函数的类型。这种类型安全可以减少在运行时出现错误的可能性&#xff0c;并提高代码的可读性和可维护性。代码可读性和可维护性&#xff1…

使用U3D、pico开发VR(二)——添加手柄摇杆控制移动

一、将unity 与visual studio 相关联 1.Edit->Preference->External tool 选择相应的版本 二、手柄遥控人物转向和人物移动 1.添加Locomotion System组件 选择XR Origin&#xff1b; 2.添加Continuous Move Provider&#xff08;Action-based&#xff09;组件 1>…

Android - kts文件配置应用签名

升级最新的AndroidStudio后&#xff0c;gradle配置文件从Groovy 迁移到 KTS&#xff0c;这里把自己配置应用签名遇到的问题及注意事项分享下。 Google官方说明地址将 build 配置从 Groovy 迁移到 KTS 配置后的代码如下&#xff1a; signingConfigs {create("keyStore&q…

PHP 反序列化漏洞:手写序列化文本

文章目录 参考环境序列化文本Scalar Type整数浮点数布尔值字符串 Compound Type数组数据结构序列化文本 对象数据结构序列化文本 Special TypeNULL数据结构序列化文本 手写序列化文本过程中的注意事项个数描述须于现实相符序列化文本前缀的大小写变化符号公共属性 参考 项目描…

编程每日一练(多语言实现)基础篇:求总数问题

文章目录 一、实例描述二、技术要点三、代码实现3.1 C 语言实现3.2 Python 语言实现3.3 Java 语言实现3.4 JavaScript 语言实现 一、实例描述 集邮爱好者把所有的邮票存放在三个集邮册中&#xff0c;在A册内存放全部的十分之二&#xff0c;在B册内存放不知道是全部的七分之几&…

MyBatis的一级缓存和二级缓存:原理和作用

MyBatis的一级缓存和二级缓存&#xff1a;原理和作用 引言 在数据库访问中&#xff0c;缓存是一种重要的性能优化手段&#xff0c;它可以减少数据库查询的次数&#xff0c;加快数据访问速度。MyBatis作为一款流行的Java持久层框架&#xff0c;提供了一级缓存和二级缓存来帮助…

基于Java的大学生就业招聘系统设计与实现(源码+lw+部署文档+讲解等)

文章目录 前言具体实现截图论文参考详细视频演示为什么选择我自己的网站自己的小程序&#xff08;小蔡coding&#xff09;有保障的售后福利 代码参考源码获取 前言 &#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师、全栈领域优质创作…

【数据结构】排序算法(一)—>插入排序、希尔排序、选择排序、堆排序

&#x1f440;樊梓慕&#xff1a;个人主页 &#x1f3a5;个人专栏&#xff1a;《C语言》《数据结构》《蓝桥杯试题》《LeetCode刷题笔记》《实训项目》 &#x1f31d;每一个不曾起舞的日子&#xff0c;都是对生命的辜负 目录 前言 1.直接插入排序 2.希尔排序 3.直接选择排…

Synchronized 原 理

Synchronized 其 原 理 是 什 么 ? synchronized 是 Java 中实现互斥同步的一种机制。当查看被 synchronized 修饰的代码块编译后的字节码,会发现编译器生成了 monitorenter 和 monitorexit 两个字节码指令。 这两个指令的作用如下: monitorenter:当虚拟机执行到 monitor…