自定义kafka客户端消费topic

文章目录

  • 自定义kafka客户端消费topic
    • 结论
    • 1 背景
    • 2 spring集成2.1.8.RELEASE版本不支持autoStartup属性
    • 3 自定义kafka客户端消费topic
      • 3.1 yml配置
      • 3.2 KafkaConfig客户端配置
      • 3.3 手动启动消费客户端

自定义kafka客户端消费topic

结论

使用自定义的KafkaConsumer给spring进行管理,之后在注入topic的set方法中,开单线程主动订阅和读取该topic的消息。

1 背景

后端服务不需要启动时就开始监听消费,而是根据启动的模块或者用户自定义监听需要监听或者停止的topic

2 spring集成2.1.8.RELEASE版本不支持autoStartup属性

使用的spring集成2.1.8.RELEASE的版本,在@KafkaListener注解中没有找到可以直接配置属性autoStartup = "false"来手动启动topic,可能是版本低的原因,如果有可以支持的版本,也可以打在评论区,我去验证一下。

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.1.8.RELEASE</version>
</dependency>
@KafkaListener(topics = "<Kafka主题>", autoStartup = "false") 
public void receive(String message) {    // 处理接收到的消息 
}

3 自定义kafka客户端消费topic

3.1 yml配置

spring:kafka:bootstrap-servers: 19.125.105.6:9092,19.125.105.7,19.125.105.8:9092consumer:group-id: data-devenable-auto-commit: trueauto-offset-reset: latestauto-commit-interval: 1000topic:costomTopic: costomData

3.2 KafkaConfig客户端配置

kafka其他配置项和原有的kafka客户端配置一样,只有额外增加了一个cutomConsumer让spring来管理,方便手动启动客户端来使用

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;import java.util.HashMap;
import java.util.Map;@Configuration
public class KafkaConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.group-id}")private String groupId;@Value("${spring.kafka.consumer.enable-auto-commit}")private boolean enableAutoCommit;@Value("${spring.kafka.consumer.auto-offset-reset}")private String autoOffsetReset;//    @Value("${spring.kafka.listener.concurrency}")
//    private Integer concurrency;@Value("${spring.kafka.consumer.auto-commit-interval}")private Integer autoCommitInterval;@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}@BeanKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaContainerFactory() {ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());// concurrencyfactory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;}private ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}public ConsumerFactory<Integer, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}private Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.RETRIES_CONFIG, 0);props.put(ProducerConfig.ACKS_CONFIG, "1");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}private Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);return props;}@Beanpublic KafkaConsumer cutomConsumer() {// 新建一个自定义启动消费者KafkaConsumer consumer = new KafkaConsumer<>(consumerConfigs());return consumer;}
}

3.3 手动启动消费客户端

这里手动启动消费客户端只有在配置了costomTopic才开始启动,如果需要动态指定启停topic

@Component
public class CutomKafkaConsumer {// 使用cutomConsumer实例消费@Autowiredprivate KafkaConsumer cutomConsumer;@Value("${spring.kafka.topic.costomTopic:}")public void setCostomTopic(String costomTopic) {// 手动启动消费类,防止下级模块默认不配置costomTopic导致启动报错if (StringUtils.isEmpty(costomTopic)) {return;}// 使这个消费者订阅对应话题cutomConsumer.subscribe(Collections.singleton(costomTopic));// 单线程拉取消息ExecutorService consumerExecutor = Executors.newSingleThreadExecutor();consumerExecutor.submit(new Runnable() {@Overridepublic void run() {while (true) {ConsumerRecords<String, String> records = cutomConsumer.poll(3000);if (!records.iterator().hasNext()) {continue;}try {// 捕获异常,防止顶级消费循环被异常中断records.forEach(record -> operate(record));} catch (Exception e) {log.error("消费数据失败,失败原因: {}", e.getMessage(), e);}// 通过异步的方式提交位移cutomConsumer.commitAsync(((offsets, exception) -> {if (exception == null) {offsets.forEach((topicPartition, metadata) -> {System.out.println(topicPartition + " -> offset=" + metadata.offset());});} else {exception.printStackTrace();// 如果出错了,同步提交位移cutomConsumer.commitSync(offsets);}}));}}});}
}    public void operate(ConsumerRecord<String, String> record) {log.info("kafkaTwoContainerFactory.operate start. key: {}, value : {}", record.key(), record.value());
}

