KafkaStream Local Store和Global Store区别和用法

前言

使用kafkaStream进行流式计算时,如果需要对数据进行状态处理,那么常用的会遇到kafkaStream的store,而store也有Local Store以及Global Store,当然也可以使用其他方案的来进行状态保存,文本主要理清楚kafkaStream中的Local Store以及Global Store之间的区别和用法,以及什么时候选择何种store和当store无法满足我们需求时,应该如何使用其他方案来进行数据的状态保存

本文所有方法和代码皆只针对kafka-streams的3.7.0版本,pom如下:

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><version>3.7.0</version>
</dependency>

由于不同版本的KafkaStream在使用上有较大区别,也因为KafkaStream不同版本API改动较大,所以如果版本不一致,使用方法甚至是一些核心概念都会跟本文讲述有所出入,并且KafkaStream由于相对小众,文档也很少,官网的文档也只是一些简单介绍,所以需要注意避坑

Local Store和Global Store的共同点和区别点
共同点:

1、都是用于流式计算中进行状态存储的

2、具体结构类似,使用的都是如:KeyValueStore,SessionStore等类

3、实际机制类似,会通过内存、本地目录和kafka Topic的变更记录等方式来进行缓存数据更新和恢复

不同点

1、适用场景不同

Local Store 适合用于单个实例的状态管理,适合处理单个分区的数据,并且缓存数据不会多个实例共享

Global Store 适用于跨实例共享数据状态,多个实例通过Topic中的更新记录来跟新进程中的数据

2、使用方法不同

Local Store 可以直接在代码中调用对应类型存储(如:KeyValueStore)的put方法进行更新数据,不需要考虑数据一致性(因为可见性只有单个实例)

Global Store 不能直接调用对应的put和delete方法,所有更新和删除缓存都需要通过发送数据到Global 配置的topic中,然后自行实现Topic数据消费者(实现:org.apache.kafka.streams.processor.api.Processor类),在消费者类中进行数据更新等操作,同时因为需要自己实现更新实例中的数据逻辑,数据一致性也需要开发者自行处理,虽然正常来说利用Kafka本身的特性很少出现数据一致性问题,但是如果多实例之间性能差异和网络环境等差异,容易将数据不一致的时长延长,如果要求Store一致性强且容忍数据不一致时限短,则需要注意考虑Store更新数据消费者的处理能力

3、扩展性

Local Store:可以通过增加输入主题的分区数来扩展处理能力,但每个实例仍然独立运行。

Global Store:需要在多个实例之间共享状态,因此在设计时需要考虑如何高效地管理和同步状态。

常见的Store 类型
org.apache.kafka.streams.state.KeyValueStore
org.apache.kafka.streams.state.SessionStore
org.apache.kafka.streams.state.TimestampedKeyValueStore
org.apache.kafka.streams.state.VersionedKeyValueStore
org.apache.kafka.streams.state.WindowStore

需要根据实际使用场景选择合适的状态存储类

用法
Local Store

第一步,先生成对应类型的StoreBuilder对象,如我需要用KeyValueStore,然后状态存储的名字是:testLocalStore(这个名字不能重复,因为会根据消费者id加储存名称创建对应的Topic,当然如果是不同的KafkaStream程序,消费者id不一致,那么重复就没有关系了),因为是KeyValue类型的储存,所以需要设定对应的Key和Value数据的序列化对象,具体代码如下:

StoreBuilder<KeyValueStore<String, String>> kvBuilder = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("testLocalStore"), Serdes.String(), Serdes.String());

其中Stores.persistentKeyValueStore代表的我得存储是持久化的,正常都是会用持久化,当然也有存储一些不重要或者程序重启丢失也无所谓的状态数据,可以使用Stores.inMemoryKeyValueStore以及基于LRU淘汰机制的储存Stores.lruMap,第二个参数Serdes.String()代表存储数据的key是字符串,第三个参数同理,如果是要存储一些对象,也可以使用自定义的序列化类,实现

