RabbitMQ(九)死信队列

目录

    • 一、简介
      • 1.1 定义
      • 1.2 何时进入死信队列?
      • 1.3 死信消息的变化
      • 1.4 死信队列的应用场景
      • 1.5 死信消息的生命周期
    • 二、代码实现
      • 2.1 死信队列的配置步骤
      • 2.2 配置类
      • 2.3 配置文件
      • 2.4 生产者
      • 2.5 业务消费者
      • 2.6 死信消费者
      • 2.7 测试结果
    • 三、总结

在这里插入图片描述

RabbitMQ 是流行的开源消息队列中间件,使用 erlang 语言开发,由于其社区活跃度高,维护更新较快,深得很多企业的喜爱。

一、简介

1.1 定义

死信队列(Dead Letter Queue,简称 DLX)是 RabbitMQ 中一种特殊的队列,用于处理无法正常被消费者消费的消息。当消息在原始队列中因为 达到最大重试次数过期、或者 满足特定条件 时,可以 将这些消息重新路由到一个预定义的死信队列中 进行进一步处理或记录。

1.2 何时进入死信队列?

当发生以下情况,业务队列中的消息会进入死信队列:

  1. 消息被否定确认:使用 channel.basicNackchannel.basicReject,并且此时 requeue 属性被设置为 false
  2. 消息过期:消息在队列的存活时间超过设置的 TTL 时间。
  3. 消息溢出:队列中的消息数量已经超过最大队列长度。

当发生以上三种情况后,该消息将成为 死信。死信消息会被 RabbitMQ 进行特殊处理:

  • 如果配置了死信队列,那么该消息将会被丢进死信队列中;
  • 如果没有配置,则该消息将会被丢弃。

1.3 死信消息的变化

那么 死信 被丢到死信队列后,会发生什么变化呢?

  • 如果队列配置了 x-dead-letter-routing-key 的话,“死信” 的路由键会被替换成该参数对应的值。
  • 如果没有配置,则保留该消息原有的路由键。

举个例子:

原有队列的路由键是 RoutingKey1,有以下两种情况:

  • 如果配置队列的 x-dead-letter-routing-key 参数值为 RoutingKey2,则该消息成为 “死信” 后,会将路由键更改为 RoutingKey2,从而进入死信交换机中的死信队列。
  • 如果没有配置 x-dead-letter-routing-key 参数,则该消息成为 “死信” 后,路由键不会更改,也不会进入死信队列。

在这里插入图片描述

当配置了 x-dead-letter-routing-key 参数后,消息成为 “死信” 后,会在消息的 Header 中添加很多奇奇怪怪的字段,我们可以在死信队列的消费端通过以下方式进行打印:

log.info("死信消息properties: {}", message.getMessageProperties());

日志内容如下:

