SpringBoot配置多个kafka配置

参考文章:SpringBoot整合kafka配置多个kafka配置_springboot配置多个kafka-CSDN博客


引入依赖

       <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.7.14</version></dependency>

yml配置

有几个就配置几个 ,这里只配置两个

因为我在本地启动了两个kafka,所以bootstrap-servers这里分别指向两个不同端口的kafka地址

几个属性详细解释:

acks

 #acks = 0:设置成 表示 producer 完全不理睬 leader broker 端的处理结果。此时producer 发送消息后立即开启下 条消息的发送,根本不等待 leader broker 端返回结果
#acks= all 或者-1 :表示当发送消息时, leader broker 不仅会将消息写入本地日志,同时还会等待所有其他副本都成功写入它们各自的本地日志后,才发送响应结果给,消息安全但是吞吐量会比较低。
#acks = 1:默认的参数值。 producer 发送消息后 leader broker 仅将该消息写入本地日志,然后便发送响应结果给producer ,而无须等待其他副本写入该消息。折中方案,只要leader一直活着消息就不会丢失,同时也保证了吞吐量

auto-offset-reset:

#当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:重置为分区中最小的offset;
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
# none:只要有一个分区不存在已提交的offset,就抛出异常;

spring:kafka:# 第一个kafka的配置first:bootstrap-servers: 127.0.0.1:9092 # 这个是kafka的地址,对应server.properties中配置的producer:                         # 生产者配置key-serializer: org.apache.kafka.common.serialization.StringSerializer  # Kafka提供的序列化和反序列化类value-serializer: org.apache.kafka.common.serialization.StringSerializerbatch-size: 16384        # 批量大小buffer-memory: 33554432  # 生产端缓冲区大小retries: 10              # 消息发送重试次数acks: -1                  # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)properties:linger:ms: 2000             # 提交延迟consumer:  # 消费者配置key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializergroup-id: testGroup             # 默认的消费组IDenable-auto-commit: true        # 是否自动提交offset auto-commit-interval: 2000      # 提交offset延时(接收到消息后多久提交offset)max-poll-records: 500           # 单次拉取消息的最大条数,根据业务需要配置auto-offset-reset: latestlistener:missing-topics-fatal: false # 当kafka启动的时候如果未找到对应topic不报错properties:session:timeout:ms: 120000 # 消费会话超时时间(超过这个时间 consumer 没有发送心跳,就会触发 rebalance 操作)request:timeout:ms: 18000  # 消费请求的超时时间# 第二个kafka的配置second:bootstrap-servers: 127.0.0.1:9093  # 这个是kafka的地址,对应server.properties中配置的producer:                          # 生产者配置key-serializer: org.apache.kafka.common.serialization.StringSerializer  # Kafka提供的序列化和反序列化类value-serializer: org.apache.kafka.common.serialization.StringSerializerbatch-size: 16384         # 批量大小buffer-memory: 33554432   # 生产端缓冲区大小retries: 10               # 消息发送重试次数acks: -1                   # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)properties:linger:ms: 2000              # 提交延迟consumer: # 消费者配置key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializergroup-id: testGroup             # 默认的消费组IDenable-auto-commit: true        # 是否自动提交offset auto-commit-interval: 2000      # 提交offset延时(接收到消息后多久提交offset)max-poll-records: 500           # 单次拉取消息的最大条数,根据业务需要配置auto-offset-reset: latestlistener:missing-topics-fatal: false # 当kafka启动的时候如果未找到对应topic不报错properties:session:timeout:ms: 120000 # 消费会话超时时间(超过这个时间 consumer 没有发送心跳,就会触发 rebalance 操作)request:timeout:ms: 18000  # 消费请求的超时时间

