Flink之OperatorState

在Flink中状态主要分为三种:

  • Operator State(算子状态)
  • Keyed State(键控状态)
  • Broadcast State(广播状态)

这里简单介绍一下Operator State的使用,说到使用State就必然要使用到Flink的容错机制也就是Checkpoint.具体内容见代码注解

  • 数据源
    这里选用Socket作为Source输入,便于测试
    ➜  ~ nc -lk 8888
    a
    b
    c
    k
    k
    k
    
  • 状态算子代码
    /**
    * @Description TODO 自定义状态MapFunc
    **/
    // 状态算子必须要实现对应的算子接口和CheckpointFunction接口
    class StateMapFunc implements MapFunction<String, String>, CheckpointedFunction{private ListState<String> strListState;/*** @Param o* @return String* @Description TODO map方法的正常处理逻辑**/@Overridepublic String map(String s) throws Exception {// 模拟Task失败if (s.equals("k") && RandomUtils.nextInt(0, 5) == 3) {throw new Exception("Task 异常");}// 将数据添加到状态存储器中strListState.add(s);Iterable<String> strings = strListState.get();StringBuilder builder = new StringBuilder();for (String string : strings) {builder.append(string);}return builder.toString();}/*** @Param functionSnapshotContext* @return void* @Description TODO 系统对状态数据做快照(持久化)会调用此方法, 用户使用此方法在持久化前对状态数据可以做一些操控**/@Overridepublic void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {System.out.println("快照生成, checkpointId: " + functionSnapshotContext.getCheckpointId());}/*** @Param functionInitializationContext* @return void* @Description TODO 算子任务在启动前会调用此方法,未用户状态数据进行初始化**/@Overridepublic void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {// 获取算子状态存储器OperatorStateStore operatorStateStore = functionInitializationContext.getOperatorStateStore();/*** ListStateDescriptor状态描述* 参数1:一个自定义名称* 参数2:存储的数据类型**/ListStateDescriptor<String> stateDescriptor = new ListStateDescriptor<>("demo", String.class);/*** 算子状态存储器, 只提供ListSate的形式(和Java中的List不是一回事)来存储状态数据* getListSate方法,会在Task失败后,task自动重启时,会帮助用户加载最近一次的快照数据,如果是job重启则不会加载**/strListState = operatorStateStore.getListState(stateDescriptor);}
    }
    
    要注意代码注释中的内容,getListState只作用于Task的自动重启,如果是整个Job重启时不生效的,如果是想Job重启后从重启前的State获取数据需要在Job提交时就指定checkpoint镜像文件.
  • 业务代码
      public class FlinkOperatorState {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度1env.setParallelism(1);// 开启Checkpoint, 8秒一个周期并开启一次性语义env.enableCheckpointing(8000, CheckpointingMode.EXACTLY_ONCE);// 指定checkpoint持久化路径env.getCheckpointConfig().setCheckpointStorage("file:///Users/xxx/data/testData/checkpoint");// 开启Task级别故障自动failover,通过fixedDelayRestart设置Task重启上限和重启间隔,这里设置的重启次数为2次,一旦Task重启次数超过这个次数,整个job也会停止env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.seconds(5)));// 获取Socket数据源DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);// 将自定义的StateOperator传入SingleOutputStreamOperator<String> map = socketSource.map(new StateMapFunc());// 打印结果map.print();env.execute("Operator State");}
    }
    

具体的代码模板和API的介绍大概就这些内容,具体实践要根据业务逻辑而定.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/150406.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JVM虚拟机:通过日志学习PS+PO垃圾回收器

我们刚才设置参数的时候看到了-XXPrintGCDetails表示输出详细的GC处理日志&#xff0c;那么我们如何理解这个日志呢&#xff1f;日志是有规则的&#xff0c;我们需要按照这个规则来理解日志中的内容&#xff0c;它有两个格式&#xff0c;一个格式是GC的格式&#xff08;新生代&…

二叉树题目:二叉树的最近公共祖先

文章目录 题目标题和出处难度题目描述要求示例数据范围 解法一思路和算法代码复杂度分析 解法二思路和算法代码复杂度分析 题目 标题和出处 标题&#xff1a;二叉树的最近公共祖先 出处&#xff1a;236. 二叉树的最近公共祖先 难度 5 级 题目描述 要求 给定一个二叉树&…

Redis 的集群模式实现高可用

来源&#xff1a;Redis高可用&#xff1a;武林秘籍存在集群里&#xff0c;那稳了~ (qq.com) 1. 引言 前面我们已经聊过 Redis 的主从同步&#xff08;复制&#xff09;和哨兵机制&#xff0c;这期我们来聊 Redis 的集群模式。 但是在超大规模的互联网应用中&#xff0c;业务规…

2023年Java核心技术大会(Core Java Week 2023)-核心PPT资料下载

一、峰会简介 人工智能在22年、23年的再次爆发让Python成为编程语言里最大的赢家&#xff1b;云原生的持续普及令Go、Rust等新生的语言有了进一步叫板传统技术体系的资本与底气。我们必须承认在近几年里&#xff0c;Java阵营的确受到了前所未有的挑战&#xff0c;出现了更多更…

机器视觉系统选型-定光照强度

同一个外形结构的光源&#xff0c;光照强度受如下影响&#xff1a; 单颗灯珠的亮度灯珠排列的数量和密度漫射板/防护板的材质&#xff08;透明、半透明、全漫射&#xff09; 在合理范围内提升光照强度&#xff0c;可降低对相机曝光时长的要求 外形结构尺寸相同的两款光源&am…

Mac电脑VSCode配置PHP开发环境

1.安装 PHP 首先&#xff0c;打开终端&#xff0c;安装 Homebrew&#xff0c;输入如下命令&#xff1a; $ /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)" 安装了 Homebrew 之后&#xff0c;你可以使用下面的…

C++学习 --pair

目录 1&#xff0c; 什么是pair 2&#xff0c; 创建pair 2-1&#xff0c; 标准数据类型 2-2&#xff0c; 自定义数据类型 3&#xff0c; 查询元素 3-1&#xff0c; 标准数据类型 3-2&#xff0c; 自定义数据类型 1&#xff0c; 什么是pair 数据以键值对形式存放的容器&…

基于C++实现循环赛日程表(分治算法)

一、问题描叙 设有n2^k个运动员&#xff0c;要进行网球循环赛。现在要设计一个满足以下要求的比赛日程表 每个选手必须与其他n-1个选手各赛一场每个选手一天只能赛一次循环赛一共进行n-1天 二、问题分析 按此要求可将比赛日程表设计成n行n-1列的表&#xff0c;在表中第 i 行…

【微客云】外卖霸王餐-城市分站加盟-区域代理-服务商模式

主要功能介绍 程序主要功能&#xff1a; 1.自动定位用户展示活动 2.两个活动推广展示位 3.外卖CPS接入&#xff0c;或者更多CPS接入自己操作 4.报名支付报名费&#xff08;自定义报名费价格&#xff09;&#xff0c;取消订单30分钟内自动退款&#xff08;时间可调&#xff09; …

uniapp中使用render.js进行openers、arcgis等地图操作

uniapp中使用render.js进行openers、arcgis等地图操作 一、为啥需要render.js render.js主要作用于APP上&#xff0c;因为Uniapp本质为vuejshtml进行开发&#xff0c;整个技术栈还是H5&#xff0c;对DOM元素进行操作。而APP中没用Dom元素这个概念。因此利用render.js这个视图层…

NX二次开发UF_CAM_ask_cam_preferences 函数介绍

文章作者&#xff1a;里海 来源网站&#xff1a;里海NX二次开发3000例专栏 UF_CAM_ask_cam_preferences Defined in: uf_cam.h int UF_CAM_ask_cam_preferences(UF_CAM_preferences_p_t prefs ) overview 概述 This function provides the current settings of the CAM pre…

【Linux】安全审计-audit

文章目录 一、audit简介二、开启auditd服务三、相关文件四、审计规则五、审计日志查询及分析附录1&#xff1a;auditctl -h附录2&#xff1a;systemcall 类型 参考文章&#xff1a; 1、安全-linux audit审计使用入门 2、audit详细使用配置 3、Linux-有哪些常见的System Call&a…

优步让一切人工智能化

优步(Uber)的商业模式建立在对数据的颠覆性使用上--通过将双方智能手机的位置数据关联起来&#xff0c;将出租车司机与乘客配对。这意味着&#xff0c;它可以比传统出租车公司更快地安排司机去接乘客&#xff0c;极大地冲击了传统出租车公司的业务。 优步自成立以来&#xff0…

【PG】PostgreSQL高可用 之repmgr常用命令

注册 / 取消注册(register) repmgr -f /etc/repmgr.conf primary register repmgr -f /etc/repmgr.conf standby registerrepmgr -f /etc/repmgr.conf primary unregister -F --node-id2 repmgr -f /etc/repmgr.conf standby unregister 克隆主库&#xff08;repmgr standby…

【Linux】-进程间通信-命名管道文件(没有关系的进程间进行通信),以及写一个日志模板

&#x1f496;作者&#xff1a;小树苗渴望变成参天大树&#x1f388; &#x1f389;作者宣言&#xff1a;认真写好每一篇博客&#x1f4a4; &#x1f38a;作者gitee:gitee✨ &#x1f49e;作者专栏&#xff1a;C语言,数据结构初阶,Linux,C 动态规划算法&#x1f384; 如 果 你 …

Vue3.0和2.0语法不同分析

前言&#xff1a;本篇文章只做VUE3.0和VUE2.0语法上的不同分析&#xff0c;不做性能和源码架构等的分析。 一、VUE3.0和VUE2.0代码结构不同 VUE3.0代码实例 <template><div><span>count is {{ count }}</span><span>plusOne is {{ plusOne }}…

Games104现代游戏引擎笔记 面向数据编程与任务系统

Basics of Parallel Programming 并行编程的基础 核达到了上限&#xff0c;无法越做越快&#xff0c;只能通过更多的核来解决问题 Process 进程 有独立的存储单元&#xff0c;系统去管理&#xff0c;需要通过特殊机制去交换信息 Thread 线程 在进程之内&#xff0c;共享了内存…

Linux本地docker一键部署traefik+内网穿透工具实现远程访问Web UI管理界面

文章目录 前言1. Docker 部署 Trfɪk2. 本地访问traefik测试3. Linux 安装cpolar4. 配置Traefik公网访问地址5. 公网远程访问Traefik6. 固定Traefik公网地址 前言 Trfɪk 是一个云原生的新型的 HTTP 反向代理、负载均衡软件&#xff0c;能轻易的部署微服务。它支持多种后端 (D…

LabVIEW和NIUSRP硬件加快了认知无线电开发

LabVIEW和NIUSRP硬件加快了认知无线电开发 对于电视频谱&#xff0c;主用户传输有两种类型&#xff1a;广播电视和节目制作和特殊事件(PMSE)设备。广播塔的位置已知&#xff0c;且覆盖电视传输塔&#xff08;复用器&#xff09;附近的某个特定地理区域&#xff08;称为排除区域…

postgresql视图的一些问题

在postgresql中&#xff0c;如果创建视图后&#xff0c;需要修改基础表&#xff0c;会出现一些问题。 基础表和视图 postgres# create table cstech(age int, name varchar(10), class int); CREATE TABLE# 这里我们采用select * 来选择所有字段postgres# create view cs_v a…