大数据-玩转数据-Flink状态编程(上)

一、Flink状态编程

有状态的计算是流处理框架要实现的重要功能,因为稍复杂的流处理场景都需要记录状态,然后在新流入数据的基础上不断更新状态。
SparkStreaming在状态管理这块做的不好, 很多时候需要借助于外部存储(例如Redis)来手动管理状态, 增加了编程的难度。
Flink的状态管理是它的优势之一。

二、什么是状态

在流式计算中有些操作一次处理一个独立的事件(比如解析一个事件), 有些操作却需要记住多个事件的信息(比如窗口操作)。

流式计算分为无状态计算和有状态计算两种情况。
无状态的计算观察每个独立事件,并根据最后一个事件输出结果。例如,流处理应用程序从传感器接收水位数据,并在水位超过指定高度时发出警告。

在简单聚合、窗口聚合、处理函数的应用,都会有状态的身影出现。在Flink这样的分布式系统中,我们不仅需要定义出状态在任务并行时的处理方式,还需要考虑如何持久化保存、以便发生故障时能正确地恢复,这就需要一套完整的管理机制来处理所有状态。

三、为什么需要管理状态

下面的几个场景都需要使用流处理的状态功能:
去重: 数据流中的数据有重复,我们想对重复数据去重,需要记录哪些数据已经流入过应用,当新数据流入时,根据已流入过的数据来判断去重。
检测: 检查输入流是否符合某个特定的模式,需要将之前流入的元素以状态的形式缓存下来。比如,判断一个温度传感器数据流中的温度是否在持续上升。
聚合: 对一个时间窗口内的数据进行聚合分析,分析一个小时内水位的情况
更新机器学习模型: 在线机器学习场景下,需要根据新流入数据不断更新机器学习的模型参数。

四、Flink中的状态分类

Managed State
状态管理方式 Flink Runtime托管, 自动存储, 自动恢复, 自动伸缩
状态数据结构 Flink提供多种常用数据结构, 例如:ListState, MapState等
使用场景 绝大数Flink算子。

Raw State
状态管理方式 用户自己管理
状态数据结构 字节数组: byte[]
使用场景 所有算子

从具体使用场景来说,绝大多数的算子都可以通过继承Rich函数类或其他提供好的接口类,在里面使用Managed State。Raw State一般是在已有算子和Managed State不够用时,用户自定义算子时使用。
在我们平时的使用中Managed State已经足够我们使用。

在这里插入图片描述
对Managed State继续细分,它又有2种类型
Operator State(算子状态)
Keyed State(键控状态)

Operator State
适用用算子类型: 可用于所有算子: 常用于source, sink,
例如:FlinkKafkaConsumer
状态分配:一个算子的子任务对应一个状态
创建和访问方式: 实现CheckpointedFunction或ListCheckpointed(已经过时)接口
横向扩展 :并发改变时有多重重写分配方式可选: 均匀分配和合并后每个得到全量
支持的数据结构: ListState,UnionListStste和BroadCastState

Keyed State
适用用算子类型: 只能用于用于KeyedStream上的算子
状态分配 :一个Key对应一个State: 一个算子会处理多个Key, 则访问相应的多个State
创建和访问方式:重写RichFunction, 通过里面的RuntimeContext访问w
横向扩展 :并发改变, State随着Key在实例间迁移
支持的数据结构:ValueState, ListState,MapState ReduceState, AggregatingState

五、算子状态的使用

Operator State可以用在所有算子上,每个算子子任务或者说每个算子实例共享一个状态,流入这个算子子任务的数据可以访问和更新这个状态。

注意: 算子子任务之间的状态不能互相访问
Operator State的实际应用场景不如Keyed State多,它经常被用在Source或Sink等算子上,用来保存流入数据的偏移量或对输出数据做缓存,以保证Flink应用的Exactly-Once语义。

Flink为算子状态提供三种基本数据结构:
列表状态(List state),将状态表示为一组数据的列表

联合列表状态(Union list state),也是将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序时如何恢复。一种是均匀分配(List state),另外一种是将所有 State 合并为全量 State 再分发给每个实例(Union list state)。

广播状态(Broadcast state)
是一种特殊的算子状态. 如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态。

三种状态的实现
在这里插入图片描述

六、键控状态的使用

键控状态是根据输入数据流中定义的键(key)来维护和访问的,只能用于KeyedStream(keyBy算子处理之后)。相同key的所有数据都会访问相同的状态。
键控状态支持的数据类型
在这里插入图片描述
注意:
a)所有的类型都有clear(), 清空当前key的状态
b)这些状态对象仅用于用户与状态进行交互.
c)状态不是必须存储到内存, 也可以存储在磁盘或者任意其他地方
d)从状态获取的值与输入元素的key相关

七、状态后端

状态的存储、访问以及维护,由一个可插入的组件决定,这个组件就叫做状态后端。
状态后端主要负责两件事:
本地(taskmanager)的状态管理
将检查点(checkpoint)状态写入远程存储

状态后端的分类及配置
状态后端作为一个可插入的组件, 没有固定的配置, 我们可以根据需要选择一个合适的状态后端。
在这里插入图片描述
MemoryStateBackend
内存级别的状态后端(默认),
存储方式:本地状态存储在TaskManager的内存中, checkpoint 存储在JobManager的内存中.
特点:快速, 低延迟, 但不稳定
使用场景: 1. 本地测试 2. 几乎无状态的作业(ETL) 3. JobManager不容易挂, 或者挂了影响不大. 4. 不推荐在生产环境下使用
FsStateBackend
存储方式: 本地状态在TaskManager内存, Checkpoint时, 存储在文件系统(hdfs)中
特点: 拥有内存级别的本地访问速度, 和更好的容错保证
使用场景: 1. 常规使用状态的作业. 例如分钟级别窗口聚合, join等 2. 需要开启HA的作业 3. 可以应用在生产环境中
RocksDBStateBackend
将所有的状态序列化之后, 存入本地的RocksDB数据库中.(一种NoSql数据库, KV形式存储)
存储方式: 1. 本地状态存储在TaskManager的RocksDB数据库中(实际是内存+磁盘) 2. Checkpoint在外部文件系统(hdfs)中.
使用场景: 1. 超大状态的作业, 例如天级的窗口聚合 2. 需要开启HA的作业 3. 对读写状态性能要求不高的作业 4. 可以使用在生产环境

八、案例列表状态

package com.lyh.flink09;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.List;public class state_programe1_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);env.socketTextStream("hadoop100",9999).map(new MyMapFunctin()).print();env.execute();}public static class MyMapFunctin implements MapFunction<String,Long>, CheckpointedFunction {private Long count = 0L;private ListState<Long> state;@Overridepublic Long map(String value) throws Exception {count++;return count;}// 初始化时会调用这个方法,向本地状态中填充数据. 每个子任务调用一次@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {System.out.println("initialize.....");state = context.getOperatorStateStore().getListState(new ListStateDescriptor<Long>("state",Long.class));for (Long c : state.get()) {count += c;}}// Checkpoint时会调用这个方法,我们要实现具体的snapshot逻辑,比如将哪些本地状态持久化@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {System.out.println("snapshot.....");state.clear();state.add(count);}}
}

九、案例广播状态

package com.lyh.flink09;import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;public class state_broad1_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(3);DataStreamSource<String> dataStream = env.socketTextStream("hadoop100", 9999);DataStreamSource<String> controlStream = env.socketTextStream("hadoop100", 8888);MapStateDescriptor<String, String> stateDescriptor = new MapStateDescriptor<>("state", String.class, String.class);// 广播流BroadcastStream<String> broadcastStream = controlStream.broadcast(stateDescriptor);dataStream.connect(broadcastStream).process(new BroadcastProcessFunction<String, String, String>() {@Overridepublic void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {// 从广播状态中取值, 不同的值做不同的业务ReadOnlyBroadcastState<String, String> state = ctx.getBroadcastState(stateDescriptor);if ("1".equals(state.get("switch"))) {out.collect("切换到1号配置....");} else if ("0".equals(state.get("switch"))) {out.collect("切换到0号配置....");} else {out.collect("切换到其他配置....");}}@Overridepublic void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {BroadcastState<String, String> state = ctx.getBroadcastState(stateDescriptor);// 把值放入广播状态state.put("switch", value);}}).print();env.execute();}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/72168.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

