RabbitMQ基本原理

一、基本结构

工作原理
所有中间件技术都是基于 TCP/IP 协议基础之上进行构建新的协议规范,RabbitMQ遵循的是AMQP协议(Advanced Message Queuing Protocol - 高级消息队列协议)。
生产者发送消息流程:

  • 1、生产者和Broker建立TCP连接;
  • 2、生产者和Broker建立通道;
  • 3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发;
  • 4、Exchange将消息转发到指定的Queue(队列)。

【详细】

1、消息生产者连接到`RabbitMQ Broker`,建立链接(Connection),在链接(Connection)上开启一个信道(Channel);
2、声明一个交换机(Exchange),并设置相关属性,比如交换机类型、是否持久化等;
3、声明一个队列(Queue),并设置相关属性,比如是否排他、是否持久化、是否自动删除等;
4、使用路由键(RoutingKey)将队列(Queue)和交换机(Exchange)绑定起来;
5、生产者发送消息至 RabbitMQ Broker,其中包含路由键、交换器等信息,根据路由键(RoutingKey)发送消息到交换机(Exchange);
6、相应的交换器(Exchange)根据接收到的路由键(RoutingKey)查找相匹配的队列如果找到 ,则将从生产者发送过来的消息存入相应的队列中;
7、如果没有找到 ,则根据生产者配置的属性选择丢弃还是回退给生产者;
8、关闭信道(Channel);
9、关闭链接(Connection);

消费者接收消息流程:

  • 1、消费者和Broker建立TCP连接;
  • 2、消费者和Broker建立通道;
  • 3、消费者监听指定的Queue(队列);
  • 4、当有消息到达QueueBroker默认将消息推送给消费者;
  • 5、消费者接收到消息;
  • 6、ack回复。

【详细】

- 1、建立链接(Connection);
- 2、在链接(Connection)上开启一个信道(Channel);
- 3、请求消费指定队列(Queue)的消息,并设置回调函数(onMessage);
- 4、[MQ]将消息推送给消费者,消费者接收消息;
- 5、消费者发送消息确定(Ack[acknowledge]);
- 6、[MQ]删除被确认的消息;
- 7、关闭信道(Channel);
- 8、关闭链接(Connection);

MQ消费消息分发原理

1)一种是Pull模式,对应的方法是basicGet。
消息存放在服务端,只有消费者主动获取才能拿到消息。如果每搁一段时间获取一次消息,消息的实时性会降低。
但是好处是可以根据自己的消费能力决定消息的频率。2)另一种是push,对应的方法是BasicConsume,只要生产者发消息到服务器,就马上推送给消费者,
消息保存客户端,实时性很高,如果消费不过来有可能会造成消息积压。Spring AMQP是push方式,
通过事件机制对队列进行监听,只要有消息到达队列,就会触发消费消息的方法。

