Spark 键值对RDD操作

https://www.cnblogs.com/yongjian/p/6425772.html

概述

键值对RDD是Spark操作中最常用的RDD,它是很多程序的构成要素,因为他们提供了并行操作各个键或跨界点重新进行数据分组的操作接口。

 

 

创建

Spark中有许多中创建键值对RDD的方式,其中包括

  • 文件读取时直接返回键值对RDD
  • 通过List创建键值对RDD

在Scala中,可通过Map函数生成二元组

1
2
3
4
5
6
7
8
9
10
val listRDD = sc.parallelize(List(1,2,3,4,5))
val result = listRDD.map(x => (x,1))
result.foreach(println)
//结果
(1,1)
(2,1)
(3,1)
(4,1)
(5,1)

 

 

键值对RDD的转化操作

 

基本RDD转化操作在此同样适用。但因为键值对RDD中包含的是一个个二元组,所以需要传递的函数会由原来的操作单个元素改为操作二元组。

下表总结了针对单个键值对RDD的转化操作,以 { (1,2) , (3,4) , (3,6) }  为例,f表示传入的函数

函数名目的示例结果
reduceByKey(f)合并具有相同key的值rdd.reduceByKey( ( x,y) => x+y ){ (1,2) , (3,10) }
groupByKey()对具有相同key的值分组rdd.groupByKey(){ (1,2) , (3, [4,6] ) }
mapValues(f)对键值对中的每个值(value)应用一个函数,但不改变键(key)rdd.mapValues(x => x+1){ (1,3) , (3,5) , (3,7) }
combineBy Key( createCombiner, mergeValue, mergeCombiners, partitioner)使用不同的返回类型合并具有相同键的值下面有详细讲解-
flatMapValues(f)对键值对RDD中每个值应用返回一个迭代器的函数,然后对每个元素生成一个对应的键值对。常用语符号化rdd.flatMapValues(x => ( x to 5 ))

{ (1, 2) ,  (1, 3) ,   (1, 4) , (1, 5) ,  (3, 4) , (3, 5) }

keys()获取所有keyrdd.keys(){1,3,3}
values()获取所有valuerdd.values(){2,4,6}
sortByKey()根据key排序rdd.sortByKey(){ (1,2) , (3,4) , (3,6) }

 

 

下表总结了针对两个键值对RDD的转化操作,以rdd1 = { (1,2) , (3,4) , (3,6) }  rdd2 = { (3,9) } 为例,

函数名目的示例结果
subtractByKey删掉rdd1中与rdd2的key相同的元素rdd1.subtractByKey(rdd2){ (1,2) }
join内连接rdd1.join(rdd2)

{(3, (4, 9)), (3, (6, 9))}

leftOuterJoin左外链接rdd1.leftOuterJoin (rdd2)

{(3,( Some( 4), 9)), (3,( Some( 6), 9))}

rightOuterJoin右外链接rdd1.rightOuterJoin(rdd2)

{(1,( 2, None)), (3, (4, Some( 9))), (3, (6, Some( 9)))}

cogroup将两个RDD钟相同key的数据分组到一起rdd1.cogroup(rdd2){(1,([ 2],[])), (3, ([4, 6],[ 9]))}

 

 

combineByKey

combineByKey( createCombiner, mergeValue, mergeCombiners, partitioner,mapSideCombine)

combineByKey( createCombiner, mergeValue, mergeCombiners, partitioner)

combineByKey( createCombiner, mergeValue, mergeCombiners)

 

函数功能:

聚合各分区的元素,而每个元素都是二元组。功能与基础RDD函数aggregate()差不多,可让用户返回与输入数据类型不同的返回值。

combineByKey函数的每个参数分别对应聚合操作的各个阶段。所以,理解此函数对Spark如何操作RDD会有很大帮助。

 

参数解析:

createCombiner:分区内 创建组合函数

mergeValue:分区内 合并值函数

mergeCombiners:多分区 合并组合器函数

partitioner:自定义分区数,默认为HashPartitioner

mapSideCombine:是否在map端进行Combine操作,默认为true

 

工作流程:

  1. combineByKey会遍历分区中的所有元素,因此每个元素的key要么没遇到过,要么和之前某个元素的key相同。
  2. 如果这是一个新的元素,函数会调用createCombiner创建那个key对应的累加器初始值
  3. 如果这是一个在处理当前分区之前已经遇到的key,会调用mergeCombiners把该key累加器对应的当前value与这个新的value合并

 

代码例子:

//统计男女个数

