浅析Kafka Streams消息流式处理流程及原理

以下结合案例:统计消息中单词出现次数,来测试并说明kafka消息流式处理的执行流程

Maven依赖

    <dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><exclusions><exclusion><artifactId>connect-json</artifactId><groupId>org.apache.kafka</groupId></exclusion><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency></dependencies>

准备工作

首先编写创建三个类,分别作为消息生产者、消息消费者、流式处理者
KafkaStreamProducer:消息生产者

public class KafkaStreamProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties properties = new Properties();//kafka的连接地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");//发送失败,失败的重试次数properties.put(ProducerConfig.RETRIES_CONFIG, 5);//消息key的序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//消息value的序列化器properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(properties);for (int i = 0; i < 5; i++) {ProducerRecord<String, String> producerRecord = new ProducerRecord<>("kafka-stream-topic-input", "hello kafka");producer.send(producerRecord);}producer.close();}
}

该消息生产者向主题kafka-stream-topic-input发送五次hello kafka
KafkaStreamConsumer:消息消费者

public class KafkaStreamConsumer {public static void main(String[] args) {Properties properties = new Properties();//kafka的连接地址properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");//消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");//消息的反序列化器properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//手动提交偏移量properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);//订阅主题consumer.subscribe(Collections.singletonList("kafka-stream-topic-output"));try {while (true) {ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println("consumerRecord.key() = " + consumerRecord.key());System.out.println("consumerRecord.value() = " + consumerRecord.value());}// 异步提交偏移量consumer.commitAsync();}} catch (Exception e) {e.printStackTrace();} finally {// 同步提交偏移量consumer.commitSync();}}
}

KafkaStreamQuickStart:流式处理类

public class KafkaStreamQuickStart {public static void main(String[] args) {Properties properties = new Properties();properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-quickstart");StreamsBuilder streamsBuilder = new StreamsBuilder();//流式计算streamProcessor(streamsBuilder);KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);kafkaStreams.start();}/*** 消息格式:hello world hello world* 配置并处理流数据。* 使用StreamsBuilder创建并配置KStream,对输入的主题中的数据进行处理,然后将处理结果发送到输出主题。* 具体处理包括:分割每个消息的值,按值分组,对每个分组在10秒的时间窗口内进行计数,然后将结果转换为KeyValue对并发送到输出主题。** @param streamsBuilder 用于构建KStream对象的StreamsBuilder。*/private static void streamProcessor(StreamsBuilder streamsBuilder) {// 从"kafka-stream-topic-input"主题中读取数据流KStream<String, String> stream = streamsBuilder.stream("kafka-stream-topic-input");System.out.println("stream = " + stream);// 将每个值按空格分割成数组,并将数组转换为列表,以扩展单个消息的值stream.flatMapValues((ValueMapper<String, Iterable<String>>) value -> {String[] valAry = value.split(" ");return Arrays.asList(valAry);})// 按消息的值进行分组,为后续的窗口化计数操作做准备.groupBy((key, value) -> value)// 定义10秒的时间窗口,在每个窗口内对每个分组进行计数.windowedBy(TimeWindows.of(Duration.ofSeconds(10))).count()// 将计数结果转换为流,以便进行进一步的处理和转换.toStream()// 显示键值对的内容,并将键和值转换为字符串格式.map((key, value) -> {System.out.println("key = " + key);System.out.println("value = " + value);return new KeyValue<>(key.key().toString(), value.toString());})// 将处理后的流数据发送到"kafka-stream-topic-output"主题.to("kafka-stream-topic-output");}}

该处理类首先从主题kafka-stream-topic-input中获取消息数据,经处理后发送到主题kafka-stream-topic-output中,再由消息消费者KafkaStreamConsumer进行消费

执行结果

在这里插入图片描述
在这里插入图片描述

流式处理流程及原理说明

初始阶段

当从输入主题kafka-stream-topic-input读取数据流时,每个消息都是一个键值对。假设输入消息的键是null或一个特定的字符串,这取决于消息是如何被发送到输入主题的。

KStream<String, String> stream = streamsBuilder.stream("kafka-stream-topic-input");

分割消息值

使用flatMapValues方法分割消息的值,但这个操作不会改变消息的键。如果输入消息的键是null,那么在这个阶段消息的键仍然是null

stream.flatMapValues((ValueMapper<String, Iterable<String>>) value -> {String[] valAry = value.split(" ");return Arrays.asList(valAry);
})

按消息的值进行分组

在 Kafka Streams 中,当使用groupBy方法对流进行分组时,实际上是在指定一个新的键,这个键将用于后续的窗口化操作和聚合操作。在这个案例中groupBy方法被用来按消息的值进行分组:

.groupBy((key, value) -> value)

这意味着在分组操作之后,流中的每个消息的键被设置为消息的值。因此,当你在后续的map方法中看到key参数时,这个key实际上是消息的原始值,因为在groupBy之后,消息的值已经变成了键。

定义时间窗口并计数

在这个阶段,消息被窗口化并计数,但是键保持不变。

.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
.count()

将计数结果转换为流

当将计数结果转换为流时,键仍然是之前分组时的键

.toStream()

处理和转换结果

map方法中,你看到的key参数实际上是分组后的键,也就是消息的原始值:

.map((key, value) -> {System.out.println("key = " + key);System.out.println("value = " + value);return new KeyValue<>(key.key().toString(), value.toString());
})

map方法中的key.key().toString()是为了获取键的字符串表示,而value.toString()是为了将计数值转换为字符串。

将处理后的数据发送到输出主题

.to("kafka-stream-topic-output");

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/871125.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【三维AIGC】扩散模型LDM辅助3D Gaussian重建三维场景

标题&#xff1a;《Sampling 3D Gaussian Scenes in Seconds with Latent Diffusion Models》 来源&#xff1a;Glasgow大学&#xff1b;爱丁堡大学 连接&#xff1a;https://arxiv.org/abs/2406.13099 提示&#xff1a;写完文章后&#xff0c;目录可以自动生成&#xff0c;如何…

Spring Security学习笔记(一)Spring Security架构原理

前言&#xff1a;本系列博客基于Spring Boot 2.6.x依赖的Spring Security5.6.x版本 Spring Security中文文档&#xff1a;https://springdoc.cn/spring-security/index.html 一、什么是Spring Security Spring Security是一个安全控制相关的java框架&#xff0c;它提供了一套全…

海外ASO:iOS与谷歌优化的相同点和区别

海外ASO是针对iOS的App Store和谷歌的Google Play这两个主要海外应用商店进行的优化过程&#xff0c;两个不同的平台需要采取不同的优化策略&#xff0c;以下是对iOS优化和谷歌优化的详细解析&#xff1a; 一、iOS优化&#xff08;App Store&#xff09; 1、关键词覆盖 选择关…

用node.js写一个简单的图书管理界面——功能:添加,删除,修改数据

涉及到的模块&#xff1a; var fs require(‘fs’)——内置模块 var ejs require(‘ejs’)——第三方模块 var mysql require(‘mysql’)——第三方模块 var express require(‘express’)——第三方模块 var bodyParser require(‘body-parser’)——第三方中间件 需要…

打造你的智能家居指挥中心:基于STM32的多协议(zigbee、http)网关(附代码示例)

1. 项目概述 随着物联网技术的蓬勃发展&#xff0c;智能家居正逐步融入人们的日常生活。然而&#xff0c;市面上琳琅满目的智能家居设备通常采用不同的通信协议&#xff0c;导致不同品牌设备之间难以实现互联互通。为了解决这一难题&#xff0c;本文设计了一种基于STM32的多协…

ant design form动态增减表单项Form.List如何进行动态校验规则

项目需求&#xff1a; 在使用ant design form动态增减表单项Form.List时&#xff0c;Form.List中有多组表单项&#xff0c;一组中的最后一个表单项的校验规则是动态的&#xff0c;该组为最后一组时&#xff0c;最后一个表单项是非必填项&#xff0c;其他时候为必填项。假设动态…

docker inspect 如何提取容器的ip和端口 网络信息?

目录 通过原生Linux命令过滤找到IP 通过jq工具找到IP 使用docker -f 的过滤&#xff08;模板&#xff09; 查找端口映射信息 查看容器内部细节 docker inspect 容器ID或容器名 通过原生Linux命令过滤找到IP 通过jq工具找到IP jq 是一个轻量级且灵活的命令行工具&#xf…

(视频演示)基于OpenCV的实时视频跟踪火焰识别软件V1.0源码及exe下载

本文介绍了基于OpenCV的实时视频跟踪火焰识别软件&#xff0c;该软件通过先进的图像处理技术实现对实时视频中火焰的检测与跟踪&#xff0c;同时支持导入图片进行火焰识别。主要功能包括相机选择、实时跟踪和图片模式。软件适用于多种场合&#xff0c;用于保障人民生命财产安全…

OpenGL笔记二之glad加载opengl函数以及opengl-API(函数)初体验

OpenGL笔记二之glad加载opengl函数以及opengl-API(函数)初体验 总结自bilibili赵新政老师的教程 code review! 文章目录 OpenGL笔记二之glad加载opengl函数以及opengl-API(函数)初体验1.运行2.重点3.目录结构4.main.cpp5.CMakeList.txt 1.运行 2.重点 3.目录结构 01_GLFW_WI…

Python-PLAXIS自动化建模技术与典型岩土工程

有限单元法在岩土工程问题中应用非常广泛&#xff0c;很多软件都采用有限单元解法。在使用各大软件进行数值模拟建模的过程中&#xff0c;您是否发现GUI界面中重复性的点击输入工作太繁琐&#xff1f;从而拖慢了设计或方案必选进程&#xff1f; 搭建自己的Plaxis模型&#xff…

设计模式的七大原则

1.单一职责原则 单一职责原则(Single responsibility principle)&#xff0c;即一个类应该只负责一项职责。如类A负责两个不同职责&#xff1a;职责1&#xff0c;职责2。当职责1需求变更而改变A时&#xff0c;可能造成职责2执行错误&#xff0c;所以需要将类A的粒度分解为A1、…

安卓14中Zygote初始化流程及源码分析

文章目录 日志抓取结合日志与源码分析systemServer zygote创建时序图一般应用 zygote 创建时序图向 zygote socket 发送数据时序图 本文首发地址 https://h89.cn/archives/298.html 最新更新地址 https://gitee.com/chenjim/chenjimblog 本文主要结合日志和代码看安卓 14 中 Zy…

C/C++ 进阶(7)模拟实现map/set

个人主页&#xff1a;仍有未知等待探索-CSDN博客 专题分栏&#xff1a;C 一、简介 map和set都是关联性容器&#xff0c;底层都是用红黑树写的。 特点&#xff1a;存的Key值都是唯一的&#xff0c;不重复。 map存的是键值对&#xff08;Key—Value&#xff09;。 set存的是键…

Git的命令使用与IDEA内置git图形化的使用

Git 简介 Git 是分布式版本控制系统&#xff0c;它可以帮助开发人员跟踪和管理代码的更改。Git 可以记录代码的历史记录&#xff0c;并允许您在不同版本之间切换。 通过历史记录可以查看&#xff1a; 进行了哪些更改&#xff1f;谁进行了更改&#xff1f;何时进行了更改&#…

网络安全高级工具软件100套

1、 Nessus&#xff1a;最好的UNIX漏洞扫描工具 Nessus 是最好的免费网络漏洞扫描器&#xff0c;它可以运行于几乎所有的UNIX平台之上。它不止永久升级&#xff0c;还免费提供多达11000种插件&#xff08;但需要注册并接受EULA-acceptance–终端用户授权协议&#xff09;。 它…

在生产环境中部署Elasticsearch:最佳实践和故障排除技巧——聚合与搜索(三)

#在生产环境中部署Elasticsearch&#xff1a;最佳实践和故障排除技巧——聚合与搜索&#xff08;三&#xff09; 前言 文章目录 前言- 聚合和分析- 执行聚合操作- 1. 使用Java API执行聚合操作- 2. 使用CURL命令执行聚合操作- 1. 使用Java API执行度量操作- 2. 使用CURL命令执…

谷粒商城实战笔记-34-前端基础-ES6-promise异步编排

文章目录 一&#xff0c;回调地狱&#xff08;Callback Hell&#xff09;二&#xff0c;实战Promise1&#xff0c;场景说明2&#xff0c;回调地狱-传统实现3&#xff0c;使用Promise重构3.1 用Promise实现上述需求3.2 进一步重构 在ES6中&#xff0c;Promise是一个用于异步编程…

网络安全-内网安全加固方案

内网接入限制(MAC地址白名单) 只允许信任设备接入内网&#xff0c;且每次自动获取的IP地址不变(后续就可根据IP地址控制访问权限) 开启DHCP服务根据MAC地址静态分配固定IP地址 只允许可信的DHCP服务器分配IP地址(防止私建DHCP服务器) DHCP Snooping 信…

在Vue中,子组件向父组件传递数据

在Vue中&#xff0c;子组件向父组件传递数据通常通过两种方式实现&#xff1a;事件和回调函数。这两种方式允许子组件与其父组件进行通信&#xff0c;传递数据或触发特定的行为。 1. 通过事件传递数据 子组件可以通过触发自定义事件&#xff0c;并将数据作为事件的参数来向父组…

电脑案件冲突问题

一.故障展示 有一天我打开了电脑,发现3这个数字按键一直在输入,拔了外界的键盘,他这个按键还是会冲突 ,就如同上面的图一样 ,可能是电脑内部的键位进了灰卡住了什么东西导致的,于是我果断就电脑上的按键给扣下来了,扣的时候不知道里面的结构非常的谨慎,所以没导致里面的结构被损…