SparkRDD常用算子实践(附运行效果图)

  • 目录
    • 1、简单算子说明
    • 2、复杂算子说明

目录

SparkRDD算子分为两类:Transformation与Action.
Transformation:即延迟加载数据,Transformation会记录元数据信息,当计算任务触发Action时,才会真正开始计算。
Action:即立即加载数据,开始计算。
创建RDD的方式有两种:
1、通过sc.textFile(“/root/words.txt”)从文件系统中创建 RDD。
2、#通过并行化scala集合创建RDD:val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))

1、简单算子说明

这里先说下简单的Transformation算子

//通过并行化scala集合创建RDD
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))

//查看该rdd的分区数量
rdd1.partitions.length

//map方法同scala中的一样,将List中的每个数据拿出来做函数运算。
//sortBy:将数据进行排序
val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x=>x,true)

//filter:将List中的每个数据进行函数造作,挑选出大于10的值。
val rdd3 = rdd2.filter(_>10)

//collect:将最终结果显示出来
//flatMap:对数据先进行map操作,再进行flat(碾压)操作。
rdd4.flatMap(_.split(’ ‘)).collect
运行效果图
这里写图片描述


val rdd1 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10))
val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x=>x,true)
val rdd3 = rdd2.filter(_>10)
val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x=>x+”“,true)
val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x=>x.toString,true)
这里写图片描述


//intersection求交集
val rdd9 = rdd6.intersection(rdd7)
val rdd1 = sc.parallelize(List((“tom”, 1), (“jerry”, 2), (“kitty”, 3)))
val rdd2 = sc.parallelize(List((“jerry”, 9), (“tom”, 8), (“shuke”, 7)))
这里写图片描述


//join
val rdd3 = rdd1.join(rdd2)
这里写图片描述
val rdd3 = rdd1.leftOuterJoin(rdd2)
这里写图片描述
val rdd3 = rdd1.rightOuterJoin(rdd2)


//union:求并集,注意类型要一致
val rdd6 = sc.parallelize(List(5,6,4,7))
val rdd7 = sc.parallelize(List(1,2,3,4))
val rdd8 = rdd6.union(rdd7)
rdd8.distinct.sortBy(x=>x).collect
这里写图片描述


//groupByKey
val rdd3 = rdd1 union rdd2
rdd3.groupByKey
rdd3.groupByKey.map(x=>(x._1,x._2.sum))
这里写图片描述


//cogroup
val rdd1 = sc.parallelize(List((“tom”, 1), (“tom”, 2), (“jerry”, 3), (“kitty”, 2)))
val rdd2 = sc.parallelize(List((“jerry”, 2), (“tom”, 1), (“shuke”, 2)))
val rdd3 = rdd1.cogroup(rdd2)
val rdd4 = rdd3.map(t=>(t._1, t._2._1.sum + t._2._2.sum))
这里写图片描述


//cartesian笛卡尔积
val rdd1 = sc.parallelize(List(“tom”, “jerry”))
val rdd2 = sc.parallelize(List(“tom”, “kitty”, “shuke”))
val rdd3 = rdd1.cartesian(rdd2)
这里写图片描述


接下来说下简单的Action算子
val rdd1 = sc.parallelize(List(1,2,3,4,5), 2)

#collect
rdd1.collect

#reduce
val rdd2 = rdd1.reduce(+)

#count
rdd1.count

#top
rdd1.top(2)

#take
rdd1.take(2)

#first(similer to take(1))
rdd1.first

#takeOrdered
rdd1.takeOrdered(3)
这里写图片描述

2、复杂算子说明

mapPartitionsWithIndex : 把每个partition中的分区号和对应的值拿出来, 看源码
val func = (index: Int, iter: Iterator[(Int)]) => {
iter.toList.map(x => “[partID:” + index + “, val: ” + x + “]”).iterator
}
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
rdd1.mapPartitionsWithIndex(func).collect
这里写图片描述


aggregate

def func1(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {
iter.toList.map(x => “[partID:” + index + “, val: ” + x + “]”).iterator
}
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
rdd1.mapPartitionsWithIndex(func1).collect
###是action操作, 第一个参数是初始值, 二:是2个函数(第一个函数:先对个个分区进行合并, 第二个函数:对个个分区合并后的结果再进行合并)
###0 + (0+1+2+3+4 + 0+5+6+7+8+9)

rdd1.aggregate(0)(_+_, _+_)

这里写图片描述


rdd1.aggregate(0)(math.max(, ), _ + _)
###0分别与0和1分区的List元素对比得到每个分区中的最大值,在这里分别是3和7,然后将0+3+7=10
这里写图片描述


###5和1比, 得5再和234比得5 –> 5和6789比,得9 –> 5 + (5+9)
rdd1.aggregate(5)(math.max(, ), _ + _)


val rdd3 = sc.parallelize(List(“12”,”23”,”345”,”4567”),2)
rdd3.aggregate(“”)((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)
######### “”.length分别与两个分区元素的length进行比较得到0分区为字符串”2”,1分区为字符串”4”,然而结果返回不分先后,所以结果是24或42
这里写图片描述


val rdd4 = sc.parallelize(List(“12”,”23”,”345”,”“),2)
rdd4.aggregate(“”)((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
######## “”.length的为0,与“12”比较后得到字符串“0”,然后字符串“0”再与“23”比较得到最小值为1.
这里写图片描述


aggregateByKey

val pairRDD = sc.parallelize(List( (“cat”,2), (“cat”, 5), (“mouse”, 4),(“cat”, 12), (“dog”, 12), (“mouse”, 2)), 2)
def func2(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {
iter.toList.map(x => “[partID:” + index + “, val: ” + x + “]”).iterator
}
pairRDD.mapPartitionsWithIndex(func2).collect
这里写图片描述


pairRDD.aggregateByKey(0)(math.max(, ), _ + _).collect
########## 先对0号分区中的各个数据进行操作(拿初始值和各个数据进行比较)得到(cat,5)(mouse,4).然后再对1号分区中的数据进行操作得到(cat,12)(dog,12)(mouse,2)。然后再对两个分区的数据进行相加得到最终结果
这里写图片描述


coalesce
#coalesce(2, false)代表将数据重新分成2个区,不进行shuffle(将数据重新进行随机分配,数据通过网络可分配在不同的机器上)
val rdd1 = sc.parallelize(1 to 10, 10)
val rdd2 = rdd1.coalesce(2, false)
rdd2.partitions.length
这里写图片描述


repartition
repartition效果等同于coalesce(x, true)


collectAsMap : Map(b -> 2, a -> 1)
val rdd = sc.parallelize(List((“a”, 1), (“b”, 2)))
rdd.collectAsMap
这里写图片描述


combineByKey : 和reduceByKey是相同的效果
###第一个参数x:原封不动取出来, 第二个参数:是函数, 局部运算, 第三个:是函数, 对局部运算后的结果再做运算
###每个分区中每个key中value中的第一个值, (hello,1)(hello,1)(good,1)–>(hello(1,1),good(1))–>x就相当于hello的第一个1, good中的1

val rdd1 = sc.textFile(“hdfs://master:9000/wordcount/input/”).flatMap(.split(” “)).map((, 1))
val rdd2 = rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
rdd1.collect
这里写图片描述


###当input下有3个文件时(有3个block块分三个区, 不是有3个文件就有3个block, ), 每个会多加3个10
val rdd3 = rdd1.combineByKey(x => x + 10, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
rdd3.collect
这里写图片描述


val rdd4 = sc.parallelize(List(“dog”,”cat”,”gnu”,”salmon”,”rabbit”,”turkey”,”wolf”,”bear”,”bee”), 3)
val rdd5 = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
val rdd6 = rdd5.zip(rdd4)
这里写图片描述


//第一个参数List(_)代表的是将第一个元素转换为一个List,第 二个参数x: List[String], y: String) => x :+ y,代表将元素y加入到这个list中。第三个参数:(m: List[String], n: List[String]) => m ++ n),代表将两个分区的各个list合并成新的List。
val rdd7 = rdd6.combineByKey(List(_), (x: List[String], y: String) => x :+ y, (m: List[String], n: List[String]) => m ++ n)
这里写图片描述


countByKey

val rdd1 = sc.parallelize(List((“a”, 1), (“b”, 2), (“b”, 2), (“c”, 2), (“c”, 1)))
rdd1.countByKey
rdd1.countByValue

这里写图片描述


filterByRange

val rdd1 = sc.parallelize(List((“e”, 5), (“c”, 3), (“d”, 4), (“c”, 2), (“a”, 1)))
val rdd2 = rdd1.filterByRange(“b”, “d”)
rdd2.collect
这里写图片描述


flatMapValues : Array((a,1), (a,2), (b,3), (b,4))
val rdd3 = sc.parallelize(List((“a”, “1 2”), (“b”, “3 4”)))
val rdd4 = rdd3.flatMapValues(_.split(” “))
rdd4.collect
这里写图片描述


foldByKey

val rdd1 = sc.parallelize(List(“dog”, “wolf”, “cat”, “bear”), 2)
val rdd2 = rdd1.map(x => (x.length, x))
val rdd3 = rdd2.foldByKey(“”)(+)
这里写图片描述


keyBy : 以传入的参数做key
val rdd1 = sc.parallelize(List(“dog”, “salmon”, “salmon”, “rat”, “elephant”), 3)
val rdd2 = rdd1.keyBy(_.length)
rdd2.collect
这里写图片描述


keys values
val rdd1 = sc.parallelize(List(“dog”, “tiger”, “lion”, “cat”, “panther”, “eagle”), 2)
val rdd2 = rdd1.map(x => (x.length, x))
rdd2.keys.collect
rdd2.values.collect

这里写图片描述


以下是一些方法的英文解释
#

map(func)
Return a new distributed dataset formed by passing each element of the source through a function func.

filter(func)
Return a new dataset formed by selecting those elements of the source on which func returns true.

flatMap(func)(内部执行顺序是从右往左,先执行Map再执行Flat)
Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).

mapPartitions(func)
Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator => Iterator when running on an RDD of type T.

mapPartitionsWithIndex(func)
Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator) => Iterator when running on an RDD of type T.

sample(withReplacement, fraction, seed)
Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.

union(otherDataset)
Return a new dataset that contains the union of the elements in the source dataset and the argument.

intersection(otherDataset)
Return a new RDD that contains the intersection of elements in the source dataset and the argument.

distinct([numTasks]))
Return a new dataset that contains the distinct elements of the source dataset.

groupByKey([numTasks])
When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable) pairs.

reduceByKey(func, [numTasks])
When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral “zero” value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

sortByKey([ascending], [numTasks])
When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.

join(otherDataset, [numTasks])
When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.

cogroup(otherDataset, [numTasks])
When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable, Iterable)) tuples. This operation is also called groupWith.

cartesian(otherDataset)
When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).

