大数据学习之Flink算子、了解(Transformation)转换算子(基础篇三)

Transformation转换算子(基础篇三)


目录

Transformation转换算子(基础篇三)

三、转换算子(Transformation)

1.基本转换算子

1.1 映射(Map)

1.2 过滤(filter)

1.3 扁平映射(flatmap)

1.4基本转换算子的例子

2.聚合算子(Aggregation)

2.1 按键分区(keyBy)

2.2 简单聚合

2.3 归约聚合(reduce)

3.用户自定义函数(UDF)

3.1 函数类(Function Classes)

3.2 富函数类(Rich Function Classes)

4.物理分区(Physical Partitioning)

4.1 随机分区(shuffle)

4.2 轮询分区(Round-Robin)

4.3 重缩放分区(rescale)

4.4 广播(broadcast)

4.5 全局分区(global)

4.6 自定义分区(Custom)


三、转换算子(Transformation)

数据源读入数据之后,我们就可以使用各种转换算子,将一个或多个 DataStream 转换为新的 DataStream,如图所示。一个Flink程序的核心,其实就是所有的转换操作,它们决定了处理的业务逻辑。

1.基本转换算子

1.1 映射(Map)

map算子接收一个函数作为参数,并把这个函数应用于DataStream的每个元素,最后将函数的返回结果作为结果DataStream中对应元素的值,即将DataStream的每个元素转换成新的元素。

1.2 过滤(filter)

filter 转换操作,顾名思义是对数据流执行一个过滤,通过一个布尔条件表达式设置过滤条件,对于每一个流内元素进行判断,若为 true 则元素正常输出,若为 false 则元素被过滤掉。

1.3 扁平映射(flatmap)

flatMap 操作又称为扁平映射,主要是将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。消费一个元素,可以产生 0 到多个元素。flatMap 可以认为是“扁平化”(flatten)和“映射”(map)两步操作的结合,也就是先按照某种规则对数据进行打散拆分,再对拆分后的元素做转换处理

1.4基本转换算子的例子

代码如下:

import org.apache.flink.streaming.api.scala._object Practice_of_Simple_Operators {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1) //设置并行度为1//常见的简单算子 有:map、flatmap、filter//map//从集合中获取不同数据类型数据val dataStream1 = env.fromCollection(List(1,2,3))//对每一个数 都乘以2val resultStream1 = dataStream1.map(data => data * 2)resultStream1.print("resultStream1")//flatmapval dataStream2 = env.fromCollection(List("hello word","hello flink","hello spark"))val resultStream2 = dataStream2.flatMap(_.split(" "))resultStream2.print("resultStream2")//filterval resultStream3 = dataStream1.filter(_%2==0)resultStream3.print("resultStream3")env.execute("Stream Transform")//启动flink作业}
}

运行结果:

2.聚合算子(Aggregation)

  • 直观上看,基本转换算子确实是在“转换”——因为它们都是基于当前数据,去做了处理和输出。

  • 而在实际应用中,我们往往需要对大量的数据进行统计或整合,从而提炼出更有用的信息。比如之前 word count 程序中,要对每个词出现的频次进行叠加统计。这种操作,计算的结果不仅依赖当前数据,还跟之前的数据有关,相当于要把所有数据聚在一起进行汇总合并——这就是所谓的“聚合”(Aggregation),也对应着 MapReduce 中的 reduce 操作。

