Spark---RDD的创建分类和基础操作算子详解

一、RDD的创建

原生api提供了两种创建方式,一种就是读取文件textFile,还有一种就是加载一个scala集合parallelize。当然,也可以通过transformation算子来创建的RDD。

    //创建RDD//加载数据,textFile(参数1,参数2),参数1可以读取本地文件也可以读取hdfs上的文件,参数2为最小分区数量,但spark有自己的判断,在允许的范围内参数2有效,否则失效val rdd = sc.textFile("F:\\test\\words.txt")//适合加载一堆小文件,wholeTextFile(参数1,参数2),参数1可以读取本地文件也可以读取hdfs上的文件,参数2为最小分区数量,最多只能开到文件数量val rdd1 = sc.wholeTextFile("F:\\test\\words.txt")//从scala集合创建val list = List(1,2,3,4)val arr = Array(1,2,3,4)//parallelize(参数1,参数2)参数1为集合数据,参数2是指定分区数,没有就是没有指定分区数,默认是CPU核数val rdd2 = sc.parallelize(list)//makeRDD底层调用了parallelizeval rdd3 = sc.makeRDD(arr)//从其他RDD转换而来val rdd4 = rdd1.flatMap(_.split(" "))

二、RDD的分类及基本操作

基本上分为两类:transformation和action

1、transformation

转换算子(Transformations) (如:map, filter, groupBy, join等),Transformations操作是Lazy的,也就是说从一个RDD转换生成另一个RDD的操作不是马上执行,Spark在遇到Transformations操作时只会记录需要这样的操作,并不会去执行,需要等到有Actions操作的时候才会真正启动计算过程进行计算。
1、map算子
对RDD集合中的每一个元素,都作用一次该func匿名函数,之后返回值为生成元素构成的一个新的RDD。

	 //map映射val rdd = sc.parallelize(1 to 7)//将每一个元素扩大10倍val res = rdd.map(_*10)//打印输出println(res.collect().toBuffer)

2.flatMap算子
集合中的每一个元素,都要作用func匿名函数,返回0到多个新的元素,这些新的元素共同构成一个新的RDD。是一个one-to-many的操作

	//flatMap=map+flattenval list = List("jia jing kan kan kan","gao di di di di","zhan yuan qi qi")//将集合转换为RDDval rdd = sc.parallelize(list)//按照指定分隔符进行切分val res = rdd.flatMap(_.split(" "))//将结果输出res.foreach(print)//zhanyuanqiqijiajingkankankangaodidididi

3.mapPartitions算子
mapPartitions(p: Iterator[A] => Iterator[B])一次性处理一个partition分区中的数据。执行性能要高于map,但是其一次性将一个分区的数据加载到执行内存空间,如果该分区数据集比较大,存在OOM的风险。

	//mapPartitions:一次操作一个分区的数据val rdd = sc.parallelize(Array(1,2,3,4,5),3)//一次操作一个分区的数据val res = rdd.mapPartitions(x=>Iterator(x.mkString("-")))res.foreach(println)/*12-34-5*/

4、mapPartitionsWithIndex算子
mapPartitionsWithIndex((index, p: Iterator[A] => Iterator[B])),该操作比mapPartitions多了一个index,代表就是后面p所对应的分区编号。

	//mapPartitionsWithIndex:查看每个分区当中都保存了哪些元素val rdd = sc.parallelize(1 to 16,2)//查看每个分区当中都保存了哪些元素val res = rdd.mapPartitionsWithIndex((index,item)=>Iterator(index+":"+item.mkString(",")))res.foreach(println)/*1:9-10-11-12-13-14-15-160:1-2-3-4-5-6-7-8*/

