Spark RDD/Core 编程 API入门系列 之rdd实战(rdd基本操作实战及transformation和action流程图)(源码)(三)...

本博文的主要内容是:

1、rdd基本操作实战

2、transformation和action流程图

3、典型的transformation和action

 

 

 

RDD有3种操作:

1、  Trandformation      对数据状态的转换,即所谓算子的转换

2、  Action    触发作业,即所谓得结果的

3、  Contoller  对性能、效率和容错方面的支持,如cache、persist、checkpoint

Contoller包括cache、persist、checkpoint。

 

/**
* Return a new RDD by applying a function to all elements of this RDD.
*/
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

传入类型是T,返回类型是U。

 

 

 

元素之间,为什么reduce操作,要符合结合律和交换律?
答:因为,交换律,不知,哪个数据先过来。所以,必须符合交换律。
在交换律基础上,想要reduce操作,必须要符合结合律。

/**

* Reduces the elements of this RDD using the specified commutative and
* associative binary operator.
*/
def reduce(f: (T, T) => T): T = withScope {
val cleanF = sc.clean(f)
val reducePartition: Iterator[T] => Option[T] = iter => {
if (iter.hasNext) {
Some(iter.reduceLeft(cleanF))
} else {
None
}
}
var jobResult: Option[T] = None
val mergeResult = (index: Int, taskResult: Option[T]) => {
if (taskResult.isDefined) {
jobResult = jobResult match {
case Some(value) => Some(f(value, taskResult.get))
case None => taskResult
}
}
}
sc.runJob(this, reducePartition, mergeResult)
// Get the final result out of our Option, or throw an exception if the RDD was empty
jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}

RDD.scala(源码)


这里,新建包com.zhouls.spark.cores

package com.zhouls.spark.cores

/**
* Created by Administrator on 2016/9/27.
*/
object TextLines {

}


下面,开始编代码

本地模式

自动 ,会写好

源码来看,

所以, val lines = sc.textFile("C:\\Users\\Administrator\\Desktop\\textlines.txt") //通过HadoopRDD以及MapPartitionsRDD获取文件中每一行的内容本身

 

 

val lineCount = lines.map(line => (line,1)) //每一行变成行的内容与1构成的Tuple


val textLines = lineCount.reduceByKey(_+_)


textLines.collect.foreach(pair  => println(pair._1 + ":" + pair._2))

 成功!



 现在,将此行代码,

     textLines.collect.foreach(pair  => println(pair._1 + ":" + pair._2))
改一改
     textLines.foreach(pair  => println(pair._1 + ":" + pair._2))
 

总结:

本地模式里,
   textLines.collect.foreach(pair  => println(pair._1 + ":" + pair._2))
改一改
     textLines.foreach(pair  => println(pair._1 + ":" + pair._2))
运行正常,因为在本地模式下,是jvm,但这样书写,是不正规的。

 

 

 

集群模式里,
   textLines.collect.foreach(pair  => println(pair._1 + ":" + pair._2))
改一改
     textLines.foreach(pair  => println(pair._1 + ":" + pair._2))
运行无法通过,因为结果是分布在各个节点上。
 
collect源码:
/**
* Return an array that contains all of the elements in this RDD.
*/
def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}

得出,collect后array中就是一个元素,只不过这个元素是一个Tuple。
Tuple是元组。通过concat合并!


foreach源码:

/**
* Applies a function f to all elements of this RDD.
*/
def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
  


 

rdd实战(rdd基本操作实战)至此!

 

 

 

 

 

 rdd实战(transformation流程图)

 拿wordcount为例!

 

启动hdfs集群

spark@SparkSingleNode:/usr/local/hadoop/hadoop-2.6.0$ sbin/start-dfs.sh

 

 

 启动spark集群

spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6$ sbin/start-all.sh

 

 

启动spark-shell

spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6/bin$ ./spark-shell --master spark://SparkSingleNode:7077 --executor-memory 1g

 

 

scala> val partitionsReadmeRdd =  sc.textFile("hdfs://SparkSingleNode:9000/README.md").flatMap(_.split(" ")).map(word =>(word,1)).reduceByKey(_+_,1).saveAsTextFile("~/partition1README.txt")

 或者

 scala> val readmeRdd = sc.textFile("hdfs://SparkSingleNode:9000/README.md")

 scala>  val partitionsReadmeRdd = readmeRdd.flatMap(_.split(" ")).map(word => (word,1)).reduceByKey(_+_,1)