1
2
3
4
5
6
7
8
9
10
val conf = new SparkConf ().setMaster ("local").setAppName ("app_1")
   val sc = new SparkContext (conf)
   val people = List(("男", "李四"), ("男", "张三"), ("女", "韩梅梅"), ("女", "李思思"), ("男", "马云"))
   val rdd = sc.parallelize(people,2)
   val result = rdd.combineByKey(
     (x: String) => (List(x), 1),  //createCombiner
     (peo: (List[String], Int), x : String) => (x :: peo._1, peo._2 + 1), //mergeValue
     (sex1: (List[String], Int), sex2: (List[String], Int)) => (sex1._1 ::: sex2._1, sex1._2 + sex2._2)) //mergeCombiners
   result.foreach(println)

结果

(男, ( List( 张三,  李四,  马云),3 ) )
(女, ( List( 李思思,  韩梅梅),2 ) )

 

流程分解:

Spark算子-combineByKey

 

解析:两个分区,分区一按顺序V1、V2、V3遍历

  • V1,发现第一个key=男时,调用createCombiner,即
    (x: String) => (List(x), 1)
  • V2,第二次碰到key=男的元素,调用mergeValue,即
    (peo: (List[String], Int), x : String) => (x :: peo._1, peo._2 + 1)
  • V3,发现第一个key=女,继续调用createCombiner,即
    (x: String) => (List(x), 1)
  • … …
  • 待各V1、V2分区都计算完后,数据进行混洗,调用mergeCombiners,即
    (sex1: (List[String], Int), sex2: (List[String], Int)) => (sex1._1 ::: sex2._1, sex1._2 + sex2._2))

 


 

add by jan 2017-02-27 18:34:39

以下例子都基于此RDD

1
2
3
4
(Hadoop,1)
(Spark,1)
(Hive,1)
(Spark,1)

reduceByKey(func)

reduceByKey(func)的功能是,使用func函数合并具有相同键的值。

