Spark 键值对RDD操作

https://www.cnblogs.com/yongjian/p/6425772.html

概述

键值对RDD是Spark操作中最常用的RDD,它是很多程序的构成要素,因为他们提供了并行操作各个键或跨界点重新进行数据分组的操作接口。

 

 

创建

Spark中有许多中创建键值对RDD的方式,其中包括

  • 文件读取时直接返回键值对RDD
  • 通过List创建键值对RDD

在Scala中,可通过Map函数生成二元组

1
2
3
4
5
6
7
8
9
10
val listRDD = sc.parallelize(List(1,2,3,4,5))
val result = listRDD.map(x => (x,1))
result.foreach(println)
//结果
(1,1)
(2,1)
(3,1)
(4,1)
(5,1)

 

 

键值对RDD的转化操作

 

基本RDD转化操作在此同样适用。但因为键值对RDD中包含的是一个个二元组,所以需要传递的函数会由原来的操作单个元素改为操作二元组。

下表总结了针对单个键值对RDD的转化操作,以 { (1,2) , (3,4) , (3,6) }  为例,f表示传入的函数

函数名目的示例结果
reduceByKey(f)合并具有相同key的值rdd.reduceByKey( ( x,y) => x+y ){ (1,2) , (3,10) }
groupByKey()对具有相同key的值分组rdd.groupByKey(){ (1,2) , (3, [4,6] ) }
mapValues(f)对键值对中的每个值(value)应用一个函数,但不改变键(key)rdd.mapValues(x => x+1){ (1,3) , (3,5) , (3,7) }
combineBy Key( createCombiner, mergeValue, mergeCombiners, partitioner)使用不同的返回类型合并具有相同键的值下面有详细讲解-
flatMapValues(f)对键值对RDD中每个值应用返回一个迭代器的函数,然后对每个元素生成一个对应的键值对。常用语符号化rdd.flatMapValues(x => ( x to 5 ))

{ (1, 2) ,  (1, 3) ,   (1, 4) , (1, 5) ,  (3, 4) , (3, 5) }

keys()获取所有keyrdd.keys(){1,3,3}
values()获取所有valuerdd.values(){2,4,6}
sortByKey()根据key排序rdd.sortByKey(){ (1,2) , (3,4) , (3,6) }

 

 

下表总结了针对两个键值对RDD的转化操作,以rdd1 = { (1,2) , (3,4) , (3,6) }  rdd2 = { (3,9) } 为例,

函数名目的示例结果
subtractByKey删掉rdd1中与rdd2的key相同的元素rdd1.subtractByKey(rdd2){ (1,2) }
join内连接rdd1.join(rdd2)

{(3, (4, 9)), (3, (6, 9))}

leftOuterJoin左外链接rdd1.leftOuterJoin (rdd2)

{(3,( Some( 4), 9)), (3,( Some( 6), 9))}

rightOuterJoin右外链接rdd1.rightOuterJoin(rdd2)

{(1,( 2, None)), (3, (4, Some( 9))), (3, (6, Some( 9)))}

cogroup将两个RDD钟相同key的数据分组到一起rdd1.cogroup(rdd2){(1,([ 2],[])), (3, ([4, 6],[ 9]))}

 

 

combineByKey

combineByKey( createCombiner, mergeValue, mergeCombiners, partitioner,mapSideCombine)

combineByKey( createCombiner, mergeValue, mergeCombiners, partitioner)

combineByKey( createCombiner, mergeValue, mergeCombiners)

 

函数功能:

聚合各分区的元素,而每个元素都是二元组。功能与基础RDD函数aggregate()差不多,可让用户返回与输入数据类型不同的返回值。

combineByKey函数的每个参数分别对应聚合操作的各个阶段。所以,理解此函数对Spark如何操作RDD会有很大帮助。

 

参数解析:

createCombiner:分区内 创建组合函数

mergeValue:分区内 合并值函数

mergeCombiners:多分区 合并组合器函数

partitioner:自定义分区数,默认为HashPartitioner

mapSideCombine:是否在map端进行Combine操作,默认为true

 

工作流程:

  1. combineByKey会遍历分区中的所有元素,因此每个元素的key要么没遇到过,要么和之前某个元素的key相同。
  2. 如果这是一个新的元素,函数会调用createCombiner创建那个key对应的累加器初始值
  3. 如果这是一个在处理当前分区之前已经遇到的key,会调用mergeCombiners把该key累加器对应的当前value与这个新的value合并

 

代码例子:

//统计男女个数

