Kafka的重要组件,谈谈流处理引擎Kafka Stream

系列文章目录

上手第一关,手把手教你安装kafka与可视化工具kafka-eagle
Kafka是什么,以及如何使用SpringBoot对接Kafka
架构必备能力——kafka的选型对比及应用场景
Kafka存取原理与实现分析,打破面试难关
防止消息丢失与消息重复——Kafka可靠性分析及优化实践


在这里插入图片描述
我们前面介绍了很多kafka本身的特性与设计,也说了不少原理性的内容,本次我们稍微放松一下,来介绍一下 Kafka的一个重要组件—— Kafka Stream

📕作者简介:战斧,从事金融IT行业,有着多年一线开发、架构经验;爱好广泛,乐于分享,致力于创作更多高质量内容
📗本文收录于 kafka 专栏,有需要者,可直接订阅专栏实时获取更新
📘高质量专栏 云原生、RabbitMQ、Spring全家桶 等仍在更新,欢迎指导
📙Zookeeper Redis dubbo docker netty等诸多框架,以及架构与分布式专题即将上线,敬请期待


一、Kafka Stream是什么

1. 简介

Kafka Stream是 Apache Kafka 的一个子项目,它提供了一种简单而快速的方法来对数据流进行处理,是一种无状态的流处理引擎,可以消费Kafka中的数据并将其转换为输出流。Kafka Stream不像其他流处理工具,它是一个Java库,能够快速构建、部署和管理数据流处理任务。

在这里插入图片描述

我们在前面的文章中《Kafka是什么,以及如何使用SpringBoot对接Kafka》 初步接触了kafka的客户端kafka client,当时如果有眼尖的同学应该也注意到了,在使用Spring Initializr创建项目时,就看见了Kafka Stream的身影

在这里插入图片描述

那么Kafka Stream 与 我们当时接触的 Kafka client 有什么联系吗?其实它们的共同点在于他们都是与Kafka集成的API,从逻辑层次来说,Kafka Stream 是建立在 Kafka client 上的,我们在引用 Kafka Stream 时, 其会自带着 Kafka client 的包,如下:

在这里插入图片描述

那它们的作用到底哪不一样呢?具体来说,它们的不同之处可从这几个方面看:

  • 功能不同
    Kafka Stream是用于流处理任务的API,它提供了一种简单而快速的方法来对数据流进行处理。相反,Kafka Client主要用于生产和消费Kafka消息

  • 处理方式不同
    Kafka Client主要依赖于订阅和轮询来消费Kafka消息。而Kafka Stream依赖于数据流的处理,它会自动将Kafka消息转化为数据流,并实时处理这些数据

  • API调用方式不同
    在Kafka Stream中,您需要定义一个拓扑结构,描述如何将输入流转换成输出流,并执行转换。而在Kafka Client中,您需要调用API来发送和接收Kafka消息

  • 应用场景不同
    Kafka Stream适用于实时数据分析、实时预测等需要流处理的场景。而Kafka Client更适用于异步数据传输的场景,例如日志收集、事件处理等。

2. 特点

我们前面说过,流处理引擎做的人也是很多的,常见的比如说Apache FlinkApache Spark StreamingApache Storm 以及阿里参考 Apache Storm 开发的Jstorm。既然有如此多的可选项,为什么还有Kafka Stream这个东西呢?其实说来也简单,就是应用简单+功能丰富

在这里插入图片描述
总计来说,其具备以下特点:

  1. 无需额外征用集群资源
    在传统的流处理中,需要单独的集群进行数据处理,这就意味着需要额外的开销。而Kafka Stream是直接在Kafka集群上执行的,不会征用额外的资源。

  2. 易于使用
    Kafka Stream提供了简单易用的API,使得开发人员可以快速地进行流处理任务的开发。它还支持Java 8中的Lambda表达式,使得代码更加简洁。

  3. 支持丰富的转换操作
    Kafka Stream支持丰富的转换操作,包括过滤、映射、聚合等。这些操作可以被组合使用,以满足不同的处理需求。

二、流程与核心类

