Spark RDD算子介绍

Spark学习笔记总结

01. Spark基础

1. 介绍

Spark可以用于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)。
Spark是MapReduce的替代方案,而且兼容HDFS、Hive,可融入Hadoop的生态系统,以弥补MapReduce的不足。

2. Spark-Shell

  1. spark-shell是Spark自带的交互式Shell程序,用户可以在该命令行下用scala编写spark程序。
  2. 直接启动spark-shell,实质是spark的local模式,在master:8080中并未显示客户端连接。
  3. 集群模式:
    /usr/local/spark/bin/spark-shell --master spark://172.23.27.19:7077 --executor-memory 2g --total-executor-cores 2
  4. spark-shell中编写wordcount
    sc.textFile("hdfs://172.23.27.19:9000/wrd/wc/srcdata/").flatMap(.split(" ")).map((,1)).reduceByKey(+).sortBy(_._2,false).collect

3. RDD介绍与属性

1. 介绍

RDD(Resilient Distributed Dataset)叫做分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变(创建了内容不可变)、可分区、里面的元素可并行计算的集合。

2. 属性:

nkfed8Z.png

  1. 由多个分区组成。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。
  2. 一个计算函数用于每个分区。Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。
  3. RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。数据丢失时,根据依赖重新计算丢失的分区而不是整个分区。
  4. 一个Partitioner,即RDD的分片函数。默认是HashPartition
  5. 分区数据的最佳位置去计算。就是将计算任务分配到其所要处理数据块的存储位置。数据本地化。
3. 创建方式:
  1. 可通过并行化scala集合创建RDD
    val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
  2. 通过HDFS支持的文件系统创建,RDD里没有真的数据,只是记录了元数据
    val rdd2 = sc.textFile("hdfs://172.23.27.19:9000/wrd/wc/srcdata/")

查看该rdd的分区数量
rdd1.partitions.length

3. 基础的transformation和action

RDD中两种算子:
transformation转换,是延迟加载的

常用的transformation:
(1)map、flatMap、filter
(2)intersection求交集、union求并集:注意类型要一致
distinct:去重
(3)join:类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
(4)groupByKey:在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD
但是效率reduceByKey较高,因为有一个本地combiner的过程。
(5)cartesian笛卡尔积

常用的action
(1)collect()、count()
(2)reduce:通过func函数聚集RDD中的所有元素
(3)take(n):取前n个;top(2):排序取前两个
(4)takeOrdered(n),排完序后取前n个

4. 较难的transformation和action

参考《http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html》

(1)mapPartitions(func)和
mapPartitions(func):
独立地在RDD的每一个分片上运行,但是返回值;foreachPartition(func)也常用,不需要返回值

mapPartitionsWithIndex(func):
可以看到分区的编号,以及该分区数据。
类似于mapPartitions,但func带有一个整数参数表示分片的索引值,func的函数类型必须是
(Int, Interator[T]) => Iterator[U]

val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
val func = (index: Int, iter: Iterator[(Int)]) => {iter.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator}
rdd1.mapPartitionsWithIndex(func).collect

(2)aggregate
action操作,
第一个参数是初始值,
第二个参数:是2个函数[每个函数都是2个参数(第一个参数:先对个个分区进行的操作, 第二个:对个个分区合并后的结果再进行合并), 输出一个参数]

例子:

rdd1.aggregate(0)(_+_, _+_)
//前一个是对每一个分区进行的操作,第二个是对各分区结果进行的结果rdd1.aggregate(5)(math.max(_, _), _ + _)
//结果:5 + (5+9) = 19val rdd3 = sc.parallelize(List("12","23","345","4567"),2)
rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)
//结果:24或者42val rdd4 = sc.parallelize(List("12","23","345",""),2)
rdd4.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
//结果01或者10

(3)aggregateByKey
将key值相同的,先局部操作,再整体操作。。和reduceByKey内部实现差不多

val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
//结果:Array((dog,12), (cat,17), (mouse,6))

PS:
和reduceByKey(+)调用的都是同一个方法,只是aggregateByKey要底层一些,可以先局部再整体操作。

(4)combineByKey
和reduceByKey是相同的效果,是reduceByKey的底层。
第一个参数x:原封不动取出来, 第二个参数:是函数, 局部运算, 第三个:是函数, 对局部运算后的结果再做运算
每个分区中每个key中value中的第一个值,

