RabbitMQ基础(简单易懂)

RabbitMQ高级篇请看:

RabbitMQ高级篇-CSDN博客

目录

什么是RabbitMQ?

MQ 的核心概念

1. RabbitMQ 的核心组件

2. Exchange 的类型

3. 数据流向说明

 如何安装RabbitQueue?

WorkQueue(工作队列):

Fanout交换机 :广播

Direct交换机

Direct 交换机

生产者代码

消费者代码

Topic交换机 

 SpringAMQP

​编辑

1. maven依赖:

2. 配置服务端信息 

简单示例 

注解式声明队列和交换机

​编辑

消息转换器

总结如何在IDEA当中使用RabbitMQ


什么是RabbitMQ?

它基于AMQP协议(Advanced Message Queuing Protocol),一种为应用构建消息队列的标准协议。过程中,它提供了一些重要模块:为消息发送的Producer(生产者),分发消息的Exchange(交换器),写入消息的Queue(队列),和读取消息的Consumer(消费者)。


MQ 的核心概念

1. 异步处理

问题:系统需要执行一些耗时操作(如发送邮件、生成报告),如果这些操作在主线程执行,会影响用户体验或导致系统响应变慢。

MQ 的解决方式

  • 生产者将任务消息放入队列,不需要等待任务完成。
  • 消费者在后台异步处理任务。

示例

  • 用户下单后,系统需要发送订单确认邮件。如果没有 MQ,用户可能需要等待邮件发送完成后才能收到订单确认。
  • 使用 MQ 后,生产者(订单服务)将“发送邮件”任务放入队列,消费者(邮件服务)异步处理。

2. 系统解耦

问题:系统服务之间高度耦合,一个服务的变化会导致多个服务需要修改,降低开发效率和系统灵活性。

MQ 的解决方式

  • 服务之间通过消息队列通信,而不是直接调用。
  • 生产者只需要发送消息到 MQ,消费者负责处理消息,二者互不影响。

示例

  • 用户下单后,订单服务需要通知库存服务扣减库存、物流服务生成物流单。如果没有 MQ,订单服务需同步调用这些服务的接口,导致用户必须等待所有操作完成(线性操作),响应时间较长,且系统耦合度高。
  • 使用 MQ 后,订单服务将消息发送到消息队列,并立即返回“下单成功”的响应。库存服务和物流服务异步订阅消息进行处理,各服务独立运行,彼此解耦。这样既提升了用户体验,也增强了系统的扩展性和稳定性。

3. 削峰填谷

问题:在高并发场景下,大量请求瞬间涌入,可能导致服务过载或崩溃。

MQ 的解决方式

  • 将高并发的请求存入队列,消费者按自己的能力逐步处理。
  • 队列可以作为缓冲区,平衡生产者和消费者之间的处理速度。

示例

  • 秒杀活动中,用户请求大量涌入库存系统。没有 MQ,库存服务可能因并发过高而宕机。
  • 使用 MQ 后,所有秒杀请求进入队列,库存服务按顺序逐一处理。

好处

  • 防止系统崩溃,保障服务稳定性。

4. 数据可靠性

问题:数据传输过程中,可能因为网络故障、系统宕机等原因导致消息丢失。

MQ 的解决方式

  • MQ 提供消息持久化功能,确保即使系统故障,消息也不会丢失。
  • 支持消息重试机制,确保消息至少被处理一次。

示例

  • 支付系统发送“支付成功”消息给订单系统。如果没有 MQ,网络抖动可能导致消息丢失,订单状态无法更新。
  • 使用 MQ 后,消息持久化到磁盘,消费者故障恢复后可继续消费消息。

1. RabbitMQ 的核心组件

 Virtual Host(虚拟主机)

定义:RabbitMQ 中的逻辑隔离单位,类似于一个独立的命名空间。

作用

  • 用于实现不同用户或系统之间的隔离。
  • 每个 Virtual Host 下可以有独立的 Exchange(交换机)、Queue(队列)和绑定关系。
  • 一个 RabbitMQ 服务器可以有多个 Virtual Host。

