kafka 2.1.1 java的消费者客户端如何获取数据源码

KafkaConsumer

  • 一、kakfa消费者暴露给业务系统获取数据的方法
    • 1 首先从缓冲区队列取数,没有数据则请求服务端来获取数据
      • 1.1循环从队列中取数,给到空或者已被提取的nextInLineRecords
        • (1)当nextInLineRecords的数据被提取时,就把nextInLineRecords置为已提取,
      • 1.2 针对不同的分区,客户端拉取数据的请求都会发送一次
        • (1)组装此次执行要拉取哪些分区的请求集合
          • 1)在组装请求集合之前,客户端要首先获取此次要拉取哪些分区

一、kakfa消费者暴露给业务系统获取数据的方法

/** @deprecated */@Deprecatedpublic ConsumerRecords<K, V> poll(long timeoutMs) {return this.poll(this.time.timer(timeoutMs), false);}public ConsumerRecords<K, V> poll(Duration timeout) {return this.poll(this.time.timer(timeout), private ConsumerRecords<K, V> poll(Timer timer, boolean includeMetadataInTimeout) {//.......删除干扰理解的代码行ConsumerRecords var3;do {//.......删除干扰理解的代码行Map<TopicPartition, List<ConsumerRecord<K, V>>> records = this.pollForFetches(timer);//检查拉取的消息记录是否为空。if (!records.isEmpty()) {//检查是否需要发送更多的拉取请求或者是否有未完成的网络请求。if (this.fetcher.sendFetches() > 0 || this.client.hasPendingRequests()) {//如果需要发送更多的拉取请求或者有未完成的网络请求,调用 pollNoWakeup 方法来处理这些请求。this.client.pollNoWakeup();}// 调用 onConsume 方法对消费的消息记录进行拦截处理。ConsumerRecords var4 = this.interceptors.onConsume(new ConsumerRecords(records));//返回经过拦截处理后的消费者记录。return var4;}//timer.notExpired()如果在入参的提供的时间内,继续循环,直到returen或者超时} while(timer.notExpired());var3 = ConsumerRecords.empty();return var3;//.......删除干扰理解的代码行}

1 首先从缓冲区队列取数,没有数据则请求服务端来获取数据

private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {long pollTimeout = Math.min(this.coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());Map<TopicPartition, List<ConsumerRecord<K, V>>> records = this.fetcher.fetchedRecords();//如果有数据,直接返回if (!records.isEmpty()) {return records;} else {//如果没有数据,则发出请求,从服务端获取数据,this.fetcher.sendFetches();if (!this.cachedSubscriptionHashAllFetchPositions && pollTimeout > this.retryBackoffMs) {pollTimeout = this.retryBackoffMs;}Timer pollTimer = this.time.timer(pollTimeout);this.client.poll(pollTimer, () -> {return !this.fetcher.hasCompletedFetches();});timer.update(pollTimer.currentTimeMs());return this.coordinator.rejoinNeededOrPending() ? Collections.emptyMap() : this.fetcher.fetchedRecords();}}

1.1循环从队列中取数,给到空或者已被提取的nextInLineRecords

fetchedRecords可能不好理解,你可以这样想比较好理解,
1、while循环里,先走的是else语句,从内部队列completedFetches拿出数据给nextInLineRecords赋值,
2、之后第二次走的是循环里的if语句,因为刚被赋值,所以nextInLineRecords不为null,并且还没有提取,所以this.nextInLineRecords.isFetched=false
3、在第二次走的if语句中,执行this.fetchRecords(this.nextInLineRecords, recordsRemaining);后,this.nextInLineRecords.isFetched会被置为true,下次循环又要走else了,nextInLineRecords又重新被队列中的新的值赋值,并且新的this.nextInLineRecords.isFetched=false,下一次循环又可以走if语句了

 public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap();//获取一批最大记录数int recordsRemaining = this.maxPollRecords;//.......删除干扰理解的代码行while(recordsRemaining > 0) {if (this.nextInLineRecords != null && !this.nextInLineRecords.isFetched) {List<ConsumerRecord<K, V>> records = this.fetchRecords(this.nextInLineRecords, recordsRemaining);TopicPartition partition = this.nextInLineRecords.partition;if (!records.isEmpty()) {List<ConsumerRecord<K, V>> currentRecords = (List)fetched.get(partition);if (currentRecords == null) {fetched.put(partition, records);} else {List<ConsumerRecord<K, V>> newRecords = new ArrayList(records.size() + currentRecords.size());newRecords.addAll(currentRecords);newRecords.addAll(records);fetched.put(partition, newRecords);}//循环第一个结束位置,recordsRemaining减小到0recordsRemaining -= records.size();}} else {Fetcher.CompletedFetch completedFetch = (Fetcher.CompletedFetch)this.completedFetches.peek();if (completedFetch == null) {//循环第二个结束的位置,内部队列没有数据了break;}try {this.nextInLineRecords = this.parseCompletedFetch(completedFetch);} catch (Exception var7) {//循环第三个结束的位置,抛异常PartitionData partition = completedFetch.partitionData;if (fetched.isEmpty() && (partition.records == null || partition.records.sizeInBytes() == 0)) {this.completedFetches.poll();}throw var7;}this.completedFetches.poll();}}//.......删除干扰理解的代码行return fetched;}

(1)当nextInLineRecords的数据被提取时,就把nextInLineRecords置为已提取,

  private List<ConsumerRecord<K, V>> fetchRecords(Fetcher<K, V>.PartitionRecords partitionRecords, int maxRecords) {//.......删除干扰理解的代码行partitionRecords.drain();return Collections.emptyList();}

partitionRecords.drain();会把提取标志设为已提取

  private void drain() {if (!this.isFetched) {this.maybeCloseRecordStream();this.cachedRecordException = null;this.isFetched = true;this.completedFetch.metricAggregator.record(this.partition, this.bytesRead, this.recordsRead);if (this.bytesRead > 0) {Fetcher.this.subscriptions.movePartitionToEnd(this.partition);}}}

1.2 针对不同的分区,客户端拉取数据的请求都会发送一次

 public synchronized int sendFetches() {//返回的是一个map,key是集群的节点,value是要发往这个节点的入参,下面for循环次数=客户端发送请求的次数(一个分区一次请求)=fetchRequestMap.sizeMap<Node, FetchRequestData> fetchRequestMap = this.prepareFetchRequests();final Node fetchTarget;final FetchRequestData data;Builder request;//它遍历一个名为fetchRequestMap的映射(Map)的条目集合。每个条目是一个键值对,其中键是请求的目标(fetchTarget),值是请求对象(request)。//在每次循环迭代中,代码会执行以下操作://获取迭代器(Iterator)对象var2,用于遍历fetchRequestMap的条目集合。//检查是否还有下一个条目,即检查迭代器是否还有更多的元素。//如果还有下一个条目,代码将执行this.client.send(fetchTarget, request).addListener(new RequestFutureListener<ClientResponse>()这一行代码。//这行代码的作用是将请求对象request发送到指定的目标fetchTarget,并添加一个RequestFutureListener监听器来处理响应。//请注意,代码中的this.client表示当前对象的客户端属性,send是客户端发送请求的方法,addListener用于添加请求监听器。for(Iterator var2 = fetchRequestMap.entrySet().iterator(); var2.hasNext(); this.client.send(fetchTarget, request).addListener(new RequestFutureListener<ClientResponse>() {//监听器在请求成功后的处理逻辑public void onSuccess(ClientResponse resp) {synchronized(Fetcher.this) {FetchResponse<Records> response = (FetchResponse)resp.responseBody();FetchSessionHandler handler = Fetcher.this.sessionHandler(fetchTarget.id());if (handler == null) {Fetcher.this.log.error("Unable to find FetchSessionHandler for node {}. Ignoring fetch response.", fetchTarget.id());} else if (handler.handleResponse(response)) {Set<TopicPartition> partitions = new HashSet(response.responseData().keySet());Fetcher.FetchResponseMetricAggregator metricAggregator = new Fetcher.FetchResponseMetricAggregator(Fetcher.this.sensors, partitions);Iterator var7 = response.responseData().entrySet().iterator();//遍历结果集,while(var7.hasNext()) {Entry<TopicPartition, PartitionData<Records>> entry = (Entry)var7.next();TopicPartition partition = (TopicPartition)entry.getKey();long fetchOffset = ((org.apache.kafka.common.requests.FetchRequest.PartitionData)data.sessionPartitions().get(partition)).fetchOffset;PartitionData fetchData = (PartitionData)entry.getValue();Fetcher.this.log.debug("Fetch {} at offset {} for partition {} returned fetch data {}", new Object[]{Fetcher.this.isolationLevel, fetchOffset, partition, fetchData});//把数据放入到completedFetches 队列中,每一个Fetcher都有分区和数据Fetcher.this.completedFetches.add(new Fetcher.CompletedFetch(partition, fetchOffset, fetchData, metricAggregator, resp.requestHeader().apiVersion()));}Fetcher.this.sensors.fetchLatency.record((double)resp.requestLatencyMs());}}}//在请求发生异常后的处理方法public void onFailure(RuntimeException e) {//.......删除干扰理解的代码行}})) {//这里才是for循环的主体逻辑,上面的onFailure和onSuccess是RequestFutureListener内部实现,Entry<Node, FetchRequestData> entry = (Entry)var2.next();fetchTarget = (Node)entry.getKey();data = (FetchRequestData)entry.getValue();request = Builder.forConsumer(this.maxWaitMs, this.minBytes, data.toSend()).isolationLevel(this.isolationLevel).setMaxBytes(this.maxBytes).metadata(data.metadata()).toForget(data.toForget());//.......删除干扰理解的代码行}return fetchRequestMap.size();}

(1)组装此次执行要拉取哪些分区的请求集合

  private Map<Node, FetchRequestData> prepareFetchRequests() {Cluster cluster = this.metadata.fetch();Map<Node, org.apache.kafka.clients.FetchSessionHandler.Builder> fetchable = new LinkedHashMap();//var3是此次请求要获取的分区对象的迭代器,迭代器中每一个对象都是TopicPartition1Iterator var3 = this.fetchablePartitions().iterator();//遍历while(var3.hasNext()) {TopicPartition partition = (TopicPartition)var3.next();//此partition分区属于集群的哪个节点,后面会当成fetchable的keyNode node = cluster.leaderFor(partition);//.......删除干扰理解的代码行//builder=fetchable.get(node);org.apache.kafka.clients.FetchSessionHandler.Builder builder = (org.apache.kafka.clients.FetchSessionHandler.Builder)fetchable.get(node);//如果从map中根据node当key,得出的value是null,则创建一个新的build放入map中if (builder == null) {FetchSessionHandler handler = this.sessionHandler(node.id());if (handler == null) {handler = new FetchSessionHandler(this.logContext, node.id());this.sessionHandlers.put(node.id(), handler);}builder = handler.newBuilder();fetchable.put(node, builder);}long position = this.subscriptions.position(partition);//把分区和获取分区最大size添加到buildbuilder.add(partition, new org.apache.kafka.common.requests.FetchRequest.PartitionData(position, -1L, this.fetchSize, Optional.empty()));this.log.debug("Added {} fetch request for partition {} at offset {} to node {}", new Object[]{this.isolationLevel, partition, position, node});}//.......删除干扰理解的代码行Map<Node, FetchRequestData> reqs = new LinkedHashMap();Iterator var10 = fetchable.entrySet().iterator();//遍历fetchable,根据不同的节点key,value是node对应的build,重新得到一个新的mapwhile(var10.hasNext()) {Entry<Node, org.apache.kafka.clients.FetchSessionHandler.Builder> entry = (Entry)var10.next();reqs.put(entry.getKey(), ((org.apache.kafka.clients.FetchSessionHandler.Builder)entry.getValue()).build());}//这个map就是实际发送请求,发往服务端的入参的一部分return reqs;}
1)在组装请求集合之前,客户端要首先获取此次要拉取哪些分区

消费者能消费哪些分区拉取数据,
1、当前客户端订阅了哪些分区,就是有权限拉取这些分区的数据
2、缓冲区队列中有积压数据的分区不再此次拉取分区的集合内

	//kafka内部队列,从服务端得到的数据会放这里,之后由业务调用poll方法,先从这个队列里取数据,没有则请求private final ConcurrentLinkedQueue<Fetcher.CompletedFetch> completedFetches;private List<TopicPartition> fetchablePartitions() {Set<TopicPartition> exclude = new HashSet();//获得当前消费者客户端可以消息哪些分区的集合List<TopicPartition> fetchable = this.subscriptions.fetchablePartitions();if (this.nextInLineRecords != null && !this.nextInLineRecords.isFetched) {exclude.add(this.nextInLineRecords.partition);}//查看缓冲区队列还有哪些分区挤压着,从这些分区获取数据排除在这次请求Iterator var3 = this.completedFetches.iterator();while(var3.hasNext()) {Fetcher.CompletedFetch completedFetch = (Fetcher.CompletedFetch)var3.next();exclude.add(completedFetch.partition);}fetchable.removeAll(exclude);return fetchable;}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/5683.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

c函数学习

函数的概念 函数是c语言的功能单位&#xff0c;实现一个功能可以封装一个函数来实现。定义函数的时候一切以功能为目的&#xff0c;根据功能去定义函数的参数和返回值 函数的分类 从定义角度分类&#xff1a;库函数&#xff08;c库实现的&#xff09;&#xff0c;自定义函数&…

一、大数据技术之Flume(简介)

第1章 Flume概述 1.1 Flume定义 Flume是Cloudera提供的一个高可用的&#xff0c;高可靠的&#xff0c;分布式的海量日志采集、聚合和传输的系统。Flume基于流式架构&#xff0c;灵活简单。 1.2 Flume基础架构 Flume组成架构如下图所示。 1.2.1 Agent Agent是一个JVM进程&…

微信小程序23__flex布局 相关的3种居中: 水平居中_垂直居中_水平垂直居中

3 种居中方式&#xff0c; 在页面布局中经常用到&#xff0c; 现作一记录。 第一种 水平居中 类似这样写法 display: flex; flex-direction: column; //垂直布局 align-items: center; // 水平居中 justify-content: space-a…

NLP实战8:图解 Transformer笔记

目录 1.Transformer宏观结构 2.Transformer结构细节 2.1输入 2.2编码部分 2.3解码部分 2.4多头注意力机制 2.5线性层和softmax 2.6 损失函数 3.参考代码 &#x1f368; 本文为[&#x1f517;365天深度学习训练营]内部限免文章&#xff08;版权归 *K同学啊* 所有&#…

vue 路由守卫

全局路由守卫 beforeEach 路由跳转前触发to 代表 到那个页面去from 代表从哪个页面来next 表示放行 beforeResolve 表示 组件解析后触发的钩子 afterEach 表示路由跳转完成之后i触发的钩子 全局路由钩子执行顺序 beforeEach > beforeResolve>afterEach 局部路由…

在外远程NAS群晖Drive - 群晖Drive挂载电脑磁盘同步备份【无需公网IP】

文章目录 前言1.群晖Synology Drive套件的安装1.1 安装Synology Drive套件1.2 设置Synology Drive套件1.3 局域网内电脑测试和使用 2.使用cpolar远程访问内网Synology Drive2.1 Cpolar云端设置2.2 Cpolar本地设置2.3 测试和使用 3. 结语 前言 群晖作为专业的数据存储中心&…

【Hadoop 01】简介

目录 1 Hadoop 简介 2 下载并配置Hadoop 2.1 修改/etc/profile 2.2 修改hadoop-env.sh 2.3 修改core-site.xml 2.4 修改hdfs-site.xml 2.5 修改mapred-site.xml 2.6 修改yarn-site.xml 2.7 修改workers 2.8 修改start-dfs.sh、stop-dfs.sh 2.9 修改start-yarn.sh、s…

Elemui表单合并

原代码形式 <template><el-table:data"tableData"borderstyle"width: 100%"><el-table-columnprop"date"label"日期"width"180"></el-table-column><el-table-columnprop"name"label…

TC3XX - MCAL知识点(十):ICU EB-tresos配置说明与代码浅析

目录 1、概述 2、EB-tresos配置 2.1、实验目标 2.2、MCU配置 2.2.1、McuGtmTimAllocationConf 2.2.2、GtmClusterConf 2.2.3、McuClockSettingConfig 2.3、ICU配置 2.3.1、IcuOptionalApis

leetcode 399. 除法求值

给你一个变量对数组 equations 和一个实数值数组 values 作为已知条件&#xff0c;其中 equations[i] [Ai, Bi] 和 values[i] 共同表示等式 Ai / Bi values[i] 。每个 Ai 或 Bi 是一个表示单个变量的字符串。 另有一些以数组 queries 表示的问题&#xff0c;其中 queries[j]…

Java: 转换流

转换流 1.定义 是字符流和字节流之间的桥梁。 分为&#xff1a; 字符转换输入流&#xff08;InputStreamReader&#xff09;字符转换输出流&#xff08;OutputStreamWrite&#xff09; 2.作用 1.作用1:指定字符集读写 在jdk11时&#xff0c;这种方式被淘汰了。 案例1&…

.NET各版本支持的操作系统

借助虚拟机和测试机&#xff0c;检测各版本操作系统对.NET的支持情况。 安装操作系统后&#xff0c;实测安装相应运行时并能够运行星尘代理为通过。 测试平台&#xff1a;VMware Workstation 镜像来源&#xff1a;MSDN I Tell You 参考&#xff1a; .NET Framework 版本和…

WebGL 概念和基础入门

WebGL 概念和基础入门 WebGL 是什么 对于 WebGL 百度百科给出的解释是 WebGL 是一种 3D 绘图协议&#xff0c;而对此维基百科给出的解释却是一种 JavaScript API。由于 WebGL 技术旨在帮助我们在不使用插件的情况下在任何兼容的网页浏览器中开发交互式 2D 和 3D 网页效果&…

使用redis RedisAtomicLong 生成订单号

背景 产品需求要生成有序的订单 key 年月日时分秒 6位序号 由00001-99999组成 且每天都是从00001开始 公司系统有部署多台服务&#xff0c;这需要一个有序的序列不能重复而且得保证获取时的原子性这里 我们考虑使用了redis Incr 这个命令 Redis Incr 命令能将 key 中储存的数…

react 实现小球加入购物车动画

代码 import React, { useRef } from react;const ProductLayout () > {const box useRef(null);const createBall (left, top) > {const ball document.createElement(div);ball.style.position absolute;ball.style.left left - 10 px;ball.style.top top - 1…

【机器学习】了解 AUC - ROC 曲线

一、说明 在机器学习中&#xff0c;性能测量是一项基本任务。因此&#xff0c;当涉及到分类问题时&#xff0c;我们可以依靠AUC - ROC曲线。当我们需要检查或可视化多类分类问题的性能时&#xff0c;我们使用AUC&#xff08;曲线下面积&#xff09;ROC&#xff08;接收器工作特…

使用 Vue 创建一个简单的 Loading 动画

使用 Vue 创建一个简单的 Loading 动画 1. 开始之前 确保 正确安装了 Vue 3知道如何启动一个新的 Vue 项目&#xff08;或在项目中使用Vue&#xff09;了解 Vue 3 的 Composition API&#xff08;本文将使用&#xff09; 2. 设计组件 该组件应该包含三个部分 控制逻辑旋转…

win10 安装 langchain-chatglm 遇到的问题

win10 安装 langchain-chatglm 避坑指南&#xff08;2023年6月21日最新版本&#xff09;_憶的博客-CSDN博客官网看起来安装很简单&#xff0c;网上教程也是&#xff0c;但实际上我耗费了两天时间&#xff0c;查阅了当前网络上所有可查阅的资料&#xff0c;重复「安装-配置-卸载…

Spring Security 构建基于 JWT 的登录认证

一言以蔽之&#xff0c;JWT 可以携带非敏感信息&#xff0c;并具有不可篡改性。可以通过验证是否被篡改&#xff0c;以及读取信息内容&#xff0c;完成网络认证的三个问题&#xff1a;“你是谁”、“你有哪些权限”、“是不是冒充的”。 为了安全&#xff0c;使用它需要采用 …

HideSeeker论文阅读

文章目录 3.1 Overview of Our System HideSeeker3.2 Visual Information Extraction3.3 Relation Graph Learning3.4 Hidden Object Inference 4 EVALUATIONS4.7 Summary 6 DISCUSSIONS AND CONCLUSION 3.1 Overview of Our System HideSeeker 我们设计了一种名为“HideSeeke…