val rdd1 = sc.textFile("hdfs://master:9000/wordcount/input/").flatMap(_.split(" ")).map((_, 1))
val rdd2 = rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
rdd2.collect

第一个参数的含义:
每个分区中相同的key中value中的第一个值
如:
(hello,1)(hello,1)(good,1)-->(hello(1,1),good(1))-->x就相当于hello的第一个1, good中的1

val rdd3 = rdd1.combineByKey(x => x + 10, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
rdd3.collect
//每个会多加3个10val rdd4 = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val rdd5 = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
val rdd6 = rdd5.zip(rdd4)
val rdd7 = rdd6.combineByKey(List(_), (x: List[String], y: String) => x :+ y, (m: List[String], n: List[String]) => m ++ n)
//将key相同的数据,放入一个集合中

(5)collectAsMap
Action
Map(b -> 2, a -> 1)//将Array的元祖转换成Map,以后可以通过key取值

val rdd = sc.parallelize(List(("a", 1), ("b", 2)))
rdd.collectAsMap
//可以下一步使用

(6)countByKey
根据key计算key的数量
Action

val rdd1 = sc.parallelize(List(("a", 1), ("b", 2), ("b", 2), ("c", 2), ("c", 1)))
rdd1.countByKey
rdd1.countByValue//将("a", 1)当做一个元素,统计其出现的次数

(7)flatMapValues
对每一个value进行操作后压平

转载于:https://www.cnblogs.com/wangrd/p/6216924.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/457215.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java 写tb级文件_三管齐下!TB 级文件的上传性能瞬间被优化 100 倍!

作者 | 中华石杉责编 | 伍杏玲本文经授权转载石杉的架构笔记(ID:shishan100)这篇文章我们来看看,世界上最优秀的分布式文件系统HDFS,是如何对超大文件的上传做性能优化的?首先,我们还是通过一张图来看一下文件上传的大…

CentOS7下安装Redis — 单节点

2019独角兽企业重金招聘Python工程师标准>>> 1. 环境准备 安装编译所需要的包: yum install gcc tcl 2. 下载redis http://download.redis.io/releases/redis-3.2.7.tar.gz 3. 安装redis ## 创建redis的安装目录 mkdir /usr/local/redis## 解压redis tar…

笔记本中美化代码的方法

这里向大家推荐一个很好用的记笔记软件,微软的OneNote,这个笔记软件,支持分区和分区组的创建,而且入门简单,界面简洁,很适合从word过渡过来的人来记笔记! 不过如果直接记笔记,对于程序员来说,可能希望代码在笔记本上更好看一些,那么应该怎么办呢?下面提供了在OneNote中,让代码…

工具使用——印象(汇总)

作者:桂。 时间:2017-02-09 23:11:30 链接:http://www.cnblogs.com/xingshansi/articles/6384097.html 说明:转载请注明出处,谢谢。 前言 本文仅仅介绍印象笔记的使用,至于挖掘机哪家强,本文不…

深入理解Python的logging模块:从基础到高级

在Python编程中,日志记录是一种重要的调试和错误追踪工具。Python的logging模块提供了一种灵活的框架,用于发出日志消息,这些消息可以被发送到各种输出源,如控制台、文件、HTTP GET/POST位置等。本文将深入探讨Python的logging模块…

centos7安装java6_CentOS7.6安装jdk1.8

2、登录Linux服务器,通过rz命令将jdk导入服务器如果没有rz命令 需要先安装lrzszyum install lrzsz -y3、将jdk压缩包解压到指定路径 -C 指定路径4、配置环境变量编辑/etc/profile文件 在末尾加上以下内容 wq保存退出source /etc/profile文件 使配置文件生效export J…

人生苦短,我用python——当我在玩python的时候我玩些什么 -

程序的基本思路 用一个txt文件记录电脑的一天内累计使用时间累计使用时间超过若干小时就会自动关机程序开机自动运行 为什么我最后选择了python 想着怎么写、搜资料的时候就发现Java并不适合,虽然不是不能实现,但有好几个问题解决起来都有点麻烦。对我这…

Twisted入门教程(5)

2019独角兽企业重金招聘Python工程师标准>>> 第五部分:由Twited支持的诗歌下载服务客户端 你可以从这里从头开始阅读这个系列 抽象地构建客户端 在第四部分中,我们构建了第一个使用Twisted的客户端。它确实能很好地工作,但仍有提高…

**print('人生苦短 我爱Python')**

print(‘人生苦短 我爱Python’) 一、变量 **""" 1.代码自上而下执行 2_运算符和表达式.一行一句,不要把多个语句写到一行上,可读性不好 3中文只能出现在引号里,其他地方不能出现中文 4不能随意缩进 """**pr…

笔记本(华硕UL80VT)软件超频setFSB

Warning !!!If you are a beginner, do not use this software. This software is for power users only. Use "SetFSB.exe" at your own risk.试了setfsb各种版本,基本不能打开。还有官网的免费版,居然不能用,真是很奇怪。 官网&a…

Node.js~在linux上的部署

我们以centOS为例来说说如何部署node.js环境 一 打开centos,然后开始下载node.js包 curl --silent --location https://rpm.nodesource.com/setup_6.x | bash - yum -y install nodejs 二 安装gcc环境 yum install gcc-c make 安装完成! 三 安装nodejs的npm,这是一个包程序工具…

LeetCode题解-3-Longest Substring Without Repeating Characters

2019独角兽企业重金招聘Python工程师标准>>> 解题思路 首先要读懂题目,它要求的是找到最长的子串,并且子串中没有出现重复的字符。 我的想法,是用一个map存储每个字符最后出现的位置,还要有个变量start,它用…

java从哪学到哪_Java JVM怎么学习啊?从哪方面入手?

叮当猫咪一、 JVM的生命周期  1. JVM实例对应了一个独立运行的java程序它是进程级别  a) 启动。启动一个Java程序时,一个JVM实例就产生了,任何一个拥有public static void main(String[] args)函数的class都可以作为JVM实例运行的起点  b) 运行。m…

