31、Flink 的 DataStream API 数据流算子详解

1.算子

可以通过算子将一个或多个 DataStream 转换成新的 DataStream,也可以将多个数据转换算子合并成一个复杂的数据流拓扑。

2.数据流转换
a)Map

DataStream → DataStream

输入一个元素,转换后输出一个元素,示例将输入流中元素数值加倍。

DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {@Overridepublic Integer map(Integer value) throws Exception {return 2 * value;}
});
b)FlatMap

DataStream → DataStream

输入一个元素,转换后产生零个、一个或多个元素,示例将句子拆分为单词。

dataStream.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out)throws Exception {for(String word: value.split(" ")){out.collect(word);}}
});
c)Filter

DataStream → DataStream

为每个元素执行一个布尔 function,并保留那些 function 输出值为 true 的元素,示例过滤掉零值。

dataStream.filter(new FilterFunction<Integer>() {@Overridepublic boolean filter(Integer value) throws Exception {return value != 0;}
});
d)KeyBy

DataStream → KeyedStream

在逻辑上将流划分为不相交的分区,具有相同 key 的记录都分配到同一个分区;在内部, keyBy() 是通过哈希分区实现的。

dataStream.keyBy(value -> value.getSomeKey());
dataStream.keyBy(value -> value.f0);

以下情况,一个类不能作为 key

  • 它是一种 POJO 类,但没有重写 hashCode() 方法而是依赖于 Object.hashCode() 实现。
  • 它是任意类的数组。
e)Reduce

KeyedStream → DataStream

在相同 key 的数据流上“滚动”执行 reduce,将当前元素与最后一次 reduce 得到的值组合然后输出新值,示例局部求和。

keyedStream.reduce(new ReduceFunction<Integer>() {@Overridepublic Integer reduce(Integer value1, Integer value2)throws Exception {return value1 + value2;}
});
f)Window

KeyedStream → WindowedStream

在已经分区的 KeyedStreams 上定义 Window,Window 根据某些特征(例如,最近 5 秒内到达的数据)对每个 key Stream 中的数据进行分组。

dataStream.keyBy(value -> value.f0).window(TumblingEventTimeWindows.of(Time.seconds(5))); 
g)WindowAll

DataStream → AllWindowedStream

在 DataStream 上定义 Window,Window 根据某些特征(例如,最近 5 秒内到达的数据)对所有流事件进行分组。

注意:适用于非并行转换的大多数场景,所有记录都将收集到 windowAll 算子对应的一个任务中。

dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)));
h)Window Apply

WindowedStream → DataStream

AllWindowedStream → DataStream

将 function 应用于整个窗口,示例手动对窗口内元素求和。

windowedStream.apply(new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() {public void apply (Tuple tuple,Window window,Iterable<Tuple2<String, Integer>> values,Collector<Integer> out) throws Exception {int sum = 0;for (value t: values) {sum += t.f1;}out.collect (new Integer(sum));}
});// 在 non-keyed 窗口流上应用 AllWindowFunction
allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() {public void apply (Window window,Iterable<Tuple2<String, Integer>> values,Collector<Integer> out) throws Exception {int sum = 0;for (value t: values) {sum += t.f1;}out.collect (new Integer(sum));}
});

注意:如果使用 windowAll 转换,则需要改用 AllWindowFunction。

i)WindowReduce

WindowedStream → DataStream

对窗口应用 reduce function 并返回 reduce 后的值。

windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);}
});
j)Union

DataStream → DataStream*

将两个或多个数据流联合来创建一个包含所有流中数据的新流。

注意:如果一个数据流和自身进行联合,这个流中的每个数据将在合并后的流中出现两次。

dataStream.union(otherStream1, otherStream2, ...);
k)Window Join

DataStream,DataStream → DataStream

根据指定的 key 和窗口 join 两个数据流。

dataStream.join(otherStream).where(<key selector>).equalTo(<key selector>).window(TumblingEventTimeWindows.of(Time.seconds(3))).apply (new JoinFunction () {...});
l)Interval Join

KeyedStream,KeyedStream → DataStream

根据 key 相等并且在指定的时间范围内(e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound)的条件将分别属于两个 keyed stream 的元素 e1 和 e2 Join 在一起。

// this will join the two streams so that
// key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
keyedStream.intervalJoin(otherKeyedStream).between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound.upperBoundExclusive(true) // optional.lowerBoundExclusive(true) // optional.process(new IntervalJoinFunction() {...});
m)Window CoGroup

DataStream,DataStream → DataStream

根据指定的 key 和窗口将两个数据流组合在一起。

dataStream.coGroup(otherStream).where(0).equalTo(1).window(TumblingEventTimeWindows.of(Time.seconds(3))).apply (new CoGroupFunction () {...});
n)Connect

