Kafka 3.5: Topic Partition ISR Shrink and Expand Source Code

ISR (In-Sync Replicas): the replicas that are kept in sync with the leader.
OSR (Out-of-Sync Replicas): the replicas that have fallen out of sync. Initially all replicas are in the ISR; while Kafka is running, a replica that falls behind the leader for longer than the threshold configured by replica.lag.time.max.ms is removed from the ISR and placed in the OSR. If it later catches up again, it can rejoin the ISR.
AR (Assigned Replicas): all replicas assigned to the partition; AR = ISR + OSR.
If these concepts are new to you, see "Kafka — the Replica mechanism".
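As a quick illustration of the relation AR = ISR + OSR, here is a tiny stand-alone sketch; the broker ids and sets are made up, not taken from any real cluster:

object ReplicaSetsExample extends App {
  // Suppose a partition is assigned to brokers 1, 2 and 3, and broker 3 has fallen behind
  val isr: Set[Int] = Set(1, 2)      // replicas still in sync with the leader
  val osr: Set[Int] = Set(3)         // replicas that lagged beyond replica.lag.time.max.ms
  val ar:  Set[Int] = isr ++ osr     // all assigned replicas: AR = ISR + OSR

  println(ar)                        // Set(1, 2, 3)
  // If broker 3 catches up again, it moves back from the OSR into the ISR
}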

  • I. During topic-partition initialization, all replicas of the partition are placed into the leader replica's maximalIsr
    • 1. First acquire the leaderIsrUpdateLock write lock; the work happens inside the lock
    • 2. Initialize the ISR (all replica info is stored only in maximalIsr, which is at its largest at this point)
  • II. A scheduled task shrinks the ISR
    • 1. Two startup paths
      • (1) ZooKeeper mode
      • (2) KRaft mode
    • 2. The scheduled task itself
      • (1) Acquire the leaderIsrUpdateLock read lock and decide whether the ISR needs to shrink
      • (2) Acquire the leaderIsrUpdateLock write lock and start modifying the ISR
      • (3) During the pending shrink, isr and maximalIsr both keep their pre-shrink value; the proposed (shrunken) ISR is only carried in sentLeaderAndIsr
  • III. When a follower fetches data from the leader, the leader checks whether to add it to the ISR
    • 1. After acquiring the leaderIsrUpdateLock read lock, check whether the follower qualifies to join the ISR
    • 2. Acquire the leaderIsrUpdateLock write lock and perform the update
    • 3. The update adds the new follower replica to maximalIsr first; isr keeps its pre-expand value
  • IV. After maximalIsr is modified, the change must be propagated (via the controller) to the other replicas
    • 1. ZooKeeper mode
      • A scheduled task updates a ZooKeeper node to propagate the change
    • 2. KRaft mode
      • A request is sent through controllerChannelManager to notify the controller
  • V. maximalIsr and isr
    • 1. The definitions of the isr and maximalIsr fields in PartitionState, and why only maximalIsr was modified above rather than isr
    • 2. When maximalIsr is committed to isr

I. During topic-partition initialization, all replicas of the partition are placed into the leader replica's maximalIsr

If you are not familiar with the becomeLeaderOrFollower method, see "Kafka 3.5: how a partition follower creates a fetcher thread to pull data from the leader (source code)".

def becomeLeaderOrFollower(correlationId: Int,
                           leaderAndIsrRequest: LeaderAndIsrRequest,
                           onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): LeaderAndIsrResponse = {
  // ... code omitted ...
  val partitionsBecomeLeader = if (partitionsToBeLeader.nonEmpty)
    makeLeaders(controllerId, controllerEpoch, partitionsToBeLeader, correlationId, responseMap,
      highWatermarkCheckpoints, topicIdFromRequest)
  // ... code omitted ...
}

private def makeLeaders(controllerId: Int,
                        controllerEpoch: Int,
                        partitionStates: Map[Partition, LeaderAndIsrPartitionState],
                        correlationId: Int,
                        responseMap: mutable.Map[TopicPartition, Errors],
                        highWatermarkCheckpoints: OffsetCheckpoints,
                        topicIds: String => Option[Uuid]): Set[Partition] = {
  // ... code omitted ...
  // Update the partition so it becomes the leader; returns true on success
  if (partition.makeLeader(partitionState, highWatermarkCheckpoints, topicIds(partitionState.topicName))) {
    // Add the partition that successfully became leader to the partitionsToMakeLeaders set
    partitionsToMakeLeaders += partition
  }
  // ... code omitted ...
}

1. First acquire the leaderIsrUpdateLock write lock; the work happens inside the lock

def makeLeader(partitionState: LeaderAndIsrPartitionState,
               highWatermarkCheckpoints: OffsetCheckpoints,
               topicId: Option[Uuid]): Boolean = {
  // Acquire the leaderIsrUpdateLock write lock to synchronize concurrent modifications
  val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
    // ... code omitted ...
    controllerEpoch = partitionState.controllerEpoch
    // ... code omitted ...
    val currentTimeMs = time.milliseconds
    // If isLeader is currently false, this broker is becoming a new leader
    val isNewLeader = !isLeader
    // Convert the fields of partitionState into Scala collections and try to update the assignment and ISR state
    val isNewLeaderEpoch = partitionState.leaderEpoch > leaderEpoch
    val replicas = partitionState.replicas.asScala.map(_.toInt)
    // Build the ISR from partitionState; isr here holds all replicas of this partition, both leader and followers
    val isr = partitionState.isr.asScala.map(_.toInt).toSet
    val addingReplicas = partitionState.addingReplicas.asScala.map(_.toInt)
    val removingReplicas = partitionState.removingReplicas.asScala.map(_.toInt)
    // ... code omitted ...
    // If the epoch is greater than or equal to the current one, update the assignment and ISR
    updateAssignmentAndIsr(
      replicas = replicas,
      isLeader = true,
      isr = isr,
      addingReplicas = addingReplicas,
      removingReplicas = removingReplicas,
      LeaderRecoveryState.RECOVERED
    )
    // ... code omitted ...
    isNewLeader
  }
  // ... code omitted ...
}

updateAssignmentAndIsr is where the ISR gets initialized:

def updateAssignmentAndIsr(replicas: Seq[Int],
                           isLeader: Boolean,
                           isr: Set[Int],
                           addingReplicas: Seq[Int],
                           removingReplicas: Seq[Int],
                           leaderRecoveryState: LeaderRecoveryState): Unit = {
  if (isLeader) {
    // Filter out the local broker from replicas; the remaining ids are the followers
    val followers = replicas.filter(_ != localBrokerId)
    // Replicas currently tracked in remoteReplicasMap that are no longer followers must be removed
    val removedReplicas = remoteReplicasMap.keys.filterNot(followers.contains(_))
    // Add each follower to remoteReplicasMap; if the replica already exists, nothing happens
    followers.foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, topicPartition)))
    remoteReplicasMap.removeAll(removedReplicas)
  } else {
    // Clear remoteReplicasMap
    remoteReplicasMap.clear()
  }
  assignmentState = if (addingReplicas.nonEmpty || removingReplicas.nonEmpty)
    OngoingReassignmentState(addingReplicas, removingReplicas, replicas)
  else
    SimpleAssignmentState(replicas)
  partitionState = CommittedPartitionState(isr, leaderRecoveryState)
}

The ISR is assigned by constructing a CommittedPartitionState (the related field in the code is maximalIsr).

2. Initialize the ISR (all replica info is stored only in maximalIsr, which is at its largest at this point)

case class CommittedPartitionState(
  isr: Set[Int],
  leaderRecoveryState: LeaderRecoveryState
) extends PartitionState {
  val maximalIsr = isr
  val isInflight = false

  override def toString: String = {
    s"CommittedPartitionState(isr=$isr" +
      s", leaderRecoveryState=$leaderRecoveryState" +
      ")"
  }
}

As for why the value is assigned to maximalIsr: look at the definition of PartitionState in section V.1 below, which explains when an ISR is considered formally in effect.

II. A scheduled task shrinks the ISR

1. Two startup paths

(1) ZooKeeper mode

The startup method in KafkaServer.scala calls replicaManager.startup().

(2) KRaft mode

The startup method in BrokerServer.scala -->
sharedServer.loader.installPublishers(metadataPublishers) -->
scheduleInitializeNewPublishers(0) -->
initializeNewPublishers -->
publisher.onMetadataUpdate(delta, image, manifest), implemented by onMetadataUpdate in BrokerMetadataPublisher.scala -->
initializeManagers() -->
replicaManager.startup()

2. The scheduled task itself

Start with the scheduled task itself, in ReplicaManager.scala:

def startup(): Unit = {
  // Start the ISR expiration thread.
  // A follower can lag behind the leader for at most replicaLagTimeMaxMs x 1.5 before it is removed from the ISR.
  scheduler.schedule("isr-expiration", () => maybeShrinkIsr(), 0L, config.replicaLagTimeMaxMs / 2)
}

The method executed periodically is maybeShrinkIsr:

private def maybeShrinkIsr(): Unit = {
  trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")
  // Shrink ISRs for non offline partitions, i.e. iterate over the ISRs of all online partitions
  allPartitions.keys.foreach { topicPartition =>
    onlinePartition(topicPartition).foreach(_.maybeShrinkIsr())
  }
}

(1) Acquire the leaderIsrUpdateLock read lock and decide whether the ISR needs to shrink

// Check whether the ISR (In-Sync Replica) list needs updating, and perform the update if needed
def maybeShrinkIsr(): Unit = {
  def needsIsrUpdate: Boolean = {
    // Check that partitionState.isInflight is false, then call needsShrinkIsr() while holding the leaderIsrUpdateLock read lock
    !partitionState.isInflight && inReadLock(leaderIsrUpdateLock) {
      needsShrinkIsr()
    }
  }

  if (needsIsrUpdate) {
    val alterIsrUpdateOpt = inWriteLock(leaderIsrUpdateLock) {
      leaderLogIfLocal.flatMap { leaderLog =>
        // Get the ids of the out-of-sync replicas that exceeded the allowed lag time
        val outOfSyncReplicaIds = getOutOfSyncReplicas(replicaLagTimeMaxMs)
        partitionState match {
          case currentState: CommittedPartitionState if outOfSyncReplicaIds.nonEmpty =>
            // ... code omitted ...
            // Prepare the ISR shrink
            Some(prepareIsrShrink(currentState, outOfSyncReplicaIds))
          case _ =>
            None
        }
      }
    }
    // submitAlterPartition sends the AlterPartition request outside the LeaderAndIsr lock,
    // because the completion logic may increment the high watermark and complete delayed operations
    alterIsrUpdateOpt.foreach(submitAlterPartition)
  }
}

The result of needsShrinkIsr decides whether the ISR modification below is executed:

private def needsShrinkIsr(): Boolean = {
  leaderLogIfLocal.exists { _ => getOutOfSyncReplicas(replicaLagTimeMaxMs).nonEmpty }
}

/**
 * A follower that already has the same LEO as the leader is not considered out of sync.
 * 1. Stuck follower: if the replica's LEO has not been updated for maxLagMs ms, the follower is stuck
 *    and should be removed from the ISR.
 * 2. Slow follower: if the replica has not read up to the LEO within the last maxLagMs ms, the follower
 *    is lagging and should be removed from the ISR.
 * Both cases are handled by checking lastCaughtUpTimeMs, the last time the replica was fully caught up.
 * If either condition is violated, the replica is considered out of sync.
 * If an ISR update is already in flight, an empty set is returned here.
 */
def getOutOfSyncReplicas(maxLagMs: Long): Set[Int] = {
  val current = partitionState
  if (!current.isInflight) {
    val candidateReplicaIds = current.isr - localBrokerId
    val currentTimeMs = time.milliseconds()
    val leaderEndOffset = localLogOrException.logEndOffset
    candidateReplicaIds.filter(replicaId => isFollowerOutOfSync(replicaId, leaderEndOffset, currentTimeMs, maxLagMs))
  } else {
    Set.empty
  }
}

private def isFollowerOutOfSync(replicaId: Int,
                                leaderEndOffset: Long,
                                currentTimeMs: Long,
                                maxLagMs: Long): Boolean = {
  getReplica(replicaId).fold(true) { followerReplica =>
    // Note the exclamation mark: the result of isCaughtUp is negated
    !followerReplica.stateSnapshot.isCaughtUp(leaderEndOffset, currentTimeMs, maxLagMs)
  }
}

def isCaughtUp(leaderEndOffset: Long,
               currentTimeMs: Long,
               replicaMaxLagMs: Long): Boolean = {
  // Returns true if the replica's logEndOffset equals the leader's LEO,
  // or the time since the last caught-up fetch is at most replicaMaxLagMs
  leaderEndOffset == logEndOffset || currentTimeMs - lastCaughtUpTimeMs <= replicaMaxLagMs
}
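To make the lag condition concrete, here is a small stand-alone sketch of the same predicate with made-up offsets and timestamps (the helper and the values are illustrative only, not Kafka code; 30000 ms is the default of replica.lag.time.max.ms):

object IsrLagExample extends App {
  // Same predicate as Replica.isCaughtUp, extracted for illustration
  def isCaughtUp(leaderEndOffset: Long, logEndOffset: Long,
                 currentTimeMs: Long, lastCaughtUpTimeMs: Long,
                 replicaMaxLagMs: Long): Boolean =
    leaderEndOffset == logEndOffset || currentTimeMs - lastCaughtUpTimeMs <= replicaMaxLagMs

  val maxLagMs = 30000L // default replica.lag.time.max.ms

  // Follower fully caught up: same LEO as the leader -> in sync even if its last fetch was long ago
  println(isCaughtUp(leaderEndOffset = 100, logEndOffset = 100,
    currentTimeMs = 200000, lastCaughtUpTimeMs = 100000, replicaMaxLagMs = maxLagMs)) // true

  // Follower behind, but it was caught up 10 s ago (< 30 s) -> still in sync
  println(isCaughtUp(leaderEndOffset = 100, logEndOffset = 80,
    currentTimeMs = 200000, lastCaughtUpTimeMs = 190000, replicaMaxLagMs = maxLagMs)) // true

  // Follower behind and last caught up 60 s ago (> 30 s) -> out of sync, candidate for ISR shrink
  println(isCaughtUp(leaderEndOffset = 100, logEndOffset = 80,
    currentTimeMs = 200000, lastCaughtUpTimeMs = 140000, replicaMaxLagMs = maxLagMs)) // false
}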

(2) Acquire the leaderIsrUpdateLock write lock and start modifying the ISR

The operation executed is the prepareIsrShrink method:

// When shrinking the ISR we cannot assume the update will succeed; if the AlterPartition request fails,
// that could incorrectly advance the HW. Therefore the "maximal ISR" of a PendingShrinkIsr is the current ISR.
private[cluster] def prepareIsrShrink(currentState: CommittedPartitionState,
                                      outOfSyncReplicaIds: Set[Int]): PendingShrinkIsr = {
  // Remove the out-of-sync replicas from the ISR
  val isrToSend = partitionState.isr -- outOfSyncReplicaIds
  // Build the new ISR to send
  val isrWithBrokerEpoch = addBrokerEpochToIsr(isrToSend.toList)
  val newLeaderAndIsr = LeaderAndIsr(
    localBrokerId,
    leaderEpoch,
    partitionState.leaderRecoveryState,
    isrWithBrokerEpoch,
    partitionEpoch
  )
  val updatedState = PendingShrinkIsr(
    outOfSyncReplicaIds,
    newLeaderAndIsr,
    currentState
  )
  partitionState = updatedState
  updatedState
}

(3) During the pending shrink, isr and maximalIsr both keep their pre-shrink value; the proposed (shrunken) ISR is only carried in sentLeaderAndIsr

PendingShrinkIsr defines what isr and maximalIsr look like while the shrink is pending:

case class PendingShrinkIsr(
  outOfSyncReplicaIds: Set[Int],
  sentLeaderAndIsr: LeaderAndIsr,
  lastCommittedState: CommittedPartitionState
) extends PendingPartitionChange {
  val isr = lastCommittedState.isr
  val maximalIsr = isr
  val isInflight = true

  def notifyListener(alterPartitionListener: AlterPartitionListener): Unit = {
    alterPartitionListener.markIsrShrink()
  }

  override def toString: String = {
    s"PendingShrinkIsr(outOfSyncReplicaIds=$outOfSyncReplicaIds" +
      s", sentLeaderAndIsr=$sentLeaderAndIsr" +
      s", leaderRecoveryState=$leaderRecoveryState" +
      s", lastCommittedState=$lastCommittedState" +
      ")"
  }
}

III. When a follower fetches data from the leader, the leader checks whether to add it to the ISR

The fetch-handling logic in KafkaApis.scala distinguishes whether a fetch request comes from a follower or from a consumer; you can also read "Kafka 3.5: how the broker handles consumer fetch requests (source code)".

def fetchRecords(fetchParams: FetchParams,
                 fetchPartitionData: FetchRequest.PartitionData,
                 fetchTimeMs: Long,
                 maxBytes: Int,
                 minOneMessage: Boolean,
                 updateFetchState: Boolean): LogReadInfo = {
  def readFromLocalLog(log: UnifiedLog): LogReadInfo = {
    readRecords(
      log,
      fetchPartitionData.lastFetchedEpoch,
      fetchPartitionData.fetchOffset,
      fetchPartitionData.currentLeaderEpoch,
      maxBytes,
      fetchParams.isolation,
      minOneMessage
    )
  }

  // Check whether the fetch request comes from a follower
  if (fetchParams.isFromFollower) {
    // Check that the request is from a valid replica before doing the read
    val (replica, logReadInfo) = inReadLock(leaderIsrUpdateLock) {
      val localLog = localLogWithEpochOrThrow(
        fetchPartitionData.currentLeaderEpoch,
        fetchParams.fetchOnlyLeader
      )
      val replica = followerReplicaOrThrow(
        fetchParams.replicaId,
        fetchPartitionData
      )
      val logReadInfo = readFromLocalLog(localLog)
      (replica, logReadInfo)
    }
    // After the follower has fetched, update its state
    if (updateFetchState && !logReadInfo.divergingEpoch.isPresent) {
      // If the fetch comes from a broker replica, update the corresponding log end offset
      updateFollowerFetchState(
        replica,
        followerFetchOffsetMetadata = logReadInfo.fetchedData.fetchOffsetMetadata,
        followerStartOffset = fetchPartitionData.logStartOffset,
        followerFetchTimeMs = fetchTimeMs,
        leaderEndOffset = logReadInfo.logEndOffset,
        fetchParams.replicaEpoch
      )
    }
    logReadInfo
  }
  // ... code omitted ...
}

updateFollowerFetchState does the post-fetch bookkeeping:

def updateFollowerFetchState(replica: Replica,
                             followerFetchOffsetMetadata: LogOffsetMetadata,
                             followerStartOffset: Long,
                             followerFetchTimeMs: Long,
                             leaderEndOffset: Long,
                             brokerEpoch: Long): Unit = {
  // Decide whether the low watermark (lowWatermarkIfLeader) needs to be computed by checking whether
  // there are delayed DeleteRecordsRequests; if not, set oldLeaderLW to -1
  val oldLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L
  // The follower's previous log end offset
  val prevFollowerEndOffset = replica.stateSnapshot.logEndOffset
  // Update the replica's fetch state: follower fetch offset metadata, start offset, fetch time,
  // leader end offset and broker epoch
  replica.updateFetchState(
    followerFetchOffsetMetadata,
    followerStartOffset,
    followerFetchTimeMs,
    leaderEndOffset,
    brokerEpoch
  )
  // Check again whether there are delayed DeleteRecordsRequests; if not, set newLeaderLW to -1
  val newLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L
  // Check whether the partition's low watermark has increased, i.e. newLeaderLW > oldLeaderLW
  val leaderLWIncremented = newLeaderLW > oldLeaderLW
  // Call maybeExpandIsr to check whether this replica should be added to the ISR (In-Sync Replicas)
  maybeExpandIsr(replica)
  // Check whether the high watermark can be incremented, i.e. whether the replica's log end offset changed
  val leaderHWIncremented = if (prevFollowerEndOffset != replica.stateSnapshot.logEndOffset) {
    // Try to increment the high watermark (maybeIncrementLeaderHW) under the leaderIsrUpdateLock read lock
    inReadLock(leaderIsrUpdateLock) {
      leaderLogIfLocal.exists(leaderLog => maybeIncrementLeaderHW(leaderLog, followerFetchTimeMs))
    }
  } else {
    false
  }
  // If the low watermark or the high watermark changed, try to complete delayed requests
  if (leaderLWIncremented || leaderHWIncremented)
    tryCompleteDelayedRequests()
}

1. After acquiring the leaderIsrUpdateLock read lock, check whether the follower qualifies to join the ISR

The maybeExpandIsr method tries to add the current replica to the ISR; its structure is similar to the maybeShrinkIsr triggered by the scheduled task above:

/**
 * Check and possibly expand the partition's ISR.
 * A replica is added to the ISR if its LEO >= current HW and it has caught up to an offset within the
 * current leader epoch. A replica must catch up to the current leader epoch before it can join the ISR;
 * otherwise, if there is committed data between the current leader's HW and LEO, the replica could become
 * leader before fetching that committed data, and the data would be lost.
 */
private def maybeExpandIsr(followerReplica: Replica): Unit = {
  // partitionState is not in flight and the ISR does not already contain this follower replica;
  // only then take the leaderIsrUpdateLock read lock
  val needsIsrUpdate = !partitionState.isInflight && canAddReplicaToIsr(followerReplica.brokerId) && inReadLock(leaderIsrUpdateLock) {
    // Check once more whether the follower meets the conditions to join the ISR
    needsExpandIsr(followerReplica)
  }
  if (needsIsrUpdate) {
    // needsIsrUpdate confirmed the follower qualifies for the ISR, so take the leaderIsrUpdateLock write lock
    val alterIsrUpdateOpt = inWriteLock(leaderIsrUpdateLock) {
      // check if this replica needs to be added to the ISR
      partitionState match {
        case currentState: CommittedPartitionState if needsExpandIsr(followerReplica) =>
          // prepareIsrExpand performs the expansion
          Some(prepareIsrExpand(currentState, followerReplica.brokerId))
        case _ =>
          None
      }
    }
    // Send the AlterPartition request outside of the LeaderAndIsr lock since the completion logic
    // may increment the high watermark (and consequently complete delayed operations).
    alterIsrUpdateOpt.foreach(submitAlterPartition)
  }
}

private def needsExpandIsr(followerReplica: Replica): Boolean = {
  // isFollowerInSync checks whether the follower's LEO has reached the current leader's HW
  canAddReplicaToIsr(followerReplica.brokerId) && isFollowerInSync(followerReplica)
}

// Condition 1
private def canAddReplicaToIsr(followerReplicaId: Int): Boolean = {
  val current = partitionState
  !current.isInflight &&
    !current.isr.contains(followerReplicaId) &&
    isReplicaIsrEligible(followerReplicaId)
}

// Check whether the replica is eligible to become an ISR (In-Sync Replica) member
private def isReplicaIsrEligible(followerReplicaId: Int): Boolean = {
  metadataCache match {
    // For the KRaft metadata cache:
    // 1. the replica is not fenced;
    // 2. the replica is not in controlled shutdown;
    // 3. the broker epoch in the metadata cache matches the broker epoch of the Fetch request,
    //    or the Fetch request's broker epoch is -1 (which bypasses the epoch verification).
    case kRaftMetadataCache: KRaftMetadataCache =>
      val storedBrokerEpoch = remoteReplicasMap.get(followerReplicaId).stateSnapshot.brokerEpoch
      val cachedBrokerEpoch = kRaftMetadataCache.getAliveBrokerEpoch(followerReplicaId)
      !kRaftMetadataCache.isBrokerFenced(followerReplicaId) &&
        !kRaftMetadataCache.isBrokerShuttingDown(followerReplicaId) &&
        isBrokerEpochIsrEligible(storedBrokerEpoch, cachedBrokerEpoch)
    // For the ZK metadata cache, only make sure the replica is an alive broker. Shutting-down brokers
    // are not checked here, but the controller prevents them from joining the ISR.
    case zkMetadataCache: ZkMetadataCache =>
      zkMetadataCache.hasAliveBroker(followerReplicaId)
    case _ => true
  }
}

// Condition 2
private def isFollowerInSync(followerReplica: Replica): Boolean = {
  leaderLogIfLocal.exists { leaderLog =>
    val followerEndOffset = followerReplica.stateSnapshot.logEndOffset
    followerEndOffset >= leaderLog.highWatermark && leaderEpochStartOffsetOpt.exists(followerEndOffset >= _)
  }
}

2. Acquire the leaderIsrUpdateLock write lock and perform the update

The method is prepareIsrExpand:

// When expanding the ISR, we assume the new replica will make it into the ISR before we receive the
// confirmation. This ensures the HW already reflects the updated ISR even if the confirmation is delayed.
// Alternatively, if the update fails, no harm is done, since the expanded ISR places a stricter
// requirement on HW advancement.
private def prepareIsrExpand(currentState: CommittedPartitionState,
                             newInSyncReplicaId: Int): PendingExpandIsr = {
  // Combine the current ISR with the new in-sync replica id to obtain the ISR to send, isrToSend
  val isrToSend = partitionState.isr + newInSyncReplicaId
  // Call addBrokerEpochToIsr to attach the broker epoch to every replica in the ISR list,
  // storing the result in isrWithBrokerEpoch
  val isrWithBrokerEpoch = addBrokerEpochToIsr(isrToSend.toList)
  // Use localBrokerId as the leader and take the other fields from the current partition state
  // to create a new LeaderAndIsr object, newLeaderAndIsr
  val newLeaderAndIsr = LeaderAndIsr(
    localBrokerId,
    leaderEpoch,
    partitionState.leaderRecoveryState,
    isrWithBrokerEpoch,
    partitionEpoch
  )
  // Create a PendingExpandIsr object containing the new in-sync replica id, the new LeaderAndIsr
  // object and the current state
  val updatedState = PendingExpandIsr(
    newInSyncReplicaId,
    newLeaderAndIsr,
    currentState
  )
  // Update partitionState to updatedState and return it
  partitionState = updatedState
  updatedState
}

3. The update adds the new follower replica to maximalIsr first; isr keeps its pre-expand value

case class PendingExpandIsr(
  newInSyncReplicaId: Int,
  sentLeaderAndIsr: LeaderAndIsr,
  lastCommittedState: CommittedPartitionState
) extends PendingPartitionChange {
  // The ISR set that is currently in effect
  val isr = lastCommittedState.isr
  // maximalIsr additionally contains the member that has not formally taken effect yet,
  // so a failed update cannot break the flow
  val maximalIsr = isr + newInSyncReplicaId
  val isInflight = true

  def notifyListener(alterPartitionListener: AlterPartitionListener): Unit = {
    alterPartitionListener.markIsrExpand()
  }

  override def toString: String = {
    s"PendingExpandIsr(newInSyncReplicaId=$newInSyncReplicaId" +
      s", sentLeaderAndIsr=$sentLeaderAndIsr" +
      s", leaderRecoveryState=$leaderRecoveryState" +
      s", lastCommittedState=$lastCommittedState" +
      ")"
  }
}

IV. After maximalIsr is modified, the change must be propagated (via the controller) to the other replicas

Whether it is maybeShrinkIsr from the scheduled task or maybeExpandIsr from the fetch path, both end up calling the function below:

alterIsrUpdateOpt.foreach(submitAlterPartition)
private def submitAlterPartition(proposedIsrState: PendingPartitionChange): CompletableFuture[LeaderAndIsr] = {
  debug(s"Submitting ISR state change $proposedIsrState")
  // alterIsrManager.submit submits the ISR state change; ZK mode and KRaft mode run different implementations:
  // ZK mode:    submit in ZkAlterPartitionManager
  // KRaft mode: submit in DefaultAlterPartitionManager
  val future = alterIsrManager.submit(
    new TopicIdPartition(topicId.getOrElse(Uuid.ZERO_UUID), topicPartition),
    proposedIsrState.sentLeaderAndIsr,
    controllerEpoch
  )
  future.whenComplete { (leaderAndIsr, e) =>
    var hwIncremented = false
    var shouldRetry = false

    inWriteLock(leaderIsrUpdateLock) {
      if (partitionState != proposedIsrState) {
        // This means partitionState was updated through leader election or some other mechanism before
        // we got the AlterPartition response. We don't know exactly what happened on the controller,
        // but we do know this response is out of date, so we ignore it.
        // ... code omitted ...
      } else if (leaderAndIsr != null) {
        // Commit the new ISR and return whether the high watermark was incremented
        hwIncremented = handleAlterPartitionUpdate(proposedIsrState, leaderAndIsr)
      } else {
        shouldRetry = handleAlterPartitionError(proposedIsrState, Errors.forException(e))
      }
    }

    // Whether the high watermark was incremented
    if (hwIncremented) {
      tryCompleteDelayedRequests()
    }
    if (shouldRetry) {
      // Retry by calling ourselves again
      submitAlterPartition(proposedIsrState)
    }
  }
}

1. ZooKeeper mode

// Write the given leaderAndIsr to ZooKeeper and return a LeaderAndIsr object
override def submit(topicIdPartition: TopicIdPartition,
                    leaderAndIsr: LeaderAndIsr,
                    controllerEpoch: Int): CompletableFuture[LeaderAndIsr] = {
  debug(s"Writing new ISR ${leaderAndIsr.isr} to ZooKeeper with version " +
    s"${leaderAndIsr.partitionEpoch} for partition $topicIdPartition")
  // Call ReplicationUtils.updateLeaderAndIsr to update the leaderAndIsr info in ZooKeeper,
  // returning whether the update succeeded (updateSucceeded) and the new version (newVersion)
  val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient, topicIdPartition.topicPartition,
    leaderAndIsr, controllerEpoch)

  val future = new CompletableFuture[LeaderAndIsr]()
  if (updateSucceeded) {
    // Track which partitions need to be propagated to the controller.
    // isrChangeSet is consumed by a scheduled task, so access it under synchronized.
    isrChangeSet synchronized {
      // Add topicIdPartition.topicPartition to isrChangeSet
      isrChangeSet += topicIdPartition.topicPartition
      // Record the time of the last ISR change in lastIsrChangeMs
      lastIsrChangeMs.set(time.milliseconds())
    }
    // Update the partition epoch with leaderAndIsr.withPartitionEpoch(newVersion) and complete the future with it
    future.complete(leaderAndIsr.withPartitionEpoch(newVersion))
  } else {
    // ... code omitted ...
  }
  future
}

A scheduled task updates a ZooKeeper node to propagate the change

The startup function in KafkaServer.scala executes the following:

 alterPartitionManager.start()

Here the implementation of alterPartitionManager is ZkAlterPartitionManager, and what actually runs is the following code, which creates the scheduled task:

override def start(): Unit = {
  scheduler.schedule("isr-change-propagation", () => maybePropagateIsrChanges(), 0L,
    isrChangeNotificationConfig.checkIntervalMs)
}

/**
 * This function runs periodically to check whether ISR changes need to be propagated. It propagates when:
 * 1. there are ISR changes that have not been propagated yet, and
 * 2. there has been no ISR change in the last 5 seconds, or it has been more than 60 seconds since the
 *    last ISR propagation.
 * This lets occasional ISR changes propagate within a few seconds while avoiding overwhelming the
 * controller and the other brokers when there is a burst of ISR changes.
 */
private[server] def maybePropagateIsrChanges(): Unit = {
  val now = time.milliseconds()
  isrChangeSet synchronized {
    if (isrChangeSet.nonEmpty &&
      (lastIsrChangeMs.get() + isrChangeNotificationConfig.lingerMs < now ||
        lastIsrPropagationMs.get() + isrChangeNotificationConfig.maxDelayMs < now)) {
      zkClient.propagateIsrChanges(isrChangeSet)
      isrChangeSet.clear()
      lastIsrPropagationMs.set(now)
    }
  }
}
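To make the propagation throttle concrete, here is a small stand-alone sketch of the same condition with made-up timestamps (the helper is hypothetical; lingerMs = 5 s and maxDelayMs = 60 s are the values mentioned in the comment above):

object IsrPropagationThrottleExample extends App {
  val lingerMs   = 5000L   // quiet period since the last ISR change
  val maxDelayMs = 60000L  // maximum time between two propagations

  def shouldPropagate(now: Long, lastIsrChangeMs: Long, lastIsrPropagationMs: Long, pendingChanges: Int): Boolean =
    pendingChanges > 0 &&
      (lastIsrChangeMs + lingerMs < now || lastIsrPropagationMs + maxDelayMs < now)

  // Changes stopped arriving 6 seconds ago -> quiet period exceeded, propagate now
  println(shouldPropagate(now = 100000, lastIsrChangeMs = 94000, lastIsrPropagationMs = 50000, pendingChanges = 3)) // true

  // Changes keep arriving (last one 1 s ago) and we propagated 10 s ago -> keep batching
  println(shouldPropagate(now = 100000, lastIsrChangeMs = 99000, lastIsrPropagationMs = 90000, pendingChanges = 3)) // false

  // Changes keep arriving, but the last propagation was 71 s ago -> force propagation
  println(shouldPropagate(now = 100000, lastIsrChangeMs = 99000, lastIsrPropagationMs = 29000, pendingChanges = 3)) // true
}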

2. KRaft mode

override def submit(topicIdPartition: TopicIdPartition,
                    leaderAndIsr: LeaderAndIsr,
                    controllerEpoch: Int): CompletableFuture[LeaderAndIsr] = {
  val future = new CompletableFuture[LeaderAndIsr]()
  val alterPartitionItem = AlterPartitionItem(topicIdPartition, leaderAndIsr, future, controllerEpoch)
  // Put the LeaderAndIsr change into the map of unsent updates
  val enqueued = unsentIsrUpdates.putIfAbsent(alterPartitionItem.topicIdPartition.topicPartition, alterPartitionItem) == null
  if (enqueued) {
    maybePropagateIsrChanges()
  } else {
    future.completeExceptionally(new OperationNotAttemptedException(
      s"Failed to enqueue ISR change state $leaderAndIsr for partition $topicIdPartition"))
  }
  future
}

private[server] def maybePropagateIsrChanges(): Unit = {
  // Send all pending items if there is no request in flight yet
  if (!unsentIsrUpdates.isEmpty && inflightRequest.compareAndSet(false, true)) {
    // Copy the current unsent ISRs but don't remove them from the map; they get cleared in the response handler
    val inflightAlterPartitionItems = new ListBuffer[AlterPartitionItem]()
    unsentIsrUpdates.values.forEach(item => inflightAlterPartitionItems.append(item))
    sendRequest(inflightAlterPartitionItems.toSeq)
  }
}

A request is sent through controllerChannelManager to notify the controller

controllerChannelManager is started when BrokerServer.scala calls alterPartitionManager.start() during initialization; the implementation class is DefaultAlterPartitionManager, and its start method internally calls controllerChannelManager.start().

private def sendRequest(inflightAlterPartitionItems: Seq[AlterPartitionItem]): Unit = {
  val brokerEpoch = brokerEpochSupplier()
  // Build the AlterPartition request, returning the request object and a topicNamesByIds mapping
  val (request, topicNamesByIds) = buildRequest(inflightAlterPartitionItems, brokerEpoch)
  debug(s"Sending AlterPartition to controller $request")

  // We do not time out the AlterPartition request; instead we let it retry indefinitely until a response
  // is received, or until a new LeaderAndIsr overwrites the existing isrState, in which case the responses
  // for those partitions are ignored.
  // controllerChannelManager.sendRequest sends the request to the controller, with a
  // ControllerRequestCompletionHandler as the callback.
  controllerChannelManager.sendRequest(request,
    new ControllerRequestCompletionHandler {
      override def onComplete(response: ClientResponse): Unit = {
        debug(s"Received AlterPartition response $response")
        val error = try {
          if (response.authenticationException != null) {
            // For now we treat authentication errors as retriable. We use the
            // `NETWORK_EXCEPTION` error code for lack of a good alternative.
            // Note that `BrokerToControllerChannelManager` will still log the
            // authentication errors so that users have a chance to fix the problem.
            Errors.NETWORK_EXCEPTION
          } else if (response.versionMismatch != null) {
            Errors.UNSUPPORTED_VERSION
          } else {
            // Handle the response
            handleAlterPartitionResponse(
              response.requestHeader,
              response.responseBody.asInstanceOf[AlterPartitionResponse],
              brokerEpoch,
              inflightAlterPartitionItems,
              topicNamesByIds
            )
          }
        } finally {
          // clear the flag so future requests can proceed
          clearInFlightRequest()
        }
        // ... code omitted ...
      }
      // ... code omitted ...
    })
}

handleAlterPartitionResponse is the function that processes the controller's response:

def handleAlterPartitionResponse(requestHeader: RequestHeader,
                                 alterPartitionResp: AlterPartitionResponse,
                                 sentBrokerEpoch: Long,
                                 inflightAlterPartitionItems: Seq[AlterPartitionItem],
                                 topicNamesByIds: mutable.Map[Uuid, String]): Errors = {
  val data = alterPartitionResp.data

  Errors.forCode(data.errorCode) match {
    // ... code omitted ...
    case Errors.NONE =>
      // A mutable map holding the per-partition responses
      val partitionResponses = new mutable.HashMap[TopicPartition, Either[Errors, LeaderAndIsr]]()
      data.topics.forEach { topic =>
        // ... code omitted ...
        topic.partitions.forEach { partition =>
          // A TopicPartition identifying the topic and partition index
          val tp = new TopicPartition(topicName, partition.partitionIndex)
          val apiError = Errors.forCode(partition.errorCode)
          debug(s"Controller successfully handled AlterPartition request for $tp: $partition")
          if (apiError == Errors.NONE) {
            // Parse the partition's leaderRecoveryState; if valid, store the partition's response in partitionResponses
            LeaderRecoveryState.optionalOf(partition.leaderRecoveryState).asScala match {
              case Some(leaderRecoveryState) =>
                partitionResponses(tp) = Right(
                  LeaderAndIsr(
                    partition.leaderId,
                    partition.leaderEpoch,
                    partition.isr.asScala.toList.map(_.toInt),
                    leaderRecoveryState,
                    partition.partitionEpoch
                  )
                )
              // ... code omitted ...
            }
          } else {
            partitionResponses(tp) = Left(apiError)
          }
        }
      }

      // Iterate over the inflightAlterPartitionItems passed in and match them against the responses
      inflightAlterPartitionItems.foreach { inflightAlterPartition =>
        partitionResponses.get(inflightAlterPartition.topicIdPartition.topicPartition) match {
          case Some(leaderAndIsrOrError) =>
            // If a response is found, remove the partition from unsentIsrUpdates and complete
            // inflightAlterPartition.future according to the type of the response
            unsentIsrUpdates.remove(inflightAlterPartition.topicIdPartition.topicPartition)
            leaderAndIsrOrError match {
              case Left(error) => inflightAlterPartition.future.completeExceptionally(error.exception)
              case Right(leaderAndIsr) => inflightAlterPartition.future.complete(leaderAndIsr)
            }
          // ... code omitted ...
        }
      }
    // ... code omitted ...
  }
  // ... code omitted ...
}

V. maximalIsr and isr

1. The definitions of the isr and maximalIsr fields in PartitionState, and why only maximalIsr was modified above rather than isr

sealed trait PartitionState {
  /**
   * Includes only the in-sync replicas that have been committed to ZK.
   */
  def isr: Set[Int]

  /**
   * This set may include un-committed ISR members after an expansion. This "effective" ISR is used to
   * advance the high watermark and to determine which replicas are required for acks=all produce requests.
   */
  def maximalIsr: Set[Int]

  /**
   * The leader recovery state. See the description for LeaderRecoveryState for details on the different values.
   */
  def leaderRecoveryState: LeaderRecoveryState

  /**
   * Indicates whether there is an AlterPartition request in flight.
   */
  def isInflight: Boolean
}

Taking maybeShrinkIsr as an example, the reason is:

maybeShrinkIsr (and likewise maybeExpandIsr) does not overwrite isr directly because the AlterPartition update may still fail. isr always holds the last ISR that was actually committed (to ZooKeeper, or to the controller in KRaft mode), while maximalIsr is the "effective" ISR that also reflects the pending change: during an expansion it already contains the new follower, so the high watermark and acks=all requests immediately honor the stricter requirement; during a shrink it deliberately keeps the old members, because dropping them before the controller confirms could incorrectly advance the high watermark. Only when the AlterPartition request is acknowledged is the pending state replaced by a new CommittedPartitionState, at which point the change shows up in isr. Keeping isr untouched while a change is in flight also gives concurrent readers a stable, committed view of the ISR.
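A minimal stand-alone sketch of these state transitions (plain Scala classes that only mimic the isr/maximalIsr fields discussed here; they are not the real Kafka classes):

object IsrStateExample extends App {
  // Simplified stand-ins for the PartitionState hierarchy, keeping only isr, maximalIsr and isInflight
  sealed trait State { def isr: Set[Int]; def maximalIsr: Set[Int]; def isInflight: Boolean }
  case class Committed(isr: Set[Int]) extends State {
    val maximalIsr = isr; val isInflight = false
  }
  case class PendingExpand(last: Committed, newReplica: Int) extends State {
    val isr = last.isr; val maximalIsr = isr + newReplica; val isInflight = true
  }
  case class PendingShrink(last: Committed, outOfSync: Set[Int]) extends State {
    val isr = last.isr; val maximalIsr = isr; val isInflight = true // the proposed ISR only goes to the controller
  }

  val committed = Committed(Set(1))                    // broker 1 is the leader and the only ISR member
  val expanding = PendingExpand(committed, newReplica = 2)
  println(expanding.isr)          // Set(1)    -> committed ISR unchanged
  println(expanding.maximalIsr)   // Set(1, 2) -> effective ISR used for HW advancement / acks=all

  // Once the controller acknowledges the AlterPartition request, the state becomes committed again
  val afterAck = Committed(Set(1, 2))
  println(afterAck.isr == afterAck.maximalIsr)  // true
}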

2. When maximalIsr is committed to isr

So far I know of a few cases:
1. A leader election modifies isr.
2. Heartbeats between brokers modify isr at the appropriate time.
3. A producer sending data to the broker can also lead to isr being modified at the appropriate time.

This tormented me for two days, and I still could not pin down exactly where the data in isr is updated from maximalIsr. None of the material I could find online answers it either; it only says "at the appropriate time" — but where is that time? Or the explanation stops at modifying maximalIsr and treats the isr update as done. I even went through the unit tests; one of them is analyzed below. If you know the answer, please leave it in the comments.

@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testIsrNotExpandedIfReplicaIsFencedOrShutdown(quorum: String): Unit = {
  val kraft = quorum == "kraft"

  val log = logManager.getOrCreateLog(topicPartition, topicId = None)
  seedLogData(log, numRecords = 10, leaderEpoch = 4)

  val controllerEpoch = 0
  val leaderEpoch = 5
  val remoteBrokerId = brokerId + 1
  val replicas = List(brokerId, remoteBrokerId)
  val isr = Set(brokerId)

  val metadataCache: MetadataCache = if (kraft) mock(classOf[KRaftMetadataCache]) else mock(classOf[ZkMetadataCache])
  if (kraft) {
    addBrokerEpochToMockMetadataCache(metadataCache.asInstanceOf[KRaftMetadataCache], replicas)
  }

  // Mark the remote broker as eligible or ineligible in the metadata cache of the leader.
  // When using kraft, we can make the broker ineligible by fencing it.
  // In ZK mode, we must mark the broker as alive for it to be eligible.
  def markRemoteReplicaEligible(eligible: Boolean): Unit = {
    if (kraft) {
      when(metadataCache.asInstanceOf[KRaftMetadataCache].isBrokerFenced(remoteBrokerId)).thenReturn(!eligible)
    } else {
      when(metadataCache.hasAliveBroker(remoteBrokerId)).thenReturn(eligible)
    }
  }

  // Initialize the partition
  val partition = new Partition(
    topicPartition,
    replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
    interBrokerProtocolVersion = MetadataVersion.latest,
    localBrokerId = brokerId,
    () => defaultBrokerEpoch(brokerId),
    time,
    alterPartitionListener,
    delayedOperations,
    metadataCache,
    logManager,
    alterPartitionManager
  )

  partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
  assertTrue(partition.makeLeader(
    new LeaderAndIsrPartitionState()
      .setControllerEpoch(controllerEpoch)
      .setLeader(brokerId)
      .setLeaderEpoch(leaderEpoch)
      .setIsr(isr.toList.map(Int.box).asJava)
      .setPartitionEpoch(1)
      .setReplicas(replicas.map(Int.box).asJava)
      .setIsNew(true),
    offsetCheckpoints, None), "Expected become leader transition to succeed")
  assertEquals(isr, partition.partitionState.isr)
  assertEquals(isr, partition.partitionState.maximalIsr)

  markRemoteReplicaEligible(true)

  // Fetch to let the follower catch up to the log end offset and
  // to check if an expansion is possible.
  fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset)

  // Follower fetches and catches up to the log end offset.
  assertReplicaState(partition, remoteBrokerId,
    lastCaughtUpTimeMs = time.milliseconds(),
    logStartOffset = 0L,
    logEndOffset = log.logEndOffset
  )

  // Expansion is triggered.
  assertEquals(isr, partition.partitionState.isr)
  assertEquals(replicas.toSet, partition.partitionState.maximalIsr)
  assertEquals(1, alterPartitionManager.isrUpdates.size)

  // Controller rejects the expansion because the broker is fenced or offline.
  alterPartitionManager.failIsrUpdate(Errors.INELIGIBLE_REPLICA)

  // The leader reverts back to the previous ISR.
  assertEquals(isr, partition.partitionState.isr)
  assertEquals(isr, partition.partitionState.maximalIsr)
  assertFalse(partition.partitionState.isInflight)
  assertEquals(0, alterPartitionManager.isrUpdates.size)

  // The leader eventually learns about the fenced or offline broker.
  markRemoteReplicaEligible(false)

  // The follower fetches again.
  fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset)

  // Expansion is not triggered because the follower is fenced.
  assertEquals(isr, partition.partitionState.isr)
  assertEquals(isr, partition.partitionState.maximalIsr)
  assertFalse(partition.partitionState.isInflight)
  assertEquals(0, alterPartitionManager.isrUpdates.size)

  // The broker is eventually unfenced or brought back online.
  markRemoteReplicaEligible(true)

  // The follower fetches again.
  fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset)

  // Expansion is triggered.
  assertEquals(isr, partition.partitionState.isr)
  assertEquals(replicas.toSet, partition.partitionState.maximalIsr)
  assertTrue(partition.partitionState.isInflight)
  assertEquals(1, alterPartitionManager.isrUpdates.size)

  // Expansion succeeds.
  alterPartitionManager.completeIsrUpdate(newPartitionEpoch = 1)

  // ISR is committed.
  assertEquals(replicas.toSet, partition.partitionState.isr)
  assertEquals(replicas.toSet, partition.partitionState.maximalIsr)
  assertFalse(partition.partitionState.isInflight)
  assertEquals(0, alterPartitionManager.isrUpdates.size)
}

Note the call alterPartitionManager.completeIsrUpdate(newPartitionEpoch = 1) above: before it runs, maximalIsr is already up to date while isr still holds the old value; after it runs, isr and maximalIsr are identical and both up to date.
alterPartitionManager.completeIsrUpdate executes the following method of MockAlterPartitionManager in TestUtils:

class MockAlterPartitionManager extends AlterPartitionManager {
  val isrUpdates: mutable.Queue[AlterPartitionItem] = new mutable.Queue[AlterPartitionItem]()
  val inFlight: AtomicBoolean = new AtomicBoolean(false)

  // This is called from fetchFollower, through the chain
  // fetchFollower -> fetchRecords -> updateFollowerFetchState -> maybeExpandIsr -> submitAlterPartition -> submit.
  // It mainly stores the item in isrUpdates.
  override def submit(topicPartition: TopicIdPartition,
                      leaderAndIsr: LeaderAndIsr,
                      controllerEpoch: Int): CompletableFuture[LeaderAndIsr] = {
    val future = new CompletableFuture[LeaderAndIsr]()
    if (inFlight.compareAndSet(false, true)) {
      isrUpdates += AlterPartitionItem(
        topicPartition,
        leaderAndIsr,
        future,
        controllerEpoch
      )
    } else {
      future.completeExceptionally(new OperationNotAttemptedException(
        s"Failed to enqueue AlterIsr request for $topicPartition since there is already an inflight request"))
    }
    future
  }

  def completeIsrUpdate(newPartitionEpoch: Int): Unit = {
    if (inFlight.compareAndSet(true, false)) {
      val item = isrUpdates.dequeue()
      // Section IV, KRaft mode: inflightAlterPartition.future.complete(...)
      // Section IV, ZK mode:    future.complete(leaderAndIsr.withPartitionEpoch(newVersion))
      item.future.complete(item.leaderAndIsr.withPartitionEpoch(newPartitionEpoch))
    } else {
      fail("Expected an in-flight ISR update, but there was none")
    }
  }
}

What isrUpdates.dequeue() returns is the AlterPartitionItem, and once item.future.complete(...) runs, isr ends up modified — which at first looks like it comes out of nowhere. I compared this with the equivalent code in section IV; there is no explicit follow-up after complete here either, yet isr still changes. The piece that appears to close the loop is the future itself: the future the mock completes is the very future returned by submit, the same one on which submitAlterPartition (section IV) registered its future.whenComplete callback. Completing the future therefore fires that callback, which calls handleAlterPartitionUpdate, and that seems to be where the pending state is replaced by a CommittedPartitionState and maximalIsr finally lands in isr.
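To illustrate why completing the future is enough, here is a tiny self-contained sketch (generic CompletableFuture usage with made-up names, not Kafka code) showing that a whenComplete callback registered by the submitter runs as soon as someone else completes the future:

import java.util.concurrent.CompletableFuture

object FutureCompletionExample extends App {
  // Stand-in for the partition state discussed in the article
  var partitionState: String = "PendingExpandIsr(isr=Set(1), maximalIsr=Set(1, 2))"

  // The "submit" side returns an uncompleted future, much like MockAlterPartitionManager.submit
  def submit(): CompletableFuture[Set[Int]] = new CompletableFuture[Set[Int]]()

  // The "submitAlterPartition" side registers the follow-up work on that future
  val future = submit()
  future.whenComplete { (newIsr: Set[Int], error: Throwable) =>
    if (error == null) {
      // In Kafka this corresponds to handleAlterPartitionUpdate: the pending state is committed
      partitionState = s"CommittedPartitionState(isr=$newIsr)"
    }
  }

  // The controller response (or the test's completeIsrUpdate) completes the future,
  // which is what fires the whenComplete callback registered above
  future.complete(Set(1, 2))
  println(partitionState) // CommittedPartitionState(isr=Set(1, 2))
}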
