spark将rdd转为string_八、Spark之详解Tranformation算子

RDD中的所有转换(Transformation)算子都是延迟加载的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这种设计让Spark更加有效率地运行。

常用Transformation类算子列表

a484cfc46d844e378e06d4f7817ddad3.png

常用Transformation类算子列表

常用Transformation类算子实例

  • map(func): 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成, map操作是一对一操作,每进去一个元素,就出来一个元素
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10))rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at :24# 对每个元素乘以10返回新的rdd2scala> val rdd2 = rdd1.map(_*10)rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at map at :25scala> rdd2.collectres1: Array[Int] = Array(10, 20, 30, 40, 50, 60, 70, 80, 90, 100)# 对每个元素拼接一个字符串,返回新的String类型的RDDscala> val rdd3 = rdd1.map(_+"@map.com")rdd3: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at map at :25scala> rdd3.collectres3: Array[String] = Array(1@map.com, 2@map.com, 3@map.com, 4@map.com, 5@map.com, 6@map.com, 7@map.com, 8@map.com, 9@map.com, 10@map.com)
  • filter(func): 过滤。返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成, RDD元素的类型不会改变。
scala> val rdd1 = sc.parallelize(Array("乔峰","段誉","虚竹","鸠摩智","达摩祖师"))rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at :24# filter中为true的会被保留,会false的会被过滤scala> val rdd2 = rdd1.filter(!_.contains("摩"))rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at filter at :25scala> rdd2.collectres4: Array[String] = Array(乔峰, 段誉, 虚竹)
  • flatMap(func): 压平。类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
scala> val rdd1 = sc.parallelize(Array("say you say me say it together","good good study day day up"))rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[7] at parallelize at :24# 进去一条,出来多条,是一对多的转换scala> val rdd2 = rdd1.flatMap(_.split(" "))rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[8] at flatMap at :25scala> rdd2.collectres5: Array[String] = Array(say, you, say, me, say, it, together, good, good, study, day, day, up)

集合类Transformation算子实例

  • union(otherRDD): 对源RDD和参数RDD求并集后返回一个新的RDD, 需要两个RDD
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5))rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at :24scala> val rdd2 = sc.parallelize(Array(2,3,4,5,6))rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at :24# 求两个RDD的并集scala> val rdd3 = rdd1.union(rdd2)rdd3: org.apache.spark.rdd.RDD[Int] = UnionRDD[11] at union at :27scala> rdd3.collectres6: Array[Int] = Array(1, 2, 3, 4, 5, 2, 3, 4, 5, 6)
  • subtract(otherRDD): 对源RDD和参数RDD求差集后返回一个新的RDD, 需要两个RDD
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5))rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at :24scala> val rdd2 = sc.parallelize(Array(2,3,4,5,6))rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at :24scala> val rdd3 = rdd1.subtract(rdd2)rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[15] at subtract at :27# rdd1与rdd2的差集是"1"scala> rdd3.collectres7: Array[Int] = Array(1)# rdd2与rdd1的差集是"6"scala> val rdd4 = rdd2.subtract(rdd1)rdd4: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[19] at subtract at :27scala> rdd4.collect()res8: Array[Int] = Array(6)
  • intersection(otherRDD): 对源RDD和参数RDD求交集后返回一个新的RDD, 需要有两个RDD
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5))rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at :24scala> val rdd2 = sc.parallelize(Array(2,3,4,5,6))rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at :24# 求两个RDD的交集返回新的RDDscala> val rdd3 = rdd1.intersection(rdd2)rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[25] at intersection at :27scala> rdd3.collect()res9: Array[Int] = Array(4, 3, 5, 2)
  • distinct(): 对源RDD进行去重后返回一个新的RDD, 只需要一个RDD
scala> val rdd1 = sc.parallelize(Array(1,1,1,2,2,2,3,3,3))rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[26] at parallelize at :24# 在一个RDD中实现去重功能scala> val rdd2 = rdd1.distinct()rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[29] at distinct at :25scala> rdd2.collect()res10: Array[Int] = Array(1, 3, 2)

其底层的实现原理(如下面Java代码所示)是:mapToPair+reduceByKey+mapToPair =>

