Spring Boot 集成 Kafka

Spring Boot 与 Kafka 集成是实现高效消息传递和数据流处理的常见方式。Spring Boot 提供了简化 Kafka 配置和使用的功能,使得集成过程变得更加直观和高效。以下是 Spring Boot 集成 Kafka 的详细步骤,包括配置、生产者和消费者的实现以及一些高级特性。

1. 添加依赖

首先,你需要在 Spring Boot 项目的 pom.xml 文件中添加 Kafka 相关的依赖。使用 Spring Boot 的起步依赖可以简化配置。

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-kafka</artifactId>
</dependency>

2. 配置 Kafka

2.1. 配置文件

application.propertiesapplication.yml 文件中配置 Kafka 相关属性。

application.properties:

# Kafka 服务器地址
spring.kafka.bootstrap-servers=localhost:9092# Kafka 消费者配置
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer# Kafka 生产者配置
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

application.yml:

spring:kafka:bootstrap-servers: localhost:9092consumer:group-id: my-groupauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproducer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer

2.2. Kafka 配置类

在 Spring Boot 中,你可以使用 @Configuration 注解创建一个配置类,来定义 Kafka 的生产者和消费者配置。

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;import java.util.HashMap;
import java.util.Map;@Configuration
@EnableKafka
public class KafkaConfig {@Beanpublic ProducerFactory<String, String> producerFactory() {Map<String, Object> configProps = new HashMap<>();configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return new DefaultKafkaProducerFactory<>(configProps);}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}@Beanpublic ConsumerFactory<String, String> consumerFactory() {Map<String, Object> configProps = new HashMap<>();configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return new DefaultKafkaConsumerFactory<>(configProps);}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());return factory;}
}

3. 实现 Kafka 生产者

3.1. 生产者服务

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service
public class KafkaProducerService {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;private static final String TOPIC = "my_topic";public void sendMessage(String message) {kafkaTemplate.send(TOPIC, message);}
}

3.2. 控制器示例

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;@RestController
public class KafkaController {@Autowiredprivate KafkaProducerService kafkaProducerService;@PostMapping("/send")public void sendMessage(@RequestBody String message) {kafkaProducerService.sendMessage(message);}
}

4. 实现 Kafka 消费者

4.1. 消费者服务

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
public class KafkaConsumerService {@KafkaListener(topics = "my_topic", groupId = "my-group")public void listen(String message) {System.out.println("Received message: " + message);}
}

5. 高级特性

5.1. 消息事务

Kafka 支持消息事务,确保消息的原子性。

生产者配置

spring.kafka.producer.enable-idempotence=true
spring.kafka.producer.transaction-id-prefix=my-transactional-id

使用事务

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.core.TransactionTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;@Service
public class KafkaTransactionalService {private final KafkaTemplate<String, String> kafkaTemplate;private final TransactionTemplate transactionTemplate;public KafkaTransactionalService(KafkaTemplate<String, String> kafkaTemplate, TransactionTemplate transactionTemplate) {this.kafkaTemplate = kafkaTemplate;this.transactionTemplate = transactionTemplate;}@Transactionalpublic void sendMessageInTransaction(String message) {kafkaTemplate.executeInTransaction(t -> {kafkaTemplate.send("my_topic", message);return true;});}
}

5.2. 异步发送与回调

异步发送

public void sendMessageAsync(String message) {kafkaTemplate.send("my_topic", message).addCallback(result -> System.out.println("Sent message: " + message),ex -> System.err.println("Failed to send message: " + ex.getMessage()));
}

总结

Spring Boot 与 Kafka 的集成使得消息队列的使用变得更加简单和高效。通过上述步骤,你可以轻松地配置 Kafka、实现生产者和消费者,并利用 Spring Boot 提供的强大功能来处理消息流。了解 Kafka 的高级特性(如事务和异步处理)能够帮助你更好地满足业务需求,确保系统的高可用性和数据一致性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/50085.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

docker配置上网代理获取镜像

