kafka 3.5 生产者在把数据推送到服务端,再到落盘的过程中,怎么保证不丢失数据源码

  • 一、生产者客户端配置参数acks说明
    • 1、acks=1
    • 2、acks=0
    • 3、acks=-1
  • 二、请求在写入Leader的数据管道之前,则会验证Leader的ISR副本数量和配置中的最小ISR数量
    • 1、Leader的ISR小于配置文件中minInSyncReplicas,并且acks=-1,则抛异常
    • 2、如果acks不等于-1,则就算Leader的ISR小于配置,也会正常执行写入数据管道操作
  • 三、请求把数据写入到Leader的数据管道后,acks=-1和非-1,有不同的逻辑
    • 1、如果acks=-1,则会创建延迟Produce请求,等待ISR中所有副本的响应
    • 2、如果acks不等于-1,写入到Leader的数据管道后,则直接执行回调函数返回结果
  • 四、在返回response时,回调函数会遍历分区异常信息
    • 1、如果acks=0,则关闭套接字服务器
    • 2、如果acks不等0,会返回异常信息

一、生产者客户端配置参数acks说明

首先,客户端需要配置一个acks参数,默认值是1,下面是acks各个值的说明
acks=-1,太慢,acks=0,有风险,acks=1,则是推荐,所以也是默认值的原因

1、acks=1

这意味着至少要等待leader已经成功将数据写入本地log,但是并没有等待所有follower是否成功写入。如果follower没有成功备份数据,而此时leader又无法提供服务,则消息会丢失。

2、acks=0

表示producer不需要等待任何确认收到的信息,副本将立即加到socket buffer并认为已经发送。没有任何保障可以保证此种情况下server已经成功接收数据,同时重试配置不会发生作用(因为客户端不知道是否失败)回馈的offset会总是设置为-1

3、acks=-1

这意味着leader需要等待ISR中所有备份都成功写入日志。只要任何一个备份存活,数据都不会丢失。min.insync.replicas指定必须确认写入才能被认为成功的副本的最小数量。

二、请求在写入Leader的数据管道之前,则会验证Leader的ISR副本数量和配置中的最小ISR数量

