取本地数据_深入理解Kafka服务端之Follower副本如何同步Leader副本的数据

一、场景分析Kafka采用的是主写主读的方式,即客户端的读写请求都由分区的Leader副本处理,那么Follower副本要想保证和Leader副本数据一致,就需要不断地从Leader副本拉取消息来进行同步。由于同一个分区的Leader副本和Follower副本分布在不同的节点上,所以同步的过程可以简单概括为:Follower副本所在节点封装拉取数据的请求并发送给Leader副本所在节点 → Leader副本所在节点接收拉取数据的请求并进行处理,然后返回响应 → Follower副本所在节点接收到返回的响应并进行处理。这个过程中封装拉取请求和处理返回的响应是Follower副本所在节点的一个单独的线程完成的。二、图示说明

    假设某主题只有1个分区,该分区有两个副本:Leader 副本在 Broker1 上,Follower 副本在 Broker2 上,其 Leader 副本写入数据和 Follower 副本同步数据的流程如下图:

e1ef27109d25dbad9cc949b56801ee39.png

三、源码分析Kafka分区的Leader副本接收客户端生产的数据,写入本地存储;然后Follower副本拉取数据写入本地存储,并更新一系列关键的偏移量。整个流程比较复杂,这里先通过一个简单的方法调用流程来看一下这个过程:
1.leader 副本将数据写入本地磁盘  KafkaApis.handleProduceRequest(){      replicaManager.appendRecords(){            appendToLocalLog(){                Partition.appendRecordsToLeader(){                    Log.appendAsLeader(){                        Log.append(){                              //通过LogSegment.append()方法写入磁盘                              LogSegment.append()                        }                    }                }            }        }  }2.leader 副本更新LEO  KafkaApis.handleProduceRequest(){      replicaManager.appendRecords(){            appendToLocalLog(){                Partition.appendRecordsToLeader(){                    Log.appendAsLeader(){                        Log.append(){                              //更新Leader副本的LEO值                            updateLogEndOffset(appendInfo.lastOffset + 1)                        }                    }                }            }        }  }3.follower 副本同步数据,携带自身的LEO  AbstractFetchThread.doWork(){      maybeFetch(){            buildFetch(fetchStates){                  //这里的fetchState.fetchOffset 就是Follower副本的LEO值                builder.add(topicPartition, new FetchRequest.PartitionData(                fetchState.fetchOffset, logStartOffset, fetchSize, Optional.of(fetchState.currentLeaderEpoch)))            }        }  }4.leader 副本更新本地保存的Follower副本的LEO  ReplicaManager.fetchMessages(){      //获取读取结果      val logReadResults = readFromLog(){          if (isFromFollower) updateFollowerLogReadResults(replicaId, result){                  //TODO 更新leader保存的各个follower副本的LEO                  partition.updateReplicaLogReadResult(replica, readResult){                      //TODO 最终更新所有的replica的LEO的值                      replica.updateLogReadResult(logReadResult){                          //更新LEO对象                          logEndOffsetMetadata = logReadResult.info.fetchOffsetMetadata                    }                }            }      }  }5.leader 副本尝试更新ISR列表    ReplicaManager.fetchMessages(){      //获取读取结果      val logReadResults = readFromLog(){          if (isFromFollower) updateFollowerLogReadResults(replicaId, result){                  //TODO 尝试更新ISR列表                  val leaderHWIncremented = maybeExpandIsr(replicaId, logReadResult){                      //更新ISR列表                      updateIsr(newInSyncReplicas)                }            }      }  }6.leader 副本更新HW    ReplicaManager.fetchMessages(){      //获取读取结果      val logReadResults = readFromLog(){          if (isFromFollower) updateFollowerLogReadResults(replicaId, result){                  //TODO 尝试更新ISR列表及Leader副本的HW                  val leaderHWIncremented = maybeExpandIsr(replicaId, logReadResult){                      //TODO 尝试更新leader的HW                      maybeIncrementLeaderHW(leaderReplica, logReadResult.fetchTimeMs){                          //取ISR列表中副本的最小的LEO作为新的HW                          val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)                          //获取旧的HW                          val oldHighWatermark = leaderReplica.highWatermark                          //如果新的HW值大于旧的HW值,就更新                          if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||                            (oldHighWatermark.messageOffset == newHighWatermark.messageOffset &&                             oldHighWatermark.onOlderSegment(newHighWatermark))) {                            //更新 Leader 副本的 HW                            leaderReplica.highWatermark = newHighWatermark                        }                    }                }            }      }  }7.leader 副本给 follower副本 返回数据,携带leader 副本的 HW 值  ReplicaManager.fetchMessages(){      //获取读取结果      val logReadResults = readFromLog(){         readFromLocalLog(){           read(){             val readInfo = partition.readRecords(){                 //获取Leader Replica的高水位                 val initialHighWatermark = localReplica.highWatermark.messageOffset             }           }         }      }  }8.follower 副本写入数据,更新自身LEO、  ReplicaFetcherThread.processPartitionData(){    partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = false){            doAppendRecordsToFollowerOrFutureReplica(){                Log.appendAsFollower(){                    Log.append(){                          //更新Follower副本的LEO值                          updateLogEndOffset(appendInfo.lastOffset + 1)                    }                }            }        }  }9.follower 副本更新本地的 HW 值  ReplicaFetcherThread.processPartitionData(){        //根据leader返回的HW,更新Follower本地的HW:取Follower本地LEO 和 Leader HW 的较小值        val followerHighWatermark = replica.logEndOffset.min(partitionData.highWatermark)        //TODO 更新Follower副本的 HW 对象        replica.highWatermark = new LogOffsetMetadata(followerHighWatermark)  }
注意:
  • 对于HW,Leader 副本和 Follower 副本只保存自身的

  • 对于LEO,Follower 副本只保存自身的,但是 Leader 副本除了保存自身的外,还会保存所有 Follower 副本的 LEO 值

  • 无论是Leader副本所在节点,还是Follower副本所在节点,分区对应的Partition 对象都会保存所有的副本对象,但是只有本地副本对象有对应的日志文件

整个数据写入及同步的过程分为九个步骤:

  1. leader 副本将数据写入本地磁盘
  2. leader 副本更新 LEO
  3. follower 副本发送同步数据请求,携带自身的 LEO
  4. leader 副本更新本地保存的其它副本的 LEO
  5. leader 副本尝试更新 ISR 列表
  6. leader 副本更新 HW
  7. leader 副本给 follower 副本返回数据,携带 leader 副本的 HW 值
  8. follower 副本接收响应并写入数据,更新自身 LEO
  9. follower 副本更新本地的 HW 值

   下面具体分析这几个步骤。第一、二步在分析日志对象的写数据流程时已经详细介绍过,这里不再赘述(《深入理解Kafka服务端之日志对象的读写数据流程》)。 对于后面的几个步骤,由于发生在不同的节点上,并没有按照这个顺序进行分析,而是分成了

  • Follower副本的相关操作:即 第三步、第八步、第九步
  • Leader副本的相关操作:即 第四步、第五步、第六步、第七步
上面提到,Follower副本拉取数据是通过一个单独的线程完成的,所以在分析这几个步骤之前,先看一下这个线程相关的类:
  • 抽象类:AbstractFetcherThread
  • 实现类:ReplicaFetcherThread
先看一下 AbstractFetcherThread 类的定义:
abstract class AbstractFetcherThread(name: String,//线程名称                                     clientId: String,//Cliend ID,用于日志输出                                     val sourceBroker: BrokerEndPoint,//数据源Broker地址                                     failedPartitions: FailedPartitions,//线程处理过程报错的分区集合                                     fetchBackOffMs: Int = 0,//拉取的重试间隔,默认是 Broker 端参数 replica.fetch.backoff.ms 值。                                     isInterruptible: Boolean = true)//是否允许线程中断  extends ShutdownableThread(name, isInterruptible) {  type FetchData = FetchResponse.PartitionData[Records]  type EpochData = OffsetsForLeaderEpochRequest.PartitionData  //泛型 PartitionFetchState:表征分区读取状态,包含已读取偏移量和对应的副本读取状态  //副本状态由 ReplicaState 接口定义,包含 读取中 和 截断中 两个  private val partitionStates = new PartitionStates[PartitionFetchState]  ...}

其中,type 的用法是:给指定的类起一个别名,如:

type FetchData = FetchResponse.PartitionData[Records]

后面就可以用 FetchData 来表示 FetchResponse.PartitionData[Records] 类;EpochData 同理。

    FetchResponse.PartitionData:FetchResponse是封装的FETCH请求的响应类,PartitionData是一个嵌套类,表示响应中单个分区的拉取信息,包括对应Leader副本的高水位,分区日志的起始偏移量,拉取到的消息集合等。

public static final class PartitionData<T extends BaseRecords> {    public final Errors error;//错误码    public final long highWatermark;//从Leader返回的分区的高水位值    public final long lastStableOffset;// 最新LSO值    public final long logStartOffset;//日志起始偏移量    public final Optional preferredReadReplica;// 期望的Read Replica;KAFKA 2.4之后支持部分Follower副本可以对外提供读服务    public final List abortedTransactions;// 该分区对应的已终止事务列表    public final T records;//消息集合}
OffsetsForLeaderEpochRequest.PartitionData:里面包含了Follower副本在本地保存的leader epoch 和从Leader副本获取到的leader epoch
public static class PartitionData {    public final Optional currentLeaderEpoch;    public final int leaderEpoch;}
分区读取的状态:

    PartitionFetchState:样例类,用来表征分区的读取状态。包含已拉取的偏移量,当前leader的epoch,副本读取状态等

case class PartitionFetchState(fetchOffset: Long,//已拉取的偏移量                               currentLeaderEpoch: Int,//当前epoch                               delay: DelayedItem,                               state: ReplicaState//副本读取状态                              ) {  //表征分区的读取状态  //1.可拉取,表明副本获取线程当前能够读取数据。判断条件是:副本处于Fetching且未被推迟执行  def isReadyForFetch: Boolean = state == Fetching && !isDelayed  //2.截断中,表明分区副本正在执行截断操作(比如该副本刚刚成为 Follower 副本)。判断条件是:副本处于Truncating且未被推迟执行  def isTruncating: Boolean = state == Truncating && !isDelayed  //3.被推迟,表明副本获取线程获取数据时出现错误,需要等待一段时间后重试。判断条件是:存在未过期的延迟任务  def isDelayed: Boolean = delay.getDelay(TimeUnit.MILLISECONDS) > 0}
分区读取状态分为三种:
  • isReadyForFetch:可拉取,表明副本获取线程当前能够读取数据。判断条件是:副本处于Fetching且未被推迟执行
  • isTruncating:截断中,表明分区副本正在执行截断操作(比如该副本刚刚成为 Follower 副本)。判断条件是:副本处于Truncating且未被推迟执行
  • isDelayed:被推迟,表明副本获取线程获取数据时出现错误,需要等待一段时间后重试。判断条件是:存在未过期的延迟任务

副本读取的状态

    ReplicaState:特质,用来表征副本读取状态。

sealed trait ReplicaState//截断中case object Truncating extends ReplicaState//拉取中case object Fetching extends ReplicaState
副本读取状态分为两种:
  • Truncating:截断中
  • Fetching:拉取中
对应上面的拉取数据流程,AbstractFetchThread定义了相关的方法:
  • buildFetch:封装拉取数据的请求
  • truncate:进行日志截断
  • processPartitionData:处理返回的响应
  • doWork:将上面定义的三个方法串联起来,形成一个闭环,并不断地重复执行。从而实现从Leader副本所在的节点同步消息
在 AbstractFetchThread 中,前三个定义的都是抽象方法,具体的方法实现在其实现类 ReplicaFetcherThread,其定义如下
class ReplicaFetcherThread(name: String,                           fetcherId: Int,//Follower 拉取的线程 Id,也就是线程的编号。                           // 单台 Broker 上,允许存在多个 ReplicaFetcherThread 线程。                           // Broker 端参数 num.replica.fetchers,决定了 Kafka 到底创建多少个 Follower 拉取线程。                           sourceBroker: BrokerEndPoint,                           brokerConfig: KafkaConfig,//服务端配置类,用来获取配置信息                           failedPartitions: FailedPartitions,                           replicaMgr: ReplicaManager,//副本管理器。该线程类通过副本管理器来获取分区对象、副本对象以及它们下面的日志对象。                           metrics: Metrics,                           time: Time,                           quota: ReplicaQuota,//用做限流。作用是控制 Follower 副本拉取速度                           leaderEndpointBlockingSend: Option[BlockingSend] = None//用于实现同步发送请求的类。                           // 所谓的同步发送,是指该线程使用它给指定 Broker 发送请求,然后线程处于阻塞状态,直到接收到 Broker 返回的 Response。                          )extends AbstractFetcherThread(                              name = name,                                clientId = name,                                sourceBroker = sourceBroker,                                failedPartitions,                                fetchBackOffMs = brokerConfig.replicaFetchBackoffMs,                                isInterruptible = false) {  //Follower副本所在Broker的Id  private val replicaId = brokerConfig.brokerId  //用于执行请求发送的类  private val leaderEndpoint = leaderEndpointBlockingSend.getOrElse(        new ReplicaFetcherBlockingSend(sourceBroker, brokerConfig, metrics, time, fetcherId,        s"broker-$replicaId-fetcher-$fetcherId", logContext))    //Follower发送的FETCH请求被处理返回前的最长等待时间,由参数:replica.fetch.wait.max.ms 配置,默认 500 毫秒    private val maxWait = brokerConfig.replicaFetchWaitMaxMs    //每个FETCH Response返回前必须要累积的最少字节数,由参数:replica.fetch.min.bytes 配置,默认 1 字节    private val minBytes = brokerConfig.replicaFetchMinBytes    //每个合法FETCH Response的最大字节数,由参数:replica.fetch.response.max.bytes 配置,默认 10 M    private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes    //单个分区能够获取到的最大字节数,由参数:replica.fetch.max.bytes 配置,默认 1 M    private val fetchSize = brokerConfig.replicaFetchMaxBytes    ...}

buildFetch() 方法:为指定分区集合构建对应的FetchRequest.Builder 对象,而该对象是构建 FetchRequest 的核心组件。

这个方法中有一个重要的操作:

  • 封装拉取请求时,携带了Follower副本的 LogStartOffset 和 LEO 值(对应同步数据的第三步)

 override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]):             ResultWithPartitions[Option[FetchRequest.Builder]] = {    //定义一个保存出错分区的集合    val partitionsWithError = mutable.Set[TopicPartition]()    val builder = fetchSessionHandler.newBuilder()    // 遍历每个分区,将处于可获取状态的分区添加到builder后续统一处理    // 对于有错误的分区加入到出错分区集合    partitionMap.foreach { case (topicPartition, fetchState) =>      //如果分区的状态是可拉取的,且该分区未对follower限流      if (fetchState.isReadyForFetch && !shouldFollowerThrottle(quota, topicPartition)) {        try {          //获取本地Follower副本保存的分区日志的logStartOffset          val logStartOffset = replicaMgr.localReplicaOrException(topicPartition).logStartOffset          /**将分区和对应的PartitionData添加到builder,注意这里的PartitionData对应的是拉取请求FetchRequest,里面封装了拉取请求的元数据信息,如:          * fetchOffset:拉取消息的起始偏移量,也就是Follower副本的LEO          * currentLeaderEpoch:Follower副本保存的leader epoch值          */          builder.add(topicPartition, new FetchRequest.PartitionData(            fetchState.fetchOffset, logStartOffset, fetchSize, Optional.of(fetchState.currentLeaderEpoch)))        } catch {          case _: KafkaStorageException =>      //如果有异常,将该分区添加到出错分区的集合            partitionsWithError += topicPartition        }      }    }    val fetchData = builder.build()    val fetchRequestOpt = if (fetchData.sessionPartitions.isEmpty && fetchData.toForget.isEmpty) {      None    } else {      //构造FETCH请求的Builder对象      val requestBuilder = FetchRequest.Builder        .forReplica(fetchRequestVersion, replicaId, maxWait, minBytes, fetchData.toSend)        .setMaxBytes(maxBytes)        .toForget(fetchData.toForget)        .metadata(fetchData.metadata)      Some(requestBuilder)    }    //构建返回结果,返回Builder对象以及出错分区列表    ResultWithPartitions(fetchRequestOpt, partitionsWithError)  }

truncate() 方法:用于将指定分区的日志截断到指定的偏移量

override def truncate(tp: TopicPartition, offsetTruncationState: OffsetTruncationState): Unit = {    //根据分区获取本地副本    val replica = replicaMgr.localReplicaOrException(tp)    val partition = replicaMgr.getPartition(tp).get    //调用Partition.truecateTo方法进行日志截断    // offsetTruncationState.offset:要截断到的偏移量    partition.truncateTo(offsetTruncationState.offset, isFuture = false)    if (offsetTruncationState.offset < replica.highWatermark.messageOffset)      warn(s"Truncating $tp to offset ${offsetTruncationState.offset} below high watermark " +        s"${replica.highWatermark.messageOffset}")    if (offsetTruncationState.truncationCompleted)      replicaMgr.replicaAlterLogDirsManager.markPartitionsForTruncation(brokerConfig.brokerId, tp,        offsetTruncationState.offset)}

    这个方法内部依次调用了:Partition.truncateTo -> LogManager.truncateTo -> Log.truncateTo -> LogSegment.truncateTo 进行日志截断操作

processPartitionData方法:用于处理指定分区从Leader副本所在节点返回的响应,将获取的消息写入本地存储,并返回写入消息的元数据

这里有两个个重要的操作:

  • 写入消息,更新 Follower 副本的 LEO(对应同步数据的第八步)

  • 更新 Follower 副本本地的 HW 值(对应同步数据的第九步)

override def processPartitionData(topicPartition: TopicPartition,   // 拉取数据的分区                                    fetchOffset: Long,              // 拉取的消息集合的起始位移                                    partitionData: FetchData        // 读取到的分区消息数据                                   ): Option[LogAppendInfo] = {     // 返回值:写入已读取消息数据前的元数据    //从副本管理器获取副本对象Replica    val replica = replicaMgr.localReplicaOrException(topicPartition)    //从副本管理器获取指定主题分区对象    val partition = replicaMgr.getPartition(topicPartition).get    //将获取的消息封装成MemoryRecords    val records = toMemoryRecords(partitionData.records)    //判断获取的消息集合是否超限    maybeWarnIfOversizedRecords(records, topicPartition)    //如果获取消息的起始位移值不是本地日志LEO值则视为异常情况    if (fetchOffset != replica.logEndOffset)      throw new IllegalStateException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format(        topicPartition, fetchOffset, replica.logEndOffset))    if (isTraceEnabled)      trace("Follower has replica log end offset %d for partition %s. Received %d messages and leader hw %d"        .format(replica.logEndOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark))    //TODO 写入Follower副本本地日志,更新自身的LEO    val logAppendInfo = partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = false)    if (isTraceEnabled)      trace("Follower has replica log end offset %d after appending %d bytes of messages for partition %s"        .format(replica.logEndOffset, records.sizeInBytes, topicPartition))    //根据leader返回的HW,更新Follower本地的HW:取Follower本地LEO 和 Leader HW 的较小值    val followerHighWatermark = replica.logEndOffset.min(partitionData.highWatermark)    //获取从leader返回的LogStartOffset    val leaderLogStartOffset = partitionData.logStartOffset    //TODO 更新Follower副本的HW对象    replica.highWatermark = new LogOffsetMetadata(followerHighWatermark)    //尝试更新Follower副本的LogStartOffset    replica.maybeIncrementLogStartOffset(leaderLogStartOffset)    if (isTraceEnabled)      trace(s"Follower set replica high watermark for partition $topicPartition to $followerHighWatermark")    // 副本消息拉取限流    if (quota.isThrottled(topicPartition))      quota.record(records.sizeInBytes)    replicaMgr.brokerTopicStats.updateReplicationBytesIn(records.sizeInBytes)    //返回写入消息的元数据    logAppendInfo  }
AbstractFetchThread.doWork() 方法:将上面的三个方法串联起来形成闭环,达到 Follower 副本从 Leader 副本同步数据的目的。
override def doWork() {    //尝试日志截断    maybeTruncate()    //尝试拉取数据    maybeFetch()}

这个方法很简单,只在内部调用了两个方法:

maybeTruncate():尝试进行日志截断

private def maybeTruncate(): Unit = {    // 将所有处于截断中状态的分区依据有无Leader Epoch值进行分组    val (partitionsWithEpochs, partitionsWithoutEpochs) = fetchTruncatingPartitions()    // 对于有Leader Epoch值的分区,将日志截断到Leader Epoch值对应的位移值处    if (partitionsWithEpochs.nonEmpty) {      truncateToEpochEndOffsets(partitionsWithEpochs)    }    // 对于没有Leader Epoch值的分区,将日志截断到高水位值处    if (partitionsWithoutEpochs.nonEmpty) {      truncateToHighWatermark(partitionsWithoutEpochs)    }}

这里先看对于没有Leader Epoch的分区,将日志截断到高水位处:

private[server] def truncateToHighWatermark(partitions: Set[TopicPartition]): Unit = inLock(partitionMapLock) {    val fetchOffsets = mutable.HashMap.empty[TopicPartition, OffsetTruncationState]    // 遍历每个要执行截断操作的分区对象    for (tp       // 获取分区的分区读取状态      val partitionState = partitionStates.stateValue(tp)      if (partitionState != null) {        // 取出高水位值。        val highWatermark = partitionState.fetchOffset        //封装截断状态        val truncationState = OffsetTruncationState(highWatermark, truncationCompleted = true)        info(s"Truncating partition $tp to local high watermark $highWatermark")        // 执行截断到高水位值        if (doTruncate(tp, truncationState))          //保存分区和对应的截取状态          fetchOffsets.put(tp, truncationState)      }    }    // 更新这组分区的分区读取状态    updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets)}
其中 doTruncate(tp, truncationState) 方法内部就调用了实现类:ReplicaFetcherThread.truncate() 方法maybeFetch():尝试从Leader副本拉取数据
private def maybeFetch(): Unit = {    //获取分区状态集合和对应的拉取请求的集合    val (fetchStates, fetchRequestOpt) = inLock(partitionMapLock) {      //获取要拉取消息的分区和分区对应状态的集合      val fetchStates = partitionStates.partitionStateMap.asScala      // TODO 第一步:为集合中的分区构造FetchRequest.builder对象,这里的返回结果有两个对象:      //fetchRequestOpt:要读取的分区核心信息 + FetchRequest.Builder 对象。      // 而这里的核心信息,就是指要读取哪个分区,从哪个位置开始读,最多读多少字节,等等。      //partitionsWithError:一组出错的分区      val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = buildFetch(fetchStates)      //TODO 第二步:处理出错的分区,处理方式主要是将这个分区加入到有序Map末尾,等待后续重试      handlePartitionsWithErrors(partitionsWithError, "maybeFetch")      // 如果当前没有可读取的分区,则等待fetchBackOffMs时间等候后续重试      if (fetchRequestOpt.isEmpty) {        trace(s"There are no active partitions. Back off for $fetchBackOffMs ms before sending a fetch request")        partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)      }      (fetchStates, fetchRequestOpt)    }    //TODO 第三步:遍历FETCH请求,发送FETCH请求给Leader副本,并处理Response    fetchRequestOpt.foreach { fetchRequest =>      processFetchRequest(fetchStates, fetchRequest)    }}
这个方法可以划分为关键的三个步骤:a:为集合中的分区构造FetchRequest.builder对象
val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = buildFetch(fetchStates)
这里调用了实现类:ReplicaFetcherThread.buildFetch() 方法,返回结果有两个对象:fetchRequestOpt:要读取的分区核心信息 + FetchRequest.Builder 对象。而这里的核心信息,就是指要读取哪个分区,从哪个位置开始读,最多读多少字节,等等。partitionsWithError:一组出错的分区

b:处理出错的分区。处理方式主要是将这个分区加入到有序Map末尾,等待后续重试

handlePartitionsWithErrors(partitionsWithError, "maybeFetch")
这个方法最后调用了PartitionStates.updateAndMoveToEnd() 方法,其作用就是把给定的分区从数据结构的头部移除,然后放到尾部,从而达到轮询的目的
//将给定的分区从map头部移除,然后再加到尾部,以达到轮询的目的//这里的LinkedHashMap对于插入元素是有顺序的,加入插入顺序是abcde,先读取了a,// 为了保证公平性,会将a从集合中先移除,然后放到尾部,那么下次就从b开始读public void updateAndMoveToEnd(TopicPartition topicPartition, S state) {    map.remove(topicPartition);    map.put(topicPartition, state);    updateSize();}

c:遍历并发送FETCH请求给Leader副本,然后处理Response

fetchRequestOpt.foreach { fetchRequest =>  processFetchRequest(fetchStates, fetchRequest)}
private def processFetchRequest(fetchStates: Map[TopicPartition, PartitionFetchState],                                  fetchRequest: FetchRequest.Builder): Unit = {    //定义出错分区的集合    val partitionsWithError = mutable.Set[TopicPartition]()    //定义接收响应数据的集合    var responseData: Seq[(TopicPartition, FetchData)] = Seq.empty    try {      trace(s"Sending fetch request $fetchRequest")      //给Leader发送FETCH请求,获取响应数据      responseData = fetchFromLeader(fetchRequest)    } catch {      case t: Throwable =>        if (isRunning) {          warn(s"Error in response for fetch request $fetchRequest", t)          inLock(partitionMapLock) {            partitionsWithError ++= partitionStates.partitionSet.asScala            partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)          }        }    }    //更新请求发送速率指标    fetcherStats.requestRate.mark()    //如果接收到了响应    if (responseData.nonEmpty) {      inLock(partitionMapLock) {        //遍历响应结果中的分区和分区对应的数据        responseData.foreach { case (topicPartition, partitionData) =>          Option(partitionStates.stateValue(topicPartition)).foreach { currentFetchState =>            //获取分区对应的拉取状态            val fetchState = fetchStates(topicPartition)            // 处理Response的条件:            // 1. 获取的消息集合的起始偏移量和之前已保存的下一条待写入偏移量相等            // 2. 当前分区处于可获取状态            if (fetchState.fetchOffset == currentFetchState.fetchOffset && currentFetchState.isReadyForFetch) {              //获取请求中携带的Follower副本保存的 leader epoch 值              val requestEpoch = if (fetchState.currentLeaderEpoch >= 0)                Some(fetchState.currentLeaderEpoch)              else                None              partitionData.error match {                // 如果没有错误                case Errors.NONE =>                  try {                    // 交由子类完成Response的处理                    val logAppendInfoOpt = processPartitionData(topicPartition, currentFetchState.fetchOffset,                      partitionData)                    logAppendInfoOpt.foreach { logAppendInfo =>                      val validBytes = logAppendInfo.validBytes                      val nextOffset = if (validBytes > 0) logAppendInfo.lastOffset + 1 else currentFetchState.fetchOffset                      fetcherLagStats.getAndMaybePut(topicPartition).lag = Math.max(0L, partitionData.highWatermark - nextOffset)                      if (validBytes > 0 && partitionStates.contains(topicPartition)) {                        val newFetchState = PartitionFetchState(nextOffset, fetchState.currentLeaderEpoch,                          state = Fetching)                        // 将该分区放置在有序Map读取顺序的末尾,保证公平性                        partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)                        fetcherStats.byteRate.mark(validBytes)                      }                    }                  } catch {                    case ime: CorruptRecordException =>                      error(s"Found invalid messages during fetch for partition $topicPartition " +                        s"offset ${currentFetchState.fetchOffset}", ime)                      partitionsWithError += topicPartition                    case e: KafkaStorageException =>                      error(s"Error while processing data for partition $topicPartition " +                        s"at offset ${currentFetchState.fetchOffset}", e)                      markPartitionFailed(topicPartition)                    case t: Throwable =>                      error(s"Unexpected error occurred while processing data for partition $topicPartition " +                        s"at offset ${currentFetchState.fetchOffset}", t)                      markPartitionFailed(topicPartition)                  }                // 如果读取位移值越界,通常是因为Leader发生变更                case Errors.OFFSET_OUT_OF_RANGE =>                  //调整越界,主要办法是做截断                  if (handleOutOfRangeError(topicPartition, currentFetchState, requestEpoch))                    //如果依然不能成功,将该分区添加到出错分区集合                    partitionsWithError += topicPartition                //如果Follower本地保存的Leader Epoch值比Leader所在Broker上的Epoch值要新                case Errors.UNKNOWN_LEADER_EPOCH =>                  debug(s"Remote broker has a smaller leader epoch for partition $topicPartition than " +                    s"this replica's current leader epoch of ${fetchState.currentLeaderEpoch}.")                  // 加入到出错分区集合                  partitionsWithError += topicPartition                // 如果Follower本地保存的Leader Epoch值比Leader所在Broker上的Epoch值要旧                case Errors.FENCED_LEADER_EPOCH =>                  //将该分区标记为失效,从分区拉取状态集合中移除,并加入到失效分区集合                  if (onPartitionFenced(topicPartition, requestEpoch)) partitionsWithError += topicPartition                // 如果Leader发生变更                case Errors.NOT_LEADER_FOR_PARTITION =>                  debug(s"Remote broker is not the leader for partition $topicPartition, which could indicate " +                    "that the partition is being moved")                  // 加入到出错分区列表                  partitionsWithError += topicPartition                case _ =>                  error(s"Error for partition $topicPartition at offset ${currentFetchState.fetchOffset}",                    partitionData.error.exception)                  // 加入到出错分区集合                  partitionsWithError += topicPartition              }            }          }        }      }    }    // 处理出错分区集合,主要就是将该分区放到map数据结构的末尾    if (partitionsWithError.nonEmpty) {      handlePartitionsWithErrors(partitionsWithError, "processFetchRequest")    }  }
Leader 副本如何处理拉取数据的请求:前面提到过,发送给服务端的各种请求都是由KafkaApis类处理的,处理FETCH请求的方法是:handleFetchRequest()

内部调用了ReplicaManager.fetchMessages() 方法:

def handleFetchRequest(request: RequestChannel.Request) {  ...    //TODO 这里是处理Follower Replica 拉取消息请求的具体方法    replicaManager.fetchMessages(        fetchRequest.maxWait.toLong,        fetchRequest.replicaId,        fetchRequest.minBytes,        fetchRequest.maxBytes,        versionId <= 2,        interesting,        replicationQuota(fetchRequest),        processResponseCallback,        fetchRequest.isolationLevel)    ...}

fetchMessages() 方法:

def fetchMessages(timeout: Long,                    replicaId: Int,                    fetchMinBytes: Int,                    fetchMaxBytes: Int,                    hardMaxBytesLimit: Boolean,                    fetchInfos: Seq[(TopicPartition, PartitionData)],                    quota: ReplicaQuota = UnboundedQuota,                    responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit,                    isolationLevel: IsolationLevel) {    val isFromFollower = Request.isValidBrokerId(replicaId)    val fetchOnlyFromLeader = replicaId != Request.DebuggingConsumerId && replicaId != Request.FutureLocalReplicaId    val fetchIsolation = if (isFromFollower || replicaId == Request.FutureLocalReplicaId)      FetchLogEnd    else if (isolationLevel == IsolationLevel.READ_COMMITTED)      FetchTxnCommitted    else      FetchHighWatermark    //从本地磁盘读取数据    def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {      val result = readFromLocalLog(        replicaId = replicaId,        fetchOnlyFromLeader = fetchOnlyFromLeader,        fetchIsolation = fetchIsolation,        fetchMaxBytes = fetchMaxBytes,        hardMaxBytesLimit = hardMaxBytesLimit,        readPartitionInfo = fetchInfos,        quota = quota)      if (isFromFollower) updateFollowerLogReadResults(replicaId, result)      else result    }    //获取读取结果    val logReadResults = readFromLog()    var bytesReadable: Long = 0    var errorReadingData = false    val logReadResultMap = new mutable.HashMap[TopicPartition, LogReadResult]    logReadResults.foreach { case (topicPartition, logReadResult) =>      if (logReadResult.error != Errors.NONE)        errorReadingData = true      bytesReadable = bytesReadable + logReadResult.info.records.sizeInBytes      logReadResultMap.put(topicPartition, logReadResult)    }    if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) {      val fetchPartitionData = logReadResults.map { case (tp, result) =>        tp -> FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records,          result.lastStableOffset, result.info.abortedTransactions)      }      responseCallback(fetchPartitionData)    } else {      val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicPartition, FetchPartitionStatus)]      fetchInfos.foreach { case (topicPartition, partitionData) =>        logReadResultMap.get(topicPartition).foreach(logReadResult => {          val logOffsetMetadata = logReadResult.info.fetchOffsetMetadata          fetchPartitionStatus += (topicPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))        })      }      val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, fetchOnlyFromLeader,        fetchIsolation, isFromFollower, replicaId, fetchPartitionStatus)      val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, responseCallback)      val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => new TopicPartitionOperationKey(tp) }      delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)    }  }
该方法内部定义了一个readFromLog()方法,其作用有两个:    a. 调用readFromLocalLog() 读取 Leader 副本的本地日志    b. 调用 updateFollowerLogReadResults() 更新Leader副本的HW、Leader副本保存的对应Follower副本的LEO,以及尝试调整ISR列表等readFromLocalLog() 方法和内部定义的 read() 方法如下,用于从Leader副本的日志文件读取数据:
def readFromLocalLog(replicaId: Int,                       fetchOnlyFromLeader: Boolean,                       fetchIsolation: FetchIsolation,                       fetchMaxBytes: Int,                       hardMaxBytesLimit: Boolean,                       readPartitionInfo: Seq[(TopicPartition, PartitionData)],                       quota: ReplicaQuota): Seq[(TopicPartition, LogReadResult)] = {    def read(tp: TopicPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = {      //读取的起始偏移量      val offset = fetchInfo.fetchOffset      //读取的大小      val partitionFetchSize = fetchInfo.maxBytes      //follower Replica 的LogStartOffset      val followerLogStartOffset = fetchInfo.logStartOffset      brokerTopicStats.topicStats(tp.topic).totalFetchRequestRate.mark()      brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark()      val adjustedMaxBytes = math.min(fetchInfo.maxBytes, limitBytes)      try {        trace(s"Fetching log segment for partition $tp, offset $offset, partition fetch size $partitionFetchSize, " +          s"remaining response limit $limitBytes" +          (if (minOneMessage) s", ignoring response/partition size limits" else ""))        val partition = getPartitionOrException(tp, expectLeader = fetchOnlyFromLeader)        val fetchTimeMs = time.milliseconds        //读取数据,获取读取结果,里面包含了读取到的消息,LEO,HW,LogStartOffset等信息        val readInfo = partition.readRecords(          //读取的起始偏移量          fetchOffset = fetchInfo.fetchOffset,          //Follower副本保存的Leader epoch          currentLeaderEpoch = fetchInfo.currentLeaderEpoch,          maxBytes = adjustedMaxBytes,          fetchIsolation = fetchIsolation,          fetchOnlyFromLeader = fetchOnlyFromLeader,          minOneMessage = minOneMessage)        //获取读到的数据        val fetchDataInfo = if (shouldLeaderThrottle(quota, tp, replicaId)) {          //如果分区被限流了,那么返回一个空集合          FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)        } else if (!hardMaxBytesLimit && readInfo.fetchedData.firstEntryIncomplete) {          //如果返回的消息集合不完整,也返回一个空集合          FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)        } else {          //正常返回          readInfo.fetchedData        }        //根据获取到的数据封装返回结果        LogReadResult(info = fetchDataInfo,                      highWatermark = readInfo.highWatermark,//Leader的HW                      leaderLogStartOffset = readInfo.logStartOffset,//Leader的LogStartOffset                      leaderLogEndOffset = readInfo.logEndOffset,//Leader的LEO                      followerLogStartOffset = followerLogStartOffset,//Follower的LogStartOffset                      fetchTimeMs = fetchTimeMs,                      readSize = adjustedMaxBytes,                      lastStableOffset = Some(readInfo.lastStableOffset),                      exception = None//异常信息                      )      } catch {        case e@ (_: UnknownTopicOrPartitionException |                 _: NotLeaderForPartitionException |                 _: UnknownLeaderEpochException |                 _: FencedLeaderEpochException |                 _: ReplicaNotAvailableException |                 _: KafkaStorageException |                 _: OffsetOutOfRangeException) =>          LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),                        highWatermark = -1L,                        leaderLogStartOffset = -1L,                        leaderLogEndOffset = -1L,                        followerLogStartOffset = -1L,                        fetchTimeMs = -1L,                        readSize = 0,                        lastStableOffset = None,                        exception = Some(e))        case e: Throwable =>          brokerTopicStats.topicStats(tp.topic).failedFetchRequestRate.mark()          brokerTopicStats.allTopicsStats.failedFetchRequestRate.mark()          val fetchSource = Request.describeReplicaId(replicaId)          error(s"Error processing fetch with max size $adjustedMaxBytes from $fetchSource " +            s"on partition $tp: $fetchInfo", e)          LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),                        highWatermark = -1L,                        leaderLogStartOffset = -1L,                        leaderLogEndOffset = -1L,                        followerLogStartOffset = -1L,                        fetchTimeMs = -1L,                        readSize = 0,                        lastStableOffset = None,                        exception = Some(e))      }    }    //读取的最大字节    var limitBytes = fetchMaxBytes    //封装结果对象    val result = new mutable.ArrayBuffer[(TopicPartition, LogReadResult)]    //是否至少返回一条消息    var minOneMessage = !hardMaxBytesLimit    //遍历分区进行读取    readPartitionInfo.foreach { case (tp, fetchInfo) =>      //获取读取的结果      val readResult = read(tp, fetchInfo, limitBytes, minOneMessage)      //获取每个分区读取的字节数      val recordBatchSize = readResult.info.records.sizeInBytes      if (recordBatchSize > 0)        minOneMessage = false      //更新还可以读取的字节数      limitBytes = math.max(0, limitBytes - recordBatchSize)      //将分区的读取结果保存到结果集合中      result += (tp -> readResult)    }    //返回结果集    result  }
其中,read() 方法中通过调用Partition. readRecords() 方法,就获取了 Leader 副本的高水位值:
//获取Leader Replica的高水位val initialHighWatermark = localReplica.highWatermark.messageOffset
从这里可以看出,每个分区的读取结果中,都包含了 Leader 副本的 LEO、HW、LogStartOffset,以及 Follower 副本的LogStartOffset等信息。updateFollowerLogReadResults() 方法如下:
private def updateFollowerLogReadResults(replicaId: Int,                                           readResults: Seq[(TopicPartition, LogReadResult)]): Seq[(TopicPartition, LogReadResult)] = {    debug(s"Recording follower broker $replicaId log end offsets: $readResults")    readResults.map { case (topicPartition, readResult) =>      var updatedReadResult = readResult      nonOfflinePartition(topicPartition) match {          //如果找到了对应的分区        case Some(partition) =>          //根据副本id获取Partition对象中保存的副本对象          //Partition.allReplicasMap结构中保存了当前分区的所有副本对象。其中,key是brokerid,value是对应的Replica对象          partition.getReplica(replicaId) match {              //如果获取到了Replica对象            case Some(replica) =>              //TODO 更新leader保存的各个follower副本的LEO              partition.updateReplicaLogReadResult(replica, readResult)            case None =>              warn(s"Leader $localBrokerId failed to record follower $replicaId's position " +                s"${readResult.info.fetchOffsetMetadata.messageOffset} since the replica is not recognized to be " +                s"one of the assigned replicas ${partition.assignedReplicas.map(_.brokerId).mkString(",")} " +                s"for partition $topicPartition. Empty records will be returned for this partition.")              updatedReadResult = readResult.withEmptyFetchInfo          }          //如果对应的分区没有被创建        case None =>          warn(s"While recording the replica LEO, the partition $topicPartition hasn't been created.")      }      topicPartition -> updatedReadResult    }  }

Partition.updateReplicaLogReadResult() 方法:

def updateReplicaLogReadResult(replica: Replica, logReadResult: LogReadResult): Boolean = {    val replicaId = replica.brokerId    val oldLeaderLW = if (replicaManager.delayedDeleteRecordsPurgatory.delayed > 0) lowWatermarkIfLeader else -1L    //TODO 最终更新Leader副本保存的Follower副本的LEO的值    replica.updateLogReadResult(logReadResult)    val newLeaderLW = if (replicaManager.delayedDeleteRecordsPurgatory.delayed > 0) lowWatermarkIfLeader else -1L    val leaderLWIncremented = newLeaderLW > oldLeaderLW    //TODO 尝试更新ISR列表,在这个方法中会更新Leader副本对象的HW对象和分区对应的Log对象的HW值    val leaderHWIncremented = maybeExpandIsr(replicaId, logReadResult)    val result = leaderLWIncremented || leaderHWIncremented    if (result)      tryCompleteDelayedRequests()    debug(s"Recorded replica $replicaId log end offset (LEO) position ${logReadResult.info.fetchOffsetMetadata.messageOffset}.")    result  }
Replica.updateLogReadResult() 方法:用于更新Partition保存的Follower副本的LEO(对应同步数据的第四步)
def updateLogReadResult(logReadResult: LogReadResult) {    if (logReadResult.info.fetchOffsetMetadata.messageOffset >= logReadResult.leaderLogEndOffset)      _lastCaughtUpTimeMs = math.max(_lastCaughtUpTimeMs, logReadResult.fetchTimeMs)    else if (logReadResult.info.fetchOffsetMetadata.messageOffset >= lastFetchLeaderLogEndOffset)      _lastCaughtUpTimeMs = math.max(_lastCaughtUpTimeMs, lastFetchTimeMs)    //更新Follower副本的日志起始偏移量,即 _logStartOffset 变量    logStartOffset = logReadResult.followerLogStartOffset    //更新Follower副本的LEO元数据对象,即 _logEndOffsetMetadata 变量    logEndOffsetMetadata = logReadResult.info.fetchOffsetMetadata    //最后一次拉取时Leader副本的LEO    lastFetchLeaderLogEndOffset = logReadResult.leaderLogEndOffset    lastFetchTimeMs = logReadResult.fetchTimeMs}
maybeExpandIsr() 方法:尝试更新ISR列表(对应同步数据的第五步)
def maybeExpandIsr(replicaId: Int, logReadResult: LogReadResult): Boolean = {    inWriteLock(leaderIsrUpdateLock) {      // 检查给定的副本对象是否需要添加到ISR列表      leaderReplicaIfLocal match {        case Some(leaderReplica) =>          //获取给定节点的Replica对象          val replica = getReplica(replicaId).get          //获取leader副本的HW值          val leaderHW = leaderReplica.highWatermark          //获取Follower副本的LEO          val fetchOffset = logReadResult.info.fetchOffsetMetadata.messageOffset          //判断是否需要更新ISR列表的条件:          //1.该节点不在ISR列表,且replica.logEndOffsetMetadata.offsetDiff(leaderHW)           //2.给定Follower副本的LEO大于等于leader副本的HW          //3.给定的Follower副本属于该分区          //4.leader epoch对应的起始偏移量存在且小于Follower副本的LEO          //满足这4个条件说明这个Follower副本已经和leader副本保持同步了,把这个Follower副本加入到ISR列表中          if (!inSyncReplicas.contains(replica) &&             assignedReplicas.map(_.brokerId).contains(replicaId) &&             replica.logEndOffsetMetadata.offsetDiff(leaderHW) >= 0 &&             leaderEpochStartOffsetOpt.exists(fetchOffset >= _)) {            //将该副本加入集合            val newInSyncReplicas = inSyncReplicas + replica            info(s"Expanding ISR from ${inSyncReplicas.map(_.brokerId).mkString(",")} " +              s"to ${newInSyncReplicas.map(_.brokerId).mkString(",")}")            // update ISR in ZK and cache            //更新ISR列表            updateIsr(newInSyncReplicas)            replicaManager.isrExpandRate.mark()          }          // check if the HW of the partition can now be incremented          // since the replica may already be in the ISR and its LEO has just incremented          //TODO 尝试更新leader副本的HW对象及分区对应的Log对象的HW值          maybeIncrementLeaderHW(leaderReplica, logReadResult.fetchTimeMs)        case None => false // nothing to do if no longer leader      }    }  }

maybeIncrementLeaderHW() 方法:尝试更新 leader 副本的 HW 对象及分区对应的Log 对象的 HW 值(对应同步数据的第六步)

private def maybeIncrementLeaderHW(leaderReplica: Replica, curTime: Long = time.milliseconds): Boolean = {    val allLogEndOffsets = assignedReplicas.filter { replica =>      curTime - replica.lastCaughtUpTimeMs <= replicaLagTimeMaxMs || inSyncReplicas.contains(replica)    }.map(_.logEndOffsetMetadata)    //取ISR列表中副本的最小的LEO作为新的HW    val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)    //获取旧的HW    val oldHighWatermark = leaderReplica.highWatermark    //如果新的HW值大于旧的HW值,就更新    if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||      (oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) {      //更新Replica的hightWatermark对象,以及对应Log对象的高水位值      leaderReplica.highWatermark = newHighWatermark      debug(s"High watermark updated to $newHighWatermark")      true    } else {      def logEndOffsetString(r: Replica) = s"replica ${r.brokerId}: ${r.logEndOffsetMetadata}"      debug(s"Skipping update high watermark since new hw $newHighWatermark is not larger than old hw $oldHighWatermark. " +        s"All current LEOs are ${assignedReplicas.map(logEndOffsetString)}")      false    }  }
在前面分析Log日志对象的主要操作时,其中有一项是进行高水位操作的管理。在Log类中,操作高水位值的方法只有一个:onHighWatermarkIncremented
def onHighWatermarkIncremented(highWatermark: Long): Unit = {    lock synchronized {      //更新高水位值      replicaHighWatermark = Some(highWatermark)      producerStateManager.onHighWatermarkUpdated(highWatermark)      updateFirstUnstableOffset()    }  }
这个方法就是将 Log 中的 replicaHightWatermark 变量修改为给定的值。那么什么时候会修改呢?查看调用该方法的地方:Replica.highWatermark_
def highWatermark_=(newHighWatermark: LogOffsetMetadata) {    //如果是本地副本    if (isLocal) {      if (newHighWatermark.messageOffset < 0)        throw new IllegalArgumentException("High watermark offset should be non-negative")      //高水位的元数据对象      highWatermarkMetadata = newHighWatermark      //更新Log对象保存的高水位值      log.foreach(_.onHighWatermarkIncremented(newHighWatermark.messageOffset))      trace(s"Setting high watermark for replica $brokerId partition $topicPartition to [$newHighWatermark]")    } else {      throw new KafkaException(s"Should not set high watermark on partition $topicPartition's non-local replica $brokerId")    }  }
在尝试更新Leader副本的高水位时会进行highWatermark_的调用:
//更新Replica的hightWatermark对象,以及对应Log对象的高水位值leaderReplica.highWatermark = newHighWatermark
最后会将多个分区的读取结果(包含Leader副本 HW)放到集合中,然后在合适的时机返回给Follower副本所在的节点(对应同步数据第七步)
def fetchMessages(){    ...    logReadResults.foreach { case (topicPartition, logReadResult) =>      //如果读取发生错误      if (logReadResult.error != Errors.NONE)        errorReadingData = true      bytesReadable = bytesReadable + logReadResult.info.records.sizeInBytes      //将读取结果放入集合      logReadResultMap.put(topicPartition, logReadResult)    }    ...}

    上面所说的合适的时机,分为 立即返回延时返回,当满足下面四个条件之一时,便立即返回,否则会进行延时处理:

  • 拉取等待的时间到了

  • 拉取请求中没有拉取分区的信息

  • 已经拉取到了足够多的数据

  • 拉取过程中发生错误

总结:

Leader副本写入数据,Follower副本进行同步的过程分为9个步骤:

  1. leader 副本将数据写入本地磁盘
  2. leader 副本更新 LEO
  3. follower 副本发送同步数据请求,携带自身的 LEO
  4. leader 副本更新本地保存的其它副本的 LEO
  5. leader 副本尝试更新 ISR 列表
  6. leader 副本更新 HW
  7. leader 副本给 follower 副本返回数据,携带 leader 副本的 HW 值
  8. follower 副本接收响应并写入数据,更新自身 LEO
  9. follower 副本更新本地的 HW 值

关于 HW 和 LEO 的保存:

  • 对于HW,Leader 副本和 Follower 副本只保存自身的

  • 对于LEO,Follower 副本只保存自身的,但是 Leader 副本除了保存自身的外,还会保存所有 Follower 副本的 LEO 值

  • 无论是Leader副本所在节点,还是Follower副本所在节点,分区对应的Partition 对象都会保存所有的副本对象,但是只有本地副本对象有对应的日志文件

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/270798.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SQLServer子查询相关知识笔记

今天给大家介绍以下SQLServer子查询相关知识&#xff0c;希望对大家能有所帮助&#xff01;1、子查询概念介绍子查询可以嵌套在SELECT、INSERT、UPDATE、DELETE语句或其他子查询语句中的查询&#xff0c;子查询一般作为查询中间结果集角色&#xff0c;子查询也称为内部查询或内…

软件:推荐八款电脑实用软件,你都用过吗?

今天给大家推荐八款电脑非常实用的软件&#xff0c;希望对大家能有所帮助&#xff01;1、Audio Hijack Pro一款Mac 上的录音软件。它比较强大的功能是可以录制多个应用的声音&#xff0c;然后组合成一个结果&#xff0c;然后输出。2、快贴一个跨平台剪切板同步软件。你只需要简…

硬件:开机如何进BIOS,U盘启动快捷键一键查询

❤️作者主页&#xff1a;IT技术分享社区 ❤️作者简介&#xff1a;大家好,我是IT技术分享社区的博主&#xff0c;从事C#、Java开发九年&#xff0c;对数据库、C#、Java、前端、运维、电脑技巧等经验丰富。 ❤️个人荣誉&#xff1a; 数据库领域优质创作者&#x1f3c6;&#x…

强制关机对电脑的伤害你有必要了解一下

不管你的电脑新旧与否&#xff0c;我想大家肯定都遇到过死机、卡顿无反应的情况吧&#xff0c;这个时候无论是电脑高手还是萌新小白同场都会采用一个相同的解决方案&#xff0c;对&#xff0c;那就是直接关机。 当然遇到这种情况&#xff0c;长按电源键10秒强制关机&#xff0c…

操作系统的中断和异常

中断和异常 早期的计算机&#xff0c;各程序只能串行执行&#xff0c;系统资源利用率低 中断机制的诞生 中断的概念和作用 中断的分类

硬件:台式机老式键盘知识科普

❤️作者主页&#xff1a;IT技术分享社区 ❤️作者简介&#xff1a;大家好,我是IT技术分享社区的博主&#xff0c;从事C#、Java开发九年&#xff0c;对数据库、C#、Java、前端、运维、电脑技巧等经验丰富。 ❤️个人荣誉&#xff1a; 数据库领域优质创作者&#x1f3c6;&#x…

高考填报志愿计算机操作技巧,高考志愿填报技巧经验

高考志愿填报技巧经验2020-12-30 14:43:23文/叶丹填报技巧&#xff1a;在所有科目考试结束后合理估分&#xff0c;提前参考往年录取分数线&#xff0c;明确各项重要的时间节点&#xff0c;搜集目标院校资料&#xff0c;查询目标院校专业近几年的位次情况&#xff0c;有意向的学…

硬件知识:台式电脑主机各种接口介绍

❤️作者主页&#xff1a;IT技术分享社区 ❤️作者简介&#xff1a;大家好,我是IT技术分享社区的博主&#xff0c;从事C#、Java开发九年&#xff0c;对数据库、C#、Java、前端、运维、电脑技巧等经验丰富。 ❤️个人荣誉&#xff1a; 数据库领域优质创作者&#x1f3c6;&#x…

两条曲线所围成的面积_三个视频搞定:求曲边梯形面积的思想、微积分基本定理及其几何意义、微积分理论的可视化解读、...

● 本文适合高二下学期、高三一轮复习的同学阅读。先看视频再看文字&#xff0c;看视频时注意利用暂停&#xff0c;想清楚每一步变形的依据。01曲边梯形的面积、微积分基本定理的内容视频讲解1、曲边梯形的概念及面积求法(1)曲边梯形&#xff1a;由直线x&#xff1d;a&#xff…

SQLServer基础:Apply关键字用法介绍

1、概念介绍APPLy关键字是SQLServer版本中开始提供的一个系统关键字。APPLY的功能同联接很类似&#xff0c;APPLY运算分左右两个部分&#xff0c;APPLY的右表达式&#xff1a;左表达式的每一行都和右表达式进行一次计算,即右表达式需要根据左表达式提供的值进行相关计算来获取相…

asp命令执行语句】_2分钟教你使用ASP.NET CORE创建并发布网页应用

>> 点击上方 懒人MES 关注我们准备环境: 只需要下载并安装 .NET CORE SDK 3.1(推荐3.1)微软官方下载地址: https://dotnet.microsoft.com/download/dotnet-core不需要Visual Studio, 也不需要VS Code, 一切操作都只在CMD命令行中完成, 而且只需要执行5条命令&#xff0c;…

苹果计算机磁盘格式,苹果电脑如何完全写入NTFS格式磁盘

很多在使用Mac的用户可能都发现了&#xff0c;Mac有时候不能正常使用NTFS格式磁盘。无论是U盘、硬盘、软盘还是其他的NTFS格式分区&#xff0c;mac在使用它们的时候只能有访问读取的权限&#xff0c;但想要修改、删除、存在文件则无法实现。为解决这一问题&#xff0c;小编今天…

电脑知识:分享几款常用的截屏方法,欢迎收藏

目录 1、Windows系统自带截图工具 2、截屏软件 3、聊天软件 4、Windows系统自带有截屏的快捷键 5、浏览器截屏 6、手机拍照 今天小编给大家介绍几个常用截屏的方法&#xff0c;希望对大家的日常办公能有所帮助&#xff01; 1、Windows系统自带截图工具 点击左下角开始菜单在“…

eigen 编译_头条 | 使用eigen实现四元数、欧拉角、旋转矩阵、旋转向量间的转换...

点击上方蓝字&#xff0c;关注本公众号&#xff0c;获得更多资源上一篇文章介绍了四元数、欧拉角、旋转矩阵、轴角如何相互转换&#xff0c;本篇文章介绍如何用eigen来实现。旋转向量1&#xff0c;初始化旋转向量&#xff1a;旋转角为alpha&#xff0c;旋转轴为(x,y,z)Eigen::A…

硬件知识:独立显卡和集成显卡的区别

目录 1、独立显卡 2、集成显卡 3、独立显卡与集成显卡性能上的区别 今天给大家介绍一下独立显卡和集成显卡的区别&#xff0c;希望对大家能有所帮助&#xff01; 1、独立显卡 独立显卡是将显示芯片及相关器件制作成一个独立于电脑主板的板卡&#xff0c;成为专门的图像处理硬件…

『OPEN3D』1.1 点云处理

目录 1.open3d中的点云IO 2.点云的可视化 3 点云voxel下采样 4. 顶点法线估计 5.最小外界矩 6. 凸包计算 7. 点云距离计算 8. DBSCAN clustering聚类 9. RANSAC(Random Sample Consensus) 10. 点云平面分割 11. 隐藏点移除 12.outliers移除 13 最远点采样&#xf…

c 多线程map_Rust:一个不再有 C/C++ 的,实现安全实时软件的未来

作者丨lochsh译者丨马可薇策划丨王文婧Rust 作为新兴编程语言深受 Haskell 和 OCaml 等函数式编程语言的影响&#xff0c;使得它在语法上与 C 类似&#xff0c;但在语义上则完全不同。Rust 是静态类型语言&#xff0c;同时具有完整类型推断&#xff0c;而不是 C 的部分类型推断…