1. KStream 和 KTable 概念

我们上面简要介绍了下Kafka Stream的特点。但是,要想明白其流程并正确使用,我们还需要讲两个核心概念,也就是KStreamKTable

  • KStream
    KStream是一个持续不断的流数据记录,每个记录都是一个key-value对,可以被读取、写入和转换。通常,KStream用于处理实时数据流,我们可以直接从kafka集群中指定主题来获取源源不断的数据

  • KTable
    KTable顾名思义,可以看作是一张持久化的、可查询的、支持状态更新的表格。它通常是利用KStream的数据经过一系列转换和聚合操作生成的,KTable可以被读取和更新,但不能被删除。

KStream和KTable是互补的。KStream可以转换成KTable,也可以从KTable中获取值;KTable也可以转换成KStream,我们可以使用下图,看一下是如何针对数据流中,出现的单词进行计数并”落表“的:
在这里插入图片描述

当然,我们还有必要提及一下GlobalKTable,它是一种特殊的KTable,GlobalKTable通常用于处理比较静态的全局数据,例如维护一个全局的用户信息表,而且只在应用程序启动时从Kafka主题中加载所有数据,这意味着需要消耗较大的内存空间。

2. 常用逻辑与转换

我们上面说了KStreamKTable ,在代码里其实也对应了两个类,那这两个类都有些什么方法呢?最重要的,我们想知道,它们是如何互相转换的。

其实关于 KStream ,可能有些同学会想到JDK 里的 Stream ,因为确实很多方法是一致的,所以不用慌张。我们先来介绍下 KStream 的常用方法:

  • filter:过滤数据流中不符合条件的记录。
  • map:将每个记录转换为一个新的记录,可以改变记录的key和value。
  • flatMap:与map类似,可以将一个记录转换为多个新的记录。
  • mapValues:与map类似,但记录的键保留不变,只改变值
  • groupByKey:将记录按key进行分组,生成一个KGroupedStream对象,可以用于聚合操作。
  • reduce:对KGroupedStream对象进行聚合操作。
  • join:将两个KStream对象进行join操作,生成一个新的KStream对象。
  • windowed:对KStream对象进行窗口操作,可以使用时间窗口或大小窗口。
  • aggregate:将当前流中的记录聚合,并生成一个新的KTable。与reduce方法不同,aggregate方法不仅考虑当前记录,还考虑之前记录的聚合结果
  • to:将结果输出到输出主题中

我们举一个小代码段来看下这些方法的使用

KStream<String, String> input = ...;
// 使用filter方法过滤出包含"important"的值
KStream<String, String> filtered = input.filter((key, value) -> value.contains("important"))
// 使用mapValues方法将每个值的长度作为新值。
KStream<String, Integer> mapped = filtered.mapValues(value -> value.length());
// 使用groupBy方法将键值对按键分组,并使用count方法计算每个键出现的次数,将结果存储在KTable中
KTable<String, Integer> counted = mapped.groupBy((key, value) -> key).count(Materialized.as("counts"));
// 使用selectKey方法选取值中"-"前的部分作为新键
KStream<String, String> rekeyed = input.selectKey((key, value) -> value.split("-")[0]);
// 使用leftJoin方法将两个KStream进行左连接,即mapped流和rekeyed流进行连接,
// 连接的条件是两个流中的键相等。连接函数的定义是将两个整型值相加,并将结果作为连接后的流的值
KStream<String, Integer> joined = mapped.leftJoin(rekeyed, (value1, value2) -> value1 + value2);
// 使用groupByKey方法对键值对按键分组,并使用windowedBy方法将窗口大小设置为5分钟,
// 然后使用count方法计算每个键在此时间窗口中出现的次数,最后使用toStream方法将结果
// 转换为KStream类型并将时间窗口的起止时间设置为键,值设置为次数
KStream<String, Long> windowed = input .groupByKey().windowedBy(TimeWindows.of(Duration.ofMinutes(5))).count().toStream().map((key, value) -> new KeyValue<>(key.key(), value));
// 将结果输出到输出主题中
windowed.to("output-topic");

