八、ActiveMQ持久化

ActiveMQ持久化

  • 一、MQ的高可用
  • 二、持久化介绍
  • 三、持久化存储方式
    • 1.AMQ Mesage Store(了解)
    • 2.KahaDB消息存储(默认)
      • 2.1 存储原理
    • 3.JDBC消息存储
    • 4.LevelDB消息存储(了解)
    • 5.JDBC Message Store with ActiveMQ Journal
    • 查询持久化存储方式
  • 四、持久化存储使用
    • 1.JDBC消息存储
      • 1.1 选择方案
      • 1.2 添加mysql数据库驱动
      • 1.3 jdbcPersistenceAdapter配置
      • 1.4 数据库连接池配置
      • 1.5 建库SQL和创表说明
      • 1.6 代码运行
      • 1.7 小总结
      • 1.8 开发的坑
    • 2.JDBC Message Store with ActiveMQ Journal
      • 2.1 配置文件配置
      • 2.2 数据库连接池配置
      • 2.3 总结

一、MQ的高可用

  • MQ自带:
    • 事务
    • 持久
    • 签收
  • MQ非自带:可持久化

二、持久化介绍

  • ActiveMQ宕机了,消息不会丢失的机制。
  • 为了避免意外宕机以后丢失信息,需要做到重启后可以恢复消息队列,消息系统一般都会采用持久化机制。
  • ActiveMQ的消息持久化机制有JDBC,AMQ,KahaDB和LevelDB,无论使用哪种持久化方式,消息的存储逻辑都是一致的。
    • 就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等。再试图将消息发给接收者,成功则将消息从存储中删除,失败则继续尝试尝试发送。
    • 消息中心启动以后,要先检查指定的存储位置是否有未成功发送的消息,如果有,则会先把存储位置中的消息发出去。

三、持久化存储方式

1.AMQ Mesage Store(了解)

  • 官网地址:https://activemq.apache.org/components/classic/documentation/amq-message-store
  • AMQ是一种文件存储形式,它具有写入速度快和容易恢复的特点。消息存储再一个个文件中文件的默认大小为32M,当一个文件中的消息已经全部被消费,那么这个文件将被标识为可删除,在下一个清除阶段,这个文件被删除。
  • 注意:AMQ适用于ActiveMQ5.3之前的版本

2.KahaDB消息存储(默认)

  • 官网地址:https://activemq.apache.org/components/classic/documentation/kahadb
  • 基于日志文件(类似redis的AOF),从ActiveMQ5.4开始默认的持久化插件
  • KahaDB是目前默认的存储方式,可用于任何场景,提高了性能和恢复能力。
  • 消息存储使用一个事务日志和仅仅用一个索引文件来存储它所有的地址。
  • KahaDB是一个专门针对消息持久化的解决方案,它对典型的消息使用模型进行了优化。
  • 数据被追加到data logs中。当不再需要log文件中的数据的时候,log文件会被丢弃

2.1 存储原理

  • KahaDB在消息保存的目录中有4类文件和一个lock,跟ActiveMQ的其他几种文件存储引擎相比,这就非常简洁了。
    在这里插入图片描述
  • db-number.log
    • KahaDB存储消息到预定大小的数据纪录文件中,文件名为db-number.log。当数据文件已满时,一个新的文件会随之创建,number数值也会随之递增,它随着消息数量的增多,如没32M一个文件,文件名按照数字进行编号,如db-1.log,db-2.log······。当不再有引用到数据文件中的任何消息时,文件会被删除或者归档。在这里插入图片描述
  • db.data:该文件包含了持久化的BTree索引,索引了消息数据记录中的消息,它是消息的索引文件,本质上是B-Tree(B树),使用B-Tree作为索引指向db-number。log里面存储消息。
  • db.free:当问当前db.data文件里面哪些页面是空闲的,文件具体内容是所有空闲页的ID
  • db.redo:用来进行消息恢复,如果KahaDB消息存储再强制退出后启动,用于恢复BTree索引。
  • lock:文件锁,表示当前kahadb独写权限的broker。

