实例演示kafka stream消息流式处理流程及原理

以下结合案例:统计消息中单词出现次数,来测试并说明kafka消息流式处理的执行流程

Maven依赖

    <dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><exclusions><exclusion><artifactId>connect-json</artifactId><groupId>org.apache.kafka</groupId></exclusion><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency></dependencies>

准备工作

首先编写创建三个类,分别作为消息生产者、消息消费者、流式处理者
KafkaStreamProducer:消息生产者

public class KafkaStreamProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties properties = new Properties();//kafka的连接地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");//发送失败,失败的重试次数properties.put(ProducerConfig.RETRIES_CONFIG, 5);//消息key的序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//消息value的序列化器properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(properties);for (int i = 0; i < 5; i++) {ProducerRecord<String, String> producerRecord = new ProducerRecord<>("kafka-stream-topic-input", "hello kafka");producer.send(producerRecord);}producer.close();}
}

该消息生产者向主题kafka-stream-topic-input发送五次hello kafka
KafkaStreamConsumer:消息消费者

public class KafkaStreamConsumer {public static void main(String[] args) {Properties properties = new Properties();//kafka的连接地址properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");//消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");//消息的反序列化器properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//手动提交偏移量properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);//订阅主题consumer.subscribe(Collections.singletonList("kafka-stream-topic-output"));try {while (true) {ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println("consumerRecord.key() = " + consumerRecord.key());System.out.println("consumerRecord.value() = " + consumerRecord.value());}// 异步提交偏移量consumer.commitAsync();}} catch (Exception e) {e.printStackTrace();} finally {// 同步提交偏移量consumer.commitSync();}}
}

KafkaStreamQuickStart:流式处理类

public class KafkaStreamQuickStart {public static void main(String[] args) {Properties properties = new Properties();properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-quickstart");StreamsBuilder streamsBuilder = new StreamsBuilder();//流式计算streamProcessor(streamsBuilder);KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);kafkaStreams.start();}/*** 消息格式:hello world hello world* 配置并处理流数据。* 使用StreamsBuilder创建并配置KStream,对输入的主题中的数据进行处理,然后将处理结果发送到输出主题。* 具体处理包括:分割每个消息的值,按值分组,对每个分组在10秒的时间窗口内进行计数,然后将结果转换为KeyValue对并发送到输出主题。** @param streamsBuilder 用于构建KStream对象的StreamsBuilder。*/private static void streamProcessor(StreamsBuilder streamsBuilder) {// 从"kafka-stream-topic-input"主题中读取数据流KStream<String, String> stream = streamsBuilder.stream("kafka-stream-topic-input");System.out.println("stream = " + stream);// 将每个值按空格分割成数组,并将数组转换为列表,以扩展单个消息的值stream.flatMapValues((ValueMapper<String, Iterable<String>>) value -> {String[] valAry = value.split(" ");return Arrays.asList(valAry);})// 按消息的值进行分组,为后续的窗口化计数操作做准备.groupBy((key, value) -> value)// 定义10秒的时间窗口,在每个窗口内对每个分组进行计数.windowedBy(TimeWindows.of(Duration.ofSeconds(10))).count()// 将计数结果转换为流,以便进行进一步的处理和转换.toStream()// 显示键值对的内容,并将键和值转换为字符串格式.map((key, value) -> {System.out.println("key = " + key);System.out.println("value = " + value);return new KeyValue<>(key.key().toString(), value.toString());})// 将处理后的流数据发送到"kafka-stream-topic-output"主题.to("kafka-stream-topic-output");}}

该处理类首先从主题kafka-stream-topic-input中获取消息数据,经处理后发送到主题kafka-stream-topic-output中,再由消息消费者KafkaStreamConsumer进行消费

执行结果

请添加图片描述
请添加图片描述

流式处理流程及原理说明

初始阶段

当从输入主题kafka-stream-topic-input读取数据流时,每个消息都是一个键值对。假设输入消息的键是null或一个特定的字符串,这取决于消息是如何被发送到输入主题的。

KStream<String, String> stream = streamsBuilder.stream("kafka-stream-topic-input");

分割消息值

使用flatMapValues方法分割消息的值,但这个操作不会改变消息的键。如果输入消息的键是null,那么在这个阶段消息的键仍然是null

stream.flatMapValues((ValueMapper<String, Iterable<String>>) value -> {String[] valAry = value.split(" ");return Arrays.asList(valAry);
})

按消息的值进行分组

在 Kafka Streams 中,当使用groupBy方法对流进行分组时,实际上是在指定一个新的键,这个键将用于后续的窗口化操作和聚合操作。在这个案例中groupBy方法被用来按消息的值进行分组:

.groupBy((key, value) -> value)

这意味着在分组操作之后,流中的每个消息的键被设置为消息的值。因此,当你在后续的map方法中看到key参数时,这个key实际上是消息的原始值,因为在groupBy之后,消息的值已经变成了键。

定义时间窗口并计数

在这个阶段,消息被窗口化并计数,但是键保持不变。

.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
.count()

将计数结果转换为流

当将计数结果转换为流时,键仍然是之前分组时的键

.toStream()

处理和转换结果

map方法中,你看到的key参数实际上是分组后的键,也就是消息的原始值:

.map((key, value) -> {System.out.println("key = " + key);System.out.println("value = " + value);return new KeyValue<>(key.key().toString(), value.toString());
})

map方法中的key.key().toString()是为了获取键的字符串表示,而value.toString()是为了将计数值转换为字符串。

将处理后的数据发送到输出主题

.to("kafka-stream-topic-output");

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/867518.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue3【实战】来回拖拽放置图片

效果预览 技术要点 img 标签默认就是可拖拽的&#xff08;a 标签也是&#xff09;事件 e 内的 dataTransfer 对象可用于临时存储事件过程中的数据拖拽事件的默认行为是用浏览器新开页签打开被拖拽对象&#xff0c;所以通常需要禁用默认的浏览器行为被拖拽元素必须设置 id&#…

【pyqt-实训训练】串口助手

串口助手 前言一、ui设计二、ui的控件命名三、ui转py使用类的方法【扩展】使用ui文件导入&#xff01;P7的小错误解决办法 总结 前言 我的惯例就是万物之始&#xff0c;拜见吾师&#x1f970;⇨pyqt串口合集 最开始的时候我想的是&#xff0c;学了那么久的pyqt&#xff0c;我…

ASCII码对照表(Matplotlib颜色对照表)

文章目录 1、简介1.1 颜色代码 2、Matplotlib库简介2.1 简介2.2 安装2.3 后端2.4 入门例子 3、Matplotlib库颜色3.1 概述3.2 颜色图的分类3.3 颜色格式表示3.4 内置颜色映射3.5 xkcd 颜色映射3.6 颜色命名表 4、Colorcet库5、颜色对照表结语 1、简介 1.1 颜色代码 颜色代码是…

线程同步66666

1. 概述 当有多个线程访问同一个共享资源&#xff08;临界资源&#xff09;时&#xff0c;且不允许同时访问&#xff0c;那么就需要线程同步。常见的线程同步方式&#xff1a;互斥锁、读写锁、条件变量、信号量。 2. 互斥锁 互斥锁的方式可以简单概括为&#xff1a;锁定操作…

【MYSQL】InnoDB引擎为什么选可重复读作为默认隔离级别

InnoDB引擎为什么选可重复读作为默认隔离级别 一般的DBMS系统&#xff0c;默认都会使用读提交&#xff08;Read-Comitted&#xff0c;RC&#xff09;作为默认隔离级别&#xff0c;如Oracle、SQL Server等&#xff0c;而MySQL却使用可重复读&#xff08;Read-Repeatable&#x…

alphazero学习

AlphaGoZero是AlphaGo算法的升级版本。不需要像训练AlphaGo那样&#xff0c;不需要用人类棋局这些先验知识训练&#xff0c;用MCTS自我博弈产生实时动态产生训练样本。用MCTS来创建训练集&#xff0c;然后训练nnet建模的策略网络和价值网络。就是用MCTSPlayer产生的数据来训练和…

【JVM 的内存模型】

1. JVM内存模型 下图为JVM内存结构模型&#xff1a; 两种执行方式&#xff1a; 解释执行&#xff1a;JVM是由C语言编写的&#xff0c;其中有C解释器&#xff0c;负责先将Java语言解释翻译为C语言。缺点是经过一次JVM翻译&#xff0c;速度慢一点。JIT执行&#xff1a;JIT编译器…

ubuntu设置开启自动挂载sftp

1. 前言 与其说 ubuntu 开启自动挂载 sftp, 更确切的说应该是 nautilus (ubuntu上默认的文件管理器) 开机自动挂载 sftp。 因为 这里即使选择永远记住&#xff0c;开机也不会自动挂载 sftp 2.设置方法 gnome-session-properties #开机只启动设置命令设置 gio mount sftp…

经典双运算放大器LM358

前言 LM358双运放有几十年的历史了吧&#xff1f;通用运放&#xff0c;很常用&#xff0c;搞电路的避免不了接触运放&#xff0c;怎么选择运放&#xff0c;是工程师关心的问题吧&#xff1f; 从本文开始&#xff0c;将陆续发一些常用的运放&#xff0c;大家选型可以参考&#…

浪潮信息携手算力企业为华东产业集群布局提供高质量算力支撑

随着信息技术的飞速发展&#xff0c;算力已成为推动数字经济发展的核心力量。近日&#xff0c;浪潮信息与五家领先的算力运营公司在南京正式签署战略合作协议&#xff0c;共同加速华东地区智算基础设施布局&#xff0c;为区域经济发展注入新动力。 进击的算力 江苏持续加码智算…

springboot三层架构详细讲解

目录 springBoot三层架构0.简介1.各层架构1.1 Controller层1.2 Service层1.3 ServiceImpl1.4 Mapper1.5 Entity1.6 Mapper.xml 2.各层之间的联系2.1 Controller 与 Service2.2 Service 与 ServiceImpl2.3 Service 与 Mapper2.4 Mapper 与 Mapper.xml2.5 Service 与 Entity2.6 C…

Exploting an API endpoiint using documentation

HTTP request methods https://developer.mozilla.org/en-US/docs/Web/HTTP/Methods 第一步:burp抓包刷新页面 httphistory中只能看到两个记录,可以看下Response,是HTML页面,说明这里有HTML页面 ,但是没有发现特定的API接口。 第二步:用户登录 转到用户登录的功能点处…

Nacos源码分析:心跳机制、健康检查、服务发现、AP集群

文章目录 心跳机制与服务健康检查NacosClient端NacosServer端NacosServer端健康检查 服务发现NacosClient端NacosServer端 AP集群从源码启动集群心跳设计原理各节点状态同步服务实例数据同步服务实例状态变动同步 心跳机制与服务健康检查 官方文档&#xff1a;发送某个实例的心…

基于GWO灰狼优化的多目标优化算法matlab仿真

目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.本算法原理 4.1灰狼优化算法原理 4.2 多目标优化问题(MOP)的帕累托最优解 4.3 基于GWO的多目标优化算法 5.完整程序 1.程序功能描述 基于GWO灰狼优化的多目标优化算法matlab仿真&#xff0c;目标函数…

Linux多进程和多线程(六)进程间通信-共享内存

多进程(六) 共享内存共享内存的创建 示例: 共享内存删除 共享内存映射 共享内存映射的创建解除共享内存映射示例:写入和读取共享内存中的数据 写入: ### 读取: 大致操作流程: 多进程(六) 共享内存 共享内存是将分配的物理空间直接映射到进程的⽤户虚拟地址空间中, 减少数据在…

Java | Leetcode Java题解之第217题存在重复元素

题目&#xff1a; 题解&#xff1a; class Solution {public boolean containsDuplicate(int[] nums) {Set<Integer> set new HashSet<Integer>();for (int x : nums) {if (!set.add(x)) {return true;}}return false;} }

C#开发的自定义提示和对话框窗体 - 开源研究系列文章

上次开发了《LUAgent服务器端工具》&#xff0c;然后就开发了自定义的提示和对话框窗体&#xff0c;因为这个是无边框窗体&#xff0c;所以不使用默认的MessageBox了&#xff0c;界面美观并且用户体验更好一些。然后就写了此文&#xff0c;让其他读者能够使用或者复用此类库的代…

非对称加密算法原理与应用2——RSA私钥加密文件

作者:私语茶馆 1.相关章节 (1)非对称加密算法原理与应用1——秘钥的生成-CSDN博客 第一章节讲述的是创建秘钥对,并将公钥和私钥导出为文件格式存储。 本章节继续讲如何利用私钥加密内容,包括从密钥库或文件中读取私钥,并用RSA算法加密文件和String。 2.私钥加密的概述…

git pull拉取显示Already up-to-date,但文件并没有更新

1、问题&#xff1a; 使用git pull拉取远程仓库代码&#xff0c;显示更新成功&#xff08;Already up-to-date&#xff09;&#xff0c;但是本地代码没有更新 这是因为本地有尚未提交的更改&#xff0c;和远程代码有冲突导致无法更新 2、解决方法&#xff1a; 可以使用git s…

axios的使用,处理请求和响应,axios拦截器

1、axios官网 https://www.axios-http.cn/docs/interceptors 2、安装 npm install axios 3、在onMouunted钩子函数中使用axios来发送请求&#xff0c;接受响应 4.出现的问题&#xff1a; &#xff08;1&#xff09; 但是如果发送请求请求时间过长&#xff0c;回出现请求待处…