13、RockerMQ消息类型之广播与集群消息

RocketMq中提供两种消费模式:集群模式和广播模式。

集群模式
集群模式表示同一个消息会被同一个消费组中的消费者消费一次,消息被负载均衡分配到同一个消费者上的多个实例上。

在这里插入图片描述
还有另外一种平均的算法是AllocateMessageQueueAveragelyByCircle,也是平均分摊每一条queue,只是以环状轮流分queue的形式,如下图:
在这里插入图片描述

需要注意的是,集群模式下,queue都是只允许分配只一个实例,这是由于如果多个实例同时消费一个queue的消息,由于拉取哪些消息是consumer主动控制的,那样会导致同一个消息在不同的实例下被消费多次,所以算法上都是一个queue只分给一个consumer实例,一个consumer实例可以允许同时分到不同的queue。

通过增加consumer实例去分摊queue的消费,可以起到水平扩展的消费能力的作用。而有实例下线的时候,会重新触发负载均衡,这时候原来分配到的queue将分配到其他实例上继续消费。

但是如果consumer实例的数量比message queue的总数量还多的话,多出来的consumer实例将无法分到queue,也就无法消费到消息,也就无法起到分摊负载的作用了。所以需要控制让queue的总数量大于等于consumer的数量。

广播模式
广播消息表示同一个消息会推送到集群里面所有的消费者,保证消息至少被每个消费者消费一次。
在这里插入图片描述

默认情况下是使用集群模式,Broker端会给每一个消费组维护一个统一的offset,这个offset能够保证一个消息在消费组里面只会被消费一次,而广播模式的实现方式,是将本来由Broker保管的offset交个消费者自行保管,而Broker只管往消费者推送消息即可。
注意事项:

01
由于offset由消费者自行保存和记录,Broker只管推送消息,如果消息失败了就不会存在重试的能力。

02
消费者维护的offset是可以在服务重启时,按照上一次消费的进度处理后面没有处理的消息不会影响服务器的性能,但是如果消费者的offset丢失了,消费者的服务可以正常运行但是此时未消费的消息就不能申请了,只能申请后面推送的。

03
offset文件会存放在本地,当然这里面存在很多的坑。

各场景源码分析

广播消息与集群消息作为RocketMQ的两大类型,存在以下几点差异,首先我们通过DefaultMQPushConsumerImpl类的start方法启动消费者。

01
广播模式不支持消息的重试。

private void copySubscription() throws MQClientException {try {//...switch (this.defaultMQPushConsumer.getMessageModel()) {// 判断消息模式,如果是广播直接跳出不进行重试case BROADCASTING:break;case CLUSTERING:final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);break;default:break;}} catch (Exception e) {throw new MQClientException("subscription exception", e);}
}

02
广播模式使用本地offset缓存(start方法)。

switch (this.defaultMQPushConsumer.getMessageModel()) {// 广播模式(本地的Offset文件)case BROADCASTING:this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;// 集群模式 (Broker里面的offset)case CLUSTERING:this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;default:break;
}

03
没有所谓的负载均衡(执行rebalanceByTopic方法)。

case BROADCASTING: {// 根据topic获取MessageQueue集合Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);if (mqSet != null) {boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);if (changed) {this.messageQueueChanged(topic, mqSet, mqSet);log.info("messageQueueChanged {} {} {} {}",consumerGroup,topic,mqSet,mqSet);}} else {log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);}break;
}

04
不支持顺序消息:由于关闭消息是所有的消费者都需要去消费。

// 顺序消息更新偏移量
if (processQueue.isLocked()) {if (!pullRequest.isPreviouslyLocked()) {long offset = -1L;try {offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());} catch (Exception e) {this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e);return;}boolean brokerBusy = offset < pullRequest.getNextOffset();log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",pullRequest, offset, brokerBusy);if (brokerBusy) {log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",pullRequest, offset);}pullRequest.setPreviouslyLocked(true);pullRequest.setNextOffset(offset);}
} else {this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);log.info("pull message later because not locked in broker, {}", pullRequest);return;
}

