rabbitMQ死信队列快速编写记录

文章目录

  • 1.介绍
    • 1.1 什么是死信队列
    • 1.2 死信队列有什么用
  • 2. 如何编码
    • 2.1 架构分析
    • 2.2 maven坐标
    • 2.3 工具类编写
    • 2.4 consumer1编写
    • 2.5 consumer2编写
    • 2.6 producer编写
  • 3.整合springboot
    • 3.1 架构图
    • 3.2 maven坐标
    • 3.3 构建配置类,创建exchange,queue,并绑定
    • 3.4 编写生产者(controller的一个方法)
    • 3.5 编写消费者(一个类, 方法上加上@RabbitListenner,表明需要监听的queue)
  • 4. 常见参数汇总

1.介绍

1.1 什么是死信队列

再rabbitMQ中,有两个重要的组件。exchange(交换机),queue(队列)。交换机用于路由消息,简单来说就是接收客户端传递的消息转发到queue中。队列做的事情就是存储消息
但消息并不会一只存储在队列中。当存在一下三种情况,消息就会死掉

  • 队列存储不了过多的消息
  • 消息本身存在过期时间

当遇到死掉的消息时,我们通常会将这些死信转发到新的交换机中,这个交换机就叫做死信交换机,而配合死信交换机存储信息的队列,叫做死信队列

1.2 死信队列有什么用

死信队列在构建延迟队列时,有巨大作用。比如用户购票订单,30min不支付就过期。在rabbitMQ中可以这样实现

  • 1 存储购票信息到exchange-queue
  • 2 设置消息过期时间为30min
  • 3 如果超过30min消息未被消费(消息过期,成为死信),存储死信队列,通知服务取消订单

2. 如何编码

2.1 架构分析

先上绑定架构图
在这里插入图片描述

整个流程中,出现了三方。在编写代码时,我们可以分三个大类,分别是producer,consumer1,consumer2。其消息传递顺序如下

  • producer -> normal_exchange
  • normal_queue -> consumer1
  • dead_queue-> consumer2

我们可以在编写consumer1的时候,完成exchange和queue的创建与绑定

2.2 maven坐标

        <dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.9.0</version></dependency>

2.3 工具类编写

rabbitMQ整体的编写流程如下

  • 创建Connection工厂
  • 配置工厂
  • 创建链接
  • 获取channel
  • 通过channel创建exchange, queue, 关系绑定, 监听消息, 发送消息

因此,我们可以先创建工具类,帮我们获取channel,以此减少开发代码量

