【消息队列】RabbitMQ实现消费者组机制

目录

1. RabbitMQ 的 发布订阅模式

2. GRPC 服务间的实体同步

2.1 生产者服务

2.2 消费者服务

3. 可靠性

3.1 生产者丢失消息

3.2 消费者丢失消息

3.3 RabbitMQ 中间件丢失消息


1. RabbitMQ 的 发布订阅模式

https://www.rabbitmq.com/tutorials/tutorial-three-go

  • P 生产者:发送消息到一个特定的交换机(交换机类型是fanout),不需指定具体的目标队列
  • X 交换机:将消息分发给所有绑定到它的队列
  • C 消费者:订阅主题,通过绑定到交换机的队列接收消息

2. GRPC 服务间的实体同步

考虑以下业务需求——

  • 模拟消费者组机制:
    • 同一消费者组下的消费者(即一个服务的多个实例)监听同一个队列,是竞争关系
    • 不同消费者组(即不同服务)监听不同队列,这些队列绑定到同一个交换机,不同消费者组可以独立消费相同的数据
  • 消费历史数据:当生产者先启动,生产了一部分数据,消费者后启动时,也能消费到历史数据

服务之间的实体数据同步方案:

2.1 生产者服务

(1) 初始化

生产者初始化时需要负责绑定 RabbitMQ 的交换机和队列关系,即显式声明自己的实体有哪些消费者在消费。比如:

  • 声明交换机 exchange_user、exchange_group
  • 声明消费者 consumer_user_rpc、consumer_org_rpc
  • 创建队列 exchange_user_consumer_user_rpc、exchange_user_consumer_org_rpc、exchange_group_consumer_user_rpc、exchange_group_consumer_org_rpc,也就是对每个 topic-consumer 组合,创建一个相应的队列
  • 将交换机和队列绑定

(2) 实体变更时发送消息

发送消息到交换机,交换机会自动分发给所有绑定到它的队列,也就是发送一条消息至 exchange_user 交换机,那么消息会被投递给队列 exchange_user_consumer_user_rpc 和 队列 exchange_user_consumer_org_rpc。

2.2 消费者服务

消费者订阅一个 topic,处理 rabbitMQ 队列发来的消息。

  • 若消息处理成功(业务流程成功),发送 Ack 给 rabbitMQ 确认消费
  • 若消息处理失败(业务流程失败),发送 Nack 通知 rabbitMQ 处理失败,消息将放回队列等待下次消费

Ack 时 rabbitMQ 会记录消费者消费的 offset,下次会基于 offset 继续消费~

3. 可靠性

3.1 生产者丢失消息

(1) 生产者绑定交换机和队列

在生产者初始化时,需要先将交换机和队列的关系绑定好,以避免此场景发生:生产者先启动,未绑定交换机和队列,发送了消息到交换机,此时无法投递到具体队列。消费者后启动,即便做了交换机和队列绑定,也无法消费到历史消息。

func NewMQ(rabbitMQCfg *Config, option Option) (*RabbitMQ, error) {// ...// 初始化交换机和队列for topic, consumerGroups := range option.TopicConsumerGroupsBinding {err = initExchange(topic, consumerGroups, mq)if err != nil {return nil, err}}return mq, nil
}func initExchange(exchange, consumerGroups string, mq *RabbitMQ) error {// 1. 创建发送通道pch, err := mq.conn.Channel()if err != nil {return err}mq.produceChannels[exchange] = pch// 2. 开启消息确认机制if err := pch.Confirm(false); err != nil {return err}// 3. 创建交换机// 参数 exchange:交换机名称, kind:交换机类型 fanout为发布订阅模式, durable:是否持久化, autoDelete:是否自动删除, internal:是否内部交换机, noWait:是否等待服务器确认, args:额外参数err = pch.ExchangeDeclare(exchange, "fanout", true, false, false, false, nil)if err != nil {return err}slog.Info("rabbitmq declared exchange", "exchange_name", exchange)// 4. 创建队列并绑定到交换机for _, consumerGroup := range strings.Split(consumerGroups, ",") {consumerGroup = strings.TrimSpace(consumerGroup)if consumerGroup == "" {continue}queue := queueName(exchange, consumerGroup)// 创建队列// 参数 queue:队列名称, durable:是否持久化, autoDelete:是否自动删除, exclusive:是否独占, noWait:是否等待服务器确认, args:额外参数_, err = pch.QueueDeclare(queue, true, false, false, false, nil)if err != nil {return err}// 将队列绑定到交换机// 参数 queue:队列名称, key:路由键, exchange:交换机名称, noWait:是否等待服务器确认, args:额外参数err = pch.QueueBind(queue, "", exchange, false, nil)if err != nil {return err}slog.Info("rabbitmq declared and bind queue", "queue", queue, "bind_exchange", exchange)// 创建接收通道cch, err := mq.conn.Channel()if err != nil {return err}mq.consumeChannels[queue] = cch}// 5. 开启消息确认事件监听、消息投递事件监听mq.publishWatcher[exchange] = &watcher{returnCh:  pch.NotifyReturn(make(chan amqp.Return)),confirmCh: pch.NotifyPublish(make(chan amqp.Confirmation)),}// 监听未被交换机投递的消息go func() {for ret := range mq.publishWatcher[exchange].returnCh {// 尝试重新投递ctx, _ := context.WithTimeout(context.Background(), mq.config.Timeout)if err := mq.publish(ctx, ret.Exchange, ret.MessageId, ret.Body, ret.Timestamp); err != nil {slog.Error("rabbitmq republish failed.", "exchange", ret.Exchange, "msgID", ret.MessageId, "err", err)} else {slog.Warn("rabbitmq got exchange undelivered msg, republished.", "exchange", ret.Exchange, "msgID", ret.MessageId)}time.Sleep(time.Second * 3)}}()return nil
}