一、添docker子配置档设置 1、创建目录 mkdir /etc/systemd/system/docker.service.d 2、创建http-proxy.conf文件,增加以下内容 cat > /etc/systemd/system/docker.service.d/http-proxy.conf <<EOF [Service] Environment“HTTP_PROXYhttp://192.168.0.2:8118…

分布式系统常见软件架构模式

常见的分布式软件架构 Peer-to-Peer (P2P) PatternAPI Gateway PatternPub-Sub (Publish-Subscribe)Request-Response PatternEvent Sourcing PatternETL (Extract, Transform, Load) PatternBatching PatternStreaming Processing PatternOrchestration Pattern总结 先上个图&…

.h264 .h265 压缩率的直观感受

1.资源文件 https://download.csdn.net/download/twicave/89579327 上面是.264 .265和原始的YUV420文件&#xff0c;各自的大小。 2.转换工具&#xff1a; 2.1 .h264 .h265互转 可以使用ffmpeg工具&#xff1a;Builds - CODEX FFMPEG gyan.dev 命令行参数&#xff1a; …

liteos定时器回调时间过长造成死机问题解决思路

项目需求 原代码是稳定的&#xff0c;现我实现EMQ平台断开连接的时候&#xff0c;把HSL的模拟点位数据采集到网关&#xff0c;然后存入Flash&#xff0c;当EMQ平台连接的时候&#xff0c;把Flash里面的点位数据放在消息队列里面&#xff0c;不影响实时采集。 核心1&#xff1a…

Springboot实现缓存组件(ctgcache、redis)配置动态切换

目录 一、需求背景 二、实现缓存组件的动态切换 1.第一步&#xff1a;配置文件新增切换开关 2.第二步&#xff1a;创建ctgcache 缓存条件类 3.第三步&#xff1a;创建redis 缓存条件类 4.第四步&#xff1a;创建缓存切换配置类 5.第五步&#xff1a;创建缓存服务接口 6.…

godot新建项目及设置外部编辑器为vscode

一、新建项目 初次打开界面如下所示&#xff0c;点击取消按钮先关闭掉默认弹出的框 点击①新建弹出中间的弹窗②中填入项目的名称 ③中设置项目的存储路径&#xff0c;点击箭头所指浏览按钮&#xff0c;会弹出如下所示窗口 根据图中所示可以选择或新建自己的游戏存储路径&…

鸿蒙(HarmonyOS)自定义Dialog实现时间选择控件

一、操作环境 操作系统: Windows 11 专业版、IDE:DevEco Studio 3.1.1 Release、SDK:HarmonyOS 3.1.0&#xff08;API 9&#xff09; 二、效果图 三、代码 SelectedDateDialog.ets文件/*** 时间选择*/ CustomDialog export struct SelectedDateDialog {State selectedDate:…

Linux系统上安装Redis

百度网盘&#xff1a; 通过网盘分享的文件&#xff1a;redis_linux 链接: https://pan.baidu.com/s/1ZcECygWA15pQWCuiVdjCtg?pwd8888 提取码: 8888 1.把安装包拖拽到/ruanjian/redis/文件夹中&#xff08;自己选择&#xff09; 2.进入压缩包所在文件夹&#xff0c;解压压缩…

ROM修改进阶教程------修改rom 开机自动安装指定apk 自启脚本完整步骤解析

rom修改的初期认识 在解包修改系统分区过程中。很多客户需求刷完rom后自动安装指定apk。这种与内置apk有区别。而且一些极个别apk无法内置。今天对这种修改rom刷入机型后第一次启动后自动安装指定apk的需求做个步骤解析。 在前期博文中我有做过说明。官方系统固件解…

按图搜索新体验:阿里巴巴拍立淘API返回值详解

阿里巴巴拍立淘API是一项基于图片搜索的商品搜索服务&#xff0c;它允许用户通过上传商品图片&#xff0c;系统自动识别图片中的商品信息&#xff0c;并返回与之相关的搜索结果。以下是对阿里巴巴拍立淘API返回值的详细解析&#xff1a; 一、主要返回值内容 商品信息 商品列表…

