KafkaConsumer 消费逻辑

版本:kafka-clients-2.0.1.jar

之前想写个插件修改 kafkaConsumer 消费者的逻辑,根据 header 过滤一些消息。于是需要了解一下 kafkaConsumer 具体是如何拉取消费消息的,确认在消费之前过滤掉消息是否会有影响。
下面是相关的源码,并通过注释的方式进行说明。

先结论:kafkaConsumer 拉取消息的 offset 是存本地的,根据 offset 拉取消息。开启自动提交时,会自动提交 offset 到 broker(在一些场景下会手动检查是否需要提交),防止重启或reblance时 offset 丢失。而本地保存的 offset 是本地拉取到消息时就更新的,所以自动提交的场景下,在消费前过滤掉消息没有影响。

拉取消息

KafkaConsumer#poll

private ConsumerRecords<K, V> poll(final long timeoutMs, final boolean includeMetadataInTimeout) {// note: 获取轻锁同时检查非多线程环境,并检查 consumer 开启状态 (可以close的)acquireAndEnsureOpen();try {if (timeoutMs < 0) throw new IllegalArgumentException("Timeout must not be negative");// note: subscriptions:SubscriptionState  维护了当前消费者订阅的主题列表的状态信息(组、offset等)//   方法判断是否未订阅或未分配分区if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");}// poll for new data until the timeout expireslong elapsedTime = 0L;do {// note: 是否触发了唤醒操作 (调用了当前对象的 wakeup 方法) 通过抛异常的方式退出当前方法,(这里是while循环,可能一直在拉取消息,(无新消息时))client.maybeTriggerWakeup();final long metadataEnd;if (includeMetadataInTimeout) {final long metadataStart = time.milliseconds();// note: 更新分区分配元数据以及offset, remain是用来算剩余时间的// 内部逻辑://  1 协调器 ConsumerCoordinator.poll 拉取协调器事件(期间会发送心跳、自动提交)//  2 updateFetchPositions 更新positions,(但本地有positions数据就不更新,更新完pos后,如果还有缺的,就先使用reset策略,最后异步设置pos)if (!updateAssignmentMetadataIfNeeded(remainingTimeAtLeastZero(timeoutMs, elapsedTime))) {return ConsumerRecords.empty();}metadataEnd = time.milliseconds();elapsedTime += metadataEnd - metadataStart;} else {while (!updateAssignmentMetadataIfNeeded(Long.MAX_VALUE)) {log.warn("Still waiting for metadata");}metadataEnd = time.milliseconds();}//note: 这里终于开始拉取消息了,下面单独讲一下final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(remainingTimeAtLeastZero(timeoutMs, elapsedTime));if (!records.isEmpty()) {//note: 翻译:返回之前,发送下一个拉取的请求避免阻塞response// before returning the fetched records, we can send off the next round of fetches// and avoid block waiting for their responses to enable pipelining while the user// is handling the fetched records.//// NOTE: since the consumed position has already been updated, we must not allow// wakeups or any other errors to be triggered prior to returning the fetched records.if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {client.pollNoWakeup();}//note:  这里使用拦截器拦截一下,这里可以对消息进行修改或过滤,但需要注意commit的问题return this.interceptors.onConsume(new ConsumerRecords<>(records));}final long fetchEnd = time.milliseconds();elapsedTime += fetchEnd - metadataEnd;} while (elapsedTime < timeoutMs);return ConsumerRecords.empty();} finally {release();}
}

关于 pollForFetches 的逻辑

pollForFetches

