RabbitMQ入门到精通
- 一、了解RabbitMQ
- 1.基础知识
- 2.多种交换机模型详解
- 二、服务端搭建
- 1.简单搭建
- 2.信息持久化到容器外部
- 三、消息生产者和消费者
- 1.消息生产者
- 2.消息消费者
- 3.RabbitTemplate 详解
- 4.RabbitListener详解
- 5.其他注解
- 四、如何保证消息可靠性
- 1.发送方进行消息发送成功确认
- 1.1 增加配置
- 1.2 设置回调方法
- 2.发送方增加消息失败重试机制
- 2.1 增加自定义失败重试配置
- 2.2 获取配置信息配置类
- 2.3 交换机回调失败重试代码
- 2.4 设置消息前置处理器
- 2.5 消息发送
- 3.发送方发送持久化消息
- 4.持久化交换机、持久化队列
- 4.1 在Rabbit的管理后台中创建的话交换机和队列默认都是持久化的
- 4.2 在java代码中创建交换机和队列
- 5.消费方进行手动ack
- 6.发送方和消费方重试机制
- 五、消息监听器
- 1.消费者监听容器:simple
- 2.消费者监听容器:direct
- 2.1 direct使用
- 2.2 使用direct设置自定义线程池
- 2.3 同时使用simple和direct
- 六、其他关键信息
- 1.消费者的消息预取
- 2.工作模式:work与能者多劳
- 3.死信队列
- 3.1 TTL - Queue
- 3.2 TTL - Message
- 3.3 消费者reject或者nack
- 3.4 队列消息达到最大
- 3.6 死信消息的处理
- 4.延迟队列 - 使用死信队列实现
- 5.延迟队列 - 使用延迟交换机插件
- 5.1 安装延迟交换机插件
- 5.2 使用延迟交换机
- 6.队列的惰性模式 - Lazy Queue
- 6.开启trace日志
- 6.1 启用trace插件
- 6.2 使用trace
- 七、可能碰到的异常
- 1.业务幂等
- 2.如何数据持久后再进行消息推送
- 3.发送文件信息过大
- 4.ack超时导致消息重新入队
- 5.消息大量积压
一、了解RabbitMQ
RabbitMq是目前主流的消息服务,其他常用的有RocketMQ、kafka等。还有比较古老的ActiveMQ,现在使用的比较少了,基本都是前面三种,一般会根据自己业务需要选择不同的消息中间件。这一篇总结RabbitMQ。
1.基础知识
-
1.MQ的主要作用
削峰:流量突然爆发针对超出容量的消息,进行入队等待消费。
异步:异步处理有助于给请求端更快的响应,更好的体验。
解耦:系统间调用使用OpenFeign或者RPC都是同步的,很多场景是可以使用异步的,此时使用MQ就可以实现系统间的解耦 -
2.MQ遵循的协议
JMS:java message service,是java定义了api接口,实现者根据接口进行实现,典型的是ActiveMQ
AMQP:AMQP是一种消息协议,他定义的是一种规范,因此可以实现跨平台和语言,典型的是RabbitMQ
其他协议:kafka则是自定义的协议,并不是以上两种,而且kafka目前占有率已经越来越高了 -
3.MQ是典型的生产者消费者模型
在MQ中主要有三大角色:生产者、MQ、消费者,他是典型的生产者消费者模型,生产者生产消息交给MQ,MQ内部通过一系列动作将其放入队列,消费者监听队列获取消息。
-
4.RabbitMQ内部都有什么
Broker:中间人的意思,其实就是指MQ服务本身
Host:Host相当于在Broker内部的虚拟机,一个Broker内部可以有多个Host,每个Host内的交换机和队列都是隔离的,通常用于区分系统
Exchange:交换机,生产者将消息发送到Broker时,是先将消息发送到交换机的,交换机根据routingkey进行发送到队列
Queue:消息队列,发送方发送消息的最终目的地,与消费方监听的对象,也是消息的存储之地
-
5.发送方如何通过RabbitMQ发送消息
-
1.生产者和Broker通过三次握手建立TCP连接,形成connection,这个在Management中会有展示
-
2.生产者和Broker基于connection的TCP长连接建立逻辑通道channel,这个在管理页面也有展示
connection因为是TCP连接,建立起来属于重量级的动作,而channel是在TCP长连接内部的逻辑连接,相对比较轻量级,所以一般connection是不会被主动销毁的,除非是一方主动断开连接,而channel若是长时间不使用是会被销毁的。 -
3.生产者发送消息到Broker,Broker根据指定的Host(不指定默认进入默认Host:/),然后交给他的交换机,交换机根据routingkey进行路由到指定的队列,到底消息发送成功
-
-
6.消费方如何通过RabbitMQ消费消息
- 1.消费者和Broker通过三次握手建立TCP连接,形成connection,这个connection也会和生产者的一起展示在管理也的connections中
- 2.消费者和Broker建立channel,这个也会展示在channels中
- 3.消费者监听队列,接收消息
- 4.消费者回复队列消息接受成功(ack)
-
7.RabbitMQ工作模型
-
8.RabbitMQ与主流MQ的对比
2.多种交换机模型详解
-
direct 直连交换机
该交换机通过routing key和队列建立关系,且只有routing key完全匹配时才可以将消息路由到相应的队列,否则路由失败,不支持通配符,且必须是完全匹配的routing key,所以叫直连交换机。一个直连交换机可以绑定多个队列,每个队列都可以声明不同的routing key,若是所有队列都是用相同的路由key或者使用空或者null进行绑定,那么直连就变成了广播交换机。 -
fanout 广播交换机
广播交换机,消息收到后会发给所有绑定到交换机的队列,不会区分routing key,即使队列绑定广播交换机时声明了routing key,也不会遵循routing key,这里的模式永远是全部分发给所有绑定的队列,忽略rouint key。 -
topic 主题模式
主体模式根据routing key进行消息分发,与direct交换机的区别是,topic支持通配符。因此topic成为了最为灵活的交换机,他既能做direct的事,也可以做fanout的事,所以topic也是使用最为广泛的交换机。他的通配符支持两种,一种是#(匹配0个或者多个任意词),一种是*(匹配任意一个词)。比如有以下四个队列绑定了topic交换机,队列之后是各自的routing key- queue.one key.*
- queue.two key.*.*
- queue.three *.three.*
- queue.thrff *.three.#
如果发送一条消息他的rouking key是key.one 则消息只会进入到queue.one中
如果发送一条消息他的rouking key是key.one.msg 则消息只会进入到queue.two中
如果发送一条消息他的rouking key是key.three.msg 则消息只会进入到queue.two、queue.three、queue.thrff
如果发送一条消息他的rouking key是key.three.msg.action 则消息只会进入到queue.thrff
这就是主体交换机了,记住一点:*表示可以匹配任何一个词,这个词表示的是点之间间隔的词,而不是一个字母,比如key.one.msg这里的key或者one或者msg是一个词,*可以代表这种一个词。#,代表0或者多个词。 -
headers 头部交换机
头部交换机,就是使用头部信息进行路由的交换机,该交换机不支持使用routing key进行路由,只能通过头部信息进行路由,且头部交换机在绑定队列时应该声明监听的头部属性,同时声明x-match,x-math有两个值,一个是any一个是all。any表示匹配任何一个键值对就会进行路由,all表示必须满足全部的键值对才会路由,如果在进行交换机和队列的绑定时没有声明x-match,则默认是all。
如果一个头部交换机绑定了这三个队列,则headers.one 必须头部信息包含 key=headers.one 和key2=headers.two两个键值对,才会成功路由到headers.one队列。而headers.three则只需要包含一种一个就可以路由,因为他的x-match是any。
还有一点需要注意发送消息时,属性声明的是头部信息,而不是属性,这个需要注意。
二、服务端搭建
1.简单搭建
上面了解了一些基本概念,这里需要开始实操了,首先需要搭建一个MQ的服务端和管理控制台,如果手动安装下载的话这俩是分开的,这里使用docker进行安装,我们可以在docker 仓库中选择后缀带有management的镜像,这些都是集成了MQ服务和管理控制台的镜像,只需要一次安装即可,下面是安装命令:
这里对外暴露了两个端口5672,用于MQ服务对外的连接端口,还有一个15672,这是管理控制台的访问端口,不过MQ不是只用了这俩端口,只是其他端口可以不向外暴露而已。
# -d 后台运行
# -u 指定运行用户
# --name 指定容器名
# -p 端口映射
docker run -d -u root --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.11.2-management
安装完成后,使用guest/guest进行登录管理控制台:
到此服务端已经搭建完成了。
2.信息持久化到容器外部
上面的搭建命令会存在一个问题,若是mq因为问题需要重建,那么mq中的交换机、队列、消息、用户等等信息将全部会丢失,这种问题是不能容忍的,因为这种信息丢失是很大的生产问题,那么问题来了,怎么解决呢,先上命令
docker run -d
-u root \
--name rabbitmq \
--hostname rabbit-1 \
-v /apps/rabbitmq/data:/var/lib/rabbitmq/mnesia
-p 5672:5672 -p 15672:15672 \
rabbitmq:3.11.2-management
使用这个命令,无论容器意外损坏需要重新建立rabbitmq容器,只要还是使用了这个数据目录,数据就依然还在,这里需要注意hostname的指定,一定是不可以省略的,因为rabbitmq的持久化文件会以容器的hostname作为持久化文件名的一部分,如下所示:
如果不手动指定hostname,那么即使数据文件映射没问题,数据依然是不可用的,这点很容易被忽略,其次就是数据映射了,rabbitmq的数据存储目录是/var/lib/rabbitmq/mnesia ,所以需要我们将他映射到本地。通过这两部操作就可以将容器内的持久化数据永久留存到本地了,即使更换容器信息也不回丢失。
三、消息生产者和消费者
服务端已经搭建完成,那么就可以开始验证消息的发送了,这里环境使用SpringBoot2.6.11 进行测试。
1.消息生产者
这里进行简单的消息发送的展示
- 1.引入依赖
这里引入的依赖中并不会含有rabbitmq,因为主流的AMQP协议的MQ其实就是RabbitMQ了,而且Spring中提供的AMQP的默认实现就是RabbitMQ,所以在Spring里提到AMQP其实就是指RabbitMQ,如果你的父工程指明了SpringBoot的版本或者当前工厂的父工程就是SpringBoot指定的pom,那我们都无需声明版本号,Spring会自动帮我们寻找适配的版本,自动安装客户端。
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
- 2.配置更改
MQ的配置信息基础如下,这里使用addresses和使用host+port是等效的
spring:rabbitmq:
# host: 192.168.150.199
# port: 5672addresses: 192.168.150.199:5672virtual-host: /username: guestpassword: guest
- 3.编写代码
代码很简单,如下:
package com.ebbing.task.controller;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;/*** @author pcc*/
@Slf4j
@RestController
public class TestController {@AutowiredRabbitTemplate rabbitTemplate;@PostMapping("/testMQ")public void testMQ(){log.info("收到信息");rabbitTemplate.convertAndSend("amq.topic","one","hello amqp");log.info("发送完成");}
}
这里是通过指定交换机、路由key、消息三者来去发送消息的,需要我们在管理页面,正确配置交换机和队列的绑定,其他就没了。
amq.topic:交换机
one:路由key
hello amqp:发送的message
2.消息消费者
消费者同样需要引入相同的依赖和配置,这个和生产者相同,这里不重复展示了。唯一需要展示的是简单的一点代码:
package com.cheng.ebbing.message.mq;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @author pcc*/
@Slf4j
@Component
public class TestMQ {@RabbitListener(queues = "Test-queue-one")public void test(String msg){log.info("收到消息:{}",msg);}
}
代码很简单,只需要借助注解RabbitListener轻松实现队列的监听,然后获取到队列的消息,至于监听方法使用的参数类型,Spring也是可以自动帮我们转化的。
3.RabbitTemplate 详解
在SpringBoot中发送消息时,SpringBoot已经为我们封了一个Template为我们使用,他可以方便我们处理消息,大致列举了以下几类消息可能会用到,但是RabbitTemplate并不是只提供了这些,还有很多。下面每一个方法基本都是有很多重载方法,这里只说这类方法的作用,无法一一列举。
-
send:原始的消息发送api,与convertAndSend的区别是未使用转换器,但也可以做为消息发送的api
TbDict tbDict = new TbDict();tbDict.setDictId(111L);tbDict.setDictDesc("测试desc");tbDict.setTypeCode("test_code");Message message = MessageBuilder.withBody(JSON.toJSON(tbDict).toString().getBytes("UTF-8")).setContentType(MessageProperties.CONTENT_TYPE_JSON).build();rabbitTemplate.send("amq.topic","one",message,new CorrelationData());
下面是mq的管理端获取到的消息:
下面是使用java监听接收到的消息:
send系列api都差不多。 -
sendAndReceive:官方注释解释是基于RPC模式的调用,会尝试获取接收结果,等带一段时间(直到超时)没有结果就返回null,我尝试了多次,无论信息有没有被接收到获取到的返回信息都是null,感觉不推荐使用。
-
convertAndSend: 这个api是使用频率最高的了,基本使用RabbitTemplate都是会使用这个api进行消息发送,他相对于send来说该api多的功能是消息转换。该api支持直接传入消息内容,默认的消息转换器SimpleMessageConverter会将消息内容根据String、Byte[]、Serializable、null进行不同的转换。但是如果使用send方法,则不会有这种自动操作,需要我们手动设置Message,这里需要我们声明消息头已经内容编码,内容格式等信息。
rabbitTemplate.convertAndSend("amq.topic", "one", JSON.toJSON(tbDict).toString(), new CorrelationData());
下面是收到消息的截图:
-
convertSendAndReceive:这个方法不言而喻了,就是会等待接收结果和上面的sendReceive差不多
-
receive:可以用来做消息接收,不过一般都是使用@RabbitListener来进行接收消息,不过使用receive也是同样可以的,不过receive触发一次只会接收一条消息。
Message message = MessageBuilder.withBody(JSON.toJSON(tbDict).toString().getBytes("UTF-8"))// 当使用默认的转换器时,即使这么设置在mq中的消息仍然是text/plain// 使用Jackson2json转换器时,则无需这么设置也会是json格式.setContentType(MessageProperties.CONTENT_TYPE_JSON).build();CorrelationData correlationData = new CorrelationData();correlationData.setReturned(new ReturnedMessage(message,0,null,"amq.topic","one"));rabbitTemplate.convertAndSend("amq.topic", "one", JSON.toJSON(tbDict).toString(), correlationData);log.info("发送完成");Message receive = rabbitTemplate.receive("Test-queue-one");String content = new String(receive.getBody(), StandardCharsets.UTF_8);log.info("接收到消息:{}",content);log.info("接收到消息-转换json:{}",JSON.parse(content).toString());
需要说的是,若是使用默认的消息转换器,则我们发送String类型的消息时,消息内容类型会被设置为:text/plain,此时我们接收到消息时这种样式:
若是使用了消息转换器,比如Jackson2JsonMessageConverter,则会帮我们自动设置application/json格式,则java中接收到如下:
他们在管理台的展示如下:
不过无论哪种,在Java中使用JSon工具类都是可以正常转换的,如上面的JSON.parse。 -
addBeforePublishPostProcessors
该方法顾名思义就是在消息发送之前对消息进行改造的方法,如果有统一处理,我们是可以放在这里的,比如消息的id处理,格式处理,请求头处理等,都可以放入到这里,这样即可与业务进行解耦了,下面只是展示,如果使用可以直接将这块代码写入配置。rabbitTemplate.addBeforePublishPostProcessors(message1->{log.info("发送前处理开始-message1:{}",message1);message1.getMessageProperties().setContentType("application/xml");log.info("发送前处理完成-message1:{}",message1);return message1;});
代码比较简单就是将消息的样式改成了"application/xml"了,下面是日志输出:
下面是管理台的截图,可以看到消息内容变成了我们已经更改的"application/xml"了。
-
addAfterReceivePostProcessors
这个与上面类似,是消息接收之后的处理方法,不过需要注意的是这里的收到之后执行是指channel收到,而不是指exchange或者queue,更不是指消费者。这个很容易误解,同时发送到了channel并不表示进入了exchange,更不表示进入了队列rabbitTemplate.setAfterReceivePostProcessors(message2->{return message2;});
注意postProcessors此类方法只允许全局设置一次,而且需要设置在配位文件中,不可设置到业务代码里,否则每次执行业务代码都会新增一个postprocessors,因为此类方法都是允许多个processor的。
4.RabbitListener详解
上面的demo里只使用了这个注解指定队列,就实现了消息的消费。实际上绝大部分场景使用这一个注解就够了,少数特殊场景需要的注解可以看5中的使用,这部分提供了额外的一些支持。这里详细介绍下RabbitListener的所有属性,RabbitListener可用于类、方法、注解等上面,且一个方法上支持声明多次该注解。
- queues 监听队列声明
支持声明多个队列,声明多个队列的话就会监听多个对列,被监听的队列有消息当前方法就会收到。@RabbitListener(queues = {"Test-queue-one","test_queue"}) public void test(String msg){log.info("收到消息:{}",msg); }
- containerFactory 支持多消费者
用以支持多个消费者,如果一个服务里需要消费多个Rabbit的队列,那么就需要为非默认的Rabbit指定containerFactory属性,containerFactory需要传入他的一个子类,这里使用SimpleRabbitListenerContainerFactory进行演示:
然后我们只需要在使用RabbitListener注解时声明containerFactory即可,如下:package com.cheng.ebbing.message.mq;import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;/*** @author pcc* @version 1.0.0* @description 多mq配置类*/ @Configuration public class anothermqconfig {@Bean("mySimpleRabbitListenerContainerFactory")public SimpleRabbitListenerContainerFactory authorityListenerContainerFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost("192.168.150.202");connectionFactory.setPort(5673);connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");connectionFactory.setVirtualHost("/");connectionFactory.setPublisherConfirms(false); SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);factory.setConnectionFactory(connectionFactory);factory.setMissingQueuesFatal(false);return factory;} }
如上,test方法使用的是默认配置的Rabbit,test2使用的是自定义配置的另一个Rabbit,此时从两个mq分别往对应队列发消息,就可以实现一个服务监听多个Rabbit的队列:package com.cheng.ebbing.message.mq;import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException;/*** @author pcc*/ @Slf4j @Component public class TestMQ {@RabbitListener(queues = {"Test-queue-one","test_queue"})public void test(String msg){log.info("test1收到消息:{}",msg);}@RabbitListener(queues = "test_queue_two",containerFactory = "mySimpleRabbitListenerContainerFactory")public void test2(String msg, Channel channel) throws IOException {log.info("test2收到消息:{}",msg);channel.basicAck(1,false);} }
- concurrency 并发消费
这里声明一个字符串数字,用以创建多线程去消费消息,一般用不到,若是消息量比较大,可以使用多线程进行消费,多条消息。
下面截图可以看出,客户端确实启动了五个消费者:@RabbitListener(queues = "test_queue_two",containerFactory = "mySimpleRabbitListenerContainerFactory",concurrency = "5") public void test2(String msg, Channel channel) throws IOException {log.info("test2收到消息:{}",msg);channel.basicAck(1,false); }
- exclusive 独占消费
这个和上面的作用正好相反,配置他是为了只让一个节点消费,当有多个服务节点时,若是信息只想让一个节点消费就可以将该属性置为true。下面开启两个监听者进行验证:
如上,test2和test3将只有一个可以接收到消息,且是一直是那一个接收,另一个不接收。@RabbitListener(queues = "test_queue_two",containerFactory = "mySimpleRabbitListenerContainerFactory",exclusive = true) public void test2(String msg, Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {log.info("test2收到消息:{}",msg);channel.basicAck(deliveryTag,false); }@RabbitListener(queues = "test_queue_two",containerFactory = "mySimpleRabbitListenerContainerFactory",exclusive = true) public void test3(String msg, Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {log.info("test3收到消息:{}",msg);channel.basicAck(deliveryTag,false); }
- Acknowledge 确认模式
确认模式可以在配置文件中指定,也可以在RabbitListener指定,优先级肯定是注解中的更高:
这就是RabbitListener支持的大部分属性了,基本已经可以满足大部分需要了,但却不是全部,有些场景还是需要配合其他信息一起使用的。@RabbitListener(queues = "test_queue_two",containerFactory = "mySimpleRabbitListenerContainerFactory",exclusive = true,ackMode = "NONE") public void test3(String msg, Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {log.info("test3收到消息:{}",msg);channel.basicAck(deliveryTag,false); }
5.其他注解
这里介绍下可以配合RabbitListener注解使用的一些注解。
- 5.1 RabbitHandler
用来标记处理消息的方法,基本用不到,也不建议使用。 - 5.2 Payload
用以标识消息体,支持将消息转为自定义的数据结构,使用在方法参数上:
@RabbitListener(queues = "Test-queue-one",ackMode = "MANUAL")public void testRabbitHandler2(@Payload String s,@Header(AmqpHeaders.DELIVERY_TAG) long delivery ,Channel channel) throws IOException {log.info("testRabbitHandler2: {},delivery:{} , channel:{} ",s,delivery,channel);channel.basicAck(delivery,false);}
不过这里的Payload完全可以不用,是不会有任何影响的,默认会将消息转换到这个参数上,不过这需要建立在只有一个这种参数的基础上,如果此时delivery不加注解,s也不加注解肯定会报错,因为springamqp不知道转换到哪个参数上,channel本身就是特殊的类型,不会影响消息转换。
- 5.3 Header/Headers
上面已经看到了这个注解,可以用于转换消息头中的信息用来接收 - 5.4 sendTo
这个注解的有意思在于,会将方法的返回参数发送到指定的队列中
@RabbitListener(queues = "Test-queue-one",ackMode = "MANUAL")@SendTo("test_queue")public String testRabbitHandler2(String s,@Header(AmqpHeaders.DELIVERY_TAG) long delivery ,Channel channel) throws IOException {log.info("testRabbitHandler2: {},delivery:{} , channel:{} ",s,delivery,channel);channel.basicAck(delivery,false);return "hahahh";}
如上代码,会在执行完之后发送test_queue队列中消息,发送的内容就是方法的返回内容,这里sendTo的使用和RabbitListener没有什么关系。
四、如何保证消息可靠性
消息可靠性是mq必须要关注的一点,因为是异步通信所有中间任何环境都有可能出现问题,所以消息可靠性就更为重要了,一般需要从以下几个方面考虑:
- 发送方进行消息发送确认
- 发送方增加消息失败重试机制
- 发送方发送持久化消息
- 持久化交换机、持久化队列
- 消费方先落库再进行手动ack
- 发送方和消费方重试
1.发送方进行消息发送成功确认
1.1 增加配置
需要增加两个配置项,打开消息发送确认,如下publisher-confirm-type: correlated表示消息到达交换机时,会有回调,publisher-returns: true 则表示消息达到最列时会有回调,队列回调一般只有routing key错误才会触发。
spring:rabbitmq:
# host: 192.168.150.199
# port: 5672addresses: 192.168.150.202:5672virtual-host: /username: guestpassword: guestpublisher-confirm-type: correlated # 交换机回调 none/simple/correlatedpublisher-returns: true # 队列回调
publisher-confirm-type 用于消息发送到交换机的回调,对消息可靠性要求比较高的话建议开启,且使用correlated模式,不过开启回调后mq的性能会有大幅下降,虽然大幅下降但是每秒接近上万的并发还是可以支持的(无回调配置允许情况下可到10w),这个配置他支持三种模式:
- none : 这是默认值,表示不进行发布确认
- simple:这是简单模式的发布确认,需要使用RabbitTemplate的confirm方法进行确认,如下
boolean flag = rabbitTemplate.waitForConfirms(1000);
- correlated:这是最常用的确认模式,使用CorrelationData返回的数据进行确认,这种确认方式需要我们声明回调函数。
publisher-returns 用于消息发送到队列的回调,一般只有routing key的错误才会导致消息到达交换机成功而到达队列失败,所以这个配置一般可以不开启,因为意义不大,触发发送消息时写错了routing key。因为mq内部可以保证routing key正确的情况下,消息到达交换机后一定可以到达队列。
1.2 设置回调方法
上面的两种回调(交换机回调、队列回到),需要我们手动设置回调方法才可以实现真正的回调,而且这两个回调方法都是全局共用的,而且只能设置一次,所以回调方法一般会设置在配置文件中。
// 这里使用correlationData 记录数据,要求发送方必须传递correlationData,否则异常
rabbitTemplate.setConfirmCallback((correlationData, ack, cause)->{});
切记这俩回调方法只允许设置一次, 交换机回调的代码中的correlationData 就是我们发送消息时传递给mq的CorrelationData,这个是原封不动传回来的,注意是原封不动传回来的,所以可以利用这一点做很多事,比如失败重试时就需要这里的信息进行重新发送。ack是一个boolean值,true表示消息到达交换机成功,false表示消息到达交换机失败了。cause在ack为false时,会给出造成ack失败的可能原因。
// 队列回调
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {// 获取消息内容String msgContent = new String(message.getBody(), StandardCharsets.UTF_8);// 这里只用来做日志输出,一般只有路由key错误才会触发log.error("RabbitTemplate 发送消息到达队列失败,请检查routing key, " +"exchange: {}, " +"routingKey: {}, " +"replyCode: {}, " +"replyText: {}, " +"message: {} ",exchange,routingKey ,replyCode,replyText,msgContent);
});
这个回调一般只有routing key错误才会出现,所以无需我们来做什么,最多记录下信息即可。不过需要注意的是队列回调在成功时是不回调的也就是说消息成功到达队列不会触发该方法,只有消息到达队列失败了才会触发这个回调,所以这里使用的是log.error。message是消息对象,replyCode是错误编码,replyText是错误信息,exchange交换机,routinKey路由键。
2.发送方增加消息失败重试机制
当消息发送失败以后,发送方开启了交换机回调我们就可以获知到消息发送失败了,若是失败则需要我们进行尝试重新发送,也就是失败重试了,如果对消息有更高的要求还需要考虑失败重试最后依然失败的处理机制,如果需要可以将消息进行入库然后另起线程进行处理,这里只展示失败重试的操作。
2.1 增加自定义失败重试配置
失败重试这种机制是很常见的,一般的失败重试基本参数也是类似,这里在rabbit配置中增加以下自定义配置,这个重试是参照网关的重试过滤器的重试机制来设置的。
spring:rabbitmq:publisher: # 注意:这个配置是自定义配置,用于发送方交换机失败重试,并不是官方配置retry:enabled: true # 开启失败重试max-retries: 10 # 最大重试次数first-interval-millis: 2000 # 首次重试的间隔时间msmax-interval-millis: 10000 # 最大重试间隔时间ms,间隔时间超过10s将以10s作为最大间隔时间factor: 2 # 乘子:重试间隔的倍数based-previous-value: true # 是否根据前一次的时间进行计算间隔时间,fase的话根据第一次间隔时间进行计算
2.2 获取配置信息配置类
这个配置类为了获取到rabbit的配置信息,方便在项目中使用
package com.cheng.common.mq.api.config;import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;/*** @author pcc* @version 1.0.0* @description rabbit配置类*/
@Data
@Configuration
public class RabbitConfig {// mq基础配置@Value("${spring.rabbitmq.host:localhost}")private String host;@Value("${spring.rabbitmq.port:5672}")private Integer port;@Value("${spring.rabbitmq.addresses:localhost:5672}")private String addresses;@Value("${spring.rabbitmq.username}")private String username;@Value("${spring.rabbitmq.password}")private String password;@Value("${spring.rabbitmq.virtual-host}")private String virtualHost;// 发送方ack 配置@Value("${spring.rabbitmq.publisher-confirm-type}")private String publisherConfirms;@Value("${spring.rabbitmq.publisher-returns}")private boolean publisherReturns;// 发送方nack重试配置@Value("${spring.rabbitmq.publisher.retry.enabled:false}")private Boolean retryEnabled;@Value("${spring.rabbitmq.publisher.retry.max-retries:3}")private Integer retryMaxRetries;@Value("${spring.rabbitmq.publisher.retry.first-interval-millis:200}")private Integer retryFirstIntervalMillis;@Value("${spring.rabbitmq.publisher.retry.max-interval-millis:1000}")private Integer retryMaxIntervalMillis;@Value("${spring.rabbitmq.publisher.retry.factor:2}")private Integer retryFactor;@Value("${spring.rabbitmq.publisher.retry.based-previous-value:false}")private Boolean retryBasedPreviousValue;// mq重连(失败重试)配置@Value("${spring.rabbitmq.template.retry.enabled}")private Boolean templateEnabled;@Value("${spring.rabbitmq.template.retry.max-attempts}")private Integer templateMaxAttempts;@Value("${spring.rabbitmq.template.retry.initial-interval}")private Integer templateInitialInterval;@Value("${spring.rabbitmq.template.retry.max-interval}")private Integer templateMaxInterval;}
2.3 交换机回调失败重试代码
这里要去消息发送时必须携带CorrelationData,以下代码主要通过relayCode来实现回调,注意这里的relayCode在交换机回调期间无论是ack还是nack都是我们设置的值,所以我们可以利用这个值来标识回调的次数而不用借助其他工具。
package com.cheng.common.mq.api.config;import com.cheng.common.mq.api.processor.DealReturnMessagePostProcessor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;/*** @author pcc* @version 1.0.0* @description Rabbit 回调配置*/
@Slf4j
@Configuration
public class RabbitConfirmConfiguration {@Resourceprivate RabbitConfig rabbitConfig;@Resourceprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void initRabbitTemplate(){// 更换消息转换器,json类型的消息,接收到以后会有转义符rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());// 消息前置处理器rabbitTemplate.setBeforePublishPostProcessors(new DealReturnMessagePostProcessor());}//回调@PostConstruct@ConditionalOnProperty(name = "spring.rabbitmq.publisher-confirm-type",havingValue = "correlated")public void initConfirmCallBack(){// 这里使用correlationData 记录数据,要求发送方必须传递correlationData,否则异常rabbitTemplate.setConfirmCallback((correlationData, ack, cause)->{if(ack){log.info("RabbitTemplate 发送消息到达交换机成功 " +"exchange: {} , " +"routing key: {} ," +"correlationId: {}, " +"replyCode: {}, " +"message: {} ",correlationData.getReturned().getExchange(),correlationData.getReturned().getRoutingKey(),correlationData.getId(),correlationData.getReturned().getReplyCode(),new String(correlationData.getReturned().getMessage().getBody(),StandardCharsets.UTF_8));}else{log.error("RabbitTemplate 发送消息到达交换机失败, 原因:{}, " +"exchange: {}, " +"routing key: {}," +"correlationId: {}," +"replyCode: {}, " +"message: {}",cause,correlationData.getReturned().getExchange(),correlationData.getReturned().getRoutingKey(),correlationData.getId(),correlationData.getReturned().getReplyCode(),new String(correlationData.getReturned().getMessage().getBody(),StandardCharsets.UTF_8));// 重试try {retry(rabbitTemplate,rabbitConfig,correlationData);} catch (InterruptedException e) {log.info("RabbitTemplate 消息重试期间异常:{} ",e.getMessage());}}});rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {// 获取消息内容String msgContent = new String(message.getBody(), StandardCharsets.UTF_8);// 这里只用来做日志输出,一般只有路由key错误才会触发log.error("RabbitTemplate 发送消息到达队列失败,请检查routing key, " +"exchange: {}, " +"routingKey: {}, " +"replyCode: {}, " +"replyText: {}, " +"message: {} ",exchange,routingKey ,replyCode,replyText,msgContent);});}// 交换机nack重试,要求发送前CorrelationData必须不为空private void retry(RabbitTemplate rabbitTemplate, RabbitConfig rabbitConfig, CorrelationData correlationData) throws InterruptedException {// 重构messageMessage message = MessageBuilder.withBody(correlationData.getReturned().getMessage().getBody()).setContentEncoding("UTF-8").setContentType(MessageProperties.CONTENT_TYPE_JSON).setMessageId(correlationData.getReturned().getMessage().getMessageProperties().getMessageId()).build();// 重构correlationDatacorrelationData.setReturned(new ReturnedMessage(correlationData.getReturned().getMessage(),correlationData.getReturned().getReplyCode()+1,correlationData.getReturned().getReplyText(),correlationData.getReturned().getExchange(),correlationData.getReturned().getRoutingKey()));// 计算暂停时间if(rabbitConfig.getRetryEnabled()){// 首次推送if (correlationData.getReturned().getReplyCode() == 1) {TimeUnit.MILLISECONDS.sleep(rabbitConfig.getRetryFirstIntervalMillis());} else if(correlationData.getReturned().getReplyCode()<=rabbitConfig.getRetryMaxRetries()){// 非首次推送if (rabbitConfig.getRetryBasedPreviousValue()) {// 基于上次推送时间计算下次推送时间,若是时间大于最大等待时间,则使用最大等待时间TimeUnit.MILLISECONDS.sleep((long) ((rabbitConfig.getRetryFirstIntervalMillis() * Math.pow(rabbitConfig.getRetryFactor(),correlationData.getReturned().getReplyCode() - 1))<=rabbitConfig.getRetryMaxIntervalMillis()?(rabbitConfig.getRetryFirstIntervalMillis() * Math.pow(rabbitConfig.getRetryFactor(),correlationData.getReturned().getReplyCode() - 1)):rabbitConfig.getRetryMaxIntervalMillis()));} else {// 基于首次推送时间计算下次推送时间,若是时间大于最大等待时间,则使用最大等待时间TimeUnit.MILLISECONDS.sleep(rabbitConfig.getRetryFactor() * rabbitConfig.getRetryFirstIntervalMillis()>=rabbitConfig.getRetryMaxIntervalMillis()?rabbitConfig.getRetryMaxIntervalMillis():rabbitConfig.getRetryFactor() * rabbitConfig.getRetryFirstIntervalMillis());}}if(correlationData.getReturned().getReplyCode()<=rabbitConfig.getRetryMaxRetries()){log.info("RabbitTemplate 已开启交换机发送失败重试,最大重推次数:{} ,开始第:{} 次重推, " +"exchange: {}, " +"routing key: {}, " +"correlationId: {}, " +"replyCode: {}, " +"message: {} ",rabbitConfig.getRetryMaxRetries(),correlationData.getReturned().getReplyCode(),correlationData.getReturned().getExchange(),correlationData.getReturned().getRoutingKey(),correlationData.getId(),correlationData.getReturned().getReplyCode(),new String(correlationData.getReturned().getMessage().getBody(),StandardCharsets.UTF_8));rabbitTemplate.convertAndSend(correlationData.getReturned().getExchange(),correlationData.getReturned().getRoutingKey(),new String(correlationData.getReturned().getMessage().getBody(),StandardCharsets.UTF_8),correlationData);}}else{// 默认为0,上方构建的correlationData对象中没有设置重试次数+1if(1==correlationData.getReturned().getReplyCode()){log.info("RabbitTemplate 未开启交换机发送失败重试,尝试重推一次开始, " +"exchange: {}, " +"routing key: {}, " +"correlationId: {}, " +"replyCode: {}, " +"message: {} ",correlationData.getReturned().getExchange(),correlationData.getReturned().getRoutingKey(),correlationData.getId(),correlationData.getReturned().getReplyCode()+1,new String(correlationData.getReturned().getMessage().getBody(),StandardCharsets.UTF_8));rabbitTemplate.convertAndSend(correlationData.getReturned().getExchange(),correlationData.getReturned().getRoutingKey(),new String(correlationData.getReturned().getMessage().getBody(),StandardCharsets.UTF_8),correlationData);}}}}
2.4 设置消息前置处理器
前面已经说了失败重试需要依赖CorrelationData的信息,依赖的就是他的ReturnedMessage信息,ReturnedMessage含有消息体Message,交换机、路由key,以及relayCode(可以用来标识重试次数),所以这部分信息我们可以统一放在前置处理器中进行封装,而不用每次发送消息时进行填充,当然不用前置处理器也是完全可以的。这里有个前提是CorrelationData不可以为空,也就是使用RabbitTemplate发送消息时要传入CorrelationData。
package com.cheng.common.mq.api.processor;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
import java.util.UUID;/*** @author pcc* @version 1.0.0* @description 消息前置处理器* 注意spring实际执行的是第二个方法*/
@Slf4j
public class DealReturnMessagePostProcessor implements MessagePostProcessor {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {return message;}@Overridepublic Message postProcessMessage(Message message, Correlation correlation, String exchange, String routingKey) {// correlationID / messageIdString id = UUID.randomUUID().toString().replace("-","").toLowerCase();Message msg = message;// 这里只有CorrelationData AsyncCorrelationData,后者基本不用,此处不做适配if(correlation !=null && correlation instanceof CorrelationData ){CorrelationData correlationData = (CorrelationData) correlation;// 构建message,只有重试的时候才会从新构建消息if(correlationData.getReturned()!=null && correlationData.getReturned().getReplyCode()!=0){msg = MessageBuilder.fromMessage(correlationData.getReturned().getMessage()).build();}else{// 首次推送,共用messageid与correlationidcorrelationData.setId(id);msg = MessageBuilder.fromMessage(msg).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("UTF-8").setTimestamp(Date.from(LocalDateTime.now().atZone( ZoneId.systemDefault()).toInstant())).setMessageId(id).build();}// 这里构建的ReturnedMessage,是为了方便在回调时获取到发送时的信息,方便进行重试correlationData.setReturned(new ReturnedMessage(msg,correlationData.getReturned()!=null?correlationData.getReturned().getReplyCode():0,correlationData.getReturned()!=null?correlationData.getReturned().getReplyText():"",exchange,routingKey));log.info("RabbitTemplate 前置处理器:消息发送成功, exchange: {}, routingKey: {}, correlationId: {}, messageId: {}, message: {}", exchange, routingKey, correlationData.getId(),msg.getMessageProperties().getMessageId(),new String(msg.getBody(), StandardCharsets.UTF_8));}return postProcessMessage(msg);}
}
将前置处理器交给RabbitTemplate,这样每次RabbitTemplate发型消息前都会帮我们封装CorrelationData了。这块代码其实写在了回调配置类里了,这里重复展示下。
@PostConstructpublic void initRabbitTemplate(){// 更换消息转换器,json类型的消息,接收到以后会有转义符rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());// 消息前置处理器rabbitTemplate.setBeforePublishPostProcessors(new DealReturnMessagePostProcessor());}
2.5 消息发送
消息发送没有什么特殊的,需要关注的就是CorrelationData不为空即可。
package com.ebbing.task.controller;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;/*** @author pcc* @version 1.0.0* @description 测试Rabbit*/
@Slf4j
@RestController
public class TestRabbitController {@Resourceprivate RabbitTemplate rabbitTemplate;@RequestMapping(value ="/send")public void testSend(){// 开启回调的话,CorrelationData不可为空rabbitTemplate.convertAndSend("amq.topic1", "one", "hello ampq",new CorrelationData());}
}
下面是发送信息失败的重试(只需要写个错误的交换机名称就会触发),可以清晰的看到消息在正常的重试了(省略了重复信息)。
2024-01-09 16:05:13.496 INFO 43776 --- [nio-8808-exec-3] c.c.m.a.p.DealReturnMessagePostProcessor : RabbitTemplate 前置处理器:消息发送成功, exchange: amq.topic1, routingKey: one, correlationId: 853f32dc470c457f9da691aa9c51f5a6, messageId: 853f32dc470c457f9da691aa9c51f5a6, message: "hello ampq"
2024-01-09 16:05:13.512 ERROR 43776 --- [68.150.204:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'amq.topic1' in vhost '/', class-id=60, method-id=40)
2024-01-09 16:05:13.515 ERROR 43776 --- [nectionFactory2] c.c.c.m.a.c.RabbitConfirmConfiguration : RabbitTemplate 发送消息到达交换机失败, 原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'amq.topic1' in vhost '/', class-id=60, method-id=40), exchange: amq.topic1, routing key: one,correlationId: 853f32dc470c457f9da691aa9c51f5a6,replyCode: 0, message: "hello ampq"
2024-01-09 16:05:15.516 INFO 43776 --- [nectionFactory2] c.c.c.m.a.c.RabbitConfirmConfiguration : RabbitTemplate 已开启交换机发送失败重试,最大重推次数:10 ,开始第:1 次重推, exchange: amq.topic1, routing key: one, correlationId: 853f32dc470c457f9da691aa9c51f5a6, replyCode: 1, message: "hello ampq"
2024-01-09 16:05:15.519 INFO 43776 --- [nectionFactory2] c.c.m.a.p.DealReturnMessagePostProcessor : RabbitTemplate 前置处理器:消息发送成功, 。。。。
2024-01-09 16:05:15.525 ERROR 43776 --- [nectionFactory1] c.c.c.m.a.c.RabbitConfirmConfiguration : RabbitTemplate 发送消息到达交换机失败, 原因:。。。。
2024-01-09 16:05:19.525 INFO 43776 --- [nectionFactory1] c.c.c.m.a.c.RabbitConfirmConfiguration : RabbitTemplate 已开启交换机发送失败重试,最大重推次数:10 ,开始第:2 次重推, 。。。。
2024-01-09 16:05:19.528 INFO 43776 --- [nectionFactory1] c.c.m.a.p.DealReturnMessagePostProcessor : RabbitTemplate 前置处理器:消息发送成功,。。。。
2024-01-09 16:05:19.530 ERROR 43776 --- [nectionFactory2] c.c.c.m.a.c.RabbitConfirmConfiguration : RabbitTemplate 发送消息到达交换机失败, 原因:。。。。
2024-01-09 16:05:23.531 INFO 43776 --- [nectionFactory2] c.c.c.m.a.c.RabbitConfirmConfiguration : RabbitTemplate 已开启交换机发送失败重试,最大重推次数:10 ,开始第:3 次重推, 。。。。
2024-01-09 16:05:23.534 INFO 43776 --- [nectionFactory2] c.c.m.a.p.DealReturnMessagePostProcessor : RabbitTemplate 前置处理器:消息发送成功, 。。。。
2024-01-09 16:05:23.537 ERROR 43776 --- [nectionFactory1] c.c.c.m.a.c.RabbitConfirmConfiguration : RabbitTemplate 发送消息到达交换机失败, 原因:。。。。
。。。。。。
3.发送方发送持久化消息
这个很简单,只需要我们发送时声明消息的投递模式是持久化的即可,RabbitTemplate的默认模式就是持久化的,所以这里我们可以不做任何操作,如果想要手动设置的话则在Message中设置即可,可以将这段代码放入前置处理器:
MessageBuilder.fromMessage(msg).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("UTF-8").setTimestamp(Date.from(LocalDateTime.now().atZone( ZoneId.systemDefault()).toInstant())).setMessageId(id).setDeliveryMode(MessageDeliveryMode.PERSISTENT) // 这是默认值,可以不设置,默认就是持久化消息.build();
4.持久化交换机、持久化队列
交换机、队列、绑定等信息可以在Rabbit的管理后台创建也可以在Java代码中创建都是可以的。
4.1 在Rabbit的管理后台中创建的话交换机和队列默认都是持久化的
4.2 在java代码中创建交换机和队列
这里创建默认也是持久化的,需要注意的是这里在项目启动阶段不会创建交换机和队列或者绑定,只有在使用mq时才会使用RabbitAdmin触发创建(版本:springboot2.6.11)。
package com.ebbing.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author pcc* @version 1.0.0* @description 基础信息类,交换机、队列默认是持久化的*/
@Configuration
public class RabbitBaseConfig {// ***************交换机***************@Bean("test-java-exchange")public DirectExchange createExchange(){return ExchangeBuilder.directExchange("test-java-exchange").durable(true).build();}@Bean("test-java-exchange2")public DirectExchange createExchange2(){return ExchangeBuilder.directExchange("test-java-exchange2").durable(true).build();}// ***************队列***************@Bean("test-java-queue")public Queue createQueue(){return QueueBuilder.durable("test-java-queue").build();}@Bean("test-java-queue2")public Queue createQueue2(){return QueueBuilder.durable("test-java-queue2").build();}@Bean("test-java-queue3")public Queue createQueue3(){return QueueBuilder.durable("test-java-queue3").build();}// ***************队列绑定交换机绑定***************@Beanpublic Binding createBinding(@Qualifier("test-java-queue") Queue queue, @Qualifier("test-java-exchange") DirectExchange fanoutExchange){return BindingBuilder.bind(queue).to(fanoutExchange).with("key.test-java-exchange");}@Beanpublic Binding createBinding2(@Qualifier("test-java-queue2") Queue queue, @Qualifier("test-java-exchange2") DirectExchange fanoutExchange){return BindingBuilder.bind(queue).to(fanoutExchange).with("key.test-java-exchange2");}@Beanpublic Binding createBinding3(@Qualifier("test-java-queue3") Queue queue, @Qualifier("test-java-exchange2") DirectExchange fanoutExchange){return BindingBuilder.bind(queue).to(fanoutExchange).with("key.test-java-exchange2");}
}
5.消费方进行手动ack
这里使用simple进行展示,direct、stream会单独抽出来说一说。
新增如下配置:
spring:rabbitmq:addresses: 192.168.150.204:5672username: guestpassword: guestrequested-heartbeat: 5 # 默认值virtual-host: /listener:type: simple # 指定监听模式,此处可以不配置,如不配置可根据下方会自动适配simple: # 与上方的simple对应acknowledge-mode: manual # none不做ack、auto自动ack、manual手动ack
默认是auto模式,也就是消息收到就会自动进行ack,此时无论是消息处理成功与否都不会从新入队,这样就会造成消息丢失,所以我们需要使用手动ack,只有我们手动确认了消息,消息才会从队列中进行删除。下面是手动确认的代码:
@RabbitListener(queues = {"Test-queue-one"})public void testManualAck(@Payload String msg,@Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag,Channel channel) throws Exception{log.info("收到消息:{}, 消息顺序:{}",msg,deliveryTag);channel.basicAck(deliveryTag,false);}
这里使用的是 channel.basicAck(deliveryTag,false),deliveryTag表示的是消息在队列里的位置,mq的服务端需要靠这个找到具体的消息,false 表示不使用批量确认,如果使用批量确认那么在接收这条消息之前收到的消息都会被确认掉,一般不会使用批量确认,除非是可有可无的信息,丢失几条也没有影响,否则不要使用批量确认,传false即可。
其他消息确认的方法:
- basicNack(long deliveryTag, boolean multiple, boolean requeue)
这里是异常时做Nack使用的方法,deliveryTag表示消息在队列里的位置该参数需要获取如上,multiple表示是否批量确认这里批量确认是指将未ack的消息进行批量处理到底是如何处理需要根据第三个参数来定,requeue是否重新入队,对于错误信息如果重新入队,那么发送方需要做好消息死信的处理,也就是死信队列的设置,死信队列后面细说。 - basicReject(long deliveryTag, boolean requeue)
这个和上面的方法区别不大,唯一的区别是不可以批量确认,相比直辖basicNack更灵活。
6.发送方和消费方重试机制
通过上面的举措已经基本可以保证消息的不丢失了,但是还有一种可能就是消息发送失败和监听失败的场景,此时可能消息都无法发送和接收,这种肯定是因为mq的服务异常导致的,所以一般mq是需要配置集群的,所以可以规避这种单节点突然宕机的风险。如果是单节点就需要增加重连机制了。
- 发送方
发送方可以增加template的重试,当发送消息时若是无法建立连接,则会根据配置进行重试,重试最后失败则会抛出异常,这里重试是同步的
上面配置的是3次重试,加上原始的一次是4次,若是4次还连接不上就会报异常。当然集群也是可以使用这个配置进行重连。spring:rabbitmq:addresses: 192.168.150.204:5672virtual-host: /username: guestpassword: guestpublisher-confirm-type: correlated # 交换机回调 none/simple/correlatedpublisher-returns: true # 队列回调template: # 此处用于发送消息连接mq异常时或者其他场景发生异常(exception)时进行重试,发送交换机、队列的nack并不会触发这个重试retry:enabled: truemax-attempts: 3initial-interval: 1000max-interval: 10000
- 消费方
消费方只要使用了RabbitListener注解,就会一直尝试重连mq服务端,无需额外的配置。
五、消息监听器
SpringBoot中支持了两种消息监听器simple(SimpleMessageListenerContainer)、direct(DirectMessageListenerContainer)。在最早时只有simple,2.0之后开始添加了direct。他们的特点各有利弊,需要使用哪个需要根据自身业务场景做选择。
1.消费者监听容器:simple
这是最好的监听器,也是默认的监听容器。适用于普遍大多数的场景,所以一般如果没有特殊需要直接使用simple即可,使用simple时支持以下参数的配置:
spring:rabbitmq:addresses: 192.168.150.204:5672username: guestpassword: guesthost: 192.168.150.204requested-heartbeat: 5 # 默认值virtual-host: /port: 5672listener:type: simplesimple:prefetch: 100 # 消息预取的数量,默认值是250,越小消息消费越慢acknowledge-mode: manualconcurrency: 10 # 指定消费者数量,在simple中该值等于channel的数量。一个消费者一个channeldefault-requeue-rejected: false # 消息决绝后默认不重新入队retry: # 当消费时出现异常,默认情况会进行重试,这个重试是在消费者本地从新尝试消费,而不是从队列里重试enabled: truemax-attempts: 3initial-interval: 1000
如上配置了消费者数量是10,这里的消费者便是10了,且会有10各channel:
下面是channel
这是simple的消息的处理模型:
2.消费者监听容器:direct
2.1 direct使用
下面是direct常用的配置:
spring:application:name: ebbing-messagerabbitmq:addresses: 192.168.150.204:5672username: guestpassword: guesthost: 192.168.150.204requested-heartbeat: 5 # 默认值virtual-host: /port: 5672listener:type: directdirect:prefetch: 100acknowledge-mode: manualconsumers-per-queue: 20 # 每个队列消费者的数量,与上面的concurrency类似default-requeue-rejected: falseretry:enabled: truemax-attempts: 3initial-interval: 1000
consumers-per-queue 配置了20个消费者,如下会体现出20哥消费者和20个channel
下面是channel的信息
下面是direct的处理模型:
对比direct和simple可以发现他们最大的不同在于消费者对应的线程上,在simple中每个消费者都会有一个线程(这里说的消费者指的是使用concurrency指定的消费者),而direct则是共享线程。因此direct支持手动设置线程池,如果使用direct则需要我们手动设置线程池,其他则区别不大。
2.2 使用direct设置自定义线程池
很简单,只需要如下即可,这样便会取代默认的CachingConnectionFactory 连接工厂了。
package com.cheng.ebbing.message.mq;import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;/*** @author pcc* @version 1.0.0* @description 多mq配置类*/
@Configuration
public class anothermqconfig {@Beanpublic CachingConnectionFactory getConnectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost("192.168.150.204");connectionFactory.setPort(5672);connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");connectionFactory.setVirtualHost("/");connectionFactory.setExecutor(new ThreadPoolExecutor(3,3,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>()));return connectionFactory;}
}
这里设置的线程最大是3个都是核心线程,测试下,下面是测试代码:
@RabbitListener(queues = {"test-java-queue"})public void testManualAck2(@Payload String msg,@Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag,Channel channel) throws Exception{log.info("收到消息:{}, 消息顺序:{}",msg,deliveryTag);log.info("当前处理线程:{}",Thread.currentThread().getName());channel.basicAck(deliveryTag,false);}
下面是执行截图,看一看到确实只有3
2.3 同时使用simple和direct
如果一个项目想要同时使用simple和direct呢(不过这种场景可能不是太多,不过可能会碰到一个项目需要使用不同的rabbit,他们的ip和端口不同,都是一样的处理)。
配置一个独立的mq监听:
package com.cheng.ebbing.message.mq;import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;/*** @author pcc* @version 1.0.0* @description 多mq配置类*/
@Configuration
public class anothermqconfig {@Bean("directRabbitListenerContainerFactory")public DirectRabbitListenerContainerFactory directRabbitListenerContainerFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost("192.168.150.204");connectionFactory.setPort(5672);connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");connectionFactory.setExecutor(new ThreadPoolExecutor(3,3,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>()));DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory();factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);factory.setConnectionFactory(connectionFactory);factory.setMissingQueuesFatal(false);return factory;}}
为监听者声明监听工厂:
// 使用默认配置监听 simple@RabbitListener(queues = {"dead-letter-queue"})public void testDeadLetterQueue(@Payload String message,@Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag,Channel channel) throws IOException {log.info("收到死信队列消息:{}",message);channel.basicAck(deliveryTag,false);}// 使用自定义的容器工厂进行监听 direct@RabbitListener(queues = {"test-java-queue"},containerFactory = "directRabbitListenerContainerFactory")public void testManualAck2(@Payload String msg,@Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag,Channel channel) throws Exception{log.info("收到消息:{}, 消息顺序:{}",msg,deliveryTag);log.info("当前处理线程:{}",Thread.currentThread().getName());channel.basicAck(deliveryTag,false);}
如此便可以实现simple和direct共存了。
六、其他关键信息
1.消费者的消息预取
消费者的预取是指一次性从mq队列里获取的消息数量,比如mq里有500条消息,而默认的预取条数是250,则如果只有一个消费者,那么当消费者连接到mq时,应该是250条消息属于unacked的,total还是500。消费者对消息的预取之后是将消息存储到了本地,这样就可以减少消费者于mq服务端的交互次数,从而提升消息消费性能,当消息量较大时,预取的值也可以适当调高,但具体调整多少,应该根据机器的性能测试而定,而不可以随意指定。
注意消息预取并不会导致消息的乱序,预取以后得消息的顺序与mq服务器中的消息顺序还是一致的。如果想要调整预取的数量,在SpringBoot中可以如下进行设置:
spring:rabbitmq:addresses: 192.168.150.204:5672username: guestpassword: guesthost: 192.168.150.204requested-heartbeat: 5 # 默认值virtual-host: /port: 5672listener:simple:prefetch: 100 # 消息预取,默认值是250
注意:prefetch应该设置合理的值,太小容易导致消息处理过慢,太大可能会导致服务器性能下降。其次prefetch使用时应该使用manual的ack模式,虽然auto模式下野生效,不过会造成消息重复投递的问题,所以使用prefetch应使用manual模式的ack
2.工作模式:work与能者多劳
如果一个队列有多个消费者在连接,那么默认情况下每个消费者接收到的消息数量都是一样的,无论机器性能相同与否,这个是mq服务端的负载均衡策略-轮询。这也是rabbit的默认工作模式,这个很好验证,建立两个消费者看他们的消息消费数量即可。这里不细说了,很容易验证且没有任何难度。主要说一下如何破坏这种模式,然后思考下破坏好还是不破坏好。
-
如何破坏work
- 1、ack模式需要使用manual
自动模式下消息接收到立马ack,是无法感知到服务器性能的。自动ack场景下抛开网络影响,多个消费者对ack的响应时间基本相同,所以破坏不破坏work没有多大意义,这里需要是手动ack,意义在于手动ack一般是有业务处理以后才会手动ack,那么不同机器就很容易有性能差异,此时破坏work也就有了意义。 - 2、prefetch与channel.basicQos应该保持一致
这里首先解释下这两个参数。prefetch上面介绍了,就是消息者对消息的预取数量。basicQos表示的是一次处理完多少条消息之前就不要给我发消息了,这里与prefetch相等就会形成一个闭环,进来多少条那么处理多少条,只有全部处理完才会继续处理下一批,如果不这么设置,因为消息的预取模式,消息会一直按照轮询打进来。注意:这里prefetch与basicQos并不需要等于1.他们可以是任何值,只要相等即可。如果他们的值比较大的话,那么发送的消息最好是prefetch的3倍以上,方便验证,这里设置prefetch和basicQos为100.
在消费者代码中增加channel.basicQos代码:spring:rabbitmq:addresses: 192.168.150.204:5672username: guestpassword: guesthost: 192.168.150.204requested-heartbeat: 5 # 默认值virtual-host: /listener:simple:prefetch: 100 # 消息预取的数量,默认值是250,越小消息消费越慢
此时我往队列里发了500条消息,然后处理结果是消费者1处理了:112,消费者2处理了:388。可见work模式被破坏了。@RabbitListener(queues = {"Test-queue-one"}) public void testManualAck(@Payload String msg,@Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag,Channel channel) throws Exception{log.info("收到消息:{}, 消息顺序:{}",msg,deliveryTag);count1.getAndIncrement();channel.basicQos(100);TimeUnit.MILLISECONDS.sleep(100L);channel.basicAck(deliveryTag,false);System.out.println("消费者1处理了几条:"+count1); }@RabbitListener(queues = {"Test-queue-one"}) public void testManualAck2(@Payload String msg,@Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag,Channel channel) throws Exception{log.info("收到消息:{}, 消息顺序:{}",msg,deliveryTag);channel.basicQos(100);count2.getAndIncrement();channel.basicAck(deliveryTag,false);System.out.println("消费者2处理了几条:"+count2); }
- 1、ack模式需要使用manual
-
破坏work好还是不破坏好?
这个需要根据机器来判断,如果消费者性能都差不多,则不需要破坏,我们只需要寻找到预取数量得合理值即可,若是机器性能存在差异,则需要我们破坏work了,从而使得消费者集群拥有更好地消费能力,不至于快的等慢的出现资源浪费的情况。
3.死信队列
死信队列就是死亡的消息该去的地方,那何种消息才是死亡的消息呢?
- 消息的TTL过期了:x-message-ttl
- 消费者reject或者nack且requeue为false
- 队列长度达到最大:最先入队的消息优先死亡
以上三种场景会导致消息死亡,死亡的消息则需要一个地方进行存储就是死信队列了,其实死信队列就是普通队列,还有死信交换机也是普通交换机,只是他们用于处理死信所以才叫死信交换机、死信队列,所以重点不是死信交换机和队列,而是死信。
下面是死信队列和交换机以及绑定的创建,和普通交换机、队列、绑定并无区别。
// 死信交换机-直连@Bean("dead-letter-exchange")public DirectExchange createExchange3(){return ExchangeBuilder.directExchange("dead-letter-exchange").durable(true).build();}// 死信队列@Bean("dead-letter-queue")public Queue createDeadLetterQueue(){return QueueBuilder.durable("dead-letter-queue").build();}// 死信交换机与死信队列绑定@Beanpublic Binding createDeadLetterBinding(@Qualifier("dead-letter-queue") Queue queue,@Qualifier("dead-letter-exchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with("dead");}
后面有死信消息就会往这里发送。
3.1 TTL - Queue
为消息设置存活时间有两种方式(这里只说代码的方式,不说后台):队列设置ttl属性,这样队列中所有的消息如果在ttl内没有被消费都会变成死信,另外一种则是为消息设置ttl,这样ttl只会针对一条消息生效。先演示设置队列的ttl的方式。
这里只有队列声明时增加ttl属性即可,其他和正常的队列、exchange等都无区别。
// TTL queue@Bean("ttl-queue")public Queue createTtlQueue(){return QueueBuilder.durable("ttl-queue").ttl(10000) // 设置队列消息存活时间.deadLetterExchange("dead-letter-exchange") // 死信交换机.deadLetterRoutingKey("dead") // 死信路由.build();}// TTL exchange@Bean("ttl-exchange")public DirectExchange createTtlExchange(){return ExchangeBuilder.directExchange("ttl-exchange").durable(true).build();}// TTL queue与TTL exchange绑定@Beanpublic Binding createTtlBinding(@Qualifier("ttl-queue") Queue queue,@Qualifier("ttl-exchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with("ttl");}
下面测试发送消息到ttl-queue:
package com.ebbing.mq;import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;/*** @author pcc* @version 1.0.0* @className TestMQ*/
@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class TestMQ {@AutowiredRabbitTemplate rabbitTemplate;@Testpublic void test3() throws Exception{for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend("ttl-exchange","ttl","hello ttl-queue:"+i+"个 ", new CorrelationData());log.info("发送消息成功:{}",i);}}
}
10s后消息进入死信队列了:
3.2 TTL - Message
还可以为消息单独设置ttl,队列交换机等都是普通队列、交换机,唯一区别是为该队列声明死信交换机和路由即可。如下方式:
普通队列声明死信交换机、路由:
@Bean("test-java-queue")public Queue createQueue(){return QueueBuilder.durable("test-java-queue").deadLetterExchange("dead-letter-exchange").deadLetterRoutingKey("dead").build();}
测试发送ttl消息:
package com.cheng.ebbing.message.mq;import lombok.extern.slf4j.Slf4j;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;/*** @author pcc* @version 1.0.0* @description 描述下这个类吧*/
@Slf4j
@SpringBootTest
@RunWith(SpringRunner.class)
public class Test {@ResourceRabbitTemplate rabbitTemplate;@org.junit.Testpublic void testSendTTLMsg(){rabbitTemplate.convertAndSend("test-java-exchange","key.test-java-exchange","hello TTL message",message->{message.getMessageProperties().setExpiration("5000");return message;});log.info("消息发送完成");}
}
5s后消息进入死信队列:
3.3 消费者reject或者nack
为接收消息的队列声明死信交换机和队列(交换机和绑定无区别,不贴了):
@Bean("test-java-queue")public Queue createQueue(){return QueueBuilder.durable("test-java-queue").deadLetterExchange("dead-letter-exchange").deadLetterRoutingKey("dead").build();}
消息接收者直接拒绝消息:
@RabbitListener(queues = {"Test-queue-one"})public void testManualAck2(@Payload String msg,@Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag,Channel channel) throws Exception{log.info("收到消息:{}, 消息顺序:{}",msg,deliveryTag);// 拒绝消息,且不重新入队-消息死信channel.basicReject(deliveryTag,false);}
发送消息:
package com.ebbing.mq;import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;/*** @author pcc* @version 1.0.0* @className TestMQ*/
@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class TestMQ {@AutowiredRabbitTemplate rabbitTemplate;@Testpublic void test2() throws Exception{for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend("test-java-exchange","key.test-java-exchange","hello deadLetter:"+i+"个 ", new CorrelationData());log.info("发送消息成功:{}",i);}}
消息全部进入死信队列了:
3.4 队列消息达到最大
队列默认无边界,消息的最大存储量依赖本地磁盘,要想要模拟队列消息满了,需要为队列设置最大长度。
创建相关队列和交换机:
// maxlength-queue@Bean("maxlength-queue")public Queue createMaxlengthQueue(){return QueueBuilder.durable("maxlength-queue").maxLength(9).deadLetterExchange("dead-letter-exchange")// 死信交换机.deadLetterRoutingKey("dead").build();}// maxlength-exchange@Bean("maxlength-exchange")public DirectExchange createMaxlengthExchange(){return ExchangeBuilder.directExchange("maxlength-exchange").durable(true).build();}// maxlength-queue与maxlength-exchange绑定@Beanpublic Binding createMaxlengthBinding(@Qualifier("maxlength-queue") Queue queue,@Qualifier("maxlength-exchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with("maxlength");}
发送消息没有区别,不重复贴了,期望结果是队列里面9条,死信队列里1条,下图可见与预期并无区别。
然后看下消息希望的原因,可以看到是因为队列满了:
3.6 死信消息的处理
死信消息不可能扔到私信队列就完事了,我们还需要针对死信消息做对应处理,可以根据消息的死亡类型做不通处理。
- 消费端拒绝:这种消息一般消费端已经处理不了,应该认为介入查看原因,可做短信通知或者平台通知主责人。
- ttl死信:这种可以进行重试
- 队列最大:这种也可以进行重试
可以通过x-death 来获取到消息死亡的明细信息,这样就方便我们根据死亡的类型和原队列等信息进行不同的处理了。
// 监听死信队列-判断消息死亡类型@RabbitListener(queues = {"dead-letter-queue"})public void testDeadLetterQueue2(@Payload String message,@Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag,@Header("x-death") List<Map<String, Object>> xDeathHeaders,Channel channel) throws IOException {log.info("收到死信队列消息:{}, 死信信息: ",message);xDeathHeaders.forEach(item->{item.forEach((key,value)->{log.info("key:{},value:{}",key,value);});});channel.basicAck(deliveryTag,false);}
下面是输出:
4.延迟队列 - 使用死信队列实现
任务超时处理,订单超时未支付等都是延迟队列的应用场景。如果不使用延时队列,一般是将其放入到任务列表定时轮询,这种做法有些low了,最好的处理方式还是使用延迟队列来解决。Rabbit中延迟队列有两种实现方式,一种是使用ttl+死信队列实现,一种是安装延时交换机插件。这里先说使用ttl+私信队列。
原理:为消息设置ttl,消息死亡后就会进入死信队列,死信队列就是延迟队列,通过监听死信队列的信息达到延迟队列的目的,这种代码就不贴了,上面3 .2 TTL-Message 已经介绍过了,代码并没有什么区别。
5.延迟队列 - 使用延迟交换机插件
5.1 安装延迟交换机插件
这里需要安装一个delay的插件:rabbitmq-delayed-message-exchange
官网位置:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/tags
注意下载的是后缀ez的文件。
将其复制到容器内部:
docker cp rabbitmq_delayed_message_exchange-3.11.1.ez 07:/plugins/
然后执行以下命令:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
有以下输出以后重启容器:
然后交换机的创建就会有延迟队列的选项:
5.2 使用延迟交换机
注意:延迟消息延迟的地方是交换机到达队列这个步骤,当消息通过交换机发出去后会经过指定的延时时间才会到达队列。
下面是延迟交换机的声明代码:
// delay-queue@Bean("delay-queue")public Queue createDelayQueue(){return QueueBuilder.durable("delay-queue").build();}// delay-exchange@Bean("delay-exchange")public CustomExchange createDelayedExchange(){Map<String,Object> map = new HashMap<String,Object>();map.put("x-delayed-type","direct");return new CustomExchange("delay-exchange","x-delayed-message",true,false,map);}// delay-queue与delay-exchange绑定@Beanpublic Binding createDelayBinding(@Qualifier("delay-queue") Queue queue,@Qualifier("delay-exchange") CustomExchange exchange){// 这里会多一个noargs,因为使用的是CustomExchangereturn BindingBuilder.bind(queue).to(exchange).with("delayed").noargs();}
然后进行测试,下面是测试代码:
@org.junit.Testpublic void testSendDelayMessage(){rabbitTemplate.convertAndSend("delay-exchange","delayed","hello delay个 ", message->{message.getMessageProperties().setDelay(10000); // 单位毫秒return message;});log.info("消息发送完成");}
如此即可实现延迟消息了,注意消息真正延迟的地方是交换机和队列之间,不是队列和消费者之间。
6.队列的惰性模式 - Lazy Queue
Rabbit从3.6开始支持了Lazy Queue,在3.12 版本中已经将Lazy Queue设置为了默认模式。
Rabbit已经默认使用了,足以说明问题,我们先看看不使用Lazy Queue时消息处理可能存在问题:
- 1.对于持久化队列,消息会存在内存和磁盘各一份,消息量比较小时是无所谓的,如果消息量特别大,对于mq的内存来说压力会增加
- 2.对于非持久化队列,消息存在于内存中,当消息量比较大时,rabbit会将消息分页出去(磁盘中),以减轻内存的压力
所以无论你用什么方式,消息量特别大时,系统的内存压力肯定会增加这是无法避免的,如果想要解决这个问题,只能是把信息从内存中移出去,但信息移出去也会碰到另外的问题,信息使用时加载磁盘中的信息会增加IO。所以如果移出去肯定会增加信息消费时的时间。
事实上Rabbit就是这么做的,Rabbit通过声明队列的模式来将队列声明为惰性队列,惰性队列的消息会存储在磁盘中(内存中只有少量的消息),当消息使用时再从磁盘加载出来。那么惰性队列的优缺点是什么:
- 1.优点:消息量特别大时,惰性队列基本不会有内存爆炸的风险
- 2.优点:降低内存使用率,提升系统整体响应速度
- 3.缺点:消息消费时增加了IO操作,所以会使得消息消费速度略微降低(但仍然还是很快的)
下面是声明队列时,将其设置为惰性队列:
// 惰性队列@Bean("lazy-queue")public Queue createLazyQueue(){return QueueBuilder.durable("lazy-queue").lazy() // 惰性队列.build();}
给这个队列发了两条持久化信息,如下,可以看出消息不存在内存而是在磁盘:
6.开启trace日志
在做中台类系统时,使用Rabbit来解耦上下游系统是个很不错的选择,不过若是系统没有消息记录则会增加很多扯皮的事情,而且这个消息不是指发送的日志,也不是指发送方的发送成功确认日志,而是指消费者和Rabbit的通信日志,有了这个就可以做到全流程信息把控,防止下游系统信息接收到说没有收到了,这样的事情我可碰到的太多了。
6.1 启用trace插件
使用trace日志的话,需要使用trace插件来实现,Rabbit安装以后trace插件就默认携带,只是没有开启,所以我们只需要开启即可:
# 查看所有插件
rabbitmq-plugins list
# 启用trace插件
rabbitmqctl trace_on
rabbitmq-plugins enable rabbitmq_tracing
操作完成后刷新mq的management就会在如下位置除夕拿Tracing:
6.2 使用trace
如下图,新增一个trace:
然后就可以发送消息进行测试trace了,当消息发送成功后会有如下信息在日志里,这表示消息发送到了交换机了,注意这里的Message published表示的是消息到达交换机了
然后再看下消费者收到消息的日志,Message received 表示消费者已经接收到了该消息,下面是消息的具体信息:
trace日志的好处在于可以明确的知道下游系统是否成功接收到了消息。
trace日志好用单也存在一个明显的问题,就是日志不会自动分割,所以需要我们自己加日志的分割,这个可以使用shell编辑一个脚本来实现日志的分割,下面是分割trace文件的shell,可以使用,然后增加一个crontab即可,我用的是按日期分割,每天一个日志文件。
#!/bin/bash
cp /var/tmp/rabbitmq-tracing/trace_log.log /apps/rabbitmq_log/trace_log_$(date +%Y%m%d).log
cat /dev/null > /var/tmp/rabbitmq-tracing/trace_log.log
七、可能碰到的异常
这里是使用MQ可能会碰到的一些问题和对应的解决思路,供参考欢迎补充。
1.业务幂等
MQ可能会出现消息重复入队或者重复发送的情况,消费者消费超时会导致消息重新入队,消费者的requeue为true也会重新入队,死信队列处理完消息也有可能重新扔进来等等都有可能出现消息重复的现象,在业务上来说消息的类型无非就是增删改查,那这些场景怎么做幂等呢?
- 增:这个是一定要考虑幂等的处理的,消息接收到以后一般会先进中间表,可以以消息的messageid作为幂等的依据进行判断,如果没有messageid,则可以使用唯一的业务细腻进行判断,比如人员的手机号,组织的编码或者名称等。
- 删:删除场景一般支持幂等,如果需要做客使用messageid,无messageid,也可以使用业务信息+状态来做幂等判断。
- 改:修改一般可以不做幂等性的处理,如果需要也可以使用messageid,也可以使用业务信息做幂等,但无疑messageid才是更好地选择。
- 查:查询类消息很少,基本没有吧,如果有因为不需要管,查询天然支持幂等。
2.如何数据持久后再进行消息推送
一般消息发送肯定是想要在数据处理完成以后才会发送消息,这样可以保证消息的准确性,这就需要我们在事务完成后再进行消息发送,那该怎么做才能让消息在事务结束以后再发送呢?
Spring提供了事务的声明周期函数,我们可以利用事务的声明周期函数来实现,代码如下,表示事务完成以后再执行该逻辑,这块代码需要写在含有事务的方法内部(如使用注解Transactional)
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization(){@Overridepublic void afterCompletion(int status) {// 事务commited:0 提交完成,1回滚完成,2其他if(Predicate.isEqual(status).equals(0)){rabbitTemplate.convernetAndSend(...);}}
});
如此便可以保证消息肯定是在数据库事务完成之后才会发送,而此时数据一定是已经持久化了的。
3.发送文件信息过大
这个错误笔者在云上的Rabbit碰到过一次,报错显示消息过大mq报错了,但是自己搭建的mq无法复现该问题,查询资料发现Rabbit只有一个总体大小限制并没有看到单条消息的限制,可能是版本不一致吧。这里记录下,解决方法很简单:只需要将消息分开进行发送即可,如果是无法拆分的消息,则需要考虑业务问题了可以只给其他系统发id,让他来你的系统通过id检索,毕竟信息太大,这么做也是可以的。
4.ack超时导致消息重新入队
ack超时导致消息重新入队,这样可能会有一个问题,即使做了ack消息还是一直递增。而不减少,以为ack超时,所以消息并没有在unack的位置,所以即使ack了消息也不会消失。这种问题增加消费节点依然是解决不了的,需要优化消费者代码,可以先入中间表然后ack,不要做业务上的处理,减少ack的等待时间,防止消息重复入队。
5.消息大量积压
这个问题应该很容易碰到,不废话直接说方案,首先需要定位消息积压的原因,是因为发送方消息量激增还是因为下游消费能力突然下降,定位号问题以后我们来看解决方案:
- 发送方消息激增:发送方无法解决,必须消费者提升消费能力,使用方案一
- 消费方消费慢:这种是消费方本来就慢,首先是应该优化代码,然后才是使用方案一
- 消费方突然变慢:如果代码无问题,大概率是服务器瓶颈了,使用方案二
方案一:
这个方案旨在提升消费者的节点,和单节点的上的消费者数量。
- 1.消费者如果使用的simple监听器,则直接增加配置信息concurrency的数量即可,该配置提升对应着消费者的数量和消费线程的同等提升,如果你使用的是direct监听器,先增加consumers-per-queue的数量,该值的增加会同步增加消费者数量和channel,但是不会增加消费者使用的线程数,如果线程数不提升会导致线程频繁切换,并不会提升消费者的性能,所以还需要同步更改CachingConnectionFactory的线程配置(这些配置本文都有,这里不重复列举代码了)
- 2.增加消息的预取数量,减少消费者和mq服务直接的消息获取通讯
- 3.消费者代码增加channel.basicQos(预取数量),破坏轮询模式最大程度利用不同节点的性能
- 4.使用消息批量确认,减少消费者与mq的ack次数
- 5.增加消费者服务的节点
- 6.如果以上措施效果仍达不到要求(肯定会有用),可以写一个临时的消费者代码加上去,入库直接ack,后面再慢慢处理。
方案二:
这个场景主要是因为单节点的系统性能瓶颈导致的,方案一中的第一点不适用方案二,其他均可以适用于方案二。