SpringBoot日常:集成Kafka

文章目录

      • 1、pom.xml文件
      • 2、application.yml
      • 3、生产者配置类
      • 4、消费者配置类
      • 5、消息订阅
      • 6、生产者发送消息
      • 7、测试发送消息

本章内容主要介绍如何在springboot项目对kafka进行整合,最终能达到的效果就是能够在项目中通过配置相关的kafka配置,就能进行消息的生产和消费。

1、pom.xml文件

原本项目用 Spring Boot 的版本为2.6.X,所以这里用spring-cloud-starter-stream-kafka的版本用的是2.2.1.RELEASE,也可以用其他版本,但是注意兼容性,不然会编译运行报错

<dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>2021.0.2</version>  <!-- 确保与 Spring Boot 2.6.x 兼容 --><scope>import</scope><type>pom</type></dependency></dependencies>
</dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId><version>2.2.1.RELEASE</version></dependency>
</dependencies>

2、application.yml

添加kafka的相关配置

spring:kafka:bootstrap-servers: 192.168.102.179:9092producer:acks: 1retries: 0batch-size: 30720000buffer-memory: 33554432key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer#消费者配置consumer:group-id: test-kafka#是否开启手动提交 默认自动提交enable-auto-commit: true#如果enable.auto.commit为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000  自动提交已消费offset时间间隔auto-commit-interval: 5000#earliest:分区已经有提交的offset从提交的offset开始消费,如果没有提交的offset,从头开始消费,latest:分区下已有提交的offset从提交的offset开始消费,没有提交的offset从新产生的数据开始消费auto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer#一次调用 poll() 操作时返回的最大记录数 默认为 500 条max-poll-records: 2#kafka session timeoutsession:timeout:ms: 300000listener:#kafka 没有创建指定的 topic 下  项目启动是否报错 true  falsemissing-topics-fatal: false#Kafka 的消费模式 single 每次单条消费消息  batch  每次批量消费消息type: singleack-mode: manual_immediate

3、生产者配置类

添加一个生产者配置类KafkaProducerConfig ,主要设置消息的序列化方式等消息处理方式

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;/*** @Author 码至终章* @Date 2025/1/8 11:33* @Version 1.0*/
@Configuration
@EnableKafka
public class KafkaProducerConfig {@Value("${spring.kafka.bootstrap-servers}")private String servers;@Bean("myProducerKafkaProps")public Map<String, Object> getMyKafkaProps() {Map<String, Object> props = new HashMap<>(4);props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}@Beanpublic ProducerFactory<String, String> newProducerFactory() {return new DefaultKafkaProducerFactory<>(getMyKafkaProps());}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(newProducerFactory());}}

4、消费者配置类

