kafka consumer客户端消费逻辑解析

kafka consumer客户端消费逻辑解析

    • 一、主要消费步骤
    • 二、提交策略
      • 【步骤2代码解析】
      • 【提交策略总结】
    • 三、拉取策略(待补充)
    • 四、消费策略
      • 【代码解析】
      • 【消费策略总结】

一、主要消费步骤

在这里插入图片描述
这是kafka客户端拉取消息的入口,有4个主要部分

1、启动后的准备
consumer线程启动后,如果非自动提交模式,构建worker线程放入worker线程池,供后续消费消息使用

2、运行期逻辑循环——提交策略

3、运行期逻辑循环——消息拉取

4、运行期逻辑循环——消息消费

二、提交策略

【步骤2代码解析】

2.1 在拉取消息之前,如果非自动提交,进行提交判定:
需要提交的消息(ConsumerRecord)会维护在acks——本地已处理待提交消息队列(一个linkedBlockingQueue)中,这里会把acks里所有消息拿出来进行循环处理。
在这里插入图片描述
(2-1-1) ack策略-立即提交:说明在配置了手动提交-立即提交的ack策略时,提交动作是每次消息拉取前,worker线程已处理完的消息的offset,挨个put进本地的partition和offset的映射(metadata中一个map)。由于是循环处理worker队列,而消息是乱序存放的,所以put之前判断offset大于现有offset才会执行,确保低offset不覆盖高offset。
put后会直接进行网络请求提交到broker中。(由于在循环中,这里的请求会发生多次?没细看)在这里插入图片描述在这里插入图片描述
在这里插入图片描述

(2-1-2) ack策略-非立即提交:
和2-1-1一样,都会维护分区最高位移映射。在这里插入图片描述
接下来会根据具体的提交规则配置来判定是否提交,
1、未提交数:未提交数 >= 配置
2、提交时间间隔:上次提交 - 当前时间 >= 配置
3、未提交数或提交时间间隔:1或2任意满足

在这里插入图片描述

【提交策略总结】

一、提交模式
1、自动提交:拉取消息后立即提交
2、手动(非自动)提交:
2-1、拉取消费前执行一次提交判定

二、提交判定
1、立即提交:无需判定
2、非立即提交:根据配置的规则判定
2-1、满足提交时间间隔可提交
2-2、满足未提交数计算可提交
2-3、满足2-1或2-2可提交

三、拉取策略(待补充)

四、消费策略

【代码解析】

在这里插入图片描述
自动提交,直接进行消费
在这里插入图片描述
手动提交:只加入消息处理队列,等待消费线程消费
在这里插入图片描述

【消费策略总结】

1、自动提交:consume线程自我消费(?)
2、手动提交:worker线程异步消费

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/35709.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

苏东坡传-读书笔记三

苏东坡去世之后,一黄某获得苏东坡一珍贵的手稿,其中有苏东坡下列的名句: “处贫贱易,处富贵难。安劳苦易,安闲散难。忍痛易,忍痒难。人能安闲散,耐富贵,忍痒,真有道之士也…

详细分析Oracle修改默认的时间格式(四种方式)

目录 前言1. 会话级别2. 系统级别3. 环境配置4. 函数格式化5. 总结 前言 默认的日期和时间格式由参数NLS_DATE_FORMAT控制 如果需要修改默认的时间格式,可以通过修改会话级别或系统级别的参数来实现 1. 会话级别 在当前会话中设置日期格式,这只会影响…

uni-app (通过HBuilderX 和 VS Code 开发)详细连接过程教学。

使用 HBuilderX 创建 uni-app 项目 并编译到微信开发者工具。 uni-app 支持两种方式创建项目: 通过 HBuilderX 创建 通过命令行创建 首先我们需要先下载HBuilderX 下载链接地址:DCloud - HBuilder、HBuilderX、uni-app、uniapp、5、5plus、mui、wap2…

【2024最新华为OD-C/D卷试题汇总】[支持在线评测] LYA的登山之旅01(100分)- 三语言AC题解(Python/Java/Cpp)

🍭 大家好这里是清隆学长 ,一枚热爱算法的程序员 ✨ 本系列打算持续跟新华为OD-C/D卷的三语言AC题解 💻 ACM银牌🥈| 多次AK大厂笔试 | 编程一对一辅导 👏 感谢大家的订阅➕ 和 喜欢💗 &#x1f…

《梦醒蝶飞:释放Excel函数与公式的力量》6.4 TODAY函数

第四节:6.4 TODAY函数 1)TODAY函数概述 TODAY函数是Excel中一个非常有用的内置函数,它返回当前的日期。与NOW函数不同,TODAY函数仅返回日期部分,时间部分默认为午夜(0:00)。 2)函…

[数据质量]手动实现 阿里云DataWorks 的数据质量监控告警功能

目录 手动实现 DataWorks 的数据质量监控告警功能1. 简介:2. 数据表准备2.1 tmp_monitor_tbl_info (数据监控信息表)2.2 tmp_monitor_rule_info (数据质量监控规则表)2.3 tmp_monitor_tbl_info_log_di (数据监控信息记录表) 3. 程序开发3.1 数据检查程序3.2 告警信息推送程序3.…

Jenkins教程-10-发送飞书测试报告通知

上一小节我们学习了发送企业微信测试报告通知的方法,本小节我们讲解一下发送飞书测试报告通知的方法。 1、自动化用例执行完后,使用pytest_terminal_summary钩子函数收集测试结果,存入本地status.txt文件中,供Jenkins调用 conft…

