Flink State 状态管理

文章目录

  • 前言
  • 一、状态分类
  • 二、keyed代码示例
    • ListState
    • MapState
  • 总结


前言

状态在Flink中叫做State,用来保存中间计算结果或者缓存数据。要做到比较好的状态管理,需要考虑以下几点内容:

  • 状态数据的存储和访问
    在Task内部,如何高效地保存状态数据和使用状态数据。
  • 状态数据的备份和恢复
    作业失败是无法避免的,那么就要考虑如何高效地将状态数据保存下来,避免状态备份降低集群的吞吐量,并且在Failover时恢复作业到失败前的状态。
  • 状态数据的划分和动态扩容
    作业在集群内并行执行那么就要思考对于作业的Task而言如何使用统一的方式对状态数据进行切分,在作业修改并行度导致Task数据改变的时候,如何确保正确地恢复。

一、状态分类

State按照是否有Key划分KeyedState和OperatorState两种。按照数据结构不同,flink定义了多种state,分别应用于不同的场景,具体实现如下:ValueState、ListState、MapState、ReducingState、AggregatingState。

  • ValueState: 保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的 key,因此算子接收到的每个 key 都可能对应一个值)。 这个值可以通过 update(T) 进行更新,通过 T value() 进行检索。

  • ListState: 保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。可以通过 add(T) 或者 addAll(List) 进行添加元素,通过 Iterable get() 获得整个列表。还可以通过 update(List) 覆盖当前的列表。

  • ReducingState: 保存一个单值,表示添加到状态的所有值的聚合。接口与 ListState 类似,但使用 add(T) 增加元素,会使用提供的 ReduceFunction 进行聚合。

  • AggregatingState<IN, OUT>: 保留一个单值,表示添加到状态的所有值的聚合。和 ReducingState 相反的是, 聚合类型可能与 添加到状态的元素的类型不同。 接口与 ListState 类似,但使用 add(IN) 添加的元素会用指定的 AggregateFunction 进行聚合。

  • MapState<UK, UV>: 维护了一个映射列表。 你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用 put(UK,UV) 或者 putAll(Map<UK,UV>) 添加映射。 使用 get(UK) 检索特定 key。 使用 entries(),keys() 和 values() 分别检索映射、键和值的可迭代视图。你还可以通过 isEmpty() 来判断是否包含任何键值对。

二、keyed代码示例

更多代码示例请下载Flink State体系剖析以及案例实践

ListState

代码如下:


import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** 需求:当接收到的相同 key 的元素个数等于 3个,就计算这些元素的 value 的平均值。* 计算keyed stream中每3个元素的 value 的平均值*/
public class TestKeyedStateMain {public static void main(String[] args) throws  Exception{//获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度env.setParallelism(12);//获取数据源DataStreamSource<Tuple2<Long, Long>> dataStreamSource =env.fromElements(Tuple2.of(1L, 3L),Tuple2.of(1L, 7L),Tuple2.of(2L, 4L),Tuple2.of(1L, 5L),Tuple2.of(2L, 2L),Tuple2.of(2L, 6L));/*** 1L, 3L* 1L, 7L* 1L, 5L** 1L,5.0 double** 2L, 4L* 2L, 2L* 2L, 6L** 2L,4.0 double***/// 输出://(1,5.0)//(2,4.0)dataStreamSource.keyBy(tuple -> tuple.f0) //分组.flatMap(new CountAverageWithListState()).print();env.execute("TestStatefulApi");}
}import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.util.Collector;import java.util.Collections;
import java.util.List;/***  ListState<T> :这个状态为每一个 key 保存集合的值*      get() 获取状态值*      add() / addAll() 更新状态值,将数据放到状态中*      clear() 清除状态*/
public class CountAverageWithListStateextends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {// managed keyed state/*** ValueState : 里面只能存一条元素* ListState : 里面可以存很多数据*/private ListState<Tuple2<Long, Long>> elementsByKey;@Overridepublic void open(Configuration parameters) throws Exception {// 注册状态ListStateDescriptor<Tuple2<Long, Long>> descriptor =new ListStateDescriptor<Tuple2<Long, Long>>("average",  // 状态的名字Types.TUPLE(Types.LONG, Types.LONG)); // 状态存储的数据类型elementsByKey = getRuntimeContext().getListState(descriptor);}@Overridepublic void flatMap(Tuple2<Long, Long> element,Collector<Tuple2<Long, Double>> out) throws Exception {// 拿到当前的 key 的状态值Iterable<Tuple2<Long, Long>> currentState = elementsByKey.get();// 如果状态值还没有初始化,则初始化if (currentState == null) {elementsByKey.addAll(Collections.emptyList());}// 更新状态elementsByKey.add(element);// 判断,如果当前的 key 出现了 3 次,则需要计算平均值,并且输出List<Tuple2<Long, Long>> allElements = Lists.newArrayList(elementsByKey.get());if (allElements.size() == 3) {long count = 0;long sum = 0;for (Tuple2<Long, Long> ele : allElements) {count++;sum += ele.f1;}double avg = (double) sum / count;out.collect(Tuple2.of(element.f0, avg));// 清除状态elementsByKey.clear();}}
}

MapState

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.util.Collector;import java.util.List;
import java.util.UUID;/***  MapState<K, V> :这个状态为每一个 key 保存一个 Map 集合*      put() 将对应的 key 的键值对放到状态中*      values() 拿到 MapState 中所有的 value*      clear() 清除状态*/
public class CountAverageWithMapStateextends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {// managed keyed state//1. MapState :key 是一个唯一的值,value 是接收到的相同的 key 对应的 value 的值/*** MapState:*      Map集合的特点,相同key,会覆盖数据。*/private MapState<String, Long> mapState;@Overridepublic void open(Configuration parameters) throws Exception {// 注册状态MapStateDescriptor<String, Long> descriptor =new MapStateDescriptor<String, Long>("average",  // 状态的名字String.class, Long.class); // 状态存储的数据类型mapState = getRuntimeContext().getMapState(descriptor);}/**** @param element* @param out* @throws Exception*/@Overridepublic void flatMap(Tuple2<Long, Long> element,Collector<Tuple2<Long, Double>> out) throws Exception {mapState.put(UUID.randomUUID().toString(), element.f1); //list// 判断,如果当前的 key 出现了 3 次,则需要计算平均值,并且输出List<Long> allElements = Lists.newArrayList(mapState.values());if (allElements.size() == 3) {long count = 0;long sum = 0;for (Long ele : allElements) {count++;sum += ele;}double avg = (double) sum / count;//out.collect(Tuple2.of(element.f0, avg));// 清除状态mapState.clear();}}
}

总结