macbookpro怎么删除软件没有鼠标

macbookpro怎么删除软件没有鼠标,macbookpro触摸板可以替代鼠标进行操作。左右键功能与鼠标相同&#xff0c;可用于执行删除操作。此外&#xff0c;还可以利用键盘上的Delete键来删除选中的文件。 删除软件方法 方法1、打开应用程序&#xff0c;键盘按住control&#xff0c;加点…

数据结构与算法之贪心动态规划

一&#xff1a;思考 1.某天早上公司领导找你解决一个问题&#xff0c;明天公司有N个同等级的会议需要使用同一个会议室&#xff0c;现在给你这个N个会议的开始和结束 时间&#xff0c;你怎么样安排才能使会议室最大利用&#xff1f;即安排最多场次的会议&#xff1f;电影的话 那…

高等数学教材重难点题型总结(四)不定积分

难点在于量级&#xff0c;不定积分一定要多练多见才能游刃有余~ 1.利用求导公式验证等式 2.计算不定积分

C语言——指针完全版

目录 一、指针的运算 1.1指针 - 整数 1.2指针 - 指针 二、指针遍历数组 2.1指针遍历数组 1.了解数组名称的含义&#xff08;&数组名和数组名的区别&#xff09;。 2.用指针遍历数组 三、指针数组、数组指针、函数指针 3.1指针数组 3.1.1指针数组的形式 3.1.2指针…

【自学笔记】如何在 Python 中使用 YAML 文件? 了解 YAML 格式和规范

文章目录 如何在 Python 中使用 YAML 文件YAML 的格式、规范和需要注意的点YAML 的缩进对象块其语法规范在 Python 中使用 PyYAML 模块安装 PyYAML 模块使用 PyYAML 模块读取和写入 YAML 文件读取 YAML 文件写入 YAML 文件load() 和 safe_load() 的区别总结如何在 Python 中使用…

day33 List接口

List实现类 java.util.ArrayList&#xff1a; 底层通过数组保存数据 &#xff0c; 查询快&#xff0c;增删慢 java.util.LinkedList&#xff1a; 底层通过链表保存数据&#xff0c; 查询慢&#xff0c;增删快 如果对操作性能没有特殊要求&#xff0c;我们一般选择ArrayList…

【力扣每日一题】2023.9.7 修车的最少时间

目录 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 代码&#xff1a; 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 题目给我们一个数值&#xff0c;数组里每个元素表示一个老师傅&#xff0c;老师傅修车花费的时间等于数值乘上车辆数的平方。 问我们修理…

vue中使用tailwindcss

Tailwind CSS with Vue tailwindcss官方文档 创建Vue项目 npm create vitelatest my-project -- --template vue cd my-project安装Tailwind CSS&#xff0c;创建tailwind.config.js和postcss.config.js npm install -D tailwindcss postcss autoprefixer npx tailwindcss …

编程语言排行榜

以下是2023年的编程语言排行榜&#xff08;按照流行度排序&#xff09;&#xff1a; Python&#xff1a;Python一直以来都是非常受欢迎的编程语言&#xff0c;它简洁、易读且功能强大。在数据科学、机器学习、人工智能等领域有广泛应用。 JavaScript&#xff1a;作为前端开发…

你不知道的JavaScript---对象

1.语法 对象可以通过两种方式定义&#xff1a;一种是对象字面量形式&#xff0c;一种是构造形式 对象字面量&#xff1a; var muObject {key: value }构造形式的&#xff1a; var myObject new Object() myObject.key value不管是使用对象字面量形式还是构造形式创建出来…