5、sample算子
sample(withReplacement, fraction, seed):随机抽样算子,去代替全量研究会出现类似数据倾斜(dataSkew)等问题,无法进行全量研究,只能用样本去评估整体。
withReplacement:Boolean :有放回的抽样和无放回的抽样
fraction:Double:样本空间占整体数据量的比例,大小在[0, 1],比如0.2, 0.65
seed:Long:是一个随机数的种子,有默认值,通常不需要传参

	//sample:随机抽样算子,样品的预期大小个数不确定val rdd = sc.parallelize(1 to 10)//随机抽取样本占总体的0.5,有放回,会有重复val res = rdd.sample(true,0.5)println(res.collect().toBuffer)//ArrayBuffer(3, 8, 10, 10)//无放回,不会有重复val res1 = rdd.sample(false,0.8)println(res1.collect().toBuffer)//ArrayBuffer(2, 3, 4, 5, 6, 7, 8, 9)//takeSample精确抽样,参数2为样本大小,确定抽几个val rdd = sc.parallelize(1 to 10)val res = rdd.takeSample(false,7)println(res.toBuffer)//ArrayBuffer(2, 3, 9, 8, 10, 6, 4)

6、union算子
rdd1.union(rdd2)
相当于sql中的union all,进行两个rdd数据间的联合,需要说明一点是,rdd1如果有N个分区,rdd2有M个分区,那么union之后的分区个数就为N+M。

 	//union :整合两个RDD当中的元素,并且整合分区数val rdd1= sc.parallelize(1 to 5,3)val rdd2= sc.parallelize(3 to 7,2)rdd1.mapPartitionsWithIndex((index,item)=>Iterator(index+":"+item.mkString(","))).foreach(println)/*0:12:4,51:2,3*/rdd2.mapPartitionsWithIndex((index,item)=>Iterator(index+":"+item.mkString(","))).foreach(println)/*0:3,41:5,6,7*/val res = rdd1.union(rdd2)//查看有多少元素res.mapPartitionsWithIndex((index,item)=>Iterator(index+":"+item.mkString(","))).foreach(println)/*0:11:2,32:4,54:5,6,73:3,4*/println(res.collect().toBuffer)//ArrayBuffer(1, 2, 3, 4, 5, 3, 4, 5, 6, 7)//查看分区数println(res.getNumPartitions)//5

7、join算子

	//join:相同的key进行输出,不同的key不进行输出val rdd1 = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))val rdd2 = sc.parallelize(Array((1,4),(2,5),(7,6)))//调用算子val res : RDD[(Int, (String, Int))]= rdd1.join(rdd2)println(res.collect().toBuffer) //ArrayBuffer((1,(a,4)), (2,(b,5)))//rightOuterJoinval res1 : RDD[(Int,(Option[String],Int))]= rdd1.rightOuterJoin(rdd2)println(res1.collect().toBuffer)//ArrayBuffer((1,(Some(a),4)), (2,(Some(b),5)), (7,(None,6)))//leftOuterJoinval res2:RDD[(Int,(String,Option[Int]))] = rdd1.leftOuterJoin(rdd2)println(res2.collect().toBuffer)//ArrayBuffer((1,(a,Some(4))), (2,(b,Some(5))), (3,(c,None)))

8、coalesce算子
coalesce(numPartition, shuffle=false): 分区合并的意思
numPartition:分区后的分区个数
shuffle:此次重分区是否开启shuffle,决定当前的操作是宽(true)依赖还是窄(false)依赖

	//coalesce:分区合并val rdd :RDD[Int]= sc.parallelize(1 to 16,4)println(rdd.getNumPartitions)//4//查看每个分区当中都保存了哪些元素rdd.mapPartitionsWithIndex((index,item)=>Iterator{index+":"+item.mkString(",")}).foreach(println)/*3:13,14,15,160:1,2,3,41:5,6,7,82:9,10,11,12*///缩减分区数,默认直接分区合并不会进行shuffle洗牌,也就是说默认只能缩减分区数不能增加val res:RDD[Int] = rdd.coalesce(3)//查看分区数println(res.getNumPartitions)//3//查看分区中都保存了那些元素res.mapPartitionsWithIndex((index,item)=>{Iterator(index+":"+item.mkString("-"))}).foreach(println)/*0:1-2-3-41:5-6-7-82:9-10-11-12-13-14-15-16*///如果想要增加分区数,将shuffle改为trueval res:RDD[Int] = rdd.coalesce(5,true)//查看分区数println(res.getNumPartitions)//5//查看分区中都保存了那些元素res.mapPartitionsWithIndex((index,item)=>{Iterator(index+":"+item.mkString("-"))}).foreach(println)/*0:10-132:2-6-12-153:3-7-161:1-5-11-144:4-8-9*/