JMeter处理Cookie与Session

cookie 和session 的区别: 1、cookie数据存放在客户的浏览器上,session数据放在服务器上。 2、cookie不是很安全,别人可以分析存放在本地的COOKIE并进行COOKIE欺骗 考虑到安全应当使用session。 3、session会在一定时间内保存在服务器上。当…

Maximum sum(poj 2479)

题意:给一段数列,将这个数列分成两部分,使两部分的最大子段和的和最大,输出和/*看数据没想到是(O)n的算法,求出从前向后的最大子段和和从后向前的最大子段和,然后枚举断点。 第一次提交不小心折在数组最小值…

蚂蚁分类信息系统 5.8 信息浏览量后台自定义设置

mymps 蚂蚁分类信息是一款基于PHPMySQL的建站系统,为在各种服务器上架设分类信息以及地方门户网站提供完美的解决方案. mymps5.8 下载 蚂蚁分类系统 5.8下载 蚂蚁分类系统下载 mymps下载 蚂蚁分类信息系统 5.8 原信息浏览量后台无法自定义,现增加后台自定义浏览量…

python编写四位数验证码

def verifycode(request):#引入绘图模块from PIL import Image, ImageDraw, ImageFont#引入随机函数模块import random#定义变量,用于画面的背景色、宽、高bgcolor (random.randrange(20, 100), random.randrange(20, 100), random.randrange(20, 100))width 100h…

php 计算数据偏离度,关于偏离度的测算方法

2015年6月技术总结——关于偏离度的测算方法研究院公用事业部 路璐引言《原理》中说“偏离度是指每一种偿债来源与财富创造能力的距离,所体现的是偿债来源对债务安全的保障程度,唯有通过揭示偿债来源与财富创造能力偏离度才能真正区别每一种偿债来源的风…

Django中celery配置总结

情景: 用户发起request,并等待response返回。在本些views中,可能需要执行一段耗时的程序,那么用户就会等待很长时间, 造成不好的用户体验,比如发送邮件、手机验证码等。 使用celery后,情况就不…

test.php.bak,MongoDB热备份工具:解决官方版备份缺陷

贺春旸,凡普金科DBA团队负责人,《MySQL管理之道:性能调优、高可用与监控》第一、二版作者,曾任职于中国移动飞信、安卓机锋网。致力于MariaDB、MongoDB等开源技术的研究,主要负责数据库性能调优、监控和架构设计。工具…