2024-01-07 21:16:19.745  INFO 11776 --- [ntContainer#3-1] c.d.receiver.DeadLetterMessageReceiver   :消息properties: MessageProperties [headers={x-first-death-exchange=demo.simple.business.exchange, x-death=[{reason=rejected, count=1, exchange=demo.simple.business.exchange, time=Sun Jan 07 21:16:19 CST 2024, routing-keys=[], queue=demo.simple.business.queuea}], x-first-death-reason=rejected, x-first-death-queue=demo.simple.business.queuea}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=demo.simple.deadletter.exchange, receivedRoutingKey=demo.simple.deadletter.queuea.routingkey, deliveryTag=1, consumerTag=amq.ctag-RPfmKjM8Lau9X7Fl0CtbEA, consumerQueue=demo.simple.deadletter.queuea]

格式化后:

在这里插入图片描述

Header 中看起来有很多信息,实际上并不多,只是值比较长而已。下面就简单说明一下 Header 中的值:

字段名含义
x-first-death-exchange第一次成为死信时的交换机名称。
x-first-death-reason第一次成为死信的原因:
rejected:消息在进入队列时被队列拒绝。
expired:消息过期。
maxlen:队列内消息数量超过队列最大容量。
x-first-death-queue第一次成为死信时的队列名称。
x-death历史被投入死信交换机的信息列表,同一个消息每进入一次死信交换机,这个数组的信息就会被更新。

1.4 死信队列的应用场景

通过上面的信息,我们已经知道如何使用死信队列了,那么死信队列一般在什么场景下使用呢?

死信队列 一般用在较为重要的业务队列中,确保未被正确消费的消息不被丢弃,一般发生消费异常可能原因主要是消息信息本身存在错误导致处理异常,处理过程中参数校验异常,或者因网络波动导致的查询异常等等。当发生异常时,当然 不能每次通过日志来获取原消息,然后让运维帮忙重新投递消息 (没错,以前很多人这么干的 = =)。通过配置死信队列,可以让未正确处理的消息暂存到另一个队列中,待后续排查清楚问题后,编写相应的处理代码来处理死信消息,这样比手工恢复数据要好得多。

1.5 死信消息的生命周期

死信消息的生命周期如下:

  1. 业务消息被 投入业务队列
  2. 消费者 消费业务队列的消息,由于处理过程中 发生异常,于是 进行了 NackReject 操作
  3. NackReject 的消息由 RabbitMQ 投递到死信交换机中
  4. 死信交换机将消息 投入相应的死信队列
  5. 死信队列的消费者 消费死信消息

二、代码实现

2.1 死信队列的配置步骤

死信队列的配置可以分为以下三步:

  1. 配置业务队列,绑定到业务交换机上;
  2. 为业务队列 配置死信交换机、路由键;
  3. 为死信交换机 配置死信队列

注意:

并不是直接声明一个公共的死信队列,然后所有死信消息就会自己进入死信队列中了。而是为每个需要使用死信的业务队列配置一个死信交换机,这里同一个项目的死信交换机可以共用一个,然后每个业务队列分配一个单独的路由键。

有了死信交换机和路由键后,接下来就像配置业务队列一样,配置死信队列,并绑定在死信交换机上。看到这里,大家应该可以明白:

  • 死信队列 并不是什么特殊的队列,只不过是绑定在死信交换机上的队列
  • 死信交换机 也不是什么特殊的交换机,只不过是用来接收死信队列的交换机

所以死信交换机可以为任何类型【Direct、Fanout、Topic】。一般来说,因为开发过程中会为每个业务队列分配一个独有的路由 key,并对应的配置一个死信队列进行监听。

有了前面的这些描述后,我们接下来实战操作一下。

2.2 配置类

配置类中声明了两个交换机:

  • 业务交换机(广播),绑定了两个业务队列:
    • 业务队列A;
    • 业务队列B。
  • 死信交换机(直连),绑定了两个死信队列,并配置了相应的路由键:
    • 死信队列A;
    • 死信队列B。

RabbitMQConfig.java

package com.demo.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** <p> @Title RabbitMQOrderConfig* <p> @Description RabbitMQ配置** @author ACGkaka* @date 2023/12/22 14:05*/
@Configuration
public class RabbitMQConfig {/** 业务队列 */public static final String BUSINESS_EXCHANGE_NAME = "demo.simple.business.exchange";public static final String BUSINESS_QUEUEA_NAME = "demo.simple.business.queuea";public static final String BUSINESS_QUEUEB_NAME = "demo.simple.business.queueb";/** 死信队列 */public static final String DEAD_LETTER_EXCHANGE = "demo.simple.deadletter.exchange";public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "demo.simple.deadletter.queuea.routingkey";public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "demo.simple.deadletter.queueb.routingkey";public static final String DEAD_LETTER_QUEUEA_NAME = "demo.simple.deadletter.queuea";public static final String DEAD_LETTER_QUEUEB_NAME = "demo.simple.deadletter.queueb";// 声明业务交换机(广播)@Beanpublic FanoutExchange businessExchange() {return new FanoutExchange(BUSINESS_EXCHANGE_NAME);}// 声明死信交换机(直连)@Beanpublic DirectExchange deadLetterExchange() {return new DirectExchange(DEAD_LETTER_EXCHANGE);}// 声明业务队列A@Beanpublic Queue businessQueueA() {Map<String, Object> args = new HashMap<>(2);// 声明当前队列绑定的死信交换机args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);// 声明当前队列绑定的死信路由键args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(args).build();}// 声明业务队列B@Beanpublic Queue businessQueueB() {Map<String, Object> args = new HashMap<>(2);// 声明当前队列绑定的死信交换机args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);// 声明当前队列绑定的死信路由键args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEB_ROUTING_KEY);return QueueBuilder.durable(BUSINESS_QUEUEB_NAME).withArguments(args).build();}// 声明死信队列A@Beanpublic Queue deadLetterQueueA() {return new Queue(DEAD_LETTER_QUEUEA_NAME);}// 声明死信队列B@Beanpublic Queue deadLetterQueueB() {return new Queue(DEAD_LETTER_QUEUEB_NAME);}// 声明业务队列A绑定关系@Beanpublic Binding businessBindingA(Queue businessQueueA, FanoutExchange businessExchange) {return BindingBuilder.bind(businessQueueA).to(businessExchange);}// 声明业务队列B绑定关系@Beanpublic Binding businessBindingB(Queue businessQueueB, FanoutExchange businessExchange) {return BindingBuilder.bind(businessQueueB).to(businessExchange);}// 声明死信队列A绑定关系@Beanpublic Binding deadLetterBindingA(Queue deadLetterQueueA, DirectExchange deadLetterExchange) {return BindingBuilder.bind(deadLetterQueueA).to(deadLetterExchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);}// 声明死信队列B绑定关系@Beanpublic Binding deadLetterBindingB(Queue deadLetterQueueB, DirectExchange deadLetterExchange) {return BindingBuilder.bind(deadLetterQueueB).to(deadLetterExchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY);}
}

2.3 配置文件

application.yml

server:port: 8081spring:application:name: springboot-rabbitmq-dead-letterrabbitmq:# 此处不建议单独配置host和port,单独配置不支持连接RabbitMQ集群addresses: 127.0.0.1:5672username: guestpassword: guest# 虚拟host 可以不设置,使用server默认hostvirtual-host: /# 是否开启发送端消息抵达队列的确认publisher-returns: true# 发送方确认机制,默认为NONE,即不进行确认;SIMPLE:同步等待消息确认;CORRELATED:异步确认publisher-confirm-type: correlated# 消费者监听相关配置listener:simple:acknowledge-mode: manual # 确认模式,默认auto,自动确认;manual:手动确认default-requeue-rejected: false # 消费端抛出异常后消息是否返回队列,默认值为trueprefetch: 1 # 限制每次发送一条数据concurrency: 1 # 同一个队列启动几个消费者max-concurrency: 1 # 启动消费者最大数量# 重试机制retry:# 开启消费者(程序出现异常)重试机制,默认开启并一直重试enabled: true# 最大重试次数max-attempts: 3# 重试间隔时间(毫秒)initial-interval: 3000

2.4 生产者

为了方便测试,写一个简单的消息生产者,通过controller层来生产消息。

SendMessageController.java

import com.demo.config.RabbitMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;/*** <p> @Title SendMessageController* <p> @Description 推送消息接口** @author ACGkaka* @date 2023/1/12 15:23*/
@Slf4j
@RestController
public class SendMessageController {/*** 使用 RabbitTemplate,这提供了接收/发送等方法。*/@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendMessage")public String sendMessage(String message) {rabbitTemplate.convertAndSend(RabbitMQConfig.BUSINESS_EXCHANGE_NAME, "", message);return "OK";}
}

2.5 业务消费者

接下来是业务队列的消费端代码

BusinessMessageReceiver.java

import com.demo.config.RabbitMQConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;/*** <p> @Title BusinessMessageReceiver* <p> @Description RabbitMQ业务队列消费端** @author ACGkaka* @date 2024/1/7 17:43*/
@Slf4j
@Component
public class BusinessMessageReceiver {@RabbitListener(queues = RabbitMQConfig.BUSINESS_QUEUEA_NAME)public void receiveA(String body, Message message, Channel channel) throws IOException {log.info("业务队列A收到消息: {}", body);try {if (body.contains("deadletter")) {throw new RuntimeException("dead letter exception");}channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {log.error("业务队列A消息消费发生异常,error msg: {}", e.getMessage(), e);channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);}}@RabbitListener(queues = RabbitMQConfig.BUSINESS_QUEUEB_NAME)public void receiveB(String body, Message message, Channel channel) throws IOException {log.info("业务队列B收到消息: {}", body);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}
}

2.6 死信消费者

接下来是死信队列的消费端代码

DeadLetterMessageReceiver.java

import com.demo.config.RabbitMQConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;/*** <p> @Title DeadLetterMessageReceiver* <p> @Description RabbitMQ死信队列消费端** @author ACGkaka* @date 2024/1/7 18:14*/
@Slf4j
@Component
public class DeadLetterMessageReceiver {@RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUEA_NAME)public void receiveA(String body, Message message, Channel channel) throws IOException {log.info("死信队列A收到消息: {}", body);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}@RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUEB_NAME)public void receiveB(String body, Message message, Channel channel) throws IOException {log.info("死信队列B收到消息: {}", body);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}
}

