Rabbitmq Fanout如何保证不重复消费及应用场景

rabbitmq fanout业务场景,一个交换机对应多个队列,不会重复消费吗

在 RabbitMQ 中,使用 Fanout 类型的交换机时,确实可以将一个交换机绑定到多个队列。每当有消息发布到这个交换机时,交换机会把消息广播到所有绑定的队列。

不会重复消费的原因

  1. 队列与消费者关系

    • 每个队列都有一个或多个消费者来处理消息。RabbitMQ 确保每条消息只能被同一个队列的一个消费者消费。
    • 消息消费是按队列级别的,即每个队列中的消费者在消费时只能消费该队列中的消息,不同队列之间是独立的。
  2. 广播到多个队列

    • 当一个 Fanout 类型的交换机收到一条消息时,交换机会将该消息广播到所有与之绑定的队列。每个队列都独立维护一份消息副本,消息会按队列分发,而 每个队列 内的消息 只会被一个消费者消费
    • 也就是说,如果有多个队列与该交换机绑定,并且这些队列各自有消费者,那么每个消费者只会从自己所在的队列中取消息进行消费,而不会重复消费其他队列中的消息。
  3. 不会重复消费的核心原理

    • 假设有一个 Fanout 交换机与多个队列绑定,且每个队列有一个或多个消费者。
    • 当交换机广播消息时,每个队列都会得到该消息,但每个队列中的消费者只会从该队列中消费消息,不会跨队列消费。
    • 例如,如果有两个队列 queue1 和 queue2,并且每个队列都有一个消费者,那么每个消费者只会消费自己队列的消息,而不会消费其他队列中的消息。

举个例子

假设我们有一个 fanout 类型的交换机,名为 logs,并且我们绑定了两个队列 queue1queue2 到这个交换机。

  1. 消息发送

    • 如果向 logs 交换机发送了一条消息,RabbitMQ 会把这条消息广播到 queue1 和 queue2
  2. 队列消费

    • 假设 queue1 上有一个消费者 consumer1queue2 上有一个消费者 consumer2
    • consumer1 会从 queue1 中消费消息,而 consumer2 会从 queue2 中消费消息。
    • 每个消费者只会消费自己队列中的消息,因此不会重复消费。

总结

  • Fanout 交换机是广播消息到所有绑定的队列,每个队列都会接收到一份消息副本。
  • 消息 不会在不同的队列间重复消费,而是由各自队列内的消费者独立消费。
  • 如果同一个队列有多个消费者,它们会按照队列中的负载均衡机制分摊消费任务,但同一个队列中的消息 只能被一个消费者消费

所以,Fanout 交换机下的多个队列不会导致消息的重复消费,每个队列独立处理自己的消息

实际业务场景

但是一个消息被同时发送到多个队列中,就算每个队列中只能被一个消费者消费,不还是消费多次吗,请从实际业务场景给出例子吧,比如订单库存

是的,你说得对,如果一个消息被同时发送到多个队列中,每个队列内的消息会被独立消费,确实会出现“同一个消息被多个消费者消费”的情况。这种情况需要根据实际业务场景来设计系统,以确保消息的消费不会造成问题。以下是一个 订单库存管理 的实际业务场景例子来帮助说明:

场景背景:订单库存管理

假设你有一个电商系统,当用户下单后,系统需要做两件事:

  1. 扣减库存:减少商品库存。
  2. 创建订单:保存订单信息并做相关处理。

为了实现高可用和高并发,系统将这些操作拆分成两个独立的服务,并且通过消息队列来进行异步处理。你选择使用 RabbitMQ 来传递消息。

业务流程
  1. 用户下单

    • 用户提交订单时,系统会生成一条订单消息,并将该消息发送到多个队列:
      • 一个队列负责处理 扣减库存
      • 另一个队列负责 创建订单
  2. 消息发送

    • 订单消息通过 Fanout 类型的交换机广播到两个队列:order_queue 和 stock_queue
    • 这些队列都绑定到同一个交换机,因此每个队列都会接收到相同的订单消息。
  3. 消息消费

    • order_queue 会有一个消费者来处理订单创建的操作。
    • stock_queue 会有另一个消费者来处理库存扣减的操作。
