kafka-Stream详解篇(附案例)

在这里插入图片描述

文章目录

    • Kafka Stream 概述
    • Kafka Stream 概念
    • Kafka Stream 数据结构
    • 入门案例一
      • 需求描述与分析
      • 配置KafkaStream
      • 定义处理流程
      • 声明Topic
      • 接收处理结果
      • 发送消息测试
    • 入门案例二
      • 需求描述与分析
      • 定义处理流程
      • 接收处理结果
      • 声明Topic

更多相关内容可查看

Kafka Stream 概述

Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature。它是提供了对存储于Kafka内的数据进行流式处理和分析的功能。
Kafka Stream的特点如下:

  • Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署
  • 除了Kafka外,无任何外部依赖
  • 充分利用Kafka分区机制实现水平扩展和顺序性保证
  • 通过可容错的state store实现高效的状态操作(如windowed join和aggregation)
  • 支持正好一次处理语义
  • 提供记录级的处理能力,从而实现毫秒级的低延迟
  • 支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)
  • 同时提供底层的处理原语Processor(类似于Storm的spout和bolt),以及高层抽象的DSL(类似于Spark的map/group/reduce)

Kafka Stream 概念

  • 源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器。
  • 处理拓扑 : 数据的处理流程 , 每一步处理流程就是一个处理拓扑
  • Sink处理器:sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题

消息生产者 ----> Kafka Topic(原始数据) ------> Source Processor ------> 处理拓扑(很多步处理) ------> Sink Processor -----> Kafka Topic (运算结果) -----> 消费者(接收运行结果)

Kafka Stream 数据结构

Kafka数据结构类似于map,如下图,key-value键值对

KStream

KStream数据流,即是一段顺序的,可以无限长,不断更新的数据集。KStream数据流中的每一条数据相当于一次插入

商品的行为分值运算(排行) :
{“type”:“like”,“count”:1}
{“type”:“like”,“count”:-1}
{“type”:“like”,“count”:1}
对上面的行为数据进行运算得到运算结果 :
{“type”:“like”,“count”:2}

KTable数据流 , 即是一段顺序的,可以无限长,不断更新的数据集。KTable数据流中的每一条数据相当于一次更新

公交车的运行数据
{“No”:“518”,“location”:“武湖新天地”}
{“No”:“518”,“location”:“潘森产业园”}
{“No”:“518”,“location”:“产业园”}
对上面的行为数据进行运算得到运算结果 :
{“No”:“518”,“location”:“产业园”}

入门案例一

需求描述与分析


计算每个单词出现的次数

@Test
void testSend5() {List<String> strs = new ArrayList<String>();strs.add("hello word");strs.add("hello kafka");strs.add("hello spring kafka");strs.add("kafka stream");strs.add("spring kafka");strs.stream().forEach(s -> {kafkaTemplate.send("kafka.stream.topic1", "10001", s);});
}

配置KafkaStream

添加依赖

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId>
</dependency>

开启KafkaStream功能

配置Kafka Stream

spring:application:name: kafka-consumerkafka:bootstrap-servers: 118.25.197.221:9092consumer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializergroup-id: ${spring.application.name}enable-auto-commit: false  # 关闭自动提交, 使用手动提交偏移量streams:application-id: ${spring.application.name}-application-idclient-id: ${spring.application.name}-client-idproperties:default:key:serde: org.apache.kafka.common.serialization.Serdes$StringSerdevalue:serde: org.apache.kafka.common.serialization.Serdes$StringSerde

定义处理流程

