RabbitMQ——死信队列和延迟队列

文章目录

  • RabbitMQ——死信队列和延迟队列
    • 1、死信队列
    • 2、基于插件的延迟队列
      • 2.1、安装延迟队列插件
      • 2.2、代码实例

RabbitMQ——死信队列和延迟队列

1、死信队列

死信队列(Dead Letter Queue,DLQ)是 RabbitMQ 中的一种重要特性,用于处理无法被消费的消息,防止消息丢失。

死信的来源

在消息队列中,当消息满足一定条件而无法被正常消费时,这些消息会被发送到死信队列。满足条件的情况包括但不限于:

  • 消息被拒绝(basic.rejectbasic.nack)且不重新入队(requeue 参数为 false)。
  • 消息过期(TTL,Time-To-Live)。
  • 队列长度超过限制,无法再添加数据到mq中。

生产者

package com.weipch.rabbitmq.dlq;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.GetResponse;
import com.weipch.util.RabbitMqUtils;/*** @Author 方唐镜* @Create 2024-03-03 14:08* @Description*/
public class Produce {private static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//模拟消息过期 10s//AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();for (int i = 0; i < 10; i++) {String message = "hello world" + i;channel.basicPublish(NORMAL_EXCHANGE, "normal-routing-key", null, message.getBytes());}}
}

消费者

正常队列:

package com.weipch.rabbitmq.dlq;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.weipch.util.RabbitMqUtils;import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;/*** @Author 方唐镜* @Create 2024-03-03 13:50* @Description*/
public class Consumer01 {private static final String NORMAL_EXCHANGE = "normal_exchange";private static final String DEAD_EXCHANGE = "dead_exchange";private static final String NORMAL_QUEUE = "normal_queue";private static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//声明死信交换机和队列channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);channel.queueDeclare(DEAD_QUEUE, false, false, false, null);//绑定channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "dead-routing-key");//声明普通交换机和队列channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);//死信配制 指定死信交换机和死信路由键Map<String, Object> map = new HashMap<>();map.put("x-dead-letter-exchange", DEAD_EXCHANGE);map.put("x-dead-letter-routing-key", "dead-routing-key");//最大长度//map.put("x-max-length", 6);channel.queueDeclare(NORMAL_QUEUE, false, false, false, map);channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "normal-routing-key");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);if (message.contains("5")){System.out.println("Consumer01接收消息:" + message + ",此消息被拒绝");//拒绝消息并把消息丢入死信队列channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);}else {System.out.println("Consumer01接收消息:" + message);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}};channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, (consumerTag, e) -> {});}
}

死信队列:

package com.weipch.rabbitmq.dlq;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.weipch.util.RabbitMqUtils;import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;/*** @Author 方唐镜* @Create 2024-03-03 13:50* @Description*/
public class Consumer02 {private static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.basicConsume(DEAD_QUEUE, true,(consumerTag, delivery) -> System.out.println("Consumer02:" + new String(delivery.getBody(), StandardCharsets.UTF_8)),(consumerTag, e) -> {});}
}

生产者发送消息到正常队列,而消费者负责消费正常队列的消息。当消息被消费者拒绝并不再重新投递时,消息会被发送到死信队列。

2、基于插件的延迟队列

延迟队列是一种消息队列中的一种特殊类型,它允许消息在一定的延迟时间后再被消费。延迟队列的元素是希望在指定时间到了以后或之前取出处理。在实际应用中,延迟队列通常用于处理需要延时执行的任务或事件。

使用场景

  1. 定时任务执行: 在需要定时执行任务的应用中,可以使用延迟队列来实现。将任务消息发送到延迟队列,设置消息的过期时间为任务执行的时间,当消息过期时,消费者即可执行相应的任务。
  2. 消息重试机制: 当某个操作失败时,可以将操作消息发送到延迟队列,并设置合适的重试时间。在消息重试的过程中,如果操作成功,消息将正常被消费;如果一直失败,可以选择在一定时间后放弃重试,将消息发送到死信队列或进行其他处理。
  3. 订单超时处理: 在电商等场景中,对于长时间未支付的订单,可以将订单消息发送到延迟队列,并设置订单的过期时间。当订单过期时,系统可以取消订单、释放库存等操作。
  4. 限流与流控: 通过使用延迟队列,可以实现消息的有序处理和限流,确保系统在高峰期不会因为瞬时大量请求而过载。
  5. 系统通知与提醒: 在需要发送系统通知或提醒的场景中,可以使用延迟队列来实现消息的定时推送。
  6. 缓解数据库压力: 对于一些需要定期清理的数据,可以使用延迟队列来触发数据清理操作,减轻数据库压力。

2.1、安装延迟队列插件

下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

以docker方式安装

1、把下载好的插件从服务器拷贝到 RabbitMQ 容器内plugins目录

