Kafka重平衡导致无限循环消费问题

1. 问题描述

Kafka消费者消费消息超过了5分钟,不停的触发重平衡,消费者的offset因为重平衡提交失败,重复拉取消费,重复消费。

2. 问题原因

kafka默认的消息消费超时时间max.poll.interval.ms = 300000, 也就是5分钟,超过5分钟,会认为当前消费者已经死掉,会主动发起重平衡。

3. 解决方案

  1. 减小kafka一批次拉取的数据量: max.poll.records,加快消费者消费的速度,控制拉取的消息能在短时间内消费掉
  2. 拉取到消息之后异步处理(保证成功消费)
  3. 调大 拉取消息线程最长空闲时间:max.poll.interval.ms

重复消费有哪些原因

消息重复消费的根本原因都在于:已经消费了数据,但是offset没有成功提交。

offset提交失败,很大一部分原因在于发生了重平衡:

  1. 消费者宕机、重启等。导致消息已经消费但是没有提交offset。
  2. 消费者使用自动提交offset,但当还没有提交的时候,有新的消费者加入或者移除,发生了rebalance。再次消费的时候,消费者会根据提交的偏移量来,于是重复消费了数据。
  3. 消息处理耗时,或者消费者拉取的消息量太多,处理耗时,超过了max.poll.interval.ms的配置时间,导致认为当前消费者已经死掉,触发重平衡。

Kafka知识回顾

