springboot kafka在kafka server AUTH变动后consumer自动销毁

前言

笔者使用了kafka用来传输数据,笔者在今年10月写了文章,怎么使用配置化实现kafka的装载:springboot kafka多数据源,通过配置动态加载发送者和消费者-CSDN博客

不过在实际运行中,kafka broker是加密的,本来也没啥,但是突然的一天笔者在监控发现消费者掉线了,发送者居然还是正常的,见鬼的事情就是这么朴实的发生了,而且日志有Authentication/Authorization Exception and no authExceptionRetryInterval set

这样的错误,还有

Closing the Kafka consumer
Kafka consumer has been closed

这样的日志,非常诡异,后面查询kafka集群才知道,kafka集群在某个时间被改了AUTH配置,然后又改回来了,神奇的操作。

现象

实际上看到日志是懵的,毕竟kafka是对方提供的,AUTH用户名和密码也是对方给的,而且发送者也出现AUTH失败,但是发送者一直重试,然后因为kafka集群的AUTH改回来了,重试成功了;唯独消费者AUTH失败后,关闭了。

源码分析

从日志搜索Authentication/Authorization Exception and no authExceptionRetryInterval set

发现日志出现在

org.springframework.kafka.listener.KafkaMessageListenerContainer

刚好在消息监听器容器的内部类

ListenerConsumer

看源码定义,是一个定时线程池的任务定义

private final class ListenerConsumer implements SchedulingAwareRunnable, ConsumerSeekCallback

分析怎么去消费的KafkaMessageListenerContainer的doStart方法

