Kafka 3.5: source-code walkthrough of how the broker handles consumer fetch requests

  • I. The broker-side entry point for consumer fetch requests
  • II. Iterating over the topic partitions in the request and reading data for each
    • 1. Choosing a suitable replica to read the local log (follower fetching is supported since 2.4)
  • III. Distinguishing follower fetch requests from consumer fetch requests
    • 1. Acquiring the read lock on leaderIsrUpdateLock before reading
    • 2. readFromLocalLog reads the local log
  • IV. Reading log data means reading segment files (setting zero-copy aside)
    • 1. Fetching the local log's basic metadata (high watermark, offsets, etc.)
    • 2. Iterating over segments until data is read
  • V. Creating the file-backed log object FileRecords
    • 1. Creating a FileLogInputStream from the start position
    • 2. Wrapping the stream in a RecordBatchIterator
    • 3. DefaultRecordBatch implements iterator() and materializes records in memory

I. The broker-side entry point for consumer fetch requests

The broker-side API that receives fetch requests lives in KafkaApis.scala, in the handleFetchRequest method.

override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
  // ... (code omitted)
  request.header.apiKey match {
    // Consumer (and follower) fetch requests are dispatched to handleFetchRequest
    case ApiKeys.FETCH => handleFetchRequest(request)
    // ... (code omitted)
  }
}
def handleFetchRequest(request: RequestChannel.Request): Unit = {
  // Get the API version (versionId) and client id (clientId) from the request header
  val versionId = request.header.apiVersion
  val clientId = request.header.clientId
  // Get the fetch data from the request body
  val fetchRequest = request.body[FetchRequest]
  // For version >= 13 resolve topic ids to names via metadataCache.topicIdsToNames(); otherwise use an empty map
  val topicNames =
    if (fetchRequest.version() >= 13)
      metadataCache.topicIdsToNames()
    else
      Collections.emptyMap[Uuid, String]()

  // Using the topic-name mapping, get the fetch data (fetchData) and the topics to forget (forgottenTopics)
  val fetchData = fetchRequest.fetchData(topicNames)
  val forgottenTopics = fetchRequest.forgottenTopics(topicNames)

  // Create a fetch context that manages this fetch request: version, metadata, whether it comes from a follower,
  // the fetch data, the forgotten topics and the topic-name mapping
  val fetchContext = fetchManager.newContext(
    fetchRequest.version,
    fetchRequest.metadata,
    fetchRequest.isFromFollower,
    fetchData,
    forgottenTopics,
    topicNames)

  // Two mutable buffers: erroneous collects per-partition errors, interesting collects the partitions to actually read
  val erroneous = mutable.ArrayBuffer[(TopicIdPartition, FetchResponseData.PartitionData)]()
  val interesting = mutable.ArrayBuffer[(TopicIdPartition, FetchRequest.PartitionData)]()
  // The fetch request comes from a follower replica
  if (fetchRequest.isFromFollower) {
    // The follower must have ClusterAction on ClusterResource in order to fetch partition data.
    if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
      // Walk each partition and route it into erroneous or interesting
      fetchContext.foreachPartition { (topicIdPartition, data) =>
        if (topicIdPartition.topic == null)
          erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID)
        else if (!metadataCache.contains(topicIdPartition.topicPartition))
          erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
        else
          interesting += topicIdPartition -> data
      }
    } else {
      // Authorization failed: mark every partition as erroneous
      fetchContext.foreachPartition { (topicIdPartition, _) =>
        erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.TOPIC_AUTHORIZATION_FAILED)
      }
    }
  } else {
    // The fetch request comes from a regular Kafka consumer
    // Regular Kafka consumers need READ permission on each partition they are fetching.
    val partitionDatas = new mutable.ArrayBuffer[(TopicIdPartition, FetchRequest.PartitionData)]
    fetchContext.foreachPartition { (topicIdPartition, partitionData) =>
      if (topicIdPartition.topic == null)
        erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID)
      else
        partitionDatas += topicIdPartition -> partitionData
    }
    // Check READ permission on each partition and route it into erroneous or interesting accordingly
    val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, partitionDatas)(_._1.topicPartition.topic)
    partitionDatas.foreach { case (topicIdPartition, data) =>
      if (!authorizedTopics.contains(topicIdPartition.topic))
        erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.TOPIC_AUTHORIZATION_FAILED)
      else if (!metadataCache.contains(topicIdPartition.topicPartition))
        erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
      else
        interesting += topicIdPartition -> data
    }
  }

  // ... (code omitted)

  // If no partition passed validation, respond right away via processResponseCallback
  if (interesting.isEmpty) {
    processResponseCallback(Seq.empty)
  } else {
    // for fetch from consumer, cap fetchMaxBytes to the maximum bytes that could be fetched without being throttled given
    // no bytes were recorded in the recent quota window
    // trying to fetch more bytes would result in a guaranteed throttling potentially blocking consumer progress
    // For a follower fetch, maxQuotaWindowBytes is Int.MaxValue; otherwise it is derived from the client's recent quota
    // usage and is then combined with the request's and the broker config's limits to produce fetchMaxBytes / fetchMinBytes
    val maxQuotaWindowBytes = if (fetchRequest.isFromFollower)
      Int.MaxValue
    else
      quotas.fetch.getMaxValueInQuotaWindow(request.session, clientId).toInt

    // Compute the maximum (fetchMaxBytes) and minimum (fetchMinBytes) bytes for this fetch from the request and quota limits
    val fetchMaxBytes = Math.min(Math.min(fetchRequest.maxBytes, config.fetchMaxBytes), maxQuotaWindowBytes)
    val fetchMinBytes = Math.min(fetchRequest.minBytes, fetchMaxBytes)

    val clientMetadata: Optional[ClientMetadata] = if (versionId >= 11) {
      // Fetch API version 11 added preferred replica logic
      Optional.of(new DefaultClientMetadata(
        fetchRequest.rackId,
        clientId,
        request.context.clientAddress,
        request.context.principal,
        request.context.listenerName.value))
    } else {
      Optional.empty()
    }

    // Build a FetchParams object that carries all request parameters
    val params = new FetchParams(
      versionId,
      fetchRequest.replicaId,
      fetchRequest.replicaEpoch,
      fetchRequest.maxWait,
      fetchMinBytes,
      fetchMaxBytes,
      FetchIsolation.of(fetchRequest),
      clientMetadata)

    // call the replica manager to fetch messages from the local replica
    // replicaManager.fetchMessages reads from the local replica and uses processResponseCallback to send the response
    replicaManager.fetchMessages(
      params = params,
      fetchInfos = interesting,
      quota = replicationQuota(fetchRequest),
      responseCallback = processResponseCallback,
    )
  }
}

replicaManager.fetchMessages is the method that ultimately reads the log.

/**
 * Fetch messages from a replica, and wait until enough data can be fetched and return;
 * the callback function will be triggered either when timeout or required fetch info is satisfied.
 * Consumers may fetch from any replica, but followers can only fetch from the leader.
 */
def fetchMessages(
  params: FetchParams,
  fetchInfos: Seq[(TopicIdPartition, PartitionData)],
  quota: ReplicaQuota,
  responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit
): Unit = {
  // check if this fetch request can be satisfied right away
  // Read from the local log via readFromLocalLog and keep the results in logReadResults
  val logReadResults = readFromLocalLog(params, fetchInfos, quota, readFromPurgatory = false)

  var bytesReadable: Long = 0
  var errorReadingData = false
  var hasDivergingEpoch = false
  var hasPreferredReadReplica = false
  val logReadResultMap = new mutable.HashMap[TopicIdPartition, LogReadResult]

  // Update bookkeeping from the read results: bytesReadable (readable bytes), errorReadingData (a read error occurred),
  // hasDivergingEpoch (a diverging epoch was found) and hasPreferredReadReplica (a preferred read replica exists)
  logReadResults.foreach { case (topicIdPartition, logReadResult) =>
    brokerTopicStats.topicStats(topicIdPartition.topicPartition.topic).totalFetchRequestRate.mark()
    brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark()
    if (logReadResult.error != Errors.NONE)
      errorReadingData = true
    if (logReadResult.divergingEpoch.nonEmpty)
      hasDivergingEpoch = true
    if (logReadResult.preferredReadReplica.nonEmpty)
      hasPreferredReadReplica = true
    bytesReadable = bytesReadable + logReadResult.info.records.sizeInBytes
    logReadResultMap.put(topicIdPartition, logReadResult)
  }

  // respond immediately if 1) fetch request does not want to wait
  //                        2) fetch request does not require any data
  //                        3) has enough data to respond
  //                        4) some error happens while reading data
  //                        5) we found a diverging epoch
  //                        6) has a preferred read replica
  if (params.maxWaitMs <= 0 || fetchInfos.isEmpty || bytesReadable >= params.minBytes || errorReadingData ||
    hasDivergingEpoch || hasPreferredReadReplica) {
    val fetchPartitionData = logReadResults.map { case (tp, result) =>
      val isReassignmentFetch = params.isFromFollower && isAddingReplica(tp.topicPartition, params.replicaId)
      tp -> result.toFetchPartitionData(isReassignmentFetch)
    }
    responseCallback(fetchPartitionData)
  } else {
    // Otherwise build a DelayedFetch and put it into the delayedFetchPurgatory so that it completes once the conditions are met
    // construct the fetch results from the read results
    val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicIdPartition, FetchPartitionStatus)]
    fetchInfos.foreach { case (topicIdPartition, partitionData) =>
      logReadResultMap.get(topicIdPartition).foreach(logReadResult => {
        val logOffsetMetadata = logReadResult.info.fetchOffsetMetadata
        fetchPartitionStatus += (topicIdPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))
      })
    }
    val delayedFetch = new DelayedFetch(
      params = params,
      fetchPartitionStatus = fetchPartitionStatus,
      replicaManager = this,
      quota = quota,
      responseCallback = responseCallback
    )

    // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
    val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) }

    // try to complete the request immediately, otherwise put it into the purgatory;
    // this is because while the delayed fetch operation is being created, new requests
    // may arrive and hence make this operation completable.
    delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
  }
}

The log data itself is read via readFromLocalLog.

II. Iterating over the topic partitions in the request and reading data for each

/**
 * Read from multiple topic partitions at the given offset up to maxSize bytes
 */
def readFromLocalLog(
  params: FetchParams,
  readPartitionInfo: Seq[(TopicIdPartition, PartitionData)],
  quota: ReplicaQuota,
  readFromPurgatory: Boolean
): Seq[(TopicIdPartition, LogReadResult)] = {
  val traceEnabled = isTraceEnabled

  def read(tp: TopicIdPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = {
    // Pull the fetch offset, the partition-level max bytes and the follower's log start offset out of fetchInfo
    val offset = fetchInfo.fetchOffset
    val partitionFetchSize = fetchInfo.maxBytes
    val followerLogStartOffset = fetchInfo.logStartOffset
    // adjustedMaxBytes is the smaller of fetchInfo.maxBytes and the remaining response budget limitBytes
    val adjustedMaxBytes = math.min(fetchInfo.maxBytes, limitBytes)
    try {
      if (traceEnabled)
        trace(s"Fetching log segment for partition $tp, offset $offset, partition fetch size $partitionFetchSize, " +
          s"remaining response limit $limitBytes" +
          (if (minOneMessage) s", ignoring response/partition size limits" else ""))

      // Get the Partition object for this topic partition
      val partition = getPartitionOrException(tp.topicPartition)
      // Current timestamp
      val fetchTimeMs = time.milliseconds

      // Make sure the topic id in the fetch request/session matches the topic id in the log,
      // otherwise throw an InconsistentTopicIdException
      val topicId = if (tp.topicId == Uuid.ZERO_UUID) None else Some(tp.topicId)
      if (!hasConsistentTopicId(topicId, partition.topicId))
        throw new InconsistentTopicIdException("Topic ID in the fetch session did not match the topic ID in the log.")

      // If we are the leader, determine the preferred read-replica
      // Pick the replica the client should fetch from, based on the client metadata
      val preferredReadReplica = params.clientMetadata.asScala.flatMap(
        metadata => findPreferredReadReplica(partition, metadata, params.replicaId, fetchInfo.fetchOffset, fetchTimeMs))

      if (preferredReadReplica.isDefined) {
        // A preferred read replica exists: skip the read and return a LogReadResult that only carries the replica hint
        replicaSelectorOpt.foreach { selector =>
          debug(s"Replica selector ${selector.getClass.getSimpleName} returned preferred replica " +
            s"${preferredReadReplica.get} for ${params.clientMetadata}")
        }
        // If a preferred read-replica is set, skip the read
        val offsetSnapshot = partition.fetchOffsetSnapshot(fetchInfo.currentLeaderEpoch, fetchOnlyFromLeader = false)
        LogReadResult(info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
          divergingEpoch = None,
          highWatermark = offsetSnapshot.highWatermark.messageOffset,
          leaderLogStartOffset = offsetSnapshot.logStartOffset,
          leaderLogEndOffset = offsetSnapshot.logEndOffset.messageOffset,
          followerLogStartOffset = followerLogStartOffset,
          fetchTimeMs = -1L,
          lastStableOffset = Some(offsetSnapshot.lastStableOffset.messageOffset),
          preferredReadReplica = preferredReadReplica,
          exception = None)
      } else {
        // Try the read first, this tells us whether we need all of adjustedFetchSize for this partition
        // Perform the read and build a LogReadResult from the outcome
        val readInfo: LogReadInfo = partition.fetchRecords(
          fetchParams = params,
          fetchPartitionData = fetchInfo,
          fetchTimeMs = fetchTimeMs,
          maxBytes = adjustedMaxBytes,
          minOneMessage = minOneMessage,
          updateFetchState = !readFromPurgatory)

        val fetchDataInfo = if (params.isFromFollower && shouldLeaderThrottle(quota, partition, params.replicaId)) {
          // If the partition is being throttled, simply return an empty set.
          new FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
        } else if (!params.hardMaxBytesLimit && readInfo.fetchedData.firstEntryIncomplete) {
          // For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make
          // progress in such cases and don't need to report a `RecordTooLargeException`
          new FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
        } else {
          readInfo.fetchedData
        }

        // Return the LogReadResult built from the read
        LogReadResult(info = fetchDataInfo,
          divergingEpoch = readInfo.divergingEpoch.asScala,
          highWatermark = readInfo.highWatermark,
          leaderLogStartOffset = readInfo.logStartOffset,
          leaderLogEndOffset = readInfo.logEndOffset,
          followerLogStartOffset = followerLogStartOffset,
          fetchTimeMs = fetchTimeMs,
          lastStableOffset = Some(readInfo.lastStableOffset),
          preferredReadReplica = preferredReadReplica,
          exception = None)
      }
    } catch {
      // ... (code omitted)
    }
  }

  var limitBytes = params.maxBytes
  val result = new mutable.ArrayBuffer[(TopicIdPartition, LogReadResult)]
  var minOneMessage = !params.hardMaxBytesLimit
  readPartitionInfo.foreach { case (tp, fetchInfo) =>
    val readResult = read(tp, fetchInfo, limitBytes, minOneMessage)
    // Size in bytes of the batches that were just read
    val recordBatchSize = readResult.info.records.sizeInBytes
    // Once we read from a non-empty partition, we stop ignoring request and partition level size limits
    if (recordBatchSize > 0)
      minOneMessage = false
    limitBytes = math.max(0, limitBytes - recordBatchSize)
    // Append (tp -> readResult) to the result
    result += (tp -> readResult)
  }
  result
}

The loop at the end calls the inner read function once per topic partition: val readResult = read(tp, fetchInfo, limitBytes, minOneMessage).
Inside read, the actual data is fetched through partition.fetchRecords. A simplified sketch of the byte-budget bookkeeping used by this loop follows.
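The per-request byte budget works like a shrinking allowance: each partition read is capped by whatever budget remains, and once the first non-empty partition has been read the "at least one message" escape hatch is switched off. The Java sketch below is a minimal, self-contained illustration of that pattern only; the PartitionRead interface and the byte counts are hypothetical, not Kafka API.

import java.util.LinkedHashMap;
import java.util.Map;

public class ByteBudgetLoop {
    /** Hypothetical stand-in for a single partition read: returns the bytes actually read, at most maxBytes. */
    interface PartitionRead {
        int read(int maxBytes, boolean minOneMessage);
    }

    /** Walk the partitions in order, shrinking the shared byte budget as data is returned. */
    static Map<String, Integer> readAll(Map<String, PartitionRead> partitions, int requestMaxBytes) {
        Map<String, Integer> results = new LinkedHashMap<>();
        int limitBytes = requestMaxBytes;
        boolean minOneMessage = true;           // let the first partition exceed the limits so progress is always possible
        for (Map.Entry<String, PartitionRead> entry : partitions.entrySet()) {
            int bytesRead = entry.getValue().read(limitBytes, minOneMessage);
            if (bytesRead > 0)
                minOneMessage = false;          // once something was read, stop ignoring size limits
            limitBytes = Math.max(0, limitBytes - bytesRead);
            results.put(entry.getKey(), bytesRead);
        }
        return results;
    }
}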

1. Choosing a suitable replica to read the local log (follower fetching is supported since 2.4)

Inside the read helper of the readFromLocalLog method shown above:

val preferredReadReplica = params.clientMetadata.asScala.flatMap(
  metadata => findPreferredReadReplica(partition, metadata, params.replicaId, fetchInfo.fetchOffset, fetchTimeMs))
def findPreferredReadReplica(
  partition: Partition,
  clientMetadata: ClientMetadata,
  replicaId: Int,
  fetchOffset: Long,
  currentTimeMs: Long
): Option[Int] = {
  // partition.leaderIdIfLocal returns Option[Int]: the leader replica's id if the local broker is the leader, otherwise None
  partition.leaderIdIfLocal.flatMap { leaderReplicaId =>
    // Don't look up preferred for follower fetches via normal replication
    // The block below only runs when a local leader replica id exists; otherwise None is returned
    if (FetchRequest.isValidBrokerId(replicaId))
      None
    else {
      replicaSelectorOpt.flatMap { replicaSelector =>
        // Get the endpoints of the partition's replicas from the metadata cache
        val replicaEndpoints = metadataCache.getPartitionReplicaEndpoints(partition.topicPartition,
          new ListenerName(clientMetadata.listenerName))
        // A mutable Set[ReplicaView] holding the candidate replicas
        val replicaInfoSet = mutable.Set[ReplicaView]()
        // For each remote replica: take its state snapshot; if its brokerId is in the ISR and its log range
        // contains fetchOffset, add it to replicaInfoSet
        partition.remoteReplicas.foreach { replica =>
          val replicaState = replica.stateSnapshot
          if (partition.inSyncReplicaIds.contains(replica.brokerId) &&
            replicaState.logEndOffset >= fetchOffset &&
            replicaState.logStartOffset <= fetchOffset) {
            replicaInfoSet.add(new DefaultReplicaView(
              replicaEndpoints.getOrElse(replica.brokerId, Node.noNode()),
              replicaState.logEndOffset,
              currentTimeMs - replicaState.lastCaughtUpTimeMs))
          }
        }

        // Add the leader replica itself as a DefaultReplicaView
        val leaderReplica = new DefaultReplicaView(
          replicaEndpoints.getOrElse(leaderReplicaId, Node.noNode()),
          partition.localLogOrException.logEndOffset,
          0L)
        replicaInfoSet.add(leaderReplica)

        // Build a DefaultPartitionView from the replica set and the leader replica
        val partitionInfo = new DefaultPartitionView(replicaInfoSet.asJava, leaderReplica)

        // Let the replica selector pick a replica according to its strategy, then map the result to a broker id
        replicaSelector.select(partition.topicPartition, clientMetadata, partitionInfo).asScala.collect {
          // Even though the replica selector can return the leader, we don't want to send it out with the
          // FetchResponse, so we exclude it here
          // Drop the leader from the result and return the id of the selected non-leader replica
          case selected if !selected.endpoint.isEmpty && selected != leaderReplica => selected.endpoint.id
        }
      }
    }
  }
}

Here replicaSelector.select(partition.topicPartition, clientMetadata, partitionInfo).asScala.collect picks the replica to read from. By default the leader replica is used, but since Kafka 2.4 a partition can also be read from a non-leader (follower) replica.

In the code:

  • the pattern case selected if !selected.endpoint.isEmpty && selected != leaderReplica => selected.endpoint.id filters out the leader, so only a non-leader replica id is ever returned to the client.

In the configuration:

  • On the broker, set replica.selector.class. Its default, LeaderSelector, means consumers fetch from the leader replica; switching it to RackAwareReplicaSelector lets consumers fetch from a replica in the rack they specify. The broker also needs broker.rack to declare which rack (e.g. which data center) it lives in.
  • On the consumer, set client.rack; the consumer then fetches message data from a broker whose broker.rack matches that value. A configuration sketch follows this list.
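As a rough illustration of these settings (the rack name, bootstrap address and group id below are made up), the broker side carries replica.selector.class and broker.rack in server.properties, and the consumer opts in with client.rack:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class RackAwareConsumerExample {
    public static void main(String[] args) {
        // Broker side (server.properties), shown as comments because it is not Java code:
        //   replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector
        //   broker.rack=dc-east                     // hypothetical rack / data-center name
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");   // hypothetical address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // client.rack should match the broker.rack of the replica the consumer is meant to read from
        props.put(ConsumerConfig.CLIENT_RACK_CONFIG, "dc-east");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // subscribe / poll as usual; fetches may now be served by the in-rack follower
        }
    }
}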

Why was fetching from followers not supported before 2.4, and why was it added later?

Why it was not supported before: for Kafka, scaling partitions horizontally already spreads the message-processing load, and adding brokers lowers per-broker load, so a read/write split offered little benefit for the effort.
Why it is supported now: one scenario does not fit that model, namely fetching across racks or data centers. When one data center has to consume data held in another and reads can only go to the leader replica, the traffic must cross data centers, and that bandwidth is expensive; reading from a local follower replica is the smarter choice.
So the goal of this feature is not to reduce broker load or spread message processing, but to save network bandwidth.

III. Distinguishing follower fetch requests from consumer fetch requests

For the follower side, see the companion article on how, in Kafka 3.5, a partition follower creates a fetcher thread to pull data from the leader.

def fetchRecords(
  fetchParams: FetchParams,
  fetchPartitionData: FetchRequest.PartitionData,
  fetchTimeMs: Long,
  maxBytes: Int,
  minOneMessage: Boolean,
  updateFetchState: Boolean
): LogReadInfo = {
  def readFromLocalLog(log: UnifiedLog): LogReadInfo = {
    readRecords(
      log,
      fetchPartitionData.lastFetchedEpoch,
      fetchPartitionData.fetchOffset,
      fetchPartitionData.currentLeaderEpoch,
      maxBytes,
      fetchParams.isolation,
      minOneMessage
    )
  }

  // Does this fetch come from a follower replica?
  if (fetchParams.isFromFollower) {
    // Check that the request is from a valid replica before doing the read
    val (replica, logReadInfo) = inReadLock(leaderIsrUpdateLock) {
      val localLog = localLogWithEpochOrThrow(
        fetchPartitionData.currentLeaderEpoch,
        fetchParams.fetchOnlyLeader
      )
      val replica = followerReplicaOrThrow(
        fetchParams.replicaId,
        fetchPartitionData
      )
      val logReadInfo = readFromLocalLog(localLog)
      (replica, logReadInfo)
    }

    if (updateFetchState && !logReadInfo.divergingEpoch.isPresent) {
      updateFollowerFetchState(
        replica,
        followerFetchOffsetMetadata = logReadInfo.fetchedData.fetchOffsetMetadata,
        followerStartOffset = fetchPartitionData.logStartOffset,
        followerFetchTimeMs = fetchTimeMs,
        leaderEndOffset = logReadInfo.logEndOffset,
        fetchParams.replicaEpoch
      )
    }

    logReadInfo
  } else {
    // The fetch comes from a consumer client
    inReadLock(`leaderIsrUpdateLock`) {
      val localLog = localLogWithEpochOrThrow(
        fetchPartitionData.currentLeaderEpoch,
        fetchParams.fetchOnlyLeader
      )
      readFromLocalLog(localLog)
    }
  }
}

1. Acquiring the read lock on leaderIsrUpdateLock before reading

In the method above, both branches take the read lock (a sketch of the underlying lock pattern follows the snippet):

// Follower request
val (replica, logReadInfo) = inReadLock(leaderIsrUpdateLock)
// Consumer client request
inReadLock(`leaderIsrUpdateLock`)
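inReadLock is a thin wrapper around a read/write lock: many fetches may read the partition state concurrently, while leader/ISR updates take the write lock and exclude them all. The Java sketch below is a minimal re-creation of that pattern under my own names (PartitionState, inLock); it is not Kafka's CoreUtils or Partition code.

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

public class PartitionState {
    private final ReentrantReadWriteLock leaderIsrUpdateLock = new ReentrantReadWriteLock();
    private int leaderEpoch = 0;

    /** Run the supplied action while holding the given lock, mirroring an "inLock" helper. */
    private static <T> T inLock(Lock lock, Supplier<T> action) {
        lock.lock();
        try {
            return action.get();
        } finally {
            lock.unlock();
        }
    }

    /** Fetch path: many readers may hold the read lock at the same time. */
    public int readLeaderEpoch() {
        return inLock(leaderIsrUpdateLock.readLock(), () -> leaderEpoch);
    }

    /** Leader/ISR change path: the write lock excludes all concurrent readers. */
    public void becomeLeader(int newEpoch) {
        inLock(leaderIsrUpdateLock.writeLock(), () -> { leaderEpoch = newEpoch; return null; });
    }
}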

2. readFromLocalLog reads the local log

def readFromLocalLog(log: UnifiedLog): LogReadInfo = {
  readRecords(
    log,
    fetchPartitionData.lastFetchedEpoch,
    fetchPartitionData.fetchOffset,
    fetchPartitionData.currentLeaderEpoch,
    maxBytes,
    fetchParams.isolation,
    minOneMessage
  )
}

IV. Reading log data means reading segment files (setting zero-copy aside)

1. Fetching the local log's basic metadata (high watermark, offsets, etc.)

private def readRecords(
  localLog: UnifiedLog,
  lastFetchedEpoch: Optional[Integer],
  fetchOffset: Long,
  currentLeaderEpoch: Optional[Integer],
  maxBytes: Int,
  fetchIsolation: FetchIsolation,
  minOneMessage: Boolean
): LogReadInfo = {
  // The local log's high watermark
  val initialHighWatermark = localLog.highWatermark
  // Log start offset
  val initialLogStartOffset = localLog.logStartOffset
  // Log end offset
  val initialLogEndOffset = localLog.logEndOffset
  // Last stable offset
  val initialLastStableOffset = localLog.lastStableOffset

  // ... (code omitted)

  // Read the data at the requested offset from the local log
  val fetchedData = localLog.read(
    fetchOffset,
    maxBytes,
    fetchIsolation,
    minOneMessage
  )
  // Return a LogReadInfo that wraps the fetched data
  new LogReadInfo(
    fetchedData,
    Optional.empty(),
    initialHighWatermark,
    initialLogStartOffset,
    initialLogEndOffset,
    initialLastStableOffset)
}

def read(
  startOffset: Long,
  maxLength: Int,
  isolation: FetchIsolation,
  minOneMessage: Boolean
): FetchDataInfo = {
  checkLogStartOffset(startOffset)
  val maxOffsetMetadata = isolation match {
    case FetchIsolation.LOG_END => localLog.logEndOffsetMetadata
    case FetchIsolation.HIGH_WATERMARK => fetchHighWatermarkMetadata
    case FetchIsolation.TXN_COMMITTED => fetchLastStableOffsetMetadata
  }
  localLog.read(startOffset, maxLength, minOneMessage, maxOffsetMetadata, isolation == FetchIsolation.TXN_COMMITTED)
}

2. Iterating over segments until data is read

/**
 * @param startOffset        the offset to start reading from
 * @param maxLength          the maximum number of bytes to read
 * @param minOneMessage      whether to return at least one message even if it exceeds the limits
 * @param maxOffsetMetadata  the metadata of the maximum offset allowed to be read
 * @param includeAbortedTxns whether to include aborted transactions
 * @return a FetchDataInfo object
 */
def read(
  startOffset: Long,
  maxLength: Int,
  minOneMessage: Boolean,
  maxOffsetMetadata: LogOffsetMetadata,
  includeAbortedTxns: Boolean
): FetchDataInfo = {
  maybeHandleIOException(s"Exception while reading from $topicPartition in dir ${dir.getParent}") {
    trace(s"Reading maximum $maxLength bytes at offset $startOffset from log with " +
      s"total length ${segments.sizeInBytes} bytes")

    // Take the next-offset metadata (endOffsetMetadata) and its offset (endOffset)
    val endOffsetMetadata = nextOffsetMetadata
    val endOffset = endOffsetMetadata.messageOffset
    // Locate the segment that may contain startOffset (segments are ordered by base offset)
    var segmentOpt = segments.floorSegment(startOffset)

    // return error on attempt to read beyond the log end offset
    // If startOffset is past the end offset, or no segment is found, throw an OffsetOutOfRangeException
    if (startOffset > endOffset || segmentOpt.isEmpty)
      throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
        s"but we only have log segments upto $endOffset.")

    // If startOffset equals the offset of maxOffsetMetadata, return an empty FetchDataInfo
    if (startOffset == maxOffsetMetadata.messageOffset)
      emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
    else if (startOffset > maxOffsetMetadata.messageOffset)
      // If startOffset is beyond maxOffsetMetadata, return an empty FetchDataInfo with startOffset converted to offset metadata
      emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns)
    else {
      // Read from the segments whose base offset is below the target offset
      var fetchDataInfo: FetchDataInfo = null
      // Keep looping while nothing has been read yet and a candidate segment still exists
      while (fetchDataInfo == null && segmentOpt.isDefined) {
        val segment = segmentOpt.get
        val baseOffset = segment.baseOffset

        val maxPosition =
          // Use the max offset position if it is on this segment; otherwise, the segment size is the limit.
          if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) maxOffsetMetadata.relativePositionInSegment
          else segment.size

        fetchDataInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
        if (fetchDataInfo != null) {
          // If aborted transactions should be included, attach them to fetchDataInfo
          if (includeAbortedTxns)
            fetchDataInfo = addAbortedTransactions(startOffset, segment, fetchDataInfo)
        }
        // If nothing was read from this segment, move on to the next segment above baseOffset
        else segmentOpt = segments.higherSegment(baseOffset)
      }

      // Data was read successfully: return the FetchDataInfo
      if (fetchDataInfo != null) fetchDataInfo
      else {
        // We reached the end of the last segment without reading anything: return an empty FetchDataInfo
        // carrying the next-offset metadata and MemoryRecords.EMPTY
        new FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
      }
    }
  }
}

The candidate segment is located first via var segmentOpt = segments.floorSegment(startOffset).
The data is then read from that segment with fetchDataInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage). A sketch of the floor lookup follows.
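floorSegment is essentially a floor lookup on a sorted map keyed by segment base offset: find the greatest base offset that is still <= the requested start offset. The sketch below is a hypothetical, self-contained illustration of that lookup with a NavigableMap; the Segment record and the offsets are made up, not Kafka's LogSegments class.

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentSkipListMap;

public class FloorSegmentSketch {
    /** Hypothetical segment: only the base offset matters for the lookup. */
    record Segment(long baseOffset) {}

    // Segments keyed by base offset, kept in ascending order.
    private final ConcurrentSkipListMap<Long, Segment> segments = new ConcurrentSkipListMap<>();

    void add(long baseOffset) {
        segments.put(baseOffset, new Segment(baseOffset));
    }

    /** Greatest segment whose base offset is <= startOffset, i.e. the segment that may contain it. */
    Optional<Segment> floorSegment(long startOffset) {
        Map.Entry<Long, Segment> entry = segments.floorEntry(startOffset);
        return Optional.ofNullable(entry).map(Map.Entry::getValue);
    }

    public static void main(String[] args) {
        FloorSegmentSketch log = new FloorSegmentSketch();
        log.add(0);
        log.add(500);
        log.add(1200);
        // Offset 800 falls inside the segment that starts at 500.
        System.out.println(log.floorSegment(800));   // Optional[Segment[baseOffset=500]]
    }
}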

V. Creating the file-backed log object FileRecords

def read(
  startOffset: Long,
  maxSize: Int,
  maxPosition: Long = size,
  minOneMessage: Boolean = false
): FetchDataInfo = {
  if (maxSize < 0)
    throw new IllegalArgumentException(s"Invalid max size $maxSize for log read from segment $log")

  val startOffsetAndSize = translateOffset(startOffset)

  // if the start position is already off the end of the log, return null
  if (startOffsetAndSize == null)
    return null

  // Build a LogOffsetMetadata from the start offset, the segment base offset and the physical start position
  val startPosition = startOffsetAndSize.position
  val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition)

  val adjustedMaxSize =
    if (minOneMessage) math.max(maxSize, startOffsetAndSize.size)
    else maxSize

  // return a log segment but with zero size in the case below
  if (adjustedMaxSize == 0)
    return new FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)

  // calculate the length of the message set to read based on whether or not they gave us a maxOffset
  // Limit the read to the smaller of (maxPosition - startPosition) and adjustedMaxSize
  val fetchSize: Int = min((maxPosition - startPosition).toInt, adjustedMaxSize)

  // Build a FetchDataInfo from the offset metadata and a slice of the log starting at startPosition;
  // log.slice(startPosition, fetchSize) is the actual log data
  new FetchDataInfo(offsetMetadata, log.slice(startPosition, fetchSize),
    adjustedMaxSize < startOffsetAndSize.size, Optional.empty())
}

log.slice returns the file-backed data:

public FileRecords slice(int position, int size) throws IOException {
    int availableBytes = availableBytes(position, size);
    int startPosition = this.start + position;
    return new FileRecords(file, channel, startPosition, startPosition + availableBytes, true);
}

This creates a new file-backed records object; the FileRecords constructor is shown below.

FileRecords(File file,
            FileChannel channel,
            int start,
            int end,
            boolean isSlice) throws IOException {
    this.file = file;
    this.channel = channel;
    this.start = start;
    this.end = end;
    this.isSlice = isSlice;
    this.size = new AtomicInteger();

    // This is only a slice view, so skip the file-size check and set size to end - start directly
    if (isSlice) {
        // don't check the file size if this is just a slice view
        size.set(end - start);
    } else {
        // Not a slice: the file size must be checked; if it exceeds Integer.MAX_VALUE, throw a KafkaException
        if (channel.size() > Integer.MAX_VALUE)
            throw new KafkaException("The size of segment " + file + " (" + channel.size() +
                    ") is larger than the maximum allowed segment size of " + Integer.MAX_VALUE);

        // Otherwise take the smaller of the file size and end as the limit, set size to limit - start,
        // and move the channel position to limit, i.e. the end of the file
        int limit = Math.min((int) channel.size(), end);
        size.set(limit - start);

        // if this is not a slice, update the file pointer to the end of the file
        // set the file position to the last byte in the file
        channel.position(limit);
    }

    batches = batchesFrom(start);
}

1. Creating a FileLogInputStream from the start position

/**
 * Get an iterator over the record batches in the file, starting at a specific position. This is similar to
 * {@link #batches()} except that callers specify a particular position to start reading the batches from. This
 * method must be used with caution: the start position passed in must be a known start of a batch.
 * @param start The position to start record iteration from; must be a known position for start of a batch
 * @return An iterator over batches starting from {@code start}
 */
// Returns a batch iterator directly over this FileRecords
public Iterable<FileChannelRecordBatch> batchesFrom(final int start) {
    return () -> batchIterator(start);
}

private AbstractIterator<FileChannelRecordBatch> batchIterator(int start) {
    final int end;
    if (isSlice)
        end = this.end;
    else
        end = this.sizeInBytes();
    // Build a FileLogInputStream over this FileRecords between start and end
    FileLogInputStream inputStream = new FileLogInputStream(this, start, end);
    // Wrap the input stream in a RecordBatchIterator and return it
    return new RecordBatchIterator<>(inputStream);
}

The FileLogInputStream class implements nextBatch(), which retrieves the next record batch from the underlying input stream.

public class FileLogInputStream implements LogInputStream<FileLogInputStream.FileChannelRecordBatch> {

    /**
     * Create a new log input stream over the FileChannel
     * @param records Underlying FileRecords instance
     * @param start Position in the file channel to start from
     * @param end Position in the file channel not to read past
     */
    FileLogInputStream(FileRecords records,
                       int start,
                       int end) {
        this.fileRecords = records;
        this.position = start;
        this.end = end;
    }

    @Override
    public FileChannelRecordBatch nextBatch() throws IOException {
        // Get the underlying file channel
        FileChannel channel = fileRecords.channel();
        // If there is not even room for a batch header before the end position, we are done: return null
        if (position >= end - HEADER_SIZE_UP_TO_MAGIC)
            return null;

        // Read the batch header from the channel into logHeaderBuffer
        logHeaderBuffer.rewind();
        Utils.readFullyOrFail(channel, logHeaderBuffer, position, "log header");

        // Parse the offset and the batch size out of the header
        logHeaderBuffer.rewind();
        long offset = logHeaderBuffer.getLong(OFFSET_OFFSET);
        int size = logHeaderBuffer.getInt(SIZE_OFFSET);

        // V0 has the smallest overhead, stricter checking is done later
        if (size < LegacyRecord.RECORD_OVERHEAD_V0)
            throw new CorruptRecordException(String.format("Found record size %d smaller than minimum record " +
                    "overhead (%d) in file %s.", size, LegacyRecord.RECORD_OVERHEAD_V0, fileRecords.file()));

        // If the batch would run past the end position, return null
        if (position > end - LOG_OVERHEAD - size)
            return null;

        // Read the magic byte from the header
        byte magic = logHeaderBuffer.get(MAGIC_OFFSET);
        // Create the record batch object
        final FileChannelRecordBatch batch;

        if (magic < RecordBatch.MAGIC_VALUE_V2)
            // Legacy (pre-v2) record batch
            batch = new LegacyFileChannelRecordBatch(offset, magic, fileRecords, position, size);
        else
            // Default (v2) record batch
            batch = new DefaultFileChannelRecordBatch(offset, magic, fileRecords, position, size);

        // Advance the position so that the next call reads the following batch
        position += batch.sizeInBytes();
        return batch;
    }
}

2. Wrapping the stream in a RecordBatchIterator

The batchIterator method above wraps the file stream in a RecordBatchIterator.

class RecordBatchIterator<T extends RecordBatch> extends AbstractIterator<T> {

    private final LogInputStream<T> logInputStream;

    RecordBatchIterator(LogInputStream<T> logInputStream) {
        this.logInputStream = logInputStream;
    }

    @Override
    protected T makeNext() {
        try {
            T batch = logInputStream.nextBatch();
            if (batch == null)
                return allDone();
            return batch;
        } catch (EOFException e) {
            throw new CorruptRecordException("Unexpected EOF while attempting to read the next batch", e);
        } catch (IOException e) {
            throw new KafkaException(e);
        }
    }
}

The AbstractIterator abstract class:

public abstract class AbstractIterator<T> implements Iterator<T> {

    private enum State {
        READY, NOT_READY, DONE, FAILED
    }

    private State state = State.NOT_READY;
    private T next;

    @Override
    public boolean hasNext() {
        switch (state) {
            case FAILED:
                throw new IllegalStateException("Iterator is in failed state");
            case DONE:
                return false;
            case READY:
                return true;
            default:
                return maybeComputeNext();
        }
    }

    @Override
    public T next() {
        if (!hasNext())
            throw new NoSuchElementException();
        state = State.NOT_READY;
        if (next == null)
            throw new IllegalStateException("Expected item but none found.");
        return next;
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException("Removal not supported");
    }

    public T peek() {
        if (!hasNext())
            throw new NoSuchElementException();
        return next;
    }

    protected T allDone() {
        state = State.DONE;
        return null;
    }

    protected abstract T makeNext();

    private Boolean maybeComputeNext() {
        state = State.FAILED;
        next = makeNext();
        if (state == State.DONE) {
            return false;
        } else {
            state = State.READY;
            return true;
        }
    }
}

hasNext() eventually calls RecordBatchIterator.makeNext(), which in turn calls nextBatch() of the FileLogInputStream shown earlier in this section. A standalone sketch of this lazy-iterator pattern follows.
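The pattern behind AbstractIterator is a lazy "compute the next element on demand" iterator: subclasses implement makeNext() and signal exhaustion with allDone(). The sketch below re-implements the same contract for a plain BufferedReader just to make it concrete; LazyIterator and LineIterator are hypothetical classes of mine, simplified (no FAILED state), and not part of Kafka.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Minimal lazy iterator in the style of Kafka's AbstractIterator: one element is produced per makeNext() call. */
abstract class LazyIterator<T> implements Iterator<T> {
    private T next;           // element computed ahead of time; null when exhausted or not yet computed
    private boolean done;

    protected abstract T makeNext();          // return the next element, or call allDone() and return null

    protected T allDone() { done = true; return null; }

    @Override public boolean hasNext() {
        if (next == null && !done) next = makeNext();
        return next != null;
    }

    @Override public T next() {
        if (!hasNext()) throw new NoSuchElementException();
        T result = next;
        next = null;          // force the following hasNext() to compute a fresh element
        return result;
    }
}

/** Hypothetical usage: stream lines from a reader without loading them all up front. */
class LineIterator extends LazyIterator<String> {
    private final BufferedReader reader;
    LineIterator(BufferedReader reader) { this.reader = reader; }

    @Override protected String makeNext() {
        try {
            String line = reader.readLine();
            return line != null ? line : allDone();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        LineIterator it = new LineIterator(new BufferedReader(new StringReader("a\nb\nc")));
        while (it.hasNext()) System.out.println(it.next());
    }
}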

DefaultFileChannelRecordBatch is the default (v2) implementation:

static class DefaultFileChannelRecordBatch extends FileLogInputStream.FileChannelRecordBatch {

    DefaultFileChannelRecordBatch(long offset,
                                  byte magic,
                                  FileRecords fileRecords,
                                  int position,
                                  int batchSize) {
        super(offset, magic, fileRecords, position, batchSize);
    }

    @Override
    protected RecordBatch toMemoryRecordBatch(ByteBuffer buffer) {
        return new DefaultRecordBatch(buffer);
    }

    @Override
    public long baseOffset() {
        return offset;
    }

    // ... (code omitted)
}

3. DefaultRecordBatch implements iterator() and materializes records in memory

Next, let's see where DefaultFileChannelRecordBatch.toMemoryRecordBatch is actually called.

DefaultRecordBatch then exposes the records through its iterator() method, which yields an Iterator<Record>:

public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRecordBatch {

    @Override
    public Iterator<Record> iterator() {
        if (count() == 0)
            return Collections.emptyIterator();

        if (!isCompressed())
            return uncompressedIterator();

        // for a normal iterator, we cannot ensure that the underlying compression stream is closed,
        // so we decompress the full record set here. Use cases which call for a lower memory footprint
        // can use `streamingIterator` at the cost of additional complexity
        try (CloseableIterator<Record> iterator = compressedIterator(BufferSupplier.NO_CACHING, false)) {
            List<Record> records = new ArrayList<>(count());
            while (iterator.hasNext())
                records.add(iterator.next());
            return records.iterator();
        }
    }
}

DefaultFileChannelRecordBatch is a subclass of FileChannelRecordBatch. FileChannelRecordBatch represents a batch that lives behind a FileChannel: while iterating over the log, the batch does not have to be loaded into memory up front; it is read only when actually needed. The key method is iterator():

public abstract static class FileChannelRecordBatch extends AbstractRecordBatch {
    protected final long offset;
    protected final byte magic;
    protected final FileRecords fileRecords;
    protected final int position;
    protected final int batchSize;

    private RecordBatch fullBatch;
    private RecordBatch batchHeader;

    FileChannelRecordBatch(long offset,
                           byte magic,
                           FileRecords fileRecords,
                           int position,
                           int batchSize) {
        this.offset = offset;
        this.magic = magic;
        this.fileRecords = fileRecords;
        this.position = position;
        this.batchSize = batchSize;
    }

    // ... (code omitted)

    @Override
    public Iterator<Record> iterator() {
        return loadFullBatch().iterator();
    }

    // ... (code omitted)

    protected RecordBatch loadFullBatch() {
        if (fullBatch == null) {
            batchHeader = null;
            fullBatch = loadBatchWithSize(sizeInBytes(), "full record batch");
        }
        return fullBatch;
    }
}

Finally, loadBatchWithSize calls toMemoryRecordBatch on the DefaultFileChannelRecordBatch to materialize the batch in memory (a small sketch of the read-fully pattern it relies on follows the snippet):

private RecordBatch loadBatchWithSize(int size, String description) {
    FileChannel channel = fileRecords.channel();
    try {
        ByteBuffer buffer = ByteBuffer.allocate(size);
        Utils.readFullyOrFail(channel, buffer, position, description);
        buffer.rewind();
        // Materialize the batch in memory from the buffer
        return toMemoryRecordBatch(buffer);
    } catch (IOException e) {
        throw new KafkaException("Failed to load record batch at position " + position + " from " + fileRecords, e);
    }
}
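For reference, the "read fully at an absolute position or fail" behaviour used above boils down to looping over FileChannel.read(ByteBuffer, long) until the buffer is full. The helper below is a standalone, hypothetical re-creation of that idea, not Kafka's Utils class; unlike the original it also flips the buffer for the caller.

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public final class ChannelReads {
    private ChannelReads() {}

    /** Read exactly buffer.remaining() bytes from the channel starting at position, or throw EOFException. */
    public static void readFullyOrFail(FileChannel channel, ByteBuffer buffer, long position) throws IOException {
        long pos = position;
        while (buffer.hasRemaining()) {
            int bytesRead = channel.read(buffer, pos);   // positional read: does not move the channel's own position
            if (bytesRead < 0)
                throw new EOFException("Hit end of file at position " + pos + " before the buffer was filled");
            pos += bytesRead;
        }
        buffer.flip();   // prepare the buffer for reading by the caller
    }
}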