public class MQUtils {public static Channel getChannel() throws Exception {// 创建工厂链接ConnectionFactory factory = new ConnectionFactory();// 设置工厂factory.setHost("your_ip");factory.setUsername("your_username");factory.setPassword("your_password");factory.setVirtualHost("/"); // 基本都是/Connection connection = factory.newConnection();Channel channel = connection.createChannel();return channel;}
}

2.4 consumer1编写

public class Consumer1 {static String EXCHANGE_NAME = "normal_exchange";static String QUEUE_NAME = "normal_queue";static String DEAD_EXCHANGE_NAME = "dead_exchange";static String DEAD_QUEUE_NAME = "dead_queue";public static void main(String[] args) throws Exception{Channel channel = MQUtils.getChannel();// 普通交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false, true, null);// 普通队列// 配置死信交换机参数HashMap<String, Object> map = new HashMap<>();// 配置normal_queue连接的dead_exchangemap.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);// 设置normal_queue的消息过期时间map.put("x-message-ttl", 10000);// 设置路由到死信交换机的路由key: lisimap.put("x-dead-letter-routing-key", "lisi");// map.put("x-max-length", 6); // 设置队列最大长度channel.queueDeclare(QUEUE_NAME, false, false, true, map);// 绑定普通交换机和普通队列channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "zhangsan");// 死信交换机channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false, true, false, null);// 死信队列channel.queueDeclare(DEAD_QUEUE_NAME, false, false, true, null);// 绑定死信交换机和死信队列channel.queueBind(DEAD_QUEUE_NAME, DEAD_EXCHANGE_NAME, "lisi");// 监听channel.basicConsume(QUEUE_NAME, false, (consumerTag, message) -> {System.out.println("监听普通队列: " + new String(message.getBody()));}, consumerTag -> {});}
}

2.5 consumer2编写

public class Consumer2{static String DEAD_QUEUE_NAME = "dead_queue";public static void main(String[] args) throws Exception{Channel channel = MQUtils.getChannel();// 监听channel.basicConsume(DEAD_QUEUE_NAME, true, (consumerTag, message) -> {System.out.println("监听死信队列: " + new String(message.getBody()));}, consumerTag -> {});}
}

2.6 producer编写

public class Producer {static String EXCHANGE_NAME = "normal_exchange";public static void main(String[] args) throws Exception{Channel channel = MQUtils.getChannel();for (int i = 0; i < 10; i++) {String msg = i + "";channel.basicPublish(EXCHANGE_NAME, "zhangsan",null,msg.getBytes());}}
}

3.整合springboot

3.1 架构图

在这里插入图片描述

3.2 maven坐标

        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.3.9.RELEASE</version></dependency>

3.3 构建配置类,创建exchange,queue,并绑定

package com.xhf.mq.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class TTLConfig {/*-----交换机名称-----*/private static final String EXCHANGE_NAME = "X";private static final String DEAD_EXCHANGE_NAME = "Y";/*-----队列名称-----*/private static final String QUEUE_NAME1 = "QA";private static final String QUEUE_NAME2 = "QB";private static final String DEAD_QUEUE_NAME = "QD";// 注册交换机@Bean("xExchange")public DirectExchange xExchange() {return ExchangeBuilder.directExchange(EXCHANGE_NAME).autoDelete().build();
//        return new DirectExchange(EXCHANGE_NAME);}// 注册死信交换机@Bean("deadExchange")public DirectExchange deadExchange() {return ExchangeBuilder.directExchange(DEAD_EXCHANGE_NAME).autoDelete().build();}// 注册队列QA@Bean("queueA")public Queue queueA() {return QueueBuilder.nonDurable(QUEUE_NAME1).withArgument("x-message-ttl", 10000).withArgument("x-dead-letter-exchange", DEAD_EXCHANGE_NAME).withArgument("x-dead-letter-routing-key", "YD").build();}// 注册队列QB@Bean("queueB")public Queue queueB() {return QueueBuilder.nonDurable(QUEUE_NAME2).withArgument("x-message-ttl", 40000).withArgument("x-dead-letter-exchange", DEAD_EXCHANGE_NAME).withArgument("x-dead-letter-routing-key", "YD").build();}// 注册队列QD@Bean("queueD")public Queue queueD() {return QueueBuilder.nonDurable(DEAD_QUEUE_NAME).build();}// 绑定普通交换机, 队列@Beanpublic Binding queueABindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") Exchange xExchange) {return BindingBuilder.bind(queueA).to(xExchange).with("XA").and(null);}@Beanpublic Binding queueBBindingX(@Qualifier("queueB") Queue queueB, @Qualifier("xExchange") Exchange xExchange) {return BindingBuilder.bind(queueB).to(xExchange).with("XB").and(null);}// 绑定死信交换机, 队列@Beanpublic Binding queueDBindingY(@Qualifier("queueD") Queue queueD, @Qualifier("deadExchange") Exchange deadExchange) {return BindingBuilder.bind(queueD).to(deadExchange).with("YD").and(null);}
}

3.4 编写生产者(controller的一个方法)

