一、生产者重连
1、概念
由于网络波动导致客户端无法连接上MQ,这是可以开启MQ的失败后重连机制。
注意:
是连接失败的重试,而不是消息发送失败后的重试。
2、开启配置
spring:rabbitmq:template:retry:enabled: true # 是否启用重试机制max-attempts: 3 # 最大重试次数initial-interval: 1000ms # 第一次重试的间隔时间multiplier: 2 # 重试间隔时间的倍数max-interval: 10000ms # 最大重试间隔时间,超过该时间则停止重试
3、重连结果
4、总结
这种超时重连的方式是阻塞式的,后面的代码没办法执行,如果说业务要求比较严格,则需要禁止使用;如果必要使用的情况下,合理设置重连时间。
二、生产者确认
1、概念
生产者将消息发送到MQ之后,MQ会返回一个确认消息给到生产者。有两种方式:
- Publisher Confirm 消息确认
- Publisher Return 消息回执
有几种情况产生:
- 消息投递到MQ,但是路由失败。Publisher Return会返回路由失败,然后返回ACK,告知投递成功。
- 临时消息投递到MQ,并且入队成功。返回ACK,告知投递成功。
- 持久消息投递到MQ,并且持久化完成。返回ACK,告知投递成功。
- 除上述情况外,均会返回NACK,告知投递失败。
2、开启配置
spring:rabbitmq:publisher-confirm-type: correlated # 消息确认机制,异步确认publisher-returns: true # 消息返回机制
其中,publisher-confirm-type有三种模式:
- none:不开启确认机制。
- simple:同步阻塞等待MQ的回执消息,生产者同步等待。
- correlated:MQ异步回调方式返回回执消息,生产者发送消息之后,继续执行其他任务,MQ收到消息之后,处理完会回执确认信息。首选
3、代码实现
Publisher Return的配置类,这个只需要写一个就行了,但是Publisher Confirm是每次一个消息发送的时候都得写一个:
@Configuration
public class MqConfirmConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {//获取模板类RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);//创建回调rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {System.out.println("消息返回-----开始");System.out.println(returnedMessage.getReplyCode());System.out.println(returnedMessage.getExchange());System.out.println(returnedMessage.getReplyText());System.out.println(returnedMessage.getRoutingKey());System.out.println("消息返回-----结束");}});}
}
Publisher Confirm例子:
public String push9() throws InterruptedException {//1、创建CorrelationData,// 构造函数需要指定随机id,消息回调时需要使用该id进行匹配CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());//2、添加ConfirmCallbackcorrelationData.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {//这个一般不会触发@Overridepublic void onFailure(Throwable ex) {System.out.println("消息回调失败");}@Overridepublic void onSuccess(CorrelationData.Confirm result) {//交换机收到消息,不管路由是否成功都会收到if (result.isAck()) {System.out.println("消息确认成功");} else {//交换机错误或者网络错误就会重试System.out.println("消息确认失败,失败原因为:" + result.getReason());}}});String topicExchange = "topicExchange";rabbitTemplate.convertAndSend(topicExchange, "cq.hh","重庆串串也好吃",correlationData);Thread.sleep(4000);return "success";}
rabbitTemplate.convertAndSend(topicExchange, "cq.hh","重庆串串也好吃",correlationData); 加粗的这个绑定不要忘记了,我就是忘记了这个,一直找不到原因。
4、结果测试
(1)如果是交换机写错了,就说明交换机没有收到消息,所以ACK应答是false:
(2)交换机收到消息,路由失败;会显示交换机收到消息了,ACK成功,但是Publisher Return会告诉你相关信息,比如没有路由:NO_ROUTE
(3)消息接收成功,路由成功的场景,直接告诉ACK成功: