springboot 实现kafka多源配置

文章目录

  • 背景
  • 核心配置
    • 自动化配置类
    • 注册生产者、消费者核心bean到spring
    • 配置spring.factories
    • yml配置
    • 使用
  • 源码仓库

背景

实际开发中,不同的topic可能来自不同的集群,所以就需要配置不同的kafka数据源,基于springboot自动配置的思想,最终通过配置文件的配置,自动生成生产者及消费者的配置。

核心配置

自动化配置类

import com.example.kafka.autoconfig.CustomKafkaDataSourceRegister;
import com.example.kafka.autoconfig.kafkaConsumerConfig;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.config.SmartInstantiationAwareBeanPostProcessor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.annotation.EnableKafka;@EnableKafka
@Configuration(proxyBeanMethods = false
)
@ConditionalOnWebApplication
@EnableConfigurationProperties({kafkaConsumerConfig.class})
@Import({CustomKafkaDataSourceRegister.class})
public class MyKafkaAutoConfiguration implements BeanFactoryAware, SmartInstantiationAwareBeanPostProcessor {public MyKafkaAutoConfiguration() {}public void setBeanFactory(BeanFactory beanFactory) throws BeansException {beanFactory.getBean(CustomKafkaDataSourceRegister.class);}
}

注册生产者、消费者核心bean到spring

