Flink学习连载文章9--状态(State)

State

state 可以理解为-- 历史计算结果

有状态计算和无状态计算

  • 无状态计算:
    • 不需要考虑历史数据, 相同的输入,得到相同的输出!
    • 如:map, 将每个单词记为1, 进来一个hello, 得到(hello,1),再进来一个hello,得到的还是(hello,1)
  • 有状态计算:
    • 需要考虑历史数据, 相同的输入,可能会得到不同的输出!
    • 如:sum/reduce/maxBy, 对单词按照key分组聚合,进来一个(hello,1),得到(hello,1), 再进来一个(hello,1), 得到的结果为(hello,2)

注意: Flink默认已经支持了无状态和有状态计算!

例如WordCount代码:已经做好了状态维护, 输入hello,输出(hello,1),再输入hello,输出(hello,2)

有状态计算和无状态计算的应用场景

  • 无状态计算:数据转换,过滤等操作直接使用无状态的map/filter即可
  • 有状态计算:需要做聚合/比较的操作得使用有状态的sum/reduce/maxBy/minBy....

以wordcout为例,说明上图的流程

对Managed State继续细分,它又有两种类型:Keyed State和Operator State。

Flink状态 - 托管状态- KeyedState ( 在keyBy之后可以使用状态 )- ValueState  (存储一个值)- ListState   (存储多个值)- MapState    (存储key-value) - OperatorState ( 没有keyBy的情况下也可以使用 ) [不用]- 原生状态 (不用)

Keyed State (键控状态)

Flink 为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个key对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的key。因此,具有相同key的所有数据都会访问相同的状态。

需要注意的是键控状态只能在 KeyedStream 上进行使用,可以通过 stream.keyBy(...) 来得到 KeyedStream 。

Flink 提供了以下数据格式来管理和存储键控状态 (Keyed State):

· ValueState:存储单值类型的状态。可以使用 update(T) 进行更新,并通过 T value() 进行检索。

· ListState:存储列表类型的状态。可以使用 add(T) 或 addAll(List) 添加元素;并通过 get() 获得整个列表。

· ReducingState:用于存储经过 ReduceFunction 计算后的结果,使用 add(T) 增加元素。

· AggregatingState:用于存储经过 AggregatingState 计算后的结果,使用 add(IN) 添加元素。

· FoldingState:已被标识为废弃,会在未来版本中移除,官方推荐使用 AggregatingState 代替。

· MapState:维护 Map 类型的状态。

代码演示-Managed State-Keyed State

//nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/datastream/fault-tolerance/state/

案例1:

使用KeyedState中的ValueState获取数据中的最大值(获取每个key的最大值)(实际中直接使用maxBy即可)

也就是我们自己使用KeyState中的ValueState来模拟实现maxBy

代码实现:

package com.bigdata.state;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @基本功能:* @program:FlinkDemo* @author: 闫哥* @create:2024-11-26 15:27:21**/
public class _01_KeyedStateDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStream<Tuple2<String, Long>> tupleDS = env.fromElements(Tuple2.of("北京", 1L),Tuple2.of("上海", 2L),Tuple2.of("北京", 6L),Tuple2.of("上海", 8L),Tuple2.of("北京", 3L),Tuple2.of("上海", 4L),Tuple2.of("北京", 7L));//2. source-加载数据tupleDS.keyBy(new KeySelector<Tuple2<String, Long>, String>() {@Overridepublic String getKey(Tuple2<String, Long> value) throws Exception {return value.f0;}}).map(new RichMapFunction<Tuple2<String, Long>, Tuple2<String,Long>>() {// 借助状态这个API实现ValueState<Long> maxValueState= null;@Overridepublic void open(Configuration parameters) throws Exception {// 就是对ValueState初始化ValueStateDescriptor<Long> stateDescriptor = new ValueStateDescriptor<Long>("valueState",Long.class);maxValueState = getRuntimeContext().getState(stateDescriptor);}@Overridepublic Tuple2<String, Long> map(Tuple2<String, Long> value) throws Exception {Long val = value.f1;if(maxValueState.value() == null){maxValueState.update(val);}else{if(maxValueState.value() < val){maxValueState.update(val);}}return Tuple2.of(value.f0,maxValueState.value());}}).print();//.maxBy(1).print();//3. transformation-数据处理转换//4. sink-数据输出//5. execute-执行env.execute();}
}

案例2:

如果一个人的体温超过阈值38度,超过3次及以上,则输出: 姓名 [温度1,温度2,温度3]