9、repartition算子
repartition底层调用了coalesce(numPartitions, shuffle = true),shuffle过程默认为ture

	val rdd :RDD[Int]= sc.parallelize(1 to 16,4)println(rdd.getNumPartitions)//查看每个分区当中都保存了哪些元素rdd.mapPartitionsWithIndex((index,item)=>{Iterator(index+":"+item.mkString("-"))}).foreach(println)/*0:1-2-3-43:13-14-15-162:9-10-11-121:5-6-7-8*///调用算子val res = rdd.repartition(2)println(res.getNumPartitions)//查看res.mapPartitionsWithIndex((index,item)=>Iterator(index+":"+item.mkString("-"))).foreach(println)/*分区不是两两合并,而是重新洗牌分为两个分区1:2-4-6-8-10-12-14-160:1-3-5-7-9-11-13-15*/

10、sortBy算子
sortBy(func,[ascending], [numTasks])
ascending:true为升序,false为降序
numTasks:分区数

	//sortByval rdd:RDD[(String,Int)] = sc.parallelize(List(("a",1),("b",8),("c",6)),3)println(rdd.getNumPartitions)//3//按照第二个字段进行排序val res = rdd.sortBy(_._2,false,2)println(res.collect().toBuffer)//ArrayBuffer((b,8), (c,6), (a,1))println(res.getNumPartitions)//2

11、sortByKey([ascending], [numTasks])算子
在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD

    val rdd:RDD[(String,Int)] = sc.parallelize(List(("a",1),("b",8),("c",6)),4)val res = rdd.sortByKey(true,3)println(res.collect().toBuffer)//ArrayBuffer((a,1), (b,8), (c,6))println(res.getNumPartitions)//3

12、groupBy和 groupByKey
groupByKey相比较reduceByKey而言,没有本地预聚合操作,显然其效率并没有reduceByKey效率高,在使用的时候如果可以,尽量使用reduceByKey等去代替groupByKey。

case class Student(id:Int,name:String,province:String)val stuRDD = sc.parallelize(List(Student(1, "张三", "安徽"),Student(2, "李梦", "山东"),Student(3, "王五", "甘肃"),Student(4, "周七", "甘肃"),Student(5, "Lucy", "黑吉辽"),Student(10086, "魏八", "黑吉辽")))//按照省份进行排序//groupBy就是对不是kv键值对的数据进行分组val res = stuRDD.groupBy(stu=>stu.province)println(res.collect().toBuffer)//ArrayBuffer((安徽,CompactBuffer(Student(1,张三,安徽))), (黑吉辽,CompactBuffer(Student(5,Lucy,黑吉辽), Student(10086,魏八,黑吉辽))), (甘肃,CompactBuffer(Student(3,王五,甘肃), Student(4,周七,甘肃))), (山东,CompactBuffer(Student(2,李梦,山东))))//groupByKey针对的是kv键值对的数据,numPartition指的是分组之后的分区个数val stures=stuRDD.map(stu=>(stu.province,stu))//调用算子val result = stures.groupByKey()println(result.collect().toBuffer)//ArrayBuffer((安徽,CompactBuffer(Student(1,张三,安徽))), (黑吉辽,CompactBuffer(Student(5,Lucy,黑吉辽), Student(10086,魏八,黑吉辽))), (甘肃,CompactBuffer(Student(3,王五,甘肃), Student(4,周七,甘肃))), (山东,CompactBuffer(Student(2,李梦,山东))))

13、reduceByKey算子

	//reduceByKey,会进行预聚合,效率比groupbykey高,聚合的是key对应的value值case class Student(id: Int, name:String, province: String)val stuRDD = sc.parallelize(List(Student(1, "张三", "安徽"),Student(2, "李梦", "山东"),Student(3, "王五", "甘肃"),Student(4, "周七", "甘肃"),Student(5, "Lucy", "黑吉辽"),Student(10086, "魏八", "黑吉辽")))//按照相同的省份进行聚合val res = stuRDD.map(stu=>(stu.province,1))val count = res.reduceByKey(_+_)println(count.collect().toBuffer)//ArrayBuffer((安徽,1), (黑吉辽,2), (甘肃,2), (山东,1))

