浅谈如何自我实现一个消息队列服务器(2)——实现 broker server 服务器

文章目录

  • 一、实现 broker server 服务器
    • 1.1 创建一个SpringBoot项目
    • 1.2 创建Java类
  • 二、硬盘持久化存储 broker server 里的数据
    • 2.1 数据库存储
      • 2.1.1 浅谈SQLite
        • MyBatis
      • 2.1.2 如何使用SQLite
    • 2.2 使用DataBaseManager类封装数据库操作
    • 2.3 文件存储消息
      • 2.3.1 存储消息时,按队列维度展开
        • 2.3.1.1 逻辑删除
        • 2.3.1.2 GC(垃圾回收)
          • 2.3.1.2.1 使用复制算法实现GC
        • 2.3.1.3 文件存储消息的扩展内容(该功能暂未实现)
          • 2.3.1.3.1 文件拆分
          • 2.3.1.3.2 文件合并
    • 2.4 使用类MessageFileManager封装文件存储操作
  • 三、将broker server 里的数据存储在内存上

一、实现 broker server 服务器

上次我们提到生产者-消费者模型中,最重要的是实现broker server(服务器模块)。 为什么最重要的是实现 broker server 呢?broker server 作为一个通用服务器,它可以给任何具有数据存储、转发需求的客户端服务器使用。就像现在的数据库程序一样,不论开发者实现什么业务,只要该业务中涉及数据的持久化存储,就需要使用数据库。

broker server 中管理着许多重要的概念(虚拟主机、交换机、队列、消息、绑定),这些概念的数据组织是实现 broker server 的关键。我们 以 如何使用代码实现一个 broker server 为核心,对 broker server 的实现进行详细介绍。

1.1 创建一个SpringBoot项目

之前的博客有介绍过如何使用IDEA创建一个 SpringBoot 项目,可以点击此处链接浅谈如何创建一个SpringBoot项目进行学习。

创建好一个SpringBoot项目之后,我们在pom.xml文件中引入SQLite的依赖,
在这里插入图片描述

再在配置文件application.yaml中针对SQLite、MyBatis的使用进行配置。

在这里插入图片描述

1.2 创建Java类

我们为 broker server 的实现创建一个包mqServer(这只是一个包名,可以自己随意进行自定义):
在这里插入图片描述

再在mqServer包下创建一个core包,core包里存放交换机、队列、消息、绑定这些概念的实体类:
在这里插入图片描述
交换机类(Exchange):
在这里插入图片描述

/** 这个类表示一个交换机*/
public class Exchange {// 此处使用 name 来作为交换机的身份标识. (唯一的)private String name;// 交换机类型, DIRECT, FANOUT, TOPICprivate ExchangeType type = ExchangeType.DIRECT;// 该交换机是否要持久化存储. true 表示需要持久化; false 表示不必持久化.private boolean durable = false;// 如果当前交换机, 没人使用了, 就会自动被删除.// 这个属性暂时先列在这里, 后续的代码中并没有真的实现这个自动删除功能~~ (RabbitMQ 是有的)private boolean autoDelete = false;// arguments 表示的是创建交换机时指定的一些额外的参数选项. 后续代码中并没有真的实现对应的功能, 先列出来. (RabbitMQ 也是有的)// 为了把这个 arguments 存到数据库中, 就需要把 Map 转成 json 格式的字符串.private Map<String, Object> arguments = new HashMap<>();public String getName() {return name;}public void setName(String name) {this.name = name;}public ExchangeType getType() {return type;}public void setType(ExchangeType type) {this.type = type;}public boolean isDurable() {return durable;}public void setDurable(boolean durable) {this.durable = durable;}public boolean isAutoDelete() {return autoDelete;}public void setAutoDelete(boolean autoDelete) {this.autoDelete = autoDelete;}// 这里的 get set 用于和数据库交互使用.public String getArguments() {// 是把当前的 arguments 参数, 从 Map 转成 String (JSON)ObjectMapper objectMapper = new ObjectMapper();try {return objectMapper.writeValueAsString(arguments);} catch (JsonProcessingException e) {e.printStackTrace();}// 如果代码真异常了, 返回一个空的 json 字符串就 okreturn "{}";}// 这个方法, 是从数据库读数据之后, 构造 Exchange 对象, 会自动调用到public void setArguments(String argumentsJson) {// 把参数中的 argumentsJson 按照 JSON 格式解析, 转成// 上述的 Map 对象ObjectMapper objectMapper = new ObjectMapper();try {this.arguments = objectMapper.readValue(argumentsJson, new TypeReference<HashMap<String, Object>>() {});} catch (JsonProcessingException e) {e.printStackTrace();}}// 在这里针对 arguments, 再提供一组 getter setter , 用来去更方便的获取/设置这里的键值对.// 这一组在 java 代码内部使用 (比如测试的时候)public Object getArguments(String key) {return arguments.get(key);}public void setArguments(String key, Object value) {arguments.put(key, value);}public void setArguments(Map<String, Object> arguments) {this.arguments = arguments;}
}