def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int,requestLocal: RequestLocal): LogAppendInfo = {//函数首先获取leaderIsrUpdateLock的读锁,以确保对Leader和ISR(In-Sync Replica)的更新操作是同步的。val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {//然后检查当前是否有Leader日志,leaderLogIfLocal match {//如果存在Leader日志,case Some(leaderLog) =>//则获取最小ISR(MinInSyncReplicas)的配置和ISR的大小。val minIsr = leaderLog.config.minInSyncReplicasval inSyncSize = partitionState.isr.size// Avoid writing to leader if there are not enough insync replicas to make it safe,如果没有足够的不同步副本来使其安全,请避免写入领导者//如果ISR的大小小于最小ISR要求,并且requiredAcks的值为-1(表示不需要确认),则抛出NotEnoughReplicasException异常。if (inSyncSize < minIsr && requiredAcks == -1) {throw new NotEnoughReplicasException(s"The size of the current ISR ${partitionState.isr} " +s"is insufficient to satisfy the min.isr requirement of $minIsr for partition $topicPartition")}//调用Leader日志的appendAsLeader方法将记录作为Leader追加到日志中,并传递相关参数。val info = leaderLog.appendAsLeader(records, leaderEpoch = this.leaderEpoch, origin,interBrokerProtocolVersion, requestLocal)// we may need to increment high watermark since ISR could be down to 1,// 我们可能需要增加高水位线,因为 ISR 可能降至 1(info, maybeIncrementLeaderHW(leaderLog))//如果没有,则抛出NotLeaderOrFollowerException异常。case None =>throw new NotLeaderOrFollowerException("Leader not local for partition %s on broker %d".format(topicPartition, localBrokerId))}}//返回追加记录的信息,并根据是否增加了Leader高水位线,将LeaderHwChange.INCREASED或LeaderHwChange.SAME复制给返回信息的副本。info.copy(if (leaderHWIncremented) LeaderHwChange.INCREASED else LeaderHwChange.SAME)}

1、Leader的ISR小于配置文件中minInSyncReplicas,并且acks=-1,则抛异常

会验证acks=-1并且当前Leader的ISR副本数量小于配置中规定的最小值

val minIsr = leaderLog.config.minInSyncReplicasval inSyncSize = partitionState.isr.sizeif (inSyncSize < minIsr && requiredAcks == -1) {throw new NotEnoughReplicasException(s"The size of the current ISR ${partitionState.isr} " +s"is insufficient to satisfy the min.isr requirement of $minIsr for partition $topicPartition")}

2、如果acks不等于-1,则就算Leader的ISR小于配置,也会正常执行写入数据管道操作

 //调用Leader日志的appendAsLeader方法将记录作为Leader追加到日志中,并传递相关参数。val info = leaderLog.appendAsLeader(records, leaderEpoch = this.leaderEpoch, origin,interBrokerProtocolVersion, requestLocal)

三、请求把数据写入到Leader的数据管道后,acks=-1和非-1,有不同的逻辑

这里不从头开始,如果想知道推送的数据怎么到下面方法的,可以看kafka 3.5 kafka服务端接收生产者发送的数据源码

/***将消息附加到分区的领导副本,并等待它们复制到其他副本;当超时或满足所需的 ACK 时,将触发回调函数;如果回调函数本身已经在某个对象上同步,则传递此对象以避免死锁* 请注意,所有挂起的延迟检查操作都存储在队列中。所有 ReplicaManager.appendRecords() 的调用方都应为所有受影响的分区调用 ActionQueue.tryCompleteActions,而不会保留任何冲突的锁。*/def appendRecords(timeout: Long,requiredAcks: Short,internalTopicsAllowed: Boolean,origin: AppendOrigin,entriesPerPartition: Map[TopicPartition, MemoryRecords],responseCallback: Map[TopicPartition, PartitionResponse] => Unit,delayedProduceLock: Option[Lock] = None,recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => (),requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = {//省略代码                //把数据中加入到本地Log                val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,origin, entriesPerPartition, requiredAcks, requestLocal)//省略代码      //调用recordConversionStatsCallback方法,将每个分区的记录转换统计信息传递给回调函数。recordConversionStatsCallback(localProduceResults.map { case (k, v) => k -> v.info.recordConversionStats })//通过 delayedProduceRequestRequired 方法判断是否需要等待其它副本完成写入,如果 acks = -1,则需要等待所有副本的回应if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {//根据条件判断是否需要创建延迟的produce操作。如果需要,创建一个DelayedProduce对象,并将它添加到delayedProducePurgatory中。val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)//创建(主题、分区)对的列表,以用作此延迟生成操作的键val producerRequestKeys = entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq// 再一次尝试完成该延时请求//  如果暂时无法完成,则将对象放入到相应的Purgatory中等待后续处理delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)} else {//如果不需要延迟操作,直接将produce的结果返回给回调函数。val produceResponseStatus = produceStatus.map { case (k, status) => k -> status.responseStatus }responseCallback(produceResponseStatus)}else{//如果不需要延迟操作,直接将produce的结果返回给回调函数。// we can respond immediatelyval produceResponseStatus = produceStatus.map { case (k, status) => k -> status.responseStatus }responseCallback(produceResponseStatus)}//省略代码}

1、如果acks=-1,则会创建延迟Produce请求,等待ISR中所有副本的响应

  //它用于判断是否需要延迟发送生产请求并等待复制完成// 1. required acks = -1 判断requiredAcks是否等于-1,即是否需要等待所有副本的确认。// 2. there is data to append 判断entriesPerPartition是否不为空,即是否有要追加的数据。// 3. at least one partition append was successful (fewer errors than partitions) 计算localProduceResults中异常定义的数量,判断其是否小于entriesPerPartition的大小,即是否至少有一个分区的追加操作成功(即比分区数少的错误,如果全错,就应该直接返回)。private def delayedProduceRequestRequired(requiredAcks: Short,entriesPerPartition: Map[TopicPartition, MemoryRecords],localProduceResults: Map[TopicPartition, LogAppendResult]): Boolean = {requiredAcks == -1 &&entriesPerPartition.nonEmpty &&localProduceResults.values.count(_.exception.isDefined) < entriesPerPartition.size}
 /***检查操作是否可以完成,如果没有,则根据给定的监视键进行监视*请注意,可以在多个密钥上监视延迟操作。对于某些键(但不是所有键),操作可能会在添加到监视列表后完成。* 在这种情况下,操作被视为已完成,不会添加到其余键的监视列表中。过期收割线程将从存在该操作的任何观察程序列表中删除此操作。* @param operation the delayed operation to be checked 要检查的延迟操作* @param watchKeys keys for bookkeeping the operation 用于监视的键* @return true iff the delayed operations can be completed by the caller 如果延迟操作可以由调用方完成,则为 true*/def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {assert(watchKeys.nonEmpty, "The watch key list can't be empty")//尝试完成操作,如果操作不能立即完成,则将操作添加到所有观察键的观察列表中,并递增estimatedTotalOperations计数器的值if (operation.safeTryCompleteOrElse {watchKeys.foreach(key => watchForOperation(key, operation))if (watchKeys.nonEmpty) estimatedTotalOperations.incrementAndGet()}) return true//如果操作仍未完成,则根据条件执行以下操作:if (!operation.isCompleted) {//如果启用了定时器(timerEnabled为真),则将操作添加到超时定时器中。if (timerEnabled)timeoutTimer.add(operation)//如果操作已完成,则取消定时器任务。if (operation.isCompleted) {// cancel the timer taskoperation.cancel()}}//返回false表示操作未完成。false}

2、如果acks不等于-1,写入到Leader的数据管道后,则直接执行回调函数返回结果

	   //如果不需要延迟操作,直接将produce的结果返回给回调函数。// we can respond immediatelyval produceResponseStatus = produceStatus.map { case (k, status) => k -> status.responseStatus }responseCallback(produceResponseStatus)

四、在返回response时,回调函数会遍历分区异常信息

//用于发送 produce 响应的回调 ProduceResponse 的构造能够接受自动生成的协议数据,因此 KafkaApishandleProduceRequest 应应用自动生成的协议以避免额外的转换@nowarn("cat=deprecation")def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {val mergedResponseStatus = responseStatus ++ unauthorizedTopicResponses ++ nonExistingTopicResponses ++ invalidRequestResponsesvar errorInResponse = falsemergedResponseStatus.forKeyValue { (topicPartition, status) =>if (status.error != Errors.NONE) {errorInResponse = truedebug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(request.header.correlationId,request.header.clientId,topicPartition,status.error.exceptionName))}}//记录带宽和请求配额特定的值,并在违反任何配额时通过静音通道来限制。如果违反了两个配额,请使用两个配额之间的最大限制时间。请注意,如果 acks == 0,则不会强制执行请求配额。val timeMs = time.milliseconds()val requestSize = request.sizeInBytesval bandwidthThrottleTimeMs = quotas.produce.maybeRecordAndGetThrottleTimeMs(request, requestSize, timeMs)val requestThrottleTimeMs =if (produceRequest.acks == 0) 0else quotas.request.maybeRecordAndGetThrottleTimeMs(request, timeMs)val maxThrottleTimeMs = Math.max(bandwidthThrottleTimeMs, requestThrottleTimeMs)if (maxThrottleTimeMs > 0) {request.apiThrottleTimeMs = maxThrottleTimeMsif (bandwidthThrottleTimeMs > requestThrottleTimeMs) {requestHelper.throttle(quotas.produce, request, bandwidthThrottleTimeMs)} else {requestHelper.throttle(quotas.request, request, requestThrottleTimeMs)}}//如果produceRequest.acks等于0,表示不需要响应。if (produceRequest.acks == 0) {//如果生产者请求,则无需操作;//但是,如果在处理请求时出现任何错误,由于生产者期望没有响应,服务器将关闭套接字服务器,以便生产者客户端知道发生了一些错误并刷新其元数据if (errorInResponse) {//如果errorInResponse为true,则关闭连接并发送包含错误信息的ProduceResponse响应val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) =>topicPartition -> status.error.exceptionName}.mkString(", ")info(s"Closing connection due to error during produce request with correlation id ${request.header.correlationId} " +s"from client id ${request.header.clientId} with ack=0\n" +s"Topic and partition to exceptions: $exceptionsSummary")requestChannel.closeConnection(request, new ProduceResponse(mergedResponseStatus.asJava).errorCounts)} else {//如果没有异常,发送无操作的响应。requestHelper.sendNoOpResponseExemptThrottle(request)}} else {//如果produceRequest.acks不等于0,将mergedResponseStatus和maxThrottleTimeMs作为参数构造ProduceResponse响应,并通过requestChannel发送响应。requestChannel.sendResponse(request, new ProduceResponse(mergedResponseStatus.asJava, maxThrottleTimeMs), None)}}

1、如果acks=0,则关闭套接字服务器

		//如果生产者请求,则无需操作;//但是,如果在处理请求时出现任何错误,由于生产者期望没有响应,服务器将关闭套接字服务器,以便生产者客户端知道发生了一些错误并刷新其元数据if (errorInResponse) {//如果errorInResponse为true,则关闭连接并发送包含错误信息的ProduceResponse响应val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) =>topicPartition -> status.error.exceptionName}.mkString(", ")info(s"Closing connection due to error during produce request with correlation id ${request.header.correlationId} " +s"from client id ${request.header.clientId} with ack=0\n" +s"Topic and partition to exceptions: $exceptionsSummary")requestChannel.closeConnection(request, new ProduceResponse(mergedResponseStatus.asJava).errorCounts)}

2、如果acks不等0,会返回异常信息

 requestChannel.sendResponse(request, new ProduceResponse(mergedResponseStatus.asJava, maxThrottleTimeMs), None)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/78851.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Excel VLOOKUP 初学者教程:通过示例学习

目录 前言 一、VLOOKUP的用法 二、应用VLOOKUP的步骤 三、VLOOKUP用于近似匹配 四、在同一个表里放置不同的VLOOKUP函数 结论 前言 Vlookup&#xff08;V 代表“垂直”&#xff09;是 excel 中的内置函数&#xff0c;允许在 excel 的不同列之间建立关系。 换句话说&#x…

iPhone苹果15手机怎么看是国行还是美版或港版的苹果iPhone15手机?

iPhone苹果手机15机型区域版本识别代码 CH代码为国行 LL代码为美版 ZP代码为港版 iPhone苹果15手机怎么看是国行还是美版或港版的苹果iPhone15手机&#xff1f; 1、打开苹果iPhone15手机桌面上的「设置」&#xff1b; 2、在iPhone苹果15手机设置内找到「通用」并点击打开&…

大型游戏动作竞技游戏开发和体感VR/AR游戏开发:创造引人入胜的虚拟世界

大型游戏动作竞技游戏和体感VR/AR游戏都代表了游戏开发领域的最新趋势。它们提供了高度沉浸式的娱乐体验&#xff0c;结合了视觉、听觉和体感互动。在本文中&#xff0c;我们将探讨如何开发这两种类型的游戏&#xff0c;并介绍其关键特点和开发流程。 大型游戏动作竞技游戏的特…

Spring学习 (一): IoC容器

前言 参考 廖雪峰Spring教程 一、什么是IoC容器 容器的意思可以理解为一个提供供程序正常运行&#xff0c;提供各种依赖的组件的包的环境。 IoC&#xff0c;控制反转&#xff0c;实际上就是将原本由代码编写者控制的各个对象&#xff08;组件&#xff09;的生命周期托管给底…

Java手写HashMap及拓展实践

Java手写HashMap 思维导图 #mermaid-svg-liNfjvnThNZyNIWd {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-liNfjvnThNZyNIWd .error-icon{fill:#552222;}#mermaid-svg-liNfjvnThNZyNIWd .error-text{fill:#552222;…

【OJ比赛日历】快周末了,不来一场比赛吗? #09.16-09.22 #12场

CompHub[1] 实时聚合多平台的数据类(Kaggle、天池…)和OJ类(Leetcode、牛客…&#xff09;比赛。本账号会推送最新的比赛消息&#xff0c;欢迎关注&#xff01; 以下信息仅供参考&#xff0c;以比赛官网为准 目录 2023-09-16&#xff08;周六&#xff09; #3场比赛2023-09-17…

【FPGA项目】进阶版沙盘演练——报文收发(报文处理、CDC、CRC)

前言 书接上文【FPGA项目】沙盘演练——基础版报文收发_子墨祭的博客-CSDN博客&#xff0c;前面我们做了基础版的报文收发&#xff0c;相信对逻辑设计有了一定的认知&#xff0c;在此基础上&#xff0c;继续完善一个实际报文收发可能会遇到的一些处理&#xff1a; 报文处理握手…

公交查询系统

目录 需求分析 1 概述 2 课题分析 3 实现功能步骤 4 项目背景 概要设计 1 系统流程图. 2 功能模块. 3 各功能模块 4 数据存储 5 类设计 三、详细设计 1公交线路查询系统用户界面 2公交信息存储模快 3公交信息查询模块 4用户信息输入和输出模块 四、调试分析 五、使用说明 六、…

STM32外部复位IC与看门狗冲突,无法复位问题解决方案

使用STM32H743制作了一款飞控&#xff0c;外部复位IC采用MAX809STR,打板完后&#xff0c;烧录飞控固件后大量板子无法正常启动&#xff0c;怀疑是晶振没有起振或MCU未焊接好&#xff0c;检查后均焊接正常&#xff0c;编写裸机LED定时闪烁验证程序可正常运行。经网上查询资料锁定…

Python 环境搭建,集成开发环境IDE: PyCharm

Python 环境搭建,集成开发环境IDE: PyCharm 一、Python 环境搭建二、Python下载三、Python安装四、环境变量配置五、Python 环境变量六、运行Python1、交互式解释器&#xff1a;2、命令行脚本3、集成开发环境&#xff08;IDE&#xff1a;Integrated Development Environment&am…

Hadoop-Hive

1. hive安装部署 2. hive基础 3. hive高级查询 4. Hive函数及性能优化 1.hive安装部署 解压tar -xvf ./apache-hive-3.1.2-bin.tar.gz -C /opt/soft/ 改名mv apache-hive-3.1.2-bin/ hive312 配置环境变量&#xff1a;vim /etc/profile #hive export HIVE_HOME/opt/soft/hive…

软件测试的基本流程是什么?软件测试流程详细介绍

软件测试和软件开发一样&#xff0c;是一个比较复杂的工作过程&#xff0c;如果无章法可循&#xff0c;随意进行测试势必会造成测试工作的混乱。为了使测试工作标准化、规范化&#xff0c;并且快速、高效、高质量地完成测试工作&#xff0c;需要制订完整且具体的测试流程。 01…

JavaScript的DOM操作(二)

一、元素的特性attribute 1.元素的属性和特性 前面我们已经学习了如何获取节点&#xff0c;以及节点通常所包含的属性&#xff0c;接下来我们来仔细研究元素Element。 我们知道&#xff0c;一个元素除了有开始标签、结束标签、内容之外&#xff0c;还有很多的属性&#xff0…

Flutter 使用pageview无缝隙自动轮播教程

导入要使用的轮播图片 List<String> imagesa ["assets/images/car_qidian.jpg","assets/images/car_bg.jpg","assets/images/car_bg.jpg","assets/images/car_bg.jpg","assets/images/car_bg.jpg","assets/imag…

【算法与数据结构】450、LeetCode删除二叉搜索树中的节点

文章目录 一、题目二、解法三、完整代码 所有的LeetCode题解索引&#xff0c;可以看这篇文章——【算法和数据结构】LeetCode题解。 一、题目 二、解法 思路分析&#xff1a;本题首先要分析删除节点的五种情况&#xff1a; 1、没有找到节点2、找到节点 左右子树为空左子树为空…

docker容器管理-实操命令

本单元主要是在docker镜像管理下进一步的培训学习文档。 docker镜像管理-实操_忍冬行者的博客-CSDN博客 四.容器管理 1.运行一个容器 docker container run --name c1 -it nginx:latest /bin/sh 2.后台运行一个容器 docker container run --name c1 -it -d nginx:latest 3.查…

微信小程序项目开发Day1

没接触过&#xff0c;直接看视频学习&#xff1a; 千锋教育微信小程序开发制作前端教程&#xff0c;零基础轻松入门玩转微信小程序_哔哩哔哩_bilibili千锋教育微信小程序开发制作前端教程&#xff0c;零基础轻松入门玩转微信小程序共计56条视频&#xff0c;包括&#xff1a;学…

List 获取前N条数据

1.使用for循环遍历 public static void main(String[] args) {int limit 5;List<Integer> oldList Lists.newArrayList(1, 2, 3, 4, 5, 6, 7);List<Integer> newList Lists.newArrayList();if (oldList.size() < limit) {newList.addAll(oldList);return;}fo…

软件工程课件

软件工程 考点概述软件工程概述能力成度模型能力成熟度模型集成软件过程模型逆向工程软件需求需求获取数据流图 需求定义 考点概述 重点章节 软件工程概述 之前老版教程的&#xff0c;之前考过 能力成度模型 记忆 能力等级 和 特点 能力成熟度模型集成 相比于CMM&#xff0c;第…

结合el-input、el-select实现纯前端过滤树形el-table数据

样式图示 1.搜索实现方法 const searchBtn async () > {// 获取table列表数据接口const res await Api.menuList({paging: false})if (res.code 200) {// 把树形结构转成扁平结构let result treeToArray(res.data)// 处理搜索框中数据进行table显示项过滤if(commonData…