@KafkaListener指定kafka集群

基于@KafkaListener注解的kafka监听代码可以手动指定要消费的kafka集群,这对于需要访问多套kafka集群的程序来说,是有效的解决方案。这里需要注意的是,此时的消费者配置信息需使用原生kafka的配置信息格式(如:ConsumerConfig.MAX_POLL_RECORDS_CONFIG = “max.poll.records”),与自动装载KafkaConsumer时的配置信息格式不同。详情如下:

依赖项(其实spring-kafka包含了kafka-clients)

<!-- spring-kafka --> 
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.6.0</version>
</dependency>
<!-- kafka-clients --> 
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.6.0</version>
</dependency>

配置文件
配置参数的格式和含义,参见《spring-kafka的配置使用》

生产代码

@Component
@Slf4j
public class KafKaProducer {@Autowiredprivate KafkaTemplate kafkaTemplate;public void sendMessage(String topic, Object object) {/** 这里的 ListenableFuture 类是 spring 对 java 原生 Future 的扩展增强,是一个泛型接口,用于监听异步方法的回调 而对于* kafka send 方法返回值而言,这里的泛型所代表的实际类型就是 SendResult<K, V>,而这里 K,V 的泛型实际上 被用于* ProducerRecord<K, V> producerRecord,即生产者发送消息的 key,value 类型*/ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, object);future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {@Overridepublic void onFailure(Throwable throwable) {log.error("发送消息失败:" + throwable.getMessage());}@Overridepublic void onSuccess(SendResult<String, Object> sendResult){// log.info("发送消息成功:" + sendResult.toString());}});}
}

消费者配置类,其中可配置多个kafka集群,每个kafka集群生成一个KafkaListenerContainerFactory实例

@Data
@Slf4j
@Configuration
public class KafkaConfig {@ResourceEnvironment environment;@Beanpublic KafkaListenerContainerFactory<?> containerFactory() {Integer concurrency = environment.getProperty("kafka.concurrency", Integer.class, 1);Integer pollTimeout = environment.getProperty("kafka.poll.timeout", Integer.class, 3000);ConcurrentKafkaListenerContainerFactory<String, String> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(this.consumerConfigs()));containerFactory.setConcurrency(concurrency); // 消费并发数量containerFactory.setBatchListener(true);      // 批量监听消息containerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH); // 批量提交偏移containerFactory.getContainerProperties().setPollTimeout(pollTimeout); // 消息拉取时限return containerFactory;}@Beanpublic Map<String, Object> consumerConfigs() {String servers          = environment.getProperty("kafka.servers", "127.0.0.1:9092");String groupId          = environment.getProperty("kafka.groupId", "consumer-group");String sessionTimeout   = environment.getProperty("kafka.session.timeout.ms", "60000");String maxPollRecords   = environment.getProperty("kafka.max.poll.records", "100");String maxPollInterval  = environment.getProperty("kafka.max.poll.interval", "600000");String jaasConfig       = environment.getProperty("kafka.sasl.jaas.config");Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);/// props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "SCRAM-SHA-256");props.put("sasl.jaas.config", jaasConfig);return props;}
}

消费代码 @KafkaListener注解的containerFactory参数指定了KafkaListenerContainerFactory实例,也就指定了kafka集群

@Slf4j
@Component
public class KafkaConsumerListen implements BatchMessageListener<String, String> {@Autowiredprivate Environment environment;@Autowiredprivate KafkaMsgHandleService msgHandleService;@Autowiredprivate ThreadPoolTaskExecutor taskExecutor;/*************************      接收消息************************/@Override@KafkaListener( containerFactory = "containerFactory", groupId = "${kafka.groupId}", topics = "#{'${kafka.topics}'.split(',')}", concurrency = "${kafka.concurrency}")public void onMessage(List<ConsumerRecord<String, String>> records) {try {final List<String> msgs = records.stream().map(ConsumerRecord::value).collect(Collectors.toList());log.info("收到消息体:size={} content:{}", msgs.size(), JSON.toJSONString(msgs));/// 处理消息msgs.forEach(this::processRecord);} catch (Exception e) {log.error("KafkaListener_kafka_consume_error.", e);}}/*************************      处理消息************************/private void processRecord(String msg) {taskExecutor.submit(() -> {if (!environment.getProperty("kafka1.switch", Boolean.class,true)) {log.warn("KafkaListener_turn_off_drop_message.");return;}msgHandleService.handle(msg);});}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/620714.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

win桌面图标间距变大如何调整

1、win键R-->输入regedit-->回车 2、 找到 IconSpacing 和 IconVerticalSpacing -->HKEY_CURRENT_USER-->Control Panel-->Desktop-->WindowMetrics-->IconSpacing-->IconVerticalSpacing 3、分别将其值改成-1125&#xff08;系统默认的值&#xff09…

手写RPC框架(手写dubbo框架)

提示&#xff1a;dubbo底层实现&#xff0c;手写dubbo框架。手写rpc框架、用servlet实现dubbo、用servlet实现rpc框架 文章目录 前言一、实现步骤描述1.1、provider的原理1.2、consumer的原理&#xff1a; 二、代码实现2.1、api项目2.1.12.1.22.1.3 2.2、provider项目2.2.1、pr…

椋鸟C语言笔记#33:文件的顺序读写

萌新的学习笔记&#xff0c;写错了恳请斧正。 目录 光标&#xff08;文件位置指示器&#xff09; 文件的顺序读写 fgetc 使用实例 fputc 使用实例 fgets fputs 使用实例 fscanf fprintf fread fwrite 使用实例 光标&#xff08;文件位置指示器&#xff09; 我们…

在程序中链接静态库 和 动态库

9. 链接库 在编写程序的过程中&#xff0c;可能会用到一些系统提供的动态库或者自己制作出的动态库 或者静态库文件&#xff0c;cmake中也为我们提供了相关的加载动态库的命令hehedalinux:~/Linux/loveDBTeacher-v3$ tree . ├── CMakeLists.txt ├── include │ └── …

鸿蒙开发-UI-组件-状态管理

鸿蒙开发-序言 鸿蒙开发-工具 鸿蒙开发-初体验 鸿蒙开发-运行机制 鸿蒙开发-运行机制-Stage模型 鸿蒙开发-UI 鸿蒙开发-UI-组件 文章目录 前言 一、什么是状态管理 二、管理组件拥有的状态 1.组件内状态 State装饰器 2.父子组价单向同步 Prop装饰器 3.父子双向同步 Link装…

Linux下动态库和静态库编译实践

Linux下动态库和静态库编译实践 背景动态库&#xff08;.so文件&#xff09;静态库(.a文件)关于GLIBC 背景 之前写过JNI的文章&#xff0c;在JNI实践过程中&#xff0c;也涉及到对动态库/静态库的一些编译实践&#xff0c;这里统一记录一下。 动态库&#xff08;.so文件&…

RWKV入门

主要参考资料 B站视频《【项目原作解读】RWKV Foundation侯皓文&#xff1a;新型RNN模型RWKV&#xff0c;结合Transformer的并行化训练优势和RNN的高效推理》 RWKV官网: https://www.rwkv.com/ 目录 前言RWKV由来模型架构关键结果劣势未来展望 前言 RNN无法并行化&#xff0c;…

CPU告警不用愁,用C语言编写CPU使用率限制程序

现在云服务已经深入千家万户了&#xff0c;不仅商用&#xff0c;私用也很多。很多云服务厂商也都有配套的服务器安全模块&#xff0c;可以检测网络流量异常、内存占用量和CPU占用率&#xff0c;并且允许人工设置告警阈值。例如&#xff0c;CPU持续大于90%10分钟&#xff0c;那么…

华为机试真题实战应用【赛题代码篇】-选修课(附Java、C++和python代码)

目录 题目描述 思路解析 代码实现 Java JS C++ 代码2 python

ESU毅速丨制造企业需不需要建设增材制造中心?

随着科技的不断发展&#xff0c;增材制造技术已经成为制造行业的新宠。越来越多的企业开始考虑建设增材制造中心&#xff0c;以提高生产效率、降低成本、加速产品创新。但是&#xff0c;对于制造企业来说&#xff0c;是否需要建设增材制造中心呢&#xff1f; 首先&#xff0c;我…

SpringCloud全链路灰度发布

日升时奋斗&#xff0c;日落时自省 目录 1、实现框架 2、负载均衡模块 3、封装负载均衡器 4、网关模块 5、服务模块 5.1、注册为灰度服务实例 5.2、设置负载均衡器 5.3、传递灰度标签 1、实现框架 Spring Cloud全链路灰色发布实现构架&#xff1a; 灰度发布的具体实现…

GEE机器学习——利用最短距离方法进行土地分类和精度评定

最短距离方法 最短距离方法(Minimum Distance)是一种常用的模式识别算法,用于计算样本之间的相似度或距离。该方法通过计算样本之间的欧氏距离或其他距离度量,来确定样本之间的相似程度或差异程度。 最短距离方法的具体步骤如下: 1. 数据准备:收集并准备用于训练的数据…

golang学习-函数

1、匿名函数 没有函数名的函数&#xff0c;格式如下&#xff1a; func(参数)返回值{ 函数体 } func main() {//将匿名函数保存到变量中sum : func(x, y int) int {return x y}fmt.Println(sum(10, 20)) //通过变量调用匿名函数//自执行函数:匿名函数定义完加()直接执行fu…

Java中的继承、方法覆盖和多态

一、继承 关于java语言当中的继承&#xff1a; 1.继承是面向对象三大特征之一&#xff0c;三大特征分别是&#xff1a;封装、继承、多态 2.继承最”基本“的作用是&#xff1a;代码复用。但是继承最”重要“的作用是&#xff1a;有了 继承才有了以后”方法的覆盖“和”多态机制…

利用Monte Carlo进行数值积分(二)

进步空间很大的算法版本 话说去年6月的一个周六&#xff0c;我很无聊地发了一个帖子&#xff0c;写了一个自己感觉有点无聊的帖子。 Matlab多重积分的两种实现【从六重积分到一百重积分】https://withstand.blog.csdn.net/article/details/127564478 这个帖子居然成了我这种懒…

springboot整合 hibernate详解

springboot整合 hibernate详解 大家好&#xff0c;我是免费搭建查券返利机器人赚佣金就用微赚淘客系统3.0的小编&#xff0c;也是冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01;今天&#xff0c;我们将深入研究Spring Boot与Hibernate整合的技术&#xff0c;解析其…

实现数字的千分位,表示,

重点如下 区分是否是负数区分是否有小数点使用正则表达式\B 是指非单词边界? 是正向查找?! 是负向查找 代表有一个或者多个() 是分组g 代表全局匹配 function splitStr(num) {// 转化成字符串let numStr ${num}let isNegative falseif (numStr.startsWith(-)) {isNegativ…

求解建公路问题

课程设计题目 求解建公路问题 课程设计目的 深入掌握 Prim 和 Kruskal算法在求解实际问题中的应用 问题描述 假设有 n 个村庄,编号从到,现在修建一些道路使任意两个村庄之间可以互相连通。所谓两个村庄 A 和B是连通的,指当且仅当A 和 B之间有一条道路或者存在一个村庄 C 使得…

QT通过QPdfWriter类实现pdf文件生成与输出

一.QPdfWriter类介绍 本文代码工程下载地址&#xff1a; https://download.csdn.net/download/xieliru/88736664?spm1001.2014.3001.5503 QPdfWrite是一个用于创建PDF文件的类&#xff0c;它是Qt库的一部分。它提供了一些方法和功能&#xff0c;使您能够创建和写入PDF文件。…

#Prompt##提示词工程##AIGC##LLM#使用大型预训练语言模型的关键考量

如果有不清楚的地方可以评论区留言&#xff0c;我会给大家补上的&#xff01; 本文包括&#xff1a; Prompt 的一些行业术语介绍 Prompt 写好提示词的方法经验介绍&#xff08;附示例教程&#xff09; LLM自身存在的问题&#xff08;可以用Prompt解决的以及无法用Prompt解决的&…