12、Flink 的 Keyed State 代码示例

1、KeyedState 用例

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;public class _01_KeyedState {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.socketTextStream("localhost", 8888);source.keyBy(e -> e).process(new KeyedProcessFunction<String, String, Tuple2<String, Integer>>() {private ValueState<Tuple2<String, Integer>> valueState;private ListState<String> listState;private ReducingState<Integer> reducingState;private AggregatingState<Integer, String> aggregatingState;private MapState<String, Integer> mapState;@Overridepublic void open(Configuration parameters) throws Exception {ValueStateDescriptor<Tuple2<String, Integer>> valueStateDescriptor = new ValueStateDescriptor<Tuple2<String, Integer>>("valueState", TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));ListStateDescriptor<String> listStateDescriptor = new ListStateDescriptor<>("listState", String.class);ReducingStateDescriptor<Integer> reducingStateDescriptor = new ReducingStateDescriptor<Integer>("reduceingState", new ReduceFunction<Integer>() {@Overridepublic Integer reduce(Integer value1, Integer value2) throws Exception {return value1 + value2;}}, Integer.class);AggregatingStateDescriptor<Integer, Integer, String> aggregatingStateDescriptor = new AggregatingStateDescriptor<>("aggregatingState", new AggregateFunction<Integer, Integer, String>() {@Overridepublic Integer createAccumulator() {return 0;}@Overridepublic Integer add(Integer value, Integer accumulator) {return value + accumulator;}@Overridepublic String getResult(Integer accumulator) {return "res=>" + accumulator;}@Overridepublic Integer merge(Integer a, Integer b) {return a + b;}}, Integer.class);MapStateDescriptor<String, Integer> mapStateDescriptor = new MapStateDescriptor<>("mapState", String.class, Integer.class);valueState = getRuntimeContext().getState(valueStateDescriptor);listState = getRuntimeContext().getListState(listStateDescriptor);reducingState = getRuntimeContext().getReducingState(reducingStateDescriptor);aggregatingState = getRuntimeContext().getAggregatingState(aggregatingStateDescriptor);mapState = getRuntimeContext().getMapState(mapStateDescriptor);}@Overridepublic void processElement(String input, KeyedProcessFunction<String, String, Tuple2<String, Integer>>.Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {Tuple2<String, Integer> res = valueState.value();if (res == null) {res = new Tuple2<>(input, 1);valueState.update(res);} else {res.f1 += 1;valueState.update(res);}listState.add(input);reducingState.add(1);aggregatingState.add(1);if (mapState.contains(input)) {Integer beforeNum = mapState.get(input);mapState.put(input, beforeNum + 1);} else {mapState.put(input, 1);}}@Overridepublic void close() throws Exception {System.out.println("valueState=>"+valueState.value());valueState.clear();System.out.println("listState=>"+listState.get());listState.clear();System.out.println("reduceState=>"+reducingState.get());reducingState.clear();System.out.println("aggregatingState=>"+aggregatingState.get());aggregatingState.clear();System.out.println("mapState=>"+mapState.entries().toString());mapState.clear();}});env.execute();}
}

输入与输出

 依次输出:a,b,c,a,b,c预期输出结果:每个key的数据明细 listState:[a,a],[b,b],[c,c]每个key的数量带key valueState:[a,2],[b,2],[c,2]每个key的数量不带key reduceState:2,2,2每个key的数量不带key,且输入和输出数据类型不同 aggregatingState:res=>2,res=>2,res=>2每个key的数量带key mapState:[a,2],[b,2],[c,2]实际输出结果:valueState=>(a,2)valueState=>(b,2)valueState=>(c,2)listState=>[a, a]listState=>[b, b]listState=>[c, c]reduceState=>2reduceState=>2reduceState=>2aggregatingState=>res=>2aggregatingState=>res=>2aggregatingState=>res=>2mapState=>[a=2]mapState=>[b=2]mapState=>[c=2]

