kafka consumer客户端消费逻辑解析

kafka consumer客户端消费逻辑解析

    • 一、主要消费步骤
    • 二、提交策略
      • 【步骤2代码解析】
      • 【提交策略总结】
    • 三、拉取策略(待补充)
    • 四、消费策略
      • 【代码解析】
      • 【消费策略总结】

一、主要消费步骤

在这里插入图片描述
这是kafka客户端拉取消息的入口,有4个主要部分

1、启动后的准备
consumer线程启动后,如果非自动提交模式,构建worker线程放入worker线程池,供后续消费消息使用

2、运行期逻辑循环——提交策略

3、运行期逻辑循环——消息拉取

4、运行期逻辑循环——消息消费

二、提交策略

【步骤2代码解析】

2.1 在拉取消息之前,如果非自动提交,进行提交判定:
需要提交的消息(ConsumerRecord)会维护在acks——本地已处理待提交消息队列(一个linkedBlockingQueue)中,这里会把acks里所有消息拿出来进行循环处理。
在这里插入图片描述
(2-1-1) ack策略-立即提交:说明在配置了手动提交-立即提交的ack策略时,提交动作是每次消息拉取前,worker线程已处理完的消息的offset,挨个put进本地的partition和offset的映射(metadata中一个map)。由于是循环处理worker队列,而消息是乱序存放的,所以put之前判断offset大于现有offset才会执行,确保低offset不覆盖高offset。
put后会直接进行网络请求提交到broker中。(由于在循环中,这里的请求会发生多次?没细看)在这里插入图片描述在这里插入图片描述
在这里插入图片描述

(2-1-2) ack策略-非立即提交:
和2-1-1一样,都会维护分区最高位移映射。在这里插入图片描述
接下来会根据具体的提交规则配置来判定是否提交,
1、未提交数:未提交数 >= 配置
2、提交时间间隔:上次提交 - 当前时间 >= 配置
3、未提交数或提交时间间隔:1或2任意满足

在这里插入图片描述

【提交策略总结】

一、提交模式
1、自动提交:拉取消息后立即提交
2、手动(非自动)提交:
2-1、拉取消费前执行一次提交判定

二、提交判定
1、立即提交:无需判定
2、非立即提交:根据配置的规则判定
2-1、满足提交时间间隔可提交
2-2、满足未提交数计算可提交
2-3、满足2-1或2-2可提交

三、拉取策略(待补充)

四、消费策略

【代码解析】

在这里插入图片描述
自动提交,直接进行消费
在这里插入图片描述
手动提交:只加入消息处理队列,等待消费线程消费
在这里插入图片描述

【消费策略总结】

1、自动提交:consume线程自我消费(?)
2、手动提交:worker线程异步消费

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/35709.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Rust】function和methed的区别

文章目录 functionmethedAssociated Functions 参考资料 一句话总结: function和methed很多都是相同的。 不同点在于: methed定义在结构体里面,并且它的第一个参数肯定是self,代表结构体实例。方法需要用实例名.方法名调用当然结…

苏东坡传-读书笔记三

苏东坡去世之后,一黄某获得苏东坡一珍贵的手稿,其中有苏东坡下列的名句: “处贫贱易,处富贵难。安劳苦易,安闲散难。忍痛易,忍痒难。人能安闲散,耐富贵,忍痒,真有道之士也…

三生随记——暗夜诊所

在偏远的小镇边缘,矗立着一座看似普通的诊所。这座诊所历史悠久,据传已经存在了几十年,但关于它的具体来历和背后的故事,却鲜有人知。它的外表看似破旧不堪,但内部却异常整洁,散发着一种神秘而诡异的气息。…

vCenter-vAPI-Endpoint service health shows as Yellow

- 问题摘要:vAPI-Endpoint service health shows as Yellow - 解决方案/工作方法: 使用命令重启vAPI Endpoint service后该服务运行正常。 service-control --stop vmware-vapi-endpoint service-control --start vmware-vapi-endpoint VMware KB&…

详细分析Oracle修改默认的时间格式(四种方式)

目录 前言1. 会话级别2. 系统级别3. 环境配置4. 函数格式化5. 总结 前言 默认的日期和时间格式由参数NLS_DATE_FORMAT控制 如果需要修改默认的时间格式,可以通过修改会话级别或系统级别的参数来实现 1. 会话级别 在当前会话中设置日期格式,这只会影响…

uni-app (通过HBuilderX 和 VS Code 开发)详细连接过程教学。

使用 HBuilderX 创建 uni-app 项目 并编译到微信开发者工具。 uni-app 支持两种方式创建项目: 通过 HBuilderX 创建 通过命令行创建 首先我们需要先下载HBuilderX 下载链接地址:DCloud - HBuilder、HBuilderX、uni-app、uniapp、5、5plus、mui、wap2…

【2024最新华为OD-C/D卷试题汇总】[支持在线评测] LYA的登山之旅01(100分)- 三语言AC题解(Python/Java/Cpp)

