kafka 脚本发送_Kafka笔记归纳(第五部分:一致性保证,消息重复消费场景及解决方式)...

28fcb114ce2f3fba9fdcc18109c6976e.png

写在开头:

本章是Kafka学习归纳第五部分,着重于强调Kafka的事一致性保证,消息重复消费场景及解决方式,记录偏移量的主题,延时队列的知识点。

文章内容输出来源:拉勾教育大数据高薪训练营。

一致性保证

水位标记

水位或水印(watermark)一词,表示位置信息,即位移(offset)。Kafka源码中使用的名字是高水位,HW(high watermark)。

LEO和HW

每个分区副本对象都有两个重要的属性:LEO和HW

LEO:即日志末端位移(log end offset),记录了该副本日志中下一条消息的位移值。如果 LEO=10,那么表示该副本保存了10条消息,位移值范围是[0, 9]。另外,Leader LEO和 Follower LEO的更新是有区别的。

HW:即上面提到的水位值。对于同一个副本对象而言,其HW值不会大于LEO值。小于等于 HW值的所有消息都被认为是“已备份”的(replicated)。Leader副本和Follower副本的HW更新不同

5d31db3c27b0b6e6c85dee37962a9dc5.png

上图中,HW值是7,表示位移是 0~7 的所有消息都已经处于“已提交状态”(committed),而LEO值是14,8~13的消息就是未完全备份(fully replicated)——为什么没有14?LEO指向的是下一条消息到来时的位移。

消费者无法消费分区下Leader副本中位移大于分区HW的消息

Follower副本何时更新LEO

Follower副本不停地向Leader副本所在的broker发送FETCH请求,一旦获取消息后写入自己的日志中进行备份。那么Follower副本的LEO是何时更新的呢?首先我必须言明,Kafka有两套Follower副本

LEO:

1. 一套LEO保存在Follower副本所在Broker的副本管理机中;

2. 另一套LEO保存在Leader副本所在Broker的副本管理机中。Leader副本机器上保存了所有的follower副本的LEO。

Kafka使用前者帮助Follower副本更新其HW值;利用后者帮助Leader副本更新其HW。

1. Follower副本的本地LEO何时更新? Follower副本的LEO值就是日志的LEO值,每当新写入一条消息,LEO值就会被更新。当Follower发送FETCH请求后,Leader将数据返回给Follower,此时Follower开始Log写数据,从而自动更新LEO值。

2. Leader端Follower的LEO何时更新? Leader端的Follower的LEO更新发生在Leader在处理 Follower FETCH请求时。一旦Leader接收到Follower发送的FETCH请求,它先从Log中读取 相应的数据,给Follower返回数据前,先更新Follower的LEO。

Follower副本何时更新HW

Follower更新HW发生在其更新LEO之后,一旦Follower向Log写完数据,尝试更新自己的HW值。

比较当前LEO值与FETCH响应中Leader的HW值,取两者的小者作为新的HW值。

即:如果Follower的LEO大于Leader的HW,Follower HW值不会大于Leader的HW值。

Leader副本何时更新LEO

和Follower更新LEO相同,Leader写Log时自动更新自己的LEO值。

Leader副本何时更新HW值

Leader的HW值就是分区HW值,直接影响分区数据对消费者的可见性

Leader会尝试去更新分区HW的四种情况:

1. Follower副本成为Leader副本时:Kafka会尝试去更新分区HW。

2. Broker崩溃导致副本被踢出ISR时:检查下分区HW值是否需要更新是有必要的。

3. 生产者向Leader副本写消息时:因为写入消息会更新Leader的LEO,有必要检查HW值是否需要更新

4. Leader处理Follower FETCH请求时:首先从Log读取数据,之后尝试更新分区HW值

结论:

当Kafka broker都正常工作时,分区HW值的更新时机有两个:

1. Leader处理PRODUCE请求时

2. Leader处理FETCH请求时。

Leader如何更新自己的HW值?Leader broker上保存了一套Follower副本的LEO以及自己的LEO。当尝试确定分区HW时,它会选出所有满足条件的副本,比较它们的LEO(包括Leader的LEO),并选择最小的LEO值作为HW值

需要满足的条件,(二选一):

1. 处于ISR中

2. 副本LEO落后于Leader LEO的时长不大于 replica.lag.time.max.ms 参数值(默认是10s)