05
并发消息消费时也没有重试(执行processConsumeResult方法时)。

// 单纯打印信息
case BROADCASTING:for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {MessageExt msg = consumeRequest.getMsgs().get(i);log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());}break;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/216360.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CSS 的背景属性(开发中常用)

目录 1 内容预览 背景颜色 背景图片 背景平铺 背景图片位置(常用) 背景图像固定 背景复合写法 背景色半透明 实现案例 1 内容预览 背景属性可以设置背景颜色、背景图片、背景平铺、背景图片位置、背景图像固定等。 注意&#xff1a; 把表格中的五个属背下来&#xff0c…

约数性质以及辗转相除法

文章目录 AcWing 869. 试除法求约数题目链接思路CODE AcWing 870. 约数个数题目链接思路CODE AcWing 871. 约数之和题目链接思路CODE AcWing 872. 最大公约数题目链接思路CODE AcWing 869. 试除法求约数 题目链接 https://www.acwing.com/activity/content/problem/content/9…

【LuatOS】简单案例网页点灯

材料 硬件&#xff1a;合宙ESP32C3简约版&#xff0c;BH1750光照度模块&#xff0c;0.96寸OLED(4P_IIC)&#xff0c;杜邦线若干 接线&#xff1a; ESP32C3.GND — OLED.GND — BH1750.GND ESP32C3.3.3V — OLED.VCC — BH1750.VCC ESP32C3.GPIO5 — OLED.SCL — BH1750.SCL E…

AOP跨模块捕获异常遭CGLIB拦截而继续向上抛出异常

其他系列文章导航 Java基础合集数据结构与算法合集 设计模式合集 多线程合集 分布式合集 ES合集 文章目录 其他系列文章导航 文章目录 前言 一、BUG详情 1.1 报错信息 1.2 接口响应信息 1.3 全局异常处理器的定义 二、排查过程 三、解决方案 四、总结 前言 最近&…

数据科学工作的20个Pandas函数(备忘)

Pandas 是数据科学社区中使用最广泛的库之一&#xff0c;它是一个强大的工具&#xff0c;可以进行数据操作、清理和分析。 本文将提供最常用的 Pandas 函数以及如何实际使用它们的样例。我们将涵盖从基本数据操作到高级数据分析技术的所有内容&#xff0c;到本文结束时&#xf…

攻防世界题目练习——Web引导模式(五)(持续更新)

题目目录 1. FlatScience2. bug3. Confusion1 1. FlatScience 参考博客&#xff1a; 攻防世界web进阶区FlatScience详解 题目点进去如图&#xff0c;点击链接只能看到一些论文pdf 用dirsearch和御剑扫描出一些隐藏文件&#xff1a; robots.txt: admin.php: login.php: f…

配电站房智慧化改造

随着科技的发展和工业化自动化的不断提高&#xff0c;传统的配电室已经不能满足现代工业的需求。配电站房的智慧化改造可以提高电力系统的运行效率和安全性&#xff0c;依托电易云-智慧电力物联网实现配电房无人值守。以下是配电站房智慧化改造的一些主要方面&#xff1a; 设备…

数据分享 I 全国市级商品房屋销售数据,shp/excel格式,2005-2020年数据

基本信息. 数据名称: 全国市级商品房屋销售数据 数据格式: Shp、excel 数据时间: 2005-2020年 数据几何类型: 面 数据坐标系: WGS84坐标系 数据来源&#xff1a;网络公开数据 数据字段&#xff1a; 序号字段名称字段说明1spxse商品房销售额&#xff08;亿元&#xf…

css 表示具有特定类或者其他属性的某种标签类型的元素

需求 通过 css 选择器获取某种标签&#xff08;如&#xff1a;div、input 等&#xff09;具有某个属性&#xff08;如&#xff1a;class、id 等&#xff09;的元素&#xff0c;从而修改其样式。 代码 通过 [标签].[属性] 的方式来获取 <div class"test">&l…

合封芯片开发就找宇凡微,提供合封芯片技术支持与资讯