既然明晰了,那么分析问题的来源, run方法

		public void run() {ListenerUtils.setLogOnlyMetadata(this.containerProperties.isOnlyLogRecordMetadata());publishConsumerStartingEvent(); //事件通知,spring事件this.consumerThread = Thread.currentThread();setupSeeks();KafkaUtils.setConsumerGroupId(this.consumerGroupId);this.count = 0;this.last = System.currentTimeMillis();initAssignedPartitions(); // 初始分配partitionspublishConsumerStartedEvent(); // 消费者启动事件Throwable exitThrowable = null;this.lastReceive = System.currentTimeMillis();while (isRunning()) { //状态在上面的截图代码已经更新为运行状态try {pollAndInvoke(); // 队列拉取,拉取过程会出现各种异常}catch (NoOffsetForPartitionException nofpe) { //这个需要注意,但是这个是不可配置的this.fatalError = true;ListenerConsumer.this.logger.error(nofpe, "No offset and no reset policy");exitThrowable = nofpe;break;}catch (AuthenticationException | AuthorizationException ae) { //这个是授权失败,就是来源于kafka broker的auth鉴权if (this.authExceptionRetryInterval == null) { //时间配置,默认居然是nullListenerConsumer.this.logger.error(ae,"Authentication/Authorization Exception and no authExceptionRetryInterval set");this.fatalError = true;exitThrowable = ae;break; //循环结束}else { // 一段时间后重试ListenerConsumer.this.logger.error(ae,"Authentication/Authorization Exception, retrying in "+ this.authExceptionRetryInterval.toMillis() + " ms");// We can't pause/resume here, as KafkaConsumer doesn't take pausing// into account when committing, hence risk of being flooded with// GroupAuthorizationExceptions.// see: https://github.com/spring-projects/spring-kafka/pull/1337sleepFor(this.authExceptionRetryInterval);}}catch (FencedInstanceIdException fie) {this.fatalError = true;ListenerConsumer.this.logger.error(fie, "'" + ConsumerConfig.GROUP_INSTANCE_ID_CONFIG+ "' has been fenced");exitThrowable = fie;break;}catch (StopAfterFenceException e) {this.logger.error(e, "Stopping container due to fencing");stop(false);exitThrowable = e;}catch (Error e) { // NOSONAR - rethrownthis.logger.error(e, "Stopping container due to an Error");this.fatalError = true;wrapUp(e);throw e;}catch (Exception e) {handleConsumerException(e);}finally {clearThreadState();}}//上面异常后,这里的处理wrapUp(exitThrowable);}

注意NoOffsetForPartitionException和AuthenticationException | AuthorizationException,其中能配置重试的是AuthenticationException | AuthorizationException异常,就是授权失败可以通过配置过一段时间抢救一下,一般而言,kafka首次授权失败基本上就不太可能成功了,但是这个只能控制consumer销毁,producer还在重试,所以出现了消费者销毁了,发送者在kafka集群auth还原后成功恢复。看看wrapUp方法

既然知道了原理,那么解决办法是配置

authExceptionRetryInterval

解决方法

配置authExceptionRetryInterval也是不容易,分析取值

private final Duration authExceptionRetryInterval =this.containerProperties.getAuthExceptionRetryInterval();

从 containerProperties来的,实际上是继承org.springframework.kafka.listener.ConsumerProperties

在spring-kafka 2.8版本还对属性命名重构了,毕竟以前的命名字母太多了,😁

不过要修改这个值也不容易,kafkaproperties并没有提供这个参数,而且创建消费者容器工厂时

org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory

那么只能在

ConcurrentMessageListenerContainer

创建成功后,从这个里面读取配置,设置默认值。这就可以使用前面埋点的spring事件了,通过事件拿到Consumer,不就可以修改配置了

使用

publishConsumerStartedEvent();

的事件最合适,执行循环前最后一个事件,而且这里的this,就是消费者容器org.springframework.kafka.listener.KafkaMessageListenerContainer对象,就是我们需要的

编写代码如下:

@Component
public class KafkaConsumerStartedListener implements ApplicationListener<ConsumerStartedEvent> {@Overridepublic void onApplicationEvent(ConsumerStartedEvent event) {KafkaMessageListenerContainer<?, ?> container = event.getSource(KafkaMessageListenerContainer.class);container.getContainerProperties().setAuthExceptionRetryInterval(Duration.of(30, ChronoUnit.SECONDS));}
}

就这样就解决了问题,哈哈哈贼简单。 

笔者在使用API时发现了还有一个API是授权失败后重启,是布尔变量,默认值是false,即不重试。那么这个是哪里触发的呢,在我们看到的消费者销毁事件里面,实际上控制重试有多种办法,一个是循环去kafka broker拉取,一个是重启kafka消费者容器。

 

这个在上面已经说明了,简单截个图再说明一下

设置的地方还是上面的代码里面,同理也可以修改事件为

ConsumerStoppedEvent

可以更精准,当然就用刚刚的代码加一行设置也是可以的。

总结

kafka在发送者和消费者是区分开的,发送者如果连接kafka broker失败后可以一直重试直到成功,但是消费者确有各种各样的逻辑,可以精准控制,比如消费者重启的配置

restartAfterAuthExceptions

可以控制消费者在停止时重启,如果仅仅是授权失败,而且不需要反复重启(消耗资源),那么可以通过

authExceptionRetryInterval

配置时间周期的方式实现,但是kafka并没有给我们配置的入口,但是kafka在消费者启动消费的过程埋了很多spring事件钩子,通过这些钩子可以操作,估计spring-kafka也不希望我们去修改,毕竟消费者启动失败了或授权失败了,消费者自动销毁是符合正常逻辑的。如果不使用kafka自己提供的事件,可以在启动完成通过

org.springframework.kafka.config.KafkaListenerEndpointRegistry

拿到所有消费者容器,来批量设置属性,毕竟spring-kafka也是通过这个端点注册器注册MessageListenerContainer的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/888076.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

在Windows下编译支持https的wsdl2h

下载源码 在官网下载源码 安装Openssl 下载OpenSSL并安装&#xff0c;安装完成后需要将OpenSSL的路径添加到环境变量中 配置VS 1、打开工程 2、因为前面安装的OpenSLL是64位的&#xff0c;因此需要创建一个X64的配置 打开配置管理器&#xff0c;然后选择新建&#xff0…

【Webgl_glslThreejs】制作流水效果/毛玻璃效果材质

效果预览 shadertory源码 source&#xff1a; https://www.shadertoy.com/view/lldyDs 材质代码 import { DoubleSide, ShaderChunk, ShaderMaterial, TextureLoader } from "three"; /** * * source https://www.shadertoy.com/view/lldyDs */export default fu…

个性化联邦学习方法

基于知识蒸馏的个性化联邦学习方法 基于 Logit 的知识蒸馏方法&#xff1a; 基于 logit 的知识蒸馏方法也是知识蒸馏中的一种常见技术。通常&#xff0c;logit 是指模型输出的原始预测值&#xff08;未经过 softmax 函数处理的类别分数&#xff09;。在知识蒸馏中&#xff0c;…

《解锁决策树算法:机器学习领域的实用利器及其多面应用》

一、引言 在当今数据驱动的时代&#xff0c;机器学习正深刻改变着我们生活与工作的方方面面&#xff0c;而决策树算法作为其中的经典算法&#xff0c;凭借直观易懂、高效实用的优势&#xff0c;在众多领域都占据着重要地位。本文将带领大家全方位深入探究决策树算法&#xff0…

vue3+ts+uniapp微信小程序顶部导航栏

这是colorui改的&#xff0c;不用就不用看啦 color-ui(https://docs.xzeu.com/#/) 新建component文件夹创建topNavigation.vue <template><view><view class"cu-custom" :style"height: CustomBar px"><view class"cu-bar…

【AI战略思考13】克服懒惰,保持专注,提升效率,不再焦虑

【AI论文解读】【AI知识点】【AI小项目】【AI战略思考】【AI日记】 引言 我发现自己最近非常懒惰&#xff0c;浪费了很多时间&#xff0c;也容易分心&#xff0c;不够专注&#xff0c;效率低下&#xff0c;且每天都有点焦虑&#xff0c;因此制定了下面的要求和作息时间表。 目…

Unity3D ngui和ugui区别与优缺点详解

前言 Unity3D是一款跨平台的游戏开发引擎&#xff0c;它支持多种平台&#xff0c;包括PC、移动设备和主机。在Unity3D中&#xff0c;UI系统是游戏开发中非常重要的一部分&#xff0c;它负责游戏中的用户界面的显示和交互。 对惹&#xff0c;这里有一个游戏开发交流小组&#…

openssl使用哈希算法生成随机密钥

文章目录 一、openssl中随机数函数**OpenSSL 随机数函数概览**1. **核心随机数函数** **常用函数详解**1. RAND_bytes2. RAND_priv_bytes3. RAND_seed 和 RAND_add4. RAND_status **随机数生成器的熵池****常见用例****注意事项** 二、使用哈希算法生成随机的密钥 一、openssl中…

大模型翻译能力评测

1. 背景介绍 随着自然语言处理技术的飞速发展&#xff0c;机器翻译已经成为一个重要的研究领域。近年来&#xff0c;基于大模型的语言模型在机器翻译任务上取得了显著的进展。这些大模型通常具有数亿甚至数千亿的参数&#xff0c;能够更好地理解和生成自然语言。 但是&#xf…

刷题日常(找到字符串中所有字母异位词,​ 和为 K 的子数组​,​ 滑动窗口最大值​,全排列)

找到字符串中所有字母异位词 给定两个字符串 s 和 p&#xff0c;找到 s 中所有 p 的 异位词的子串&#xff0c;返回这些子串的起始索引。不考虑答案输出的顺序。 题目分析&#xff1a; 1.将p里面的字符先丢进一个hash1中&#xff0c;只需要在S字符里面找到多少个和他相同的has…

【汇编语言】call 和 ret 指令(三) —— 深度解析汇编语言中的批量数据传递与寄存器冲突

文章目录 前言1. 批量数据的传递1.1 存在的问题1.2 如何解决这个问题1.3 示例演示1.3.1 问题说明1.3.2 程序实现 2. 寄存器冲突问题的引入2.1 问题引入2.2 分析与解决问题2.2.1 字符串定义方式2.2.2 分析子程序功能2.2.3 得到子程序代码 2.3 子程序的应用2.3.1 示例12.3.2 示例…

青训营-豆包MarsCode技术训练营试题解析九

介绍 ‌豆包青训营‌是由字节跳动和稀土掘金社区共同发起的技术培训和人才选拔项目&#xff0c;主要面向在校大学生。该项目的目标是培养具有职业竞争力的优秀开发工程师&#xff0c;并提供全程免费的课程&#xff0c;不收取任何费用‌。 课程内容和方向 豆包青训营的课程涵…

前端的面试题

1.常用的块与行属性内标签有哪些&#xff1f;有什么特征&#xff1f; 块标签&#xff1a;div、h1~h6、ul、li、table、p、br、form。 特征&#xff1a;独占一行&#xff0c;换行显示&#xff0c;可以设置宽高&#xff0c;可以嵌套块和行 行标签&#xff1a;span、a、img、text…

`asyncio.wait` 和 `asyncio.gather` 的区别

asyncio.wait 和 asyncio.gather 的区别 1. asyncio.wait2. asyncio.gather主要区别总结 在Python的异步编程中&#xff0c;asyncio.wait 和 asyncio.gather 都是用于等待多个异步任务完成的工具&#xff0c;但它们在功能和使用方式上有一些关键的区别。本文将详细解释这两个函…

48-基于单片机的LCD12864时间调控和串口抱站

目录 一、主要功能 二、硬件资源 三、程序编程 四、实现现象 一、主要功能 基于51单片机的公交报站系统&#xff0c;可以手动报站&#xff0c;站名十个。 在lcd12864上显示时间&#xff08;年月日时分秒&#xff09;和站名&#xff0c;时间可以设置&#xff0c; 仿真中可以…

如何为 XFS 文件系统的 /dev/centos/root 增加 800G 空间

如何为 XFS 文件系统的 /dev/centos/root 增加 800G 空间 一、前言二、准备工作三、扩展逻辑卷1. 检查现有 LVM 配置2. 扩展物理卷3. 扩展卷组4. 扩展逻辑卷四、调整文件系统大小1. 检查文件系统状态2. 扩展文件系统五、处理可能出现的问题1. 文件系统无法扩展2. 磁盘空间不足3…

爬虫框架快速入门——Scrapy

适用人群&#xff1a;零基础、对网络爬虫有兴趣但不知道从何开始的小白。 什么是 Scrapy&#xff1f; Scrapy 是一个基于 Python 的网络爬虫框架&#xff0c;它能帮助你快速爬取网站上的数据&#xff0c;并将数据保存到文件或数据库中。 特点&#xff1a; 高效&#xff1a;支…

Redis 分布式锁实现方案

一、概述 分布式锁&#xff0c;即分布式系统中的锁。在单体应用中我们通过锁解决的是控制共享资源访问的问题&#xff0c;而分布式锁&#xff0c;就是解决了分布式系统中控制共享资源访问的问题。与单体应用不同的是&#xff0c;分布式系统中竞争共享资源的最小粒度从线程升级…

前端node.js

一.什么是node.js 官网解释:Node.js 是一个开源的、跨平台的 JavaScript 运行时环境。 二.初步使用node.js 需要区分开的是node.js和javascript互通的只有console和定时器两个API. 三.Buffer Buffer 是一个类似于数组的 对象&#xff0c;用于表示固定长度的字节序列。Buffer…

Python中字符串和正则表达式

Python中字符串和正则表达式 在Python编程中&#xff0c;字符串是最常用的数据类型之一。字符串用于表示文本数据&#xff0c;而正则表达式则是一种强大的工具&#xff0c;用于处理和匹配字符串中的模式。本文将介绍Python中的字符串操作、字符串格式化以及如何使用正则表达式…