SpringBoot整合RabbitMQ消息中间件,实现延迟队列和死信队列

在现代的分布式系统中,消息队列作为一种重要的中间件,广泛应用于系统解耦、流量削峰、异步处理等场景。而RabbitMQ作为其中一款流行的消息队列中间件,因其高性能和丰富的功能受到众多开发者的青睐。本文将详细介绍如何在SpringBoot项目中整合RabbitMQ,实现延迟队列和死信队列,以满足复杂业务需求。

一、RabbitMQ简介

RabbitMQ是基于AMQP(Advanced Message Queuing Protocol)的开源消息代理系统,主要由以下几个部分组成:

  1. Producer(生产者):消息的发送者。
  2. Consumer(消费者):消息的接收者。
  3. Queue(队列):存储消息的容器。
  4. Exchange(交换机):接收生产者发送的消息,并根据绑定规则(Binding)将消息路由到队列。
  5. Binding(绑定):将交换机与队列绑定的规则。

RabbitMQ支持多种交换机类型,如Direct、Fanout、Topic、Headers等,灵活性极高。

二、SpringBoot整合RabbitMQ

2.1 引入依赖

在SpringBoot项目中,我们可以通过引入Spring AMQP(Spring与RabbitMQ的集成框架)来快速整合RabbitMQ。在pom.xml中添加如下依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.2 配置RabbitMQ

application.ymlapplication.properties中配置RabbitMQ连接信息:

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guest

2.3 定义配置类

创建RabbitMQ的配置类,定义交换机、队列和绑定关系。

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// 定义交换机@Beanpublic DirectExchange directExchange() {return new DirectExchange("directExchange");}// 定义队列@Beanpublic Queue queue() {return new Queue("queue");}// 定义绑定关系@Beanpublic Binding binding(Queue queue, DirectExchange directExchange) {return BindingBuilder.bind(queue).to(directExchange).with("routingKey");}
}

2.4 生产者

定义消息生产者,将消息发送到指定的交换机和路由键。

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class MessageProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMessage(String message) {rabbitTemplate.convertAndSend("directExchange", "routingKey", message);}
}

2.5 消费者

定义消息消费者,从队列中接收并处理消息。

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;@Service
public class MessageConsumer {@RabbitListener(queues = "queue")public void receiveMessage(String message) {System.out.println("Received message: " + message);}
}

三、实现延迟队列

延迟队列的需求在很多场景下非常常见,例如订单超时处理、消息重试等。RabbitMQ本身并不直接支持延迟队列功能,但我们可以通过TTL(Time-To-Live)和DLX(Dead Letter Exchange)机制来实现。

3.1 配置延迟队列

首先,我们需要定义一个用于存储延迟消息的队列,并配置其TTL和死信交换机:

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DelayedQueueConfig {// 定义死信交换机@Beanpublic DirectExchange deadLetterExchange() {return new DirectExchange("deadLetterExchange");}// 定义死信队列@Beanpublic Queue deadLetterQueue() {return new Queue("deadLetterQueue");}// 定义死信队列与死信交换机的绑定关系@Beanpublic Binding deadLetterBinding() {return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("deadLetterRoutingKey");}// 定义延迟队列,并设置其TTL和死信交换机@Beanpublic Queue delayedQueue() {return QueueBuilder.durable("delayedQueue").withArgument("x-dead-letter-exchange", "deadLetterExchange").withArgument("x-dead-letter-routing-key", "deadLetterRoutingKey").withArgument("x-message-ttl", 60000) // 60秒TTL.build();}// 定义延迟队列的交换机@Beanpublic DirectExchange delayedExchange() {return new DirectExchange("delayedExchange");}// 定义延迟队列与延迟交换机的绑定关系@Beanpublic Binding delayedBinding() {return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with("delayedRoutingKey");}
}

3.2 发送延迟消息

在生产者中,发送消息到延迟队列:

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class DelayedMessageProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendDelayedMessage(String message) {rabbitTemplate.convertAndSend("delayedExchange", "delayedRoutingKey", message);}
}

3.3 消费延迟消息

定义消费者,从死信队列中接收并处理延迟后的消息:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;@Service
public class DelayedMessageConsumer {@RabbitListener(queues = "deadLetterQueue")public void receiveDelayedMessage(String message) {System.out.println("Received delayed message: " + message);}
}

四、实现死信队列

死信队列用于处理无法正常消费的消息。通常情况下,消息在以下情况会进入死信队列:

  1. 消息被拒绝(basic.reject或basic.nack)并且requeue参数设置为false。
  2. 消息在队列中的TTL过期。
  3. 队列的最大长度限制被超出。

4.1 配置死信队列

我们在前面已经定义了死信队列和死信交换机,这里我们进一步探讨如何将普通队列配置为支持死信消息:

