【Spring底层原理高级进阶】Spring Kafka:实时数据流处理,让业务风起云涌!️

 🎉🎉欢迎光临🎉🎉

🏅我是苏泽,一位对技术充满热情的探索者和分享者。🚀🚀

🌟特别推荐给大家我的最新专栏《Spring 狂野之旅:从入门到入魔》 🚀

本专栏带你从Spring入门到入魔!

这是苏泽的个人主页可以看到我其他的内容哦👇👇

努力的苏泽icon-default.png?t=N7T8http://suzee.blog.csdn.net/

 

故事引言

当我们谈论 Spring Kafka 时,可以把它想象成一位非常出色的邮递员,但不是运送普通的信件,而是处理大量的有趣和有用的数据。这位邮递员擅长与 Kafka 进行互动,并且以一种高级抽象和易用的方式处理数据

这位邮递员的任务是将数据从一个地方传送到另一个地方,就像我们寄送包裹一样。他知道如何与 Kafka 进行通信,了解如何与输入和输出主题建立联系

当有人将数据放入输入主题时,这位邮递员会立即接收到通知,并迅速将数据取出。然后,他会对这些数据进行各种有趣的转换和处理操作,就像是一个巧手的魔术师一样。他可以将数据转换成不同的格式、进行聚合、过滤、连接和分流等操作。

一旦数据处理完毕,这位邮递员会将数据装入一个特殊的包裹,并标上目的地的地址,这个目的地就是输出主题。然后,他会快速地把包裹发送出去,确保数据能够按时到达。

Spring Kafka 就像是这位邮递员的工具箱,提供了许多有用的工具和功能,使他的工作更加轻松。它提供了简单且声明性的 API,让我们可以用一种直观的方式定义数据的处理逻辑和流处理拓扑

那么正文开始

目录

故事引言

简介和背景:

实时数据流处理对业务至关重要的原因:

Spring Kafka 基础知识:

深入了解 Apache Kafka 的核心概念和组件:

消息发布和消费:

消费者组管理:

消费者组的概念和作用:

实现有效的消费者组管理:以下是一些实现有效消费者组管理的关键考虑因素:

具体业务实践: 

流处理与处理拓扑

Kafka Streams 的概念和特性:

使用 Spring Kafka 构建和部署流处理拓扑:

实践:

首先,在 pom.xml 文件中添加以下 Maven 依赖:

然后,创建一个 Spring Kafka 流处理应用程序:


简介和背景:

Spring Kafka 是 Spring Framework 提供的一个集成 Apache Kafka 的库,用于构建基于 Kafka 的实时数据流处理应用程序。Apache Kafka 是一个高性能、分布式的流数据平台,广泛用于构建可扩展的、实时的数据处理管道。

实时数据流处理对业务至关重要的原因:

实时数据流处理对于现代业务来说非常重要。随着互联网的快速发展和数字化转型的加速,企业面临着大量的数据产生和处理的挑战。实时数据流处理能够帮助企业实时地捕获、处理和分析数据,从而使企业能够做出及时的决策、提供个性化的服务和优化业务流程。实时数据流处理还可以帮助企业发现潜在的机会和风险,并迅速采取行动。

Spring Kafka 基础知识:

深入了解 Apache Kafka 的核心概念和组件:

在开始学习 Spring Kafka 之前,了解 Apache Kafka 的核心概念和组件是非常重要的。一些核心概念包括:

  • 主题(Topic):消息的类别或者主题。
  • 分区(Partition):主题被分成多个分区,每个分区都是有序的,并且可以在多个机器上进行复制。
  • 生产者(Producer):负责将消息发布到 Kafka 主题。
  • 消费者(Consumer):从 Kafka 主题订阅并消费消息。
  • 消费者组(Consumer Group):一组消费者共同消费一个或多个主题,每个主题的分区被分配给一个消费者组中的一个消费者。
  • 偏移量(Offset):消费者可以跟踪已消费的消息的位置,通过偏移量来表示。

介绍 Spring Kafka 的基本用法和集成方式:

Spring Kafka 提供了简单而强大的 API,用于在 Spring 应用程序中使用 Kafka。它提供了以下核心功能:

  • 消息生产:使用 Spring Kafka 的 KafkaTemplate 类可以方便地将消息发布到 Kafka 主题。
  • 消息消费:通过使用 Spring Kafka 提供的 @KafkaListener 注解,可以轻松地创建消息消费者,并处理来自 Kafka 主题的消息。
  • 错误处理:Spring Kafka 提供了灵活的错误处理机制,可以处理消息发布和消费过程中的各种错误情况。
  • 事务支持:Spring Kafka 支持与 Spring 的事务管理机制集成,从而实现消息发布和消费的事务性操作。

消息发布和消费:

在 Spring Kafka 中发布消息到 Kafka 主题,你可以使用 KafkaTemplate 类的 send() 方法。通过指定要发送的主题和消息内容,可以将消息发送到 Kafka。

要消费 Kafka 主题中的消息,你可以使用 @KafkaListener 注解来创建一个消息监听器。通过指定要监听的主题和消息处理方法,可以在接收到消息时触发相应的逻辑。

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;public void publishMessage(String topic, String message) {kafkaTemplate.send(topic, message);
}

要消费 Kafka 主题中的消息,你可以使用 @KafkaListener 注解来创建一个消息监听器。通过指定要监听的主题和消息处理方法,可以在接收到消息时触发相应的逻辑。

@KafkaListener(topics = "myTopic")
public void consumeMessage(String message) {// 处理接收到的消息System.out.println("Received message: " + message);
}

理解消息的序列化和反序列化:

在 Kafka 中,消息的序列化和反序列化是非常重要的概念。当消息被发送到 Kafka 时,它们需要被序列化为字节流。同样地,在消息被消费时,它们需要被反序列化为原始的数据格式。

Spring Kafka 提供了默认的序列化和反序列化机制,可以根据消息的类型自动进行转换。对于常见的数据类型,如字符串、JSON、字节数组等,Spring Kafka 已经提供了相应的序列化和反序列化实现。此外,你也可以自定义序列化和反序列化器来处理特定的消息格式。

例如,你可以使用 StringSerializer 和 StringDeserializer 来序列化和反序列化字符串消息:

@Configuration
public class KafkaConfig {@Beanpublic ProducerFactory<String, String> producerFactory() {Map<String, Object> config = new HashMap<>();config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return new DefaultKafkaProducerFactory<>(config);}@Beanpublic ConsumerFactory<String, String> consumerFactory() {Map<String, Object> config = new HashMap<>();config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return new DefaultKafkaConsumerFactory<>(config);}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());return factory;}
}

消费者组管理:

消费者组的概念和作用:

消费者组是一组具有相同消费者组ID的消费者,它们共同消费一个或多个 Kafka 主题的消息。消费者组的作用是实现消息的并行处理和负载均衡。通过将主题的分区分配给消费者组中的不同消费者,可以实现消息的并行处理,提高处理吞吐量和降低延迟。消费者组还提供了容错性,当某个消费者出现故障时,其他消费者可以接管其分区并继续处理消息。

实现有效的消费者组管理:
以下是一些实现有效消费者组管理的关键考虑因素:

  1. 消费者组ID的选择:为每个消费者组选择一个唯一的ID,确保不同的消费者组之间互不干扰。

  2. 分区分配策略:选择适当的分区分配策略,确保分配给消费者的分区负载均衡,并避免某些消费者负载过重或空闲。

  3. 动态扩缩容:根据负载情况和处理需求,动态地增加或减少消费者的数量,以实现弹性的消费者组管理。

  4. 监控和健康检查:监控消费者组的运行状态,及时发现并处理故障消费者,确保消费者组的稳定运行。

具体业务实践: 

假设有一个在线电商平台,用户可以在平台上购买商品。平台需要处理用户的订单,并将订单信息发送到一个 Kafka 主题中。订单处理包括验证订单、生成发货单、更新库存等操作。

在这个场景中,可以使用消费者组来实现订单处理的并行处理和负载均衡。具体步骤如下:

  1. 创建一个名为"order"的 Kafka 主题,用于接收用户的订单信息。

  2. 创建一个消费者组,比如名为"order-processing-group"的消费者组。

  3. 启动多个消费者实例,加入到"order-processing-group"消费者组中。每个消费者实例都会订阅"order"主题,并独立地消费订单消息。

  4. Kafka 会根据消费者组的配置,将"order"主题的分区均匀地分配给消费者组中的消费者实例。每个消费者实例将独立地处理分配给它的分区上的订单消息。

  5. 当有新的订单消息到达"order"主题时,Kafka 会将消息分配给消费者组中的一个消费者实例。消费者实例会处理订单消息,执行验证、生成发货单、更新库存等操作。