交换机类型 类(ExchangeType):
这个类是个枚举类,作为Exchange类的成员变量。

public enum ExchangeType {DIRECT(0),FANOUT(1),TOPIC(2);private final int type;private ExchangeType(int type) {this.type = type;}public int getType() {return type;}
}

队列类(MSGQueue):
在这里插入图片描述

/** 这个类表示一个存储消息的队列* MSG => Message*/
public class MSGQueue {// 表示队列的身份标识.private String name;// 表示队列是否持久化, true 表示持久化保存, false 表示不持久化.private boolean durable = false;// 这个属性为 true, 表示这个队列只能被一个消费者使用(别人用不了). 如果为 false 则是大家都能使用// 这个 独占 功能, 也是先把字段列在这里, 具体的独占功能暂时先不实现.private boolean exclusive = false;// 为 true 表示没有人使用之后, 就自动删除. false 则是不会自动删除.// 这个 自动删除 功能, 也是先把字段列在这里, 具体的独占功能暂时先不实现.private boolean autoDelete = false;// 也是表示扩展参数. 当前也是先列在这里, 先暂时不实现private Map<String, Object> arguments = new HashMap<>();/*** 因为消息是存在队列里的,每个队列下面都有相关的两个消息文件,生产者生产出消息之后,将* 消息投递给了broker server 服务器,消费者通过服务器订阅消息,消费者订阅到消息之后* 消费者具体想要使用这些消息做些什么,我们不知道,这得由消费者自己决定,所以消费者使用回调函数* Consumer之后,使用里面的抽象方法handleDelivery自己决定自己想要将此消息用来做些什么(重写)*** 因此在MSGQueue类里面,我们需要定义一个具有ConsumerEnv类型的List,* 表示当前队列里的消息都有哪些消费者订阅了*/// 当前队列都有哪些消费者订阅了.private List<ConsumerEnv> consumerEnvList = new ArrayList<>();// 记录当前取到了第几个消费者. 方便实现轮询策略.
//    压根没懂这个 todoprivate AtomicInteger consumerSeq = new AtomicInteger(0);
//    添加一个新的订阅者public void addConsumerEnv(ConsumerEnv consumerEnv){synchronized (this){consumerEnvList.add(consumerEnv);}}//    删除一个订阅者//    挑选一个订阅者,用来处理当前的消息(按照轮询的方式)public ConsumerEnv chooseConsumer(){if(consumerEnvList.size() == 0){
//            当前队列没人订阅return null;}//        压根没懂这个 todoint index = consumerSeq.get() % consumerEnvList.size();consumerSeq.getAndIncrement();return consumerEnvList.get(index);}public String getName() {return name;}public void setName(String name) {this.name = name;}public boolean isDurable() {return durable;}public void setDurable(boolean durable) {this.durable = durable;}public boolean isExclusive() {return exclusive;}public void setExclusive(boolean exclusive) {this.exclusive = exclusive;}public boolean isAutoDelete() {return autoDelete;}public void setAutoDelete(boolean autoDelete) {this.autoDelete = autoDelete;}public String getArguments() {ObjectMapper objectMapper = new ObjectMapper();try {return objectMapper.writeValueAsString(arguments);} catch (JsonProcessingException e) {e.printStackTrace();}return "{}";}public void setArguments(String argumentsJson) {ObjectMapper objectMapper = new ObjectMapper();try {this.arguments = objectMapper.readValue(argumentsJson, new TypeReference<HashMap<String, Object>>() {});} catch (JsonProcessingException e) {e.printStackTrace();}}public Object getArguments(String key) {return arguments.get(key);}public void setArguments(String key, Object value) {arguments.put(key, value);}public void setArguments(Map<String, Object> arguments) {this.arguments = arguments;}
}

绑定类(Binding):
在这里插入图片描述

/** 表示队列和交换机之间的关联关系*/
public class Binding {private String exchangeName;private String queueName;// bindingKey 就是在出题, 要求领红包的人要画个 "桌子" 出来~~private String bindingKey;// Binding 这个东西, 依附于 Exchange 和 Queue 的!!!// 比如, 对于持久化来说, 如果 Exchange 和 Queue 任何一个都没有持久化,// 此时你针对 Binding 持久化是没有任何意义的public String getExchangeName() {return exchangeName;}public void setExchangeName(String exchangeName) {this.exchangeName = exchangeName;}public String getQueueName() {return queueName;}public void setQueueName(String queueName) {this.queueName = queueName;}public String getBindingKey() {return bindingKey;}public void setBindingKey(String bindingKey) {this.bindingKey = bindingKey;}
}

