Kafka-消费者-KafkaConsumer分析-ConsumerNetworkClient

前面介绍过NetworkClient的实现,它依赖于KSelector、InFlightRequests、Metadata等组件,负责管理客户端与Kafka集群中各个Node节点之间的连接,通过KSelector法实现了发送请求的功能,并通过一系列handle*方法处理请求响应、超时请求以及断线重连。

ConsumerNetworkClient在NetworkClient之上进行了封装,提供了更高级的功能和更易用的API。

在图中展示了ConsumerNetworkClient的核心字段以及其依赖的组件。

在这里插入图片描述

  • client:NetworkClient对象。
  • delayedTasks:定时任务队列,DelayedTaskQueue是Kafka提供的定时任务队列的实现,其底层是使用JDK提供的PriorityQueue实现。
    简单介绍一下PriorityQueue,这是一个非线程安全的、无界的、优先级队列,实现原理是小顶堆,底层是基于数组实现的,其对应的线程安全实现是PriorityBlockingQueue,这个定时任务队列中是心跳任务。
  • metadata:用于管理Kafka集群元数据。
  • unsent:缓冲队列,Map<Node,List类型,key是Node节点,value是发往此Node的ClientRequest集合。
  • unsentExpiryMs:ClientRequest在unsent中缓存的超时时长。
  • wakeup:由调用KafkaConsumer对象的消费者线程之外的其他线程设置,表示要中断KafkaConsumer线程。
  • wakeupDisabledCount:KafkaConsumer是否正在执行不可中断的方法。每进入一个不可中断的方法时,则增加一,退出不可中断方法时,则减少一。
    wakeupDisabledCount只会被KafkaConsumer线程修改,其他线程不能修改。

ConsumerNetworkClient.poll()方法是ConsumerNetworkClient中最核心的方法,poll方法有多个重载,最终会调用poll(long timeout,long now,boolean executeDelayedTasks)重载,这三个参数的含义分别是:

  • timeout表示执行poll方法的最长阻塞时间(单位是ms),如果为0,则表示不阻塞;
  • now表示当前时间戳;
  • executeDelayedTasks表示是否执行delayedTasks队列中的定时任务。

下面介绍其流程,其中简单回顾一下NetworkClient的功能:

  1. 调用ConsumerNetworkClient.trySend方法循环处理unsent中缓存的请求。

    具体逻辑是:对每个Node节点,循环遍历其对应的ClientRequest列表,每次循环都调用NetworkClient.ready方法检测消费者与此节点之间的连接,以及发送请求的条件。

    若符合发送条件,则调用NetworkClient.send()方法将请求放入InFlightRequests队中等待响应,也放入KafkaChannel的send字段中等待发送,并将此消息从列表中删除。实现代码如下:
    在这里插入图片描述

  2. 计算超时时间,此超时时间由timeout与delayedTasks队列中最近要执行的定时任务的时间共同决定。在下面的NetworkClient.poll()方法中,会使用此超时时间作为最长阻塞时长,避免影响定时任务的执行。

  3. 调用NetworkClient.poll方法,将KafkaChannel.send字段指定的消息发送出去。除此之外,NetworkClient.poll()方法可能会更新Metadata使用一系列handle*方法处理请求响应、连接断开、超时等情况,并调用每个请求的回调函数。

  4. 调用ConsumerNetworkClient.maybeTriggerWakeup方法,检测wakeup和wakeupDisabledCount,查看是否有其他线程中断。如果有中断请求,则抛出WakeupException异常,中断当前ConsumerNetworkClient.poll方法。

在这里插入图片描述

  1. 调用checkDisconnects方法检测连接状态。检测消费者与每个Node之间的连接状态,当检测到连接断开的Node时,会将其在unsent集合中对应的全部ClientRequest对象清除掉,之后调用这些ClientRequest的回调函数。

