RDD的操作算子

创建RDD

// 方式1: 读取外部数据集.
sc.textFile("本地文件或者HDFS文件所在的路径地址")
// textFile: 读取本地文件或者HDFS文件  // 方式2: 在程序中对一个集合进行并行化处理.
// 写法1:
sc.parallelize(Array(1,2,3,4,5,6,7,8),num) 
// 写法2:makeRDD底层调用的就是parallelize
sc.makeRDD(Array(1,2,3,4,5,6,7,8),num)    
/*
parallelize: 集合并行化
num代表分区的数量,可有可无,默认是根据节点的cores个数决定.
*/// 查看RDD的分区数量.
// 方法1:
rdd.getNumPartitions
// 方法2:
rdd.partitions.length    

集合的操作应用

集合包括list、array、map、collect、iterator

sortBy-排序

// sortBy  传入一个值,返回一个值.
// collect 返回一个 Array 数组.
val res5 = sc.parallelize(List(1,2,3,10,9,4,5,6,7,8))
res5.map(_*2).sortBy(x=>x).collect          // 默认正序
res5.map(_*2).sortBy(x=>x,true).collect     // 正序-true
res5.map(_*2).sortBy(x=>x,false).collect    // 倒序-false

filter-过滤

// 传入一个 boolean 表达式,返回一个集合
val rdd2 = sc.parallelize(Array(10,1,11,101,2,3,4,5,6,7,8),1)
rdd2.filter(_ > 4).sortBy(x=>x.toString,true).collect
// String类型是按照字典码进行排序

flatMap-压扁

// flatMap 相当于把最外层的List或Array去除后拿到每个值.
val rdd3=sc.parallelize(List(List("a b c","a b b"),List("e f g","a d e"),List("a c h","j t rr")))
rdd3.flatMap(_.flatMap(_.split(" "))).collect    // _.split(" ") 以'空格'进行分割

union-并集

// 注意类型要一致,即使有元素重复也不会过滤掉.
// a.union(b) = a union b
val rdd4 = sc.parallelize(List(1,2,3))
val rdd5 = sc.parallelize(List(4,5,6))
rdd4.union(rdd5).collect

intersection-交集

// 返回两个集合中都有的数据
val rdd10=sc.parallelize(List(4,5,6,1))
val rdd6=sc.parallelize(List(4,5,6,1,1,14,5))
rdd10.intersection(rdd6).collect

join-连接

// 连接后,如果key相等,就返回相同的key,value值组成元组的形式.
// a *Join b =a.*Join(b)
val rdd7=sc.parallelize(List(("hello",1),("hello",2)))
val rdd8=sc.parallelize(List(("hello",1),("hello",2),("hello",1)))
rdd7.join(rdd8).collect             // 内连接      返回两边都有的key值.
rdd7.leftOuterJoin(rdd8).collect    // 左外连接    返回左边都有的key值,如果右边也有该key值,value用Some表示,如果右边没有该key值,value用None表示.
rdd7.rightOuterJoin(rdd8).collect   // 右外连接    类似 左外连接.
/*
此处也爆露出一个sql问题:join的时候,如果后表出现两个相同的数据,前标join的结果会出现两次,此时如果进行count前表会出现数据多的情况.
是否可以通过distinct进行去重呢?
*/

groupByKey-以key分组

val rdd9 = rdd7 union rdd8
rdd9.groupByKey.collect
rdd9.groupByKey.map(x =>(x._1,x._2.sum)).collect   // 普通写法
rdd9.reduceByKey(_+_).collect                      // 简便写法

reduceByKey-以key分组value相加

