记一次kafka使用不当导致的服务器异常

一、背景

1.运维反馈服务器cpu高,且高达80%
2.经过排查发现kafka出现消息积压情况
3.使用的是springboot kafka框架

dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

4.kafka消费者需要发起http调用第三方api

5.消费者出现报错日志
6.报错日志每间隔5分钟抛出一次

Consumer exception]
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:72)at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1372)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1070)at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)at java.util.concurrent.FutureTask.run(FutureTask.java:266)at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1116)at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:983)at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1510)at sun.reflect.GeneratedMethodAccessor239.invoke(Unknown Source)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:498)at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:205)at com.sun.proxy.$Proxy206.commitSync(Unknown Source)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:2324)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:2319)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2305)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2119)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1104)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1038)... 3 common frames omitted

二、原因分析

1.根据报错消息分析
    踢出消费组通常是由于消费者被消费者组移除或重平衡(rebalance)导致的。

2.kafka消费者被踢出有如下几种情况
    心跳超时:消费者没有在规定的时间内发送心跳到 Kafka 集群,导致被移出消费者组。
    处理时间过长:消费者处理消息的时间超过了 max.poll.interval.ms 配置,导致 Kafka 认为消费者已经失效并将其移出消费者组。
    网络问题:网络延迟或不稳定可能导致消费者与 Kafka 集群之间的连接问题。
    资源限制:消费者运行环境的资源(CPU、内存等)不足,导致消费者无法及时处理消息和发送心跳。

3.业务逻辑
    业务上存在调用第三方http接口请求
    第三方接口请求会存在报错情况,通常情况下网络请求会有默认超时时间,例如ConnectTimeout、ReadTimeout等。
    出现消费慢时怀疑是第三方接口导致的,但是服务器日志没有超时报错信息,被误导不是超时导致的。
    重新新增日志打印定位请求参数,发现一直没有执行最后的ack操作,故怀疑是第三方请求线程一直在等待导致线程挂起
    查看代码设置发现原来ReadTimeout超时时间不配置情况下,默认都是-1,没有超时时间,被第三方依赖包坑了

三、解决方案

1.优化消费者处理逻辑
确保消费者处理消息的逻辑高效,尽量减少单个消息的处理时间。如果处理逻辑复杂,可以考虑使用多线程或异步处理来提高效率。

2 涉及第三方接口调用需要做好降级熔断操作
本次存在请求第三方http接口,经过排查是第三方接口一直没响应,同时http请求的sockekTimeOut(aka socketReadTimeOut)没设置超时时间,刚好默认为空,既不超时,
导致接口一直卡住,超过kafka的max.poll.interval.ms的时间

3.如果一个批次的消息在下次poll的时间内处理不完,可以降低poll获取消息的数量

spring:kafka:consumer:max-poll-records: 5 (从50降到5个)

4.如果消费者处理消息的时间较长,可以增加 max.poll.interval.ms 的值,以允许消费者有更长的时间处理消息。
(springboot yaml配置如下)

spring:kafka:consumer:properties:max.poll.interval.ms: 600000 (默认是5分钟(300000毫秒))

5.增加心跳间隔和超时时间
调整 heartbeat.interval.ms 和 session.timeout.ms 的值,以允许更长的心跳间隔和更长的会话超时时间
(一般情况下使用默认值即可)

四、踩过的坑


1. 在kafka消费者业务代码中使用了同步锁,例如redis,众所周知,锁竞争不到时会有等待时间,这样加长了消息的处理时长,导致超过max.poll.interval.ms会被踢出消费组

2. 忙中出错
 2.1 查看技术文章说要设置调大max.poll.interval.ms的时间
    

    @Beanpublic KafkaListenerContainerFactory<?> batchFactory(ConsumerFactory consumerFactory){ConcurrentKafkaListenerContainerFactory<String,String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory);factory.setConcurrency(1);factory.getContainerProperties().setPollTimeout(6000);//设置为批量消费,每个批次数量在Kafka配置参数中设置factory.setBatchListener(true);//设置手动提交ackModefactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);return factory;}


    
    误以为在就是设置factory.getContainerProperties().setPollTimeout(6000)这个方法。结果设置后头痛医脚
 
 2.2 Spring Kafka 中的 setPollTimeout 和 max.poll.interval.ms区别
    在 Spring Kafka 中,setPollTimeout 方法设置的是消费者从 Kafka 服务器拉取消息时的超时时间。这是在每次调用消费者的 poll 方法时使用的超时时间。
    例如:
    container.setPollTimeout(Duration.ofMillis(3000)); // 设置轮询超时时间为3秒
    
    原生 Kafka 客户端中的 poll 方法
    在原生 Kafka 客户端中,poll 方法的超时时间是通过传递给 poll 方法的参数来设置的:
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000)); // 设置轮询超时时间为3秒
    
    如何理解 poll 超时时间
    poll 方法的超时时间指的是消费者从 Kafka 服务器拉取消息的时间间隔。如果在指定的时间内没有消息可供消费,poll 方法会返回空的 ConsumerRecords 集合。
    这个超时时间不应该与 max.poll.interval.ms 混淆,后者是指消费者处理消息的最长时间,如果超过这个时间,消费者会被认为失效,从而触发重平衡。
    
