4.RabbitMQ - 延迟消息

RabbitMQ延迟消息

文章目录

  • RabbitMQ延迟消息
  • 一、延迟消息介绍
  • 二、实现
    • 2.1 死信交换机
    • 2.2 延迟消息插件
    • 2.3 取消超时订单

一、延迟消息介绍

延迟消息:生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间后才收到消息

用户下单抢购,抢到了但是没有付款,此时其实库存的数量已经扣减了

image-20240524101232243

如果用户迟迟没有付款,超过一定的时间,就会将此订单取消掉,库存的数量也会重新加回来

我们可以定义一个定时任务扫描数据中订单的状态,超过一定时间没有付款的,我们就将订单取消

延迟任务:设置在一定时间之后才执行的任务

当用户下单成功后,立刻向MQ中发送一条延迟消息,设定延迟时间30分钟,30分钟到了之后就可以收到此消息,检查订单状态,如果发现未支付,则订单直接取消。

这样解决了实效性的问题,同时对数据库的压力也很小

二、实现

2.1 死信交换机

当队列满足下列的条件之一时就会称为死信(dead letter)

  • 消费者使用basic.reject或basic.nack声明消费失败,并且消息的requeue参数设置为false

    消费者不要这个消息了

  • 消息是一个过期的消息(达到了队列或消息本身设置的过期时间),超时无人消费

  • 要投递的队列消息堆积满了,最早的消息可能成为死信

如果队列通过dead-letter-exchange属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)

死信交换机只是一种称呼,和普通的交换机其实是一样的

我们不给simple.queue队列绑定消费者,给dlx.queue绑定一个消费者

因为simple.queue队列没有消费者,所以不会有人来消费,当有人通过simple.direct交换机向simple.queue队列发送一条过期时间为30秒的消息,此消息就会在simple.queue队列卡主

image-20240524103939964

过了30s后,消息就会自动投递到dlx.direct死信交换机,然后进入dlx.queue队列,最终消费者拿到后会进行消费

利用死信交换机、死信队列。过期时间的方式,模拟出了延迟消息的效果

image-20240524104129199

验证一下

  1. 在控制台创建simple.direct交换机

image-20240524104741666

  1. 将此交换机与simple.queue绑定

image-20240524104914383

注意!simple.queue并没有绑定到消费者,进入到simple.queue队列的消息都会变成死信

  1. 创建队列dlx.queue和dlx.direct并将其绑定

创建队列

image-20240524105129163

创建交换机

image-20240524111001370

进行绑定

image-20240524105340163

  1. 给simple.queue队列设定死信交换机

注意,这个地方只能是在创建队列的时候进行绑定

image-20240524110834121

  1. 在消费者模块代码中定义两个队列simple.queue、dlx.queue
//检查一下,一定不要有simple.queue的消费者
//@RabbitListener(queues = "simple.queue")
//public void listenSimpleQueue(String msg){
//    System.out.println("消费者收到了simple.queue的消息:【" + msg +"】");
//    throw new RuntimeException("抛出异常了");
//}@RabbitListener(queues = "dlx.queue")
public void listenDlxQueue(String msg){log.info("消费者收到了dlx.queue的消息:【" + msg +"】");
}
  1. 发送消息

在控制台中下面的这个属性是带过期时间的属性

image-20240524111749232

Java代码中的发送消息如下所示

@Test
void testSendTTLMessage() {Message message = MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF_8)).setExpiration("10000") //过期时间10s.build();//发送到死信队列rabbitTemplate.convertAndSend("simple.direct", "hi", message);//直接向队列发送消息log.info("消息发送成功!");
}

simple.queue队列中10s内始终存在下面的一条消息

image-20240524112750348

dlx.queue队列的消费者在10s后会接收到消息

2.2 延迟消息插件

这种定时功能,都是有一定的性能损耗的(Redis除外)

