RabbitMQ-死信队列(golang)

1、概念

      死信(Dead Letter),字面上可以理解为未被消费者成功消费的信息,正常来说,生产者将消息放入到队列中,消费者从队列获取消息,并进行处理,但是由于某种原因,队列中的消息未被消费者拿到,这样的消息就会成为死信,存放死信消息的队列,也就被称为死信队列(Dead Letter Queue,简称DLQ)。

2、死信产生的原因

文心一言的回答如下:

  1. 消息被拒绝:当消费者使用basic.reject或basic.nack方法拒绝消息,并且requeue参数被设置为false时,消息会被视为死信。这意味着消费者明确表示无法或不愿意处理该消息,并且不希望该消息重新进入队列等待其他消费者处理。
  2. 消息处理失败:消费者由于代码错误、消息格式不正确、业务规则冲突等原因无法成功处理消息时,该消息也可以被标记为死信。这种情况下,尽管消费者尝试处理消息,但由于某些无法克服的错误,消息无法被成功消费。
  3. 消息过期:如果消息设置了生存时间(TTL,Time To Live),并且在这个时间内没有被消费,那么消息会过期并被视为死信。TTL是RabbitMQ中用于指定消息在队列中存活时间的参数,超过该时间的消息将被视为过期并丢弃或转发到死信队列。
  4. 队列长度限制:当队列中的消息数量超过了设置的最大长度时,新到达的消息无法进入队列,这些消息也会被视为死信。队列长度限制是RabbitMQ中用于控制队列大小的一种机制,当队列达到最大容量时,新到达的消息将无法被接收并可能被丢弃或转发到死信队列。

总结来说,主要原因就三个,消息被拒绝、消息过期、队列满

在一些重要的场景,比如支付场景,提交的订单超时未支付的,可以设计为进入死信队列。

3、死信队列使用实践

3.1 消息过期

设置正常队列ttl过期时间为5s,如果5s内消息没有被消费,则会自动放入死信队列中。

关键点:设置正常队列属性,ttl5s过期:

// 设置TTL并让消息过期进入死信队列,5s不消费就为进入死信队列args := amqp.Table{"x-message-ttl":             int64(5000), // 5秒TTL"x-dead-letter-exchange":    "","x-dead-letter-routing-key": dlx.Name,}// 声明正常队列q, err := ch.QueueDeclare("normal_queue", // nametrue,           // durablefalse,          // delete when unusedfalse,          // exclusivefalse,          // no-waitargs,           // arguments)

全部代码如下: 

package mainimport ("fmt"amqp "github.com/rabbitmq/amqp091-go"
)func main() {conn, err := amqp.Dial("amqp://guest:guest@xxxx.xx.xx.xxx:5672/")if err != nil {fmt.Println("Failed to connect to RabbitMQ")return}defer conn.Close()ch, err := conn.Channel()if err != nil {fmt.Println("Failed to open a channel")return}// 声明死信队列dlx, err := ch.QueueDeclare("dead_letter_queue", // nametrue,                // durablefalse,               // delete when unusedfalse,               // exclusivefalse,               // no-waitnil,                 // arguments)if err != nil {fmt.Println("Failed to declare a queue: dead_letter_queue,err:" + err.Error())return}err = ch.ExchangeDeclare("my_exchange", // name"direct",      // typetrue,          // durablefalse,         // auto-deletedfalse,         // internalfalse,         // no-waitnil,           // arguments)if err != nil {fmt.Println("Failed to declare exchange: my_exchange,err:" + err.Error())return}// 设置TTL并让消息过期进入死信队列,5s不消费就为进入死信队列args := amqp.Table{"x-message-ttl":             int64(5000), // 5秒TTL"x-dead-letter-exchange":    "","x-dead-letter-routing-key": dlx.Name,}// 声明正常队列,注意,必须在声明队列时就要设置死信队列信息q, err := ch.QueueDeclare("normal_queue", // nametrue,           // durablefalse,          // delete when unusedfalse,          // exclusivefalse,          // no-waitargs,           // arguments)if err != nil {fmt.Println("Failed to declare a queue: normal_queue,err:" + err.Error())return}// 将正常队列绑定到交换机,并设置死信交换机和路由键err = ch.QueueBind(q.Name,        // queue nameq.Name,        // routing key"my_exchange", // exchangefalse,nil,)if err != nil {fmt.Println("Failed to bind queue to exchange: my_exchange,err:" + err.Error())return}err = ch.Publish("my_exchange", "normal_queue", false, false, amqp.Publishing{Body: []byte("hello world")})if err != nil {fmt.Println("Failed to publis msg to exchange: my_exchange,err:" + err.Error())return}
}