import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.*;import scala.Tuple2;import java.util.Arrays;import java.util.Iterator;/** * distinct: 对RDD中的元素去重 */public class distinctOperator {    public static void main(String[] args) {        SparkConf conf = new SparkConf()                .setMaster("local")                .setAppName("distinct");        JavaSparkContext sc = new JavaSparkContext(conf);        sc.setLogLevel("WARN");        JavaRDD rdd1 = sc.parallelize(Arrays.asList(                "a", "a", "a", "a",                "b", "b", "b", "b"        ));        /**         * 传统方式实现RDD元素去重需要三步         *  第一步:把RDD转换成K,V格式的RDD, K为元素,V为1         *  每二步:对K,V格式的RDD的Key进行分组计算         *  第三步:对得到的RDD只取第一位键         */        // [(a,1),(a,1),(a,1),(a,1),(b,1),b,1),b,1),b,1)]        JavaPairRDD mapToPairRDD = rdd1.mapToPair(new PairFunction() {            @Override            public Tuple2 call(String s) throws Exception {                return new Tuple2(s, 1);            }        });        //对每个key进行聚合        //[(a,4),(b,4)]        JavaPairRDD reduceRDD = mapToPairRDD.reduceByKey(new Function2() {            @Override            public Integer call(Integer v1, Integer v2) throws Exception {                return v1 + v2;            }        });        //只取键,不要值        JavaRDD mapRDD = reduceRDD.map(new Function, String>() {            @Override            public String call(Tuple2 tuple) throws Exception {                return tuple._1;            }        });        mapRDD.foreach(new VoidFunction() {            @Override            public void call(String s) throws Exception {                System.out.println(s);            }        });        System.out.println("-----------------------------------");        //使用Spark提供的算子distinct实现RDD元素去重        JavaRDD distinctRDD = rdd1.distinct();        distinctRDD.foreach(new VoidFunction() {            @Override            public void call(String s) throws Exception {                System.out.println(s);            }        });        sc.stop();    }}

分组类的转换算子

groupByKey([numTasks]): 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD。偏底层
scala> val rdd1 = sc.parallelize(List(("张军",1000),("李军",2500),("王军",3000),("张军",1500)))rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[30] at parallelize at :24scala> val rdd2 = rdd1.groupByKey()rdd2: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[31] at groupByKey at :25scala> rdd2.collect()res11: Array[(String, Iterable[Int])] = Array((王军,CompactBuffer(3000)), (张军,CompactBuffer(1000, 1500)), (李军,CompactBuffer(2500)))
  • reduceByKey(func, [numTasks]): 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置。调用groupByKey。
scala> val rdd1 = sc.parallelize(Array(("red",10),("red",20),("red",30),("red",40),("red",50),("yellow",100),("yellow",100)))rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[32] at parallelize at :24# 按照key进行聚合操作scala> val rdd2 = rdd1.reduceByKey(_+_)rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[33] at reduceByKey at :25scala> rdd2.collect()res12: Array[(String, Int)] = Array((yellow,200), (red,150))
  • cogroup(otherRDD, [numTasks]): 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD
scala> val rdd1 = sc.parallelize(Array(("张飞","丈八蛇矛"),("关羽","青龙偃月刀"),("吕布","方天画戟")))rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[34] at parallelize at :24scala> val rdd2 = sc.parallelize(Array(("张飞",30),("关羽",35),("吕布",45),("刘备",42)))rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[35] at parallelize at :24scala> val rdd3=rdd1.cogroup(rdd2)rdd3: org.apache.spark.rdd.RDD[(String, (Iterable[String], Iterable[Int]))] = MapPartitionsRDD[37] at cogroup at :27scala> rdd3.collect()res13: Array[(String, (Iterable[String], Iterable[Int]))] = Array((吕布,(CompactBuffer(方天画戟),CompactBuffer(45))), (关羽,(CompactBuffer(青龙偃月刀),CompactBuffer(35))), (张飞,(CompactBuffer(丈八蛇矛),CompactBuffer(30))), (刘备,(CompactBuffer(),CompactBuffer(42))))

排序类Transformation算子

sortBy(func,[ascending], [numTasks]): 与sortByKey类似,但是更灵活
scala> val rdd1=sc.parallelize(Array(10,9,8,7,4,6,5,3,1,2))rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[38] at parallelize at :24scala> val rdd2=rdd1.sortBy(x=>x,true)rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[41] at sortBy at :25scala> rdd2.collect()res14: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)# K, V格式的RDDscala> val rdd1=sc.parallelize(Array(("张飞",30),("刘备",42),("关羽",32),("曹操",46),("公孙瓒",62)))rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[42] at parallelize at :24scala> val rdd2=rdd1.sortBy(tuple=>tuple._2, false)rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[45] at sortBy at :25scala> rdd2.collect()res15: Array[(String, Int)] = Array((公孙瓒,62), (曹操,46), (刘备,42), (关羽,32), (张飞,30))
sortByKey([ascending], [numTasks]): 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
scala> val rdd1=sc.parallelize(Array(("张飞",30),("刘备",42),("关羽",32),("曹操",46),("公孙瓒",62)))rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[42] at parallelize at :24# 同样对rdd1调用,需要进行转换scala> val rdd2=rdd1.map(tuple=>tuple.swap).sortByKey(false).map(tuple=>tuple.swap)rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[48] at map at :25scala> rdd2.collect()res16: Array[(String, Int)] = Array((公孙瓒,62), (曹操,46), (刘备,42), (关羽,32), (张飞,30))

高级类的转换算子

  • mapPartitionWithIndex(func): 类似于mapPartitions, 但是func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Iterator[T]) => Iterator[U]。其功能是对RDD中的每个分区进行操作,带有索引下标,可以取到分区号。
    • func: 接收两个参数,第一个参数代表分区号,第二参数代表分区中的元素。