package com.heima.kafka.stream;import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.time.Duration;
import java.util.Arrays;/*** @Author Administrator* @Date 2023/6/30**/
@Configuration
public class KafkaStreamConfig {/*** 原始数据 ------* 10001  hello word* 10001  hello kafka* 10001  hello spring kafka* 10001  kafka stream* 10001  spring kafka** 对原始数据中的value字符串进行切割* 10001  [hello,word]* 10001  [hello,kafka]* 10001  [hello,spring,kafka]* 10001  [kafka,stream]* 10001  [spring,kafka]** 对value数组进行扁平化处理(将多维数组转化为一维数组)* 10001  hello* 10001  word* 10001  hello* 10001  kafka* 10001  hello* 10001  spring* 10001  kafka* 10001  stream* 10001  spring* 10001  kafka** 对数据格式进行转化, 使用value作为key* hello  hello* word   word* hello  hello* kafka  kafka* hello  hello* spring spring* kafka  kafka* kafka  kafka* stream stream* spring spring* kafka  kafka** 对key进行分组 *  hello  hello*  hello  hello*  hello  hello**  word   word**  kafka  kafka*  kafka  kafka*  kafka  kafka*  kafka  kafka**  spring  spring*  spring  spring**  stream  stream*	*计算组内单词数量 , 得到运算结果 -----* hello 3* word 1* kafka 4* spring 2* stream 1** @param builder* @return*/@Beanpublic KStream<String, String> kStream(StreamsBuilder builder) {//1. 定义数据来源KStream<String, String> kStream = builder.<String, String>stream("kafka.stream.topic1");//2. 定义数据处理流程kStream//2.1 对原始数据中的value字符串进行切割   mapValues : 对流中数据的value进行处理转化.mapValues(value -> value.split(" "))//2.2 对value数组进行扁平化处理(将多维数组转化为一维数组)   flatMapValues : 对流中数据的数组格式的value进行处理转化(多维转一维).flatMapValues(value -> Arrays.asList(value))//2.3 对数据格式进行转化, 使用value作为key   map : 对流中数据的key和value进行处理转化.map(((key, value) -> new KeyValue<>(value,value)))//2.4 对key进行分组  groupByKey : 根据key进行分组.groupByKey(Grouped.with(Serdes.String(),Serdes.String()))//设置聚合时间窗口, 在指定时间窗口范围之内的数据会进行一次运算, 输出运算结果.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))//2.5 求每一个组中的单词数量   count : 组内计算元素数量.count(Materialized.with(Serdes.String(),Serdes.Long()))//2.6 将运算结果发送到另一个topic中   toStream : 将其他类型的流转化为 kStream.toStream().map((key, value) -> new KeyValue<>(key.key(),value.toString()))//将运算结果发送到一个topic, 供消费者接收.to("kafka.stream.topic2");//3. 返回KStream对象return kStream;}
}

声明Topic

KafkaStream不会自动帮助我们创建Topic ,所以我们需要自己声明消息来源的topic和消息发送的topic

@Bean
public NewTopic streamTopic1() {return TopicBuilder.name("kafka.stream.topic1").build();
}@Bean
public NewTopic streamTopic2() {return TopicBuilder.name("kafka.stream.topic2").build();
}

接收处理结果

定义一个消费者 , 从to("kafka.stream.topic2")中接收计算完毕的消息

@Component
@Slf4j
public class KafkaStreamConsumerListener {@KafkaListener(topics = "kafka.stream.topic2", groupId = "steam")public void listenTopic1(ConsumerRecord<String, String> record) {String key = record.key();String value = record.value();log.info("单词:{} , 出现{}次", key, value);}
}

发送消息测试

@SpringBootTest
@Slf4j
public class KafkaStreamProducerTest {@Resourceprivate KafkaTemplate kafkaTemplate;@Testvoid testSend5() {List<String> strs = new ArrayList<String>();strs.add("hello word");strs.add("hello kafka");strs.add("hello spring kafka");strs.add("kafka stream");strs.add("spring kafka");strs.stream().forEach(s -> {kafkaTemplate.send("kafka.stream.topic1", "10001", s);});}
}

入门案例二

需求描述与分析

现在有一组文章行为数据 , 使用ArticleMessage对象封装

package com.heima.kafka.pojos;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/*** @author Administrator*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ArticleMessage {/*** 文章ID*/private Long articleId;/*** 修改文章的字段类型*/private UpdateArticleType type;/*** 修改数据的增量,可为正负*/private Integer add;public enum UpdateArticleType {COLLECTION, COMMENT, LIKES, VIEWS;}
}

模拟数据如下 :

