05.RDD详解

05.Spark--RDD详解

RDD详解--groupByKey--reduceByKey

[MapPartitionRDD单词统计]

单词统计
import org.apache.spark.{SparkConf,SparkContext}
object WordCountScala{def main(args:Array[String]):Unit={//创建spark配置对象val conf=new SparkConf()conf.setAppName("WCScala")conf.setMaster("local")//创建上下文val sc=new SparkContext(conf)//加载文档,这个文件是文本文件,调的是hadoopFileval rdd1=sc.textFile("file:///d:/mr/word.txt")[textFile,hadoopFile]//K是longtegr  hadoop里面的  pair hadoopFile(path,classOf[TextInputFormat],classOf[LognWritable],classOf[Test],minPartitions).map(pair=>pair._2.toString).setName(path)//map做的版面//压扁val rdd2=rdd1.flatMap(_.split(" "))//标1成对val rdd3=rdd2.map(_,1)//聚合val rdd4=rdd3.reduceByKey(_+_)val arr=rdd4.collect()arr.foreach(println)//链式编程//sc.textFile("file:///d:/mr/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect().foreach(println)}
}
RDD的依赖列表是如何呈现的?
//[T:ClassTag]主构造
abstract class RDD[T:ClassTag]{@transient private var _sc:SparContext,//体现出了依赖集合,RDD需要的依赖列表  什么时候创建的?@transient private var deps:Seq[Dependency[_]] //[Dependency[_]]泛型
}extends Serialiizable with Logging{...
}
//映射分区RDD
MapPartitionsRDD(org.apache.spark.rdd)
private[spark] class MapPartitionsRDD[U:ClassTag,T:ClassTag](var prev:RDD[T],f:(TaskCOntext,Int,Iterator[T])=>Iterator[U].preserversPartitioning:Boolean=false)//prev是上级的RDD
extends RDD[U](prev){//构造一个rdd用one-to-one依赖...此时RDD会调用  def this(@transoentoneParent:RDD[_])=this(oneParent.context,List(new OneToOneDependency(oneParent)))//一对一的依赖,OneToOneDependency总结:当它去调MapPartitionsRDD的时候,它继承了父的RDD,而父RDD它只传了一个上级RDD的prev这个属性,因为它走的是(def this(@transoent oneParent:RDD[_]))辅助构造。辅助构造它把这个RDD的上下文(oneParent)取出,放入这里面.这里面创建了一个List(new OneToOneDependency(oneParent),创建了OneToOneDependency依赖。oneParent上级的RDD。
}
)class OneToOneDependency[T](rdd:RDD[T])extends NarrowDependency[T](rdd){override def getParents(partitionId:Int):List[Int]=List(partitionId)//其实它是一个链条,RDD本身是依赖列表。每一个依赖于上级关联。所以不是MapPartitionRDD于preRDD之间直接关联。是通过依赖走了一圈。}
如何判断是宽依赖还是窄依赖的?  MapPartitionsRDD就是窄依赖,在reduceByKey的时候就已经ShuffledRDD了。ShuffledRDD与依赖有啥关系?
那是因为在创建RDD的时候,就已经把依赖关联进了去了。因为huffer依赖不是它划分边界的关键。它通过依赖,因为宽依赖就是Shuffer,窄依赖就不是Shuffer了。当它在创建RDD进来的时候,这个依赖就在这里面了。所以它是固定的。RDD它里面有一个分区列表,分区列表它是一个集合。可以理解为一个引用。集合里面放了一堆的依赖。其中RDD是一个抽象类,有一个是MapPartitionRDD,它是RDD的一个子类。它具备了RDD的特点。也得有RDD的分区列表。它创建了一对一的依赖。RDD中所传的prev是上一家RDD,也是在构造里面。上一个RDD存放哪?为了构造MapPartitionRDD它是通过其它的RDD变换。MapPartitionRDD是如何与preRDD关联起来的。是因为MapPartitionRDD它有依赖,而在这个依赖当中它有一个RDD的属性(deps)关联到preRDD的。从Hadoop到flatMap再到表一成对它们全都是窄依赖。到了reduceByKey它返回的是ShuffledRDD它用到的就是Shuffler依赖了。

1484108-20181015200941328-573628420.png
1484108-20181015200912538-1275199425.png

ShufflerdRDD:这个结果RDD,它是要通过Shuffle来产生的。参数是由上一个RDD还有分区类,K类.V类还有组合函数,ShuffledRDD也是继承了RDD的。RDD是抽象的,它有两个子类MapPartitionsRDD和ShuffleRDD.MapPartition和ShuffleRDD都继承于RDD。RDD它有分区列表,作为Dependecy(依赖)。一个RDD它可以由多个Dependecy(依赖)。这种关系叫做多重性关系。Dependecy(依赖)分为两种依赖,宽依赖(ShuffleDep)和窄依赖(NarrowDep)。宽依赖(NarrowDep)分为三种依赖,One2OneDep,RangeDep,PruneDep它们都继承窄依赖(NarrowDep)。每一个RDD都和上一个RDD是有关系的。它是直接关联上去 的吗?不它不是,它是通过依赖Dependency(依赖关联上去的)。所以1个RDD里面它会有多个依赖。那么每个依赖它有多少个RDD? asttract class Dependency[T]extebds Serializable{def rdd:RDD[T]}只有一个RDD。Dependecy(依赖)与RDD的关系是一对一的关系。对于每一RDD它是走依赖再找上一个RDD。ShuffleRDD是与ShuffleDep有关系的。ShuffledRDD它是重写get依赖的方法。getDependencies,它的依赖它的方法里面List(new ShuffleDependency(prev,part,seralizer,keyOrdering,aggregator,mapSideCombine),它返回的是ShuffleDependency依赖。prev还给了上级。part分区。seralizer串行化类,keyOrdering排序以及aggregator聚合器以及mapSideCombine合成函数。ShuffleRDD是依赖于ShuffleDep。MapPartitionsRDD是依赖于One2OneDep。什么时候创建依赖?是在创建RDD的时候,就已经产生了依赖。Spark给了那么多的RDD。它们都有对应的。RDD的依赖是在RDD的构造函数中出现的。看看filter(过滤)它用的也是MapPartitionsRDD.
groupByKey和reduceByKey之间的区别?假如它们都能实现相同功能下优先使用?优先reduceByKey 为什么? 有一个合成过程,hadoop的合成链条是怎样的?map分为三个阶段,第一setup():做一些初始化的配置的。 第二 while() 找每一行,每一行都会经过while()循环。在调用map()函数的时候,第三cleanup()收尾工作的。Spark的分区和hadoop的分区一样吗?不一样,hadoop的分区是指在map端的分区过程,map之后有一个分区。分区分多少个区,就是Reduce的个数。hadoop的分区只能是Reduce的个数。是Map过程中对key进行分发的目的地。hadoop的MR是map阶段进行完后,它要经过hash。经过分发,分发到集合空间里面去。几个空间就是几个分区。这里的分区数和reduce的个数对应。reduce的个数是和程序来设置的。跟我们的切片没有关系。Spark的是分区,Spark的分区就是切片,map的个数。当加载文件的时候,这个文件被切成了多少片,每一片要一般要对应一个任务。所以Spark的分区就是切片的个数。而且每一个RDD都有自己的分区数。这是它们的不同。Spark的分区就是切片。分成多少片,当你变换之后。也是产生新的RDD,它又有分区。groupByKey在hadoop中,map产生的K,V是要经过分发。要进入到分区,当分区完的下一步就Combiner(合成)。合成必须有吗?不一定 合成的目的就是减少网络负载。单词统计中,hello统计了100万,如果不做Combiner它就要分发做100万遍了。但是如果它做了Combiner它只要做reduce个数了。因为每个分区里面都把数据先聚合起来了。假如有3个分区每个分区都有100万数据它是标1的,如果它不做Combiner。它就要把300万逗号1发走。所以这网络负载是很大的。那就没有必要了。Combiner是map端的聚合。Combiner是map端的Reduce,Combiner也叫做预聚合。这样一来,每个map端就编程了“hell 1百万“(数据格式)了,这样就只要发送这一条数据就行了。因为它已经聚合好了。

groupByKey合reduceByKey : groupByKey是没有Combine过程的,reduceByKey是有Combiner过程。结果一定会变少,变少之后,再经过网络分发。那就是网络带宽就占少了,就不用分发那么多了。它有一种数据的压紧的工作。假如你用的分组是组成一个新的集合List[],这也是一个聚合过程。对于这样的结果来讲groupByKey和reduceByKey的结果相同吗?也不相同 为什么?因为groupbyKey的话它就分到一个组上了。groupByKeyList它没有Combiner所以它在Reduce

在很多map中,可以在map内聚合,可以在map内聚合。在map端聚合完后.不管是groupByKey还是reduceByKey都是调用combineByKeyWithClassTag(按类标记符来合成Key,按k合成)方法。mapSideCombine默认值是true.reduceByKey没有传递这个参数,它就是默认值。groupByKey传递的值是false,所以它不进行map端聚合。groupByKey它可以改变V的类型。reduceByKey没有机会。reduceByKey是两V聚成一V,类型是相同的。如果想用reduceByKey来实现。 变换是没有机会指定的,但是Shuffer是有机会指定的。MapPartitionsRDD当你在分组的时候getPartitions。numPartitions:Int这个是分区数。在这里是可以指定分区数的。而且来可以带一个HashPartitioner(分区函数)默认的是Hash分区打散。

1484108-20181015200755414-678127776.png

转载于:https://www.cnblogs.com/SteveDZC/p/9794325.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/250478.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Mininet

首先,我折腾了两周多的东西终于弄出一点眉目了。 有以下几个内容需要学习记忆一下。 1.虚拟机,弄不出来共享文件夹,就用U盘吧,贼快还不用安装配置各种东西,virtualbox和VMware都支持。 2.ubantu安装软件中途失败&#…

docker --- 使用docker-compose.yml生成redis,并连接redis-cli

docker.compose.yml 配置 version: 3.1 services:redis:image: redisports:- 6379:6379命令行:docker-compose up 查看: docker ps 进入redis-cli,输入以下 docker exec -it 7dc0a redis-cli -h localhost -p 6379 操作Redis数据 设置 namemarron set name marron 获取nam…

浅谈javaweb三大框架和MVC设计模式

浅谈javaweb三大框架和MVC设计模式转载自:http://blog.csdn.net/sunpeng19960715/article/details/50890705 小序:博主以前在学javaweb的时候开始总不理解javaweb三大框架和MVC框架模式,虽然没有把两者混为一谈,但是也是很晕菜。…

win下配置nginx

1.下载:http://nginx.org/en/download.html 2.在安装目录cmd: start nginx.exe 启动nginx 3.修改默认运行端口80(nginx.conf): HTTP 数据分发 修改配置文件nginx.conf相应节点: 修改完后重启服务: nginx -s reload TCP 数据分发: nginx 1.9以上版本支持tcp转发 配置文件中增加:…

在springBoot中配置web.xml中配置的servlet

第一种 web.xml (截取的需要转换的) 当拦截到 /socke t时执行该servlet <servlet><servlet-name>websocket</servlet-name><servlet-class>org.ldd.ssm.hangyu.socket.MyWebSocketServlet</servlet-class></servlet><servlet-mapping&g…

koa --- koa-bouncer验证

使用 koa-bouncer中间件对传入的数据进行验证 const bouncer require(koa-bouncer); app.use(bouncer.middleware());const val async (ctx, next) > {ctx.validateBody(name).required(要求提供用户名).isLength(6, 16, 用户名长度应该为6~16).isString().trim()next();…

static关键字的作用

//C/C程序员面试指南 杨国祥等编著 定义全局静态变量。全局静态变量有以下特点&#xff1a; 在全局数据区分配内存&#xff1b;如果没有初始化&#xff0c;其默认值为0&#xff1b;该变量在本文件内从定义开始到文件结束可见。定义局部静态变量。局部静态变量有以下特点&…

Redis 初次尝试

Redis 初次尝试 第一次接触redis&#xff0c;也不知道要写些什么。就玩了下将redis列表中的数据存入mysql数据库中。 首先有三个文件&#xff1a; redis.php 添加数据进redis&#xff1b; insert_class.php 将数据插入数据库&#xff1b; inert.php 调用insert_class.php;…

fiddler2抓包数据工具使用教程

一款免费且功能强大的数据包抓取软件。它通过代理的方式获取程序http通讯的数据&#xff0c;可以用其检测网页和服务器的交互情况&#xff0c;能够记录所有客户端和服务器间的http请求&#xff0c;支持监视、设置断点、甚至修改输入输出数据等功能。fiddler包含了一个强大的基于…

egg --- 初始化一个egg项目基本结构说明

Egg.js体验 全局安装 // 创建项目 $ npm i egg-init -g $ egg-init egg-example --typesimple $ cd egg-example $ npm i// 启动项目 $ npm run dev $ open localhost:7000Egg.js的结构 路由(Router): 将请求URL和具体承担执行动作的Controller的关系对应控制器(Controller)…

葫芦娃

葫芦娃救爷爷 1.队名——代码那些事儿 2.团队成员 刘佳 211606320&#xff08;队长&#xff09;李佳 211660313周世元 211606348王浩 211606378曾丽丽 211606302陈水莲 211606303许燕婷 211606338杨小妮 2116063413.队长博客链接 -https://www.cnblogs.com/LJ-D/p/9799944.html…

webstorm遇到的问题

问题一&#xff1a;英译&#xff1a;未指定node.js的解释器。 解决方法&#xff1a;将webstorm配置支持node.js并自动补全 步骤&#xff1a; 先下载node.jsFile->Setting->输入Node.js&#xff08;选中点进去&#xff09;->Node imterpreter&#xff08;选择node的安装…

egg --- 配置连接mysql 创建模型 插入数据

在egg中使用egg-sequelize插件 sequelize是与数据库操作相关的库安装: npm install --save egg-sequelize mysql2 在egg中配置sequelize 1.在 config/plugin.js中引入 egg-sequelize插件,代码如下 sequelize: {enable: true,package: egg-sequelize }2.在config/config.def…

Flask 在 Debug 模式下初始化2次

请移步&#xff1a; http://blog.zengrong.net/post/2632.html https://stackoverflow.com/questions/9449101/how-to-stop-flask-from-initialising-twice-in-debug-mode/9476701#9476701 https://stackoverflow.com/questions/25504149/why-does-running-the-flask-dev-serve…

eclipse报错: Could not generate secret

在调用微信接口时&#xff0c;出现一个错误&#xff1a; 一直以为是接口调用问题&#xff0c;经多方查询和尝试解决&#xff0c;最后找到根源&#xff1a; edit-->使用default就可以了。 原因&#xff1a; 在eclipse中运行时&#xff0c;把签名信息给去掉了。 转载于:https:…

koa --- [MVC实现之一]自定义路由读取规则

实现MVC分层架构 目标是创建约定大于配置、开发效率高、可维护性强的项目架构路由处理 规范 所有路由,都要放在routes文件夹中若导出路由对象,使用 动词空格路径 作为key, 值是操作方法若导出函数, 则函数返回第二条约定格式的对象 路由定义: 新建 router/index.js, 默认index…

sql中的left join、right join、inner join

sql中的left join、right join、inner join 转自&#xff1a;http://www.cnblogs.com/pcjim/articles/799302.html left join(左联接) 返回包括左表中的所有记录和右表中联结字段相等的记录 right join(右联接) 返回包括右表中的所有记录和左表中联结字段相等的记录inner join…

bzoj1128 Lam-lights

题目描述 对于一个长度为n的数列p&#xff0c;数列中任意两个数互质。准备一个无限长的储存器。然后从p1开始&#xff0c;把储存器中p1倍数位置都赋值为p1&#xff0c;把储存器中p2倍数位置都赋值为p2&#xff0c;把储存器中p3倍数位置都赋值为p3。。。把储存器中pn倍数位置都赋…

koa --- [MVC实现之二]Controller层的实现

[MVC实现之一]传送门 https://blog.csdn.net/piano9425/article/details/103362966 Router层 router这一层,不做业务处理,仅仅只是将路由和路由的处理函数结合起来.路由的处理函数由Controller层实现改进目录结构如下(实际上新建了controller文件夹及其内部子文件,mar.js) …

k8s install

https://xiangyu123.github.io/2018/10/17/k8s-install/转载于:https://www.cnblogs.com/robinunix/p/9809937.html