文章目录
- Rabbitmq的消息可靠性投递
- Rabbitmq的消息可靠性投递confirmCallback
- Rabbitmq的消息可靠性投递returnCallback
- Rabbitmq的消息确机制ACK
Rabbitmq的消息可靠性投递
什么是消息的可靠性投递
保证消息百分百发送到消息队列中去
1 保证mq节点成功接受消息,消息发送端需要接受到mq服务端接受到消息的确认应答
2 完善的消息补偿机制,发送失败的消息可以再感知并⼆次处理
RabbitMQ消息投递路径
⽣产者–>交换机->队列->消费者
通过两个的点控制消息的可靠性投递
⽣产者到交换机 通过confirmCallback
交换机到队列 通过returnCallback
建议
开启消息确认机制以后,保证了消息的准确送达,但由于频繁的确认交互, rabbitmq 整体效率变低,吞吐量
下降严重,不是⾮常重要的消息不建议⽤消息确认机制
Rabbitmq的消息可靠性投递confirmCallback
⽣产者到交换机
通过confirmCallback,⽣产者投递消息后,如果Broker收到消息后,会给⽣产者⼀个ACK。⽣产者通过ACK,可以确认这条消息是否正常发送到Broker,这种⽅式是消息可靠性投递的核⼼
开启confirmCallback
//旧版,确认消息发送成功,通过实现ConfirmCallBack接⼝,消息发送到交换器Exchange后触发回调
spring.rabbitmq.publisher-confirms=true
//新版,NONE值是禁⽤发布确认模式,是默认值,CORRELATED值是发布消息成功到交换器后会触发回调⽅法
spring.rabbitmq.publisher-confirm-type:correlated
@SpringBootTest
class ApplicationTests {@Autowiredprivate RabbitTemplate template;/*** 生产者到交换机可靠性投递测试*/@Testvoid testConfirmCallback(){template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/*** @param correlationData 配置* @param ack 交换机是否收到消息,true是成功,false是失败* @param cause 失败的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("ConfirmCallback======>");System.out.println("correlationData======>correlationData="+correlationData);System.out.println("ack======>ack="+ack);System.out.println("cause======>cause="+cause);if(ack){System.out.println("发送成功");//更新数据库 消息的状态为成功 TODO}else {System.out.println("发送失败,记录到日志或者数据库");//更新数据库 消息的状态为失败 TODO}}});//数据库新增一个消息记录,状态是发送 TODO//发送消息template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "order.new","新订单");}
Rabbitmq的消息可靠性投递returnCallback
交换机到队列
通过returnCallback,消息从交换器发送到对应队列失败时触发
两种模式
交换机到队列不成功,则丢弃消息(默认)
交换机到队列不成功,返回给消息⽣产者,触发returnCallback
第⼀步 开启returnCallback配置
spring.rabbitmq.publisher-returns=true
第⼆步 修改交换机投递到队列失败的策略(丢弃或者返回给消息⽣产者)
//为true,则交换机处理消息到路由失败,则会返回给⽣产者
spring.rabbitmq.template.mandatory=true
@SpringBootTest
class ApplicationTests {@Autowiredprivate RabbitTemplate template;/*** 交换机到队列可靠性投递测试*/@Testvoid testReturnCallback(){template.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {int code = returned.getReplyCode();System.out.println("code="+code);System.out.println("returned="+returned.toString());}});template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "order.new","新订单ReturnsCallback");}
}
Rabbitmq的消息确机制ACK
消费者从broker中监听消息,需要确保消息被合理处理
ACK介绍
消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列
中删除,消费者在处理消息出现了⽹络不稳定、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将消息重新放⼊队列中,只有当消费者正确发送ACK反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除。
消息的ACK确认机制默认是打开的,消息如未被进⾏ACK的消息确认机制,这条消息被锁定Unacked
确认⽅式
⾃动确认(默认)
⼿动确认 manual
其他(基本不⽤,忽略)
spring:rabbitmq:#开启⼿动确认消息,如果消息重新⼊队,进⾏重试listener:simple:acknowledge-mode: manual
@Component
@RabbitListener(queues = "order_queue")
public class OrderMQListener {@RabbitHandlerpublic void messageHandler(String body, Message message, Channel channel) throws IOException {long msgTag = message.getMessageProperties().getDeliveryTag();System.out.println("msgTag="+msgTag);System.out.println("message="+message.toString());System.out.println("body="+body);//复杂业务逻辑//告诉broker,消息已经被确认//成功确认,使⽤此回执⽅法后,消息会被rabbitmq broker 删除channel.basicAck(msgTag,false);//告诉broker,消息拒绝确认//channel.basicNack(msgTag,false,true);//channel.basicReject(msgTag,true);}
}
deliveryTag介绍
表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加
basicNack和basicReject介绍
basicReject⼀次只能拒绝接收⼀个消息,可以设置是否requeue。
basicNack⽅法可以⽀持⼀次0个或多个消息的拒收,可以设置是否requeue。
⼈⼯审核异常消息
设置重试阈值,超过后确认消费成功,记录消息,⼈⼯处理