1、简介
消息的可靠性投递就是要保证消息投递过程中每一个环节都要成功,本文详细介绍两个环节的消息可靠性传递方式:1)、消息传递到交换机的 confirm 模式;2)、消息传递到队列的 Return 模式。
消息从 producer 到 exchange 则会返回一个 confirmCallback;
消息从 exchange 到 queue 投递失败则会返回一个 returnCallback;
2、Confirm模式
消息的confirm确认机制,是指生产者投递消息后,到达了消息服务器Broker里面的exchange交换机,则会给生产者一个应答,生产者接收到应答,用来确定这条消息是否正常的发送到Broker的exchange中,这也是消息可靠性投递的重要保障。
2.1、使用confirm模式的具体步骤
1)、配置文件application.yml 开启确认模式
spring:rabbitmq:host: 192.168.30.88port: 5672username: adminpassword: adminvirtual-host: testpublisher-confirm-type: correlated # 开启数据关联确认机制
2)、组件绑定关系配置(采用直连交换机模式)
@SpringBootConfiguration
@ConfigurationProperties(prefix = "rabbit.confirm")
public class RabbitConfigCommit {private String exchangeName;private String queueName;@Beanpublic DirectExchange directExchange(){return ExchangeBuilder.directExchange(exchangeName).build();}@Beanpublic Queue queue(){return QueueBuilder.durable(queueName).build();}public Binding bindingA(DirectExchange exchange, Queue queue){return BindingBuilder.bind(queue).to(exchange).with("info");}
}
3)、写一个类实现implements RabbitTemplate.ConfirmCallback,判断成功和失败的ack结果,可以根据具体的结果,如果ack为false,对消息进行重新发送或记录日志等处理;设置rabbitTemplate的确认回调方法rabbitTemplate.setConfirmCallback(messageConfirmCallBack);
@Service
public class MessageServiceConfirm implements RabbitTemplate.ConfirmCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(this);}public void sendMsg(){Message message = MessageBuilder.withBody("hello long".getBytes()).build();// 设置消息关联数据CorrelationData correlationData = new CorrelationData();correlationData.setId("123");rabbitTemplate.convertAndSend("exchange.confirm", "info", message, correlationData);}
// 消息确认返回判断@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {if (b){System.out.println("消息发送到交换机成功!" + correlationData.getId());return;}System.out.println("消息发送到交换机失败!" + correlationData.getId());}
}
3、Return 模式
3.1、使用 retrun 模式的具体步骤(和confirm 模式一致)
1)、配置文件application.yml 开启 retrun 模式
spring:rabbitmq:host: 192.168.30.88port: 5672username: adminpassword: adminvirtual-host: testpublisher-confirm-type: correlatedpublisher-returns: true # 开启Return模式
2)、组件配置(参考confirm模式)
3)、使用rabbitTemplate.setReturnCallback设置退回函数,当消息从 exchange 路由到 queue 失败后,则会将消息退回给producer,并执行回调函数 returnedMessage(采用匿名内部类/lambda表达式实现)
@Service
public class MessageServiceReturn {@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){// 不使用匿名内部类,就要实现接口 implements RabbitTemplate.ReturnsCallbackrabbitTemplate.setReturnsCallback(msg -> {System.out.println("消息发送结果:" + msg.getReplyText());});
// rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback(){
// @Override
// public void returnedMessage(ReturnedMessage returnedMessage) {
// System.out.println("消息发送结果:" + returnedMessage.getReplyText());
// }
// });}public void sendMsg(){Message message = MessageBuilder.withBody("hello long".getBytes()).build();rabbitTemplate.convertAndSend("exchange.return", "info", message, correlationData);}
}
4、总结
本文详细介绍两种模式的消息可靠性保证,关于Rabbitmq 消息持久化、集群配置等更高级的内容关注下面公众号查阅。
本人是一个从小白自学计算机技术,对运维、后端、各种中间件技术、大数据等有一定的学习心得,想获取自学总结资料(pdf版本)或者希望共同学习,关注微信公众号:it自学社团。后台回复相应技术名称/技术点即可获得。(本人学习宗旨:学会了就要免费分享)