public void afterPropertiesSet() {Map<String, ConsumerConfigWrapper> factories = kafkaConsumerConfig.getFactories();if (factories != null && !factories.isEmpty()) {factories.forEach((factoryName, consumerConfig) -> {KafkaProperties.Listener listener = consumerConfig.getListener();Integer concurrency = consumerConfig.getConcurrency();// 创建监听容器工厂ConcurrentKafkaListenerContainerFactory<String, String> containerFactory = createKafkaListenerContainerFactory(consumerConfig.buildProperties(), listener, concurrency);// 注册到容器if (!beanFactory.containsBean(factoryName)) {beanFactory.registerSingleton(factoryName, containerFactory);}});}Map<String, KafkaProperties.Producer> templates = kafkaProducerConfig.getTemplates();if (!ObjectUtils.isEmpty(templates)) {templates.forEach((templateName, producerConfig) -> {//registerBean(beanFactory, templateName, KafkaTemplate.class, propertyValues);//注册spring bean的两种方式registerBeanWithConstructor(beanFactory, templateName, KafkaTemplate.class, producerFactoryValues(producerConfig.buildProperties()));});}}

配置spring.factories


org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.example.kafka.MyKafkaAutoConfiguration

yml配置

spring:kafka:multiple:consumer:factories:test-factory:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerbootstrap-servers: 192.168.56.112:9092group-id: group_aconcurrency: 25fetch-min-size: 1048576fetch-max-wait: 3000listener:type: batchproperties:spring-json-trusted-packages: '*'key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerauto-offset-reset: latestproducer:templates:test-template:bootstrap-servers: 192.168.56.112:9092key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerkey-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer

使用

在这里插入图片描述

在这里插入图片描述

源码仓库

https://github.com/fafeidou/shield

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/21207.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SwiftUI知识点(一)

前言&#xff1a; Swift知识点&#xff0c;大至看完了&#xff0c;公司项目是Swift语言写的&#xff0c;后续苹果新出的SwiftUI&#xff0c;也需要学习一下 不知觉间&#xff0c;SwiftUI是19年出的&#xff0c;现在24年&#xff0c;5年前的东西了 学习的几个原因&#xff1a; …

Android Media Framework(一)OpenMAX 框架简介

学习开源代码最快的方式是先阅读它的文档&#xff0c;再查看它的头文件&#xff0c;最后研读代码实现并进行编译调试。Android早期引入OpenMAX IL作为使用音视频编解码器的标准接口&#xff0c;了解Android Media框架的底层运行原理要从OMX IL开始。在这一节&#xff0c;我们将…

本机安装深度学习库cuda11.8,cudnn8.6和tensorRT8.5

https://blog.csdn.net/qq_46107892/article/details/131453019 首先是安装cuda11.8 wget https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64/cuda-ubuntu2004.pinsudo mv cuda-ubuntu2004.pin /etc/apt/preferences.d/cuda-repository-pin-600wg…

44-3 waf绕过 - WAF绕过方法

环境准备: 43-5 waf绕过 - 安全狗简介及安装-CSDN博客然后在安装pikachu靶场:构建完善的安全渗透测试环境:推荐工具、资源和下载链接_渗透测试靶机下载-CSDN博客一、首先验证云WAF是否存在于靶场(老师的靶场是部署在阿里云) 靶场地址:http://127.0.0.1/pikachu-master/v…

游戏找不到steam_api64.dll如何解决,全面解析原因及解决方法

在现代游戏中&#xff0c;Steam平台已经成为了玩家们下载、安装和玩游戏的主要渠道之一。然而&#xff0c;有些玩家可能会遇到一个问题&#xff0c;即游戏找不到steam_api64.dll文件。这个问题可能会导致游戏无法正常运行或启动。本文将详细介绍如何解决这个问题&#xff0c;帮…

23、linux系统文件和日志分析

linux文件系统与日志分析 文件时存储在硬盘上的&#xff0c;硬盘上的最小存储单位是扇区&#xff0c;每个扇区大大小是512字节。 inode&#xff1a;元信息&#xff08;文件的属性 权限&#xff0c;创建者&#xff0c;创建日期等&#xff09; block&#xff1a;块&#xff0c…

ZDH-数据管理模块

目录 主题 项目源码 预览地址 安装包下载地址 数据管理服务 数据资源管理 数据资源权限 数据资源血缘 总结 感谢支持 主题 本篇文章主要介绍ZDH-数据管理服务及应用场景 项目源码 zdh_web: GitHub - zhaoyachao/zdh_web: 大数据采集,抽取平台 预览地址 后台管理…

Mac安装pytorch(二)

书接上回&#xff0c;配置好了pytorch环境后&#xff0c;看看是否真的能用 终端输入一下代码&#xff1a; import torch xtorch.rand(3,4) print(x) 出现这些后表明安装完成&#xff0c;可使用 接下来在pycharm中使用 打开设置

JavaScript数组应用

检测数据类型 1.typeof()可以检测基本数据类型&#xff0c;但是在检测null时会返回object。另外它不能检测负责的数据类型&#xff0c;如正则表达式对象 2.constructor可以检测绝大部分数据的类型&#xff0c;但是不能检测null和underfined的数据类型 3.toString()方法&#x…

视频监控平台AS1000:通过网络SDK接入松下视频监控设备(Panasonic监控摄像机) 的源代码的函数和功能介绍及分享

目录 一、视频监控平台介绍 1、概述 2、视频接入能力介绍 3、功能介绍 二、PANASONIC网络摄像机 1、产品种类与定位 2、规格参数 3、功能特点 4、环境适应性 5、网络功能 6、其他特性 三、代码和解释 1、代码和注释 2、函数功能说明 &#xff08;1&#xff09;处…

网络网络层

data: 2024/5/25 14:02:20 周六 limou3434 叠甲&#xff1a;以下文章主要是依靠我的实际编码学习中总结出来的经验之谈&#xff0c;求逻辑自洽&#xff0c;不能百分百保证正确&#xff0c;有错误、未定义、不合适的内容请尽情指出&#xff01; 文章目录 1.协议结构2.封装分离3.…

2023年全国职业院校技能大赛(高职组)“云计算应用”赛项赛卷7(私有云)

#需要资源&#xff08;软件包及镜像&#xff09;或有问题的&#xff0c;可私聊博主&#xff01;&#xff01;&#xff01; #需要资源&#xff08;软件包及镜像&#xff09;或有问题的&#xff0c;可私聊博主&#xff01;&#xff01;&#xff01; #需要资源&#xff08;软件包…

24年护网工具,今年想参加护网的同学要会用

24年护网工具集 吉祥学安全知识星球&#x1f517;http://mp.weixin.qq.com/s?__bizMzkwNjY1Mzc0Nw&mid2247483727&idx1&sndb05d8c1115a4539716eddd9fde4e5c9&chksmc0e47813f793f105017fb8551c9b996dc7782987e19efb166ab665f44ca6d900210e6c4c0281&scene21…

513.找树左下角的值

给定一个二叉树&#xff0c;在树的最后一行找到最左边的值。 示例 1: 示例 2: 思路&#xff1a; 深度最大的叶子结点一定是最后一行。 优先左边搜索&#xff0c;记录深度最大的叶子节点&#xff0c;此时就是树的最后一行最左边的值 代码&#xff1a; class Solution:def fi…

【C#】自定义List排序规则的两种方式

目录 1.系统排序原理 2.方式一&#xff1a;调用接口并重写 3.方式二&#xff1a;传排序规则函数做参数 1.系统排序原理 当我们对一个List<int>类型的数组如list1排序时&#xff0c;一个轻松的list1.sort();帮我们解决了问题 但是在实际应用过程中&#xff0c;往往我们…

【Python】Python异步编程

Python 异步编程 异步编程 异步编程是一种编程范式&#xff0c;通过非阻塞的方式执行任务&#xff0c;允许程序在等待某些操作&#xff08;如I/O操作、网络请求、数据库查询等&#xff09;完成时&#xff0c;继续执行其他任务。这与同步编程&#xff08;或阻塞编程&#xff09…

鸿蒙开发接口资源管理:【@ohos.intl (国际化-Intl)】

国际化-Intl 说明&#xff1a;开发前请熟悉鸿蒙开发指导文档&#xff1a;gitee.com/li-shizhen-skin/harmony-os/blob/master/README.md点击或者复制转到。 本模块首批接口从API version 6开始支持。后续版本的新增接口&#xff0c;采用上角标单独标记接口的起始版本。Intl模块…

找出字符串中出现最多次数的字符以及出现的次数

str.charAt(i) 是JavaScript中获取字符串中特定位置字符的方法&#xff0c;表示获取当前的字符。 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-wi…

遥感影像信息提取

刘老师&#xff08;副教授&#xff09;&#xff0c;来自双一流重点高校&#xff0c;长期从事GIS/RS/3S技术及其生态环境领域中的应用等方面的研究和教学工作&#xff0c;并参与GIS的二次开发&#xff0c;发表多篇sci论文&#xff0c;具有资深的技术底蕴和专业背景。 专题一&am…

代码审计:Fortify SCA 代码审计神器.

什么是 Fortify SCA 代码审计工具 Fortify 是一个静态的、白盒的软件源代码安全测试工具。它通过内置的五大主要分析引擎&#xff1a;数据流、语义、结构、控制流、配置流等对应用软件的源代码进行静态的分析&#xff0c;通过与软件安全漏洞规则集进行匹配、查找&#xff0c;从…