2.7 测试结果

消费正常消息,请求结果:

请求地址:http://localhost:8081/sendMessage?message=Hello

在这里插入图片描述

从日志可以看到:两个业务队列成功消费

在这里插入图片描述

消费错误消息,请求结果:

请求地址:http://localhost:8081/sendMessage?message=deadletter

在这里插入图片描述

从日志可以看到:业务队列A和B都收到了消息,但是 业务队列A消费发生异常,然后消息就被 转到了死信队列死信队列消费端成功消费

在这里插入图片描述


三、总结

死信队列其实并没有什么神秘的地方,不过是绑定在死信交换机上的普通队列,而死信交换机也只是一个普通的交换机,不过是用来专门处理死信的交换机。

死信消息 时 RabbitMQ 为我们做的一层保障,其实我们 也可以不使用死信队列,而是 在消息消费异常的时候,将消息主动投递到另一个交换机中,当你明白了这些之后,这些 Exchange 和 Queue 想怎样配合就可以怎样配合。比如:

  • 从死信队列拉取消息,然后发送邮件、短信、钉钉通知来通知开发人员关注。
  • 或者将消息重新投递到一个队列然后设置过期时间,来进行延时消费等等。

整理完毕,完结撒花~ 🌻





参考地址:

1.【RabbitMQ】一文带你搞定RabbitMQ死信队列,https://cloud.tencent.com/developer/article/1463065

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/605375.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MySQL数据库进阶-索引

