写在开头:
本章是Kafka学习归纳第五部分,着重于强调Kafka的事一致性保证,消息重复消费场景及解决方式,记录偏移量的主题,延时队列的知识点。
文章内容输出来源:拉勾教育大数据高薪训练营。
一致性保证
水位标记
水位或水印(watermark)一词,表示位置信息,即位移(offset)。Kafka源码中使用的名字是高水位,HW(high watermark)。
LEO和HW
每个分区副本对象都有两个重要的属性:LEO和HW
LEO:即日志末端位移(log end offset),记录了该副本日志中下一条消息的位移值。如果 LEO=10,那么表示该副本保存了10条消息,位移值范围是[0, 9]。另外,Leader LEO和 Follower LEO的更新是有区别的。
HW:即上面提到的水位值。对于同一个副本对象而言,其HW值不会大于LEO值。小于等于 HW值的所有消息都被认为是“已备份”的(replicated)。Leader副本和Follower副本的HW更新不同
上图中,HW值是7,表示位移是 0~7 的所有消息都已经处于“已提交状态”(committed),而LEO值是14,8~13的消息就是未完全备份(fully replicated)——为什么没有14?LEO指向的是下一条消息到来时的位移。
消费者无法消费分区下Leader副本中位移大于分区HW的消息
Follower副本何时更新LEO
Follower副本不停地向Leader副本所在的broker发送FETCH请求,一旦获取消息后写入自己的日志中进行备份。那么Follower副本的LEO是何时更新的呢?首先我必须言明,Kafka有两套Follower副本
LEO:
1. 一套LEO保存在Follower副本所在Broker的副本管理机中;
2. 另一套LEO保存在Leader副本所在Broker的副本管理机中。Leader副本机器上保存了所有的follower副本的LEO。
Kafka使用前者帮助Follower副本更新其HW值;利用后者帮助Leader副本更新其HW。
1. Follower副本的本地LEO何时更新? Follower副本的LEO值就是日志的LEO值,每当新写入一条消息,LEO值就会被更新。当Follower发送FETCH请求后,Leader将数据返回给Follower,此时Follower开始Log写数据,从而自动更新LEO值。
2. Leader端Follower的LEO何时更新? Leader端的Follower的LEO更新发生在Leader在处理 Follower FETCH请求时。一旦Leader接收到Follower发送的FETCH请求,它先从Log中读取 相应的数据,给Follower返回数据前,先更新Follower的LEO。
Follower副本何时更新HW
Follower更新HW发生在其更新LEO之后,一旦Follower向Log写完数据,尝试更新自己的HW值。
比较当前LEO值与FETCH响应中Leader的HW值,取两者的小者作为新的HW值。
即:如果Follower的LEO大于Leader的HW,Follower HW值不会大于Leader的HW值。
Leader副本何时更新LEO
和Follower更新LEO相同,Leader写Log时自动更新自己的LEO值。
Leader副本何时更新HW值
Leader的HW值就是分区HW值,直接影响分区数据对消费者的可见性
Leader会尝试去更新分区HW的四种情况:
1. Follower副本成为Leader副本时:Kafka会尝试去更新分区HW。
2. Broker崩溃导致副本被踢出ISR时:检查下分区HW值是否需要更新是有必要的。
3. 生产者向Leader副本写消息时:因为写入消息会更新Leader的LEO,有必要检查HW值是否需要更新
4. Leader处理Follower FETCH请求时:首先从Log读取数据,之后尝试更新分区HW值
结论:
当Kafka broker都正常工作时,分区HW值的更新时机有两个:
1. Leader处理PRODUCE请求时
2. Leader处理FETCH请求时。
Leader如何更新自己的HW值?Leader broker上保存了一套Follower副本的LEO以及自己的LEO。当尝试确定分区HW时,它会选出所有满足条件的副本,比较它们的LEO(包括Leader的LEO),并选择最小的LEO值作为HW值。
需要满足的条件,(二选一):
1. 处于ISR中
2. 副本LEO落后于Leader LEO的时长不大于 replica.lag.time.max.ms 参数值(默认是10s)
如果Kafka只判断第一个条件的话,确定分区HW值时就不会考虑这些未在ISR中的副本,但这些副本已经具备了“立刻进入ISR”的资格,因此就可能出现分区HW值越过ISR中副本LEO的情况——不允许。因为分区HW定义就是ISR中所有副本LEO的最小值
消息重复的场景及解决方案
消息重复和丢失是kafka中很常见的问题,主要发生在以下三个阶段:
1. 生产者阶段
2. broke阶段
3. 消费者阶段
生产者阶段重复场景
生产发送的消息没有收到正确的broke响应,导致生产者重试。
生产者发出一条消息,broke落盘以后因为网络等种种原因发送端得到一个发送失败的响应或者网络中断,然后生产者收到一个可恢复的Exception重试消息导致消息重复。
生产者发送重复解决方案
启动kafka的幂等性
要启动kafka的幂等性,设置: enable.idempotence=true ,以及 ack=all 以及 retries > 1
ack=0,不重试。 可能会丢消息,适用于吞吐量指标重要性高于数据丢失,例如:日志收集。
生产者和broke阶段消息丢失场景
ack=0,不重试
生产者发送消息完,不管结果了,如果发送失败也就丢失了。
ack=1,leader crash
生产者发送消息完,只等待Leader写入成功就返回了,Leader分区丢失了,此时Follower没来及同步,消息丢失
unclean.leader.election.enable 配置true
允许选举ISR以外的副本作为leader,会导致数据丢失,默认为false。生产者发送异步消息,只等待Lead写入成功就返回,Leader分区丢失,此时ISR中没有Follower,Leader从OSR中选举,因为OSR中本来落后于Leader造成消息丢失
解决生产者和broke阶段消息丢失
禁用unclean选举,ack=all
ack=all / -1,tries > 1,unclean.leader.election.enable=false
生产者发完消息,等待Follower同步完再返回,如果异常则重试。副本的数量可能影响吞吐量,不超过5个,一般三个。 不允许unclean Leader选举。
配置:min.insync.replicas > 1
当生产者将 acks 设置为 all (或 -1 )时, min.insync.replicas>1 。指定确认消息写成功需要的最小副本数量。达不到这个最小值,生产者将引发一个异常(要么是NotEnoughReplicas,要么是NotEnoughReplicasAfterAppend)。
当一起使用时, min.insync.replicas 和 ack 允许执行更大的持久性保证。一个典型的场景是创建一个复制因子为3的主题,设置min.insync复制到2个,用 all 配置发送。将确保如果大多数副本没有收到写操作,则生产者将引发异常。
失败的offset单独记录
生产者发送消息,会自动重试,遇到不可恢复异常会抛出,这时可以捕获异常记录到数据库或缓存,进行单独处理。
消费者数据重复场景及解决方案
数据消费完没有及时提交offset到broker。
消息消费端在消费过程中挂掉没有及时提交offset到broke,另一个消费端启动拿之前记录的offset开始消费,由于offset的滞后性可能会导致新启动的客户端有少量重复消费。
解决方案
取消自动提交
每次消费完或者程序退出时手动提交。这可能也没法保证一条重复。
下游做幂等
一般是让下游做幂等或者尽量每消费一条消息都记录offset,对于少数严格的场景可能需要把 offset或唯一ID(例如订单ID)和下游状态更新放在同一个数据库里面做事务来保证精确的一次更新或者在下游数据表里面同时记录消费offset,然后更新下游数据的时候用消费位移做乐观锁拒绝旧位移的数据更新。
__consumer_offsets
Kafka 1.0.2将consumer的位移信息保存在Kafka内部的topic中,即__consumer_offsets主题,并且默认提供了kafka_consumer_groups.sh脚本供用户查看consumer信息。
创建topic “tp_test_01”
kafka-topics.sh --zookeeper node1:2181/myKafka --create --
topic tp_test_01 --partitions 5 --replication-factor 1
使用kafka-console-producer.sh脚本生产消息
[root@node1 ~]# for i in `seq 60`; do echo "hello lagou $i" >> messages.txt;
done
[root@node1 ~]# kafka-console-producer.sh --broker-list node1:9092 --topic
tp_test_01 < messages.txt
由于默认没有指定key,所以根据round-robin方式,消息分布到不同的分区上。 (本例中生产了60条消息)
验证消息生产成功
kafka-run-class.sh kafka.tools.GetOffsetShell
--broker-list node1:9092 --topic tp_test_01 --time -1
创建一个console consumer group
kafka-console-consumer.sh
--bootstrap-server linux121:9092 --topic tp_test_01 --from-beginning
获取该consumer group的group id(后面需要根据该id查询它的位移信息)
kafka-consumer-groups.sh --bootstrap-server linux121:9092 --list
查询__consumer_offsets topic所有内容
注意:运行下面命令前先要在consumer.properties中设置exclude.internal.topics=false
kafka-console-consumer.sh --topic __consumer_offsets
--bootstrap-server node1:9092 --formatter
"kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter"
--consumer.config config/consumer.properties --from-beginning
默认情况下__consumer_offsets有50个分区,如果你的系统中consumer group也很多的话,那么这个命令的输出结果会很多。
计算指定consumer group在__consumer_offsets topic中分区信息
这时候就用到了group.id :console-consumer-77682
Kafka会使用下面公式计算该group位移保存在__consumer_offsets的哪个分区上:
Math.abs(groupID.hashCode()) % numPartitions
即
__consumer_offsets的分区41保存了这个consumer group的位移信息。
获取指定consumer group的位移信息
kafka-simple-consumer-shell.sh --topic __consumer_offsets
--partition 41 --broker-list linux121:9092
--formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter"
可以看到__consumer_offsets topic的每一日志项的格式都是:
[Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]
延时队列
两个follower副本都已经拉取到了leader副本的最新位置,此时又向leader副本发送拉取请求,而leader副本并没有新的消息写入,那么此时leader副本该如何处理呢?可以直接返回空的拉取结果给follower副本,不过在leader副本一直没有新消息写入的情况下,follower副本会一直发送拉取请求,并且总收到空的拉取结果,消耗资源。
Kafka在处理拉取请求时,会先读取一次日志文件,如果收集不到足够多(fetchMinBytes,由参数fetch.min.bytes配置,默认值为1)的消息,那么就会创建一个延时拉取操作(DelayedFetch)以等待拉取到足够数量的消息。当延时拉取操作执行时,会再读取一次日志文件,然后将拉取结果返回给follower副本。
延迟操作不只是拉取消息时的特有操作,在Kafka中有多种延时操作,比如延时数据删除、延时生产等。
对于延时生产(消息)而言,如果在使用生产者客户端发送消息的时候将acks参数设置为-1,那么就意味着需要等待ISR集合中的所有副本都确认收到消息之后才能正确地收到响应的结果,或者捕获超时异常。
假设某个分区有3个副本:leader、follower1和follower2,它们都在分区的ISR集合中。不考虑ISR变动的情况,Kafka在收到客户端的生产请求后,将消息3和消息4写入leader副本的本地日志文件。
由于客户端设置了acks为-1,那么需要等到follower1和follower2两个副本都收到消息3和消息4后才能告知客户端正确地接收了所发送的消息。如果在一定的时间内,follower1副本或follower2副本没能够完全拉取到消息3和消息4,那么就需要返回超时异常给客户端。生产请求的超时时间由参数request.timeout.ms配置,默认值为30000,即30s。
那么这里等待消息3和消息4写入follower1副本和follower2副本,并返回相应的响应结果给客户端的动作是由谁来执行的呢?在将消息写入leader副本的本地日志文件之后,Kafka会创建一个延时的生产操作(DelayedProduce),用来处理消息正常写入所有副本或超时的情况,以返回相应的响应结果给客户端。
延时操作需要延时返回响应的结果,首先它必须有一个超时时间(delayMs),如果在这个超时时间内没有完成既定的任务,那么就需要强制完成以返回响应结果给客户端。其次,延时操作不同于定时操作,定时操作是指在特定时间之后执行的操作,而延时操作可以在所设定的超时时间之前完成,所以延时操作能够支持外部事件的触发。
就延时生产操作而言,它的外部事件是所要写入消息的某个分区的HW(高水位)发生增长。也就是说,随着follower副本不断地与leader副本进行消息同步,进而促使HW进一步增长,HW每增长一次都会检测是否能够完成此次延时生产操作,如果可以就执行以此返回响应结果给客户端;如果在超时时间内始终无法完成,则强制执行。
延时拉取操作,是由超时触发或外部事件触发而被执行的。超时触发很好理解,就是等到超时时间之后触发第二次读取日志文件的操作。外部事件触发就稍复杂了一些,因为拉取请求不单单由follower副本发起,也可以由消费者客户端发起,两种情况所对应的外部事件也是不同的。如果是follower副本的延时拉取,它的外部事件就是消息追加到了leader副本的本地日志文件中;如果是消费者客户端的延时拉取,它的外部事件可以简单地理解为HW的增长。