而关于KTable,也有一些常用方法,如下:

  • filter:根据指定的谓词过滤记录,并返回一个新的KTable。谓词是一个接受key和value作为参数的函数,如果返回true,则保留该记录,否则过滤掉。
  • mapValues:对KTable中的每个value执行指定的转换函数,并返回一个新的KTable。
  • groupBy:根据指定的key进行分组,并返回一个KGroupedTable对象,该对象用于执行各种聚合操作。
  • join:将当前KTable与另一个KTable或KStream进行连接,并返回一个新的KTable。
  • toStream:将KTable转换为KStream,并返回一个新的KStream对象。

我们也写一小段代码用于演示:

// 从输入流中建立一个KTable
StreamsBuilder builder = new StreamsBuilder();
KTable<String, String> myKTable = builder.table("input-topic", Materialized.as("ktable-store"));// 1. 执行一些过滤操作,保留包含特定前缀的键
KTable<String, String> filteredKTable = myKTable.filter((key, value) -> key.startsWith("prefix"));// 2. 执行mapValues操作,将每个键值对中的value进行大写转换
KTable<String, String> uppercasedKTable = myKTable.mapValues(e -> e.toUpperCase());// 3. 执行groupBy操作,将键值对按照key的前缀分组
KTable<String, String> groupedKTable = myKTable.groupBy((key, value) -> KeyValue.pair(key.split("_")[0], value)).reduce((aggValue, newValue) -> aggValue + "_" + newValue);// 4. 执行leftJoin操作,将两个KTable进行连接,如果某一个KTable中没有该key,则用null进行填充
KTable<String, String> leftJoinedKTable = myKTable.leftJoin(filteredKTable,(value1, value2) -> value1 + "-" + value2);// 5. 执行toStream操作,将KTable转换为KStream类型
myKTable.toStream().map((key, value) -> KeyValue.pair(key, value.toUpperCase()));

当然,关于上述哪些方法,我们也可以用一张图来概括它们之间的转换关系,如下图,其中的 KGroupedStream 和 KGroupedTable 其实就是KStream 和 KTable 进行聚合操作后的产物
在这里插入图片描述

三、使用场景与Demo

1. 实时数据分析

Kafka Stream可以将实时到达的数据进行处理,以便进行实时数据分析。在这种情况下,Kafka Stream通常会将数据转换为一些有用的信息,以便于更好的理解数据,我们可以举一个简单的示例demo

假设我们有一个数据流,其中包含电影评分信息和电影相关信息。我们的任务是计算出每个电影的平均评分。

首先,我们需要定义输入数据流所需的数据结构。假设我们的数据结构如下:

@Data
public class MovieRating {private String movieId;private float rating;
}@Data
public class Movie {private String movieId;private String title;
}

接下来,我们需要编写Kafka流应用程序。我们可以将其分为三个步骤:

1.从Kafka主题读取电影评分和电影相关信息。
2.以电影ID为键,将电影评分聚合到一个窗口中,并计算平均值。
3.将结果写入新的Kafka主题。

