如何使用@KafkaListener实现从nacos中动态获取监听的topic

1、简介

        对于经常需要变更kafka主题的场景,为了实现动态监听topic的功能,可以使用以下方式。

2、使用步骤
2.1、添加依赖
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.1</version>
</dependency>
2.2、nacos中配置
    # kafka 配置
spring:kafka:bootstrap-servers: ip地址:9092topics: topic1,tpic2producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerenable-idempotence: trueacks: alltransactional-id: kafka-groupconsumer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializergroup-id: kafka-clickhouse-groupauto-offset-reset: latestenable-auto-commit: falseisolation-level: read_committedallow-auto-create-topics: truelistener:ack-mode: MANUAL_IMMEDIATEconcurrency: 3
2.3、配置类
package org.aecsi.kafkadatatock.config;import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.transaction.KafkaTransactionManager;import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;@Configuration
@RequiredArgsConstructor
@EnableKafka
@RefreshScope
public class KafkaConfig {private final KafkaProperties kafkaProperties;@Beanpublic KafkaAdmin kafkaAdmin() {return new KafkaAdmin(kafkaProperties.buildAdminProperties());}@Beanpublic AdminClient adminClient(KafkaAdmin kafkaAdmin) {return AdminClient.create(kafkaAdmin.getConfigurationProperties());}@Beanpublic ProducerFactory<String, String> producerFactory() {Map<String, Object> configProps = new HashMap<>(kafkaProperties.buildProducerProperties());configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);configProps.put(ProducerConfig.ACKS_CONFIG, "all");configProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "kafka-clickhouse-producer");DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(configProps);factory.setTransactionIdPrefix("kafka-clickhouse-producer-");return factory;}@Beanpublic KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {return new KafkaTemplate<>(producerFactory);}@Bean@RefreshScopepublic ConsumerFactory<String, String> consumerFactory() {Map<String, Object> configProps = new HashMap<>(kafkaProperties.buildConsumerProperties());configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);configProps.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, true);configProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");return new DefaultKafkaConsumerFactory<>(configProps);}@Beanpublic KafkaTransactionManager<String, String> transactionManager(ProducerFactory<String, String> producerFactory) {return new KafkaTransactionManager<>(producerFactory);}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory,KafkaTransactionManager<String, String> transactionManager) {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory);factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);factory.getContainerProperties().setTransactionManager(transactionManager);return factory;}@Bean@RefreshScopepublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setAutoStartup(true);return factory;}@Beanpublic ApplicationRunner kafkaListenerStarter(KafkaListenerEndpointRegistry registry) {return args -> {// 启动所有 Kafka 监听器registry.start();};}
}

接收消息类