比如,reduceByKey((a,b) => a+b),有四个键值对("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5),对具有相同key的键值对进行合并后的结果就是:("spark",3)、("hadoop",8)。可以看出,(a,b) => a+b这个Lamda表达式中,a和b都是指value,比如,对于两个具有相同key的键值对("spark",1)、("spark",2),a就是1,b就是2。

1
2
3
4
scala> pairRDD.reduceByKey((a,b)=>a+b).foreach(println)
(Spark,2)
(Hive,1)
(Hadoop,1)

  

groupByKey()

roupByKey()的功能是,对具有相同键的值进行分组。比如,对四个键值对("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5),采用groupByKey()后得到的结果是:("spark",(1,2))和("hadoop",(3,5))。

1
2
3
4
5
6
7
scala> pairRDD.groupByKey()
res15: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[15] at groupByKey at <console>:34
//从上面执行结果信息中可以看出,分组后,value被保存到Iterable[Int]中
scala> pairRDD.groupByKey().foreach(println)
(Spark,CompactBuffer(11))
(Hive,CompactBuffer(1))
(Hadoop,CompactBuffer(1))

  

keys

keys只会把键值对RDD中的key返回形成一个新的RDD。比如,对四个键值对("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5)构成的RDD,采用keys后得到的结果是一个RDD[Int],内容是{"spark","spark","hadoop","hadoop"}。

1
2
3
4
5
6
7
scala> pairRDD.keys
res17: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[17] at keys at <console>:34
scala> pairRDD.keys.foreach(println)
Hadoop
Spark
Hive
Spark

  

values

 values只会把键值对RDD中的value返回形成一个新的RDD。比如,对四个键值对("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5)构成的RDD,采用keys后得到的结果是一个RDD[Int],内容是{1,2,3,5}。

1
2
3
4
5
6
7
8
scala> pairRDD.values
res0: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at values at <console>:34
  
scala> pairRDD.values.foreach(println)
1
1
1
1

  

sortByKey()

 sortByKey()的功能是返回一个根据键排序的RDD。

1
2
3
4
5
6
7
scala> pairRDD.sortByKey()
res0: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[2] at sortByKey at <console>:34
scala> pairRDD.sortByKey().foreach(println)
(Hadoop,1)
(Hive,1)
(Spark,1)
(Spark,1)

  

mapValues(func)

我们经常会遇到一种情形,我们只想对键值对RDD的value部分进行处理,而不是同时对key和value进行处理。对于这种情形,Spark提供了mapValues(func),它的功能是,对键值对RDD中的每个value都应用一个函数,但是,key不会发生变化。比如,对四个键值对("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5)构成的pairRDD,如果执行pairRDD.mapValues(x => x+1),就会得到一个新的键值对RDD,它包含下面四个键值对("spark",2)、("spark",3)、("hadoop",4)和("hadoop",6)。 

1
2
3
4
5
6
7
scala> pairRDD.mapValues(x => x+1)
res2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at mapValues at <console>:34
scala> pairRDD.mapValues(x => x+1).foreach(println)
(Hadoop,2)
(Spark,2)
(Hive,2)
(Spark,2)

  

join

join(连接)操作是键值对常用的操作。“连接”(join)这个概念来自于关系数据库领域,因此,join的类型也和关系数据库中的join一样,包括内连接(join)、左外连接(leftOuterJoin)、右外连接(rightOuterJoin)等。最常用的情形是内连接,所以,join就表示内连接。
对于内连接,对于给定的两个输入数据集(K,V1)和(K,V2),只有在两个数据集中都存在的key才会被输出,最终得到一个(K,(V1,V2))类型的数据集。

比如,pairRDD1是一个键值对集合{("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5)},pairRDD2是一个键值对集合{("spark","fast")},那么,pairRDD1.join(pairRDD2)的结果就是一个新的RDD,这个新的RDD是键值对集合{("spark",1,"fast"),("spark",2,"fast")}。对于这个实例,我们下面在spark-shell中运行一下:

1
2
3
4
5
6
7
8
9
10
11
12
scala> val pairRDD1 = sc.parallelize(Array(("spark",1),("spark",2),("hadoop",3),("hadoop",5)))
pairRDD1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[24] at parallelize at <console>:27
  
scala> val pairRDD2 = sc.parallelize(Array(("spark","fast")))
pairRDD2: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[25] at parallelize at <console>:27
  
scala> pairRDD1.join(pairRDD2)
res9: org.apache.spark.rdd.RDD[(String, (Int, String))] = MapPartitionsRDD[28] at join at <console>:32
  
scala> pairRDD1.join(pairRDD2).foreach(println)
(spark,(1,fast))
(spark,(2,fast))

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/390456.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

无服务器架构_如何开始使用无服务器架构

无服务器架构Traditionally, when you wanted to build a web app or API, you’d usually have to spend significant time and effort managing servers and ensuring your app scales up to handle large request volumes. Serverless is a cloud computing model which let…

WPF中的动画——(一)基本概念

原文:WPF中的动画——&#xff08;一&#xff09;基本概念WPF的一个特点就是支持动画&#xff0c;我们可以非常容易的实现漂亮大方的界面。首先&#xff0c;我们来复习一下动画的基本概念。计算机中的动画一般是定格动画&#xff0c;也称之为逐帧动画&#xff0c;它通过每帧不同…

cloud 异步远程调用_异步远程工作的意外好处-以及如何拥抱它们

cloud 异步远程调用In this article, Ill discuss the positive aspects of being a little out of sync with your team.在本文中&#xff0c;我将讨论与您的团队有点不同步的积极方面。 So you’ve started working from home.因此&#xff0c;您已经开始在家工作。 There …

linux 问题一 apt-get install 被 lock

问题&#xff1a; sudo apt-get install vim E: Could not get lock /var/lib/dpkg/lock - open (11: Resource temporarily unavailable)E: Unable to lock the administration directory (/var/lib/dpkg/), is another process using it? 解决&#xff1a; sudo rm /var/cac…

工信部高级软件工程师_作为新软件工程师的信

工信部高级软件工程师Dear Self, 亲爱的自我&#xff0c; You just graduated and you are ready to start your career in the IT field. I cannot spoil anything, but I assure you it will be an interesting ride. 您刚刚毕业&#xff0c;就可以开始在IT领域的职业了。 我…

Python高级网络编程系列之基础篇

一、Socket简介 1、不同电脑上的进程如何通信&#xff1f; 进程间通信的首要问题是如何找到目标进程&#xff0c;也就是操作系统是如何唯一标识一个进程的&#xff01; 在一台电脑上是只通过进程号PID&#xff0c;但在网络中是行不通的&#xff0c;因为每台电脑的IP可能都是不一…

多线程编程和单线程编程_生活与编程的平行线程

多线程编程和单线程编程I’m convinced our deepest desire is, by paying the cost of time, to be shown a glimmer of some fundamental truth about the universe. To hear it whisper its lessons and point towards its purpose.我坚信&#xff0c;我们最深切的愿望是通过…

剑指 Offer 67. 把字符串转换成整数

写一个函数 StrToInt&#xff0c;实现把字符串转换成整数这个功能。不能使用 atoi 或者其他类似的库函数。 首先&#xff0c;该函数会根据需要丢弃无用的开头空格字符&#xff0c;直到寻找到第一个非空格的字符为止。 当我们寻找到的第一个非空字符为正或者负号时&#xff0c…

搭建MSSM框架(Maven+Spring+Spring MVC+MyBatis)

https://github.com/easonjim/ssm-framework 先欠着&#xff0c;后续再进行讲解&#xff1a; 一、Spring内核集成 二、Spring MVC集成 三、MyBatis集成 四、代码生成工具集成 >如有问题&#xff0c;请联系我&#xff1a;easonjim#163.com&#xff0c;或者下方发表评论。<…

4.RabbitMQ Linux安装

这里使用的Linux是CentOS6.2 将/etc/yum.repo.d/目录下的所有repo文件删除 先下载epel源 # wget -O /etc/yum.repos.d/epel-erlang.repo http://repos.fedorapeople.org/repos/peter/erlang/epel-erlang.repo 修改epel-erlang.repo文件&#xff0c;如下图 添加CentOS 的下载源…

freecodecamp_如何对freeCodeCamp文章提供反馈

freecodecampWe at the freeCodeCamp editorial team do our best to ensure articles are as accurate as they can be.我们的freeCodeCamp编辑团队竭尽所能&#xff0c;以确保文章尽可能准确。 Still, we occasionally miss factual inaccuracies, non-functioning code exa…

如何对接oracle 建立pdb

Oracle数据库的结构是一个数据库实例下有许多用户&#xff0c;每一个用户有自己的表空间&#xff0c;即每一个用户相当于MySQL中的一个数据库。不久前下了oracle 12c的数据库&#xff0c;安装之后建user时才知道oracle12c 有一个很大的变动就是引入了pdb可插入数据库&#xff0…

二、数据库设计与操作

一、 数据库设计仿QQ数据库一共包括5张数据表&#xff0c;每张数据表结构如下&#xff1a;1、 tb_User&#xff08;用户信息表&#xff09;这张表主要用来存储用户的好友关系与信息字段名数据类型是否Null值默认值绑定描述IDint否用户账号PwdVarchar(50)否用户密码Frie…

hdu 过山车_从机械工程师到软件开发人员–我的编码过山车

hdu 过山车There arent many people out there who grew up dreaming of writing code. I definitely didnt. I wanted to design cars. But somehow I ended up building software.很少有人梦见编写代码。 我绝对没有。 我想设计汽车。 但是我最终以某种方式开发了软件。 I u…

mysql 两列互换

mysql 如果想互换两列的值&#xff0c;直接写 update 表 set col1col2&#xff0c;col2col1 这样的后果就是两列都是 col2 的值 注意这和sql server 是不同的&#xff0c; 如果想实现上述功能&#xff0c;添加一个自增列作为标识&#xff08;必须的&#xff09;&#xff0c; u…

剑指 Offer 36. 二叉搜索树与双向链表

输入一棵二叉搜索树&#xff0c;将该二叉搜索树转换成一个排序的循环双向链表。要求不能创建任何新的节点&#xff0c;只能调整树中节点指针的指向。 为了让您更好地理解问题&#xff0c;以下面的二叉搜索树为例&#xff1a; 我们希望将这个二叉搜索树转化为双向循环链表。链表…

游戏引擎开发和物理引擎_视频游戏开发的最佳游戏引擎

游戏引擎开发和物理引擎In this article, well look at some of the most popular game engines for video game development. Youll get a brief overview of each engine so you can choose which to use for your project.在本文中&#xff0c;我们将介绍一些用于视频游戏开…

TPS和QPS的区别和理解

TPS和QPS的区别和理解 原创 2016年04月26日 17:11:3114010QPS&#xff1a;Queries Per Second意思是“每秒查询率”&#xff0c;是一台服务器每秒能够相应的查询次数&#xff0c;是对一个特定的查询服务器在规定时间内所处理流量多少的衡量标准。 TPS&#xff1a;是Transaction…

1893. 检查是否区域内所有整数都被覆盖

theme: healer-readable 给你一个二维整数数组 ranges 和两个整数 left 和 right 。每个 ranges[i] [starti, endi] 表示一个从 starti 到 endi 的 闭区间 。 如果闭区间 [left, right] 内每个整数都被 ranges 中 至少一个 区间覆盖&#xff0c;那么请你返回 true &#xff…

004-docker常用命令[二]-容器操作ps,top,attach,export

2.3、容器操作 2.3.1、docker ps docker ps : 列出容器 语法 docker ps [OPTIONS] OPTIONS说明&#xff1a; -a :显示所有的容器&#xff0c;包括未运行的。 -f :根据条件过滤显示的内容。 --format :指定返回值的模板文件。 -l :显示最近创建的容器。 -n :列出最近创建的n…