RabbitMQ spring boot TTL延时消费

关于延时消费主要分为两种实现,一种是rabbitmq的TTL机制,一种是rabbitmq的插件实现。

实现一:TTL

1、设置队列的过期时间
2、设置消息的过期时间

添加相关maven依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

1、设置队列过期时间实现延时消费

交换机及队列配置

代码中有四个配置,
第一个配置的exchange是用来接收已过期的队列信息并进行重新分配队列进行消费,
第二个配置的repeatTradeQueue为exchange重新分配的队列名,
第三个是将repeatTradeQueue队列与exchange交换机绑定,并指定对应的routing key,
第四个配置的就是我们要设置过期时间的队列deadLetterQueue,
配置中有三个参数,x-message-ttl为过期时间,该队列所有消息的过期时间都为配置的这个值,单位为毫秒,这里设置过期时间为3秒,x-dead-letter-exchange是指过期消息重新转发到指定交换机,也就是exchange,x-dead-letter-routing-key是该交换机上绑定的routing-key,将通过配置的routing-key分配对应的队列,也就是前面配置的repeatTradeQueue。

import java.util.HashMap;
import java.util.Map;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;@SpringBootApplication
public class Application {//交换机用于重新分配队列@BeanDirectExchange exchange() {return new DirectExchange("exchange");}//用于延时消费的队列@Beanpublic Queue repeatTradeQueue() {Queue queue = new Queue("repeatTradeQueue",true,false,false);return queue; }//绑定交换机并指定routing key@Beanpublic Binding  repeatTradeBinding() {return BindingBuilder.bind(repeatTradeQueue()).to(exchange()).with("repeatTradeQueue");}//配置死信队列@Beanpublic Queue deadLetterQueue() {Map<String,Object> args = new HashMap<>();args.put("x-message-ttl", 3000);args.put("x-dead-letter-exchange", "exchange");args.put("x-dead-letter-routing-key", "repeatTradeQueue");return new Queue("deadLetterQueue", true, false, false, args);}public static void main(String[] args) throws Exception {SpringApplication.run(Application.class, args);}
}

配置生产者,这里生产者需要指定前面配置了过期时间的队列deadLetterQueue

import java.time.LocalDateTime;import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class DeadLetterSender {@Autowiredprivate AmqpTemplate rabbitTemplate;public void send(String msg) {System.out.println("DeadLetterSender 发送时间:"+LocalDateTime.now().toString()+" msg内容:"+msg);rabbitTemplate.convertAndSend("deadLetterQueue", msg);}}

配置消费者,消费者监听指定用于延时消费的队列repeatTradeQueue

import java.time.LocalDateTime;@Component
@RabbitListener(queues = "repeatTradeQueue")
public class RepeatTradeReceiver {@RabbitHandlerpublic void process(String msg) {System.out.println("repeatTradeQueue 接收时间:"+LocalDateTime.now().toString()+" 接收内容:"+msg);}}

写一个简单的接口调用测试延时消费是否成功

import org.springframework.beans.factory.annotation.Autowired;@RestController
@RequestMapping("/rabbit")
public class RabbitTest {@Autowiredprivate DeadLetterSender deadLetterSender;@GetMapping("/deadTest")public void deadTest() {deadLetterSender.send("队列设置过期时间测试");}}

2、设置消息过期时间实现延时消费

先贴上配置的代码,基本配置都一样,唯一的区别是deadLetterQueue的过期时间这里不做配置,需要注意的是,因为我这里用的是同一个队列名,所以即使将队列过期时间配置删除,mq中该队列过期时间仍然还是存在的,所以需要删除该队列,启动项目时才能重新配置该队列属性,可能可以通过配置的方式重新覆盖属性配置,当然也可以保留队列过期时间的配置,当两个过期时间都存在时,消息取更小的过期时间。

import java.util.HashMap;@SpringBootApplication
public class Application {//用于死信队列转发的交换机@BeanDirectExchange exchange() {return new DirectExchange("exchange");}//用于延时消费的队列@Beanpublic Queue repeatTradeQueue() {Queue queue = new Queue("repeatTradeQueue",true,false,false);return queue; }//绑定交换机并指定routing key@Beanpublic Binding  repeatTradeBinding() {return BindingBuilder.bind(repeatTradeQueue()).to(exchange()).with("repeatTradeQueue");}//配置死信队列@Beanpublic Queue deadLetterQueue() {Map<String,Object> args = new HashMap<>();args.put("x-dead-letter-exchange", "exchange");args.put("x-dead-letter-routing-key", "repeatTradeQueue");return new Queue("deadLetterQueue", true, false, false, args);}public static void main(String[] args) throws Exception {SpringApplication.run(Application.class, args);}
}

