简介
RabbitMQ是一款开源的消息队列中间件,它实现了高级消息队列协议(AMQP)标准。作为一个消息代理,RabbitMQ可以在应用程序之间可靠地传递和存储消息,并支持多种消息传递模式。
基本概念和特性
-
消息:在RabbitMQ中,消息是传输的基本单位。它由消息体和可选的属性组成,消息体是要传递的实际数据,而属性则包含有关消息的元数据信息。
-
队列:队列是消息的容器,它类似于一个缓冲区,用于存储待处理的消息。生产者将消息发送到队列,消费者从队列中接收和处理消息。
-
交换机:交换机是消息路由的核心组件,它接收来自生产者的消息,并根据特定的路由规则将消息分发给一个或多个绑定到它上面的队列。
-
绑定:绑定定义了交换机和队列之间的关系,它指定了消息在被发送到交换机时如何被路由到与之绑定的队列。
-
路由模式:RabbitMQ支持多种路由模式,包括直连、主题、扇出和头部路由等。不同的路由模式提供了不同的消息分发机制,以满足不同的应用需求。
-
可靠性:RabbitMQ提供了持久化消息、手动确认和事务等机制来确保消息的可靠性传递和处理。
-
高可用性:通过设置集群和镜像队列,RabbitMQ可以实现高可用性,确保即使某个节点或队列发生故障,系统仍然可用。
-
插件生态系统:RabbitMQ具有丰富的插件生态系统,可以扩展其功能和集成其他系统。
Spring Boot 整合 RabbitMQ 简单示例
- 添加依赖:在Spring Boot项目的pom.xml文件中添加以下依赖,以引入RabbitMQ客户端库:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 配置连接信息:在application.properties(或application.yml)文件中配置RabbitMQ的连接信息,如下所示
spring:# rabbitmq相关配置rabbitmq:host: 127.0.0.1port: 5672username: demopassword: demo#虚拟host 可以不设置,使用server默认hostvirtual-host: /demo# 链接超时时间connection-timeout: 1000# 缓存配置cache:channel:# 要保留在高速缓存中的通道数,注意此值不能超过 requested-channel-max size: 2980# 如果已达到高速缓存大小,则等待获取通道的持续时间。如果为0,则始终创建一个新通道。checkout-timeout: 500connection:# 链接模式 channel 和 connection 两种,建议channel,channel使用ThreadLocal绑定,记得使用线程池mode: channelpublisher-returns: truepublisher-confirm-type: simple# 监听器相关配置(可以看作消费者链接工厂配置)listener:# 设置默认连接器(simple)type: simple# 简单的链接工厂(默认)simple:# 可选最大消费者数量 cpu * 2max-concurrency: 20# 手动确认消费acknowledge-mode: manual# 初始化消费者数量concurrency: 10# 消费者一次从MQ服务器拉取的数据量,prefetch: 250# 最大channel 数量,该数量和rabbit mq server中配置相关,二者取最小值requested-channel-max: 3000# 生产者配置template:retry:# 是否开启重试enabled: true# 最大重试次数 5 次max-attempts: 5
- 创建消息发送者:创建一个用于发送消息的消息发送者(生产者)类。
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class MessageSender {private final AmqpTemplate rabbitTemplate;@Autowiredpublic MessageSender(AmqpTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;}public void sendMessage(String message) {rabbitTemplate.convertAndSend("<交换机名称>", "<路由键>", message);}
}
- 创建消息接收者:创建一个用于接收消息的消息接收者(消费者)类。
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class MessageReceiver {@RabbitListener(queues = "<队列名称>")public void receiveMessage(String message) {// 处理接收到的消息System.out.println("Received message: " + message);}
}
注意事项
channel 部分
- channel的最大限制,取决于 requested-channel-max 和Rabbit-MQ server中配置相关,二者取最小值
- channel 是线程绑定对象,使用ThreadLocal存储,多线程环境下一定要注意
- channel 的数量,是针对每个应用,也就是说,如果server配置5000,requested-channel-max=5000,那么每个应用都可以开启5000个 channel
- channel 是消费者和生产者共享
- 如果channel 不足,则会抛出异常
org.springframework.amqp.AmqpTimeoutException: No available channels
,需要做好容错处理
消费者部分
- 在RabbitMQ的程序中,消费者称之为 Listener
- 消费者有两种模式,分别是 SIMPLE 和 DIRECT,推荐使用simple,通过配置参数
spring.rabbitmq.listener.type
进行设置- SIMPLE:RabbitMQ 使用者将消息分派到调用者线程的容器
- DIRECT :在 RabbitMQ 消费者线程上直接调用侦听器的容器。
- simple和direct 都有自己的独立配置,不要混用,混用也不会生效
- simple中 prefetch 参数是针对每个消费者的,具体工作流程参考下图
- concurrency 和 max-concurrency 数量差距不要太大,要不然数据会有比较严重的积压,因为扩充到 max也需要一定时间
生产者部分
-
连接管理:确保在处理完消息后正确关闭连接,以避免资源泄漏。建议使用连接池来管理和复用连接。连接池推荐使用
CachingConnectionFactory
,默认的也是这种 -
消息确认:当发送消息到 RabbitMQ 时,可以选择等待确认。这样可以确保消息被成功处理,或者在发生错误时进行重试。确保在代码中实现消息确认机制,以保证消息的可靠性。
-
消息持久化:为了防止消息丢失,在发送消息时应将消息标记为持久化。这样即使 RabbitMQ 服务意外关闭,消息也会被保存在磁盘上,并在重新启动后恢复。
-
序列化与反序列化:在将对象转换为消息发送到 RabbitMQ 之前,需要进行序列化操作。同样,在接收到消息后,需要进行相应的反序列化操作。确保选择一种适合你的数据类型和语言的序列化方式。并且你的生产者和消费者的序列化方式必须相同
-
错误处理:当发送消息时,要考虑可能出现的异常情况。例如,RabbitMQ 服务器不可用或消息队列已满。在代码中实现适当的错误处理机制,可以记录日志、重试发送或采取其他措施
附注,简易工作流程图
自己整理的 Spring boot 中RabbitMQ工作的简易流程图,可能有不对的地方,仅供参考