RabbitMQ高级特性2 、TTL、死信队列和延迟队列

MQ高级特性

1.削峰

设置 消费者

测试 添加多条消息

拉取消息 每隔20秒拉取一次 一次拉取五条 然后在20秒内一条一条消费

TTL

Time To Live(存活时间/过期时间)。

当消息到达存活时间后,还没有被消费,会被自动清除。

RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。

可以在管理台新建队列、交换机,绑定

1.图形化操作

添加队列

添加交换机

将交换机和对应的队列进行绑定

时间结束 , 消息失效

2.代码实现

配置 生产者

@Configuration
public class TopicMqTtlConfig {@Value("${mq.exchange.name}")private String EXCHANGENAME;@Value("${mq.queue.name1}")private String QUEUENAME1;@Value("${mq.queue.name2}")private String QUEUENAME2;// 1// . 交换机@Bean("ex1")public Exchange getExchange(){Exchange exchange = ExchangeBuilder.topicExchange(EXCHANGENAME).durable(false).build();return exchange;}// 2。 队列@Bean("queue1")public Queue getQueue1(){Queue queue = QueueBuilder.nonDurable(QUEUENAME1).withArgument("x-message-ttl",30000)//过期时间30秒.withArgument("x-max-length",10)//队列中最多接收10条消息超过10条的部分废弃.build();return queue;}@Bean("queue2")public Queue getQueue2(){Queue queue2 = QueueBuilder.nonDurable(QUEUENAME2).withArgument("x-message-ttl",300000000)//过期时间30秒.build();return queue2;}// 3. 交换机和队列进行绑定@Bean("binding1")public Binding bindQueue1ToExchange(@Qualifier("ex1") Exchange exchange,@Qualifier("queue1") Queue queue){Binding binding1 = BindingBuilder.bind(queue).to(exchange).with("ttl1.*").noargs();return binding1;}@Bean("binding2")public Binding bindQueue2ToExchange(@Qualifier("ex1") Exchange exchange,@Qualifier("queue2") Queue queue){Binding binding2 = BindingBuilder.bind(queue).to(exchange).with("ttl2.#").noargs();return binding2;}
}

测试

添加成功 ttl1只接收10条

时间过期

死信队列

死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机,因为其他MQ产品中没有交换机的概念),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。

比如消息队列的消息过期,如果绑定了死信交换器,那么该消息将发送给死信交换机

消息在什么情况下会成为死信?(面试会问)

1.队列消息长度到最大的限制

最大的长度设置为10当第11条消息进来的时候就会成为死信

2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false(不重新回到队列中)

设置消费者为手动签收的状态

3. 原队列存在消息过期设置,消息到达超时时间未被消费;

队列绑定交换机的方式是什么?

给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key

// 1.  交换机  :正常的交换机   死信交换机

// 2.队列  :正常的  死信

//3.绑定   正常ex - 正常的que

正常的que和死信交换机

死信ex-死信queue

2.代码实现

