SparkRDD常用算子实践(附运行效果图)

  • 目录
    • 1、简单算子说明
    • 2、复杂算子说明

目录

SparkRDD算子分为两类:Transformation与Action.
Transformation:即延迟加载数据,Transformation会记录元数据信息,当计算任务触发Action时,才会真正开始计算。
Action:即立即加载数据,开始计算。
创建RDD的方式有两种:
1、通过sc.textFile(“/root/words.txt”)从文件系统中创建 RDD。
2、#通过并行化scala集合创建RDD:val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))

1、简单算子说明

这里先说下简单的Transformation算子

//通过并行化scala集合创建RDD
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))

//查看该rdd的分区数量
rdd1.partitions.length

//map方法同scala中的一样,将List中的每个数据拿出来做函数运算。
//sortBy:将数据进行排序
val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x=>x,true)

//filter:将List中的每个数据进行函数造作,挑选出大于10的值。
val rdd3 = rdd2.filter(_>10)

//collect:将最终结果显示出来
//flatMap:对数据先进行map操作,再进行flat(碾压)操作。
rdd4.flatMap(_.split(’ ‘)).collect
运行效果图
这里写图片描述


val rdd1 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10))
val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x=>x,true)
val rdd3 = rdd2.filter(_>10)
val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x=>x+”“,true)
val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x=>x.toString,true)
这里写图片描述


//intersection求交集
val rdd9 = rdd6.intersection(rdd7)
val rdd1 = sc.parallelize(List((“tom”, 1), (“jerry”, 2), (“kitty”, 3)))
val rdd2 = sc.parallelize(List((“jerry”, 9), (“tom”, 8), (“shuke”, 7)))
这里写图片描述


//join
val rdd3 = rdd1.join(rdd2)
这里写图片描述
val rdd3 = rdd1.leftOuterJoin(rdd2)
这里写图片描述
val rdd3 = rdd1.rightOuterJoin(rdd2)


//union:求并集,注意类型要一致
val rdd6 = sc.parallelize(List(5,6,4,7))
val rdd7 = sc.parallelize(List(1,2,3,4))
val rdd8 = rdd6.union(rdd7)
rdd8.distinct.sortBy(x=>x).collect
这里写图片描述


//groupByKey
val rdd3 = rdd1 union rdd2
rdd3.groupByKey
rdd3.groupByKey.map(x=>(x._1,x._2.sum))
这里写图片描述


//cogroup
val rdd1 = sc.parallelize(List((“tom”, 1), (“tom”, 2), (“jerry”, 3), (“kitty”, 2)))
val rdd2 = sc.parallelize(List((“jerry”, 2), (“tom”, 1), (“shuke”, 2)))
val rdd3 = rdd1.cogroup(rdd2)
val rdd4 = rdd3.map(t=>(t._1, t._2._1.sum + t._2._2.sum))
这里写图片描述


//cartesian笛卡尔积
val rdd1 = sc.parallelize(List(“tom”, “jerry”))
val rdd2 = sc.parallelize(List(“tom”, “kitty”, “shuke”))
val rdd3 = rdd1.cartesian(rdd2)
这里写图片描述


接下来说下简单的Action算子
val rdd1 = sc.parallelize(List(1,2,3,4,5), 2)

#collect
rdd1.collect

#reduce
val rdd2 = rdd1.reduce(+)

#count
rdd1.count

#top
rdd1.top(2)

#take
rdd1.take(2)

#first(similer to take(1))
rdd1.first

#takeOrdered
rdd1.takeOrdered(3)
这里写图片描述

2、复杂算子说明

mapPartitionsWithIndex : 把每个partition中的分区号和对应的值拿出来, 看源码
val func = (index: Int, iter: Iterator[(Int)]) => {
iter.toList.map(x => “[partID:” + index + “, val: ” + x + “]”).iterator
}
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
rdd1.mapPartitionsWithIndex(func).collect
这里写图片描述


aggregate