在这里插入图片描述

  1. 根据executeDelayedTasks参数决定是否处理delayedTasks队列中超时的定时任务,如果需要执行delayedTasks队列中的定时任务,则调用delayedTasks.poll()方法。

  2. 再次调用trySend方法。在步骤3中调用了NetworkClient.poll方法,在其中可能已经将KafkaChannel.send字段上的请求发送出去了,也可能已经新建了与某些Node的网络连接,所以这里再次尝试调用trySend方法。

  3. 调用ConsumerNetworkClient.failExpiredRequests()处理unsent中超时请求。它会循环遍历整个unsent集合,检测每个ClientRequest是否超时,调用超时ClientRequest的回调函数,并将其从unsent集合中删除。

在这里插入图片描述
分析完poll方法的详细步骤之后,我们下面来看其实现代码:

在这里插入图片描述
pollNoWakeup方法是poll方法的变体,表示执行不可被中断的poll方法。

具体逻辑是:在执行poll方法之前,会调用disableWakeups方法将wakeupDisabledCount加一,然后调用poll方法。这样,即使其他线程请求中断,也不会被响应。

poll(future)是poll方法的另一个实现阻塞发送请求的功能,代码如下所示。

在这里插入图片描述
在ConsumerNetworkClient.send方法中,会将待发送的请求封装成ClientRequest,然后保存到unsent集合中等待发送,具体代码如下。

在这里插入图片描述

在这里需要重点关注的是KafkaConsumer中使用的回调对象—RequestFutureCompletionHandler,其继承关系如图所示。

在这里插入图片描述
从RequestFutureCompletionHandler的继承关系上我们可以知道,它不仅实现了RequestCompletionHandler,它还继承了RequestFuture类。RequestFuture是一个泛型类,其核心字段如下所示。

  • isDone:表示当前请求是否已经完成,不管正常完成还是出现异常,此字段都会被设置为true。
  • exception:记录导致请求异常完成的异常类,与value字段互斥。此字段非空则表示出现异常,反之则表示正常完成。
  • value:记录请求正常完成时收到的响应,与exception字段互斥。此字段非空表示正常完成,反之表示出现异常。
  • listeners:RequestFutureListener集合, 用来监听请求完成的情况。RequestFutureListener接口有onSuccess()和onFailure()两个方法,对应于请求正常完成和出现异常两种情况。

在RequestFuture中有两处典型设计模式的使用:一处是compose方法,使用了适配器模式;另一处是chain方法,使用了责任链模式。下面是compose方法的相关代码:

在这里插入图片描述
图展示了使用compose()方法进行适配后,回调时的调用过程,也可以认为是请求完成的事件传播流程。

当调用RequestFuture对象的complete()或raise()方法时,会调用RequestFutureListener的onSuccess()或onFailure()方法,然后调用RequestFutureAdapter<T,S>的对应方法,最终调用RequestFuture对象的对应方法。

在这里插入图片描述
RequestFuture.chain()方法的实现与compose()类似,也是通过RequestFutureListener在多个RequestFuture之间传递事件。下面是其具体代码:

在这里插入图片描述

RequestFuture提供了一系列检查请求完成情况的方法,以及管理listeners的方法,代码比较简单,不再赘述了。

介绍完RequestFutureCompleteHandler之后,回到ConsumerNetworkClient的分析上来。下面简单介绍ConsumerNetworkClient中几个常用的功能,代码比较简单,就不贴出来了:

  • awaitMetadataUpdate()方法:循环调用poll方法,直到Metadata版本号增加,实现阻塞等待Metadata更新完成。
  • awaitPendingRequests()方法:等待unsent和InFightRequests中的请求全部完成(正常收到响应或出现异常)。
  • put()方法:向unsent中添加请求。
  • schedule()方法:向delayedTasks队列中添加定时任务。
  • leastLoadedNode()方法:查找Kafka集群中负载最低的Node。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/640569.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

0122-2-JavaScript正则表达式

《JavaScript正则表达式》 第一章 正则表达式 字符匹配 正则表达式是匹配模式&#xff0c;要么匹配字符&#xff0c;要么匹配位置&#xff01; 横向匹配 /ab[2,5]/c/g 匹配 abc, abbc,abbbc,abbbbc,abbbbbc&#xff0c;数字连续出现 2 到 5 次&#xff0c;会匹配 2 位、3 位、…