具体实现:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class OrderConsumer {private static final String TOPIC = "order";private static final String GROUP_ID = "order-processing-group";private static final String BOOTSTRAP_SERVERS = "localhost:9092";public static void main(String[] args) {// 创建消费者配置Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 创建 Kafka 消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Collections.singletonList(TOPIC));// 消费消息while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {String orderMessage = record.value();// 执行订单处理操作,例如验证订单、生成发货单、更新库存等processOrder(orderMessage);}}}private static void processOrder(String orderMessage) {// 实现订单处理逻辑System.out.println("Processing order: " + orderMessage);// TODO: 执行订单处理的具体业务逻辑}
}

​​​​​​​流处理与处理拓扑

  1. Kafka Streams 的概念和特性:

    • Kafka Streams 是一个用于构建实时流处理应用程序的客户端库。
    • 它允许开发人员以简单且声明性的方式处理 Kafka 主题中的数据流。
    • Kafka Streams 提供了丰富的功能,包括数据转换、数据聚合、窗口操作、连接和分流等。
       // 创建拓扑建造器StreamsBuilder builder = new StreamsBuilder();// 创建输入流KStream<String, String> inputStream = builder.stream("input-topic");// 进行数据转换和处理操作KStream<String, String> outputStream = inputStream.mapValues(value -> value.toUpperCase()).filter((key, value) -> value.startsWith("A"));// 将处理结果输出到输出主题outputStream.to("output-topic");// 创建 Kafka Streams 实例KafkaStreams streams = new KafkaStreams(builder.build(), props);

    • 它具有高度可扩展性和容错性,可以通过水平扩展来处理大规模的数据流。
    • Kafka Streams 库紧密集成了 Kafka 的生态系统,可以无缝整合其他 Kafka 组件和工具。
  2. 使用 Spring Kafka 构建和部署流处理拓扑:

    • Spring Kafka 是 Spring Framework 提供的用于与 Kafka 交互的模块。
    • 它提供了高级抽象和易用的 API,简化了 Kafka 流处理应用程序的开发和集成。
    • 使用 Spring Kafka,可以通过配置和注解来定义流处理拓扑,包括输入和输出主题、数据转换和处理逻辑等。
    • Spring Kafka 还提供了与 Spring Boot 的集成,简化了应用程序的配置和部署流程。

实践:

首先,在 pom.xml 文件中添加以下 Maven 依赖:

<dependencies><!-- Spring Kafka 相关依赖 --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.1</version></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><version>2.8.1</version><scope>test</scope></dependency><!-- 其他依赖 -->
</dependencies>

然后,创建一个 Spring Kafka 流处理应用程序:

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.Serdes;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;@SpringBootApplication
@EnableKafka
public class SpringKafkaApp {public static void main(String[] args) {SpringApplication.run(SpringKafkaApp.class, args);}// 创建输入和输出主题@Beanpublic NewTopic inputTopic() {return new NewTopic("input-topic", 1, (short) 1);}@Beanpublic NewTopic outputTopic() {return new NewTopic("output-topic", 1, (short) 1);}// 定义流处理拓扑@KafkaListener(topics = "input-topic")public void processInputMessage(@Payload String message,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {// 在这里进行数据转换和处理操作String processedMessage = message.toUpperCase();// 发送处理结果到输出主题kafkaTemplate().send("output-topic", processedMessage);}// 创建 KafkaTemplate 实例@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}// 创建 ProducerFactory 实例@Beanpublic ProducerFactory<String, String> producerFactory() {Map<String, Object> configProps = new HashMap<>();configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return new DefaultKafkaProducerFactory<>(configProps);}
}

通过 @EnableKafka 注解启用 Spring Kafka。

通过 @Bean 注解创建了输入主题和输出主题的 NewTopic 实例。

使用 @KafkaListener 注解的方法作为消息监听器,监听名为 "input-topic" 的输入主题。

在 processInputMessage 方法中,我们可以进行数据转换和处理操作。在这个示例中,我们将收到的消息转换为大写。

然后,我们使用 KafkaTemplate 将处理结果发送到名为 "output-topic" 的输出主题。

通过 @Bean 注解创建了 KafkaTemplate 和 ProducerFactory 的实例,用于发送消息到 Kafka。

本期到这啦我们下期再见~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/721769.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【python】堆排序

堆的概念 堆&#xff1a;一种特殊完全二叉树&#xff0c;也就是二叉树必须全部是满的&#xff0c;但是最后一排可以从右向左缺失。 大根堆&#xff1a;每个节点都比他的子节点大 小根堆&#xff1a;每个节点都比子节点小 堆在代码中的形式 堆在代码中实际上就是列表&#…

[Angular 基础] - routing 路由(下)

[Angular 基础] - routing 路由(下) 之前部分 Angular 笔记&#xff1a; [Angular 基础] - 自定义指令&#xff0c;深入学习 directive [Angular 基础] - service 服务 [Angular 基础] - routing 路由(上) 使用 route 书接上回&#xff0c;继续折腾 routing 按照最初的 wi…

Linux--文件(2)-重定向和文件缓冲

命令行中的重定向符号 介绍和使用 在Linux的命令行中&#xff0c;重定向符号用于将命令的输入或输出重定向到文件或设备。 常见的重定向符号&#xff1a; 1.“>“符号&#xff1a;将命令的标准输出重定向到指定文件中&#xff0c;并覆盖原有的内容。 2.”>>“符号&a…

1.初识python

1.初识python 编程语言是用来定义计算机程序的语言&#xff0c;用来向计算机发出指令。 1.python语言是一种面向对象的解释型高级编程语言。 解释型语言&#xff1a;使用专门的解释器对源码程序逐行解释成特定平台的机器并立即执行&#xff0c;是代码在执行时才被解释器一行行…

c++数据结构算法复习基础-- 3 --线性表-单向链表-笔试面试常见问题

1、单链表逆序 思路图 代码实现 //著: 链表结构里记得加 friend void ReverseLink(Clink& link); void ReverseLink(Clink& link) {Node* p link.head_->next_;while( p nullptr){return;}Node* q p->next_;link.head_->next_ nullptr;while(p ! nullpt…

YOLOv8改进 在更换的PoolFormer主干网络中增加注意力机制

一、PoolFormer的网络结构 PoolFormer采用自注意力机制和池化操作相结合的方式&#xff0c;同时考虑了局部和全局的特征关系。 具体的代码如&#xff08;YOLOv8改进 更换多层池化操作主干网络PoolFormer_yolov8池化-CSDN博客&#xff09;所示。 二、Global Attention Mechan…

python一张大图找小图的个数

python一张大图找小图的个数 一、背景 有时候我们在浏览网站时&#xff0c;发现都是前端搞出来的一张张图&#xff0c;我们只能用盯住屏幕的小眼睛看着&#xff0c;很累的统计&#xff0c;这个是我在项目中发现没办法统计&#xff0c;网上的教程很多&#xff0c;都不成功&…

Python 面向对象编程——类的使用

一、学习目标 1&#xff0e;掌握类的定义和实例化对象。 2&#xff0e;熟练掌握类的构造函数__init__使用。 3&#xff0e;掌握类的继承机制和使用。 二、相关练习 1、定义一个玩具类Toy()&#xff0c;创建名字为“小汽车”、“手枪”和“积木”的玩具实例&#xff0c;计…

深圳牵头打造鸿蒙原生应用软件生态 | 百能云芯

深圳市工业和信息化局、深圳市政务服务和数据管理局于3月3日联合印发了《深圳市支持开源鸿蒙原生应用发展2024年行动计划》。这一计划旨在通过政策引导、市场推动、社会协同的方式&#xff0c;将深圳打造成一个鸿蒙原生应用软件生态的中心&#xff0c;推动鸿蒙系统在当地的发展…

PyQT6的从零开始在Pycharm中配置与使用

PyQT6的从零开始在Pycharm中配置与使用 1.安装PyQt6 PyQt6-tools2.在Pycharm中配置扩展工具2.1配置QTdesigner2.2配置Pyuic 3.启动3.1、启动designer3.2、启动Pyuic 1.安装PyQt6 PyQt6-tools pip install PyQt6 PyQt6-tools安装成功后&#xff0c;查看安装版本&#xff0c;版本…

基于springboot+vue的医疗报销系统

博主主页&#xff1a;猫头鹰源码 博主简介&#xff1a;Java领域优质创作者、CSDN博客专家、阿里云专家博主、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战&#xff0c;欢迎高校老师\讲师\同行交流合作 ​主要内容&#xff1a;毕业设计(Javaweb项目|小程序|Pyt…

面试笔记系列四之SpringBoot+SpringCloud+计算机网络基础知识点整理及常见面试题

目录 Spring Boot 什么是 Spring Boot&#xff1f; Spring Boot 有哪些优点&#xff1f; SpringBootApplication注解 Spring Boot 的启动流程 Spring Boot属性加载顺序 springboot自动配置原理是什么&#xff1f;&#xff08;*&#xff09; 如何理解springboot中的start…

低代码平台开发实践:基于React的高效构建与创新【文末送书-29】

文章目录 背景低代码平台简介基于React的优势低代码平台的实际应用 低代码平台开发实践&#xff1a;基于React【文末送书-29】 背景 随着技术的不断进步和业务需求的日益复杂&#xff0c;低代码平台成为现代软件开发领域中备受关注的工具之一。在这个快节奏的时代&#xff0c;…

解决手机连接校园网同一设备老是需要重复认证的问题(+解决原理)

相信大家平时在使用校园网的时候总会遇到同一设备隔三岔五就要重复认证绑定的问题&#xff0c;这里直接附上解决方案。 打开手机的wifi-->连接校园网然后进入设置-->在隐私选项选择“使用设备MAC” 如下图&#xff0c;问题解决了&#xff01;如果想知道原理的可以继续往…

如何处理微服务之间的通信和数据一致性?

✨✨祝屏幕前的兄弟姐妹们每天都有好运相伴左右&#xff0c;一定要天天开心哦&#xff01;✨✨ &#x1f388;&#x1f388;作者主页&#xff1a; 喔的嘛呀&#x1f388;&#x1f388; 目录 引言 一、微服务通信 1、同步通信&#xff1a;HTTP 1.1.同步通信示例代码&#xf…

1、Ajax、get、post、ajax,随机颜色

一、Ajax初始 1、什么是Ajax&#xff1f; 异步的JavaScript和xml 2、xml是什么&#xff1f; 一种标记语言&#xff0c;传输和存储数据----------现在用JSON传输数据 3、Ajax的作用 局部加载 可以使网页异步更新 4、Ajax的原理或者步骤(6步) 创建Ajax对象 if (window.X…

2024年租用阿里云服务器多少钱?阿里云服务器租用价格表(最新版)

2024年租用阿里云服务器一年多少钱&#xff1f;不同时期阿里云服务器的租用价格不同&#xff0c;随着2024年阿里云上云采购季活动的开启和阿里云最新一轮的云产品降价调整&#xff0c;阿里云服务器租用价格也做了一些调整&#xff0c;配置最低的1核1G云服务器收费标准为22.8/月…

NAT模式 LVS负载均衡部署

一 架构图 二 文字表述过程 1 当客户端 发起请求报文是: 源ip:客户端的ip地址(cip) 目的地址:vip(代理服务器的外网地址) 2.当数据包到达我们的 代理服务器 源ip不变&#xff0c;需要修改目的ip及端口号 源ip:客户端的ip地址(c…

智慧城市中的数字孪生:构建城市管理的未来框架

目录 一、引言 二、数字孪生技术概述 三、数字孪生技术在智慧城市中的应用 1、实时监测与预警 2、模拟与优化 3、智能化决策 4、协同与共享 四、数字孪生技术构建城市管理的未来框架的价值 1、提高管理效率 2、优化资源配置 3、提升公共服务水平 4、增强应对突发事…

【Android开发】02-小费计算APP(Tip Time)

github地址&#xff08;项目中的A02_TipTime文件夹&#xff09;&#xff1a; https://github.com/tao355667/Android_Development 一、功能介绍 输入消费金额和服务满意度后&#xff0c;可计算出相应的小费(可选是否四舍五入)支持中英文系统可根据系统主题的明暗切换界面 二、…