RabbitMQ进阶篇

文章目录

  • 发送者的可靠性
    • 生产者重试机制
    • 实现生产者确认
  • MQ的可靠性
    • 数据持久化
      • 交换机持久化
      • 队列持久化
      • 消息持久化
    • Lazy Queue(可配置~)
      • 控制台配置Lazy模式
      • 代码配置Lazy模式
      • 更新已有队列为lazy模式
  • 消费者的可靠性
    • 消费者确认机制
    • 失败重试机制
    • 失败处理策略
  • 业务幂等性
    • 唯一消息ID
    • 业务判断
    • 兜底方案
  • 延迟消息
    • 死信交换机(不推荐使用)
    • 延迟消息
    • DelayExchange插件(推荐)
      • 插件下载地址:
      • 安装:
      • 声明延迟交换机
      • 发送延迟消息
    • 超时订单问题
      • 定义常量
      • 抽取共享mq配置
      • 改造下单业务
      • 编写查询支付状态接口
      • 消息监听

发送者的可靠性

保证一个消息发送出去,至少被消费一次。
image.png
可能在多个步骤中给消息弄丢了

生产者重试机制

不建议使用, 会增加网络和资源的消耗

第一种情况,就是生产者发送消息时,出现了网络故障,导致与MQ的连接中断
当RabbitTemplate与MQ连接超时后,多次重试
修改publisher模块的application.yaml文件,添加下面的内容:

spring:rabbitmq:connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled: true # 开启超时重试机制initial-interval: 1000ms # 失败后的初始等待时间multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multipliermax-attempts: 3 # 最大重试次数

如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

实现生产者确认

image.png

  • ConfirmCallback为发送Exchange(交换器)时回调,成功或者失败都会触发;
  • ReturnCallback为路由不到队列时触发,成功则不触发;

Rabbitmq之ConfirmCallback与ReturnCallback使用_rabbitmq returncallback-CSDN博客
在publisher模块的application.yaml中添加配置:

spring:rabbitmq:publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型publisher-returns: true # 开启publisher return机制

这里publisher-confirm-type有三种模式可选:

  • none:关闭confirm机制
  • simple:同步阻塞等待MQ的回执
  • correlated:MQ异步回调返回回执

定义ReturnCallback:
image.png

package com.itheima.publisher.config;import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;import javax.annotation.PostConstruct;@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {private final RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("触发return callback,");log.debug("exchange: {}", returned.getExchange());log.debug("routingKey: {}", returned.getRoutingKey());log.debug("message: {}", returned.getMessage());log.debug("replyCode: {}", returned.getReplyCode());log.debug("replyText: {}", returned.getReplyText());}});}
}

image.png
image.png
定义ConfirmCallback:
由于每个消息发送时的处理逻辑不一定相同,因此**ConfirmCallback需要在每次发消息时定义。**具体来说,是在调用RabbitTemplate中的convertAndSend方法 时,多传递一个参数:

image.png

这里的CorrelationData中包含两个核心的东西:

  • id:消息的唯一标示,MQ对不同的消息的回执以此做判断,避免混淆
  • SettableListenableFuture:回执结果的Future对象

将来MQ的回执就会通过这个Future来返回,我们可以提前给CorrelationData中的Future添加回调函数来处理消息回执:

image.png

我们新建一个测试,向系统自带的交换机发送消息,并且添加ConfirmCallback:

@Test
void testPublisherConfirm() {// 1.创建CorrelationData,需要一个UUID,回调的时候通过id识别CorrelationData cd = new CorrelationData();// 2.给Future添加ConfirmCallbackcd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {// 2.1.Future发生异常时的处理逻辑,基本不会触发log.error("send message fail", ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {// 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执log.debug("发送消息成功,收到 ack!");}else{ // result.getReason(),String类型,返回nack时的异常描述log.error("发送消息失败,收到 nack, reason : {}", result.getReason());}}});// 3.发送消息rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
}

image.png

