Spring Boot配置多个Kafka数据源

一、配置文件

application.properties配置文件如下

#kafka多数据源配置
#kafka数据源一,日志审计推送
spring.kafka.one.bootstrap-servers=172.19.12.109:32182
spring.kafka.one.producer.retries=0
spring.kafka.one.producer.properties.max.block.ms=5000
#kafka数据源二,动环数据消费
spring.kafka.two.bootstrap-servers=172.19.12.109:32182
spring.kafka.two.producer.retries=0
spring.kafka.two.producer.properties.max.block.ms=5000
spring.kafka.two.consumer.group-id=bw-convert-data
spring.kafka.two.consumer.enable-auto-commit=true

二、pom依赖

		<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

三、生产者、消费者配置

1.第一个kakfa

package com.gstanzer.convert.config;import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.*;
import java.util.HashMap;
import java.util.Map;@EnableKafka
@Configuration
public class KafkaOneConfig {@Value("${spring.kafka.one.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.one.producer.retries}")private String retries;@Value("${spring.kafka.one.producer.properties.max.block.ms}")private String maxBlockMs;@Beanpublic KafkaTemplate<String, String> kafkaOneTemplate() {return new KafkaTemplate<>(producerFactory());}private ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}private Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.RETRIES_CONFIG, retries);props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}
}

2.第二个kakfa

package com.gstanzer.convert.config;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;import java.util.HashMap;
import java.util.Map;@Configuration
@EnableKafka
public class KafkaTwoConfig {@Value("${spring.kafka.two.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.two.producer.retries}")private String retries;@Value("${spring.kafka.two.producer.properties.max.block.ms}")private String maxBlockMs;@Value("${spring.kafka.two.consumer.group-id}")private String groupId;@Value("${spring.kafka.two.consumer.enable-auto-commit}")private boolean enableAutoCommit;@Beanpublic KafkaTemplate<String, String> kafkaTwoTemplate() {return new KafkaTemplate<>(producerFactory());}@BeanKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaTwoContainerFactory() {ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;}private ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}public ConsumerFactory<Integer, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}private Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.RETRIES_CONFIG, retries);props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}private Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}}

四.生产者

@Controller
public class TestController {@Autowiredprivate KafkaTemplate kafkaOneTemplate;@Autowiredprivate KafkaTemplate kafkaTwoTemplate;@RequestMapping("/send")@ResponseBodypublic String send() {final String TOPIC = "TOPIC_1";kafkaOneTemplate.send(TOPIC, "kafka one");kafkaTwoTemplate.send(TOPIC, "kafka two");return "success";}
}

五.消费者

@Component
public class KafkaConsumer {private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);final String TOPIC = "TOPIC_1";// containerFactory 的值要与配置中 KafkaListenerContainerFactory 的 Bean 名相同@KafkaListener(topics = {TOPIC}, containerFactory = "kafkaOneContainerFactory")public void listenerOne(ConsumerRecord<?, ?> record) {LOGGER.info(" kafka one 接收到消息:{}", record.value());}@KafkaListener(topics = {TOPIC}, containerFactory = "kafkaTwoContainerFactory")public void listenerTwo(ConsumerRecord<?, ?> record) {LOGGER.info(" kafka two 接收到消息:{}", record.value());}
}

备注:

生产者消费者代码参考链接,开发同学需要以实际情况按要求自己变更下代码即可:

Spring Boot 集成多个 Kafka_springboot集成多个kafka_//承续缘_纪录片的博客-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/113221.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Elasticsearch 8.X 分词插件版本更新不及时解决方案

1、关于 Elasticsearch 8.X IK 分词插件相关问题 球友在 ElasticSearch 版本选型问题中提及&#xff1a;如果要使用ik插件&#xff0c;是不是就使用目前最新的IK对应elasticsearch的版本“8.8.2”&#xff1f; https://github.com/medcl/elasticsearch-analysis-ik/releases/ta…

python异常及解决方法汇总

文章目录 1、flask异常&#xff1a;TypeError: __init__() got an unexpected keyword argument unbound_message参考文献 1、flask异常&#xff1a;TypeError: init() got an unexpected keyword argument ‘unbound_message’ 解决方法&#xff1a; pip install Flask2.1.3…

怎么获取开源的商城源码

前言 开源的商城源码是指可以自由获取、使用和修改的商城程序代码&#xff0c;通常由开源社区或个人开发者贡献和维护。有许多开源的商城源码可以用于建立自己的商城网站&#xff0c;这篇文章将为您介绍如何找到这些源码。 GitHub搜索 GitHub是一个国际知名的开源代码托管平…

K8s 概念及组件

K8s 的全称为Kubernetes&#xff0c;是一种开源的容器编排平台&#xff0c;用于自动化部署以及扩展和管理容器化的应用程序&#xff0c;它提供了一种容器编排和管理的方式&#xff0c;可以帮助开发人员更轻松的管理容器化的应用程序&#xff0c;并且提供了一种跨多个主机的自动…

Jmeter性能测试 —— jmeter之使用ServerAgent监控服务器

ServerAgent 性能测试时我们关注的重要指标是&#xff1a;并发用户数&#xff0c;TPS&#xff0c;请求成功率&#xff0c;响应时间&#xff0c;服务器的CPU&#xff0c;memory&#xff0c; I/O disk等。Jmeter的聚合报告可以查看并发数、吞吐量、请求成功率、响应时间等&#…

Reasoning with Language Model Prompting: A Survey

本文是LLM系列的文章&#xff0c;针对《Reasoning with Language Model Prompting: A Survey》的翻译。 语言模型提示推理&#xff1a;综述 摘要1 引言2 前言3 方法分类4 比较和讨论5 基准与资源6 未来方向7 结论与视角 摘要 推理作为解决复杂问题的基本能力&#xff0c;可以…