如果Kafka只判断第一个条件的话,确定分区HW值时就不会考虑这些未在ISR中的副本,但这些副本已经具备了“立刻进入ISR”的资格,因此就可能出现分区HW值越过ISR中副本LEO的情况——不允许。因为分区HW定义就是ISR中所有副本LEO的最小值

消息重复的场景及解决方案

消息重复和丢失是kafka中很常见的问题,主要发生在以下三个阶段:

1. 生产者阶段

2. broke阶段

3. 消费者阶段

生产者阶段重复场景

生产发送的消息没有收到正确的broke响应,导致生产者重试。

生产者发出一条消息,broke落盘以后因为网络等种种原因发送端得到一个发送失败的响应或者网络中断,然后生产者收到一个可恢复的Exception重试消息导致消息重复。

生产者发送重复解决方案

启动kafka的幂等性

要启动kafka的幂等性,设置: enable.idempotence=true ,以及 ack=all 以及 retries > 1

ack=0,不重试。 可能会丢消息,适用于吞吐量指标重要性高于数据丢失,例如:日志收集。

生产者和broke阶段消息丢失场景

ack=0,不重试

生产者发送消息完,不管结果了,如果发送失败也就丢失了。

ack=1,leader crash

生产者发送消息完,只等待Leader写入成功就返回了,Leader分区丢失了,此时Follower没来及同步,消息丢失

unclean.leader.election.enable 配置true

允许选举ISR以外的副本作为leader,会导致数据丢失,默认为false。生产者发送异步消息,只等待Lead写入成功就返回,Leader分区丢失,此时ISR中没有Follower,Leader从OSR中选举,因为OSR中本来落后于Leader造成消息丢失

解决生产者和broke阶段消息丢失

禁用unclean选举,ack=all

ack=all / -1,tries > 1,unclean.leader.election.enable=false

生产者发完消息,等待Follower同步完再返回,如果异常则重试。副本的数量可能影响吞吐量,不超过5个,一般三个。 不允许unclean Leader选举。

配置:min.insync.replicas > 1

当生产者将 acks 设置为 all (或 -1 )时, min.insync.replicas>1 。指定确认消息写成功需要的最小副本数量。达不到这个最小值,生产者将引发一个异常(要么是NotEnoughReplicas,要么是NotEnoughReplicasAfterAppend)。

当一起使用时, min.insync.replicas 和 ack 允许执行更大的持久性保证。一个典型的场景是创建一个复制因子为3的主题,设置min.insync复制到2个,用 all 配置发送。将确保如果大多数副本没有收到写操作,则生产者将引发异常。

失败的offset单独记录

生产者发送消息,会自动重试,遇到不可恢复异常会抛出,这时可以捕获异常记录到数据库或缓存,进行单独处理。

消费者数据重复场景及解决方案

数据消费完没有及时提交offset到broker。

消息消费端在消费过程中挂掉没有及时提交offset到broke,另一个消费端启动拿之前记录的offset开始消费,由于offset的滞后性可能会导致新启动的客户端有少量重复消费。

解决方案

取消自动提交

每次消费完或者程序退出时手动提交。这可能也没法保证一条重复。

下游做幂等

一般是让下游做幂等或者尽量每消费一条消息都记录offset,对于少数严格的场景可能需要把 offset或唯一ID(例如订单ID)和下游状态更新放在同一个数据库里面做事务来保证精确的一次更新或者在下游数据表里面同时记录消费offset,然后更新下游数据的时候用消费位移做乐观锁拒绝旧位移的数据更新。

__consumer_offsets

Kafka 1.0.2将consumer的位移信息保存在Kafka内部的topic中,即__consumer_offsets主题,并且默认提供了kafka_consumer_groups.sh脚本供用户查看consumer信息。

创建topic “tp_test_01”

kafka-topics.sh --zookeeper node1:2181/myKafka --create -- 
topic tp_test_01 --partitions 5 --replication-factor 1

使用kafka-console-producer.sh脚本生产消息

[root@node1 ~]# for i in `seq 60`; do echo "hello lagou $i" >> messages.txt; 
done 
[root@node1 ~]# kafka-console-producer.sh --broker-list node1:9092 --topic 
tp_test_01 < messages.txt 

由于默认没有指定key,所以根据round-robin方式,消息分布到不同的分区上。 (本例中生产了60条消息)

验证消息生产成功

kafka-run-class.sh  kafka.tools.GetOffsetShell 
--broker-list node1:9092 --topic tp_test_01 --time  -1

78cc76e1d5c96e5e6eaf8e10b9043382.png

创建一个console consumer group