3.JDBC消息存储

  • 消息基于JDBC存储的

4.LevelDB消息存储(了解)

  • 官网地址:https://activemq.apache.org/components/classic/documentation/leveldb-store
  • 这个数据库,在5.17版本被移除了
  • 这种文件系统是从ActiveMQ5.8之后引进的,它和KahaDB非常相似,也是基于文件的本地数据库存储形式,但是它提供比KahaDB更快的持久性。
  • 但它不使用自定义B-Tree实现来索引独写日志,而是使用基于LevelDB的索引

5.JDBC Message Store with ActiveMQ Journal

查询持久化存储方式

  • 打开 conf/activemq.xml
    在这里插入图片描述

四、持久化存储使用

1.JDBC消息存储

1.1 选择方案

  • MQ+MySQL
    • https://activemq.apache.org/components/classic/documentation/persistence

1.2 添加mysql数据库驱动

  • 添加mysql数据库的驱动包到lib文件夹
  • 我的MySQL是8.0.30
# 1.进入activemq的lib目录
cd /data/activemq/apache-activemq-5.15.9/lib
# 2.下载mysql驱动
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.30/mysql-connector-java-8.0.30.jar

1.3 jdbcPersistenceAdapter配置

vi /data/activemq/apache-activemq-5.15.9/conf/activemq.xml
# 复制以下内容<persistenceAdapter>  <jdbcPersistenceAdapter dataSource="#mysql-ds" createTableOnStartup="true"/> </persistenceAdapter>

在这里插入图片描述

  • 参数解释:
    • dataSource是指定将要引用的持久化数据库的bean名称。
    • createTableOnStartup是否在启动的时候创建数据库表,默认是true,这样每次启动都会去创建表了,一般是第一次启动的时候设置为true,然后再去改成false。

1.4 数据库连接池配置

# 1.在mq的配置文件加入以下内容<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close"><property name="driverClassName" value="com.mysql.cj.jdbc.Driver"/><property name="url" value="jdbc:mysql://192.168.86.128:3306/activemq?relaxAutoCommit=true"/><property name="username" value="root"/><property name="password" value="qwe123"/><property name="poolPreparedStatements" value="true"/></bean>

在这里插入图片描述

1.5 建库SQL和创表说明

# 创建activemq持久化库
CREATE DATABASE activemq;

重启mq

./activemq stop && ./activemq start
  • 重启后,MySQL数据库会创建3个表:
  • ACTIVEMQ_MSGS:消息表,缺省表名ACTIVEMQ_MSGS,Queue和Topic都存在里面,结构如下
    • ID:自增的数据库主键
    • CONTAINER:消息的Destination
    • MSGID_PROD:消息发送者的主键
    • MSG_SEQ:是发送消息的顺序,MSGID_PROD+MSG_SEQ可以组成JMS的MessageID
    • EXPIRATION:消息的过期时间,存储的是从1970-01-01到现在的毫秒数
    • MSG:消息本体的Java序列化对象的二进制数据
    • PRIORITY:优先级,从0-9,数值越大优先级越高
  • ACTIVEMQ_ACKS:这个表用于存储消息的确认信息(Acknowledgements)。当消费者消费消息并发送确认时,确认信息会被记录在 ACTIVEMQ_ACKS 表中。这些确认信息可以帮助 ActiveMQ 跟踪哪些消息已经被成功消费,哪些消息还需要继续传递。
  • ACTIVEMQ_LOCK:ACTIVEMQ_LOCK在集群环境下才有用,只有一个Broker可以获取消息,称为Master Broker,其他的只能作为备份等待Master Broker不可用,才可能成为下一个Master Broker。这个表用于记录哪个Broker是当前的Master Broker
