Kafka-broker处理producer请求-leader篇

一、上下文

《Kafka-生产者源码分析》博客中我们了解了Kafka是如何生产数据的,《Kafka-broker粗粒度启动流程》博客中我们了解了KafkaApis中有各种api和对应处理逻辑,其中PRODUCE请求对应了处理produce请求的逻辑,下面我们跟着源码来看下处理细节

class KafkaApis(//......request.header.apiKey match {case ApiKeys.PRODUCE => handleProduceRequest(request, requestLocal)......}
}

二、handleProduceRequest

  def handleProduceRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {//获取请求体val produceRequest = request.body[ProduceRequest]//为每个TopicPartition 的不同情况声明不同的responseMap//未经授权的val unauthorizedTopicResponses = mutable.Map[TopicPartition, PartitionResponse]()//不存在的val nonExistingTopicResponses = mutable.Map[TopicPartition, PartitionResponse]()//无效的val invalidRequestResponses = mutable.Map[TopicPartition, PartitionResponse]()//已授权的val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]()//缓存结果以避免冗余的授权调用val authorizedTopics = authHelper.filterByAuthorized(request.context, WRITE, TOPIC,produceRequest.data().topicData().asScala)(_.name())//依次循环 topic > partition 来处理//这说明这一个请求中需要处理多个 topic 的多个 partition 的数据produceRequest.data.topicData.forEach(topic => topic.partitionData.forEach { partition =>val topicPartition = new TopicPartition(topic.name, partition.index)val memoryRecords = partition.records.asInstanceOf[MemoryRecords]if (!authorizedTopics.contains(topicPartition.topic))unauthorizedTopicResponses += topicPartition -> new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)else if (!metadataCache.contains(topicPartition))nonExistingTopicResponses += topicPartition -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)elsetry {//只有授权且metadataCache中有这个topicPartition 才能走到这//校验数据,并把该数据放入authorizedRequestInfo ProduceRequest.validateRecords(request.header.apiVersion, memoryRecords)authorizedRequestInfo += (topicPartition -> memoryRecords)} catch {//......}}//回调函数,这里先不展开,后续处理完数据需要返回给producer时再展开@nowarn("cat=deprecation")def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {...}def processingStatsCallback(processingStats: FetchResponseStats): Unit = {processingStats.forKeyValue { (tp, info) =>updateRecordConversionStats(request, tp, info)}}//如果没有有效数据,就立即返回空的情况if (authorizedRequestInfo.isEmpty)sendResponseCallback(Map.empty)else {val internalTopicsAllowed = request.header.clientId == AdminUtils.ADMIN_CLIENT_IDval transactionSupportedOperation = if (request.header.apiVersion > 10) genericError else defaultError//调用副本管理器将消息追加到副本//副本管理器 是一个屏蔽了集群的层 里面既包括本地leader写也包括远程Follower写replicaManager.handleProduceAppend(timeout = produceRequest.timeout.toLong,requiredAcks = produceRequest.acks,internalTopicsAllowed = internalTopicsAllowed,transactionalId = produceRequest.transactionalId,entriesPerPartition = authorizedRequestInfo,responseCallback = sendResponseCallback,recordValidationStatsCallback = processingStatsCallback,requestLocal = requestLocal,transactionSupportedOperation = transactionSupportedOperation)//如果请求被放入炼狱,它将有一个被保留的引用,因此不能被垃圾回收;因此,我们在这里清除它的数据,以便让GC回收内存,因为它已经附加到日志中//当follower的数据追平leader的数据,且leader没有新数据增长时,follower的fetch请求会放入炼狱,来减少带宽的消耗produceRequest.clearPartitionRecords()}}

三、ReplicaManager

最终需要将数据给到ReplicaManager来进行实际的追加

class ReplicaManager(...){def handleProduceAppend(...){val transactionalProducerInfo = mutable.HashSet[(Long, Short)]()val topicPartitionBatchInfo = mutable.Map[TopicPartition, Int]()entriesPerPartition.forKeyValue { (topicPartition, records) =>// 生成请求(仅需要验证的请求)应该在“批处理”中每个分区只有一个批处理,但为了安全起见,请检查所有批处理。val transactionalBatches = records.batches.asScala.filter(batch => batch.hasProducerId && batch.isTransactional)transactionalBatches.foreach(batch => transactionalProducerInfo.add(batch.producerId, batch.producerEpoch))if (transactionalBatches.nonEmpty) topicPartitionBatchInfo.put(topicPartition, records.firstBatch.baseSequence)}if (transactionalProducerInfo.size > 1) {//从这里看出,事务记录应该只包含一个生产者id//抛出异常并提示:事务记录包含多个生产者IDthrow new InvalidPidMappingException("Transactional records contained more than one producer ID")}//又封装了一层回调def postVerificationCallback(...){}//如果事务记录包含0个生产者id,不用处理if (transactionalProducerInfo.size < 1) {postVerificationCallback(requestLocal,(Map.empty[TopicPartition, Errors], Map.empty[TopicPartition, VerificationGuard]))return}//事务记录中只能有一个生产者idmaybeStartTransactionVerificationForPartitions(topicPartitionBatchInfo,transactionalId,transactionalProducerInfo.head._1,transactionalProducerInfo.head._2,//当事务验证完成时,将要处理的回调封装在任意请求处理程序线程上。传入的本地请求仅在立即执行回调时使用。KafkaRequestHandler.wrapAsyncCallback(postVerificationCallback,requestLocal),transactionSupportedOperation)}}

1、postVerificationCallback

事务校验完成,会处理回调,执行真正的数据追加

    def postVerificationCallback(...): Unit = {val (preAppendErrors, verificationGuards) = results//将事务协调器错误转换为已知的生产者响应错误val errorResults = preAppendErrors.map {//......}val entriesWithoutErrorsPerPartition = entriesPerPartition.filter { case (key, _) => !errorResults.contains(key) }val preAppendPartitionResponses = buildProducePartitionStatus(errorResults).map { case (k, status) => k -> status.responseStatus }def newResponseCallback(responses: Map[TopicPartition, PartitionResponse]): Unit = {responseCallback(preAppendPartitionResponses ++ responses)}//执行数据追加操作appendRecords(timeout = timeout,requiredAcks = requiredAcks,internalTopicsAllowed = internalTopicsAllowed,origin = AppendOrigin.CLIENT,entriesPerPartition = entriesWithoutErrorsPerPartition,responseCallback = newResponseCallback,recordValidationStatsCallback = recordValidationStatsCallback,requestLocal = newRequestLocal,actionQueue = actionQueue,verificationGuards = verificationGuards)}

2、appendRecords

将消息附加到分区的leader副本,并等待它们复制到其他副本;当超时或满足所需的ack时,将触发回调函数;如果回调函数本身已经在某个对象上同步,则传递此对象以避免死锁

注意,所有挂起的延迟检查操作都存储在队列中。ReplicaManager.appendRecords() 的所有调用者都应该对所有受影响的分区调用ActionQueue.tryCompleteActions,而不会持有任何冲突的锁(将多线程锁的问题转成线性队列操作来提升性能)

  def appendRecords(t...): Unit = {//验证 acks 必须是 0 1 -1 三种类型if (!isValidRequiredAcks(requiredAcks)) {sendInvalidRequiredAcksResponse(entriesPerPartition, responseCallback)return}val sTime = time.milliseconds//向本地log写入数据val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,origin, entriesPerPartition, requiredAcks, requestLocal, verificationGuards.toMap)//生产到本地日志中用了多长时间debug("Produce to local log in %d ms".format(time.milliseconds - sTime))//下面的我们后续博客再接着分析,本地写完log,//按理论知识:如果ack=0或者1,就可以直接返回了,如果是-1就需要等待备份数据写入成功val produceStatus = buildProducePartitionStatus(localProduceResults)addCompletePurgatoryAction(actionQueue, localProduceResults)recordValidationStatsCallback(localProduceResults.map { case (k, v) =>k -> v.info.recordValidationStats})maybeAddDelayedProduce(requiredAcks,delayedProduceLock,timeout,entriesPerPartition,localProduceResults,produceStatus,responseCallback)}

3、appendToLocalLog

接下来我们看看kafka是如何将消息附加到本地副本日志的

  private def appendToLocalLog(...){//按照每个topic > 分区 来写entriesPerPartition.map { case (topicPartition, records) =>brokerTopicStats.topicStats(topicPartition.topic).totalProduceRequestRate.mark()brokerTopicStats.allTopicsStats.totalProduceRequestRate.mark()//如果不允许,则拒绝附加到内部主题if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) {(topicPartition, LogAppendResult(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,Some(new InvalidTopicException(s"Cannot append to internal topic ${topicPartition.topic}")),hasCustomErrorMessage = false))} else {try {val partition = getPartitionOrException(topicPartition)//接下来交给Partition向leader追加数据val info = partition.appendRecordsToLeader(records, origin, requiredAcks, requestLocal,//......省略.....} catch {//......}}}}

四、Partition

它是表示topic partition的数据结构。leader负责维护AR、ISR、CUR、RAR

并发注意事项:

1、分区是线程安全的。分区上的操作可以从不同的请求处理程序线程并发调用

2、ISR更新使用读写锁同步。读锁用于检查是否需要更新,以避免在不执行更新的情况下获取副本的常见情况下获取写锁。在执行更新之前,在写锁下第二次检查ISR更新条件

3、在保持ISR写锁的同时,处理各种其他操作,如leader更改。这可能会在生成和副本获取请求中引入延迟,但这些操作通常不常见

4、使用ISR读锁同步HW更新

5、锁用于防止在ReplicaAlterDirThread执行时更新follower副本。可以使用ReplaceCurrentWithFutureReplica()用未来的副本替换follower副本。

1、appendRecordsToLeader

ReplicaManager调用了Partition的该方法来继续执行数据的追加操作

  def appendRecordsToLeader(...): LogAppendInfo = {val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {leaderLogIfLocal match {case Some(leaderLog) =>//最小的ISRval minIsr = effectiveMinIsr(leaderLog)val inSyncSize = partitionState.isr.size//如果没有足够的insync副本来保证安全,请避免写信给leader//如果目前的ids < 最小的isr 要求,且还要求了 acks = -1 就直接返回异常if (inSyncSize < minIsr && requiredAcks == -1) {//这个 topic 的 partition 当前的isr 集合 为 partitionState.isr  不足以满足  min.isr 要求throw new NotEnoughReplicasException(s"The size of the current ISR ${partitionState.isr} " +s"is insufficient to satisfy the min.isr requirement of $minIsr for partition $topicPartition")}//将追加数据的任务委托给leaderLog既:UnifiedLogval info = leaderLog.appendAsLeader(records, leaderEpoch = this.leaderEpoch, origin,interBrokerProtocolVersion, requestLocal, verificationGuard)//我们可能需要增加高水位,因为ISR可能会降至1(info, maybeIncrementLeaderHW(leaderLog))case None =>//抛出异常:不能在该broker为xx分区的ledaer写入数据}}info.copy(if (leaderHWIncremented) LeaderHwChange.INCREASED else LeaderHwChange.SAME)}

五、UnifiedLog

本地和分层日志段的统一视图。

1、appendAsLeader

  //将此消息集附加到本地日志的活动段 segment ,分配偏移量和分区前导纪元def appendAsLeader(...): LogAppendInfo = {//这里需要验证log的来源,从而决定是否有对offset验证的必要//当下log是来自客户端,要写入的对象是RAFT_LEADER,筏头的意思,kafka中有一个HW的概念//我理解的意思是:此时要写入的数据是在HW之上的,因此称之为木筏leader//总共有四种来源:既REPLICATION(副本)、COORDINATOR(组协调员和事务)、CLIENT(客户端)、RAFT_LEADER(leader)val validateAndAssignOffsets = origin != AppendOrigin.RAFT_LEADER//将此消息集附加到本地日志的活动段,必要时滚动到新段。//此方法通常负责为消息分配偏移量,但是如果传递了assignOffsets=false标志,我们将只检查现有的偏移量是否有效。append(records, origin, interBrokerProtocolVersion, validateAndAssignOffsets, leaderEpoch, Some(requestLocal), verificationGuard, ignoreRecordSize = false)}

2、append 

  private def append(...): LogAppendInfo = {//我们希望确保在将任何日志数据写入磁盘之前,将分区元数据文件写入日志目录。//这将确保在发生故障时,可以使用正确的topic ID恢复任何日志数据。//可能要写元数据了,这里用了flush,因为元数据比较重要,数据到pagecache后需要直接写入磁盘maybeFlushMetadataFile()//对要追加的数据进行解析和校验,校验的有以下几点//1、每条消息都与其CRC匹配,循环冗余校验//2、每个消息大小是否有效//3、传入记录批的序列号与现有状态一致//4、offset是否单调递增//也需要计算下面纬度的数量://1、第一条消息的 offset//2、最后一条消息的offset//3、消息条数//4、有效字节数//5、偏移量是否单调递增//6、是否使用了任何压缩编解码器(如果使用了多个,则给出最后一个)val appendInfo = analyzeAndValidateRecords(records, origin, ignoreRecordSize, !validateAndAssignOffsets, leaderEpoch)// 如果我们没有有效的消息,或者这是最后一个附加条目的重复,则返回if (appendInfo.validBytes <= 0) appendInfoelse {// 在将任何无效字节或部分消息附加到磁盘日志之前,请先对其进行修剪var validRecords = trimInvalidBytes(records, appendInfo)// 它们是有效的,请将其插入日志中lock synchronized {maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {//主要是检查 log 的日志文件 的 mmap 是否关闭localLog.checkIfMemoryMappedBufferClosed()if (validateAndAssignOffsets) {//为消息集分配偏移量val offset = PrimitiveRef.ofLong(localLog.logEndOffset)appendInfo.setFirstOffset(offset.value)val validateAndOffsetAssignResult = try {val targetCompression = BrokerCompressionType.targetCompression(config.compression, appendInfo.sourceCompression())val validator = new LogValidator(validRecords,topicPartition,time,appendInfo.sourceCompression,targetCompression,config.compact,config.recordVersion.value,config.messageTimestampType,config.messageTimestampBeforeMaxMs,config.messageTimestampAfterMaxMs,leaderEpoch,origin,interBrokerProtocolVersion)validator.validateMessagesAndAssignOffsets(offset,validatorMetricsRecorder,requestLocal.getOrElse(throw new IllegalArgumentException("requestLocal should be defined if assignOffsets is true")).bufferSupplier)} catch {//......}validRecords = validateAndOffsetAssignResult.validatedRecordsappendInfo.setMaxTimestamp(validateAndOffsetAssignResult.maxTimestampMs)appendInfo.setShallowOffsetOfMaxTimestamp(validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp)appendInfo.setLastOffset(offset.value - 1)appendInfo.setRecordValidationStats(validateAndOffsetAssignResult.recordValidationStats)if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)appendInfo.setLogAppendTime(validateAndOffsetAssignResult.logAppendTimeMs)// 如果消息大小有可能发生变化(由于重新压缩或消息格式转换),则对其进行电子验证if (!ignoreRecordSize && validateAndOffsetAssignResult.messageSizeMaybeChanged) {validRecords.batches.forEach { batch =>if (batch.sizeInBytes > config.maxMessageSize) {// 我们记录原始消息集大小,而不是修剪后的大小,以与预压缩字节RejectedRate记录保持一致brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)throw new RecordTooLargeException(s"Message batch size is ${batch.sizeInBytes} bytes in append to" +s"partition $topicPartition which exceeds the maximum configured size of ${config.maxMessageSize}.")}}}} else {//我们 使用 自己给的offsetsif (appendInfo.firstOrLastOffsetOfFirstBatch < localLog.logEndOffset) {// 如果日志为空,我们仍然可以恢复,例如:从未批处理对齐的leader上的日志开始偏移量中获取,这可能是由于AdminClient#deleteRecords()造成的val hasFirstOffset = appendInfo.firstOffset != UnifiedLog.UnknownOffsetval firstOffset = if (hasFirstOffset) appendInfo.firstOffset else records.batches.iterator().next().baseOffset()val firstOrLast = if (hasFirstOffset) "First offset" else "Last offset of the first batch"throw new UnexpectedAppendOffsetException(...)}}// 用领导者标记在消息上的纪元更新纪元缓存validRecords.batches.forEach { batch =>if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {maybeAssignEpochStartOffset(batch.partitionLeaderEpoch, batch.baseOffset)} else {// 在部分升级场景中,我们可能会对消息格式进行临时回归。为了确保领导人选举的安全性,我们清除了纪元缓存,以便在下一次领导人选举后恢复到 HW 截断。leaderEpochCache.filter(_.nonEmpty).foreach { cache =>warn(s"Clearing leader epoch cache after unexpected append with message format v${batch.magic}")cache.clearAndFlush()}}}// 检查消息集大小可能超过config.segmentSize  这一批次消息的大小不能超过 整个段的大小if (validRecords.sizeInBytes > config.segmentSize) {throw new RecordBatchTooLargeException(......)}//如果该段已满,可能会滚动日志val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)val logOffsetMetadata = new LogOffsetMetadata(appendInfo.firstOrLastOffsetOfFirstBatch,segment.baseOffset,segment.size)// 现在我们有了有效的记录、分配的偏移量和更新的时间戳,我们需要验证生产者的幂等/事务状态,并收集一些元数据val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(logOffsetMetadata, validRecords, origin, verificationGuard)maybeDuplicate match {case Some(duplicate) =>appendInfo.setFirstOffset(duplicate.firstOffset)appendInfo.setLastOffset(duplicate.lastOffset)appendInfo.setLogAppendTime(duplicate.timestamp)appendInfo.setLogStartOffset(logStartOffset)case None =>//追加记录,并在追加后立即递增本地日志结束偏移量,因为对下面事务索引的写入可能会失败,// 我们希望确保未来追加的偏移量仍然单调增长。恢复日志目录后,将清理由此产生的事务索引不一致。// 请注意,如果事务索引的追加失败,ProducerStateManager的结束偏移量将不会更新,最后一个稳定偏移量也不会前进。//这里就开始将记录插入 log 和索引文件了 并更新最后的 offsetlocalLog.append(appendInfo.lastOffset, appendInfo.maxTimestamp, appendInfo.shallowOffsetOfMaxTimestamp, validRecords)updateHighWatermarkWithLogEndOffset()//更新生产者状态updatedProducers.values.foreach(producerAppendInfo => producerStateManager.update(producerAppendInfo))// 用真实的最后一个稳定偏移量更新事务索引。// 使用READ_COMMITTED的消费者可见的最后一个偏移量将受到此值和高水印的限制。completedTxns.foreach { completedTxn =>val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)segment.updateTxnIndex(completedTxn, lastStableOffset)producerStateManager.completeTxn(completedTxn)}// 始终更新最后一个生产者id映射偏移量,以便快照反映当前偏移量,即使没有写入任何幂等数据producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)// 更新第一个不稳定偏移量(用于计算LSO)maybeIncrementFirstUnstableOffset()trace(s"Appended message set with last offset: ${appendInfo.lastOffset}, " +s"first offset: ${appendInfo.firstOffset}, " +s"next offset: ${localLog.logEndOffset}, " +s"and messages: $validRecords")//localLog.unflushedMessages   :  还没有 flush 的消息数//config.flushInterval 为  flush.messages配置  默认 Long.MAX_VALUE//说明://此设置允许指定一个间隔,在该间隔内,我们将强制对写入日志的数据进行fsync。// 例如,如果将其设置为1,我们将在每条消息后进行fsync;如果是5,我们将在每5条消息后进行fsync。// 一般来说,我们建议您不要设置此选项,而是使用副本来提高持久性,并允许操作系统的后台刷新功能,因为它更高效。// 此设置可以按topic覆盖(请参阅<a href=\“#topicconfigs\”>按主题配置部分</a  很灵活,可以为topic设置不同的 flush策略if (localLog.unflushedMessages >= config.flushInterval) flush(false)}appendInfo}}}}

六、LocalLog

用于在本地存储消息的仅追加日志。日志是一系列LogSegments,每个LogSegments都有一个基本偏移。根据可配置的策略创建新的日志段,该策略控制给定段的字节大小或时间间隔。

因此数据寻址的逻辑是:基地址+偏移地址(基地址找到具体的LogSegment,再根据偏移地址找到文件中的具体数据)

  private[log] def append(...): Unit = {//向本地活动段中追加数据segments.activeSegment.append(lastOffset, largestTimestamp, shallowOffsetOfMaxTimestamp, records)//更新LEO:既log末尾的offsetupdateLogEndOffset(lastOffset + 1)}

七、 LogSegment

日志的一部分。每个段有两个组成部分:日志和索引。日志是一个包含实际消息的FileRecords。该索引是一个从逻辑偏移映射到物理文件位置的OffsetIndex。每个段都有一个基本偏移量,该偏移量<=该段中任何消息的最小偏移量,>任何先前段中的任何偏移量。

基偏移量为[base_offset]的段将存储在两个文件中,一个[base_ooffset].index和一个[base_offset].log文件。

    public void append(long largestOffset,long largestTimestampMs,long shallowOffsetOfMaxTimestamp,MemoryRecords records) throws IOException {if (records.sizeInBytes() > 0) {//在位置{}的末尾偏移量{}处插入{}个字节,在偏移量{}处插入最大的时间戳{}LOGGER.trace("Inserting {} bytes at end offset {} at position {} with largest timestamp {} at offset {}",records.sizeInBytes(), largestOffset, log.sizeInBytes(), largestTimestampMs, shallowOffsetOfMaxTimestamp);int physicalPosition = log.sizeInBytes();if (physicalPosition == 0)rollingBasedTimestamp = OptionalLong.of(largestTimestampMs);ensureOffsetInRange(largestOffset);//数据真正写的地方long appendedBytes = log.append(records);LOGGER.trace("Appended {} to {} at end offset {}", appendedBytes, log.file(), largestOffset);// 更新内存中的最大时间戳和相应的偏移量。if (largestTimestampMs > maxTimestampSoFar()) {maxTimestampAndOffsetSoFar = new TimestampOffset(largestTimestampMs, shallowOffsetOfMaxTimestamp);}// 在索引中附加一个条目(如果需要)//这证明并不是每次写数据都会向索引中写入标记,因此索引指向了一段数据//需要累计写入的数据 > 索引中条目的大致字节数(index.interval.bytes 默认4096字节)if (bytesSinceLastIndexEntry > indexIntervalBytes) {offsetIndex().append(largestOffset, physicalPosition);timeIndex().maybeAppend(maxTimestampSoFar(), shallowOffsetOfMaxTimestampSoFar());bytesSinceLastIndexEntry = 0;}bytesSinceLastIndexEntry += records.sizeInBytes();}}

1、log追加

FileRecords

由文件支持的 Records 实现。可以将可选的开始和结束位置应用于此实例,以允许对一系列日志记录进行切片。

    public int append(MemoryRecords records) throws IOException {int written = records.writeFullyTo(channel);size.getAndAdd(written);return written;}

MemoryRecords

 由ByteBuffer支持的 Records实现。这仅用于读取或就地修改记录批的现有缓冲区。

    //将所有记录写入给定通道(包括部分记录)。public int writeFullyTo(GatheringByteChannel channel) throws IOException {buffer.mark();int written = 0;//并没有调 flush ,这里只是写入了 pagecache 中 //需要考内核的机制将其flush到磁盘 (linux系统中可以配置参数 当到达总内存的多少后或者脏页机制触发去写)while (written < sizeInBytes())written += channel.write(buffer);buffer.reset();return written;}

2、offsetIndex追加

OffsetIndex

   public void append(long offset, int position) {lock.lock();try {if (isFull())throw new IllegalArgumentException(...);if (entries() == 0 || offset > lastOffset) {log.trace("Adding index entry {} => {} to {}", offset, position, file().getAbsolutePath());//利用mmap的特性将数据写入内核的pagecachemmap().putInt(relativeOffset(offset));mmap().putInt(position);incrementEntries();lastOffset = offset;if (entries() * ENTRY_SIZE != mmap().position())throw new IllegalStateException(...);} elsethrow new InvalidOffsetException(...);} finally {lock.unlock();}}

3、timeIndex追加

TimeIndex

    public void maybeAppend(long timestamp, long offset) {maybeAppend(timestamp, offset, false);}public void maybeAppend(long timestamp, long offset, boolean skipFullCheck) {lock.lock();try {if (!skipFullCheck && isFull())throw new IllegalArgumentException(...);// 当偏移量等于最后一个条目的偏移量时,我们不会抛出异常。这意味着我们正试图插入与最后一个条目相同的时间索引条目。// 如果要插入的时间戳索引条目与最后一个条目相同,我们只需忽略插入,因为这可能会在以下两种情况下发生:// 1.日志段关闭了// 2.当滚动活动日志段时,会调用LogSegment.onBecomeActiveSegment()。if (entries() != 0 && offset < lastEntry.offset)throw new InvalidOffsetException(...);if (entries() != 0 && timestamp < lastEntry.timestamp)throw new IllegalStateException(...);// 只有当时间戳大于最后插入的时间戳时,我们才会附加到时间索引。// 如果所有消息都是消息格式v0,则时间戳将始终为NoTimestamp。在这种情况下,时间索引将为空。if (timestamp > lastEntry.timestamp) {log.trace("Adding index entry {} => {} to {}.", timestamp, offset, file().getAbsolutePath());//同样是调用mmap将数据写入pagecacheMappedByteBuffer mmap = mmap();mmap.putLong(timestamp);mmap.putInt(relativeOffset(offset));incrementEntries();this.lastEntry = new TimestampOffset(timestamp, offset);if (entries() * ENTRY_SIZE != mmap.position())throw new IllegalStateException(...);}} finally {lock.unlock();}}

八、总结

1、producer调用send发送数据

2、kafka调用对应的api进行处理

3、获取请求体中的数据

4、校验是否有有效数据,如果没有立即返回

5、调用副本管理器(ReplicaManager)将数据进行追加

6、校验事务完成后处理回调,执行真正的数据追加

7、对acks进行校验(必须是0、1、-1)

8、循环处理这次请求中的每个topic、partition,调用Partition进行数据追加

9、获取最小的ISR以及可以正常写的副本数量,如果存活的副本节点数量<最小ISR数量,且请求中的acks=-1,里面抛出异常

10、将数据委托给UnifiedLog进行追加

11、根据数据的来源对offset设置校验等级

12、写入元数据(因为元数据重要,因此要立马flush到磁盘)

13、再次对数据进行解析和校验(CRC、消息大小是否有效、序列号是否一致、offset是否单调递增)

14、校验日志的mmap是否关闭

15、为数据分配offset

16、校验数据大小是否超过了段大小

17、委托给LocalLog向本地活动段追加数据

18、委托给LogSegment进行数据追加

19、调用FileRecords、MemoryRecords将数据写入pagecache

20、判断累计数据量是否>index.interval.bytes 默认4096字节,如果大于开始写入索引

21、调用OffsetIndex将offset索引利用mmap写入pagecache

22、调用TimeIndex将time offset索引利用mmap写入pagecache(time的索引执行了offset索引,offset索引指向了真正位置)

23、更新该broker中的HW和LEO

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/60197.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Axure网络短剧APP端原型图,竖屏微剧视频模版40页

作品概况 页面数量&#xff1a;共 40 页 使用软件&#xff1a;Axure RP 9 及以上&#xff0c;非软件无源码 适用领域&#xff1a;短剧、微短剧、竖屏视频 作品特色 本作品为网络短剧APP的Axure原型设计图&#xff0c;定位属于免费短剧软件&#xff0c;类似红果短剧、河马剧场…

Windows,虚拟机Ubuntu和开发板三者之间的NFS服务器搭建

Windows,虚拟机Ubuntu和开发板三者之间的NFS服务器搭建 &#xff08;1&#xff09;虚拟机 ubuntu 要使用桥接模式&#xff0c;不能使用其他模式 &#xff08;2&#xff09;通过网线将PC和开发板网口直连:这样的连接&#xff0c;开发板是无法连接外网的 &#xff08;3&#xff…

C# 有趣的小程序—桌面精灵详细讲解

C# 桌面精灵详细讲解 最近写了一个简化版桌面精灵&#xff0c;效果如图所示&#xff0c;可以实现切换动画&#xff0c;说话、鼠标拖动&#xff0c;等功能。具体如何做&#xff0c;我发布了一个资源里面包含ppt详解、源代码以及动画素材。放心吧&#xff0c;免费的&#xff0c;…

视觉SLAM数学基础

本文系统梳理从相机成像模型&#xff0c;通过不同图像帧之间的构造几何约束求解位姿变换&#xff0c;再根据位姿变换和匹配点还原三维坐标的过程&#xff0c;可以作为基于特征点法的视觉SLAM的数学基础。 1、相机成像模型 1.1、针孔相机模型 实际相机的成像方式通常很复杂&a…

计算机新手练级攻略——如何搜索问题

目录 计算机学生新手练级攻略——如何搜索问题1.明确搜索意图2.使用精确关键词3.使用专业引擎搜索4.利用好技术社区1. Stack Overflow2. GitHub3. IEEE Xplore4. DBLP 5.使用代码搜索工具1. GitHub 代码搜索2. Stack Overflow 代码搜索3. Papers with Code4. IEEE Xplore 6.查阅…

51c自动驾驶~合集10

我自己的原文哦~ https://blog.51cto.com/whaosoft/11638131 #端到端任务 说起端到端&#xff0c;每个从业者可能都觉得会是下一代自动驾驶量产方案绕不开的点&#xff01;特斯拉率先吹响了方案更新的号角&#xff0c;无论是完全端到端&#xff0c;还是专注于planner的模型&a…

大模型日报|6 篇必读的大模型论文

1.华为推出科学智能体 Agent K v1.0&#xff0c;已达 Kaggle 大师水平 在这项工作中&#xff0c;来自华为诺亚方舟实验室和伦敦大学学院的研究团队提出了 Agent K v1.0&#xff0c;它是一个端到端自主数据科学智能体&#xff08;agent&#xff09;&#xff0c;旨在对各种数据科…

MySQL核心业务大表归档过程

记录一下2年前的MySQL大表的归档&#xff0c;当时刚到公司&#xff0c;发现MySQL的业务核心库&#xff0c;超过亿条的有7张表&#xff0c;最大的表有9亿多条&#xff0c;有37张表超过5百万条&#xff0c;部分表行数如下&#xff1a; 在测试的MySQL环境 &#xff1a; pt-archiv…

cache(二)直接缓存映射

在知乎发现一份不错得学习资料 请教CPU的cache中关于line,block,index等的理解&#xff1f; PPT 地址 https%3A//cs.slu.edu/%7Efritts/CSCI224_S15/schedule/chap6-cache-memory.pptx 课程主页 https://cs.slu.edu/~fritts/CSCI224_S15/schedule/ 0. 缓存定义 这张图展示了缓…

光流法(Optical Flow)

一、简介 光流法&#xff08;Optical Flow&#xff09;是一种用于检测图像序列中像素运动的计算机视觉技术。其基于以下假设&#xff1a; 1.亮度恒定性假设&#xff1a;物体在运动过程中&#xff0c;其像素值在不同帧中保持不变。 2.空间和时间上的连续性&#xff1a;相邻像素之…

打造自己的RAG解析大模型:(可商用)智能文档服务上线部署

通用版面分析介绍 版面解析是一种将文档图像转化为机器可读数据格式的技术&#xff0c;广泛应用于文档管理和信息提取等领域。通过结合OCR、图像处理和机器学习&#xff0c;版面解析能够识别文档中的文本块、图片、表格等版面元素&#xff0c;最终生成结构化数据&#xff0c;大…

【MySQL】MySQL基础知识复习(下)

前言 上一篇博客介绍了MySQL的库操作&#xff0c;表操作以及CRUD。 【MySQL】MySQL基础知识复习&#xff08;上&#xff09;-CSDN博客 本篇将进一步介绍CRUD操作&#xff0c;尤其是查找操作 目录 一.数据库约束 1.约束类型 1.1NULL约束 1.2UNIQUE&#xff1a;唯一约束 …

新的服务器Centos7.6 安卓基础的环境配置(新服务器可直接粘贴使用配置)

常见的基础服务器配置之Centos命令 正常来说都是安装一个docker基本上很多问题都可以解决了&#xff0c;我基本上都是通过docker去管理一些容器如&#xff1a;mysql、redis、mongoDB等之类的镜像&#xff0c;还有一些中间件如kafka。下面就安装一个 docker 和 nginx 的相关配置…

性能测试|JMeter接口与性能测试项目

前言 在软件开发和运维过程中&#xff0c;接口性能测试是一项至关重要的工作。JMeter作为一款开源的Java应用&#xff0c;被广泛用于进行各种性能测试&#xff0c;包括接口性能测试。本文将详细介绍如何使用JMeter进行接口性能测试的过程和步骤。 JMeter是Apache组织开发的基…

linux物理内存管理:node,zone,page

一、总览 对于物理内存内存&#xff0c;linux对内存的组织逻辑从上到下依次是&#xff1a;node&#xff0c;zone&#xff0c;page&#xff0c;这些page是根据buddy分配算法组织的&#xff0c;看下面两张图&#xff1a; 上面的概念做下简单的介绍&#xff1a; Node&#xff1a…

Pr:视频过渡快速参考(合集 · 2025版)

Adobe Premiere Pro 自带七组约四十多个视频过渡 Video Transitions效果&#xff0c;包含不同风格和用途&#xff0c;可在两个剪辑之间创造平滑、自然的转场&#xff0c;用来丰富时间、地点或情绪的变化。恰当地应用过渡可让观众更好地理解故事或人物。 提示&#xff1a; 点击下…

使用vscode 连接linux进行开发

1. 在Vscode中安装扩展功能remote ssh 2. 打开命令窗口 3. 在弹出的命令窗口输入ssh&#xff0c;并从弹出的提示中选择 Add New SSH Host 4. 在弹出的输入窗口中输入类似下面形式的 连接地址&#xff1a; 5. 输入回车后出现下面的对话框&#xff0c;这个对话框是说你要用哪个…

面试击穿mysql

Mysql三大范式: 第一范式&#xff08;1NF&#xff09;&#xff1a; 不符合第一范式的典型情况是在一个字段中存放多种不同类型的详细信息。例如&#xff0c;在商品表中&#xff0c;若将商品名称、价格和类型都存储在同一个字段中&#xff0c;会带来诸多弊端。首先&#xff0c;在…

excel功能

统计excel中每个名字出现的次数 在Excel中统计每个名字出现的次数&#xff0c;您可以使用COUNTIF函数或数据透视表。以下是两种方法的详细步骤&#xff1a; 方法一&#xff1a;使用COUNTIF函数 准备数据&#xff1a;确保您的姓名列表位于一个连续的单元格区域&#xff0c;例如…

单体架构 IM 系统之长轮询方案设计

在上一篇技术短文&#xff08;单体架构 IM 系统之核心业务功能实现&#xff09;中&#xff0c;我们讨论了 “信箱模型” 在单体架构 IM 系统中的应用&#xff0c;“信箱模型” 见下图。 客户端 A 将 “信件” 投入到客户端 B 的 “信箱” 中&#xff0c;然后客户端 B 去自己的 …