docker cp rabbitmq_delayed_message_exchange-3.13.0.ez 7c8726620871:/plugins

插件版本和rabbitmq版本一致

2、进入容器查看插件

在这里插入图片描述

3、启动插件

root@my-rabbit:/plugins# rabbitmq-plugins enable rabbitmq_delayed_message_exchange

4、重启容器

docker restart 7c8726620871

5、安装成功

在这里插入图片描述

2.2、代码实例

配置类

package springbootrabbitmq.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;@Configuration
public class DelayedQueueConfig {//    队列public static final String DELAYED_QUEUE_NAME = "delayed.queue";//    交换机public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";//    routingKeypublic static final String DELAYED_ROUTING_KEY = "delayed.routingKey";//    声明队列@Beanpublic Queue delayedQueue() {return new Queue(DELAYED_QUEUE_NAME);}//    声明交换机 基于插件的交换机@Beanpublic CustomExchange delayedExchange() {HashMap<String, Object> arguments = new HashMap<>();arguments.put("x-delayed-type", "direct");/** 1.交换机名称* 2.交换机类型* 3.是否需要持久化* 4.是否需要自动删除* 5.其他参数* */return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, arguments);}//    绑定@Beanpublic Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue, @Qualifier("delayedExchange") CustomExchange delayedExchange) {return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();}
}

生产者

@GetMapping("/sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {log.info("当前时间:{},发送一条时长{}毫秒消息给延迟队列delayed.queue:{}", new Date(), delayTime, message);rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME, DelayedQueueConfig.DELAYED_ROUTING_KEY, message, msg -> {//            发送消息的时候 延迟时长msg.getMessageProperties().setDelay(delayTime);return msg;});
}

消费者

