Kafka系列之SpringBoot集成Kafka

本文介绍如何在springboot项目中集成kafka收发message。

pom依赖

springboot相关的依赖我们就不提了,和kafka相关的只依赖一个spring-kafka集成包

	<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

Kafka相关的yaml配置

spring:kafka:bootstrap-servers: 30.46.35.29:9092producer:retries: 3acks: -1batch-size: 16384buffer-memory: 33554432key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializercompression-type: lz4properties:linger.ms: 1'interceptor.classes': com.tencent.qidian.ma.commontools.trace.kafka.TracingProducerInterceptorconsumer:heartbeat-interval: 3000max-poll-records: 100enable-auto-commit: falsekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:session.timeout.ms: 30000listener:concurrency: 3type: batchack-mode: manual_immediate

以下为相关配置的说明:

spring.kafka

  • spring.kafka.bootstrap-servers: 指定 Kafka 服务器的地址列表,格式为 host:port,多个地址使用逗号分隔。

spring.kafka.producer用于配置 Kafka 消费者相关属性

spring.kafka.consumer用于配置 Kafka 消费者相关属性

  • spring.kafka.consumer.enable-auto-commit的值为false表示关闭Kafka客户端的自动提交offSet。
  • spring.kafka.consumer.max-poll-records的值为20表示在开启了批量消费以后,每次从Kafka服务端拉取的数据最大条数为20。

在 Spring 中是使用 Kafka 监听器来进行消息消费的,spring.kafka.listener用来配置监听器的相关配置

  • spring.kafka.listener.type的值为batch表示开启批量消费,默认值为single(单条)。
  • spring.kafka.listener.ack-mode的值为manual_immediate表示关闭Spring的自动提交offSet,我们需要在代码中进行手动提交。spring.kafka.listener.ack-mode的取值有两个比较常见的选项值MANUAL 和MANUAL_IMMEDIATE。MANUAL表示处理完业务后,手动调用Acknowledgment.acknowledge()先将offset存放到map本地缓存,在下一次poll之前从缓存拿出来批量提交。MANUAL_IMMEDIATE表示每次处理完业务,手动调用Acknowledgment.acknowledge()后立即提交。

生产者配置

1)通过@Configuration、@EnableKafka,声明Config并且打开KafkaTemplate能力。

2)生成bean,@Bean

常见配置参考:

package com.somnus.config.kafka;@Configuration
@EnableKafka
public class KafkaProducerConfig {@Resourceprivate KafkaProperties kafkaProperties;public Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());props.put(ProducerConfig.RETRIES_CONFIG, kafkaProperties.getProducer().getRetries());props.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaProperties.getProducer().getBatchSize());props.put(ProducerConfig.LINGER_MS_CONFIG, kafkaProperties.getProducer().getProperties().get("linger.ms"));props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaProperties.getProducer().getBufferMemory());props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, kafkaProperties.getProducer().getKeySerializer());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, kafkaProperties.getProducer().getValueSerializer());return props;}public ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}
}

消费端配置

1)通过@Configuration、@EnableKafka,声明Config并且打开KafkaTemplate能力。

2)生成bean,@Bean

常见配置参考:

