activemq消息中间件

ActiveMQ消息中间件详解

下载地址:https://activemq.apache.org/activemq-5015009-release

1、MQ的产品种类

1.1、消息中间件的特性/共同特性/共同维度

  • Kafka(大数据专用、由java/scala编写
    • API发送和接收
    • MQ的高可用性
    • MQ的集群和容错配置
    • MQ持久化
      • radis 特性:持久化
    • 延时发送/定时投递
    • 签收机制
    • Spring整合
  • RabbitMQ(erlang编写
    • API发送和接收
    • MQ的高可用性
    • MQ的集群和容错配置
    • MQ持久化
      • radis 特性:持久化
    • 延时发送/定时投递
    • 签收机制
    • Spring整合
  • RocketMQ(java编写
    • API发送和接收
    • MQ的高可用性
    • MQ的集群和容错配置
    • MQ持久化
      • radis 特性:持久化
    • 延时发送/定时投递
    • 签收机制
    • Spring整合
  • ActiveMQ
    • API发送和接收
    • MQ的高可用性
    • MQ的集群和容错配置
    • MQ持久化
      • radis 特性:持久化
    • 延时发送/定时投递
    • 签收机制
    • Spring整合

1.2 入门场景使用概述

订单秒杀系统下单之后存在在业务的流程

(读取订单、库存检查、库存冻结、余额检查、余额冻结、订单生成、余额扣减、库存扣减、生成流水、余额冻结、库存解冻)

RPC接口基本是同步调用,整体的服务性能遵循“木桶理论”,即整体系统的耗时取决于最慢的那个接口。比如A调用B/C/D都是50 ms,但是B调用B1花费的时间为2000 ms,那么将会拖累整个系统的服务性能。

image-20230531220137860

注:在设计系统时,明确达到的目标

  1. 要做到系统解耦,当新的模块接进来时,要做到代码的改动最小;能够解耦
  2. 设置流量缓冲池,可以让后端按照系统自身的吞吐能力进行消费,不被冲垮;能够削峰
  3. 强弱依赖梳理能将非常关键调用链路的操作异步化并提升整体系统的吞吐能力;能够异步

消息中间件的作用:解耦、削峰、异步;

定义:发送者把消息发送给消息服务器,消息服务器将消息存放在若干队列/主题中,在合适的时候,消息服务器会将消息转发给消息接收者。在这个过程中,发送和接收都是异步的,也就是发送无需等待,而且发送者和接收者的生命周期也没有必然的关系;尤其在发布pub/订阅sub模式下,也可以完成一对多的通信,即让一个消息有多个接收者。

image-20230531220905761

队列(queue):相当于发短信,一对一。

主题(topic):相当于朋友圈,需要订阅,公众号;一对多。

特点

  • 异步处理
  • 应用系统之间解耦
  • 削峰

image-20230531222008307

ActiveMQ的解压安装

  • 在官网进行下载(下载地址:https://activemq.apache.org/activemq-5015009-release)

image-20230603215545727

注:建议下载linux版本,消息队列大多数情况都是在集群的环境下进行部署,而集群的使用大多数是使用linux

  • 上传linux压缩包并进行解压(注:外来文件放在/opt下

image-20230603220051707

  • 进行启动(在bin目录下)
./activemq start #启动
./activemq stop #关闭
./activemq restart #重启
  • 查看启动是否成功activemq的默认进程编号为61616,查看进程是否被占用
netstat -anp|grep 61616
lsof -i 61616
ps -ef|grep activemq
#可以访问http://localhost:8161 可现可视化界面
#默认的用户名/密码: admin/admin

注:页面访问,点击进行登录

  • 携带日志启动
./activemq start > 路径/run_activemq.log

image-20230603221649898

1.3、JMS编码总体架构

image-20230604165513141

2、代码编写

  • IDEA创建新的maven工程,并引进相应版本的依赖。可在你下载ActiveMQ的版本网页查找对应版本的maven依赖

image-20230604171146556

<dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-all</artifactId><version>5.15.9</version>
</dependency>

2.1、队列(Queues)实现

注:ActiveMQ的访问地址的方式为tcp的方式

进行生产者进行消息发送

//生产者
public static  void queueDemo() throws JMSException {String path="tcp://192.168.160.128:61616/";String name ="queue01";//创建连接工厂,采用默认的用户名和密码ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(path);//创建连接并启动Connection connection = connectionFactory.createConnection();connection.start();//创建会话session,第一个参数是事务,第二个参数是签收Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//创建目的地Queue queue = session.createQueue(name);//创建消息生产者MessageProducer producer = session.createProducer(queue);//进行消息发送for (int i =0;i<3;i++){//创建消息String message = "这是第"+i+"条消息";//创建消息Message textMessage = session.createTextMessage(message);//消息发送producer.send(textMessage);}producer.close();session.close();connection.close();System.err.println("消息发送完成");}//消费者
public static void queueConsumersDemo() throws JMSException {String path = "tcp://192.168.160.128:61616";String name ="queue01";//创建连接工厂ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(path);//创建链接Connection connection = connectionFactory.createConnection();//启动connection.start();//创建sessionSession session = connection.createSession(false, AUTO_ACKNOWLEDGE);//创建消费者Queue queue = session.createQueue(name);//创建消费者MessageConsumer consumer = session.createConsumer(queue);while (true){//可以通过recive设置等待时间,当超时时,消费者自动关闭//Message receive = consumer.receive(4000L);Message receive = consumer.receive();if (receive != null){System.err.println(receive);}}}

注:tcp对应的进程的端口为61616,注意连接的地址

image-20230604174451423

image-20230604180026455

队列监听setMessageListener

 public static void setListener() throws JMSException, IOException {ActiveMQConnectionFactory MQ = new ActiveMQConnectionFactory(path);Connection conn = MQ.createConnection();conn.start();Session session = conn.createSession(false, AUTO_ACKNOWLEDGE);Queue queue = session.createQueue(name);MessageConsumer consumer = session.createConsumer(queue);consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {if (message!=null ){try {
//                        TextMessage receive = (TextMessage) consumer.receive();TextMessage textMessage = (TextMessage) message;System.out.println("==>>>"+textMessage.getText());} catch (JMSException e) {e.printStackTrace();}}}});//防止后台关闭,对MQ进行监听,当有消息的时候进行输出System.in.read();consumer.close();session.close();conn.close();}

在当多个消费者进行等待,之后生产者进行消息的产生,每个消费者对消息进行平均分配,类似于负载均衡。

例:当两个消费者在进行等待时,生产者产生6条消息,则两个消费者对者6条消息进行平均分配,没人三条。

2.2、主题(topic)代码实现

特点:

  1. 生产者将消息发送到topic中,每个消息可以有多个消费者,属于1:N的关系
  2. 生产者和消费者之间有时间上的相关性。订阅某一个主题的消费者只能消费自它订阅之后发布的消息
  3. 生产者生产时,topic不保存消息它是无状态不能落地,例如无人订阅就去生产,那是一条废消息,所以,一般先启动消费者在启动生产者

JMS规范允许客户创建持久订阅,还在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。一句话,类似于微信公众号的订阅

//主题生产者public static void topicProduce() throws JMSException {ActiveMQConnectionFactory mq = new ActiveMQConnectionFactory(path);Connection connection = mq.createConnection();connection.start();Session session = connection.createSession(false, AUTO_ACKNOWLEDGE);Topic topic = session.createTopic(topicName);MessageProducer producer = session.createProducer(topic);for (int i =0; i <3; i++){String tpc = "这是第一条消息";TextMessage textMessage = session.createTextMessage(tpc);producer.send(textMessage);}producer.close();session.close();connection.close();System.err.println("主题发送成功====");}
//创建消费者
public static void topicConsumer() throws JMSException, IOException {ActiveMQConnectionFactory mq = new ActiveMQConnectionFactory(path);Connection connection = mq.createConnection();connection.start();Session session = connection.createSession(false, AUTO_ACKNOWLEDGE);Topic topic = session.createTopic(topicName);MessageConsumer consumer = session.createConsumer(topic);consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {TextMessage textMessage = (TextMessage) message;try {System.err.println("============="+textMessage.getText());} catch (JMSException e) {e.printStackTrace();}}});System.in.read();consumer.close();session.close();connection.close();}

注:主题的启动顺序,先启动消费者,在启动生产者。消费者每个都会得到一份生产者发出的所有的信息。

image-20230615220347023

3、浅谈JMS与JavaEE

3.1、什么是JavaEE

JavaEE是一套使用Java进行企业级应用开发的大家一直遵循的13个核心规范工业标准。JavaEE平台提供了一个基于组件的方法来加快设计、开发、装配即部署企业应用程序。

image-20230615221826581

3.2、什么时JMS(Java消息服务)

Java消息服务是指的两个应用程序之间进行异步通信的API,它为标准消息协议和消息服务提供了一组通用接口,包括创建、发送、读取消息等,用于支持Java应用程序的开发,在JavaEE中,当两个应用程序使用JMS进行通信时,他们之间并不是直接相连,而是通过一个共同的消息收发服务组件关联起来以达到解耦、异步、削峰的效果

image-20230615222420549

3.3、MQ中间件的其他落地产品

image-20230615222605081

特性ActiveMQRabbitMQRocketMQKafka
单机吞吐量万级,比 RocketMQ、Kafka 低一个数量级同 ActiveMQ10 万级,支撑高吞吐10 万级,高吞吐,一般配合大数据类的系统来进行实时数据计算、日志采集等场景
opic 数量对吞吐量的影响topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 topictopic 从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka 尽量保证 topic 数量不要过多,如果要支撑大规模的 topic,需要增加更多的机器资源
时效性ms 级微秒级,这是 RabbitMQ 的一大特点,延迟最低ms 级延迟在 ms 级以内
可用性高,基于主从架构实现高可用同 ActiveMQ非常高,分布式架构非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
消息可靠性有较低的概率丢失数据基本不丢经过参数优化配置,可以做到 0 丢失同 RocketMQ
功能支持MQ 领域的功能极其完备基于 erlang 开发,并发能力很强,性能极好,延时很低MQ 功能较为完善,还是分布式的,扩展性好功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用
其他Apache软件基金会开发、起步较早,但没有经过大量吞吐场景验证,目前社区不是很活跃开源,稳定,社区活跃度高阿里出品,目前已交给Apache,但社区活跃度较低Apache软件基金会开发、开源、高通吐量,社区活跃度高

3.4、JMS组成结构和特点

  1. JMS provider 实现JMS接口和规范的消息中间件,也就是我们的MQ服务器
  2. JMS producer 消息的生产者 创建和发送消息的客户端应用
  3. JMS consumer 消息消费者 接受和处理JMS消息的终端
  4. JMS message 产生的消息信息体
    • 消息头
    • 消息属性
    • 消息体 封装具体的消息数据,5中消息体格式,发送和接收的消息体类型必须一一对应

注:发送什么类型的消息,就得接收什么类型的消息,要一一对应

消息持久模式与非持久模式

  • 持久性,应该被传送“一次仅仅一次”,这意味着如果JMS提供者出现故障,该消息不会丢失。他会在服务器恢复之后再次传递(注:主要没有被发送,便会一直在消息服务器中存储)
  • 非持久,最多会传送一次,只要服务器出现故障,消息就会被丢失

消息头属性

  1. JMSDestination 消息发动的目的地
  2. JMSDeliveryMode 消息是否持久
  3. JMSExpiration 消息的过期时间(默认时永不过期)可设置消息在一定时间之后会过期。消息的过期时间Destination的send方法中的timeToLive值加上发送时刻的GMT时间值如果timeToLive的值为0,表示消息永不过期,如果发送后,在消息过期时间之后消息还没有被发送到消息的目的地,则该消息被清除。
  4. JMSPriority 消息优先级,从0-9十个级别,0-4是普通消息,5-9是加急消息。JMS不要求MQ严格按照者是个优先级发送消息,但必须保重加急消息要先于普通消息到达默认级别是4
  5. JMSMessageId 消息ID,消息的唯一识别方式。

消息体的五种属性

  1. TextMessage 字符串类型
  2. MapMessage Map类型
  3. BytesMessage 字节类型(二进制数组消息)
  4. StreamMessage 流类型
  5. ObjectMessage 对象类型

消息属性

如果需要除消息头字段以外的值,那么可以使用消息属性,识别/去重/重点标注等操作非常有用的方法。他们是以属性名和**属性值对(K:V)**的形式指定的,可以将属性是为消息头的扩展,属性指定一些消息头没有包括的附加消息,比如可以在属性里指定消息选择器。

消息的属性就像可以分配给一条消息的附加消息头一样,他们可以允许开发者添加有关消息的不透明附加消息,他们还用于暴漏消息选择器在消息过滤是使用的数据。

例:

TextMessage message = session.createTextMessage();
message.setText(text);
message.setStringProperty("username","ABC")// 进行消息自定义

3.5、消息持久性

3.5.1、持久的队列(Queue)

消息队列中,默认的消息为消息持久化

采用持久性消息,当ActiveMQ宕机时,未消费的消息的数量保持不变,有利于保持消息而不被丢失。

队列的默认传送模式,此模式保证这些消息被成功的发送一次和成功的使用一次。对于这些消息可靠性是优先考虑的因素。

可靠性另一个重要的方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。

3.5.2、持久主题(topic)

  • 先启动订阅,在启动生产
	//持久化主题,消息消费者public static void lastingTopicConsumer() throws JMSException, IOException {ActiveMQConnectionFactory mq = new ActiveMQConnectionFactory(path);Connection connection = mq.createConnection();connection.setClientID(name);Session session = connection.createSession(false, AUTO_ACKNOWLEDGE);Topic topic = session.createTopic(topicName);TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"remark...");connection.start();//主题的订阅者Message receive = topicSubscriber.receive();while (receive!=null){TextMessage textMessage =(TextMessage) receive;System.out.println("====="+textMessage.getText());receive = topicSubscriber.receive(5000);}System.in.read();session.close();connection.close();}
  1. 一定要先运行一次消费者,类似于订阅这个主题
  2. 然后在运行生产者发送信息
  3. 无论消费者是否在线,都会接收到,不在线的话,下次连接的时候,会把没有收到的消息接收下来

4、ActiveMQ的broker

定义:相当于一个ActiveMQ服务器实例

Broker其实就是实现了用代码的形式启动ActiveMQ将MQ嵌入到Java代码中,以便随时用随时启动,

在用的时候再去启动这样节省了资源,也保证了可靠性。

方式:

用ActiveMQ Broker作为独立的消息服务器来构建java应用

ActiveMQ也支持在VM中通信基于嵌入式的broker,能够无缝的集成其他java应用

所需mvn依赖

		//ActiveMQ核心依赖<dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-all</artifactId><version>5.15.9</version></dependency>// 整合spring所需要的依赖  <dependency><groupId>org.apache.xbean</groupId><artifactId>xbean-spring</artifactId><version>3.16</version></dependency>//进行json数据格式转换,在进行Java内嵌activqMQ时,需要引进,否则会报错<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.9.5</version></dependency>

代码实现

//相当于在本机上边开启了一个ActiveMQ的服务
public static void main(String[] args) throws Exception {BrokerService brokerService = new BrokerService();brokerService.setUseJmx(true);brokerService.addConnector("tcp://localhost:61616");brokerService.start();}

image-20230705214051813

5、spring整合ActiveMQ

application.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:aop="http://www.springframework.org/schema/aop"xmlns:tx="http://www.springframework.org/schema/tx"xmlns:context="http://www.springframework.org/schema/context"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"><!--开启包的自动扫描--><context:component-scan base-package="com.atguigu.activemq"/><bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"><property name="connectionFactory"><bean class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="tcp:192.168.160.128"/></bean></property><property name="maxConnections" value="100"/></bean><bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg index="0" value="spring-active-queue"/></bean><bean id="jsmTemplate" class="org.springframework.jms.core.JmsTemplate"><property name="connectionFactory" ref="jmsFactory"/><property name="defaultDestination" ref="destinationQueue"/><property name="messageConverter"><bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/></property></bean>
</beans>

生产者

package com.atguigu.activemq;import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;@Service
public class Producer {@Autowiredprivate JmsTemplate  jmsTemplate;public static void main(String[] args) {ApplicationContext applicationContext = new ClassPathXmlApplicationContext("applicationContext.xml");Producer producer =(Producer) applicationContext.getBean("Producer");producer.jmsTemplate.send(new MessageCreator() {@Overridepublic Message createMessage(Session session) throws JMSException {TextMessage textMessage = session.createTextMessage("********当前消息");return textMessage;}});}
}

消费者

package com.atguigu.activemq;import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;@Service
public class Customer {@Autowiredprivate JmsTemplate jmsTemplate;public static void main(String[] args) {ApplicationContext applicationContext = new ClassPathXmlApplicationContext("applicationContext.xml");Customer customer =(Customer) applicationContext.getBean("Customer");String receiveAndConvert =(String) customer.jmsTemplate.receiveAndConvert();System.out.println(receiveAndConvert);}
}

mvn依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>ActiveMQDemo</artifactId><version>1.0-SNAPSHOT</version><dependencies><!--activemq需要的jav包--><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-all</artifactId><version>5.15.9</version></dependency><dependency><groupId>org.apache.xbean</groupId><artifactId>xbean-spring</artifactId><version>3.16</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.9.5</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-jms</artifactId><version>4.3.23.RELEASE</version></dependency><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-pool</artifactId><version>5.15.9</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-core</artifactId><version>4.3.23.RELEASE</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-context</artifactId><version>4.3.23.RELEASE</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-aop</artifactId><version>4.3.23.RELEASE</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-orm</artifactId><version>4.3.23.RELEASE</version></dependency><dependency><groupId>org.aspectj</groupId><artifactId>aspectjrt</artifactId><version>1.6.1</version></dependency><dependency><groupId>org.aspectj</groupId><artifactId>aspectjweaver</artifactId><version>1.6.8</version></dependency><dependency><groupId>cglib</groupId><artifactId>cglib</artifactId><version>2.1_2</version></dependency><!--下面是junit/log4等通用配置--><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.25</version></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>1.2.3</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.18</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency></dependencies><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties></project>

6、springBoot整合ActiveMQ

7、ActiveMQ的传输协议

  • ActiveMQ支持的client-broker通讯协议有:TCP、NIO、 UDP、SSL、Https(Http)、VM 。其中配置Transport Connector的文件在ActiveMQ安装目录的conf/activemq.xml中的标签之内。

image-20230715134903343

注:在ActiveMQ中默认的使用TCP协议,也默认支持多种的传输协议

设置支持NIO网络协议

<transportConnector name="nio" uri="nio://0.0.0.0:61618"/>

设置进行多协议支持,在5.13之后在支持auto多协议同时支持

<transportConnector name="auto+nio" uri="auto+nio://0.0.0.0:61617?auto.protocols=default,stomp"/>

image-20230715142346935

8、ActiveMQ的消息存储和持久化

为了避免以为宕机之后数据的丢失,需要做到重启之后可以恢复消息队列,消息系统一般都会采用持久化机制。ActiveMQ的消息持久化机制又JDBC、AMQ、KahaDB(默认使用)、LevelDB,无论使用哪一种持久化的机制,消息存储的逻辑都是一致的。

消息存储机制:

就是在发送者在发送出去之后,消息中心首先将消息存储到本地的数据文件、内存数据库或者远程数据库等在试图将消息在发送给接收者,成功则将消息从存储中进行删除,失败则继续尝试发送。消息中心启动以后首先要检查指定的存储位置,如果有未发送成功的消息,则需要将消息发送出去。

KahaDB:在5.3版本之后建议推荐使用KahaDB存储方式,在5.4版本之后,默认使用kahaDB存储方式

image-20230715145324601

默认的文件的存储位置在activeMQ下的data文件中

image-20230715145453094

8.1、KahaDB存储机制的原理

  • 在KahaDB在消息目录进行存储时。只有4类文件和一个lock。以下四个文件还有一个db.free 四类文件一把锁

  • KahaDB会将消息存储在db-.log文件之中,当一个文件已满时(默认的一个文件的大小为32MB),会自动创建一个新的文件进行相关的存储。例:db-1.log、db-2.log …。当不会再有引用到数据文件中的任何数据消息时,文件会被删除或者时归档。
  • db.data该文件包含了持久化的BTree索引,索引了消息数据记录中的消息,他是消息的索引文件,本质是B-Tree(B树),使用B-Tree作为索引执行db-.log文件中存储的消息
  • db.free当前db.data文件那些页面是空闲的,文件具体内通过是所有空闲页面的ID
  • db.radio是用来进行消息恢复的,当KahaDB消息存储被强制退出后启动,用于恢复BTree索引
  • lock为文件的读取进行添加锁机制,防止数据出现混乱。

8.2、JDBC消息存储(一部分消息会被存储到数据库中)

注:对于长时间的存储,建议使用JDBC的存储方式

  • 将数据库驱动jar包放到MQ的lib文件夹下
  • 做JDBC吃持久化的配置,对文件进行修改适配

image-20230715153921537

<persistenceAdapter> <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="true"/>   
</persistenceAdapter>

dataSource指定将要引用的持久化数据库的bean名称;
createTablesOnStartup 是否在启动的时候创建数据表,默认值是true;
这样每次启动都会去创建数据表,一般是第一次启动的时候设置为true 之后改成false;

  • 数据库连接池的配置
relaxAutoCommit 表示进行自动提交<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/> <property name="username" value="activemq"/> <property name="password" value="activemq"/> <property name="poolPreparedStatements" value="true"/> 
</bean> 

注:注意配置信息将要添加的位置,否则可能会报错:

**image-20230715155441934

  • 建仓SQL和建表说明
  1. 创建对应名称的数据库
  2. 创建表,默认表名:
    1. ACTIVEMQ_MSGS
    2. ACTIVEMQ_ACKS
    3. ACTIVEMQ_LOCK
  3. 如果新建数据库OK+上述的配置OK+代码运行OK,3表会自动生成。

ACTIVEMQ_MSGS表字段:

ID:自增数据库主键
CONTAINER:消息的Destination
MSGID_PROD:消息发送者的主键
MSG_SEQ:是发下哦那个消息的顺序,MSGID_PROD+MSG_SEQ可以足证JMS的MessageID
EXPIRATION:消息的过期时间,存储的是从1970-01-01
MSG:消息本体的Java序列对象的二进制数据
PRIORITY:优先级,0-9,数值越大优先级越高

ACTIVEMQ_ACKS表字段:用于存储订阅关系,如果是持久化topic,订阅者和服务器的订阅关系在这个表里面进行保存;

CONTAINER:消息的介绍
SUB_DEST:如果使用的是Static集群,这个字段会有集群其他系统的信息
CLIENT_ID:每个订阅者都必须有一个唯一的客户端ID用以区分
SUB_NAME:订阅者名称
SELECTOR:选择器,可以选择之消费满足条件的信息,条件可以用自定义的属性进行实现,支持多属性ANDOR
LAST_ACKED_ID:记录消费过的消息的ID

可能出现的错误

在这里插入图片描述

解决方法:

在保证activeMQ在使用JDBC消息持久化时,所需要的jar包,以及数据库密码,用户、地址都没有错误的情况下,造成报错的原因可能是数据库远程连接的权限没有被放开,导致数据库在进行远程连接时,连接失败。这种情况下,需要放开数据库远程连接的权限。

  • 在本地通过连接数据库的工具或者cmd命令框进入到数据库都可以,在这我采用cmd
mysql -u root -p

image-20230716162207402

  • 查看mysql中存在的库,并使用mysql库
show databases;
use mysql;

image-20230716162533190

  • 查看对应表
select User,Host from user;

image-20230716162455449

注:如果权限没有被放开的情况下,root所对应的Host为localhost,这是我们需要将其改为%

update set Host='%' where User='root';
  • 进行配置刷新
flush privileges;

注:在修改完成之后一定要进行配置刷新,否则相关修改不起作用

  • 在服务器重启activeMQ,进行日志查看

image-20230716162852413

  • 启动成功,activeMQ会在你对应的数据库中创建相关的表

image-20230716162949701

注:在使用消息持久化存储机制时,一定要将activeMQ设置为持久化。否则不会将信息存储到数据库中

点对点类型
在DeliveryMode设置为NODE_PERSISTENCE(非持久化)时,消息保存在内存中

在DeliveryMode设置为PERSISTENCE(持久化),消息保存在broker的相应的文件或者数据库中

而且点对点类型中消息一旦被消费,消息就会在存储的位置进行删除操作。

在使用Topic时,在消息持久化,消息在被消费的时候,消息不会被删除。

8.3、开发中遇到的问题

如果是queue

在没有消费者消费的情况下会将信息保存在activemq_msgs表中,只要有任意消费者已经消费过了,消费之后这些消息将被删除。

如果时topic

一般是先启动消费订阅然后在生产的情况下,会将消息保存到activemq_acks表中

数据库jar包

记得需要使用到的相关的jar文件,放置到lib目录下,mysql-jdbc驱动的jar包和对应的数据库连接池的jar包。默认是dbcp

createTableOnStartup属性
在jdbcPersistenceAdapter标签中设置了createTableOnStartup属性为true时,在第一次启动ActiveMQ时,ActiveMQ服务节点会自动在数据库中创建相关的数据库表,启动完成后可以去掉这个属性,或者是将属性值修改为false。其属性值默认为true,建议在不用的时候将属性值,修改为false

下划线的问题

“java.lang.lllegalStateExcepton:BeanFactory not initialized or already closed”

产生报错的原因是因为您的操作系统的机器名中存在“_”符号,请修改机器名并重启可以解决相关问题。

8.4、高性能缓存(ActiveMQ Journal)

在activemq.xml中进行配置

<persistenceFactory><journalPersistenceAdapterFactory journalLogFiles="4"journalLogFileSize=""useJournal="true"useQuickJournal="true"dataSource="#mysql-ds"dataDirectory="activemq-data"/>
</persistenceFactory>    

配置完成之后,对activeMQ进行重启操作

9、ActiveMQ的高级特性

9.1、ActiveMQ异步传输以及确认发送成功

注:ActiveMQ默认的消息发送方式为异步传输,但是建议在进行代码编写时,再次将消息传输的方式设置为异步
注:当消息设置为不采用事务,但是却将消息设置为持久化时,MQ会默认将消息的传输 方式设置为同步消息传输。这样的传输方式,只有当消息发送出去之后,只有被接受返回成功之后,才会进行下一个消息的处理,容易造成堵塞。

注:在进行异步消息传输时,MQ允许存在极少量的数据丢失,也会存在极少量数据丢失的情况。

例:

cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);((ActiveMQConnection)connection).setUseAsyncSend(true);

注:在使用异步消息队列是,注意要采用消息的回调,来确认消息是否发送成功

 public static  void queueDemo() throws JMSException {
//        String path="tcp://192.168.160.128:61616/";String name ="queue01";//创建连接工厂,采用默认的用户名和密码ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");//创建连接并启动Connection connection = connectionFactory.createConnection();connection.start();//创建会话session,第一个参数是事务,第二个参数是签收Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//创建目的地Queue queue = session.createQueue(name);//创建消息生产者ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(queue);//进行消息发送for (int i =0;i<3;i++){//创建消息String message = "这是第"+i+"条消息";//创建消息Message textMessage = session.createTextMessage(message);textMessage.setJMSMessageID(UUID.randomUUID().toString().replaceAll("-","")); //设置消息的id//消息发送producer.send(textMessage, new AsyncCallback() {@Overridepublic void onSuccess() {//进行消息的回调System.out.println("发送成功的消息"+textMessage.toString());}@Overridepublic void onException(JMSException e) {//进行消息的回调System.out.println("发送失败的消息"+textMessage.toString());}});}producer.close();session.close();connection.close();System.err.println("消息发送完成");}

9.2、消息的延时发送和定时发送

image-20230726211612426

9.3、分发策略

9.4、ActiveMQ重试机制(重新交付政策)

官网地址:https://activemq.apache.org/redelivery-policy

引起消息重发的情况

Client用了transaction且在session中调用了rollback();(没被签收被回调)

Client用了transactions且在调用commit()之前关闭或者时没有commit(事务没有被提交)

Client在CLIENT_ACKNOWLEDGE的传递模式下,在session中调用了recover()

默认:每一秒钟发送6次,当消息发送超过最大次数时,该消息会被标志位异常消息,最终会被存储到死信队列

常用属性:

image-20230726213519746

9.5、死信队列

处理发送失败的消息

  • 一般的生产环境之中,MQ一般会设置两个队列:核心业务队列和死信队列
  • 核心业务队列:就是处理正常的业务信息,死信队列主主要是处理异常的业务队列信息

将所有的DeadLetter保存到一个共享的队列之中,是ActiveMQ的默认的策略

共享队列默认为ActiveMQ.DLQ可以通过deadLetterQueue属性进行设定

<deadLetterStratege><shareDeadLetterStrategy deadLetterQueue="DLQ-QUEUE"/>
</deadLetterStratege>   

可以将定时自动删除死信队列中的消息,或者可以存储非持久的异常消息

9.6、如何保证消息不被重复消费

可以通过radis进行相关的解决

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/11463.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【vue3】获取字典数据,封装为公共方法

前言: 后台项目中基本上都有字典管理页面,Vue封装字典数据的主要目的是为了方便数据的管理和使用 不管在哪个页面使用下拉框,el-select的options数据源需要通过调用接口获取到,不同的数据源调用不同的接口,引入和使用都是不小的工作量,如果使用字典数据管理,不管同个页…

【Spring Cloud Gateway 新一代网关】—— 每天一点小知识

&#x1f4a7; S p r i n g C l o u d G a t e w a y 新一代网关 \color{#FF1493}{Spring Cloud Gateway 新一代网关} SpringCloudGateway新一代网关&#x1f4a7; &#x1f337; 仰望天空&#xff0c;妳我亦是行人.✨ &#x1f984; 个人主页——微风撞见云的博客&a…

linux NDK交叉编译rtmp 与 ffmpeg+rtmp交叉编译(v7a,v8a) 完成流程

最近在学RTMP,记录一下完成的编译流程 我是mac 电脑,但是mac上编译一直通过不了,后来才换到服务器上编译, 其实mac也能编译,只是最开始踩到坑里面了… 这里记录一下linux编译完整流程 环境: NDK: android-ndk-r17cFfmpeg: ffmpeg4.2.2 (高版本也可以编译)system: mac 1. …

【Python】Python 网络编程 ( Socket 套接字简介 | Socket 套接字使用步骤 | Socket 套接字服务端与客户端开发 )

文章目录 一、Socket 套接字简介1、Socket 套接字概念2、Socket 套接字类型3、Socket 套接字使用步骤4、Socket 套接字服务端与客户端 二、Socket 服务端与客户端开发1、服务端2、客户端3、执行结果 一、Socket 套接字简介 1、Socket 套接字概念 Socket 套接字 是一种 进程之间…

什么是 web3?

在百度搜索引擎输入 “Web3”、“大厂”。跳出来基本都是这样的标题. 以及如今的互联网行业 “哀鸿遍野”&#xff0c;不仅内卷&#xff0c;还裁员。然后掀起一阵风&#xff0c;猛吹 Web3 的好&#xff0c;数据回归用户……最后再 “威逼利诱” 一下&#xff0c;Web3 就是 20 年…

剑指 Offer 37. 序列化二叉树 / LeetCode297. 二叉树的序列化与反序列化(二叉树遍历(深度优先搜索))

题目&#xff1a; 链接&#xff1a;剑指 Offer 37. 序列化二叉树&#xff1b;LeetCode 297. 二叉树的序列化与反序列化 难度&#xff1a;困难 序列化是将一个数据结构或者对象转换为连续的比特位的操作&#xff0c;进而可以将转换后的数据存储在一个文件或者内存中&#xff0…

LViT:语言与视觉Transformer在医学图像分割

论文链接&#xff1a;https://arxiv.org/abs/2206.14718 代码链接&#xff1a;GitHub - HUANGLIZI/LViT: This repo is the official implementation of "LViT: Language meets Vision Transformer in Medical Image Segmentation" (IEEE Transactions on Medical I…

MIPI D-PHY 2.1协议(学习笔记)

1~3 简介/术语/参考文档 这三章属于介绍性内容&#xff0c;包括缩略语等名词术语解释内容&#xff0c;不再赘述。 直接进入以下正文部分 4 D-PHY概述 D-PHY描述了一种Source同步、高速、低功耗、低成本的PHY&#xff0c;特别适用于移动应用。这个D-PHY规范主要是为了将相机…

Vue 中通用的 css 列表入场动画效果

css 代码 .gradientAnimation {animation-name: gradient;animation-duration: 0.85s;animation-fill-mode: forwards;opacity: 0; }/* 不带前缀的放到最后 */ keyframes gradient {0% {opacity: 0;transform: translate(-100px, 0px);}100% {opacity: 1;transform: translate…

Linux_CentOS_7.9部署Docker以及镜像加速配置等实操验证全过程手册

前言&#xff1a;实操之前大家应该熟悉一个新的名词DevOps 俗称开发即运维、新一代开发工程师&#xff08;Development和Operations的组合词&#xff09;是一组过程、方法与系统的统称&#xff0c;用于促进开发&#xff08;应用程序/软件工程&#xff09;、技术运营和质量保障&…

合并 K 个升序链表——力扣23

题目描述 法一 顺序合并 class Solution { public:ListNode* mergeTwoLists(ListNode* l1, ListNode* l2){ListNode* dummy new ListNode(-1); //创建一个新的头节点 ListNode *curdummy, *aPtr l1, *bPtr l2;while(aPtr && bPtr){if(aPtr->val < bPtr->…

版本适配好帮手 Android SDK Upgrade Assistant / Android Studio Giraffe新功能

首先是新版本一顿下载↓&#xff1a; Download Android Studio & App Tools - Android Developers 在Tools中找到Android SDK Upgrade Assistant 可以在此直接查看SDK升级相关信息&#xff0c;不用跑到WEB端去查看了。 例如看一下之前经常要对老项目维护的android 12蓝牙…

gitee中fork了其他仓库,如何在本地进行同步

GitHub 操作&#xff1a;同步 Fork 来的仓库&#xff08;上游仓库&#xff09;_sigmarising的博客-CSDN博客 1. 设置upstream 2. git pull --rebase 3. 然后再执行pull、push操作

神经数据库:用于使用 ChatGPT 构建专用 AI 代理的下一代上下文检索系统 — (第 2/3 部分)

书接上回理解构建LLM驱动的聊天机器人时的向量数据库检索的局限性 - &#xff08;第1/3部分&#xff09;_阿尔法旺旺的博客-CSDN博客 其中我们强调了&#xff08;1&#xff09;嵌入生成&#xff0c;然后&#xff08;2&#xff09;使用近似近邻&#xff08;ANN&#xff09;搜索…

Intellij IDEA有什么奇技淫巧?

IDEA全称 IntelliJIDEA&#xff0c;是java语言开发的集成环境&#xff0c;IntelliJ在业界被公认为最好的java开发工具之一&#xff0c;尤其在智能代码助手、代码自动提示、重构、J2EE支持、Ant、JUnit、CVS整合、代码审查、创新的GUI设计等方面的功能可以说是超常的。 idea下载…

index页面通过<script>引入根目录下的js文件后,vite打包项目后,项目中无js文件解决方法

解决方法&#xff1a; 根据打包报错提示&#xff0c;如图&#xff1a;即在<script>标签中加入&#xff1a;type&#xff0c;如图&#xff1a; 再打包 js文件就会被打包进去&#xff01;

水文监测软件 HYPACK 2023.2 Crack

HYPACK是由美国coastal海洋图像公司出品的一款世界知名的水文综合测量软件。它能够为勘测员提供了设计勘测、收集数据、处理数据、减少数据和生成最终产品所需的所有工具。从大地测量转换、测量设计、数据采集、数据后处理直到最终测量成图都实现了快速可靠&#xff0c;强大的绘…

六边形架构和分层架构的区别?

六边形架构和分层架构是什么&#xff1f; 六边形架构&#xff08;Hexagonal Architecture&#xff09;和分层架构&#xff08;Layered Architecture&#xff09;是两种常见的软件架构模式。六边形架构强调将核心业务逻辑与外部依赖解耦&#xff0c;通过接口与外部世界进行通信。…

一文让你彻底搞懂Mybatis之缓存机制

编译软件&#xff1a;IntelliJ IDEA 2019.2.4 x64 操作系统&#xff1a;win10 x64 位 家庭版 Maven版本&#xff1a;apache-maven-3.6.3 Mybatis版本&#xff1a;3.5.6 文章目录 一. 缓存是什么&#xff1f;二. 为什么要使用缓存&#xff1f;三. Mybatis中的缓存分哪几种&#…

MySql基本操作

在了解具体的MySql操作之前&#xff0c;我们需要了解一些基本的sql语句注意事项&#xff0c;如下所示&#xff1a; 每一条sql语句都需要以英文 ; 作为结尾&#xff1b;sql语句当中的关键字不区分大小写&#xff0c;不区分双引号和单引号&#xff1b;sql中库名称、表名称和字段…