RabbitMQ——死信队列和延迟队列

文章目录

  • RabbitMQ——死信队列和延迟队列
    • 1、死信队列
    • 2、基于插件的延迟队列
      • 2.1、安装延迟队列插件
      • 2.2、代码实例

RabbitMQ——死信队列和延迟队列

1、死信队列

死信队列(Dead Letter Queue,DLQ)是 RabbitMQ 中的一种重要特性,用于处理无法被消费的消息,防止消息丢失。

死信的来源

在消息队列中,当消息满足一定条件而无法被正常消费时,这些消息会被发送到死信队列。满足条件的情况包括但不限于:

  • 消息被拒绝(basic.rejectbasic.nack)且不重新入队(requeue 参数为 false)。
  • 消息过期(TTL,Time-To-Live)。
  • 队列长度超过限制,无法再添加数据到mq中。

生产者

package com.weipch.rabbitmq.dlq;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.GetResponse;
import com.weipch.util.RabbitMqUtils;/*** @Author 方唐镜* @Create 2024-03-03 14:08* @Description*/
public class Produce {private static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//模拟消息过期 10s//AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();for (int i = 0; i < 10; i++) {String message = "hello world" + i;channel.basicPublish(NORMAL_EXCHANGE, "normal-routing-key", null, message.getBytes());}}
}

消费者

正常队列:

package com.weipch.rabbitmq.dlq;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.weipch.util.RabbitMqUtils;import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;/*** @Author 方唐镜* @Create 2024-03-03 13:50* @Description*/
public class Consumer01 {private static final String NORMAL_EXCHANGE = "normal_exchange";private static final String DEAD_EXCHANGE = "dead_exchange";private static final String NORMAL_QUEUE = "normal_queue";private static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//声明死信交换机和队列channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);channel.queueDeclare(DEAD_QUEUE, false, false, false, null);//绑定channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "dead-routing-key");//声明普通交换机和队列channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);//死信配制 指定死信交换机和死信路由键Map<String, Object> map = new HashMap<>();map.put("x-dead-letter-exchange", DEAD_EXCHANGE);map.put("x-dead-letter-routing-key", "dead-routing-key");//最大长度//map.put("x-max-length", 6);channel.queueDeclare(NORMAL_QUEUE, false, false, false, map);channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "normal-routing-key");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);if (message.contains("5")){System.out.println("Consumer01接收消息:" + message + ",此消息被拒绝");//拒绝消息并把消息丢入死信队列channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);}else {System.out.println("Consumer01接收消息:" + message);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}};channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, (consumerTag, e) -> {});}
}

死信队列:

package com.weipch.rabbitmq.dlq;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.weipch.util.RabbitMqUtils;import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;/*** @Author 方唐镜* @Create 2024-03-03 13:50* @Description*/
public class Consumer02 {private static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.basicConsume(DEAD_QUEUE, true,(consumerTag, delivery) -> System.out.println("Consumer02:" + new String(delivery.getBody(), StandardCharsets.UTF_8)),(consumerTag, e) -> {});}
}

生产者发送消息到正常队列,而消费者负责消费正常队列的消息。当消息被消费者拒绝并不再重新投递时,消息会被发送到死信队列。

2、基于插件的延迟队列

延迟队列是一种消息队列中的一种特殊类型,它允许消息在一定的延迟时间后再被消费。延迟队列的元素是希望在指定时间到了以后或之前取出处理。在实际应用中,延迟队列通常用于处理需要延时执行的任务或事件。

使用场景

  1. 定时任务执行: 在需要定时执行任务的应用中,可以使用延迟队列来实现。将任务消息发送到延迟队列,设置消息的过期时间为任务执行的时间,当消息过期时,消费者即可执行相应的任务。
  2. 消息重试机制: 当某个操作失败时,可以将操作消息发送到延迟队列,并设置合适的重试时间。在消息重试的过程中,如果操作成功,消息将正常被消费;如果一直失败,可以选择在一定时间后放弃重试,将消息发送到死信队列或进行其他处理。
  3. 订单超时处理: 在电商等场景中,对于长时间未支付的订单,可以将订单消息发送到延迟队列,并设置订单的过期时间。当订单过期时,系统可以取消订单、释放库存等操作。
  4. 限流与流控: 通过使用延迟队列,可以实现消息的有序处理和限流,确保系统在高峰期不会因为瞬时大量请求而过载。
  5. 系统通知与提醒: 在需要发送系统通知或提醒的场景中,可以使用延迟队列来实现消息的定时推送。
  6. 缓解数据库压力: 对于一些需要定期清理的数据,可以使用延迟队列来触发数据清理操作,减轻数据库压力。

2.1、安装延迟队列插件

下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

以docker方式安装

1、把下载好的插件从服务器拷贝到 RabbitMQ 容器内plugins目录

