说明:消息生产者在将数据发送到Mq的时候,可能由于网络等原因造成数据投递失败。
消息丢失大致分三种:这里说的是生产者消息丢失!
分析原因:
1.有没有一种可能,我刚发送消息,消息还没有到交换机就断网了,是不是消息就没有发送成功,这个时候如果不对这种情况处理,消息是不是就丢失了
2.又有没有一种可能,我又发送了一条消息,交换机拿到消息后正要发送给某个队列,就是你,你把那个队列给删掉了,这个时候消息找不到队列,消息就也丢失了
解决方法:
1.事务:Rabbitmq提供了事务功能,就是生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit。
缺点:RabbitMQ 事务机制是同步的,提交一个事务之后会阻塞在那儿,采用这种方式基本上吞吐量会下来,太耗性能了
2.confirm机制:相比于事务的同步,confirm机制是异步的,你发送完这个消息之后就可以发送下一个消息,RabbitMQ 接收了之后会异步回调confirm接口通知你这个消息接收到了。一般在生产者这块解决数据丢失,建议使用 confirm 机制。
话不多说,干代码
工程图:
1.pom.xml
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><artifactId>spring-boot-starter-parent</artifactId> <!-- 被继承的父项目的构件标识符 --><groupId>org.springframework.boot</groupId> <!-- 被继承的父项目的全球唯一标识符 --><version>2.2.2.RELEASE</version> <!-- 被继承的父项目的版本 --></parent><groupId>MqLossDemo</groupId><artifactId>MqLossDemo</artifactId><version>1.0-SNAPSHOT</version><packaging>war</packaging><name>MqLossDemo Maven Webapp</name><!-- FIXME change it to the project's website --><url>http://www.example.com</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target></properties><dependencies><!--spring boot核心--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!--spring boot 测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!--springmvc web--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--开发环境调试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><optional>true</optional></dependency><!--amqp 支持--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--redis--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>2.1.7.RELEASE</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.78</version></dependency><dependency><groupId>commons-lang</groupId><artifactId>commons-lang</artifactId><version>2.6</version></dependency><!--lombok--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.10</version></dependency></dependencies><build><finalName>MqLossDemo</finalName><pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) --><plugins><plugin><artifactId>maven-clean-plugin</artifactId><version>3.1.0</version></plugin><!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_war_packaging --><plugin><artifactId>maven-resources-plugin</artifactId><version>3.0.2</version></plugin><plugin><artifactId>maven-compiler-plugin</artifactId><version>3.8.0</version></plugin><plugin><artifactId>maven-surefire-plugin</artifactId><version>2.22.1</version></plugin><plugin><artifactId>maven-war-plugin</artifactId><version>3.2.2</version></plugin><plugin><artifactId>maven-install-plugin</artifactId><version>2.5.2</version></plugin><plugin><artifactId>maven-deploy-plugin</artifactId><version>2.8.2</version></plugin></plugins></pluginManagement></build>
</project>
2.application.yml
server:port: 8080
spring:rabbitmq:port: 5672host: 你的 rabbitmq IPusername: adminpassword: adminvirtual-host: /# 发送者开启 confirm 确认机制publisher-confirm-type: correlated# 发送者开启 return 确认机制publisher-returns: truetemplate:#在配置文件中配置 mandatory: true 页无用,需要在RabbitTemplate中手动设置mandatory: true
3.RabbitMqConfig
package com.dev.config;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 类名称:** @author 李庆伟* @date 2024年03月04日 14:12*/
@Configuration
public class RabbitMqConfig {@Beanpublic ConfirmCallbackService confirmCallbackService() {return new ConfirmCallbackService();}@Beanpublic ReturnCallbackService returnCallbackService() {return new ReturnCallbackService();}@Beanpublic RabbitTemplate rabbitTemplate(@Autowired CachingConnectionFactory factory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);//生产者发送消息到Mq交换机回执,手动ack回执回调处理//可以理解为:消息推送到server,但是在server里找不到交换机//如果想看效果【先清除交换机和队列】:在工程运行前注释掉RabbitMqQueueConfig类中的directExchange和bindingDirect方法rabbitTemplate.setConfirmCallback(confirmCallbackService());//生产者发送消息到Mq,交换机发送到队列回执,一定要设置手动设置Mandatory(true),配置文件中不生效//可以理解为:消息推送到server,但是在server里找不到队列//如果想看效果【先清除交换机和队列】:如果之前看过setConfirmCallback效果,先去掉RabbitMqQueueConfig类中注释// 在工程运行前注释掉RabbitMqQueueConfig类中的directQueue和bindingDirect方法rabbitTemplate.setReturnCallback(returnCallbackService());rabbitTemplate.setMandatory(true);return rabbitTemplate;}//生产者发送消息到Mq交换机回执 //可以理解为:消息推送到server,但是在server里找不到交换机class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {//log.info("发送者已经收到确认,correlationData={} ,ack={}, cause={}", correlationData, ack, cause);System.out.println(correlationData);System.out.println(ack);System.out.println(cause);System.out.println("--------");} else {System.out.println("消息发送异常!");//可以进行重发等操作//这里可以处理失败的业务}}}//生产者发送的消息到Mq队列回执 //可以理解为:消息推送到server,但是在server里找不到队列class ReturnCallbackService implements RabbitTemplate.ReturnCallback {//public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {@Overridepublic void returnedMessage(Message message, int i, String s, String s1, String s2) {System.out.println(message.getMessageProperties().getMessageId());System.out.println(new String(message.getBody()));System.out.println(i);System.out.println(s);System.out.println(s1);System.out.println(s2);//可以将消息存储到一个新的位置,这里可以处理失败的业务}}}
4.RabbitMqQueueConfig
package com.dev.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 类名称:** @author 李庆伟* @date 2024年03月04日 14:12*/
@Configuration
public class RabbitMqQueueConfig {//绑定键public final static String QUEUE_ONE = "loss_queue";public final static String EXCHANGE_ONE = "loss_exchange";@Beanpublic Queue directQueue() {return new Queue(RabbitMqQueueConfig.QUEUE_ONE);}//Direct交换机 起名:directExchange@BeanDirectExchange directExchange() {return new DirectExchange(RabbitMqQueueConfig.EXCHANGE_ONE,true,false);}//绑定 将队列和交换机绑定, 并设置用于匹配键:directRoutingKey@BeanBinding bindingDirect() {return BindingBuilder.bind(directQueue()).to(directExchange()).with("directRoutingKey");}}
5.RabbitContoller
package com.dev.controller;import com.alibaba.fastjson.JSONObject;
import com.dev.config.RabbitMqQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.HashMap;
import java.util.Map;
import java.util.UUID;/*** 类名称:消息丢失问题** @author lqw* @date 2024年02月27日 14:47*/
@Slf4j
@RestController
@RequestMapping("loss")
public class RabbitController {@AutowiredRabbitTemplate rabbitTemplate; //使用RabbitTemplate,这提供了接收/发送等等方法/*** 消息丢失* @return*/@GetMapping("/sendMessage")public String sendMessage() {String id = UUID.randomUUID().toString().replace("-","");Map<String,Object> map = new HashMap<>();map.put("id",id);map.put("name","张龙");Message msg = MessageBuilder.withBody(JSONObject.toJSONString(map).getBytes()).setMessageId(id).build();rabbitTemplate.convertAndSend(RabbitMqQueueConfig.EXCHANGE_ONE, "directRoutingKey", msg);return "ok";}}
6.App
package com.dev;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;/*** 类名称:** @author 李庆伟* @date 2024年03月04日 14:11*/
@SpringBootApplication
public class App {public static void main(String[] args) {SpringApplication.run(App.class);}}