C++ STL之string的使用及模拟实现

文章目录 1. 前言2. 介绍3. string类的使用3.1 string类的构造函数3.2 string类对象的容量操作3.3 string类对象的访问及遍历操作3.4 string类对象的修改操作3.5 string类对象的字符串操作3.6 string类的非成员函数 4. string类的模拟实现 1. 前言 C语言中&#xff0c;字符串是…

地图 - 实现有多条定位,显示多条定位,并且使用一个圆形遮罩层将多条定位进行覆盖

首先&#xff0c;需要在你的index.html模板页面头部加载百度地图JavaScript API代码&#xff0c;密钥可去百度地图开放平台官网申请 <script type"text/javascript" src"//api.map.baidu.com/api?typewebgl&v1.0&ak您的密钥"></script&…

代码随想录 Leetcode1047. 删除字符串中的所有相邻重复项

题目&#xff1a; 代码(首刷自解 2024年1月21日&#xff09;&#xff1a; class Solution { public:string removeDuplicates(string s) {if (s.size() < 2) return s;stack<char> t;for (int i 0; i < s.size(); i) {if (t.empty()) t.push(s[i]);else {if (s[i…

InnoDB的Buffer Pool

前置概念&#xff1a;一个数据页16KB&#xff0c;一个数据页可能有多个记录&#xff0c;即使我们只需要访问一条记录&#xff0c;需要把整个数据页加载到内存中&#xff0c;加载到内存后不是直接释放&#xff0c;而是缓存到内存当中&#xff08;当然对于buffer pool的缓存是在存…

若依管理系统搭建教程,ruoyi-vue环境搭建

