大数据-玩转数据-Flink状态编程(上)

一、Flink状态编程

有状态的计算是流处理框架要实现的重要功能,因为稍复杂的流处理场景都需要记录状态,然后在新流入数据的基础上不断更新状态。
SparkStreaming在状态管理这块做的不好, 很多时候需要借助于外部存储(例如Redis)来手动管理状态, 增加了编程的难度。
Flink的状态管理是它的优势之一。

二、什么是状态

在流式计算中有些操作一次处理一个独立的事件(比如解析一个事件), 有些操作却需要记住多个事件的信息(比如窗口操作)。

流式计算分为无状态计算和有状态计算两种情况。
无状态的计算观察每个独立事件,并根据最后一个事件输出结果。例如,流处理应用程序从传感器接收水位数据,并在水位超过指定高度时发出警告。

在简单聚合、窗口聚合、处理函数的应用,都会有状态的身影出现。在Flink这样的分布式系统中,我们不仅需要定义出状态在任务并行时的处理方式,还需要考虑如何持久化保存、以便发生故障时能正确地恢复,这就需要一套完整的管理机制来处理所有状态。

三、为什么需要管理状态

下面的几个场景都需要使用流处理的状态功能:
去重: 数据流中的数据有重复,我们想对重复数据去重,需要记录哪些数据已经流入过应用,当新数据流入时,根据已流入过的数据来判断去重。
检测: 检查输入流是否符合某个特定的模式,需要将之前流入的元素以状态的形式缓存下来。比如,判断一个温度传感器数据流中的温度是否在持续上升。
聚合: 对一个时间窗口内的数据进行聚合分析,分析一个小时内水位的情况
更新机器学习模型: 在线机器学习场景下,需要根据新流入数据不断更新机器学习的模型参数。

四、Flink中的状态分类

Managed State
状态管理方式 Flink Runtime托管, 自动存储, 自动恢复, 自动伸缩
状态数据结构 Flink提供多种常用数据结构, 例如:ListState, MapState等
使用场景 绝大数Flink算子。

Raw State
状态管理方式 用户自己管理
状态数据结构 字节数组: byte[]
使用场景 所有算子

从具体使用场景来说,绝大多数的算子都可以通过继承Rich函数类或其他提供好的接口类,在里面使用Managed State。Raw State一般是在已有算子和Managed State不够用时,用户自定义算子时使用。
在我们平时的使用中Managed State已经足够我们使用。

在这里插入图片描述
对Managed State继续细分,它又有2种类型
Operator State(算子状态)
Keyed State(键控状态)

Operator State
适用用算子类型: 可用于所有算子: 常用于source, sink,
例如:FlinkKafkaConsumer
状态分配:一个算子的子任务对应一个状态
创建和访问方式: 实现CheckpointedFunction或ListCheckpointed(已经过时)接口
横向扩展 :并发改变时有多重重写分配方式可选: 均匀分配和合并后每个得到全量
支持的数据结构: ListState,UnionListStste和BroadCastState

Keyed State
适用用算子类型: 只能用于用于KeyedStream上的算子
状态分配 :一个Key对应一个State: 一个算子会处理多个Key, 则访问相应的多个State
创建和访问方式:重写RichFunction, 通过里面的RuntimeContext访问w
横向扩展 :并发改变, State随着Key在实例间迁移
支持的数据结构:ValueState, ListState,MapState ReduceState, AggregatingState

五、算子状态的使用

Operator State可以用在所有算子上,每个算子子任务或者说每个算子实例共享一个状态,流入这个算子子任务的数据可以访问和更新这个状态。

注意: 算子子任务之间的状态不能互相访问
Operator State的实际应用场景不如Keyed State多,它经常被用在Source或Sink等算子上,用来保存流入数据的偏移量或对输出数据做缓存,以保证Flink应用的Exactly-Once语义。

Flink为算子状态提供三种基本数据结构:
列表状态(List state),将状态表示为一组数据的列表