2.1 按键分区(keyBy)
  • 对于 Flink 而言,DataStream是没有直接进行聚合的API 的。因为我们对海量数据做聚合肯定要进行分区并行处理,这样才能提高效率。所以在 Flink 中,要做聚合,需要先进行分区; 这个操作就是通过keyBy来完成的。

  • keyBy 是聚合前必须要用到的一个算子。keyBy 通过指定键(key),可以将一条流从逻辑上划分成不同的分区(partitions)。这里所说的分区,其实就是并行处理的子任务,也就对应着任务槽(task slot)。

  • 基于不同的key,流中的数据将被分配到不同的分区中去;这样一来,所有具有相同的key 的数据,都将被发往同一个分区,那么下一步算子操作就将会在同一个 slot 中进行处理了。

  • keyBy算子主要作用于元素类型是元组或数组的DataStream上。使用该算子可以将DataStream中的元素按照指定的key(字段)进行分组,具有相同key的元素将进入同一个分区中(不进行聚合),并且不改变原来元素的数据结构。在逻辑上将流划分为不相交的分区,在内部是通过哈希分区实现的。

//配置执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//fromElements --> 给一个固定的元素集合 创建一个数据流(DataStream)
//数据流是以键值对的形式存在的
val source = env.fromElements((1, 2), (2, 1),(1, 6), (1, 9), (1, 7),  (2, 2), (2, 10), (3, 1))
//keyby算子
source.keyBy(temp => temp._1).print("result")// 执行 Flink 作业
env.execute("Flink FromElements Example")

运行结果:

2.2 简单聚合
  • 有了按键分区的数据流 KeyedStream,我们就可以基于它进行聚合操作了。Flink 为我们 内置实现了一些最基本、最简单的聚合 API,主要有以下几种:

    • sum():在输入流上,对指定的字段做叠加求和的操作。

    • min():在输入流上,对指定的字段求最小值。

    • max():在输入流上,对指定的字段求最大值。

    • minBy():与 min()类似,在输入流上针对指定字段求最小值。

      不同的是,min()只计算指定字段的最小值,其他字段会保留最初第一个数据的值;

      而 minBy()则会返回包含字段最小值的整条数据。

    • maxBy():与 max()类似,在输入流上针对指定字段求最大值。

      不同的是,max()只计算指定字段的最大值,其他字段会保留最初第一个数据的值;

      而 maxBy()则会返回包含字段最大值的整条数据。

  • 简单聚合算子使用非常方便,语义也非常明确。这些聚合方法调用时,也需要传入参数; 但并不像基本转换算子那样需要实现自定义函数,只要说明聚合指定的字段就可以了。指定字 段的方式有两种:指定位置,和指定名称。 对于元组类型的数据,同样也可以使用这两种方式来指定字段。需要注意的是,元组中字 段的名称,是以1、2、_3、…来命名的。 例如,下面就是对元组数据流进行聚合的测试:

  • 对于元组类型的数据,同样也可以使用这两种方式来指定字段。需要注意的是,元组中字 段的名称,是以1、2、_3、…来命名的。

测试:

import org.apache.flink.streaming.api.scala._object TransTupleAggregation {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)val stream = env.fromElements(("a", 1), ("a", 3), ("b", 3), ("b", 4))stream.print("原数据")stream.keyBy(_._1).sum(1).print() //对元组的索引 1 位置数据求和stream.keyBy(_._1).sum("_2").print() //对元组的第 2 个位置数据求和stream.keyBy(_._1).max(1).print() //对元组的索引 1 位置求最大值stream.keyBy(_._1).max("_2").print() //对元组的第 2 个位置数据求最大值stream.keyBy(_._1).min(1).print() //对元组的索引 1 位置求最小值stream.keyBy(_._1).min("_2").print() //对元组的第 2 个位置数据求最小值stream.keyBy(_._1).maxBy(1).print() //对元组的索引 1 位置求最大值stream.keyBy(_._1).maxBy("_2").print() //对元组的第 2 个位置数据求最大值stream.keyBy(_._1).minBy(1).print() //对元组的索引 1 位置求最小值stream.keyBy(_._1).minBy("_2").print() //对元组的第 2 个位置数据求最小值env.execute()}
}

而如果数据流的类型是样例类,那么就只能通过字段名称来指定,不能通过位置来指定了。