消息类(Message):
在这里插入图片描述

/** 表示一个要传递的消息*/
public class Message implements Serializable {// 这两个属性是 Message 最核心的部分.private BasicProperties basicProperties = new BasicProperties();private byte[] body;// 下面的属性则是辅助用的属性.// Message 后续会存储到文件中(如果持久化的话).// 一个文件中会存储很多的消息. 如何找到某个消息, 在文件中的具体位置呢?// 使用下列的两个偏移量来进行表示. [offsetBeg, offsetEnd)// 这俩属性并不需要被序列化保存到文件中~~ 此时消息一旦被写入文件之后, 所在的位置就固定了. 并不需要单独存储.// 这俩属性存在的目的, 主要就是为了让内存中的 Message 对象, 能够快速找到对应的硬盘上的 Message 的位置.private transient long offsetBeg = 0;  // 消息数据的开头距离文件开头的位置偏移(字节)private transient long offsetEnd = 0;  // 消息数据的结尾距离文件开头的位置偏移(字节)// 使用这个属性表示该消息在文件中是否是有效消息. (针对文件中的消息, 如果删除, 使用逻辑删除的方式)// 0x1 表示有效. 0x0 表示无效.private byte isValid = 0x1;// 创建一个工厂方法, 让工厂方法帮我们封装一下创建 Message 对象的过程.// 这个方法中创建的 Message 对象, 会自动生成唯一的 MessageId// 万一 routingKey 和 basicProperties 里的 routingKey 冲突, 以外面的为主.public static Message createMessageWithId(String routingKey, BasicProperties basicProperties, byte[] body) {Message message = new Message();if (basicProperties != null) {message.setBasicProperties(basicProperties);}// 此处生成的 MessageId 以 M- 作为前缀.message.setMessageId("M-" + UUID.randomUUID());message.setRoutingKey(routingKey);message.body = body;// 此处是把 body 和 basicProperties 先设置出来. 他俩是 Message 的核心内容.// 而 offsetBeg, offsetEnd, isValid, 则是消息持久化的时候才会用到. 在把消息写入文件之前再进行设定.// 此处只是在内存中创建一个 Message 对象.return message;}public String getMessageId() {return basicProperties.getMessageId();}public void setMessageId(String messageId) {basicProperties.setMessageId(messageId);}public String getRoutingKey() {return basicProperties.getRoutingKey();}public void setRoutingKey(String routingKey) {basicProperties.setRoutingKey(routingKey);}public int getDeliverMode() {return basicProperties.getDeliverMode();}public void setDeliverMode(int mode) {basicProperties.setDeliverMode(mode);}public BasicProperties getBasicProperties() {return basicProperties;}public void setBasicProperties(BasicProperties basicProperties) {this.basicProperties = basicProperties;}public byte[] getBody() {return body;}public void setBody(byte[] body) {this.body = body;}public long getOffsetBeg() {return offsetBeg;}public void setOffsetBeg(long offsetBeg) {this.offsetBeg = offsetBeg;}public long getOffsetEnd() {return offsetEnd;}public void setOffsetEnd(long offsetEnd) {this.offsetEnd = offsetEnd;}public byte getIsValid() {return isValid;}public void setIsValid(byte isValid) {this.isValid = isValid;}@Overridepublic String toString() {return "Message{" +"basicProperties=" + basicProperties +", body=" + Arrays.toString(body) +", offsetBeg=" + offsetBeg +", offsetEnd=" + offsetEnd +", isValid=" + isValid +'}';}
}

BasicProperties 类(是Message类的成员变量):

public class BasicProperties implements Serializable {// 消息的唯一身份标识. 此处为了保证 id 的唯一性, 使用 UUID 来作为 message idprivate String messageId;// 是一个消息上带有的内容, 和 bindingKey 做匹配.// 如果当前的交换机类型是 DIRECT, 此时 routingKey 就表示要转发的队列名.// 如果当前的交换机类型是 FANOUT, 此时 routingKey 无意义(不使用).// 如果当前的交换机类型是 TOPIC, 此时 routingKey 就要和 bindingKey 做匹配. 符合要求的才能转发给对应队列.private String routingKey;// 这个属性表示消息是否要持久化. 1 表示不持久化, 2 表示持久化. (RabbitMQ 就是这样搞的....)private int deliverMode = 1;// 其实针对 RabbitMQ 来说, BasicProperties 里面还有很多别的属性. 其他的属性暂时先不考虑了.public String getMessageId() {return messageId;}public void setMessageId(String messageId) {this.messageId = messageId;}public String getRoutingKey() {return routingKey;}public void setRoutingKey(String routingKey) {this.routingKey = routingKey;}public int getDeliverMode() {return deliverMode;}public void setDeliverMode(int deliverMode) {this.deliverMode = deliverMode;}@Overridepublic String toString() {return "BasicProperties{" +"messageId='" + messageId + '\'' +", routingKey='" + routingKey + '\'' +", deliverMode=" + deliverMode +'}';}
}