二、RabbitMQ组成部分说明

  • Producer: 消息生产者,即生产方客户端,生产方客户端将消息发送;

  • Connection: TCP连接,生产者或消费者与消息队列RabbitMQ版间的物理TCP连接;

    1)Connection会执行认证、IP解析、路由等底层网络任务。
    2)应用与消息队列RabbitMQ版完成Connection建立大约需要15个TCP报文交互,因而会消耗大量的网络资源和消息队列RabbitMQ版资源。
    3)一个进程对应一个Connection,一个进程中的多个线程则分别对应一个Connection中的多个Channel。
    4)Producer和Consumer分别使用不同的Connection进行消息发送和消费;

  • Channel: 在客户端的每个物理TCP连接里,可建立多个Channel,每个Channel代表一个会话任务。

    1)Channel是物理TCP连接中的虚拟连接。
    2)当应用通过Connection与消息队列RabbitMQ版建立连接后,所有的AMQP协议操作(例如创建队列、发送消息、接收消息等)都会通过Connection中的Channel完成。
    3) Channel可以复用Connection,即一个Connection下可以建立多个Channel。
    4) Channel不能脱离Connection独立存在,而必须存活在Connection中。
    5) 当某个Connection断开时,该Connection下的所有Channel都会断开。

  • Broker: 消息队列服务进程,此进程包括两个部分:Exchange和Queue;

  • Exchange(交换器): 生产者将消息发送到Exchange,由Exchange将消息路由到一个或多个Queue中。Exchange根据消息的属性或内容路由消息。

  • Queue: 消息队列,存储消息的队列,每个消息都会被投入到一个或多个Queue里;

  • Consumer: 消息消费者,即消费方客户端,接收MQ转发的消息;

  • Routing Key(路由键): 生产者在向Exchange发送消息时,需要指定一个Routing Key来设定该消息的路由规则。 Routing Key需要与Exchange类型及Binding Key联合使用才能生效。一般情况下,生产者在向Exchange发送消息时,可以通过指定Routing Key来决定消息被路由到哪个或哪些Queue;

  • Binding: 一套绑定规则,用于告诉Exchange消息应该被存储到哪个Queue。它的作用是把Exchange和Queue按照路由规则绑定起来。

  • Binding Key(绑定键): 用于告知Exchange应该将消息投递到哪些Queue中(生产者将消息发送给哪个Exchange是需要由RoutingKey决定的,生产者需要将Exchange与哪个队列绑定时需要由BindingKey决定的);

  • Virtual Host: 虚拟主机,本质上是一个mini版的RabbitMQ服务器,拥有自己的队列、交换机、绑定和权限机制,vhost是共享相同的身份认证和加密环境的独立服务器域。vhost是AMQP的基础,必须在连接时指定,RabbitMQ默认的vhost是/。

三、交换模式

Direct Exchange(直连模式)

【路由规则】 Direct Exchange根据Binding Key和Routing Key完全匹配的规则路由消息。
【使用场景】 Direct Exchange适用于通过简单字符标识符区分消息的场景。
Direct Exchange常用于单播路由。
Direct

Fanout Exchange(广播模式)

【路由规则】 Fanout Exchange忽略Routing Key和Binding Key的匹配规则,将消息路由到所有绑定的Queue。
【使用场景】 Fanout Exchange适用于广播消息的场景。例如,分发系统使用Fanout Exchange来广播各种状态和配置更新。
广播模式

Topic Exchange(主题模式)

【路由规则】 Topic Exchange根据Binding Key和Routing Key通配符匹配的规则路由消息。
Topic Exchange支持的通配符包括星号(*)和井号(#)。 星号(*)代表一个英文单词(例如cn)。 井号(#)代表零个、一个或多个英文单词,英文单词间通过英文句号(.)分隔,例如cn.zj.hz。
【使用场景】 Topic Exchange适用于通过通配符区分消息的场景。
Topic Exchange常用于多播路由。例如,使用Topic Exchange分发有关于特定地理位置的数据。
主题模式

Headers Exchange (头部交换机)

【路由规则】 Headers Exchange可以被视为Direct Exchange的另一种表现形式。
Headers Exchange可以像Direct Exchange一样工作,不同之处在于Headers Exchange使用Headers属性代替Routing Key进行路由匹配。
在绑定Headers Exchange和Queue时,可以设置绑定属性的键值对。然后,在向Headers Exchange发送消息时,设置消息的Headers属性键值对。
Headers Exchange将根据消息Headers属性键值对和绑定属性键值对的匹配情况路由消息。
匹配算法由一个特殊的绑定属性键值对控制。该属性为x-match,只有以下两种取值:

  • 1)all:所有除x-match以外的绑定属性键值对必须和消息Headers属性键值对匹配才会路由消息。
  • 2)any:只要有一组除x-match以外的绑定属性键值对和消息Headers属性键值对匹配就会路由消息。
    以下两种情况下,认为消息Headers属性键值对和绑定属性键值对匹配:
    • 1、 消息Headers属性的键和值与绑定属性的键和值完全相同;
    • 2、 消息Headers属性的键和绑定属性的键完全相同,但绑定属性的值为空。

【使用场景】 Headers Exchange适用于通过多组Headers属性区分消息的场景。Headers Exchange常用于多播路由。例如,涉及到分类或者标签的新闻更新。

生产者确认机制

