Spark算子reduceByKey深度解析

原文地址:http://blog.csdn.net/qq_23660243/article/details/51435257

--------------------------------------------

最近经常使用到reduceByKey这个算子,懵逼的时间占据多数,所以沉下心来翻墙上国外的帖子仔细过了一遍,发现一篇不错的,在此加上个人的理解整体过一遍这个算子,那么我们开始:

国外的大牛一上来给出这么一句话,个人感觉高度概括了reduceByKey的功能:

[plain] view plain copy
 print?在CODE上查看代码片派生到我的代码片
  1. Spark RDD reduceByKey function merges the values for each key   
  2. using an associative reduce function.【Spark的RDD的reduceByKey  
  3. 是使用一个相关的函数来合并每个key的value的值的一个算子(那么主  
  4. 干就是reduceByKey是个算子/函数)】。  

那么这就基本奠定了reduceByKey的作用域是key-value类型的键值对,并且是只对每个key的value进行处理,如果含有多个key的话,那么就对多个values进行处理。这里的函数是我们自己传入的,也就是说是可人为控制的【其实这是废话,人为控制不了这算子一点用没有】。那么举个例子:

  

scala> val x = sc.parallelize(Array(("a", 1), ("b", 1), ("a", 1),| ("a", 1), ("b", 1), ("b", 1),| ("b", 1), ("b", 1)), 3)

我们创建了一个Array的字符串,并把其存入Spark的集群上,设置了三个分区【这里我们不关注分区,只关注操作】。那么我们调用reduceByKey并且传入函数进行相应操作【本处我们对相同key的value进行相加操作,类似于统计单词出现次数】:

scala> val y = x.reduceByKey((pre, after) => (pre + after))
这里两个参数我们逻辑上让他分别代表同一个key的两个不同values,那么结果想必大家应该猜到了:

scala> y.collect
res0: Array[(String, Int)] = Array((a,3), (b,5))
嗯,到这里大家对reduceByKey有了初步的认识和体会。论坛中有一段写的也很有帮助,由于英文不好怕翻译过来误导大家,所以每次附上原话:

[plain] view plaincopy
print?在CODE上查看代码片派生到我的代码片
  1. Basically reduceByKey function works only for RDDs which contains key and value pairs kind of  
  2.  elements(i.e RDDs having tuple or Map as a data element). It is a transformation operation   
  3. which means it is lazily evaluated.We need to pass one associative function as a parameter,   
  4. which will be applied to the source RDD and will create anew RDD as with resulting values(i.e.  
  5. key value pair). This operation is a wide operation as data shuffling may happen across the   
  6. partitions.【本质上来讲,reduceByKey函数(说算子也可以)只作用于包含key-value的RDDS上,它是  
  7. transformation类型的算子,这也就意味着它是懒加载的(就是说不调用Action的方法,是不会去计算的  
  8. ),在使用时,我们需要传递一个相关的函数作为参数,这个函数将会被应用到源RDD上并且创建一个新的  
  9. RDD作为返回结果,这个算子作为data Shuffling 在分区的时候被广泛使用】  

看到这大家对这个算子应该有了更加深入的认识,那么再附上我的Scala的一个小例

子,同样是统计字母出现次数:

[plain] view plaincopy
print?在CODE上查看代码片派生到我的代码片
  1. import org.apache.spark.{SparkContext, SparkConf}  
  2.   
  3. /**  
  4.  * mhc  
  5.  * Created by Administrator on 2016/5/17.  
  6.  */  
  7. object MyTest {  
  8.   def main(args: Array[String]) {  
  9.     val conf = new SparkConf().setAppName("MyTestApp").setMaster("local[1]")  
  10.     val sc = new SparkContext(conf)  
  11.     val x = sc.parallelize(List("a", "b", "a", "a", "b", "b", "b", "b"))  
  12.     val s = x.map((_, 1))  
  13.     val result = s.reduceByKey((pre, after) => pre + after)  
  14.     println(result.collect().toBuffer)  
  15.   
  16.   }  
  17. }  

结果是:ArrayBuffer((a,3), (b,5)),很简单对吧。论坛给出了java和python的版本的,如下:

Java:

packagecom.backtobazics.sparkexamples;importjava.util.Arrays;importorg.apache.spark.api.java.JavaPairRDD;
importorg.apache.spark.api.java.JavaRDD;
importorg.apache.spark.api.java.JavaSparkContext;
importorg.apache.spark.api.java.function.Function2;importscala.Tuple2;public classReduceByKeyExample {public static void main(String[] args) throws Exception {JavaSparkContext sc = new JavaSparkContext();//Reduce Function for sum Function2<Integer, Integer, Integer> reduceSumFunc = (accum, n) -> (accum + n);// Parallelized with 2 partitions JavaRDD<String> x = sc.parallelize(Arrays.asList("a", "b", "a", "a", "b", "b", "b", "b"),3);// PairRDD parallelized with 3 partitions// mapToPair function will map JavaRDD to JavaPairRDD JavaPairRDD<String, Integer> rddX = x.mapToPair(e -> newTuple2<String,Integer>(e, 1));// New JavaPairRDD JavaPairRDD<String, Integer> rddY = rddX.reduceByKey(reduceSumFunc);//Print tuples for(Tuple2<String, Integer> element : rddY.collect()){System.out.println("("+element._1+", "+element._2+")");}}
}// Output:// (b, 5)// (a, 3) 
python:

 Bazic reduceByKey example in python# creating PairRDD x with key value pairs>>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1), ("a", 1),
... ("b", 1), ("b", 1), ("b", 1), ("b", 1)], 3)# Applying reduceByKey operation on x>>> y = x.reduceByKey(lambda accum, n: accum + n)
>>> y.collect()
[('b', 5), ('a', 3)]# Define associative function separately >>>def sumFunc(accum, n):
...     return accum + n
...
>>> y = x.reduceByKey(sumFunc)
>>> y.collect()
[('b', 5), ('a', 3)]
感谢大家捧场,客官慢走。



本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/538590.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

绕固定轴分解_3轴 / 5轴 / 3+2到底是什么......??

一、 什么是32定位加工在一个三轴铣削程序执行时&#xff0c;使用五轴机床的两个旋转轴将切削刀具固定在一个倾斜的位置&#xff0c;32加工技术的名字也由此而来&#xff0c;这也叫做定位五轴机床&#xff0c;因为第四个轴和第五个轴是用来确定在固定位置上刀具的方向&#xff…

unix环境高级编程 pdf_UNIX环境高级编程——记录锁

引言在多进程环境下&#xff0c;多个进程同时读写一个文件&#xff0c;如果不进行同步&#xff0c;就可能导致不期待的结果&#xff0c;如后一个进程覆盖了前一个进程写的内容。Unix为此提供了一种强大的解决办法&#xff1a;记录锁记录锁记录锁本质上就是对文件加读写锁&#…

LNMP源码安装脚本

LNMP安装脚本&#xff0c;脚本环境 #LNMP环境搭建centos6.8 2.6.32-696.28.1.el6.x86_64 nginx:1.12.2 mysql:5.6.36 PHP:5.5.36 #!/bin/bash#LNMP环境搭建centos6.8 2.6.32-696.28.1.el6.x86_64 nginx:1.12.2 mysql:5.6.36 PHP:5.5.36trap echo "error line: $LINE…

启动spark shell

spark集群安装教程&#xff1a;http://blog.csdn.net/zengmingen/article/details/72123717 启动spark shell. 在spark安装目录bin文件夹下 ./spark-shell --master spark://nbdo1:7077 --executor-memory 2g --total-executor-cores 2 参数说明&#xff1a; --master spark…

python发送excel文件_Python操作Excel, 开发和调用接口,发送邮件

接口开发&#xff1a; importflaskimporttoolsimportjson,redisimportrandom server flask.Flask(__name__)#新建一个服务&#xff0c;把当前这个python文件当做一个服务 ip 118.24.3.40passwordHK139bc&*r redis.Redis(hostip,passwordpassword,port6379,db10, decode_res…

第一个Spark实例:求PI值

向spark提交jar&#xff0c;需要使用 bin下的spark-submit [hadoopnbdo1 bin]$ ./spark-submit --help Usage: spark-submit [options] <app jar | python file> [app arguments] Usage: spark-submit --kill [submission ID] --master [spark://...] Usage: spark-submi…

go conn 读取byte数组后是否要_【技术推荐】正向角度看Go逆向

Go语言具有开发效率高&#xff0c;运行速度快&#xff0c;跨平台等优点&#xff0c;因此正越来越多的被攻击者所使用&#xff0c;其生成的是可直接运行的二进制文件&#xff0c;因此对它的分析类似于普通C语言可执行文件分析&#xff0c;但是又有所不同&#xff0c;本文将会使用…

Confluence 6 选择一个外部数据库

2019独角兽企业重金招聘Python工程师标准>>> 注意&#xff1a; 选择一个合适的数据库通常需要花费很多时间。同时 Confluence 自带的 XML 数据备份和恢复功能通常也不适合合并和备份有大量数据的数据库。如果你想在系统运行后进行数据合并&#xff0c;你通常需要使用…