scala> val rdd1 = sc.parallelize(List("son1","son2","son3","son4","son5","son6","son7","son8","son9","son10","son11","son12"),4)rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at :24scala> val rdd2 = rdd1.mapPartitionsWithIndex((index, iter) => {iter.toList.map(x=> "【分区号为:"+index+", 值为:" + x+ "】").iterator})rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at mapPartitionsWithIndex at :25scala> rdd2.collect()res0: Array[String] = Array(【分区号为:0, 值为:son1】, 【分区号为:0, 值为:son2】, 【分区号为:0, 值为:son3】, 【分区号为:1, 值为:son4】, 【分区号为:1, 值为:son5】, 【分区号为:1, 值为:son6】, 【分区号为:2, 值为:son7】, 【分区号为:2, 值为:son8】, 【分区号为:2, 值为:son9】, 【分区号为:3, 值为:son10】, 【分区号为:3, 值为:son11】, 【分区号为:3, 值为:son12】)
  • aggregateByKey: 后面有单独文章讲解此算子

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/533143.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

html的过渡属性,CSS3属性transition(过渡)多属性详解

transform呈现的是一种变形结果,而transition呈现的是一种过渡,通俗点说就是一种动画转换过程,如渐显、渐弱、动画快慢等。transition和transform是两种不同的动画模型。transition属性是一个简写属性,用于设置四个过渡属性transi…

2021年呼和浩特高考段考成绩查询,2019届呼和浩特市高三段考成绩排名分析

原标题:2019届呼和浩特市高三段考成绩排名分析不忘初心 天道酬勤╳✎校对:刘姝坤✎文稿:王涛老师✎声明:如有转载请联系并注明出处好乐(巨人)教育2019高三普文理集训段考班火热招生中全呼市唯一一家吃住学一体封闭式管理的学校唯一…

dj打碟怎么学_学DJ要不要去培训学校?

酒吧学DJ打碟他有很多种的说法,有些酒吧他是自己招学生,当这样的酒吧在现今是挺少的,也有,但要求很高。还有一种就是说你自己在酒吧里上班的人自己招私人徒弟什么的,那也是就学DJ打碟,那一搬酒吧都是怎么学…

html设置table表格的弧度,用CSS3和table标签实现一个圆形轨迹的动画的示例代码

html:其实就是根据table标签把几个实心圆div进行等边六角形的排布,并放入一个div容器中,然后利用CSS3的循环旋转的动画效果对最外层的div容器进行自转实现,当然不要忘了把div容器的外边框设置圆形弧度的。BMI色盲色弱心率开始测试…

ios时间相差多少天_上海自驾拉萨,走川进青出,应如何规划线路?需要多少天时间?...

线路总览上海自驾西藏拉萨,川进青出,全程约8000公里,需用时18~25天,行程主要分为4段:1、进藏之前:上海—成都,2000公里,3~5天;2、川藏线进:成都—拉萨&#x…

儒林外史每回概括简短10字_早安心语正能量经典短句 一句话的简短励志语录

1、人生就两件事, 一件是拿事儿把时间填满,另一件是拿感觉把心填满 。早安!早安心语正能量经典短句 一句话的简短励志语录点击添加图片描述(最多60个字)2、凭着一股子信念往前冲,到哪儿都是优秀的人。生活它从来不会允诺我们一份轻…

半个小时用计算机怎么算,CPA机考计算器操作指南,掌握这些快捷键,考试“延长”半小时!...

原标题:CPA机考计算器操作指南,掌握这些快捷键,考试“延长”半小时!注册会计师考试就是一场在有限的时间内得分最多的比拼游戏!不仅要求考生学习各种专业知识,同时还需要掌握一定的策略。例如注会考试采用机…

c均值聚类matlab程序_机器学习笔记-9-聚类

1 K-means算法K-means是最普及的聚类算法,算法接受一个未标记的数据集,然后将数据聚类成不同的组。它是一个迭代算法,假设我们想要将数据聚类成 n 个组,其方法为:选择K个随机的点,称为聚类中心对于数据集中的每一个数据…