1、确认原理
生产者将消息发送到exchange,exchange根据路由规则将消息投递到了queue。

  • 1)Confirm确认:生产者发送消息到交换机时会存在消息丢失的情景,开启事务会导致吞吐量下降,Confirm机制就是消息发送到交换机(Exchange)时会触发Confirm回调。通过 publisher confirm (发送方确认机制)可以确定消息是否被成功路由到MQ broker从而选择是否重发等步骤。当生产者开启 publisher confirm 消息发送到MQ端之后,MQ会回一个ack给生产者,ack是个boolean值,为true消息成功发送到MQ。反之发送失败。
  • 2)Return确认:从交换机到队列也有可能出现路由失败导致消息丢失情景(可能是MQ出问题导致queue和exchange绑定丢失,或者失误删除了绑定关系等),Return机制可解决这个问题,路由失败时可以通过Return回调来将路由失败的消息记录下来。

消费者确认机制

1、消费者确认原理
消费者确认是指当一条消息投递到消费者处理后,消费者发送给MQ broker的确认
(通俗的说就是 告知服务器这条消息已经被我消费了,可以在队列删掉 ,这样以后就不会再发了, 否则消息服务器以为这条消息没处理掉 重启应用后还会在发)。

有auto和manual两种

  • 1)auto则由broker自行选择时机,一般可认为消息发送到消费者后就直接被ack,也即消息会被从队列中移除掉而不顾消息的处理逻辑是否成功;

  • 2)manual则是需要消费者显式的去手动ack后消息才会被从队列中移除掉,通过这个机制可以限制在消息处理完之后再Ack或者nack; 开启手动确认模式,即由消费方自行决定何时应该ack,通过设置autoAck=false开启手动确认模式;

消息持久化

消息发送并保存到队列之后如果不做特殊处理是保存在内存中,当节点宕机重启或者内存故障等,会导致消息丢失,通过对消息进行持久化到磁盘可以降低这种风险, 除了对消息进行持久化还是不够,还需要对queue、exchange进行持久化。

RabbitMQ解决消息丢失问题

消息确认机制

RabbitMQ提供了消息确认机制,即生产者在发送消息后,可以等待RabbitMQ服务器返回确认信息,以确保消息已经被正确地接收和处理。如果RabbitMQ服务器没有返回确认信息,生产者可以选择重新发送消息或者采取其他的补救措施。

生产者确认消息和重试
  1. 使用缓存:在confirmCallback中,将ackfalse的消息存到缓存中。然后,可以使用另外的线程或者定时任务来处理这些失败的消息,进行重试。

  2. 设置重试次数:为了避免无限重试,我们可以设置一个重试次数的上限。当达到这个上限后,我们可以选择将消息发送到死信队列,或者进行其他的错误处理。

  3. 使用死信队列:在RabbitMQ中,我们可以设置一个死信队列来存储那些无法被正常处理的消息。当消息在主队列中被拒绝或者过期后,它们会被发送到死信队列。然后,我们可以对死信队列中的消息进行人工处理,或者在一段时间后再次进行处理。

事务机制

1.首先需要配置一个事务管理器

 @BeanPlatformTransactionManager platformTransactionManager(ConnectionFactory connectionFactory) {return new RabbitTransactionManager(connectionFactory);}

2.然后在生产者上添加事务注解以及设置通信通道为事务模式。

@Transactional

  1. 开启事务机制就三步:
  • 配置事务管理器
  • 使用 @Transactional 注解开启事务
  • 调用 setChannelTransacted 方法设置消息通道为事务模式,即设置为 true

4.当我们开启事务模式之后,RabbitMQ 生产者发送消息会有这样几个步骤:

  • (1) 客服端发出请求,将通信管道设置为事务模式
  • (2) 服务端给出回复,同意将通信管道设置为事务模式
  • (3) 客户端发送消息
  • (4) 客户端提交事务
  • (5) 服务端给出响应,确认事务提交

6.两步 RabbitMQ 都有提供解决的方案。那么,如果确保消息成功到达 RabbitMQ 呢?

  • (1) 开启事务机制

  • (2) 发送方确认机制
    注意:这是两种不同的方案,不可以同时开启,只能二选其一。如果同时开启,则会报错