(2) 发送重试

发送消息时增加重试机制。若超过重试上限,需记录日志或报警。

func (r *RabbitMQ) Produce(ctx context.Context, topic string, data map[string]any) error {body, _ := json.Marshal(data)msgID := uuid.New()var retried intfor {err := r.publish(ctx, topic, msgID, body, time.Now())if err == nil {return nil}retried++if retried > r.option.RetryNum {return err}time.Sleep(r.option.RetryInterval)}
}

(3) confirm 消息确认机制

生产端投递消息到 RabbitMQ 后,RabbitMQ 将发送一个确认事件,让生产端知晓消息已发送成功。监听 confirm 事件以确认消息的发送状态:

func initExchange(exchange string, mq *RabbitMQ) error {// ...// 开启消息确认机制if err := pch.Confirm(false); err != nil {return err}// 创建监听器mq.publishWatcher[exchange] = &watcher{confirmCh: pch.NotifyPublish(make(chan amqp.Confirmation)),}// ...
}func (r *RabbitMQ) publish(ctx context.Context, ...) error {// publish发送消息// ...// 等待rabbitmq返回消息确认select {case confirm := <-r.publishWatcher[exchange].confirmCh:if !confirm.Ack {return errors.New("publish failed, got nack from rabbitmq")}case <-ctx.Done():return errors.New("context deadline, publish to rabbitmq timeout")case <-time.After(r.config.Timeout):return errors.New("publish to rabbitmq timeout")}return nil
}

3.2 消费者丢失消息

消费者消费完成后,必须手动 Ack 通知 MQ,表示已经消费成功:

func (r *RabbitMQ) Ack(topic, consumerGroup, msgID string) error {// ...return consumeChannel.Ack(deliveryTag, false)
}

如果消费失败,需要手动 Nack,那此条消息会重新入队,等待下次消费:

func (r *RabbitMQ) Nack(topic, consumerGroup, msgID string) error {// ...return consumeChannel.Nack(deliveryTag, false, true)
}

3.3 RabbitMQ 中间件丢失消息

(1) 数据持久化到磁盘

交换机持久化(durable=true):

// 创建交换机
// 参数 exchange:交换机名称, kind:交换机类型 fanout为发布订阅模式, durable:是否持久化, autoDelete:是否自动删除, internal:是否内部交换机, noWait:是否等待服务器确认, args:额外参数
err = pch.ExchangeDeclare(exchange, "fanout", true, false, false, false, nil)

队列持久化(durable=true):

// 创建队列
// 参数 queue:队列名称, durable:是否持久化, autoDelete:是否自动删除, exclusive:是否独占, noWait:是否等待服务器确认, args:额外参数
_, err = pch.QueueDeclare(queue, true, false, false, false, nil)

消息持久化(DeliveryMode=Persistent):

err := ch.Publish(exchange, // 交换机名称"",       // 路由键true,     // 如果消息不能路由到任何队列,是否返回未处理的消息. true将返回未处理通知, false将丢弃消息false,    // 是否立即交付给消费者. 若为true, 当队列中没有等待中的消费者时,消息会被丢弃amqp.Publishing{MessageId:    msgID,              // 消息IDContentType:  "application/json", // 消息内容类型Body:         body,               // 消息内容DeliveryMode: amqp.Persistent,    // 消息需要持久化Timestamp:    t,                  // 消息时间},
)