二、硬盘持久化存储 broker server 里的数据

上述我们创建了 交换机、队列、绑定、消息 的实体Java类,这些概念的数据需要在内存、硬盘上各存一份。所以此时我们首先来讨论 交换机(Exchange)、队列(MSGQueue)、消息(Message)、绑定(Binding) 他们以何种方式存储在硬盘上。 至于他们在内存中以什么样的数据结构进行存储,我们后续再讨论。

2.1 数据库存储

对于交换机、队列、绑定来说,由于他们的数据量没有消息多,同时他们还需经常性进行 增删改查 操作,因此他们使用数据库进行数据的持久化存储。一般我们都是使用MySQL进行数据存储,但是由于MySQL是个客户端-服务器结构的程序,它本身具有一定的重量。在这里,我们为了简便,采用轻量的SQLite数据库进行数据存储。

2.1.1 浅谈SQLite

可能很多同学一开始学习使用数据库进行数据存储时,接触到的就是SQL Server、MySQL、Oracle这些数据库(包括我寄自),没怎么听说过SQLite。

但SQLite的应用也很广泛。

SQLite很轻量,它是一个可执行文件exe,主要在一些性能不高的设备上使用,尤其是在移动端(手机、APP)和嵌入式设备(投影仪、冰箱、交换机、路由器…),Android系统就内置了SQLite。虽然SQLite 轻量,但其具有的功能不输MySQL,其sql语句使用起来与MySQL基本无异,也支持通过MyBatis这样的框架来使用。(学习过安卓开发课程的同学肯定知道SQLite)。

SQLite 是一个本地数据库,把数据存储在当前硬盘的某个指定文件中,其无法跨网络使用,这个数据库相当于直接操作本地的硬盘文件,其将每个数据库database抽取成一个单独文件。

MyBatis

在介绍如何使用SQLite之前,先了解一下如何使用MyBatis吧。因为我发现有些同学还不太理解如何使用MyBatis操作数据库,当然了,掌握了MyBatis的同学可以直接跳过此处内容科普。

MyBatis是Spring框架集成的一个操作数据库的框架,以前我们想要在项目中连接数据库来进行数据的存储,使用的是JDBC,但是使用JDBC将后端代码与数据库连接时,发现JDBC其代码量较大,代码重复率高,同时sql语句以及Java代码都杂揉在了一起,代码显得不太优雅。因此现在Java项目中主流使用MyBatis操作数据库。

使用MyBatis操作数据库的流程:
1、一般是在项目中创建一个名叫mapper的包,在该包下创建interface(interface的数量由自己项目的业务代码决定),描述有哪些方法要给Java代码使用。

2、创建对应的xml文件,通过xml文件来实现上述interface中的抽象方法。
在这里插入图片描述
如果觉得MyBatis这里讲得比较抽象,不理解,我再找个时间专门出一篇关于MyBatis详解的博客。

2.1.2 如何使用SQLite

1、在Java中使用SQLite,无需安装以及下载任何东西。直接使用Maven,将SQLite的依赖引进pom.xml文件中即可,此时Maven会自动从中央仓库加载SQLite的jar包和dll(动态库)。
在这里插入图片描述
SQLite的依赖:

<dependency><groupId>org.xerial</groupId><artifactId>sqlite-jdbc</artifactId><version>3.41.0.1</version></dependency>

2、我们还需要在项目的配置文件application.yaml中,针对SQLite、MyBatis进行配置:
在这里插入图片描述
此时,我们就可以在项目中使用数据库SQLite进行数据持久化存储了。

在将 交换机、队列、绑定 持久化存储在数据库时,有一个需要注意的点:交换机类、队列类,他们都有一个Map类型的成员变量arguments。
在这里插入图片描述
由于数据库存储数据时,不支持Map类型的数据,因此当我们需要将Map类型的arguments变量在数据库中存储时,Map类型的arguments在数据库中就需要使用字符串类型表示。
在这里插入图片描述
我们知道MyBatis操作数据库进行写数据操作时,是调用对象属性对应的getter方法,将从getter方法获取到的属性的值写入数据库中;数据库进行读数据操作时,是调用对象的对应属性的setter方法,将从数据库获取到的值赋值给对应属性。