14、foldByKey算子

	//foldByKey与reduceByKey的区别就是多了一个初始值case class Student(id: Int, name:String, province: String)val stuRDD = sc.parallelize(List(Student(1, "张三", "安徽"),Student(3, "王五", "甘肃"),Student(5, "Lucy", "黑吉辽"),Student(2, "李梦", "山东"),Student(4, "周七", "甘肃"),Student(10086, "魏八", "黑吉辽")), 2)//查看每个分区当中都保存了哪些元素stuRDD.mapPartitionsWithIndex((index,item)=>Iterator(index+":"+item.mkString(","))).foreach(println)/*1:Student(2,李梦,山东),Student(4,周七,甘肃),Student(10086,魏八,黑吉辽)0:Student(1,张三,安徽),Student(3,王五,甘肃),Student(5,Lucy,黑吉辽)*///调用算子进行聚合val res = stuRDD.map(stu=>(stu.province,1))//初始化的值针对的是每个分区当中,相同key下只有一个初始值val sount = res.foldByKey(1)(_+_)println(sount.collect().toBuffer)//ArrayBuffer((安徽,2), (甘肃,4), (山东,2), (黑吉辽,4))

15、combineByKey算子

 	//combineByKey,reduceByKey和groupByKey底层都是通过combineByKeyWithClassTag来实现的val array = sc.parallelize(Array("hello you","hello me","hello you","hello you","hello me","hello you"), 5)//按照分隔符进行切分val word = array.flatMap(line=>line.split(" "))//每个单词记为一次val word1 = word.map((_,1))//调用算子//第一个参数是初始化,第二个参数是小聚合,分区之内聚合,第三个参数是大聚合,分区之间聚合val res = word1.combineByKey(createCombiner,mergeValue,mergeCombiner)println(res.collect().toBuffer)//ArrayBuffer((me,2), (hello,6), (you,4))//例子val rdd: RDD[Int] = sc.parallelize(List(1,1,1,2,2,2,2,2,3,3,3,3,3,4,4,4,4,4,4,4),2)//将数据转为key,value形式val rdd1: RDD[(Int, Int)] = rdd.map((_,1))//查看每个分区当中都保存了哪些元素rdd1.mapPartitionsWithIndex((index,item)=>Iterator(index+":"+item.mkString(","))).foreach(println)//调用算子//初始化:针对每个分区当中,相同key下第一条元素进行初始化val result = rdd1.combineByKey(-_,(a:Int,b:Int)=>a+b,(a:Int,b:Int)=>a+b)result.foreach(println)def createCombiner(num:Int)={num}def mergeValue(sum:Int,num:Int)={sum+num}def mergeCombiner(sum:Int,num:Int)={sum+num}

16、aggregateByKey算子
combineByKey和aggregateByKey的区别就相当于reduceByKey和foldByKey。

val array = sc.parallelize(Array("hello you","hello me","hello you","hello you","hello me","hello you"), 2)//切分并将每个单词记为1次val wordAndOne: RDD[(String, Int)] = array.flatMap(_.split(" ")).map((_,1))//查看每个分区当中都保存了哪些元素wordAndOne.mapPartitionsWithIndex((index,item)=>Iterator(index+":"+item.mkString(","))).foreach(println)/*1:(hello,1),(you,1),(hello,1),(me,1),(hello,1),(you,1)0:(hello,1),(you,1),(hello,1),(me,1),(hello,1),(you,1)*///调用算子进行聚合//第一个参数是分区之内进行聚合,也就是小聚合//第二个参数是分区之间进行聚合,也就是大聚合//初始化的值针对的是每个分区当中,相同key下只有一个初始值val res = wordAndOne.aggregateByKey(1)(_+_,_+_)println(res.collect().toBuffer)//ArrayBuffer((hello,8), (me,4), (you,6))

2、action