开启生产者确认比较消耗MQ性能,一般不建议开启。而且大家思考一下触发确认的几种情况:

  • 路由失败:一般是因为RoutingKey错误导致,往往是编程导致
  • 交换机名称错误:同样是编程错误导致
  • MQ内部故障:这种需要处理,但概率往往较低。因此只有对消息可靠性要求非常高的业务才需要开启,而且仅仅需要开启ConfirmCallback处理nack就可以了

image.png

MQ的可靠性

为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置数据持久化
image.png

  • 交换机持久化
  • 队列持久化
  • 消息持久化

数据持久化

交换机持久化

image.png

设置为Durable就是持久化模式,Transient就是临时模式。

队列持久化

image.png

消息持久化

image.png
image.png

说明:在开启持久化机制以后,如果同时还开启了生产者确认,那么MQ会在消息持久化以后才发送ACK回执,进一步确保消息的可靠性。
不过出于性能考虑,为了减少IO次数,发送到MQ的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。一般间隔在100毫秒左右,这就会导致ACK有一定的延迟,因此建议生产者确认全部采用异步方式。

当内存占满, page out会影响MQ阻塞

Lazy Queue(可配置~)

image.png

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)
  • 支持数百万条的消息存储

而在3.12版本之后,LazyQueue已经成为所有队列的默认格式。官方推荐所有队列都为LazyQueue模式。

控制台配置Lazy模式

image.png

代码配置Lazy模式

在利用SpringAMQP声明队列的时候,添加x-queue-mod=lazy参数也可设置队列为Lazy模式:

@Bean
public Queue lazyQueue(){return QueueBuilder.durable("lazy.queue").lazy() // 开启Lazy模式.build();
}

当然,我们也可以基于注解来声明队列并设置为Lazy模式:

@RabbitListener(queuesToDeclare = @Queue(name = "lazy.queue",durable = "true",					arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){
log.info("接收到 lazy.queue的消息:{}", msg);
}

更新已有队列为lazy模式

可以基于命令行设置policy:

rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues

当然,也可以在控制台配置policy,进入在控制台的Admin页面,点击Policies,即可添加配置:
image.png

消费者的可靠性

  • 消息投递的过程中出现了网络故障
  • 消费者接收到消息后突然宕机
  • 消费者接收到消息后,因处理不当导致异常

消费者确认机制

当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态

  • ack:成功处理消息,RabbitMQ从队列中删除该消息
  • nack:消息处理失败,RabbitMQ需要再次投递消息
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式

  • none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用

  • manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活

  • auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回

    • 如果是业务异常,会自动返回nack;
    • 如果是消息处理或校验异常,自动返回reject;

通过下面的配置可以修改SpringAMQP的ACK处理方式:

spring:rabbitmq:listener:simple:acknowledge-mode: auto # 不做处理

失败重试机制

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。

当然,上述极端情况发生的概率还是非常低的,不过不怕一万就怕万一。为了应对上述情况Spring又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

修改consumer服务的application.yml文件,添加内容:

spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

重启consumer服务,重复之前的测试。可以发现:

  • 消费者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次
  • 本地重试3次以后,抛出了AmqpRejectAndDontRequeueException异常。查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是reject

结论:

  • 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
  • 重试达到最大次数后,Spring会返回reject,消息会被丢弃

失败处理策略

Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机(推荐)

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

在consumer服务中定义处理失败消息的交换机和队列
定义一个RepublishMessageRecoverer,关联队列和交换机
image.png

/*** 错误消息配置类,用于配置 RabbitMQ 错误消息处理相关的 Bean。* 当前类会根据 spring.rabbitmq.listener.simple.retry.enabled 属性的值来决定是否创建相关的 Bean。* 如果该属性值为 true,则会创建错误消息交换机、错误队列和绑定关系,并配置消息恢复器。*/
@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {/*** 创建错误消息交换机*/@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}/*** 创建错误队列*/@Beanpublic Queue errorQueue(){return new Queue("error.queue", true);}/*** 创建错误队列与错误消息交换机的绑定关系,"error"是路由键*/@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");}/*** 创建消息恢复器*/@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}

消费者如何保证消息一定被消费?

  • 开启消费者确认机制为auto, 由spring确认消息处理成功后返回ack, 异常时返回nack
  • 开启消费者失败重试机制, 并设置MessageRecoverer, 多次重试失败后将信息投递到异常交换机

业务幂等性

image.png
保证消息处理的幂等性。这里给出两种方案:

  • 唯一消息ID
  • 业务状态判断

唯一消息ID

  • 每一条消息都生成一个唯一的id,与消息一起投递给消费者。
  • 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
  • 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。

SpringAMQP的MessageConverter自带了MessageID的功能,我们只要开启这个功能即可
以Jackson的消息转换器为例( 加在启动类 ):

@Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jjmc.setCreateMessageIds(true);// 在底层会自动创建一个UUIDreturn jjmc;
}