  1. 是否存在当前处理的 key(current key):operator state 是没有当前 key 的概念,而 keyed
    state 的数值总是与一个 current key 对应。
  2. 存储对象是否 on heap: 目前 operator state backend 仅有一种 on-heap 的实现;而 keyed state
    backend 有 on-heap 和 off-heap(RocksDB)的多种实现。
  3. 是否需要手动声明快照(snapshot)和恢复 (restore) 方法:operator state 需要手动实现
    snapshot 和 restore 方法;而 keyed state 则由 backend 自行实现,对用户透明。
  4. 数据大小:一般而言,我们认为 operator state 的数据规模是比较小的;认为 keyed state 规模是
    相对比较大的。需要注意的是,这是一个经验判断,不是一个绝对的判断区分标准。
    更多内容和代码示例请下载Flink State体系剖析以及案例实践

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/620883.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Golang对比Java、python为什么要保留指针

为什么要用指针&#xff1f; 平时我们在Golang使用指针一般是为了以下的情况&#xff1a; 方法直接修改原来对象保证参数传递的自由&#xff0c;可以在传递重量级对象时使用指针 但Go 保留指针不仅仅是为了解决传递参数的问题&#xff0c;还跟它的语言特性有密不可分的联系。…

JOSEF约瑟端子排中间继电器 DZY-204 DC110V 导轨安装,板前接线

DZY系列端子排中间继电器 系列型号&#xff1a; DZY-101端子排中间继电器 DZY-104端子排中间继电器 DZY-105端子排中间继电器 DZY-301端子排中间继电器 DZY-106端子排中间继电器 DZY-401端子排中间继电器 DZY-204端子排中间继电器 一、 概述 DZY-204端子排中间继电器用于各种…

unity C#什么时候用“可空类型”

文章目录 例子1. **声明一个可空类型变量&#xff1a;**2. **给可空类型变量赋值&#xff1a;**3. **检查可空类型变量是否有值&#xff1a;**4. **转换与比较&#xff1a;**5. **使用null合并运算符&#xff1a;** 可空类型的重要意义1. **表示缺失或未知的值&#xff1a;**2.…

每日一博 - 使用APIFOX调测 @RequestBody标注的对象

文章目录 概述发送 post 请求步骤1.新建接口&#xff0c;设置为 post 请求2. 填写 URL 和参数3.发送请求 实战 RequestBody 概述 APIFOX&#xff08;类似Postman&#xff09;提供了丰富的功能来支持用户发送包含各种信息的 POST 请求&#xff0c;如文本数据、JSON 或 XML 数据…

JS数据类型转换成Boolean型

在javaScript中 布尔值用到的很频繁 接下来让我稍微为大家介绍一下数据类型转换为布尔型 转换成布尔值为false的类型 console.log(Boolean("")) //falseconsole.log(Boolean(0)) //falseconsole.log(Boolean(undefined)) //falseconsole.log(Boolean(null)) //false…

云卷云舒:AI for DB、DB for AI

云卷云舒&#xff1a;算力网络云原生&#xff08;下&#xff09;&#xff1a;云数据库发展的新篇章-CSDN博客https://blog.csdn.net/bishenghua/article/details/135050556 随着数据库和AI技术的分支同向演进&#xff0c;AI 和数据库间的关联越发紧密了。 大模型的演进发展&a…

element中el-cascader级联选择器只有最后一级可以多选

文章目录 一、前言二、实现2.1、设置popper-class和multiple2.2、设置样式 三、最后 一、前言 element-ui中el-cascader级联选择器只有最后一级可以多选&#xff0c;其它级只有展开子节点的功能&#xff0c;如下图所示&#xff1a; 可以观察到最后一级的li节点上没有属性aria-…

Java开发的常见报错

Java开发的常见报错 长期更新 2023年12月9日 1、java.lang.IllegalArgumentException: Null input buffer 这个异常通常在方法内部检查参数的有效性时抛出。要解决这个问题&#xff0c;你可以考虑以下几点&#xff1a; 检查参数是否满足方法的要求&#xff1a;首先&#xf…

java基础知识点系列——基础语法(三)

java基础知识点系列——基础语法&#xff08;三&#xff09; 注释 注释概述 注释是在程序指定位置添加的说明性信息。注释不参与程序运行&#xff0c;仅起到说明作用。 注释分类 单行注释&#xff0c;格式&#xff1a;// 注释信息多行注释&#xff0c;格式&#xff1a;/* …

AcWing:4965. 三国游戏

标签:贪心 描述: 小蓝正在玩一款游戏。 游戏中魏蜀吴三个国家各自拥有一定数量的士兵 X,Y,Z&#xff08;一开始可以认为都为 0&#xff09;。 游戏有 n 个可能会发生的事件&#xff0c;每个事件之间相互独立且最多只会发生一次&#xff0c;当第 i 个事件发生时会分别让 X,…

代码随想录 496. 下一个更大元素 I

题目 nums1 中数字 x 的 下一个更大元素 是指 x 在 nums2 中对应位置 右侧 的 第一个 比 x 大的元素。 给你两个 没有重复元素 的数组 nums1 和 nums2 &#xff0c;下标从 0 开始计数&#xff0c;其中nums1 是 nums2 的子集。 对于每个 0 < i < nums1.length &#xff0c…

面向对象的三大特征之二:继承 --java学习笔记

什么是继承? 关键字extends,用这个关键字&#xff0c;可以让一个类和另一个类建立起父子关系 继承的特点&#xff1a;子类能继承父类的非私有成员&#xff08;成员变量、成员方法&#xff09;继承后对象的创建&#xff1a;子类的对象时由子类、父类共同完成的 代码演示&am…

Hex Editor的使用教程(VS Code)

Hex Editor&#xff08;十六进制编辑器&#xff09;是一种用于查看和编辑计算机文件的低级别编辑工具。与常规文本编辑器不同&#xff0c;它允许用户直接查看和修改文件的二进制数据。在 Hex Editor 中&#xff0c;数据通常以十六进制&#xff08;hex&#xff09;格式显示&…

Qt/QML编程学习之心得:使用camera摄像头(35)

汽车应用中,camera起到了越来越多的作用,数字化的作用,这点无可争议,而作为GUI设计工具,如何让Camera类的应用能更好的发挥作用呢? You can use Camera to capture images and movies from a camera, and manipulate the capture and processing settings that get appl…

EI论文复现:考虑多能互补的综合能源系统/虚拟电厂/微电网优化运行程序代码!

本程序参考EI论文《基于多能互补的热电联供型微网优化运行》&#xff0c;文章通过储能设备解耦热电联系&#xff0c;建立基于多能互补的综合能源系统/虚拟电厂/微电网优化运行模型。模型包含系统供给侧的多能互补协调与需求侧的综合能源响应两个方面&#xff0c;使供给侧通过能…

springboot 整合 actuator监控详情

SpringBoot自带监控功能Actuator&#xff0c;可以帮助实现对程序内部运行情况监控&#xff0c;比如监控状况、Bean加载情况、环境变量、日志信息、线程信息等 pom文件中添加 <!-- actuator start--> <dependency><groupId>org.springframework.boot</gr…

Grind75第9天 | 733.图像渲染、542.01矩阵、1235.规划兼职工作

733.图像渲染 题目链接&#xff1a;https://leetcode.com/problems/flood-fill 解法&#xff1a; 可以用深度优先搜索和广度优先搜索。 深度优先搜索。每次搜索到一个方格时&#xff0c;如果其与初始位置的方格颜色相同&#xff0c;就将该方格的染色&#xff0c;然后继续对…

CMake Error at CMakeLists.txt:14 (project): The CMAKE_CXX_COMPILER:

报错 CMake Error at CMakeLists.txt:14 (project):The CMAKE_CXX_COMPILER:arm-none-eabi-g 解决办法1 Arm GNU Toolchain Downloads – Arm Developer x86_64 linux上&#xff1a; x86_64 Linux hosted cross toolchains AArch32 bare-metal target (arm-none-eabi)arm-g…

Qt构建MSVC2015环境过程

Qt构建MSVC2015环境过程 前言 之前用的Qt都是基于默认的MinGW编译器&#xff0c;由于目前工作的QT界面主要是跑在X86上&#xff0c;所以记录一下Qt配置MSVC2015的配置过程。根据查阅了解以后&#xff0c;个人理解的MinGW跟MSVC的区别在于前者主要是用于跨平台程序构建&#x…