参考:
Kafka消费者——API开发
Kafka Consumer如何实现精确一次消费数据
Apache Kafka - 灵活控制Kafka消费_动态开启/关闭监听实现
@KafkaListener 详解及消息消费启停控制
kafka多个消费者消费一个topic_kafka消费者组与重平衡机制,了解一下
kafka学习(五):消费者分区策略(再平衡机制)
Kafka 3.0 源码笔记(3)-Kafka 消费者的核心流程源码分析

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/215493.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

人体关键点检测2:Pytorch实现人体关键点检测(人体姿势估计)含训练代码

人体关键点检测2&#xff1a;Pytorch实现人体关键点检测(人体姿势估计)含训练代码 目录 人体关键点检测2&#xff1a;Pytorch实现人体关键点检测(人体姿势估计)含训练代码 1. 前言 2.人体关键点检测方法 (1)Top-Down(自上而下)方法 (2)Bottom-Up(自下而上)方法&#xff1…

Android - 分区存储 MediaStore、SAF

官方页面 参考文章 一、概念 分区存储&#xff08;Scoped Storage&#xff09;的推出是针对 APP 访问外部存储的行为&#xff08;乱建乱获取文件和文件夹&#xff09;进行规范和限制&#xff0c;以减少混乱使得用户能更好的控制自己的文件。 公有目录被分为两大类&#xff1a;…

会员运营常用的ChatGPT通用提示词模板

会员体系&#xff1a;如何建立和完善会员体系&#xff1f; 会员等级&#xff1a;如何设定会员等级及权益&#xff1f; 会员留存&#xff1a;如何提高会员留存率&#xff1f; 会员活跃度&#xff1a;如何提高会员活跃度&#xff1f; 会员招募&#xff1a;如何招募新会员&…

ubuntu install sqlmap

refer: https://github.com/sqlmapproject/sqlmap 安装sqlmap&#xff0c;可以直接使用git 克隆整个sqlmap项目&#xff1a; git clone --depth 1 https://github.com/sqlmapproject/sqlmap.git sqlmap-dev 2.然后进入sqlmap-dev&#xff0c;使用命令&#xff1a; python s…

静态代理IP搭建步骤,静态匿名在线代理IP如何使用?

静态代理搭建步骤 1. 确定需求 在搭建静态代理之前&#xff0c;需要明确自己的需求&#xff0c;包括代理服务器的位置、访问速度、匿名性、安全性等方面的要求。 2. 选择代理服务器提供商 可以选择自己购买服务器搭建代理&#xff0c;也可以选择使用云服务提供商的代理服务…

【Python百宝箱】探索强化学习算法的利器:航行在AI之海的罗盘指南

强化学习的工具宝盒&#xff1a;探索各色瑰宝&#xff0c;点亮智能之旅 前言 人工智能和强化学习正成为推动科技进步的重要力量。在这个领域中&#xff0c;使用适当的库和工具可以加速算法研发和应用部署的过程。本文将深入探索一系列具有代表性的强化学习库和工具&#xff0…

有趣的数学 用示例来阐述什么是初值问题二

一、示例 解决以下初值问题。 解决这个初始值问题的第一步是找到一个通用的解决方案。为此&#xff0c;我们找到微分方程两边的反导数。 即 我们能够对两边进行积分&#xff0c;因为y项是单独出现的。请注意&#xff0c;有两个积分常数&#xff1a;C1和C2。求解前面的方程y给出…

电工--半导体器件

目录 半导体的导电特性 PN结及其单向导电性 二极管 稳压二极管 双极型晶体管 半导体的导电特性 本征半导体&#xff1a;完全纯净的、晶格完整的半导体 载流子&#xff1a;自由电子和空穴 温度愈高&#xff0c;载流子数目愈多&#xff0c;导电性能就愈好 型半导体&…

28. Python Web 编程:Django 基础教程