@Configuration
public class TopicMqDeadConfig {@Value("${mq1.exchange.name1}")private String EXCHANGENAME;@Value("${mq1.exchange.name2}")private String DEADEXCHANGE;@Value("${mq1.queue.name1}")private String QUEUENAME1;@Value("${mq1.queue.name2}")private String QUEUENAME2;// 声明正常交换机@Bean("ex1")public Exchange getExchange(){Exchange exchange = ExchangeBuilder.topicExchange(EXCHANGENAME).durable(false).build();return exchange;}//  正常队列@Bean("queue1")public Queue getQueue1(){Queue queue = QueueBuilder.nonDurable(QUEUENAME1).withArgument("x-message-ttl",30000)//过期时间30秒.withArgument("x-dead-letter-exchange",DEADEXCHANGE).withArgument("x-dead-letter-routing-key","dead.test")//将正常队列与死信交换机,死信队列绑定//.withArgument("x-max-length",10)//队列中最多接收10条消息超过10条的部分废弃.build();return queue;}// 交换机和队列进行绑定@Bean("binding1")public Binding bindQueue1ToExchange(@Qualifier("ex1") Exchange exchange,@Qualifier("queue1") Queue queue){Binding binding1 = BindingBuilder.bind(queue).to(exchange).with("normal.*").noargs();return binding1;}// 声明死信交换机@Bean("ex2")public Exchange getDeadExchange(){Exchange exchange = ExchangeBuilder.topicExchange(DEADEXCHANGE).durable(false).build();return exchange;}//死信队列@Bean("queue2")public Queue getQueue2(){Queue queue2 = QueueBuilder.nonDurable(QUEUENAME2).build();return queue2;}// 死信交换机和死信队列进行绑定@Bean("binding2")public Binding bindQueue2ToExchange(@Qualifier("ex2") Exchange exchange,@Qualifier("queue2") Queue queue){Binding binding2 = BindingBuilder.bind(queue).to(exchange).with("dead.*").noargs();return binding2;}}

测试

如果程序出现错误 拒绝签收

监听正常队列

发送消息 启动测试

总结:

1. 死信交换机和死信队列和普通的没有区别

2. 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列

3. 消息成为死信的三种情况:

        1. 队列消息长度到达限制;

        2. 消费者拒接消费消息,并且不重回队列;

        3. 原队列存在消息过期设置,消息到达超时时间未被消费;

 延迟队列

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

需求:

  • 1. 下单后,30分钟未支付,取消订单,回滚库存
  • 2. 新用户注册成功7天后,发送短信问候。

实现方式:

1. 定时器

2. 死信队列

在RabbitMQ中并未提供延迟队列功能。但是可以使用:TTL+死信队列 组合实现延迟队列的效果。

1.配置

添加依赖

  <!--2. rabbitmq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--nacos 配置中心--><!--配置中心--><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId></dependency><!-- application  bootstrap  --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-bootstrap</artifactId></dependency><!-- nacos--><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId></dependency><dependency><groupId>com.example</groupId><artifactId>sys-comm</artifactId><version>0.0.1-SNAPSHOT</version></dependency>

 

修改配置

2.代码实现

创建实体类

发送消息 测试

过期后放入死信队列

添加依赖