org.apache.kafka.common.serialization.Serializer

序列化类,以及反序列化类

org.apache.kafka.common.serialization.Deserializer

然后定义好即可,如:

new Serdes.WrapperSerde<>(new KryoSerializer<>(TestStoreBean.class),new KryoDeserializer<>(TestStoreBean.class)

其中KryoSerializer和KryoDeserializer是我自定义的使用Kryo序列化Java对象的类,TestStoreBean是我保存的状态的数据封装bean

KryoSerializer代码如下:

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Output;
import org.apache.kafka.common.serialization.Serializer;import java.io.ByteArrayOutputStream;/*** kryo序列化类* @author Raye* @since 2024-6-4*/
public class KryoSerializer<T> implements Serializer<T> {private static final ThreadLocal<Kryo> KRYO_LOCAL = new ThreadLocal<Kryo>() {@Overrideprotected Kryo initialValue() {Kryo kryo = new Kryo();/*** 不要轻易改变这里的配置!更改之后,序列化的格式就会发生变化,* 上线的同时就必须清除 Redis 里的所有缓存,* 否则那些缓存再回来反序列化的时候,就会报错*///支持对象循环引用(否则会栈溢出)kryo.setReferences(true); //默认值就是 true,添加此行的目的是为了提醒维护者,不要改变这个配置//不强制要求注册类(注册行为无法保证多个 JVM 内同一个类的注册编号相同;而且业务系统中大量的 Class 也难以一一注册)kryo.setRegistrationRequired(false); //默认值就是 false,添加此行的目的是为了提醒维护者,不要改变这个配置return kryo;}};/*** 获得当前线程的 Kryo 实例** @return 当前线程的 Kryo 实例*/public static Kryo getInstance() {return KRYO_LOCAL.get();}private Class<T> clz;public KryoSerializer(Class<T> clz) {this.clz = clz;}@Overridepublic byte[] serialize(String s, T t) {if(t == null){return null;}ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();Output output = new Output(byteArrayOutputStream);Kryo kryo = getInstance();kryo.writeObjectOrNull(output, t,clz);output.flush();return byteArrayOutputStream.toByteArray();}
}

KryoDeserializer代码如下:

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import org.apache.kafka.common.serialization.Deserializer;import java.io.ByteArrayInputStream;/*** kryo反序列化类* @author Raye* @since 2024-6-4*/
public class KryoDeserializer<T> implements Deserializer<T> {private static final ThreadLocal<Kryo> KRYO_LOCAL = new ThreadLocal<Kryo>() {@Overrideprotected Kryo initialValue() {Kryo kryo = new Kryo();/*** 不要轻易改变这里的配置!更改之后,序列化的格式就会发生变化,* 上线的同时就必须清除 Redis 里的所有缓存,* 否则那些缓存再回来反序列化的时候,就会报错*///支持对象循环引用(否则会栈溢出)kryo.setReferences(true); //默认值就是 true,添加此行的目的是为了提醒维护者,不要改变这个配置//不强制要求注册类(注册行为无法保证多个 JVM 内同一个类的注册编号相同;而且业务系统中大量的 Class 也难以一一注册)kryo.setRegistrationRequired(false); //默认值就是 false,添加此行的目的是为了提醒维护者,不要改变这个配置return kryo;}};/*** 获得当前线程的 Kryo 实例** @return 当前线程的 Kryo 实例*/public static Kryo getInstance() {return KRYO_LOCAL.get();}private Class<T> clz;public KryoDeserializer(Class<T> clz) {this.clz = clz;}@Overridepublic T deserialize(String s, byte[] bytes) {if(bytes == null){return null;}ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);Input input = new Input(byteArrayInputStream);Kryo kryo = getInstance();try {return kryo.readObjectOrNull(input, clz);}catch (Exception e){e.printStackTrace();}return null;}
}

同理,使用LocalStore时,可以将代码替换成以下内容:

StoreBuilder<KeyValueStore<String, TestStoreBean>> kvBuilder = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("testLocalStore"), Serdes.String(), new Serdes.WrapperSerde<>(new KryoSerializer<>(TestStoreBean.class),new KryoDeserializer<>(TestStoreBean.class));

有了StoreBuilder对象之后,直接在StreamsBuilder对象中添加即可

streamsBuilder.addStateStore(kvBuilder);

需要使用时,先在处理数据的Processor类中的init方法获取对应的状态存储对象

this.testLocalStore = context.getStateStore("testLocalStore");

然后就可以在process方法中调用testLocalStore的get、put、delete等方法操作状态存储数据了,具体代码如下

	@Slf4jpublic static class StreamProcessor implements Processor<String,String,String,String> {private KeyValueStore<String,String> testLocalStore;private ProcessorContext context;private String toTopic;@Overridepublic void init(ProcessorContext context) {this.context = context;this.testLocalStore = context.getStateStore("testLocalStore");}public StreamProcessor(String toTopic) {this.toTopic = toTopic;}@Overridepublic void process(Record<String, String> record) {testLocalStore.put("key1","testValue1");log.info("testLocalStore key1 : {}",testLocalStore.get("key1"));testLocalStore.delete("key1");context.forward(record,toTopic);}}

其中实现的Processor类全称是:org.apache.kafka.streams.processor.api.Processor,上面代码只是在数据处理流程中简单保存了数据,然后获取出来以及删除,没有对流数据做任何处理,就直接发送到输出的topic了

完整代码如下:

	@Beanpublic KStream<String,String> kStreamTestStore(StreamsBuilder streamsBuilder){log.info("init kStreamTestStore");StoreBuilder<KeyValueStore<String, String>> kvBuilder = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("testLocalStore"), Serdes.String(), Serdes.String());streamsBuilder.addStateStore(kvBuilder);KStream<String, String> stream = streamsBuilder.stream(fromTopic);stream.process(()->new StreamProcessor(toTopic), Named.as(fromTopic),"testLocalStore");streamsBuilder.build().addSink(toTopic,toTopic,fromTopic);return stream;}

注意:由于使用Store需要通过ProcessorContext对象来获取Store对象,所以在KafkaStream常用的一些map,mapValue,flatMapValues这些流式计算方法中是没办法使用的,只能在一些更底层的Api中去使用,如process

Global Store

同Local Store一样,需要先生成对应类型的StoreBuilder对象,代码跟Local Store一样

StoreBuilder<KeyValueStore<String, String>> kvBuilder = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("testGlobalStore"), Serdes.String(), Serdes.String());

然后定义处理状态更新日志的Processor类,在这个类中,可以对缓存数据进行更新和删除操作(其他地方都是不能直接修改Global Store的)

public class GlobalStoreHandleProcessor<K, V> implements Processor<K,V,Void,Void> {private KeyValueStore<K, V> store;private String storeName;public GlobalStoreHandleProcessor(String storeName) {this.storeName = storeName;}@Overridepublic void process(Record<K,V> record) {if(record == null || record.value() == null) {return;}store.put(record.key(), record.value());}@Overridepublic void init(ProcessorContext context) {this.store = context.getStateStore(storeName);}}

跟KafkaStream的process是一样的,只需要在process方法中对缓存进行更新或者删除操作即可,我这里只是简单put操作,具体逻辑可以根据自己情况进行处理

在StreamsBuilder对象中添加StoreBuilder对象

streamsBuilder.addGlobalStore(kvBuilder,"testGlobalStore", Consumed.with(Serdes.String(),Serdes.String()),()->new GlobalStoreHandleProcessor<>("testGlobalStore"));

其中第二个参数testGlobalStore是Global Store绑定的数据变更记录的Topic,如果要更新,则需要通过向这个topic发送数据来进行更新Global Store中的数据

处理数据的Processor类实例代码

public static class StreamProcessor implements Processor<String,String,String,String> {private KeyValueStore<String,String> testGlobalStore;private ProcessorContext context;private String toTopic;@Overridepublic void init(ProcessorContext context) {this.context = context;this.testGlobalStore = context.getStateStore("testGlobalStore");}public StreamProcessor(String toTopic) {this.toTopic = toTopic;}@Overridepublic void process(Record<String, String> record) {testLocalStore.put(jsonObject.getString("key"),jsonObject.getString("value"));log.info("testLocalStore key1 : {}",testGlobalStore.get("key1"));//发送更新Global Store的数据context.forward(new Record("testGlobalKey","global value",record.timestamp()),"testGlobalStore");context.forward(record,toTopic);}}

与Local Store不同的是,不能在处理数据流的时候,对缓存进行put操作,只能通过将数据发送到Global Store关联的topic中,在GlobalStoreHandleProcessor中去做更新

完整代码如下:

	@Beanpublic KStream<String,String> kStreamTestStore(StreamsBuilder streamsBuilder){log.info("init kStreamTestStore");StoreBuilder<KeyValueStore<String, String>> kvBuilder = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("testGlobalStore"), Serdes.String(), Serdes.String());streamsBuilder.addGlobalStore(kvBuilder,"testGlobalStore", Consumed.with(Serdes.String(),Serdes.String()),()->new GlobalStoreHandleProcessor<>("testGlobalStore"));KStream<String, String> stream = streamsBuilder.stream(fromTopic);stream.process(()->new StreamProcessor(toTopic), Named.as(fromTopic));streamsBuilder.build().addSink(toTopic,toTopic,fromTopic);streamsBuilder.build().addSink("testGlobalStore","testGlobalStore",fromTopic);return stream;}

与Local Store不同点在于,不需要在process方法中添加store的名字,但是因为要从process方法中直接将更新Store的数据发送到topic,所以需要添加一个Global Store绑定的Topic的输出扩展,也就是下面这行代码

streamsBuilder.build().addSink("testGlobalStore","testGlobalStore",fromTopic);
不适合的场景

由于KafkaStream Store 没有自动过期数据和过期数据自动删除的概率(可能是有,但是我没有找到对应文档),所以如果我们存储的key集合特别大,并且需要自动过期和自动删除,那么就不适合使用Store来处理了,因为需要我们自行处理删除逻辑,尤其是有些场景中,并不会对过期的key进行访问,所以采用惰性删除基本上不现实,但是定时删除,因为Store会存储到磁盘,如果存储的key很多,删除对应数据的时候耗时很长,尤其是单次删除大量key的时候,可能会直接超时,并且还必须要自己处理定时删除的逻辑,想要更好的去删除,就需要大量时间去开发和优化。

虽然使用内存的Store能稍微好点,但是毕竟单个进程内存有限,并且正常流处理中,如果需要保存状态,那么肯定是希望进程重启之后,能恢复数据,避免计算出错的,所以如果是有大量不重复key,并且数据需要到期自动删除的话,可以直接使用Redis做状态存储,并且进过我得实际测试,使用Redis并不比Store慢,并且在key量越来越大的情况下,Redis的性能是完全优于Store的(只针对持久化的Store),当然使用Redis,还是会更使用Global Store一样,需要考虑数据一致性的问题,不过这个问题可以通过将相同key的数据从Kafka Topic就分配到同一个Topic分区中来避免

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/22585.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【QT5】<总览二> QT信号槽、对象树及样式表

文章目录 前言 一、QT信号与槽 1. 信号槽连接模型 2. 信号槽介绍 3. 自定义信号槽 二、不使用UI文件编程 三、QT的对象树 四、添加资源文件 五、样式表的使用 六、QSS文件的使用 前言 承接【QT5】&#xff1c;总览一&#xff1e; QT环境搭建、快捷键及编程规范。若存…

【最新鸿蒙应用开发】——一篇搞懂什么是UIAbility

UIAbility组件 UIAbility组件是一种包含UI的应用组件&#xff0c;UIAbility组件是系统调度的基本单元&#xff08;最小单元&#xff09;&#xff0c;为应用提供绘制界面的窗口&#xff0c;主要用于和用户交互。一个应用可以包含一个或多个UIAbility组件。 UIAbility的设计理念…

AI大模型应用开发实践:5.快速入门 Assistants API

快速入门 Assistants API Assistants API 允许您在自己的应用程序中构建人工智能助手。一个助手有其指令,并可以利用模型、工具和知识来回应用户查询。 Assistants API 目前支持三种类型的工具: 代码解释器 Code Interpreter检索 Retrieval函数调用 Function calling使用 P…

【LeetCode】使括号有效的最少添加

题目链接&#xff1a; 921. 使括号有效的最少添加 - 力扣&#xff08;LeetCode&#xff09; 对于一个只有&#xff08;&#xff09;组合的括号字符串&#xff0c;如果想要这个字符串是有效的括号对&#xff0c;找出最少需要插入多少个括号 括号离不开栈&#xff0c;栈可以消除…

Java同步与线程安全,同步方法、同步块和java.util.concurrent包的使用

Java的同步与线程安全是并发编程中至关重要的部分。在多线程环境下&#xff0c;确保数据的一致性和避免竞态条件&#xff08;race condition&#xff09;是程序设计的关键。 一、Java中的线程安全 线程安全&#xff08;Thread Safety&#xff09;是指多线程环境下&#xff0c…

C++中的注释作用

程序的注释是解释性语句&#xff0c;您可以在 C 代码中包含注释&#xff0c;这将提高源代码的可读性。所有的编程语言都允许某种形式的注释。 C 支持单行注释和多行注释。注释中的所有字符会被 C 编译器忽略。 C 注释一般有两种&#xff1a; // - 一般用于单行注释。 / ...…

【耗时八个小时】机器学习过拟合和欠拟合!看这一篇文章就够了

.. 纯. .干 货 . . . . 在机器学习中&#xff0c;有一项非常重要的概念&#xff0c;那就是&#xff1a;过拟合&#xff08;Overfitting&#xff09;和欠拟合&#xff08;Underfitting&#xff09;。 它们涉及到机器学习中常见的两种模型性能问题&#xff0c;分别表示模型在训练…

[modern c++] 使用shared_mutex , shared_lock完成读写锁,Need C++ 17

前言&#xff1a; C 17开始&#xff0c;引入了两个新的同步组件&#xff0c; shared_mutex 和 shared_lock &#xff0c;这两个组件的一个典型使用案例就是实现读写锁。 原语&#xff1a; shared_mutex &#xff0c; 一个提供让多个线程都可以同时获取能力的mutex。 shared_…

一键开启:盲盒小程序里的梦幻奇遇

在繁忙的都市生活中&#xff0c;每个人心中都藏着一个关于奇遇的梦想。如今&#xff0c;我们为您精心打造了一款盲盒小程序——“梦幻奇遇”&#xff0c;只需一键开启&#xff0c;就能带您走进一个充满无限惊喜和梦幻色彩的奇幻世界。 一、神秘盲盒&#xff0c;惊喜连连 “梦幻…

gitlab之cicd的gitlab-runner集成-dockerfile构建环境

目录 概述离线资源docker-compose问题 docker-compose问题1问题2 gitlab-runner集成gitlab 概述 cicd引文目录是想通过dockerfile构建 maven、jdk、docker环境的 gitlab-runner 运行环境。但docker最后测试的时候有点问题&#xff0c;且最后使用 kubectl 时有麻烦&#xff0c;所…

python--面向对象-文件读写-异常

一、继承 定义一个类时&#xff0c;需要使用另外一个类的方法或属性&#xff0c;就可以通过继承实现 object是Python的顶级类&#xff0c;创建类是会自动继承&#xff0c;就拥有object中的方法 定义格式 # 类的定义 # 旧式类定义 一般在定义单个类时使用 class 类名:name N…

Spring Boot 使用自定义注解和自定义线程池实现异步日志记录

&#x1f604; 19年之后由于某些原因断更了三年&#xff0c;23年重新扬帆起航&#xff0c;推出更多优质博文&#xff0c;希望大家多多支持&#xff5e; &#x1f337; 古之立大事者&#xff0c;不惟有超世之才&#xff0c;亦必有坚忍不拔之志 &#x1f390; 个人CSND主页——Mi…

.NET集成DeveloperSharp操作SqlServer、MySql等数据库

&#x1f3c6;作者&#xff1a;科技、互联网行业优质创作者 &#x1f3c6;专注领域&#xff1a;.Net技术、软件架构、人工智能、数字化转型、DeveloperSharp、微服务、工业互联网、智能制造 &#x1f3c6;欢迎关注我&#xff08;Net数字智慧化基地&#xff09;&#xff0c;里面…

在Linux系统中程序是如何执行的?

在Linux系统中&#xff0c;程序的执行是一个复杂而精细的过程&#xff0c;涉及多个步骤。 1.进程创建 在Linux中&#xff0c;进程的创建&#xff0c;除了第一个进程&#xff08;0号进程&#xff09;是通过硬编码创建&#xff0c;其他所有进程通常都是通过fork()系统调用来实现…

力扣2134.最少交换次数得到连续的1(断环成链)

力扣2134.最少交换次数得到连续的1(断环成链) 最终一定是所有1的个数(长度) 的区间 所以求所有1的和 用和作为k作滑动窗口将环断成长度为nsum-1的链 class Solution {public:int minSwaps(vector<int>& nums) {int sum accumulate(nums.begin(),nums.end(),0);in…

如何保持气膜场馆内部空气新鲜—轻空间

气膜建筑作为现代建筑的一种新兴形式&#xff0c;以其独特的优势和设计受到了广泛欢迎。然而&#xff0c;保持气膜内部空气新鲜是一个必须解决的问题。我们通过配备先进的新风系统&#xff0c;提供了高效的解决方案。 新风系统的工作原理 气膜建筑内部空气的新鲜度主要依靠其配…

在C++中,NULL和nullptr有什么区别?

在C11之前&#xff0c;一般使用NULL代表空指针。 NULL的定义在C和C中不同&#xff0c;而且C和C针对0和指针之间的运算规则也存在差异&#xff1a; C03标准&#xff1a;空指针常量是整数类型的整型常量表达式右值&#xff0c;其值为零。空指针常量可以转换为指针类型&#xff…

【vscode-快捷键 一键JSON格式化】

网上有很多JSON格式化工具&#xff0c;也有很多好用的在线json格式化工具。但是其实Vscode里面的可以直接格式化JSON&#xff0c;这里分享一个我常用的小插件 Prettify JSON 未格式化的JSON数据 召唤出命令行&#xff0c;输入prettify JSON 即可! ✿✿ヽ(▽)ノ✿

算法题:Java求数组中最大的值

采用分而治之&#xff08;二分法&#xff09;的思想去求解 分而治之&#xff1a;分而治之的思想可以用于解决很多问题&#xff0c;大概的思路就是把一个比较大的复杂的问题切分成小的块&#xff0c;然后分头去解决他们&#xff0c;最后再把结果合并起来&#xff0c;就是“分而治…

C++中的string类详解

在C中&#xff0c;字符串是一个非常重要的数据类型&#xff0c;用于存储和处理文本数据。C标准库提供了std::string类&#xff0c;它是一个模板类&#xff0c;专门用于处理字符串。std::string类提供了丰富的成员函数和操作符重载&#xff0c;使得字符串操作变得简单而高效。本…