消费者行为
  • order_queue 的消费者:消费者会从 order_queue 队列中取出消息并执行 订单创建 操作,保存订单记录到数据库中。

  • stock_queue 的消费者:消费者会从 stock_queue 队列中取出消息并执行 库存扣减 操作,更新商品库存。

关键问题:消息重复消费

在这种场景下,同一条订单消息会同时被发送到多个队列,因此 每个队列内的消息都将被独立消费

这确实会让你产生一个疑问:同一条消息被发送到多个队列后,会不会在消费者中被重复消费,从而导致业务错误(例如扣减库存两次或创建两次订单)?

答案:不会重复消费

虽然每个队列都会收到相同的消息,但 每个队列只会被单独的消费者消费,且每个消费者只会消费自己队列中的消息。例如:

  • order_queue 的消费者 只负责处理订单创建,而不会去处理库存扣减。
  • stock_queue 的消费者 只负责处理库存扣减,而不会去处理订单创建。

因此,尽管消息在队列之间是广播的,但由于消费者是独立处理不同队列的消息,所以不会导致重复消费 同一条消息,从而避免了逻辑错误。

为什么这样设计?

这种设计的关键在于 解耦和职责分离,这样可以将不同的业务逻辑分开,使得系统具备更好的扩展性和容错性。每个消费者只负责一个独立的任务,避免了一个消费者过载或失败时影响到其他任务。

具体例子

假设有一个用户下单的请求,系统生成一条订单消息 order_msg

  1. order_msg 被发送到 Fanout 交换机,并广播到两个队列:

    • order_queue(负责创建订单)
    • stock_queue(负责扣减库存)
  2. 消息到达两个队列:

    • order_queue 中的消费者(假设消费者 A)消费该消息并创建订单。
    • stock_queue 中的消费者(假设消费者 B)消费该消息并扣减库存。
  3. 结果:

    • 订单创建操作和库存扣减操作是 独立的,并且各自只处理一个特定的任务。因此,每个队列中的消费者处理的都是各自队列的消息,而不会发生重复消费。

高并发情况下的注意事项

在高并发场景下,虽然一个消息会被发送到多个队列,但由于不同队列的消费者独立工作,每个消费者会处理自己队列中的消息,不会发生重复消费。为了避免在不同队列间出现竞争问题,可能需要在系统设计时考虑:

  • 事务性操作:例如在创建订单和扣减库存的过程中,可以使用数据库的事务机制来保证两者的原子性,避免出现只扣减库存但没有创建订单的情况,或反之。
  • 幂等性:为了确保即使消息被重复投递或重复处理,系统也能保证业务逻辑的正确性和一致性,通常需要在消费者端做幂等性设计。

总结

虽然在 Fanout 交换机的场景下,每个消息会被广播到多个队列,并且每个队列的消息会被独立消费,但 每个队列的消费者只会消费自己的队列中的消息。因此,消息不会在不同消费者之间重复消费,不会导致库存扣减和订单创建的重复操作。

订单与库存分别是不同的服务,如何做到事务性操作

当订单与库存是不同的服务时,保证两者的事务性操作变得复杂,因为它们通常运行在不同的进程、不同的数据库、甚至不同的物理主机上。在这种微服务架构下,传统的数据库事务(ACID)无法跨服务进行,因此需要采用分布式事务的方案来保证订单创建与库存扣减操作的一致性。

常见的分布式事务解决方案有 两阶段提交(2PC)TCC(Try-Confirm-Cancel)最终一致性(通过补偿机制)等。对于订单和库存这种场景,通常推荐的方案是通过 消息队列 + 补偿机制Saga模式 来确保最终一致性。

以下是几种常见的实现方法:

1. 基于消息队列 + 补偿机制(最常见的解决方案)

在这个方案中,订单服务和库存服务通过消息队列进行解耦,消息队列提供了异步、可靠的消息传递机制。具体流程如下:

