KafkaConsumer
- 一、kakfa消费者暴露给业务系统获取数据的方法
- 1 首先从缓冲区队列取数,没有数据则请求服务端来获取数据
- 1.1循环从队列中取数,给到空或者已被提取的nextInLineRecords
- (1)当nextInLineRecords的数据被提取时,就把nextInLineRecords置为已提取,
- 1.2 针对不同的分区,客户端拉取数据的请求都会发送一次
- (1)组装此次执行要拉取哪些分区的请求集合
- 1)在组装请求集合之前,客户端要首先获取此次要拉取哪些分区
一、kakfa消费者暴露给业务系统获取数据的方法
/** @deprecated */@Deprecatedpublic ConsumerRecords<K, V> poll(long timeoutMs) {return this.poll(this.time.timer(timeoutMs), false);}public ConsumerRecords<K, V> poll(Duration timeout) {return this.poll(this.time.timer(timeout), private ConsumerRecords<K, V> poll(Timer timer, boolean includeMetadataInTimeout) {//.......删除干扰理解的代码行ConsumerRecords var3;do {//.......删除干扰理解的代码行Map<TopicPartition, List<ConsumerRecord<K, V>>> records = this.pollForFetches(timer);//检查拉取的消息记录是否为空。if (!records.isEmpty()) {//检查是否需要发送更多的拉取请求或者是否有未完成的网络请求。if (this.fetcher.sendFetches() > 0 || this.client.hasPendingRequests()) {//如果需要发送更多的拉取请求或者有未完成的网络请求,调用 pollNoWakeup 方法来处理这些请求。this.client.pollNoWakeup();}// 调用 onConsume 方法对消费的消息记录进行拦截处理。ConsumerRecords var4 = this.interceptors.onConsume(new ConsumerRecords(records));//返回经过拦截处理后的消费者记录。return var4;}//timer.notExpired()如果在入参的提供的时间内,继续循环,直到returen或者超时} while(timer.notExpired());var3 = ConsumerRecords.empty();return var3;//.......删除干扰理解的代码行}
1 首先从缓冲区队列取数,没有数据则请求服务端来获取数据
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {long pollTimeout = Math.min(this.coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());Map<TopicPartition, List<ConsumerRecord<K, V>>> records = this.fetcher.fetchedRecords();//如果有数据,直接返回if (!records.isEmpty()) {return records;} else {//如果没有数据,则发出请求,从服务端获取数据,this.fetcher.sendFetches();if (!this.cachedSubscriptionHashAllFetchPositions && pollTimeout > this.retryBackoffMs) {pollTimeout = this.retryBackoffMs;}Timer pollTimer = this.time.timer(pollTimeout);this.client.poll(pollTimer, () -> {return !this.fetcher.hasCompletedFetches();});timer.update(pollTimer.currentTimeMs());return this.coordinator.rejoinNeededOrPending() ? Collections.emptyMap() : this.fetcher.fetchedRecords();}}
1.1循环从队列中取数,给到空或者已被提取的nextInLineRecords
fetchedRecords可能不好理解,你可以这样想比较好理解,
1、while循环里,先走的是else语句,从内部队列completedFetches拿出数据给nextInLineRecords赋值,
2、之后第二次走的是循环里的if语句,因为刚被赋值,所以nextInLineRecords不为null,并且还没有提取,所以this.nextInLineRecords.isFetched=false
3、在第二次走的if语句中,执行this.fetchRecords(this.nextInLineRecords, recordsRemaining);后,this.nextInLineRecords.isFetched会被置为true,下次循环又要走else了,nextInLineRecords又重新被队列中的新的值赋值,并且新的this.nextInLineRecords.isFetched=false,下一次循环又可以走if语句了
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap();//获取一批最大记录数int recordsRemaining = this.maxPollRecords;//.......删除干扰理解的代码行while(recordsRemaining > 0) {if (this.nextInLineRecords != null && !this.nextInLineRecords.isFetched) {List<ConsumerRecord<K, V>> records = this.fetchRecords(this.nextInLineRecords, recordsRemaining);TopicPartition partition = this.nextInLineRecords.partition;if (!records.isEmpty()) {List<ConsumerRecord<K, V>> currentRecords = (List)fetched.get(partition);if (currentRecords == null) {fetched.put(partition, records);} else {List<ConsumerRecord<K, V>> newRecords = new ArrayList(records.size() + currentRecords.size());newRecords.addAll(currentRecords);newRecords.addAll(records);fetched.put(partition, newRecords);}//循环第一个结束位置,recordsRemaining减小到0recordsRemaining -= records.size();}} else {Fetcher.CompletedFetch completedFetch = (Fetcher.CompletedFetch)this.completedFetches.peek();if (completedFetch == null) {//循环第二个结束的位置,内部队列没有数据了break;}try {this.nextInLineRecords = this.parseCompletedFetch(completedFetch);} catch (Exception var7) {//循环第三个结束的位置,抛异常PartitionData partition = completedFetch.partitionData;if (fetched.isEmpty() && (partition.records == null || partition.records.sizeInBytes() == 0)) {this.completedFetches.poll();}throw var7;}this.completedFetches.poll();}}//.......删除干扰理解的代码行return fetched;}
(1)当nextInLineRecords的数据被提取时,就把nextInLineRecords置为已提取,
private List<ConsumerRecord<K, V>> fetchRecords(Fetcher<K, V>.PartitionRecords partitionRecords, int maxRecords) {//.......删除干扰理解的代码行partitionRecords.drain();return Collections.emptyList();}
partitionRecords.drain();
会把提取标志设为已提取
private void drain() {if (!this.isFetched) {this.maybeCloseRecordStream();this.cachedRecordException = null;this.isFetched = true;this.completedFetch.metricAggregator.record(this.partition, this.bytesRead, this.recordsRead);if (this.bytesRead > 0) {Fetcher.this.subscriptions.movePartitionToEnd(this.partition);}}}
1.2 针对不同的分区,客户端拉取数据的请求都会发送一次
public synchronized int sendFetches() {//返回的是一个map,key是集群的节点,value是要发往这个节点的入参,下面for循环次数=客户端发送请求的次数(一个分区一次请求)=fetchRequestMap.sizeMap<Node, FetchRequestData> fetchRequestMap = this.prepareFetchRequests();final Node fetchTarget;final FetchRequestData data;Builder request;//它遍历一个名为fetchRequestMap的映射(Map)的条目集合。每个条目是一个键值对,其中键是请求的目标(fetchTarget),值是请求对象(request)。//在每次循环迭代中,代码会执行以下操作://获取迭代器(Iterator)对象var2,用于遍历fetchRequestMap的条目集合。//检查是否还有下一个条目,即检查迭代器是否还有更多的元素。//如果还有下一个条目,代码将执行this.client.send(fetchTarget, request).addListener(new RequestFutureListener<ClientResponse>()这一行代码。//这行代码的作用是将请求对象request发送到指定的目标fetchTarget,并添加一个RequestFutureListener监听器来处理响应。//请注意,代码中的this.client表示当前对象的客户端属性,send是客户端发送请求的方法,addListener用于添加请求监听器。for(Iterator var2 = fetchRequestMap.entrySet().iterator(); var2.hasNext(); this.client.send(fetchTarget, request).addListener(new RequestFutureListener<ClientResponse>() {//监听器在请求成功后的处理逻辑public void onSuccess(ClientResponse resp) {synchronized(Fetcher.this) {FetchResponse<Records> response = (FetchResponse)resp.responseBody();FetchSessionHandler handler = Fetcher.this.sessionHandler(fetchTarget.id());if (handler == null) {Fetcher.this.log.error("Unable to find FetchSessionHandler for node {}. Ignoring fetch response.", fetchTarget.id());} else if (handler.handleResponse(response)) {Set<TopicPartition> partitions = new HashSet(response.responseData().keySet());Fetcher.FetchResponseMetricAggregator metricAggregator = new Fetcher.FetchResponseMetricAggregator(Fetcher.this.sensors, partitions);Iterator var7 = response.responseData().entrySet().iterator();//遍历结果集,while(var7.hasNext()) {Entry<TopicPartition, PartitionData<Records>> entry = (Entry)var7.next();TopicPartition partition = (TopicPartition)entry.getKey();long fetchOffset = ((org.apache.kafka.common.requests.FetchRequest.PartitionData)data.sessionPartitions().get(partition)).fetchOffset;PartitionData fetchData = (PartitionData)entry.getValue();Fetcher.this.log.debug("Fetch {} at offset {} for partition {} returned fetch data {}", new Object[]{Fetcher.this.isolationLevel, fetchOffset, partition, fetchData});//把数据放入到completedFetches 队列中,每一个Fetcher都有分区和数据Fetcher.this.completedFetches.add(new Fetcher.CompletedFetch(partition, fetchOffset, fetchData, metricAggregator, resp.requestHeader().apiVersion()));}Fetcher.this.sensors.fetchLatency.record((double)resp.requestLatencyMs());}}}//在请求发生异常后的处理方法public void onFailure(RuntimeException e) {//.......删除干扰理解的代码行}})) {//这里才是for循环的主体逻辑,上面的onFailure和onSuccess是RequestFutureListener内部实现,Entry<Node, FetchRequestData> entry = (Entry)var2.next();fetchTarget = (Node)entry.getKey();data = (FetchRequestData)entry.getValue();request = Builder.forConsumer(this.maxWaitMs, this.minBytes, data.toSend()).isolationLevel(this.isolationLevel).setMaxBytes(this.maxBytes).metadata(data.metadata()).toForget(data.toForget());//.......删除干扰理解的代码行}return fetchRequestMap.size();}
(1)组装此次执行要拉取哪些分区的请求集合
private Map<Node, FetchRequestData> prepareFetchRequests() {Cluster cluster = this.metadata.fetch();Map<Node, org.apache.kafka.clients.FetchSessionHandler.Builder> fetchable = new LinkedHashMap();//var3是此次请求要获取的分区对象的迭代器,迭代器中每一个对象都是TopicPartition1Iterator var3 = this.fetchablePartitions().iterator();//遍历while(var3.hasNext()) {TopicPartition partition = (TopicPartition)var3.next();//此partition分区属于集群的哪个节点,后面会当成fetchable的keyNode node = cluster.leaderFor(partition);//.......删除干扰理解的代码行//builder=fetchable.get(node);org.apache.kafka.clients.FetchSessionHandler.Builder builder = (org.apache.kafka.clients.FetchSessionHandler.Builder)fetchable.get(node);//如果从map中根据node当key,得出的value是null,则创建一个新的build放入map中if (builder == null) {FetchSessionHandler handler = this.sessionHandler(node.id());if (handler == null) {handler = new FetchSessionHandler(this.logContext, node.id());this.sessionHandlers.put(node.id(), handler);}builder = handler.newBuilder();fetchable.put(node, builder);}long position = this.subscriptions.position(partition);//把分区和获取分区最大size添加到buildbuilder.add(partition, new org.apache.kafka.common.requests.FetchRequest.PartitionData(position, -1L, this.fetchSize, Optional.empty()));this.log.debug("Added {} fetch request for partition {} at offset {} to node {}", new Object[]{this.isolationLevel, partition, position, node});}//.......删除干扰理解的代码行Map<Node, FetchRequestData> reqs = new LinkedHashMap();Iterator var10 = fetchable.entrySet().iterator();//遍历fetchable,根据不同的节点key,value是node对应的build,重新得到一个新的mapwhile(var10.hasNext()) {Entry<Node, org.apache.kafka.clients.FetchSessionHandler.Builder> entry = (Entry)var10.next();reqs.put(entry.getKey(), ((org.apache.kafka.clients.FetchSessionHandler.Builder)entry.getValue()).build());}//这个map就是实际发送请求,发往服务端的入参的一部分return reqs;}
1)在组装请求集合之前,客户端要首先获取此次要拉取哪些分区
消费者能消费哪些分区拉取数据,
1、当前客户端订阅了哪些分区,就是有权限拉取这些分区的数据
2、缓冲区队列中有积压数据的分区不再此次拉取分区的集合内
//kafka内部队列,从服务端得到的数据会放这里,之后由业务调用poll方法,先从这个队列里取数据,没有则请求private final ConcurrentLinkedQueue<Fetcher.CompletedFetch> completedFetches;private List<TopicPartition> fetchablePartitions() {Set<TopicPartition> exclude = new HashSet();//获得当前消费者客户端可以消息哪些分区的集合List<TopicPartition> fetchable = this.subscriptions.fetchablePartitions();if (this.nextInLineRecords != null && !this.nextInLineRecords.isFetched) {exclude.add(this.nextInLineRecords.partition);}//查看缓冲区队列还有哪些分区挤压着,从这些分区获取数据排除在这次请求Iterator var3 = this.completedFetches.iterator();while(var3.hasNext()) {Fetcher.CompletedFetch completedFetch = (Fetcher.CompletedFetch)var3.next();exclude.add(completedFetch.partition);}fetchable.removeAll(exclude);return fetchable;}