3. 为了提高消费速度,不恰当的增加消费者多线程
    还是springboot kafka代码
    把factory.setConcurrency(1);改为 factory.setConcurrency(3);
    其实kafka的topic分区数决定了最多分配给多少个消费者(线程),
    例如topic A设置了3个分区,部署三个应用分别每个应用一个消费者线程,那么每个应用刚好分配一个分区,如果只有2个应用,那么每个应用消费者各获得一个分区,剩下一个分区是空的,
    此时可以让一个应用增加并发线程数,即setConcurrency(2),此时提高并发数的应用是获取2个分区,另一个是获得一个分区
    
    补充分区、消费者、消费者多线程的关系
    关于分区和消费者关系,官网原文为:

if, say, 6 TopicPartitions are provided and the concurrency is 3; each container will get 2 partitions. For 5 TopicPartitions, 2 containers will get 2 partitions and the third will get 1. 
If the concurrency is greater than the number of TopicPartitions, the concurrency will be adjusted down such that each container will get one partition.


    假设提供了6个TopicPartitions,并发性为3;每个容器将得到2个分区。对于5个TopicPartitions, 2个容器将获得2个分区,第三个容器将获得1个分区。如果并发性大于TopicPartitions的数量,那么并发性将被调低,这样每个容器将获得一个分区。 


4. 不理解为什么触发重平衡会导致cpu高,导致排查方向不明确

五、验证得出的结论

Kafka 重平衡(rebalance)是指消费者组(Consumer Group)的成员变化(如加入或离开)时,Kafka 会重新分配分区给消费者。这一过程会导致 CPU 使用率升高,具体细节如下:
    【重平衡导致 CPU 高的具体原因】
        1.消费者停止消息处理(Stop-the-World)

       *当重平衡开始时,所有消费者必须停止当前的消息处理。这是一个Stop-the-World操作,

       会影响消费者的正常工作流程。

       *消费者需要提交当前处理到一半的消息的偏移量,这需要与 Kafka Broker 进行通信。
            

        2.协调器与消费者之间的通信
            *Kafka 使用协调器(Coordinator)来管理消费者组。重平衡时,

            协调器需要与所有消费者通信,获取当前消费者的状态,并重新分配分区。
            *这种通信涉及多次网络交互和数据处理,消耗 CPU 资源。
            
        3.分区分配和元数据更新
            *协调器计算新的分区分配方案,并将结果通知所有消费者。消费者需要更新本地的元数据,以反映新的分区分配。
            *这些操作包括反序列化元数据、更新本地状态、处理可能的分区迁移等,都会消耗 CPU。
            
        4.连接和断开连接
            *重平衡期间,消费者可能会频繁地连接和断开连接。这包括重新建立网络连接、初始化通信通道等操作。
            *网络 I/O 操作和相关的上下文切换会增加 CPU 负载。
        
        5.日志记录和监控
            *重平衡过程中,Kafka 和消费者通常会记录大量的日志信息。这些日志记录操作也会增加 CPU 的使用。
            *如果启用了详细的监控和指标收集,这些操作会进一步加剧 CPU 负载。

    【重平衡的流程细节】
        1.触发重平衡
            消费者加入或离开消费者组,或者消费者组协调器发生变化,都会触发重平衡。

   
        2.停止消息处理
            当前消费者停止消息处理,提交当前偏移量。
        3.协调器获取状态
            消费者向协调器发送 JoinGroupRequest,协调器收集所有消费者的状态。
        4.分区重新分配
            协调器根据分区分配策略(如 Range、RoundRobin、Sticky 等)重新分配分区。
        5分区分配通知
            协调器将新的分区分配方案发送给所有消费者,消费者更新本地的分区信息。
        6重新开始消息处理
            消费者根据新的分区分配,重新开始消息处理。


    【分区、消费者、消费者多线程的关系】
    关于分区和消费者关系,官网原文为:

if, say, 6 TopicPartitions are provided and the concurrency is 3; each container will get 2 partitions. For 5 TopicPartitions, 2 containers will get 2 partitions and the third will get 1. If the concurrency is greater than the number of TopicPartitions, the concurrency will be adjusted down such that each container will get one partition.

 翻译如下:
假设提供了6个TopicPartitions,并发性为3;每个容器将得到2个分区。对于5个TopicPartitions, 2个容器将获得2个分区,第三个容器将获得1个分区。如果并发性大于TopicPartitions的数量,那么并发性将被调低,这样每个容器将获得一个分区。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/37959.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux-网络安全私房菜

文章目录 前言入门基本指令篇章字符集设置cdlsdatemkdirtouch-d-m 修改主机名rmshredrename重命名mv移动tar打包与压缩打包但是不压缩打包且压缩更新包文件解压对应的包 zip压缩文件命令cat查看显示行号交互写入&#xff08;追加&#xff09;显示空行 more和lesshead和tailhead…

Android的悬浮时钟(一)

在Android&#xff0c;如果要悬浮在其他应用上方显示时钟或者其他界面的话是需要申请权限的。 首先在manifest中我们就要写自己要申请的权限SYSTEM_ALERT_WINDOW <uses-permission android:name"android.permission.SYSTEM_ALERT_WINDOW" /> 不同于请求照片或…

期末复习---程序填空

注意&#xff1a; 1.数组后移 *p *(p-1) //把前一个数赋值到后一个数的位置上来覆盖后一个数 2.指针找最大字符 max *p while( *p){ if( max< *p) { max*p; qp;/ 用新的指针指向这个已经找到的最大位置&#xff1b;!!!!!!!!! } p; //因为开始没有next &#xff…

Web工程化

1、webpack 1.1 概念 一个前端打包器。 webpack 只识别javascript. 所以需要安装nodejs环境。 1.2 运行环境 Nodejs Nodejs 是运行JavaScript的环境。 因为nodejs发布了许多版本&#xff0c;在不同的技术栈下&#xff0c;需要使用不同的nodejs。 所以需要在电脑上安装n…

web应用技术-第十一次课后作业

验证过滤器进行权限验证的原理。 Filter过滤器&#xff1a;可以把对资源的请求拦截下来&#xff0c;从而实现一些特殊的功能。一般完成登录校验、统一编码处理、敏感字符处理等通用操作。 定义&#xff1a;实现Filter接口 配置&#xff1a;WebFilter(urlPatterns"/*&qu…

常见VPS主机术语有哪些?VPS术语解析

常见VPS主机术语有哪些&#xff1f;本期为大家解析一下我们常见到的听到的VPS专业术语&#xff0c;帮助大家更轻松的了解VPS主机相关知识。 常见VPS主机术语 Apache – 世界上最流行的 Web 服务器软件。 CentOS – 旨在提供基于 Red Hat Enterprise Linux 的企业级操作系统的…

mysql 主主HA高可用方案详解

1.环境准备&#xff1a; 主机&#xff1a;1921.4,1921.5 操作系统&#xff1a;centos 7.3 mysql数据库版本&#xff1a;mysql 5.7.13 浮动IP:1921.182 2.mysql 下载及解压安装配置 2.1 下载&#xff1a; #wget http://dev.mysql.com/get/Downloads/MySQL-5.7/mysql-5.7.13-linu…

easyexcel 模板填充Excel数据,实现自定义换行及动态调整行高,并保持列表格式一致

pom依赖&#xff1a; <dependency><groupId>org.apache.poi</groupId><artifactId>poi-ooxml</artifactId><version>5.2.5</version> </dependency><dependency><groupId>com.alibaba</groupId><artifa…

数据结构-线性表的应用

目录 前言一、有序表的合并1.1 顺序表实现1.2 单链表实现 二、稀疏多项式的相加和相乘2.1 稀疏多项式的相加2.2 稀疏多项式的相乘 总结 前言 本篇文章介绍线性表的应用&#xff0c;分别使用顺序表和单链表实现有序表的合并&#xff0c;最后介绍如何使用单链表实现两个稀疏多项…

基于springboot+vue+uniapp的超市售货管理平台