1. 常见配置参数
### broker
# offset 保留时间,默认为7天
offsets.retention.minutes=10080### producer
# 生产者会尝试将业务发送到相同的 Partition 的消息合包发送到 Broker,batch.size 设置合包的大小上限。默认为 16KB。batch.size 设太小会导致吞吐下降,设太大会导致内存使用过多。
batch.size=16384# Kafka producer 的 ack 有 3 种机制,分别说明如下:
# -1 或 all:Broker 在 leader 收到数据并同步给所有 ISR 中的 follower 后,才应答给 Producer 继续发送下一条(批)消息。 这种配置提供了最高的数据可靠性,只要有一个已同步的副本存活就不会有消息丢失。注意:这种配置不能确保所有的副本读写入该数据才返回,可以配合 Topic 级别参数 min.insync.replicas 使用。
# 0:生产者不等待来自 broker 同步完成的确认,继续发送下一条(批)消息。这种配置生产性能最高,但数据可靠性最低(当服务器故障时可能会有数据丢失,如果 leader 已死但是 producer 不知情,则 broker 收不到消息)
# 1:生产者在 leader 已成功收到的数据并得到确认后再发送下一条(批)消息。这种配置是在生产吞吐和数据可靠性之间的权衡(如果leader已死但是尚未复制,则消息可能丢失)
# 用户不显示配置时,默认值为1。用户根据自己的业务情况进行设置
acks=1# 控制生产请求在 Broker 等待副本同步满足 acks 设置的条件所等待的最大时间
timeout.ms=30000# 配置生产者用来缓存消息等待发送到 Broker 的内存。用户要根据生产者所在进程的内存总大小调节
buffer.memory=33554432# 当生产消息的速度比 Sender 线程发送到 Broker 速度快,导致 buffer.memory 配置的内存用完时会阻塞生产者 send 操作,该参数设置最大的阻塞时间
max.block.ms=60000# 设置消息延迟发送的时间(ms),这样可以等待更多的消息组成 batch 发送。默认为0表示立即发送。当待发送的消息达到 batch.size 设置的大小时,不管是否达到 linger.ms 设置的时间,请求也会立即发送
# 推荐用户根据实际使用场景,设置linger.ms在100~1000之间,更大的取值相对有更大的吞吐但会相应增加时延
linger.ms=100# 设置分区缓存消息量(bytes),达到该数值时生产者将批量的消息发送给broker。默认值为16384,过小的batch.size会增加发送请求次数可能会损耗性能和影响稳定性,用户根据实际场景,可适当增大该值。注:该值为上限值,当还未达到时若时间已经达到linger.ms时生产者会发送消息
batch.size=16384# 生产者能够发送的请求包大小上限,默认为1MB。在修改该值时注意不能超过 Broker 配置的包大小上限16MB
max.request.size=1048576# 压缩格式配置,目前 0.9(包含)以下版本不允许使用压缩,0.10(包含)以上不允许使用 GZip 压缩
compression.type=[none, snappy, lz4]# 客户端发送给 Broker 的请求的超时时间,不能小于 Broker 配置的 replica.lag.time.max.ms,目前该值为10000ms
request.timeout.ms=30000# 客户端在每个连接上最多可发送的最大的未确认请求数,该参数大于1且 retries 大于0时可能导致数据乱序。 希望消息严格有序时,建议客户将该值设置1
max.in.flight.requests.per.connection=5# 请求发生错误时重试次数,建议将该值设置为大于0,失败重试最大程度保证消息不丢失
retries=0# 发送请求失败时到下一次重试请求之间的时间
retry.backoff.ms=100### Consumer
# 是否在消费消息后将 offset 同步到 Broker,当 Consumer 失败后就能从 Broker 获取最新的 offset, 如果使用 spring-kafka,即使设置false,也会自动提交,因为false表示的是kafka客户端放弃自动提交,把权利交给Spring框架,消息消费后Spring框架还是会帮你自动提交
enable.auto.commit=true# 当 auto.commit.enable=true 时,自动提交 Offset 的时间间隔,建议设置至少1000
auto.commit.interval.ms=5000# 当 Broker 端没有 offset(如第一次消费或 offset 超过7天过期)时如何初始化 offset,当收到 OFFSET_OUT_OF_RANGE 错误时,如何重置 Offset
# earliest:表示自动重置到 partition 的最小 offset
# latest:默认为 latest,表示自动重置到 partition 的最大 offset
# none:不自动进行 offset 重置,抛出 OffsetOutOfRangeException 异常
auto.offset.reset=latest# 标识消费者所属的消费分组
group.id=""# 使用 Kafka 消费分组机制时,消费者超时时间。当 Broker 在该时间内没有收到消费者的心跳时,认为该消费者故障失败,Broker 发起重新 Rebalance 过程。目前该值的配置必须在 Broker 配置group.min.session.timeout.ms=6000和group.max.session.timeout.ms=300000 之间
session.timeout.ms=10000# 使用 Kafka 消费分组机制时,消费者发送心跳的间隔。这个值必须小于 session.timeout.ms,一般小于它的三分之一
heartbeat.interval.ms=3000# 使用 Kafka 消费分组机制时,再次调用 poll 允许的最大间隔。如果在该时间内没有再次调用 poll,则认为该消费者已经失败,Broker 会重新发起 Rebalance 把分配给它的 partition 分配给其他消费者
max.poll.interval.ms=300000# Fecth 请求最少返回的数据大小。默认设置为 1B,表示请求能够尽快返回。增大该值会增加吞吐,同时也会增加延迟
fetch.min.bytes=1# Fetch 请求最多返回的数据大小,默认设置为 50MB
fetch.max.bytes=52428800# Fetch 请求等待时间
fetch.max.wait.ms=500# Fetch 请求每个 partition 返回的最大数据大小,默认为1MB
max.partition.fetch.bytes=1048576# 在一次 poll 调用中返回的记录数
max.poll.records=500# 客户端请求超时时间,如果超过这个时间没有收到应答,则请求超时失败
request.timeout.ms=305000
2. poll机制
  1. 每次poll的消息处理完成之后再进行下一次poll,是同步操作
  2. 每次poll之前检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移
  3. 每次poll时,consumer都将尝试使用上次消费的offset作为起始offset,然后依次拉取消息
  4. poll(long timeout),timeout指等待轮询缓冲区的数据所花费的时间,单位是毫秒
3. 重平衡 rebalance

将分区的所有权从一个消费者转移到其他消费者的行为称为再均衡(重平衡,rebalance)。

消费者通过向组织协调者(kafka broker)发送心跳来维护自己是消费者组的一员并确认其拥有的分区。对于不同不的消费群体来说,其组织协调者可以是不同的。只要消费者定期发送心跳,就会认为 消费者是存活的并处理其分区中的消息。当消费者检索记录或者提交它所消费的记录时就会发送心跳。

如果过了一段时间Kafka停止发送心跳了,会话(session)就会过期,组织协调者就会认为这个consumer已经死亡,就会触发一次重平衡。如果消费者宕机并且停止发送消息,组织协调者会等待几秒钟,确认它死亡了才会触发重平衡。在这段时间里,死亡的消费者将不处理任何消息。在清理消费者时,消费者将通知协调者它要离开群组,组织协调者会触发一次重平衡,尽量降低处理停顿。

