004 死信(限制队列最大长度)

文章目录

    • 消息ttl过期成为死信
    • 队列达到最大长度成为死信
      • MyOrder.java
      • RabbitMQDirectConfig.java
      • OrderProducer.java
      • PayConsumer.java
      • DeadOrderConsumer.java
    • application.yaml

死信就是无法被消费的消息。一般来说,producer 将消息投递到 broker 或者直接到 queue 中,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

应用场景:
为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。 还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效

死信的原因:

消息 TTL (Time To Live ) : x-message-ttl
队列达到最大长度(队列满了无法再添加数据到 mq 中) : x-max-length
消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false

消息ttl过期成为死信

map.put("x-message-ttl",2000); // 消息存活时间1s

rabbitmq中,设置了死信队列。a消息设置了ttl,a消息已经被消费,但消费者未给通知,a消息ttl过期后不会被送到死信队列

在RabbitMQ中
死信队列的概念:
RabbitMQ的死信队列是一种用于处理失败或无法路由的消息的机制。当消息处理失败、过期、被拒绝或无法路由时,这些消息可以被发送到死信队列。

消息的TTL(Time To Live):
TTL表示消息的过期时间。在RabbitMQ中,可以对消息设置TTL,意味着消息在一定时间内如果没有被消费,则会被认为是死信。但这里的关键是,TTL的判断通常发生在消息即将被投递给消费者之前。

消息被消费的情况:
如果a消息已经被消费,但消费者未给出确认通知(即未发送ack确认),那么这条消息在RabbitMQ内部的状态仍然是未确认的。然而,这并不影响消息的TTL判断。一旦消息被成功地从队列中取出并传递给消费者,TTL机制就不再对其起作用,因为此时消息已经处于消费者的控制之下。

死信队列与TTL的关联:
当消息的TTL过期时,如果这条消息还在队列中等待消费,那么它会被标记为死信并发送到死信队列(如果配置了死信队列的话)。但是,如果消息已经被消费,即使消费者没有发送确认通知,它也不会因为TTL过期而被送到死信队列。

综上所述:

a消息设置了TTL,并且已经被消费,但消费者未给出确认通知。
在这种情况下,即使a消息的TTL过期,它不会被送到死信队列。
原因是消息已经被消费,RabbitMQ认为该消息已经不在其控制之下,因此TTL机制不再适用。

a消息在已经被消费的情况下,不会因为TTL过期而被送到死信队列。

队列达到最大长度成为死信

MyOrder.java


package com.example.direct;import java.io.Serializable;public class MyOrder implements Serializable {private String orderId;private String orderNumber;private String customerName;private Integer productId;private String productName;private Float productPrice;private Integer productCount;private Float orderPrice;public MyOrder(){}public MyOrder(String orderId, String orderNumber, String customerName, Integer productId, String productName, Float productPrice, Integer productCount, Float orderPrice) {this.orderId = orderId;this.orderNumber = orderNumber;this.customerName = customerName;this.productId = productId;this.productName = productName;this.productPrice = productPrice;this.productCount = productCount;this.orderPrice = orderPrice;}public String getOrderId() {return orderId;}public void setOrderId(String orderId) {this.orderId = orderId;}public String getOrderNumber() {return orderNumber;}public Integer getProductId() {return productId;}public void setProductId(Integer productId) {this.productId = productId;}public void setOrderNumber(String orderNumber) {this.orderNumber = orderNumber;}public String getCustomerName() {return customerName;}public void setCustomerName(String customerName) {this.customerName = customerName;}public String getProductName() {return productName;}public void setProductName(String productName) {this.productName = productName;}public Float getProductPrice() {return productPrice;}public void setProductPrice(Float productPrice) {this.productPrice = productPrice;}public Integer getProductCount() {return productCount;}public void setProductCount(Integer productCount) {this.productCount = productCount;}public Float getOrderPrice() {return orderPrice;}public void setOrderPrice(Float orderPrice) {this.orderPrice = orderPrice;}@Overridepublic String toString() {return "MyOrder{" +"orderId=" + orderId +", orderNumber='" + orderNumber + '\'' +", customerName='" + customerName + '\'' +", productName='" + productName + '\'' +", productPrice=" + productPrice +", productCount=" + productCount +", orderPrice=" + orderPrice +'}';}
}