优化 C# 和 .NET Core Web API 中的 LINQ 查询

LINQ(语言集成查询)是 C# 中的一项强大功能,允许开发人员以可读且简洁的方式查询和操作数据。但是,LINQ 的使用效率低下可能会导致性能瓶颈,尤其是在处理 .NET Core Web API 中的大型数据集时。优化 LINQ 查询对于维护…

嵌入式 Linux 设备刷系统具体组成

嵌入式 Linux 设备刷系统具体组成 1 介绍1.1 概述1.2 嵌入式 Linux 的组成1.3 U-Boot1.4 Linux 内核1.5 设备树1.6 根文件系统 参考 1 介绍 1.1 概述 一个完整的 linux 系统,通常包含了 U-Boot、kernel、设备树以及根文件系统。 1.2 嵌入式 Linux 的组成 1.3 U-…

Java热门技术点总结:Lambda表达式与Stream API

第一部分:Lambda表达式 1. 简介 Lambda表达式是Java 8引入的一个非常重要的特性,它提供了一种简洁、灵活的函数式编程方式。Lambda表达式允许我们将函数作为参数传递,极大的简化了代码的编写。 2. 基本语法 Lambda表达式的基本语法如下&a…

Java基于jjwt操作jwt

之前讲解了jwt的相关知识&#xff0c;有不了解的&#xff0c;可以查看相关的文章JWT简介-CSDN博客&#xff0c;本节不再介绍&#xff0c;主要讲解有关java中如何通过jjwt库产生jwt以及解析jwt的相关操作。 添加maven依赖 <dependency><groupId>io.jsonwebtoken&l…

目标检测之YoloV1

一、预测阶段&#xff08;前向推断&#xff09; 在预测阶段Yolo就相当于一个黑箱子&#xff0c;输入的是448*448*3的图像&#xff0c;输出是7*7*30的张量&#xff0c;包含了所有预测框的坐标、置信度和类别 为什么是7*7*30呢&#xff1f; --将输入图像划分成s*s个grid cell&a…

【多线程】如何解决线程安全问题?

&#x1f970;&#x1f970;&#x1f970;来都来了&#xff0c;不妨点个关注叭&#xff01; &#x1f449;博客主页&#xff1a;欢迎各位大佬!&#x1f448; 文章目录 1. synchronized 关键字1.1 锁是什么1.2 如何加锁1.3 synchronized 修饰方法1) 修饰普通成员方法2) 修饰静态…

【系统架构设计师】七、信息安全技术基础知识(访问控制技术|抗攻击技术|计算机系统安全保护能力等级)

目录 一、访问控制技术 二、信息安全的抗攻击技术 2.1 分布式拒绝服务DDoS与防御 2.3 ARP欺骗攻击与防御 2.4 DNS欺骗与防御 2.5 IP欺骗与防御 2.6 端口扫描&#xff08;Port Scanning&#xff09; 2.7 强化TCP/IP堆栈以抵御拒绝服务攻击 2.8 系统漏洞扫描 三、信息安…

基于weixin小程序乡村旅游系统的设计

管理员账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;用户管理&#xff0c;商家管理&#xff0c;旅游景点管理&#xff0c;景点类型管理&#xff0c;景点路线管理&#xff0c;系统管理 商家帐号账号功能包括&#xff1a;系统首页&#xff0c;旅游景点管理&…

解决RuntimeError: Unsupported image type, must be 8bit gray or RGB image.

今天在使用Opencv进行人脸识别项目时发现了一个问题&#xff0c;一直报这个错误RuntimeError: Unsupported image type, must be 8bit gray or RGB image.查了一下资料也是解决了&#xff0c;这样给大家分享一下 解决方案 Numpy 有一个主要版本更新&#xff0c;与 dlib 不兼容。…

【Docker】创建 swarm 集群

目录 1. 更改防火墙设置 2. 安装 Docker 组件 3. 启动 Docker 服务&#xff0c;并检查服务状态。 4. 修改配置文件&#xff0c;监听同一端口号。 5. 下载 Swarm 组件 6. 创建集群&#xff0c;加入节点 7. 启动集群 8. 查询集群节点信息 9. 查询集群具体信息 10. 查询…

电脑文件concrt140.dll丢失要怎么恢复?靠谱修复方法分析

电脑文件concrt140.dll丢失这种情况&#xff0c;相对来说还是比较少见的&#xff01;但是不代表没有&#xff0c;既然有人出现这种情况了&#xff0c;那么小编势必要给大家详细的讲解一下concrt140.dll这个文件&#xff0c;以及我们要怎么去解决concrt140.dll文件丢失的问题。下…

hnust 1817 算法10-10,10-11:堆排序

hnust 1817 算法10-10,10-11&#xff1a;堆排序 题目描述 堆排序是一种利用堆结构进行排序的方法&#xff0c;它只需要一个记录大小的辅助空间&#xff0c;每个待排序的记录仅需要占用一个存储空间。 首先建立小根堆或大根堆&#xff0c;然后通过利用堆的性质即堆顶的元素是最…

pppd 返回错误码 含义

错误码 00&#xff1a; pppd已经断开&#xff0c;或者已经成功建立连接后请求方又中 断了。 01&#xff1a; 发成了一个严重错误&#xff0c;例如系统调用失败或者访问非法内存。 02&#xff1a; 处理给定操作是检测到错误&#xff0c;例如使用两个互斥的操作。 03&#xff1a;…