def func1(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {
iter.toList.map(x => “[partID:” + index + “, val: ” + x + “]”).iterator
}
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
rdd1.mapPartitionsWithIndex(func1).collect
###是action操作, 第一个参数是初始值, 二:是2个函数(第一个函数:先对个个分区进行合并, 第二个函数:对个个分区合并后的结果再进行合并)
###0 + (0+1+2+3+4 + 0+5+6+7+8+9)

rdd1.aggregate(0)(_+_, _+_)

这里写图片描述


rdd1.aggregate(0)(math.max(, ), _ + _)
###0分别与0和1分区的List元素对比得到每个分区中的最大值,在这里分别是3和7,然后将0+3+7=10
这里写图片描述


###5和1比, 得5再和234比得5 –> 5和6789比,得9 –> 5 + (5+9)
rdd1.aggregate(5)(math.max(, ), _ + _)


val rdd3 = sc.parallelize(List(“12”,”23”,”345”,”4567”),2)
rdd3.aggregate(“”)((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)
######### “”.length分别与两个分区元素的length进行比较得到0分区为字符串”2”,1分区为字符串”4”,然而结果返回不分先后,所以结果是24或42
这里写图片描述


val rdd4 = sc.parallelize(List(“12”,”23”,”345”,”“),2)
rdd4.aggregate(“”)((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
######## “”.length的为0,与“12”比较后得到字符串“0”,然后字符串“0”再与“23”比较得到最小值为1.
这里写图片描述


aggregateByKey

val pairRDD = sc.parallelize(List( (“cat”,2), (“cat”, 5), (“mouse”, 4),(“cat”, 12), (“dog”, 12), (“mouse”, 2)), 2)
def func2(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {
iter.toList.map(x => “[partID:” + index + “, val: ” + x + “]”).iterator
}
pairRDD.mapPartitionsWithIndex(func2).collect
这里写图片描述


pairRDD.aggregateByKey(0)(math.max(, ), _ + _).collect
########## 先对0号分区中的各个数据进行操作(拿初始值和各个数据进行比较)得到(cat,5)(mouse,4).然后再对1号分区中的数据进行操作得到(cat,12)(dog,12)(mouse,2)。然后再对两个分区的数据进行相加得到最终结果
这里写图片描述


coalesce
#coalesce(2, false)代表将数据重新分成2个区,不进行shuffle(将数据重新进行随机分配,数据通过网络可分配在不同的机器上)
val rdd1 = sc.parallelize(1 to 10, 10)
val rdd2 = rdd1.coalesce(2, false)
rdd2.partitions.length
这里写图片描述


repartition
repartition效果等同于coalesce(x, true)


collectAsMap : Map(b -> 2, a -> 1)
val rdd = sc.parallelize(List((“a”, 1), (“b”, 2)))
rdd.collectAsMap
这里写图片描述


combineByKey : 和reduceByKey是相同的效果
###第一个参数x:原封不动取出来, 第二个参数:是函数, 局部运算, 第三个:是函数, 对局部运算后的结果再做运算
###每个分区中每个key中value中的第一个值, (hello,1)(hello,1)(good,1)–>(hello(1,1),good(1))–>x就相当于hello的第一个1, good中的1

val rdd1 = sc.textFile(“hdfs://master:9000/wordcount/input/”).flatMap(.split(” “)).map((, 1))
val rdd2 = rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
rdd1.collect
这里写图片描述


###当input下有3个文件时(有3个block块分三个区, 不是有3个文件就有3个block, ), 每个会多加3个10
val rdd3 = rdd1.combineByKey(x => x + 10, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
rdd3.collect
这里写图片描述


val rdd4 = sc.parallelize(List(“dog”,”cat”,”gnu”,”salmon”,”rabbit”,”turkey”,”wolf”,”bear”,”bee”), 3)
val rdd5 = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
val rdd6 = rdd5.zip(rdd4)
这里写图片描述


//第一个参数List(_)代表的是将第一个元素转换为一个List,第 二个参数x: List[String], y: String) => x :+ y,代表将元素y加入到这个list中。第三个参数:(m: List[String], n: List[String]) => m ++ n),代表将两个分区的各个list合并成新的List。
val rdd7 = rdd6.combineByKey(List(_), (x: List[String], y: String) => x :+ y, (m: List[String], n: List[String]) => m ++ n)
这里写图片描述


countByKey

val rdd1 = sc.parallelize(List((“a”, 1), (“b”, 2), (“b”, 2), (“c”, 2), (“c”, 1)))
rdd1.countByKey
rdd1.countByValue

这里写图片描述


filterByRange

val rdd1 = sc.parallelize(List((“e”, 5), (“c”, 3), (“d”, 4), (“c”, 2), (“a”, 1)))
val rdd2 = rdd1.filterByRange(“b”, “d”)
rdd2.collect
这里写图片描述


flatMapValues : Array((a,1), (a,2), (b,3), (b,4))
val rdd3 = sc.parallelize(List((“a”, “1 2”), (“b”, “3 4”)))
val rdd4 = rdd3.flatMapValues(_.split(” “))
rdd4.collect
这里写图片描述


foldByKey

val rdd1 = sc.parallelize(List(“dog”, “wolf”, “cat”, “bear”), 2)
val rdd2 = rdd1.map(x => (x.length, x))
val rdd3 = rdd2.foldByKey(“”)(+)
这里写图片描述


keyBy : 以传入的参数做key
val rdd1 = sc.parallelize(List(“dog”, “salmon”, “salmon”, “rat”, “elephant”), 3)
val rdd2 = rdd1.keyBy(_.length)
rdd2.collect
这里写图片描述


keys values
val rdd1 = sc.parallelize(List(“dog”, “tiger”, “lion”, “cat”, “panther”, “eagle”), 2)
val rdd2 = rdd1.map(x => (x.length, x))
rdd2.keys.collect
rdd2.values.collect

这里写图片描述


以下是一些方法的英文解释
#

map(func)
Return a new distributed dataset formed by passing each element of the source through a function func.

filter(func)
Return a new dataset formed by selecting those elements of the source on which func returns true.

flatMap(func)(内部执行顺序是从右往左,先执行Map再执行Flat)
Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).

mapPartitions(func)
Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator => Iterator when running on an RDD of type T.

mapPartitionsWithIndex(func)
Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator) => Iterator when running on an RDD of type T.

sample(withReplacement, fraction, seed)
Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.

union(otherDataset)
Return a new dataset that contains the union of the elements in the source dataset and the argument.

intersection(otherDataset)
Return a new RDD that contains the intersection of elements in the source dataset and the argument.

distinct([numTasks]))
Return a new dataset that contains the distinct elements of the source dataset.

groupByKey([numTasks])
When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable) pairs.

reduceByKey(func, [numTasks])
When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral “zero” value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

sortByKey([ascending], [numTasks])
When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.

join(otherDataset, [numTasks])
When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.

cogroup(otherDataset, [numTasks])
When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable, Iterable)) tuples. This operation is also called groupWith.

cartesian(otherDataset)
When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).

