这是一些高级的内容。
RabbitMQ还是运行在网络上的,倘若遇到了网络故障,mq自己挂了,出异常了,都会造成最终状态不一致的问题。这就是可靠性问题。
可靠性:一个消息发送出去之后,至少被消费1次。
要解决这3个问题
- 消息发送的时候丢了
- mq自己丢了
- 消费者丢了
以上都失败了,还得有一个兜底方案。这样尽可能万无一失了。
生产者重连
生产者连不上mq了,此时发送直接失败。因此有了失败重连。
它会影响失败时候的性能,因为mq主打不等,所以生产环境建议禁用。
生产者确认
生产者开启确认机制后,MQ收到消息后,会返回消息给生产者
- 投递到MQ,但是路由失败了。(你的routingKey不对,或者交换机没绑定。)一般是开发人员的问题。
- 投递到了MQ,并且成功入队,告知投递成功ACK。
- 持久化消息,入队成功完成持久化好后,才会ACK。
其他情况都会返回NACK,告知投递失败。
如何接收ack/nack
有同步和异步两个方法
先配置
@Configuration
public class MqConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);rabbitTemplate.setReturnsCallback(new ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {log.debug("收到消息的return callback");}});}
}
@Testvoid testConfirm() throws InterruptedException {CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());correlationData.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {log.error("消息回调失败", ex);}@Overridepublic void onSuccess(Confirm result) {log.debug("收到回执");if (result.isAck()) {log.debug("成功");} else {log.debug("失败");}}});rabbitTemplate.convertAndSend("hmall.direct11", "blue11", "hello",correlationData);Thread.sleep(2000);}
情况总结:
- 消息成功到达交换机但是没被路由,返回ack但是有renturn消息。
- 彻底成功
- 持久化消息,入队并且持久化成功,返回ACK
- 其他情况都会返回NACK,告知投递失败。
注意nack是由 result.isAck() 为false返回出来的,要关注这个选项。生产者确认有网络开销,建议不开启。要开,publisher-return 开这个就可以了。
数据持久化
消息发给mq,但是mq自己挂了,那么消息就丢失了。
或者mq内存空间有限,消息积压,引发mq阻塞。 mq在太满的时候,会把消息存到硬盘上,这个过程是阻塞的过程。
mq3.6之前用数据持久化解决
交换机的持久化
- 交换机持久化
- 队列持久化 durable
Spring 默认都是持久化的。
消息的持久化,delivery_model =2 是消息的持久化。默认是1。
这个性能上不是很好。
接下来mq3.6之后,有了惰性队列。
接收到消息后,直接保存到磁盘。读消息就从磁盘中读取兵器加载到内存。内存只保存最近的2048条。在3.12之后,所有队列都是Lazy Queue模式,无法更改。
LazyQueue对写磁盘有优化。
消费者可靠性
消费者确认机制
消费者是否成功处理消息,得有一个回执,告诉到底是处理成功了,还是失败了。
- ack
- nack
- reject