读取第一个kafka配置 

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;/*** 第一个kafka配置**/
@Configuration
public class FirstKafkaConfig {/*** 读取第一个kafka配置* Primary注解表示默认以这个为准** @return 第一个kafka配置*/@Primary@ConfigurationProperties(prefix = "spring.kafka.first")@Beanpublic KafkaProperties firstKafkaProperties() {return new KafkaProperties();}/*** 构建第一个kafka的生产者发送template** @param firstKafkaProperties 第一个kafka配置* @return 第一个kafka的生产者发送template*/@Primary@Beanpublic KafkaTemplate<String, String> firstKafkaTemplate(@Autowired @Qualifier("firstKafkaProperties") KafkaProperties firstKafkaProperties) {return new KafkaTemplate<>(firstProducerFactory(firstKafkaProperties));}/*** 新建第一个kafka的生产者工厂** @param firstKafkaProperties 第一个kafka配置* @return 第一个kafka的生产者工厂*/private DefaultKafkaProducerFactory<String, String> firstProducerFactory(KafkaProperties firstKafkaProperties) {return new DefaultKafkaProducerFactory<>(firstKafkaProperties.buildProducerProperties());}/*** 构建第一个kafka的消费者监听容器工厂** @param firstKafkaProperties 第一个kafka配置* @return 第一个kafka的消费者监听容器工厂*/@Bean("firstKafkaListenerContainerFactory")public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>firstKafkaListenerContainerFactory(@Autowired @Qualifier("firstKafkaProperties") KafkaProperties firstKafkaProperties) {ConcurrentKafkaListenerContainerFactory<Integer, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(firstConsumerFactory(firstKafkaProperties));return factory;}/*** 新建第一个kafka的消费者工厂** @param firstKafkaProperties 第一个kafka配置* @return 第一个kafka的消费者工厂*/private ConsumerFactory<? super Integer, ? super String> firstConsumerFactory(KafkaProperties firstKafkaProperties) {return new DefaultKafkaConsumerFactory<>(firstKafkaProperties.buildConsumerProperties());}}

读取第二个kafka配置

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;/*** 第二个kafka配置**/
@Configuration
public class SecondKafkaConfig {/*** 读取第二个kafka配置** @return 第二个kafka配置*/@ConfigurationProperties(prefix = "spring.kafka.second")@Bean("secondKafkaProperties")public KafkaProperties secondKafkaProperties() {return new KafkaProperties();}/*** 构建第二个kafka的生产者发送template** @param secondKafkaProperties 第二个kafka配置* @return 第二个kafka的生产者发送template*/@Beanpublic KafkaTemplate<String, String> secondKafkaTemplate(@Autowired @Qualifier("secondKafkaProperties") KafkaProperties secondKafkaProperties) {return new KafkaTemplate<>(secondProducerFactory(secondKafkaProperties));}/*** 新建第二个kafka的生产者工厂** @param secondKafkaProperties 第二个kafka配置* @return 第二个kafka的生产者工厂*/private DefaultKafkaProducerFactory<String, String> secondProducerFactory(KafkaProperties secondKafkaProperties) {return new DefaultKafkaProducerFactory<>(secondKafkaProperties.buildProducerProperties());}/*** 构建第二个kafka的消费者监听容器工厂** @param secondKafkaProperties 第二个kafka配置* @return 第二个kafka的消费者监听容器工厂*/@Bean("secondKafkaListenerContainerFactory")public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>secondKafkaListenerContainerFactory(@Autowired @Qualifier("secondKafkaProperties") KafkaProperties secondKafkaProperties) {ConcurrentKafkaListenerContainerFactory<Integer, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(secondConsumerFactory(secondKafkaProperties));return factory;}/*** 新建第二个kafka的消费者工厂** @param secondKafkaProperties 第二个kafka配置* @return 第二个kafka的消费者工厂*/private ConsumerFactory<? super Integer, ? super String> secondConsumerFactory(KafkaProperties secondKafkaProperties) {return new DefaultKafkaConsumerFactory<>(secondKafkaProperties.buildConsumerProperties());}
}

创建两个生产者

@RestController
@RequestMapping("/producer1")
@Api(tags = "kafka生产者测试1")
public class ProducerDemoController1 {@Resourceprivate KafkaTemplate<String, String> kafkaTemplate;/*** 发送消息** @param msg* @return*/@PostMapping("/sendMsg")public String sendMsg(@RequestParam(name = "msg", defaultValue = "hello kafka") String msg) {kafkaTemplate.send("YWZT-TOPIC-TEST", msg);return "消息已发送";}
}
@RestController
@RequestMapping("/producer2")
@Api(tags = "kafka生产者测试2")
public class ProducerDemoController2 {/*** 第二个kafka配置需要指定下名字 */@Resource(name = "secondKafkaTemplate")private KafkaTemplate<String, String> secondKafkaTemplate;/*** 发送消息** @param msg* @return*/@PostMapping("/sendMsg")public String sendMsg(@RequestParam(name = "msg", defaultValue = "hello kafka") String msg) {secondKafkaTemplate.send("YWZT-TOPIC-TEST", msg);return "消息已发送";}
}

创建两个消费者