开发语言&#xff1a;Java框架&#xff1a;springbootuniappJDK版本&#xff1a;JDK1.8服务器&#xff1a;tomcat7数据库&#xff1a;mysql 5.7&#xff08;一定要5.7版本&#xff09;数据库工具&#xff1a;Navicat11开发软件&#xff1a;eclipse/myeclipse/ideaMaven包&#…

考研生活day2--王道课后习题2.3.1、2.3.2、2.3.3

2.3.1 题目描述&#xff1a; 这题和曾经做过的LeetCode203.移除元素一模一样&#xff0c;所以我们就使用LeetCode进行书写&#xff0c;题目链接203. 移除链表元素 - 力扣&#xff08;LeetCode&#xff09; 解题思路 大家的第一反应肯定是根据书上所学的书写方法一样书写&…

【PB案例学习笔记】-26制作一个带浮动图标的工具栏

写在前面 这是PB案例学习笔记系列文章的第26篇&#xff0c;该系列文章适合具有一定PB基础的读者。 通过一个个由浅入深的编程实战案例学习&#xff0c;提高编程技巧&#xff0c;以保证小伙伴们能应付公司的各种开发需求。 文章中设计到的源码&#xff0c;小凡都上传到了gite…

爬虫cookie是什么意思

“爬虫 cookie”指的是网络爬虫在访问网站时所使用的cookie&#xff0c;网络爬虫是一种自动化程序&#xff0c;用于在互联网上收集信息并进行索引&#xff0c;这些信息可以用于搜索引擎、数据分析或其他目的。 本教程操作系统&#xff1a;Windows10系统、Dell G3电脑。 “爬虫…

51-1 内网信息收集 - 内网资源探测

导语 在内网渗透过程中,通常需要利用各种技术来探测内网资源,为后续的横向渗透做准备。发现内网存活的主机及其详细信息可以帮助确定攻击方向和潜在的漏洞。 一、基于 ICMP 发现存活主机 ICMP(Internet Control Message Protocol,因特网控制消息协议)是 TCP/IP 协议簇的…

一段式、二段式和三段式状态机的特点及适用情况:

在FPGA设计中,状态机的选择主要取决于具体应用场景和设计需求。 一段式状态机: 优点: 结构简单,易于理解和实现占用资源少时序逻辑简单,延迟小 缺点: 组合逻辑复杂度高可能存在毛刺问题不易于大规模状态机的设计 适用场景: 简单的控制逻辑状态数量较少的场合对时序要求较…

React+TS前台项目实战(二十二)-- 全局常用导出组件Export封装

文章目录 前言Export组件1. 功能分析2. 代码详细注释3. 使用方式4. 效果展示 总结 前言 今天我们来封装一个带导出图标的导出组件。 Export组件 1. 功能分析 通过传入链接地址&#xff0c;规定要跳转的导出页面&#xff0c;或是直接通过链接导出数据 2. 代码详细注释 // /c…

虚拟环境管理

虚拟环境 在使用 Python 时我们一般使用“pip install 第三方包名”来安装第三方包&#xff0c;但是由于pip的特性&#xff0c;系统只能安装每个包的一个版本。而在实际开发中&#xff0c;可能同时开发多个项目&#xff0c;如&#xff1a;上图有三个项目&#xff1b;每个项目需…

django学习入门系列之第三点《BootSrap初了解》

文章目录 初识BootStrap往期回顾 初识BootStrap BootSrap是什么&#xff1f; 是别人帮我们已写好的CSS样式&#xff0c;我们如果想要使用这个BootSrap&#xff1a; 下载BootStrap使用 在页面上引入BootStrap编写HTML时&#xff0c;按照BootStrap的规定来编写 自定制 官网&…

【UE5.1】Chaos物理系统基础——02 场系统的应用

目录 步骤 一、运用临时场&#xff08;外部张力&#xff09;破裂几何体集 二、使用构造场固定几何体集 步骤 在上一篇中&#xff08;【UE5.1】Chaos物理系统基础——01 创建可被破坏的物体&#xff09;我们已经创建了可被破碎的几何体集&#xff0c;在最后我们防止几何体集…

微信小程序简历Demo

微信小程序简历Demo 使用介绍最后获取源码 bilibili视频介绍 使用介绍 使用微信小程序实现的一个简历实现Demo 拖动马里奥&#xff0c;到指定Name下方 向上顶就可以显示对应的简历样式 点击头像可拨打电话 点击信息处可显示当前位置 最后 这是一个简单并且有趣的微信小程…