RabbitMQ高级特性2 、TTL、死信队列和延迟队列

MQ高级特性

1.削峰

设置 消费者

测试 添加多条消息

拉取消息 每隔20秒拉取一次 一次拉取五条 然后在20秒内一条一条消费

TTL

Time To Live(存活时间/过期时间)。

当消息到达存活时间后,还没有被消费,会被自动清除。

RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。

可以在管理台新建队列、交换机,绑定

1.图形化操作

添加队列

添加交换机

将交换机和对应的队列进行绑定

时间结束 , 消息失效

2.代码实现

配置 生产者

@Configuration
public class TopicMqTtlConfig {@Value("${mq.exchange.name}")private String EXCHANGENAME;@Value("${mq.queue.name1}")private String QUEUENAME1;@Value("${mq.queue.name2}")private String QUEUENAME2;// 1// . 交换机@Bean("ex1")public Exchange getExchange(){Exchange exchange = ExchangeBuilder.topicExchange(EXCHANGENAME).durable(false).build();return exchange;}// 2。 队列@Bean("queue1")public Queue getQueue1(){Queue queue = QueueBuilder.nonDurable(QUEUENAME1).withArgument("x-message-ttl",30000)//过期时间30秒.withArgument("x-max-length",10)//队列中最多接收10条消息超过10条的部分废弃.build();return queue;}@Bean("queue2")public Queue getQueue2(){Queue queue2 = QueueBuilder.nonDurable(QUEUENAME2).withArgument("x-message-ttl",300000000)//过期时间30秒.build();return queue2;}// 3. 交换机和队列进行绑定@Bean("binding1")public Binding bindQueue1ToExchange(@Qualifier("ex1") Exchange exchange,@Qualifier("queue1") Queue queue){Binding binding1 = BindingBuilder.bind(queue).to(exchange).with("ttl1.*").noargs();return binding1;}@Bean("binding2")public Binding bindQueue2ToExchange(@Qualifier("ex1") Exchange exchange,@Qualifier("queue2") Queue queue){Binding binding2 = BindingBuilder.bind(queue).to(exchange).with("ttl2.#").noargs();return binding2;}
}

测试

添加成功 ttl1只接收10条

时间过期

死信队列

死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机,因为其他MQ产品中没有交换机的概念),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。

比如消息队列的消息过期,如果绑定了死信交换器,那么该消息将发送给死信交换机

消息在什么情况下会成为死信?(面试会问)

1.队列消息长度到最大的限制

最大的长度设置为10当第11条消息进来的时候就会成为死信

2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false(不重新回到队列中)

设置消费者为手动签收的状态

3. 原队列存在消息过期设置,消息到达超时时间未被消费;

队列绑定交换机的方式是什么?

给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key

// 1.  交换机  :正常的交换机   死信交换机

// 2.队列  :正常的  死信

//3.绑定   正常ex - 正常的que

正常的que和死信交换机

死信ex-死信queue

2.代码实现

@Configuration
public class TopicMqDeadConfig {@Value("${mq1.exchange.name1}")private String EXCHANGENAME;@Value("${mq1.exchange.name2}")private String DEADEXCHANGE;@Value("${mq1.queue.name1}")private String QUEUENAME1;@Value("${mq1.queue.name2}")private String QUEUENAME2;// 声明正常交换机@Bean("ex1")public Exchange getExchange(){Exchange exchange = ExchangeBuilder.topicExchange(EXCHANGENAME).durable(false).build();return exchange;}//  正常队列@Bean("queue1")public Queue getQueue1(){Queue queue = QueueBuilder.nonDurable(QUEUENAME1).withArgument("x-message-ttl",30000)//过期时间30秒.withArgument("x-dead-letter-exchange",DEADEXCHANGE).withArgument("x-dead-letter-routing-key","dead.test")//将正常队列与死信交换机,死信队列绑定//.withArgument("x-max-length",10)//队列中最多接收10条消息超过10条的部分废弃.build();return queue;}// 交换机和队列进行绑定@Bean("binding1")public Binding bindQueue1ToExchange(@Qualifier("ex1") Exchange exchange,@Qualifier("queue1") Queue queue){Binding binding1 = BindingBuilder.bind(queue).to(exchange).with("normal.*").noargs();return binding1;}// 声明死信交换机@Bean("ex2")public Exchange getDeadExchange(){Exchange exchange = ExchangeBuilder.topicExchange(DEADEXCHANGE).durable(false).build();return exchange;}//死信队列@Bean("queue2")public Queue getQueue2(){Queue queue2 = QueueBuilder.nonDurable(QUEUENAME2).build();return queue2;}// 死信交换机和死信队列进行绑定@Bean("binding2")public Binding bindQueue2ToExchange(@Qualifier("ex2") Exchange exchange,@Qualifier("queue2") Queue queue){Binding binding2 = BindingBuilder.bind(queue).to(exchange).with("dead.*").noargs();return binding2;}}

测试

如果程序出现错误 拒绝签收

监听正常队列

发送消息 启动测试

总结:

1. 死信交换机和死信队列和普通的没有区别

2. 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列

3. 消息成为死信的三种情况:

