Kafka中Consumer源码解读

Consumer源码解读

本课程的核心技术点如下:

1、consumer初始化
2、如何选举Consumer Leader
3、Consumer Leader是如何制定分区方案

4、Consumer如何拉取数据
5、Consumer的自动偏移量提交

Consumer初始化

image.png

从KafkaConsumer的构造方法出发,我们跟踪到核心实现方法

image.png

这个方法的前面代码部分都是一些配置,我们分析源码要抓核心,我把核心代码给摘出来

NetworkClient

Consumer与Broker的核心通讯组件

image.png

ConsumerCoordinator

协调器,在Kafka消费中是组消费,协调器在具体进行消费之前要做很多的组织协调工作。

image.png

Fetcher

提取器,因为Kafka消费是拉数据的,所以这个Fetcher就是拉取数据的核心类

image.png

而在这个核心类中,我们发现有很多很多的参数设置,这些就跟我们平时进行消费的时候配置有关系了,这里我们挑一些核心重点参数来讲一讲

fetch.min.bytes

每次fetch请求时,server应该返回的最小字节数。如果没有足够的数据返回,请求会等待,直到足够的数据才会返回。缺省为1个字节。多消费者下,可以设大这个值,以降低broker的工作负载。

fetch.max.bytes

每次fetch请求时,server应该返回的最大字节数。这个参数决定了可以成功消费到的最大数据。

比如这个参数设置的是50M,那么consumer能成功消费50M以下的数据,但是最终会卡在消费大于10M的数据上无限重试。fetch.max.bytes一定要设置到大于等于最大单条数据的大小才行。

默认是50M

image.png

fetch.wait.max.ms

如果没有足够的数据能够满足fetch.min.bytes,则此项配置是指在应答fetch请求之前,server会阻塞的最大时间。缺省为500个毫秒。和上面的fetch.min.bytes结合起来,要么满足数据的大小,要么满足时间,就看哪个条件先满足。

这里说一下参数的默认值如何去找:

image.png

image.png

max.partition.fetch.bytes

指定了服务器从每个分区里返回给消费者的最大字节数,默认1MB。

假设一个主题有20个分区和5个消费者,那么每个消费者至少要有4MB的可用内存来接收记录,而且一旦有消费者崩溃,这个内存还需更大。注意,这个参数要比服务器的message.max.bytes更大,否则消费者可能无法读取消息。

备注:1、Kafka入门笔记

image.png

max.poll.records

控制每次poll方法返回的最大记录数量。

默认是500

image.png

如何选举Consumer Leader

回顾之前的内容

image.png

那么如何完成以上的逻辑的,我们跟踪代码:

image.png

1、消费者协调器与组协调器的通讯

image.png

image.png

image.png

image.png

image.png

image.png

image.png

image.png

对Broker的响应进行处理

image.png

image.png

1、消费者协调器发起入组请求

image.png

image.png

image.png

image.png

image.png

Consumer Leader如何制定分区方案

回顾之前的内容

image.png

消费者分区策略

消费者参数

partition.assignment.strategy

分区分配给消费者的策略。默认为Range。允许自定义策略。

Range

把主题的连续分区分配给消费者。(如果分区数量无法被消费者整除、第一个消费者会分到更多分区)

RoundRobin

把主题的分区循环分配给消费者。

image.png

StickyAssignor

初始分区和RoundRobin是一样

粘性分区:每一次分配变更相对上一次分配做最少的变动.

目标:

1、分区的分配尽量的均衡

2、每一次重分配的结果尽量与上一次分配结果保持一致

当这两个目标发生冲突时,优先保证第一个目标

比如有3个消费者(C0、C1、C2)、4个topic(T0、T1、T2、T34),每个topic有2个分区(P1、P2)

image.png

C0: T0P0、T1P1、T3P0

C1: T0P1、T2P0、T3P1

C2: T1P0、T2P1

如果C1下线 、如果按照RoundRobin

