问题
生产环境中由于一些不明原因,导致rabbitmq重启,在重启的期间生产者消息投递失败,导致消息丢失,需要手动处理恢复。那么如何才能进行rabbitmq的消息可靠性投递?特别是在极端的情况,rabbitmq集群不可用的时候,无法投递的消息该如何处理?
例如这样的异常信息:
方案
生产者将发送的消息发给rabbitmq的同时,将消息备份到缓存中。如果rabbitmq宕机了。会有一个定时任务会对未成功发送的消息进行重新投递。如果交换机成功收到消息会从缓存中清除已收到的消息。
分析
造成消息丢失会有两种情况,一种是交换机故障,另一个中是队列故障。
交换机确认消息是否收到的解决办法
启用发布确认的配置
spring:rabbitmq:host: 192.168.171.128username: adminpassword: 123port: 5672publisher-confirm-type: correlated
默认是none值,是不开启的,禁用发布确认模式。
correlated,发布消息到交换机后会触发回调方法。
simple, 单个确认消息。
代码
配置类
package com.xkj.org.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class ConfirmConfig {//交换机public static final String EXCHANGE_NAME = "confirm.exchange";//队列public static final String QUEUE_NAME = "confirm.queue";//Routing Keypublic static final String ROUTING_KEY = "key1";@Beanpublic DirectExchange confirmExchange() {return new DirectExchange(EXCHANGE_NAME);}@Beanpublic Queue confirmQueue() {return QueueBuilder.durable(QUEUE_NAME).build();}@Beanpublic Binding bindingQueueToExchange(@Qualifier("confirmExchange") DirectExchange confirmExchange,@Qualifier("confirmQueue") Queue confirmQueue) {return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(ROUTING_KEY);}}
回调接口
package com.xkj.org.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;/*** 回调接口*/
@Slf4j
@Component // 1.第一步实例化MyCallback这个bean
public class MyCallback implements RabbitTemplate.ConfirmCallback {@Autowired // 2.第二步将rabbitTemplate实例依赖注入进来private RabbitTemplate rabbitTemplate;@PostConstructpublic void init() { //3.第三步执行此方法//将本类对象(ConfirmCallback的实现类对象)注入到RabbitTemplate中rabbitTemplate.setConfirmCallback(this);}/*** 交换机确认回调方法* @param correlationData* @param ack* @param cause* 1.发消息 交换机收到 调用* 1.1 correlationData 回调消息的id及相关信息* 1.2 交换机收到消息 ack = true* 1.3 cause null* 2.发消息 交换机接收失败 回调* 2.1 correlationData 回调消息的id及相关信息* 2.2 交换机收到消息 ack = false* 2.3 cause 失败的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String id = correlationData != null ? correlationData.getId(): "";if(ack) {log.info("交换机已经接收到id为:{}的消息", id);}else {log.info("交换机还未收到id为:{}的消息,原因:{}", id, cause);}}
}
消费者
package com.xkj.org.listener;import com.rabbitmq.client.Channel;
import com.xkj.org.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class ConfirmQueueConsumer {@RabbitListener(queues = ConfirmConfig.QUEUE_NAME)public void receiveMsg(Message message, Channel channel) throws Exception {String msg = new String(message.getBody(), "UTF-8");log.info("收到队列的消息:{}", msg);}
}
生产者
@ApiOperation("测试发布确认发消息")@GetMapping("/sendMessage/{msg}")public void sendMessage(@ApiParam(value = "消息内容", required = true)@PathVariable("msg") String message) {CorrelationData correlation = new CorrelationData("1");rabbitTemplate.convertAndSend("confirm.exchange", "key1", message, correlation);log.info("发送消息内容{}", message);}
结果
小技巧:如果要是测试交换机接收失败的回调,可以通过修改生产者发消息的交换机的名字为一个不存在的名字即可。
@ApiOperation("测试发布确认发消息")@GetMapping("/sendMessage/{msg}")public void sendMessage(@ApiParam(value = "消息内容", required = true)@PathVariable("msg") String message) {CorrelationData correlation = new CorrelationData("1");rabbitTemplate.convertAndSend("confirm.exchange"+"123", "key1", message, correlation);log.info("发送消息内容{}", message);}
问题:上面只能保证交换机收到消息的确认回调,不能保证队列收到消息的确认回调?
队列确认消息是否收到的解决办法
比如routingKey错了,或者队列出了问题,队列也将无法收到消息。
在仅开启生产者确认机制情况下,接换机接收到消息后,会直接给消息生产者发送确认消息。如果发现该消息不可路由,那么消息会直接丢弃,此时生产者是不知道消息被丢弃这个事件的。
解决办法
通过设置mandatory参数可以在当消息传递过程中不可达目的时将消息返回给生产者。
添加配置
spring:rabbitmq:host: 192.168.171.128username: adminpassword: 123port: 5672publisher-confirm-type: correlatedpublisher-returns: true
publiser-returns发布退回消息。
说明:这里为了测试故意把routingkey写错
代码
生产者
估计把routingKey改成错误的 key1123
@ApiOperation("测试发布确认发消息")@GetMapping("/sendMessage/{msg}")public void sendMessage(@ApiParam(value = "消息内容", required = true)@PathVariable("msg") String message) {CorrelationData correlation = new CorrelationData("1");rabbitTemplate.convertAndSend("confirm.exchange", "key1"+"123", message, correlation);log.info("发送消息内容{}", message);}
消费者
package com.xkj.org.listener;import com.rabbitmq.client.Channel;
import com.xkj.org.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class ConfirmQueueConsumer {@RabbitListener(queues = ConfirmConfig.QUEUE_NAME)public void receiveMsg(Message message, Channel channel) throws Exception {String msg = new String(message.getBody(), "UTF-8");log.info("收到队列的消息:{}", msg);}
}
配置
package com.xkj.org.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class ConfirmConfig {//交换机public static final String EXCHANGE_NAME = "confirm.exchange";//队列public static final String QUEUE_NAME = "confirm.queue";//Routing Keypublic static final String ROUTING_KEY = "key1";@Beanpublic DirectExchange confirmExchange() {return new DirectExchange(EXCHANGE_NAME);}@Beanpublic Queue confirmQueue() {return QueueBuilder.durable(QUEUE_NAME).build();}@Beanpublic Binding bindingQueueToExchange(@Qualifier("confirmExchange") DirectExchange confirmExchange,@Qualifier("confirmQueue") Queue confirmQueue) {return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(ROUTING_KEY);}}
回调接口
package com.xkj.org.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.io.UnsupportedEncodingException;/*** 回调接口*/
@Slf4j
@Component // 1.第一步实例化MyCallback这个bean
public class MyCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {@Autowired // 2.第二步将rabbitTemplate实例依赖注入进来private RabbitTemplate rabbitTemplate;@PostConstructpublic void init() { //3.第三步执行此方法//将本类对象(ConfirmCallback的实现类对象)注入到RabbitTemplate中rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnCallback(this);}/*** 交换机确认回调方法* @param correlationData* @param ack* @param cause* 1.发消息 交换机收到 调用* 1.1 correlationData 回调消息的id及相关信息* 1.2 交换机收到消息 ack = true* 1.3 cause null* 2.发消息 交换机接收失败 回调* 2.1 correlationData 回调消息的id及相关信息* 2.2 交换机收到消息 ack = false* 2.3 cause 失败的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String id = correlationData != null ? correlationData.getId(): "";if(ack) {log.info("交换机已经接收到id为:{}的消息", id);}else {log.info("交换机还未收到id为:{}的消息,原因:{}", id, cause);}}/*** 可以在当消息传递过程中不可达目的时将消息返回给生产者* 注意此方法是消息传递失败才会调用,成功就不会执行* @param message* @param replyCode* @param replyText* @param exchange* @param routingKey*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {try {log.error("消息:{},被交换机:{},给退回了,原因:{},RoutingKey={}",new String(message.getBody(), "UTF-8"),exchange,replyText,routingKey);} catch (UnsupportedEncodingException e) {e.printStackTrace();}}
}