延时队列实现实战:如何利用 RabbitMQ 实现延时队列,以满足特定延迟处理需求

实现延时队列,可以通过RabbitMQ的死信队列(Dead-letter queue)特性,“死信队列”是当消息过期,或者队列达到最大长度时,未消费的消息会被加入到死信队列。然后,我们可以对死信队列中的消息进行消费,完成类似“延时”的效果。

下面的示例代码演示了如何在Spring Boot中使用RabbitMQ设置一个订单,然后在15分钟后自动取消。

1. 添加 RabbitMQ 依赖:

在你的pom.xml中加入Spring Boot对RabbitMQ的Starter:

xml

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. 配置队列、交换器、绑定和容器

我们创建一个配置类,定义一个正常的队列和一个死信队列,以及相应的交换机和队列的绑定:

java

@Configuration
public class RabbitMQConfig {/* 正常队列配置 */@Beanpublic Queue orderQueue() {Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", "dead_exchange");  // 队列消息过期后发送的交换器args.put("x-dead-letter-routing-key", "dead"); // 队列消息过期后发送的路由键return new Queue("order_queue", true, false, false, args);}@Beanpublic DirectExchange orderExchange() {return new DirectExchange("order_exchange");}@Beanpublic Binding orderBinding() {return BindingBuilder.bind(orderQueue()).to(orderExchange()).with("order");}/* 死信队列配置 */@Beanpublic Queue deadQueue() {return new Queue("dead_queue");}@Beanpublic DirectExchange deadExchange() {return new DirectExchange("dead_exchange");}@Beanpublic Binding deadBinding() {return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead");}
}

3. 发送延时消息

创建订单时,我们发送一个延时消息到队列:

java

@Autowired
private RabbitTemplate rabbitTemplate;public void createOrder(String orderId) {//订单创建业务...rabbitTemplate.convertAndSend("order_exchange", "order", orderId, message -> {message.getMessageProperties().setExpiration(String.valueOf(15 * 60 * 1000)); // 15分钟return message;});
}

4. 消费死信队列中的消息

然后,我们需要消费死信队列中的消息,进行订单取消的操作:

java

@RabbitListener(queues = "dead_queue")
public void processDeadLetter(String orderId) {// 订单取消业务...
}
  1. 在RabbitMQ中设置消息的过期时间
    RabbitMQ允许你在两个级别上设置消息的过期时间:队列级别和消息级别。一旦消息过期,它将从队列中删除。
  • 队列级别:你可以在声明队列时通过"x-message-ttl"参数设置过期时间(以毫秒为单位)。这会影响队列中的所有消息。

java

Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 60000); // 设置60s过期时间
Channel channel = ...;
channel.queueDeclare("my_queue", false, false, false, args); 
  • 消息级别:你也可以在发布消息时单独为每条消息设置过期时间。如果队列级别的TTL和消息级别的TTL都被设置了,那么较小的那个值会被应用。

java

byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().expiration("60000").build(); // 设置60s过期时间
Channel channel = ...;
channel.basicPublish("my_exchange", "my_routing_key", props, messageBodyBytes);

5、RabbitMQ中的死信队列如何工作?

“死信队列”用于接收不可路由的消息,或者由于一些原因不能正确处理的消息。这样能防止原始队列堵塞,也可以进一步处理这些消息。

当以下情况发生时,消息会被投递到死信队列:

  • 消息被拒绝 (basic.reject / basic.nack),并且requeue=false。
  • 在队列中排队的时间过长(超过设置的TTL)。
  • 队列长度溢出(超过设置的最大长度)。

在声明队列时,可以通过"x-dead-letter-exchange"和"x-dead-letter-routing-key"来设置死信交换器和路由键。

 6、如何确保消费者在消费消息时不会发生重复消费的情况?

确保消费者在消费消息时不发生重复消费,一般可以通过以下方式实现:

  • 消息确认机制:RabbitMQ提供了消息确认机制ACK,消费者处理完消息后,返回一个ACK信号,告诉RabbitMQ可以将该消息从队列中删除。如果在处理消息的过程中消费者出现异常(如断开连接),RabbitMQ将不会删除队列中的消息,并且会再次尝试发送该消息给消费者。
  • 幂等操作:在消费者端,确保消息处理逻辑是幂等的,也就是说,无论消息被处理一次还是多次,结果都是相同的。例如,对于一个扣款操作,无论这个操作执行多少次,账户的余额都应该只被扣除一次。
  • 分布式锁:在处理消息之前,消费者首先尝试获取一个分布式锁。只有获取到锁的消费者才能处理消息。处理完毕后释放锁。避免了同一消息被多个消费者处理的情况。