创建一个消费者配置类KafkaConsumerConfig,主要设置一些消息的接收处理配置

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;import java.util.HashMap;
import java.util.Map;/*** @Author 码至终章* @Date 2025/1/8 12:09* @Version 1.0*/
@Configuration
@EnableKafka
public class KafkaConsumerConfig {@Value("${spring.kafka.bootstrap-servers}")private String servers;@Value("${spring.kafka.consumer.group-id}")private String groupId;@Value("${spring.kafka.consumer.auto-offset-reset}")private String offsetReset;@Value("${spring.kafka.consumer.max-poll-records}")private String maxPollRecords;@Value("${spring.kafka.consumer.auto-commit-interval}")private String autoCommitIntervalMs;@Value("${spring.kafka.consumer.enable-auto-commit}")private boolean enableAutoCommit;@Bean("myConsumerKafkaProps")public Map<String, Object> getMyKafkaProps() {Map<String, Object> props = new HashMap<>(12);//是否自动提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);//kafak 服务器props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);//不存在已经提交的offest时 earliest 表示从头开始消费,latest 表示从最新的数据消费props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);//消费组idprops.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);//一次调用poll()操作时返回的最大记录数,默认值为500props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);//自动提交时间间隔 默认 5秒props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);//props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);return props;}/*** 消费者工厂*/@Bean("myContainerFactory")public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(getMyKafkaProps()));// 并发创建的消费者数量factory.setConcurrency(3);// 开启批处理factory.setBatchListener(true);//拉取超时时间factory.getContainerProperties().setPollTimeout(1500);//是否自动提交 ACK kafka 默认是自动提交if (!enableAutoCommit) {//共有其中方式factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);}return factory;}
}

5、消息订阅

创建一个消费者监听消息类,里面对主题消息监听,这里的测试主题为testone

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;/*** @Author 码至终章* @Date 2025/1/8 14:19* @Version 1.0*/
@Slf4j
@Component
public class MyKafkaConsumer {@KafkaListener(id = "my-kafka-consumer",idIsGroup = false, topics = "topicone",containerFactory = "myContainerFactory")public void listen(String message) {log.info("接收到主题消息,消息内容:{}", message);}
}

6、生产者发送消息

为了方便调用测试,这里在controller编写一个方法发送消息

@RestController
@Slf4j
public class TestController {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@GetMapping ("/sendMessage")public void sendMessage(@RequestParam String message) {this.kafkaTemplate.send("topicone", message);}
}

7、测试发送消息

这里简单用postman调用接口发送一条消息
在这里插入图片描述
从idea的程序控制台可以看到消费者监听可以正常接收到消息
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/66585.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【实用技能】如何使用 .NET C# 中的 Azure Key Vault 中的 PFX 证书对 PDF 文档进行签名

TX Text Control 是一款功能类似于 MS Word 的文字处理控件&#xff0c;包括文档创建、编辑、打印、邮件合并、格式转换、拆分合并、导入导出、批量生成等功能。广泛应用于企业文档管理&#xff0c;网站内容发布&#xff0c;电子病历中病案模板创建、病历书写、修改历史、连续打…

33.3K 的Freqtrade:开启加密货币自动化交易之旅

“ 如何更高效、智能地进行交易成为众多投资者关注的焦点。” Freqtrade 是一款用 Python 编写的免费开源加密货币交易机器人。它就像一位不知疲倦的智能交易助手&#xff0c;能够连接到众多主流加密货币交易所&#xff0c;如 Binance、Bitmart、Bybit 等&#xff08;支…

Mac M2基于MySQL 8.4.3搭建(伪)主从集群

前置准备工作 安装MySQL 8.4.3 参考博主之前的文档&#xff0c;在本地Mac安装好MySQL&#xff1a;Mac M2 Pro安装MySQL 8.4.3安装目录&#xff1a;/usr/local/mysql&#xff0c;安装好的MySQL都处于运行状态&#xff0c;需要先停止MySQL服务最快的方式&#xff1a;系统设置 …

事务的回滚与失效行为

创建一张测试表 AccountMapper public interface AccountMapper {Update("update account set balance #{balance} where username #{username}")int updateUserBalance(Param("username") String username, Param("balance") Integer bal…

【C语言】_字符数组与常量字符串

目录 1. 常量字符串的不可变性 2. 关于常量字符串的打印 3. 关于字符数组与常量字符串的内存分布 1. 常量字符串的不可变性 char arr[10] "abcdef";// 字符数组char* p2 arr;char* p3 "abcdef"; // 常量字符串 尝试对常量字符串进行修改&#xff…

【GUI-pyqt5】QCommandLinkButton类

1. 描述 命令链接的Windows Vista引入的新控件他的用途类似于单选按钮的用途&#xff0c;因为他用于在一组互斥选项之间进行选择命令链接按钮不应单独使用&#xff0c;而应作为向导和对话框中单选按钮替代选项外观通常类似于平面按钮的外观&#xff0c;但除了普通按钮文本外&a…

69.基于SpringBoot + Vue实现的前后端分离-家乡特色推荐系统(项目 + 论文PPT)

项目介绍 在Internet高速发展的今天&#xff0c;我们生活的各个领域都涉及到计算机的应用&#xff0c;其中包括家乡特色推荐的网络应用&#xff0c;在外国家乡特色推荐系统已经是很普遍的方式&#xff0c;不过国内的管理网站可能还处于起步阶段。家乡特色推荐系统采用java技术&…

HCIE-day10-ISIS

ISIS ISIS&#xff08;Intermediate System-to-Intermediate System&#xff09;中间系统到中间系统&#xff0c;属于IGP&#xff08;内部网关协议&#xff09;&#xff1b;是一种链路状态协议&#xff0c;使用最短路径优先SPF算法进行路由计算&#xff0c;与ospf协议有很多相…

图像处理|膨胀操作

在图像处理领域&#xff0c;形态学操作是一种基于图像形状的操作&#xff0c;用于分析和处理图像中对象的几何结构。**膨胀操作&#xff08;Dilation&#xff09;**是形态学操作的一种&#xff0c;它能够扩展图像中白色区域&#xff08;前景&#xff09;或减少黑色区域&#xf…

【机器学习】量子机器学习:当量子计算遇上人工智能,颠覆即将来临?

我的个人主页 我的领域&#xff1a;人工智能篇&#xff0c;希望能帮助到大家&#xff01;&#xff01;&#xff01;&#x1f44d;点赞 收藏❤ 在当今科技飞速发展的时代&#xff0c;量子计算与人工智能宛如两颗璀璨的星辰&#xff0c;各自在不同的苍穹闪耀&#xff0c;正以前…

Sprint Boot教程之五十:Spring Boot JpaRepository 示例

Spring Boot JpaRepository 示例 Spring Boot建立在 Spring 之上&#xff0c;包含 Spring 的所有功能。由于其快速的生产就绪环境&#xff0c;使开发人员能够直接专注于逻辑&#xff0c;而不必费力配置和设置&#xff0c;因此如今它正成为开发人员的最爱。Spring Boot 是一个基…

腾讯云AI代码助手编程挑战赛-桌面壁纸随机更换

作品简介 用于更换壁纸缓缓心情&#xff0c;或者选择困难症&#xff0c;每一个图片都想用来做壁纸&#xff0c;并且节约了手工时间&#xff0c;所以根据这个需求来创建的这款应用工具&#xff0c;使用的是腾讯云AI代码助手来生成的所有代码&#xff0c;使用方便&#xff0c;快…

说说你对作用域链的理解

一、作用域 作用域&#xff0c;即变量&#xff08;变量作用域又称上下文&#xff09;和函数生效&#xff08;能被访问&#xff09;的区域或集合 换句话说&#xff0c;作用域决定了代码区块中变量和其他资源的可见性 举个例子 function myFunction() {let inVariable "…

SpringBootWeb 登录认证(day12)

登录功能 基本信息 请求参数 参数格式&#xff1a;application/json 请求数据样例&#xff1a; 响应数据 参数格式&#xff1a;application/json 响应数据样例&#xff1a; Slf4j RestController public class LoginController {Autowiredpriva…

ASP.NET Core 实现微服务 - Consul 配置中心

这一次我们继续介绍微服务相关组件配置中心的使用方法。本来打算介绍下携程开源的重型配置中心框架 apollo 但是体系实在是太过于庞大&#xff0c;还是让我爱不起来。因为前面我们已经介绍了使用Consul 做为服务注册发现的组件 &#xff0c;那么干脆继续使用 Consul 来作为配置…

DeviceNet转Profinet网关如何革新污水处理行业!

DeviceNet转Profinet网关如何革新污水处理行业&#xff1f;在污水处理行业中&#xff0c;随着环保法规的日益严格和处理技术的不断进步&#xff0c;工业自动化技术的应用越来越广泛。特别是在提高生产效率、降低运营成本以及确保处理质量方面&#xff0c;自动化技术发挥着不可替…

(四)结合代码初步理解帧缓存(Frame Buffer)概念

帧缓存&#xff08;Framebuffer&#xff09;是图形渲染管线中的一个非常重要的概念&#xff0c;它用于存储渲染过程中产生的像素数据&#xff0c;并最终输出到显示器上。简单来说&#xff0c;帧缓存就是计算机图形中的“临时画布”&#xff0c;它储存渲染操作生成的图像数据&am…

58. Three.js案例-创建一个带有红蓝配置的半球光源的场景

58. Three.js案例-创建一个带有红蓝配置的半球光源的场景 实现效果 本案例展示了如何使用Three.js创建一个带有红蓝配置的半球光源的场景&#xff0c;并在其中添加一个旋转的球体。通过设置不同的光照参数&#xff0c;可以观察到球体表面材质的变化。 知识点 WebGLRenderer …

前端基础技术全解析:从HTML前端基础标签语言开始,逐步深入CSS样式修饰、JavaScript脚本控制、Ajax异步通信以及WebSocket持久通信

目录 前言&#xff1a; 1.前端技术html简单了解&#xff1a; 1.1HTML代码是由标签构成的。 1.2.HTML 文件基本结构 1.3.HTML 常见标签 标题标签: 段落标签: p 文本格式化标签 图片标签&#xff1a; 超链接标签: a 测试代码&#xff1a; 展示效果&#xff1a; 表单…

wireshark抓包工具新手使用教程

wireshark抓包工具新手入门使用教程 一、Wireshark软件安装二、Wireshark 抓包示范三、Wireshakr抓包界面四、Wireshark过滤器设置五、wireshark过滤器表达式的规则六、Wireshark抓包分析TCP三次握手七、Wireshark分析常用列标签格式 Wireshark是一款开源的网络协议分析工具&am…