联合列表状态(Union list state),也是将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序时如何恢复。一种是均匀分配(List state),另外一种是将所有 State 合并为全量 State 再分发给每个实例(Union list state)。

广播状态(Broadcast state)
是一种特殊的算子状态. 如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态。

三种状态的实现
在这里插入图片描述

六、键控状态的使用

键控状态是根据输入数据流中定义的键(key)来维护和访问的,只能用于KeyedStream(keyBy算子处理之后)。相同key的所有数据都会访问相同的状态。
键控状态支持的数据类型
在这里插入图片描述
注意:
a)所有的类型都有clear(), 清空当前key的状态
b)这些状态对象仅用于用户与状态进行交互.
c)状态不是必须存储到内存, 也可以存储在磁盘或者任意其他地方
d)从状态获取的值与输入元素的key相关

七、状态后端

状态的存储、访问以及维护,由一个可插入的组件决定,这个组件就叫做状态后端。
状态后端主要负责两件事:
本地(taskmanager)的状态管理
将检查点(checkpoint)状态写入远程存储

状态后端的分类及配置
状态后端作为一个可插入的组件, 没有固定的配置, 我们可以根据需要选择一个合适的状态后端。
在这里插入图片描述
MemoryStateBackend
内存级别的状态后端(默认),
存储方式:本地状态存储在TaskManager的内存中, checkpoint 存储在JobManager的内存中.
特点:快速, 低延迟, 但不稳定
使用场景: 1. 本地测试 2. 几乎无状态的作业(ETL) 3. JobManager不容易挂, 或者挂了影响不大. 4. 不推荐在生产环境下使用
FsStateBackend
存储方式: 本地状态在TaskManager内存, Checkpoint时, 存储在文件系统(hdfs)中
特点: 拥有内存级别的本地访问速度, 和更好的容错保证
使用场景: 1. 常规使用状态的作业. 例如分钟级别窗口聚合, join等 2. 需要开启HA的作业 3. 可以应用在生产环境中
RocksDBStateBackend
将所有的状态序列化之后, 存入本地的RocksDB数据库中.(一种NoSql数据库, KV形式存储)
存储方式: 1. 本地状态存储在TaskManager的RocksDB数据库中(实际是内存+磁盘) 2. Checkpoint在外部文件系统(hdfs)中.
使用场景: 1. 超大状态的作业, 例如天级的窗口聚合 2. 需要开启HA的作业 3. 对读写状态性能要求不高的作业 4. 可以使用在生产环境

八、案例列表状态

package com.lyh.flink09;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.List;public class state_programe1_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);env.socketTextStream("hadoop100",9999).map(new MyMapFunctin()).print();env.execute();}public static class MyMapFunctin implements MapFunction<String,Long>, CheckpointedFunction {private Long count = 0L;private ListState<Long> state;@Overridepublic Long map(String value) throws Exception {count++;return count;}// 初始化时会调用这个方法,向本地状态中填充数据. 每个子任务调用一次@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {System.out.println("initialize.....");state = context.getOperatorStateStore().getListState(new ListStateDescriptor<Long>("state",Long.class));for (Long c : state.get()) {count += c;}}// Checkpoint时会调用这个方法,我们要实现具体的snapshot逻辑,比如将哪些本地状态持久化@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {System.out.println("snapshot.....");state.clear();state.add(count);}}
}

九、案例广播状态

package com.lyh.flink09;import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;public class state_broad1_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(3);DataStreamSource<String> dataStream = env.socketTextStream("hadoop100", 9999);DataStreamSource<String> controlStream = env.socketTextStream("hadoop100", 8888);MapStateDescriptor<String, String> stateDescriptor = new MapStateDescriptor<>("state", String.class, String.class);// 广播流BroadcastStream<String> broadcastStream = controlStream.broadcast(stateDescriptor);dataStream.connect(broadcastStream).process(new BroadcastProcessFunction<String, String, String>() {@Overridepublic void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {// 从广播状态中取值, 不同的值做不同的业务ReadOnlyBroadcastState<String, String> state = ctx.getBroadcastState(stateDescriptor);if ("1".equals(state.get("switch"))) {out.collect("切换到1号配置....");} else if ("0".equals(state.get("switch"))) {out.collect("切换到0号配置....");} else {out.collect("切换到其他配置....");}}@Overridepublic void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {BroadcastState<String, String> state = ctx.getBroadcastState(stateDescriptor);// 把值放入广播状态state.put("switch", value);}}).print();env.execute();}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/72168.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