1
2
3
4
5
6
7
8
9
10
val conf = new SparkConf ().setMaster ("local").setAppName ("app_1")
   val sc = new SparkContext (conf)
   val people = List(("男", "李四"), ("男", "张三"), ("女", "韩梅梅"), ("女", "李思思"), ("男", "马云"))
   val rdd = sc.parallelize(people,2)
   val result = rdd.combineByKey(
     (x: String) => (List(x), 1),  //createCombiner
     (peo: (List[String], Int), x : String) => (x :: peo._1, peo._2 + 1), //mergeValue
     (sex1: (List[String], Int), sex2: (List[String], Int)) => (sex1._1 ::: sex2._1, sex1._2 + sex2._2)) //mergeCombiners
   result.foreach(println)

结果

(男, ( List( 张三,  李四,  马云),3 ) )
(女, ( List( 李思思,  韩梅梅),2 ) )

 

流程分解:

Spark算子-combineByKey

 

解析:两个分区,分区一按顺序V1、V2、V3遍历

  • V1,发现第一个key=男时,调用createCombiner,即
    (x: String) => (List(x), 1)
  • V2,第二次碰到key=男的元素,调用mergeValue,即
    (peo: (List[String], Int), x : String) => (x :: peo._1, peo._2 + 1)
  • V3,发现第一个key=女,继续调用createCombiner,即
    (x: String) => (List(x), 1)
  • … …
  • 待各V1、V2分区都计算完后,数据进行混洗,调用mergeCombiners,即
    (sex1: (List[String], Int), sex2: (List[String], Int)) => (sex1._1 ::: sex2._1, sex1._2 + sex2._2))

 


 

add by jan 2017-02-27 18:34:39

以下例子都基于此RDD

1
2
3
4
(Hadoop,1)
(Spark,1)
(Hive,1)
(Spark,1)

reduceByKey(func)

reduceByKey(func)的功能是,使用func函数合并具有相同键的值。