DataStream,DataStream → ConnectedStream

“连接” 两个数据流并保留各自的类型,connect 允许在两个流的处理逻辑之间共享状态

DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
o)CoMap, CoFlatMap

ConnectedStream → DataStream

在连接的数据流上进行 map 和 flatMap。

connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {@Overridepublic Boolean map1(Integer value) {return true;}@Overridepublic Boolean map2(String value) {return false;}
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {@Overridepublic void flatMap1(Integer value, Collector<String> out) {out.collect(value.toString());}@Overridepublic void flatMap2(String value, Collector<String> out) {for (String word: value.split(" ")) {out.collect(word);}}
});
p)Cache

DataStream → CachedDataStream

把算子的结果缓存起来,目前只支持批执行模式下运行的作业

算子的结果在算子第一次执行的时候会被缓存起来,之后的 作业中会复用该算子缓存的结果;如果算子的结果丢失了,它会被原来的算子重新计算并缓存。

DataStream<Integer> dataStream = //...
CachedDataStream<Integer> cachedDataStream = dataStream.cache();
cachedDataStream.print(); // Do anything with the cachedDataStream
...
env.execute(); // Execute and create cache.cachedDataStream.print(); // Consume cached result.
env.execute();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/12533.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode hot100-40-N

543. 二叉树的直径 给你一棵二叉树的根节点&#xff0c;返回该树的 直径 。二叉树的 直径 是指树中任意两个节点之间最长路径的 长度 。这条路径可能经过也可能不经过根节点 root 。两节点之间路径的 长度 由它们之间边数表示。这题尝试做了一下&#xff0c;有几个测试案例通不…

4.StableDiffusion各项参数解读

经过前期的努力&#xff0c;我想大家都已经生成了自己的第一张AI作品&#xff0c;但是充满了随机性&#xff0c;每次都是不一样的&#xff0c;并且有时候生成的图片效果还不是很让人满意&#xff0c;暂且先不要着急&#xff0c;先跟着本篇文章&#xff0c;学习一些每个选项和参…

Google Chrome GPU渲染抓包

非安全模式启动 "C:\Program Files\Google\Chrome\Application\chrome.exe" --ignore-certificate-errors --allow-running-insecure-content --disable-web-security 配置环境 set RENDERDOC_HOOK_EGL0 "C:/Program Files/Google/Chrome/Application/chrom…

Windows本地部署直播录屏利器Bililive-go并实现远程添加直播间录屏

&#x1f308;个人主页: Aileen_0v0 &#x1f525;热门专栏: 华为鸿蒙系统学习|计算机网络|数据结构与算法 ​&#x1f4ab;个人格言:“没有罗马,那就自己创造罗马~” 文章目录 1. Bililive-go与套件下载1.1 获取ffmpeg1.2 获取Bililive-go1.3 配置套件 2. 本地运行测试3. 录屏…

【OpenCV 基础知识 2】灰度化图片

文章目录 cvCreateImagecvCvtColor完整示例代码 cvCreateImage 使用OpenCV库在Delphi中创建一个灰度图像。让我解释一下&#xff1a; gray_image : cvCreateImage(cvGetSize(image), IPL_DEPTH_8U, 1);cvGetSize(image): 这个函数返回给定图像&#xff08;在这里是image&…

【MySQL】事务及其隔离性/隔离级别

目录 一、事务的概念 1、事务的四种特性 2、事务的作用 3、存储引擎对事务的支持 4、事务的提交方式 二、事务的启动、回滚与提交 1、准备工作&#xff1a;调整MySQL的默认隔离级别为最低/创建测试表 2、事务的启动、回滚与提交 3、启动事务后未commit&#xff0c;但是…

MVP产品设计与数据指标

MVP&#xff08;minimum viable product&#xff0c;最小化可行产品&#xff09;概念最早由埃里克莱斯提出&#xff0c;刊载于哈弗商业评论&#xff0c;并有出版物《精益创业》 和常规产品不同&#xff0c;MVP更侧重于对未知市场的勘测&#xff0c;用最小的代价接触客户的方法…

学习Nginx(一):基础

介绍 Nginx是一个高性能的HTTP和反向代理的web服务器&#xff0c;它的设计重点是高并发、高性能和低内存消耗。它常被用于提供静态内容、负载均衡和作为Web服务器。 Nginx具有以下功能和特点&#xff1a; 静态文件服务&#xff1a;作为一个Web服务器&#xff0c;Nginx可以处…

GPT-4o可以用了

方法&#xff1a;挂日本/新加坡的梯子就可以了&#xff0c;打开就会弹出以下的弹窗。不过不知道可以用多久呢&#xff1f; 2024/5/15