在Spring AMQP中,当使用Jackson2JsonMessageConverter并开启setCreateMessageIds(true)功能时,底层会自动在消息的属性中添加一个名为amqp_messageId的字段,其值为自动生成的UUID。
具体来说,UUID会成为消息的一部分,保存在消息的AMQP(Advanced Message Queuing Protocol)属性中。这些属性是与消息一起传递的元数据,包含了关于消息的一些信息。amqp_messageId字段就是用于唯一标识消息的UUID。
当消息被发送给消费者时,消费者可以通过**message.getMessageProperties().getMessageId()**方法来获取消息的ID,然后根据业务需求将该ID保存到数据库中。在处理相同消息时,消费者可以在数据库中查询是否存在相同的消息ID,以判断是否为重复消息。

业务判断

image.png
image.png
相比较而言,消息ID的方案需要改造原有的数据库,所以我更推荐使用业务判断的方案。

@Override
public void markOrderPaySuccess(Long orderId) {
// UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1
lambdaUpdate()
.set(Order::getStatus, 2)
.set(Order::getPayTime, LocalDateTime.now())
.eq(Order::getId, orderId)
.eq(Order::getStatus, 1)
.update();
}

上述代码逻辑上符合了幂等判断的需求,但是由于判断和更新是两步动作,因此在极小概率下可能存在线程安全问题。

@Override
public void markOrderPaySuccess(Long orderId) {
// UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1
lambdaUpdate()
.set(Order::getStatus, 2)
.set(Order::getPayTime, LocalDateTime.now())
.eq(Order::getId, orderId)
.eq(Order::getStatus, 1)
.update();
}

image.png

兜底方案

既然MQ通知不一定发送到交易服务,那么交易服务就必须自己主动去查询支付状态。这样即便支付服务的MQ通知失败,我们依然能通过主动查询来保证订单状态的一致。

image.png

图中黄色线圈起来的部分就是MQ通知失败后的兜底处理方案,由交易服务自己主动去查询支付状态。

综上,支付服务与交易服务之间的订单状态一致性是如何保证的?

  • 首先,支付服务会正在用户支付成功以后利用MQ消息通知交易服务,完成订单状态同步。
  • 其次,为了保证MQ消息的可靠性,我们采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递的可靠性
  • 最后,我们还在交易服务设置了定时任务,定期查询订单支付状态。这样即便MQ通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性。

延迟消息

image.png

死信交换机(不推荐使用)

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息满了,无法投递

死信交换机有什么作用呢?

  • 收集那些因处理失败而被拒绝的消息
  • 收集那些因队列满了而被拒绝的消息
  • 收集因TTL(有效期)到期的消息

延迟消息

RabbitMQ的消息过期是基于追溯方式来实现的,也就是说当一个消息的TTL到期以后不一定会被移除或投递到死信交换机,而是在消息恰好处于队首时才会被处理。

当队列中消息堆积很多的时候,过期消息可能不会被按时处理,因此你设置的TTL时间不一定准确。

DelayExchange插件(推荐)

插件下载地址:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

安装:

因为我们是基于Docker安装,所以需要先查看RabbitMQ的插件目录对应的数据卷。

docker volume inspect mq-plugins

结果如下:
image.png

[{"CreatedAt": "2024-01-19T09:22:59+08:00","Driver": "local","Labels": null,"Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data","Name": "mq-plugins","Options": null,"Scope": "local"}
]