kafka-console-consumer.sh 
--bootstrap-server linux121:9092 --topic tp_test_01 --from-beginning

获取该consumer group的group id(后面需要根据该id查询它的位移信息)

kafka-consumer-groups.sh --bootstrap-server linux121:9092 --list

d78de41e681721442d3a21b91f84ccc0.png

查询__consumer_offsets topic所有内容

注意:运行下面命令前先要在consumer.properties中设置exclude.internal.topics=false

kafka-console-consumer.sh --topic __consumer_offsets 
--bootstrap-server node1:9092 --formatter 
"kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter" 
--consumer.config config/consumer.properties --from-beginning 

默认情况下__consumer_offsets有50个分区,如果你的系统中consumer group也很多的话,那么这个命令的输出结果会很多。

计算指定consumer group在__consumer_offsets topic中分区信息

这时候就用到了group.id :console-consumer-77682

Kafka会使用下面公式计算该group位移保存在__consumer_offsets的哪个分区上:

Math.abs(groupID.hashCode()) % numPartitions 

ea6c644321739f52490d9946c4a957b1.png

9efeba35cb4e97bb92a5036bc220397c.png

__consumer_offsets的分区41保存了这个consumer group的位移信息。

获取指定consumer group的位移信息

kafka-simple-consumer-shell.sh --topic __consumer_offsets 
--partition 41  --broker-list linux121:9092 
--formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter"

a852b2f9067f8699287963355472df8f.png

可以看到__consumer_offsets topic的每一日志项的格式都是:

[Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]

延时队列

两个follower副本都已经拉取到了leader副本的最新位置,此时又向leader副本发送拉取请求,而leader副本并没有新的消息写入,那么此时leader副本该如何处理呢?可以直接返回空的拉取结果给follower副本,不过在leader副本一直没有新消息写入的情况下,follower副本会一直发送拉取请求,并且总收到空的拉取结果,消耗资源。

95549b512b92a6125053e4bd4276c745.png

Kafka在处理拉取请求时,会先读取一次日志文件,如果收集不到足够多(fetchMinBytes,由参数fetch.min.bytes配置,默认值为1)的消息,那么就会创建一个延时拉取操作(DelayedFetch)以等待拉取到足够数量的消息。当延时拉取操作执行时,会再读取一次日志文件,然后将拉取结果返回给follower副本。

延迟操作不只是拉取消息时的特有操作,在Kafka中有多种延时操作,比如延时数据删除、延时生产等。

对于延时生产(消息)而言,如果在使用生产者客户端发送消息的时候将acks参数设置为-1,那么就意味着需要等待ISR集合中的所有副本都确认收到消息之后才能正确地收到响应的结果,或者捕获超时异常。

4da1cbad3a68e740f8e433800ce56ffc.png

假设某个分区有3个副本:leader、follower1和follower2,它们都在分区的ISR集合中。不考虑ISR变动的情况,Kafka在收到客户端的生产请求后,将消息3和消息4写入leader副本的本地日志文件。

由于客户端设置了acks为-1,那么需要等到follower1和follower2两个副本都收到消息3和消息4后才能告知客户端正确地接收了所发送的消息。如果在一定的时间内,follower1副本或follower2副本没能够完全拉取到消息3和消息4,那么就需要返回超时异常给客户端。生产请求的超时时间由参数request.timeout.ms配置,默认值为30000,即30s。

b1bd0bc6373d47c5e529990f796d225f.png

那么这里等待消息3和消息4写入follower1副本和follower2副本,并返回相应的响应结果给客户端的动作是由谁来执行的呢?在将消息写入leader副本的本地日志文件之后,Kafka会创建一个延时的生产操作(DelayedProduce),用来处理消息正常写入所有副本或超时的情况,以返回相应的响应结果给客户端。

延时操作需要延时返回响应的结果,首先它必须有一个超时时间(delayMs),如果在这个超时时间内没有完成既定的任务,那么就需要强制完成以返回响应结果给客户端。其次,延时操作不同于定时操作,定时操作是指在特定时间之后执行的操作,而延时操作可以在所设定的超时时间之前完成,所以延时操作能够支持外部事件的触发。

就延时生产操作而言,它的外部事件是所要写入消息的某个分区的HW(高水位)发生增长。也就是说,随着follower副本不断地与leader副本进行消息同步,进而促使HW进一步增长,HW每增长一次都会检测是否能够完成此次延时生产操作,如果可以就执行以此返回响应结果给客户端;如果在超时时间内始终无法完成,则强制执行。