macbookpro怎么删除软件没有鼠标

macbookpro怎么删除软件没有鼠标,macbookpro触摸板可以替代鼠标进行操作。左右键功能与鼠标相同&#xff0c;可用于执行删除操作。此外&#xff0c;还可以利用键盘上的Delete键来删除选中的文件。 删除软件方法 方法1、打开应用程序&#xff0c;键盘按住control&#xff0c;加点…

数据结构与算法之贪心动态规划

一&#xff1a;思考 1.某天早上公司领导找你解决一个问题&#xff0c;明天公司有N个同等级的会议需要使用同一个会议室&#xff0c;现在给你这个N个会议的开始和结束 时间&#xff0c;你怎么样安排才能使会议室最大利用&#xff1f;即安排最多场次的会议&#xff1f;电影的话 那…

高等数学教材重难点题型总结(四)不定积分

难点在于量级&#xff0c;不定积分一定要多练多见才能游刃有余~ 1.利用求导公式验证等式 2.计算不定积分

C语言——指针完全版

目录 一、指针的运算 1.1指针 - 整数 1.2指针 - 指针 二、指针遍历数组 2.1指针遍历数组 1.了解数组名称的含义&#xff08;&数组名和数组名的区别&#xff09;。 2.用指针遍历数组 三、指针数组、数组指针、函数指针 3.1指针数组 3.1.1指针数组的形式 3.1.2指针…

【力扣每日一题】2023.9.7 修车的最少时间

目录 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 代码&#xff1a; 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 题目给我们一个数值&#xff0c;数组里每个元素表示一个老师傅&#xff0c;老师傅修车花费的时间等于数值乘上车辆数的平方。 问我们修理…

编程语言排行榜

以下是2023年的编程语言排行榜&#xff08;按照流行度排序&#xff09;&#xff1a; Python&#xff1a;Python一直以来都是非常受欢迎的编程语言&#xff0c;它简洁、易读且功能强大。在数据科学、机器学习、人工智能等领域有广泛应用。 JavaScript&#xff1a;作为前端开发…

idea:java: Compilation failed: internal java compiler error

java: Compilation failed: internal java compiler error错误 检查下面2个即可&#xff1a;

docker 生成镜像的几个问题

docker 生成镜像的几个问题 根据jdk8.tar.gz 打包Jdk8 镜像失败运行镜像报错差不多是网络ip错误,在网上说重启docker即可解决运行mysql5.7.25 镜像失败向daemon.json文件添加内容导致docker重启失败docker run 命令常用参数根据jdk8.tar.gz 打包Jdk8 镜像失败 首选做准备工作…

卡牌类游戏推荐,卡牌类三国手游排行榜

以下是小编要推荐给大家的关于卡牌类三国手游排行榜的内容。这里有来自各个历史阶段的名将和美女&#xff0c;让你体验最真实的三国战役。你可以将各种战略思维运用到其中&#xff0c;感受步步为营的喜悦&#xff0c;最终赢得战火纷飞的三国&#xff0c;如果想了解每个游戏的具…

浅谈安科瑞ADL200仪表在爱尔兰工厂的应用

摘要&#xff1a;用户端消耗着整个电网80%的电能&#xff0c;用户端智能化用电管理对用户可靠、安全、节约用电有十分重要的意义。构建智能用电服务体系&#xff0c;推广用户端智能多功能仪表、智能用电管理终端等设备用电管理解决方案&#xff0c;实现电网与用户的双向良性互动…

失效的访问控制及漏洞复现