@Configuration
public class DeadLetterQueueConfig {// 定义普通队列,并配置其死信交换机和死信路由键@Beanpublic Queue normalQueue() {return QueueBuilder.durable("normalQueue").withArgument("x-dead-letter-exchange", "deadLetterExchange").withArgument("x-dead-letter-routing-key", "deadLetterRoutingKey").build();}// 定义普通交换机@Beanpublic DirectExchange normalExchange() {return new DirectExchange("normalExchange");}// 定义普通队列与普通交换机的绑定关系@Beanpublic Binding normalBinding() {return BindingBuilder.bind(normalQueue()).to(normalExchange()).with("normalRoutingKey");}
}

4.2 生产消息

在生产者中,将消息发送到普通队列:

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class NormalMessageProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendNormalMessage(String message) {rabbitTemplate.convertAndSend("normalExchange", "normalRoutingKey", message);}
}

4.3 消费消息

定义消费者,从普通队列中接收消息,如果出现问题则将消息转移到死信队列:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;@Service
public class NormalMessageConsumer {@RabbitListener(queues = "normalQueue")public void receiveNormalMessage(String message) {try {// 模拟处理逻辑System.out.println("Processing message: " + message);// 模拟异常情况if ("error".equals(message)) {throw new RuntimeException("Processing error");}} catch (Exception e) {// 消息处理失败,拒绝并不重新入队System.out.println("Message processing failed: " + message);throw new AmqpRejectAndDontRequeueException("Message rejected");}}
}

4.4 消费死

信消息

定义死信消息消费者,从死信队列中接收并处理无法正常消费的消息:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;@Service
public class DeadLetterMessageConsumer {@RabbitListener(queues = "deadLetterQueue")public void receiveDeadLetterMessage(String message) {System.out.println("Received dead letter message: " + message);// 处理死信消息的逻辑}
}

五、总结

通过本文,我们详细介绍了如何在SpringBoot项目中整合RabbitMQ,并实现延迟队列和死信队列的功能。我们先介绍了RabbitMQ的基本概念,然后逐步讲解了如何配置RabbitMQ、定义生产者和消费者,最后重点介绍了延迟队列和死信队列的实现方式。希望本文能够帮助开发者更好地理解和应用RabbitMQ,实现更加健壮和灵活的消息处理系统。

在实际开发中,消息队列的配置和使用可能会因具体业务需求而有所不同,开发者应根据自身需求进行调整和优化。同时,RabbitMQ提供了丰富的功能,如消息优先级、消息确认、集群部署等,开发者可以深入学习和应用这些功能,以构建高性能和高可用的分布式系统。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/25260.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

笔记95:车辆横向动力学方程转化为误差形式 -- 详细推导过程

1. 非误差型车辆横向动力学方程 注&#xff1a;关于轮胎侧偏刚度的正负 深蓝课程推导得到的车辆横向动力学返程使用的轮胎侧偏刚度是默认为正数&#xff1b;老王课程推导得到的车辆横向动力学方程使用的轮胎侧偏刚度是默认为负数&#xff1b; 1.1 深蓝课程推导得到的方程&…

如何计算 GPT 的 Tokens 数量?

基本介绍 随着人工智能大模型技术的迅速发展&#xff0c;一种创新的计费模式正在逐渐普及&#xff0c;即以“令牌”&#xff08;Token&#xff09;作为衡量使用成本的单位。那么&#xff0c;究竟什么是Token呢&#xff1f; Token 是一种将自然语言文本转化为计算机可以理解的…

kafka集成flink api编写教程

1.引入依赖&#xff08;pox.xml&#xff09; <dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.13.6</version></dependency><dependency><gro…

【C++ | 拷贝赋值运算符函数】一文了解C++的 拷贝赋值运算符函数

&#x1f601;博客主页&#x1f601;&#xff1a;&#x1f680;https://blog.csdn.net/wkd_007&#x1f680; &#x1f911;博客内容&#x1f911;&#xff1a;&#x1f36d;嵌入式开发、Linux、C语言、C、数据结构、音视频&#x1f36d; ⏰发布时间⏰&#xff1a;2024-06-09 1…

对WEB标准以及W3C的理解与认识

Web标准简单来说可以分为结构&#xff0c;表现&#xff0c;行为&#xff1a; 结构&#xff08;HTML&#xff09;: HTML&#xff08;HyperText Markup Language&#xff09;定义了网页的结构和内容。它通过各种标签来组织信息&#xff0c;如标题、段落、图像、链接等。HTML 提供…

antd DatePicker 日期 与 时间 分开选择

自定义组件 import { DatePicker } from "antd"; import dayjs from "dayjs"; import { FC, useRef } from "react";/*** 日期 与 时间 分开选择** 版本号: * "antd": "^5.17.4",* "dayjs": "^1.11.11"…

树莓派debain 12更换apt-get源到阿里源

1、备份 总共需要备份两个文件 a、/etc/apt/sources.list.d/raspi.list b、/etc/apt/sources.list 2、删除上述两个文件内到所有内容&#xff0c;然后添加如下内容 /etc/apt/sources.list.d/raspi.list deb https://mirrors.aliyun.com/debian/ bookworm main non-free non…

给gRPC增加负载均衡功能

在现代的分布式系统中&#xff0c;负载均衡是确保服务高可用性和性能的关键技术之一。而gRPC作为一种高性能的RPC框架&#xff0c;自然也支持负载均衡功能。本文将探讨如何为gRPC服务增加负载均衡功能&#xff0c;从而提高系统的性能和可扩展性。 什么是负载均衡&#xff1f; …

域名的端口号范围

域名的端口号范围是从0到65535。这些端口可以大致分为两类&#xff1a; 知名端口&#xff08;Well-Known Ports&#xff09;&#xff1a;范围从0到1023。这些端口号一般固定分配给一些服务&#xff0c;如21端口分配给FTP服务&#xff0c;25端口分配给SMTP&#xff08;简单邮件…

新手如何学习编程!

选择编程语言&#xff1a;根据你的兴趣和目标选择一门编程语言。例如&#xff0c;Python 适合初学者和数据科学&#xff0c;JavaScript 适合网页开发&#xff0c;Java 和 C# 适合企业级应用。 理解基本概念&#xff1a;学习编程的基本概念&#xff0c;如变量、数据类型、控制结…

Ansible——stat模块

目录 参数总结 返回值 基础语法 常见的命令行示例 示例1&#xff1a;检查文件是否存在 示例2&#xff1a;获取文件详细信息 示例3&#xff1a;检查目录是否存在 示例4&#xff1a;获取文件的 MD5 校验和 示例5&#xff1a;获取文件的 MIME 类型 高级使用 示例6&…

[leetcode]longest-common-prefix 最长公共前缀

. - 力扣&#xff08;LeetCode&#xff09; 编写一个函数来查找字符串数组中的最长公共前缀。 如果不存在公共前缀&#xff0c;返回空字符串 ""。 示例 1&#xff1a; 输入&#xff1a;strs ["flower","flow","flight"] 输出&…

第52集《摄大乘论》

请大家打开《讲义》第一七二页&#xff0c;戊七、辨修圆满。 前一科我们讲到观照力。这观照力&#xff0c;六波罗蜜多里面的观照力&#xff0c;是观照我空、法空的真如理&#xff0c;使令内心能够得到安住&#xff1b;另外在六波罗蜜多以外&#xff0c;又开出四种波罗蜜多&…

03 Linux 内核数据结构

Linux kernel 有四种重要的数据结构:链表、队列、映射、二叉树。普通驱动开发者只需要掌握链表和队列即可。 链表和队列 Linux 内核都有完整的实现,我们不需要深究其实现原理,只需要会使用 API 接口即可。 1、链表 链表是 Linux 内核中最简单、最普通的数据结构。链表是一…

19082 中位特征值

【2022】贝壳找房秋招测试开发工程师笔试卷2 给你一棵以T为根&#xff0c;有n个节点的树。&#xff08;n为奇数&#xff09;每个点有一个价值V&#xff0c;并且每个点有一个特征值P。 每个点的特征值P为&#xff1a;以这个点为根的子树的所有点&#xff08;包括根&#xff09;…

C#面:应⽤程序池集成模式和经典模式的区别

C# 应用程序池是用于托管和执行应用程序的进程。在 IIS&#xff08;Internet Information Services&#xff09;中&#xff0c;C# 应用程序池有两种集成模式&#xff1a;集成模式和经典模式。 集成模式&#xff08;Integrated Mode&#xff09;&#xff1a; 集成模式是 IIS 7…

深度网络及经典网络简介

深度网络及经典网络简介 导语加深网络一个更深的CNN提高识别精度Data Augmentation 层的加深 经典网络VGGGoogLeNetResNet 高速学习迁移学习GPU分布式学习计算位缩减 强化学习总结参考文献 导语 深度学习简单来说&#xff0c;就是加深了层数的神经网络&#xff0c;前面已经提到…

Java:110-SpringMVC的底层原理(上篇)

SpringMVC的底层原理 在前面我们学习了SpringMVC的使用&#xff08;67章博客开始&#xff09;&#xff0c;现在开始说明他的原理&#xff08;实际上更多的细节只存在67章博客中&#xff0c;这篇博客只是讲一点深度&#xff0c;重复的东西尽量少说明点&#xff09; MVC 体系结…

深入理解指针(三)

一、指针运算 1.1指针-整数 下面我们来看一个指针加整数的例子&#xff1a; #include<stdio.h> int main() { int arr[10] { 1,2,3,4,5,6,7,8,9,10 }; int* p &arr[0]; int i 0; int sz sizeof(arr) / sizeof(arr[0]); for (i 0; i < …

Netty原理与实战

1.为什么选择Netty&#xff1f; 高性能低延迟 事件分发器&#xff1a; reactor采用同步IO&#xff0c;Proactor采用异步IO 网络框架选型&#xff1a; 2.Netty整体架构设计&#xff08;4.X&#xff09; 三个模块&#xff1a;Core核心层、Protocal Support协议支持层、…