@Test
void testSend6() {List<ArticleMessage> strs = new ArrayList<ArticleMessage>();ArticleMessage message1 = new ArticleMessage(1498972384605040641l, ArticleMessage.UpdateArticleType.LIKES, 1);ArticleMessage message4 = new ArticleMessage(1498972384605040641l, ArticleMessage.UpdateArticleType.LIKES, 1);ArticleMessage message7 = new ArticleMessage(1498972384605040641l, ArticleMessage.UpdateArticleType.LIKES, 1);ArticleMessage message3 = new ArticleMessage(1498972384605040641l, ArticleMessage.UpdateArticleType.LIKES, -1);ArticleMessage message2 = new ArticleMessage(1498972384605040641l, ArticleMessage.UpdateArticleType.VIEWS, 1);ArticleMessage message6 = new ArticleMessage(1498973263815045122l, ArticleMessage.UpdateArticleType.COLLECTION, 1);ArticleMessage message5 = new ArticleMessage(1498973263815045122l, ArticleMessage.UpdateArticleType.COLLECTION, 1);ArticleMessage message8 = new ArticleMessage(1498973263815045122l, ArticleMessage.UpdateArticleType.COLLECTION, 1);ArticleMessage message9 = new ArticleMessage(1498972384605040641l, ArticleMessage.UpdateArticleType.COLLECTION, 1);strs.add(message1);strs.add(message2);strs.add(message3);strs.add(message4);strs.add(message5);strs.add(message6);strs.add(message7);strs.add(message8);strs.add(message9);strs.stream().forEach(s -> {kafkaTemplate.send("hot.article.score.topic" , JSON.toJSONString(s));});
}

需求如下 : 请计算出每个文章每种行为的次数 , 输出 :
文章ID : COLLECTION:10,COMMENT:20,LIKES:5,VIEWS:30

定义处理流程

/*** @param builder* @return*/
@Bean
public KStream<String, String> kStream(StreamsBuilder builder) {//获取KStream流对象KStream<String, String> kStream = builder.stream("hot.article.score.topic");//定义流处理拓扑kStream//JSON转化为Java对象.mapValues(value -> JSON.parseObject(value, ArticleMessage.class))//key和值处理  key: 文章ID  , value : 行为类型:数量.map((key, value) -> new KeyValue<>(value.getArticleId(), value.getType().name() + ":" + value.getAdd()))//根据key进行分组.groupByKey(Grouped.with(Serdes.Long(), Serdes.String()))//设置时间窗口.windowedBy(TimeWindows.of(Duration.ofMillis(10000)))//数据聚合.aggregate(() -> "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0", (key, value, aggValue) -> {if (StringUtils.isBlank(value)) {return aggValue;}String[] aggValues = aggValue.split(",");Map<String, Integer> map = new HashMap<>();for (String agg : aggValues) {String[] strs = agg.split(":");map.put(strs[0], Integer.valueOf(strs[1]));}String[] values = value.split(":");map.put(values[0], map.get(values[0]) + Integer.valueOf(values[1]));String format = String.format("COLLECTION:%s,COMMENT:%s,LIKES:%s,VIEWS:%s", map.get("COLLECTION"), map.get("COMMENT"), map.get("LIKES"), map.get("VIEWS"));return format;}, Materialized.with(Serdes.Long(), Serdes.String()))//重新转化为kStream.toStream()//数据格式转换.map((key, value) -> new KeyValue<>(key.key().toString(), value.toString())).to("hot.article.incr.handle.topic");return kStream;
}

接收处理结果

@KafkaListener(topics = "hot.article.incr.handle.topic", groupId = "group3")
public void consumer8(ConsumerRecord<String, String> record) {String key = record.key();String value = record.value();System.out.println("consumer8接收到消息:" + key + ":" + value);
}

声明Topic