image.png

C0: T0P0、T1P0、T2P0、T3P0

C2: T0P1、T1P1、T2P1、T3P1

对比之前

image.png

如果C1下线 、如果按照StickyAssignor

image.png

C0: T0P0、T1P1、T2P0、T3P0

C2: T0P1、T1P0、T2P1、T3P1

对比之前

image.png

image.png

自定义策略

extends 类AbstractPartitionAssignor,然后在消费者端增加参数:

properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,类.class.getName());

即可。

消费者分区策略源码分析

接着上个章节的代码。

image.png

image.png

image.png

image.png

image.png

Consumer拉取数据

这里就是拉取数据,核心Fetch类

image.png

image.png

image.png

自动提交偏移量

image.png

image.png

image.png

image.png

image.png

image.png

image.png

image.png

image.png

当然,自动提交auto.commit.interval.ms

image.png

默认5s

image.png

从源码上也可以看出

maybeAutoCommitOffsetsAsync 最后这个就是poll的时候会自动提交,而且没到auto.commit.interval.ms间隔时间也不会提交,如果没到下次自动提交的时间也不会提交。

这个autoCommitIntervalMs就是auto.commit.interval.ms设置的

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/79534.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

前端JavaScript中异步的终极解决方案:async/await

🎬 岸边的风:个人主页 🔥 个人专栏 :《 VUE 》 《 javaScript 》 ⛺️ 生活的理想,就是为了理想的生活 ! 1. 背景 在深入讨论 async/await 之前,我们需要了解一下 JavaScript 的单线程和非阻塞的特性。JavaScript 是…

AD22使用笔记+积累库

一、前言 使用AD9习惯了,但是需求逐渐上来了就不够用了,好多快捷的新功能要新版本软件才能用,所以升级使用AD22 目录 1.添加层之后中间层无法布线 2.新增快捷方式CtrlW布线,不用点图标了 二、环境 AD22 三、正文 1.添加层之…

SpringBoot-RabbitMQ

RabbitMQ 是一个开源的消息中间件,它实现了 AMQP(Advanced Message Queuing Protocol)协议,并提供了可靠的消息传递机制。 Spring Boot 中使用 RabbitMQ 实现异步消息的发送和接收。 使用 Spring Boot 提供的 AmqpTemplate 和 Rab…

Redis Part1

单体架构:一台Web服务器、一台数据库服务器。 1.了解NoSql 什么是Nosql? NoSQL,即Not-Only-SQL,意思就是我们干事情不能只用SQL,泛指非关系型的数据库!NoSQL定位:作为关系型数据库的补充&am…

小米OPPO三星一加红魔全机型解锁BL详细教程合集-ROOT刷机必要操作

解锁BL一个熟悉又陌生的词汇,只要你刷机root过,你肯定都解锁BL成功过。我们简单的描述下BL是什么?BL全名bootloader,目前市面上全部机型,基本出厂全部BL处于锁定的状态锁定的BL机型,不支持刷入非官方固件或…

性能测试 —— Jmeter事务控制器

事务: 性能测试中,事务指的是从端到端,一个完整的操作过程,比如一次登录、一次 筛选条件查询,一次支付等;技术上讲:事务就是由1个或多个请求组成的 事务控制器 事务控制器类似简单控制器&…

广州口腔医院种植牙-广东省爱牙工程公益种牙,获湾区群众点赞

广州种植牙价格表-自2017年成立以来,广东省爱牙工程一直坚持以公益惠民为宗旨、公益种牙为服务方向,针对群众普遍存在的口腔健康问题,开展形式多样的公益性口腔医疗惠民活动。 广州种植牙费用表-日前,广东省爱牙工程“种植牙惠民行动”第二十季已正式启动。据广东省爱牙工程官方…

韩信点兵:求韩信一共有多少兵

