spark将rdd转为string_八、Spark之详解Tranformation算子

RDD中的所有转换(Transformation)算子都是延迟加载的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这种设计让Spark更加有效率地运行。

常用Transformation类算子列表

a484cfc46d844e378e06d4f7817ddad3.png

常用Transformation类算子列表

常用Transformation类算子实例

  • map(func): 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成, map操作是一对一操作,每进去一个元素,就出来一个元素
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10))rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at :24# 对每个元素乘以10返回新的rdd2scala> val rdd2 = rdd1.map(_*10)rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at map at :25scala> rdd2.collectres1: Array[Int] = Array(10, 20, 30, 40, 50, 60, 70, 80, 90, 100)# 对每个元素拼接一个字符串,返回新的String类型的RDDscala> val rdd3 = rdd1.map(_+"@map.com")rdd3: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at map at :25scala> rdd3.collectres3: Array[String] = Array(1@map.com, 2@map.com, 3@map.com, 4@map.com, 5@map.com, 6@map.com, 7@map.com, 8@map.com, 9@map.com, 10@map.com)
  • filter(func): 过滤。返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成, RDD元素的类型不会改变。
scala> val rdd1 = sc.parallelize(Array("乔峰","段誉","虚竹","鸠摩智","达摩祖师"))rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at :24# filter中为true的会被保留,会false的会被过滤scala> val rdd2 = rdd1.filter(!_.contains("摩"))rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at filter at :25scala> rdd2.collectres4: Array[String] = Array(乔峰, 段誉, 虚竹)
  • flatMap(func): 压平。类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
scala> val rdd1 = sc.parallelize(Array("say you say me say it together","good good study day day up"))rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[7] at parallelize at :24# 进去一条,出来多条,是一对多的转换scala> val rdd2 = rdd1.flatMap(_.split(" "))rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[8] at flatMap at :25scala> rdd2.collectres5: Array[String] = Array(say, you, say, me, say, it, together, good, good, study, day, day, up)

集合类Transformation算子实例

  • union(otherRDD): 对源RDD和参数RDD求并集后返回一个新的RDD, 需要两个RDD
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5))rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at :24scala> val rdd2 = sc.parallelize(Array(2,3,4,5,6))rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at :24# 求两个RDD的并集scala> val rdd3 = rdd1.union(rdd2)rdd3: org.apache.spark.rdd.RDD[Int] = UnionRDD[11] at union at :27scala> rdd3.collectres6: Array[Int] = Array(1, 2, 3, 4, 5, 2, 3, 4, 5, 6)
  • subtract(otherRDD): 对源RDD和参数RDD求差集后返回一个新的RDD, 需要两个RDD
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5))rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at :24scala> val rdd2 = sc.parallelize(Array(2,3,4,5,6))rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at :24scala> val rdd3 = rdd1.subtract(rdd2)rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[15] at subtract at :27# rdd1与rdd2的差集是"1"scala> rdd3.collectres7: Array[Int] = Array(1)# rdd2与rdd1的差集是"6"scala> val rdd4 = rdd2.subtract(rdd1)rdd4: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[19] at subtract at :27scala> rdd4.collect()res8: Array[Int] = Array(6)
  • intersection(otherRDD): 对源RDD和参数RDD求交集后返回一个新的RDD, 需要有两个RDD
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5))rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at :24scala> val rdd2 = sc.parallelize(Array(2,3,4,5,6))rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at :24# 求两个RDD的交集返回新的RDDscala> val rdd3 = rdd1.intersection(rdd2)rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[25] at intersection at :27scala> rdd3.collect()res9: Array[Int] = Array(4, 3, 5, 2)
  • distinct(): 对源RDD进行去重后返回一个新的RDD, 只需要一个RDD
scala> val rdd1 = sc.parallelize(Array(1,1,1,2,2,2,3,3,3))rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[26] at parallelize at :24# 在一个RDD中实现去重功能scala> val rdd2 = rdd1.distinct()rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[29] at distinct at :25scala> rdd2.collect()res10: Array[Int] = Array(1, 3, 2)

其底层的实现原理(如下面Java代码所示)是:mapToPair+reduceByKey+mapToPair =>