import org.apache.flink.streaming.api.scala._
object TransAggregationCaseClass {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)val stream = env.fromElements(Event("Mary", "./home", 1000L),Event("Bob", "./cart", 2000L))// 使用 user 作为分组的字段,并计算最大的时间戳stream.keyBy(_.user).max("timestamp").print()env.execute()}
}

一个聚合算子,会为每一个key保存一个聚合的值,在Flink中我们把它叫作“状态”(state)。 所以每当有一个新的数据输入,算子就会更新保存的聚合结果,并发送一个带有更新后聚合值 的事件到下游算子。对于无界流来说,这些状态是永远不会被清除的,所以我们使用聚合算子, 应该只用在含有有限个 key 的数据流上。

2.3 归约聚合(reduce)
  • 与简单聚合类似,reduce()操作也会将 KeyedStream 转换为 DataStream。它不会改变流的 元素数据类型,所以输出类型和输入类型是一样的。

  • 调用 KeyedStream 的 reduce()方法时,需要传入一个参数,实现 ReduceFunction 接口。接 口在源码中的定义如下:

public interface ReduceFunction<T> extends Function, Serializable {T reduce(T value1, T value2) throws Exception;
}

ReduceFunction 接口里需要实现 reduce()方法,这个方法接收两个输入事件,经过转换处 理之后输出一个相同类型的事件;所以,对于一组数据,我们可以先取两个进行合并,然后再 将合并的结果看作一个数据、再跟后面的数据合并,最终会将它“简化”成唯一的一个数据, 这也就是 reduce“归约”的含义。在流处理的底层实现过程中,实际上是将中间“合并的结果” 作为任务的一个状态保存起来的;之后每来一个新的数据,就和之前的聚合状态进一步做归约。

下面我们来看一个稍复杂的例子。

我们将数据流按照用户 id 进行分区,然后用一个 reduce()算子实现 sum()的功能,统计每 个用户访问的频次;进而将所有统计结果分到一组,用另一个 reduce()算子实现 maxBy()的功 能,记录所有用户中访问频次最高的那个,也就是当前访问量最大的用户是谁。

import org.apache.flink.streaming.api.scala._object TransReduce {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)env.addSource(new ClickSource).map(r => (r.user, 1L))//按照用户名进行分组.keyBy(_._1)//计算每个用户的访问频次.reduce((r1, r2) => (r1._1, r1._2 + r2._2))//将所有数据都分到同一个分区.keyBy(_ => true)//通过 reduce 实现 max 功能,计算访问频次最高的用户.reduce((r1, r2) => if (r1._2 > r2._2) r1 else r2).print()env.execute()}
}

reduce()同简单聚合算子一样,也要针对每一个 key 保存状态。因为状态不会清空,所以 我们需要将 reduce()算子作用在一个有限 key 的流上。

3.用户自定义函数(UDF)

3.1 函数类(Function Classes)
3.2 富函数类(Rich Function Classes)

4.物理分区(Physical Partitioning)

4.1 随机分区(shuffle)

4.2 轮询分区(Round-Robin)

4.3 重缩放分区(rescale)

4.4 广播(broadcast)
4.5 全局分区(global)
4.6 自定义分区(Custom)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/643312.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Redis应用(1)缓存(1.2)------Redis三种缓存问题

三者出现的根本原因是&#xff1a;Redis缓存命中率下降&#xff0c;请求直接打到DB上了。 一、 缓存穿透&#xff1a; 1、定义&#xff1a; 缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在&#xff0c;这样缓存永远不会生效&#xff0c;这些请求都会打到数据库。…

C++PythonC# 三语言OpenCV从零开发(4):视频流读取

文章目录 相关链接视频流读取CCSharpPython 总结 相关链接 C&Python&Csharp in OpenCV 专栏 【2022B站最好的OpenCV课程推荐】OpenCV从入门到实战 全套课程&#xff08;附带课程课件资料课件笔记&#xff09; OpenCV 教程中文文档|OpenCV中文 OpenCV教程中文文档|W3Csc…