索引 索引是帮助 MySQL 高效获取数据的数据结构&#xff08;有序&#xff09;。在数据之外&#xff0c;数据库系统还维护着满足特定查找算法的数据结构&#xff0c;这些数据结构以某种方式引用&#xff08;指向&#xff09;数据&#xff0c;这样就可以在这些数据结构上实现高级…

CSS3渐变属性详解

渐变属性 线性渐变 概念&#xff1a;线性渐变&#xff0c;指的是在一条直线上进行的渐变。在线性渐变过程中&#xff0c;起始颜色会沿着一条直线按顺序过渡到结束颜色 语法&#xff1a; background:linear-gradient(渐变角度&#xff0c;开始颜色&#xff0c;结束颜色);渐变…

https配置证书

HTTPS 基本原理 https 介绍 HTTPS&#xff08;全称&#xff1a;HyperText Transfer Protocol over Secure Socket Layer&#xff09;&#xff0c;其实 HTTPS 并不是一个新鲜协议&#xff0c;Google 很早就开始启用了&#xff0c;初衷是为了保证数据安全。 国内外的大型互联网…

SQL 基础知识点

1. 数据库相关术语 数据库&#xff08;database&#xff09;&#xff1a;保存有组织的数据的容器&#xff08;通常是一个文件或一组文件&#xff09;。数据表&#xff08;table&#xff09; &#xff1a;某种特定类型数据的结构化清单。模式&#xff08;schema&#xff09;&am…

springboot 房屋租赁系统

spring boot mysql mybatis 前台后端

开心自走棋:使用 Laf 云开发支撑数百万玩家

先介绍一下开心自走棋 开心自走棋是一款剑与魔法的烧脑自走棋游戏。以著名的魔幻世界观为蓝本&#xff0c;采用了轻松可爱的画面风格&#xff0c;精致细腻的动画和特效来还原魔兽之战。 现在市面上自走棋游戏多是 PvP 玩法为主&#xff0c;而开心自走棋是以 PvE 玩法为主的&a…

C语言光速入门笔记

C语言是一门面向过程的编译型语言&#xff0c;它的运行速度极快&#xff0c;仅次于汇编语言。C语言是计算机产业的核心语言&#xff0c;操作系统、硬件驱动、关键组件、数据库等都离不开C语言&#xff1b;不学习C语言&#xff0c;就不能了解计算机底层。 目录 C语言介绍C语言特…

相机成像之图像传感器与ISP【四】

文章目录 1、图像传感器基础1.1 基础原理——光电效应1.2 基础的图像传感器设计1.3 衡量传感器效率的一个关键指标&#xff1a;光量子效率&#xff08;QE&#xff09;1.4 感光单元的响应1.5 像素的满阱容量1.6 像素尺寸和填充比例1.7 微透镜的作用1.8 光学低通滤波器简介1.9 传…

机器学习周报第27周