那么针对arguments这个变量,其数据类型在对象中是Map类型,但在数据库中其数据类型是字符串,那么如果数据库想要进行写操作时,就需要通过arguments变量的getter方法获取到一个String(字符串)类型的arguments,此时获取到的arguments类型才会与数据库中的字符串类型arguments相匹配,才能正确的进行数据库的写操作。但是由于在对象中的arguments它的数据类型是Map类型,此时对象中提供的getArguments()方法,其返回值类型是Map类型,当数据库调用这个getter方法时,数据库获取到的是一个Map类型的arguments,由于arguments在数据库中的存储类型是字符串,因此此时数据类型不匹配,获取到的Map类型的arguments无法存入数据库,就会出错。

所以我们需要在交换机类、队列类中里再提供一个返回值类型是字符串(String)的getArguments()方法,在该getter方法内部,将对象中是Map类型的arguments转化成字符串类型的arguments,此时数据库进行写数据时,调用getter方法将获取到的argumrnts是字符串类型的了,此时就可以将此字符串类型的argumrnts数据写入数据库中了,不会出错。

在这里插入图片描述

如果数据库想要进行读操作,就需要通过arguments变量的setter方法将从数据库中获取到的字符串类型的arguments设置给对象中的是Map类型的arguments,此时由于从数据库中获取到的arguments值是字符串类型的,而对象中的arguments是Map类型的,因此此时数据类型不匹配,无法设置,会出错。

所以我们需要在 交换机类、队列类中提供再提供一个参数是字符串类型的arguments,该setter方法内部使用JSON将字符串类型的arguments转化成一个Map类型的arguments,当数据库进行读操作时,就会调用这个setArguments(String arguments) 方法,就从数据库中获取到的字符串类型的arguments通过setter方法将其转换成Map类型的arguments,此时就不会出错了。

在这里插入图片描述

2.2 使用DataBaseManager类封装数据库操作

在我们实现 broker server 的代码中,进行数据库操作的类主要有 mqserver 下的mapper包下的MetaMapper接口,此时我们在mqserver包下新创建一个datacenter的包,datacenter包下创建一个DataBaseManager类,在该类中对数据库操作进行整合和调用。

该类中会定义一个初始化方法 init(),使用该方法进行数据库初始化:在 broker server 启动时进行逻辑判定,如果数据库已经存在,数据库表也已经存在,此时我们就不创建数据库了,只打印一个日志提醒数据库已经存在;如果数据库不存在,此时我们就创建数据库、创建表、插入一些默认的数据,同时打印日志提醒数据库初始化完成!

进行此逻辑判定是十分必要的,因为如果将 beoker server 部署到 一台新机器上,此时数据库是空的,所以需要进行建库、建表、插入默认数据等一系列操作;如果将 broker server 部署到 一台旧机器上,由于原先已经存在数据库,此时不再需要进行建库、建表、插入默认数据操作。

由于DataBaseManager类是进行数据库的封装,MataMapper中含有一系列数据库操作的方法,因此DataBaseManager类中需要引入MetaMapper实例。

DataBaseManager类中除了 metaMapper实例,init()初始化数据库方法外,还需要封装MetaMapper接口中的一系列操作数据库的方法,同时还有一个删除数据库的方法。

在这里插入图片描述
封装数据库操作这里还有一个需要注意的点:
**有没有发现引入DataaBaseManager类中的metaMapper实例是空引用?**我们从Spring框架中拿到Bean,一般都是使用注解@Autowired,但是此时我们并没有使用注解将MetaMapper的Bean拿到,所以此时写在DataBaseManager类中的metaMapper是个空对象。

那么有同学可能觉得,那就加个注解不就行了。但是只在MetaMapper对象上加还不行,外面使用它的类DataBaseManager也需要加上五大注解注册到Spring中,此时metaMapper对象才有效。

但是我们这里由于业务逻辑,打算自己管理这个类,因此并不打算将DataBaseManager这个类交给Spring管理。因此此时我们需要手动将MetaMapper类的Bean构造出来:
1、找到项目的入口启动类,在里面添加一个静态成员变量。
在这里插入图片描述
入口启动类:打开项目的src目录,点击main目录之后出现的XXApplication(是一个蓝色的带着C的图标),每个项目由于项目命名不同,其入口启动类都有差别,以自己的为准。

2、点击入口启动类,在类里面添加一个静态的成员变量context,同时在main方法中使用静态成员context接收run方法的返回值:
在这里插入图片描述

3、回到DataBaseManager类里的数据库初始化方法init(),对metaMapper对象进行手动获取:

metaMapper = MqApplication.context.getBean(MetaMapper.class);