.saveAsTextFile("~/partition1README.txt")

 

注意,~目录,不是这里。

 

 

 

 为什么,我的,不是这样的显示呢?

 

 

 

RDD的transformation和action执行的流程图

 

 

典型的transformation和action

转载于:https://www.cnblogs.com/zlslch/p/5913334.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/254337.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

用GDB调试程序

GDB概述GDB 是GNU开源组织发布的一个强大的UNIX下的程序调试工具。或许,各位比较喜欢那种图形界面方式的,像VC、BCB等IDE的调试,但如果你是在 UNIX平台下做软件,你会发现GDB这个调试工具有比VC、BCB的图形化调试器更强大的功能。所…

灯塔的出现给那些有想法,有能力而又缺乏资金的社区人士提供了一条途径

2019独角兽企业重金招聘Python工程师标准>>> 在上个月,BCH社区传出基于比特币现金的众筹平台Lighthouse(灯塔)正在复活的消息,并且有网友在论坛上贴出了部分网站图片。当消息被证实为真,官网和项目的审核细…

PID 算法理解

PID 算法 使用环境:受到外界的影响不能按照理想状态发展。如小车的速度不稳定的调节,尽快达到目标速度。 条件:闭环系统->有反馈 要求:快准狠 分类:位置式、增量式 增量式 输入:前次速度、前前次速度、前…

C#字符串的基本操作

文章目录简介字符串判断是否相等语法实例字符串比较大小语法实例判断字符串变量是否包含指定字符或字符串语法实例查找字符串变量中指定字符或字符串出现的位置语法实例取子串语法实例插入子串语法实例删除子串语法实例替换子串语法实例去除字符串空格语法实例博主写作不容易&a…

C++利用SOCKET传送文件

C利用SOCKET传送文件 /*server.h*/ #pragma comment(lib, "WS2_32") #include <WinSock2.h> #include <iostream> //#include <stdio.h> #include <assert.h> #ifndef COMMONDEF_H #define COMMONDEF_H #define MAX_PACKET_SIZE 10240 …

三种方式在CentOS 7搭建KVM虚拟化平台

KVM 全称是基于内核的虚拟机&#xff08;Kernel-based Virtual Machine&#xff09;&#xff0c;它是一个 Linux的一个内核模块&#xff0c;该内核模块使得 Linux变成了一个Hypervisor&#xff1a;它由 Quramnet开发&#xff0c;该公司于 2008年被 Red Hat 收购 KVM的整体结构&…

(五)EasyUI使用——datagrid数据表格

DataGrid以表格形式展示数据&#xff0c;并提供了丰富的选择、排序、分组和编辑数据的功能支持。DataGrid的设计用于缩短开发时间&#xff0c;并且使开发人员不需要具备特定的知识。它是轻量级的且功能丰富。单元格合并、多列标题、冻结列和页脚只是其中的一小部分功能。具体功…

拾取模型的原理及其在THREE.JS中的代码实现

1. Three.js中的拾取 1.1. 从模型转到屏幕上的过程说开 由于图形显示的基本单位是三角形&#xff0c;那就先从一个三角形从世界坐标转到屏幕坐标说起&#xff0c;例如三角形abc 乘以模型视图矩阵就进入了视点坐标系&#xff0c;其实就是相机所在的坐标系&#xff0c;如下图&am…

StringBuilder-C#字符串对象

博主写作不容易&#xff0c;孩子需要您鼓励 万水千山总是情 , 先点个赞行不行 在C# 中&#xff0c;string是引用类型&#xff0c;每次改变string类对象的值&#xff0c;即修改字符串变量对应的字符串&#xff0c;都需要在内存中为新的字符串重新分配空间。在默写特定的情况…

java 19 - 11 异常的注意事项