重平衡是一把双刃剑,它为消费者群组带来高可用性和伸缩性的同时,还有有一些明显的缺点(bug),而这些 bug 到现在社区还无法修改。也就是说,在重平衡期间,消费者组中的消费者实例都会停止消费(Stop The World),等待重平衡的完成。而且重平衡这个过程很慢。

触发重平衡的三种情况:
1. 消费者组内成员发生变更,这个变更包括了增加和减少消费者。注意这里的减少有很大的可能是被动的,就是某个消费者崩溃退出了
2. 主题的分区数发生变更,分区增加或者减少时就会触发重平衡
3. 订阅的主题发生变化,当消费者组使用正则表达式订阅主题,而恰好又新建了对应的主题,就会触发重平衡

重平衡异常:
max.poll.interval.ms,两次拉取消息的间隔,默认5分钟;通过消费组管理消费者时,该配置指定拉取消息线程最长空闲时间,若超过这个时间间隔没有发起poll操作,则消费组认为该消费者已离开了消费组,将进行重平衡操作(将分区分配给组内其他消费者成员)若超过这个时间则报如下异常:

>org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records
4. 重复消费的解决方案

由于网络问题,重复消费不可避免,因此,消费者需要实现消费幂等。

方案:

  1. 消息表
  2. 数据库唯一索引
  3. 缓存消费过的消息id
5. 消费者消费异常,会重试消费吗
  1. enable.auto.commit=truekafka消费者拉取到消息后,即使消费异常也会自动提交offset,不会重新消费 .
  2. enable.auto.commit=false
    2.1 手动提交offset,提交offset之前抛异常,则不会提交offset,因此会再次重新消费
    2.2 spring-kafka帮忙提交offset,不手动提交,spring框架会在最后提交offset,因此消费过程中抛异常,也不会提交offset,因此会再次重新消费

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/19578.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python魔法之旅-魔法方法(03)

目录 一、概述 1、定义 2、作用 二、主要应用场景 1、构造和析构 2、操作符重载 3、字符串和表示 4、容器管理 5、可调用对象 6、上下文管理 7、属性访问和描述符 8、迭代器和生成器 9、数值类型 10、复制和序列化 11、自定义元类行为 12、自定义类行为 13、类…

【Linux】初识信号及信号的产生

初始信号 初始信号什么是信号站在应用角度的信号查看Linux系统定义的信号列表 信号的常见处理方式信号的产生通过终端按键产生信号什么是core dump?如何开启core dump?core dump有什么用?为什么默认关闭core dump?设置了core文件大小但是没有产生core文…

【SQL学习进阶】从入门到高级应用(八)

