Spark(2)-基础tranform算子(一)

一、算子列表

编号名称
1map算子
2flatMap算子
3filter算子
4mapPartitions算子
5mapPartitionsWithIndex算子
6keys算子
7values算子
8mapValues算子
9flatMaplValues算子
10union算子
11reducedByKey算子
12combineByKey算子
13groupByKey算子
14foldByKey算子
15aggregateByKey算子
16ShuffledRDD算子
17distinct算子
18partitionBy算子

 二、代码示例

package sparkCoreimport org.apache.hadoop.mapreduce.task.reduce.Shuffle
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.{RDD, ShuffledRDD}
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
import org.apache.spark.{Aggregator, HashPartitioner, SparkConf, SparkContext, TaskContext}/*** spark基本算子*/object basi_transform_02 {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("transform").setMaster("local[*]")val sc: SparkContext = new SparkContext(conf)sc.setLogLevel("WARN")//1. map算子val rdd1: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7),2)val map_rdd: RDD[Int] = rdd1.map(_ * 2)println("*****1. map算子************")map_rdd.foreach(println(_))//2.flatMap算子println("*****2.flatMap算子************")val arr: Array[String] = Array("Hive python spark","Java Hello Word")val rdd2: RDD[String] = sc.makeRDD(arr, 2)val flatMap_rdd: RDD[String] = rdd2.flatMap(_.split(" "))flatMap_rdd.foreach(println(_))//3.filter算子println("*****3.filter算子***********")val rdd3: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 4, 5, 4, 4, 3, 10))val filter_rdd :RDD[Int]= rdd3.filter(_ % 2 == 0)filter_rdd.foreach(println(_))//4. mapPartitions算子:将数据以分区的形式返回,进行map操作,一个分区对应一个迭代器// 应用场景: 比如在进行数据库操作时,在操作数据之前,需要通过JDBC方式连接数据库,如果使用map,那每条数据处理之前//         都需要连接一次数据库,效率显然很低.如果使用mapPartitions,则每个分区连接一次即可println("*****4. mapPartitions算子**********")val rdd4: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 4, 5, 4, 4, 3, 10),2)val mapParition_rdd: RDD[Int] = rdd4.mapPartitions(iter => {print("模拟数据库连接操作")iter.map(_ * 2)})mapParition_rdd.foreach(println(_))//5. mapPartitionsWithIndex算子,类似于mapPartitions,不过有两个参数//  第一个参数是分区索引,第二个是对应的迭代器// 注意:函数返回的是一个迭代器println("*****5. mapPartitionsWithIndex算子**********")val rdd5: RDD[Int] = sc.parallelize(List(10, 20, 30, 40, 60),2)val mapPartitionWithIndex_Rdd: RDD[String] = rdd5.mapPartitionsWithIndex((index, it) => {it.map(e => s"partition:$index,val:$e")})mapPartitionWithIndex_Rdd.foreach(println(_))//6.keys算子: RDD中的数据是【对偶元组】类型,返回【对偶元组】的全部keyprintln("*****6.keys算子**********")val lst: List[(String, Int)] = List(("spark", 1), ("spark", 3), ("hive", 2),("Java", 1), ("Scala", 3), ("Python", 2))val rdd6: RDD[(String, Int)] = sc.parallelize(lst)val keysRdd: RDD[String] = rdd6.keyskeysRdd.foreach(println(_))//7.values: RDD中的数据是【对偶元组】类型,返回【对偶元组】的全部valueprintln("*****7.values算子**********")val values_RDD: RDD[Int] = rdd6.valuesvalues_RDD.foreach(println(_))//8.mapValues: RDD中的数据为对偶元组类型, 将value进行计算,然后与原Key进行组合返回(即返回的仍然是元组)println("*****8.mapValues算子**********")val lst2: List[(String, Int)] = List(("Hello", 1), ("world", 2),("I", 2), ("love", 3), ("you", 2))val rdd8: RDD[(String, Int)] = sc.parallelize(lst2, 2)val mapValues_rdd: RDD[(String, Int)] = rdd8.mapValues(_ * 10)mapValues_rdd.foreach(println(_))//9.flatMaplValues:RDD是对偶元组,将value应用传入flatMap打平后,再与key组合println("*****9.flatMaplValues算子**********")// ("spark","1 2 3") => ("spark",1),("spark",2),("spark",3)val lst3: List[(String,String )] = List(("Hello", "1 2 3"), ("world", "4 5 6"),)val rdd9: RDD[(String, String)] = sc.parallelize(lst3)// 第一个_是指初始元组中的value;第二个_是指value拆分后的每一个值(转换成整数)val flatMapValues: RDD[(String, Int)] = rdd9.flatMapValues(_.split(" ").map(_.toInt))flatMapValues.foreach(println(_))//10.union:将两个类型一样的RDD合并到一起,返回一个新的RDD,新的RDD分区数量是两个RDD分区数量之和println("*****10.union算子**********")val union_rdd1 = sc.parallelize(List(1, 2, 3), 2)val union_rdd2 = sc.parallelize(List(4, 5, 6), 3)val union_rdd: RDD[Int] = union_rdd1.union(union_rdd2)union_rdd.foreach(println(_))//11.reducedByKey,在每个分区中进行局部分组聚合,然后将每个分区聚合的结果从上游拉到下游再进行全局分组聚合println("*****11.reducedByKey算子**********")val lst4: List[(String, Int)] = List(("spark", 1), ("spark", 1), ("hive", 3),("Python", 1), ("Java", 1), ("Scala", 3),("flink", 1), ("Mysql", 1), ("hive", 3))val rdd11: RDD[(String, Int)] = sc.parallelize(lst4, 2)val reduced_rdd: RDD[(String, Int)] = rdd11.reduceByKey(_ + _)reduced_rdd.foreach(println(_))//12.combineByKey:相比reducedByKey更底层的方法,后者分区内和分区之间相同Key对应的value值计算逻辑相同,但是前者可以分别定义不同的//   的计算逻辑.combineByKey 需要传入三个函数作为参数:// 其中第一个函数:key在上游分区第一次出现时,对应的value该如何处理// 第二个函数:分区内相同key对应value的处理逻辑// 第三个函数: 分区间相同Key对应value的处理逻辑println("*****12.combineByKey算子**********")val f1 = (v:Int) => {val stage = TaskContext.get().stageId()val partition = TaskContext.getPartitionId()println(s"f1 function invoked in stage: $stage,partiton:$partition")v}//分区内相同key对应的value使用乘积val f2 = (a:Int,b:Int) => {val stage = TaskContext.get().stageId()val partition = TaskContext.getPartitionId()println(s"f2 function invoked in stage: $stage,partiton:$partition")a * b}//分区间相同key对应的value使用加法val f3 = (m:Int,n:Int) => {val stage = TaskContext.get().stageId()val partition = TaskContext.getPartitionId()println(s"f3 function invoked in stage: $stage,partiton:$partition")m + n}val rdd12: RDD[(String, Int)] = sc.parallelize(lst4,2)val combineByKey_rdd: RDD[(String, Int)] = rdd12.combineByKey(f1, f2, f3)combineByKey_rdd.foreach(println(_))//13.groupByKey:按key进行分组,返回的是(key,iter(value集合)println("*****13.groupByKey算子**********")val rdd13: RDD[(String, Int)] = sc.parallelize(lst4, 3)val groupByKey_rdd: RDD[(String, Iterable[Int])] = rdd13.groupByKey()groupByKey_rdd.foreach(println(_))//14.foldByKey:每个分区应⽤⼀次初始值,先在每个进⾏局部聚合,然后再全局聚合(注意全局聚合的时候,初始值并不会被用到)// 局部聚合的逻辑与全局聚合的逻辑相同println("*****14.foldByKey算子**********")val lst5: List[(String, Int)] = List(("maple", 1), ("kelly", 1), ("Avery", 1),("maple", 1), ("kelly", 1), ("Avery", 1))val rdd14: RDD[(String, Int)] = sc.parallelize(lst5)val foldByKey_rdd: RDD[(String, Int)] = rdd14.foldByKey(1)(_ + _)foldByKey_rdd.foreach(println(_))//15.aggregateByKey:foldByKey,并且可以指定初始值,每个分区应⽤⼀次初始值,传⼊两个函数,分别是局部聚合的计算逻辑// 和全局聚合的逻辑println("*****15.aggregateByKey算子**********")val rdd15: RDD[(String, Int)] = sc.parallelize(lst5)val aggregateByKey_rdd: RDD[(String, Int)] = rdd15.aggregateByKey(1)(_ + _,_ * _ )aggregateByKey_rdd.foreach(print(_))//16 ShuffledRDD:reduceByKey、combineByKey、aggregateByKey、foldByKey底层都是使⽤的ShuffledRDD,// 并且 mapSideCombine = trueprintln("*****16.ShuffledRDD算子**********")val rdd16: RDD[(String, Int)] = sc.parallelize(lst5,2)val partitioner = new HashPartitioner(rdd16.partitions.length)// 对rdd16按照指定分区器进行分区// String是rdd16中Key的数据类型,第一个Int是rdd16中value的数据类型,第二个Int是中间结果的数据类型(当然前提是传入聚合器-里面包含计算逻辑// [可以据此知晓中间结果的数据类型])val shuffledRDD: ShuffledRDD[String, Int, Int] = new ShuffledRDD[String, Int, Int](rdd16,partitioner)// 设置一个聚合器: 指定rdd16的计算逻辑(包含三个函数,分别是分区内一个key对应value的处理逻辑;分区内相同key对应value计算逻辑// 和分区间相同Key对应value计算逻辑)val aggregator: Aggregator[String, Int, Int] = new Aggregator[String, Int, Int](f1, f2, f3)// 给shuffledRDD设置聚合器shuffledRDD.setAggregator(aggregator)shuffledRDD.setMapSideCombine(true) // 设置Map端聚合println(shuffledRDD.collect().toList)// 17.distinct算子:对RDD元素进行去重println("*****17.distinct算子**********")val lst6: Array[String] = Array("spark", "spark", "hive","Python", "Python", "Java")val rdd17: RDD[String] = sc.parallelize(lst6)val distinct_rdd: RDD[String] = rdd17.distinct()println(distinct_rdd.collect().toList)// 18.partitionBy: 按照指定的分区器进行分区(底层使用的是ShuffleRDD)println("***** 18.partitionBy算子**********")val rdd18: RDD[(String,Int)] = sc.parallelize(lst5,2)val partitioner2 = new HashPartitioner(rdd18.partitions.length)val partitioned_rdd: RDD[(String, Int)] = rdd18.partitionBy(partitioner2)println(partitioned_rdd.collect().toList)sc.stop()}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/717700.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深度学习工具之tokens计算器

1.什么是Token Token是GPT处理文本的基本单位。Token可以是一个字、一个词语或特定语言中的一个字符。它们负责将输入的文本数据转换为 GPT 可以处理的数据格式。每个 GPT 模型都有一个预设的最大 Tokens 数量,例如,GPT-3 每次调用允许处理的最大 Token…

韦东山嵌入式Liunx入门驱动开发五

文章目录 一、驱动程序基石1-1 休眠与唤醒1-2 POLL机制1-3 异步通知(1) 异步通知程序解析(2) 异步通知机制内核代码详解 1-4 阻塞与非阻塞1-5 定时器(1) 内核函数(2) 定时器时间单位 1-6 中断下半部 tasklet 本人学习完韦老师的视频,因此来复习巩固,写以…

《幻兽帕鲁》游戏对服务器性能的具体要求是什么?

《幻兽帕鲁》游戏对服务器性能的具体要求是什么? CPU:官方最低要求为i5-3570K,但在多人游玩时可能会有明显卡顿。此外,还有建议选择4核或更高性能的处理器,以确保游戏运行流畅。 内存:对于不同人数的联机&…

苹果ios群控软件开发常用源代码分享!

在移动软件开发领域,苹果设备由于其封闭性和安全性受到了广大开发者的青睐,然而,这也为开发者带来了一些挑战,特别是在进行群控软件开发时。 群控软件是指可以同时控制多台设备的软件,这在自动化测试、批量操作等场景…

数据要素:数字化转型中的新“金矿”及其发展潜力

作为一名在数字化转型项目中摸爬滚打的实践者,我们见证了数据从简单的信息处理工具逐渐演变为驱动经济社会发展的关键要素。近日,多部门联合发布的《“数据要素”三年行动计划(2024—2026年)》更是将数据要素的重要性提升到了新的…

C++ //练习 10.15 编写一个lambda,捕获它所在函数的int,并接受一个int参数。lambda应该返回捕获的int和int参数的和。

C Primer(第5版) 练习 10.15 练习 10.15 编写一个lambda,捕获它所在函数的int,并接受一个int参数。lambda应该返回捕获的int和int参数的和。 环境:Linux Ubuntu(云服务器) 工具:v…

十六、异常和File

异常和File 一、异常1.1异常的分类1.2 异常的作用1.3 异常的处理方式1.3.1 JVM默认的处理方式1.3.2 自己处理(捕获异常)1.3.3 自己处理(灵魂四问) 1.4 异常中的常见方法1.5 抛出异常综合练习(键盘录入数据)…

基于springboot+vue的社区养老服务平台

博主主页:猫头鹰源码 博主简介:Java领域优质创作者、CSDN博客专家、阿里云专家博主、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战,欢迎高校老师\讲师\同行交流合作 ​主要内容:毕业设计(Javaweb项目|小程序|Pyt…

黑马点评-商户查询业务

缓存原理 本文的业务就是redis的经典应用,标准的操作方式就是查询数据库之前先查询缓存,如果缓存数据存在,则直接从缓存中返回,如果缓存数据不存在,再查询数据库,然后将数据存入redis。 缓存更新策略 根据…

Spring重点记录

文章目录 1.Spring的组成2.Spring优点3.IOC理论推导4.IOC本质5.IOC实现:xml或者注解或者自动装配(零配置)。6.hellospring6.1beans.xml的结构为:6.2.Spring容器6.3对象的创建和控制反转 7.IOC创建对象方式7.1以有参构造的方式创建…

【OneAPI】猫狗类别检测API

OneAPI新接口发布:猫狗类别检测 45种狗狗类别和15种猫猫类别检测。 API地址:POST https://oneapi.coderbox.cn/openapi/api/detect/dogcat 请求参数(body) 参数名类型必填含义说明imageUrlstring是图片地址网络图片地址&#…

Vue路由(黑马程序员)

路由介绍 将资代码/vue-project(路由)/vue-project/src/views/tlias/DeptView.vue拷贝到我们当前EmpView.vue同级,其结构如下: 此时我们希望,实现点击侧边栏的部门管理,显示部门管理的信息,点击员工管理,显…

【周总结平淡但不平凡的周末】

上周总结 根据系统生产环境的日志文件,写了个脚本统计最近使用我们系统的用户的手机型号以及系统,帮助聚焦主要测试的机型,以及系统类型 依然是根据时区不同对项目进行改造,还有一个开发好的接口需要下周联调 2024/3/3 晴…

QT Mingw32/64编译ffmpeg源码生成32/64bit库以及测试

文章目录 前言下载msys2ysamFFmpeg 搭建编译环境安装msys2安装QT Mingw编译器到msys环境中安装ysam测试 编译FFmpeg测试 前言 FFmpeg不像VLC有支持QT的库文件,它仅提供源码,需要使用者自行编译成对应的库,当使用QTFFmpeg实现播放视频以及视频…

连接 mongodb集群的集中方式

mongodb 连接到复制集 mongodb://node1,node2,node3.../database?[options]mongodb 连接到分片集 mongodb://mongos1,mongos2,mongos3.../database?[options]使用 mongosrv 通过域名解析得到所有的 mongos 或 节点的地址, 而不是把这些写在连接字符串中. mongodbsrv://se…

经典的算法面试题(1)

题目: 给定一个整数数组 nums,编写一个算法将所有的0移到数组的末尾,同时保持非零元素的相对顺序。 示例: 输入: [0,1,0,3,12] 输出: [1,3,12,0,0] 注意:必须在原数组上操作,不能拷贝额外的数组。尽量减少操作次数。 这…

数据处理——一维数组转列向量(分割时间序列为数据块时的问题)

记录在处理数据时被磕绊了一下的一个处理细节。 1.想要达到的要求 在某次滑动窗口取样时间序列数据时,我得到如下一个以一维数组为元素的列表: 对于如上输出列表中的每个一维数组,我希望将其转换为下图中的形式,简单说就是希望他…

【详识JAVA语言】面向对象程序三大特性之三:多态

多态 多态的概念 多态的概念:通俗来说,就是多种形态,具体点就是去完成某个行为,当不同的对象去完成时会产生出不同的状态。 多态实现条件 在java中要实现多态,必须要满足如下几个条件,缺一不可&#xf…

循环队列与循环双端队列

文章目录 前言循环队列循环双端队列 前言 1、学习循环队列和循环双端队列能加深我们对队列的理解,提高我们的编程能力。 2、本文循环队列使用的是数组,循环双端队列用的是双向链表 3、题目连接:设计循环队列 ,设计循环双端队列。 …

【机器学习】有监督学习算法之:支持向量机

支持向量机 1、引言2、决策树2.1 定义2.2 原理2.3 实现方式2.4 算法公式2.5 代码示例 3、总结 1、引言 小屌丝:鱼哥,泡澡啊。 小鱼:不去 小屌丝:… 此话当真? 小鱼:此话不假 小屌丝:到底去还是…