1 /*2 * 异常注意事项:3 * A:子类重写父类方法时&#xff0c;子类的方法必须抛出相同的异常或父类异常的子类。(父亲坏了,儿子不能比父亲更坏)4 * B:如果父类抛出了多个异常,子类重写父类时,只能抛出相同的异常或者是他的子集,子类不能抛出父类没有的异常5 * C:如果被重写的…

数组去重的各种方式对比

数组去重&#xff0c;是一个老生常谈的问题了&#xff0c;在各厂的面试中也会有所提及&#xff0c;接下来就来细数一下各种数组去重的方式吧&#xff1b; 对于以下各种方式都统一命名为 unique&#xff0c;公用代码如下&#xff1a; // 生成一个包含100000个[0,50000)随机数的数…

Linux平台Makefile文件的编写基础篇和GCC参数详解

问&#xff1a;gcc中的-I.是什么意思。。。。看到了有的是gcc -I. -I/usr/xxxxx..那个-I.是什么意思呢 最佳答案 答&#xff1a;-Ixxx 的意思是除了默认的头文件搜索路径(比如/usr/include等&#xff09;外&#xff0c;同时还在路径xxx下搜索需要被引用的头文件。 所以你的gcc …

旧知识打造新技术--AJAX学习总结

AJAX是将旧知识在新思想的容器内进行碰撞产生的新技术&#xff1a;推翻传统网页的设计技术。改善用户体验的技术。 学习AJAX之初写过一篇《与Ajax的初次谋面》。当中都仅仅是一些自己浅显的理解&#xff0c;这次就总结一下它在历史长河中的重要地位。 【全】 AJAX全称为Asnychr…

C#数组基本操作

文章目录简介数组排序和反转语法实例查找数组元素语法实例数组元素求和、最大值、最小值、平均值语法实例数组字符串相互转化语法实例在字符串中查找、删除字符数组元素语法实例博主写作不容易&#xff0c;孩子需要您鼓励 万水千山总是情 , 先点个赞行不行 简介 C#提供了许…

redis(一)--认识redis

Redis官网对redis的定义是&#xff1a;“Redis is an open source, BSD licensed, advanced key-value cache and store”&#xff0c;可以看出&#xff0c;Redis是一种键值系统&#xff0c;可以用来缓存或存储数据。Redis是“Remote Dictionary Server”&#xff08;远程字典服…

转:如何用gcc编译生成动态链接库*.so文件 动态库

转&#xff1a;如何编译.so动态库问&#xff1a;我源文件为main.c, x.c, y.c, z.c,头文件为x.h,y.h,z.h如何编译成.so动态库&#xff1f;编译器用gcc最好能给出详细参数解释&#xff0c;谢谢答&#xff1a;# 声称动代连接库&#xff0c;假设名称为libtest.sogcc x.c y.c z.c -f…

工业镜头的主要参数与选型

文章目录简介1、镜头的分类(1) 以镜头安装分类(2) 以摄像头镜头规格分类(3) 以镜头光圈分类(4) 以镜头的视场大小分类(5) 从镜头焦距上分2、选择镜头的技术依据(1) 镜头的成像尺寸(2) 镜头的分辨率(3) 镜头焦距与视野角度(4) 光圈或通光量3、变焦镜头&#xff08;zoom lens&…

SQLSEVER 中的那些键和约束

SQL Server中有五种约束类型&#xff0c;各自是 PRIMARY KEY约束、FOREIGN KEY约束、UNIQUE约束、DEFAULT约束、和CHECK约束。查看或者创建约束都要使用到 Microsoft SQL Server Managment Studio。1. PRIMARY KEY约束 在表中常有一列或多列的组合&#xff0c;其值能唯一标识表…

数据库 sqlite 进阶

http://www.cppblog.com/czy463/archive/2013/12/16/204816.html 董淳光 前序&#xff1a; Sqlite3 的确很好用。小巧、速度快。但是因为非微软的产品&#xff0c;帮助文档总觉得不够。这些天再次研究它&#xff0c;又有一些收获&#xff0c;这里把我对 sqlite3 的研究列出来&a…

形象的列举-C# 枚举

文章目录简介例子分析点拨博主写作不容易&#xff0c;孩子需要您鼓励 万水千山总是情 , 先点个赞行不行 简介 枚举类型用于声明一组命名常数。 定义枚举类型语法格式如下&#xff1a;enum 枚举数组名{枚举成员列表};例如&#xff1a; enum week{星期一&#xff0c;星期二…