实例演示kafka stream消息流式处理流程及原理

以下结合案例:统计消息中单词出现次数,来测试并说明kafka消息流式处理的执行流程

Maven依赖

    <dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><exclusions><exclusion><artifactId>connect-json</artifactId><groupId>org.apache.kafka</groupId></exclusion><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency></dependencies>

准备工作

首先编写创建三个类,分别作为消息生产者、消息消费者、流式处理者
KafkaStreamProducer:消息生产者

public class KafkaStreamProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties properties = new Properties();//kafka的连接地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");//发送失败,失败的重试次数properties.put(ProducerConfig.RETRIES_CONFIG, 5);//消息key的序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//消息value的序列化器properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(properties);for (int i = 0; i < 5; i++) {ProducerRecord<String, String> producerRecord = new ProducerRecord<>("kafka-stream-topic-input", "hello kafka");producer.send(producerRecord);}producer.close();}
}

该消息生产者向主题kafka-stream-topic-input发送五次hello kafka
KafkaStreamConsumer:消息消费者

public class KafkaStreamConsumer {public static void main(String[] args) {Properties properties = new Properties();//kafka的连接地址properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");//消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");//消息的反序列化器properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//手动提交偏移量properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);//订阅主题consumer.subscribe(Collections.singletonList("kafka-stream-topic-output"));try {while (true) {ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println("consumerRecord.key() = " + consumerRecord.key());System.out.println("consumerRecord.value() = " + consumerRecord.value());}// 异步提交偏移量consumer.commitAsync();}} catch (Exception e) {e.printStackTrace();} finally {// 同步提交偏移量consumer.commitSync();}}
}

KafkaStreamQuickStart:流式处理类

public class KafkaStreamQuickStart {public static void main(String[] args) {Properties properties = new Properties();properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-quickstart");StreamsBuilder streamsBuilder = new StreamsBuilder();//流式计算streamProcessor(streamsBuilder);KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);kafkaStreams.start();}/*** 消息格式:hello world hello world* 配置并处理流数据。* 使用StreamsBuilder创建并配置KStream,对输入的主题中的数据进行处理,然后将处理结果发送到输出主题。* 具体处理包括:分割每个消息的值,按值分组,对每个分组在10秒的时间窗口内进行计数,然后将结果转换为KeyValue对并发送到输出主题。** @param streamsBuilder 用于构建KStream对象的StreamsBuilder。*/private static void streamProcessor(StreamsBuilder streamsBuilder) {// 从"kafka-stream-topic-input"主题中读取数据流KStream<String, String> stream = streamsBuilder.stream("kafka-stream-topic-input");System.out.println("stream = " + stream);// 将每个值按空格分割成数组,并将数组转换为列表,以扩展单个消息的值stream.flatMapValues((ValueMapper<String, Iterable<String>>) value -> {String[] valAry = value.split(" ");return Arrays.asList(valAry);})// 按消息的值进行分组,为后续的窗口化计数操作做准备.groupBy((key, value) -> value)// 定义10秒的时间窗口,在每个窗口内对每个分组进行计数.windowedBy(TimeWindows.of(Duration.ofSeconds(10))).count()// 将计数结果转换为流,以便进行进一步的处理和转换.toStream()// 显示键值对的内容,并将键和值转换为字符串格式.map((key, value) -> {System.out.println("key = " + key);System.out.println("value = " + value);return new KeyValue<>(key.key().toString(), value.toString());})// 将处理后的流数据发送到"kafka-stream-topic-output"主题.to("kafka-stream-topic-output");}}

该处理类首先从主题kafka-stream-topic-input中获取消息数据,经处理后发送到主题kafka-stream-topic-output中,再由消息消费者KafkaStreamConsumer进行消费

执行结果

请添加图片描述
请添加图片描述

流式处理流程及原理说明

初始阶段

当从输入主题kafka-stream-topic-input读取数据流时,每个消息都是一个键值对。假设输入消息的键是null或一个特定的字符串,这取决于消息是如何被发送到输入主题的。

KStream<String, String> stream = streamsBuilder.stream("kafka-stream-topic-input");

分割消息值

使用flatMapValues方法分割消息的值,但这个操作不会改变消息的键。如果输入消息的键是null,那么在这个阶段消息的键仍然是null

stream.flatMapValues((ValueMapper<String, Iterable<String>>) value -> {String[] valAry = value.split(" ");return Arrays.asList(valAry);
})

按消息的值进行分组

在 Kafka Streams 中,当使用groupBy方法对流进行分组时,实际上是在指定一个新的键,这个键将用于后续的窗口化操作和聚合操作。在这个案例中groupBy方法被用来按消息的值进行分组:

.groupBy((key, value) -> value)

这意味着在分组操作之后,流中的每个消息的键被设置为消息的值。因此,当你在后续的map方法中看到key参数时,这个key实际上是消息的原始值,因为在groupBy之后,消息的值已经变成了键。

定义时间窗口并计数

在这个阶段,消息被窗口化并计数,但是键保持不变。

.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
.count()

将计数结果转换为流

当将计数结果转换为流时,键仍然是之前分组时的键

.toStream()

处理和转换结果

map方法中,你看到的key参数实际上是分组后的键,也就是消息的原始值:

.map((key, value) -> {System.out.println("key = " + key);System.out.println("value = " + value);return new KeyValue<>(key.key().toString(), value.toString());
})

map方法中的key.key().toString()是为了获取键的字符串表示,而value.toString()是为了将计数值转换为字符串。

将处理后的数据发送到输出主题

.to("kafka-stream-topic-output");

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/867518.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何在Java应用中实现全文搜索功能

如何在Java应用中实现全文搜索功能 大家好&#xff0c;我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编&#xff0c;也是冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01; 在现代应用程序开发中&#xff0c;全文搜索功能变得越来越重要。它能够帮助…

Vue的介绍

天行健&#xff0c;君子以自强不息&#xff1b;地势坤&#xff0c;君子以厚德载物。 每个人都有惰性&#xff0c;但不断学习是好好生活的根本&#xff0c;共勉&#xff01; 文章均为学习整理笔记&#xff0c;分享记录为主&#xff0c;如有错误请指正&#xff0c;共同学习进步。…

使用引用返回类对象本身

#include<iostream> #include<ctime> using namespace std; class Person { public://Person()//{// cout << "构造函数调用"<<endl;//}Person(int age){this->age age;cout << "有参构造函数调用" << endl;}Pers…

如何编写高质量的测试报告

如何编写高质量的测试报告 简介测试报告的重要性测试报告的基本结构编写测试报告的步骤测试报告的关键要素测试报告的示例封面目录摘要引言测试方法测试环境测试结果缺陷统计和分析风险评估结论和建议附件 测试报告的审查和批准测试报告的维护和更新结语 简介 测试报告是软件开…

12-linux重定向与管道符

在线电子书:Linux 命令行大全.pdf (gitee.com) 重定向 Linux重定向是指修改原来默认的一些东西,对原来系统命令的默认执行方式进行改变,比如说简单的我不想看到在显示器的输出而是希望输出到某一文件中就可以通过Linux重定向来进行这项工作。 • cat -连接文件 • sort…

vue3【实战】来回拖拽放置图片

效果预览 技术要点 img 标签默认就是可拖拽的&#xff08;a 标签也是&#xff09;事件 e 内的 dataTransfer 对象可用于临时存储事件过程中的数据拖拽事件的默认行为是用浏览器新开页签打开被拖拽对象&#xff0c;所以通常需要禁用默认的浏览器行为被拖拽元素必须设置 id&#…

A61 STM32_HAL库函数 之 TIM扩展驱动 -- C -- 所有函数的介绍及使用

A61 STM32_HAL库函数 之 TIM扩展驱动 -- C -- 所有函数的介绍及使用 1 该驱动函数预览1.24 HAL_TIMEx_OnePulseN_Stop1.25 HAL_TIMEx_OnePulseN_Start_IT1.26 HAL_TIMEx_OnePulseN_Stop_IT1.27 HAL_TIMEx_ConfigCommutationEvent1.28 HAL_TIMEx_ConfigCommutationEvent_IT1.29 …

【pyqt-实训训练】串口助手

串口助手 前言一、ui设计二、ui的控件命名三、ui转py使用类的方法【扩展】使用ui文件导入&#xff01;P7的小错误解决办法 总结 前言 我的惯例就是万物之始&#xff0c;拜见吾师&#x1f970;⇨pyqt串口合集 最开始的时候我想的是&#xff0c;学了那么久的pyqt&#xff0c;我…

大数据处理系统架构特征

Storm之父Nathan Marz在《大数据系统构建&#xff1a;可扩展实时数据系统构建原理与最佳实践》一书中&#xff0c;提出了他认为大数据系统应该具有的属性。 1.鲁棒性和容错性&#xff08;Robust and Fault-tolerant&#xff09; 对大规模分布式系统来说&#xff0c;机器是不可…

ASCII码对照表(Matplotlib颜色对照表)

文章目录 1、简介1.1 颜色代码 2、Matplotlib库简介2.1 简介2.2 安装2.3 后端2.4 入门例子 3、Matplotlib库颜色3.1 概述3.2 颜色图的分类3.3 颜色格式表示3.4 内置颜色映射3.5 xkcd 颜色映射3.6 颜色命名表 4、Colorcet库5、颜色对照表结语 1、简介 1.1 颜色代码 颜色代码是…

ASPICE评估是汽车软件质量的可靠保障

为了确保汽车软件的质量、可靠性和安全性&#xff0c;汽车行业普遍采用了一种名为ASPICE&#xff08;Automotive SPICE&#xff09;的评估标准。本文将深入探讨ASPICE评估的定义、流程及其在汽车软件开发中的重要性。 一、ASPICE评估的定义 ASPICE&#xff0c;全称Automotive …

线程同步66666

1. 概述 当有多个线程访问同一个共享资源&#xff08;临界资源&#xff09;时&#xff0c;且不允许同时访问&#xff0c;那么就需要线程同步。常见的线程同步方式&#xff1a;互斥锁、读写锁、条件变量、信号量。 2. 互斥锁 互斥锁的方式可以简单概括为&#xff1a;锁定操作…

大语言模型应用--AI工程化落地

近几年AI的飞速发展&#xff0c;着实带来了很大的冲击&#xff0c;但是其实现在AI并没有完全的跨界&#xff0c;仍然只是在小圈子内“自嗨”。不过相对于之前已经有了很大的不同了 本文就针对当前的大模型现状&#xff0c;来说一下工程化落地的相关事情&#xff0c;也是随感而发…

【MYSQL】InnoDB引擎为什么选可重复读作为默认隔离级别

InnoDB引擎为什么选可重复读作为默认隔离级别 一般的DBMS系统&#xff0c;默认都会使用读提交&#xff08;Read-Comitted&#xff0c;RC&#xff09;作为默认隔离级别&#xff0c;如Oracle、SQL Server等&#xff0c;而MySQL却使用可重复读&#xff08;Read-Repeatable&#x…

alphazero学习

AlphaGoZero是AlphaGo算法的升级版本。不需要像训练AlphaGo那样&#xff0c;不需要用人类棋局这些先验知识训练&#xff0c;用MCTS自我博弈产生实时动态产生训练样本。用MCTS来创建训练集&#xff0c;然后训练nnet建模的策略网络和价值网络。就是用MCTSPlayer产生的数据来训练和…

JVM的基础,class文件的理解(2)

本文是“深入学习JVM”系列的第二篇文章&#xff0c;主要介绍class文件的数据结构。 我是蚊子码农&#xff0c;欢迎各位的点赞、关注和收藏&#xff0c;有了你们的激励&#xff0c;我会带来更好的作品。 一、前言 class文件&#xff0c;通常由Java编译器编译得到&#xff0c;…

【JVM 的内存模型】

1. JVM内存模型 下图为JVM内存结构模型&#xff1a; 两种执行方式&#xff1a; 解释执行&#xff1a;JVM是由C语言编写的&#xff0c;其中有C解释器&#xff0c;负责先将Java语言解释翻译为C语言。缺点是经过一次JVM翻译&#xff0c;速度慢一点。JIT执行&#xff1a;JIT编译器…

ubuntu设置开启自动挂载sftp

1. 前言 与其说 ubuntu 开启自动挂载 sftp, 更确切的说应该是 nautilus (ubuntu上默认的文件管理器) 开机自动挂载 sftp。 因为 这里即使选择永远记住&#xff0c;开机也不会自动挂载 sftp 2.设置方法 gnome-session-properties #开机只启动设置命令设置 gio mount sftp…

经典双运算放大器LM358

前言 LM358双运放有几十年的历史了吧&#xff1f;通用运放&#xff0c;很常用&#xff0c;搞电路的避免不了接触运放&#xff0c;怎么选择运放&#xff0c;是工程师关心的问题吧&#xff1f; 从本文开始&#xff0c;将陆续发一些常用的运放&#xff0c;大家选型可以参考&#…

浪潮信息携手算力企业为华东产业集群布局提供高质量算力支撑

随着信息技术的飞速发展&#xff0c;算力已成为推动数字经济发展的核心力量。近日&#xff0c;浪潮信息与五家领先的算力运营公司在南京正式签署战略合作协议&#xff0c;共同加速华东地区智算基础设施布局&#xff0c;为区域经济发展注入新动力。 进击的算力 江苏持续加码智算…