插件目录被挂载到了/var/lib/docker/volumes/mq-plugins/_data这个目录,我们上传插件到该目录下。

接下来执行命令,启用插件:

docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

声明延迟交换机

基于注解方式:

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue", durable = "true"),exchange = @Exchange(name = "delay.direct", delayed = "true"),key = "delay"	
))
public void listenDelayMessage(String msg){
log.info("接收到delay.queue的延迟消息:{}", msg);
}

基于@Bean的方式:

package com.itheima.consumer.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Slf4j
@Configuration
public class DelayExchangeConfig {@Beanpublic DirectExchange delayExchange(){return ExchangeBuilder.directExchange("delay.direct") // 指定交换机类型和名称.delayed() // 设置delay的属性为true.durable(true) // 持久化.build();}@Beanpublic Queue delayedQueue(){return new Queue("delay.queue");}@Beanpublic Binding delayQueueBinding(){return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");}
}

发送延迟消息

发送消息时,必须通过x-delay属性设定延迟时间:

@Test
void testPublisherDelayMessage() {// 1.创建消息String message = "hello, delayed message";// 2.发送消息,利用消息后置处理器添加消息头rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 添加延迟消息属性message.getMessageProperties().setDelay(5000);return message;}});
}

可以写一个延迟时间的类, 不用每次都new一个,具体代码如下:
image.png

/*** @author Ccoo* 2024/1/22*/
@RequiredArgsConstructor
public class DelayMessageProcessor implements MessagePostProcessor {private final int delay;@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelay(delay);return message;}
}

最后上面的业务代码变为:

@Test
void testPublisherDelayMessage() {// 1.创建消息String message = "hello, delayed message";// 2.发送消息,利用消息后置处理器添加消息头rabbitTemplate.convertAndSend("delay.direct", "delay", message,new DelayMessageProcessor(message.removeNextDelay().intValue());
} 

延迟消息插件内部会维护一个本地数据库表,同时使用Elang Timers功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在误差。因此,不建议设置延迟时间过长的延迟消息

超时订单问题

image.png

由于我们要多次发送延迟消息,因此需要先定义一个记录消息延迟时间的消息体,处于通用性考虑,我们将其定义到hm-common模块下:

image.png

@Data
public class MultiDelayMessage<T> {/*** 消息体*/private T data;/*** 记录延迟时间的集合*/private List<Long> delayMillis;public MultiDelayMessage(T data, List<Long> delayMillis) {this.data = data;this.delayMillis = delayMillis;}public static <T> MultiDelayMessage<T> of(T data, Long ... delayMillis){return new MultiDelayMessage<>(data, CollUtils.newArrayList(delayMillis));}/*** 获取并移除下一个延迟时间* @return 队列中的第一个延迟时间*/public Long removeNextDelay(){return delayMillis.remove(0);}/*** 是否还有下一个延迟时间*/public boolean hasNextDelay(){return !delayMillis.isEmpty();}
}

定义常量

image.png

/*** @author Ccoo* 2024/1/22*/
public interface MqConstants {String DELAY_EXCHANGE = "trade.delay.topic";String DELAY_ORDER_QUEUE = "trade.order.delay.queue";String DELAY_ORDER_ROUTING_KEY = "order.query";
}

抽取共享mq配置

在nacos中定义一个名为shared-mq.xml的配置文件,内容如下:

spring: rabbitmq:host: ${hm.mq.host:192.168.164.128} # 主机名port: ${hm.mq.port:5672} # 端口virtual-host: ${hm.mq.vhost:/hmall} # 虚拟主机username: ${hm.mq.un:itheima} # 用户名password: ${hm.mq.pw:123321} # 密码listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

在trade-service模块添加共享配置:
image.png

改造下单业务

  1. 引入依赖

在trade-service模块的pom.xml中引入amqp的依赖:

<!--amqp--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
  1. 改造下单业务

image.png

编写查询支付状态接口

首先,在hm-api模块定义三个类:

image.png

说明:

  • PayOrderDTO:支付单的数据传输实体
  • PayClient:支付系统的Feign客户端
  • PayClientFallback:支付系统的fallback逻辑