任务描述 本关任务:求韩信一共有多少兵。 韩信有一队兵,他想知道有多少人,便让士兵排队报数。 按从 1 至5报数,最末一个士兵报的数为 1; 按从 1 至 6 报数,最末一个士兵报的数为 5; 按从 1 …

栈与队列--删除字符串中的所有相邻重复项

给出由小写字母组成的字符串 S,重复项删除操作会选择两个相邻且相同的字母,并删除它们。 在 S 上反复执行重复项删除操作,直到无法继续删除。 在完成所有重复项删除操作后返回最终的字符串。答案保证唯一。 示例: 输入&#x…

C/C++指针函数与函数指针

一、指针函数 指针函数:本质为一个函数,返回值为指针指针函数:如果一个函数的返回值是指针类型,则称为指针函数用指针作为函数的返回值的好处:可以从被调函数向主函数返回大量的数据,常用于返回结构体指针。…

基于ntchat的微信群聊同步机器人

微信群有500人上限的限制,建立多个群的话又有信息无法互通的不便,此机器人通过自动将消息转发到同一个同步组内的所有群,消除这一不便性,间接达成扩大群成员数的目的。 效果演示: 项目地址: https://gith…

为何红黑树在B/B+树之上仍然占据重要地位?

为何红黑树在B/B树之上仍然占据重要地位? 引言二、红黑树和B/B树的基本原理2.1、红黑树的特点和性质2.2、B/B树的特点和性质2.3、红黑树和B/B树的比较 三、B/B树相对于红黑树的优势四、红黑树仍然占据重要地位的原因总结 博主简介 💡一个热爱分享高性能服…

Vue echarts 饼图 引导线加小圆点,文字分行展示

需求 重点代码 完整代码 initChart() {// 创建 echarts 实例。var myChartOne this.$echarts.init(this.$refs.Echart);myChartOne.setOption({tooltip: {trigger: "item",},title: {top: center,text: [{name| this.chartTitle.name },{value| this.chartTitle.…

ROS学习笔记(四)---使用 VScode 启动launch文件运行多个节点

ROS学习笔记文章目录 01. ROS学习笔记(一)—Linux安装VScode 02. ROS学习笔记(二)—使用 VScode 开发 ROS 的Python程序(简例) 03. ROS学习笔记(三)—好用的终端Terminator 一、什么是launch文件 虽然说Terminator终端是能够比较方便直观的看运行的节点…

l8-d15 IO多路复用select函数

一、IO多路复用select函数 1.select函数 int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout); nfds: 是三个集合中编号最高的文件描述符,加上 1 readfds/writefds/exceptfds: 可读集合/可写集…

LLM 05-大模型法律

LLM 05-大模型法律 5.1 简介 在这个教程中,我们将探讨法律对大型语言模型的开发和部署有何规定。我们将会按照以下的步骤进行讨论: 新技术与现有法律的关系 与我们之前的讲座一样,比如关于社会偏见的讲座,我们将要讨论的很多内容…

2.10 PE结构:重建重定位表结构

Relocation(重定位)是一种将程序中的一些地址修正为运行时可用的实际地址的机制。在程序编译过程中,由于程序中使用了各种全局变量和函数,这些变量和函数的地址还没有确定,因此它们的地址只能暂时使用一个相对地址。当…

虚幻动画系统概述

本文主要整理一下高层次的概述,方便后续查阅 1.动画流程 DCC产出动画文件 -> UE动画导入 -> 动画蓝图驱动(类似unity的动画状态机) ->动画后处理蓝图驱动(例如修型骨,骨骼矫正等后期处理) 2.动…

Ubuntu 出现 Failed to Fetch的解决办法

在使用apt install XXX时出现问题: E: Failed to fetch http://archive.ubuntu.com/ubuntu/pool/universe/f/fyba/libfyba0_4.1.1-6build1_amd64.deb Connection failed [IP: 185.125.190.39 80]E: Failed to fetch http://archive.ubuntu.com/ubuntu/pool/univers…