7、设置RabbitMQ中消息的优先级
在RabbitMQ中,对消息的优先级的支持是通过队列来实现的。在声明队列的时候,可以通过x-max-priority参数来指定队列支持的最大优先级。然后在发布消息的时候,可以通过basicPropertiespriority字段来指定消息的优先级。如下:

java

// 声明队列时设置最大优先级
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-priority", 10);
channel.queueDeclare("my_queue", false, false, false, args);// 发布消息时设置优先级
AMQP.BasicProperties props = new AMQP.BasicProperties().builder().priority(5).build();
channel.basicPublish("", "my_queue", props, "Hello world".getBytes());

注意,优先级较高的消息将会优先被消费,但是并不保证完全按照优先级顺序消费。

8、RabbitMQ中如何处理消费者异常断开连接的情况?
当RabbitMQ检测到消费者(如一个TCP连接)异常断开,例如因为消费者主机崩溃或因为网络问题,它将关闭该消费者的连接,并将消费者未确认的任何消息重新放入队列。如果你希望一个消息在消费者断开连接时不被再次放入队列,你可以设置该消费者的autoAck参数为true(也就是无需显示确认)。但一般情况下,我们推荐消费者在正确处理消息后发送一个确认应答(basicAck)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/792891.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

51入门之LED

目录 1.配置文件 2.点亮一个LED 2.1单个端口操作点亮单个LED 2.2整体操作点亮LED 3.LED闪烁 4.LED实现流水灯 4.1使用for循环和移位实现 4.1.1移位操作符 4.1.2使用移位操作和for循环实现 4.2使用移位函数实现LED流水灯 众所周知&#xff0c;任何一个硬件工程师…

基于机器学习的木马检测模型的设计与实现(论文)_kaic

摘 要 科技的发展带来了人们生活的改变&#xff0c;近年来我国网民已突破十亿人口&#xff0c; 而且在后疫 情时代&#xff0c; 经历了疫情时期的一系列线上活动&#xff0c; 人们对网络的依赖比以往任何时期都要高 得多。高频次的上网行为也带来了一系列安全问题&#xff…

【C++风云录】从SQLite到Redis:探索C++与多种数据库的交互之道

开启数据库之旅&#xff1a;通过C与各种数据库交互&#xff0c;事半功倍 数据库操作&#xff1a;介绍与应用 前言 在现代软件开发中&#xff0c;数据库扮演着至关重要的角色&#xff0c;用于存储和管理大量的数据。为了更有效地操作数据库&#xff0c;开发人员常常依赖于专门…

金融数据_PySpark-3.0.3随机森林(RandomForestClassifier)实例

金融数据_PySpark-3.0.3随机森林(RandomForestClassifier)实例 随机森林 (Random Forest) 和随机森林回归 (Random Forest Regression) 都是基于集成学习的算法, 但它们在任务和输出方面存在一些关键的区别。 随机森林 (Random Forest)&#xff1a; 任务类型: 随机森林主要用…

Cisco路由器配置IPv6 Manual隧道

Cisco路由器配置IPv6 Manual隧道 IPv6与IPv4共存的方式 IPv6与IPv4共存方式大致有三种&#xff1a; 双栈&#xff1a;要求网络中所有设备均同时支持IPv4和IPv6转换&#xff1a;转换这种方式将IPv6协议的报头转换成IPv4协议报头。隧道&#xff1a;假定两个IPv6节点要使用IPv6…

flink源码编译-job提交

1、启动standalone集群的taskmanager standalone集群中的taskmanager启动类为 TaskManagerRunner 2 打开master启动类 通过 ctrln快捷键&#xff0c;找到、并打开类&#xff1a; org.apache.flink.runtime.taskexecutor.TaskManagerRunner 3 修改运⾏配置 基本完全按照mas…

RabbitMQ在云原生环境中部署和应用实践

一、RabbitMQ和云原生技术的关系 RabbitMQ是一种开源的、实现了先进的消息队列协议&#xff08;AMQP&#xff09;的消息队列软件。而云原生技术就是为在公共云、私有云以及其他各种云环境提供应用的一种方法。RabbitMQ和云原生技术在分布式系统和微服务架构中都起到了关键作用…

NIUSHOP完美运营版商城 虚拟商品全功能商城 全能商城小程序 智慧商城系统 全品类百货商城

完美运营版商城/拼团/团购/秒杀/积分/砍价/实物商品/虚拟商品等全功能商城 干干净净 没有一丝多余收据 还没过手其他站 还没乱七八走的广告和后门 后台可以自由拖曳修改前端UI页面 还支持虚拟商品自动发货等功能 挺不错的一套源码 前端UNIAPP 后端PHP 一键部署版本 源码免费…