spark中saveAsTextFile如何最终生成一个文件

原文地址&#xff1a;http://www.cnblogs.com/029zz010buct/p/4685173.html ----------------------------------------------------------------------- 一般而言&#xff0c;saveAsTextFile会按照执行task的多少生成多少个文件&#xff0c;比如part-00000一直到part-0000n&…

python爬取内容乱码_python爬取html中文乱码

环境&#xff1a; python3.6 爬取代码&#xff1a; import requests url https://www.dygod.net/html/tv/hytv/ req requests.get(url) print(req.text) 爬取结果&#xff1a; / _-如上&#xff0c;title内容出现乱码&#xff0c;自己感觉应该是编码的问题&#xff0c;但是不…

前端每日实战:34# 视频演示如何用纯 CSS 创作在文本前后穿梭的边框

效果预览 按下右侧的“点击预览”按钮可以在当前页面预览&#xff0c;点击链接可以全屏预览。 https://codepen.io/comehope/pen/qYepNv 可交互视频教程 此视频是可以交互的&#xff0c;你可以随时暂停视频&#xff0c;编辑视频中的代码。 请用 chrome, safari, edge 打开观看。…

not support mysql_MYSQL出现quot; Client does not support authentication quot;的解决方法

MYSQL 帮助&#xff1a;A.2.3 Client does not support authentication protocolMySQL 4.1 and up uses an authentication protocol based on a password hashing algorithm that is incompatible with that used by older clients. If you upgrade the server to 4.1, attemp…

spark shell中编写WordCount程序

启动hdfs 略http://blog.csdn.net/zengmingen/article/details/53006541 启动spark 略安装&#xff1a;http://blog.csdn.net/zengmingen/article/details/72123717 spark-shell&#xff1a;http://blog.csdn.net/zengmingen/article/details/72162821准备数据 vi wordcount.t…

初级英语02

做客 1 Diana,i havent seen you for ages,how have you been? 2 would you like something to drink? 3 give my best to your parents. 4 did you hear what happened?whats the matter with him? 5 id like to applogize for leaving so early,i brought a little gift,…

mysql计算机二级选择题题库_全国计算机二级mysql数据库选择题及答案

全国计算机二级mysql数据库选择题及答案选择题是全国计算机二级mysql考试里的送分题&#xff0c;下面小编为大家带来了全国计算机二级mysql数据库选择题及答案&#xff0c;欢迎大家阅读&#xff01;全国计算机二级mysql数据库选择题及答案1) 函数 max( ) 表明这是一个什么函数?…

git add 撤销_更科学地管理你的项目,Git 简明教程(二)

修改文件内容上回说到&#xff0c;我们已经成功创建并提交了一个 README.md 文件到 FirstGit 版本库中1、修改文件现在我们更改 README.md 内容2、查看版本库状态该文件夹内右键运行 Git Bash Here执行命令 git statusGit 提示我们的改动还没有 commit&#xff0c;并且它给出了…

Eclipse中Copy Qualified Name复制类全名解决办法

原文链接&#xff1a;http://www.cnblogs.com/zyh1994/p/6393550.html ----------------------------------------------------------------------------------------------- Eclipse中 用Copy Qualified Name复制类全名时 总是这样的/struts1/src/me/edu/HelloAction.java很不…

c 连接mysql错误信息_使用C语言访问MySQL数据 —— 连接和错误处理

2011-05-09 wcdj可以通过许多不同的编程语言来访问MySQL&#xff0c;例如&#xff0c;C&#xff0c;C&#xff0c;Java&#xff0c;Perl&#xff0c;Python&#xff0c;Tcl&#xff0c;PHP等。本文主要总结使用C语言接口如何访问MySQL数据。(一) 连接例程(二) 错误处理(一) 连接…

eclipse编写wordcount提交spark运行

采用集成了scala的eclipse编写代码 代码&#xff1a; package wordcountimport org.apache.spark.SparkConf import org.apache.spark.SparkContextobject WordCount {def main(args: Array[String]): Unit {//非常重要&#xff0c;是通向Spark集群的入口val confnew SparkCon…

gitlab 删除分支_如何删除gitlab上默认受保护的master主分支

今天开发在检查代码的时候&#xff0c;发现master分支有问题&#xff0c;现在准备删除此主分支&#xff0c;并且重新提交正确的代码&#xff0c;不过在删除时发现&#xff0c;master分支不能被删除。ps&#xff1a;主分支一般都是线上分支&#xff0c;需要开发确认后并且做好备…