Java面试题(每日更新)

每日五道&#xff01;学会就去面试&#xff01; 本文的宗旨是为读者朋友们整理一份详实而又权威的面试清单&#xff0c;下面一起进入主题吧。 目录 1.概述 2.Java 基础 2.1 JDK 和 JRE 有什么区别&#xff1f; 2.2 和 equals 的区别是什么&#xff1f; 2.3 两个对象的…

ECharts实现按月统计和MTBF统计

一、数据准备 下表是小明最近一年的旅游记录 create_datecity_namecost_money2023-10-10 10:10:10北京14992023-11-11 11:11:11上海29992023-12-12 12:12:12上海19992024-01-24 12:12:12北京1232024-01-24 12:12:12上海2232024-02-24 12:12:12广州5642024-02-24 12:12:12北京…

leetcode-148. 排序链表

题目描述 给你链表的头结点 head &#xff0c;请将其按 升序 排列并返回 排序后的链表 。 示例 1&#xff1a; 输入&#xff1a;head [4,2,1,3] 输出&#xff1a;[1,2,3,4]示例 2&#xff1a; 输入&#xff1a;head [-1,5,3,4,0] 输出&#xff1a;[-1,0,3,4,5]示例 3&#x…

react18+

主要是围绕函数式组件讲&#xff0c;18主要用就是函数式组件&#xff0c;学习前先熟悉下原生js的基本使用&#xff0c;主要是事件 1、UI操作 1.1、书写jsx标签语言 基本写法和原生如同一则&#xff0c;只是放在一个方法里面返回而已&#xff0c;我们称这样的写法为函数式组件…

OrangePi Zero2 全志H616 开发初探

目录&#xff1a; 一、刷机和系统启动1、TF 卡格式化&#xff1a;2、镜像烧录&#xff1a;3、登录系统&#xff1a; 二、基于官方外设开发1、wiringPi 外设 SDK 安装&#xff1a;2、C 文件编译&#xff1a;3、基于官方外设的应用开发&#xff1a;① GPIO 输入输出&#xff1a;②…

【个人亲试最新】WSL2中的Ubuntu 22.04安装Docker

文章目录 Wsl2中的Ubuntu22.04安装Docker其他问题wsl中执行Ubuntu 报错&#xff1a;System has not been booted with systemd as init system (PID 1). Can‘t operate. 参考博客 &#x1f60a;点此到文末惊喜↩︎ Wsl2中的Ubuntu22.04安装Docker 确定为wsl2ubuntu22.04&#…

C++20之设计模式:状态模式

状态模式 状态模式状态驱动的状态机手工状态机Boost.MSM 中的状态机总结 状态模式 我必须承认:我的行为是由我的状态支配的。如果我没有足够的睡眠&#xff0c;我会有点累。如果我喝了酒&#xff0c;我就不会开车了。所有这些都是状态(states)&#xff0c;它们支配着我的行为:…

uniapp 中uni.request H5无法使用请求头

今天调试一天忘了看官网文档一直以为写错了&#xff0c;后来发现uniapp 里面就不能自定义h5我擦。 如果要测试可以走HX的内置浏览器中测试 其他比如小程序或者APP都可以使用

vue接入google map自定义marker教程

需求背景 由于客户需求&#xff0c;原来系统接入的高德地图&#xff0c;他们不接受&#xff0c;需要换成google地图。然后就各种百度&#xff0c;各种Google&#xff0c;却不能实现。----无语&#xff0c;就连google地图官方的api也是一坨S-H-I。所以才出现这篇文章。 google地…

【Python实战因果推断】56_因果推理概论6

目录 Causal Quantities: An Example Bias Causal Quantities: An Example 让我们看看在我们的商业问题中&#xff0c;你如何定义这些量。首先&#xff0c;你要注意到&#xff0c;你永远无法知道价格削减&#xff08;即促销活动&#xff09;对某个特定商家的确切影响&#xf…