MQ或者Spring的定时功能是在程序内部维护一个时钟,比如每隔一秒就往前跳一次,这种时钟的运行过程中CPU就需要不停地计算,定时任务越多,对于CPU的占用越大,定时任务属于一种CPU密集型的任务

采用延迟消息带来的弊端就是给服务器CPU造成的额外压力比较大

使用交换机实现延迟消息非常的繁琐,需要定义很多的交换机和队列,而且死信交换机的目的是为了让我们人工处理死信消息,并不是为了延迟消息而生的

延迟消息的插件能自动实现延迟效果

RabbitMQ官方也推出了一个插件,原生支持延迟消息功能

该插件的原理是设计了一种支持延迟消息功能的交换机,当消息投递到交换机后可以暂存一定时间,到期后再投递到队列

暂存的时间取决于发消息时配置的时间(也就是延迟时间)

在Java代码中配置延迟交换机的两种方式

在声明交换机的时候,需要多添加一个参数delayed=“true”

  • 注解的方式

    在消费者模块声明交换机、队列

@RabbitListener(bindings = @QueueBinding(//队列value = @Queue(name = "delay.queue", durable = "true"),//交换机exchange = @Exchange(name = "hmall.direct", delayed = "true"),//Routing keykey = "delay"
))
public void listenDelayMessage(String msg) {log.info("接收到delay.queue的延迟消息【" + msg + "】");
}
  • 注入Bean的方式

    这种方式只声明了交换机

@Bean
public DirectExchange delayExchange(){return ExchangeBuilder.directExchange("delay.direct").delayed() //设置delay的属性为true 主要是这个.durable(true) //持久化.build();
}

发送延迟消息的Java代码

    @Testvoid testSendDelayMessage() {
//        Message message = MessageBuilder
//                .withBody("hello".getBytes(StandardCharsets.UTF_8))
//                .setExpiration("10000") //过期时间10s
//                .build();
//        //发送到死信队列rabbitTemplate.convertAndSend("dela.direct", "hi", "hello", new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("10000");//延迟十秒return message;}});//直接向队列发送消息log.info("消息发送成功!");}

2.3 取消超时订单

设置三十分钟后检测订单支付状态,存在两个问题

  • 如果并发比较高,30分钟可能堆积消息过多,对MQ的压力很大

  • 大多数订单在下单后1分钟内就会支付,但是却需要再MQ内等待30分钟,浪费资源

    30分钟太长,可以缩短为10s,10s后立刻来检查有没有支付

    假如10s后没有支付,可以再发一个10s的延迟消息,直到成功后不再发送延迟消息

    这样的话MQ的压力会减少很多

image-20240524162518090

处理如下所示:

查询支付状态的时候,需要先查询本地,之后再查询支付服务,查完之后判断支付状态

image-20240524164718756

定义延时消息时间数组

package com.hmall.common.domain;import com.hmall.common.utils.CollUtils;
import lombok.Data;import java.util.List;@Data
public class MultiDelayMessage<T> {/*** 消息体*/private T data;/*** 记录延迟时间的集合*/private List<Long> delayMillis;public MultiDelayMessage(T data, List<Long> delayMillis) {this.data = data;this.delayMillis = delayMillis;}public static <T> MultiDelayMessage<T> of(T data, Long ... delayMillis){return new MultiDelayMessage<>(data, CollUtils.newArrayList(delayMillis));}/*** 获取并移除下一个延迟时间* @return 队列中的第一个延迟时间*/public Long removeNextDelay(){return delayMillis.remove(0);}/*** 是否还有下一个延迟时间*/public boolean hasNextDelay(){return !delayMillis.isEmpty();}
}

定义好对应的交换机和队列

@Component
@RequiredArgsConstructor
public class PayStatusListener {private final IOrderService orderService;@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "mark.order.pay.queue", durable = "true"),exchange = @Exchange(name = "pay.topic", type = ExchangeTypes.TOPIC),key = "pay.success"))public void listenOrderPay(Long orderId) {/* // 1.查询订单Order order = orderService.getById(orderId);// 2.判断订单状态是否为未支付if(order == null || order.getStatus() != 1){// 订单不存在,或者状态异常return;}// 3.如果未支付,标记订单状态为已支付orderService.markOrderPaySuccess(orderId);*/// update order set status = 2 where id = ? AND status = 1orderService.lambdaUpdate().set(Order::getStatus, 2).set(Order::getPayTime, LocalDateTime.now()).eq(Order::getId, orderId).eq(Order::getStatus, 1).update();}
}