import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.*;import scala.Tuple2;import java.util.Arrays;import java.util.Iterator;/** * distinct: 对RDD中的元素去重 */public class distinctOperator {    public static void main(String[] args) {        SparkConf conf = new SparkConf()                .setMaster("local")                .setAppName("distinct");        JavaSparkContext sc = new JavaSparkContext(conf);        sc.setLogLevel("WARN");        JavaRDD rdd1 = sc.parallelize(Arrays.asList(                "a", "a", "a", "a",                "b", "b", "b", "b"        ));        /**         * 传统方式实现RDD元素去重需要三步         *  第一步:把RDD转换成K,V格式的RDD, K为元素,V为1         *  每二步:对K,V格式的RDD的Key进行分组计算         *  第三步:对得到的RDD只取第一位键         */        // [(a,1),(a,1),(a,1),(a,1),(b,1),b,1),b,1),b,1)]        JavaPairRDD mapToPairRDD = rdd1.mapToPair(new PairFunction() {            @Override            public Tuple2 call(String s) throws Exception {                return new Tuple2(s, 1);            }        });        //对每个key进行聚合        //[(a,4),(b,4)]        JavaPairRDD reduceRDD = mapToPairRDD.reduceByKey(new Function2() {            @Override            public Integer call(Integer v1, Integer v2) throws Exception {                return v1 + v2;            }        });        //只取键,不要值        JavaRDD mapRDD = reduceRDD.map(new Function, String>() {            @Override            public String call(Tuple2 tuple) throws Exception {                return tuple._1;            }        });        mapRDD.foreach(new VoidFunction() {            @Override            public void call(String s) throws Exception {                System.out.println(s);            }        });        System.out.println("-----------------------------------");        //使用Spark提供的算子distinct实现RDD元素去重        JavaRDD distinctRDD = rdd1.distinct();        distinctRDD.foreach(new VoidFunction() {            @Override            public void call(String s) throws Exception {                System.out.println(s);            }        });        sc.stop();    }}

分组类的转换算子

groupByKey([numTasks]): 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD。偏底层
scala> val rdd1 = sc.parallelize(List(("张军",1000),("李军",2500),("王军",3000),("张军",1500)))rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[30] at parallelize at :24scala> val rdd2 = rdd1.groupByKey()rdd2: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[31] at groupByKey at :25scala> rdd2.collect()res11: Array[(String, Iterable[Int])] = Array((王军,CompactBuffer(3000)), (张军,CompactBuffer(1000, 1500)), (李军,CompactBuffer(2500)))
  • reduceByKey(func, [numTasks]): 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置。调用groupByKey。
scala> val rdd1 = sc.parallelize(Array(("red",10),("red",20),("red",30),("red",40),("red",50),("yellow",100),("yellow",100)))rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[32] at parallelize at :24# 按照key进行聚合操作scala> val rdd2 = rdd1.reduceByKey(_+_)rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[33] at reduceByKey at :25scala> rdd2.collect()res12: Array[(String, Int)] = Array((yellow,200), (red,150))
  • cogroup(otherRDD, [numTasks]): 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD
scala> val rdd1 = sc.parallelize(Array(("张飞","丈八蛇矛"),("关羽","青龙偃月刀"),("吕布","方天画戟")))rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[34] at parallelize at :24scala> val rdd2 = sc.parallelize(Array(("张飞",30),("关羽",35),("吕布",45),("刘备",42)))rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[35] at parallelize at :24scala> val rdd3=rdd1.cogroup(rdd2)rdd3: org.apache.spark.rdd.RDD[(String, (Iterable[String], Iterable[Int]))] = MapPartitionsRDD[37] at cogroup at :27scala> rdd3.collect()res13: Array[(String, (Iterable[String], Iterable[Int]))] = Array((吕布,(CompactBuffer(方天画戟),CompactBuffer(45))), (关羽,(CompactBuffer(青龙偃月刀),CompactBuffer(35))), (张飞,(CompactBuffer(丈八蛇矛),CompactBuffer(30))), (刘备,(CompactBuffer(),CompactBuffer(42))))

排序类Transformation算子

sortBy(func,[ascending], [numTasks]): 与sortByKey类似,但是更灵活
scala> val rdd1=sc.parallelize(Array(10,9,8,7,4,6,5,3,1,2))rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[38] at parallelize at :24scala> val rdd2=rdd1.sortBy(x=>x,true)rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[41] at sortBy at :25scala> rdd2.collect()res14: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)# K, V格式的RDDscala> val rdd1=sc.parallelize(Array(("张飞",30),("刘备",42),("关羽",32),("曹操",46),("公孙瓒",62)))rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[42] at parallelize at :24scala> val rdd2=rdd1.sortBy(tuple=>tuple._2, false)rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[45] at sortBy at :25scala> rdd2.collect()res15: Array[(String, Int)] = Array((公孙瓒,62), (曹操,46), (刘备,42), (关羽,32), (张飞,30))
sortByKey([ascending], [numTasks]): 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
scala> val rdd1=sc.parallelize(Array(("张飞",30),("刘备",42),("关羽",32),("曹操",46),("公孙瓒",62)))rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[42] at parallelize at :24# 同样对rdd1调用,需要进行转换scala> val rdd2=rdd1.map(tuple=>tuple.swap).sortByKey(false).map(tuple=>tuple.swap)rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[48] at map at :25scala> rdd2.collect()res16: Array[(String, Int)] = Array((公孙瓒,62), (曹操,46), (刘备,42), (关羽,32), (张飞,30))

高级类的转换算子

  • mapPartitionWithIndex(func): 类似于mapPartitions, 但是func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Iterator[T]) => Iterator[U]。其功能是对RDD中的每个分区进行操作,带有索引下标,可以取到分区号。
    • func: 接收两个参数,第一个参数代表分区号,第二参数代表分区中的元素。