// 利用wordcount比较groupByKey--reduceByKey的区别
sc.textFile("hdfs://input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).collect   
sc.textFile("hdfs://input").flatMap(_.split(" ")).map((_,1)).groupByKey.map(kv=>(kv._1,kv._2.sum)).sortBy(_._2,false).collect  
/*
reduceByKey() 只能对value值进行操作
groupByKey()  可以对key和value进行操作
*/ 

cogroup-组合

// 对两个RDD中的KV元素,每个RDD中相同key中的元素分别聚合成一个集合
val rdd7=sc.parallelize(List(("hello",1),("hello",2)))
val rdd8=sc.parallelize(List(("hello",1),("hello",2),("hello",1)))
rdd7.cogroup(rdd8).collect
rdd7.cogroup(rdd8).map(p=>(p._1,p._2._1.sum + p._2._2.sum)).collect   // 此行中的'+'就是起到算数运算符的作用

cartesian-笛卡尔积

// 相当于双层循环
val rdd7=sc.parallelize(List(("hello",1),("hello",2)))
val rdd8=sc.parallelize(List(("hello",1),("hello",2),("hello",1)))
rdd7.cartesian(rdd8).collect

mapPartitions

// 针对每个分区进行操作,每个分区内是迭代器集合
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
rdd1.mapPartitions((it:Iterator[Int]) => {it.toList.map(x => x * 10).iterator})

mapPartitionsWithIndex

// 函数,传入参数 index,iter返回一堆数据.
val func = (index: Int, iter: Iterator[Int]) => {iter.map(x => "[partID:" + index + ", val: " + x + "]")
}
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
// 传入一堆数据,返回一堆数据.
rdd1.mapPartitionsWithIndex(func).collect

aggregate

def func1(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
// 看每个分区里面都包含那些数字元素    
rdd1.mapPartitionsWithIndex(func1).collect
// math.max(a,b)   从a,b两个数字中找出最大值
rdd1.aggregate(0)(math.max(_, _), _ + _)   // 4+9 =13
rdd1.aggregate(5)(math.max(_, _), _ + _)  // 5+(5+9)=19/*解析:
* 首先,aggregate函数需要给定一个初始值,传入两个匿名函数作为参数.
* 其次,如果是并行运行的话,两个函数都需要进行运算,第一个函数是每个线程的单独运算(初始值参与运算),第二个函数是每个线程的运算结果再进行运算(初始值依旧参与运算).
* 最后,得出结果.
* 而其中的(_,_)和(_+_)就是两个匿名函数,相当于(a:Int,b:Int)=>a+b.
*/

aggregateByKey

// 以key值分组value进行计算
val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
def func2(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {iter.map(x => "[partID:" + index + ", val: " + x + "]")
} 
pairRDD.mapPartitionsWithIndex(func2).collect
// key只是起到分组的作用,并不参与运算.
pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect
// 求所有动物之和
pairRDD.aggregateByKey(0)((_+ _), _ + _).collect

checkpoint

/*
checkpoint  执行在action算子之后,checkpoint会忘记血缘关系,并且把数据放到hdfs上. 
cache       不会忘记血缘关系,把数据放到内存.
*/
// 指定快照在hdfs上存放的位置(没有check会自动创建)
sc.setCheckpointDir("hdfs://check")   
val rdd = sc.textFile("hdfs://input").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)
// 给rdd执行快照命令
rdd.checkpoint
// 看rdd是否存放快照成功
rdd.isCheckpointed
// 发现快照存放失败?为什么呢?因为checkpoint是一个transformation操作,只有rdd遇到action操作的时候才会存放快照成功.
// count是一个action操作
rdd.count
// 发现存放快照成功
rdd.isCheckpointed
// 获取存放的快照文件
rdd.getCheckpointFile

coalesce、repartition

/**
coalesce:    联合,合并
repartition: 重分区从源码中可以看出repartition方法其实就是调用了coalesce方法,shuffle参数值为true的情况(默认shuffle参数值是fasle).即repartition(num)= coalesce(num,true)
现在假设RDD有X个分区,需要重新划分成Y个分区.
1.如果x<y(少变多),说明x个分区里有数据分布不均匀的情况,利用HashPartitioner把x个分区重新划分成了y个分区;
此时,需要把shuffle设置成true才行,因为如果设置成false,不会进行shuffle操作,此时父RDD和子RDD之间是窄依赖,这时并不会增加RDD的分区.
2.如果x>y(多变少,合并),需要先把x分区中的某些个分区合并成一个新的分区,然后最终合并成y个分区,此时,需要把coalesce方法的shuffle设置成false. 
*/// 由少变多,需要shuffle.
// 由多变少,不需要shuffle.
val rdd1 = sc.parallelize(1 to 10, 10)  // 1到10,分10个区
val rdd2 = rdd1.coalesce(2, false)      // false:不进行shuffle  默认false; 多分区转少分区用false 窄依赖.
rdd2.partitions.length                  // 查看分区数
val rdd3 = rdd2.repartition(5)          // 将分区由2个重新分为5个
rdd3.getNumPartitions                   // 查看分区数
/*
repartition 既可以将分区数变多,又可以将分区数变少.
*/  

collectAsMap

// 本身collect是返回Array数组
val rdd = sc.parallelize(List(("a", 1), ("b", 2)))
rdd.collectAsMap
// 只能用在(k,v)这种结构的
// 而如果是下面这种情况呢?
val rdd = sc.parallelize(List(("a", 1), ("a", 2)))
rdd.collectAsMap  // Map(a -> 2) ,其中一个value会覆盖另一个value

combineByKey

// 以key分组,value合并.
val rdd1 = sc.textFile("hdfs://input").flatMap(_.split(" ")).map((_, 1))
val rdd2 = rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
rdd2.collect
val rdd3 = rdd1.combineByKey(x => x + 10, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
rdd3.collect
// x => x+10, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n
// 上面是3个函数:
// 第一个函数 x=>x 相当于初始值,传入的是每个分区第一个传入的参数,根据需求变换逻辑运算,而且初始值只参与它后面的函数的一次运算,因为有2个分区,每个分区+10.
// 第二个函数 (a: Int, b: Int) => a + b 是计算同一分区相同key的value值之和.
// 第三个函数 (m: Int, n: Int) => m + n 是计算不同分区相同key的value总和,汇总.val rdd4 = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val rdd5 = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
val rdd6 = rdd5.zip(rdd4)// 注意:此处是rdd5.zip(rdd4)   (1,"dog")....也就是key是1,value是"dog"     分区后 1 -> (dog,cat,turkey)     2 -> (gnu,salmon,rabbit,wolf,bear,bee)
val rdd7 = rdd6.combineByKey(List(_), (x: List[String], y: String) => x :+ y, (m: List[String], n: List[String]) => m ++ n)
rdd7.collect
// 上面代码,以 1 -> (dog,cat,turkey) 为例进行分析:
/* List(_), (x: List[String], y: String) => x :+ y, (m: List[String], n: List[String]) => m ++ n
* 首先,dog先传进去第一个函数,变成List("dog")集合,也即是初始值.
* 第二步,生成的List("dog")集合作为参数传入第二个函数中,第二个函数的第二个参数是String类型,可以传进去"cat",key为2的gnu变成了List("gnu"),至此,第一个分区完成. (1,List(dog,cat)),(2,List(gnu))
* 第二个分区和第三个分区步骤同第一分区.
* 3个分区都完成以后,第三个函数开始汇总了,相同key的List集合会合并,最中结果就是如图所示了.
* 当然,由于是多线程运行,结果可以是(dog,cat,turkey)或者(turkey,dog,cat).顺序杂乱.
*/

countByKey

// 统计key出现的次数.
val rdd1 = sc.parallelize(List(("a", 1), ("b", 2), ("b", 2), ("c", 2), ("c", 1)))
rdd1.countByKey                 // 统计key出现的次数.
rdd1.countByValue               // 统计value出现的次数,将("a",1)整体当作value.
rdd1.reduceByKey(_+_).collect   // 可以求得每个单词后的value值相加之和 Array((a,1), (b,4), (c,3)).  
// 在rdd中[k,v]这样的会被分清key和value,而[(k,v)]则会把(k,v)当作一个整体来看待.

filterByRange

// 根据特定的key值范围进行过滤.
val rdd1 = sc.parallelize(List(("e", 5), ("c", 3), ("d", 4), ("c", 2), ("a", 1)))
// 是根据key值进行过滤的,只适用于(k,v)结构的,[b,d]:从b到d的key
val rdd2 = rdd1.filterByRange("b","d")  
rdd2.collect

flatMapValues

// 将values单独拿出操作,key跟着处理后的value值继续配对.
val a = sc.parallelize(List(("a","1 2"),("b","3 4")))
a.flatMapValues(_.split(" ")).collect

foldByKey

// 和foldLeft的用法类似,需要有初始值,传入的参数类型可以不同,需要并行运行.
val rdd1 = sc.parallelize(List("dog", "wolf", "cat", "bear"), 2)
val rdd2 = rdd1.map(x => (x.length, x))
val rdd3 = rdd2.foldByKey("")(_+_)
rdd3.collect

foreachPartition

// 遍历每个分区,对每个分区里的元素进行操作.
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),3)
rdd1.foreachPartition(x => println(x.reduce(_ + _)))
// 6 15 24  

keyBy

// 以keyBy后面括号内的作为key,既然有key了,肯定也有value啊,value就是没有操作之前的传入值.
val rdd1 = sc.parallelize(List("dog","salmon","salmon","rat","elephant"),3)
val rdd2 = rdd1.keyBy(_.length)
// 也可以写成    rdd1.map((_.length,_))    
rdd2.collect

keys、values

// 取rdd的所有key值和value值
val rdd1 = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val rdd2 = rdd1.map(x => (x.length, x))
rdd2.keys.collect
rdd2.values.collect

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/16493.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

QML基本语法介绍

为什么使用QML 开发者效率 将前后端分离,QML和JavaScript语言主要用于前度UI的方法,后端有C++来完成绘制。将JavaScript和C++分开能够快速迭代开发; 跨平台移植性 基于Qt平台的统一抽象概念,现在可以更加容易和快速和将Qt移植到更多的平台上。 开发的开放 Qt是由Qt-Proje…

40岁的戴尔在AI时代翻红了

戴尔公司股价创历史新高&#xff0c;市值达1138亿美元&#xff0c;涨幅110%。戴尔向AI押注多年&#xff0c;收购企业转型&#xff0c;成为数据基础设施厂商。AI服务器销售增长&#xff0c;分析师看好戴尔未来发展。 5月24日美股收盘&#xff0c;很多人可能不太关注的戴尔公司股…

Matlab进阶绘图第57期—带填充纹理的横向柱状图

带填充纹理的横向柱状图是通过在原始横向柱状图的基础上添加不同的纹理得到的&#xff0c;可以很好地解决由于颜色区分不足而导致的对象识别困难问题。 由于Matlab中未提供纹理填充选项&#xff0c;因此需要大家自行设法解决。 本文使用Kesh Ikuma制作的hatchfill2工具&#…

别人不愿意教,那我来教你Simulink建模(二)【语法知识】【原创分享】

文章目录 前言节点和状态的区别?local 和非 local 的区别?事件的作用?Bus 总线?Memory 模块?caller用法?自己瞎练习的(我也不知道为啥会多出来.h文件)自己瞎练习的(这个没有多出来.h文件)autosar实例学习前言 继续更新去年的博文系列,请君切记,师父领进门修行在个…

echarts- 热力图, k线图,雷达图

热力图 热力图可以看成是一种矩形的散点图。 热力图的矩形受itemStyle的影响。 通常配合visualmap组件来根据值的大小做颜色的变化。 热力图主要通过颜色去表现数值的大小&#xff0c;必须要配合 visualMap 组件使用。 visualMap:视觉映射组件 let options {tooltip: {},xAx…

取代pip,Python依赖管理的终极武器:Poetry

大家好&#xff0c;使用python过程中&#xff0c;包管理是一个永恒的话题。从早期的setuptools到后来的pip&#xff0c;再到现在的Poetry&#xff0c;开发者们一直在寻找更高效、更稳定、更可依赖的包管理方案。今天&#xff0c;我们就来聊聊这个现代Python项目的管理神器——P…

【全开源】CMS内容管理系统源码(ThinkPHP+FastAdmin)

基于ThinkPHPFastAdmin的CMS内容管理系统&#xff0c;自定义内容模型、自定义单页、自定义表单、专题、统计报表、会员发布等 提供全部前后台无加密源代码和数据库私有化部署&#xff0c;UniAPP版本提供全部无加密UniAPP源码。 ​构建高效内容管理的基石 一、引言&#xff1a…

深入分析 Android Activity (六)

文章目录 深入分析 Android Activity (六)1. Activity 的权限管理1.1 在 Manifest 文件中声明权限1.2 运行时请求权限1.3 处理权限请求结果1.4 处理权限的最佳实践 2. Activity 的数据传递2.1 使用 Intent 传递数据2.2 使用 Bundle 传递复杂数据 3. Activity 的动画和过渡效果3…

Python 机器学习 基础 之 数据表示与特征工程 【分箱、离散化、线性模型与树 / 交互特征与多项式特征】的简单说明

Python 机器学习 基础 之 数据表示与特征工程 【分箱、离散化、线性模型与树 / 交互特征与多项式特征】的简单说明 目录 Python 机器学习 基础 之 数据表示与特征工程 【分箱、离散化、线性模型与树 / 交互特征与多项式特征】的简单说明 一、简单介绍 二、分箱、离散化、线性…

使用 Ollama框架 下载和使用 Llama3 AI大模型的完整指南

&#x1f3e1;作者主页&#xff1a;点击&#xff01; &#x1f916;AI大模型部署与应用专栏&#xff1a;点击&#xff01; ⏰️创作时间&#xff1a;2024年5月24日20点59分 &#x1f004;️文章质量&#xff1a;96分 目录 &#x1f4a5;Ollama介绍 主要特点 主要优点 应…

*Python中的None与null:深入解析与实用指南

Python中的None与null:深入解析与实用指南 在编程的世界里,None和null是两个经常出现的概念,它们代表了“无”或“空”的状态。然而,在Python和许多其他编程语言中,这两个术语有着不同的含义和用法。本文将深入解析Python中的None,并对比其与其他编程语言中的null,提供…

如何将Spring Security添加到一个新的Spring Boot应用程序

如何将Spring Security添加到一个新的Spring Boot应用程序 这篇文章是关于如何将Spring Security添加到一个新的Spring Boot应用程序的详细步骤及其效果的介绍。 起点 首先&#xff0c;我们有一个简单的Spring Boot应用程序&#xff0c;它仅依赖于Web组件&#xff0c;并包含…

【张量乘法】pytorch中的mul、dot、mm、matmul

张量的乘法是pytorch等神经网络开发框架中最常见、最基本的操作之一。 1&#xff0c;torch.mul 对应位置的元素相乘。mul即表示张量中对应位置元素的相乘&#xff0c;也是最容易理解的乘法。 import torch a torch.tensor([[1, 2], [3, 4]]) b torch.tensor([[5, 6], [7, …

如何在Spring中配置声明式事务?

在Spring中配置声明式事务&#xff0c;主要有两种方式&#xff1a;通过XML配置文件和使用注解。声明式事务让你能够将事务管理代码从业务逻辑代码中分离出来&#xff0c;通过声明的方式来管理事务&#xff0c;使得代码更加简洁&#xff0c;易于维护。下面我将分别展示这两种方式…

【应用层】域名系统DNS

目录 1、互联网的域名结构 2、域名服务器 域名系统 DNS (Domain Name System) 是互联网使用的命名系统&#xff0c;用来把便于人们使用的机器名字转换为 IP 地址&#xff0c;域名系统其实就是名字系统。 互联网的域名系统 DNS 被设计成为一个联机分布式数据库系统&#xff0c…

Dockerfile构建Vue开发环境

# 指定基础镜像 FROM ubuntu:20.04# apt-get更换国内源解决 RUN sed -i s/archive.ubuntu.com//mirrors.aliyun.com/g /etc/apt/sources.list RUN sed -i s/security.ubuntu.com//mirrors.aliyun.com/g /etc/apt/sources.list ## 添加新的APT源 # RUN echo "deb http://se…

Facebook的心灵之镜:探寻数字社交的灵魂深处

在当今数字化时代&#xff0c;社交媒体已经成为了人们生活的一部分&#xff0c;而Facebook作为其中的佼佼者&#xff0c;更是承载了数以亿计的用户情感和交流。然而&#xff0c;Facebook不仅仅是一个简单的社交平台&#xff0c;它更像是一面心灵之镜&#xff0c;反映着数字社交…

2024年京东618购物节,“雷蛇 猎魂光蛛V2 模拟光轴“机械键盘的购买体验: 京东售后很优秀, 雷蛇Razer品控让人担忧

[简介] 常用网名: 猪头三 出生日期: 1981.XX.XX QQ联系: 643439947 个人网站: 80x86汇编小站 https://www.x86asm.org 编程生涯: 2001年~至今[共22年] 职业生涯: 20年 开发语言: C/C、80x86ASM、PHP、Perl、Objective-C、Object Pascal、C#、Python 开发工具: Visual Studio、D…

充电宝哪个牌子好用?充电宝品牌怎么选?充电宝最好的牌子排名

现在市面上的充电宝品牌琳琅满目&#xff0c;但并不是所有的充电宝都安全可靠。据央视的一个报道&#xff0c;市面上有35%充电宝质量是不过关的!充电宝买不对就非常容易出现爆炸的一个情况&#xff0c;所以大家对选充电宝不仅能保障设备的安全。那么&#xff0c;充电宝哪个牌子…

IP地址在广告行业中的重要地位

新时代&#xff0c;广告已经成为了企业推广产品的必要手段&#xff0c;而企业想要广告效果好&#xff0c;就要做到精准投放营销广告&#xff0c;将“花钱”的广告精准送到产品的受众用户面前&#xff0c;让收益大于花销&#xff0c;而归根究底就是广告转化率与回报率是否达到预…