一、上下文
《Kafka-生产者源码分析》博客中我们了解了Kafka是如何生产数据的,《Kafka-broker粗粒度启动流程》博客中我们了解了KafkaApis中有各种api和对应处理逻辑,其中PRODUCE请求对应了处理produce请求的逻辑,下面我们跟着源码来看下处理细节
class KafkaApis(//......request.header.apiKey match {case ApiKeys.PRODUCE => handleProduceRequest(request, requestLocal)......}
}
二、handleProduceRequest
def handleProduceRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {//获取请求体val produceRequest = request.body[ProduceRequest]//为每个TopicPartition 的不同情况声明不同的responseMap//未经授权的val unauthorizedTopicResponses = mutable.Map[TopicPartition, PartitionResponse]()//不存在的val nonExistingTopicResponses = mutable.Map[TopicPartition, PartitionResponse]()//无效的val invalidRequestResponses = mutable.Map[TopicPartition, PartitionResponse]()//已授权的val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]()//缓存结果以避免冗余的授权调用val authorizedTopics = authHelper.filterByAuthorized(request.context, WRITE, TOPIC,produceRequest.data().topicData().asScala)(_.name())//依次循环 topic > partition 来处理//这说明这一个请求中需要处理多个 topic 的多个 partition 的数据produceRequest.data.topicData.forEach(topic => topic.partitionData.forEach { partition =>val topicPartition = new TopicPartition(topic.name, partition.index)val memoryRecords = partition.records.asInstanceOf[MemoryRecords]if (!authorizedTopics.contains(topicPartition.topic))unauthorizedTopicResponses += topicPartition -> new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)else if (!metadataCache.contains(topicPartition))nonExistingTopicResponses += topicPartition -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)elsetry {//只有授权且metadataCache中有这个topicPartition 才能走到这//校验数据,并把该数据放入authorizedRequestInfo ProduceRequest.validateRecords(request.header.apiVersion, memoryRecords)authorizedRequestInfo += (topicPartition -> memoryRecords)} catch {//......}}//回调函数,这里先不展开,后续处理完数据需要返回给producer时再展开@nowarn("cat=deprecation")def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {...}def processingStatsCallback(processingStats: FetchResponseStats): Unit = {processingStats.forKeyValue { (tp, info) =>updateRecordConversionStats(request, tp, info)}}//如果没有有效数据,就立即返回空的情况if (authorizedRequestInfo.isEmpty)sendResponseCallback(Map.empty)else {val internalTopicsAllowed = request.header.clientId == AdminUtils.ADMIN_CLIENT_IDval transactionSupportedOperation = if (request.header.apiVersion > 10) genericError else defaultError//调用副本管理器将消息追加到副本//副本管理器 是一个屏蔽了集群的层 里面既包括本地leader写也包括远程Follower写replicaManager.handleProduceAppend(timeout = produceRequest.timeout.toLong,requiredAcks = produceRequest.acks,internalTopicsAllowed = internalTopicsAllowed,transactionalId = produceRequest.transactionalId,entriesPerPartition = authorizedRequestInfo,responseCallback = sendResponseCallback,recordValidationStatsCallback = processingStatsCallback,requestLocal = requestLocal,transactionSupportedOperation = transactionSupportedOperation)//如果请求被放入炼狱,它将有一个被保留的引用,因此不能被垃圾回收;因此,我们在这里清除它的数据,以便让GC回收内存,因为它已经附加到日志中//当follower的数据追平leader的数据,且leader没有新数据增长时,follower的fetch请求会放入炼狱,来减少带宽的消耗produceRequest.clearPartitionRecords()}}
三、ReplicaManager
最终需要将数据给到ReplicaManager来进行实际的追加
class ReplicaManager(...){def handleProduceAppend(...){val transactionalProducerInfo = mutable.HashSet[(Long, Short)]()val topicPartitionBatchInfo = mutable.Map[TopicPartition, Int]()entriesPerPartition.forKeyValue { (topicPartition, records) =>// 生成请求(仅需要验证的请求)应该在“批处理”中每个分区只有一个批处理,但为了安全起见,请检查所有批处理。val transactionalBatches = records.batches.asScala.filter(batch => batch.hasProducerId && batch.isTransactional)transactionalBatches.foreach(batch => transactionalProducerInfo.add(batch.producerId, batch.producerEpoch))if (transactionalBatches.nonEmpty) topicPartitionBatchInfo.put(topicPartition, records.firstBatch.baseSequence)}if (transactionalProducerInfo.size > 1) {//从这里看出,事务记录应该只包含一个生产者id//抛出异常并提示:事务记录包含多个生产者IDthrow new InvalidPidMappingException("Transactional records contained more than one producer ID")}//又封装了一层回调def postVerificationCallback(...){}//如果事务记录包含0个生产者id,不用处理if (transactionalProducerInfo.size < 1) {postVerificationCallback(requestLocal,(Map.empty[TopicPartition, Errors], Map.empty[TopicPartition, VerificationGuard]))return}//事务记录中只能有一个生产者idmaybeStartTransactionVerificationForPartitions(topicPartitionBatchInfo,transactionalId,transactionalProducerInfo.head._1,transactionalProducerInfo.head._2,//当事务验证完成时,将要处理的回调封装在任意请求处理程序线程上。传入的本地请求仅在立即执行回调时使用。KafkaRequestHandler.wrapAsyncCallback(postVerificationCallback,requestLocal),transactionSupportedOperation)}}
1、postVerificationCallback
事务校验完成,会处理回调,执行真正的数据追加
def postVerificationCallback(...): Unit = {val (preAppendErrors, verificationGuards) = results//将事务协调器错误转换为已知的生产者响应错误val errorResults = preAppendErrors.map {//......}val entriesWithoutErrorsPerPartition = entriesPerPartition.filter { case (key, _) => !errorResults.contains(key) }val preAppendPartitionResponses = buildProducePartitionStatus(errorResults).map { case (k, status) => k -> status.responseStatus }def newResponseCallback(responses: Map[TopicPartition, PartitionResponse]): Unit = {responseCallback(preAppendPartitionResponses ++ responses)}//执行数据追加操作appendRecords(timeout = timeout,requiredAcks = requiredAcks,internalTopicsAllowed = internalTopicsAllowed,origin = AppendOrigin.CLIENT,entriesPerPartition = entriesWithoutErrorsPerPartition,responseCallback = newResponseCallback,recordValidationStatsCallback = recordValidationStatsCallback,requestLocal = newRequestLocal,actionQueue = actionQueue,verificationGuards = verificationGuards)}
2、appendRecords
将消息附加到分区的leader副本,并等待它们复制到其他副本;当超时或满足所需的ack时,将触发回调函数;如果回调函数本身已经在某个对象上同步,则传递此对象以避免死锁
注意,所有挂起的延迟检查操作都存储在队列中。ReplicaManager.appendRecords() 的所有调用者都应该对所有受影响的分区调用ActionQueue.tryCompleteActions,而不会持有任何冲突的锁(将多线程锁的问题转成线性队列操作来提升性能)
def appendRecords(t...): Unit = {//验证 acks 必须是 0 1 -1 三种类型if (!isValidRequiredAcks(requiredAcks)) {sendInvalidRequiredAcksResponse(entriesPerPartition, responseCallback)return}val sTime = time.milliseconds//向本地log写入数据val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,origin, entriesPerPartition, requiredAcks, requestLocal, verificationGuards.toMap)//生产到本地日志中用了多长时间debug("Produce to local log in %d ms".format(time.milliseconds - sTime))//下面的我们后续博客再接着分析,本地写完log,//按理论知识:如果ack=0或者1,就可以直接返回了,如果是-1就需要等待备份数据写入成功val produceStatus = buildProducePartitionStatus(localProduceResults)addCompletePurgatoryAction(actionQueue, localProduceResults)recordValidationStatsCallback(localProduceResults.map { case (k, v) =>k -> v.info.recordValidationStats})maybeAddDelayedProduce(requiredAcks,delayedProduceLock,timeout,entriesPerPartition,localProduceResults,produceStatus,responseCallback)}
3、appendToLocalLog
接下来我们看看kafka是如何将消息附加到本地副本日志的
private def appendToLocalLog(...){//按照每个topic > 分区 来写entriesPerPartition.map { case (topicPartition, records) =>brokerTopicStats.topicStats(topicPartition.topic).totalProduceRequestRate.mark()brokerTopicStats.allTopicsStats.totalProduceRequestRate.mark()//如果不允许,则拒绝附加到内部主题if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) {(topicPartition, LogAppendResult(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,Some(new InvalidTopicException(s"Cannot append to internal topic ${topicPartition.topic}")),hasCustomErrorMessage = false))} else {try {val partition = getPartitionOrException(topicPartition)//接下来交给Partition向leader追加数据val info = partition.appendRecordsToLeader(records, origin, requiredAcks, requestLocal,//......省略.....} catch {//......}}}}
四、Partition
它是表示topic partition的数据结构。leader负责维护AR、ISR、CUR、RAR
并发注意事项:
1、分区是线程安全的。分区上的操作可以从不同的请求处理程序线程并发调用
2、ISR更新使用读写锁同步。读锁用于检查是否需要更新,以避免在不执行更新的情况下获取副本的常见情况下获取写锁。在执行更新之前,在写锁下第二次检查ISR更新条件
3、在保持ISR写锁的同时,处理各种其他操作,如leader更改。这可能会在生成和副本获取请求中引入延迟,但这些操作通常不常见
4、使用ISR读锁同步HW更新
5、锁用于防止在ReplicaAlterDirThread执行时更新follower副本。可以使用ReplaceCurrentWithFutureReplica()用未来的副本替换follower副本。
1、appendRecordsToLeader
ReplicaManager调用了Partition的该方法来继续执行数据的追加操作
def appendRecordsToLeader(...): LogAppendInfo = {val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {leaderLogIfLocal match {case Some(leaderLog) =>//最小的ISRval minIsr = effectiveMinIsr(leaderLog)val inSyncSize = partitionState.isr.size//如果没有足够的insync副本来保证安全,请避免写信给leader//如果目前的ids < 最小的isr 要求,且还要求了 acks = -1 就直接返回异常if (inSyncSize < minIsr && requiredAcks == -1) {//这个 topic 的 partition 当前的isr 集合 为 partitionState.isr 不足以满足 min.isr 要求throw new NotEnoughReplicasException(s"The size of the current ISR ${partitionState.isr} " +s"is insufficient to satisfy the min.isr requirement of $minIsr for partition $topicPartition")}//将追加数据的任务委托给leaderLog既:UnifiedLogval info = leaderLog.appendAsLeader(records, leaderEpoch = this.leaderEpoch, origin,interBrokerProtocolVersion, requestLocal, verificationGuard)//我们可能需要增加高水位,因为ISR可能会降至1(info, maybeIncrementLeaderHW(leaderLog))case None =>//抛出异常:不能在该broker为xx分区的ledaer写入数据}}info.copy(if (leaderHWIncremented) LeaderHwChange.INCREASED else LeaderHwChange.SAME)}
五、UnifiedLog
本地和分层日志段的统一视图。
1、appendAsLeader
//将此消息集附加到本地日志的活动段 segment ,分配偏移量和分区前导纪元def appendAsLeader(...): LogAppendInfo = {//这里需要验证log的来源,从而决定是否有对offset验证的必要//当下log是来自客户端,要写入的对象是RAFT_LEADER,筏头的意思,kafka中有一个HW的概念//我理解的意思是:此时要写入的数据是在HW之上的,因此称之为木筏leader//总共有四种来源:既REPLICATION(副本)、COORDINATOR(组协调员和事务)、CLIENT(客户端)、RAFT_LEADER(leader)val validateAndAssignOffsets = origin != AppendOrigin.RAFT_LEADER//将此消息集附加到本地日志的活动段,必要时滚动到新段。//此方法通常负责为消息分配偏移量,但是如果传递了assignOffsets=false标志,我们将只检查现有的偏移量是否有效。append(records, origin, interBrokerProtocolVersion, validateAndAssignOffsets, leaderEpoch, Some(requestLocal), verificationGuard, ignoreRecordSize = false)}
2、append
private def append(...): LogAppendInfo = {//我们希望确保在将任何日志数据写入磁盘之前,将分区元数据文件写入日志目录。//这将确保在发生故障时,可以使用正确的topic ID恢复任何日志数据。//可能要写元数据了,这里用了flush,因为元数据比较重要,数据到pagecache后需要直接写入磁盘maybeFlushMetadataFile()//对要追加的数据进行解析和校验,校验的有以下几点//1、每条消息都与其CRC匹配,循环冗余校验//2、每个消息大小是否有效//3、传入记录批的序列号与现有状态一致//4、offset是否单调递增//也需要计算下面纬度的数量://1、第一条消息的 offset//2、最后一条消息的offset//3、消息条数//4、有效字节数//5、偏移量是否单调递增//6、是否使用了任何压缩编解码器(如果使用了多个,则给出最后一个)val appendInfo = analyzeAndValidateRecords(records, origin, ignoreRecordSize, !validateAndAssignOffsets, leaderEpoch)// 如果我们没有有效的消息,或者这是最后一个附加条目的重复,则返回if (appendInfo.validBytes <= 0) appendInfoelse {// 在将任何无效字节或部分消息附加到磁盘日志之前,请先对其进行修剪var validRecords = trimInvalidBytes(records, appendInfo)// 它们是有效的,请将其插入日志中lock synchronized {maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {//主要是检查 log 的日志文件 的 mmap 是否关闭localLog.checkIfMemoryMappedBufferClosed()if (validateAndAssignOffsets) {//为消息集分配偏移量val offset = PrimitiveRef.ofLong(localLog.logEndOffset)appendInfo.setFirstOffset(offset.value)val validateAndOffsetAssignResult = try {val targetCompression = BrokerCompressionType.targetCompression(config.compression, appendInfo.sourceCompression())val validator = new LogValidator(validRecords,topicPartition,time,appendInfo.sourceCompression,targetCompression,config.compact,config.recordVersion.value,config.messageTimestampType,config.messageTimestampBeforeMaxMs,config.messageTimestampAfterMaxMs,leaderEpoch,origin,interBrokerProtocolVersion)validator.validateMessagesAndAssignOffsets(offset,validatorMetricsRecorder,requestLocal.getOrElse(throw new IllegalArgumentException("requestLocal should be defined if assignOffsets is true")).bufferSupplier)} catch {//......}validRecords = validateAndOffsetAssignResult.validatedRecordsappendInfo.setMaxTimestamp(validateAndOffsetAssignResult.maxTimestampMs)appendInfo.setShallowOffsetOfMaxTimestamp(validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp)appendInfo.setLastOffset(offset.value - 1)appendInfo.setRecordValidationStats(validateAndOffsetAssignResult.recordValidationStats)if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)appendInfo.setLogAppendTime(validateAndOffsetAssignResult.logAppendTimeMs)// 如果消息大小有可能发生变化(由于重新压缩或消息格式转换),则对其进行电子验证if (!ignoreRecordSize && validateAndOffsetAssignResult.messageSizeMaybeChanged) {validRecords.batches.forEach { batch =>if (batch.sizeInBytes > config.maxMessageSize) {// 我们记录原始消息集大小,而不是修剪后的大小,以与预压缩字节RejectedRate记录保持一致brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)throw new RecordTooLargeException(s"Message batch size is ${batch.sizeInBytes} bytes in append to" +s"partition $topicPartition which exceeds the maximum configured size of ${config.maxMessageSize}.")}}}} else {//我们 使用 自己给的offsetsif (appendInfo.firstOrLastOffsetOfFirstBatch < localLog.logEndOffset) {// 如果日志为空,我们仍然可以恢复,例如:从未批处理对齐的leader上的日志开始偏移量中获取,这可能是由于AdminClient#deleteRecords()造成的val hasFirstOffset = appendInfo.firstOffset != UnifiedLog.UnknownOffsetval firstOffset = if (hasFirstOffset) appendInfo.firstOffset else records.batches.iterator().next().baseOffset()val firstOrLast = if (hasFirstOffset) "First offset" else "Last offset of the first batch"throw new UnexpectedAppendOffsetException(...)}}// 用领导者标记在消息上的纪元更新纪元缓存validRecords.batches.forEach { batch =>if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {maybeAssignEpochStartOffset(batch.partitionLeaderEpoch, batch.baseOffset)} else {// 在部分升级场景中,我们可能会对消息格式进行临时回归。为了确保领导人选举的安全性,我们清除了纪元缓存,以便在下一次领导人选举后恢复到 HW 截断。leaderEpochCache.filter(_.nonEmpty).foreach { cache =>warn(s"Clearing leader epoch cache after unexpected append with message format v${batch.magic}")cache.clearAndFlush()}}}// 检查消息集大小可能超过config.segmentSize 这一批次消息的大小不能超过 整个段的大小if (validRecords.sizeInBytes > config.segmentSize) {throw new RecordBatchTooLargeException(......)}//如果该段已满,可能会滚动日志val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)val logOffsetMetadata = new LogOffsetMetadata(appendInfo.firstOrLastOffsetOfFirstBatch,segment.baseOffset,segment.size)// 现在我们有了有效的记录、分配的偏移量和更新的时间戳,我们需要验证生产者的幂等/事务状态,并收集一些元数据val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(logOffsetMetadata, validRecords, origin, verificationGuard)maybeDuplicate match {case Some(duplicate) =>appendInfo.setFirstOffset(duplicate.firstOffset)appendInfo.setLastOffset(duplicate.lastOffset)appendInfo.setLogAppendTime(duplicate.timestamp)appendInfo.setLogStartOffset(logStartOffset)case None =>//追加记录,并在追加后立即递增本地日志结束偏移量,因为对下面事务索引的写入可能会失败,// 我们希望确保未来追加的偏移量仍然单调增长。恢复日志目录后,将清理由此产生的事务索引不一致。// 请注意,如果事务索引的追加失败,ProducerStateManager的结束偏移量将不会更新,最后一个稳定偏移量也不会前进。//这里就开始将记录插入 log 和索引文件了 并更新最后的 offsetlocalLog.append(appendInfo.lastOffset, appendInfo.maxTimestamp, appendInfo.shallowOffsetOfMaxTimestamp, validRecords)updateHighWatermarkWithLogEndOffset()//更新生产者状态updatedProducers.values.foreach(producerAppendInfo => producerStateManager.update(producerAppendInfo))// 用真实的最后一个稳定偏移量更新事务索引。// 使用READ_COMMITTED的消费者可见的最后一个偏移量将受到此值和高水印的限制。completedTxns.foreach { completedTxn =>val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)segment.updateTxnIndex(completedTxn, lastStableOffset)producerStateManager.completeTxn(completedTxn)}// 始终更新最后一个生产者id映射偏移量,以便快照反映当前偏移量,即使没有写入任何幂等数据producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)// 更新第一个不稳定偏移量(用于计算LSO)maybeIncrementFirstUnstableOffset()trace(s"Appended message set with last offset: ${appendInfo.lastOffset}, " +s"first offset: ${appendInfo.firstOffset}, " +s"next offset: ${localLog.logEndOffset}, " +s"and messages: $validRecords")//localLog.unflushedMessages : 还没有 flush 的消息数//config.flushInterval 为 flush.messages配置 默认 Long.MAX_VALUE//说明://此设置允许指定一个间隔,在该间隔内,我们将强制对写入日志的数据进行fsync。// 例如,如果将其设置为1,我们将在每条消息后进行fsync;如果是5,我们将在每5条消息后进行fsync。// 一般来说,我们建议您不要设置此选项,而是使用副本来提高持久性,并允许操作系统的后台刷新功能,因为它更高效。// 此设置可以按topic覆盖(请参阅<a href=\“#topicconfigs\”>按主题配置部分</a 很灵活,可以为topic设置不同的 flush策略if (localLog.unflushedMessages >= config.flushInterval) flush(false)}appendInfo}}}}
六、LocalLog
用于在本地存储消息的仅追加日志。日志是一系列LogSegments,每个LogSegments都有一个基本偏移。根据可配置的策略创建新的日志段,该策略控制给定段的字节大小或时间间隔。
因此数据寻址的逻辑是:基地址+偏移地址(基地址找到具体的LogSegment,再根据偏移地址找到文件中的具体数据)
private[log] def append(...): Unit = {//向本地活动段中追加数据segments.activeSegment.append(lastOffset, largestTimestamp, shallowOffsetOfMaxTimestamp, records)//更新LEO:既log末尾的offsetupdateLogEndOffset(lastOffset + 1)}
七、 LogSegment
日志的一部分。每个段有两个组成部分:日志和索引。日志是一个包含实际消息的FileRecords。该索引是一个从逻辑偏移映射到物理文件位置的OffsetIndex。每个段都有一个基本偏移量,该偏移量<=该段中任何消息的最小偏移量,>任何先前段中的任何偏移量。
基偏移量为[base_offset]的段将存储在两个文件中,一个[base_ooffset].index和一个[base_offset].log文件。
public void append(long largestOffset,long largestTimestampMs,long shallowOffsetOfMaxTimestamp,MemoryRecords records) throws IOException {if (records.sizeInBytes() > 0) {//在位置{}的末尾偏移量{}处插入{}个字节,在偏移量{}处插入最大的时间戳{}LOGGER.trace("Inserting {} bytes at end offset {} at position {} with largest timestamp {} at offset {}",records.sizeInBytes(), largestOffset, log.sizeInBytes(), largestTimestampMs, shallowOffsetOfMaxTimestamp);int physicalPosition = log.sizeInBytes();if (physicalPosition == 0)rollingBasedTimestamp = OptionalLong.of(largestTimestampMs);ensureOffsetInRange(largestOffset);//数据真正写的地方long appendedBytes = log.append(records);LOGGER.trace("Appended {} to {} at end offset {}", appendedBytes, log.file(), largestOffset);// 更新内存中的最大时间戳和相应的偏移量。if (largestTimestampMs > maxTimestampSoFar()) {maxTimestampAndOffsetSoFar = new TimestampOffset(largestTimestampMs, shallowOffsetOfMaxTimestamp);}// 在索引中附加一个条目(如果需要)//这证明并不是每次写数据都会向索引中写入标记,因此索引指向了一段数据//需要累计写入的数据 > 索引中条目的大致字节数(index.interval.bytes 默认4096字节)if (bytesSinceLastIndexEntry > indexIntervalBytes) {offsetIndex().append(largestOffset, physicalPosition);timeIndex().maybeAppend(maxTimestampSoFar(), shallowOffsetOfMaxTimestampSoFar());bytesSinceLastIndexEntry = 0;}bytesSinceLastIndexEntry += records.sizeInBytes();}}
1、log追加
FileRecords
由文件支持的 Records 实现。可以将可选的开始和结束位置应用于此实例,以允许对一系列日志记录进行切片。
public int append(MemoryRecords records) throws IOException {int written = records.writeFullyTo(channel);size.getAndAdd(written);return written;}
MemoryRecords
由ByteBuffer支持的 Records实现。这仅用于读取或就地修改记录批的现有缓冲区。
//将所有记录写入给定通道(包括部分记录)。public int writeFullyTo(GatheringByteChannel channel) throws IOException {buffer.mark();int written = 0;//并没有调 flush ,这里只是写入了 pagecache 中 //需要考内核的机制将其flush到磁盘 (linux系统中可以配置参数 当到达总内存的多少后或者脏页机制触发去写)while (written < sizeInBytes())written += channel.write(buffer);buffer.reset();return written;}
2、offsetIndex追加
OffsetIndex
public void append(long offset, int position) {lock.lock();try {if (isFull())throw new IllegalArgumentException(...);if (entries() == 0 || offset > lastOffset) {log.trace("Adding index entry {} => {} to {}", offset, position, file().getAbsolutePath());//利用mmap的特性将数据写入内核的pagecachemmap().putInt(relativeOffset(offset));mmap().putInt(position);incrementEntries();lastOffset = offset;if (entries() * ENTRY_SIZE != mmap().position())throw new IllegalStateException(...);} elsethrow new InvalidOffsetException(...);} finally {lock.unlock();}}
3、timeIndex追加
TimeIndex
public void maybeAppend(long timestamp, long offset) {maybeAppend(timestamp, offset, false);}public void maybeAppend(long timestamp, long offset, boolean skipFullCheck) {lock.lock();try {if (!skipFullCheck && isFull())throw new IllegalArgumentException(...);// 当偏移量等于最后一个条目的偏移量时,我们不会抛出异常。这意味着我们正试图插入与最后一个条目相同的时间索引条目。// 如果要插入的时间戳索引条目与最后一个条目相同,我们只需忽略插入,因为这可能会在以下两种情况下发生:// 1.日志段关闭了// 2.当滚动活动日志段时,会调用LogSegment.onBecomeActiveSegment()。if (entries() != 0 && offset < lastEntry.offset)throw new InvalidOffsetException(...);if (entries() != 0 && timestamp < lastEntry.timestamp)throw new IllegalStateException(...);// 只有当时间戳大于最后插入的时间戳时,我们才会附加到时间索引。// 如果所有消息都是消息格式v0,则时间戳将始终为NoTimestamp。在这种情况下,时间索引将为空。if (timestamp > lastEntry.timestamp) {log.trace("Adding index entry {} => {} to {}.", timestamp, offset, file().getAbsolutePath());//同样是调用mmap将数据写入pagecacheMappedByteBuffer mmap = mmap();mmap.putLong(timestamp);mmap.putInt(relativeOffset(offset));incrementEntries();this.lastEntry = new TimestampOffset(timestamp, offset);if (entries() * ENTRY_SIZE != mmap.position())throw new IllegalStateException(...);}} finally {lock.unlock();}}
八、总结
1、producer调用send发送数据
2、kafka调用对应的api进行处理
3、获取请求体中的数据
4、校验是否有有效数据,如果没有立即返回
5、调用副本管理器(ReplicaManager)将数据进行追加
6、校验事务完成后处理回调,执行真正的数据追加
7、对acks进行校验(必须是0、1、-1)
8、循环处理这次请求中的每个topic、partition,调用Partition进行数据追加
9、获取最小的ISR以及可以正常写的副本数量,如果存活的副本节点数量<最小ISR数量,且请求中的acks=-1,里面抛出异常
10、将数据委托给UnifiedLog进行追加
11、根据数据的来源对offset设置校验等级
12、写入元数据(因为元数据重要,因此要立马flush到磁盘)
13、再次对数据进行解析和校验(CRC、消息大小是否有效、序列号是否一致、offset是否单调递增)
14、校验日志的mmap是否关闭
15、为数据分配offset
16、校验数据大小是否超过了段大小
17、委托给LocalLog向本地活动段追加数据
18、委托给LogSegment进行数据追加
19、调用FileRecords、MemoryRecords将数据写入pagecache
20、判断累计数据量是否>index.interval.bytes 默认4096字节,如果大于开始写入索引
21、调用OffsetIndex将offset索引利用mmap写入pagecache
22、调用TimeIndex将time offset索引利用mmap写入pagecache(time的索引执行了offset索引,offset索引指向了真正位置)
23、更新该broker中的HW和LEO