操作/行动(Actions)算子 (如:count, collect, foreach等),Actions操作会返回结果或把RDD数据写到存储系统中。Actions是触发Spark启动计算的动因。
1、foreach算子
foreach主要功能,就是用来遍历RDD中的每一条纪录,其实现就是将map或者flatMap中的返回值变为Unit即可,即foreach(A => Unit)
2、count算子
统计该rdd中元素的个数
3、collect算子
该算子的含义就是将分布在集群中的各个partition中的数据拉回到driver中,进行统一的处理;但是这个算子有很大的风险存在,第一,driver内存压力很大,第二数据在网络中大规模的传输,效率很低;所以一般不建议使用,如果非要用,请先执行filter。
4、take&first算子
返回该rdd中的前N个元素,如果该rdd的数据是有序的,那么take(n)就是TopN;而first是take(n)中比较特殊的一个take(1)。
5、takeOrdered(n)
返回前几个的排序
6、reduce算子
reduce是一个action操作,reduceByKey是一个transformation。reduce对一个rdd执行聚合操作,并返回结果,结果是一个值。
7、countByKey算子
统计key出现的次数。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/1493.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

新牛市新方向:探索加密货币生态的未来

序章:牛市来袭,新的探索 新的牛市来临,带来了加密货币世界的一次次惊喜。比特币、以太坊、Solana等生态系统在这场盛宴中展现出各自的独特魅力,带来了一场场引人入胜的探索之旅。让我们跟随着这些生态系统的脚步,一起…

基础算法前缀和与差分

前言 本次博客会介绍一维和二维的前缀和,以及一维二维差分的基本使用,尽量画图,多使用配合文字 使大家理解,希望有所帮助吧 一维前缀和 问题描述 这里有一个长度为n的数组,我们要算出【2,5】区间的元素和 暴力思…

Mogdb 5.0新特性:SQL PATCH绑定执行计划

前言 熟悉Oracle的dba都知道,生产系统出现性能问题时,往往是SQL走错了执行计划,紧急情况下,无法及时修改应用代码,dba可以采用多种方式针对于某类SQL进行执行计划绑定,比如SQL Profile、SPM、SQL Plan Base…

Linux——网络管理nmcli

nmcli 不能独立使用,需要对应的服务启动 1. NetworkManager.service 2. 网络配置和服务不相关 3. 通过 nmcl i 建立网络配置和网卡之前的映射关系 网卡 简称:nmcli d DEVICE :物理设备 TYPE: 物理设备类型 ethernet 以太网…

C++设计模式:适配器模式(十四)

1、定义与动机 定义:将一个类的接口转换成客户希望的另外一个接口。Adapter模式使得原本由于接口不兼容而不能一起工作的哪些类可以一起工作。 动机: 在软件系统中,由于应用环境的变化,常常需要将“一些现存的对象”放在新的环境…

强固型工业电脑在码头智能闸口、OCR(箱号识别)、集装箱卡车车载电脑行业应用