文章目录 ✨连接查询✨什么是连接查询✨连接查询的分类✨笛卡尔积现象✨内连接✨什么叫内连接✨内连接之等值连接✨内连接之非等值连接✨内连接之自连接 ✨外连接✨什么叫外连接✨外连接之左外连接(左连接)✨外连接之右外连接(右连接&#xf…

ubuntu 18.04 ros1学习

总结了一下,学习内容主要有: 1.ubuntu的基础命令 pwd: 获得当前路径 cd: 进入或者退出一个目录 ls:列举该文件夹下的所有文件名称 mv 移动一个文件到另一个目录中 cp 拷贝一个文件到另一个目录中 rm -r 删除文件 gedit sudo 给予管理员权限 sudo apt-…

php反序列化学习(2)

1、魔术方法触发规则: 魔术方法触发的前提是:魔法方法所在类(或对象)被调用 分析代码,_wakeup()的触发条件是进行反序列化,_tostrinng()触发的条件是把对象当成字符串调用,但是魔术方法触发的前…

安全测试用例及解析(Word原件,直接套用检测)

5 信息安全性测试用例 5.1 安全功能测试 5.1.1 标识和鉴别 5.1.2 访问控制 5.1.3 安全审计 5.1.4 数据完整性 5.1.5 数据保密性 5.1.6 软件容错 5.1.7 会话管理 5.1.8 安全漏洞 5.1.9 外部接口 5.1.10 抗抵赖 5.1.11 资源控制 5.2 应用安全漏洞扫描 5.2.1 应用安全漏洞扫描 5.3…

redis笔记2(key\value的设计)

1-redis的key-value设计 选择value (1)是否需要排序,需要,用sortset (2)缓存的值是单个还是多个 (3)单个:简单String,对象值的用Hash (4&#xff…

Java 多态与接口设计:深入理解Java的多态特性,并探讨在实际编程中如何设计和使用接口

深入理解Java的多态特性 多态的基本概念 在Java中,多态是面向对象的三大特性(封装、继承、多态)之一。它指的是,一个对象可以有多种形态。具体来说,多态让我们在设计代码时可以使用父类型占位符的方式来引用子类型的对象,而具体执行哪个子类的方法,将在运行时动…

方差和标准差的区别

标准差和方差都是用来衡量随机变量的离散程度的统计量,它们之间有以下区别: 方差(Variance): 方差是衡量随机变量离其均值的离散程度的度量。它是各个数据与其平均值之差的平方和的平均值。方差的公式为:…

手写节流和防抖

一、什么是防抖 定义: 用于限制某个函数在短时间内被频繁调用的情况 特点: 延迟执行:防抖会延迟执行目标函数,直到一定的空闲时间过去后才执行,如果在这段时间内再次触发,则会重新计时。 合并触发&#…

WHAT - 常见的前端工具库

前端开发有许多工具库,可以帮助开发者提高效率、简化代码和增强功能。以下是一些常见的前端工具库: 一、常见工具库 函数库 1. Lodash(推荐) Lodash 是一个 JavaScript 实用程序库,提供了一系列方便的函数&#xf…

鹤城杯 2021 流量分析

看分组也知道考http流量 是布尔盲注 过滤器筛选http流量 将流量包过滤分离 http tshark -r timu.pcapng -Y "http" -T json > 1.json这个时候取 http.request.uri 进一步分离 http.request.uri字段是我们需要的数据 tshark -r timu.pcapng -Y "http&quo…

【全开源】Java情侣飞行棋系统微信小程序+H5+APP源码+微信公众号

​让爱情与游戏并行 一、引言:飞行棋与情侣时光的交融 在快节奏的现代生活中,情侣们常常寻找一种既能增进感情又能共同娱乐的方式。飞行棋,这款经典的家庭游戏,因其简单易上手、策略性强而深受大众喜爱。而现在,我们…

Vue2中的计算属性(computed)和监听属性(watch)

1、说明 在Vue中我们经常会使用到多个参数计算出来的结果,在这种情况下我们可以定义参数和方法,将处理结果赋值给自定义参数,这种方式较为复杂,由此vue提供了计算属性方法。面对响应式页面,我们为了做到实时响应页面参…

[个人笔记] 记录docker-compose的部署过程

容器技术 第二章 记录docker-compose的部署过程 容器技术记录docker-compose的部署过程(可选)新建docker用户(可选)迁移docker-ce目录docker-compose官方插件形式安装官方二进制形式独立安装(可选) 使用docker-compose二进制包的 bash_completion 命令补齐 参考来源 记录docker…

Android8.1高通平台修改默认输入法

需求 安卓8.1 SDK原生的输入法只能打英文, 需要替换成中文输入法. 以高通平台为例, 其它平台也适用. 查看设备当前默认输入法 adb shell settings list secure | grep input 可以看到当前默认是LatinIME这个安卓原生输入法. default_input_methodcom.android.inputmethod.l…

使用compile_commands.json配置includePath环境,解决vscode中引入头文件处有波浪线的问题

通过编译时生成的 compile_commands.json 文件自动完成对 vscode 中头文件路径的配置,实现 vscode 中的代码的自动跳转。完成头文件路径配置后,可以避免代码头部导入头文件部分出现波浪线,警告说无法正确找到头文件。 步骤 需要在 vscode 中…

木馒头头戴式蓝牙耳机

这里写目录标题 木馒头二代头戴式蓝牙耳机清除连接记忆 木馒头二代头戴式蓝牙耳机清除连接记忆 在配对模式下,同时按住播放和暂停按钮4秒,LED闪烁紫色3次,即为清除成功。

电子烟开发【恒压、恒有效算法】

恒压算法 pwm是通过软件模拟的 pwm满值运行是250全占空比 #define D_TARGET_AVERAGE_VOLTAGE 3500 //R_ADC1_Vout :发热丝两端AD值 //R_ADC_FVR :电池电压AD值 //FVR_VOLTAGE :电池AD参考电压 满电值AD //R_Smk1Duty :最后…

Java 面试题:String、StringBuffer、StringBuilder 有什么区别?

几乎所有的应用开发都离不开操作字符串,理解字符串的设计和实现以及相关工具如拼接类的使用,对写出高质量代码是非常有帮助的。关于这个问题,我前面的回答是一个通常的概要性回答,至少你要知道 String 是 Immutable 的&#xff0c…