docker cp rabbitmq_delayed_message_exchange-3.13.0.ez 7c8726620871:/plugins

插件版本和rabbitmq版本一致

2、进入容器查看插件

在这里插入图片描述

3、启动插件

root@my-rabbit:/plugins# rabbitmq-plugins enable rabbitmq_delayed_message_exchange

4、重启容器

docker restart 7c8726620871

5、安装成功

在这里插入图片描述

2.2、代码实例

配置类

package springbootrabbitmq.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;@Configuration
public class DelayedQueueConfig {//    队列public static final String DELAYED_QUEUE_NAME = "delayed.queue";//    交换机public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";//    routingKeypublic static final String DELAYED_ROUTING_KEY = "delayed.routingKey";//    声明队列@Beanpublic Queue delayedQueue() {return new Queue(DELAYED_QUEUE_NAME);}//    声明交换机 基于插件的交换机@Beanpublic CustomExchange delayedExchange() {HashMap<String, Object> arguments = new HashMap<>();arguments.put("x-delayed-type", "direct");/** 1.交换机名称* 2.交换机类型* 3.是否需要持久化* 4.是否需要自动删除* 5.其他参数* */return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, arguments);}//    绑定@Beanpublic Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue, @Qualifier("delayedExchange") CustomExchange delayedExchange) {return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();}
}

生产者

@GetMapping("/sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {log.info("当前时间:{},发送一条时长{}毫秒消息给延迟队列delayed.queue:{}", new Date(), delayTime, message);rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME, DelayedQueueConfig.DELAYED_ROUTING_KEY, message, msg -> {//            发送消息的时候 延迟时长msg.getMessageProperties().setDelay(delayTime);return msg;});
}

消费者