pipe(command, [envVars])
Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process’s stdin and lines output to its stdout are returned as an RDD of strings.

coalesce(numPartitions)
Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.

repartition(numPartitions)
Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.

repartitionAndSortWithinPartitions(partitioner)
Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.

(K,(Iterable,Iterable))

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/456731.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

six库是什么

Utilities for writing code that runs on Python 2 and 3”“” 它是一个专门用来兼容 Python 2 和 Python 3 的库。它解决了诸如 urllib 的部分方法不兼容, str 和 bytes 类型不兼容等“知名”问题。

Kali-linux使用Nessus

Nessus号称是世界上最流行的漏洞扫描程序,全世界有超过75000个组织在使用它。该工具提供完整的电脑漏洞扫描服务,并随时更新其漏洞数据库。Nessus不同于传统的漏洞扫描软件,Nessus可同时在本机或远端上遥控,进行系统的漏洞分析扫描…

HDFS读写数据的原理

目录1 概述2 HDFS写数据流程3 HDFS读数据流程 目录 最近由于要准备面试,就把之前学过的东西好好整理下,权当是复习。 下面说下HDFS读写数据的原理。 1 概述 HDFS集群分为两大角色:NameNode、DataNode NameNode负责管理整个文件系统的元数…

理解列存储索引

版权声明:原创作品,谢绝转载!否则将追究法律责任。 优点和使用场景 SQL Server 内存中列存储索引通过使用基于列的数据存储和基于列的查询处理来存储和管理数据。 列存储索引适合于主要执行大容量加载和只读查询的数据仓库工作负荷…

Django项目部署到阿里云服务器上无法发送邮件STMP

部署好项目之后发送邮件无法发送,多方查阅之后,解决问题。 阿里云服务器禁用了25端口,导致无法发送邮件。 25端口申请开放的难度很大,直接放弃。 解决: 在 django项目的 settings.py文件中x修改port端口 。

美国诚实签经验——IMG全球医疗险,TODO

那么,诚实签最关键的4个要点 是什么呢? 第一,证明你有一定的经济实力。 可能需要房产、存款等证明,也需要银行信用卡或借记卡半年流水证明(让人信服的每月进帐和消费能力)。 这些是为了证明,你可…

大数据开发初学者学习路线

