我们已经学过使用 SpringAMQP去收和发消息,但是发和收消息是只是MQ最基本的功能了,在收发消息的过程中,会有很多的问题需要去解决,下面需要学习rabbitMQ的高级特性去解决
死信交换机:这个可以帮助我们实现消息的延迟的特性
惰性队列:可以去解决成百万更高消息堆积的问题
MQ集群:解决单点MQ的可用性问题
(一)消息的可靠性
在消息的传递过程中,凡是有消息传递,都由可能导致消息丢失,生产者者将消息传输到交换机,这个过程中有网络传输,有可能丢失。
交换机将消息路由给队列,这个过程中也有可能丢失,他们都属于在消息的发送过程中丢失,只有到达队列,队列将消息保存起来了,才算发送成功
发送成功消息并不安全,这个时候如果MQ宕机了,mq又是内存存储,一宕机内存中的数据全部都是,消息也会丢失,MQ本身也有可能把消息弄丢
前面没有问题,消息到了消费者,消费者也有可能宕机,比如说消息刚到消费者,消费者还没有处理就挂了,消息也就丢失了,消费者也有可能把消息弄丢
(1)生产者消息确认
生产者如何保证消息不会丢失呢?
动手实现消费者确认
消费者
生产者:
ApplicationContext是Spring的Bean容器(工厂),Aware是通知,ApplicationContextAware是Bean容器的通知,当Spring的Bean工厂准备好之后,它会来通知你
上面的代码是在Bean工厂创建完以后,代码会在项目启动后去执行,CallBack是全局的CallBack
ReturnCallback是消息到了交换机了,但是路由的时候失败了
ConfirmCallBack,是指消息没有到达交换机
在生产者代码中加:
发送消息通过单元测试发:
点击这个交换机,绑定队列:
原来简单的发送消息:是这样的:
现在要做消息确认:
Lambda表达式简化:
失败的情况:
消息没有到交换机,网络丢包了,我们这里交换机名字填错:
还有一种情况消息成功到达交换机,没有到达队列:有很多种比如说到了交换机还没有来的及到队列的时候结果服务出现故障了,我们这里也是修改路由key写错了:这个时候肯定到达不了队列,这个时候会返回ReturnCallBack
(2)消息的持久化
生产者确认可以保证消息投递到队列当中,但是这样消息还是不安全的,RabbitMQ
默认是内存存储,此时MQ出现宕机,消息也是会丢失的 ,要想让消息真正安全,要把消息做到持久化,把消息写到磁盘当中
我们已经发送一条消息,这个时候我们重启下MQ,看看队列中的消息是否持久存在:
发现所有队列都没了
系统自带的交换机都是存在的:
在控制台创建队列的时候可以把对列设置为持久的:
在消费者代码中添加:
启动消费者服务:
浏览器队列
交换机:
重新发送一条消息,先关掉消费者服务,不让他消费消息
浏览器发送消息:点击这个队列:
刷新页面就有消息了
在次重启mq:
交换机队列都在:
但是消息没了:说明消息没有持久
需要设置消息持久:
单元测试发送持久化消息:
再重新mq:
刷新页面消息还在:
我们平常在SpringAMQP当中,队列,交换机,以及消息默认情况下都是持久的
上面的内容是告诉大家持久化怎么去做,持久化是写磁盘的,会有性能的损耗,不是所有的数据需要持久化
(3)消费者消息确认
经过前面学习能够保证消息能够投递到消费者,但是消费者能够正常消费吗?不能的,消息投递后的那一刻消费者立马就挂了,这样消息还是没有消费,休息就丢失了,RabbbitMQ中又提供一种消费者的消费确认机制
在消费者的配置文件添加:先设置none
None模式:消息投递立即删除
刷新浏览器:
消费者抛异常了,消息就废了,消费者这里没了,队列也没了,消息就丢失了
改为auto模式:
先往里面发送一条消息:
刷新浏览器:
状态变为Unacked,等待返回ack
释放断点:出现异常,它会一直重复发送消息 ,永不停止,这样不太好,需要去处理,此时消息不会丢失
(4)失败重试机制
我们通过上面设置,实现了消费者的确认机制,至少确保消息能被消费一次,只不过当消费者消费失败以后它会返回nack,这样消息会重新到 MQ的队列里,MQ再次把消息投递给消费者,如果代码没有问题,消息最终可能被消费,如果代码有问题,就会无限循环,默认的重复尝试机制是有问题的
重试的时间消息间隔倍数为三
超过重试次数,消息就会被拒绝
此时消息就没有了,消息就被丢弃了
(5)消费者失败消息处理策略
消费者这里重试,不会给mq带来压力,它是在消费者本地不断的重试,这种做法有一个问题,在多次重试之后,会将消息直接丢弃,对一些普通消息来讲直接丢了无所谓,但是有一些业务中的消息很重要不能直接丢弃 ,下面是小时重试之后的处理方案
失败的消息,我们可以找一个消费者专门监听失败的消息队列,凡是失败的消息都能够被消费者拿到,这个消费者可以把这个消息发送给管理员,通知它消息失败了,这个时候可以人工去介入处理这些消息,通过这样设置就可以做到消息真正的万无一失了,从生产者到消费者,到最后还有一门兜底的方案
发送一条消息:
在error队列中就会出现错误的异常跟消息,这样管理员就可以知道那条消息失败,和错误的原因