@Data
@ApiModel(description = "支付单数据传输实体")
public class PayOrderDTO {@ApiModelProperty("id")private Long id;@ApiModelProperty("业务订单号")private Long bizOrderNo;@ApiModelProperty("支付单号")private Long payOrderNo;@ApiModelProperty("支付用户id")private Long bizUserId;@ApiModelProperty("支付渠道编码")private String payChannelCode;@ApiModelProperty("支付金额,单位分")private Integer amount;@ApiModelProperty("付类型,1:h5,2:小程序,3:公众号,4:扫码,5:余额支付")private Integer payType;@ApiModelProperty("付状态,0:待提交,1:待支付,2:支付超时或取消,3:支付成功")private Integer status;@ApiModelProperty("拓展字段,用于传递不同渠道单独处理的字段")private String expandJson;@ApiModelProperty("第三方返回业务码")private String resultCode;@ApiModelProperty("第三方返回提示信息")private String resultMsg;@ApiModelProperty("支付成功时间")private LocalDateTime paySuccessTime;@ApiModelProperty("支付超时时间")private LocalDateTime payOverTime;@ApiModelProperty("支付二维码链接")private String qrCodeUrl;@ApiModelProperty("创建时间")private LocalDateTime createTime;@ApiModelProperty("更新时间")private LocalDateTime updateTime;
}
@FeignClient(value = "pay-service", fallbackFactory = PayClientFallback.class)
public interface PayClient {/*** 根据交易订单id查询支付单* @param id 业务订单id* @return 支付单信息*/@GetMapping("/pay-orders/biz/{id}")PayOrderDTO queryPayOrderByBizOrderNo(@PathVariable("id") Long id);
}
@Slf4j
public class PayClientFallback implements FallbackFactory<PayClient> {@Overridepublic PayClient create(Throwable cause) {return new PayClient() {@Overridepublic PayOrderDTO queryPayOrderByBizOrderNo(Long id) {return null;}};}
}

最后,在pay-service模块的PayController中实现该接口:

@ApiOperation("根据id查询支付单")
@GetMapping("/biz/{id}")
public PayOrderDTO queryPayOrderByBizOrderNo(@PathVariable("id") Long id){PayOrder payOrder = payOrderService.lambdaQuery().eq(PayOrder::getBizOrderNo, id).one();return BeanUtils.copyBean(payOrder, PayOrderDTO.class);
}

消息监听

接下来,我们在trader-service编写一个监听器,监听延迟消息,查询订单支付状态:
image.png