(2) RabbitMQ 本身的数据一致性保证

RabbitMQ 使用 raft 共识算法保证数据一致性:

https://www.rabbitmq.com/docs/clustering#replica-placement

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/882389.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于SpringBoot+Vue+uniapp微信小程序的乡村政务服务系统的详细设计和实现(源码+lw+部署文档+讲解等)

项目运行截图 技术框架 后端采用SpringBoot框架 Spring Boot 是一个用于快速开发基于 Spring 框架的应用程序的开源框架。它采用约定大于配置的理念&#xff0c;提供了一套默认的配置&#xff0c;让开发者可以更专注于业务逻辑而不是配置文件。Spring Boot 通过自动化配置和约…

【软件测试】JUnit

Junit 是一个用于 Java 编程语言的单元测试框架&#xff0c;Selenium是自动化测试框架&#xff0c;专门用于Web测试 本篇博客介绍 Junit5 文章目录 Junit 使用语法注解参数执行顺序断言测试套件 Junit 使用 本篇博客使用 Idea集成开发环境 首先&#xff0c;创建新项目&#…

VUE 仿神州租车-开放平台

项目背景&#xff1a; 神州租车是一家提供汽车租赁服务的公司&#xff0c;其API开放平台为开发者提供了访问神州租车相关服务和数据的接口。用VUE技术来仿照其开发平台。 成果展示&#xff1a; 首页&#xff1a; API文档&#xff1a; 关于我们&#xff1a;

牛只行为及种类识别数据集18g牛只数据,适用于多种图像识别,目标检测,区域入侵检测等算法作为数据集。数据集中包括牛只行走,站立,进食,饮水等不同类型的数据

18g牛只数据&#xff0c;适用于多种图像识别&#xff0c;目标检测&#xff0c;区域入侵检测等算法作为数据集。 数据集中包括牛只行走&#xff0c;站立&#xff0c;进食&#xff0c;饮水等不同类型的数据&#xff0c;可以用于行为检测 数据集中包含多种不同种类的牛只&#xff…

黑盒测试 | 挖掘.NET程序中的反序列化漏洞

通过不安全反序列化漏洞远程执行代码 今天&#xff0c;我将回顾 OWASP 的十大漏洞之一&#xff1a;不安全反序列化&#xff0c;重点是 .NET 应用程序上反序列化漏洞的利用。 &#x1f4dd;$ _序列化_与_反序列化 序列化是将数据对象转换为字节流的过程&#xff0c;字节流可以…

基于SpringBoot+Vue+uniapp的诗词学习系统的详细设计和实现

详细视频演示 请联系我获取更详细的演示视频 项目运行截图 技术框架 后端采用SpringBoot框架 Spring Boot 是一个用于快速开发基于 Spring 框架的应用程序的开源框架。它采用约定大于配置的理念&#xff0c;提供了一套默认的配置&#xff0c;让开发者可以更专注于业务逻辑而不…

Maxwell 底层原理 详解

Maxwell 是一个 MySQL 数据库的增量数据捕获&#xff08;CDC, Change Data Capture&#xff09;工具&#xff0c;它通过读取 MySQL 的 binlog&#xff08;Binary Log&#xff09;来捕获数据变化&#xff0c;并将这些变化实时地发送到如 Kafka、Kinesis、RabbitMQ 或其他输出端。…

0x3D service

0x3D service 1. 概念2. Request message 数据格式3. Respone message 数据格式3.1 正响应格式3.2 negative respone codes(NRC)4. 示例4.1 正响应示例:4.2 NRC 示例1. 概念 UDS(统一诊断服务)中的0x3D服务,即Write Memory By Address(按地址写内存)服务,允许客户端向服…

2024年中国工业大模型行业发展研究报告|附43页PDF文件下载

工业大模型伴随着大模型技术的发展&#xff0c;逐渐渗透至工业&#xff0c;处于萌芽阶段。 就大模型的本质而言&#xff0c;是由一系列参数化的数学函数组成的计算系统&#xff0c;且是一个概率模型&#xff0c;其工作机制是基于概率和统计推动进行的&#xff0c;而非真正的理解…

aardio 中最重要的控件:自定义控件使用指南