深入理解Linux wall命令:一键向所有用户发送消息(附实例详解和注意事项)

Linux wall命令介绍 wall 是一款命令行工具&#xff0c;主要用于在所有已登录用户的终端上显示消息。你可以直接输入消息或者通过文件传入。 Linux wall命令适用的Linux版本 wall命令在大多数Linux发行版&#xff08;如Debian、Ubuntu、Alpine、Arch Linux、Kali Linux、Red…

RabbitMQ 入门到精通

RabbitMQ入门到精通 一、了解RabbitMQ1.基础知识2.多种交换机模型详解 二、服务端搭建1.简单搭建2.信息持久化到容器外部 三、消息生产者和消费者1.消息生产者2.消息消费者3.RabbitTemplate 详解4.RabbitListener详解5.其他注解 四、如何保证消息可靠性1.发送方进行消息发送成功…

print会默认调用__str__方法吗

一、现象描述&#xff1a; 打印Mongodb自己生成的id&#xff1a; print(res[_id]) res[_id]输出&#xff1a; 659faa6670433c2c86986861 ObjectId(‘659faa6670433c2c86986861’) 二、为什么会不一样呢 在 MongoDB 中&#xff0c;_id 字段通常是一个 ObjectId 类型的实例。…

物联网中南向协议、北向协议是什么?南向协议、北向协议的区别

南向协议通常是用于管控其他厂商设备的数据接口&#xff0c;即向下对接的数据接口。 通过该协议&#xff0c;实现对底层设备上报信息的集中监控、统计&#xff0c;此外可让控制器利用南向协议的下行通道&#xff0c;对设备实现控制功能。 北向协议是借助控制模块向上层业务应用…

Go 虚拟环境管理工具 gvm 原理介绍与使用指南

本文谈下我对 Go 版本管理的一些想法。让后&#xff0c;我将介绍一个小工具&#xff0c;gvm。这个话题说起来也很简单&#xff0c;但如果想用的爽&#xff0c;还是要稍微梳理下。 背景介绍 Go 的版本管理&#xff0c;并非包的依赖管理&#xff0c;而且关于如何在不同的 Go 版…

探索未来:2024 年 5 大前沿生成式 AI 趋势

探索未来&#xff1a;2024 年 5 大前沿生成式 AI 趋势 1. 多模态 AI 模型的崛起2. 功能强大且强大的小型语言模型3. 自主代理的崛起4. 开源模型将与专有模型匹敌5. 云原生成为本地 GenAI 的关键总结 2023 年标志着技术发展的一个分水岭&#xff0c;生成式 AI 走入了主流。随着 …

牛客:X图形

描述 KiKi学习了循环&#xff0c;BoBo老师给他出了一系列打印图案的练习&#xff0c;该任务是打印用“*”组成的X形图案。 输入描述&#xff1a; 多组输入&#xff0c;一个整数&#xff08;2~20&#xff09;&#xff0c;表示输出的行数&#xff0c;也表示组成“X”的反斜线和…

浙政钉-H5小程序应用采集开发手册

浙政钉-H5&小程序应用采集开发手册 埋点代码分为:稳定性监控代码(Emas)和流量分析代码(A+)。稳定性监控代码(Emas)只需要在首页加入。流量分析代码(A+)每个页面都需要加入,但是可以写通用js,在其他页面引入。 适用范围 本文档适用于浙政钉业务web(H5)或小程序应…

GPT只是开始,Autonomous Agents即将到来

生成式AI虽然很早便已经引起了广泛关注&#xff0c;但直到ChatGPT的出现&#xff0c;许多公司的领导层才切身感受到了大语言模型&#xff08;LLM&#xff09;带来的深远影响。面临这种行业变革&#xff0c;诸多企业正争先恐后地加入到这场潮流中&#xff0c;但生成式AI的进步速…

【力扣 445】两数相加 II C++题解(链表+模拟+数学+头插法)