2、KeyedStateTTL

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;import java.time.Duration;public class _02_KeyedStateTTL {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.socketTextStream("localhost", 8888);env.enableCheckpointing(3000, CheckpointingMode.EXACTLY_ONCE);source.keyBy(e -> e).process(new KeyedProcessFunction<String, String, Tuple2<String, Integer>>() {private ValueState<Tuple2<String, Integer>> valueState;@Overridepublic void open(Configuration parameters) throws Exception {ValueStateDescriptor<Tuple2<String, Integer>> valueStateDescriptor = new ValueStateDescriptor<Tuple2<String, Integer>>("valueState", TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));// 只支持 KeyedState 的 TTL// 只支持 processing time 的 TTLStateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Duration.ofSeconds(5))// TTL 的更新策略// StateTtlConfig.UpdateType.OnCreateAndWrite - 仅在创建和写入时更新// StateTtlConfig.UpdateType.OnReadAndWrite - 读取时也更新.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)// 状态的可见性// StateTtlConfig.StateVisibility.NeverReturnExpired - 不返回过期数据// StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp - 会返回过期但未清理的数据.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)// 状态的清理策略,默认过期数据会在读取的时候被删除// cleanupFullSnapshot() 全量快照时进行清理// cleanupIncrementally(10, true) 增量数据清理// cleanupInRocksdbCompactFilter(1000, Duration.ofHours(1)) RocksDB 压缩过滤器.cleanupFullSnapshot().build();valueStateDescriptor.enableTimeToLive(stateTtlConfig);valueState = getRuntimeContext().getState(valueStateDescriptor);}@Overridepublic void processElement(String input, KeyedProcessFunction<String, String, Tuple2<String, Integer>>.Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {Tuple2<String, Integer> res = valueState.value();if (res == null) {res = new Tuple2<>(input, 1);valueState.update(res);out.collect(res);} else {res.f1 += 1;valueState.update(res);out.collect(res);}}@Overridepublic void close() throws Exception {System.out.println("valueState=>" + valueState.value());valueState.clear();}}).print("res=>");env.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/5613.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

70.网络游戏逆向分析与漏洞攻防-角色与怪物信息的更新-整理与角色数据更新有关的数据

免责声明&#xff1a;内容仅供学习参考&#xff0c;请合法利用知识&#xff0c;禁止进行违法犯罪活动&#xff01; 如果看不懂、不知道现在做的什么&#xff0c;那就跟着做完看效果 现在的代码都是依据数据包来写的&#xff0c;如果看不懂代码&#xff0c;就说明没看懂数据包…

基于python的舞蹈经验分享交流网站django+vue

1.运行环境&#xff1a;python3.7/python3.8。 2.IDE环境&#xff1a;pycharmmysql5.7/8.0; 3.数据库工具&#xff1a;Navicat11 4.硬件环境&#xff1a;windows11/10 8G内存以上 5.数据库&#xff1a;MySql 5.7/8.0版本&#xff1b; 运行成功后&#xff0c;在浏览器中输入&am…

新唐的nuc980/nuc972的开发3-官方源码编译

上一节中bsp已经安装&#xff0c;交叉环境已经搭建&#xff0c;理应就可以正常的编写上层的应用程序啦。 但是系统启动次序是- uboot-> kernel内核 ->挂载文件系统 ->上层应用程序 下面是bsp安装后的文件&#xff1a; 因此本章节&#xff0c;将讲解 uboot-> kerne…

Ubuntu Linux完全入门视频教程

Ubuntu Linux完全入门视频教程 UbuntuLinux完全入门视频教程1.rar UbuntuLinux亮全入门视频教程10.ra UbuntuLinux亮全入门视频教程11.ra UbuntuLinux完全入门视频教程12.ra UbuntuLinux亮全入门视频教程13.ra UbuntuLinux完全入门视频教程14.rar UbuntuLinux完全入门视频教程…

刷代码随想录有感(51):从中序和后序前序和中序构造二叉树

中后题干&#xff1a; 第一步&#xff1a;如果数组大小为零的话&#xff0c;说明是空节点了。 第二步&#xff1a;如果不为空&#xff0c;那么取后序数组最后一个元素作为节点元素。 第三步&#xff1a;找到后序数组最后一个元素在中序数组的位置&#xff0c;作为切割点 第四…

es5中的类和静态方法、继承详解

1、关于es5 es5中的类 // 1、最简单的类function Person() {this.name "姓名";this.age 20;}let p new Person();console.log(p.name);// 2、构造函数和原型链里面增加方法function Person() {this.name "姓名"; // 属性this.age 20;this.run f…

Large Language Models for Test-Free Fault Localization

基本信息 这是24年2月发表在ICSE 24会议&#xff08;CCF A&#xff09;的一篇文章&#xff0c;作者团队来自美国卡内基梅隆大学。 博客创建者 武松 作者 Aidan Z.H. Yang&#xff0c;Claire Le Goues&#xff0c;Ruben Martins&#xff0c;Vincent J. Hellendoorn 标签 …

自制英语听力视频 5.1

breaking news&#xff1a;突发新闻 judge&#xff1a;法官 hush money&#xff1a;封口费 trial&#xff1a;审判 violated:违反 gag order&#xff1a;禁言令 the judge has ruled the former president has violated a gag order&#xff1a;法官裁定前总统违反了禁言…

启明云端2.4寸屏+ESP32-S3+小型智能调速电动家用除草机案例 触控三档调速,能显示电压故障码

今天给大家分享个启明云端2.4寸屏ESP32-S3小型智能调速电动家用除草机案例&#xff0c;国外有草坪文化&#xff0c;这个机器能智能触控三档调速&#xff0c;带屏能显示电压故障码&#xff0c;数显档位&#xff08;3档最大&#xff09;&#xff0c;触控屏&#xff0c;长按3秒就能…

使用 langchain 连接 通义千问 并用 fastApi 开放接口

安装 langchain 方法 https://www.cnblogs.com/hailexuexi/p/18087602 安装 fastapi fastapi 是一个用于构建高性能 Web 应用的 Python 框架&#xff0c;它提供了简洁、高效的 API 开发体验。 pip install fastapi 安装 uvicorn uvicorn 是一个用于运行 FastAPI 应用的服务…

C语言学习/复习37--进阶总结与题目练习

一、题目练习 1. 循环与无符号char的取值范围 注意事项&#xff1a;0~255 -128~127 char类的取值范围看做循环图 2.ASCLL值与循环 3.按位操作与bit位 4 .结构体的大小 注意事项&#xff1a;结构体嵌套结构体的大小计算 5.循环条件 6.数据类型与原反补码 7.指针访问字符串数…

react18子组件设置接收默认值和值类型验证

父组件传值 import ChildCom from ./components/ChildCom export default function Person {return(<div><ChildCom name"alan-ben" age{18} score{[98, 97, 100]} /></div>) } 子组件接收并验证类型 import React from react import PropTypes…

CentOS-Stream-9配置网络和web控制台cockpit

vim /etc/NetworkManager/system-connections/ens33.nmconnection&#xff0c;修改autoconnectiontrue自动连接网络&#xff0c;可以自动获取IP地址&#xff0c;或者设置固定IP地址&#xff0c;在[ipv4]下面编辑methodmanual和address1192.168.4.111/24,192.168.4.1和dns223.5.…

商城系统推荐,如何找到一款可靠的商城系统?

如今&#xff0c;电商系统成为商家必不可少的营销工具&#xff0c;其系统在金融、外贸、零售等行业领域应用广泛。那么&#xff0c;作为初试水的企业又没有挑选电商系统的经验&#xff0c;如何找到拥有全功能、全渠道、可靠的网上商城系统呢&#xff1f; 我们可以先按电商系统…

【python】给函数参数和返回值标注类型

目录 &#xff08;1&#xff09;类型标注进化历史 从Python 3.0开始的类型标注 从Python 3.5开始的类型标注库 typing &#xff08;2&#xff09;更细化的类型标注 1.返回一个列表&#xff0c;且列表的元素类型是指定的 2.返回字典类型&#xff0c;键和值都是指定类型 3…

【Vue 2.x】学习vue之三路由

文章目录 Vue三路由第十章1、vue中的路由vue的应用分为a、多页面应用b、单页面应用 2、路由的基本应用1、基础2、使用3、加载 3、vue组件的分类1、普通组件2、路由组件 4、路由的嵌套5、路由传递Query参数1、拼接参数传递2、路由传递对象 6、简化路由1、命名路由 7、parms传递参…

java钉钉微信qq扫码登录

概述 第三方接口其实比较简单&#xff0c;按照文档来操作即可&#xff0c;代码也就那点&#xff0c;最费时间的反而是在对接系统的账号的申请上&#xff0c;不建议个人申请很麻烦&#xff0c;还是让公司运维申请企业账号。 作为一名合格的开人人员&#xff0c;不仅仅是把第三…

力扣82-链表、迭代 的思考

题目解读 给定一个已排序的链表的头 head &#xff0c; 删除原始链表中所有重复数字的节点&#xff0c;只留下不同的数字 。返回 已排序的链表 。 两个示范 思考 返回链表&#xff1a;返回更新链表后的头结点&#xff1b; 更新链表&#xff1a;判断重复元素&#xff0c;改变指针…

centos7 宝塔php7安装mongodb扩展

一、下载、解压源码 下载地址&#xff1a;https://pecl.php.net/package/mongodb 1 2 wget -c https://pecl.php.net/get/mongodb-1.5.3.tgz tar -zxvf mongodb-1.5.3.tgz 二、编译安装源码 1 2 3 4 cd mongodb-1.5.3 /www/server/php/70/bin/phpize ./configure --with-p…

Day56|动态规划part16:583. 两个字符串的删除操作、72. 编辑距离、编辑距离总结篇

583. 两个字符串的删除操作 我的方法&#xff0c;先求出两者的最长公共子序列长度&#xff0c;再用两个字符串的长度相减就是两者分别要做操作的步数&#xff1a; class Solution {public int minDistance(String word1, String word2) {int[][] dp new int[word1.length() …