此时metaMapper的Bean就获取到了,不再是空对象。

2.3 文件存储消息

由于咱们的消息数量庞大,且不需要频繁使用数据库进行增删改查,因此此时我们使用文件存储消息。一个文件是需要存储多条消息的,不是一个文件只存储一条消息哈!

由于一个文件可以存储多条消息,因此当我们想要从一个文件中获取某条消息时,就需要确定某条消息的位置,这会比较困难。因此我们在消息类(Message)中,定义了两个成员变量,分别是 offsetBeg、offsetEnd。offsetBeg 表示此条消息的头部到文件头部的距离,offsetEnd 表示此条消息的尾部到文件头部的距离。

在这里插入图片描述

2.3.1 存储消息时,按队列维度展开

那我们如何将消息存入文件呢??前面我们说过,生产者发送消息到 broker server 服务器之后,交换机(不同类型的交换机具有不同的转发规则)会将消息投递至当前与自己绑定的队列中。因此我们首先需要明确了,消息是依附于队列的,消息存储时,按照队列的维度展开。我们之前是将数据库文件meta.db存在目录data下的,目录data路径是在当前项目的相对路径下。

此时我们就在data目录下,创建出若干个子目录,每个子目录对应一个队列,子目录的名字以队列名命名进行区分。再在每个子目录下面创建两个文件,分别是 queue_data.txt(消息数据文件)该文件用于存储消息的内容、queue_stat.txt(消息统计文件),该文件用于存储消息的统计信息。

我们这样 约定:(自定义应用层)
1、queue_data.txt文件:该文件是一个二进制格式的文件,这个文件包含若干个消息,每个消息以二进制方式存储,每个消息由以下这几部分组成:
在这里插入图片描述
2、queue_stat.txt文件里,存储的是消息数据文件中消息的总数目以及消息的有效数目,此文件存储一行数据,但这一行数据有两列,譬如这样的:8000\t6800(消息的总内容数目/t消息的内容有效数目),意思是queue_data.txt文件中,消息的总数是8000,有效数目是6800,这两列数目使用 /t 分隔。

Message 对象在内存、硬盘中各存一份,内存中的那一份,要记录offsetBeg、offsetEnd,以便找到内存中的Message对象,找到内存中的Message对象,就能找到硬盘上的Message对象。

将数据存储一份在硬盘上,其实就是为了 当 broker server 重启时,内存上的数据消失不见后,能够将硬盘上持久化存储的数据读回到内存中,将原来的数据拿到。因此内存中的Message对象记录offsetBeg、offsetEnd后,就能方便的找到硬盘上的Message对象了。

2.3.1.1 逻辑删除

在这里我们探讨一下Message对象里isValid属性的作用。isValid 属性是用来判断 某条Message 是否 有效。对于 broker server 来说,消息既有新增也有删除。生产者生产一个消息,broker server 就需要新增一个消息;消费者消费掉一个消息,broker server 就需要删除一个消息。新增、删除在内存中比较容易(直接使用集合类操作即可),但是在文件上 增删比较困难。文件类似于一个 “顺序表”,插入比较简单,尾插即可,但是删除元素的话,就需要进行元素搬运,这个复杂程度不亚于O(N),甚至由于文件里消息数量庞大,复杂度比O(N)还高。因此我们使用 “逻辑删除” 去删除文件上的无效消息。

“逻辑删除” :逻辑删除不是真正的删除,只是将文件上的无效消息标记成无效

2.3.1.2 GC(垃圾回收)

使用逻辑删除衍生出来一个问题:要知道,逻辑删除只是一种标记手段,不是真正的删除,此时就会造成文件越来越大,但是实际里面的有效内容很少。此时,我们就要考虑进行GC(垃圾回收)。我们知道JVM为内存提供了GC,但是硬盘也可以GC。不过由于访问硬盘比访问内存低效,进行GC也就会比较低效。不过我们可以自己实现一个对硬盘的GC。

2.3.1.2.1 使用复制算法实现GC

此处的GC采用复制算法。啥是复制算法。

在这里插入图片描述

但在文件中,我们直接遍历消息数据文件,将消息数据文件里的有效消息拷贝出来,放到新文件queue_data_new.txt文件中,然后将原有的消息数据文件 queue_data.txt 删除,再将新文件 queue_data_new.txt文件重命名 成 queue_data.txt文件中。

确定了GC使用的策略后,啥时候触发一次GC呢?我们约定(此处是我的约定,你也可以约定别的情况下进行触发GC),当消息总数到达3000并且有效消息数小于60%时,触发GC。

2.3.1.3 文件存储消息的扩展内容(该功能暂未实现)