scala> val rdd1 = sc.parallelize(List("son1","son2","son3","son4","son5","son6","son7","son8","son9","son10","son11","son12"),4)rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at :24scala> val rdd2 = rdd1.mapPartitionsWithIndex((index, iter) => {iter.toList.map(x=> "【分区号为:"+index+", 值为:" + x+ "】").iterator})rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at mapPartitionsWithIndex at :25scala> rdd2.collect()res0: Array[String] = Array(【分区号为:0, 值为:son1】, 【分区号为:0, 值为:son2】, 【分区号为:0, 值为:son3】, 【分区号为:1, 值为:son4】, 【分区号为:1, 值为:son5】, 【分区号为:1, 值为:son6】, 【分区号为:2, 值为:son7】, 【分区号为:2, 值为:son8】, 【分区号为:2, 值为:son9】, 【分区号为:3, 值为:son10】, 【分区号为:3, 值为:son11】, 【分区号为:3, 值为:son12】)
  • aggregateByKey: 后面有单独文章讲解此算子

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/533143.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2021年呼和浩特高考段考成绩查询,2019届呼和浩特市高三段考成绩排名分析

原标题:2019届呼和浩特市高三段考成绩排名分析不忘初心 天道酬勤╳✎校对:刘姝坤✎文稿:王涛老师✎声明:如有转载请联系并注明出处好乐(巨人)教育2019高三普文理集训段考班火热招生中全呼市唯一一家吃住学一体封闭式管理的学校唯一…

html设置table表格的弧度,用CSS3和table标签实现一个圆形轨迹的动画的示例代码

html:其实就是根据table标签把几个实心圆div进行等边六角形的排布,并放入一个div容器中,然后利用CSS3的循环旋转的动画效果对最外层的div容器进行自转实现,当然不要忘了把div容器的外边框设置圆形弧度的。BMI色盲色弱心率开始测试…

ios时间相差多少天_上海自驾拉萨,走川进青出,应如何规划线路?需要多少天时间?...

线路总览上海自驾西藏拉萨,川进青出,全程约8000公里,需用时18~25天,行程主要分为4段:1、进藏之前:上海—成都,2000公里,3~5天;2、川藏线进:成都—拉萨&#x…

儒林外史每回概括简短10字_早安心语正能量经典短句 一句话的简短励志语录

1、人生就两件事, 一件是拿事儿把时间填满,另一件是拿感觉把心填满 。早安!早安心语正能量经典短句 一句话的简短励志语录点击添加图片描述(最多60个字)2、凭着一股子信念往前冲,到哪儿都是优秀的人。生活它从来不会允诺我们一份轻…

半个小时用计算机怎么算,CPA机考计算器操作指南,掌握这些快捷键,考试“延长”半小时!...

原标题:CPA机考计算器操作指南,掌握这些快捷键,考试“延长”半小时!注册会计师考试就是一场在有限的时间内得分最多的比拼游戏!不仅要求考生学习各种专业知识,同时还需要掌握一定的策略。例如注会考试采用机…

c均值聚类matlab程序_机器学习笔记-9-聚类

1 K-means算法K-means是最普及的聚类算法,算法接受一个未标记的数据集,然后将数据聚类成不同的组。它是一个迭代算法,假设我们想要将数据聚类成 n 个组,其方法为:选择K个随机的点,称为聚类中心对于数据集中的每一个数据…

html和css哪个优先,CSS3 | 样式和优先级

css3一般介绍:CSS注释:/*CSS*/CSS长度单位:1.px(像素)2.em(倍数,一般用于文字)一、HTML嵌套CSS3样式:1.外部(推荐)例如HTML文件为index.html将样式放入另一文件中,index.css以上两个文件放入同一文件夹下2.…

锁定计算机 背景图片,win7系统电脑更换锁屏壁纸的方法

当win7系统电脑在一段时间不动的话就进入锁屏状态,然而很多用户觉得默认的锁屏壁纸不好看,就想要更换自己喜欢的锁屏壁纸,那么win7怎么更换锁屏壁纸呢?下面给大家讲解一下win7系统电脑更换锁屏壁纸的方法。1、同时按下窗口键winR组…