pipe(command, [envVars])
Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process’s stdin and lines output to its stdout are returned as an RDD of strings.

coalesce(numPartitions)
Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.

repartition(numPartitions)
Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.

repartitionAndSortWithinPartitions(partitioner)
Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.

(K,(Iterable,Iterable))

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/456731.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Kali-linux使用Nessus

Nessus号称是世界上最流行的漏洞扫描程序,全世界有超过75000个组织在使用它。该工具提供完整的电脑漏洞扫描服务,并随时更新其漏洞数据库。Nessus不同于传统的漏洞扫描软件,Nessus可同时在本机或远端上遥控,进行系统的漏洞分析扫描…

HDFS读写数据的原理

目录1 概述2 HDFS写数据流程3 HDFS读数据流程 目录 最近由于要准备面试,就把之前学过的东西好好整理下,权当是复习。 下面说下HDFS读写数据的原理。 1 概述 HDFS集群分为两大角色:NameNode、DataNode NameNode负责管理整个文件系统的元数…

理解列存储索引

版权声明:原创作品,谢绝转载!否则将追究法律责任。 优点和使用场景 SQL Server 内存中列存储索引通过使用基于列的数据存储和基于列的查询处理来存储和管理数据。 列存储索引适合于主要执行大容量加载和只读查询的数据仓库工作负荷…

大数据开发初学者学习路线

目录前言导读:第一章:初识Hadoop第二章:更高效的WordCount第三章:把别处的数据搞到Hadoop上第四章:把Hadoop上的数据搞到别处去第五章:快一点吧,我的SQL第六章:一夫多妻制第七章&…

安卓屏幕适配问题

屏幕适配是根据屏幕密度,dpi为单位的,而不是分辨率。 手机会根据不同手机的密度,自己去不同资源目录下去找对应的资源 比如:   每个图片目录下的图片资源都是一样的,只是大小不一样   比如drawable-sw800dp-mdpi目录&#xff…

MapReduce原理全剖析