package com.tencent.qidian.ma.maaction.web.config.kafka;import jakarta.annotation.Resource;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener.Type;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties.AckMode;/*** KafkaBeanConfiguration*/
@Configuration
@EnableKafka
public class KafkaBeanConfiguration {@Resourceprivate ConsumerFactory consumerFactory;@Resourceprivate KafkaProperties kafkaProperties;@Bean(name = "kafkaListenerContainerFactory")public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.getContainerProperties().setAckMode(kafkaProperties.getListener().getAckMode());factory.setConcurrency(kafkaProperties.getListener().getConcurrency());if (kafkaProperties.getListener().getType().equals(Type.BATCH)) {factory.setBatchListener(true);}return factory;}// 此bean为了后续演示使用,参考消费演示中的containerFactory属性配置@Bean(name = "tenThreadsKafkaListenerContainerFactory1")public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>tenThreadsKafkaListenerContainerFactory1() {ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory);factory.getContainerProperties().setAckMode(kafkaProperties.getListener().getAckMode());factory.setConcurrency(10);if (kafkaProperties.getListener().getType().equals(Type.BATCH)) {factory.setBatchListener(true);}return factory;}public ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}public Map<String, Object> consumerConfigs() {Map<String, Object> propsMap = new HashMap<>();propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,kafkaProperties.getConsumer().getMaxPollRecords());propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaProperties.getConsumer().getEnableAutoCommit());propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaProperties.getConsumer().getProperties().get("session.timeout.ms"));propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaProperties.getConsumer().getKeyDeserializer());propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaProperties.getConsumer().getValueDeserializer());return propsMap;}}

SpringBoot 集成 KafkaTemplate 发送Kafka消息

@Resourceprivate ObjectMapper mapper;@Resourceprivate KafkaTemplate<String, String> kafkaTemplate;try {Order order = new Order();String message = mapper.writeValueAsString(order);CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send("order", message);future.thenAccept(result -> {if (result.getRecordMetadata() != null) {log.debug("send message:{} with offset:{}", message, result.getRecordMetadata().offset());}}).exceptionally(exception -> {log.error("KafkaProducer send message failure,topic={},data={}", topic, message, exception);return null;});} catch (Exception e) {log.error("KafkaProducer send message exception,topic={},message={}", topic, message, e);}

SpringBoot 集成 @KafkaListener 消费Kafka消息

max.poll.interval.ms

默认为5分钟
如果两次poll操作间隔超过了这个时间,broker就会认为这个consumer处理能力太弱,会将其踢出消费组,将分区分配给别的consumer消费,触发rebalance 。
如果你的消费者节点总是在重启完不久就不消费了,可以考虑检查改配置项或者优化你的消费者的消费速度等等。

max.poll.records

max-poll-records是Kafka consumer的一个配置参数,表示consumer一次从Kafka broker中拉取的最大消息数目,默认值为500条。在Kafka中,一个消费者组可以有多个consumer实例,每个consumer实例负责消费一个或多个partition的消息,每个consumer实例一次从broker中可以拉取一个或多个消息。

max-poll-records参数的作用就是控制每次拉取消息的最大数目,以实现消费弱化和控制内存资源的需求。

参考Kafka中的max-poll-records和listener.concurrency配置

注意:@KafkaListener注解中的concurrency会覆盖消费者工厂中的concurrency,以下面代码为例,即使kafkaListenerContainerFactory1中的并发数是3,但是最终生成的监听器数量是2。