目录前言导读:第一章:初识Hadoop第二章:更高效的WordCount第三章:把别处的数据搞到Hadoop上第四章:把Hadoop上的数据搞到别处去第五章:快一点吧,我的SQL第六章:一夫多妻制第七章&…

Python的虚拟环境配置(pyenv+virtualenv)

一、为什么需要配置虚拟环境 Python 2和Python 3之间存在着较大的差异,并且,由于各种原因导致了Python 2和Python 3的长期共存。在实际工作过程中,我们可能会同时用到Python 2和Python 3,因此,也需要经常在Python 2和P…

安卓屏幕适配问题

屏幕适配是根据屏幕密度,dpi为单位的,而不是分辨率。 手机会根据不同手机的密度,自己去不同资源目录下去找对应的资源 比如:   每个图片目录下的图片资源都是一样的,只是大小不一样   比如drawable-sw800dp-mdpi目录&#xff…

MapReduce原理全剖析

MapReduce剖析图 如上图所示是MR的运行详细过程 首先mapTask读文件是通过InputFormat(内部是调RecordReader()–>read())来一次读一行,返回K,V值。(默认是TextInputFormat,还可以输入其他的类型如:音视频&…

利用selenium webdriver点击alert提示框

在进行元素定位时常常遇到这样的alert框: 那么该如何定位并点击确定或取消按钮呢?stackoverflow上找到了这个问题的答案。 OK, Show you the code: 1 driver.findElement(By.id("updateButton")).click(); 2 //pop up w…

Log 日志的使用与重要性

开发过程中出现bug是必不可免的,你会怎样debug?从第1行代码开始看么?还是有个文件里面记录着哪里错了更方便呢!!!log日志 Python中有个logging模块可以完成相关信息的记录,在debug时用它往往事…

webdriver 的三种等待方式

1、显式等待 一个显式等待是你定义的一段代码,用于等待某个条件发生然后再继续执行后续代码。 from selenium import webdriverfrom selenium.webdriver.common.by import Byfrom selenium.webdriver.support.ui import WebDriverWait #…

Django的核心思想ORM

元类实现ORM 1. ORM是什么 ORM 是 python编程语言后端web框架 Django的核心思想,“Object Relational Mapping”,即对象-关系映射,简称ORM。 一个句话理解就是:创建一个实例对象,用创建它的类名当做数据表名&#x…

Secondary Namenode的Check point机制以及Namenode、Datanode工作机制说明

目录前言:1、NameNode的工作机制2、DataNode的工作机制3、Secondary Namenode的Check point机制 目录 前言: 在说明checkpoint机制之前,先要了解下namenode、datanode的一些功能和职责。 1、NameNode的工作机制 问题场景: 1…

表单验证的初步实现和省市级联

1.表单验证的初步实现 <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01//EN" "http://www.w3.org/TR/html4/strict.dtd"><html xmlns"http://www.w3.org/1999/xhtml" lang"en"><head><meta http-equiv"Conte…

抓包软件:Charles

修正&#xff1a;手机不必一定连接电脑分享的热点&#xff0c;只需要手机和电脑在同一个局域网下就可以了&#xff0c;手机代理IP设置为电脑的IP。 之前写过一篇通过Wireshark进行抓包&#xff0c;分析网络连接的文章《通过WireShark抓取iOS联网数据实例分析》&#xff1a;htt…

Hive的相关介绍

目录前言&#xff1a;1、Hive简介2、Hive架构3、Hive与Hadoop的关系4、Hive与传统数据库对比5、Hive的数据存储总结&#xff1a; 目录 前言&#xff1a; 为什么使用Hive 直接使用hadoop所面临的问题 人员学习成本太高 项目周期要求太短 MapReduce实现复杂查询逻辑开发难…

数据结构实验之排序七:选课名单

数据结构实验之排序七&#xff1a;选课名单 Time Limit: 1000MS Memory Limit: 65536KB Submit Statistic Problem Description 随着学校规模的扩大&#xff0c;学生人数急剧增加&#xff0c;选课名单的输出也成为一个繁重的任务&#xff0c;我校目前有在校生3万多名&#xff0…

Java第五次作业--面向对象高级特性(抽象类和接口)

一、学习要点 认真看书并查阅相关资料&#xff0c;掌握以下内容&#xff1a; 掌握抽象类的设计掌握接口的设计理解简单工厂设计模式理解抽象类和接口的区别掌握包装类的应用掌握对象的比较方法和比较器的使用学习使用日期操作类学习匿名内部类的使用二、作业要求 发布一篇随笔&…