文章目录 渗透测试漏洞原理失效的访问控制1. 失效的访问控制1.1 OWASP TOP 101.1.1 A5:2017-Broken Access Control1.1.2 A01:2021-Broken Access Control 1.2 失效的访问控制类别1.2.1 水平越权1.2.2 垂直越权 1.3 攻防案例1.3.1 DVWA越权 1.4 相关漏洞1.4.1 目录遍历1.4.2 未…

【Redis】Bitmap 使用及应用场景

前言&#xff1a;bitmap 占用空间小&#xff0c;查询效率高&#xff0c;在一些场景中使用 bitmap 是一个很好的选择。 一、bitmap 相关命令 SETBIT - 设置指定位置的比特值&#xff0c;可以设为 1 或 0 例如 SETBIT key 10 1&#xff0c;将在 key 对应的 bitmap 中第10位设置为…

explain 实战-----查看hive sql执行计划

目录 1.join/left join/full join 语句会过滤关联字段 null 的值吗&#xff1f; &#xff08;1&#xff09;join &#xff08;2&#xff09; left join /full join 2.group by 分组语句会进行排序吗&#xff1f; 1.join/left join/full join 语句会过滤关联字段 null 的值吗…

【java】【SSM框架系列】【一】Spring

目录 一、简介 1.1 为什么学 1.2 学什么 1.3 怎么学 1.4 初识Spring 1.5 Spring发展史 1.6 Spring Framework系统架构图 1.7 Spring Framework学习线路 二、核心概念&#xff08;IoC/DI&#xff0c;IoC容器&#xff0c;Bean&#xff09; 2.1 概念 2.2 IoC入门案例 …

CH06_第一组重构(上)

提取函数&#xff08;Extract Function |106&#xff09; 曾用名&#xff1a;提炼函数&#xff08;Extract Function&#xff09; 反向重构&#xff1a;内联函数&#xff08;115&#xff09; 示例代码 function printOwing(invoice) {printBanner();let outstanding calcul…

API安全学习 - crAPI漏洞靶场与API测试思路

crAPI漏洞靶场与解题思路 1. 前置基础1.1 认识crAPI1.2 环境搭建1.3 API的分类与鉴别 2. 漏洞验证2.1 失效的对象级别授权挑战1&#xff1a;访问其它用户车辆的详细信息挑战2&#xff1a;访问其它用户的机械报告 2.2 失效的用户身份验证挑战3&#xff1a;重置其它用户的密码 2.…

NIFI实现JSON转SQL并插入到数据库表中

说明 本文中的NIFI是使用docker进行安装的&#xff0c;所有的配置参考&#xff1a;docker安装Apache NIFI 需求背景 现在有一个文件&#xff0c;里面存储的是一些json格式的数据&#xff0c;要求将文件中的数据存入数据库表中&#xff0c;以下是一些模拟的数据和对应的数据库…

centos7使用docker-compose一键搭建mysql高可用主从集群

docker部署 环境准备 卸载旧版本 yum remove -y docker \docker-client \docker-client-latest \docker-common \docker-latest \docker-latest-logrotate \docker-logrotate \docker-selinux \docker-engine-selinux \docker-engine 安装依赖 yum install -y yum-utils \…

伪微分反馈控制(Pesudo-Drivative Feedback Control——PDF)

运动控制-单轴伺服控制带宽分析&#xff08;二&#xff09; - 知乎 (zhihu.com) 伪微分反馈控制_百度百科 (baidu.com) 伺服电机控制器的参数整定_老马过河hhh的博客-CSDN博客 伪微分PIIP控制_yukee10的博客-CSDN博客

docker搭建个人网盘和私有仓库Harbor

目录 1、使用mysql:5.7和 owncloud 镜像&#xff0c;构建一个个人网盘 2、安装搭建私有仓库 Harbor 1、使用mysql:5.7和owncloud&#xff0c;构建一个个人网盘 1.拉取mysql:5.6镜像&#xff0c;并且运行mysql容器 [rootnode8 ~]# docker pull mysql:5.7 [rootnode8 ~]# doc…