一、消息可靠性问题
首先,分析一下消息丢失的可能性有哪些。
消息从发送者发送消息,到消费者处理消息,需要经过的流程是这样的:
消息从生产者到消费者的每一步都可能导致消息丢失:
发送消息时丢失:
- 生产者发送消息时连接MQ失败
- 生产者发送消息到达MQ后未找到Exchange
- 生产者发送消息到达MQ的Exchange后,未找到合适的Queue
- 消息到达MQ后,处理消息的进程发生异常。
MQ导致消息丢失:
- 消息到达MQ后,保存到队列后,尚未消费就突然宕机
消费者处理消息时:
- 消息接收后尚未处理突然宕机
- 消息接收后处理过程中抛出异常
综上,我们要解决消息丢失问题,保证MQ的可靠性,就必须从3个方面入手
- 确保生产者一定把消息发送到MQ
- 确保MQ不会将消息丢失
- 确保消费者一定要处理消息
1. 发送者的可靠性
1.1 发送者重连机制
有时候由于网络波动,可能会出现发送者连接MQ失败的情况。通过配置我们可以开启连接失败后的重连机制:
①在mq-demo项目的publisher模块的application.yaml添加如下
spring:rabbitmq:connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled: true # 开启超时重试机制initial-interval: 1000ms # 失败后的初始等待时间multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multipliermax-attempts: 3 # 最大重试次数
②在虚拟机中把mq停止
docker stop mq
测试完之后重启mq
docker restart mq
③运行单元测试TestSimpleQueue()
注意,当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,会影响业务性能。如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。
1.2 发送者确认机制
SpringAMQP提供了Publisher Confirm和Publisher Return两种确认机制。开启确认机制后,当发送者发送消息给MQ后,MQ会返回确认结果给发送者。返回的结果有以下几种情况:
- 消息投递到了MQ,但是路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功
- 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
- 持久消息投递到了MQ,并且入队完成持久化,返回ACK,告知投递成功
- 其他情况都会返回NACK,告知投递失败
1.3 SpringAMQP实现发送者确认
①在publisher这个微服务的application.yml中添加配置
spring:rabbitmq:# ... ... 省略publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型publisher-returns: true # 开启publisher return 机制
logging:pattern:dateformat: MM-dd HH:mm:ss:SSSlevel:com.itheima: debug
配置说明:这里publisher-confirm-type有三种模式可选:
- none:关闭confirm机制,默认
- simple:同步阻塞等待MQ的回执消息
- correlated:MQ异步回调方式返回回执消息
②每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置:
publisher下新增config.MqConfig
package com.itheima.publisher.config;import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;import javax.annotation.PostConstruct;@Configuration
@Slf4j
@RequiredArgsConstructor
public class MqConfig {private final RabbitTemplate rabbitTemplate;@PostConstructpublic void init() {rabbitTemplate.setReturnsCallback(returned -> {log.error("监听到消息rentun callback");log.debug("交换机:{}", returned.getExchange());log.debug("routingKey:{}", returned.getRoutingKey());log.debug("message:{}", returned.getMessage());log.debug("replyCode:{}", returned.getReplyCode());log.debug("replyText:{}", returned.getReplyText());});}
}
③发送消息,指定消息ID、消息ConfirmCallback
package com.itheima.publisher;import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.util.concurrent.ListenableFutureCallback;import java.util.UUID;import static org.junit.jupiter.api.Assertions.*;@SpringBootTest
@Slf4j
class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testConfirmCallback() throws InterruptedException {// 1. 创建correlationDataCorrelationData cd = new CorrelationData(UUID.randomUUID().toString());cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {log.error("spring amqp 处理确认结果异常", ex);// 可以在此处实现重发逻辑,例如判断异常类型或重试次数}@Overridepublic void onSuccess(CorrelationData.Confirm result) {// 判断是否成功if(result.isAck()) {log.debug("收到confirmCallback ack, 消息发送成功!");} else {log.error("收到confirmCallback nack, 消息发送失败!reason:{}", result.getReason());}}});// 2. 交换机名String exchangeName = "hmall.direct";// 3. 发送消息String message = "hello, mq!";rabbitTemplate.convertAndSend(exchangeName, "red", message, cd);// 让线程休眠,使其有充分的时间接收回调Thread.sleep(2000);}
}
运行单元测试
当设置一个不存在的routingKey时:
交换机名称填错时:
2. MQ的可靠性
在默认情况下,RabbitMQ会接收到的信息保存在内存中以降低消息收发的延迟。这样会导致两个问题:
- 一旦MQ宕机,内存中的消息会丢失
- 内存空间有限,当消费者故障或处理过慢时,会导致消息积压,引发MQ阻塞
2.1 数据持久化
RabbitMQ实现数据持久化包括3个方面
- 交换机持久化
- 队列持久化
- 消息持久化
package com.itheima.publisher;import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.util.concurrent.ListenableFutureCallback;import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;import static org.junit.jupiter.api.Assertions.*;@SpringBootTest
@Slf4j
class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage() {// 1. 自定义构建消息Message message = MessageBuilder.withBody("hello, SpringAMQP".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build();// 2. 发送消息for (int i = 0; i < 1000000; i++) {rabbitTemplate.convertAndSend("simple.queue", "hello.SpringAMQP");}}
}
NON_PERSISTENT
PERSISTENT
2.2 Lazy Queue
从RabbitMQ的3.6.0版本开始,就增加了Lazy Queue的概念,也就是惰性队列
惰性队列的特征如下:
- 接受到消息后直接存入磁盘,不再存储到内存
- 消费者要消费消息时才会从磁盘中读取并加载到内存(可以提前缓存部分消息到内存,最多2048条)
在3.12版本后,所有队列都是Lazy Queue模式,无法更改。
要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可:
控制台配置Lazy模式:
package com.itheima.publisher;import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.util.concurrent.ListenableFutureCallback;import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;import static org.junit.jupiter.api.Assertions.*;@SpringBootTest
@Slf4j
class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage() {// 1. 自定义构建消息Message message = MessageBuilder.withBody("hello, SpringAMQP".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build();// 2. 发送消息for (int i = 0; i < 1000000; i++) {rabbitTemplate.convertAndSend("lazy.queue", "hello.SpringAMQP");}}
}
代码配置Lazy模式:
①在利用SpringAMQP声明队列的时候,添加x-queue-mod=lazy参数也可以设置队列为Lazy模式:
@Bean
public Queue lazyQueue(){return QueueBuilder.durable("lazy.queue").lazy() // 开启Lazy模式.build();
}
这里是通过QueueBuilder的lazy()函数配置Lazy模式,底层源码如下:
②当然,也可以基于注解来声明队列并设置为Lazy模式:
@RabbitListener(queuesToDeclare = @Queue(name = "lazy.queue",durable = "true",arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){log.info("接收到 lazy.queue的消息:{}", msg);
}
总结
RabbitMQ如何保证消息的可靠性?
- 首先通过配置可以让交换机、队列、以及发送的消息都持久化。这样队列中的消息会持久化到磁盘,MQ重启消息依然存在。
- RabbitMQ在3.6版本引入了LazyQueue,并且在3.12版本后会成为队列的默认模式。LazyQueue会将所有消息都持久化。
- 开启持久化和生产者确认时,RabbitMQ只有在消息持久化完成后才会给生产者返回ACK回执。
3. 消费者的可靠性
3.1 消费者确认机制
消费者确认机制(Consumer Acknowledgement)是为了确认消费者是否成功处理消息。当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态:
ack:成功处理消息,RabbitMQ从队列中删除该消息
nack:消息处理失败,RabbitMQ需要再次投递消息
reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息
SpringAMQP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式,有三种方式:
- none:不处理。即消息传递给消费者后立即ack,消息会立刻从MQ删除。非常不安全,不建议使用
- manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活
- auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时自动返回ack,当业务出现异常时,根据异常判断返回不同结果:
- 如果是业务异常,会自动返回nack
- 如果是消息处理或校验异常,自动返回reject
通过下面的配置可以更改SpringAMQP的ACK处理方式:consumer下的application.yml
spring:rabbitmq:listener:simple:acknowledge-mode: none # 不做处理
可以在SpringRabbitListener中进行一个测试
package com.itheima.consumer.mq;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.stereotype.Component;import java.time.LocalTime;
import java.util.Map;@Slf4j
@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueue(String message) {log.info("监听到simple.queue的消息:[" + message + "]");
// throw new RuntimeException("我是故意抛出的异常!");throw new MessageConversionException("我是故意抛出的异常!");}
}
SpringAmqpTest
package com.itheima.publisher;import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;import static org.junit.jupiter.api.Assertions.*;@SpringBootTest
@Slf4j
class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSimpleQueue() {// 1. 队列名String queueName = "simple.queue";// 2. 消息String message = "hello, spring amqp!";rabbitTemplate.convertAndSend(queueName, message);}
}
3.2 失败重试机制
SpringAMQP提供了消费者失败重试机制,在消费者出现异常时利用本地重试,而不是无限的requeue到mq。我们可以通过在consumer的application.yaml文件中添加配置来开启重试机制:
spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
利用3.1中的测试代码进行测试(RuntimeException)
失败消息处理策略
在开启重试模式之后,重试次数耗尽,如果消息依然失败,则需要有由MessageRecover接口来处理,它包含三种不同的实现:
- RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方法。
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
- RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
将失败处理策略改为RepublishMessageRecoverer:
①首先,定义接收失败消息的交换机、队列及其绑定关系
②然后,定义RepublishMessageRecoverer
package com.itheima.consumer.config;import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class ErrorMessageConfiguration {@Beanpublic DirectExchange errorExchange() {return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue() {return new Queue("error.queue");}@Beanpublic Binding errorQueueBinding(Queue errorQueue, DirectExchange errorExchange) {return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");}@Beanpublic MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}
3.3 业务幂等性
幂等是一个数学概念,用函数表达式来描述是这样的:f(x) = f(f(x))。在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。
要尽可能避免非幂等业务被重复执行。然而在实际场景中,由于意外经常会出现业务被重复执行的情况,例如:
- 页面卡顿时频繁刷新导致表单重复提交
- 服务间调用的重试
- MQ消息的重复投递
因此,我们必须想办法保证消息处理的幂等性。
3.3.1 唯一消息id
方案一,是给每个消息都设置一个唯一id,利用id区分是否是重复消息:
①每一条消息都生成一个唯一的id,与消息一起投递给消费者
②消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
③如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。
@Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jjmc.setCreateMessageIds(true);return jjmc;
}
SpringAmqpTest
package com.itheima.publisher;import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;import static org.junit.jupiter.api.Assertions.*;@SpringBootTest
@Slf4j
class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSimpleQueue() {// 1. 队列名String queueName = "simple.queue";// 2. 消息String message = "hello, spring amqp!";rabbitTemplate.convertAndSend(queueName, message);}
}
SpringRabbitListener
package com.itheima.consumer.mq;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueue(Message message) {log.info("监听到simple.queue的消息-ID: [{}]", message.getMessageProperties().getMessageId());log.info("监听到simple.queue的消息:[{}]", new String(message.getBody()));}
}
唯一消息ID的方案需要改造原有的数据库。
3.3.2 业务判断
方案二,是结合业务逻辑,基于业务本身做判断。以我们的余额支付业务为例:
黑马商城hmall
package com.hmall.trade.listener;import com.hmall.trade.domain.po.Order;
import com.hmall.trade.service.IOrderService;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@RequiredArgsConstructor
public class PayStatusListener {private final IOrderService orderService;@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "trade.pay.success.queue", durable = "true"),exchange = @Exchange(name = "pay.direct"),key = "pay.success"))public void listenPaySuccess(Long orderId) {// 1. 查询订单Order order = orderService.getById(orderId);// 2. 判断订单状态,是否为未支付if(order == null || order.getStatus() != 1) {// 不做处理return;}// 3. 标记订单状态为已支付orderService.markOrderPaySuccess(orderId);}
}
总结
如何保证支付服务与交易服务之间的订单状态一致性?
- 首先,支付服务会在用户支付成功以后利用MQ消息通知交易服务,完成订单状态同步。
- 其次,为了保证MQ消息的可靠性,我们采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递和处理的可靠性。同时也开启了MQ的持久化,避免因服务宕机导致消息丢失。
- 最后,我们还在交易服务更新订单状态时做了业务幂等判断,避免因消息重复消费导致订单状态异常。
如果交易服务消息处理失败,有没有什么兜底方案?
既然MQ通知不一定发送到交易服务,那么交易服务就必须自己主动去查询支付状态。这样即便支付服务的MQ通知失败,我们依然能通过主动查询来保证订单状态的一致。
通常我们采取的措施就是利用定时任务定期查询,例如每隔20秒就查询一次,并判断支付状态。如果发现订单已经支付,则立刻更新订单状态为已支付即可。
4. 延迟消息
延迟消息:发送者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息。
延迟任务:设置在一定时间之后才执行的任务
4.1 死信交换机
当一个队列中的消息满足下列情况之一时,就会成为死信(dead letter):
- 消费者使用basic.reject或basic.nack声明消费失败,并且消息的requeue参数设置为false
- 消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费
- 要投递的队列消息堆积满了,最早的消息可能成为死信
如果队列通过dead-letter-exchange属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)。
mq-demo项目:SpringRabbitListener
package com.itheima.consumer.mq;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class SpringRabbitListener {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "dlx.queue", durable = "true"),exchange = @Exchange(name = "dlx.direct", type = ExchangeTypes.DIRECT),key = {"blue"}))public void listenDlxQueue(String message) {log.info("消费者监听到dlx.queue的消息:[{}]", message);}
}
NormalConfiguration
package com.itheima.consumer.config;import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class NormalConfiguration {@Beanpublic DirectExchange normalExchange() {return new DirectExchange("normal.direct");}@Beanpublic Queue normalQueue() {return QueueBuilder.durable("normal.queue").deadLetterExchange("dlx.direct").build();}@Beanpublic Binding normalExchangeBinding(Queue normalQueue, DirectExchange normalExchange) {return BindingBuilder.bind(normalQueue).to(normalExchange).with("blue");}
}
重启ConsumerApplication
SpringAmqpTest
package com.itheima.publisher;import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;import static org.junit.jupiter.api.Assertions.*;@SpringBootTest
@Slf4j
class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendDelayMessage() {rabbitTemplate.convertAndSend("normal.direct", "blue", "blue sky", message -> {message.getMessageProperties().setExpiration("10000");return message;});}
}
启动单元测试
4.2 延迟消息插件
DelayExchange插件:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
这个插件可以将普通交换机改造为支持延迟消息功能的交换机,当消息投递到交换机后可以暂存一定时间,到期后再投递到队列。
(1)安装
①因为我们是基于Docker安装,所以需要先查看RabbitMQ的插件目录对应的数据卷。
docker volume inspect mq-plugins
结果如下:
[{"CreatedAt": "2024-06-12T13:30:30+08:00","Driver": "local","Labels": null,"Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data","Name": "mq-plugins","Options": null,"Scope": "local"}
]
插件目录被挂载到了/var/lib/docker/volumes/mq-plugins/_data
这个目录,我们上传插件到该目录下。
cd /var/lib/docker/volumes/mq-plugins/_data
接下来执行命令,安装插件
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
(2)声明延迟交换机
基于注解方式:
package com.itheima.consumer.mq;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class SpringRabbitListener {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue", durable = "true"),exchange = @Exchange(name = "delay.direct", delayed = "true"),key = {"hi"}))public void listenDelayQueue(String message) {log.info("消费者监听到delay.queue的消息:[{}]", message);}
}
启动ConsumerApplication
基于@Bean的方式:
package com.itheima.consumer.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Slf4j
@Configuration
public class DelayExchangeConfig {@Beanpublic DirectExchange delayExchange(){return ExchangeBuilder.directExchange("delay.direct") // 指定交换机类型和名称.delayed() // 设置delay的属性为true.durable(true) // 持久化.build();}@Beanpublic Queue delayedQueue(){return new Queue("delay.queue");}@Beanpublic Binding delayQueueBinding(){return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");}
}
(3)发送延迟消息
发送消息时,必须通过x-delay属性设定延迟时间:
package com.itheima.publisher;import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;import java.util.UUID;import static org.junit.jupiter.api.Assertions.*;@SpringBootTest
@Slf4j
class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendDelayMessageByPlugin() {rabbitTemplate.convertAndSend("delay.direct", "hi", "hello", message -> {message.getMessageProperties().setDelay(10000);return message;});}
}
启动单元测试
4.3 取消超时订单
用户下单完成后,发送15分钟延迟消息,在15分钟后接收消息,检查支付状态:
- 已支付:更新订单状态为已支付
- 未支付:更新订单状态为关闭订单,恢复商品库存
步骤:黑马商城hmall
①定义常量
在trade-service添加config.MQConstants
package com.hmall.trade.constants;public interface MQConstants {String DELAY_EXCHANGE_NAME = "trade.delay.direct";String DELAY_ORDER_QUEUE_NAME = "trade.delay.order.queue";String DELAY_ORDER_KEY = "delay.order.query";
}
②改造下单业务,发送延迟消息
OrderServiceImpl
为了方便测试,这里延迟时间设为10秒
package com.hmall.trade.service.impl;// ... .../*** <p>* 服务实现类* </p>** @author 虎哥* @since 2023-05-05*/
@Service
@RequiredArgsConstructor
public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements IOrderService {private final ItemClient itemClient;private final IOrderDetailService detailService;private final CartClient cartClient;private final RabbitTemplate rabbitTemplate;@Override@GlobalTransactionalpublic Long createOrder(OrderFormDTO orderFormDTO) {// 1.订单数据Order order = new Order();// 1.1.查询商品List<OrderDetailDTO> detailDTOS = orderFormDTO.getDetails();// 1.2.获取商品id和数量的MapMap<Long, Integer> itemNumMap = detailDTOS.stream().collect(Collectors.toMap(OrderDetailDTO::getItemId, OrderDetailDTO::getNum));Set<Long> itemIds = itemNumMap.keySet();// 1.3.查询商品List<ItemDTO> items = itemClient.queryItemByIds(itemIds);if (items == null || items.size() < itemIds.size()) {throw new BadRequestException("商品不存在");}// 1.4.基于商品价格、购买数量计算商品总价:totalFeeint total = 0;for (ItemDTO item : items) {total += item.getPrice() * itemNumMap.get(item.getId());}order.setTotalFee(total);// 1.5.其它属性order.setPaymentType(orderFormDTO.getPaymentType());order.setUserId(UserContext.getUser());order.setStatus(1);// 1.6.将Order写入数据库order表中save(order);// 2.保存订单详情List<OrderDetail> details = buildDetails(order.getId(), items, itemNumMap);detailService.saveBatch(details);// 3.清理购物车商品cartClient.deleteCartItemByIds(itemIds);// 4.扣减库存try {itemClient.deductStock(detailDTOS);} catch (Exception e) {throw new RuntimeException("库存不足!");}// 5. 发送延迟消息,检测订单支付状态rabbitTemplate.convertAndSend(MQConstants.DELAY_EXCHANGE_NAME,MQConstants.DELAY_ORDER_KEY,order.getId(),message -> {message.getMessageProperties().setDelay(10000);return message;});return order.getId();}
}
③编写查询支付状态接口
由于MQ消息处理时需要查询支付状态,因此我们要在pay-service
模块定义一个这样的接口,并提供对应的FeignClient
.
首先,在hm-api
模块定义三个类:
PayOrderDTO:
package com.hmall.api.dto;import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;import java.time.LocalDateTime;/*** <p>* 支付订单* </p>*/
@Data
@ApiModel(description = "支付单数据传输实体")
public class PayOrderDTO {@ApiModelProperty("id")private Long id;@ApiModelProperty("业务订单号")private Long bizOrderNo;@ApiModelProperty("支付单号")private Long payOrderNo;@ApiModelProperty("支付用户id")private Long bizUserId;@ApiModelProperty("支付渠道编码")private String payChannelCode;@ApiModelProperty("支付金额,单位分")private Integer amount;@ApiModelProperty("付类型,1:h5,2:小程序,3:公众号,4:扫码,5:余额支付")private Integer payType;@ApiModelProperty("付状态,0:待提交,1:待支付,2:支付超时或取消,3:支付成功")private Integer status;@ApiModelProperty("拓展字段,用于传递不同渠道单独处理的字段")private String expandJson;@ApiModelProperty("第三方返回业务码")private String resultCode;@ApiModelProperty("第三方返回提示信息")private String resultMsg;@ApiModelProperty("支付成功时间")private LocalDateTime paySuccessTime;@ApiModelProperty("支付超时时间")private LocalDateTime payOverTime;@ApiModelProperty("支付二维码链接")private String qrCodeUrl;@ApiModelProperty("创建时间")private LocalDateTime createTime;@ApiModelProperty("更新时间")private LocalDateTime updateTime;
}
PayClient
package com.hmall.api.client;import com.hmall.api.client.fallback.PayClientFallback;
import com.hmall.api.dto.PayOrderDTO;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;@FeignClient(value = "pay-service", fallbackFactory = PayClientFallback.class)
public interface PayClient {/*** 根据交易订单id查询支付单* @param id 业务订单id* @return 支付单信息*/@GetMapping("/pay-orders/biz/{id}")PayOrderDTO queryPayOrderByBizOrderNo(@PathVariable("id") Long id);
}
PayClientFallback
package com.hmall.api.client.fallback;import com.hmall.api.client.PayClient;
import com.hmall.api.dto.PayOrderDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.openfeign.FallbackFactory;@Slf4j
public class PayClientFallback implements FallbackFactory<PayClient> {@Overridepublic PayClient create(Throwable cause) {return new PayClient() {@Overridepublic PayOrderDTO queryPayOrderByBizOrderNo(Long id) {return null;}};}
}
最后,在pay-service模块的PayController中实现该接口:
@ApiOperation("根据id查询支付单")
@GetMapping("/biz/{id}")
public PayOrderDTO queryPayOrderByBizOrderNo(@PathVariable("id") Long id){PayOrder payOrder = payOrderService.lambdaQuery().eq(PayOrder::getBizOrderNo, id).one();return BeanUtils.copyBean(payOrder, PayOrderDTO.class);
}
④监听消息,查询支付状态
在trade-service编写一个监听器,监听延迟消息,查询订单支付状态:
package com.hmall.trade.listener;import com.hmall.api.client.PayClient;
import com.hmall.api.dto.PayOrderDTO;
import com.hmall.trade.constants.MQConstants;
import com.hmall.trade.domain.po.Order;
import com.hmall.trade.service.IOrderService;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@RequiredArgsConstructor
public class OrderDelayMessageListener {private final IOrderService orderService;private final PayClient payClient;@RabbitListener(bindings = @QueueBinding(value = @Queue(name = MQConstants.DELAY_ORDER_QUEUE_NAME),exchange = @Exchange(name = MQConstants.DELAY_EXCHANGE_NAME, delayed = "true"),key = MQConstants.DELAY_ORDER_KEY))public void listenOrderDelayMessage(Long orderId) {// 1. 查询订单Order order = orderService.getById(orderId);// 2. 检测订单状态,判断是否已支付if(order == null || order.getStatus() != 1) {// 订单不存在或者已经支付return;}// 3. 未支付,需要查询支付流水状态PayOrderDTO payOrder = payClient.queryPayOrderByBizOrderNo(orderId);// 4. 判断是否支付if(payOrder != null && payOrder.getStatus() == 3) {// 4.1 已支付,标记订单状态为已支付orderService.markOrderPaySuccess(orderId);} else {// 4.2 未支付,取消订单,恢复库存orderService.cancelOrder(orderId);}}
}
⑤取消订单 - OrderServiceImpl
package com.hmall.trade.service.impl;import cn.hutool.core.bean.BeanUtil;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmall.api.client.ItemClient;
import com.hmall.api.client.PayClient;
import com.hmall.api.dto.OrderDetailDTO;
import com.hmall.trade.domain.dto.OrderFormDTO;
import com.hmall.trade.domain.po.Order;
import com.hmall.trade.domain.po.OrderDetail;
import com.hmall.trade.service.IOrderService;
import lombok.RequiredArgsConstructor;import org.springframework.stereotype.Service;import java.util.ArrayList;
import java.util.List;/*** <p>* 服务实现类* </p>** @author 虎哥* @since 2023-05-05*/
@Service
@RequiredArgsConstructor
public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements IOrderService {private final ItemClient itemClient;private final PayClient payClient;// ... ...@Overridepublic void cancelOrder(Long orderId) {// 1. 标记交易订单为已关闭lambdaUpdate().set(Order::getStatus, 5).eq(Order::getId, orderId).update();// 2. 标记支付单状态为已取消payClient.updatePayOrderStatusByBizOrderNo(orderId, 2);// 3. 恢复订单库存已经扣除的库存List<OrderDetail> list = detailService.lambdaQuery().eq(OrderDetail::getOrderId, orderId).list();List<OrderDetailDTO> orderDetailDTOS = BeanUtil.copyToList(list, OrderDetailDTO.class);itemClient.restoreStock(orderDetailDTOS);}
}
⑥标记支付单状态为已取消
PayController
@ApiOperation("修改支付单状态")
@PutMapping("/status/{id}/{status}")
public void updatePayOrderStatusByBizOrderNo(@PathVariable("id") Long orderId, @PathVariable("status") Integer status) {payOrderService.updateStatusByOrderId(orderId, status);
}
IPayOrderService
void updateStatusByOrderId(Long orderId, Integer status);
PayOrderServiceImpl
@Override
public void updateStatusByOrderId(Long orderId, Integer status) {lambdaUpdate().set(PayOrder::getStatus, status).eq(PayOrder::getBizOrderNo, orderId).update();
}
PayClient
@ApiOperation("修改支付单状态")
@PutMapping("/pay-orders/status/{id}/{status}")
public void updatePayOrderStatusByBizOrderNo(@PathVariable("id") Long orderId, @PathVariable("status") Integer status);
PayClientFallback
package com.hmall.api.client.fallback;import com.hmall.api.client.PayClient;
import com.hmall.api.dto.PayOrderDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.openfeign.FallbackFactory;@Slf4j
public class PayClientFallback implements FallbackFactory<PayClient> {@Overridepublic PayClient create(Throwable cause) {return new PayClient() {@Overridepublic PayOrderDTO queryPayOrderByBizOrderNo(Long id) {return null;}@Overridepublic void updatePayOrderStatusByBizOrderNo(Long orderId, Integer status) {log.error("更新支付单状态失败!", cause);throw new RuntimeException(cause);}};}
}
⑦批量恢复商品库存
ItemController
@ApiOperation("批量恢复库存")
@PutMapping("/stock/restore")
public void restoreStock(@RequestBody List<OrderDetailDTO> orderDetails) {itemService.restoreStock(orderDetails);
}
IItemService
void restoreStock(List<OrderDetailDTO> orderDetails);
ItemServiceImpl
@Override
public void restoreStock(List<OrderDetailDTO> orderDetails) {for (OrderDetailDTO orderDetail : orderDetails) {// 根据商品id查询商品Item item = lambdaQuery().eq(Item::getId, orderDetail.getItemId()).one();// 恢复库存lambdaUpdate().set(Item::getStock, item.getStock() + orderDetail.getNum()).eq(Item::getId, orderDetail.getItemId()).update();}
}
ItemClient
@ApiOperation("批量恢复库存")
@PutMapping("/items/stock/restore")
public void restoreStock(@RequestBody List<OrderDetailDTO> orderDetails);
ItemClientFallbackFactory
package com.hmall.api.client.fallback;import com.hmall.api.client.ItemClient;
import com.hmall.api.dto.ItemDTO;
import com.hmall.api.dto.OrderDetailDTO;
import com.hmall.common.utils.CollUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.openfeign.FallbackFactory;import java.util.Collection;
import java.util.List;@Slf4j
public class ItemClientFallbackFactory implements FallbackFactory<ItemClient> {@Overridepublic ItemClient create(Throwable cause) {return new ItemClient() {@Overridepublic List<ItemDTO> queryItemByIds(Collection<Long> ids) {log.error("查询商品失败!", cause);return CollUtils.emptyList();}@Overridepublic void deductStock(List<OrderDetailDTO> items) {log.error("扣减商品库存失败!", cause);throw new RuntimeException(cause);}@Overridepublic void restoreStock(List<OrderDetailDTO> orderDetails) {log.error("恢复商品库存失败!", cause);throw new RuntimeException(cause);}};}
}
⑧注释掉PayOrderServiceImpl中tryPayOrderByBalance的发通知代码,方便测试(测试完可以取消)
@Override@Transactionalpublic void tryPayOrderByBalance(PayOrderFormDTO payOrderFormDTO) {// 1.查询支付单PayOrder po = getById(payOrderFormDTO.getId());// 2.判断状态if(!PayStatus.WAIT_BUYER_PAY.equalsValue(po.getStatus())){// 订单不是未支付,状态异常throw new BizIllegalException("交易已支付或关闭!");}// 3.尝试扣减余额userClient.deductMoney(payOrderFormDTO.getPw(), po.getAmount());// 4.修改支付单状态boolean success = markPayOrderSuccess(payOrderFormDTO.getId(), LocalDateTime.now());if (!success) {throw new BizIllegalException("交易已支付或关闭!");}// 5. TODO 修改订单状态// tradeClient.markOrderPaySuccess(po.getBizOrderNo());/*try {rabbitTemplate.convertAndSend("pay.direct", "pay.success", po.getBizOrderNo());} catch (Exception e) {log.error("发送支付状态通知失败,订单id:{}", po.getBizOrderNo(), e);}*/}
⑨给MqConfig加上条件注解
@Configuration
@ConditionalOnClass(RabbitTemplate.class) // 有RabbitTemplate才生效
public class MqConfig {
// ... ...
}
⑩启动所有服务,进行下单测试
修改订单状态
修改支付单状态
恢复库存
二、抽取MQ工具作业
步骤:
①在nacos中抽取RabbitMQ的共享配置,命名为shared-mq.yaml
:
spring:rabbitmq:host: ${hm.mq.host:192.168.126.151} # 主机名port: ${hm.mq.port:5672} # 端口virtual-host: ${hm.mq.vhost:/hmall} # 虚拟主机username: ${hm.mq.un:hmall} # 用户名password: ${hm.mq.pw:123} # 密码
pay-service、trade-service模块把application.yaml里关于rabbitmq的配置删除,在bootstrap.yaml里添加如下
spring:application:name: pay-service # 服务名称profiles:active: devcloud:nacos:server-addr: 192.168.126.151 # nacos地址config:file-extension: yaml # 文件后缀名shared-configs: # 共享配置- data-id: shared-jdbc.yaml # 共享mybatis配置- data-id: shared-log.yaml # 共享日志配置- data-id: shared-swagger.yaml # 共享日志配置- data-id: shared-seata.yaml # 共享seata配置- data-id: shared-mq.yaml # 共享RabbitMQ配置
②引入依赖
在hm-common模块引入amqp、jackson依赖
<!--AMQP依赖-->
<dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-amqp</artifactId><scope>provided</scope>
</dependency>
<!--Spring整合Rabbit依赖-->
<dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId><scope>provided</scope>
</dependency>
<!--json处理-->
<dependency><groupId>com.fasterxml.jackson.coret</groupId><artifactId>jackson-databind</artifactId><scope>provided</scope>
</dependency>
③封装工具
在hm-common模块的com.hmall.common.utils
包下新建一个RabbitMqHelper
类:
package com.hmall.common.utils;import cn.hutool.core.lang.UUID;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.util.concurrent.ListenableFutureCallback;@Slf4j
@RequiredArgsConstructor
public class RabbitMqHelper {private final RabbitTemplate rabbitTemplate;public void sendMessage(String exchange, String routingKey, Object msg) {log.debug("准备发送消息, exchange:{}, routingKey: {}, msg: {}", exchange, routingKey, msg);rabbitTemplate.convertAndSend(exchange, routingKey, msg);}public void sendDealyMessage(String exchange, String routingKey, Object msg, int delay) {rabbitTemplate.convertAndSend(exchange, routingKey, msg, message -> {message.getMessageProperties().setDelay(delay);return message;});}public void sendMessageWithConfirm(String exchange, String routingKey, Object msg, int maxRetries) {log.debug("准备发送消息,exchange: {}, routingKey: {}, msg: {}", exchange, routingKey, msg);CorrelationData cd = new CorrelationData(UUID.randomUUID().toString(true));cd.getFuture().addCallback(new ListenableFutureCallback<>() {int retryCount = 0;@Overridepublic void onFailure(Throwable ex) {log.error("处理ack回执失败", ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {if(result != null && !result.isAck()) {log.debug("消息发送失败,收到nack,已重试次数: {}", retryCount);if(retryCount >= maxRetries) {log.error("消息发送重试次数耗尽,发送失败");return;}CorrelationData cd = new CorrelationData(UUID.randomUUID().toString(true));cd.getFuture().addCallback(this);rabbitTemplate.convertAndSend(exchange, routingKey, msg, cd);retryCount++;}}});rabbitTemplate.convertAndSend(exchange, routingKey, msg, cd);}
}
④自动装配
在hm-common模块的包下定义一个配置类:MqConfig
package com.hmall.common.config;import com.fasterxml.jackson.databind.ObjectMapper;
import com.hmall.common.utils.RabbitMqHelper;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
@ConditionalOnClass(value = {RabbitTemplate.class, MessageConverter.class}) // 有RabbitTemplate才生效
public class MqConfig {@Bean@ConditionalOnBean(ObjectMapper.class)public MessageConverter messageConverter(ObjectMapper mapper) {// 1. 定义消息转换器Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter(mapper);// 2. 配置自动创建消息id,用于识别不同消息jjmc.setCreateMessageIds(true);return jjmc;}@Beanpublic RabbitMqHelper rabbitMqHelper(RabbitTemplate rabbitTemplate) {return new RabbitMqHelper(rabbitTemplate);}
}
注意,由于hm-common模块的包名为com.hmall.common
,与其它微服务的包名不一致,因此无法通过扫描包使配置生效。为了让我们的配置生效,我们需要在项目的classpath下的META-INF/spring.factories文件中声明这个配置类:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\com.hmall.common.config.MyBatisConfig,\com.hmall.common.config.MvcConfig,\com.hmall.common.config.MqConfig,\com.hmall.common.config.JsonConfig