集装箱卡车车载电脑应用 背景介绍 针对码头集装箱卡车的调度运用, 结合码头TOS系统设计出了各种平台的车载电脑(VT系列)和车载LED显示屏(VLD系列),同时提供各种安装支架,把车载电脑固定到狭小的驾驶室中;同时提供了各种天线选择(…

【JVM常见问题总结】

文章目录 jvm介绍jvm内存模型jvm内存分配参数jvm堆中存储对象:对象在堆中创建分配内存过程 jvm 堆垃圾收集器垃圾回收算法标记阶段引用计数算法可达性分析算法 清除阶段标记清除算法复制算法标记压缩算法 实际jvm参数实战jvm调优jvm常用命令常用工具 jvm介绍 Java虚…

高速公路交通运输大数据平台解决方案

前言 交通运输行业面临着多重挑战。其管控困难,涉及广泛地理范围,导致监控成本高且难以及时响应;同时,行业内数据量大,地理信息数据繁多,缺乏高效的可视化工具来揭示数据规律并优化业务;货运和…

数据结构——第7章 查找

1 线性表的查找 数据元素和顺序表的定义 typedef struct{KeyType key;InfoType otherinfo; }ElemType; typedef struct{ElemType *R;int length; }SSTable; 1.1 顺序查找 int Search_Seq(SSTable ST,KeyType key){ST.R[0].keykey;for(int iST.length;ST.R[i].key!key;i--);…

回溯算法-组合问题

回溯算法-组合问题 77. 组合 问题描述 给定两个整数 n 和 k,返回范围 [1, n] 中所有可能的 k 个数的组合。 你可以按 任何顺序 返回答案。 示例 1: 输入:n 4, k 2 输出: [[2,4],[3,4],[2,3],[1,2],[1,3],[1,4], ]示例 2&a…

05集合-CollectionListSet

Collection体系的特点、使用场景总结 如果希望元素可以重复,又有索引,索引查询要快? 用ArrayList集合, 基于数组的。(用的最多) 如果希望元素可以重复,又有索引,增删首尾操作快? 用LinkedList集合, 基于链表的。 如果希望增…

基于SpringBoot + Vue实现的奖学金管理系统设计与实现+毕业论文+答辩PPT

介绍 角色:管理员、学院负责人、学校负责人、学生 管理员:管理员登录进入高校奖助学金系统的实现可以查看系统首页、个人中心、学生管理、学院负责人管理、学校负责人管理、奖学金类型管理、奖学金申请管理、申请提交管理、系统管理等信息 学院负责人:学院负责人登录系统后&am…

python3--lxml pytoml.core.TomlError expected_equals报错解决

文章目录 一、问题二. 解决方法:三. 参考:四. 总结 一、问题 在ubuntu的armbian上的python3中安装lxml时报错了 安装命令是 pip3 install lxml报错简略信息如下图 File "/usr/share/python-wheels/pytoml-0.1.2-py2.py3-none-any.whl/pytoml/par…

nlp(6)--构建找规律模型任务

前言 仅记录学习过程,有问题欢迎讨论 包含了两个例子 第一个为5分类任务 第二个为2分类任务 Demo1比Demo2难一点,放上边方便以后看。 练习顺序为 Demo2—>Demo1 代码 DEMO1: """ 自定义一个模型 解决 5分类问题 问题如下&#xf…

SQL概述

1. SQL的分类 SQL语言在功能上主要分为如下3大类: DDL(Data Definition Languages、数据定义语言),这些语句定义了不同的数据库、表、视图、索引等数据库对象,还可以用来创建、删除、修改数据库和数据表的结构。主要…

2.1K Star微软开源的高质量 iot库

功能描述 该项目是一个开源的 .NET Core 实现,旨在帮助开发者构建适用于物联网(IoT)设备和场景的应用程序。它提供了与传感器、显示器和输入设备等相互作用所需的 GPIO 引脚、串口等硬件的接口。该仓库包含 System.Device.Gpio 库以及针对各种板卡(如 Ra…

redis底层数据结构之ziplist

目录 一、概述二、ziplist结构三、Entry结构四、为什么ZipList特别省内存五、ziplist的缺点 上一篇 redis底层数据结构之SDS 下一篇 明天更新 一、概述 一种连续内存空间存储的顺序数据结构,每个元素可以是字符串或整数。优点:节省内存空间。适用于存储小规模的列表…

STM32 | USART实战案例

STM32 | 通用同步/异步串行接收/发送器USART带蓝牙(第六天)随着扩展的内容越来越多,很多小伙伴已经忘记了之前的学习内容,然后后面这些都很难理解。STM32合集已在专栏创建,方面大家学习。1、通过电脑串口助手发送数据,控制开发板LED灯 从题目中可以挖掘出,本次使用led、延…

【Linux文件系统开发】认知篇

【Linux文件系统开发】认知篇 文章目录 【Linux文件系统开发】认知篇一、文件系统的概念二、文件系统的种类(文件管理系统的方法)三、分区四、文件系统目录结构五、虚拟文件系统(Virtual File System)1.概念2.原因3.作用4.总结 一…

[ LeetCode ] 题刷刷(Python)-第35题:搜索插入位置

题目描述 给定一个排序数组和一个目标值,在数组中找到目标值,并返回其索引。如果目标值不存在于数组中,返回它将会被按顺序插入的位置。 nums 为 无重复元素 的 升序 排列数组 请必须使用时间复杂度为 O(log n) 的算法。 示例 示例 1: 输入: …