目录 摘要Abstract一、文献阅读 摘要 本周阅读了一篇混沌时间序列预测的论文&#xff0c;论文模型主要使用的是时间卷积网络&#xff08;Temporal Convolutional Network&#xff0c;TCN&#xff09;、LSTM以及GRU。在数据集方面除了使用现实的时间序列数据外&#xff0c;还通…

计算机毕业设计 | SpringBoot+vue农产品商城 买菜购物网站(附源码)

1&#xff0c;绪论 1.1 项目背景 随着社会发展&#xff0c;网上购物已经成为我们日常生活的一部分。但是&#xff0c;至今为止大部分电商平台都是从人们日常生活出发&#xff0c;出售都是一些日常用品比如&#xff1a;食物、服装等等&#xff0c;并未发现一个专注于菜品的电商…

路由器02_静态路由DHCP

一、静态路由 &#xff11;、静态路由特点 由管理员手工配置&#xff0c;是单向的&#xff0c;缺乏灵活性 &#xff12;、默认路由 默认路由是一种比较特殊静态路由&#xff0c;一般用于末节&#xff08;末梢&#xff09;网络&#xff0c;直接指定目标为任何地方 二、静态…

为什么 Kafka 这么快?它是如何工作的?

随着数据以指数级的速度流入企业&#xff0c;强大且高性能的消息传递系统至关重要。Apache Kafka 因其速度和可扩展性而成为热门选择&#xff0c;但究竟是什么让它如此之快&#xff1f; 在本期中&#xff0c;我们将探讨&#xff1a; Kafka 的架构及其核心组件&#xff0c;如生…

Xfs文件系统磁盘布局

目录 一&#xff0c;CentOS下Xfs文件系统的安装 二&#xff0c;准备工作 三&#xff0c;AG结构 四&#xff0c;AG超级块 五&#xff0c;AG空闲磁盘空间管理 六&#xff0c;ABTB的Btree 七&#xff0c;ABTB/ABTC的节点块管理 八&#xff0c;inode节点管理 九&#xff0…

Vue-5、el和data的两种写法

1、el 第一种写法 <!DOCTYPE html> <html lang"en" xmlns:v-model"http://www.w3.org/1999/xhtml" xmlns:v-bind"http://www.w3.org/1999/xhtml"> <head><meta charset"UTF-8"><title>el和data的两种写…

vue3中路由的使用(详细讲解)

1、路由的简介 路由(route)&#xff1a;就是根据特定的规则将数据包或请求从源地址传输到目标地址的过程。 在前端或者vue3项目中路由主要用于构建单页面应用程序&#xff08;SPA&#xff09;&#xff0c;其中所有的页面都在同一个HTML文件中加载&#xff0c;通过JavaScript动…

自监督深度学习技术

一、定义 自监督学习&#xff08;SSL&#xff09;是机器学习的一种范式&#xff0c;用于处理未标记数据以获取有用的表示&#xff0c;以帮助下游学习任务。SSL方法最显著的特点是它们不需要人类标注的标签&#xff0c;这意味着它的训练完全基于由未标记的数据样本组成的数据集…

网络通信过程的一些基础问题

客户端A在和服务器进行TCP/IP通信时&#xff0c;发送和接收数据使用的是同一个端口吗&#xff1f; 这个问题可以这样来思考&#xff1a;在客户端A与服务器B建立连接时&#xff0c;A需要指定一个端口a向服务器发送数据。当服务器接收到A的报文时&#xff0c;从报文头部解析出A的…

018、通用集合类型

Rust标准库包含了一系列非常有用的被称为集合的数据结构。大部分的数据结构都代表着某个特定的值&#xff0c;但集合却可以包含多个值。 与内置的数组与元组类型不同&#xff0c;这些集合将自己持有的数据存储在了堆上。这意味着数据的大小不需要在编译时确定&#xff0c;并且可…

WEB 3D技术 three.js 顶点交换

本文 我们来说 顶点的转换 其实就是 我们所有顶点的位置发生转变 我们整个物体的位置也会随之转变 这里 我们编写代码如下 import ./style.css import * as THREE from "three"; import { OrbitControls } from "three/examples/jsm/controls/OrbitControls.j…

kettle的基本介绍和使用

1、 kettle概述 1.1 什么是kettle Kettle是一款开源的ETL工具&#xff0c;纯java编写&#xff0c;可以在Window、Linux、Unix上运行&#xff0c;绿色无需安装&#xff0c;数据抽取高效稳定。 1.2 Kettle核心知识点 1.2.1 Kettle工程存储方式 以XML形式存储以资源库方式存储…