在方法中发送延迟检查订单状态的消息

    @Override@GlobalTransactionalpublic Long createOrder(OrderFormDTO orderFormDTO) {// 1.订单数据Order order = new Order();// 1.1.查询商品List<OrderDetailDTO> detailDTOS = orderFormDTO.getDetails();// 1.2.获取商品id和数量的MapMap<Long, Integer> itemNumMap = detailDTOS.stream().collect(Collectors.toMap(OrderDetailDTO::getItemId, OrderDetailDTO::getNum));Set<Long> itemIds = itemNumMap.keySet();// 1.3.查询商品List<ItemDTO> items = itemClient.queryItemByIds(itemIds);if (items == null || items.size() < itemIds.size()) {throw new BadRequestException("商品不存在");}// 1.4.基于商品价格、购买数量计算商品总价:totalFeeint total = 0;for (ItemDTO item : items) {total += item.getPrice() * itemNumMap.get(item.getId());}order.setTotalFee(total);// 1.5.其它属性order.setPaymentType(orderFormDTO.getPaymentType());order.setUserId(UserContext.getUser());order.setStatus(1);// 1.6.将Order写入数据库order表中save(order);// 2.保存订单详情List<OrderDetail> details = buildDetails(order.getId(), items, itemNumMap);detailService.saveBatch(details);// 3.扣减库存try {itemClient.deductStock(detailDTOS);} catch (Exception e) {throw new RuntimeException("库存不足!");}// 4.清理购物车商品// cartClient.deleteCartItemByIds(itemIds);try {rabbitTemplate.convertAndSend(MqConstants.TRADE_EXCHANGE_NAME, MqConstants.ORDER_CREATE_KEY,itemIds/*,new RelyUserInfoMessageProcessor()*/);} catch (AmqpException e) {log.error("清理购物车的消息发送异常", e);}// 5.延迟检测订单状态消息try {MultiDelayMessage<Long> msg = MultiDelayMessage.of(order.getId(), 10000L, 10000L, 10000L, 15000L, 15000L, 30000L, 30000L);rabbitTemplate.convertAndSend(MqConstants.DELAY_EXCHANGE, MqConstants.DELAY_ORDER_ROUTING_KEY, msg,new DelayMessageProcessor(msg.removeNextDelay().intValue()));} catch (AmqpException e) {log.error("延迟消息发送异常!", e);}return order.getId();}

将MessagePostProcessMessage对象提取出来了,不用每次都new了

@RequiredArgsConstructor
public class DelayMessageProcessor implements MessagePostProcessor {private final int delay;@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelay(delay);return message;}
}

或者是使用下面视频里面的代码

但是下面的代码每次都要使用一个内部类

image-20240524174351318

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/78716.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

5.学习笔记-SpringMVC(P53-P60)

1.响应 &#xff08;1&#xff09;响应页面 &#xff08;2&#xff09;响应数据&#xff08;异步提交&#xff09;&#xff1a;文本数据、json数据 2.REST风格 (1)REST:表现形式状态转换。 (2)传统风格资源描述形式 3.Restful入门案例 5.基于RESTful页面数据…

Golang | 搜索表达式

// (( A | B | C ) & D ) | E & (( F | G ) & H )import "strings"// 实例化一个搜索表达式 func NewTermQuery(field, keyword string) *TermQuery {return &TermQuery{Keyword: &Keyword{Field: field, Word: keyword},} }func (tq *TermQuery…