腾讯云4核8G服务器性能怎么样?能用来干什么?

腾讯云4核8G服务器多少钱&#xff1f;腾讯云4核8G轻量应用服务器12M带宽租用价格646元15个月&#xff0c;活动页面 txybk.com/go/txy 活动链接打开如下图所示&#xff1a; 腾讯云4核8G服务器优惠价格 这台4核8G服务器是轻量应用服务器&#xff0c;详细配置为&#xff1a;轻量4核…

ros小问题之rosdep update time out问题

在另外一篇ROS 2边学边练系列的文章里有写碰到这种问题的解决方法&#xff08;主要参考了其他博主的文章&#xff0c;只是针对ROS 2做了些修改调整&#xff09;&#xff0c;此处单拎出来方便查找。 在ROS 2中执行rosdep update时&#xff0c;报出如下错误&#xff1a; 其实原因…

elsint报错Delete `␍`eslintprettier/prettier

一&#xff0c;原因 这篇博客写得很清楚&#xff1a;解决VSCode Delete ␍eslint(prettier/prettier)错误_vscode 删除cr-CSDN博客 还有这篇文章&#xff0c;解决办法很详细&#xff1a;滑动验证页面 二&#xff0c;解决办法 根目录下新建.prettierrc.js文件 module.exports…

谷歌AI搜索革新:探索高级搜索服务背后的未来趋势

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

vulhub中Apache Solr RemoteStreaming 文件读取与SSRF漏洞复现

Apache Solr 是一个开源的搜索服务器。在Apache Solr未开启认证的情况下&#xff0c;攻击者可直接构造特定请求开启特定配置&#xff0c;并最终造成SSRF或任意文件读取。 访问http://your-ip:8983即可查看Apache Solr后台 1.访问http://your-ip:8983/solr/admin/cores?indexI…

一致性hash问题(负载均衡原理)

一致性哈希问题 简介 一致性Hash是一种特殊的Hash算法&#xff0c;由于其均衡性、持久性的映射特点&#xff0c;被广泛的应用于负载均衡领域&#xff0c;如nginx和memcached都采用了一致性Hash来作为集群负载均衡的方案。 本文将介绍一致性Hash的基本思路&#xff0c;并讨论其…

gpt国内怎么用?最新版本来了

claude 3 opus面世后&#xff0c;这几天已经有许多应用&#xff0c;而其精确以及从不偷懒&#xff08;截止到2024年3月11日还没有偷懒&#xff09;的个性&#xff0c;也使得我们可以用它来首次完成各种需要多轮对话的尝试。 今天我们想要进行的一项尝试就是—— 如何从一个不知…

Python Django ORM使用简单的增,删,改,查

Django ORM&#xff08;对象关系映射&#xff09;是Django框架中用于与数据库交互的一个核心组件。它提供了一种方便、直观的方式来定义、查询和操作数据库中的数据。 1. 定义模型&#xff08;Model&#xff09; 首先&#xff0c;你需要通过定义模型来告诉Django你的数据应该…

正则表达式完全指南:语法、用法及JavaScript实例

&#x1f31f; 前言 欢迎来到我的技术小宇宙&#xff01;&#x1f30c; 这里不仅是我记录技术点滴的后花园&#xff0c;也是我分享学习心得和项目经验的乐园。&#x1f4da; 无论你是技术小白还是资深大牛&#xff0c;这里总有一些内容能触动你的好奇心。&#x1f50d; &#x…

生成器、迭代器、可迭代对象

生成器、迭代器、可迭代对象 生成器 函数体中包含yield关键字的就是生成器 把生成 器传给 next(...) 函数时&#xff0c;生成器函数会向前&#xff0c;执行函数定义体中的 下一个 yield 语句&#xff0c;返回产出的值&#xff0c;并在函数定义体的当前位置暂停。等到再次遇到n…

深度学习与神经网络:从基础到前沿

深度学习与神经网络是人工智能领域中的重要分支&#xff0c;其应用范围涵盖图像识别、语音识别、自然语言处理等多个领域&#xff0c;对于推动人工智能技术的发展具有重要意义。本文将从深度学习的基础原理开始&#xff0c;逐步探讨神经网络的结构、训练方法&#xff0c;以及在…

认识 Redis 与 分布式

Redis 官网页面 Redis官网链接 Redis 的简介 Redis 是一个在内存中存储数据的中间件 一方面用于作为数据库&#xff0c;另一方面用于作为数据缓存&#xff0c;适用于分布式系统中 Redis 基于网络&#xff0c;进行进程间通信&#xff0c;把自己内存中的变量给别的进程&#xf…