public static void main(String[] args) throws Exception {Properties props = new Properties();props.put(StreamsConfig.APPLICATION_ID_CONFIG, "movie-ratings-app");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);final StreamsBuilder builder = new StreamsBuilder();// 步骤1:从kafka主题中读取电影信息及评分// 我们假设主题包含Avro编码的数据KStream<String, MovieRating> ratings = builder.stream("movie-ratings");KStream<String, Movie> movies = builder.stream("movies");// 步骤2: 按电影ID聚合评分并计算平均评分.KTable<Windowed<String>, Double> averageRatings = ratings.groupBy((key, value) -> value.getMovieId()).windowedBy(TimeWindows.of(Duration.ofMinutes(10))).aggregate(() -> new RatingAggregate(0.0, 0L),(key, value, aggregate) -> new RatingAggregate(aggregate.getSum() + value.getRating(), aggregate.getCount() + 1),Materialized.with(Serdes.String(), new RatingAggregateSerde())).mapValues((aggregate) -> aggregate.getSum() / aggregate.getCount()).toStream().groupByKey().windowedBy(TimeWindows.of(Duration.ofMinutes(10))).reduce((value1, value2) -> Math.max(value1, value2),Materialized.with(Serdes.String(), Serdes.Double())).toStream().map((key, value) -> new KeyValue<>(key.key(), value));// 步骤3: 将结果写入一个新的kafka主题.averageRatings.to("average-ratings");final KafkaStreams streams = new KafkaStreams(builder.build(), props);streams.start();
}// 用于聚合评分的辅助类
public static class RatingAggregate {private double sum;private long count;public RatingAggregate(double sum, long count) {this.sum = sum;this.count = count;}public double getSum() {return sum;}public long getCount() {return count;}
}// 序列化与反序列化.
public static class RatingAggregateSerde extends Serdes.WrapperSerde<RatingAggregate> {public RatingAggregateSerde() {super(new JsonSerializer<>(), new JsonDeserializer<>(RatingAggregate.class));}
}

在上面的代码中,我们使用Serdes.String()和SpecificAvroSerde来序列化和反序列化字符串和Avro-encoded对象。我们使用TimeWindows.of(Duration.ofMinutes(10))定义大小为10分钟的窗口。我们使用RatingAggregate类来辅助计算每个电影的平均评分,RatingAggregateSerde类来序列化和反序列化RatingAggregate对象

2. 实时预测

Kafka Stream可以用于实时预测任务,例如在一些应用中,需要根据实时到达的数据来进行预测。Kafka Stream可以使用已有的模型,对实时数据进行预测,从而实现实时的推荐或预测等功能。

还是拿电影举例,我们经常可以看到电影票房的预测,我们可以以此写一个Demo

