kafka广播消费组停机后未删除优化

背景

kafka广播消息的时候为了保证groupId不重复,再创建的时间采用前缀+时间戳的形式,这样可以保证每次启动的时候是创建的新的,但是

会出现一个问题:就是每次停机或者重启都会新建一个应用实例,关闭应用后并不会删除kafka下面的消费组,导致消费组越来越多,目前

我们有promethes监控kafka消息偏移,一直没有消费的消费组就会进行报警;

解决思路

既然是没有删除消费组就通过优雅停机,应用关闭前采用java的api操作kafka消费组,进行删除

代码实现

1)编写类实现DisposableBean接口,实现destroy方法,注意每个项目定义的id会不一样,此例子中 id = “cfgs-broadcast”

package com.simo.vsim.cfgs.init;import com.alibaba.nacos.api.config.annotation.NacosValue;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DeleteConsumerGroupsResult;
import org.apache.kafka.common.KafkaFuture;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;@Data
@Component
@Slf4j
public class ApplicationListen implements InitializingBean, DisposableBean {@Resourceprivate KafkaListenerEndpointRegistry registry;@NacosValue(value = "${spring.kafka.bootstrap-servers}", autoRefreshed = true)private String servers;@Overridepublic void destroy()  {MessageListenerContainer listenerContainer = registry.getListenerContainer("cfgs-broadcast");String groupId = listenerContainer.getGroupId();Map<String, Object> props = new HashMap<>(1);props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,servers);AdminClient adminClient = AdminClient.create(props);DeleteConsumerGroupsResult deleteConsumerGroupsResult = adminClient.deleteConsumerGroups(Arrays.asList(groupId));KafkaFuture resultFuture = deleteConsumerGroupsResult.all();try {resultFuture.get();log.info("kafka关闭消费组="+groupId);} catch (InterruptedException e) {e.printStackTrace();} catch (Exception e) {e.printStackTrace();}adminClient.close();}@Overridepublic void afterPropertiesSet() {}
}

2)接收kafka广播消息的时候指定容器id,用于第一步通过id进行删除,id = “cfgs-broadcast”

/*** groupId不一样代表广播模式,earliest 可能重复消费,latest可能漏消费* @param message* @param ack*/
@KafkaListener(containerFactory = "manualImmediateListenerContainerFactory" , topics = {"${kafka.topic.cfgs-broadcast}"},properties = {"auto.offset.reset=latest"},groupId = "cfgs-broadcast-" + "#{T(java.lang.System).currentTimeMillis()}",idIsGroup = false,id = "cfgs-broadcast")
public void onMessageManualBroadcast(List<Object> message, Acknowledgment ack){message.forEach(item -> handleMsg(2,item));//直接提交offsetack.acknowledge();
}

效果

1)正常启动有这个消费组:cfgs-broadcast-1696754926097

2)重新启动,通过日志显示已经删除(k8s默认是优雅停机)
在这里插入图片描述
如果是iead直接关闭下,不要一下子点击两下停止,点击一次是优雅停机,连续点击2次就是kill -9的效果,就无法看到效果
![在这里插入图片描述](https://img-blog.csdnimg.cn/d36947cdd8f048acaa886eadafeaa34b.png

3)查看kafka消费组,确实已经删除

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/100699.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Bun v1.0.3 发布,Zig 编写的 JavaScript 运行时

导读Bun 发布了其最新版本 v1.0.3&#xff0c;这是一个集 JavaScript 运行时、打包器、转译器和包管理器于一体的工具。这次的更新不仅修复了众多已知的问题&#xff0c;还引入了一系列令人期待的新功能。 首先&#xff0c;Bun 在这个版本中增加了对 TypeScript 的 emitDecora…

Java架构师主流架构设计模式

目录 1 主流架构设计模式1.1 分层架构模式1.1.1 分层架构的一些常见原则1.1.2 分层的单向依赖原则1.1.3 无循环依赖的原则1.1.4 避免跨层通信的原则1.2 微服务架构模式1.3 基于事件的架构模式2 整洁架构3 六边形架构4 微内核架构模式5 基于空间的架构模式6 道过滤器模式和代理模…

31 数据分析(中)numpy介绍

文章目录 工具excelTableauPower Queryjupytermatplotlibnumpy安装导入包快速掌握&#xff08;bushi&#xff09;array和list的相互转化 np的range多维数组的属性array的改变形状array升降维度array内元素的类型数和array的运算array之间的加减法认识轴切片条件与逻辑修改值app…

XPS测试仪器-科学指南针

在做 X 射线光电子能谱(XPS)测试时&#xff0c;科学指南针检测平台工作人员在与很多同学沟通中了解到&#xff0c;好多同学仅仅是通过文献或者师兄师姐的推荐对XPS测试有了解&#xff0c;但是对于其测试仪器还属于小白阶段&#xff0c;针对此&#xff0c;科学指南针检测平台团队…

练[HarekazeCTF2019]encode_and_encode

[HarekazeCTF2019]encode_and_encode 文章目录 [HarekazeCTF2019]encode_and_encode掌握知识解题思路代码分析 关键paylaod 掌握知识 ​ JSON对Unicode字符的解析转义&#xff0c;json格式的构建&#xff0c;代码审计&#xff0c;php伪协议的利用&#xff0c;file_get_content…

ELK集群 日志中心集群、kafka、logstash

ES&#xff1a;用来日志存储 Logstash:用来日志的搜集&#xff0c;进行日志格式转换并且传送给别人&#xff08;转发&#xff09; Kibana:主要用于日志的展示和分析 kafka Filebeat:搜集文件数据 es-1 本地解析 vi /etc/hosts scp /etc/hosts es-2:/etc/hosts scp /etc…

px4仿真实现无人机自主飞行