应用场景

  • 多租户系统(例如,不同的业务模块可以使用不同的 Virtual Host)。

 Publisher(消息发送者)

  • 定义:负责向 RabbitMQ 发送消息的生产者应用程序。
  • 功能
    • 将消息发送到 Exchange(交换机),而不是直接发送到 Queue。
  • 注意
    • Publisher 和 Exchange 通过绑定关系决定消息的路由。

 Consumer(消息消费者)

定义:负责从 Queue(队列)中接收消息并处理的应用程序。

功能

  • 消费者直接从队列中读取消息。
  • 每个消息只会被一个消费者处理(点对点模式)。

 Queue(队列)

定义:存储消息的缓冲区,用于临时保存消息。

功能

  • 消息最终会路由到队列,并由消费者从队列中消费。
  • 队列可以绑定到多个 Exchange,并可根据路由规则接收不同的消息。

特点

  • 持久化队列可以配置为持久化(即使 RabbitMQ 服务重启,消息也不会丢失)。
  • 排队顺序:消息按照 FIFO(先进先出)的顺序进行消费。

Exchange(交换机)

定义:负责根据路由规则分发消息的组件。

功能

  • 接收 Publisher 发送的消息,并根据路由规则决定将消息发送到哪个 Queue。
  • 不直接存储消息,消息总是路由到队列中。

2. Exchange 的类型

根据不同的消息路由方式,Exchange 有以下几种类型:

Direct(直连交换机)

  • 根据完全匹配的路由键(Routing Key)将消息发送到指定的队列。
  • 适用场景:精准匹配,例如订单状态更新。

Fanout(广播交换机)

  • 将消息广播到所有绑定的队列,而不考虑路由键。
  • 适用场景:日志广播、通知推送。

Topic(交换机)

  • 根据通配符匹配路由键,将消息路由到符合条件的队列。
  • 适用场景:动态路由,例如按照“日志级别.模块名”匹配日志消息。


3. 数据流向说明

Publisher -> Exchange

  • 消息发送者(Publisher)将消息发送到 RabbitMQ 的 Exchange(交换机)。
  • 发送时需要指定 Routing Key,用于路由消息。

Exchange -> Queue

  • Exchange 根据绑定关系和路由规则,将消息分发到一个或多个队列(Queue)。
  • 如果没有匹配的队列,消息可能会被丢弃或进入死信队列(DLQ,Dead Letter Queue)。

Queue -> Consumer

  • 消费者(Consumer)从队列中拉取消息并进行处理。
  • 消费者可以是多个,每个消息只能被一个消费者消费(在同一队列中)。

 如何安装RabbitQueue?

‍​​​‌​​​​‍‍​​​⁠‬‍​​⁠‌​‬‬‌​​​​​‌​​‍‌‌​⁠​​‌​‌​​day06-MQ基础 - 飞书云文档 (feishu.cn)

查看以上文档的第二部分即可


WorkQueue(工作队列):

工作队列的核心理念

  1. 任务分发

    • 消息(任务)由一个生产者(Producer)发送到队列。
    • 多个消费者(Consumer)从同一个队列中取出消息并处理。
  2. 负载均衡

    • 消息会按照一定规则分发给消费者,通常每个消费者会处理相同数量的任务(轮询机制)
    • 不同消费者可以根据其处理能力自行调整消费速率。可以通过配置的prefetch来设置
  3. 解耦和异步

    • 生产者和消费者无需直接交互。生产者将任务发送到队列即可,消费者独立从队列中获取任务并处理。

Fanout交换机 :广播

通过 Fanout Exchange 可以实现消息的广播分发,将消息发送给所有绑定的队列。结合 Spring 提供的 RabbitTemplate 工具,可以方便地向交换机发送消息,极大简化了开发流程。这种模式非常适用于日志收集、通知广播等场景。

发送消息到交换机的 API 示例