队列信息包括绑定的死信队列信息、ttl等信息如下:

运行上方代码,会向队列发送一条信息,我们先不创建消费者,5s后,消息会被自动放入死信队列。

3.2 队列满

当mq队列由于消息量过多导致队列打满时,这个时候过来的消息,将会被自动放入到死信队列中。

设置队列长度属性代码如下:

args := amqp.Table{// "x-message-ttl":             int64(5000), // 5秒TTL"x-max-length":              2,"x-dead-letter-exchange":    "","x-dead-letter-routing-key": dlx.Name,}// 声明正常队列q, err := ch.QueueDeclare("normal_queue", // nametrue,           // durablefalse,          // delete when unusedfalse,          // exclusivefalse,          // no-waitargs,           // arguments)

队列属性如下:

发送两条信息:

 继续发送第三个:

 测试代码:

package mainimport ("fmt"amqp "github.com/rabbitmq/amqp091-go"
)func main() {conn, err := amqp.Dial("amqp://guest:guest@xx.xx.xx.xx:5672/")if err != nil {fmt.Println("Failed to connect to RabbitMQ")return}defer conn.Close()ch, err := conn.Channel()if err != nil {fmt.Println("Failed to open a channel")return}// 声明死信队列dlx, err := ch.QueueDeclare("dead_letter_queue", // nametrue,                // durablefalse,               // delete when unusedfalse,               // exclusivefalse,               // no-waitnil,                 // arguments)if err != nil {fmt.Println("Failed to declare a queue: dead_letter_queue,err:" + err.Error())return}err = ch.ExchangeDeclare("my_exchange", // name"direct",      // typetrue,          // durablefalse,         // auto-deletedfalse,         // internalfalse,         // no-waitnil,           // arguments)if err != nil {fmt.Println("Failed to declare exchange: my_exchange,err:" + err.Error())return}// 设置TTL并让消息过期进入死信队列,5s不消费就为进入死信队列args := amqp.Table{// "x-message-ttl":             int64(5000), // 5秒TTL"x-max-length":              2,"x-dead-letter-exchange":    "","x-dead-letter-routing-key": dlx.Name,}// 声明正常队列q, err := ch.QueueDeclare("normal_queue", // nametrue,           // durablefalse,          // delete when unusedfalse,          // exclusivefalse,          // no-waitargs,           // arguments)if err != nil {fmt.Println("Failed to declare a queue: normal_queue,err:" + err.Error())return}// 将正常队列绑定到交换机,并设置死信交换机和路由键err = ch.QueueBind(q.Name,        // queue nameq.Name,        // routing key"my_exchange", // exchangefalse,nil,)if err != nil {fmt.Println("Failed to bind queue to exchange: my_exchange,err:" + err.Error())return}err = ch.Publish("my_exchange", "normal_queue", false, false, amqp.Publishing{Body: []byte("hello world")})if err != nil {fmt.Println("Failed to publis msg to exchange: my_exchange,err:" + err.Error())return}
}

3.3 消息被拒绝

       消息被拒绝的情况,当消费者无法处理某条信息时,客户端想rabbitmq服务器发送一个【负确认】应答,表示消费者未能成功处理此条消息,并且希望RabbitMQ根据配置重新发送这条消息(例如,将其重新排队)或者将其丢弃。

客户端函数:ch.Nack,函数原型:

func (ch *Channel) Nack(tag uint64, multiple bool, requeue bool) error {ch.m.Lock()defer ch.m.Unlock()return ch.send(&basicNack{DeliveryTag: tag,Multiple:    multiple,Requeue:     requeue,})
}

入参含义如下: 

tag

这是一个唯一标识符,用于标识消费者之前接收到的特定消息。当消费者调用 ch.Ackch.Nack 或 ch.Reject 时,必须提供这个标识符,以便RabbitMQ知道是对哪条消息进行确认或拒绝。

multiple

这是一个布尔值(bool),用于指示是否应该同时确认(或拒绝)多条消息。如果设置为 true,则RabbitMQ将认为从上一个被确认的消息开始(包括该消息),直到当前消息为止的所有未确认消息都被拒绝。这通常用于批量处理消息确认,但在使用 ch.Nack 时,它的作用更多是关于是否应该重新排队当前消息之后的消息(取决于RabbitMQ的配置和消息的属性)。

requeue

这也是一个布尔值(bool),用于指示被拒绝的消息是否应该被重新放入队列的末尾以便稍后重试。如果设置为 true,则消息将被重新排队;如果设置为 false,则消息将被丢弃(或者根据RabbitMQ的配置可能被发送到死信队列,如果配置了的话)。

测试过程,首先使用3.1或者3.2的代码向mq中写入几条信息:

之后使用如下代码进行消费:

package mainimport ("fmt""time"amqp "github.com/rabbitmq/amqp091-go"
)func main() {conn, err := amqp.Dial("amqp://guest:guest@xx.xx.xx.xx:5672/")if err != nil {fmt.Println("Failed to connect to RabbitMQ")return}defer conn.Close()ch, err := conn.Channel()if err != nil {fmt.Println("Failed to open a channel")return}err = ch.ExchangeDeclare("my_exchange", // name"direct",      // typetrue,          // durablefalse,         // auto-deletedfalse,         // internalfalse,         // no-waitnil,           // arguments)if err != nil {fmt.Println("Failed to declare exchange: my_exchange,err:" + err.Error())return}// 声明正常队列// q, err := ch.QueueDeclare(// 	"normal_queue", // name// 	true,           // durable// 	false,          // delete when unused// 	false,          // exclusive// 	false,          // no-wait// 	nil,            // arguments// )// if err != nil {// 	fmt.Println("Failed to declare a queue: normal_queue,err:" + err.Error())// 	return// }// 将正常队列绑定到交换机,并设置死信交换机和路由键err = ch.QueueBind("normal_queue", // queue name"normal_queue", // routing key"my_exchange",  // exchangefalse,nil,)if err != nil {fmt.Println("Failed to bind queue to exchange: my_exchange,err:" + err.Error())return}msgs, _ := ch.Consume("normal_queue", // queue"",             // consumerfalse,          // auto-acktrue,           // exclusivefalse,          // no-localfalse,          // no-waitnil,            // args)go func() {for d := range msgs {// 模拟处理失败,全部放入死信队列ch.Nack(d.DeliveryTag, false, false)}}()time.Sleep(10 * time.Second)
}

运行代码后,3条消息全部进入到死信队列中:

 4、总结

      RabbitMQ的死信队列(Dead Letter Queue,简称DLQ)是一种用于处理消息失败或无法路由的消息的机制,死信队列中的所有消息都是无法被正常消费的死信,这使得开发者可以集中对这些消息进行管理和分析。通过分析死信队列中的消息,开发者可以了解系统的运行状态、发现潜在的问题,并进行相应的优化和改进,以提升系统的稳定性和可靠性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/886556.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ALSA - (高级Linux声音架构)是什么?

ALSA是Linux声音系统的核心组件,让用户可以精细控制声音硬件和声音进出。它通过抽象层屏蔽了硬件复杂性,使开发者能够专注于功能实现。这篇文章将逐步解析ALSA的基础知识,包括其运作原理、应用场景,以及如何完成一个基本配置和使用…

Ceph层次架构分析

Ceph的层次结构可以从逻辑上自下向上分为以下几个层次: 一、基础存储系统RADOS层 功能:RADOS(Reliable Autonomic Distributed Object Store)是Ceph的底层存储系统,提供了分布式存储的核心功能。它是一个完整的对象存…

常见error集合

Cannot use import statement outside a module 原因:在commonJS中用了es6的语法,import。分析: 一般我们的运行环境按照模块化标准来分,可以分为es6和commonJS两种,在es6中引入模块用import,在commonJS中…

在连锁零售行业中远程控制软件的应用

在连锁零售行业,远程控制软件正逐渐成为提高效率和降低成本的重要工具。作为零售经理,您可能已经注意到这种技术带来的变化。试想一下,无论您身在何处,都可以实时监控商店的运营情况,甚至在远离的地方解决顾客的问题。…

【MySQL】MySQL中的函数之JSON_REPLACE

在 MySQL 中,JSON_REPLACE() 函数用于在 JSON 文档中替换现有的值。如果指定的路径不存在,则 JSON_REPLACE() 不会修改 JSON 文档。如果需要添加新的键值对,可以使用 JSON_SET() 函数。 基本语法 JSON_REPLACE(json_doc, path, val[, path,…

JS学习日记(jQuery库)

前言 今天先更新jQuery库的介绍,它是一个用来帮助快速开发的工具 介绍 jQuery是一个快速,小型且功能丰富的JavaScript库,jQuery设计宗旨是“write less,do more”,即倡导写更少的代码,做更多的事&#xf…

Go语言24小时极速学习教程(五)Go语言中的SpringMVC框架——Gin

作为一个真正能用的企业级应用,怎么能缺少RESTful接口呢?所以我们需要尝试在Go语言环境中写出我们的对外接口,这样前端就可以借由Gin框架访问我们数据库中的数据了。 一、Gin框架的使用 1. 安装 Gin 首先,你需要在你的 Go 项目…

支持用户注册和登录、发布动态、点赞、评论、私信等功能的社交媒体平台创建!!!

需要整体源代码的可以在我的代码仓下载https://gitcode.com/speaking_me/social-media-platformTest.git 社交媒体平台 描述:社交媒体平台需要支持用户注册、发布动态、点赞、评论、私信等功能。 技术栈: 前端:React, Angular, Vue.js后端…

数字IC后端实现之Innovus specifyCellEdgeSpacing和ICC2 set_placement_spacing_rule的应用

昨天帮助社区IC训练营学员远程协助解决一个Calibre DRC案例。通过这个DRC Violation向大家分享下Innovus和ICC2中如何批量约束cell的spacing rule。 数字IC后端手把手实战教程 | Innovus verify_drc VIA1 DRC Violation解析及脚本自动化修复方案 下图所示为T12nm A55项目的Ca…

Spring Boot中的自动装配机制

文章目录 1. 什么是自动装配?2. 自动装配是如何工作的?3. 如何开启自动装配?4. 自动装配的注意事项5. 结语推荐阅读文章 在Spring Boot的世界里,自动装配(Auto-configuration)就像春风拂面,轻轻…

【时间之外】IT人求职和创业应知【37】-AIGC私有化

目录 新闻一:2024智媒体50人成都会议暨每经20周年财经媒体峰会召开 新闻二:全球机器学习技术大会在北京召开 新闻三:区块链技术在金融领域的应用取得新突破 不知不觉的坚持了1个月,按照心理学概念,还要坚持2个月&am…

基于语法树的SQL自动改写工具开发系列(2)-使用PYTHON进行简单SQL改写的开发实战

一、前言 前面一篇写了如何搭建环境,本文接着讲怎么使用antlr4进行开发。 二、实战 根据上一篇,基于语法树的SQL自动改写工具开发系列(1)-离线安装语法树解析工具antlr4-DA-技术分享-M版,先在本地部署好开发环境。 DEMO 1 写…

基于单片机智能温室大棚监测系统

本设计以单片机为核心的智能温室大棚监测系统,用于监测大棚内的温湿度、土壤湿度、CO2浓度和光照强度。该系统以STM32F103C8T6芯片为核心控制单元,涵盖电源、按键、NB-IoT模块、显示屏模块、空气温湿度检测、土壤湿度检测、二氧化碳检测和光敏电阻等模块…

JavaScript逆向爬虫教程-------基础篇之常用的编码与加密介绍(python和js实现)

目录 一、编码与加密原理 1.1 ASCII 编码1.2 详解 Base64 1.2.1 Base64 的编码过程和计算方法1.2.2 基于编码的反爬虫设计1.2.3 Python自带base64模块实现base64编码解码类封装 1.3 MD5消息摘要算法 1.3.1 MD5 介绍1.3.2 Python实现md5以及其他常用消息摘要算法封装 1.4 对称加…

RHCSA学习超详细知识点2命令篇

输入命令行的语法 终端中执行命令需要遵照一定的语法,输入命令的格式如下: 命令 参数命令 -选项 参数 输入命令时可以包含多个选项,假如一个命令有-a,-b,-c,-d四个选项,可以写作 命令 -a -b -c -d 参数 这里的多个选项可以“提…

【SQL】mysql常用命令

为方便查询,特整理MySQL常用命令。 约定:$后为Shell环境命令,>后为MySQL命令。 1 常用命令 第一步,连接数据库。 $ mysql -u root -p # 进入MySQL bin目录后执行,回车后输入密码连接。# 常用参数&…

Java结合ElasticSearch根据查询关键字,高亮显示全文数据。

由于es高亮显示机制的问题。当全文内容过多,且搜索中标又少时,就会出现高亮结果无法覆盖全文。因此需要根据需求手动替换。 1.根据es的ik分词器获取搜索词的分词结果。 es部分: //中文分词解析 post /_analyze {"analyzer":"…

Springboot 日志处理(非常详细)

日志存入数据库&#xff08;AOP&#xff09; 1.引入aop依赖 <!-- aop依赖 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency> 2.创建自定义注解类&a…

5. langgraph中的react agent使用 (从零构建一个react agent)

1. 定义 Agent 状态 首先&#xff0c;我们需要定义 Agent 的状态&#xff0c;这包括 Agent 所持有的消息。 from typing import (Annotated,Sequence,TypedDict, ) from langchain_core.messages import BaseMessage from langgraph.graph.message import add_messagesclass …

源码解析-Spring Eureka(更新ing)

源码解析-Spring Eureka 文章目录 源码解析-Spring Eureka前言一、从Spring.factory和注解开始二、重要的一步EurekaServerInitializerConfiguration三、初始化了什么&#xff1f;自动保护 四, 重新回到EurekaServerAutoConfiguration关于unavailable-replicas 前言 无 一、从…