@Slf4j
@Component
@RequiredArgsConstructor
public class YwztExConsumer1 {@KafkaListener(containerFactory = "firstKafkaListenerContainerFactory",topics = {"YWZT-TOPIC-TEST"},groupId = "testGroup")public void consumer(String value) throws JsonProcessingException {//获取报文log.info("------------------------报文信息----------------:{} ......", StrUtil.sub(value, 0, 400));}}
@Slf4j
@Component
@RequiredArgsConstructor
public class YwztExConsumer2 {@KafkaListener(containerFactory = "secondKafkaListenerContainerFactory",topics = {"YWZT-TOPIC-TEST"},groupId = "testGroup")public void consumer(String value) throws JsonProcessingException {//获取报文log.info("------------------------报文信息----------------:{} ......", StrUtil.sub(value, 0, 400));}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/52950.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

minio文件存储+ckplayer视频播放(minio分片上传合并视频播放)

文章目录 参考简述效果启动minio代码配置类RedisConfigWebConfigMinioClientAutoConfigurationOSSPropertiesapplication.yml 实体类MinioObjectResultStatusCodeOssFileOssPolicy 工具类FileTypeUtilMd5UtilMediaTypeMinioTemplate 文件分片上传与合并MinioFileControllerMini…

Webpack打包常见问题及优化策略

聚沙成塔每天进步一点点 本文回顾 ⭐ 专栏简介Webpack打包常见问题及优化策略1. 引言2. Webpack打包常见问题2.1 打包时间过长问题描述主要原因 2.2 打包体积过大问题描述主要原因 2.3 依赖包版本冲突问题描述主要原因 2.4 动态导入和代码拆分问题问题描述主要原因 2.5 文件路径…

Python+VScode 两个不同文件夹里的py文件相互调用|python的模块调用|绝对导入

第一次用VScode写python遇到了模块无法识别的问题&#xff0c;搞了一整天&#xff0c; 上网查&#xff0c;chatGPT都不行&#xff0c;现在时解决了。 首先项目结构如下&#xff0c;四个文件夹&#xff0c;四个py文件 代码&#xff1a; def f1fun():print("f1") de…

Code Practice Journal | Day59-60_Graph09 最短路径(待更)

1. Dijkstra 1.1 原理与步骤 步骤&#xff1a; 选取距离源点最近且未被访问过的节点标记该节点为已访问更新未访问节点到源点的距离 1.2 代码实现 以KamaCoder47题为例 题目&#xff1a;47. 参加科学大会&#xff08;第六期模拟笔试&#xff09; (kamacoder.com) class Progra…

力扣2402.会议室 III

力扣2402.会议室 III 双堆模拟 一个堆存未占用的会议室编号一个堆存已占用的结束时间和编号 class Solution {public:int mostBooked(int n, vector<vector<int>>& meetings) {int cnt[n];memset(cnt,0,sizeof(cnt));priority_queue<int,vector<int&g…

编写一个自动发送每日电子邮件报告的 Python 脚本

要编写一个自动发送每日电子邮件报告的 Python 脚本&#xff0c;并进行设置&#xff0c;你需要完成以下几个步骤&#xff1a; 1. 安装必要的库 你需要 smtplib 库&#xff08;Python 标准库中包含&#xff09;用于发送电子邮件&#xff0c;email 库&#xff08;也是 Python 标…

Apache SeaTunnel Zeta 引擎源码解析(一)Server端的初始化

引入 本系列文章是基于 Apache SeaTunnel 2.3.6版本&#xff0c;围绕Zeta引擎给大家介绍其任务是如何从提交到运行的全流程&#xff0c;希望通过这篇文档&#xff0c;对刚刚上手SeaTunnel的朋友提供一些帮助。 我们整体的文章将会分成三篇&#xff0c;从以下方向给大家介绍&am…

指针5.回调函数与qsort

今天来学习回调函数与qsort 目录 1.回调函数实现模拟计算器代码的简化原代码运行结果简化代码运行结果 qsort函数排序整型数据代码运行结果 qsort排序结构数据代码 qsort函数的模拟实现代码运行结果 总结 1.回调函数 回调函数就是⼀个通过函数指针调用的函数。 如果你把函数的…

C++语法基础(一)

第一个C程序 1. <iostream>&#xff08;C&#xff09; <iostream> 是 C 标准库中的头文件&#xff0c;用于处理输入输出操作。它提供了基于流&#xff08;stream&#xff09;的输入输出机制。 特点&#xff1a; 面向对象&#xff1a;C 中的输入输出操作是基于流…

hyperf json-rpc

安装 安装docker hyperf 安装 hyperf-rpc-server-v8 &#xff08;服务端&#xff09; docker run --name hyperf-rpc-server-v8 \ -v /www/docker/hyperf-rpc-server:/data/project \ -w /data/project \ -p 9508:9501 -it \ --privileged -u root \ --entrypoint /bin/sh \…

Swift 可选类型

Swift 可选类型 Swift 是一种强类型编程语言,它在类型安全方面做了很多工作,以确保代码的稳定性和可靠性。在 Swift 中,可选类型(Optional)是一种特殊的类型,用于处理值可能缺失的情况。本文将详细介绍 Swift 中的可选类型,包括其定义、使用场景、语法以及如何正确地处…

Upload-LABS通关攻略【1-20关】

Pass-01 第一关是前端JS绕过 上传一个php文件显示只能上传特定后缀名的文件 这里将1.php改为1.jpg直接进行抓包&#xff0c;在数据包中将jpg改为php放行 文件上传成功&#xff0c;邮件图片新建页面打开 可以访问到1.php文件&#xff0c;则一句话密码上传成功 使用蚁剑 进行连接…

Redux的中间件原理分析

Redux的中间件原理分析 redux的中间件对于使用过redux的各位都不会感到陌生&#xff0c;通过应用上我们需要的所有要应用在redux流程上的中间件&#xff0c;我们可以加强dispatch的功能。最近抽了点时间把之前整理分析过的中间件有关的东西放在这里分享分享。本文只对中间件涉…

音视频开发之旅(90)-Vision Transformer论文解读与源码分析

目录 1.背景和问题 2.Vision Transformer(VIT)模型结构 3.Patch Embedding 4.实现效果 5.代码解析 6.资料 一、背景和问题 上一篇我们学习了Transformer的原理&#xff0c;主要介绍了在NLP领域上的应用&#xff0c;那么在CV(图像视频)领域该如何使用&#xff1f; 最直观…

C# 获取当前鼠标位置

在C#中&#xff0c;获取当前鼠标位置可以通过多种方式实现&#xff0c;但最常见和直接的方法之一是使用System.Windows.Forms命名空间中的Cursor类或者Control类的PointToClient&#xff08;如果你正在处理WinForms应用程序&#xff09;或Windows.UI.Core.CoreWindow的PointerP…

Java源码学习之高并发编程基础——AQS源码剖析之阻塞队列(下)

1.前言&目录 前言&#xff1a; 在上一篇文章AQS源码剖析之阻塞队列&#xff08;上&#xff09;中介绍了以独占锁模式下AQS的基本原理&#xff0c;AQS仅仅起到了一个“维持线程等待秩序”的作用&#xff0c;那么本篇文章继续讲解共享锁模式下的特点。 AQS不操纵锁的获取或者…

算法复盘——LeetCode hot100:哈希

文章目录 哈希表哈希表的基本概念哈希表的使用1. 插入操作2. 查找操作3. 删除操作 哈希表的优点和缺点1.两数之和复盘 242.有效的字母异位词复盘 49.字母异位词分组复盘 128. 最长连续序列复盘HashSet 哈希表 先来搞清楚什么是哈希表吧~ 概念不清楚方法不清楚怎么做题捏 哈希表…

问:说一下Java中数组的实例化方式有哪些?

在Java中&#xff0c;数组的实例化可以通过多种方式完成。以下是五种不同的实例化数组的方式。 1. 直接初始化 这种方式在声明数组的同时&#xff0c;直接初始化数组的元素。 // 示例&#xff1a;直接初始化一个整型数组 int[] numbers {1, 2, 3, 4, 5}; // 解释&#xff1…

使用mysql保存密码

登录MySQL 这行命令告诉MySQL客户端程序用户root准备登录&#xff0c;-p表示告诉 MySQL 客户端程序提示输入密码。 mysql -u root -p创建数据库 create database wifi; use wifi;create table password(user_password CHAR(8),primary key(user_password));源码 代码编译 …

C#——类与结构

在未学习面向对象语言时&#xff0c;我常常将类比作一种结构体&#xff0c;其实类与结构体也确实很相似&#xff0c;类用来做函数的集合&#xff0c;结构用来做变量的集合&#xff0c;接下来将从几个角度刨析类与结构的不同。 类 vs 结构 类和结构在设计和使用时有不同的考虑…