目录 安装使用创建项目启动服务器创建数据库创建应用创建模型设计路由设计视图设计模版 安装使用 Django 项目主页&#xff1a;https://www.djangoproject.com 访问官网 https://www.djangoproject.com/download/ 或者 https://github.com/django/django Windows 按住winR 输…

docker build构建报错:shim error: docker-runc not installed on system

问题&#xff1a; docker构建镜像时报错&#xff1a;shim error: docker-runc not installed on system 解决&#xff1a; ln -s /usr/libexec/docker/docker-runc-current /usr/bin/docker-runc

MySQL数据库——锁-表级锁(表锁、元数据锁、意向锁)

目录 介绍 表锁 语法 特点 元数据锁 介绍 演示 意向锁 介绍 分类 演示 介绍 表级锁&#xff0c;每次操作锁住整张表。锁定粒度大&#xff0c;发生锁冲突的概率最高&#xff0c;并发度最低。应用在MyISAM、InnoDB、BDB等存储引擎中。 对于表级锁&#xff0c;主要…

选择排序和堆排序

目录 前言 一.选择排序 1.思想 2.实现 3.特点 二.堆排序 1.思想 2.实现 3.特点 前言 排序算法是计算机科学中的基础工具之一&#xff0c;对于数据处理和算法设计有着深远的影响。了解不同排序算法的特性和适用场景&#xff0c;能够帮助程序员在特定情况下…

【Go】基于GoFiber从零开始搭建一个GoWeb后台管理系统(一)搭建项目

前言 最近两个月一直在忙公司的项目&#xff0c;上班时间经常高强度写代码&#xff0c;下班了只想躺着&#xff0c;没心思再学习、做自己的项目了。最近这几天轻松一点了&#xff0c;终于有时间 摸鱼了 做自己的事了&#xff0c;所以到现在我总算是搭起来一个比较完整的后台管…

nrfutil工具安装

准备工作&#xff0c;下载相关安装包 链接&#xff1a;https://pan.baidu.com/s/1LWxhibf8LiP_Cq3sw0kALQ 提取码&#xff1a;2dlc 解压后&#xff0c;分别安装以下安装包 在C盘下创建目录nordic_tools&#xff0c;并将nrfutil复制到刚创建的目录下 环境变量path下添加C:\nor…

图像采集卡 Xtium™2-XGV PX8支持高速 GigE Vision 工业相机

图像采集卡&#xff08;Image Capture Card&#xff09;&#xff0c;又称图像捕捉卡&#xff0c;是一种可以获取数字化视频图像信息&#xff0c;并将其存储和播放出来的硬件设备。很多图像采集卡能在捕捉视频信息的同时获得伴音&#xff0c;使音频部分和视频部分在数字化时同步…

python elasticsearch 日期聚合

索引以及数据如下 PUT dateagg {"mappings": {"properties": {"charge":{"type": "double"},"types":{"type": "keyword"},"create_date":{"type": "date",&…

裸机单片机适用的软件架构

单片机通常分为三种工作模式&#xff0c;分别是 1、前后台顺序执行法 2、操作系统 3、时间片轮询法 1、前后台顺序执行法 利用单片机的中断进行前后台切换&#xff0c;然后进行任务顺序执行&#xff0c;但其实在…

Spring Boot Web

目录 一. 概述 二. Spring Boot Web 1.2.1 创建SpringBoot工程&#xff08;需要联网&#xff09; 1.2.2 定义请求处理类 1.2.3 运行测试 1.3 Web分析 三. Http协议 3.1 HTTP-概述 刚才提到HTTP协议是规定了请求和响应数据的格式&#xff0c;那具体的格式是什么呢? 3…

spring结合设计模式之策略模式

策略模式基本概念&#xff1a; 一个接口或者抽象类&#xff0c;里面两个方法&#xff08;一个方法匹配类型&#xff0c;一个可替换的逻辑实现方法&#xff09;不同策略的差异化实现(就是说&#xff0c;不同策略的实现类) 使用策略模式替换判断&#xff0c;使代码更加优雅。 …

Swagger快速上手

快速开始&#xff1a; 导入maven包 <dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.7.0</version> </dependency><dependency><groupId>io.springfox<…