@Slf4j
@Component
@RequiredArgsConstructor
public class OrderStatusListener {private final IOrderService orderService;private final PayClient payClient;private final RabbitTemplate rabbitTemplate;@RabbitListener(bindings = @QueueBinding(value = @Queue(name = MqConstants.DELAY_ORDER_QUEUE, durable = "true"),exchange = @Exchange(name = MqConstants.DELAY_EXCHANGE, type = ExchangeTypes.TOPIC),key = MqConstants.DELAY_ORDER_ROUTING_KEY))public void listenOrderCheckDelayMessage(MultiDelayMessage<Long> msg) {// 1.获取消息中的订单idLong orderId = msg.getData();// 2.查询订单,判断状态:1是未支付,大于1则是已支付或已关闭Order order = orderService.getById(orderId);if (order == null || order.getStatus() > 1) {// 订单不存在或交易已经结束,放弃处理return;}// 3.可能是未支付,查询支付服务PayOrderDTO payOrder = payClient.queryPayOrderByBizOrderNo(orderId);if (payOrder != null && payOrder.getStatus() == 3) {// 支付成功,更新订单状态orderService.markOrderPaySuccess(orderId);return;}// 4.确定未支付,判断是否还有剩余延迟时间if (msg.hasNextDelay()) {// 4.1.有延迟时间,需要重发延迟消息,先获取延迟时间的int值int delayVal = msg.removeNextDelay().intValue();// 4.2.发送延迟消息rabbitTemplate.convertAndSend(MqConstants.DELAY_EXCHANGE, MqConstants.DELAY_ORDER_ROUTING_KEY, msg,message -> {message.getMessageProperties().setDelay(delayVal);return message;});return;}// 5.没有剩余延迟时间了,说明订单超时未支付,需要取消订单orderService.cancelOrder(orderId);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/38427.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

西部智慧健身小程序+华为运动健康服务

1、 应用介绍 西部智慧健身小程序为用户提供一站式全流程科学健身综合服务。用户通过登录微信小程序&#xff0c;可享用健康筛查、运动风险评估、体质检测评估、运动处方推送、个人运动数据监控与评估等公益服务。 2、 体验介绍西部智慧健身小程序华为运动健康服务核心体验如…

idea xml ctrl+/ 注释格式不对齐

处理前 处理后 解决办法 取消这两个勾选

核方法总结(三)———核主成分(kernel PCA)学习笔记

一、核主成分 1.1 和PCA的区别 PCA &#xff08;主成分分析&#xff09;对应一个线性高斯模型&#xff08;参考书的第二章&#xff09;&#xff0c;其基本假设是数据由一个符合正态分布的隐变量通过一个线性映射得到&#xff0c;因此可很好描述符合高斯分布的数据。然而在很多实…

ViewBinding的使用(因为kotlin-android-extensions插件的淘汰)

书籍&#xff1a; 《第一行代码 Android》第三版 开发环境&#xff1a; Android Studio Jellyfish | 2023.3.1 问题&#xff1a; 3.2.4在Activity中使用Toast章节中使用到了kotlin-android-extensions插件,但是该插件已经淘汰,根据网上了解,目前使用了新的技术VewBinding替…

UE4_材质_材质节点_DepthFade

一、DepthFade参数 DepthFade&#xff08;深度消退&#xff09;表达式用来隐藏半透明对象与不透明对象相交时出现的不美观接缝。 项目说明属性消退距离&#xff08;Fade Distance&#xff09;这是应该发生消退的全局空间距离。未连接 FadeDistance&#xff08;FadeDistance&a…

光照老化试验箱:材料耐久性的“时间加速器”

光照老化试验箱&#xff1a;材料耐久性的“时间加速器”概述 光照老化试验箱是一种模拟自然光照条件下材料老化过程的设备&#xff0c;广泛应用于材料科学领域的耐久性能评估。通过模拟日光中的紫外线、热辐射等环境因素&#xff0c;加速材料老化过程&#xff0c;以此来验证材…

redhawk:tech file与lefdef layer name不匹配问题

我正在「拾陆楼」和朋友们讨论有趣的话题&#xff0c;你⼀起来吧&#xff1f; 拾陆楼知识星球入口 一些工艺厂商给的redhawk tech file是加密的&#xff0c;读完tech file再读lef/def会报错&#xff0c;根本不知道问题在哪&#xff0c;他们一般会搭配给一个layer map&#xff…

分解+降维+预测!多重创新!直接写核心!EMD-KPCA-Transformer多变量时间序列光伏功率预测

分解降维预测&#xff01;多重创新&#xff01;直接写核心&#xff01;EMD-KPCA-Transformer多变量时间序列光伏功率预测 目录 分解降维预测&#xff01;多重创新&#xff01;直接写核心&#xff01;EMD-KPCA-Transformer多变量时间序列光伏功率预测效果一览基本介绍程序设计参…

【简单讲解神经网络训练中batch的作用】

&#x1f3a5;博主&#xff1a;程序员不想YY啊 &#x1f4ab;CSDN优质创作者&#xff0c;CSDN实力新星&#xff0c;CSDN博客专家 &#x1f917;点赞&#x1f388;收藏⭐再看&#x1f4ab;养成习惯 ✨希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出…

STM32第十五课:LCD屏幕及应用

文章目录 需求一、LCD显示屏二、全屏图片三、数据显示1.显示欢迎词2.显示温湿度3.显示当前时间 四、需求实现代码 需求 1.在LCD屏上显示一张全屏图片。 2.在LCD屏上显示当前时间&#xff0c;温度&#xff0c;湿度。 一、LCD显示屏 液晶显示器&#xff0c;简称 LCD(Liquid Cry…

【Windows】Visual Studio Installer下载缓慢解决办法

【Windows】Visual Studio Installer下载缓慢解决办法 1.背景2.分析3.结果 1.背景 使用visual studio在线安装包进行IDE安装&#xff0c;发现下载几乎停滞&#xff0c;网速几乎为零。 经过排查并不是因为实际网络带宽导致。 这里涉及DNS知识&#xff1b; DNS&#xff08;Dom…

消防认证-防火卷帘

一、消防认证 消防认证是指消防产品符合国家相关技术要求和标准&#xff0c;且通过了国家认证认可监督管理委员会审批&#xff0c;获得消防认证资质的认证机构颁发的证书&#xff0c;消防产品具有完好的防火功能&#xff0c;是住房和城乡建设领域验收的重要指标。 二、认证依据…

最快33天录用!一投就中的医学4区SCI,几乎不退稿~

【SciencePub学术】今天小编给大家推荐2本生物医学领域的SCI&#xff0c;此期刊为我处目前合作的重点期刊&#xff01;影响因子0-3.0之间&#xff0c;最重要的是审稿周期较短&#xff0c;对急投的学者较为友好&#xff01; 医学医药类SCI 01 / 期刊概况 【期刊简介】IF&…

那些好用的 Vue3 的工具搭子!!【送源码】

2020 年 9 月 18 日 Vue3 的正式发布已经过去了大约 3 年 9 个月左右&#xff01;&#xff01;&#xff01; 随着 Vue3 版本的逐渐成熟&#xff0c;我们的前端世界也迎来了一系列令人振奋的更新和工具。Vue 生态圈的持续扩大&#xff0c;无疑为前端开发人员带来了前所未有的便…

通过pycharm使用git和github的步骤

一、在Pycharm工具中配置集成Git和GitHub。 1.集成Git。 打开Pycharm, 点击File-->Settins-->Version Control-->Git 然后在 Path to Git executable中选择本地的git.exe路径。如下图&#xff1a; 2.集成GitHub 打开Pycharm, 点击File-->Settins-->Version…

探索未来远程调试新纪元——《串口网口远程调试软件》:无缝连接,高效调试

文章目录 前言一、无缝连接&#xff0c;突破距离限制二、高效调试&#xff0c;提升工作效率三、安全可靠&#xff0c;保护数据安全四、用户友好&#xff0c;简化操作流程五、软件地址六、远程调试软件 七、基本操作1、订阅主题2、连接3、串口调试4、网口调试 八、软件地址结束语…

PO模式登录测试

项目实践 登陆项目测试 get_driver import page from selenium import webdriverclass GetDriver:driver Noneclassmethoddef get_driver(cls):if cls.driver is None:cls.driver webdriver.Edge()cls.driver.maximize_window()cls.driver.get(page.url)return cls.drivercl…

智谱AI: ChatGLM API的使用

一、获取API 1、打开网址&#xff1a;智谱AI开放平台 注册账号登录 2、登录&#xff0c;查看API key (注册后赠送100万token&#xff0c;实名认证后多赠送400万, 有效期一个) 二、安装及调用 安装质谱SDK pip install zhipuai调用方式 流式调用 from zhipuai import ZhipuA…

开放签电子签章,让签字有迹可循

开放签&#xff08;企业版&#xff09;V2.0.5版本上线后&#xff0c;系统支持一键查询电子文件的签署操作记录&#xff0c;支持一键生成详细的签署记录报告&#xff0c;详细请看下图&#xff1a; 1、操作记录详情&#xff1a; 从合同发起、填写、签署、撤销等环节全流程展示操…

【Linux从入门到放弃】探究进程如何退出以进程等待的前因后果

&#x1f9d1;‍&#x1f4bb;作者&#xff1a; 情话0.0 &#x1f4dd;专栏&#xff1a;《Linux从入门到放弃》 &#x1f466;个人简介&#xff1a;一名双非编程菜鸟&#xff0c;在这里分享自己的编程学习笔记&#xff0c;欢迎大家的指正与点赞&#xff0c;谢谢&#xff01; 进…