如果某个队列中,消息特别多,且有效消息占比很大,此时就会导致整个消息数据文件特别大。后续我们针对这个文件进行各种操作,成本就会上升很多。加入该文件大小是5G,此时触发一次GC其耗时很长,成本较高,影响系统性能。这时候我们考虑对有效数据多的大文件进行拆分,对无效数据多的消息进行GC,GC后的一个个小文件彼此相邻又可以进行合并。

2.3.1.3.1 文件拆分

当单个文件达到一定阈值之后,就会拆分成两个文件,此时拆着拆着就会成很多文件了。

2.3.1.3.2 文件合并

每个单独文件进行GC,GC过后文件变小了,就可以与相邻其他文件进行合并,此时可以防止大量小文件出现在硬盘上,浪费硬盘空间大小。

2.4 使用类MessageFileManager封装文件存储操作

三、将broker server 里的数据存储在内存上

这里先不讲。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/762053.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

宏景eHR report_org_collect_tree.jsp SQL注入漏洞复现

0x01 产品简介 宏景eHR人力资源管理软件是一款人力资源管理与数字化应用相融合,满足动态化、协同化、流程化、战略化需求的软件。 0x02 漏洞概述 宏景eHR report_org_collect_tree.jsp 接口处存在SQL注入漏洞,未经过身份认证的远程攻击者可利用此漏洞执行任意SQL指令,从而…

Docker部署Alist全平台网盘神器结合内网穿透实现无公网IP访问云盘资源

&#x1f308;个人主页: Aileen_0v0 &#x1f525;热门专栏: 华为鸿蒙系统学习|计算机网络|数据结构与算法|MySQL| ​&#x1f4ab;个人格言:“没有罗马,那就自己创造罗马~” #mermaid-svg-oZuxWTWUiXLx3aQO {font-family:"trebuchet ms",verdana,arial,sans-serif;f…

SpringBoot实战(二十七)集成WebFlux

目录 一、WebFlux1.1 定义1.2 WebFlux 与 Spring MVC 区别 二、代码实现2.1 Maven 配置2.2 暴露 RESTful API 接口的方式方式一&#xff1a;基于注解的控制器方式二&#xff1a;函数式路由器&#xff08;Functional Endpoints&#xff09; 2.3 测试Service2.4 测试ServiceImpl2…

Streamlit实战手册:从数据应用到机器学习模型部署

Streamlit实战手册&#xff1a;从数据应用到机器学习模型部署 简介Streamlit核心功能介绍Streamlit的安装创建第一个Streamlit应用界面布局与导航数据处理与展示 Streamlit的进阶应用交互式组件按钮复选框单选按钮滑块 图表与可视化使用Matplotlib绘图使用Plotly创建交互式图表…

【题目】【网络系统管理】2019年全国职业技能大赛高职组计算机网络应用赛项H卷

极安云科专注职业教育技能竞赛培训4年&#xff0c;包含信息安全管理与评估、网络系统管理、网络搭建等多个赛项及各大CTF模块培训学习服务。本团队基于赛项知识点&#xff0c;提供完整全面的系统性理论教学与技能培训&#xff0c;成立至今持续优化教学资源与讲师结构&#xff0…

Springboot 整合 Knife4j (API文档生成工具)

目录 一、Knife4j 介绍 二、Springboot 整合 Knife4j 1、pom.xml中引入依赖包 2、在application.yml 中添加 Knife4j 相关配置 3、打开 Knife4j UI界面 三、关于Knife4j框架中常用的注解 1、Api 2、ApiOperation ​3、ApiOperationSupport(order X) ​4、ApiImplici…

Leetcode 994. 腐烂的橘子

心路历程&#xff1a; 一开始以为和刚做过的岛屿问题很像&#xff0c;只不过是把岛屿问题换成BFS去做&#xff0c;然后再加上一些计数的规则。结果做完后发现只能通过一半左右的测试用例&#xff0c;发现有一个逻辑错误在于&#xff0c;当腐烂的橘子位于两端时&#xff0c;可以…

C#探索之路基础篇(2):接口Interface的概念、实现、应用范围

文章目录 1 概念2 示例代码&#xff1a;2.1 简单接口的实现2.2 简单的使用接口2.3 使用接口呈现多态性2.4 通过接口实现一个数组迭代器2.5 通过接口来实现松耦合的关系2.6 使用接口实现可扩展、便利性 3 使用范围与时机4 注意事项 不知道大家在学习的过程中&#xff0c;有没有反…

笔试总结01

1、spring原理 1、spring原理 spring的最大作用ioc/di,将类与类的依赖关系写在配置文件中&#xff0c;程序在运行时根据配置文件动态加载依赖的类&#xff0c;降低的类与类之间的藕合度。它的原理是在applicationContext.xml加入bean标记,在bean标记中通过class属性说明具体类…