木里风景文化|基于SSM+vue的木里风景文化管理平台的设计与实现(源码+数据库+文档)

木里风景文化管理平台 目录 基于SSM&#xff0b;vue的木里风景文化管理平台的设计与实现 一、前言 二、系统设计 三、系统功能设计 1 系统功能模块 2 管理员功能模块 3 用户功能模块 四、数据库设计 五、核心代码 六、论文参考 七、最新计算机毕设选题推荐 八、源…

精通Linux中的编辑器(非常详细!!!)

今天我们来说一下编辑器…… Linux中的编辑器 vi&#xff1a;是一个文本编辑器&#xff0c;用于撰写文档&#xff0c;或者开发程序。 vim&#xff1a;是vi的增强版功能一致&#xff0c;可视化效果更好一些。去鼠标化编辑更加方便可定制化 注意&#xff1a;vim编辑器是一个模式…

信息系统项目管理师——十大管理过程输入、工具和技术、输出(论文篇)一

一、项目整合管理 制定项目章程 在项目管理中&#xff0c;制定项目章程是一个关键的初始过程&#xff0c;它正式授权项目的开始并为项目设定高层次的方向。项目章程的编制涉及特定的输入、采用的工具和技术&#xff0c;以及产生的输出。以下是这些方面的详细说明&#xff1a;…

如何避免父组件重新渲染,子组件也跟着渲染

当父组件重新渲染时&#xff0c;通常情况下&#xff0c;子组件也会跟着重新渲染。但是&#xff0c;有一些方法可以避免这种情况发生&#xff0c;让子组件在父组件重新渲染时不进行渲染。以下是五种常见的方法&#xff1a; 使用 React.memo 或 PureComponent&#xff1a; 使用 …

前端 JS 经典:为什么需要模块化

首先&#xff0c;自我评定一下&#xff0c;一个 js 文件&#xff0c;各位兄弟&#xff0c;最多能掌控多少行&#xff0c;什么意思呢&#xff0c;就是说&#xff0c;一个 js 文件在多少行之内&#xff0c;你是可以清楚的知道这个 JS 实现了哪些业务逻辑&#xff0c;并对这些业务…

专项资金!2024年自贡市高新技术企业奖励政策及申报各类补贴政策汇总

第一章 总 则 第一条 为积极应对经济发展新常态&#xff0c;加快培育工业发展新动力&#xff0c;持续推动产业结构优化升级&#xff0c;实现工业经济平稳较快增长&#xff0c;结合我县实际&#xff0c;制定本扶持激励办法。 第二条 人民政府将继续建立工业企业发展引导专项…

全知人工智能的黎明:OpenAI 革命性的 GPT-4o 揭晓

全知人工智能的黎明&#xff1a;OpenAI 革命性的 GPT-4o 揭晓 在一项突破性的公告中&#xff0c;OpenAI 推出了其最新的旗舰人工智能模型 GPT-4o&#xff0c;该模型有望彻底改变我们与人工智能交互的方式。这种无所不知的人工智能模型拥有前所未有的能力&#xff0c;从实时翻译…

Redis-持久化操作-AOF

持久化操作-AOF AOF是什么&#xff1f; 以日志的形式来记录每个写操作&#xff0c;将Redis执行过的所有写指令记录下来&#xff08;读操作不记录&#xff09;&#xff0c;只允许加文 件但不可以改写文件&#xff0c;redis启动之初会读取该文件重新构建数据&#xff0c;换言之…

python:使用pip安装wxPython

1、找到python安装路径的Scripts文件夹&#xff0c;复制文件夹地址 2、进入cmd&#xff0c;粘贴地址&#xff0c;在后面加上\pip install wxPython 完整示例&#xff1a; C:\PythonXX\Scripts\pip install wxPython 回车运行&#xff0c;等待下载安装完成。 如果报错&…

电力系统潮流计算的计算机算法(四)——PQ快速解耦潮流算法

本篇为本科课程《电力系统稳态分析》的笔记。 本篇为这一章的第四篇笔记。上一篇传送门。 潮流计算的快速解耦法 牛顿-拉夫逊法潮流计算&#xff0c;主要的工作量在于形成雅可比矩阵和求解修正方程。由于雅可比矩阵的阶数为nm-1&#xff0c;约为节点总数的两倍&#xff0c;非…

Axure10_win安装教程(安装、汉化、授权码,去弹窗)

1.下载Axure10 链接&#xff1a;https://pan.baidu.com/s/1fc8Bgyic8Ct__1IOv-afUg 提取码&#xff1a;9qew 2.安装Axure10 因为我的电脑是Windows操作系统&#xff0c;所以我下载的AxureRP-Setup-Beta v10.0.0.3816 (Win).exe 一直点下一步就行 3.Axure10中文 打开Axure…