RabbitMQDirectConfig.java


package com.example.direct;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitMQDirectConfig  {//    1. 创建交换机
//    @Bean
//    public DirectExchange newDirectExchange(){
//        return new DirectExchange("myDirectExchangeAAA",true,false);
//    }//2. 创建队列
//    @Bean
//    public Queue newQueueA(){
//        return new Queue("queueAAA",true);
//    }//3. 绑定队列到交换机中
//    @Bean
//    public Binding bindingA(){
//        return BindingBuilder.bind(newQueueA()).to(newDirectExchange()).with("keyAAA");
//    }//==================死信//1. 创建交换机@Beanpublic DirectExchange newExchange(){return new DirectExchange("normalExchange",true,false);}//2. 创建队列@Beanpublic Queue newQueue(){Map<String ,Object> map = new HashMap<>();//map.put("x-message-ttl",2000); // 消息存活时间1smap.put("x-max-length",6); // 队列达到最大长度 为6map.put("x-dead-letter-exchange","deadExchange");// 设置死信交换机 的名称map.put("x-dead-letter-routing-key","key2") ;//设置死信路由键名字return new Queue("normalQueueA",true,false,false,map);}//3. 绑定@Beanpublic Binding binding(){return BindingBuilder.bind(newQueue()).to(newExchange()).with("key1");}//4. 创建死信交换机@Beanpublic DirectExchange newDeadExchange(){return new DirectExchange("deadExchange",true,false);}//5. 创建死信队列@Beanpublic Queue newDeadQueue(){return new Queue("deadQueueA",true,false,false);}//6. 绑定@Beanpublic Binding bindingDead(){return BindingBuilder.bind(newDeadQueue()).to(newDeadExchange()).with("key2");}}

OrderProducer.java


package com.example.direct;import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;@RestController
@RequestMapping("a")
public class OrderProducer {@Autowiredprivate AmqpTemplate rabbitTemplate;@GetMapping("/submitOrder")public String submitOrder(){Map<String,Object> map = new HashMap<>();map.put("orderNumber","2222");//Stringmap.put("productId",1111);//Integerfor(int i=0;i<=130;i++){String orderId = UUID.randomUUID().toString().replace("-","");map.put("orderId",orderId);rabbitTemplate.convertAndSend("normalExchange", "key1", map);}return "生产者下单成功";}}

PayConsumer.java

package com.example.direct;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.Map;@Component
public class PayConsumer {@RabbitHandler@RabbitListener(queues = "normalQueueA")public void process(Map map, Channel channel, Message message) throws IOException {try {Thread.sleep(10000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("支付服务接收到的消息:" + map);String orderId = (String)map.get("orderId");//StringInteger productId = (Integer)map.get("productId");//IntegerString orderNum = (String)map.get("orderNumber");//StringSystem.out.println("支付服务接收到的orderId:" + orderId);System.out.println("支付服务接收到的productId:" + productId);System.out.println("支付服务接收到的orderNum:" + orderNum);//告诉broker,消息已经被确认channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}
}

DeadOrderConsumer.java

package com.example.direct;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;@Component
public class DeadOrderConsumer {// 获得死信队列中的消息@RabbitHandler@RabbitListener(queues = "deadQueueA")public void process(Map map){System.out.println("订单取消支付后,从死信队列中接收到的消息:" + map);String orderId = (String)map.get("orderId");//StringInteger productId = (Integer)map.get("productId");//IntegerString orderNum = (String)map.get("orderNumber");//StringSystem.out.println("取消支付后,从死信队列中接收到的orderId:" + orderId);System.out.println("取消支付后,从死信队列中接收到的productId:" + productId);System.out.println("取消支付后,从死信队列中接收到的orderNum:" + orderNum);}
}

application.yaml


server:servlet:context-path: /app
spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestpublisher-confirm-type: correlated  # 确认交换机已经接收到生产者的消息了publisher-returns: true   #  消息已经到了队列(交换机与队列绑定成功的)listener:simple:acknowledge-mode: manual # 手动消息确认concurrency: 1 #消费者数量max-concurrency: 1  #消费者最大数量prefetch: 1  #消费者每次从队列中取几个消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/4563.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

黑马点评项目遇到的部分问题

目录 1. Invalid default value for ‘begin_time‘报错2. [ThreadLocal](https://blog.csdn.net/u010445301/article/details/111322569)3. 悲观锁实现单体一人一单超卖问题4. redisson5. 回顾秒杀优化6. Nginx 负载均衡 1. Invalid default value for ‘begin_time‘报错 my…

Java设计模式_概述(设计模式类型和基本原则)

一、设计模式 设计模式&#xff08;Design pattern&#xff09;代表了最佳的实践&#xff0c;是软件开发人员在软件开发过程中面临一般问题的解决方案&#xff0c;是众多软件开发人员经过相当长的一段时间的试验和错误总结出来的。是优秀程序猿的经验结晶。 但不推荐刚入门的开…

如何删除.gitignore文件中指定的所有被忽略的文件

要删除.gitignore文件中指定的所有被忽略的文件&#xff0c;你可以使用git rm命令结合-r选项。以下是一些步骤&#xff1a; 查看将要删除的文件&#xff1a;首先&#xff0c;你可以使用git ls-files命令来列出被git忽略的文件&#xff0c;以确保你想要删除的文件列表是正确的。…

LeetCode 热题 100 Day06

矩阵相关题型 Leetcode 48. 旋转图像【中等】 题意理解&#xff1a; 将一个矩阵顺时针旋转90度&#xff0c;返回旋转后的矩阵。 要求&#xff1a; 在原地修改&#xff0c;不借助额外的空间 如果可以使用辅助数组来实现转置,则有 matrix_new[i][j]matrix[j][row-i-1]; 解…

Kubernetes学习-核心概念篇(三) 核心概念和专业术语

&#x1f3f7;️个人主页&#xff1a;牵着猫散步的鼠鼠 &#x1f3f7;️系列专栏&#xff1a;Kubernetes渐进式学习-专栏 &#x1f3f7;️个人学习笔记&#xff0c;若有缺误&#xff0c;欢迎评论区指正 1. 前言 在前面两篇文章我们简单介绍了什么是K8S&#xff0c;以及K8S的…

mysql服务器无法启动问题处理

一台hlr服务器用网管软件登录失败&#xff0c;查找原因&#xff0c;发现网关软件连接服务器的tcp的10002端口失败&#xff0c;超时无应答&#xff0c;导致连接失败。 用户反馈核心网hlr&#xff0c;smc无法登录&#xff0c;putty登录服务器&#xff0c;发现hlr10002端口没有打…

【保姆级讲解下gateway基本配置】

&#x1f3a5;博主&#xff1a;程序员不想YY啊 &#x1f4ab;CSDN优质创作者&#xff0c;CSDN实力新星&#xff0c;CSDN博客专家 &#x1f917;点赞&#x1f388;收藏⭐再看&#x1f4ab;养成习惯 ✨希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出…

STM32与OLED显示屏通信(四针脚和七阵脚)

系列文章目录 STM32单片机系列专栏 C语言术语和结构总结专栏 文章目录 1. 单片机调试 2. OLED简介 3. 接线 4. OLED驱动函数 4.1 四针脚版本 OLED.c OLED.h OLED_Font.h 4.2 七针脚版本 引脚连接 OLED.c OLED.h OLED_Font.h 5. 主函数 工程文件模板 1. 单片机…

Android 当存在双卡时,移动网络默认为SIM卡1

文章目录 一、当Android设备中存在双卡时&#xff0c;移动网络默认为SIM卡1二、下面是完整的代码路径和修改点 一、当Android设备中存在双卡时&#xff0c;移动网络默认为SIM卡1 完成这个需求有以下两个修改点&#xff1a; 下面依旧是Android13 MTK平台&#xff0c;在MtkMulti…

SoC如何开机自动运行脚本程序?

目录 1、通过启动加载程序执行脚本 2、通过Linux init 系统 2.1、/etc/init.d目录中的脚本 2.2、修改/etc/rc.local文件 在Linux SoC开发中&#xff0c;实现SoC启动时执行特定脚本的方法主要取决于你使用的启动加载程序&#xff08;Bootloader&#xff09;以及Linux内核的配置。…

unity中压缩文件与解压文件

今天研究了一下在unity中 把文件压缩后转二进制发送到服务器并从服务器下载后解压使用文件&#xff0c;废话不多说直接上代码&#xff0c;zip压缩插件是用的dotnetzip插件&#xff0c;网上可以搜索下载这个dll private static void GetPathMeshData_ZIP(Milling_ProjectData da…

(Snowflake Algorithm)雪花算法Java的简单使用

概述 雪花算法&#xff08;Snowflake Algorithm&#xff09;最初是由Twitter开源的&#xff0c;用于生成一个64位的长整型数字作为全局唯一的ID。这个算法是用Scala语言编写的&#xff0c;并且在Twitter内部得到了广泛应用。由于其简单、高效和分布式友好的特性&#xff0c;雪…

Fiddlers使用

下载 FiddlerClassic&#xff0c;是免费的&#xff0c;不过只能在Windows上使用。 使用 如何使用Fiddler进行手机端抓包 手机抓包&#xff0c;如果使用有线WindowsPC共享Wifi热点&#xff0c;子网络ip地址段与PC不一致&#xff0c;再添PC ip地址&#xff08;8888&#xff09…

JMeter的下载安装与使用(Mac)

1、下载地址​​​​​​https://jmeter.apache.org/download_jmeter.cgi 2、下载Binaries 下的apache-jmeter5.5.tgz 3、解压 4、启动 在bin目录下打开终端&#xff0c;输入sh jmeter 出现jmeter首页界面&#xff0c;即为成功。 5、使用 5.1 语言选择 option选项卡&am…

揭秘!七大副业赚钱秘籍,让你轻松实现财务自由!

以下是七种赚钱的副业推荐&#xff1a; 1&#xff0c;自媒体运营 自媒体运营是当下非常火热的副业之一。通过在微博、微信公众号、抖音、B站等自媒体平台上发布原创内容&#xff0c;吸引粉丝关注&#xff0c;进而实现流量变现。自媒体运营的核心在于内容创作和粉丝互动&#…

java解析PDF、WORD获取其中的表格以及文本内容

近期因工作需要需要解析PDF&#xff0c;需要把PDF中的文本和表格分离&#xff0c;最终要实现的目标是PDF中的文本内容放一块&#xff0c;表格内容放一块&#xff0c;以list的形式存储。解析PDF的技术有很多&#xff0c;经过多次尝试发现使用AdobeAcrobat可以实现表格和文本分离…

06 华三防火墙的如何进入web页面?

1 AI 思路 要进入华三防火墙的Web页面,你需要按照以下步骤操作: 确定防火墙的IP地址:首先,你需要知道你的华三防火墙的IP地址。通常,你可以从网络管理员或者设备本身获取这个信息。 打开浏览器:在你的电脑上打开一个网页浏览器,例如Chrome、Firefox或者Edge等。 输入UR…

系统服务(22年国赛)—— DHCPDHCP Relay(中继)

前言&#xff1a;原文在我的博客网站中&#xff0c;持续更新数通、系统方面的知识&#xff0c;欢迎来访&#xff01; 系统服务&#xff08;22年国赛&#xff09;—— DHCP&&DHCP Relay(中继)https://myweb.myskillstree.cn/94.html 目录 一、题目 DHCP AppSrv 二…

Linux学习之路 -- 进程篇 -- 自定义shell的编写

前面介绍了进程程序替换的相关知识&#xff0c;接下来&#xff0c;我将介绍如何基于前面的知识&#xff0c;编写一个简单的shell&#xff0c;另外本文的所展示的shell可能仅供参考。 目录 <1>获取用户的输入和打印命令行提示符 <2>切割字符串 <3>执行这个…

第 4 篇 : Netty客户端互发图片和音/视频

说明 因为图片和音/视频不能确定其具体大小, 故引入MinIO。客户端之间只发送消息, 通过上传/下载来获取额外信息 1. MinIO搭建(参考前面文章), 并启动 2. 登录MinIO创建3个Bucket: image、voice、video 3. 客户端改造 3.1 修改 pom.xml <?xml version"1.0" …