@KafkaListener(topics = "#{'${spring.kafka.topics}'.split(',')}", autoStartup = "false")@Transactional(transactionManager = "transactionManager")public void processMessage(ConsumerRecord<String, String> record,Acknowledgment acknowledgment,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp) {try {log.info("kafka 接受 topic: {} 消息", topic);
//          处理消息acknowledgment.acknowledge();} catch (Exception e) {log.error("Error processing message for topic {}: {}", topic, e.getMessage());throw e;}}

 主启动类添加一个注解

@EnableConfigurationProperties(KafkaProperties.class)
3、总结

      实现kafka动态获取topic还有其他方式,博主目前只验证这一种,其他方式待更新。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/903014.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

《TCP/IP详解 卷1:协议》之第七、八章:Ping Traceroute

目录 一、ICMP回显请求和回显应答 1、ICMP回显请求 2、ICMP回显应答 二、ARP高速缓存 三、IP记录路由选项&#xff08;Record Route&#xff0c;RR&#xff09; 1、记录路由选项的工作过程 2、RR 选项的 IP 头部格式 2.1、RR 请求 2.2、RR响应 四、ping 的去返路径 五…

30天通过软考高项-第四天

30天通过软考高项-第四天 任务&#xff1a;项目进度管理 思维导图阅读 知识点集锦阅读 知识点记忆 章节习题练习 知识点练习 手写回忆ITTO 听一遍喜马拉雅关于范围的内容 进度管理-背 1. 过程定义 龟腚排池至控 规划进度管理&#xff1a;为了规划、编制、管理…

根据JSON动态生成表单表格

根据JSON动态生成表单表格 一. 子组件 DynamicFormTable.vue1,根据JSON数据动态生成表单表格,支持表单验证JS部分1.1,props数据1.2,表单数据和数据监听1.3,自动验证1.4,表单验证1.5,获取表单数据1.6,事件处理1.7,暴露方法给父组件2,HTML部分二,父组件1, 模拟数据2,…

【赵渝强老师】快速上手TiDB数据库

从TiDBv4.0起&#xff0c;提供了包管理工具TiUP&#xff0c;负责管理TiDB、PD、TiKV等组件。用户只需通过TiUP命令即可运行这些组件&#xff0c;显著降低了管理难度。TiUP程序只包含少数几个命令&#xff0c;用来下载、更新、卸载组件。TiUP通过各种组件来扩展其功能。组件是一…

springboot入门-DTO数据传输层

在 Spring Boot 应用中&#xff0c;DTO&#xff08;Data Transfer Object&#xff0c;数据传输对象&#xff09; 是专门用于在不同层&#xff08;如 Controller 层、Service 层、外部系统&#xff09;之间传输数据的对象。它的核心目的是解耦数据模型和业务逻辑&#xff0c;避免…

安装docker,在docker上安装mysql,docker上安装nginx

目录 一.安装docker 1.1查看Linux版本的命令这里推荐两种&#xff1a; 1.2查看内核版本有三种方式&#xff1a; 2.安装 2.1 如果之前安装了docker&#xff0c;先删除旧版本的doker 2.2 安装需要的软件包&#xff0c;yum-util提供yum-config-manager功能&#xff0c;另外两…

Android killPackageProcessesLSP 源码分析

该方法用于终止指定包名/用户ID/应用ID下符合条件的应用进程&#xff0c;涉及多进程管理、资源冻结、进程清理及优先级更新等操作。核心流程分为进程筛选、资源冻结、进程终止与资源恢复三个阶段。 /*** 从已排序的进程列表中&#xff0c;提取从指定起始索引 startIdx 开始的连…

openAICEO山姆奥特曼未来预测雄文之三个观察

《三个观察》 山姆奥特曼 这篇文章主要讲的是关于AGI&#xff08;人工通用智能&#xff09;的未来发展及其对社会的影响&#xff0c;用大白话总结如下&#xff1a; 核心观点&#xff1a; AGI是什么&#xff1f; AGI是一种能像人类一样解决各种复杂问题的智能系统&#xff0c;比…

部署yolo到k230教程

训练&#xff1a;K230 借助 AICube部署AI 视觉模型 YOLO等教程_嘉楠 ai cube多标签分类-CSDN博客K230模型训练ai cube报错生成部署文件异常_aicube部署模型显示生成部署文件异常-CSDN博客 部署&#xff1a; # 导入必要的库和模块 import os import ujson # 超快的JS…

Flask 应用封装成 Docker 服务的完整技术指南

一、实现原理 容器化核心逻辑 Docker 通过将应用代码、运行环境和依赖项打包成镜像&#xff0c;实现环境一致性。Flask 应用容器化需包含&#xff1a; Python 基础运行环境项目代码及依赖库&#xff08;requirements.txt&#xff09;WSGI服务器&#xff08;如 Gunicorn&#xf…

windows上的 Vmware Workstation 环境搭建

本文的视频版本&#xff1a;https://www.bilibili.com/video/BV1JhLRzyESh Vmware Workstation 是一款跨平台的桌面级虚拟化软件&#xff0c;可以使用 Vmware 创建虚拟机&#xff0c;我们一般使用 Linux 虚拟机&#xff08;目前主流的 Linux 发行版是 Ubuntu&#xff09;&…

Linux下终端命令行安装常见字体示例

一、准备工作&#xff1a; 准备好要安装的字体文件&#xff0c;如宋体、微软雅黑&#xff08;simsun.ttc、msyh.ttc)。进入字体路径&#xff1a; /usr/share/fonts&#xff0c;使用root权限&#xff0c;新建一个目录shell_fonts。 二、命令行安装字体&#xff1a; 将要安装…

CentOS中在线安装Docker(超详细)

1&#xff09;检查安装docker的基本要求&#xff1a; 64位CPU架构的计算机&#xff0c;目前不支持32为CPU架构的计算机 系统的Linux内核版本为3.10及以上 开启CGroups和namespace功能 2&#xff09;使用命令查看当前系统的内核版本 [rootlocalhost ~]# uname -r 3.10.0-862…

武汉昊衡科技OLI光纤微裂纹检测仪:高密度光器件的精准守护者

随着AI技术应用越来越广&#xff0c;算力需求激增&#xff0c;光通信系统正加速向小型化、高密度、多通道方向演进。硅光芯片、高速光模块等核心器件内部的光纤通道数量成倍增加&#xff0c;波导结构愈发精细&#xff0c;传统检测手段因分辨率不足、效率低下&#xff0c;难以精…

Java数据结构——Stack

Stack 栈的概念和使用栈的概念栈的使用 栈的应用出栈元素序列有效的括号栈的压入、弹出序列逆波兰表达式最小栈 栈的概念和使用 栈的概念 栈(Stack)&#xff1a;一种特殊的线性表&#xff0c;只允许再栈的一端进行插入和删除元素&#xff0c;这一端点被称为栈顶&#xff0c;另…

神经网络与计算机视觉

2016 年,随着 AlphaGo 在围棋比赛中击败李世石,“人工智能”、“神经网络”、“深度 学习”等字眼便越来越多的出现在大众眼前,智能化好像成为一种不可逆转的趋势,带给大家新奇感的同时也带来了一丝忧惧:在不远的未来,机器是否真的拥有思维和情感?《终结者》中天网大战人…

VS2019 与gitcode团队管理

1、安装git 点击下一步安装即可 2、vs2019连接gitcode 然后更改本地的代码添加文件等都可以进行远程同步操作了

Python类和对象四(十三)

魔法方法&#xff1a; 按位运算 按位于运算 只要相同才是1 或运算&#xff1a; 只要某个位是1结果就是1 、 按位非 将结果取反 按位异或&#xff1a; 左移和右移运算符&#xff1a; 右移两位 右移动n位&#xff0c;就是除以2的n次方 左移两位&#xff1a; 左移n位就是乘…

如何设置极狐GitLab 议题截止日?

极狐GitLab 是 GitLab 在中国的发行版&#xff0c;关于中文参考文档和资料有&#xff1a; 极狐GitLab 中文文档极狐GitLab 中文论坛极狐GitLab 官网 截止日期 (BASIC ALL) 可以在议题中使用截止日期&#xff0c;来跟踪截止日期并确保功能按时交付。用户至少需要报告者权限才…

如何在 Conda 环境中降级 Python 版本:详细指南

如何在 Conda 环境中降级 Python 版本&#xff1a;详细指南 Python 版本的管理在开发过程中至关重要&#xff0c;特别是在处理不同项目需求时。对于使用 Conda 环境的 Python 程序员来说&#xff0c;版本管理不仅仅是安装不同的 Python 版本&#xff0c;还涉及到依赖关系的兼容…