比如,reduceByKey((a,b) => a+b),有四个键值对("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5),对具有相同key的键值对进行合并后的结果就是:("spark",3)、("hadoop",8)。可以看出,(a,b) => a+b这个Lamda表达式中,a和b都是指value,比如,对于两个具有相同key的键值对("spark",1)、("spark",2),a就是1,b就是2。

1
2
3
4
scala> pairRDD.reduceByKey((a,b)=>a+b).foreach(println)
(Spark,2)
(Hive,1)
(Hadoop,1)

  

groupByKey()

roupByKey()的功能是,对具有相同键的值进行分组。比如,对四个键值对("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5),采用groupByKey()后得到的结果是:("spark",(1,2))和("hadoop",(3,5))。

1
2
3
4
5
6
7
scala> pairRDD.groupByKey()
res15: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[15] at groupByKey at <console>:34
//从上面执行结果信息中可以看出,分组后,value被保存到Iterable[Int]中
scala> pairRDD.groupByKey().foreach(println)
(Spark,CompactBuffer(11))
(Hive,CompactBuffer(1))
(Hadoop,CompactBuffer(1))

  

keys

keys只会把键值对RDD中的key返回形成一个新的RDD。比如,对四个键值对("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5)构成的RDD,采用keys后得到的结果是一个RDD[Int],内容是{"spark","spark","hadoop","hadoop"}。

1
2
3
4
5
6
7
scala> pairRDD.keys
res17: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[17] at keys at <console>:34
scala> pairRDD.keys.foreach(println)
Hadoop
Spark
Hive
Spark

  

values

 values只会把键值对RDD中的value返回形成一个新的RDD。比如,对四个键值对("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5)构成的RDD,采用keys后得到的结果是一个RDD[Int],内容是{1,2,3,5}。

1
2
3
4
5
6
7
8
scala> pairRDD.values
res0: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at values at <console>:34
  
scala> pairRDD.values.foreach(println)
1
1
1
1

  

sortByKey()

 sortByKey()的功能是返回一个根据键排序的RDD。

1
2
3
4
5
6
7
scala> pairRDD.sortByKey()
res0: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[2] at sortByKey at <console>:34
scala> pairRDD.sortByKey().foreach(println)
(Hadoop,1)
(Hive,1)
(Spark,1)
(Spark,1)

  

mapValues(func)

我们经常会遇到一种情形,我们只想对键值对RDD的value部分进行处理,而不是同时对key和value进行处理。对于这种情形,Spark提供了mapValues(func),它的功能是,对键值对RDD中的每个value都应用一个函数,但是,key不会发生变化。比如,对四个键值对("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5)构成的pairRDD,如果执行pairRDD.mapValues(x => x+1),就会得到一个新的键值对RDD,它包含下面四个键值对("spark",2)、("spark",3)、("hadoop",4)和("hadoop",6)。 

1
2
3
4
5
6
7
scala> pairRDD.mapValues(x => x+1)
res2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at mapValues at <console>:34
scala> pairRDD.mapValues(x => x+1).foreach(println)
(Hadoop,2)
(Spark,2)
(Hive,2)
(Spark,2)

  

join

join(连接)操作是键值对常用的操作。“连接”(join)这个概念来自于关系数据库领域,因此,join的类型也和关系数据库中的join一样,包括内连接(join)、左外连接(leftOuterJoin)、右外连接(rightOuterJoin)等。最常用的情形是内连接,所以,join就表示内连接。
对于内连接,对于给定的两个输入数据集(K,V1)和(K,V2),只有在两个数据集中都存在的key才会被输出,最终得到一个(K,(V1,V2))类型的数据集。

比如,pairRDD1是一个键值对集合{("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5)},pairRDD2是一个键值对集合{("spark","fast")},那么,pairRDD1.join(pairRDD2)的结果就是一个新的RDD,这个新的RDD是键值对集合{("spark",1,"fast"),("spark",2,"fast")}。对于这个实例,我们下面在spark-shell中运行一下:

1
2
3
4
5
6
7
8
9
10
11
12
scala> val pairRDD1 = sc.parallelize(Array(("spark",1),("spark",2),("hadoop",3),("hadoop",5)))
pairRDD1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[24] at parallelize at <console>:27
  
scala> val pairRDD2 = sc.parallelize(Array(("spark","fast")))
pairRDD2: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[25] at parallelize at <console>:27
  
scala> pairRDD1.join(pairRDD2)
res9: org.apache.spark.rdd.RDD[(String, (Int, String))] = MapPartitionsRDD[28] at join at <console>:32
  
scala> pairRDD1.join(pairRDD2).foreach(println)
(spark,(1,fast))
(spark,(2,fast))

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/390456.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python高级网络编程系列之基础篇

一、Socket简介 1、不同电脑上的进程如何通信&#xff1f; 进程间通信的首要问题是如何找到目标进程&#xff0c;也就是操作系统是如何唯一标识一个进程的&#xff01; 在一台电脑上是只通过进程号PID&#xff0c;但在网络中是行不通的&#xff0c;因为每台电脑的IP可能都是不一…

剑指 Offer 67. 把字符串转换成整数

写一个函数 StrToInt&#xff0c;实现把字符串转换成整数这个功能。不能使用 atoi 或者其他类似的库函数。 首先&#xff0c;该函数会根据需要丢弃无用的开头空格字符&#xff0c;直到寻找到第一个非空格的字符为止。 当我们寻找到的第一个非空字符为正或者负号时&#xff0c…

4.RabbitMQ Linux安装

这里使用的Linux是CentOS6.2 将/etc/yum.repo.d/目录下的所有repo文件删除 先下载epel源 # wget -O /etc/yum.repos.d/epel-erlang.repo http://repos.fedorapeople.org/repos/peter/erlang/epel-erlang.repo 修改epel-erlang.repo文件&#xff0c;如下图 添加CentOS 的下载源…

二、数据库设计与操作

一、 数据库设计仿QQ数据库一共包括5张数据表&#xff0c;每张数据表结构如下&#xff1a;1、 tb_User&#xff08;用户信息表&#xff09;这张表主要用来存储用户的好友关系与信息字段名数据类型是否Null值默认值绑定描述IDint否用户账号PwdVarchar(50)否用户密码Frie…

剑指 Offer 36. 二叉搜索树与双向链表

输入一棵二叉搜索树&#xff0c;将该二叉搜索树转换成一个排序的循环双向链表。要求不能创建任何新的节点&#xff0c;只能调整树中节点指针的指向。 为了让您更好地理解问题&#xff0c;以下面的二叉搜索树为例&#xff1a; 我们希望将这个二叉搜索树转化为双向循环链表。链表…

1736. 替换隐藏数字得到的最晚时间

给你一个字符串 time &#xff0c;格式为 hh:mm&#xff08;小时&#xff1a;分钟&#xff09;&#xff0c;其中某几位数字被隐藏&#xff08;用 ? 表示&#xff09;。 有效的时间为 00:00 到 23:59 之间的所有时间&#xff0c;包括 00:00 和 23:59 。 替换 time 中隐藏的数…

十 web爬虫讲解2—Scrapy框架爬虫—Scrapy安装—Scrapy指令

Scrapy框架安装 1、首先&#xff0c;终端执行命令升级pip: python -m pip install --upgrade pip2、安装&#xff0c;wheel(建议网络安装) pip install wheel3、安装&#xff0c;lxml(建议下载安装)4、安装&#xff0c;Twisted(建议下载安装)5、安装&#xff0c;Scrapy(建议网络…

阿里与珠海横琴新区达成战略合作,阿里云助力打造横琴智能岛

5月17日&#xff0c;阿里巴巴集团、蚂蚁金服集团与珠海横琴新区管理委员会签署战略合作协议&#xff0c;三方将围绕云计算、政务民生服务、城市治理、电子商务等领域展开深入合作&#xff0c;推动横琴产业发展&#xff0c;共同建设新型智慧城市。 &#xff08;阿里巴巴集团、蚂…

chrome 开发工具_我最喜欢的Chrome开发工具提示和技巧

chrome 开发工具Chrome Developer Tools are a super powerful suite of tools for developing web applications. They can do so much, from very basic operations like traversing the DOM, to checking out network requests or even profiling your applications perform…

三十四 Python分布式爬虫打造搜索引擎Scrapy精讲—scrapy信号详解

信号一般使用信号分发器dispatcher.connect()&#xff0c;来设置信号&#xff0c;和信号触发函数&#xff0c;当捕获到信号时执行一个函数 dispatcher.connect()信号分发器&#xff0c;第一个参数信号触发函数&#xff0c;第二个参数是触发信号&#xff0c; 以下是各种信号 sig…

敏捷管理之绩效考核方案

前段时间&#xff0c;公司签了年终奖确认。觉得公司发放年终奖完全是凭主观发放&#xff0c;没有事实依据&#xff0c;由此产生了对如何发放年终奖的一些想法。 奖金发放作为激励员工最直接的手段&#xff0c;往往也是让管理人员最难抉择的&#xff0c;而且很多公司&#xff0c…

记一个蒟蒻的绝望

感觉现在…… 怎么讲&#xff0c;心挺冷的。 今天一月五号了。距离省选&#xff0c;时间好短啊。 我还有那么多东西不懂。甚至听都没听说过。 等到真正去省选的时候&#xff0c;我可能跟现在一样&#xff0c;什么都不会。 我的名字能不能被看到都不知道。哈&#xff0c;还进队呢…

671. 二叉树中第二小的节点

给定一个非空特殊的二叉树&#xff0c;每个节点都是正数&#xff0c;并且每个节点的子节点数量只能为 2 或 0。如果一个节点有两个子节点的话&#xff0c;那么该节点的值等于两个子节点中较小的一个。 更正式地说&#xff0c;root.val min(root.left.val, root.right.val) 总…

Android基础夯实--你了解Handler有多少?

概述 对于刚入门的同学来说&#xff0c;往往都会对Handler比较迷茫&#xff0c;到底Handler是个什么样的东西。当然&#xff0c;可能对于一些有工作经验的工程师来说&#xff0c;他们也不一定能很准确地描述&#xff0c;我们来看下API的介绍。 Handler是用来结合线程的消息队列…

sketch-a-net_Adobe XD,Sketch,Figma,InVision-如何在2020年选择最佳设计软件

sketch-a-netComparing Adobe XD vs Sketch vs Figma vs InVision studio is a very common topic among designers who are looking for the best design software. 在寻求最佳设计软件的设计师中&#xff0c;比较Adobe XD&#xff0c;Sketch&#xff0c;Figma和InVision Stud…

Nancy简单实战之NancyMusicStore(四):实现购物车

原文:Nancy简单实战之NancyMusicStore(四)&#xff1a;实现购物车前言 上一篇&#xff0c;我们完成了商品的详情和商品的管理&#xff0c;这一篇我们来完成最后的一个购物车功能。 购物车&#xff0c;不外乎这几个功能&#xff1a;添加商品到购物车&#xff0c;删除购物车中的商…

《七步掌握业务分析》读书笔记六

分析技术和呈现格式 词汇表 强有力沟通的一个重要内容是一致地使用术语和惯用语。每次谈话都涉及对术语的共同理解。 工作流图&#xff08;也称为流程图、UNL活动图和过程图&#xff09; 工作流程把一个或多个业务过程的细节可视化地呈现出来&#xff0c;以澄清理解或提出过程改…

个人作业——软件工程实践总结作业

一、请回望暑假时的第一次作业&#xff0c;你对于软件工程课程的想象 1&#xff09;对比开篇博客你对课程目标和期待&#xff0c;“希望通过实践锻炼&#xff0c;增强计算机专业的能力和就业竞争力”&#xff0c;对比目前的所学所练所得&#xff0c;在哪些方面达到了你的期待和…

(转)在阿里,我们如何管理代码分支?

阿里妹导读&#xff1a;代码分支模式的选择并没有绝对的正确和错误之分&#xff0c;关键是与项目的规模和发布节奏相匹配。阿里协同研发平台在经过众多实践历练后&#xff0c;总结出了一套独创的分支管理方法&#xff1a;AoneFlow&#xff0c;通过兼备灵活高效与简单实用的流程…

WIN10系统 截图或者某些程序时屏幕会自动放大怎么办

右击这个应用程序&#xff0c;兼容性&#xff0c;以兼容模式运行&#xff0c;同时勾选高DPI设置时禁止显示缩放即可