springcloud rocketmq 新增的消费者组从哪里开始消费

如果新建一个新的消费者组,是否会消费历史消息,导致重复消费?

直接在 console 界面新增消费者组,但是没有办法绑定订阅关系,没有找到入口,在 控制台项目源码 rocketmq-externals 也没有找到可以确定订阅关系的接口,在阿里云的生产控制台也没有绑定的入口。
在这里插入图片描述
在这里插入图片描述
所以只能是消费者启动后再注册订阅关系。
消费者从哪里消费的计算:
RebalancePushImpl.java
在这里插入图片描述

默认走的是:CONSUME_FROM_LAST_OFFSET 规则,按照官方说法,是从最后的消费位点开始继续消费。
关键的获取消费位点的逻辑:readOffset方法:
RemoteBrokerOffsetStore.java
在这里插入图片描述
集群模式下,是从远程获取的偏移量,跟据 fetchConsumeOffsetFromBroker 方法:
在这里插入图片描述
在这里插入图片描述
报错,其实就是服务端没有该消费者组的offset,被catch住默认返回 -1.
又不是重试队列,所以拿最大的偏置,broker-a queue-8 的 brokerOffset 是 25
在这里插入图片描述
出来到了 RebalanceImpl.java 的 updateProcessQueueTableInRebalance 方法。
在这里插入图片描述
然后会被添加到 pullRequestList 通过 this.dispatchPullRequest(pullRequestList)

控制台topic消费进度中已经保存了新的消费者组的消费进度,但 consumeOffset都是 0, 还有 759 个消息没有消费。
在这里插入图片描述

消费者消费了一些比较早前的消息:
在这里插入图片描述
在这里插入图片描述

消费进度也随之更新。
在这里插入图片描述
为什么和官方的说法不一致呢?CONSUME_FROM_LAST_OFFSET 为什么没有起到作用?
参考官方的修复:Fix CONSUME_FROM_LAST_OFFSET mode may pull data from 0L #4909

~~
至于这个console怎么看?参考以下:
在rocketmq的控制台中,选择 topic -> consumer manage,就可以查看一个主题下的消费者组、集群、队列的消费情况。
在这里插入图片描述
其中,10.122.24.41 是本人的内网ip,如果我本地线程卡住了(或者debug中),这个在线状态也会下线的。目前我是分配到了其中的8个集群队列,broker-a(8~15)。
在这里插入图片描述
offsetTable 的内容和我所描述的一致。
此外:
我还有对比组,是之前创建的废弃的消费者组,集群位点 brokerOffset 不变,消费位点 consumerOffset 落后了许多,落后的总量 diffTotal 代表此消费者组还有这么多未消费的消息。而且也没有在线的消费者客户端 consumerClient。
在这里插入图片描述
如果这时,我配置启动消费者去消费此消费者组。预计会消费 delay = 294 个消息。
结果也确实如此,将消息消费完,而且分配到了所有集群的所有队列。
在这里插入图片描述

测试结果:默认策略会从offset = 0 开始消费。
在这里插入图片描述
所以该参数没有用,还是从0开始消费了,这时候只能靠消费者组重置位点操作了。

~~
tags过滤,是服务端过滤,为什么会直接将不需要的消息也丢失掉呢?
这就要涉及到订阅关系一致性。
在这里插入图片描述
在这里插入图片描述

参考:https://help.aliyun.com/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-5-x-series/developer-reference/message-filtering

tags过滤会将不匹配的直接跳过(丢失)
我的理解是,现在没有为某个tags有单独记录消费进度的地方,所谓的服务端过滤,也只是说用hashcode快速匹配拉取而已,之后也是直接将offset拉到队列尾的。

参考:https://mp.weixin.qq.com/s/RnS675dt-wErnEuolK6Zeg?spm=a2c6h.12873639.article-detail.18.3ba035175CHVos

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/49708.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Android 常用调试工具/方法解析

一、内存相关 参考Android内存分析命令_dumpsys meminfo 算出rss-CSDN博客 1、基本概念 1)PSS & RSS & USS & VSS a、PSS 概念:全称Proportional Set Size,根据进程实际使用的内存量按照共享比例分配给进程的一种内存度量方…

SpringBoot 项目配置文件注释乱码的问题解决方案

一、问题描述 在项目的配置文件中,我们写了一些注释,如下所示: 但是再次打开注释会变成乱码,如下所示: 那么如何解决呢? 二、解决方案 1. 点击” File→Setting" 2. 搜索“File Encodings”, 将框…

Wonder3D 论文学习

论文链接:https://arxiv.org/abs/2310.15008 代码链接:https://github.com/xxlong0/Wonder3D 解决了什么问题? 随着扩散模型的提出,3D 生成领域取得了长足进步。从单张图片重建出 3D 几何是计算机图形学和 3D 视觉的基础任务&am…

World of Warcraft [CLASSIC] Timebadge