# 注意:如果表没有生成,那么在activemq数据库执行以下语句
-- auto-generated definition
create table ACTIVEMQ_ACKS
(CONTAINER     varchar(250)     not null comment '消息的Destination',SUB_DEST      varchar(250)     null comment '如果使用的是Static集群,这个字段会有集群其他系统的信息',CLIENT_ID     varchar(250)     not null comment '每个订阅者都必须有一个唯一的客户端ID用以区分',SUB_NAME      varchar(250)     not null comment '订阅者名称',SELECTOR      varchar(250)     null comment '选择器,可以选择只消费满足条件的消息,条件可以用自定义属性实现,可支持多属性AND和OR操作',LAST_ACKED_ID bigint           null comment '记录消费过消息的ID',PRIORITY      bigint default 5 not null comment '优先级,默认5',XID           varchar(250)     null,primary key (CONTAINER, CLIENT_ID, SUB_NAME, PRIORITY)
)comment '用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存';create index ACTIVEMQ_ACKS_XIDXon ACTIVEMQ_ACKS (XID);-- auto-generated definition
create table ACTIVEMQ_LOCK
(ID          bigint       not nullprimary key,TIME        bigint       null,BROKER_NAME varchar(250) null
);-- auto-generated definition
create table ACTIVEMQ_MSGS
(ID         bigint       not nullprimary key,CONTAINER  varchar(250) not null,MSGID_PROD varchar(250) null,MSGID_SEQ  bigint       null,EXPIRATION bigint       null,MSG        blob         null,PRIORITY   bigint       null,XID        varchar(250) null
);create index ACTIVEMQ_MSGS_CIDXon ACTIVEMQ_MSGS (CONTAINER);create index ACTIVEMQ_MSGS_EIDXon ACTIVEMQ_MSGS (EXPIRATION);create index ACTIVEMQ_MSGS_MIDXon ACTIVEMQ_MSGS (MSGID_PROD, MSGID_SEQ);create index ACTIVEMQ_MSGS_PIDXon ACTIVEMQ_MSGS (PRIORITY);create index ACTIVEMQ_MSGS_XIDXon ACTIVEMQ_MSGS (XID);

将createTableOnStartup设置为false

  • 防止下次重启,重复执行建表语句
    在这里插入图片描述

1.6 代码运行

  • 生产者代码里面一定要开启持久化
  • 持久化主要是针对生产者,消费者的代码不需要额外添加东西。
  • 下面是生产者队列,对于topic是一样的。