1.1 订单服务和库存服务的操作流程
  1. 用户下单

    • 订单服务收到用户下单请求后,首先创建订单并生成订单ID。
    • 然后,订单服务会将一个“扣减库存”的消息发送到消息队列(例如,RabbitMQ、Kafka 等)。
  2. 库存服务处理消息

    • 库存服务从消息队列中消费“扣减库存”的消息,进行库存扣减操作。
    • 如果库存扣减成功,库存服务将发送一个“扣减库存成功”的消息回给订单服务,表示库存操作成功。
  3. 订单服务确认库存扣减

    • 订单服务收到“扣减库存成功”的确认消息后,确认订单创建的事务完成,最终提交订单。
  4. 失败处理

    • 如果库存服务在处理扣减库存时失败(例如,库存不足,或者系统崩溃),可以通过回退机制来补偿之前的订单操作。
    • 例如,订单服务可以设置一个 超时回调,如果在规定时间内没有收到库存服务的确认消息,则触发补偿操作(如取消订单或通知用户)。
1.2 补偿机制

补偿机制的核心思想是,当某个操作失败时,通过补偿操作来回滚之前成功的操作。例如,如果库存扣减失败,可以通过回滚库存并取消之前成功创建的订单。

在实际操作中,订单和库存的补偿可以通过以下方式实现:

  • 订单补偿:如果库存扣减失败,订单服务可以通过 API 调用回滚订单(例如删除订单或标记为失败)。
  • 库存补偿:如果订单未成功提交(例如用户取消订单),库存服务可以通过 API 调用恢复库存。
1.3 如何实现幂等性

为了避免消息重复消费导致的副作用(比如库存扣减两次、订单创建多次),通常需要在消息处理时实现 幂等性

  • 库存服务:可以根据订单ID来判定是否已经处理过该消息。如果已经扣减过库存,则不再重复操作。
  • 订单服务:可以通过唯一的订单ID来判断订单是否已经提交,避免重复创建订单。

2. 基于Saga模式

Saga模式是一种分布式事务的模式,适用于微服务架构中的跨服务事务。在Saga模式中,长事务被分解为一系列的局部事务,每个局部事务在执行完后都会发布一个事件或发送消息给下游服务。Saga模式的关键在于 补偿事务,即每个局部事务都有一个相应的补偿操作。

2.1 Saga模式的流程
  • Step 1:订单服务收到用户下单请求后,创建订单并保存到数据库,同时发送一条“扣减库存”的消息。
  • Step 2:库存服务接收到扣减库存的消息,进行库存扣减。如果扣减成功,则返回确认消息;如果失败,则通过补偿机制发出库存不足的消息。
  • Step 3:订单服务收到确认消息后提交订单。如果库存扣减失败,订单服务通过补偿操作回滚订单。
  • Step 4:如果库存扣减失败,库存服务会进行补偿操作,比如恢复库存。
2.2 事务补偿

Saga模式的关键在于如何定义补偿操作(即回滚操作)。在我们的场景中,补偿操作可以是:

  • 订单补偿:如果库存扣减失败,订单服务需要取消订单操作,可能是删除订单或标记为“失败”。
  • 库存补偿:如果订单取消或失败,库存服务需要恢复库存。

3. TCC(Try-Confirm-Cancel)模式

TCC模式也是一种分布式事务的方案,它通过三个操作来保证分布式事务的一致性:

  • Try:尝试执行操作,确保资源可用并且锁定资源。
  • Confirm:在所有的Try操作成功后,执行确认操作。
  • Cancel:如果Try操作失败或最终无法确认,则执行取消操作。
3.1 TCC模式流程
  1. 订单服务(Try):订单服务调用库存服务的 try 接口,检查库存是否足够并预扣库存。
  2. 库存服务(Try):库存服务尝试扣减库存并锁定资源,返回成功或失败。
  3. 订单服务(Confirm):如果库存服务成功,订单服务调用 confirm 接口确认订单创建并提交。
  4. 库存服务(Confirm):库存服务执行库存扣减操作,确保库存被扣除。
  5. 失败时的Cancel
    • 如果订单服务没有接到库存的确认消息,或者库存不足,订单服务会调用 cancel 接口回滚订单。
    • 库存服务会在确认失败时调用 cancel 操作,恢复库存。

4. 最终一致性 + 事件驱动