public class MovieProcessor {private static final String INPUT_TOPIC = "box-office-input";private static final String OUTPUT_TOPIC = "box-office-output";public static void main(String[] args) {// 创建 Kafka Streams 配置Properties props = new Properties();props.put(StreamsConfig.APPLICATION_ID_CONFIG, "box-office-predictor");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());// 创建 Kafka StreamsStreamsBuilder builder = new StreamsBuilder();KStream<String, String> inputStream = builder.stream(INPUT_TOPIC);// 将上映日期转换为毫秒数,并计算出预测票房KTable<Long, Double> boxOfficePrediction = inputStream.mapValues(new ValueMapper<String, Double>() {@Overridepublic Double apply(String value) {String[] fields = value.split(",");long releaseDateMillis = LocalDate.parse(fields[1]).toEpochDay() * 24 * 60 * 60 * 1000;int runtime = Integer.parseInt(fields[2]);double boxOffice = Double.parseDouble(fields[3]);double prediction = boxOffice / (runtime * 60 * 1000.0) * (releaseDateMillis - System.currentTimeMillis());return prediction > 0 ? prediction : 0;}}).groupBy(new KeyValueMapper<String, Double, Long>() {@Overridepublic Long apply(String key, Double value) {return 1L;}}).reduce(new Reducer<Double>() {@Overridepublic Double apply(Double value1, Double value2) {return value1 + value2;}}).mapValues(new ValueMapper<Double, Double>() {@Overridepublic Double apply(Double value) {return value / (24 * 60 * 60 * 1000.0);}});// 将预测结果发送到 Kafka Topic 中boxOfficePrediction.toStream().to("prediction");// 启动 Kafka StreamsKafkaStreams streams = new KafkaStreams(builder.build(), props);streams.start();}
}

四、总结

今天我们学了一些关于Kafka Stream的内容太,知道了它是一种流处理引擎,可以消费Kafka中的数据,进行处理后,还能其转换为输出流。它特点在于不需要额外征用集群资源、易于使用、支持丰富的转换操作。使用场景包括实时数据分析、实时预测等。但其实Kafka Stream的内容还是很多的,我们将在后面的学习中继续讲解

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/154696.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Spring Boot中配置文件生效位置

1. 配置文件位置 首先小伙伴们要明白&#xff0c;Spring Boot 默认加载的配置文件是 application.properties 或者 application.yaml&#xff0c;properties优先级高于yaml。默认的加载位置一共有五个&#xff0c;五个位置可以分为两类&#xff1a; 从 classpath 下加载&…

「Qt Widget中文示例指南」如何创建一个计算器?(一)

Qt 是目前最先进、最完整的跨平台C开发工具。它不仅完全实现了一次编写&#xff0c;所有平台无差别运行&#xff0c;更提供了几乎所有开发过程中需要用到的工具。如今&#xff0c;Qt已被运用于超过70个行业、数千家企业&#xff0c;支持数百万设备及应用。 本文将展示如何使用…

再谈谈注解

作者简介&#xff1a;大家好&#xff0c;我是smart哥&#xff0c;前中兴通讯、美团架构师&#xff0c;现某互联网公司CTO 联系qq&#xff1a;184480602&#xff0c;加我进群&#xff0c;大家一起学习&#xff0c;一起进步&#xff0c;一起对抗互联网寒冬 注解&#xff0c;和反射…

成都瀚网科技有限公司抖音带货靠谱么

近年来&#xff0c;随着社交媒体的兴起&#xff0c;越来越多的企业开始利用抖音等短视频平台进行产品推广和销售。成都瀚网科技有限公司也紧跟潮流&#xff0c;通过抖音平台进行带货。那么&#xff0c;成都瀚网科技有限公司的抖音带货靠谱么&#xff1f;本文将从以下几个方面进…

网站监控的重要性及实施策略

随着互联网的快速发展&#xff0c;网站已经成为企业和个人不可或缺的在线服务平台。然而&#xff0c;网站的安全性和稳定性一直是企业及个人非常关注的问题。一旦网站出现故障或者被攻击&#xff0c;将会给企业和个人带来严重的损失。因此&#xff0c;实施有效的网站监控策略对…

为什么 Django 后台管理系统那么“丑”?

哈喽大家好&#xff0c;我是咸鱼 相信使用过 Django 的小伙伴都知道 Django 有一个默认的后台管理系统——Django Admin 它的 UI 很多年都没有发生过变化&#xff0c;现在看来显得有些“过时且简陋” 那为什么 Django 的维护者却不去优化一下呢&#xff1f;原文作者去询问了多…

RT-DETR手把手教程,注意力机制如何添加在网络的不同位置进行创新优化

&#x1f4a1;&#x1f4a1;&#x1f4a1;本文独家改进&#xff1a;本文首先复现了将EMA引入到RT-DETR中&#xff0c;并跟不同模块进行结合创新&#xff1b;1&#xff09;Rep C3结合&#xff1b;2&#xff09;直接作为注意力机制放在网络不同位置&#xff1b;3&#xff09;高效…

女儿冬天的第一件羽绒服,这也太好看了

分享女儿的时尚穿搭 撞色插肩款羽绒服 同色系的精彩碰撞 描绘出绚烂的色彩 走在街上就是最靓的崽 显肤色显瘦超吸睛 妥投时尚小潮人一枚

如何将图片转为excel或word?(客户端)

演示软件&#xff1a;金鸣表格文字识别大师3.6.1&#xff08;新版本界面可能会略有不同&#xff09; 第一部分 将图片转为excel或文表混合的word 一般的软件要将图片转为可编辑的excel&#xff0c;都需要待识别的图片要有明显清晰的表格线&#xff0c;但我们程序现已克服了这…

【数据结构】图的存储结构(邻接矩阵)

一.邻接矩阵 1.图的特点 任何两个顶点之间都可能存在边&#xff0c;无法通过存储位置表示这种任意的逻辑关系。 图无法采用顺序存储结构。 2.如何存储图&#xff1f; 将顶点与边分开存储。 3.邻接矩阵&#xff08;数组表示法&#xff09; 基本思想&#xff1a; 用一个一维数…

jenkins-2.426.1-1.1.noarch.rpm 的公钥没有安装

执行命令 yum install jenkins 报错 jenkins-2.426.1-1.1.noarch.rpm 的公钥没有安装 下载的软件包保存在缓存中&#xff0c;直到下次成功执行事务。 您可以通过执行 yum clean packages 删除软件包缓存。 错误&#xff1a;GPG 检查失败 解决办法&#xff1a; 1、安装新的公…

『C++成长记』类和对象

&#x1f525;博客主页&#xff1a;小王又困了 &#x1f4da;系列专栏&#xff1a;C &#x1f31f;人之为学&#xff0c;不日近则日退 ❤️感谢大家点赞&#x1f44d;收藏⭐评论✍️ 目录 一、类的引入 二、类的定义 三、类的访问限定符 四、类的作用域 五、类的实例化…

vue-waterfall2 实现瀑布流,及总结的问题

注意&#xff1a;引入需要在主界面引入&#xff0c;直接在组件中引用会有问题 1.安装 npm install vue-waterfall21.8.20 --save &#xff08;提示&#xff1a;一定要安装1.8.20&#xff0c;最新版会有一部分问题&#xff09; 2.打开main.js文件 import waterfall from v…

微服务实战系列之Gateway

前言 人类世界自工业革命以来&#xff0c;无论从金融、货币、制度&#xff0c;还是科技、资源、社会各个方面&#xff0c;都发生了翻天覆地的变化。物质极大丰富&#xff0c;从而也推动了科技的极速发展。当计算机问世也仅仅不到80年&#xff0c;而如今我们的生活中处处有它的影…

云原生Docker系列 | Docker私有镜像仓库公有镜像仓库使用

云原生Docker系列 | Docker私有镜像仓库&公有镜像仓库使用 1. 使用公有云镜像仓库1.1. 阿里云镜像仓库1.2. 华为云镜像仓库1.3. 腾讯云镜像仓库2. 使用Docker Hub镜像仓库3. 使用Harbor构建私有镜像仓库4. 搭建本地Registry镜像仓库1. 使用公有云镜像仓库 1.1. 阿里云镜像…

GNSS位移监测站系统是什么

WX-WY4G 一、GNSS位移监测站系统的工作原理GNSS位移监测站系统是一种基于导航卫星系统&#xff08;GNSS&#xff09;的高精度位移监测技术。它通过接收和处理来自卫星的信号&#xff0c;对地表物体的位置进行精度的实时监测。这个系统具有可靠性的特点&#xff0c;被广泛应用于…

ubuntu20.04.1网络图标突然消失,无法上网

故障&#xff1a;打开虚拟机进入Ubuntu系统后&#xff0c;打开火狐浏览器&#xff0c;发现无法连接网络。 解决办法&#xff1a;因为刚接触Linux系统&#xff0c;就在网上找各种资料&#xff0c;试了各种办法无果&#xff0c;最后发现有可能网络配置文件被更改。 打开控制台输…

JavaScript编程基础 – 函数进阶

JavaScript编程基础 – 函数进阶 JavaScript Programming Essentials – Perfect Functions “函数的第一条原则是要小&#xff0c;函数的第二条原则是要更小。“ – 罗伯特.C.马丁 前文讲述过函数多取决于数学的函数概念&#xff0c;以此来定义JavaScript编程语言的函数&…

C++类与对象(3)—拷贝构造函数运算符重载

目录 一、拷贝构造函数 1、定义 2、特征 3、内置与自定义类型 4、const修饰参数 5、默认生成 浅拷贝 深拷贝 6、总结 二、运算符重载 1、定义 2、判断是否相等 3、比较大小 4、赋值 5、总结 一、拷贝构造函数 1、定义 拷贝构造函数&#xff1a;只有单个形参…

利用 React 和 Bootstrap 进行强大的前端开发

文章目录 介绍React 和 Bootstrap设置环境使用 Bootstrap 创建 React 组件React-Bootstrap 组件结论 介绍 创建响应式、交互式和外观引人入胜的 Web 界面是现代前端开发人员的基本技能。幸运的是&#xff0c;借助 React 和 Bootstrap 等工具的出现&#xff0c;制作这些 UI 变得…