package com.qingsi.activemq;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class JmsProduce {public static final String ACTIVEMQ_URL = "nio://192.168.86.128:61616";public static final String QUEUE_NAME = "jdbc01";public static void main(String[] args) throws JMSException {//1.创建连接工厂,按照给定的URL,采用默认的用户名密码ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);//2.通过连接工厂,获得connection并启动访问Connection connection = activeMQConnectionFactory.createConnection();connection.start();//3.创建会话session//两个参数transacted=事务,acknowledgeMode=确认模式(签收)//开启事务需要commitSession session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//4.创建目的地(具体是队列queue还是主题topic)Queue queue = session.createQueue(QUEUE_NAME);//5.创建消息的生产者,并设置不持久化消息MessageProducer producer = session.createProducer(queue);// 开启持久化存储producer.setDeliveryMode(DeliveryMode.PERSISTENT);//6.通过使用消息生产者,生产三条消息,发送到MQ的队列里面// 7.发送消息for (int i = 0; i < 3; i++) {TextMessage textMessage = session.createTextMessage("jdbc msg--" + i);producer.send(textMessage);}//8.关闭资源producer.close();session.close();connection.close();}}

在这里插入图片描述

1.7 小总结

  • 如果是queue:在没有消费者消费的情况下会将消息保存到activemq_msgs表中,只要有任意一个消费者消费了,就会删除消费过的消息
  • 如果是topic:一般是先启动消费订阅者然后再生产的情况下会将持久订阅者永久保存到qctivemq_acks,而消息则永久保存在activemq_msgs,在acks表中的订阅者有一个last_ack_id对应了activemq_msgs中的id字段,这样就知道订阅者最后收到的消息是哪一条。

1.8 开发的坑

  • 在配置关系型数据库作为ActiveMQ的持久化存储方案时,有坑
    • 数据库jar包:注意把对应版本的数据库jar或者你自己使用的非自带的数据库连接池jar包
    • createTablesOnStartup属性:默认为true,每次启动activemq都会自动创建表,在第一次启动后,应改为false,避免不必要的损失。
    • java.lang.IllegalStateException: LifecycleProcessor not initialized:确认计算机主机名名称没有下划线。如果有下划线,就要更改机器名并且重启即可。

2.JDBC Message Store with ActiveMQ Journal

  • 这种方式克服了JDBC Store的不足,JDBC每次消息过来,都需要去写库读库。
  • ActiveMQ Journal,使用高速缓存写入技术,大大提高了性能。
  • 当消费者的速度能够及时跟上生产者消息的生产速度时,journal文件能够大大减少需要写入到DB中的消息。
  • 举个例子:生产者生产了1000条消息,这1000条消息会保存到journal文件,如果消费者的消费速度很快的情况下,在journal文件还没有同步到DB之前,消费者已经消费了90%的以上消息,那么这个时候只需要同步剩余的10%的消息到DB。如果消费者的速度很慢,这个时候journal文件可以使消息以批量方式写到DB。
  • 注意:不会马上写入数据库

2.1 配置文件配置

vi /data/activemq/apache-activemq-5.15.9/conf/activemq.xml
# 复制以下内容<persistenceFactory><journalPersistenceAdapterFactoryjournalLogFiles="5"journalLogFileSize="32768" useJournal="true" useQuickJournal="true" dataSource="#mysql-ds"dataDirectory="../activemq-data" /></persistenceFactory>   

在这里插入图片描述

2.2 数据库连接池配置

# 1.在mq的配置文件加入以下内容<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close"><property name="driverClassName" value="com.mysql.cj.jdbc.Driver"/><property name="url" value="jdbc:mysql://192.168.86.128:3306/activemq?relaxAutoCommit=true"/><property name="username" value="root"/><property name="password" value="qwe123"/><property name="poolPreparedStatements" value="true"/></bean>

在这里插入图片描述

./activemq stop && ./activemq start

2.3 总结

  • 持久化消息主要指的是:MQ所在服务器宕机了消息不会丢试的机制。
  • 持久化机制演变的过程:从最初的AMQ Message Store方案到ActiveMQ V4版本退出的High Performance Journal(高性能事务支持)附件,并且同步推出了关于关系型数据库的存储方案。ActiveMQ5.3版本又推出了对KahaDB的支持(5.4版本后被作为默认的持久化方案),后来ActiveMQ 5.8版本开始支持LevelDB,到现在5.9提供了标准的Zookeeper+LevelDB集群化方案。
  • ActiveMQ消息持久化机制有:
    • AMQ 基于日志文件
    • KahaDB 基于日志文件,从ActiveMQ5.4开始默认使用
    • JDBC 基于第三方数据库
    • Replicated LevelDB Store 从5.9开始提供了LevelDB和Zookeeper的数据复制方法,用于Master-slave方式的首选数据复制方案。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/708797.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++:模版初阶 | STL简介

创作不易&#xff0c;感谢支持&#xff01;&#xff01; 一、泛型编程思想 如何实现一个通用的交换函数呢&#xff1f; 注&#xff1a;其实swap函数在C的标准库提供了&#xff0c;不需要自己写&#xff0c;这边只是举个例子 void Swap(int& left, int& right) { in…

【小夏送书 | 第二期】世界顶级名校计算机专业,都在用哪些书当教材?

&#x1f304;参与规则 参与方式&#xff1a;关注博主点赞收藏评论&#xff0c;&#xff08;每人最多评论三次&#xff09; 本次送书1~3本【取决于阅读量&#xff0c;阅读量越多&#xff0c;送的越多】 活动时间至&#xff1a;2024-3-6 20:00:00 | 随机抽取由博主动态公布抽…

《国色芳华》争议不断,杨紫簪花妆惊艳全场,李现造型更是抢眼。

♥ 为方便您进行讨论和分享&#xff0c;同时也为能带给您不一样的参与感。请您在阅读本文之前&#xff0c;点击一下“关注”&#xff0c;非常感谢您的支持&#xff01; 文 |猴哥聊娱乐 编 辑|徐 婷 校 对|侯欢庭 猴哥来啦&#xff01;新剧《国色芳华》火热开拍&#xff0c;杨…

Mysql数据库管理系统学习笔记1——sql语句,DBMS,数据库的分类

mysql是一种数据库管理系统&#xff08;DBMS&#xff09;&#xff0c;data base manage system sql语句即为“structured query language”&#xff0c;结构化查询语言 数据库的分类&#xff1a;关系型数据库&#xff08;RDBMS&#xff09;与非关系型数据库 对于一些具有相同…

el-table通过这样封装可以实现校验-表格校验的原理

我们一般在后台系统中&#xff0c;很常见的操作时表格里面嵌套表单&#xff0c;之前我的网上找到了一些封装的用法&#xff1a; <el-form :model"formData" :rules"ruleData" ref"formDom"><el-table :data"formData.tableData&q…

WordPress分类目录ID怎么看?如何查找WordPress标签ID?

在WordPress网站中&#xff0c;我们需要判断某篇文章是否属于某个分类目录&#xff0c;或者是否拥有某个标签&#xff0c;那么就需要用到分类目录ID和标签ID&#xff0c;那么WordPress分类目录ID怎么看&#xff1f;如何查找WordPress标签ID&#xff1f;下面boke112百科就跟大家…

Node.js基础---加载机制

模块的加载机制 1. 优先成缓存中加载 模块在第一次加载后会被缓存&#xff0c;意味着多次调用 require() 不会导致模块代码被多次执行 注意&#xff1a;无论是什么模块都会优先从缓存内加载&#xff0c;以提高加载效率 2. 内置模块的加载机制 内置模块是 Node.js官网提供的模块…

【论文阅读】微纳米气泡技术作为CO2-EOR和CO2地质储存技术的新方向:综述

Micro and nanobubbles technologies as a new horizon for CO2-EOR and CO2 geological storage techniques: A review 微纳米气泡技术作为CO2-EOR和CO2地质储存技术的新方向&#xff1a;综述 期刊信息&#xff1a;Fuel 2023 期刊级别&#xff1a;EI检索 SCI升级版工程技术1区…

Python实现时间序列分析进行平稳性检验(ADF和KPSS)和差分去趋势(adfuller和kpss算法)项目实战

说明&#xff1a;这是一个机器学习实战项目&#xff08;附带数据代码文档视频讲解&#xff09;&#xff0c;如需数据代码文档视频讲解可以直接到文章最后获取。 1.项目背景 时间序列分析中的平稳性检验是评估一个时间序列是否具有稳定的均值和方差。在经济学、金融学以及其他诸…

麒麟OS:操作系统国家队

这是ren_dong的第31篇原创 1、中标软件 中标软件&#xff1a;国产操作系统龙头 中标软件有限公司成立于2003 年&#xff0c;是国产自主操作系统和办公软件产品提供商&#xff0c;拥有 国防、民用两方面的相关企业与产品资质&#xff0c;是安全操作系统旗舰企业。 中标软件的主要…

飞天使-学以致用-devops知识点3-安装jenkins

文章目录 构建带maven环境的jenkins 镜像安装jenkinsjenkins yaml 文件安装插件jenkins 配置k8s创建用户凭证 构建带maven环境的jenkins 镜像 # 构建带 maven 环境的 jenkins 镜像 docker build -t 192.168.113.122:8858/library/jenkins-maven:jdk-11 .# 登录 harbor docker …

解读人工智能的理论基石

1956年的一个夏天&#xff0c;在达特茅斯学院的一个小会议室里&#xff0c;一群充满好奇和野心的年轻科学家聚集在一起&#xff0c;他们有一个共同的梦想&#xff1a;创造能够模仿人类智能的机器。这不仅仅是科幻小说的情节&#xff0c;更是人工智能历史上一个真实的起点。从那…

基于JAVA的毕业生追踪系统 开源项目

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 登陆注册模块2.2 学生基本配置模块2.3 就业状况模块2.4 学历深造模块2.5 信息汇总分析模块2.6 校友论坛模块 三、系统设计3.1 用例设计3.2 实体设计 四、系统展示五、核心代码5.1 查询我的就业状况5.2 初始化就业状况5.…

防御保护:防火墙内容安全

一、IAE&#xff08;Intelligent Awareness Engine&#xff09;引擎 二、深度检测技术(DFI和DPI&#xff09; 1.DPI – 深度包检测技术 DPI主要针对完整的数据包&#xff08;数据包分片&#xff0c;分段需要重组&#xff09;&#xff0c;之后对数据包的内容进行识别。&#x…

微服务 人工智能AI 物联网智慧工地云平台源码

目录 ​编辑 智慧工地架构 智慧工地系统 智慧工地云平台功能模块 1、基础数据管理 2、考勤管理 3、安全隐患管理 4、视频监控 5、塔吊监控 6、升降机监控 7、移动端数据推送 智慧工地管理平台子系统构成 智慧工地物联网解决方案&#xff0c;对工地施工安全人员、设…

引入本地图片报错:require is not defined

文章目录 问题分析1. 原始写法2. 最初的解决方案3. 尝试使用 require 引入4. 封装方法进行解析引入图片 问题 Vue3 Vite 使用本地图片报错&#xff1a;require is not defined 分析 1. 原始写法 刚开始我是这样写的&#xff0c;数据是这样定义的&#xff0c;但是数据没出…

【Linux深入剖析】再续环境变量 | 进程地址空间

&#x1f4d9; 作者简介 &#xff1a;RO-BERRY &#x1f4d7; 学习方向&#xff1a;致力于C、C、数据结构、TCP/IP、数据库等等一系列知识 &#x1f4d2; 日后方向 : 偏向于CPP开发以及大数据方向&#xff0c;欢迎各位关注&#xff0c;谢谢各位的支持 目录 1.环境变量再续1.1 和…

FX110网:外汇交易中的隔夜利息该如何计算?

在交易过程中&#xff0c;我们经常能够听到 “隔夜利息”这个词&#xff0c;但不少新手依然不是很明白这个专业名词的意思。今天小编帮助大家理解这个概念。“隔夜利息”的含义 顾名思义&#xff0c;是根据持仓总数计算的每日可赚取或需支付的利息。每个货币都有他们自己的基准…

贝叶斯优化双向门控循环单元BO-BIGRU时序预测的matlab实现【源代码】

贝叶斯优化双向门控循环单元简介&#xff1a; 贝叶斯优化双向门控循环单元&#xff08;BO-BIGRU&#xff09;是一种结合了贝叶斯优化和双向门控循环单元&#xff08;BIGRU&#xff09;的神经网络模型。BIGRU是一种改进的循环神经网络&#xff08;RNN&#xff09;&#xff0c;它…

现代信号处理学习笔记(二)参数估计理论

参数估计理论为我们提供了一套系统性的工具和方法&#xff0c;使我们能够从样本数据中推断总体参数&#xff0c;并评估估计的准确性和可靠性。这些概念在统计学和数据分析中起着关键的作用。 目录 前言 一、估计子的性能 1、无偏估计与渐近无偏估计 2、估计子的有效性 两个…