@Resourceprivate ObjectMapper mapper;@KafkaListener(id = "order_consumer",topics = "order",groupId = "g_order_consumer_group",//可配置containerFactory参数,使用指定的containerFactory,不配置默认使用名称是kafkaListenerContainerFactory的bean//containerFactory = "kafkaListenerContainerFactory1",//concurrency = "2",properties = {"max.poll.interval.ms:300000", "max.poll.records:1"})// 可以只有ConsumerRecords<String, String> records参数。ack参数非必需,ack.acknowledge()是为了防消息丢失public void consume(ConsumerRecords<String, String> records, Acknowledgment ack) {for (ConsumerRecord<String, String> record : records) {String msg = record.value();log.info("Consume msg:{}", msg);try {Order order = mapper.readValue(val, Order.class);// 处理业务逻辑} catch (Exception e) {log.error("Consume failed, msg:{}", val, e);}}ack.acknowledge();}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/40019.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

STM32F1+HAL库+FreeTOTS学习5——内核中断管理及中断控制函数

STM32F1HAL库FreeTOTS学习5——中断管理和临界段代码保护 中断简介中断优先级寄存器拓展FreeRTOS中PendSV和Systick中断优先级配置三个中断屏蔽寄存器FreeRTOS中断管理函数代码验证 上一期我们学习了FreeRTOS中任务挂起与恢复&#xff0c;在中断服务程序中恢复任务过程中&#…

[Redis]哨兵机制

哨兵机制概念 在传统主从复制机制中&#xff0c;会存在一些问题&#xff1a; 1. 主节点发生故障时&#xff0c;进行主备切换的过程是复杂的&#xff0c;需要人工参与&#xff0c;导致故障恢复时间无法保障。 2. 主节点可以将读压力分散出去&#xff0c;但写压力/存储压力是无法…

印章谁在管、谁用了、用在哪?契约锁让您打开手机一看便知

“印章都交给谁在管”、“哪些人能用”、“都有哪些业务在用”…这些既是管理者最关心的印章问题也是影响印章安全的关键要素。但是公司旗下分子公司那么多&#xff0c;各类公章、法人章、财务章、合同章一大堆&#xff0c;想“问”明白很难。 契约锁电子签及印控平台推出“印章…

14-11 2024 年的 13 个 AI 趋势

2024 年的 13 个 AI 趋势 人工智能对环境的影响和平人工智能人工智能支持的问题解决和决策针对人工智能公司的诉讼2024 年美国总统大选与人工智能威胁人工智能、网络犯罪和社会工程威胁人工智能治疗孤独与对人工智能的情感依赖人工智能影响者中国争夺人工智能霸主地位人工智能…

一句话回答的前端面试题

该篇文章为一句话的答案&#xff0c;想看更详细的面试题请看这篇>《前端面试题》 原型链&#xff1a; 实例与原型的链条&#xff0c;原型是prototype&#xff0c;链是__proto__&#xff0c;每个函数有一个原型对象&#xff0c;函数在创建时有一个默认属性 prototype&#x…

YOLOv10全网最新创新点改进系列:融合GSConv+Slim Neck,双改进、双增强,替换特征融合层实现, 轻量化涨点改进策略,有效涨点神器!

YOLOv10全网最新创新点改进系列&#xff1a;融合GSConvSlim Neck&#xff0c;双改进、双增强&#xff0c;替换特征融合层实现&#xff0c; 轻量化涨点改进策略&#xff0c;有效涨点神器&#xff01; 所有改进代码均经过实验测试跑通&#xff01;截止发稿时YOLOv10已改进40&…

【数据结构】06.栈队列

一、栈 1.1栈的概念及结构 栈&#xff1a;一种特殊的线性表&#xff0c;其只允许在固定的一端进行插入和删除元素操作。进行数据插入和删除操作的一端称为栈顶&#xff0c;另一端称为栈底。栈中的数据元素遵守后进先出LIFO&#xff08;Last In First Out)的原则。 压栈&#…

FPGA就业方向以及主要工作

FPGA&#xff08;Field-Programmable Gate Array&#xff09;作为可编程逻辑器件&#xff0c;在多个行业和领域中都有广泛的应用。具备FPGA技能的专业人士可以在多个方向上找到就业机会&#xff0c;以下是FPGA主要的就业方向及其对应的主要工作职责&#xff1a; 通信行业 职位…

LangChain终极内幕指南,学会langchain就看它了

1.概述 在人工智能迅速演进的时代&#xff0c;诸如Open AI的ChatGPT和Google的Bard等大型语言模型(LLMs)正彻底改变我们与技术互动的方式。这些技术巨头和SaaS公司正在竞相利用LLMs的威力&#xff0c;创造更为智能和实用的应用程序。 然而&#xff0c;真正的变革并非仅仅停留…

低压电工精选历年真题附答案

1.当电压为5V时&#xff0c;导体的电阻值为5欧&#xff0c;那么当电阻两端电压为2V时&#xff0c;导体的电阻值为()欧。[单选题] A 、10B、5(正确答案) C、2 2.当电气火灾发生时&#xff0c;应首先切断电源再灭火&#xff0c;但当电源无法切断时&#xff0c;只能带电灭火&…

Finding and exploting an unused API endpoint

Using 0$ account buy a piece of lether priced at $133 1、尝试访问api接口 大概率可能访问不到,但是可以尝试访问下 /api/swagger/v1 /openapi.json 2、页面功能点寻找 api send to Repeter 3、Find Supported HTTP请求 POST方法测试 通过测试得知支持GET方法和PATC…

C语言实现的人员管理系统(顺序表版)

该系统具有以下主要功能&#xff1a; 添加人员信息&#xff1a;在有空间的前提下&#xff0c;用户输入人员的工号、姓名、性别、联系电话和 QQ 号等信息&#xff0c;系统会自动检查编号的唯一性&#xff0c;确保不重复。查找人员信息&#xff1a;提供按工号和姓名两种查找方式…

av_read_frame 代码研究

------------------------------------------------------------ author: hjjdebug date: 2024年 07月 05日 星期五 11:02:51 CST av_read_frame 代码研究 ------------------------------------------------------------ 有人只标注一层,标注一层太肤浅了.不能了解底层之精妙…

Lianwei 安全周报|2024.07.01

新的一周又开始了&#xff0c;以下是本周「Lianwei周报」&#xff0c;我们总结推荐了本周的政策/标准/指南最新动态、热点资讯和安全事件&#xff0c;保证大家不错过本周的每一个重点&#xff01; 政策/标准/指南最新动态 01 出于安全考虑&#xff0c;拜登下令禁用卡巴斯基杀毒…

【康复学习--LeetCode每日一题】3115. 质数的最大距离

题目&#xff1a; 给你一个整数数组 nums。 返回两个&#xff08;不一定不同的&#xff09;质数在 nums 中 下标 的 最大距离。 示例 1&#xff1a; 输入&#xff1a; nums [4,2,9,5,3] 输出&#xff1a; 3 解释&#xff1a; nums[1]、nums[3] 和 nums[4] 是质数。因此答案是…

SpringBoot各类数量限制及超出后抛出的异常

前言 在使用SpringBoot开发接口时&#xff0c;动不动的就发生各种超过默认值的限制&#xff0c;这里总结了下SpringBoot默认限制的设置以及可能会发生的异常&#xff0c;便于问题的排查和快速修改默认值。 配置项配置项说明默认值超过大小后抛出的异常spring.servlet.multipa…

系统管理(System Keeping):全新迭代,优化您的开发体验

随着科技的不断进步和用户需求的日益增长&#xff0c;系统管理&#xff08;System Keeping&#xff09;不断进行迭代更新&#xff0c;致力于为用户带来更加高效、便捷的开发体验。本次全新迭代&#xff0c;不仅在界面与交互上进行了革新&#xff0c;更在功能整合、个性化与安全…

ECOLOGY9重置系统管理员密码

ECOLOGY9系统管理员密码忘记需要重置&#xff1a; 1、KB2110之后版本加了防篡改逻辑&#xff0c;数据库中初始话密码需要将hashdata、signdata更新为空&#xff0c;执行如下语句初始化 update HrmResourceManager set password ‘C4CA4238A0B923820DCC509A6F75849B’,salt‘’…

Android --- Service

出自于此&#xff0c;写得很清楚。关于Android Service真正的完全详解&#xff0c;你需要知道的一切_android service-CSDN博客 出自【zejian的博客】 什么是Service? Service(服务)是一个一种可以在后台执行长时间运行操作而没有用户界面的应用组件。 服务可由其他应用组件…

万字长文|关于 OpenAI 接口开发你应该知道的一切

这篇文章中个人结合自己的实践经验把 OpenAI 官方文档解读一遍。但是原文档涉及内容众多&#xff0c;包括微调&#xff0c;嵌入&#xff08;Embeddings&#xff09;等众多主题&#xff0c;我这里重点挑选自己开发高频使用到的&#xff0c;需要详细了解的可以自行前往官网阅读。…