Reading the Kafka Source Code: How the Broker Handles Producer Write Requests

Overview

The Kafka source code is organized into multiple modules, each responsible for a different piece of functionality. Below is an overview of the core modules and what they do:

  1. Server-side source: implements the core functionality of the Kafka Broker, including log storage, the controller, coordinators, metadata and state-machine management, delayed-operation mechanisms, consumer group management, and the high-concurrency network model.

  2. Java client source: implements how the Producer and Consumer interact with the Broker, plus the shared supporting components.

  3. Connect source: used to build bidirectional streaming data synchronization between heterogeneous systems.

  4. Streams source: implements real-time stream processing.

  5. Raft source: implements the Raft consensus protocol.

  6. Admin module: Kafka's administration module for operating on and managing topics and partitions, including creating and deleting topics and expanding partitions.

  7. Api module: responsible for data exchange, encoding and decoding the data exchanged between clients and the server.

  8. Client module: contains the classes the Producer uses to read Kafka Broker metadata, such as topics, partitions, and their leaders.

  9. Cluster module: contains entity classes such as Broker, Cluster, Partition, and Replica.

  10. Common module: contains the various exception classes and error validation.

  11. Consumer module: the consumer processing module, responsible for client-side consumer data and logic.

  12. Controller module: responsible for electing the central controller, partition leader election, replica assignment and reassignment, and partition/replica expansion.

  13. Coordinator module: manages a subset of the consumer groups and their offsets.

  14. Javaapi module: provides the Java-language Producer and Consumer API interfaces.

  15. Log module: responsible for Kafka's file storage, reading and writing the message data of all topics.

  16. Message module: wraps multiple records into record sets or compressed record sets.

  17. Metrics module: responsible for internal state monitoring.

  18. Network module: handles client connections; the network event module.

  19. Producer module: the producer implementation details, including synchronous and asynchronous message sending.

  20. Security module: responsible for Kafka's security verification and management.

  21. Serializer module: serializes and deserializes message content.

  22. Server module: covers leader and offset checkpoints, dynamic configuration, delayed topic creation and deletion, leader election, admin and replica management, and more.

  23. Tools module: contains various tools, such as exporting consumer offsets, LogSegment information, the log location information of a topic, and offsets stored in ZooKeeper.

  24. Utils module: various utility classes, such as Json, ZkUtils, thread pool utilities, and the KafkaScheduler shared scheduler.

Together these modules make up Kafka's overall architecture, enabling it to provide a high-throughput, highly available message queue service.

The Kafka source branch used in this article is 1.0.2.

The entry point is kafka.server.KafkaApis, the class that dispatches all API requests; the produce path is handled as follows:

/**
 * Handle a produce request
 */
def handleProduceRequest(request: RequestChannel.Request) {
  val produceRequest = request.body[ProduceRequest]
  val numBytesAppended = request.header.toStruct.sizeOf + request.sizeOfBodyInBytes
  // Transactional messages
  if (produceRequest.isTransactional) {
    // Check whether the client has Write permission on the transactional id
    if (!authorize(request.session, Write, new Resource(TransactionalId, produceRequest.transactionalId))) {
      sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
      return
    }
    // Note that authorization to a transactionalId implies ProducerId authorization
  } else if (produceRequest.isIdempotent && !authorize(request.session, IdempotentWrite, Resource.ClusterResource)) {
    sendErrorResponseMaybeThrottle(request, Errors.CLUSTER_AUTHORIZATION_FAILED.exception)
    return
  }

  val unauthorizedTopicResponses = mutable.Map[TopicPartition, PartitionResponse]()
  val nonExistingTopicResponses = mutable.Map[TopicPartition, PartitionResponse]()
  val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]()

  for ((topicPartition, memoryRecords) <- produceRequest.partitionRecordsOrFail.asScala) {
    if (!authorize(request.session, Write, new Resource(Topic, topicPartition.topic)))
      unauthorizedTopicResponses += topicPartition -> new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)
    else if (!metadataCache.contains(topicPartition.topic))
      nonExistingTopicResponses += topicPartition -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
    else
      authorizedRequestInfo += (topicPartition -> memoryRecords)
  }

  // the callback for sending a produce response
  def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
    val mergedResponseStatus = responseStatus ++ unauthorizedTopicResponses ++ nonExistingTopicResponses
    var errorInResponse = false

    mergedResponseStatus.foreach { case (topicPartition, status) =>
      if (status.error != Errors.NONE) {
        errorInResponse = true
        debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
          request.header.correlationId,
          request.header.clientId,
          topicPartition,
          status.error.exceptionName))
      }
    }

    def produceResponseCallback(bandwidthThrottleTimeMs: Int) {
      if (produceRequest.acks == 0) {
        // no operation needed if producer request.required.acks = 0; however, if there is any error in handling
        // the request, since no response is expected by the producer, the server will close socket server so that
        // the producer client will know that some error has happened and will refresh its metadata
        // If the client set request.required.acks=0 and the server hit an error while handling the request,
        // it closes the socket connection to indirectly notify the client; the client then refreshes its
        // metadata and re-establishes the connection.
        if (errorInResponse) {
          val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) =>
            topicPartition -> status.error.exceptionName
          }.mkString(", ")
          info(s"Closing connection due to error during produce request with correlation id ${request.header.correlationId} " +
            s"from client id ${request.header.clientId} with ack=0\n" +
            s"Topic and partition to exceptions: $exceptionsSummary")
          // Close the connection
          closeConnection(request, new ProduceResponse(mergedResponseStatus.asJava).errorCounts)
        } else {
          sendNoOpResponseExemptThrottle(request)
        }
      } else {
        sendResponseMaybeThrottle(request, requestThrottleMs =>
          new ProduceResponse(mergedResponseStatus.asJava, bandwidthThrottleTimeMs + requestThrottleMs))
      }
    }

    // When this callback is triggered, the remote API call has completed
    request.apiRemoteCompleteTimeNanos = time.nanoseconds

    quotas.produce.maybeRecordAndThrottle(
      request.session.sanitizedUser,
      request.header.clientId,
      numBytesAppended,
      produceResponseCallback)
  }

  def processingStatsCallback(processingStats: Map[TopicPartition, RecordsProcessingStats]): Unit = {
    processingStats.foreach { case (tp, info) =>
      updateRecordsProcessingStats(request, tp, info)
    }
  }

  if (authorizedRequestInfo.isEmpty)
    sendResponseCallback(Map.empty)
  else {
    val internalTopicsAllowed = request.header.clientId == AdminUtils.AdminClientId

    // call the replica manager to append messages to the replicas
    // Call replicaManager.appendRecords() to append the records to the replicas
    replicaManager.appendRecords(
      timeout = produceRequest.timeout.toLong,
      requiredAcks = produceRequest.acks,
      internalTopicsAllowed = internalTopicsAllowed,
      isFromClient = true,
      entriesPerPartition = authorizedRequestInfo,
      responseCallback = sendResponseCallback,
      processingStatsCallback = processingStatsCallback)

    // if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected;
    // hence we clear its data here in order to let GC reclaim its memory since it is already appended to log
    produceRequest.clearPartitionRecords()
  }
}
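
Before moving on to the broker internals, it helps to see the client side of this exchange: the producer's acks setting decides which branch of produceResponseCallback above is taken (acks=0 gets no response at all and only notices errors through a closed connection, while acks=1/-1 get a ProduceResponse). Below is a minimal Java sketch of such a producer; the broker address and the topic name test-topic are assumptions for illustration, not taken from the source.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProduceAcksDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Assumed local broker address; adjust to your environment
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // acks drives which branch of produceResponseCallback runs on the broker:
        //   "0"   -> the broker sends no response; errors only surface as a closed connection
        //   "1"   -> the broker responds after the leader replica has appended the records
        //   "all" -> equivalent to acks=-1; the request waits in the produce purgatory
        //            until the ISR has replicated the records (see appendRecords below)
        props.put("acks", "all");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key", "value");
            producer.send(record, (RecordMetadata metadata, Exception exception) -> {
                if (exception != null) {
                    // e.g. a TopicAuthorizationException here maps to the TOPIC_AUTHORIZATION_FAILED branch above
                    exception.printStackTrace();
                } else {
                    System.out.printf("appended to %s-%d at offset %d%n",
                            metadata.topic(), metadata.partition(), metadata.offset());
                }
            });
        }
    }
}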

KafkaServer.startup() creates the ReplicaManager object and calls its startup() method:

// Replica manager
/* start replica manager */
replicaManager = createReplicaManager(isShuttingDown)
replicaManager.startup()

protected def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager =
  new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager, isShuttingDown, quotaManagers.follower,
    brokerTopicStats, metadataCache, logDirFailureChannel)

The ReplicaManager class:

// The replica manager is mainly responsible for the read/write operations of all partition replicas on this broker,
// as well as replica-related management tasks.
// Each replica corresponds one-to-one with a log instance: one replica maps to one Log object.
// ReplicaManager does not create the logs itself; it only manages all the partitions (the Partition objects) on the Broker.
// When a Partition object is created it is given ReplicaManager's logManager, and the Partition uses that logManager
// to create the corresponding log for each replica.
class ReplicaManager(val config: KafkaConfig,
                     metrics: Metrics,
                     time: Time,
                     val zkUtils: ZkUtils,
                     scheduler: Scheduler,
                     val logManager: LogManager,
                     val isShuttingDown: AtomicBoolean,
                     quotaManager: ReplicationQuotaManager,
                     val brokerTopicStats: BrokerTopicStats,
                     val metadataCache: MetadataCache,
                     logDirFailureChannel: LogDirFailureChannel,
                     val delayedProducePurgatory: DelayedOperationPurgatory[DelayedProduce],
                     val delayedFetchPurgatory: DelayedOperationPurgatory[DelayedFetch],
                     val delayedDeleteRecordsPurgatory: DelayedOperationPurgatory[DelayedDeleteRecords],
                     threadNamePrefix: Option[String]) extends Logging with KafkaMetricsGroup {

  /**
   * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
   * the callback function will be triggered either when timeout or the required acks are satisfied;
   * if the callback function itself is already synchronized on some object then pass this object to avoid deadlock.
   */
  // Append records to the leader replica of each partition and wait for them to be replicated to the other replicas.
  // The callback is invoked once the required acks are satisfied or the timeout expires.
  def appendRecords(timeout: Long,
                    requiredAcks: Short,
                    internalTopicsAllowed: Boolean,
                    isFromClient: Boolean,
                    entriesPerPartition: Map[TopicPartition, MemoryRecords],
                    responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                    delayedProduceLock: Option[Lock] = None,
                    processingStatsCallback: Map[TopicPartition, RecordsProcessingStats] => Unit = _ => ()) {
    // Validate the acks parameter; only -1, 1 and 0 are allowed
    if (isValidRequiredAcks(requiredAcks)) {
      val sTime = time.milliseconds
      // Append the data to the local replica log
      val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
        isFromClient = isFromClient, entriesPerPartition, requiredAcks)
      debug("Produce to local log in %d ms".format(time.milliseconds - sTime))

      val produceStatus = localProduceResults.map { case (topicPartition, result) =>
        topicPartition ->
          ProducePartitionStatus(
            result.info.lastOffset + 1, // required offset
            new PartitionResponse(result.error, result.info.firstOffset, result.info.logAppendTime, result.info.logStartOffset)) // response status
      }

      processingStatsCallback(localProduceResults.mapValues(_.info.recordsProcessingStats))

      // Handle the acks=-1 case: the final result can only be returned after all follower replicas in the ISR
      // have successfully written the data
      if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
        // create delayed produce operation
        val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
        // Delayed produce request
        val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)

        // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
        val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq

        // try to complete the request immediately, otherwise put it into the purgatory
        // this is because while the delayed produce operation is being created, new
        // requests may arrive and hence make this operation completable.
        delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
      } else {
        // If acks != -1, the result can be returned immediately via the callback
        // we can respond immediately
        val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus)
        responseCallback(produceResponseStatus)
      }
    } else {
      // If required.acks is outside accepted range, something is wrong with the client
      // Just return an error and don't handle the request at all
      val responseStatus = entriesPerPartition.map { case (topicPartition, _) =>
        topicPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS,
          LogAppendInfo.UnknownLogAppendInfo.firstOffset, RecordBatch.NO_TIMESTAMP, LogAppendInfo.UnknownLogAppendInfo.logStartOffset)
      }
      responseCallback(responseStatus)
    }
  }

  /**
   * Append the messages to the local replica logs
   */
  private def appendToLocalLog(internalTopicsAllowed: Boolean,
                               isFromClient: Boolean,
                               entriesPerPartition: Map[TopicPartition, MemoryRecords],
                               requiredAcks: Short): Map[TopicPartition, LogAppendResult] = {
    trace(s"Append [$entriesPerPartition] to local log")
    // Iterate over all topic-partitions to be written
    entriesPerPartition.map { case (topicPartition, records) =>
      brokerTopicStats.topicStats(topicPartition.topic).totalProduceRequestRate.mark()
      brokerTopicStats.allTopicsStats.totalProduceRequestRate.mark()

      // reject appending to internal topics if it is not allowed
      // Only when clientId == __admin_client (an admin command, i.e. internalTopicsAllowed=true)
      // may data be appended to Kafka's internal topics
      if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) {
        (topicPartition, LogAppendResult(
          LogAppendInfo.UnknownLogAppendInfo,
          Some(new InvalidTopicException(s"Cannot append to internal topic ${topicPartition.topic}"))))
      } else {
        try {
          // Look up the corresponding Partition
          val partitionOpt = getPartition(topicPartition)
          val info = partitionOpt match {
            case Some(partition) =>
              // If the partition is OfflinePartition, return an exception
              if (partition eq ReplicaManager.OfflinePartition) {
                throw new KafkaStorageException(s"Partition $topicPartition is in an offline log directory on broker $localBrokerId")
              }
              // Append the data to the partition's leader replica
              partition.appendRecordsToLeader(records, isFromClient, requiredAcks)

            // The partition was not found on this broker; return an exception
            case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
              .format(topicPartition, localBrokerId))
          }

          val numAppendedMessages =
            if (info.firstOffset == -1L || info.lastOffset == -1L)
              0
            else
              info.lastOffset - info.firstOffset + 1

          // update stats for successfully appended bytes and messages as bytesInRate and messageInRate
          // Update the metrics
          brokerTopicStats.topicStats(topicPartition.topic).bytesInRate.mark(records.sizeInBytes)
          brokerTopicStats.allTopicsStats.bytesInRate.mark(records.sizeInBytes)
          brokerTopicStats.topicStats(topicPartition.topic).messagesInRate.mark(numAppendedMessages)
          brokerTopicStats.allTopicsStats.messagesInRate.mark(numAppendedMessages)

          trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d"
            .format(records.sizeInBytes, topicPartition.topic, topicPartition.partition, info.firstOffset, info.lastOffset))
          (topicPartition, LogAppendResult(info))
        } catch {
          // NOTE: Failed produce requests metric is not incremented for known exceptions
          // it is supposed to indicate un-expected failures of a broker in handling a produce request
          case e@ (_: UnknownTopicOrPartitionException |
                   _: NotLeaderForPartitionException |
                   _: RecordTooLargeException |
                   _: RecordBatchTooLargeException |
                   _: CorruptRecordException |
                   _: KafkaStorageException |
                   _: InvalidTimestampException) =>
            (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e)))
          case t: Throwable =>
            val logStartOffset = getPartition(topicPartition) match {
              case Some(partition) => partition.logStartOffset
              case _ => -1
            }
            brokerTopicStats.topicStats(topicPartition.topic).failedProduceRequestRate.mark()
            brokerTopicStats.allTopicsStats.failedProduceRequestRate.mark()
            error("Error processing append operation on partition %s".format(topicPartition), t)
            (topicPartition, LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithLogStartOffset(logStartOffset), Some(t)))
        }
      }
    }
  }
}
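
The acks=-1 branch above deserves a closer look: the response is not sent inline. Instead a DelayedProduce operation is parked in the produce purgatory and completed later, either when follower fetches have advanced far enough or when the timeout fires. The following Java sketch is only a conceptual illustration of that pattern; it is not Kafka's actual DelayedOperation/DelayedProduce implementation, and all names in it are made up.

import java.util.concurrent.atomic.AtomicInteger;

/**
 * Conceptual sketch (not Kafka's DelayedProduce) of the "purgatory" idea used by appendRecords:
 * a pending produce completes either when enough replicas have acked or when it times out.
 */
class PendingProduce {
    private final int requiredAcks;                                   // e.g. the ISR size that must confirm the write
    private final AtomicInteger acksReceived = new AtomicInteger(1);  // the leader's own write counts as the first ack
    private final Runnable responseCallback;
    private volatile boolean completed = false;

    PendingProduce(int requiredAcks, Runnable responseCallback) {
        this.requiredAcks = requiredAcks;
        this.responseCallback = responseCallback;
    }

    /** Checked whenever a follower's fetch advances; mirrors tryCompleteElseWatch / checkAndComplete. */
    synchronized boolean tryComplete() {
        if (!completed && acksReceived.get() >= requiredAcks) {
            completed = true;
            responseCallback.run();   // equivalent to responseCallback(produceResponseStatus)
            return true;
        }
        return false;
    }

    /** Called when a follower replica has replicated the appended records. */
    void onFollowerAck() {
        acksReceived.incrementAndGet();
        tryComplete();
    }

    /** Invoked by a timer if the operation never completes; mirrors the expiration path of a delayed operation. */
    synchronized void onTimeout() {
        if (!completed) {
            completed = true;
            responseCallback.run();   // respond, with timeout-style errors for the partitions that never got enough acks
        }
    }
}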

The Partition.appendRecordsToLeader() method:

// Partition.appendRecordsToLeader() is the key method that takes the records sent by producers and appends them
// to the leader replica's log. It is called by ReplicaManager to make sure the records are correctly written to
// the leader replica's log, so that follower replicas can then sync the data from the leader.
def appendRecordsToLeader(records: MemoryRecords, isFromClient: Boolean, requiredAcks: Int = 0): LogAppendInfo = {
  val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
    leaderReplicaIfLocal match {
      case Some(leaderReplica) =>
        // Get the corresponding Log object; one Log object maps to the on-disk directory of one topic-partition replica.
        val log = leaderReplica.log.get
        // The min.insync.replicas config value, i.e. the required minimum ISR size
        val minIsr = log.config.minInSyncReplicas
        val inSyncSize = inSyncReplicas.size

        // Avoid writing to leader if there are not enough insync replicas to make it safe
        // If the partition's current ISR size is below the required minimum and the producer set acks=-1, throw an exception
        if (inSyncSize < minIsr && requiredAcks == -1) {
          throw new NotEnoughReplicasException("Number of insync replicas for partition %s is [%d], below required minimum [%d]"
            .format(topicPartition, inSyncSize, minIsr))
        }

        // Append the data to the Log object of the partition's leader replica
        // (a Log object corresponds to one topic-partition directory on the machine, which contains multiple
        // log segment files plus the corresponding offset index and timestamp index files)
        val info = log.appendAsLeader(records, leaderEpoch = this.leaderEpoch, isFromClient)
        // probably unblock some follower fetch requests since log end offset has been updated
        replicaManager.tryCompleteDelayedFetch(TopicPartitionOperationKey(this.topic, this.partitionId))
        // we may need to increment high watermark since ISR could be down to 1
        // The high watermark (HW) may need to be incremented. The HW marks the largest offset visible to consumers;
        // the leader uses it to decide which messages can be read, and followers include their own HW in fetch
        // requests so the leader knows their sync state.
        (info, maybeIncrementLeaderHW(leaderReplica))

      // The leader replica of this partition is not on this broker; return an exception
      case None =>
        throw new NotLeaderForPartitionException("Leader not local for partition %s on broker %d"
          .format(topicPartition, localBrokerId))
    }
  }

  // some delayed operations may be unblocked after HW changed
  if (leaderHWIncremented)
    tryCompleteDelayedRequests()

  info
}
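
The NotEnoughReplicasException path above is driven by the topic-level min.insync.replicas setting. As a hedged illustration, the sketch below uses the Java AdminClient to raise min.insync.replicas to 2 for an assumed topic test-topic; with acks=-1 producers, writes will then fail fast whenever the ISR shrinks below that size. The broker address is also an assumption.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class MinInsyncReplicasDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address

        try (AdminClient admin = AdminClient.create(props)) {
            // Raise min.insync.replicas for the (assumed) topic "test-topic" to 2:
            // with acks=-1, appendRecordsToLeader will throw NotEnoughReplicasException
            // whenever the ISR shrinks below 2, and the producer callback surfaces it
            // as a NOT_ENOUGH_REPLICAS error.
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "test-topic");
            Config config = new Config(Collections.singletonList(
                    new ConfigEntry("min.insync.replicas", "2")));
            admin.alterConfigs(Collections.singletonMap(topic, config)).all().get();
        }
    }
}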

appendRecordsToLeader() then calls the Log class's append() method via appendAsLeader():

/**
 * Append this message set to the active segment of the log, assigning offsets and Partition Leader Epochs
 *
 * @param records The records to append
 * @param isFromClient Whether or not this append is from a producer
 * @throws KafkaStorageException If the append fails due to an I/O error.
 * @return Information about the appended messages including the first and last offset.
 */
def appendAsLeader(records: MemoryRecords, leaderEpoch: Int, isFromClient: Boolean = true): LogAppendInfo = {
  append(records, isFromClient, assignOffsets = true, leaderEpoch)
}

/**
 * Append this message set to the active segment of the log, rolling over to a fresh segment if necessary.
 *
 * This method will generally be responsible for assigning offsets to the messages,
 * however if the assignOffsets=false flag is passed we will only check that the existing offsets are valid.
 *
 * @param records The log records to append
 * @param isFromClient Whether or not this append is from a producer
 * @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given
 * @param leaderEpoch The partition's leader epoch which will be applied to messages when offsets are assigned on the leader
 * @throws KafkaStorageException If the append fails due to an I/O error.
 * @throws OffsetsOutOfOrderException If out of order offsets found in 'records'
 * @throws UnexpectedAppendOffsetException If the first or last offset in append is less than next offset
 * @return Information about the appended messages including the first and last offset.
 */
// The core method responsible for appending records to the log files; it is reached (via appendAsLeader)
// when handling producer write requests. It appends to the active segment and rolls a new segment when necessary.
private def append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = {
  maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
    // Analyze and validate that the records to be written to this partition log meet the requirements,
    // e.g. CRC checksums and that the message size does not exceed the configured maximum
    val appendInfo = analyzeAndValidateRecords(records, isFromClient = isFromClient)

    // return if we have no valid messages or if this is a duplicate of the last appended entry
    // If there are no valid messages, return directly
    if (appendInfo.shallowCount == 0)
      return appendInfo

    // trim any invalid bytes or partial messages before appending it to the on-disk log
    // Trim invalid bytes or partial messages from this batch
    var validRecords = trimInvalidBytes(records, appendInfo)

    // they are valid, insert them in the log
    lock synchronized {
      checkIfMemoryMappedBufferClosed()
      if (assignOffsets) {
        // assign offsets to the message set
        // Compute the starting offset for this message set; the offset update is an atomic operation
        val offset = new LongRef(nextOffsetMetadata.messageOffset)
        appendInfo.firstOffset = offset.value
        // The timestamp used is the time at which the server received the request
        val now = time.milliseconds
        val validateAndOffsetAssignResult = try {
          LogValidator.validateMessagesAndAssignOffsets(validRecords,
            offset,
            time,
            now,
            appendInfo.sourceCodec,
            appendInfo.targetCodec,
            config.compact,
            config.messageFormatVersion.messageFormatVersion.value,
            config.messageTimestampType,
            config.messageTimestampDifferenceMaxMs,
            leaderEpoch,
            isFromClient)
        } catch {
          case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
        }
        validRecords = validateAndOffsetAssignResult.validatedRecords
        appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
        appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
        appendInfo.lastOffset = offset.value - 1
        appendInfo.recordsProcessingStats = validateAndOffsetAssignResult.recordsProcessingStats
        if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
          appendInfo.logAppendTime = now

        // re-validate message sizes if there's a possibility that they have changed (due to re-compression or message
        // format conversion)
        // Record the rejection metrics and fail if any batch now exceeds the maximum message size
        if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
          for (batch <- validRecords.batches.asScala) {
            if (batch.sizeInBytes > config.maxMessageSize) {
              // we record the original message set size instead of the trimmed size
              // to be consistent with pre-compression bytesRejectedRate recording
              brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
              brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
              throw new RecordTooLargeException("Message batch size is %d bytes which exceeds the maximum configured size of %d."
                .format(batch.sizeInBytes, config.maxMessageSize))
            }
          }
        }
      } else {
        // we are taking the offsets we are given
        if (!appendInfo.offsetsMonotonic)
          throw new OffsetsOutOfOrderException(s"Out of order offsets found in append to $topicPartition: " +
            records.records.asScala.map(_.offset))

        if (appendInfo.firstOffset < nextOffsetMetadata.messageOffset) {
          // we may still be able to recover if the log is empty
          // one example: fetching from log start offset on the leader which is not batch aligned,
          // which may happen as a result of AdminClient#deleteRecords()
          // appendInfo.firstOffset maybe either first offset or last offset of the first batch.
          // get the actual first offset, which may require decompressing the data
          val firstOffset = records.batches.asScala.head.baseOffset()
          throw new UnexpectedAppendOffsetException(
            s"Unexpected offset in append to $topicPartition. First offset or last offset of the first batch " +
            s"${appendInfo.firstOffset} is less than the next offset ${nextOffsetMetadata.messageOffset}. " +
            s"First 10 offsets in append: ${records.records.asScala.take(10).map(_.offset)}, last offset in" +
            s" append: ${appendInfo.lastOffset}. Log start offset = $logStartOffset",
            firstOffset, appendInfo.lastOffset)
        }
      }

      // update the epoch cache with the epoch stamped onto the message by the leader
      validRecords.batches.asScala.foreach { batch =>
        if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
          leaderEpochCache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
      }

      // check messages set size may be exceed config.segmentSize
      // Check whether the message set size exceeds segment.bytes (the maximum size of a single log segment);
      // if so, throw an exception
      if (validRecords.sizeInBytes > config.segmentSize) {
        throw new RecordBatchTooLargeException("Message batch size is %d bytes which exceeds the maximum configured segment size of %d."
          .format(validRecords.sizeInBytes, config.segmentSize))
      }

      // now that we have valid records, offsets assigned, and timestamps updated, we need to
      // validate the idempotent/transactional state of the producers and collect some metadata
      val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(validRecords, isFromClient)
      maybeDuplicate.foreach { duplicate =>
        appendInfo.firstOffset = duplicate.firstOffset
        appendInfo.lastOffset = duplicate.lastOffset
        appendInfo.logAppendTime = duplicate.timestamp
        appendInfo.logStartOffset = logStartOffset
        return appendInfo
      }

      // maybe roll the log if this segment is full
      // If the current LogSegment is full, roll a new segment
      val segment = maybeRoll(messagesSize = validRecords.sizeInBytes,
        maxTimestampInMessages = appendInfo.maxTimestamp,
        maxOffsetInMessages = appendInfo.lastOffset)

      val logOffsetMetadata = LogOffsetMetadata(
        messageOffset = appendInfo.firstOffset,
        segmentBaseOffset = segment.baseOffset,
        relativePositionInSegment = segment.size)

      // Write the data to the active segment
      segment.append(firstOffset = appendInfo.firstOffset,
        largestOffset = appendInfo.lastOffset,
        largestTimestamp = appendInfo.maxTimestamp,
        shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
        records = validRecords)

      // update the producer state
      for ((producerId, producerAppendInfo) <- updatedProducers) {
        producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata)
        producerStateManager.update(producerAppendInfo)
      }

      // update the transaction index with the true last stable offset. The last offset visible
      // to consumers using READ_COMMITTED will be limited by this value and the high watermark.
      for (completedTxn <- completedTxns) {
        val lastStableOffset = producerStateManager.completeTxn(completedTxn)
        segment.updateTxnIndex(completedTxn, lastStableOffset)
      }

      // always update the last producer id map offset so that the snapshot reflects the current offset
      // even if there isn't any idempotent data being written
      producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)

      // increment the log end offset
      // Update the LEO, i.e. nextOffsetMetadata.messageOffset
      updateLogEndOffset(appendInfo.lastOffset + 1)

      // update the first unstable offset (which is used to compute LSO)
      updateFirstUnstableOffset()

      trace(s"Appended message set to log with last offset ${appendInfo.lastOffset} " +
        s"first offset: ${appendInfo.firstOffset}, " +
        s"next offset: ${nextOffsetMetadata.messageOffset}, " +
        s"and messages: $validRecords")

      // If the number of unflushed messages since the last flush has reached the flush.messages config value, flush to disk
      if (unflushedMessages >= config.flushInterval)
        flush()

      appendInfo
    }
  }
}
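
One detail of append() that is easy to miss is the offset bookkeeping when assignOffsets=true: the batch is stamped with consecutive offsets starting at the current LEO, and the LEO is then advanced to lastOffset + 1. The small worked example below mirrors that arithmetic; the starting offset 42 and the batch size are made-up values.

/**
 * Minimal sketch of the offset bookkeeping done in Log.append() when assignOffsets=true:
 * the batch gets consecutive offsets starting at the current LEO, and the LEO then moves to lastOffset + 1.
 */
public class OffsetAssignmentSketch {
    public static void main(String[] args) {
        long logEndOffset = 42L;   // nextOffsetMetadata.messageOffset before the append (assumed value)
        int recordCount = 3;       // records in the incoming MemoryRecords batch (assumed value)

        long firstOffset = logEndOffset;                  // appendInfo.firstOffset = offset.value
        long lastOffset = firstOffset + recordCount - 1;  // appendInfo.lastOffset = offset.value - 1 after assignment
        long newLogEndOffset = lastOffset + 1;            // updateLogEndOffset(appendInfo.lastOffset + 1)

        System.out.printf("records get offsets %d..%d, LEO moves from %d to %d%n",
                firstOffset, lastOffset, logEndOffset, newLogEndOffset);
        // prints: records get offsets 42..44, LEO moves from 42 to 45
    }
}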

Log.append() in turn calls LogSegment.append() to write the messages to the current active segment:

/**
 * Append the given messages starting with the given offset. Add
 * an entry to the index if needed.
 *
 * It is assumed this method is being called from within a lock.
 *
 * @param firstOffset The first offset in the message set.
 * @param largestOffset The last offset in the message set
 * @param largestTimestamp The largest timestamp in the message set.
 * @param shallowOffsetOfMaxTimestamp The offset of the message that has the largest timestamp in the messages to append.
 * @param records The log entries to append.
 * @return the physical position in the file of the appended records
 */
// Append the given messages at the given offset, adding index entries when needed
@nonthreadsafe
def append(firstOffset: Long,
           largestOffset: Long,
           largestTimestamp: Long,
           shallowOffsetOfMaxTimestamp: Long,
           records: MemoryRecords): Unit = {
  if (records.sizeInBytes > 0) {
    trace("Inserting %d bytes at offset %d at position %d with largest timestamp %d at shallow offset %d"
      .format(records.sizeInBytes, firstOffset, log.sizeInBytes(), largestTimestamp, shallowOffsetOfMaxTimestamp))
    val physicalPosition = log.sizeInBytes()
    if (physicalPosition == 0)
      rollingBasedTimestamp = Some(largestTimestamp)
    // append the messages
    require(canConvertToRelativeOffset(largestOffset), "largest offset in message set can not be safely converted to relative offset.")
    // Append to the data file
    val appendedBytes = log.append(records)
    trace(s"Appended $appendedBytes to ${log.file()} at offset $firstOffset")
    // Update the in memory max timestamp and corresponding offset.
    if (largestTimestamp > maxTimestampSoFar) {
      maxTimestampSoFar = largestTimestamp
      offsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp
    }
    // append an entry to the index (if needed)
    // Decide whether an index entry is needed: data is always appended to the data file,
    // but an index entry is only written after every indexIntervalBytes bytes of data (a sparse index)
    if (bytesSinceLastIndexEntry > indexIntervalBytes) {
      index.append(firstOffset, physicalPosition)
      timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
      bytesSinceLastIndexEntry = 0
    }
    bytesSinceLastIndexEntry += records.sizeInBytes
  }
}
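
The index handling above is what makes Kafka's offset index sparse: every append goes to the data file, but an index entry is only added once more than indexIntervalBytes (topic config index.interval.bytes, 4096 by default) have accumulated since the last entry. The standalone Java sketch below mirrors just that decision logic; the 1 KB batch size is an assumption for illustration.

/**
 * Conceptual sketch of the sparse-index decision in LogSegment.append():
 * an offset-index entry is only added after more than indexIntervalBytes
 * have been appended since the previous entry.
 */
public class SparseIndexSketch {
    private static final int INDEX_INTERVAL_BYTES = 4096; // index.interval.bytes default
    private int bytesSinceLastIndexEntry = 0;

    /** Returns true if this append should also write an offset-index entry. */
    public boolean append(int batchSizeInBytes) {
        boolean writeIndexEntry = bytesSinceLastIndexEntry > INDEX_INTERVAL_BYTES;
        if (writeIndexEntry) {
            bytesSinceLastIndexEntry = 0;   // index.append(firstOffset, physicalPosition) in the real code
        }
        bytesSinceLastIndexEntry += batchSizeInBytes;
        return writeIndexEntry;
    }

    public static void main(String[] args) {
        SparseIndexSketch segment = new SparseIndexSketch();
        for (int i = 1; i <= 10; i++) {
            // assume each produce request appends a 1 KB batch
            System.out.printf("batch %d -> index entry written: %b%n", i, segment.append(1024));
        }
    }
}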

Finally, FileRecords.append() is invoked; as you can see, the underlying write uses Java NIO to put the messages on disk:

/**
 * Append log batches to the buffer
 * @param records The records to append
 * @return the number of bytes written to the underlying file
 */
public int append(MemoryRecords records) throws IOException {
    // Write the ByteBuffer backing the records through the Java NIO channel
    int written = records.writeFullyTo(channel);
    size.getAndAdd(written);
    return written;
}
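
Conceptually, writeFullyTo() just keeps calling channel.write(buffer) until the whole ByteBuffer has been written to the segment file. A minimal standalone NIO sketch of that idea is shown below; the segment file name is a made-up example and this is not the FileRecords implementation itself.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

/**
 * Minimal NIO sketch of what writeFullyTo(channel) boils down to:
 * loop over channel.write(buffer) until the whole ByteBuffer has been flushed into the segment file.
 */
public class NioAppendSketch {
    public static void main(String[] args) throws IOException {
        Path segmentFile = Paths.get("00000000000000000000.log"); // hypothetical segment file name
        ByteBuffer buffer = ByteBuffer.wrap("record-batch-bytes".getBytes(StandardCharsets.UTF_8));

        try (FileChannel channel = FileChannel.open(segmentFile,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
            int written = 0;
            // A single write() call may write fewer bytes than remain, so loop until the buffer is drained
            while (buffer.hasRemaining()) {
                written += channel.write(buffer);
            }
            System.out.println("appended " + written + " bytes, file position is now " + channel.position());
        }
    }
}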