@Test
public void testFanoutExchange() {// 定义交换机的名称String exchangeName = "itcast.fanout";// 定义消息内容String message = "Hello, everyone!";// 发送消息到指定交换机// 参数解释:// - exchangeName:交换机名称// - routingKey:路由键(在 Fanout 交换机中会被忽略)// - message:发送的消息内容rabbitTemplate.convertAndSend(exchangeName, "", message);
}


Direct交换机

Direct 交换机会根据消息携带的 RoutingKey,将消息发送到与交换机绑定且 RoutingKey 完全匹配的队列。

特性精确匹配分发消息广播到所有绑定的队列
RoutingKey 是否重要必须匹配不考虑,直接广播
绑定关系每个队列可以绑定不同的 RoutingKey所有绑定的队列都会接收消息
适用场景精确路由,如订单状态更新广播通知,如日志、系统更新
Direct 交换机

场景:订单服务需要将不同类型的订单路由到对应的队列(如普通订单和优先订单)。

绑定关系

  • 队列 normal_orders 绑定 RoutingKey = normal
  • 队列 priority_orders 绑定 RoutingKey = priority

消息发送

  • 消息 RoutingKey = normal,会被路由到 normal_orders 队列。
  • 消息 RoutingKey = priority,会被路由到 priority_orders 队列。

生产者代码