LangChain构建大模型应用之RAG

RAG(Retrieval-augmented Generation 检索增强生成)是一种结合信息检索与生成模型的技术,通过动态整合外部知识库提升大模型输出的准确性和时效性。其核心思想是在生成答案前,先检索外部知识库中的相关信息作为上下文依据,从而突破传统生成模型的静态知识边界。 为什么我们…

Ubuntu 下 Nginx 1.28.0 源码编译安装与 systemd 管理全流程指南

一、环境与依赖准备 为确保编译顺利&#xff0c;我们首先更新系统并安装必要的编译工具和库&#xff1a; sudo apt update sudo apt install -y build-essential \libpcre3 libpcre3-dev \zlib1g zlib1g-dev \libssl-dev \wgetbuild-essential&#xff1a;提供 gcc、make 等基…

第十二章-PHP文件上传

第十二章-PHP文件上传 一&#xff0c;文件上传原理 一、HTTP协议与文件上传 1. 请求体结构 当表单设置enctype"multipart/form-data"时&#xff0c;浏览器会将表单数据编码为多部分&#xff08;multipart&#xff09;格式。 Boundary分隔符&#xff1a;随机生成的…

CSS元素动画篇:基于当前位置的变换动画(三)

基于当前位置的变换动画&#xff08;三&#xff09; 前言缩放效果类元素动画脉冲动画效果效果预览代码实现 橡皮筋动画效果效果预览代码实现 果冻动画效果效果预览代码实现 欢呼动画效果效果预览代码实现 心跳动画效果效果预览代码实现 结语 前言 CSS元素动画一般分为两种&…

Redis ssd是什么?Redis 内存空间优化的点都有哪些?embstr 和 row、intset、ziplist分别是什么?

Redis SSD 是什么&#xff1f; Redis SSD 通常指 Redis 使用 SSD&#xff08;固态硬盘&#xff09;作为持久化存储介质的场景。虽然 Redis 是内存数据库&#xff08;数据主要驻留内存&#xff09;&#xff0c;但其持久化机制&#xff08;如 RDB 快照和 AOF 日志&#xff09;需…

【蓝桥杯】 数字诗意

数字诗意 在诗人的眼中&#xff0c;数字是生活的韵律&#xff0c;也是诗意的表达。 小蓝&#xff0c;当代顶级诗人与数学家&#xff0c;被赋予了”数学诗人”的美誉。他擅长将冰冷的数字与抽象的诗意相融合&#xff0c;并用优雅的文字将数学之美展现于纸上。 某日&#xff0…

DHCP 服务器运行流程图

以常见的 DHCP v4 为例,其完整流程如下: 一、客户端请求 IP 地址阶段 DHCPDiscover:客户端启动后,会以广播的形式发送 DHCPDiscover 报文,目的是在网络中寻找可用的 DHCP 服务器。该报文中包含客户端的 MAC 地址等信息,以便服务器能够识别客户端。DHCPOffer:网络中的 D…

一种企业信息查询系统设计和实现:xujian.tech/cs

一种企业信息查询系统设计和实现&#xff1a;xujian.tech/cs 背景与定位 企业在对外合作、风控审查或市场调研时&#xff0c;常需快速获取公开的工商信息。本文介绍一个企业信息搜索引擎&#xff0c;面向普通用户与开发者&#xff0c;帮助快速定位企业名称、统一社会信用代码…

前端面试高频算法

前端面试高频算法 1 排序算法&#xff1b;1.1 如何分析一个排序算法1.1.1 执行效率3.1.2 内存消耗1.1.3 稳定性 1.2 冒泡排序&#xff08;Bubble Sort&#xff09;1.3 插入排序&#xff08;Insertion Sort&#xff09;1.4 选择排序&#xff08;Selection Sort&#xff09;1.5 归…

