Flink之KeyedState

前面的文章中介绍过Operator State,这里介绍一下Keyed State.
在使用Operator State时必须要实现CheckpointFunction接口,而Keyed State则不需要,在使用keyBy(...)分组分组后,调用的函数必须是实现RichFuntion接口的函数才可以使用Keyed State.同样使用Keyed State也必须开启Checkpoint.

  • 需求
    将接收到的Socket数据源中的字符串进行拼接
    在命令行开启socket命令:
    nc -lk 8888
    
  • 业务代码
    public class FlinkKeyedState {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度为1,便于观察env.setParallelism(1);// 开启Checkpoint, 8秒一个周期并开启一次性语义env.enableCheckpointing(8000, CheckpointingMode.EXACTLY_ONCE);// 指定checkpoint持久化路径env.getCheckpointConfig().setCheckpointStorage("file:///Users/xxx/data/testData/checkpoint");// 开启Task级别故障自动failover,通过fixedDelayRestart设置Task重启上限和重启间隔,这里设置的重启次数为2次,一旦Task重启次数超过这个次数,整个job也会停止env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.seconds(5)));// 获取Socket数据源DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);// 将数据进行分组,将分组key给一个常量值SingleOutputStreamOperator<String> map = socketSource.keyBy(s -> "1")// 使用Keyed State的算子必须实现RichFunction接口,如RichMapFunction,ProcessFunction等.map(new RichMapFunction<String, String>() {ListState<String> listState;// open方法可以理解为和Operator State中的initializeState方法一样,需要在这个方法中构造和获取状态存储器@Overridepublic void open(Configuration parameters) throws Exception {// 获取上下文RuntimeContext ctx = getRuntimeContext();// 获取ListState,不同于Operator State的是在这里有更多的选择,如ListState,MapState等listState = ctx.getListState(new ListStateDescriptor<>("demo", String.class));}// 在map方法中正常编写业务逻辑@Overridepublic String map(String s) throws Exception {// 模拟Task失败if (s.equals("k") && RandomUtils.nextInt(0, 5) == 3) {throw new Exception("Task 异常");}// 将数据添加到状态存储器中listState.add(s);Iterable<String> strings = listState.get();StringBuilder builder = new StringBuilder();for (String string : strings) {builder.append(string);}return builder.toString();}});map.print();env.execute("Keyed State");}
    }
    
    API的使用大概就这些内容,不过在使用Keyed Sate时首先要对keyBy的特性有所了解,才能得到最终想要的结果数据,如使用keyBy时上下游之间的数据分发模式、所设置的默认并行度上下游算子的并行度是否一致等问题,这些都是需要注意的,然后根据实际业务需求开发对应的逻辑就可以了.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/148240.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

九、Linux用户管理

1.基本介绍 Linux系统是一个多用户多任务的操作系统&#xff0c;任何一个要使用系统资源的用户&#xff0c;都必须首先向系统管理员申请一个账号&#xff0c;让后以这个账号的身份进入系统 2.添加用户 基本语法 useradd 用户名 应用案例 案例1&#xff1a;添加一个用户 m…

Vue中的watch的使用

先看下Vue运行机制图 那么我们思考一件事&#xff0c;vue是通过watcher监听数据的变化然后给发布-订阅&#xff0c;这样实现了dom的渲染&#xff0c;那么我们思考一件事&#xff0c;我们往往需要知道一个数据的变化然后给页面相应的渲染&#xff0c;那么我们工作中在组件中的数…

可拖动、可靠边的 popupWindow 实现

0 背景 开发要实现一个可以拖动的圆角小窗&#xff0c;要求松手时&#xff0c;哪边近些靠哪边。并且还规定了拖动范围。样式如下&#xff1a; 1 实现 首先把 PopupWindow 的布局文件 pop.xml 实现 <?xml version"1.0" encoding"utf-8"?> <R…

抽象工厂模式-C++实现

抽象工厂模式是一种创建型设计模式&#xff0c;它提供了一种在不指定具体产品类的情况下创建一系列相关或依赖对象的方法。 抽象工厂模式分为四个角色&#xff1a;抽象工厂、具体工厂、抽象产品、具体产品。 抽象工厂和工厂方法其实很类似&#xff0c;但也有一定的区别&#…

7.22 SpringBoot项目实战【收藏 和 取消收藏】

文章目录 前言一、编写控制器二、编写服务层三、Postman测试最后前言 本系统还支持 收藏图书,就是对心仪的书加一下收藏,大家都懂,这是一个很常见的功能。 那么我们来看看怎么来做,先分析一下:【一个人】对【一本书】只需【收藏一次】,但可以【收藏N本】不同的书,收藏…

【Java并发编程六】多线程越界问题