   @GetMapping("/sendToQA")public void sendToQA() {// 向X交换机发送消息, 消息通过"XA"路由到队列rabbitTemplate.convertAndSend("X", "XA", "hello".getBytes());}

3.5 编写消费者(一个类, 方法上加上@RabbitListenner,表明需要监听的queue)

@Component
public class Customer {@RabbitListener(queues = "QD")public void customer(Message message, Channel channel) {byte[] body = message.getBody();System.out.println(new String(body));}
}

4. 常见参数汇总

  • x-dead-letter-exchange :死信交换机名称
  • x-message-ttl:消息time to live时间(过期时间)
  • x-dead-letter-routing-key:死信交换机路由key
  • x-max-length:队列最大长度

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/94231.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

想要精通算法和SQL的成长之路 - 二叉树的判断问题(子树判断 | 对称性 | 一致性判断)

想要精通算法和SQL的成长之路 - 二叉树的判断问题 前言一. 相同的树二. 对称二叉树三. 判断子树 前言 想要精通算法和SQL的成长之路 - 系列导航 一. 相同的树 原题链接 这题目典型的递归题&#xff1a; 如果两个节点都是null&#xff0c;我们返回true。如果两个节点一个nul…

centos 部署nginx 并配置https

centos版本&#xff1a;centos 7.8 &#xff08;最好不要用8&#xff0c;8的很多用法和7相差很大&#xff09; 一.安装nginx 1。下载Nginx安装包&#xff1a;首先&#xff0c;访问Nginx的官方网站&#xff08;https://nginx.org/&#xff09;或您选择的镜像站点&#xff0c;找…

C#学生选课及成绩查询系统

一、项目背景 学生选课及成绩查询系统是一个学校不可缺少的部分&#xff0c;传统的人工管理档案的方式存在着很多的缺点&#xff0c;如&#xff1a;效率低、保密性差等&#xff0c;所以开发一套综合教务系统管理软件很有必要&#xff0c;它应该具有传统的手工管理所无法比拟的…

关于算法复杂度的几张表

算法在改进今天的计算机与古代的计算机的区别 去除冗余 数据点 算法复杂度 傅里叶变换

解决java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.的错误

文章目录 1. 复现错误2. 分析错误3. 解决问题3.1 下载Hadoop3.2 配置Hadoop3.3 下载winutils3.4 配置winutils 1. 复现错误 今天在运行同事给我的项目&#xff0c;但在项目启动时&#xff0c;报出如下错误&#xff1a; java.io.FileNotFoundException: java.io.FileNotFoundEx…

嵌入式系统中C++内存管理基本方法

引言 说到 C 的内存管理&#xff0c;我们可能会想到栈空间的本地变量、堆上通过 new 动态分配的变量以及全局命名空间的变量等&#xff0c;这些变量的分配位置都是由系统来控制管理的&#xff0c;而调用者只需要考虑变量的生命周期相关内容即可&#xff0c;而无需关心变量的具…

基于SSM的电动车上牌管理系统(有报告)。Javaee项目。

演示视频&#xff1a; 基于SSM的电动车上牌管理系统&#xff08;有报告&#xff09;。Javaee项目。 项目介绍&#xff1a; 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#xff09;三层体系结构&#xff0c;通过Spring SpringM…

IIS解决上传文件大小限制

IIS解决上传文件大小限制 目的&#xff1a;通过配置文件和IIS来解决服务器对上传文件大小的限制 1&#xff1a;修改配置文件&#xff08;默认为4M 值的大小根据自己情况进行修改&#xff09; <httpRuntime maxRequestLength"2048000" /> 2&#xff1a;修改IIS配…

专业图标制作软件 Image2icon 最新中文 for mac

Image2Icon是一款用于Mac操作系统的图标转换工具。它允许用户将常见的图像文件&#xff08;如PNG、JPEG、GIF等&#xff09;转换为图标文件&#xff08;.ico格式&#xff09;&#xff0c;以便在Mac上用作应用程序、文件夹或驱动器的自定义图标。 以下是Image2Icon的一些主要功…

java Spring Boot按日期 限制大小分文件记录日志

上文 java Spring Boot 将日志写入文件中记录 中 我们实现另一个将控制台日志写入到 项目本地文件的效果 但是 这里有个问题 比如 我项目是个大体量的企业项目 每天会有一百万用户访问 那我每天的日志都记载同一个文件上 那不跟没记没什么区别吗&#xff1f; 东西怎么找&#x…

智慧工地源代码 SaaS模式云平台

伴随着技术的不断发展&#xff0c;信息化手段、移动技术、智能穿戴及工具在工程施工阶段的应用不断提升&#xff0c;智慧工地概念应运而生&#xff0c;庞大的建设规模催生着智慧工地的探索和研发。 什么是智慧工地&#xff1f; 伴随着技术的不断发展&#xff0c;信息化手段、移…

算法基础学习|排序

快速排序 模板 void quick_sort(int q[], int l, int r) {if (l > r) return;int i l - 1, j r 1, x q[l r >> 1];while (i < j){do i ; while (q[i] < x);do j -- ; while (q[j] > x);if (i < j) swap(q[i], q[j]);}quick_sort(q, l, j)&#xf…

【AI视野·今日CV 计算机视觉论文速览 第259期】Tue, 3 Oct 2023

AI视野今日CS.CV 计算机视觉论文速览 Tue, 3 Oct 2023 (showing first 100 of 167 entries) Totally 100 papers &#x1f449;上期速览✈更多精彩请移步主页 Daily Computer Vision Papers GPT-Driver: Learning to Drive with GPT Authors Jiageng Mao, Yuxi Qian, Hang Zha…

linux系统的启动流程

目录 简述linux的启动流程 git简介 Linux文件 Ubuntu文件汇总 linux文件属性 Linux命令行 更换软件源 简述linux的启动流程 韦东山课程学习路线&#xff1a;APP应用--DEV驱动--项目。 百问网官网 git资料&#xff1a;https://e.coding.net/weiongshan/01_all_series_qu…

【C++11】多线程

多线程创建线程thread提供的成员函数获取线程id的方式线程函数参数的问题线程join场景和detach 互斥量库&#xff08;mutex&#xff09;mutexrecursive_mutexlock_guard 和 unique_lock 原子性操作库&#xff08;atomic&#xff09;条件变量库&#xff08;condition_varuable&a…

【2】c++设计模式——>UML表示类之间的继承关系

继承也叫作泛化&#xff08;Generalization&#xff09;&#xff0c;用于描述父子类之间的关系&#xff0c;父类又称为基类或者超类&#xff0c;子类又称作派生类。在UML中&#xff0c;继承关系用带空心三角形的实线来表示。 关于继承关系一共有两种&#xff1a;普通继承关系和…

操作系统学习笔记1

文章目录 1、OS的一个宏观比喻2、OS的目的和功能3、OS的发展4、OS的运行机制5、OS的特征6、OS的体系结构 参考视频&#xff1a;操作系统 1、OS的一个宏观比喻 2、OS的目的和功能 3、OS的发展 4、OS的运行机制 中断、系统调用、异常。 5、OS的特征 6、OS的体系结构

解决ASP.NET Core的中间件无法读取Response.Body的问题

概要 本文主要介绍如何在ASP.NET Core的中间件中&#xff0c;读取Response.Body的方法&#xff0c;以便于我们实现更多的定制化开发。本文介绍的方法适用于.Net 3.1 和 .Net 6。 代码和实现 现象解释 首先我们尝试在自定义中间件中直接读取Response.Body&#xff0c;代码如…

【算法训练-二分查找 三】【特殊二分】寻找峰值

废话不多说&#xff0c;喊一句号子鼓励自己&#xff1a;程序员永不失业&#xff0c;程序员走向架构&#xff01;本篇Blog的主题是【数组的二分查找】&#xff0c;使用【数组】这个基本的数据结构来实现&#xff0c;这个高频题的站点是&#xff1a;CodeTop&#xff0c;筛选条件为…