php井字游戏代码_PHP初级笔试题:Tic-Tac-Toe(n阶井字棋)判断胜负

//Tic-Tac-Toe$n 5;//五阶棋盘$res array();function check($arr){$n $GLOBALS[n];$res $GLOBALS[res];//已经下过这一步,返回false;没有,赋值if (isset($res[$arr[1]][$arr[2]])) {return false;} else {$res[$arr[1]][$arr[2]] $arr[0…

js与html编码不同,js与html中unicode编码的使用

【转】javascript和html中unicode编码和字符转义的详解不是十分理解unicode和html转义的情况下,可能会误用,所以下面会对它们再做比较容易理解的解释: 1.html中的转义:在html中如果遇到转义字符(如“ ”),不管你的页面字符编码是utf-8 ...javascript和html中unicode编码和字符转…

g标签 怎么设置svg_SVG g元素

SVG 元素SVG 元素用于将SVG形状分组在一起。分组后,您可以像变形单个形状一样变换整个形状。与 不能单独成为转换目标的嵌套 元素相比,这是一个优势。您还可以设置分组元素的样式,并像对待单个元素一样重复使用它们。元素g是用来组合对象的容…

html和css哪个优先,CSS3 | 样式和优先级

css3一般介绍:CSS注释:/*CSS*/CSS长度单位:1.px(像素)2.em(倍数,一般用于文字)一、HTML嵌套CSS3样式:1.外部(推荐)例如HTML文件为index.html将样式放入另一文件中,index.css以上两个文件放入同一文件夹下2.…

java上传视频到七牛云_Java进阶学习:将文件上传到七牛云中

Java进阶学习:将文件上传到七牛云中通过本文,我们将讲述如何利用七牛云官方SDK,将我们的本地文件传输到其存储空间中去。JavaSDK:https://developer.qiniu.com/kodo/sdk/1239/java#server-upload官方SDK:https://devel…

计算机网络讨论4,计算机网络实验四

实验四IEEE 802.3协议分析和以太网一、实验目的1、分析802.3协议2、熟悉以太网帧的格式二、实验环境与因特网连接的计算机网络系统;主机操作系统为windows;Ethereal、IE 等软件。三、实验步骤(注:本次实验先完成前面的“1 俘获并分析以太网帧…

rust新版组队指令_Rust最新控制台命令2017

物品名称物品代码电池battery.small骨头碎片bone.fragments空的豆罐头can.beans.empty空的金枪鱼罐头can.tuna.empty摄像头cctv.camera木炭charcoal煤coal石油crude.oil炸药explosives动物脂肪fat.animal火药gunpowder高级金属矿hq.metal.ore金属碎片metal.fragments金属矿meta…

python实现mini-batch_Mini-Batch 、Momentum、Adam算法的实现

def random_mini_batches(X,Y,mini_batch_size64,seed0):"""从(X,Y)中创建一个随机的mini-batch列表参数:X - 输入数据,维度为(输入节点数量,样本的数量)Y - 对应的是X的标签,【1 | 0】(蓝|红)&#xf…

html5+shim脚本,HTML5探秘:用requestAnimationFrame优化Web动画

requestAnimationFrame是什么?在浏览器动画程序中,我们通常使用一个定时器来循环每隔几毫秒移动目标物体一次,来让它动起来。如今有一个好消息,浏览器开发商们决定:“嗨,为什么我们不在浏览器里提供这样一个…

计算机科学与技术的专业论述,关于计算机科学专业的论文题目 计算机科学专业论文题目怎样定...

【100道】关于关于计算机科学专业的论文题目汇总,作为大学生的毕业生应该明白了计算机科学专业论文题目怎样定,选一个好的题目后续的计算机科学专业论文写作起来会更轻松!一、比较好写的计算机科学专业论文题目:1、计算机科学与技术专业应用型人才培养改革调研分析—…

ming window 交叉编译_opencv3编译pc端及交叉编译arm端

环境: opensuse opencv3.4.1 交叉编译器arm-openwrt-linux 作者:帅得不敢出门https://github.com/opencv/opencv/tree/3.4.1选择右边的"clone or download"按钮进行下载,选择下载zip我下的是opencv-3.4.1.zip, 3.4.1的版本号…

锁定计算机 背景图片,win7系统电脑更换锁屏壁纸的方法

当win7系统电脑在一段时间不动的话就进入锁屏状态,然而很多用户觉得默认的锁屏壁纸不好看,就想要更换自己喜欢的锁屏壁纸,那么win7怎么更换锁屏壁纸呢?下面给大家讲解一下win7系统电脑更换锁屏壁纸的方法。1、同时按下窗口键winR组…