aardio虽然是个小众编程语言&#xff0c;但其在windows下做个小软件生成exe文件&#xff0c;确实方便。只是这个编程语言的生态圈小&#xff0c;文档的详细程度也完全无法和大的编程语言相提并论。今天介绍一下&#xff0c;aardio中的自定义控件如何使用。 这里我们只介绍如何做…

python 作业1

任务1: python为主的工作是很少的 学习的python的优势在于制作工具&#xff0c;制作合适的工具可以提高我们在工作中的工作效率的工具 提高我们的竞争优势。 任务2: 不换行 换行 任务3: 安装pycharm 进入相应网站Download PyCharm: The Python IDE for data science and we…

AnaTraf | TCP重传的工作原理与优化方法

目录 什么是TCP重传&#xff1f; TCP重传的常见触发原因 TCP重传对网络性能的影响 1. 高延迟与重传 2. 吞吐量的下降 如何优化和减少TCP重传 1. 优化网络设备配置 2. 优化网络链路 3. 网络带宽的合理规划 4. 部署CDN和缓存策略 结语 AnaTraf 网络性能监控系统NPM | …

餐饮店怎么标注地图位置信息?

随着市场竞争的日益激烈&#xff0c;商家若想在竞争中脱颖而出&#xff0c;就必须想方设法去提高自身的曝光度和知名度&#xff0c;为店铺带来更多的客流量。其中&#xff0c;地图标注便是一种简单却极为有效的方法。通过在地图平台上添加店铺位置信息&#xff0c;不仅可以方便…

Qt-系统文件相关介绍使用(61)

目录 描述 输⼊输出设备类 打开/读/写/关闭 使用 先初始化&#xff0c;创建出大致的样貌 输入框设置 绑定槽函数 保存文件 打开文件 提取文件属性 描述 在C/C Linux 中我们都接触过关于文件的操作&#xff0c;当然 Qt 也会有对应的文件操作的 ⽂件操作是应⽤程序必不…

【C语言】文件操作(1)(文件打开关闭和顺序读写函数的万字笔记)

文章目录 一、什么是文件1.程序文件2.数据文件 二、数据文件1.文件名2.数据文件的分类文本文件二进制文件 三、文件的打开和关闭1.流和标准流流标准流 2.文件指针3.文件的打开和关闭文件的打开文件的关闭 四、文件的顺序读写1.fgetc函数2.fputc函数3.fgets函数4.fputs函数5.fsc…

微信小程序上传组件封装uploadHelper2.0使用整理

一、uploadHelper2.0使用步骤说明 uploadHelper.js ---上传代码封装库 cos-wx-sdk-v5.min.js---腾讯云&#xff0c;对象存储封装库 第一步&#xff0c;下载组件代码&#xff0c;放置到自己的小程序项目中 第二步、 创建上传对象&#xff0c;执行选择图片/视频 var _this th…

npm install进度卡在 idealTree:node_global: sill idealTree buildDeps

ping一下源&#xff1a;ping http://registry.npm.taobao.org/ ping不通&#xff0c;原因&#xff1a;原淘宝npm永久停止服务&#xff0c;已更新新域名~~震惊&#xff01;&#xff01;&#xff01; 重新安装&#xff1a;npm config set registry https://registry.npmmirror.c…

推荐?还是踩雷?3款中英互译软件大盘点,你真的选对了吗?

作为一个爱到处跑的人&#xff0c;我特别明白旅行的时候能说会道有多重要。不管是跟当地人聊天&#xff0c;还是看路标、菜单&#xff0c;有个好用的翻译软件是肯定少不了的。今天&#xff0c;我打算给你们介绍3款中英文互译的翻译工具&#xff0c;帮你挑出最适合自己的那一个。…

机器学习:opencv--人脸检测以及微笑检测

目录 前言 一、人脸检测的原理 1.特征提取 2.分类器 二、代码实现 1.图片预处理 2.加载分类器 3.进行人脸识别 4.标注人脸及显示 三、微笑检测 前言 人脸检测是计算机视觉中的一个重要任务&#xff0c;旨在自动识别图像或视频中的人脸。它可以用于多种应用&#xff0…

Python和MATLAB锂电铅蓄电化学微分模型和等效电路

&#x1f3af;要点 对比三种电化学颗粒模型&#xff1a;电化学的锂离子电池模型、单粒子模型和带电解质的单粒子模型。求解粒子域内边界通量与局部电流密度有关的扩散方程。扩展为两个相的负或正电极复合电极粒子模型。模拟四种耦合机制下活性物质损失情况。模拟锂离子电池三参…