一、引言 随着科技的迅速发展&#xff0c;芯片在各种电子设备中的地位日益凸显&#xff0c;其中越来越受关注的合封芯片给芯片和pcb厂商带来惊喜 合封芯片是指将多个芯片&#xff08;或其他电子元件&#xff09;封装在一个芯片封装体中的芯片。这种封装方式可以实现多个芯片的…

APP自动化测试工具大全

一、UI自动化测试工具 1. uiautomator2 openatx开源的ui自动化工具&#xff0c;支持Android和iOS。主要面向的编程语言是Python&#xff0c;API设计简洁易用&#xff0c;在开源社区也是很受欢迎。 安装&#xff1a; pip install --upgrade --pre uiautomator2# Or you can …

ArcGIS无法绘制一个或多个图层

背景&#xff1a;在导入一份数据时候&#xff0c;arcmap出现无法绘制一个或多个图层的错误&#xff0c;...点数少于要素所要求的的数量&#xff0c;查阅了半天资料发现是制作数据时候拓扑关系错误造成&#xff0c;现将处理方法详细记录如下&#xff1a; 1.原数据&#xff1a; …

北斗卫星助力消防救援实现精确升级

北斗卫星助力消防救援实现精确升级 在浙江省湖州市&#xff0c;当地消防支队建设了基于北斗系统的“智慧用水”等湖州市“智慧消防”综合信息指挥平台&#xff0c;初步实现了火灾精确预警、精确防范、精确指挥和精确处置。 湖州市“智慧用水系统”通过将北斗系统与地理信息管网…

plf::list原理分析

plf::list是一个比std::list性能要好的另外一种实现&#xff0c;根据作者的性能测试&#xff1a; 293% faster insertion 57% faster erasure 17% faster iteration 77% faster sorting 70% faster reversal 91% faster remove/remove_if 63% faster unique 811% faster clear …

【分享】7-Zip软件如何压缩文件?

7-Zip是一款完全免费的解压缩软件&#xff0c;不仅拥有自己独特的格式&#xff0c;还支持众多主流压缩格式&#xff0c;相比其他解压缩软件&#xff0c;它的压缩率更好&#xff0c;压缩速度更快。今天来分享一下如何使用7-Zip软件来压缩文件。 首先&#xff0c;我们可以到7-Zi…

SpringBoot热部署

SpringBoot热部署 借鉴链接&#x1f517;&#xff1a;SpringBoot中的热部署 添加devtools依赖和pom插件 <!-- devtools 依赖 --> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId&…

C语言——结构体

一、结构的基础知识 结构是一些值的集合&#xff0c;这些值称为成员变量结构的&#xff0c;每个成员可以是不同类型的变量。 二、结构的声明 struct tag {member-list; }variable-list; 描述一个学生&#xff1a; typedef struct Student {char name[20]; //姓名int age; …

SpringBoot入门及整合

前言 Spring Boot是一个基于Spring框架的快速开发脚手架&#xff0c;它简化了Spring应用的初始化和搭建过程&#xff0c;提供了众多便利的功能和特性并且使用"习惯优于配置"的理念&#xff0c;通过提供默认设置来快速搭建应用&#xff0c;同时也保留了灵活性以进行定…

IntelliJ IDEA创建一个spark的项目

在开始之前&#xff0c;需要说明的是 要跑通基本的wordcount程序&#xff0c;是不需要在windows上安装 hadoop 和spark的&#xff0c;因为idea在跑程序的时候&#xff0c;会按照 pom.xml配置文件&#xff0c;从指定的 repository源&#xff0c;按照properties指定的版本&#x…

系统架构设计师教程(一)绪论

系统架构设计师 1.1 系统架构概述1.1.1 系统架构的定义及发展历程1.1.2 软件架构的常用分类及建模方法1、软件架构常用分类2、系统架构的常用建模方法 1.2 系统架构设计师概述1.3 如何成为一名好的系统架构设计师 1.1 系统架构概述 自1946年第一台计算机诞生以来&#xff0c;计…