配置生产者,message的expiration就是过期时间的设置,单位也是毫秒

import java.time.LocalDateTime;import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class DeadLetterSender {@Autowiredprivate AmqpTemplate rabbitTemplate;public void send(String msg, long times) {System.out.println("DeadLetterSender 发送时间:" + LocalDateTime.now().toString() + " msg内容:" + msg);MessagePostProcessor processor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration(times + "");return message;}};rabbitTemplate.convertAndSend("deadLetterQueue", (Object)msg, processor);}
}

消费者不变,用之前的类即可

稍微修改一下接口,设置时间为5秒

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import me.miaobo.mq.sender.DeadLetterSender;@RestController
@RequestMapping("/rabbit")
public class RabbitTest {@Autowiredprivate DeadLetterSender deadLetterSender;@GetMapping("/deadTest")public void deadTest() {deadLetterSender.send("消息设置过期时间测试",5000);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/2853.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【信号处理】基于CNN的心电(ECG)信号分类典型方法实现(tensorflow)

关于 本实验使用1维卷积神经网络实现心电信号的5分类。由于数据类别不均衡&#xff0c;这里使用典型的上采样方法&#xff0c;实现数据类别的均衡化处理。 工具 方法实现 数据加载 Read the CSV file datasets: NORMAL_LABEL0 , ABNORMAL_LABEL1,2,3,4,5 ptbdb_abnormalpd.…

C++:函数符(一)

正文 函数对象也叫函数符&#xff0c;函数符是可以以函数方式与()结合使用的任意对象。这包括函数名、指向函数的指针和重载了()运算符的类对象。 上面这句话的意思是指&#xff1a;函数名、指向函数的指针和重载了括号运算符的类对象与括号结合&#xff0c;从而以函数方式实…

【行为型模式】解释器模式

一、解释器模式概述 解释器模式定义&#xff1a;给分析对象定义一个语言&#xff0c;并定义该语言的文法表示&#xff0c;再设计一个解析器来解释语言中的句子。也就是说&#xff0c;用编译语言的方式来分析应用中的实例。这种模式实现了文法表达式处理的接口&#xff0c;该接口…

python高级进阶

目录 一、str字符串 1. 字符串定义 2. 获取字符串中元素 3. 遍历字符串 4. 字符串常见操作 二、set集合 1. 集合的创建 2. 遍历集合中的元素 3. 集合中添加元素 4. 集合删除元素 三、字典 1. 字典的定义 2. 字典的特点 3. 字典增删改查 4. 字典遍历 四、slice切片…

【经验总结】Ubuntu 源代码方式安装 Microsoft DeepSpeed

1. 背景介绍 使用 DeepSpeed 在多服务器上分布式训练大模型 2. 安装方法 2.1 查看显卡参数 ~$ CUDA_VISIBLE_DEVICES0 python -c "import torch; print(torch.cuda.get_device_capability())" (8, 0) ~$ CUDA_VISIBLE_DEVICES0 python -c "import torch; pr…

python常见语法

变量赋值&#xff1a; my_var 10 基本数据类型&#xff1a; 整数&#xff08;int&#xff09;、浮点数&#xff08;float&#xff09;、字符串&#xff08;str&#xff09;、布尔值&#xff08;bool&#xff09;、列表&#xff08;list&#xff09;、元组&#xff08;tuple&…

代码随想录第三十天|无重叠区间| 划分字母区间| 合并区间

今天三道都是重叠区间问题&#xff0c;重叠区间问题第一步就是先对数组进行排序&#xff0c;才能进行后续操作。 无重叠区间 这一题和昨天的最少多少支箭射爆气球的解法是相同的&#xff0c;判断相邻区间是否重叠&#xff0c;若两个区间重叠了则找出重叠区间最小右边界&#…

怎样用PHP语言实现远程控制三路开关

怎样用PHP语言实现远程控制三路开关呢&#xff1f; 本文描述了使用PHP语言调用HTTP接口&#xff0c;实现控制三路开关&#xff0c;三路开关可控制三路照明、排风扇等电器。 可选用产品&#xff1a;可根据实际场景需求&#xff0c;选择对应的规格 序号设备名称厂商1智能WiFi墙…

C++:模板(初级)

hello&#xff0c;各位小伙伴&#xff0c;本篇文章跟大家一起学习《C&#xff1a;模板&#xff08;初级&#xff09;》&#xff0c;感谢大家对我上一篇的支持&#xff0c;如有什么问题&#xff0c;还请多多指教 &#xff01; 如果本篇文章对你有帮助&#xff0c;还请各位点点赞…

Docker容器搭建Hadoop集群(hadoop-3.1.3)

Docker容器环境下搭建Hadoop集群&#xff08;完全分布式&#xff09; hadoop版本为hadoop-3.1.3 &#xff08;1&#xff09;安装额外的速度较快的镜像库 yum install -y epel-release &#xff08;2&#xff09;安装同步工具&#xff0c;方便在多台服务器上进行文件的传输 …

Oracle Hint 语法详解

什么是Hint Hint 是 Oracle 提供的一种 SQL 语法&#xff0c;它允许用户在 SQL 语句中插入相关的语法&#xff0c;从而影响 SQL 的执行方式。 因为 Hint 的特殊作用&#xff0c;所以对于开发人员不应该在代码中使用它&#xff0c;Hint 更像是 Oracle 提供给 DBA 用来分析诊断问…

从零学算法377

377. 组合总和 Ⅳ 给你一个由 不同 整数组成的数组 nums &#xff0c;和一个目标整数 target 。请你从 nums 中找出并返回总和为 target 的元素组合的个数。 题目数据保证答案符合 32 位整数范围。 示例 1&#xff1a; 输入&#xff1a;nums [1,2,3], target 4 输出&#xff…

【操作系统】进程同步(水果盘问题)使用Python多线程threading实现

一、进程同步&#xff08;水果盘问题&#xff09; 1、吃水果问题&#xff1a;桌子有一只盘子&#xff0c;只允许放一个水果&#xff0c;父亲专向盘子放苹果&#xff0c;母亲专向盘子放桔子 儿子专等吃盘子的桔子&#xff0c;女儿专等吃盘子的苹果。只要盘子为空&#xff0c;父…

QA测试开发工程师面试题满分问答20: 软件的安全性应从哪几个方面去测试?

软件的安全性测试应从多个方面进行&#xff0c;并确保覆盖以下关键方面&#xff1a; 当回答问题时&#xff0c;可以根据自己的经验和知识&#xff0c;从上述要点中选择适合的方面进行详细说明。强调测试的综合性、全面性和持续性&#xff0c;并强调测试的重要性以及如何与开发团…

IDEA最好用插件推荐

1 背景 俗话说&#xff1a;“工欲善其事必先利其器”&#xff0c;本问介绍几款强大实用的 IDEA 插件&#xff0c;助力大家开发。 希望大家做一个聪明又努力的人&#xff0c;而不只是一个努力的人。 以下插件大都可以通过 IDEA 自带的插件管理中心安装&#xff0c;如果搜不到可以…

python字典和集合

字典&#xff08;Dictionary&#xff09; 键值对&#xff1a;字典存储键值对&#xff08;key-value pairs&#xff09;&#xff0c;其中键&#xff08;key&#xff09;是唯一的&#xff0c;而值&#xff08;value&#xff09;可以是任何数据类型。可变&#xff1a;字典是可变的…

恶补《操作系统》2_1——王道学习笔记

2操作系统-进程 2.1_1 进程的定义、组成、组织方式、特征 组成&#xff1a;PCB&#xff08;进程存在唯一的标志&#xff09;&#xff0c;程序段&#xff0c;数据段 组织方式&#xff1a;链接方式&#xff0c;指针指向不同的队列&#xff1b;索引方式&#xff0c;索引表 特征…

Uptime Kuma 使用指南:一款简单易用的站点监控工具

我平时的工作会涉及到监控&#xff0c;而站点是一个很重要的监控项。项目上线后&#xff0c;我们通常会将站点监控配置到云平台上&#xff0c;以检测各站点的连通性。但随着项目不断增多&#xff0c;云平台上的配额就有点捉急了。针对这个情况&#xff0c;我们可以试试这个开源…

使用H5+app在安卓5.1离线环境实现文字转语音

在Vue中实现中文文字转语音的方法可以使用HTML5的SpeechSynthesis API,同时需要考虑到在H5+ App里面的离线环境。 在配置文件中正确引入plus库: <script src="http://www.dcloud.io/helloh5plus/api.js"></script> 在Vue组件中使用SpeechSynthesi…

设计模式——状态模式19

状态模式是一种行为设计模式&#xff0c; 允许一个对象在其内部状态改变时改变它的行为&#xff0c;对象看起来好像修改了它的类。状态模式的核心是状态与行为绑定&#xff0c;不同的状态对应不同的行为。 设计模式&#xff0c;一定要敲代码理解 状态行为抽象 //在某种状态下&…