MapReduce剖析图 如上图所示是MR的运行详细过程 首先mapTask读文件是通过InputFormat(内部是调RecordReader()–>read())来一次读一行,返回K,V值。(默认是TextInputFormat,还可以输入其他的类型如:音视频&…

利用selenium webdriver点击alert提示框

在进行元素定位时常常遇到这样的alert框: 那么该如何定位并点击确定或取消按钮呢?stackoverflow上找到了这个问题的答案。 OK, Show you the code: 1 driver.findElement(By.id("updateButton")).click(); 2 //pop up w…

Django的核心思想ORM

元类实现ORM 1. ORM是什么 ORM 是 python编程语言后端web框架 Django的核心思想,“Object Relational Mapping”,即对象-关系映射,简称ORM。 一个句话理解就是:创建一个实例对象,用创建它的类名当做数据表名&#x…

Secondary Namenode的Check point机制以及Namenode、Datanode工作机制说明

目录前言:1、NameNode的工作机制2、DataNode的工作机制3、Secondary Namenode的Check point机制 目录 前言: 在说明checkpoint机制之前,先要了解下namenode、datanode的一些功能和职责。 1、NameNode的工作机制 问题场景: 1…

抓包软件:Charles

修正:手机不必一定连接电脑分享的热点,只需要手机和电脑在同一个局域网下就可以了,手机代理IP设置为电脑的IP。 之前写过一篇通过Wireshark进行抓包,分析网络连接的文章《通过WireShark抓取iOS联网数据实例分析》:htt…

Hive的相关介绍

目录前言:1、Hive简介2、Hive架构3、Hive与Hadoop的关系4、Hive与传统数据库对比5、Hive的数据存储总结: 目录 前言: 为什么使用Hive 直接使用hadoop所面临的问题 人员学习成本太高 项目周期要求太短 MapReduce实现复杂查询逻辑开发难…

Java第五次作业--面向对象高级特性(抽象类和接口)

一、学习要点 认真看书并查阅相关资料,掌握以下内容: 掌握抽象类的设计掌握接口的设计理解简单工厂设计模式理解抽象类和接口的区别掌握包装类的应用掌握对象的比较方法和比较器的使用学习使用日期操作类学习匿名内部类的使用二、作业要求 发布一篇随笔&…

gulp教程之gulp-minify-css【gulp-clean-css】

原文:http://www.ydcss.com/archives/41 简介: 使用gulp-minify-css压缩css文件,减小文件大小,并给引用url添加版本号避免缓存。重要:gulp-minify-css已经被废弃,请使用gulp-clean-css,用法一致…

win7 IE11卸载后无法上网

今天某同事需要访问一个银行网站,必须使用IE8,我在win7中降级IE11,直接卸载了IE11和其语言包,发现IE8再也打不开网页了,每次打开都提示保存html网页。测试Google Chrome上网完全没有问题。IE8的internet选项等任何工具菜单点击均无反应&#…

关于django的模板

模板 问题 如何向请求者返回一个漂亮的页面呢? 肯定需要用到html、css,如果想要更炫的效果还要加入js,问题来了,这么一堆字段串全都写到视图中,作为HttpResponse()的参数吗?这样定义就太麻烦了吧&#x…

Hbase简介及常用命令相关知识总结

文章目录目录前言:1.Hbase简介1.1、什么是Hbase1.2、与传统数据库的对比1.3、Hbase集群中的角色2、Hbase数据模型3、Hbase命令总结:目录 前言: 对于Hbase来说,由于其是基于列的数据库,所以比传统的数据库快许多&…

Storm入门简介

目录前言:1、Storm简介2、Storm与Hadoop的区别3、Storm核心组件4、Storm编程模型5、流式计算一般架构图(重要)总结: 目录 前言: 在介绍Storm之前,先介绍下离线计算。 离线计算:批量获取数据…

前端模板预编译技术

什么是前端模板预编译 前端模板预编译通过预编译技术让前端模板突破浏览器限制,实现后端模板一样的同步“文件”加载能力。它采用目录来组织维护前端模板,从而让前端模板实现工程化管理,最终保证前端模板在复杂单页 web 应用下的可维护性。同…

node08-express

目录:node01-创建服务器 node02-util node03-events node04-buffer node05-fs node06-path node07-http node08-express node09-cookie express模块: 1 /*2 * express是一个应用框架3 * 1、路由4 * 2、中间件5 * 3、模板引擎6 * */7 8 var express requ…

Java基础常见笔试题总结

以下是自己总结的一些Java常见的基础知识题,答案仅供参考,如有异议请指出。一直保持更新状态。 1.什么是Java虚拟机?为什么Java被称作是“平台无关的编程语言”? Java虚拟机是一个可以执行Java字节码的虚拟机进程。Java源文件被编…