flink的ProcessWindowFunction函数的三种状态

背景

在处理窗口函数时,ProcessWindowFunction处理函数可以定义三个状态: 富函数getRuntimeContext.getState,
每个key+每个窗口的状态context.windowState(),每个key的状态context.globalState,那么这几个状态之间有什么关系呢?

ProcessWindowFunction处理函数三种状态之间的关系:

1.getRuntimeContext.getState这个定义的状态是每个key维度的,也就是可以跨时间窗口并维持状态的
2.context.windowState()这个定义的状态是和每个key以及窗口相关的,也就是虽然key相同,但是时间窗口不同,他们的值也不一样.
3.context.globalState这个定义的状态是和每个key相关的,也就是和getRuntimeContext.getState的定义一样,可以跨窗口维护状态
验证代码如下所示:

package wikiedits.func;import org.apache.flink.api.common.state.ValueState;import org.apache.flink.api.common.state.ValueStateDescriptor;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.SourceFunction;import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.streaming.api.windowing.windows.TimeWindow;import org.apache.flink.util.Collector;
import wikiedits.func.model.KeyCount;import java.text.SimpleDateFormat;import java.util.Date;public class ProcessWindowFunctionDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 使用处理时间env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);// 并行度为1env.setParallelism(1);// 设置数据源,一共三个元素DataStream<Tuple2<String, Integer>> dataStream = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {@Overridepublic void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {int xxxNum = 0;int yyyNum = 0;for (int i = 1; i < Integer.MAX_VALUE; i++) {// 只有XXX和YYY两种nameString name = (0 == i % 2) ? "XXX" : "YYY";//更新aaa和bbb元素的总数if (0 == i % 2) {xxxNum++;} else {yyyNum++;}// 使用当前时间作为时间戳long timeStamp = System.currentTimeMillis();// 将数据和时间戳打印出来,用来验证数据System.out.println(String.format("source,%s, %s,    XXX total : %d,    YYY total : %d\n",name,time(timeStamp),xxxNum,yyyNum));// 发射一个元素,并且戴上了时间戳ctx.collectWithTimestamp(new Tuple2<String, Integer>(name, 1), timeStamp);// 每发射一次就延时1秒Thread.sleep(1000);}}@Overridepublic void cancel() {}});// 将数据用5秒的滚动窗口做划分,再用ProcessWindowFunctionSingleOutputStreamOperator<String> mainDataStream = dataStream// 以Tuple2的f0字段作为key,本例中实际上key只有aaa和bbb两种.keyBy(value -> value.f0)// 5秒一次的滚动窗口.timeWindow(Time.seconds(5))// 统计每个key当前窗口内的元素数量,然后把key、数量、窗口起止时间整理成字符串发送给下游算子.process(new ProcessWindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {// 自定义状态private ValueState<KeyCount> state;@Overridepublic void open(Configuration parameters) throws Exception {// 初始化状态,name是myStatestate = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", KeyCount.class));}public void clear(Context context){ValueState<KeyCount> contextWindowValueState = context.windowState().getState(new ValueStateDescriptor<>("myWindowState", KeyCount.class));contextWindowValueState.clear();}@Overridepublic void process(String s, Context context, Iterable<Tuple2<String, Integer>> iterable,Collector<String> collector) throws Exception {// 从backend取得当前单词的myState状态KeyCount current = state.value();// 如果myState还从未没有赋值过,就在此初始化if (current == null) {current = new KeyCount();current.key = s;current.count = 0;}int count = 0;// iterable可以访问该key当前窗口内的所有数据,// 这里简单处理,只统计了元素数量for (Tuple2<String, Integer> tuple2 : iterable) {count++;}// 更新当前key的元素总数current.count += count;// 更新状态到backendstate.update(current);System.out.println("getRuntimeContext() == context :" + (getRuntimeContext() == context));ValueState<KeyCount> contextWindowValueState = context.windowState().getState(new ValueStateDescriptor<>("myWindowState", KeyCount.class));ValueState<KeyCount> contextGlobalValueState = context.globalState().getState(new ValueStateDescriptor<>("myGlobalState", KeyCount.class));KeyCount windowValue = contextWindowValueState.value();if (windowValue == null) {windowValue = new KeyCount();windowValue.key = s;windowValue.count = 0;}windowValue.count += count;contextWindowValueState.update(windowValue);KeyCount globalValue = contextGlobalValueState.value();if (globalValue == null) {globalValue = new KeyCount();globalValue.key = s;globalValue.count = 0;}globalValue.count += count;contextGlobalValueState.update(globalValue);ValueState<KeyCount> contextWindowSameNameState =context.windowState().getState(new ValueStateDescriptor<>("myState", KeyCount.class));ValueState<KeyCount> contextGlobalSameNameState =context.globalState().getState(new ValueStateDescriptor<>("myState", KeyCount.class));System.out.println("contextWindowSameNameState == contextGlobalSameNameState :" + (contextWindowSameNameState == contextGlobalSameNameState));System.out.println("state == contextGlobalSameNameState :" + (state == contextGlobalSameNameState));// 将当前key及其窗口的元素数量,还有窗口的起止时间整理成字符串String value = String.format("window, %s, %s - %s, %d,    total : %d, windowStateCount :%s, globalStateCount :%s\n",// 当前keys,// 当前窗口的起始时间time(context.window().getStart()),// 当前窗口的结束时间time(context.window().getEnd()),// 当前key在当前窗口内元素总数count,// 当前key出现的总数current.count,contextWindowValueState.value(),contextGlobalValueState.value());// 发射到下游算子collector.collect(value);}});// 打印结果,通过分析打印信息,检查ProcessWindowFunction中可以处理所有key的整个窗口的数据mainDataStream.print();env.execute("processfunction demo : processwindowfunction");}public static String time(long timeStamp) {return new SimpleDateFormat("hh:mm:ss").format(new Date(timeStamp));}}