消息持久化

RabbitMQ支持将消息持久化到磁盘,即使RabbitMQ服务器宕机或重启,消息也不会丢失。在发布消息时,可以设置消息的持久化标志,这样消息就会被写入磁盘中,而不是仅仅保存在内存中。
其中我们交换机和队列都要设置对应的持久化,在创建时,我们会设置持久化参数。同时为了避免单点故障,RabbitMQ应该做成集群模式,以免一台机器损坏,出现数据丢失的问题。

消费端确认和重试

对于消费者来说,该配置不仅起到了连接作用,同时也启动了重试机制,默认重试 2 次。

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.listener.simple.retry.enabled=true # 开启消费者重试机制
spring.rabbitmq.listener.simple.retry.max-attempts=3 # 最大重试次数
spring.rabbitmq.listener.simple.retry.initial-interval=3000 # 重试时间间隔

确认的话需要我们做签收操作

 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

死信队列配置

被拒收的消息,或者是过期的消息,或者是队列已经满了的消息,都会进入死信队列,死信队列有一个默认的生效时间,如果没有做任务配置,到了时间会自动删除消息。
java中配置死信队列

    /*** 延迟队列,又叫死信队列 “** @return*/@Beanpublic Queue delayQueue() {HashMap<String, Object> arguments = new HashMap<>();arguments.put("x-dead-letter-exchange", "test_ex");arguments.put("x-dead-letter-routing-key", "test_ex.dead");// 消息过期时间 2分钟arguments.put("x-message-ttl", 60000);return new Queue("delayQueue", true, false, false, arguments);}

以上参数说明:

  • x-dead-letter-exchange:死信队列过期以后往指定交换机发
  • x-dead-letter-routing-key:死信队列过期指定路由键
  • x-message-ttl: 死信队列过期时间,单位是毫秒

RabbitMQ解决消息积压问题

RabbitMQ消息积压问题通常是由于消费者无法及时消费消息或消费速度过慢或发送者流量太大导致的。以下是一些解决方法:

1.增加消费者数量: 可以通过增加消费者的数量来提高消费速度,减少消息积压。可以通过添加更多的消费者进程或者增加消费者的线程数来实现。

2.调整消费者的QoS参数: 消费者的QoS参数可以控制消费者每次从RabbitMQ服务器获取的消息数量,以及未确认消息的最大数量。可以适当调整这些参数,以减少消息积压。

3.设置消费者的超时时间: 可以设置消费者的超时时间,如果消费者在指定的时间内没有消费消息,就将消息重新投递到队列中,以便其他消费者消费。

4.增加队列的容量: 可以增加队列的容量,以便存储更多的消息。但是,如果队列容量过大,可能会导致内存占用过高,影响系统的性能。

5.使用死信队列: 可以将未能及时消费的消息转移到死信队列中,以便后续处理。可以设置死信队列的超时时间,以便在一定时间内处理这些消息。

6.监控和调整: 可以使用RabbitMQ的监控工具来监控队列的状态和消费者的消费速度,及时发现并解决消息积压问题。

RabbitMQ解决消息重复消费问题

RabbitMQ提供了消息去重的机制来解决消息重复消费的问题。具体来说,可以使用以下两种方式来实现:

1.消息去重插件
RabbitMQ提供了一个消息去重插件,可以通过在RabbitMQ节点上安装该插件来实现消息去重。该插件会在消息传输之前对消息进行唯一性校验,如果消息已经被消费过,那么该消息将被丢弃。该插件的实现原理是将已经消费过的消息ID保存在内存中,当新消息到达时,会检查该消息ID是否已经存在,如果存在则丢弃该消息。

2.消息幂等性设计
消息幂等性是指对于同一条消息,无论消费多少次,最终的结果都是一致的。因此,可以通过在消息的生产者或消费者端实现消息幂等性来解决消息重复消费的问题。具体实现方式包括:

  • 在消息生产者端,为每条消息生成唯一的ID,将该ID与消息一起发送到RabbitMQ,消费者在消费消息时根据该ID进行幂等性校验;
  • 在消息消费者端,记录已经消费过的消息ID,当重复消费同一条消息时,直接忽略该消息。

需要注意的是,实现消息幂等性需要考虑业务逻辑的复杂性和消息处理的性能。如果业务逻辑比较简单,可以通过对消息进行去重来解决问题;如果业务逻辑比较复杂,可以通过实现消息幂等性来保证消息的正确性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/55320.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何实现Mybatis自定义插件

背景 MyBatis的插件机制&#xff0c;也可称为拦截器&#xff0c;是一种强大的扩展工具。它允许开发者在不修改MyBatis框架源代码的情况下&#xff0c;通过拦截和修改MyBatis执行过程中的行为来定制和增强功能。 MyBatis插件可以拦截四大核心组件的方法调用&#xff1a;Executor…

Certbot自动申请并续期https证书

Certbot自动申请并续期https证书 一、 安装 Certbot&#xff1a;使用命令安装 Certbot&#xff1a; dnf install certbot python3-certbot-nginx获取 SSL 证书&#xff1a;运行 Certbot 命令来获取并安装 SSL 证书。 示例命令&#xff0c;替换其中的域名和路径信息&#xff1a…

共和国勋章获得者:李振声

李振声&#xff0c;1931年2月出生于山东淄博&#xff0c;是中共党员、著名遗传学家和小麦遗传育种专家&#xff0c;兼任中国科学院院士和第三世界科学院院士。 他被誉为“中国小麦远缘杂交之父”和“当代后稷”&#xff0c;是中国小麦远缘杂交育种的奠基人。 教育背景与早年经…

react+antdMobie实现消息通知页面样式

一、实现效果 二、代码 import React, { useEffect, useState } from react; import style from ./style/index.less; import { CapsuleTabs, Ellipsis, Empty, SearchBar, Tag } from antd-mobile; //消息通知页面 export default function Notification(props) {const [opti…

Python办公自动化案例:批量修改Word文件中的段落格式

案例:Python实现批量修改Word文件中的段落格式。 在处理大量Word文档时,经常需要批量修改这些文档的格式,比如统一段落格式,以提升文档的一致性和专业性。使用Python来实现这一任务可以极大地提高工作效率,特别是当涉及到数百或数千个文档时。Python通过第三方库如python…

vue3 antdv3/4 Modal显示一个提示,内容换行显示。

1、官网地址&#xff1a; Ant Design Vue — An enterprise-class UI components based on Ant Design and Vue.js 2、显示个信息&#xff1a; Modal.info({title: This is a notification message,content: h(div, {}, [h(p, some messages...some messages...),h(p, some …

828华为云征文|部署音乐流媒体服务器 mStream

828华为云征文&#xff5c;部署音乐流媒体服务器 mStream 一、Flexus云服务器X实例介绍二、Flexus云服务器X实例配置2.1 重置密码2.2 服务器连接2.3 安全组配置2.4 Docker 环境搭建 三、Flexus云服务器X实例部署 mStream3.1 mStream 介绍3.2 mStream 部署3.3 mStream 使用 四、…

centos一些常用命令

文章目录 查看磁盘信息使用 df 命令使用 du 命令 查看磁盘信息 使用 df 命令 df&#xff08;disk free&#xff09;命令用于显示文件系统的磁盘空间占用情况。 查看所有挂载点的磁盘使用情况&#xff1a; df -h选项说明&#xff1a; -h 参数表示以人类可读的格式&#xff0…

【学习笔记】手写 Tomcat 七

目录 一、优化 Dao 1. 设置 UserDaoImpl 为单例模式 2. 创建 Dao 工厂 3. 在 Service 层获取 UserDao 的实例 二、优化 Service 1. 设置 UserServiceImpl 为单例模式 2. 创建 Service 工厂 3. 在 Servlet 层获取 Service 实现类的对象 三、优化 Servlet 1. 使用配置…

Leetcode面试经典150题-322.零钱兑换

给你一个整数数组 coins &#xff0c;表示不同面额的硬币&#xff1b;以及一个整数 amount &#xff0c;表示总金额。 计算并返回可以凑成总金额所需的 最少的硬币个数 。如果没有任何一种硬币组合能组成总金额&#xff0c;返回 -1 。 你可以认为每种硬币的数量是无限的。 示…

【无人机设计与控制】Multi-UAV|多无人机多场景路径规划算法MATLAB

摘要 本研究探讨了多无人机路径规划问题&#xff0c;提出了三种不同算法的对比分析&#xff0c;包括粒子群优化&#xff08;PSO&#xff09;、灰狼优化&#xff08;GWO&#xff09;和鲸鱼优化算法&#xff08;WOA&#xff09;。利用MATLAB实现了多场景仿真实验&#xff0c;验证…

C++那些事之内存优化

C那些事之内存优化 通常程序运行时内存是一个比较大的问题&#xff0c;如何减少内存占用和提升访问速度是至关重要。为了解决这些问题&#xff0c;C20 引入了 no_unique_address 特性&#xff0c;并结合空基类优化&#xff08;EBO, Empty Base Optimization&#xff09;&#x…

组合优化与凸优化 学习笔记5 对偶拉格朗日函数

有的时候约束条件有点难搞&#xff0c;我们可以把它放到目标函数里面。 记得之前凸函数的时候的结论吗&#xff1f;一大堆函数&#xff0c;每一段都取最大的&#xff0c;最后会得到一个凸函数。同理&#xff0c;每一段都取最小的&#xff0c;得到的是一个凹函数。就这样&#x…

【Golang】Go语言字符串处理库--strings

✨✨ 欢迎大家来到景天科技苑✨✨ &#x1f388;&#x1f388; 养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; &#x1f3c6; 作者简介&#xff1a;景天科技苑 &#x1f3c6;《头衔》&#xff1a;大厂架构师&#xff0c;华为云开发者社区专家博主&#xff0c;…

数通 1

通信&#xff1a;需要介质才能通信电话离信号塔&#xff08;基站&#xff09;越远&#xff0c;信号越弱。信号在基站之间传递。你离路由器越远&#xff0c;信号越差。一个意思 比如想传一张图片&#xff0c;这张图片就是数据载荷 网关&#xff0c;分割两个网络。路由器可以是网…

宠物医院微信小程序源码

文章目录 前言研究背景研究内容一、主要技术&#xff1f;二、项目内容1.整体介绍&#xff08;示范&#xff09;2.系统分析3.数据表信息4.运行截图5.部分代码介绍 总结 前言 随着当代社会科技的迅速发展&#xff0c;计算机网络时代正式拉来帷幕&#xff0c;它颠覆性的影响着社会…

新版pycharm如何导入自定义环境

我们新的版本的pycharm的ui更改了&#xff0c;但是我不会导入新的环境了 我们先点击右上角的add interpreter 然后点击添加本地编译器 先导入这个bat文件 再点击load 我们就可以选择我们需要的环境了

SpringBoot3脚手架

MySpringBootAPI SpringBoot3脚手架&#xff0c;基于SpringBoot3DruidPgSQLMyBatisPlus13FastJSON2Lombok&#xff0c;启动web容器为Undertow(非默认tomcat)&#xff0c;其他的请自行添加和配置。 <java.version>17</java.version> <springboot.version>3.3…

android SELinux权限适配

抓log方法&#xff0c; setenforce 0, 如果不先将selinux设置为permission mode&#xff0c;会导致一个问题。 程序运行的时候遇到权限策略限制&#xff08;假设 sepolicy 1&#xff09;&#xff0c;程序运行失败。添加权限&#xff08;sepolicy 1&#xff09;&#xff0c;然后…

Mysql 删除表的所有数据

在 MySQL 中&#xff0c;如果你想要删除一个表中的所有数据&#xff0c;可以使用 TRUNCATE TABLE 命令或者 DELETE 语句。下面是两种方法的对比以及如何使用它们&#xff1a; 使用 TRUNCATE TABLE TRUNCATE TABLE 是一个非常快速的方法来删除表中的所有记录&#xff0c;并且它…