使用Spring Boot集成中间件:Kafka的高级使用案例讲解

使用Spring Boot集成中间件:Kafka的具体使用案例讲解

导言

在实际应用中,Kafka作为一种强大的分布式消息系统,广泛应用于实时数据处理和消息传递。本文将通过一个全面的使用案例,详细介绍如何使用Spring Boot集成Kafka,并展示其在实际场景中的应用。

1. 准备工作

在开始之前,我们需要确保已经完成以下准备工作:

  • 安装并启动Kafka集群
  • 创建Kafka主题(Topic)用于消息的发布与订阅

2. 生产者示例

首先,我们来创建一个简单的生产者,将消息发送到Kafka主题。

@RestController
public class KafkaProducerController {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@PostMapping("/produce/{message}")public ResponseEntity<String> produceMessage(@PathVariable String message) {kafkaTemplate.send("my-topic", message);return ResponseEntity.ok("Message sent to Kafka: " + message);}
}

在上述代码中,我们使用了Spring Boot提供的KafkaTemplate,通过调用send方法将消息发送到名为my-topic的Kafka主题。

3. 消费者示例

接下来,我们创建一个简单的消费者,订阅并处理来自Kafka主题的消息。

@Service
public class KafkaConsumerService {@KafkaListener(topics = "my-topic", groupId = "my-group")public void consumeMessage(String message) {System.out.println("Received message from Kafka: " + message);// 进行消息处理逻辑}
}

通过@KafkaListener注解,我们指定了要监听的主题为my-topic,同时指定了消费者组的ID为my-group。当有新消息到达时,consumeMessage方法将被触发,进行消息处理逻辑。

4. 配置文件

application.propertiesapplication.yml中配置Kafka相关属性。

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group

这里我们配置了Kafka的地址和消费者组的ID。

5. 运行和测试

启动Spring Boot应用程序,通过POST请求发送消息:

curl -X POST http://localhost:8080/produce/HelloKafka

在控制台或日志中,可以看到消费者输出了接收到的消息。
##############################################################################################################

一些其他的使用场景

使用Spring Boot集成中间件:Kafka高级使用案例

在这个高级使用案例中,我们将深入展示Spring Boot集成Kafka的一些高级功能,包括多分区、事务、自定义分区策略以及消息过滤。这将使我们更好地适应复杂的业务场景。

1. 配置多分区和自定义分区策略

首先,我们在Kafka配置中设置多分区以提高并发处理能力,并实现自定义分区策略。

@Configuration
public class KafkaConfig {@Beanpublic NewTopic myTopic() {return TopicBuilder.name("my-topic").partitions(5) // 设置为5个分区.replicas(1).build();}@Beanpublic KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory);template.setDefaultTopic("my-topic");return template;}@Beanpublic ProducerListener<String, String> producerListener() {return new MyProducerListener();}
}

在上述配置中,我们将主题my-topic配置为5个分区,并设置了生产者的默认主题。同时,我们实现了一个自定义的生产者监听器MyProducerListener,可以在消息发送前后执行额外的逻辑。

2. 事务支持和幂等性配置

接下来,我们配置生产者启用事务,并设置消费者为幂等性消费。

@Configuration
public class KafkaConfig {// ... 上述配置 ...@Beanpublic KafkaTransactionManager<String, String> kafkaTransactionManager(ProducerFactory<String, String> producerFactory) {return new KafkaTransactionManager<>(producerFactory);}@Beanpublic ConsumerFactory<String, String> consumerFactory() {Map<String, Object> props = new HashMap<>();// ... 其他配置 ...props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);return new DefaultKafkaConsumerFactory<>(props);}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory,ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();configurer.configure(factory, consumerFactory);factory.setBatchListener(true);return factory;}
}

在上述配置中,我们使用了KafkaTransactionManager配置事务管理器,同时设置了消费者的隔离级别为read_committed,启用了批量监听。

3. 自定义分区策略

为了更灵活地控制消息的分布,我们可以实现自定义的分区策略。

public class CustomPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.availablePartitionsForTopic(topic);int numPartitions = partitions.size();if (key == null || !(key instanceof String)) {throw new InvalidRecordException("Invalid key");}String keyValue = (String) key;int hashCode = keyValue.hashCode();return Math.abs(hashCode % numPartitions);}@Overridepublic void close() {// 关闭资源逻辑}@Overridepublic void configure(Map<String, ?> configs) {// 配置初始化逻辑}
}

在上述分区器中,我们使用了消息的字符串形式的key进行哈希计算,然后取绝对值得到分区数。这使得具有相同key的消息始终被分发到同一个分区。

4. 运行和测试

通过上述配置,我们可以启动Spring Boot应用程序,观察多分区、事务支持和自定义分区策略在消息生产和消费中的效果。

curl -X POST http://localhost:8080/produce/HelloKafka

在Kafka消费者日志中,可以看到消息被正确地分配到了指定的分区,并且事务操作生效,确保消息的一致性。

Kafka获取文件流的具体案例讲解


在许多实际应用场景中,我们需要处理文件数据,并将文件流传输到Kafka中进行进一步的处理。下面将通过一个具体的案例来演示如何使用Spring Boot和Kafka实现文件流的生产和消费。

1. 文件流生产者

首先,我们创建一个文件流生产者,读取本地文件并将文件内容发送到Kafka主题。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;@Service
public class FileProducerService {private final KafkaTemplate<String, byte[]> kafkaTemplate;@Autowiredpublic FileProducerService(KafkaTemplate<String, byte[]> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}public void produceFile(String topic, String filePath) {try {byte[] fileBytes = Files.readAllBytes(Path.of(filePath));kafkaTemplate.send(topic, fileBytes);} catch (IOException e) {// 处理文件读取异常e.printStackTrace();}}
}

在上述代码中,我们注入了KafkaTemplate,通过Files.readAllBytes读取文件内容并通过kafkaTemplate.send发送到指定的Kafka主题。

2. 文件流消费者

接下来,我们创建一个文件流消费者,监听Kafka主题并将接收到的文件流保存到本地。

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;import java.io.FileOutputStream;
import java.io.IOException;@Service
public class FileConsumerService {@KafkaListener(topics = "file-topic")public void consumeFile(byte[] fileBytes) {try {// 保存文件到本地String fileName = "received-file.txt";FileOutputStream outputStream = new FileOutputStream(fileName);outputStream.write(fileBytes);outputStream.close();} catch (IOException e) {// 处理文件保存异常e.printStackTrace();}}
}

通过@KafkaListener注解,我们监听名为file-topic的Kafka主题,接收文件流并保存到本地文件。

3. 配置文件

application.propertiesapplication.yml中配置Kafka相关属性。

spring.kafka.bootstrap-servers=localhost:9092

4. 运行和测试

在Spring Boot应用程序中运行文件流生产者和消费者,通过调用生产者的方法,将文件内容发送到Kafka主题:

fileProducerService.produceFile("file-topic", "path/to/your/file.txt");

消费者将接收到文件流,并将其保存到本地文件。你可以通过查看消费者的日志或检查保存的文件来验证流程是否正常运行。

结语

通过对kafka的一些常用使用案例代码分析,希望这个能够帮助大家更深入地理解和使用Spring Boot集成Kafka的高级功能。感谢阅读!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/622994.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

matlab串口数据交互的使用

一、matlab将串口数据读取并储存到position中 delete(instrfindall);%注销系统之前已经打开的串口资源 clear s %清空s的数据 s serial(COM6,BaudRate,115200);%定义串口及波特率 fopen(s)%打开串口 fwrite(s,00AB,)%向串口写入读取电机位置指令 for i1:8 %共8个电机position…

JVM实战(15)——Full GC调优

作者简介&#xff1a;大家好&#xff0c;我是smart哥&#xff0c;前中兴通讯、美团架构师&#xff0c;现某互联网公司CTO 联系qq&#xff1a;184480602&#xff0c;加我进群&#xff0c;大家一起学习&#xff0c;一起进步&#xff0c;一起对抗互联网寒冬 学习必须往深处挖&…

气象能见度监测站的应用介绍

【TH-NJD10】能见度是反映大气透明度的一个重要指标&#xff0c;对于航空、航海、道路交通等领域具有重要意义。 一、气象能见度监测站的应用 交通气象服务 气象能见度监测站在交通气象服务中发挥着重要作用。在高速公路、机场、港口等交通枢纽&#xff0c;能见度监测数据对于交…

U盘格式化后数据能恢复吗?几个具体解决问题的答案

U盘是一种常见的存储设备&#xff0c;它可以方便我们携带各种文件和数据。但是&#xff0c;当我们不小心将U盘格式化了&#xff0c;里面的数据也将全部都消失。并且&#xff0c;对于一些拥有较多电脑操作技能的用户来讲&#xff0c;格式化删除的数据与普通右击删除的方式相比&a…

机器人制作开源方案 | 乒乓球自动拾取机器人

作者&#xff1a;刘众森、王森、王绘东、崔岳震、宋维鑫 单位&#xff1a;山东农业工程学院 指导老师&#xff1a;潘莹月、廖希杰 1. 场景调研 我们小组选择项目的任务方向乒乓球的捡取与存放&#xff0c;针对此问题我们研发了一款乒乓球自动拾取机器人。众所周知&#xff0…

【Unity】【VRTK】【Pico】如何快速在VRTK中引入带动画的PICO控制器

【背景】 之前的VRTK篇章中,我只介绍了Oculus,Open VR,SImulator这三种Rig的配置方法,那么Pico如何融合VRTK进行开发呢? 【需要的开发包】 先像一个正常PICO项目那样导入PICO的SDK到Unity。VRTK 4的Package导入器中搜Pico,可以导入一个Pico的Integration,导入后Projec…

238.【2023年华为OD机试真题(C卷)】火星文计算(模拟-JavaPythonC++JS实现)

🚀点击这里可直接跳转到本专栏,可查阅顶置最新的华为OD机试宝典~ 本专栏所有题目均包含优质解题思路,高质量解题代码(Java&Python&C++&JS分别实现),详细代码讲解,助你深入学习,深度掌握! 文章目录 一. 题目-火星文计算二.解题思路三.题解代码Python题解代…

python中none的替换方法:pandasnumpy

none的替换方法&#xff1a; 1.pandas # 将缺失的id值替换为None merged_df[id].fillna(None, inplaceTrue) #这行代码使用了Pandas库中的fillna方法&#xff0c;对DataFrame中的id列进行了填充操作。具体来说&#xff0c;它将该列中的缺失值用字符串None进行填充&#xff0c…

Python中的__add__()方法

在 Python 中&#xff0c;__add__() 是一个特殊方法&#xff08;magic method&#xff09;&#xff0c;用于定义对象之间的加法操作。当你使用 运算符对两个对象进行相加时&#xff0c;实际上会调用对象的 __add__() 方法。 下面是一个简单的例子&#xff0c;演示了 __add__()…

SpringCloud.03.网关Gateway

目录 网关Gateway的概念&#xff1a; 准备 使用 方式一 因为配置了网关所以可以直接通过gateway发送请求 方式二 修改配置前&#xff1a;http://localhost:8082/provider/run 方式三(动态路由) 导入配置类 网关Gateway的概念&#xff1a; Spring Cloud Gateway 是 Spri…

【网络工程师】NAT与动态路由

一、NAT网络地址转换 1、NAT&#xff1a;Network Address Translations 网络地址转换 2、ip地址问题&#xff1a;ipv4地址严重不够用了&#xff08;A、B、C类可以使用 D组播 E科研&#xff09; 3、解决&#xff1a;把IP地址分为了公网IP和私网IP 公网IP只能在公网上使用 私网…

LLM模型的generate和chat函数区别

在 Hugging Face 的 transformers 库中&#xff0c;GPT&#xff08;Generative Pre-trained Transformer&#xff09;类的模型有两个常用的生成文本的方法&#xff1a;generate 和 chat。这两个方法在使用上有一些区别。通常公司发布的 LLM 模型会有一个基础版本&#xff0c;还…

使用Docker容器部署LNMP服务

目录 实验前准备部署Nginx环境准备准备nginx.conf配置文件生成并启动镜像验证nginx 部署Mysql准备工作目录编写Dockerfile脚本准备my.cnf文件生成并启动镜像启动镜像容器验证mysql 部署php建立工作目录编写Dockerfile脚本准备配置文件生成并启动镜像验证php 启动wordpressmysql…

书生·浦语大模型实战营-学习笔记3

目录 (3)基于 InternLM 和 LangChain 搭建你的知识库1. 大模型开发范式&#xff08;RAG、Fine-tune&#xff09;RAG微调 &#xff08;传统自然语言处理的方法&#xff09; 2. LangChain简介&#xff08;RAG开发框架&#xff09;3. 构建向量数据库4. 搭建知识库助手5. Web Demo部…

非线性方程求根迭代法(C++)

文章目录 问题描述算法描述不动点迭代法一维情形多维情形 牛顿迭代法单根情形重根情形 割线法抛物线法逆二次插值法 算法实现准备工作一般迭代法割线法抛物线法逆二次插值法 实例分析例1例2 迭代法是一种求解非线性方程根的方法, 它通过构造一个迭代过程, 将一个非线性方程转化…

瑞_Java开发手册_(一)编程规约

文章目录 编程规约的意义&#xff08;一&#xff09;命名风格&#xff08;二&#xff09;常量定义&#xff08;三&#xff09;代码格式&#xff08;四&#xff09;OOP 规约&#xff08;五&#xff09;日期时间&#xff08;六&#xff09;集合处理&#xff08;七&#xff09;并发…

实现STM32烧写程序-(3) Hex文件结构

简介 要对STM32进行更新动作, 就需要对程序文件进行解析, 大部分编译的生成程序文件是Hex或者Bin, 先来看看Hex的结构吧。 资料 Hex文件 简介 Hex文件格式最早由Intel公司于1973年创建。它最初是为了在Intel 8080微处理器上存储和传输二进制数据而设计的。随后&#xff0c;Hex…

基于HarmonyOS的华为智能手表APP开发实战——Fitness_华为手表app开发

、 基于HarmonyOS的华为智能手表APP开发实战——Fitness_华为手表app开发 Excerpt 文章浏览阅读8.7k次,点赞6次,收藏43次。本文针对华为HarmonyOS智能穿戴产品(即HUAWEI WATCH 3)开发了一款运动健康类的游戏化APP——Fitness,旨在通过游戏化的方式,提升用户运动动机。_华…

c++ 开发生态环境、工作流程、生命周期-拾遗

拾遗 1 生态环境初识 当您使用Visual Studio 2019进行C开发时&#xff0c;您将进入C生态环境。以下是一些重要的概念和步骤&#xff1a; C程序的结构&#xff1a; 一个典型的C程序包括源文件&#xff08;.cpp&#xff09;、头文件&#xff08;.h&#xff09;、编译后的目标文…

【算法实验】实验1

实验1-1 斐波那契数 【问题描述】斐波那契数 &#xff08;通常用 F(n) 表示&#xff09;形成的序列称为 斐波那契数列 。该数列由 0 和 1 开始&#xff0c;后面的每一项数字都是前面两项数字的和。 定义&#xff1a;F(0) 0, F(1) 1, F(n) F(n-1) F(n-2) 其中n>1 要求计…