输出结果:

window, XXX, 08:34:45 - 08:34:50, 3,    total : 22, windowStateCount :KeyCount{key='XXX', count=3}, globalStateCount :KeyCount{key='XXX', count=22}
window, YYY, 08:34:45 - 08:34:50, 2,    total : 22, windowStateCount :KeyCount{key='YYY', count=2}, globalStateCount :KeyCount{key='YYY', count=22}

从结果可以验证以上的结论,此外需要特别注意的一点是context.windowState()的状态需要在clear方法中清理掉,因为一旦时间窗口结束,就再也没有机会清理了
从这个例子中还发现一个比较有趣的现象:

ValueState<KeyCount> state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", KeyCount.class));
ValueState<KeyCount> contextWindowSameNameState =context.windowState().getState(new ValueStateDescriptor<>("myState", KeyCount.class));
ValueState<KeyCount> contextGlobalSameNameState =context.globalState().getState(new ValueStateDescriptor<>("myState", KeyCount.class));

在open中通过getRuntimeContext().getState定义的状态竟然可以通过 context.windowState()/ context.globalState()访问到,并且他们指向的都是同一个变量,可以参见代码的输出:

System.out.println("contextWindowSameNameState == contextGlobalSameNameState :" + (contextWindowSameNameState == contextGlobalSameNameState));
System.out.println("state == contextGlobalSameNameState :" + (state == contextGlobalSameNameState));

结果如下:

contextWindowSameNameState == contextGlobalSameNameState :true
state == contextGlobalSameNameState :true

参考文献:
https://cloud.tencent.com/developer/article/1815079

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/31853.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

分别用Vue和Java来实现的风靡一时的2048 游戏

目录 1、Vue实现2、Java实现 2048 游戏是一个基于网格的数字益智游戏&#xff0c;玩家需要通过滑动相同的数字来合并它们&#xff0c;并最终得到一个值为 2048 的方块。以下是分别用Vue和Java来实现的 2048 游戏&#xff0c;包含运行效果。 1、Vue实现 首先&#xff0c;创建一…

Kubernetes入门 三、命令行工具 kubectl

目录 语法操作示例资源操作Pod 与集群资源类型与别名格式化输出 kubectl 是 Kubernetes 集群的命令行工具&#xff0c;通过它能够对集群本身进行管理&#xff0c;并能够在集群上进行容器化应用的安装和部署。 语法 使用以下语法从终端窗口运行 kubectl 命令&#xff1a; kub…

Python爬虫——requests_get请求

import requests# ?可加可不加 url http://www.baidu.com/s?headers {Cookie: ,User-Agent: , }data {wd: 北京 } # params 参数 response requests.get(urlurl, paramsdata, headersheaders)content response.text print(content)总结&#xff1a; 参数使用params传递…

初学HTML:在线简易画板设计。

最近在HTML&#xff0c;记录下一点点成果。 设计了一个简易画板&#xff0c;通过HTML的Canvas元素实现一个在线画板&#xff0c;用户可以在上面绘制图形或涂鸦。 下面是运行效果&#xff1a; 下面是代码&#xff1a; <!DOCTYPE html> <html> <head><ti…

【Nginx】静态资源部署、反向代理、负载均衡

个人简介&#xff1a;Java领域新星创作者&#xff1b;阿里云技术博主、星级博主、专家博主&#xff1b;正在Java学习的路上摸爬滚打&#xff0c;记录学习的过程~ 个人主页&#xff1a;.29.的博客 学习社区&#xff1a;进去逛一逛~ nginx静态资源部署、反向代理、负载均衡 &…

vue3 provide inject实现强制刷新

1、在 App.vue 文件里写入 provide 的方法 <template> <div id"app"><keep-alive> <router-view v-if"isRouterAlive"></router-view></keep-alive> </div> </template> <script> export default …

详细教程:如何搭建废品回收小程序

废品回收是一项环保举措&#xff0c;通过回收和再利用废弃物品&#xff0c;可以减少资源浪费和环境污染。近年来&#xff0c;随着智能手机的普及&#xff0c;小程序成为了推广和运营的重要工具。本文将详细介绍如何搭建一个废品回收小程序。 1. 进入乔拓云网后台 首先&#xf…

微信朋友圈置顶功能已大范围上线!