在实际的电商系统中,可能更多的是采用 最终一致性 的方式来保证跨服务的数据一致性。系统通过 事件驱动架构 来保证不同服务之间最终的一致性。

  • 订单服务:创建订单后发布事件。
  • 库存服务:接收到库存扣减的事件并进行库存扣减操作。
  • 失败恢复:如果库存服务失败,则通过重试机制或者补偿机制恢复数据一致性。

总结

对于分布式系统中的订单与库存这类跨服务事务,无法依赖传统的数据库事务,需要通过一些分布式事务解决方案来保证数据一致性。常用的方式有:

  1. 消息队列 + 补偿机制:通过消息传递解耦订单与库存,使用补偿操作来保证一致性。
  2. Saga模式:将大事务拆分为多个局部事务,并通过补偿操作保证最终一致性。
  3. TCC模式:通过 Try-Confirm-Cancel 三步操作来保证事务一致性。

最常见的实践是 消息队列 + 补偿机制,通过保证消息的可靠性和幂等性,以及设置补偿机制来应对系统故障或异常情况,最终实现订单和库存之间的数据一致性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/66608.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

pg数据库运维经验2024

这篇文章主要是讲pg运维常见问题,两三年见一次的疑难杂症就不说了。 主要是技术性运维总结,主打通俗易懂和快速上手,尽量避免源码层面等深入分析。 SQL性能与执行计划 执行计划突变 pg官方不支持hint功能,并且计划永远不支持&…

缓存-Redis-常见问题-缓存击穿-永不过期+逻辑过期(全面 易理解)

缓存击穿(Cache Breakdown) 是在高并发场景下,当某个热点数据在缓存中失效或不存在时,瞬间大量请求同时击中数据库,导致数据库压力骤增甚至崩溃的现象。为了解决这一问题,“永不过期” “逻辑过期” 的策略…

Hadoop 实战笔记(一) -- Windows 安装 Hadoop 3.x

环境准备 安装 JAVA 1.8 Java环境搭建之JDK下载及安装下载 Hadoop 3.3.5 安装包 Hadoop 下载:https://archive.apache.org/dist/hadoop/common/ 一、JAVA JDK 环境检查 二、Hadoop(HDFS)环境搭建 1. 解压安装文件 hadoop-3.3.5.tar 2. 配置环境变量 HADOOP_HO…

点赞模块的实现思路

在现代应用程序中,点赞功能已经成为用户互动的重要方式。无论是社交媒体平台、新闻网站还是在线论坛,点赞功能都可以帮助用户表达对内容的认可和喜爱。 点赞功能的基本原理是将用户对某个内容的赞记录到数据库中。当用户点击点赞时,系统会检…

个人博客搭建(二)—Typora+PicGo+OSS

个人博客站—运维鹿: http://www.kervin24.top CSDN博客—做个超努力的小奚: 做个超努力的小奚-CSDN博客 一、前言 博客搭建完一直没有更新,因为WordPress自带的文档编辑器不方便,以前用CSDN写作的时候,习惯了Typora。最近对比了…

【向量数据库】搜索算法

最近几年,一种叫做向量数据库的产品,正趁着AI的热潮开始崭露头角。伴随着AI时代的到来,向量将成为一种重要的数据形式,而传统数据库并不适合用来存储和检索向量数据,因此我们大约需要一种专门设计的数据库来处理这些问…

前端开发中常用的插件库

ahooks 一个高质量且可靠的React Hooks库。class-variance-authority(CVA) 是一个专注于解决CSS类管理问题的库。classnames 是一个在React开发中非常流行的JavaScript工具库,它可以帮助开发者有条件地连接类名字符串。copy-to-clipboard 一个…

什么是cline?

Cline 是一个功能强大的 AI 编程助手,专为 Visual Studio Code(VSCode)设计。它通过结合大语言模型(如 Claude、GPT 等)和工具链,帮助开发者自动化任务、生成代码、执行命令等。以下是对 Cline 的深入理解和…

启动tomcat出现的The JRE_HOME environment variable is not defined correctl

很久没使用Tomcat了,最近配置JDK 17 Tomcat 10,竟然报错 the JRE_HOME environment variable is not defined correctly很明显就是环境变量没有配置 JRE_HOME这个变量,但我明明是环境变量是配置了JAVA_HOME以及JRE_HOME这两个变量。 奇怪了…