idea:java: Compilation failed: internal java compiler error

java: Compilation failed: internal java compiler error错误 检查下面2个即可&#xff1a;

docker 生成镜像的几个问题

docker 生成镜像的几个问题 根据jdk8.tar.gz 打包Jdk8 镜像失败运行镜像报错差不多是网络ip错误,在网上说重启docker即可解决运行mysql5.7.25 镜像失败向daemon.json文件添加内容导致docker重启失败docker run 命令常用参数根据jdk8.tar.gz 打包Jdk8 镜像失败 首选做准备工作…

卡牌类游戏推荐,卡牌类三国手游排行榜

以下是小编要推荐给大家的关于卡牌类三国手游排行榜的内容。这里有来自各个历史阶段的名将和美女&#xff0c;让你体验最真实的三国战役。你可以将各种战略思维运用到其中&#xff0c;感受步步为营的喜悦&#xff0c;最终赢得战火纷飞的三国&#xff0c;如果想了解每个游戏的具…

浅谈安科瑞ADL200仪表在爱尔兰工厂的应用

摘要&#xff1a;用户端消耗着整个电网80%的电能&#xff0c;用户端智能化用电管理对用户可靠、安全、节约用电有十分重要的意义。构建智能用电服务体系&#xff0c;推广用户端智能多功能仪表、智能用电管理终端等设备用电管理解决方案&#xff0c;实现电网与用户的双向良性互动…

失效的访问控制及漏洞复现

文章目录 渗透测试漏洞原理失效的访问控制1. 失效的访问控制1.1 OWASP TOP 101.1.1 A5:2017-Broken Access Control1.1.2 A01:2021-Broken Access Control 1.2 失效的访问控制类别1.2.1 水平越权1.2.2 垂直越权 1.3 攻防案例1.3.1 DVWA越权 1.4 相关漏洞1.4.1 目录遍历1.4.2 未…

【Redis】Bitmap 使用及应用场景

前言&#xff1a;bitmap 占用空间小&#xff0c;查询效率高&#xff0c;在一些场景中使用 bitmap 是一个很好的选择。 一、bitmap 相关命令 SETBIT - 设置指定位置的比特值&#xff0c;可以设为 1 或 0 例如 SETBIT key 10 1&#xff0c;将在 key 对应的 bitmap 中第10位设置为…

explain 实战-----查看hive sql执行计划

目录 1.join/left join/full join 语句会过滤关联字段 null 的值吗&#xff1f; &#xff08;1&#xff09;join &#xff08;2&#xff09; left join /full join 2.group by 分组语句会进行排序吗&#xff1f; 1.join/left join/full join 语句会过滤关联字段 null 的值吗…

【java】【SSM框架系列】【一】Spring

目录 一、简介 1.1 为什么学 1.2 学什么 1.3 怎么学 1.4 初识Spring 1.5 Spring发展史 1.6 Spring Framework系统架构图 1.7 Spring Framework学习线路 二、核心概念&#xff08;IoC/DI&#xff0c;IoC容器&#xff0c;Bean&#xff09; 2.1 概念 2.2 IoC入门案例 …

docker安装RabbitMQ教程

可以通过Docker来安装RabbitMQ&#xff0c;具体步骤如下&#xff1a; 安装Docker&#xff1a;请参考官方文档进行安装。 拉取RabbitMQ镜像&#xff1a;通过以下命令拉取最新版本的RabbitMQ镜像。 docker pull rabbitmq:latest运行RabbitMQ容器&#xff1a;通过以下命令运行Rab…

简单YUV数据转换

YUV是一种亮度信号Y和色度信号U、V是分离的色彩空间&#xff0c;它主要用于优化彩色视频信号的传输&#xff0c;使其向后相容老式黑白电视。其中“Y”表示明亮度&#xff08;Luminance或Luma&#xff09;&#xff0c;也就是灰阶值&#xff1b;而“U”和“V”表示的则是色度&…