微信是目前全球最受欢迎的社交媒体应用之一&#xff0c;拥有数十亿的用户。作为一款持续发展和改进的应用&#xff0c;微信不断推出新的功能来提升用户体验。 近日&#xff0c;iOS微信8.0.41内测版迎来了更新&#xff0c;本次更新距离上个正式版间隔了大概10天的时间。 微信朋友…

Agents改变游戏规则,亚马逊云科技生成式AI让基础模型加速工作流

最近&#xff0c;Stability AI正式发布了下一代文生图模型——Stable Diffusion XL 1.0这次的1.0版本是Stability AI的旗舰版生图模型&#xff0c;也是最先进的开源生图模型。 在目前的开放式图像模型中&#xff0c;SDXL 1.0是参数数量最多的。官方表示&#xff0c;这次采用的…

SpringBoot源码分析(8)--内置ApplicationContextInitializer

文章目录 1、DelegatingApplicationContextInitializer2、SharedMetadataReaderFactoryContextInitializer3、ContextIdApplicationContextInitializer4、ConfigurationWarningsApplicationContextInitializer5、ServerPortInfoApplicationContextInitializer6、ConditionEvalu…

一行JS代码导出ant-design中复杂table表格的Excel

使用方式 1、安装依赖 npm install xlsx-js-style2、复制代码文件exportExcel.js至工程 https://github.com/EnthuDai/export-excel-in-one-line 3、在引入excel.js后调用 Excel.export(columns, dataSource, 导出文件名)4、代码demo 5、效果 页面excel 适用范围 对于使…

16bit、8 通道、500kSPS、 SAR 型 ADC——MS5188N

MS5188N 是 8 通道、 16bit 、电荷再分配逐次逼近型模数 转换器&#xff0c;采用单电源供电。 MS5188N 拥有多通道、低功耗数据采集系统所需的所有 组成部分&#xff0c;包括&#xff1a;无失码的真 16 位 SAR ADC &#xff1b;用于将输入配 置为单端输入&#xff0…

Spring IoC 详解

目录 一、引言二、Spring Bean三、将一个类声明为 Bean 所涉及的注解四、Component 和 Bean 的区别五、注入 Bean 的注解六、Autowired 和 Resource 的区别七、Bean7.1 作用域7.2 线程安全7.3 生命周期 一、引言 IoC&#xff08;Inversion of Control:控制反转&#xff09; 是…

【AndV】ant-design-vue中select使用mode=“combobox“无效:

文章目录 一、问题:二、解决: 一、问题: Warning: [antdv: Select] The combobox mode of Select is deprecated,it will be removed in next major version,please use AutoComplete instead 二、解决: 将mode"combobox"改为mode"SECRET_COMBOBOX_MODE_DO_NOT_…

Centos7.9_或者华为OpenEuler安装Mysql8.0.33安装_亲测成功---Linux工作笔记061

[root@host-10-233-255-199 soft]# ll 总用量 1.1G -rw-r--r-- 1 root root 137M 5月 10 15:20 jdk-8u271-linux-x64.tar.gz -rw-r--r-- 1 root root 925M 7月 6 09:02 mysql-8.0.33-el7-x86_64.tar 看一下我们的安装包. [root@host-10-233-255-199 soft]# rpm -qa|grep m…

机器学习深度学习——注意力提示、注意力池化(核回归)

&#x1f468;‍&#x1f393;作者简介&#xff1a;一位即将上大四&#xff0c;正专攻机器学习的保研er &#x1f30c;上期文章&#xff1a;机器学习&&深度学习——常见循环神经网络结构&#xff08;RNN、LSTM、GRU&#xff09; &#x1f4da;订阅专栏&#xff1a;机器…

外网通过ipv6访问家里设备

目录 1.需要整体理解如何在外网连接家里设备。 2.路由器打通ipv6。 3.移动光猫配置ipv6。 4.test-ipv6.com测试成功&#xff0c;但是ping不通 还是ping不通&#xff0c;提出如下可能 5.动态域名解析&#xff08;ddns-go&#xff09; a.dns服务商权限设置 b.IPv6设置 c…

一文读懂ThreadLocal

ThreadLocal 有什么用&#xff1f; 目录 ThreadLocal 有什么用&#xff1f; 如何使用 ThreadLocal&#xff1f; ThreadLocal 原理了解吗&#xff1f; ThreadLocal 有什么用&#xff1f; 通常情况下&#xff0c;我们创建的变量是可以被任何一个线程访问并修改的。如果想实现…

rv1126设置静态ip

开发板配网--------------------------------------------------------------------------------------------- 刚拿到的开发板里面的网络配置大多不可用&#xff0c;此时是无法ping通的&#xff0c;这个时候需要重新修改相关的配置文件&#xff1b; Vi /etc/profile 最后面…

竞赛项目 深度学习图像风格迁移

文章目录 0 前言1 VGG网络2 风格迁移3 内容损失4 风格损失5 主代码实现6 迁移模型实现7 效果展示8 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 深度学习图像风格迁移 - opencv python 该项目较为新颖&#xff0c;适合作为竞赛课题…