private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(final long timeoutMs) {final long startMs = time.milliseconds();long pollTimeout = Math.min(coordinator.timeToNextPoll(startMs), timeoutMs);// note: 先获取已经拉取了的消息,存在就直接返回//  fetcher 内部有一个 completedFetches 暂存预拉取的请求,可解析出 nextLineRecords 用于暂存预拉取的消息//    从 nextLineRecords 获取消息时,先判断一下状态(如assigned、paused、position),//      然后获取到消息后,再更新 subscriptions 中的 position 位置(值为下一个的offset), 注意这个时候还没commit// if data is available already, return it immediatelyfinal Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();if (!records.isEmpty()) {return records;}// note: 没有预拉取的消息,发送拉取请求(实际没发) //  先找到partition的leader,检查可用,检查没有待处理的请求,然后从 subscriptions 获取 position,构建ClientRequest暂存//  以及设置listener (成功则处理结果入队列completedFetches)// send any new fetches (won't resend pending fetches)fetcher.sendFetches();// We do not want to be stuck blocking in poll if we are missing some positions// since the offset lookup may be backing off after a failure// NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call// updateAssignmentMetadataIfNeeded before this method.if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {pollTimeout = retryBackoffMs;}// note: 轮询等待,详见下文client.poll(pollTimeout, startMs, () -> {// since a fetch might be completed by the background thread, we need this poll condition// to ensure that we do not block unnecessarily in poll()return !fetcher.hasCompletedFetches();});// after the long poll, we should check whether the group needs to rebalance// prior to returning data so that the group can stabilize fasterif (coordinator.rejoinNeededOrPending()) {return Collections.emptyMap();}return fetcher.fetchedRecords();
}

ConsumerNetworkClient#poll

/*** Poll for any network IO.* @param timeout timeout in milliseconds* @param now current time in milliseconds* @param disableWakeup If TRUE disable triggering wake-ups*/
public void poll(long timeout, long now, PollCondition pollCondition, boolean disableWakeup) {// note: 触发已完成的请求的回调处理器  (有一个pendingCompletion的队列)// there may be handlers which need to be invoked if we woke up the previous call to pollfirePendingCompletedRequests();lock.lock();try {// note: 处理断开的连接 (pendingDisconnects队列)// Handle async disconnects prior to attempting any sendshandlePendingDisconnects();// note: 实际上这里才真正发出请求。。 前面那个feature只是构建request//  前面准备的 ClientRequest 放在一个 UnsentRequests (内部map, key:Node,val: requests)中//  这里面取出来进行发送, kafkaClient.ready -> send// send all the requests we can send nowlong pollDelayMs = trySend(now);timeout = Math.min(timeout, pollDelayMs);// note: 这里主要是判断是否需要阻塞 poll (timeout是否为0) 如果没有待完成且判断应该阻塞(completedFetches为空)则阻塞//  poll 里面是从 sockets 里面读写数据// check whether the poll is still needed by the caller. Note that if the expected completion// condition becomes satisfied after the call to shouldBlock() (because of a fired completion// handler), the client will be woken up.if (pendingCompletion.isEmpty() && (pollCondition == null || pollCondition.shouldBlock())) {// if there are no requests in flight, do not block longer than the retry backoffif (client.inFlightRequestCount() == 0)timeout = Math.min(timeout, retryBackoffMs);client.poll(Math.min(maxPollTimeoutMs, timeout), now);now = time.milliseconds();} else {client.poll(0, now);}// note: 检查断开的链接,判断node连接是否断开,是则从unset中取出对应requests,构建response加到completedFetches中// handle any disconnects by failing the active requests. note that disconnects must// be checked immediately following poll since any subsequent call to client.ready()// will reset the disconnect statuscheckDisconnects(now);if (!disableWakeup) {// trigger wakeups after checking for disconnects so that the callbacks will be ready// to be fired on the next call to poll()maybeTriggerWakeup();}// throw InterruptException if this thread is interruptedmaybeThrowInterruptException();// note: 再发一次请求,推测是可能部分 node 的连接在第一次没有ready (没ready会进行初始化,并返回false)// try again to send requests since buffer space may have been// cleared or a connect finished in the polltrySend(now);// fail requests that couldn't be sent if they have expiredfailExpiredRequests(now);// clean unsent requests collection to keep the map from growing indefinitelyunsent.clean();} finally {lock.unlock();}// called without the lock to avoid deadlock potential if handlers need to acquire locksfirePendingCompletedRequests();
}

自动提交

提交 offset 是为了防止重启或 rebalance 后,导致本地 position 丢失无法正常拉取后面的消息。

入口是 ConsumerCoordinator#maybeAutoCommitOffsetsAsync