@Bean
public NewTopic topic7() {return TopicBuilder.name("kafka.topic7").build();
}@Bean
public NewTopic article() {return TopicBuilder.name("hot.article.score.topic").build();
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/862924.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深入理解linux shell 中的exec内置命令ubuntu bash

概览 每当我们在Bash shell 中运行任何命令时&#xff0c;默认情况下都会创建一个子 shell&#xff0c;并生成&#xff08;分叉&#xff09;一个新的子进程来执行该命令。但是&#xff0c;当使用 exec时&#xff0c;exec 后面的命令将替换当前 shell。这意味着不会创建任何子 …

【高中数学之基本不等式】已知:x,y皆为正实数且x+3y=5xy,求3x+4y的最小值?

解&#xff1a;此题若使用“化二为一”法&#xff0c;会发现分母中出现了5x-3或是5y-1这样的式子&#xff0c;它可能导致负数的出现&#xff0c;已经违反了基本不等式中a,b皆大于零的应用条件。 此时应该迷途知返&#xff0c;及时易辙改弦。 此题适用“乘一法”&#xff0c;这…

第1章_搭建开发环境

文章目录 第1章 搭建开发环境1.1开发套件硬件接口资源介绍1.2资料下载1.3安装Keil MDK1.3.1**软件下载**1.3.2**软件安装**1.3.3 PACK 安装 1.4 安装 STM32CubeMX1.5 安装 STM32CubeProgrammer1.6 安装 ST-Link 驱动1.7 安装 CH340 驱动 第1章 搭建开发环境 1.1开发套件硬件接…

Redis基础教程(一):redis配置

&#x1f49d;&#x1f49d;&#x1f49d;首先&#xff0c;欢迎各位来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里不仅可以有所收获&#xff0c;同时也能感受到一份轻松欢乐的氛围&#xff0c;祝你生活愉快&#xff01; &#x1f49d;&#x1f49…

星光云720全景VR系统源码

星光云720全景VR系统源码 系统体验地址项目介绍JDK版本后端主要依赖前端框架前端node 版本用户端框架介绍技术选型依赖全景内容简介系统图片部分功能截图系统体验地址 系统体验地址 VR全景系统体验地址 账号&#xff1a;18175760278 密码&#xff1a;12345678 项目介绍 JDK版…

图鸟模板-官网:基于Vue 3的前端技术探索与实践

摘要&#xff1a; 随着Web技术的不断发展&#xff0c;前端开发已经从简单的页面展示向功能丰富、交互体验优良的方向发展。Vue.js作为一款轻量级且功能强大的前端框架&#xff0c;自推出以来就受到了广泛的关注和应用。特别是Vue 3的发布&#xff0c;更是为前端开发带来了诸多新…

机器学习笔记 人脸识别技术全面回顾和小结(1)

一、简述 人脸识别是视觉模式识别的一个细分问题。人类一直在识别视觉模式&#xff0c;我们通过眼睛获得视觉信息。这些信息被大脑识别为有意义的概念。对于计算机来说&#xff0c;无论是图片还是视频&#xff0c;它都是许多像素的矩阵。机器应该找出数据的某一部分在数据中代表…

最近公共祖先(倍增,tarjan,树链剖分)

两个点的最近公共祖先&#xff0c;即两个点的所有公共祖先中&#xff0c;离根节点最远的一个节点。 倍增算法 1.dfs一遍&#xff0c;创建ST表 2.利用ST表求LCA 内容来源 D09 倍增算法 P3379【模板】最近公共祖先&#xff08;LCA&#xff09; #include<iostream> #in…

特斯拉下一代自动驾驶芯片的深度预测

引言 特斯拉一直以来都在自动驾驶技术上不断突破&#xff0c;随着AI大模型技术的爆发&#xff0c;其下一代自动驾驶芯片&#xff08;HW5.0&#xff09;也备受瞩目。本文将深入分析和预测特斯拉下一代自动驾驶芯片AI5的技术特点及其对行业的影响。 深入技术分析 现有自动驾驶…

React小记(五)_Hooks入门到进阶

React 16.8 版本 类组件 和 函数组件 两种组件共存&#xff0c;到目前 React 18 版本&#xff0c;官方已经不在推荐使用类组件&#xff0c;在函数组件中 hooks 是必不可少的&#xff0c;它允许我们函数组件像类组件一样可以使用组件的状态&#xff0c;并模拟组件的生命周期等一…

高效数据采集监控平台 一体化平台 数据可视化!

提高工作效率&#xff0c;一直是各种厂家在寻找的方法。任何一种有效且实用的方法都值得去尝试。数据采集监控平台是一种能高效处理数据的方式&#xff0c;其主要工作内容是从各个产生数据的仪器设备传感器中采集数据、对数据进行集中整理整合、分析、显示、绘制图表、存储、传…

java基于ssm+jsp 扶贫惠农推介系统

1管理员功能模块 管理员输入个人的用户名、密码、角色登录系统&#xff0c;这时候系统的数据库就会在进行查找相关的信息&#xff0c;如果我们输入的用户名、密码不正确&#xff0c;数据库就会提示出错误的信息提示&#xff0c;同时会提示管理员重新输入自己的用户名、密码&am…

DigiRL:让 AI 自己学会控制手机

类似于苹果此前发布的Ferret-UI 的安卓开源平替。主要用于在 Android 设备上识别 UI 和执行指令&#xff0c;不同的是它利用了离线到在线强化学习&#xff08;Offline-to-Online RL&#xff09;&#xff0c;能够快速适应应用更新或 UI 变化。

麒麟桌面系统CVE-2024-1086漏洞修复

原文链接&#xff1a;麒麟桌面操作系统上CVE-2024-1086漏洞修复 Hello&#xff0c;大家好啊&#xff01;今天给大家带来一篇在麒麟桌面操作系统上修复CVE-2024-1086漏洞的文章。漏洞CVE-2024-1086是一个新的安全漏洞&#xff0c;如果不及时修复&#xff0c;可能会对系统造成安全…

Windows和Linux C++判断磁盘空间是否充足

基本是由百度Ai写代码生成的&#xff0c;记录一下。实现此功能需要调用系统的API函数。 对于Windows&#xff0c;可调用函数GetDiskFreeSpaceEx&#xff0c;使用该函数需要包含头文件windows.h。该函数的原型&#xff1a; 它的四个参数&#xff1a; lpDirectoryName&#xff0…

自然语言处理-BERT处理框架-transformer

目录 1.介绍 2.Transformer 2.1 引言 2.2 传统RNN网络的问题 2.3 整体架构 2.4 Attention 2.5 Self-Attention如何计算 3.multi-headed机制 4. BERT训练方法 1.介绍 BERT&#xff1a;当前主流的解决框架&#xff0c;一站式搞定NLP任务。&#xff08;解决一个NLP任务时的考虑…

人工智能设备pbootcms网站模板源码

模板介绍 人工智能行业发展趋势不断攀升逐渐成为了新业态&#xff0c;小编精心为大家收集整理了一款HTML5人工智能设备pbootcms网站模板整站源码下载&#xff0c;可帮助您快速建站以展示企业的产品与业务&#xff0c;响应式自适应设计也会适配所有浏览设备。 模板截图 源码下…

文心一言4.0免费使用

领取&安装链接&#xff1a;Baidu Comate 领取季卡 有图有真相 原理&#xff1a;百度comate使用文心一言最新的4.0模型。百度comate目前免费使用&#xff0c;可以借助comate达到免费使用4.0模型目的。 如何获得 点击「Baidu Comate 领取季卡 -> 领取权益」&#xff0…

静态链表详解(C语言版)

顺序表和链表的优缺点 顺序表和链表是两种基本的线性数据结构&#xff0c;它们各自有不同的优缺点&#xff0c;适用于不同的应用场景。 顺序表&#xff08;Sequential List&#xff0c;通常指数组&#xff09; 优点&#xff1a; 随机访问&#xff1a;可以通过索引快速访问任…

使用Endnote中英文等的实现和GB7714格式

Endnote是一款被广泛使用的文献管理软件&#xff0c;其是SCI&#xff08;Thomson Scientific 公司&#xff09;的官方软件&#xff0c;支持国际期刊的参考文献格式有3776 种【也可以自定义期刊引用格式】。 软件非常方便科研狗进行文献整理&#xff0c;写笔记&#xff0c;做备…