vue3+threejs新手从零开发卡牌游戏(二):初始化场景

在删掉初始化中一些没用的代码后&#xff0c;在views目录下新建game文件夹&#xff0c;在里面新建一个index.vue&#xff0c;这里就当成游戏的主入口。 目录结构如下&#xff1a; 下面开始尝试创建场景&#xff1a; 一、添加一个div作为threejs的画布对象&#xff0c;之后整个…

ubuntu - 编译 linphone-sdk

业务需求需要定制sdk&#xff0c;首先声明我们需要的是在Android4.4上跑的sdk&#xff0c;因此本次编译的sdk最低支持为19&#xff08;不同版本需要的环境不一致&#xff09;&#xff0c;编译过程较容易&#xff0c;难点在于环境配置 环境准备 Ubuntu 18.04.6 android-sdk_r24.…

mysql分页查询多用GitCode平台

目录 一、在GitCode平台AI搜索结果&#xff08;这个更优&#xff09; 二、在百度搜索输入“mysql Java分页查询”的输出结果&#xff1a; 三、推荐的文章 四、GitCode的使用 1&#xff09;如搜索jdk11可以直接下载jdk11的包 2&#xff09;搜索开源项目 3&#xff09;如搜…

步进电机驱动器的接线与使用(接线详细)

今天小编就来继续学习与使用步行电机的学习&#xff0c;如果位置对你有帮助&#xff0c;评论收藏&#xff0c;点赞一下 步进电机驱动器 步进电机驱动器是一种专用于控制步进电机的电子设备&#xff0c;用于控制步进电机的转动和位置。步进电机是一种将电信号转换为机械运动的电…

Compose UI 之 Segmented buttons 分段按钮

Segmented buttons SegmentedButton 是一种分段式按钮组件,它允许用户在一组相关的选项中选择一个或几个。 上图中:① 单选的分段式按钮。② 多选的分段式按钮。 分段式按钮的几个特点: 分段式按钮是带有状态的按钮,又有单选和多选之分。 从设计上将,不论是单选或是多选…

【GPT概念-03】:人工智能中的注意力机制

说明 注意力机制生成分数&#xff08;通常使用输入函数&#xff09;&#xff0c;确定对每个数据部分的关注程度。这些分数用于创建输入的加权总和&#xff0c;该总和馈送到下一个网络层。这允许模型捕获数据中的上下文和关系&#xff0c;而传统的固定序列处理方法可能会遗漏这…

虚拟机扩展:虚拟机快照

虚拟机快照 在学习阶段我们无法避免的可能损坏Linux操作系统。如果损坏的话&#xff0c;重新安装一个Linux操作系统就会十分麻烦。 那我们就可以通过快照将当前虚拟机的状态保存下来&#xff0c;在以后系统损坏时通过快照恢复虚拟机到保存的状态。 制作并还原快照 在VMware …

SAP HCM 0008信息类型间接评估与直接评估

如果在间接评估模块输入就是间接评估&#xff08;tarif是读取下图中的数据 a代表不需要输入工资项&#xff0c;b表示需要找工资相&#xff09; 不输入就是直接评估需要客户自己输入数字 第2个情况 summe求和 &#xff08;比如在0008中输入9000与9001 那么自动求出9002工资项数…

C# WPF编程-布局

C# WPF编程-布局 布局WPF布局原则布局过程布局容器布局属性Border控件StackPanel布局WrapPanel布局DockPanel布局Grid布局UniformGrid布局Canvas布局 布局 WPF布局原则 WPF窗口只能包含单个元素。为在WPF窗口中放置多个元素并创建更贴近实用的用户界面&#xff0c;需要在窗口…

SpringBoot项目如何打包成war包,并部署在tomcat上运行

项目场景&#xff1a; 正常情况下&#xff0c;我们开发 SpringBoot 项目&#xff0c;由于内置了Tomcat&#xff0c;所以项目可以直接启动&#xff0c;部署到服务器的时候&#xff0c;直接打成 jar 包&#xff0c;就可以运行了。 有时我们会需要打包成 war 包&#xff0c;放入外…

Redis进阶(持久化、复制、集群、多线程、缓存)

Redis进阶 1.Redis持久化1.1 什么是Redis持久化&#xff1f;为什么需要持久化&#xff1f;1.2 Redis持久化方式——RDB(Redis DataBase)1.2.1 什么是RDB&#xff1f;1.2.2 备份文件位置1.2.3 触发RDB的方式1.2.3.1 自动触发1.2.3.2 手动触发1.2.3.3 其他触发方式 1.2.4 RDB优缺…