生产者将消息发送到 Direct 交换机,并指定不同的 RoutingKey

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;@Service
public class SimpleProducer {private final RabbitTemplate rabbitTemplate;public SimpleProducer(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;}public void sendMessage(String message) {// 直接发送消息到交换机rabbitTemplate.convertAndSend("simple_exchange", "simple_routing_key", message);System.out.println("Sent message: " + message);}
}

消费者代码

定义两个消费者,分别监听普通队列和优先队列。

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;@Service
public class SimpleConsumer {// 监听队列,直接声明队列名称@RabbitListener(queues = "simple_queue")public void receiveMessage(String message) {System.out.println("Received message: " + message);}
}

Topic交换机 

opic Exchange 是 RabbitMQ 的一种交换机类型,它根据消息的路由键(RoutingKey)和绑定键(BindingKey)的模式匹配规则,将消息路由到一个或多个队列。它是 Direct Exchange 的增强版,支持模糊匹配和通配符。

匹配规则精确匹配(完全相等)模糊匹配(支持 * 和 # 通配符)
RoutingKey 示例order.createdorder.*order.#
适用场景简单、明确的路由需求复杂、动态的路由需求
扩展性固定路由,灵活性低动态路由,灵活性高

具体匹配示例:

RoutingKey 模式消息 RoutingKey是否匹配
order.*order.created✔️
order.*order.created.new

 SpringAMQP

Spring AMQP 是 Spring 提供的一个用于与 AMQP(Advanced Message Queuing Protocol,高级消息队列协议) 通信的模块化框架。它为基于 Spring 的应用程序集成 AMQP 消息中间件(例如 RabbitMQ)提供了便捷的方法,简化了消息的发送、接收和处理。 

1. maven依赖:
      <!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
2. 配置服务端信息 
spring:rabbitmq:host: 192.168.88.130 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123 # 密码listener:simple:prefetch: 1 # 控制消费者预取的消息数量,处理完一条再处理,如果希望RabbitMQ轮询访问可以不设置这个

什么是 prefetch

prefetch(预取数量) 是 RabbitMQ 的一个设置,用于控制消息消费者(Consumer)每次从队列中预取消息的数量。

它定义了在消费者确认(ACK)之前,RabbitMQ 可以向消费者发送的未确认消息的最大数量。

场景:

假如 prefetch 设置为 1,RabbitMQ 会向消费者一次发送 1 条消息,只有这条消息被确认后,才会发送下一条消息。

如果设置为一个较大的数字(例如 10),RabbitMQ 会一次性发送多条消息,消费者可以并行处理这些消息。

为什么需要设置 prefetch

在多个消费者监听同一个队列的场景下prefetch 设置为 1,可以确保消息在消费者之间更均匀地分布。

防止某些消费者处理速度慢但仍然接收大量消息,导致处理延迟。

简单示例 

发送消息

@Autowired
private RabbitTemplate rabbitTemplate;public void sendMessage(String message) {rabbitTemplate.convertAndSend("exchange_name", "routing_key", message);
}

接收消息

@RabbitListener(queues = "queue_name")
public void receiveMessage(String message) {System.out.println("Received: " + message);
}


注解式声明队列和交换机

@RabbitListener

用于监听队列,当队列接收到消息时,触发对应方法处理消息。

@QueueBinding

声明队列与交换机的绑定关系。

  • 包含:

@Queue:声明队列名称和属性。

  • @Exchange:声明交换机名称、类型和属性。
  • key:指定绑定时使用的路由键。

  @RabbitListener(bindings = @QueueBinding(value =@Queue(name="direct.queue1",durable = "true"),exchange = @Exchange(name="hmall.direct",type = ExchangeTypes.DIRECT),key = {"red","blue"}))public void listenDirectQueue(String message) throws InterruptedException {System.err.println("消费者1.....................接收到消息"+ message+","+ LocalTime.now());}

消息转换器

在 RabbitMQ 中,消息默认是以字节数组的形式在队列中传输的。如果我们希望以更方便的方式传递和处理对象(如 JSON、XML 或 Java 对象),就需要使用 消息转换器(Message Converter) 来完成消息的序列化与反序列化。

消息转换器主要负责:

  1. 序列化:将 Java 对象转换为消息格式(如 JSON、XML 或字节数组)发送到 RabbitMQ。
  2. 反序列化:将从 RabbitMQ 接收到的消息转换为 Java 对象,供消费者处理。
转换器功能
SimpleMessageConverter默认的消息转换器,支持简单类型(如 Stringbyte[]Serializable 对象)。
Jackson2JsonMessageConverter使用 Jackson 将 Java 对象转换为 JSON 格式,或将 JSON 消息转换为 Java 对象。
Jaxb2MarshallerMessageConverter使用 JAXB 将 Java 对象转换为 XML 格式,或将 XML 消息转换为 Java 对象。
ContentTypeDelegatingMessageConverter根据消息的 content_type 动态选择合适的消息转换器。

导入依赖:

        <dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>

记得要给生产者和消费者都需要设置消息转换器。 

    @Beanpublic MessageConverter messageCoverter(){return new Jackson2JsonMessageConverter();}
  • JSON 格式:需要配置 Jackson2JsonMessageConverter
  • XML 格式:需要配置 Jaxb2MarshallerMessageConverter
  • 字符串/字节数组:不需要额外配置,默认的 SimpleMessageConverter 即可。

总结如何在IDEA当中使用RabbitMQ

开始之前请确定RabbitMQ正在运行哦!!

引入依赖

  <!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置消息转换器Config 

@Configuration
@ConditionalOnClass(RabbitTemplate.class) //该注解表示,只有在当前的类路径(classpath)中存在 RabbitTemplate 类时,注解标注的配置类或 Bean 才会生效。
public class MqConfig {/*** 序列化存储JSON字符串* @return*/@Beanpublic MessageConverter messageCoverter(){Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
//        jjmc.setCreateMessageIds(true);// 设置消息转换器在创建消息时是否附加唯一的消息 ID。//true 表示会生成消息 ID,便于跟踪和调试消息。return jjmc;}
}

配置yaml

spring:rabbitmq:host: 192.168.150.101 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123 # 密码

之后就可以再需要MQ的地方注入MQ并且实现消费者和生产者的代码了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/65832.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

大数据环境搭建进度

1.使用虚拟机的系统&#xff1a;centos7.xLinux 2.资源不足&#xff0c;使用云服务器&#xff1a; 1. 3.使用远程登录进行操作 用xshell 4.任务 1.虚拟机装好 2.设置IP地址 3.可以联网 4.设置远程登录访问 5.创建module和software目录&#xff0c;修改两…

线程安全问题介绍

文章目录 **什么是线程安全&#xff1f;****为什么会出现线程安全问题&#xff1f;****线程安全问题的常见场景****如何解决线程安全问题&#xff1f;**1. **使用锁**2. **使用线程安全的数据结构**3. **原子操作**4. **使用volatile关键字**5. **线程本地存储**6. **避免死锁*…

pytorch小记(七):pytorch中的保存/加载模型操作

pytorch小记&#xff08;七&#xff09;&#xff1a;pytorch中的保存/加载模型操作 1. 加载模型参数 (state_dict)1.1 保存模型参数1.2 加载模型参数1.3 常见变种1.3.1 指定加载设备1.3.2 非严格加载&#xff08;跳过部分层&#xff09;1.3.3 打印加载的参数 2. 加载整个模型2.…

Mysql--运维篇--主从复制和集群(主从复制I/O线程,SQL线程,二进制日志,中继日志,集群NDB)

一、主从复制 MySQL的主从复制&#xff08;Master-Slave Replication&#xff09;是一种数据冗余和高可用性的解决方案&#xff0c;它通过将一个或多个从服务器&#xff08;Slave&#xff09;与主服务器&#xff08;Master&#xff09;同步来实现。主从复制的基本原理是&#…

【EI会议征稿通知】第十一届机械工程、材料和自动化技术国际会议(MMEAT 2025)

本次大会旨在汇聚全球机械工程、材料科学及自动化技术的创新学者和行业专家&#xff0c;为他们提供一个卓越的交流与合作平台。随着全球对可持续技术和智能制造需求的不断增加&#xff0c;MMEAT 2025将重点关注这些领域的最新发展趋势和未来前景。此次大会的主要目标是推动机械…

OpenCV基础:视频的采集、读取与录制

从摄像头采集视频 相关接口 - VideoCapture VideoCapture 用于从视频文件、摄像头或其他视频流设备中读取视频帧。它可以捕捉来自多种源的视频。 主要参数&#xff1a; cv2.VideoCapture(source): source: 这是一个整数或字符串&#xff0c;表示视频的来源。 如果是整数&a…

解读Linux Bridge中的东西流向与南北流向

解读Linux Bridge中的东西流向与南北流向 在现代云计算和虚拟化环境中&#xff0c;网络流量的管理和优化变得越来越重要。Linux Bridge作为Linux内核提供的一个强大的二层交换机工具&#xff0c;在虚拟化和容器化应用中扮演着至关重要的角色。本文将深入探讨Linux Bridge中的两…

在线实用工具 json格式化,base64转码,正则表达式测试工具

1、在线json格式化工具&#xff1a; https://json.openai2025.com/ 2、在线base64转码工具 https://base64.openai2025.com/ 3、在线正则表达式测试工具 https://reg.openai2025.com/ 4、在线去水印工具 https://watermark.openai2025.com

java 中 main 方法使用 KafkaConsumer 拉取 kafka 消息如何禁止输出 debug 日志

pom 依赖&#xff1a; <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.5.14.RELEASE</version> </dependency> 或者 <dependency><groupId>org.ap…

车联网安全--TLS握手过程详解

目录 1. TLS协议概述 2. 为什么要握手 2.1 Hello 2.2 协商 2.3 同意 3.总共握了几次手&#xff1f; 1. TLS协议概述 车内各ECU间基于CAN的安全通讯--SecOC&#xff0c;想必现目前多数通信工程师们都已经搞的差不多了&#xff08;不要再问FvM了&#xff09;&#xff1b;…

RuoYi Cloud项目解读【四、项目配置与启动】

四、项目配置与启动 当上面环境全部准备好之后&#xff0c;接下来就是项目配置。需要将项目相关配置修改成当前相关环境。 1 后端配置 1.1 数据库 创建数据库ry-cloud并导入数据脚本ry_2024xxxx.sql&#xff08;必须&#xff09;&#xff0c;quartz.sql&#xff08;可选&…

C#对象池

一、资源管理的困境与破局 在软件开发的征程中&#xff0c;我们时常陷入资源管理的泥沼。以一个繁忙餐厅为例&#xff0c;每个顾客都急需一个盘子盛美食&#xff0c;可盘子数量有限&#xff0c;如果每次顾客用完盘子后&#xff0c;都不假思索地去清洗一个全新的盘子来供下一位…

Vue.js组件开发-如何使用moment.js

在Vue.js组件开发中&#xff0c;需要处理日期和时间&#xff0c;moment.js 是一个非常有用的库。moment.js 提供了丰富的API来解析、验证、操作和显示日期和时间。 步骤&#xff1a; 1. 安装moment.js 首先&#xff0c;需要通过npm或yarn安装moment.js。在项目根目录下运行以…

微信小程序mp3音频播放组件,仅需传入url即可

// index.js // packageChat/components/audio-player/index.js Component({/*** 组件的属性列表*/properties: {/*** MP3 文件的 URL*/src: {type: String,value: ,observer(newVal, oldVal) {if (newVal ! oldVal && newVal) {// 如果 InnerAudioContext 已存在&…

要避免除数绝对值远远小于被除数绝对值的除法

要避免除数绝对值远远小于被除数绝对值的除法 用绝对值小的数作除数&#xff0c;舍人误差会增大&#xff0c;如计算 x y \frac xy yx​,若 0 < ∣ y ∣ < ∣ x ∣ 0<|y|<|x| 0<∣y∣<∣x∣&#xff0c;则可能对计算结果带来严重影响&#xff0c;应尽量避免…

深入了解OpenStack中的隧道网络

在OpenStack环境中&#xff0c;隧道网络是一项关键技术&#xff0c;它确保了虚拟机之间以及虚拟机与外部网络之间的安全通信。通过隧道机制&#xff0c;我们可以有效地隔离不同租户的流量&#xff0c;并支持多租户环境下的复杂网络需求。之前我们介绍了隧道网络&#xff0c;下面…

4. scala高阶之隐式转换与泛型

背景 上一节&#xff0c;我介绍了scala中的面向对象相关概念&#xff0c;还有一个特色功能&#xff1a;模式匹配。本文&#xff0c;我会介绍另外一个特别强大的功能隐式转换&#xff0c;并在最后介绍scala中泛型的使用 1. 隐式转换 Scala提供的隐式转换和隐式参数功能&#…

pandas与sql对应关系【帮助sql使用者快速上手pandas】

本页旨在提供一些如何使用pandas执行各种SQL操作的示例&#xff0c;来帮助SQL使用者快速上手使用pandas。 目录 SQL语法一、选择SELECT1、选择2、添加计算列 二、连接JOIN ON1、内连接2、左外连接3、右外连接4、全外连接 三、过滤WHERE1、AND2、OR3、IS NULL4、IS NOT NULL5、B…

第432场周赛:跳过交替单元格的之字形遍历、机器人可以获得的最大金币数、图的最大边权的最小值、统计 K 次操作以内得到非递减子数组的数目

Q1、跳过交替单元格的之字形遍历 1、题目描述 给你一个 m x n 的二维数组 grid&#xff0c;数组由 正整数 组成。 你的任务是以 之字形 遍历 grid&#xff0c;同时跳过每个 交替 的单元格。 之字形遍历的定义如下&#xff1a; 从左上角的单元格 (0, 0) 开始。在当前行中向…

《探索鸿蒙Next上开发人工智能游戏应用的技术难点》

在科技飞速发展的当下&#xff0c;鸿蒙Next系统为应用开发带来了新的机遇与挑战&#xff0c;开发一款运行在鸿蒙Next上的人工智能游戏应用更是备受关注。以下是在开发过程中可能会遇到的一些技术难点&#xff1a; 鸿蒙Next系统适配性 多设备协同&#xff1a;鸿蒙Next的一大特色…