C++初阶-模板初阶

目录 1.泛型编程 2.函数模板 2.1函数模板概念 2.2实现函数模板 2.3模板的原理 2.4函数模板的实例化 2.4.1隐式实例化 2.4.2显式初始化 2.5模板参数的匹配原则 3.类模板 3.1类模板定义格式 3.2类模板的实例化 4.总结 1.泛型编程 对广泛的类型法写代码&#xff0c;我…

「Mac畅玩AIGC与多模态02」部署篇01 - 在 Mac 上部署 Ollama + Open WebUI

一、概述 本篇介绍如何在 macOS 环境下本地部署 Ollama 推理服务,并通过 Open WebUI 实现可视化交互界面。该流程无需 CUDA 或专用驱动,适用于 M 系列或 Intel 芯片的 Mac,便于快速测试本地大语言模型能力。 二、部署流程 1. 环境准备 安装 Homebrew(如尚未安装):/bin…

JavaScript 中 undefined 和 not defined 的区别

在 JavaScript 的调试过程中&#xff0c;你是否经常看到 undefined 却不知其来源&#xff1f;是否曾被 ReferenceError: xxx is not defined 的错误提示困扰&#xff1f;这两个看似相似的概念&#xff0c;实际上是 JavaScript 类型系统中最重要的分水岭。本文将带你拨开迷雾&am…

django admin AttributeError: ‘UserResorce‘ object has no attribute ‘ID‘

在 Django 中遇到 AttributeError: ‘UserResource’ object has no attribute ‘ID’ 这类错误通常是因为你在代码中尝试访问一个不存在的属性。在你的例子中&#xff0c;错误提示表明 UserResource 类中没有名为 ID 的属性。这可能是由以下几个原因造成的&#xff1a; 拼写错…

对鸿蒙 Next 系统“成熟论”的深度剖析-优雅草卓伊凡

对鸿蒙 Next 系统“成熟论”的深度剖析-优雅草卓伊凡 在科技飞速发展的当下&#xff0c;鸿蒙 Next 系统无疑成为了众多科技爱好者与行业人士关注的焦点。今日&#xff0c;卓伊凡便收到这样一个饶有趣味的问题&#xff1a;鸿蒙 Next 系统究竟需要多长时间才能完全成熟&#xff…

快速上手GO的net/http包,个人学习笔记

更多个人笔记&#xff1a;&#xff08;仅供参考&#xff0c;非盈利&#xff09; gitee&#xff1a; https://gitee.com/harryhack/it_note github&#xff1a; https://github.com/ZHLOVEYY/IT_note 针对GO中net/http包的学习笔记 基础快速了解 创建简单的GOHTTP服务 func …

AI-Browser适用于 ChatGPT、Gemini、Claude、DeepSeek、Grok的客户端开源应用程序,集成了 Monaco 编辑器。

一、软件介绍 文末提供程序和源码下载学习 AI-Browser适用于 ChatGPT、Gemini、Claude、DeepSeek、Grok、Felo、Cody、JENOVA、Phind、Perplexity、Genspark 和 Google AI Studio 的客户端应用程序&#xff0c;集成了 Monaco 编辑器。使用 Electron 构建的强大桌面应用程序&a…

Dify框架面试内容整理-Dify如何处理知识库的集成?

Dify 在知识库集成方面采用了“检索增强生成(RAG)”的技术架构,核心实现思路如下: 一、知识库集成的整体流程 Dify处理知识库集成通常包括以下关键步骤: 文档上传↓

Laravel 模型使用全局作用域和局部作用域

一. 需要解决什么问题 最近Laravel 项目中遇到一个需求&#xff0c;我有一个客户表&#xff0c;每个员工都有自己的客户&#xff0c;但是自己只能看自己的客户。 项目中&#xff0c;有很多功能需要查询客户列表&#xff0c;客户详情&#xff0c;查询客户入口很多&#xff0c;…