计算机二级循环队列知识点,考点!计算机二级考试公共基础知识冲刺复习笔记:栈、队列和循环队列...

小编所收集到的相关计算机二级考试公共基础知识冲刺复习笔记:栈、队列和循环队列的资料 大家要认真阅读哦!1、栈(Stack)又称堆栈。(1)栈是一种运算受限的线性表,其限制是仅允许在表的一端进行插入和删除运算。人们把此端称为栈顶,…

lua 字符串包含_Programming in Lualua学习第11期 Lua模块与包

微信公众号:GameToolDev关注可了解更多的游戏工具开发教程。问题或建议,请公众号留言;从Lua 5.1开始,我们可以使用require和module函数来获取和创建Lua中的模块。从使用者的角度来看,一个模块就是一个程序库,可以通过r…

.net 从txt中读取行数据_【VBA项目】从指定文件中读取数据并绘制图表

VBA 是一种很久远的编程语言,但并不过时。在满足以下两个条件时,借助 VBA 可以极大的提升生产率,降低出错率:你的电脑上不允许自主安装软件; 你需要执行的工作中大部分的步骤都是固定且重复的。项目背景近期接到一个工…

axure实现复选框全选_jq简单的全选、反选和全不选效果

jquery是很实用和方便的前端效果库,可以让我减少很多的操作和节省很多的时间。今天,我们来说一下jq的全选、全不选和反选效果,本篇讲的是最简单简洁的jq全选、全不选和反选的例子。如果还有什么其他的功能要求可自己根据所学到的基础来扩展一…

计算机设备管理器驱动,设备管理器安装驱动程序的详细教程

系统出现问题,很多人都会选择重装系统。但系统重装后,我们所做的第一件事,就是安装驱动。有的驱动程序有安装包,直接安装就行了。但是有的驱动是只有驱动程序文件,而没有执行程序,这时候就需要通过设备管理…

电路串联和并联图解_一个关于交流电路谐振现象的仿真实验

对于一个具有电阻、电感、电容的交流电路中,交流电源两端的电压一般不和它输出的电流同相位。如果调节电路的参数或者电源频率使它们同相位,这时电路就发生了谐振现象。按照发生谐振现象的电路不同,可以分为串联谐振和并联谐振。1、串联谐振在…

sync不生效 vue_Vue实战项目-记账器-重要知识点汇总

历时3周,记账器项目终于可以运行了,这次项目是基于Vue开发,用到了typeScript和Scss,下面基于项目做一个阶段性的总结,回顾一下项目中用到的知识点。一.组件一开始用的是JS对象的写法:构造选项:{ data(){ret…

fifo页面置换算法设计思路_千万级并发!如何设计一个多级缓存系统?

什么是一个多级缓存系统?它有什么用?我们又如何设计一个多级缓存系统?图片来自 Pexels所谓多级缓存系统,就是指在一个系统的不同的架构层级进行数据缓存,以提升访问效率。我们都知道,一个缓存系统,它面临着许多问题&#xff0c…

apple quicktime怎么在ppt中用_PPT情感专题大赏No. 007:一份这就是街舞第三季主题PPT(上集)...

Hello,大家好,这里是千师傅小作坊第35期,我是你们的老朋友千千。熟悉千师傅小作坊的人都知道,千师傅特别喜欢看综艺,尤其是音乐、表演、舞蹈类。作为一个十八线PPT设计师,如果我看到好看的节目视觉设计&…

idea内存溢出解决_各种OOM代码样例及解决方法

针对目前大家对OOM的类型不太熟悉,那么来总结一下各种OOM出现的情况以及解决方法。把各种OOM的情况列出来,然后逐一进行代码编写复现和提供解决方法。1. 堆溢出-java.lang.OutOfMemoryError: Java heap space。2. 栈溢出-java.lang.OutOfMemorryError。3…

win7安装硬盘后无法启动计算机,硬盘装Win7系统电脑后开机提示DISK BOOT FAILURE怎么办【图文】...

很多人都喜欢安装win7系统,而硬盘安装系统的方式也是深受用户们的喜欢,但是最近有用户反映说硬盘安装win7系统之后,在重启计算机的时候屏幕提示DISK BOOT FAILURE,INSERT SYSTEM DISK AND PRESS ENTER,导致无法正常进入系统&#…

导入数据中文乱码_基于Navicat和Kettle的数据迁移完全解读(多图)

需求描述对于数据分析人员来说,工作的基础是数据,没有数据分析就无从谈起,即巧妇难为无米之炊。#数据库# #数据迁移# #Oracle# 然而,数据分析往往在实验环境或者准生产环境中开展,而数据分布在生产环境,因此…