@Slf4j
@Component
public class DelayedQueueConsumer {//监听消息@RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)public void receiveDelayedQueue(Message message) {String msg = new String(message.getBody());log.info("当前时间:{},收到延迟队列的消息:{}", new Date(), msg);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/751887.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

FreeRTOS入门基础

RTOS是为了更好地在嵌入式系统上实现多任务处理和时间敏感任务而设计的系统。它能确保任务在指定或预期的时间内得到处理。FreeRTOS是一款免费开源的RTOS&#xff0c;它广泛用于需要小型、预测性强、灵活系统的嵌入式设备。 创建第一个任务 任务函数&#xff1a;任务是通过函数…

python的集合应用

在Python中&#xff0c;集合是一种无序、可变的数据类型&#xff0c;用于存储不重复的元素。Python提供了内置的集合类型 set&#xff0c;以及 frozenset&#xff08;不可变的集合&#xff09;。以下是一些Python集合的常见应用场景&#xff1a; 去重&#xff1a; 集合是存储唯…

ChatGPT:论文写作的新潮解决方案

ChatGPT无限次数:点击直达 摘要&#xff1a; 本文介绍了一种新潮的论文写作解决方案——ChatGPT。ChatGPT是基于模型的自然语言处理技术&#xff0c;它通过生成人类般的语言来帮助用户撰写高质量的论文。本文通过多个示例演示了ChatGPT的强大功能&#xff0c;并探讨了其在论文写…

Spark 用AnyFunSuite单元测试Scala详细教程

在用java开发时&#xff0c;通过用Junit框架来测试&#xff0c;在用spark开发scala时&#xff0c;除了可以用Junit&#xff0c;还可以用AnyFunSuite&#xff0c;无需依赖AnyFunSuite。 步骤一&#xff1a;设置项目依赖 确保您的项目中包含了以下必要的依赖&#xff1a; <d…

【HTML】HTML表单8.2(表单标签2)

目录 接上期&#xff0c;大致实现效果 文章简要 注释&#xff1a;这一次介绍的很多效果需要后期与服务器配合&#xff0c;但我们这里先只介绍效果 ①提交按钮 ②获取验证码 ③上传文件 ④还原所有表单内容 ⑤下拉表单 ⑥文字域 接上期&#xff0c;大致实现效果 文章简要 注…

matlab中Signal Editor定义梯形信号输出矩形信号

matlab中Signal Editor定义梯形信号输出矩形信号&#xff0c;可以通过如下勾选差值数据实现梯形信号输出。

GPT-1, GPT-2, GPT-3, InstructGPT / ChatGPT and GPT-4 总结

1. GPT-1 What the problem GPT-1 solve? 在 GPT-1 之前&#xff0c;NLP 通常是一种监督模型。 对于每个任务&#xff0c;都有一些标记数据&#xff0c;然后根据这些标记数据开发监督模型。 这种方法存在几个问题&#xff1a;首先&#xff0c;需要标记数据。 但 NLP 不像 CV&…

云原生部署手册02:将本地应用部署至k8s集群

&#xff08;一&#xff09;部署集群镜像仓库 1. 集群配置 首先看一下集群配置&#xff1a; (base) ➜ ~ multipass ls Name State IPv4 Image master Running 192.168.64.5 Ubuntu 22.04 LTS1…

栈队列数组试题(四)——数组和特殊矩阵

01&#xff0e;对特殊矩阵采用压缩存储的主要目的是( D ). A.表达变得简单 B.对矩阵元素的存取变得简单 C.去掉矩阵中的多余元素 D.减少不必要的存储空间解析&#xff1a;特殊矩阵中含有很多相同元素…

Pyqt5专栏目录索引

文章目录 安装 Pyqt5 和 DesignerQt Designer教程Qt 官方文档及阅读方法主窗口按键&#xff08;Buttons&#xff09;项目视图&#xff08;Item Views&#xff09;输入控件&#xff08;Input Widgets&#xff09;显示控件&#xff08;Display Widgets&#xff09;弹出框音频播放…

iOS——【Blocks】

Blocks概要 Blocks是C语言的扩充功能&#xff0c;即带有自动变量的匿名函数。匿名函数就是不带函数名的函数。这一概念同样被称为“闭包”&#xff0c;lambda计算等。 自动变量是在函数内部声明的变量&#xff0c;其作用域仅限于声明它的函数内部。这意味着它们只能在其声明的…

一. 并行处理与GPU体系架构-GPU并行处理

目录 前言0. 简述1. 这个小节会涉及到的关键字2. CPU与GPU在并行处理的优化方向3. Summary总结参考 前言 自动驾驶之心推出的 《CUDA与TensorRT部署实战课程》&#xff0c;链接。记录下个人学习笔记&#xff0c;仅供自己参考 本次课程我们来学习下课程第一章——并行处理与GPU体…

Google云计算原理与应用(三)

目录 五、分布式存储系统Megastore&#xff08;一&#xff09;设计目标及方案选择&#xff08;二&#xff09;Megastore数据模型&#xff08;三&#xff09;Megastore中的事务及并发控制&#xff08;四&#xff09;Megastore基本架构&#xff08;五&#xff09;核心技术——复制…

pom.xml中的配置无法被yaml读取

问题描述 项目中指定了多个profiles, 但是application.yaml读取报错&#xff0c;报错信息如下 Standard Commons Logging discovery in action with spring-jcl: please remove commons-logging.jar from classpath in order to avoid potential conflicts 12:41:52.325 [mai…

数值分析复习:Newton插值

文章目录 牛顿&#xff08;Newton&#xff09;插值插值条件基函数插值多项式差商差商的基本性质差商估计差商的Leibniz公式 余项估计 本篇文章适合个人复习翻阅&#xff0c;不建议新手入门使用 牛顿&#xff08;Newton&#xff09;插值 插值条件 n1个插值节点 x 0 , x 1 , ……

使用 pnpm 搭建 monorepo 项目

引言 在我之前的开发经历中&#xff0c;并没有实际使用过 Monorepo 管理项目&#xff0c;尽管之前对此有所了解&#xff0c;但并未深入探究。然而&#xff0c;如今许多开源项目都采纳了 Monorepo 方式&#xff0c;对于不熟悉它的开发者来说&#xff0c;阅读和理解这些项目的源…

【HarmonyOS】ArkUI - 向左/向右滑动删除

核心知识点&#xff1a;List容器 -> ListItem -> swipeAction 先看效果图&#xff1a; 代码实现&#xff1a; // 任务类 class Task {static id: number 1// 任务名称name: string 任务${Task.id}// 任务状态finished: boolean false }// 统一的卡片样式 Styles func…

C语言快速入门之内存函数的使用和模拟实现

1.memcpy 它可以理解为memory copy的组合&#xff0c;memory有记忆的意思&#xff0c;这里指的是内存&#xff0c;copy是拷贝&#xff0c;这个函数是针对内存块进行拷贝的 函数原型 void* memcpy(void* destination,const void* source, size_t num); 从source位置开始&am…

ChatGPT国内镜像站大全

#今天在知乎看到一个问题&#xff1a;“平民不参与内测的话没有账号还有机会使用ChatGPT吗&#xff1f;” 从去年GPT大火到现在&#xff0c;关于GPT的消息铺天盖地&#xff0c;真要有心想要去用&#xff0c;途径很多&#xff0c;别的不说&#xff0c;国内GPT的镜像站到处都是&…

基于sortablejs实现拖拽element-ui el-table表格行进行排序

可以用原生的dragstart、drag、dragend、dragover、drop、dragleave实现这个效果&#xff0c;但是有现成的轮子就不要重复造了&#xff0c;看效果&#xff1a; <template><el-table :class"$options.name" :data"tableData" ref"table"…