游戏币【每个服务器实时金价不一样,本例子是5000-6000金】 1枚【魔兽世界时光徽章】 30天游戏时间。 5760金币游戏币,策划如何消耗游戏里面的金币总量,以及如何留住那些非人民币玩家呢 30天加上去了 World of Warcraft [CLASSIC] [魔兽世界…

搜索与下载Stable Diffusion 模型

我只是一个刚开始学习SD没多久小白,拿到别人的工作流想要运行时,很多时候还要下载对应的模型才能正常运行,虽然也可以通过comfyui-manager下载,不过有时候好像会下载失败,而单独下载所需模型,我自己试过&am…

英语科技写作 希拉里·格拉斯曼-蒂(英文版)pdf下载

下载链接: 链接1:https://pan.baidu.com 链接2:/s/1fxRUGnlJrKEzQVF6k1GmBA 提取码:b69t 由于是英文版,可能有些看着不太方便,可以在网页版使用以下软件中英文对照着看,看着更舒服,…

图书管理系统设计

设计一个图书管理系统时,我们需要考虑系统的基本功能、用户需求、技术选型以及数据的安全性和完整性。下面是一个基本的图书管理系统的设计概览: 1. 系统目标 管理图书信息:添加、删除、修改图书信息。借阅管理:处理借书、还书流…

Python 教程(二):语法与数据结构

目录 前言专栏列表语法特点实例代码基本数据类型变量命名规则赋值动态类型作用域示例代码 运算符list、set和dict 数据结构 区别1. list(列表)2. set(集合)3. dict(字典) 总结 前言 Python 是一种计算机编…

Linux调试器gdb

1.debug版本的文件才能调试 gcc默认编译生成的是release版本,在输入gcc编译指令时,要加-g,才会生成debug版本的可执行文件。 可以看到,code_dbug比code大,因为debug版本含有调试信息,往往比release版本的…

【BUG】已解决:SyntaxError:positional argument follows keyword argument

SyntaxError:positional argument follows keyword argument 目录 SyntaxError:positional argument follows keyword argument 【常见模块错误】 【解决方案】 欢迎来到英杰社区https://bbs.csdn.net/topics/617804998 欢迎来到我的主页,我是博主英杰&#xff0c…

【SOC 芯片设计 DFT 学习专栏 -- DFT OCC 与 ATPG的介绍】

请阅读【嵌入式及芯片开发学必备专栏】 请阅读【芯片设计 DFT 学习系列 】 如有侵权,请联系删除 转自: 简矽芯学堂 简矽芯学堂 2024年01月18日 09:00 陕西 文章目录 OCC 介绍Fast ScanFull chip ATPGPartition ATPGHierarchical ATPG OCC 介绍 OCC&am…

微信小程序实现聊天界面,发送功能

.wxml <scroll-view scroll-y"true" style"height: {{windowHeight}}px;"><view wx:for"{{chatList}}" wx:for-index"index" wx:for-item"item" style"padding-top:{{index0?30:0}}rpx"><!-- 左…

力扣高频SQL 50题(基础版)第十题

文章目录 力扣高频SQL 50题&#xff08;基础版&#xff09;第十题1661. 每台机器的进程平均运行时间题目说明思路分析实现过程准备数据实现方式结果截图总结 力扣高频SQL 50题&#xff08;基础版&#xff09;第十题 1661. 每台机器的进程平均运行时间 题目说明 表: Activity…

Git merge

Git merge 参考文档&#xff1a; https://marsishandsome.github.io/2019/07/Three_Way_Merge https://git-scm.com/docs/merge-strategies https://stackoverflow.com/questions/56889406/how-does-git-compare-two-files-while-merging Git merge的目标是合并changes&#x…

idea一键为实体类赋值

file -> settings -> plugins -> marketplace 把这个插件装上 找个实体&#xff0c;选中&#xff0c;altenter进入edit界面 我是选择只保留右边这种生成方法&#xff0c;然后选择ok 返回到那个实体&#xff0c;选择&#xff0c;altenter generate生成

创建线程的几种方式

严格来讲&#xff0c;创建线程只有一种方式&#xff0c;就是实现Runnable接口&#xff0c;其他创建线程的方式也是对其封装。 继承Thread方式 public class Thread extends java.lang.Thread {Overridepublic void run() {super.run();} } 进入Thread可以看到&#xff0c;也是…

Spark实时(四):Strctured Streaming简单应用

文章目录 Strctured Streaming简单应用 一、Output Modes输出模式 二、Streaming Table API 三、​​​​​​​​​​​​​​Triggers 1、​​​​​​​unspecified&#xff08;默认模式&#xff09; 2、​​​​​​​​​​​​​​Fixed interval micro-batches&am…

C语言刷题小记2

前言 本篇博客还是为大家分享一些C语言的OJ题目&#xff0c;如果你感兴趣&#xff0c;希望大佬一键三连。多多支持。下面进入正文部分。 题目1竞选社长 分析&#xff1a;本题要求我们输入一串字符&#xff0c;并且统计个数的多少&#xff0c;那么我们可以通过getchar函数来获…

软件开发者消除edge浏览器下载时“此应用不安全”的拦截方法

当Microsoft Edge浏览器显示“此应用不安全”或者“已阻止此不安全的下载”这类警告时&#xff0c;通常是因为Windows Defender SmartScreen或者其他安全功能认为下载的文件可能存在安全风险。对于软件开发者来说&#xff0c;大概率是由于软件没有进行数字签名&#xff0c;导致…

【React】useState:状态更新规则详解

文章目录 一、基本用法二、直接修改状态 vs 使用 setState 更新状态三、对象状态的更新四、深层次对象的更新五、函数式更新六、优化性能的建议 在 React 中&#xff0c;useState 是一个非常重要的 Hook&#xff0c;用于在函数组件中添加状态管理功能。正确理解和使用 useState…