🍭 大家好这里是清隆学长 ,一枚热爱算法的程序员 ✨ 本系列打算持续跟新华为OD-C/D卷的三语言AC题解 💻 ACM银牌🥈| 多次AK大厂笔试 | 编程一对一辅导 👏 感谢大家的订阅➕ 和 喜欢💗 &#x1f…

如何在 HTML 中实现响应式设计以适应不同设备的屏幕尺寸?

要在HTML中实现响应式设计以适应不同设备的屏幕尺寸,可以使用CSS媒体查询和流动布局。 以下是实现响应式设计的一些关键步骤: 使用CSS媒体查询:CSS媒体查询允许根据屏幕尺寸和设备特性应用不同的CSS样式。通过在CSS中使用media规则&#xf…

第四十一章 使用 二进制 SOAP 格式

文章目录 第四十一章 使用 二进制 SOAP 格式介绍扩展 Web 服务的 WSDL 第四十一章 使用 二进制 SOAP 格式 数据平台 SOAP 支持提供了可选的专有二进制 SOAP 格式,当发送和接收大型 SOAP 消息并希望最小化消息大小时,该格式非常有用。 Web 服务可以接收…

js文件的执行和变量初始化缓存

js文件和变量初始化 全局变量举例js文件加载 全局变量举例 import * as turf from "turf/turf"; import earcut from "earcut"; import * as THREE from "three"; import { TextGeometry } from "three/addons/geometries/TextGeometry.js…

《梦醒蝶飞:释放Excel函数与公式的力量》6.4 TODAY函数

第四节:6.4 TODAY函数 1)TODAY函数概述 TODAY函数是Excel中一个非常有用的内置函数,它返回当前的日期。与NOW函数不同,TODAY函数仅返回日期部分,时间部分默认为午夜(0:00)。 2)函…

[数据质量]手动实现 阿里云DataWorks 的数据质量监控告警功能

目录 手动实现 DataWorks 的数据质量监控告警功能1. 简介:2. 数据表准备2.1 tmp_monitor_tbl_info (数据监控信息表)2.2 tmp_monitor_rule_info (数据质量监控规则表)2.3 tmp_monitor_tbl_info_log_di (数据监控信息记录表) 3. 程序开发3.1 数据检查程序3.2 告警信息推送程序3.…

Jenkins教程-10-发送飞书测试报告通知

上一小节我们学习了发送企业微信测试报告通知的方法,本小节我们讲解一下发送飞书测试报告通知的方法。 1、自动化用例执行完后,使用pytest_terminal_summary钩子函数收集测试结果,存入本地status.txt文件中,供Jenkins调用 conft…

MyBatis(9)MyBatis 的一级缓存和二级缓存的区别

MyBatis 提供了两级缓存机制:一级缓存(Session级别)和二级缓存(全局级别),以提高应用的性能通过减少数据库的查询次数。 一级缓存(Session级别) 一级缓存是基于 SQL 会话&#xff…

优化 C# 和 .NET Core Web API 中的 LINQ 查询

LINQ(语言集成查询)是 C# 中的一项强大功能,允许开发人员以可读且简洁的方式查询和操作数据。但是,LINQ 的使用效率低下可能会导致性能瓶颈,尤其是在处理 .NET Core Web API 中的大型数据集时。优化 LINQ 查询对于维护…

嵌入式 Linux 设备刷系统具体组成

嵌入式 Linux 设备刷系统具体组成 1 介绍1.1 概述1.2 嵌入式 Linux 的组成1.3 U-Boot1.4 Linux 内核1.5 设备树1.6 根文件系统 参考 1 介绍 1.1 概述 一个完整的 linux 系统,通常包含了 U-Boot、kernel、设备树以及根文件系统。 1.2 嵌入式 Linux 的组成 1.3 U-…

Java热门技术点总结:Lambda表达式与Stream API

第一部分:Lambda表达式 1. 简介 Lambda表达式是Java 8引入的一个非常重要的特性,它提供了一种简洁、灵活的函数式编程方式。Lambda表达式允许我们将函数作为参数传递,极大的简化了代码的编写。 2. 基本语法 Lambda表达式的基本语法如下&a…

教程:Spring Boot中集成Elasticsearch的步骤

教程:Spring Boot中集成Elasticsearch的步骤 大家好,我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编,也是冬天不穿秋裤,天冷也要风度的程序猿! 在当今大数据时代,搜索功能对于许多应用程…

注解详解系列 - @RestClientTest:Rest客户端测试

注解简介 在今天的注解详解系列中,我们将探讨RestClientTest注解。RestClientTest是Spring Boot提供的一个注解,用于简化Rest客户端的测试。通过RestClientTest注解,可以轻松地对使用RestTemplate或WebClient的代码进行单元测试,…

Java基于jjwt操作jwt

之前讲解了jwt的相关知识&#xff0c;有不了解的&#xff0c;可以查看相关的文章JWT简介-CSDN博客&#xff0c;本节不再介绍&#xff0c;主要讲解有关java中如何通过jjwt库产生jwt以及解析jwt的相关操作。 添加maven依赖 <dependency><groupId>io.jsonwebtoken&l…