触发逻辑主要是

  • KafkaConsumer#poll 拉消息
  • -> KafkaConsumer#updateAssignmentMetadataIfNeeded
  • -> ConsumerCoordinator#poll -> maybeAutoCommitOffsetsAsync (也是先构建请求存 unset 里面,等拉消息的时候再发出去)
    public void maybeAutoCommitOffsetsAsync(long now) {// 这里用来判断是否满足自动提交的间隔if (autoCommitEnabled && now >= nextAutoCommitDeadline) {this.nextAutoCommitDeadline = now + autoCommitIntervalMs;doAutoCommitOffsetsAsync();}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/138128.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

GZ038 物联网应用开发赛题第1套

2023年全国职业院校技能大赛 高职组 物联网应用开发 任 务 书 (第1套卷) 工位号:______________ 第一部分 竞赛须知 一、竞赛要求 1、正确使用工具,操作安全规范; 2、竞赛过程中如有异议,可向现场考评人员反映,不得扰乱赛场秩序; 3、遵守赛场纪律,尊重考评人员…

【漏洞复现】BYTEVALUE智能流控路由器存在命令执行

【漏洞介绍】 百为智能流控路由器 /goform/webRead/open 路由的 ?path 参数存在有回显的命令注入漏洞。攻击者可通过该漏洞在服务器端执行命令&#xff0c;写入后门&#xff0c;获取服务器权限&#xff0c;从而获取路由器权限。 【指纹】 title”BYTEVALUE 智能流控路由器”…

151. 反转字符串中的单词

151. 反转字符串中的单词 原题链接&#xff1a;完成情况&#xff1a;解题思路&#xff1a;参考代码&#xff1a;错误经验吸取 原题链接&#xff1a; 151. 反转字符串中的单词 https://leetcode.cn/problems/reverse-words-in-a-string/description/ 完成情况&#xff1a; 解…

ai批量剪辑矩阵无人直播一站式托管系统源头技术开发

1.全店IP形象打造----剪辑 全店IP打造模式为场景组合&#xff0c;需要在每个场景内按照顺序分别上传短视频素材&#xff0c;会与选中的音乐、标题文案组合生成有逻辑顺序的视频。可调配标题字号大小、音频音量大小。如想要携带团购地址可设置POI。可开启团购引导动画、镜头转场…

AI时代产品经理升级之道:ChatGPT让产品经理插上翅膀

文章目录 一、ChatGPT简介二、ChatGPT在产品经理工作中的应用1. 快速获取用户反馈2. 智能分析竞品3. 智能推荐产品4.分析市场趋势5.优化产品功能 三、总结与展望《AI时代产品经理升级之道&#xff1a;ChatGPT让产品经理插上翅膀》亮点内容简介目录作者简介获取方式 随着人工智能…

梯度@等值线@梯度运算法则

文章目录 梯度点处梯度函数梯度梯度和方向导数的关系 等值线等值线法线和梯度三元函数梯度点处梯度函数梯度梯度长度等值面 梯度运算法则 梯度 梯度是一个与方向导数相关的概念,梯度本质上是向量,是由各个自变量的偏导数定义的向量;梯度通常充当方向导数(函数变化率)的最值的角…

火山引擎公共云·城市分享会:共享云经验,一起向未来

数智化时代的来临&#xff0c;不仅激发了行业对云计算的资源需求&#xff0c;也重构了云计算的技术架构及产品布局&#xff0c;给业务场景带来更多可能性&#xff0c;让云计算成为企业走向高效治理的一剂“良方”。随着业务的多样化、复杂化&#xff0c;企业应该如何借助云计算…

各种业务场景调用API代理的API接口教程(附带电商平台api接口商品详情数据接入示例)

API代理的API接口在各种业务场景中具有广泛的应用&#xff0c;本文将介绍哪些业务场景可以使用API代理的API接口&#xff0c;并提供详细的调用教程和代码演示&#xff0c;同时&#xff0c;我们还将讨论在不同场景下使用API代理的API接口所带来的好处。 哪些业务场景可以使用API…

NAS 扩容简明指南:使用各种外设给 NAS 们扩容

说起来有趣&#xff0c;NAS 除了“不同设备共享存储”这个功能之外&#xff0c;最重要的功能就是为设备扩容&#xff0c;但是 NAS 自己的存储容量不够了&#xff0c;又该如何。 ​这篇文章分享下我目前使用外设给 NAS 扩容的思路&#xff0c;如何以相对低的成本来获取更大的容…

【python】乘机最大

题目&#xff1a; """ 设有一个长度为N的数字串&#xff0c;要求选手使用K个乘号将它分成K1个部分&#xff0c;找出一种分法&#xff0c;使得这K1个部分的乘积能够为最大。为了帮助选手能够正确理解题意&#xff0c;主持人还举了如下的一个例子有一个数字串: 31…

竞赛 目标检测-行人车辆检测流量计数

文章目录 前言1\. 目标检测概况1.1 什么是目标检测&#xff1f;1.2 发展阶段 2\. 行人检测2.1 行人检测简介2.2 行人检测技术难点2.3 行人检测实现效果2.4 关键代码-训练过程 最后 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 行人车辆目标检测计数系统 …

【系统救援】 Ubuntu重启失败,报错:UNEXPECTED INCONSISTENCY; RUN fsck MANUALLY

问题定位及处理 查看错误信息&#xff1a;/dev/sda3 contains a file system with errors, check forced. /dev/sda3: Inodes that were part of a corrupted orphan linked list found. /dev/sda3: UNEXPECTED INCONSISTENCY; RUN fsck MANUALLY. (i.e., without -a or -p o…

【数据结构】堆详解!(图解+源码)

&#x1f3a5; 屿小夏 &#xff1a; 个人主页 &#x1f525;个人专栏 &#xff1a; 数据结构解析 &#x1f304; 莫道桑榆晚&#xff0c;为霞尚满天&#xff01; 文章目录 &#x1f324;️前言&#x1f324;️堆的理论☁️二叉树的顺序存储☁️堆的概念 &#x1f324;️堆的实现…

路径加密(替换空格),剑指offer,力扣

目录 我们直接看题解吧&#xff1a; 方法&#xff1a; 审题目事例提示&#xff1a; 解题思路&#xff1a; 法1&#xff1a; 代码&#xff08;法1&#xff09;&#xff1a; 法2&#xff1a; 代码&#xff08;法2&#xff09;&#xff1a; 原题解&#xff1a; 【剑指Offer】2、替…

GoLong的学习之路(二十三)进阶,语法之并发(go最重要的特点)(锁,sync包,原子操作)

这章是我并发系列中最后的一章。这章主要讲的是锁。但是也会讲上一章channl遗留下的一些没有讲到的内容。select关键字的用法&#xff0c;以及错误的一些channl用法。废话不多说。。。 文章目录 select多路复用通道错误示例并发安全和锁问题描述互斥锁读写互斥锁 syncsync.Wait…

合成数据如何改变制造业

人工智能正在工厂车间使用&#xff0c;以识别生产线中的低效率。它可以有效地预测设备何时需要维护&#xff0c;以避免停机。人工智能被用于发现产品中的缺陷。 为了完成所有这些工作&#xff0c;使用从人工智能应该学习的过程中收集的数据来创建或训练模型。对于缺陷识别&…

定义无向加权图,并使用Pytorch_geometric实现图卷积

首先定义无向边并定义边的权重 import torch import torch.nn as nn from torch_geometric.nn import GCNConv import torch.nn.functional as F from torch_geometric.data import Dataa torch.LongTensor([0, 0, 1, 1, 2, 2, 3, 4]) b torch.LongTensor([0, 1, 2, 3, 1, 5,…

EXCEL中安装多个vsto插件,插件之间互相影响功能,怎么解决

在 Excel 中安装多个 VSTO 插件&#xff0c;并且这些插件之间存在互相影响的情况下&#xff0c;可以采取以下措施来解决问题&#xff1a; 1. **隔离插件功能&#xff1a;** - 确保每个 VSTO 插件都有清晰的功能和责任范围&#xff0c;避免不同插件之间的功能重叠。这可以通…

【前端】TypeScript核心知识点讲解

1.TypeScript简介及入门案例 &#xff08;1&#xff09;什么是TypeScript&#xff1f; TypeScript 是 JavaScript 的一个超集&#xff0c;支持 ECMAScript 6 &#xff08;ES6&#xff09;标准。 TypeScript 由微软开发的自由和开源的编程语言。 TypeScript 设计目标是开发大…

【IP-guard WebServer 远程命令执行漏洞复现(0day)】

文章目录 一、漏洞说明二、影响版本三、资产测绘四、漏洞复现五、修复建议 一、漏洞说明 IP-guard是由溢信科技股份有限公司开发的一款终端安全管理软件&#xff0c;旨在帮助企业保护终端设备安全、数据安全、管理网络使用和简化IT系统管理。 IP-guard Webserver远程命令执行漏…