姓名,温度输入                      输出张三,37张三,38张三,39张三,35张三,40张三,41               张三,[39,40,41]张三,40               张三,[39,40,41,40]
package com.bigdata.state;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.planner.expressions.In;
import org.apache.flink.util.Collector;import java.util.ArrayList;/*** @基本功能:* @program:FlinkDemo* @author: 闫哥* @create:2024-11-26 15:54:07**/
public class _02_KeyedStateDemo2 {// 如果一个人的体温超过阈值38度,超过3次及以上,则输出: 姓名 [温度1,温度2,温度3]public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 8889);//3. transformation-数据处理转换   zs,37dataStreamSource.map(new MapFunction<String, Tuple2<String,Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {String[] arr = value.split(",");return Tuple2.of(arr[0],Integer.valueOf(arr[1]));}}).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> value) throws Exception {return value.f0;}}).flatMap(new RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, ArrayList<Integer>>>() {ValueState<Integer> valueState = null;ListState<Integer> listState = null;@Overridepublic void open(Configuration parameters) throws Exception {ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<Integer>("numState",Integer.class);valueState = getRuntimeContext().getState(stateDescriptor);ListStateDescriptor<Integer> listStateDescriptor = new ListStateDescriptor<>("listState", Integer.class);listState = getRuntimeContext().getListState(listStateDescriptor);}@Overridepublic void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, ArrayList<Integer>>> out) throws Exception {Integer tiwen = value.f1;if(tiwen >= 38){valueState.update(valueState.value()==null?1:(valueState.value()+1));listState.add(tiwen);}if(valueState.value()!=null && valueState.value() >= 3){ArrayList<Integer> list = new ArrayList<>();Iterable<Integer> iterable = listState.get();for (Integer tiwenwen : iterable) {list.add(tiwenwen);}out.collect(Tuple2.of(value.f0,list));}}}).print();//4. sink-数据输出//5. execute-执行env.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/62875.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

嵌入式硬件面试题【经验】总结----会不断添加更新

目录 引言 一、电阻 1、电阻选型时一般从那几个方面考虑 2、上拉电阻的作用 3、PTC热敏电阻作为电源电路保险丝的工作原理 4、如果阻抗不匹配&#xff0c;有哪些后果 二、电容 1、电容选型一般从哪些方面进行考虑? 2、1uf的电容通常来滤除什么频率的信号 三、三极管…

Linux——基础命令(2) 文件内容操作

目录 ​编辑 文件内容操作 1.Vim &#xff08;1&#xff09;移动光标 &#xff08;2&#xff09;复制 &#xff08;3&#xff09;剪切 &#xff08;4&#xff09;删除 &#xff08;5&#xff09;粘贴 &#xff08;6&#xff09;替换,撤销,查找 &#xff08;7&#xff…

Stable Diffusion 3详解

&#x1f33a;系列文章推荐&#x1f33a; 扩散模型系列文章正在持续的更新&#xff0c;更新节奏如下&#xff0c;先更新SD模型讲解&#xff0c;再更新相关的微调方法文章&#xff0c;敬请期待&#xff01;&#xff01;&#xff01;&#xff08;本文及其之前的文章均已更新&…

微信小程序——文档下载功能分享(含代码)

✅作者简介&#xff1a;2022年博客新星 第八。热爱国学的Java后端开发者&#xff0c;修心和技术同步精进。 &#x1f34e;个人主页&#xff1a;Java Fans的博客 &#x1f34a;个人信条&#xff1a;不迁怒&#xff0c;不贰过。小知识&#xff0c;大智慧。 &#x1f49e;当前专栏…

开源的跨平台SQL 编辑器Beekeeper Studio

一款开源的跨平台 SQL 编辑器&#xff0c;提供 SQL 语法高亮、自动补全、数据表内容筛选与过滤、连接 Web 数据库、存储历史查询记录等功能。该编辑器支持 SQLite、MySQL、MariaDB、Postgres 等主流数据库&#xff0c;并兼容 Windows、macOS、Linux 等桌面操作系统。 项目地址…

Shader的涉及的数学知识总结

着色器&#xff08;Shader&#xff09;编程广泛应用于计算机图形学中&#xff0c;用于实现各种视觉效果。编写高效的着色器需要扎实的数学基础&#xff0c;以下是着色器编程中常见的数学知识及其应用&#xff1a; 1. 向量代数 向量&#xff1a;表示具有大小和方向的量&#x…

数据结构——排序第三幕(深究快排(非递归实现)、快排的优化、内省排序,排序总结)超详细!!!!

文章目录 前言一、非递归实现快排二、快排的优化版本三、内省排序四、排序算法复杂度以及稳定性的分析总结 前言 继上一篇博客基于递归的方式学习了快速排序和归并排序 今天我们来深究快速排序&#xff0c;使用栈的数据结构非递归实现快排&#xff0c;优化快排&#xff08;三路…

Spring Web开发注解和请求(1)

大家好我是小帅&#xff0c;今天我们来学习Spring Web MVC框架&#xff08;入门级&#xff09; 文章目录 1. 什么是 Spring Web MVC&#xff1f;1.1 MVC 定义1.2 什么是Spring MVC ? 2. 学习Spring MVC2.1 建⽴连接第一个spring MVC程序 3. web开发注解的解释3.1RestControlle…

剖析kubernetes service的IP能否在宿主机中ping通

文章目录 前言一、serviceIP是怎么产生的二、宿主机中ping serviceIP地址1.ping示例2.为什么ping不通剖析2.1.封装及解封装过程2.2.ICMP报文以太网数据帧格式2.3.原因 三、ping不通svcIP是否跟iptables规则有关&#xff1f;四、为什么ipvs的的clusterIP类型的service能够ping通…

vue项目部署到github pages后页面显示不出来??

问题&#xff1a; 当我们在命令行执行 npm run build 后&#xff0c;项目的目录下会生成一个 dist 文件夹&#xff0c;它里面又包含一个 static 文件夹和一个 index.html 文件&#xff0c;这是 webpack 最终打包好的文件 项目上传到仓库后发现页面为空&#xff0c;找不到文件路…

ROS基本框架2——在ROS开发中创建并使用自定义消息(C++版本)

ROS基本框架2——在ROS开发中创建并使用自定义消息(C++版本) code review! 参考笔记 1.ROS基本框架1——编写简单的发布者和订阅者(C++和Python版本) 2.ROS基本框架2——在ROS开发中创建并使用自定义消息(C++版本) 文章目录 ROS基本框架2——在ROS开发中创建并使用自定义…

ccf A 类与sci 一区那个比较难? + 论文常识

论文常识&#xff1a; ESI 基本科学指标数据库(EssentialScience Indicators ) 高被引论文&#xff08;Highly Cited Paper&#xff09;&#xff1a;根据同一年同一ESI学科统计最近10年发表论文中被引用次数进入世界前1%的论文&#xff1b;在硕士论文中文献综述是作为论文的理…

异步处理优化:多线程线程池与消息队列的选择与应用

目录 一、异步处理方式引入 &#xff08;一&#xff09;异步业务识别 &#xff08;二&#xff09;明确异步处理方式 二、多线程线程池&#xff08;Thread Pool&#xff09; &#xff08;一&#xff09;工作原理 &#xff08;二&#xff09;直面优缺点和适用场景 1.需要快…

IS-IS的原理

IS-IS的基本概念&#xff1a; 概述&#xff1a; IS-IS&#xff0c;中间系统到中间系统&#xff0c;是ISO国际标准化组织为它的无连接网络协议设计的一种动态路由协议 IS-IS支持CLNP网络和IP网络&#xff0c;采用数据链路层封装&#xff0c;区别于ospf只支持IP网络&#xff0…

代理ip工具在网络安全中的作用是什么

代理IP工具在网络安全中扮演着至关重要的角色&#xff0c;它们不仅能够帮助用户保护隐私&#xff0c;还能提高网络性能&#xff0c;增强安全性。本文将深入探讨代理IP工具的定义、工作原理以及在网络安全中的具体应用&#xff0c;旨在为读者提供全面的理解和指导。 一、代理IP工…

IDEA 2024 配置Maven

Step 1:确定下载Apache Maven版本 在IDEA 2024中&#xff0c;随便新建一个Maven项目&#xff1b; 在File下拉菜单栏中&#xff0c;找到Setings&#xff1b; 在Build&#xff0c;Execution&#xff0c;Deployment中找到Maven 确定下载的Apache Maven版本应略低于或等于IDEA绑…

107.【C语言】数据结构之二叉树求总节点和第K层节点的个数

目录 1.求二叉树总的节点的个数 1.容易想到的方法 代码 缺陷 思考:能否在TreeSize函数内定义静态变量解决size的问题呢? 其他写法 运行结果 2.最好的方法:分而治之 代码 运行结果 2.求二叉树第K层节点的个数 错误代码 运行结果 修正 运行结果 其他写法 1.求二…

MySQL笔记-启动时log报错Table ‘mysql.user‘ doesn‘t exist

安装好mysql后&#xff0c;正常使用&#xff08;使用的是rpm版安装的&#xff09; service mysqld start | stop | restart 不会出现这个问题。 我遇到的情况是在凝思操作系统上&#xff0c;已经存在了一个mysql。网上查找了一些资料&#xff0c;卸载&#xff0c;后可能卸载…

Mybatis 复习

1 什么是MyBatis MyBatis是一个优秀的持久层框架&#xff0c;它对JDBC操作数据库的过程进行封装&#xff0c;使开发者只需要关注 SQL 本身&#xff0c;而不需要花费精力去处理例如注册驱动、创建connection、创建statement、手动设置参数、 结果集检索等JDBC繁杂的过程代码 。…

HNTS-MRG 2024 Challenge:是一个包含200个头颈癌病例的磁共振图像及其标注的公开数据集,旨在推动AI在头颈癌放射治疗自动分割领域的研究。

2024-11-28,由德克萨斯大学MD安德森癌症中心创建HNTS-MRG 2024 Challenge数据集&#xff0c;目的通过公开数据集推动自动分割算法的发展&#xff0c;这对于提高放射治疗的精确性和效率具有重要意义。 数据集地址&#xff1a;HNTS-MRG 2024|癌症放射治疗数据集|医学影像分析数据…