环境部署 准备工作 JDK > 1.8 (推荐1.8版本) Mysql > 5.7.0 (推荐5.7版本) Maven > 3.0 运行系统 1、前往Gitee下载页面([https://gitee.com/y_project/RuoYi (opens new window)](https://gitee.com/y_project/RuoYi))下载解压到工作目录 2、导入到Eclipse&#…

ESP32-TCP服务端(Arduino)

将ESP32设置为TCP服务器 介绍 TCP&#xff08;Transmission Control Protocol&#xff09;传输控制协议&#xff0c;是一种面向连接的&#xff08;一个客户端对应一个服务端&#xff09;、可靠的传输层协议。在TCP的工作原理中&#xff0c;它会将消息或文件分解为更小的片段&a…

Day16 linuxC高级(存储类型 linux命令 shell命令)

文章目录 C补充标识常量存储类型1.auto // 自动型2.static&#xff1a;修饰变量和函数 // 静态型3.extern&#xff1a;外部引用4.register&#xff1a;寄存器类型 LinuxC高级简介&#xff1a;嵌入式系统(将软件嵌入到硬件里面)Linux起源查看操作系统版本内核系统架构系统关机或…

CSDN COC西安城市开发者社区2023年度线下聚会

1. 活动背景 CSDN始终致力于促进城市区域内尖端新型技术开发者交流&#xff0c;提供开放自由的切磋平台。在这个充满挑战和机遇的一年即将结束之际&#xff0c;通过本次聚会&#xff0c;汇聚西安本地各行各业的开发者朋友&#xff0c;回顾过去一年城市社区的成就和收获&#x…

Spring5系列学习文章分享---第一篇(概述+特点+IOC原理+IOC并操作之bean的XML管理操作)

目录 Spring&#xff08;概述特点IOC原理IOC并操作之bean的XML管理操作&#xff09;概述Spring是轻量级的开源的JavaEE框架Spring可以解决企业应用开发的复杂性Spring有两个核心部分ioc,aopSpring特点 loc(概念和原理)什么是 IOCIOC 底层原理IOC 过程图 IOC&#xff08;接口&am…

MySQL的一些综合运用

一些基本的语句&#xff1a; USE dept_emp; CREATE TABLE dept ( deptno INT(2) NOT NULL COMMENT 部门编号, dname VARCHAR (15) COMMENT 部门名称, loc VARCHAR (20) COMMENT 地理位置 ); -- 添加主键 ALTER TABLE dept ADD PRIMARY KEY (deptno); -- 添加数据 INSE…

前端转鸿蒙的就业前景如何?有必要学鸿蒙么?

学习鸿蒙开发是否有必要&#xff0c;取决于多个因素&#xff1a; 一、个人兴趣与职业规划&#xff1a; 如果你对华为鸿蒙操作系统&#xff08;HarmonyOS&#xff09;感兴趣&#xff0c;并且希望将这个平台作为你的职业发展的方向&#xff0c;那么学习鸿蒙开发是非常有意义的。…

MSG3D

论文在stgcn与sta-lstm基础上做的。下面讲一下里面的方法&#xff1a; 1.准备工作 符号。这里是对符号进行解释。 一个人体骨骼图被记为G(v,E) 图卷积&#xff1a; 图卷积定义 考虑一种常用于处理图像的标准卷积神经网络 (CNN)。输入是像素网格。每个像素都有一个数据值向…

x-cmd pkg | speedtest-cli - 网络速度测试工具

目录 简介首次用户功能特点竞品和相关作品进一步探索 简介 speedtest-cli 是一个网络速度测试工具&#xff0c;用于测试计算机或服务器与速度测试服务器之间的网络连接速度。 它使用 speedtest.net 测试互联网带宽&#xff0c;可以帮助用户获取网络的上传和下载速度、延迟等参…

【复现】SpringBlade SQL 注入漏洞_22

目录 一.概述 二 .漏洞影响 三.漏洞复现 1. 漏洞一&#xff1a; 四.修复建议&#xff1a; 五. 搜索语法&#xff1a; 六.免责声明 一.概述 SpringBlade 是由一个商业级项目升级优化而来的SpringCloud微服务架构&#xff0c;采用Java8 API重构了业务代码&#xff0c;完全…

【C++初阶】第二站:类与对象(上) -- 下部分

前言&#xff1a; 本章知识点&#xff1a; 类对象模型、 this 指针 专栏&#xff1a; C初阶 目录 类对象模型 如何计算类对象的大小 类对象的存储方式猜测 结构体内存对齐规则 this指针 this指针的引出 this指针的特性 C语言和C实现Stack的对比 C语言实现 C实现 类对象模型 …

动态规划——炮兵回城【集训笔记】

题目描述 游戏盘面是一个m行n列的方格矩阵&#xff0c;将每个方格用坐标表示&#xff0c;行坐标从下到上依次递增&#xff0c;列坐标从左至右依次递增&#xff0c;左下角方格的坐标为(1,1)&#xff0c;则右上角方格的坐标为(m,n)。 游戏结束盘上只剩下一枚炮兵没有回到城池中&a…

ERP系统哪个好用?用友,金蝶,ORACLE,SAP综合测评

ERP系统哪个好用&#xff1f;用友&#xff0c;金蝶&#xff0c;ORACLE&#xff0c;SAP综合测评 ERP领域SAP、ORACLE相对于国内厂商如用友、金蝶优势在哪&#xff1f; SAP&#xff0c;ORACLE操作习惯一般国人用不惯&#xff1b;相对于国产软件&#xff0c;界面也很难看&#x…

AlmaLinux 9.3 安装图解

风险告知 本人及本篇博文不为任何人及任何行为的任何风险承担责任&#xff0c;图解仅供参考&#xff0c;请悉知&#xff01;本次安装图解是在一个全新的演示环境下进行的&#xff0c;演示环境中没有任何有价值的数据&#xff0c;但这并不代表摆在你面前的环境也是如此。生产环境…

Android学习之路(22) 从模块化到组件化

从模块化到组件化 一、从模块化到组件化 Android 应用项目 , 都存在一个应用模块 ( Application Module ) , 在 build.gradle 构建脚本中 , 第一个插件配置 com.android.application , 表明 该 Module 编译打包后的输出是 APK 安装包 ; 该项目可以直接运行 ; plugins {id co…