延时拉取操作,是由超时触发或外部事件触发而被执行的。超时触发很好理解,就是等到超时时间之后触发第二次读取日志文件的操作。外部事件触发就稍复杂了一些,因为拉取请求不单单由follower副本发起,也可以由消费者客户端发起,两种情况所对应的外部事件也是不同的。如果是follower副本的延时拉取,它的外部事件就是消息追加到了leader副本的本地日志文件中;如果是消费者客户端的延时拉取,它的外部事件可以简单地理解为HW的增长。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/530367.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

eclipse debug 工程源码时出现source not found问题解决

问题描述&#xff1a;使用eclipse debug启动应用&#xff0c;并且打断点在工程的源码上面&#xff0c;提示source not found。 问题解决&#xff1a; 1、选中工程&#xff0c;右键Debug As》Debug Configurations 2、在Java Application下面选中需要debug的程序&#xff0c;然…

代码中有个get是啥意思_是时候秀一波了,甩掉get和set,Lombok让代码更简洁

前言前几天有个新来的同事(实习生)惊讶的对我说&#xff1a;我们的代码里好多错误&#xff0c;我的程序本地都启动不了。我一脸懵逼的质问他&#xff1a;目前线上的代码&#xff0c;怎么会有问题吗&#xff1f;他不服气的说&#xff1a;你来看嘛&#xff0c;就是有问题&#xf…

JavaWeb工程师知识图谱

一个工作快三年的的Java菜鸟&#xff0c;总结梳理了一下JavaWeb工程师必须掌握的一些知识点&#xff08;持续更新中。。。&#xff09;。 预览效果 xmind原始文件 百度云盘 链接&#xff1a;https://pan.baidu.com/s/1hp3MWGOX2I8APw75Suu52Q 提取码&#xff1a;j6w6

松下a6伺服x4接线图_2021中山东凤松下温控器回收价高同行

2021中山东凤松下温控器回收价高同行西门子TDC&#xff0c;西门子存储卡,西门子变频器等全线西门子自动化产品。小汪 满意的价格&#xff0c;快的付款速度&#xff0c;热诚欢迎全国各地朋友洽谈合作。具体回收业务&#xff1a;SIEMENS可编程控制器 1、SIMATIC&#xff0c;S7系列…

eclipse启动发生Failed to load JNI shared library

今天启动eclipse发生下面的情况 从网上知道是eclipse和jdk位数不一致导致的。 输入java -version ,查看JDK是多少位&#xff0c;显示64位的就是64位JDK&#xff0c;未显示的为32位的JDK。 eclipse的安装目录下有一个叫eclipse.ini的配置文件&#xff0c;打开后能看到 x86_64说…

5条件筛选功能_一分钟,彻底学会Excel高级筛选,坐等升职加薪!

Excel中高级筛选是普通筛选的加强&#xff0c;能够实现更加复杂的筛选功能。请您看下面的示例图&#xff1a;数据示例图如果要求筛选出班级为2班且语文成绩大于100分的数据&#xff0c;那么使用普通筛选连续筛选两次就可以得到结果。请您看下面的操作演示&#xff1a;普通筛选操…

工程图标注粗糙度_Inventor教程之工程图标注实例

1工程图标注实例对以下实体零件进行全部的标注演示。操作步骤如下&#xff1a;(1)打开文件。运行Inventor&#xff0c;单击“快速入门”选项卡“启动”面板上的“打开”按钮&#xff0c;在“打开”对话框中选择“实体零件”&#xff0c;单击“打开”按钮进入实体零件。(2)新建工…

定时线程_SpringBoot定时任务,@Async多线程异步执行

一、使用SpringBoot实现定时任务这个不是重点&#xff0c;就简单的实现一下&#xff0c;至于cron表达式怎么写也不是重点&#xff0c;自行百度即可。1-1、基于 Scheduled 注解的方式import org.springframework.scheduling.annotation.EnableScheduling;import org.springframe…

mysql怎么把datetime类型转换_mysql怎样实现time转datetime

mysql实现time转datetime的方法&#xff1a;使用在sql语句中【FROM_UNIXTIME(时间值)】&#xff0c;代码为【insert into test(time) values(FROM_UNIXTIME(%d))",time(NULL)】。mysql实现time转datetime的方法&#xff1a;FROM_UNIXTIME(time(NULL))将liunx系统的time_t类…

SpringBoot入门二