 <dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.16</version></dependency>

将json数据转化为对象

获取成功

3.连接数据库

创建表

创建测试类
@RestController
@RequestMapping("order")
public class OrderController {@Value("${mq1.exchange.name1}")private String EXCHANGENAME;//@Resourceprivate RabbitTemplate rabbitTemplate;@GetMappingpublic Result aaa(TabOrder order){//1. 消息 存放到mq里面String s = JSONUtil.toJsonStr(order);// openfeign  --      数据添加到数据库里面rabbitTemplate.convertAndSend(EXCHANGENAME, "normal.test", s);return Result.success(s);}
}

监听normal
import javax.annotation.Resource;
@Component
public class XiaoFeng implements ChannelAwareMessageListener {@Resourceprivate TabOrderMapper orderMapper;@Override@RabbitListener(queues = "test_queue_normal")public void onMessage(Message message, Channel channel) throws Exception {//Thread.sleep(2000);// 20sbyte[] body = message.getBody();String s = new String(body);System.out.println(s);// 将字符串转化为 对象long deliveryTag = message.getMessageProperties().getDeliveryTag();try{TabOrder order = JSONUtil.toBean(s, TabOrder.class);// 将订单的信息 报讯到数据库里面int insert = orderMapper.insert(order);channel.basicAck(deliveryTag,true); //}catch(Exception e){//long deliveryTag, boolean multiple, boolean requeueSystem.out.println("拒绝签收消息");channel.basicNack(deliveryTag,true,false);// 死信消息}}
}

监听dead
@Component
public class YanChi implements ChannelAwareMessageListener {@Resourceprivate TabOrderMapper orderMapper;@Override@RabbitListener(queues = "test_queue_dead")public void onMessage(Message message, Channel channel) throws Exception {//Thread.sleep(2000);// 20sbyte[] body = message.getBody();String s = new String(body);System.out.println(s);// 将字符串转化为 对象long deliveryTag = message.getMessageProperties().getDeliveryTag();try{TabOrder order = JSONUtil.toBean(s, TabOrder.class);//  order 的状态TabOrder tabOrder = orderMapper.selectById(order.getId());if(tabOrder.getStatus()==1){// 取消tabOrder.setStatus(3);}orderMapper.updateById(tabOrder);channel.basicAck(deliveryTag,true); //}catch(Exception e){//long deliveryTag, boolean multiple, boolean requeueSystem.out.println("拒绝签收消息");channel.basicNack(deliveryTag,true,false);// 死信消息}}
}

测试

成功

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/181122.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

linaro交叉编译工具链下载与使用笔记

笔记 文章目录 笔记确定目标 &#xff08;aarch64&#xff09;选择版本&#xff08;7.5&#xff09;选择目标&#xff08;aarch64-linux-gnu&#xff09;下载地址工具链&#xff08;gcc-linaro-7.5.0-2019.12-x86_64_aarch64-linux-gnu.tar.xz&#xff09;编译测试 &#xff08…

ICC2/innovus设置no 1x gap的方法

我正在「拾陆楼」和朋友们讨论有趣的话题,你⼀起来吧? 拾陆楼知识星球入口 ICC2设置no 1x的方法如下: 1) set_placement_spacing_label -name X -lib_cells {*} -side right set_placement_spacing_label -name Y -lib_cells {*} -side left 2) set_placement_spacing_rul…

Vue2 若依框架头像上传 全部代码

<template><div><div class"user-info-head" click"editCropper()"><img v-bind:src"options.img" title"点击上传头像"class"img-circle img-lg" /></div><el-dialog :title"title&…

什么是高层设计 - 学习系统设计

高层设计或HLD指的是整体系统设计&#xff0c;包括系统架构和设计的描述&#xff0c;是一种通用的系统设计&#xff0c;包括&#xff1a; •系统架构•数据库设计•对系统、服务、平台和模块之间关系的简要描述。 高层设计或HLD也被称为宏观级别设计。 什么是高层设计文档&…

解锁 ElasticJob 云原生实践的难题

发生了什么 最近在逛 ElasticJob 官方社区时发现很多小伙伴都在头疼这个 ElasticJob 上云的问题&#xff0c;ElasticJob 本就号称分布式弹性任务调度框架&#xff0c;怎么在云原生环境就有了问题了呢&#xff0c;这就要从 Kubenertes 和 ElasticJob 的一些状态化说起。 有意思的…

1076 Forwards on Weibo (链接表层序遍历)

题意&#xff1a;给出关注列表&#xff0c;博主的粉丝会给博主点赞&#xff0c;粉丝的粉丝也会给博主点赞&#xff0c;一直递推到最多L层&#xff0c;求&#xff0c;最后会有多少人给博主点赞。 思路&#xff1a;将关注的粉丝用链接表存储&#xff0c;再对博主进行层序遍历&am…

2023年生肖在不同时间段的运势预测

随着信息技术的飞速发展&#xff0c;API已经成为了数据获取和交互的重要途径。很多网站和APP都在运用API来获取数据。今天我们来介绍一个十分有趣的API——《十二生肖运势预测API》&#xff0c;通过这个API&#xff0c;我们可以获取到每个生肖在不同时间段的运势预测&#xff0…

linux(2)之buildroot使用手册

Linux(2)之buildroot配置toolchain Author&#xff1a;Onceday Date&#xff1a;2023年11月27日 漫漫长路&#xff0c;才刚刚开始… 参考文档&#xff1a; Buildroot - Making Embedded Linux Easy 文章目录 Linux(2)之buildroot配置toolchain1. 构建配置1.1 配置config生成…

Java NIO SelectionKey

在 Java NIO&#xff08;New I/O&#xff09;中&#xff0c;SelectionKey 是与选择器 Selector 绑定的对象&#xff0c;用于表示通道 Channel 注册到选择器上的状态和事件。SelectionKey 提供了管理和操作通道的能力&#xff0c;可以监视通道的可读、可写、连接和接受事件&…

【LeetCode:1670. 设计前中后队列 | 数据结构设计】

&#x1f680; 算法题 &#x1f680; &#x1f332; 算法刷题专栏 | 面试必备算法 | 面试高频算法 &#x1f340; &#x1f332; 越难的东西,越要努力坚持&#xff0c;因为它具有很高的价值&#xff0c;算法就是这样✨ &#x1f332; 作者简介&#xff1a;硕风和炜&#xff0c;…

Python怎么在py文件中执行某个命令行,这个命令行是运行另外一个程序的命令,例如“python aa.py”

1、使用os.system os.system方法可以用来运行命令行命令。它比subprocess简单&#xff0c;但功能也更有限&#xff0c;不如subprocess那样灵活。 import oscommand "python properties_computer/on.py --input_datasets 12.csv" os.system(command) 2、通过“subp…

微信小程序仿网易严选(附精选源码32套,涵盖商城团购等)

商城主要实现的功能 首页、专题、分类、购物车、我的小程序授权登陆获取用户信息首页包含品牌制造页、品牌制造详情页面、新品首发页面、人气推荐页面、各分类列表商品详情页面&#xff0c;包含常见问题、大家都在看商品列表、加入购物车、收藏商品、立即购买、下订单、选择收…

WebDriverWait 等待

包的引用 from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC 使用方法 WebDriverWait(driver, 5, 0.5).until(EC.vi…

C语言进阶指南(11)(指针数组与二维数组)

*欢迎来到博主的专栏——C语言进阶指南 博主id&#xff1a;reverie_ly 文章目录 N级指针指针数组指针数组与二维数组数组指针作为函数的参数 N级指针 指针变量是一个存放地址的变量&#xff0c;在C语言中&#xff0c;每个变量都会有一个地址值。所以指针变量也有一个地址。 …

高防服务器和高防CDN的区别是什么?

现今大环境下攻击问题愈发严峻&#xff0c;许多网站有遇到被攻击导致网站崩溃&#xff0c;资源消耗的问题&#xff0c;那么这时候高防就是给为站长&#xff0c;企业等的第一选择了&#xff0c;那边目前高防CDN和高防服务器这两种抵御DDoS攻击的两种主流防御&#xff0c;那种会更…

Mysql 高级日志binlog、undoLog、redoLog 详解

数据更新流程与日志记录&#xff1a; undoLog&#xff1a; binLog&#xff1a; redoLog&#xff1a;

influxdb2.x安装配置指南

influxdb的教程已经是很清楚了&#xff0c;但没有中文版翻译&#xff0c;以下是个人安装配置总结 如果机器上只需要一个influxdb实例&#xff0c;或docker安装&#xff0c;直接yum install 就可以了&#xff0c;或者采用离线安装&#xff1a; sudo yum localinstall influxdb…

本地MinIO存储服务通过Java程序结合Cpolar内网穿透进行远程访问

[本地MinIO存储服务通过Java程序结合Cpolar内网穿透进行远程访问] 前言 MinIO是一款高性能、分布式的对象存储系统&#xff0c;它可以100%的运行在标准硬件上&#xff0c;即X86等低成本机器也能够很好的运行MinIO。它的优点包括高性能、高可用性、易于部署和管理、支持多租户…

golang log模块使用

在Go中&#xff0c;log 包是用于输出日志信息的标准库。以下是一些基本的 log 包的使用示例&#xff1a; 基本用法&#xff1a; package mainimport ("log" )func main() {log.Println("This is a log message.") } 运行程序&#xff0c;你会在控制台看到日…

【古月居《ros入门21讲》学习笔记】16_tf坐标系广播与监听的编程实现

目录 说明&#xff1a; 1. 实现过程&#xff08;C&#xff09; 创建功能包&#xff08;C&#xff09; 创建tf广播器代码&#xff08;C&#xff09; 创建tf监听器代码&#xff08;C&#xff09; 配置tf监听器与广播器代码编译规则 编译并运行 编译 运行 2. 实现过程&a…