ERR_PNPM_LINKING_FAILED Error: EPERM: operation not permitted, rename

webstorm终端pnpm报错  ERR_PNPM_LINKING_FAILED  Error: EPERM: operation not permitted, rename ’ 报错原因&#xff1a;powershell权限不够 解决办法&#xff1a;提升权限/在文件打开Powershell安装依赖

发现一款非常好用的学术GPT,可形成知识库,并分析论文,根据观点生成文字

发现一款非常好用的学术GPT&#xff0c;支持CHATGPT3.5交互、论文分析与生成&#xff0c;目前作者并未全面推广&#xff0c;仅在小圈子里使用&#xff0c;可以保证后端api的使用稳定性&#xff0c;不会出现大量用户共享gpt 服务&#xff0c;导致gpt调用超时的情况。 使用方法&a…

java经典面试题总结

1.请简述Java的继承&#xff0c;重写和多态的概念和运用 继承是一种Java中重要的面向对象编程方式&#xff0c;它允许一个类从另一个类继承某些属性和方法&#xff0c;在这种关系下&#xff0c;子类可以重写父类的方法&#xff0c;从而实现不同的行为。 多态是继承实现的一种关…

关系数据库-postgresql-基础

文章目录 介绍linux下安装postgresql源码安装navicat连接 介绍 Postgresql官网开源的关系型数据库&#xff1b; linux下安装 Ubuntu下可以使用apt包管理器安装&#xff1b;参考地址CentOS下可以使用yum包管理器安装&#xff1b;OpenSuse下可以使用zypper包管理器安装&#xf…

基于Python3的Scapy构造DNS报文

一&#xff1a;DNS协议 DNS&#xff08;Domain Name System&#xff09;协议是计算机网络中的一种基础协议&#xff0c;它用于将域名&#xff08;如www.baidu.com&#xff09;转换为IP地址&#xff08;如192.168.0.1&#xff09;&#xff0c;从而实现计算机之间的通信。 DNS 分…

React基础: 项目创建 JSX 基础语法 React基础的组件使用 useState状态 基础样式控制

01 React 文章目录 01 React一、React是什么1、React的优势 二、React开发环境搭建1、创建项目2、运行项目3、项目的目录结构 三、JSX基础1、什么是 JSX代码示例&#xff1a; 2、JSX使用场景2.1代码示例&#xff1a; 3、JSX中实现列表渲染4、JSX - 实现基本的条件渲染5、JSX - …

喜讯!持安科技入选2023年北京市知识产权试点单位!

近日&#xff0c;北京市知识产权局发布了“2023年度北京市知识产权试点示范单位及2020年度北京市知识产权试点示范单位复审通过名单”名单。 经过严格的初审、形式审核和专家评审&#xff0c;北京持安科技有限公司入选“2023年北京市知识产权试点单位”。 北京市知识产权试点示…

并发性Socket通信源码(基于linux环境下多线程)

服务器端&#xff1a;server.c 1 #include <stdio.h>2 #include <stdlib.h>3 #include <unistd.h>4 #include <string.h>5 #include <arpa/inet.h>6 #include <pthread.h>7 void* working(void *arg);8 //信息结构体9 struct sockinfo10 …

《数据结构、算法与应用C++语言描述》-队列的应用-图元识别问题

《数据结构、算法与应用C语言描述》-队列的应用-图元识别问题 图元识别 问题描述 数字化图像是一个 mxm 的像素矩阵。在单色图像中&#xff0c;每一个像素要么为0&#xff0c;要么为 1。值为0的像素表示图像的背景。值为1的像素表示图元上的一个点&#xff0c;称其为图元像素…

BLUE引擎变量数据分析

今天跟大家说一下BLUE引擎的变量运用&#xff0c;以及使用中的小细节。大家在使用变量的时候&#xff0c;自定义变量不要以P、G、M、I、D、N、A开头。 变量与变量之间的常用格式: SMALL M88 <$STR(G88)> ;检测私人变量M88&#xff0c;是否小于全局变量G88 LARGE M88 &l…

Rust错误处理

返回值和错误处理 panic 深入剖析 主动调用 fn main() {panic!("crash and burn"); }backtrace 栈展开 panic 时的两种终止方式 当出现 panic! 时&#xff0c;程序提供了两种方式来处理终止流程&#xff1a;栈展开和直接终止 何时该使用 panic! 先来一点背景知…

分布式定时任务xxljob

xxl-job的xxl为作者名徐雪里拼音首字母。 xxl-job的作者是2015年开始开发这个项目&#xff0c;那时候springmvcbootstrapadminlte 大行其道&#xff0c;所以这个框架调度器一直沿用这个架构。 一、运行调度器 调度器可以集群或单点运行&#xff0c;以单点运行为例 下载代码…

前端 CSS 经典:box-shadow

1. 基础属性 /* box-shadow: h-shadow v-shadow blur spread color inset; */ box-shadow: 10px 10px 2px 2px red inset; h-shadow: 必填&#xff0c;水平阴影的位置&#xff0c;允许负值 v-shadow: 必填&#xff0c;垂直阴影的位置&#xff0c;允许负值 blur: 可选&#xff…

Linux shell编程学习笔记14:编写和运行第一个shell脚本hello world!

* 20231020 写这篇博文断断续续花了好几天&#xff0c;为了说明不同shell在执行同一脚本文件时的差别&#xff0c;我分别在csdn提供线上Linux环境 &#xff08;使用的shell是zsh&#xff09;和自己的电脑上&#xff08;使用的shell是bash&#xff09;做测试。功夫不负有心人&am…