ArrayList()越界错误 import java.util.ArrayList; public class myTest implements Runnable {static ArrayList<Integer> a new ArrayList<>(10);public static void main(String[] args) throws InterruptedException {Thread t1 new Thread(new myTest());T…

Mac M1 M1 pro安装 protobuf 2.5.0

因为项目中的protobuf是2.5.0版本&#xff0c;但是旧版本的protobuf 不支持M1&#xff0c;此时需要修改源码重新编译 操作步骤&#xff1a; 从git上面下载对应版本的protobuf&#xff0c;地址&#xff1a;Release Protocol Buffers v2.5.0 protocolbuffers/protobuf GitHub…

深度学习之基于YoloV5苹果新鲜程度检测识别系统

欢迎大家点赞、收藏、关注、评论啦 &#xff0c;由于篇幅有限&#xff0c;只展示了部分核心代码。 文章目录 一项目简介 深度学习之基于 YOLOv5 苹果新鲜程度检测识别系统介绍YOLOv5 简介苹果新鲜程度检测系统系统架构应用场景 二、功能三、系统四. 总结 一项目简介 深度学习之…

三十一、W5100S/W5500+RP2040树莓派Pico<TCP_Server多路socket>

文章目录 1 前言2 简介2. 1 使用多路socket的优点2.2 多路socket数据交互原理2.3 多路socket应用场景 3 WIZnet以太网芯片4 多路socket设置示例概述以及使用4.1 流程图4.2 准备工作核心4.3 连接方式4.4 主要代码概述4.5 结果演示 5 注意事项6 相关链接 1 前言 W5100S/W5500是一…

c#字符串

字符串比较 namespace demo1 {internal class progrm{static void Main(string[] args){string str "hello world";Console.WriteLine("大写字符{0}",str.ToUpper());Console.WriteLine("小写字符{0}",str.ToLower());Console.WriteLine("…

GitHub如何删除仓库

GitHub如何删除仓库 删除方法第一步第二步第三步 删除方法 第一步 在仓库的界面选择Settings 第二步 选择General,页面拉到最后。 第三步 删除仓库。

C++ 虚函数和多态性

虚函数和多态性 派生类中与基类重名的成员 如果派生类的成员与基类的成员重名&#xff0c;则派生类的成员将隐藏同名的基类成员。 /*正方形类&#xff0c;基类*/ class Square { protected:double L; public:Square() :L{ 4 } {};Square(double a) :L{ a } {}double GetArea…

C#密封类、偏类

C#密封类 在C#中&#xff0c;密封类&#xff08;Sealed Class&#xff09;是一种特殊的类&#xff0c;它阻止其他类继承它。你可以通过在类定义前面加上 sealed 关键字来创建一个密封类。 以下是一个密封类的例子&#xff1a; public sealed class MyClass {// Class member…

【Redis】zset常用命令集合间操作内部编码使用场景

文章目录 前置知识列表、集合、有序集合三者的异同点 普通命令ZADDZCARDZCOUNTZRANGEZREVRANGEZRANGEBYSCOREZPOPMAXBZPOPMAXZPOPMINBZPOPMINZRANKZREVRANKZSCOREZREMZREMRANGEBYRANKZREMRANGEBYSCOREZINCRBY 集合之间的操作ZINTERSTOREZUNIONSTORE 命令小结内部编码测试内部编…

PyTorch 成功安装验证

一、确认PyTorch版本 安装PyTorch之后&#xff0c;可以运行以下代码来确认PyTorch的版本&#xff1a; import torchprint(torch.__version__)如果没有报错&#xff0c;同时输出了正确版本号&#xff0c;就说明PyTorch已经成功安装了。 二、测试PyTorch基础功能 为了确保PyT…

C/C++---------------LeetCode第LCR. 024.反转链表

反转链表 题目及要求双指针 题目及要求 双指针 思路&#xff1a;遍历链表&#xff0c;并在访问各节点时修改 next 引用指向&#xff0c;首先&#xff0c;检查链表是否为空或者只有一个节点&#xff0c;如果是的话直接返回原始的头节点&#xff0c;然后使用三个指针来迭代整个…

DHCP配置命令

排除IP地址或IP范围&#xff08;保留&#xff09;&#xff1a; Switch(config)#ip dhcp excluded-address 192.168.10.1 192.168.10.10 为地址池取名&#xff1a; Switch(config)#ip dhcp pool name 分配地址范围&#xff1a; Switch(dhcp-config)#network 192.168.10.0 2…

windows 安装 Oracle Database 19c

目录 什么是 Oracle 数据库 下载 Oracle 数据库 解压文件 运行安装程序 测试连接 什么是 Oracle 数据库 Oracle数据库是由美国Oracle Corporation&#xff08;甲骨文公司&#xff09;开发和提供的一种关系型数据库管理系统&#xff0c;它是一种强大的关系型数据库管理系统…

lambda表达式c++

介绍 可调用对象 对于一个表达式&#xff0c;如果可以对其使用调用运算符&#xff08;&#xff09;&#xff0c;则称它为可调用对象。如函数就是一个可调用对象&#xff0c;当我们定义了一个函数f(int)时&#xff0c;我们可以通过f(5)来调用它。 可调用对象有&#xff1a; …

c语言中*p1++和p1++有啥区别

在C语言中&#xff0c;*p1和p1是两个不同的表达式&#xff0c;有以下区别&#xff1a; *p1&#xff1a;这是一个后缀递增运算符的组合。首先&#xff0c;*p1会取出指针p1所指向的值&#xff0c;并且对p1进行递增操作。简而言之&#xff0c;这个表达式会先取出p1指向的值&#x…