ARM CCA机密计算安全模型之安全生命周期管理

安全之安全(security)博客目录导读 目录 一、固件启用的调试 二、CCA系统安全生命周期 三、重新供应 四、可信子系统与CCA HES 启用 CCA(机密计算架构)的安全系统是指 CCA 平台的实现处于可信状态。 由于多种原因,CCA 启用系统可能处于不…

[读书日志]从零开始学习Chisel 第八篇:Scala的集合(敏捷硬件开发语言Chisel与数字系统设计)

5.Scala的集合 Scala的集合包括数组,列表,集,映射,序列,元组,数组缓冲和列表缓冲等。 5.1 数组 5.1.1数组的定义 数组是计算机内一片地址连续的内存空间。数组元素类型可以是任意的,不同元素…

k8s排错集:zk集群的pod报错 Init:CrashLoopBackOff无法启动

zk三节点集群,zk-0无法启动 statefulset 进到该node节点上查看容器的报错日志,发现在初始化container的时候一个命令有问题 查看正常zk集群的pod的资源配置文件 解决办法: 修改资源配置文件 应该修改为 chown -R 1000:1000 /zkenv kubec…

Golang的并发编程框架比较

# Golang的并发编程框架比较 中的并发编程 在现代软件开发中,处理高并发的能力愈发重要。Golang作为一门支持并发编程的编程语言,提供了丰富的并发编程框架和工具,使得开发者能够更轻松地处理并发任务。本文将介绍Golang中几种常用的并发编程…

【Web】软件系统安全赛CachedVisitor——记一次二开工具的经历

明天开始考试周,百无聊赖开了一把CTF,还顺带体验了下二开工具,让无聊的Z3很开心🙂 CachedVisitor这题 大概描述一下:从main.lua加载一段visit.script中被##LUA_START##(.-)##LUA_END##包裹的lua代码 main.lua loca…

单纯形法的学习笔记

文章目录 A. 单纯形法概述1. 优化模型示例 B. 理论基础C. 算法思想D. 实现算法1. 线性规划的标准型2. 顶点解的理解及表示2.1 在标准型中变量取值为零的意义2.2 顶点解的表示 3. 最优性判断4. 解的更新5. 完成迭代过程 E. 单纯形法的基本概念与本文对照F. 文档源码 前言&#x…

【VBA】【EXCEL】将某列内容横向粘贴到指定行

Sub CopyRowToColumn()On Error GoTo ErrorHandler 添加错误处理Application.ScreenUpdating FalseApplication.Calculation xlCalculationManualApplication.EnableEvents False 禁用事件处理Dim lastCol As LongDim lastRow As LongDim i As Long, colCount As LongDim …

【vue3封装element-plus的反馈组件el-drawer、el-dialog】

vue2中封装el-drawer、el-dialog这类反馈类子组件,需要将父组件的visible值传递子组件,并且再通过$emit将关闭弹窗的组件值传回父组件,同时子组件还需要监听父组件传递过来的visible的值,来驱动弹窗显示隐藏,很麻烦&am…

JS进阶--JS听到了不灭的回响

作用域 作用域(scope)规定了变量能够被访问的“范围”,离开了这个“范围”变量便不能被访问 作用域分为局部和全局 局部作用域 局部作用域分为函数和块 那 什么是块作用域呢? 在 JavaScript 中使用 { } 包裹的代码称为代码块…

杭州市有哪些大学能够出具论文检索报告?

杭州市具有查收查引服务的学校有浙江大学、杭州电子科技大学、浙江工业大学、杭州师范大学等高校。 1、浙江大学图书馆 浙江大学图书馆提供文献查收查引服务,包括查询学术论文被SCIE、SSCI、A&HCI、EI、CPCI-S、CPCI-SSH、CSSCI、CSCD等国内外权威数据库收录和…

计算机网络 (26)互联网的路由选择协议

一、路由选择协议的基本概念 路由选择协议是计算机网络中用于确定数据包在网络中传输路径的一种协议。它帮助路由器构建和维护路由表,以便根据目的地址将数据包转发到正确的下一跳路由器。路由选择协议分为静态路由选择协议和动态路由选择协议两大类。 二、静态路由…