一,确定消息类型 无人机通过即在电脑是现自主飞行:思路如下。 通过Mavros功能包,将ROS消息转换为Mavlink消息。实现对无人机的控制。 几种消息之间的关系如下: 对于ROS数据,就是我们机载电脑执行ROS系统的数据。 对于Mavros消息,就是Mavros功能包内部的消息。查询网站…

Maven 环境配置

Maven 是一个基于 Java 的工具&#xff0c;所以要做的第一件事情就是安装 JDK。 系统要求 项目 要求 JDK Maven 3.3 要求 JDK 1.7 或以上 Maven 3.2 要求 JDK 1.6 或以上 Maven 3.0/3.1 要求 JDK 1.5 或以上 内存 没有最低要求 磁盘 Maven 自身安装需要大约 10 MB 空间…

振弦采集仪应用于隧道安全监测

振弦采集仪应用于隧道安全监测 振弦采集仪是当今必不可少的现代隧道安全监测工具。该设备广泛应用于隧道内部各种安全参数的实时监测&#xff0c;包括但不限于隧道变形、裂缝、压力、温度等。本文详细介绍了振弦采集仪在隧道安全监测中的应用。 首先&#xff0c;我们来了解一下…

STM32使用HAL库驱动DS3231

1、STM32通讯口配置 启动IIC&#xff0c;默认配置即可。 2、头文件 #ifndef __DS3231_H #define __DS3231_H#include "main.h"#define DS3231_COM_PORT hi2c1 /*通讯端口*//**************************** defines *******************************/ #define DS3231…

算法题:摆动序列(贪心算法解决序列问题)

这道题是一道贪心算法题&#xff0c;如果前两个数是递增&#xff0c;则后面要递减&#xff0c;如果不符合则往后遍历&#xff0c;直到找到符合的。&#xff08;完整题目附在了最后&#xff09; 代码如下&#xff1a; class Solution(object):def wiggleMaxLength(self, nums):…

前后端分离计算机毕设项目之基于SpringBoot的旅游网站的设计与实现《内含源码+文档+部署教程》

博主介绍&#xff1a;✌全网粉丝10W,前互联网大厂软件研发、集结硕博英豪成立工作室。专注于计算机相关专业毕业设计项目实战6年之久&#xff0c;选择我们就是选择放心、选择安心毕业✌ &#x1f345;由于篇幅限制&#xff0c;想要获取完整文章或者源码&#xff0c;或者代做&am…

git介绍和安装、(git,github,gitlab,gitee介绍)、git工作流程、git常用命令、git忽略文件

1 git介绍和安装 2 git&#xff0c;github&#xff0c;gitlab&#xff0c;gitee介绍 3 git工作流程 4 git常用命令 5 git忽略文件 1 git介绍和安装 首页功能写完了---》正常应该提交到版本仓库---》大家都能看到这个---》 运维应该把现在这个项目部署到测试环境中---》测试…

2023年中国汽车后市场行业研究报告

第一章 行业概况 1.1 定义 汽车后市场行业在中国的快速崛起&#xff0c;反映了汽车产业链的完善和消费者需求的多样化。这个行业涵盖了汽车销售后&#xff0c;围绕汽车使用过程中涌现的各类服务和交易活动。它不仅为消费者提供了汽车使用过程中所需的全方位服务&#xff0c;也…

【C刷题】day4

一、选择题 1、设变量已正确定义&#xff0c;以下不能统计出一行中输入字符个数&#xff08;不包含回车符&#xff09;的程序段是&#xff08; &#xff09; A: n0;while(chgetchar()!\n)n; B: n0;while(getchar()!\n)n; C: for(n0;getchar()!\n…

C语言中文网 - Shell脚本 - 4

第1章 Shell基础&#xff08;开胃菜&#xff09; 4. 进入Shell的两种方式 在 Linux 发展的早期&#xff0c;唯一能用的工具就是 Shell&#xff0c;Linux 用户都是在 Shell 中输入文本命令&#xff0c;并查看文本输出&#xff1b;如果有必要的话&#xff0c;Shell 也能显示一些…

计算机毕业设计选什么题目好?springboot 社区流浪动物救助领养系统

✍✍计算机编程指导师 ⭐⭐个人介绍&#xff1a;自己非常喜欢研究技术问题&#xff01;专业做Java、Python、微信小程序、安卓、大数据、爬虫、Golang、大屏等实战项目。 ⛽⛽实战项目&#xff1a;有源码或者技术上的问题欢迎在评论区一起讨论交流&#xff01; ⚡⚡ Java实战 |…

短视频视频号矩阵系统源码独立部署开发对接

一、多账号矩阵管理功能&#xff08;基于api接口开发与没有官方接口开发的区别&#xff09; 基于API接口开发&#xff0c;可以通过调用官方提供的接口获取账号信息、创建新账号、更新账号设置等操作&#xff0c;实现自动化的账号管理绑定授权&#xff0c;通过相关的接口开发绑定…

如何在 Spring Boot 中进行文件上传

在 Spring Boot 中进行文件上传 文件上传是Web应用程序中常见的功能之一&#xff0c;它允许用户将文件从客户端上传到服务器。Spring Boot提供了便捷的方式来处理文件上传&#xff0c;并且整合了Spring框架的强大功能&#xff0c;使文件上传变得相对简单。本文将介绍如何在Spr…

FPGA---UDP通信求助

项目场景&#xff1a; 使用UDP进行回环&#xff0c;网络调试助手&#xff0c;发送数据通过UDP接收模块接收&#xff0c;解析出数据&#xff0c;给到UDP发送模块&#xff0c;传回上位机。 问题描述 UDP接收模块中&#xff0c;接收到的CRC校验值与自己计算CRC校验值进行判断&am…