@Slf4j
@Component
public class DelayedQueueConsumer {//监听消息@RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)public void receiveDelayedQueue(Message message) {String msg = new String(message.getBody());log.info("当前时间:{},收到延迟队列的消息:{}", new Date(), msg);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/751887.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【HTML】HTML表单8.2(表单标签2)

目录 接上期&#xff0c;大致实现效果 文章简要 注释&#xff1a;这一次介绍的很多效果需要后期与服务器配合&#xff0c;但我们这里先只介绍效果 ①提交按钮 ②获取验证码 ③上传文件 ④还原所有表单内容 ⑤下拉表单 ⑥文字域 接上期&#xff0c;大致实现效果 文章简要 注…

matlab中Signal Editor定义梯形信号输出矩形信号

matlab中Signal Editor定义梯形信号输出矩形信号&#xff0c;可以通过如下勾选差值数据实现梯形信号输出。

GPT-1, GPT-2, GPT-3, InstructGPT / ChatGPT and GPT-4 总结

1. GPT-1 What the problem GPT-1 solve? 在 GPT-1 之前&#xff0c;NLP 通常是一种监督模型。 对于每个任务&#xff0c;都有一些标记数据&#xff0c;然后根据这些标记数据开发监督模型。 这种方法存在几个问题&#xff1a;首先&#xff0c;需要标记数据。 但 NLP 不像 CV&…

云原生部署手册02:将本地应用部署至k8s集群

&#xff08;一&#xff09;部署集群镜像仓库 1. 集群配置 首先看一下集群配置&#xff1a; (base) ➜ ~ multipass ls Name State IPv4 Image master Running 192.168.64.5 Ubuntu 22.04 LTS1…

一. 并行处理与GPU体系架构-GPU并行处理

目录 前言0. 简述1. 这个小节会涉及到的关键字2. CPU与GPU在并行处理的优化方向3. Summary总结参考 前言 自动驾驶之心推出的 《CUDA与TensorRT部署实战课程》&#xff0c;链接。记录下个人学习笔记&#xff0c;仅供自己参考 本次课程我们来学习下课程第一章——并行处理与GPU体…

Google云计算原理与应用(三)

目录 五、分布式存储系统Megastore&#xff08;一&#xff09;设计目标及方案选择&#xff08;二&#xff09;Megastore数据模型&#xff08;三&#xff09;Megastore中的事务及并发控制&#xff08;四&#xff09;Megastore基本架构&#xff08;五&#xff09;核心技术——复制…

pom.xml中的配置无法被yaml读取

问题描述 项目中指定了多个profiles, 但是application.yaml读取报错&#xff0c;报错信息如下 Standard Commons Logging discovery in action with spring-jcl: please remove commons-logging.jar from classpath in order to avoid potential conflicts 12:41:52.325 [mai…

使用 pnpm 搭建 monorepo 项目

引言 在我之前的开发经历中&#xff0c;并没有实际使用过 Monorepo 管理项目&#xff0c;尽管之前对此有所了解&#xff0c;但并未深入探究。然而&#xff0c;如今许多开源项目都采纳了 Monorepo 方式&#xff0c;对于不熟悉它的开发者来说&#xff0c;阅读和理解这些项目的源…

【HarmonyOS】ArkUI - 向左/向右滑动删除

核心知识点&#xff1a;List容器 -> ListItem -> swipeAction 先看效果图&#xff1a; 代码实现&#xff1a; // 任务类 class Task {static id: number 1// 任务名称name: string 任务${Task.id}// 任务状态finished: boolean false }// 统一的卡片样式 Styles func…

C语言快速入门之内存函数的使用和模拟实现

1.memcpy 它可以理解为memory copy的组合&#xff0c;memory有记忆的意思&#xff0c;这里指的是内存&#xff0c;copy是拷贝&#xff0c;这个函数是针对内存块进行拷贝的 函数原型 void* memcpy(void* destination,const void* source, size_t num); 从source位置开始&am…

ChatGPT国内镜像站大全

#今天在知乎看到一个问题&#xff1a;“平民不参与内测的话没有账号还有机会使用ChatGPT吗&#xff1f;” 从去年GPT大火到现在&#xff0c;关于GPT的消息铺天盖地&#xff0c;真要有心想要去用&#xff0c;途径很多&#xff0c;别的不说&#xff0c;国内GPT的镜像站到处都是&…

基于sortablejs实现拖拽element-ui el-table表格行进行排序

可以用原生的dragstart、drag、dragend、dragover、drop、dragleave实现这个效果&#xff0c;但是有现成的轮子就不要重复造了&#xff0c;看效果&#xff1a; <template><el-table :class"$options.name" :data"tableData" ref"table"…

Docker进阶教程 - 1 Dockerfile

更好的阅读体验&#xff1a;点这里 &#xff08; www.doubibiji.com &#xff09; 1 Dockerfile Dockerfile 是做什么的&#xff1f; 我们前面说到&#xff0c;制作镜像的方法主要有两种方式&#xff1a; 使用 docker commit 命令&#xff1b;使用 Dockerfile 文件。 但是…

leetcode每日一题310.最小高度树

目录 一.题目原型 二.题目思路 三.代码实现 一.题目原型 二.题目思路 首先&#xff0c;我们看了样例&#xff0c;发现这个树并不是二叉树&#xff0c;是多叉树。 然后&#xff0c;我们可能想到的解法是&#xff1a;根据题目的意思&#xff0c;就挨个节点遍历bfs&#xff0c;…

瑞_Redis_短信登录_Redis代替session的业务流程

文章目录 项目介绍1 短信登录1.1 项目准备1.2 基于Session实现登录流程1.3 Redis代替session的业务流程1.3.1 设计key的结构1.3.2 设计key的具体细节1.3.3 整体访问流程1.3.4 代码实现 &#x1f64a; 前言&#xff1a;本文章为瑞_系列专栏之《Redis》的实战篇的短信登录章节的R…

springboot项目读取excel表格内容到数据库,excel表格字段为整数的读取方法

在我昨天的项目中&#xff0c;我需要把excel表格中字段为整数的字段读取到数据库中进行保存&#xff0c;但是在内置方法中并没有读取整数的方法&#xff08;也有可能是我没发现&#xff0c;太菜了~~&#xff09;&#xff0c;那接下来我就提供给大家一个简单地方法来读取excel表…

Apache-Doris基础概念

OLAP数据库Doris 一、Doris架构二、基本概念1. Row & Column2. Partition & Tablet3. 建表示例&#xff08;1&#xff09;列的定义&#xff08;2&#xff09;分区分桶&#xff08;3&#xff09;多列分区&#xff08;4&#xff09;PROPERTIES&#xff08;5&#xff09;E…

【LabVIEW FPGA入门】单周期定时循环

单周期定时循环详解 单周期定时环路是FPGA编程中最强大的结构之一。单周期定时循环中的代码更加优化&#xff0c;在FPGA上占用更少的空间&#xff0c;并且比标准While循环中的相同代码执行得更快。单周期定时环路将使能链从环路中移除&#xff0c;以节省FPGA上的空间。…

windows下使用tree指定层数生成项目结构

windows自带的tree太辣鸡了&#xff0c;我们需要找东西代替 工具链接 Tree for Windows (sourceforge.net) 点击这里下载 置入Git 解压下载的压缩包&#xff0c;将bin目录下的exe复制下来 进入你的Git目录&#xff0c;将其放入Git目录下的usr/bin目录下 打开Git Bash 输入…

如何在Mac中删除照片?这里有详细步骤

前言 本文介绍如何从Mac中删除照片,以释放硬盘空间或更好地组织文件和文件夹。 如何使用废纸篓删除Mac上的图片 在Mac上删除图片的最简单方法之一是使用废纸篓功能。学习只需几秒钟。下面是如何删除单个图片以及如何在Mac上删除多个图片,以及一些关键和有用的提示,以使该…