        1. 队列消息长度到达限制;

        2. 消费者拒接消费消息,并且不重回队列;

        3. 原队列存在消息过期设置,消息到达超时时间未被消费;

 延迟队列

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

需求:

  • 1. 下单后,30分钟未支付,取消订单,回滚库存
  • 2. 新用户注册成功7天后,发送短信问候。

实现方式:

1. 定时器

2. 死信队列

在RabbitMQ中并未提供延迟队列功能。但是可以使用:TTL+死信队列 组合实现延迟队列的效果。

1.配置

添加依赖

  <!--2. rabbitmq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--nacos 配置中心--><!--配置中心--><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId></dependency><!-- application  bootstrap  --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-bootstrap</artifactId></dependency><!-- nacos--><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId></dependency><dependency><groupId>com.example</groupId><artifactId>sys-comm</artifactId><version>0.0.1-SNAPSHOT</version></dependency>

 

修改配置

2.代码实现

创建实体类

发送消息 测试

过期后放入死信队列

添加依赖

 <dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.16</version></dependency>

将json数据转化为对象

获取成功

3.连接数据库

创建表

创建测试类
@RestController
@RequestMapping("order")
public class OrderController {@Value("${mq1.exchange.name1}")private String EXCHANGENAME;//@Resourceprivate RabbitTemplate rabbitTemplate;@GetMappingpublic Result aaa(TabOrder order){//1. 消息 存放到mq里面String s = JSONUtil.toJsonStr(order);// openfeign  --      数据添加到数据库里面rabbitTemplate.convertAndSend(EXCHANGENAME, "normal.test", s);return Result.success(s);}
}

监听normal
import javax.annotation.Resource;
@Component
public class XiaoFeng implements ChannelAwareMessageListener {@Resourceprivate TabOrderMapper orderMapper;@Override@RabbitListener(queues = "test_queue_normal")public void onMessage(Message message, Channel channel) throws Exception {//Thread.sleep(2000);// 20sbyte[] body = message.getBody();String s = new String(body);System.out.println(s);// 将字符串转化为 对象long deliveryTag = message.getMessageProperties().getDeliveryTag();try{TabOrder order = JSONUtil.toBean(s, TabOrder.class);// 将订单的信息 报讯到数据库里面int insert = orderMapper.insert(order);channel.basicAck(deliveryTag,true); //}catch(Exception e){//long deliveryTag, boolean multiple, boolean requeueSystem.out.println("拒绝签收消息");channel.basicNack(deliveryTag,true,false);// 死信消息}}
}

监听dead
@Component
public class YanChi implements ChannelAwareMessageListener {@Resourceprivate TabOrderMapper orderMapper;@Override@RabbitListener(queues = "test_queue_dead")public void onMessage(Message message, Channel channel) throws Exception {//Thread.sleep(2000);// 20sbyte[] body = message.getBody();String s = new String(body);System.out.println(s);// 将字符串转化为 对象long deliveryTag = message.getMessageProperties().getDeliveryTag();try{TabOrder order = JSONUtil.toBean(s, TabOrder.class);//  order 的状态TabOrder tabOrder = orderMapper.selectById(order.getId());if(tabOrder.getStatus()==1){// 取消tabOrder.setStatus(3);}orderMapper.updateById(tabOrder);channel.basicAck(deliveryTag,true); //}catch(Exception e){//long deliveryTag, boolean multiple, boolean requeueSystem.out.println("拒绝签收消息");channel.basicNack(deliveryTag,true,false);// 死信消息}}
}

测试

成功

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/181122.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

linaro交叉编译工具链下载与使用笔记

笔记 文章目录 笔记确定目标 &#xff08;aarch64&#xff09;选择版本&#xff08;7.5&#xff09;选择目标&#xff08;aarch64-linux-gnu&#xff09;下载地址工具链&#xff08;gcc-linaro-7.5.0-2019.12-x86_64_aarch64-linux-gnu.tar.xz&#xff09;编译测试 &#xff08…

ICC2/innovus设置no 1x gap的方法

我正在「拾陆楼」和朋友们讨论有趣的话题,你⼀起来吧? 拾陆楼知识星球入口 ICC2设置no 1x的方法如下: 1) set_placement_spacing_label -name X -lib_cells {*} -side right set_placement_spacing_label -name Y -lib_cells {*} -side left 2) set_placement_spacing_rul…

Vue2 若依框架头像上传 全部代码

<template><div><div class"user-info-head" click"editCropper()"><img v-bind:src"options.img" title"点击上传头像"class"img-circle img-lg" /></div><el-dialog :title"title&…

什么是高层设计 - 学习系统设计

高层设计或HLD指的是整体系统设计&#xff0c;包括系统架构和设计的描述&#xff0c;是一种通用的系统设计&#xff0c;包括&#xff1a; •系统架构•数据库设计•对系统、服务、平台和模块之间关系的简要描述。 高层设计或HLD也被称为宏观级别设计。 什么是高层设计文档&…

解锁 ElasticJob 云原生实践的难题

发生了什么 最近在逛 ElasticJob 官方社区时发现很多小伙伴都在头疼这个 ElasticJob 上云的问题&#xff0c;ElasticJob 本就号称分布式弹性任务调度框架&#xff0c;怎么在云原生环境就有了问题了呢&#xff0c;这就要从 Kubenertes 和 ElasticJob 的一些状态化说起。 有意思的…

linux(2)之buildroot使用手册

Linux(2)之buildroot配置toolchain Author&#xff1a;Onceday Date&#xff1a;2023年11月27日 漫漫长路&#xff0c;才刚刚开始… 参考文档&#xff1a; Buildroot - Making Embedded Linux Easy 文章目录 Linux(2)之buildroot配置toolchain1. 构建配置1.1 配置config生成…

【LeetCode:1670. 设计前中后队列 | 数据结构设计】

&#x1f680; 算法题 &#x1f680; &#x1f332; 算法刷题专栏 | 面试必备算法 | 面试高频算法 &#x1f340; &#x1f332; 越难的东西,越要努力坚持&#xff0c;因为它具有很高的价值&#xff0c;算法就是这样✨ &#x1f332; 作者简介&#xff1a;硕风和炜&#xff0c;…

微信小程序仿网易严选(附精选源码32套,涵盖商城团购等)

商城主要实现的功能 首页、专题、分类、购物车、我的小程序授权登陆获取用户信息首页包含品牌制造页、品牌制造详情页面、新品首发页面、人气推荐页面、各分类列表商品详情页面&#xff0c;包含常见问题、大家都在看商品列表、加入购物车、收藏商品、立即购买、下订单、选择收…

C语言进阶指南(11)(指针数组与二维数组)

*欢迎来到博主的专栏——C语言进阶指南 博主id&#xff1a;reverie_ly 文章目录 N级指针指针数组指针数组与二维数组数组指针作为函数的参数 N级指针 指针变量是一个存放地址的变量&#xff0c;在C语言中&#xff0c;每个变量都会有一个地址值。所以指针变量也有一个地址。 …

高防服务器和高防CDN的区别是什么?

现今大环境下攻击问题愈发严峻&#xff0c;许多网站有遇到被攻击导致网站崩溃&#xff0c;资源消耗的问题&#xff0c;那么这时候高防就是给为站长&#xff0c;企业等的第一选择了&#xff0c;那边目前高防CDN和高防服务器这两种抵御DDoS攻击的两种主流防御&#xff0c;那种会更…

Mysql 高级日志binlog、undoLog、redoLog 详解

数据更新流程与日志记录&#xff1a; undoLog&#xff1a; binLog&#xff1a; redoLog&#xff1a;

本地MinIO存储服务通过Java程序结合Cpolar内网穿透进行远程访问

[本地MinIO存储服务通过Java程序结合Cpolar内网穿透进行远程访问] 前言 MinIO是一款高性能、分布式的对象存储系统&#xff0c;它可以100%的运行在标准硬件上&#xff0c;即X86等低成本机器也能够很好的运行MinIO。它的优点包括高性能、高可用性、易于部署和管理、支持多租户…

【古月居《ros入门21讲》学习笔记】16_tf坐标系广播与监听的编程实现

目录 说明&#xff1a; 1. 实现过程&#xff08;C&#xff09; 创建功能包&#xff08;C&#xff09; 创建tf广播器代码&#xff08;C&#xff09; 创建tf监听器代码&#xff08;C&#xff09; 配置tf监听器与广播器代码编译规则 编译并运行 编译 运行 2. 实现过程&a…

基于STC12C5A60S2系列1T 8051单片机的IIC总线器件24C02记录单片机上电次数应用

基于STC12C5A60S2系列1T 8051单片机的IIC总线器件24C02记录单片机上电次数应用 STC12C5A60S2系列1T 8051单片机管脚图STC12C5A60S2系列1T 8051单片机I/O口各种不同工作模式及配置STC12C5A60S2系列1T 8051单片机I/O口各种不同工作模式介绍液晶显示器LCD1602简单介绍IIC通信简单介…

微信小程序生成二维码并保存到本地方法

微信小程序生成二维码请保存到本地方法 官方weapp-qrcode插件 github链接 功能完成样子 wxml <view class"qrcode"><canvas style"width: 275px; height: 275px;" canvas-idmyQrcode></canvas> </view> <view class" …

day66

今日回顾内容 web框架 django 路由控制 视图层 web框架 一、什么是web框架 Web框架&#xff08;Web framework&#xff09;是一种开发框架&#xff0c;用来支持动态网站、网络应用和网络服务的开发。这大多数的web框架提供了一套开发和部署网站的方式&#xff0c;也为web行…

resty-http库爬虫程序代码示例

lua -- 导入需要的库 local http require "resty.http" local io require "io" -- 创建一个客户端 local client http.new() -- 设置HTTP客户端的 client:set_proxy(proxy_host, proxy_port) -- 执行HTTP GET请求&#xff0c;获取网页内容 local res…

C语言数据结构之顺序表(上)

前言&#xff1a; ⭐️此篇博文主要分享博主在学习C语言的数据结构之顺序表的知识点时写的笔记&#xff0c;若有错误&#xff0c;还请佬指出&#xff0c;一定感谢&#xff01;制作不易&#xff0c;若觉得内容不错可以点赞&#x1f44d;收藏❤️&#xff0c;这是对博主最大的认可…

source: command not found错误的解决方法

偶遇的一个问题&#xff0c;因为在网上没有找到对应的解决办法&#xff0c;可能是属于个案&#xff0c;在此记录备忘&#xff0c;同时供大家参考。 问题现象&#xff1a; 执行命令 source /etc/profile时报错&#xff1a; bash: “source: command not found... 问题定位和…

内衣洗衣机和手洗哪个干净?小型洗衣机质量排名

这两年内衣洗衣机可以称得上较火的小电器&#xff0c;小小的身躯却有大大的能力&#xff0c;一键可以同时启动洗、漂、脱三种全自动为一体化功能&#xff0c;在多功能和性能的提升上&#xff0c;还可以解放我们双手的同时将衣物给清洗干净&#xff0c;让越来越多小伙伴选择一款…