给你两个 非空 链表来代表两个非负整数。数字最高位位于链表开始位置。它们的每个节点只存储一位数字。将这两数相加会返回一个新的链表。 你可以假设除了数字 0 之外&#xff0c;这两个数字都不会以零开头。 示例1&#xff1a; 输入&#xff1a;l1 [7,2,4,3], l2 [5,6,4]…

indexedDB的基本操作

indexedDB概述 IndexedDB 就是浏览器提供的本地数据库&#xff0c;它可以被网页脚本创建和操作。IndexedDB 允许储存大量数据&#xff0c;提供查找接口&#xff0c;还能建立索引。这些都是 LocalStorage 所不具备的。就数据库类型而言&#xff0c;IndexedDB 不属于关系型数据库…

算法训练营Day55(子序列--编辑距离)

392.判断子序列 392. 判断子序列 - 力扣&#xff08;LeetCode&#xff09; 这道题目算是 编辑距离问题 的入门题目&#xff08;毕竟这里只是涉及到减法&#xff09;&#xff0c;慢慢的&#xff0c;后面就要来解决真正的编辑距离问题了 和最长公共子序列相似 他那道题区别就是e…

期待已久!阿里云容器服务 ACK AI 助手正式上线

作者&#xff1a;行疾 大模型技术的蓬勃发展持续引领 AI 出圈潮流&#xff0c;各行各业都在尝试采用 AI 工具实现智能增效。 2023 年云栖大会上&#xff0c;阿里云容器服务团队正式发布 ACK AI 助手&#xff0c;带来大模型增强智能诊断&#xff0c;帮助企业和开发者降低 K8s …

Spark运行架构以及容错机制

Spark运行架构以及容错机制 1. Spark的角色区分1.1 Driver1.2 Excuter 2. Spark-Cluster模式的任务提交流程2.1 Spark On Yarn的任务提交流程2.1.1 yarn相关概念2.1.2 任务提交流程 2.2 Spark On K8S的任务提交流程2.2.1 k8s相关概念2.2.2 任务提交流程 3. Spark-Cluster模式的…

BACnet网关BA100实现Modbus转BACnet,专为Modbus协议设备与BA系统的高效对接设计

随着物联网技术的迅猛发展&#xff0c;人们深刻认识到在智能化生产和生活中&#xff0c;实时、可靠、安全的数据传输至关重要。在此背景下&#xff0c;高性能的物联网数据传输解决方案——协议转换网关应运而生&#xff0c;广泛应用于工业自动化和数字化工厂应用环境中。 钡铼…

搜维尔科技:【简报】元宇宙数字人赛道,《莉思菱娜》

个性有些古灵精怪时儿安静时而吵闹&#xff0c;虽然以人类寿命来算已经200多岁但在 吸血鬼中还只是个小毛头&#xff0c;从中学开始喜欢打扮偏爱黑白灰色系的服装喜欢时 尚圈&#xff0c;立志想成为美妆或时尚网红不过目前还是学生&#xff0c;脸上的浅色血迹是纹身 贴纸&#…

深度学习-循环神经网络-RNN实现股价预测-LSTM自动生成文本

序列模型(Sequence Model) 基于文本内容及其前后信息进行预测 基于目标不同时刻状态进行预测 基于数据历史信息进行预测 序列模型:输入或者输出中包含有序列数据的模型 突出数据的前后序列关系 两大特点: 输入(输出)元素之间是具有顺序关系。不同的顺序,得到的结果应…

安全基础~通用漏洞1

文章目录 知识补充Acess数据库注入MySQL数据库PostgreSQL-高权限读写注入MSSQL-sa高权限读写执行注入Oracle 注入Mongodb 注入sqlmap基础命令 知识补充 order by的意义&#xff1a; union 操作符用于合并两个或多个 select语句的结果集。 union 内部的每个 select 语句必须拥有…