参考Spring Boot Starters - 御坂研究所 创建自己的starter starter是依赖的一种synthesize&#xff08;合成&#xff09;。 starter会把需要用到的依赖全部包含进来&#xff0c;避免开发者自己手动引入依赖。 starter的逻辑 pom.xml<parent><groupId>org.spri…

Tomcat入门

一&#xff0c;tomcat启动 双击startup.bat,如果出现一闪而过的情况&#xff0c;在文件的末尾添加pause&#xff0c;就可以看到环境变量设置的路径是否正确 如果无法在电脑的高级系统设置中设置环境变量&#xff0c;可以在setclasspath.bat中设置环境变量 set JAVA_HOMEC:\P…

线程组的概念

一&#xff0c;线程组和线程的结构&#xff1a;树形结构 每个Thread必然存在于一个ThreadGroup中&#xff0c;Thread不能独立于ThreadGroup存在。 执行main()方法线程的名字是main 如果在new Thread时没有显式指定&#xff0c;那么默认将父线程&#xff08;当前执行new Threa…

delphi7 mysql控件_Delphi7连接MySql数据库-DBGrid控件显示数据

一个简单的Delphi7小程序&#xff0c;使用MySql数据库做简单查询&#xff0c;用DBGrid控件显示结果&#xff0c;实现过程如下&#xff1a;(1)在MySql中新建demouser表&#xff0c;插入记录用于测试。(2)在Delphi7中新建项目。(3)在From中添加组件。组件Panel&#xff1a;pnl1组…

filtic函数 matlab_matlab filtic 函数应用 filter 解差分方程 dft 函数

matlab filtic 函数应用 filter 解差分方程 dft 函数一、 解差分方程说明都在代码注释里面了%这里要利用filtic函数 为滤波器的直接II型实现选择初始条件%求解查分方程 y(n) - 0.4y(n-1) - 0.45y(n-2) 0.45x(n) 0.4x(n-1) - x(n-2)%y(-1) 0 y(-2) 1 x(-1) 1 x(-2) 2%x(n)…

rabbitmq进阶一

上一篇文章有讲到rabbitmq的安装、web管理端和springboot简单集成rabbitmq 本文重点介绍rabbitmq相关api的使用 按照官网常用的五种模式的顺序&#xff1a;HelloWorld、Work queues、Publish/Subscribe、Routing、Topics 模式简单介绍 HelloWorld 一个生产者&#xff0c;一…

mysql一直copying to tmp table_mysql提示Copying to tmp table on disk

网站运行的慢了&#xff0c;查找原因是Copying to tmp table on disk那怎么解决这个问题呢解决一例最近常常碰到网站慢的情况&#xff0c;登陆到后台&#xff0c;查询一下 /opt/mysql/bin/mysqladmin processlist;发现一个查询状态为&#xff1a; Copying to tmp table 而且此查…

idea cloud bootstrap是啥_application.yml与bootstrap.yml的区别

Spring Boot 默认支持 properties(.properties) 和 YAML(.yml .yaml ) 两种格式的配置文件&#xff0c;yml 和 properties 文件都属于配置文件&#xff0c;功能一样。Spring Cloud 构建于 Spring Boot 之上&#xff0c;在 Spring Boot 中有两种上下文&#xff0c;一种是 bootst…

元数据解决分表不可 mysql_MySQL InnoDB技术内幕:内存管理、事务和锁

前面有多篇文章介绍过MySQL InnoDB的相关知识&#xff0c;今天我们要更深入一些&#xff0c;看看它们的内部原理和机制是如何实现的。一、内存管理我们知道&#xff0c;MySQl是一个存储系统&#xff0c;数据最后都写在磁盘上。我们以前也提到过&#xff0c;磁盘的速度特别是大容…

navicat for mysql 13_Navicat for MySQL下载

Navicat for MySQL 是一套管理和开发 MySQL 或 MariaDB 的理想解决方案。它使你以单一程序同时连接到 MySQL 和 MariaDB。这个功能齐备的前端软件为数据库管理、开发和维护提供了直观而强大的图形界面。它提供了一组全面的工具给 MySQL 或MariaDB 新手&#xff0c;同时给专业人…

spring兼容mysql_springboot 最新版本支持 mysql6.0.6吗

缥缈止盈1.首先在pom文件中加入下列依赖,一个使用jpa所需依赖,一个连接MySQL使用的依赖:mysqlmysql-connector-javaorg.springframework.bootspring-boot-starter-data-jpa 123456789102.在配置文件中添加datasource配置和jpa配置,在mysql中已经提前创建了一个名为db_test的数据…