05.RDD详解

05.Spark--RDD详解

RDD详解--groupByKey--reduceByKey

[MapPartitionRDD单词统计]

单词统计
import org.apache.spark.{SparkConf,SparkContext}
object WordCountScala{def main(args:Array[String]):Unit={//创建spark配置对象val conf=new SparkConf()conf.setAppName("WCScala")conf.setMaster("local")//创建上下文val sc=new SparkContext(conf)//加载文档,这个文件是文本文件,调的是hadoopFileval rdd1=sc.textFile("file:///d:/mr/word.txt")[textFile,hadoopFile]//K是longtegr  hadoop里面的  pair hadoopFile(path,classOf[TextInputFormat],classOf[LognWritable],classOf[Test],minPartitions).map(pair=>pair._2.toString).setName(path)//map做的版面//压扁val rdd2=rdd1.flatMap(_.split(" "))//标1成对val rdd3=rdd2.map(_,1)//聚合val rdd4=rdd3.reduceByKey(_+_)val arr=rdd4.collect()arr.foreach(println)//链式编程//sc.textFile("file:///d:/mr/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect().foreach(println)}
}
RDD的依赖列表是如何呈现的?
//[T:ClassTag]主构造
abstract class RDD[T:ClassTag]{@transient private var _sc:SparContext,//体现出了依赖集合,RDD需要的依赖列表  什么时候创建的?@transient private var deps:Seq[Dependency[_]] //[Dependency[_]]泛型
}extends Serialiizable with Logging{...
}
//映射分区RDD
MapPartitionsRDD(org.apache.spark.rdd)
private[spark] class MapPartitionsRDD[U:ClassTag,T:ClassTag](var prev:RDD[T],f:(TaskCOntext,Int,Iterator[T])=>Iterator[U].preserversPartitioning:Boolean=false)//prev是上级的RDD
extends RDD[U](prev){//构造一个rdd用one-to-one依赖...此时RDD会调用  def this(@transoentoneParent:RDD[_])=this(oneParent.context,List(new OneToOneDependency(oneParent)))//一对一的依赖,OneToOneDependency总结:当它去调MapPartitionsRDD的时候,它继承了父的RDD,而父RDD它只传了一个上级RDD的prev这个属性,因为它走的是(def this(@transoent oneParent:RDD[_]))辅助构造。辅助构造它把这个RDD的上下文(oneParent)取出,放入这里面.这里面创建了一个List(new OneToOneDependency(oneParent),创建了OneToOneDependency依赖。oneParent上级的RDD。
}
)class OneToOneDependency[T](rdd:RDD[T])extends NarrowDependency[T](rdd){override def getParents(partitionId:Int):List[Int]=List(partitionId)//其实它是一个链条,RDD本身是依赖列表。每一个依赖于上级关联。所以不是MapPartitionRDD于preRDD之间直接关联。是通过依赖走了一圈。}
如何判断是宽依赖还是窄依赖的?  MapPartitionsRDD就是窄依赖,在reduceByKey的时候就已经ShuffledRDD了。ShuffledRDD与依赖有啥关系?
那是因为在创建RDD的时候,就已经把依赖关联进了去了。因为huffer依赖不是它划分边界的关键。它通过依赖,因为宽依赖就是Shuffer,窄依赖就不是Shuffer了。当它在创建RDD进来的时候,这个依赖就在这里面了。所以它是固定的。RDD它里面有一个分区列表,分区列表它是一个集合。可以理解为一个引用。集合里面放了一堆的依赖。其中RDD是一个抽象类,有一个是MapPartitionRDD,它是RDD的一个子类。它具备了RDD的特点。也得有RDD的分区列表。它创建了一对一的依赖。RDD中所传的prev是上一家RDD,也是在构造里面。上一个RDD存放哪?为了构造MapPartitionRDD它是通过其它的RDD变换。MapPartitionRDD是如何与preRDD关联起来的。是因为MapPartitionRDD它有依赖,而在这个依赖当中它有一个RDD的属性(deps)关联到preRDD的。从Hadoop到flatMap再到表一成对它们全都是窄依赖。到了reduceByKey它返回的是ShuffledRDD它用到的就是Shuffler依赖了。

1484108-20181015200941328-573628420.png
1484108-20181015200912538-1275199425.png

ShufflerdRDD:这个结果RDD,它是要通过Shuffle来产生的。参数是由上一个RDD还有分区类,K类.V类还有组合函数,ShuffledRDD也是继承了RDD的。RDD是抽象的,它有两个子类MapPartitionsRDD和ShuffleRDD.MapPartition和ShuffleRDD都继承于RDD。RDD它有分区列表,作为Dependecy(依赖)。一个RDD它可以由多个Dependecy(依赖)。这种关系叫做多重性关系。Dependecy(依赖)分为两种依赖,宽依赖(ShuffleDep)和窄依赖(NarrowDep)。宽依赖(NarrowDep)分为三种依赖,One2OneDep,RangeDep,PruneDep它们都继承窄依赖(NarrowDep)。每一个RDD都和上一个RDD是有关系的。它是直接关联上去 的吗?不它不是,它是通过依赖Dependency(依赖关联上去的)。所以1个RDD里面它会有多个依赖。那么每个依赖它有多少个RDD? asttract class Dependency[T]extebds Serializable{def rdd:RDD[T]}只有一个RDD。Dependecy(依赖)与RDD的关系是一对一的关系。对于每一RDD它是走依赖再找上一个RDD。ShuffleRDD是与ShuffleDep有关系的。ShuffledRDD它是重写get依赖的方法。getDependencies,它的依赖它的方法里面List(new ShuffleDependency(prev,part,seralizer,keyOrdering,aggregator,mapSideCombine),它返回的是ShuffleDependency依赖。prev还给了上级。part分区。seralizer串行化类,keyOrdering排序以及aggregator聚合器以及mapSideCombine合成函数。ShuffleRDD是依赖于ShuffleDep。MapPartitionsRDD是依赖于One2OneDep。什么时候创建依赖?是在创建RDD的时候,就已经产生了依赖。Spark给了那么多的RDD。它们都有对应的。RDD的依赖是在RDD的构造函数中出现的。看看filter(过滤)它用的也是MapPartitionsRDD.
groupByKey和reduceByKey之间的区别?假如它们都能实现相同功能下优先使用?优先reduceByKey 为什么? 有一个合成过程,hadoop的合成链条是怎样的?map分为三个阶段,第一setup():做一些初始化的配置的。 第二 while() 找每一行,每一行都会经过while()循环。在调用map()函数的时候,第三cleanup()收尾工作的。Spark的分区和hadoop的分区一样吗?不一样,hadoop的分区是指在map端的分区过程,map之后有一个分区。分区分多少个区,就是Reduce的个数。hadoop的分区只能是Reduce的个数。是Map过程中对key进行分发的目的地。hadoop的MR是map阶段进行完后,它要经过hash。经过分发,分发到集合空间里面去。几个空间就是几个分区。这里的分区数和reduce的个数对应。reduce的个数是和程序来设置的。跟我们的切片没有关系。Spark的是分区,Spark的分区就是切片,map的个数。当加载文件的时候,这个文件被切成了多少片,每一片要一般要对应一个任务。所以Spark的分区就是切片的个数。而且每一个RDD都有自己的分区数。这是它们的不同。Spark的分区就是切片。分成多少片,当你变换之后。也是产生新的RDD,它又有分区。groupByKey在hadoop中,map产生的K,V是要经过分发。要进入到分区,当分区完的下一步就Combiner(合成)。合成必须有吗?不一定 合成的目的就是减少网络负载。单词统计中,hello统计了100万,如果不做Combiner它就要分发做100万遍了。但是如果它做了Combiner它只要做reduce个数了。因为每个分区里面都把数据先聚合起来了。假如有3个分区每个分区都有100万数据它是标1的,如果它不做Combiner。它就要把300万逗号1发走。所以这网络负载是很大的。那就没有必要了。Combiner是map端的聚合。Combiner是map端的Reduce,Combiner也叫做预聚合。这样一来,每个map端就编程了“hell 1百万“(数据格式)了,这样就只要发送这一条数据就行了。因为它已经聚合好了。

groupByKey合reduceByKey : groupByKey是没有Combine过程的,reduceByKey是有Combiner过程。结果一定会变少,变少之后,再经过网络分发。那就是网络带宽就占少了,就不用分发那么多了。它有一种数据的压紧的工作。假如你用的分组是组成一个新的集合List[],这也是一个聚合过程。对于这样的结果来讲groupByKey和reduceByKey的结果相同吗?也不相同 为什么?因为groupbyKey的话它就分到一个组上了。groupByKeyList它没有Combiner所以它在Reduce

在很多map中,可以在map内聚合,可以在map内聚合。在map端聚合完后.不管是groupByKey还是reduceByKey都是调用combineByKeyWithClassTag(按类标记符来合成Key,按k合成)方法。mapSideCombine默认值是true.reduceByKey没有传递这个参数,它就是默认值。groupByKey传递的值是false,所以它不进行map端聚合。groupByKey它可以改变V的类型。reduceByKey没有机会。reduceByKey是两V聚成一V,类型是相同的。如果想用reduceByKey来实现。 变换是没有机会指定的,但是Shuffer是有机会指定的。MapPartitionsRDD当你在分组的时候getPartitions。numPartitions:Int这个是分区数。在这里是可以指定分区数的。而且来可以带一个HashPartitioner(分区函数)默认的是Hash分区打散。

1484108-20181015200755414-678127776.png

转载于:https://www.cnblogs.com/SteveDZC/p/9794325.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/250478.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Mininet

首先,我折腾了两周多的东西终于弄出一点眉目了。 有以下几个内容需要学习记忆一下。 1.虚拟机,弄不出来共享文件夹,就用U盘吧,贼快还不用安装配置各种东西,virtualbox和VMware都支持。 2.ubantu安装软件中途失败&#…

docker --- 使用docker-compose.yml生成redis,并连接redis-cli

docker.compose.yml 配置 version: 3.1 services:redis:image: redisports:- 6379:6379命令行:docker-compose up 查看: docker ps 进入redis-cli,输入以下 docker exec -it 7dc0a redis-cli -h localhost -p 6379 操作Redis数据 设置 namemarron set name marron 获取nam…

浅谈javaweb三大框架和MVC设计模式

浅谈javaweb三大框架和MVC设计模式转载自:http://blog.csdn.net/sunpeng19960715/article/details/50890705 小序:博主以前在学javaweb的时候开始总不理解javaweb三大框架和MVC框架模式,虽然没有把两者混为一谈,但是也是很晕菜。…

win下配置nginx

1.下载:http://nginx.org/en/download.html 2.在安装目录cmd: start nginx.exe 启动nginx 3.修改默认运行端口80(nginx.conf): HTTP 数据分发 修改配置文件nginx.conf相应节点: 修改完后重启服务: nginx -s reload TCP 数据分发: nginx 1.9以上版本支持tcp转发 配置文件中增加:…

koa --- koa-bouncer验证

使用 koa-bouncer中间件对传入的数据进行验证 const bouncer require(koa-bouncer); app.use(bouncer.middleware());const val async (ctx, next) > {ctx.validateBody(name).required(要求提供用户名).isLength(6, 16, 用户名长度应该为6~16).isString().trim()next();…

fiddler2抓包数据工具使用教程

一款免费且功能强大的数据包抓取软件。它通过代理的方式获取程序http通讯的数据,可以用其检测网页和服务器的交互情况,能够记录所有客户端和服务器间的http请求,支持监视、设置断点、甚至修改输入输出数据等功能。fiddler包含了一个强大的基于…

egg --- 初始化一个egg项目基本结构说明

Egg.js体验 全局安装 // 创建项目 $ npm i egg-init -g $ egg-init egg-example --typesimple $ cd egg-example $ npm i// 启动项目 $ npm run dev $ open localhost:7000Egg.js的结构 路由(Router): 将请求URL和具体承担执行动作的Controller的关系对应控制器(Controller)…

葫芦娃

葫芦娃救爷爷 1.队名——代码那些事儿 2.团队成员 刘佳 211606320(队长)李佳 211660313周世元 211606348王浩 211606378曾丽丽 211606302陈水莲 211606303许燕婷 211606338杨小妮 2116063413.队长博客链接 -https://www.cnblogs.com/LJ-D/p/9799944.html…

webstorm遇到的问题

问题一:英译:未指定node.js的解释器。 解决方法:将webstorm配置支持node.js并自动补全 步骤: 先下载node.jsFile->Setting->输入Node.js(选中点进去)->Node imterpreter(选择node的安装…

eclipse报错: Could not generate secret

在调用微信接口时,出现一个错误: 一直以为是接口调用问题,经多方查询和尝试解决,最后找到根源: edit-->使用default就可以了。 原因: 在eclipse中运行时,把签名信息给去掉了。 转载于:https:…

koa --- [MVC实现之一]自定义路由读取规则

实现MVC分层架构 目标是创建约定大于配置、开发效率高、可维护性强的项目架构路由处理 规范 所有路由,都要放在routes文件夹中若导出路由对象,使用 动词空格路径 作为key, 值是操作方法若导出函数, 则函数返回第二条约定格式的对象 路由定义: 新建 router/index.js, 默认index…

bzoj1128 Lam-lights

题目描述 对于一个长度为n的数列p,数列中任意两个数互质。准备一个无限长的储存器。然后从p1开始,把储存器中p1倍数位置都赋值为p1,把储存器中p2倍数位置都赋值为p2,把储存器中p3倍数位置都赋值为p3。。。把储存器中pn倍数位置都赋…

koa --- [MVC实现之二]Controller层的实现

[MVC实现之一]传送门 https://blog.csdn.net/piano9425/article/details/103362966 Router层 router这一层,不做业务处理,仅仅只是将路由和路由的处理函数结合起来.路由的处理函数由Controller层实现改进目录结构如下(实际上新建了controller文件夹及其内部子文件,mar.js) …

Layui --- [Mar]给渲染后的表格加CSS样式

为什么要控制样式 使用layui生成后的表格的样式有时候,并不能满足我们的需求.因此在渲染完成后,需要自定义类对其操作 Layui表格渲染后一般会出现以下结构 分结构如下 我把使用layui的table渲染后的表格分为如下的几个dom 1.$rawTable: 初始table,即 2.$renderTable: 渲染之…

Python 框架之Flask初步了解

Python 框架之Flask初步了解 前言 ​ 在了解python web 框架之前,我们需要先了解框架实现的基本原理。首先,需要了解WSGI(Web Server Gateway Interface),借助WSGI我们就能实现用Python专注于生成HTML文档&#xff0…

koa --- [MVC实现之三]换个角度重新开始-初始化

说明 下面文章是对该系列前面2篇及项目中经验的总结,重新开始写的实现了Mar类,贯穿Router层、Controller层、Service层基本骨架的搭建 初始 使用Koa创建一个简单的服务器,一般会使用如下 const koa require(koa); const app new koa(); const Router require(koa-router…

java web 服务器环境搭建之jdk安装

Java 部署环境搭建 一 安装centos系统,安装完成后用root用户登录 二 Java 环境安装 下载jdk安装包,使用以下命令下载安装包,也可以在windows环境现在,在上传到linux机器上curl -O -L http://download.oracle.com/otn-pub/java/jdk/8u191-b12/2787e4a523244c269598db4e85c51e0c/…

koa --- [MVC实现之四]Router、Controller、Service的实现

说明 上一篇: [MVC实现之三]上一篇实现了,Router层、Controller层、Service层的基本原则,并且成功的通过Mar类来传递这些层级之间需要的参数.这一篇主要是通过业务层面来具体实现: Router层:监听页面的路由,并调用Controller层的路由处理函数Controller层:给Router层提供服务,…

2017-2018-2 20179317 《网络攻防技术》第七周学习心得体会

教材学习内容总结 课本第七章主要围绕windows操作系统安全攻防技术进行讲述,教材中主要涉及的攻击内容如下: Windows操作系统的基本结构 运行于处理器特权模式的操作系统内核运行在处理器非特权模式的用户空间代码采用宏内核模式来进行构架 Windows操作系…

PE文件格式详解(二)

0x00 前言 上一篇讲到了PE文件头的中IMAGE_FILE_HEADER结构的第二个结构,今天从IMAGE_FILE_HEADER中第三个结构sizeOfOptionalHeader讲起。这个字段的结构名也叫做IMAGE_OPTIONAL_HEDAER讲起。 0x01 IMAGE_OPTIONAL_HEADER概述 其实这个结构是IMAGE_FILE_HEADER结构…