弹性式分布数据集RDD——Pyspark基础 (二)

连载中:http://ihoge.cn/tags/pyspark/

title: 弹性式分布数据集RDD——Pyspark基础 (二)
date: 2018-04-15 17:59:21
comments: true
categories:
- Spark
tags:

- pyspark

RDD的内部运行方式

RDD不仅是一组不可变的JVM(Java虚拟机)对象的分布集,而且是Spark的核心,可以让任务执行高速运算。

RDD将跟踪(计入日记)应用于每个快的所有转换,以加速计算速度,并在发生错误和部分数据丢失时提供回退(容错机制)。

RDD采用并行的运行方式,也就是每个转换操作并行执行,从而提高速度。
RDD有两种并行操作:
- 转换操作(返回指向新的RDD的指针)
- 动作操作(在运行计算后向驱动程序返回值)

数据集的转换通常是惰性的,这也意味着任何转换操作仅在调用数据集上的操作时才执行。该延迟执行会产生风多的精细查询:针对性能进行优化查询。这种优化始于Spark的DAGScheduler——面向阶段的调度器。DAGScheduler负责Stage级的调度详见:Spark运行原理剖析

由于具有单独的RDD转换和动作,DAGScheduler可以在查询中执行优化。包括但不限于避免shuffle数据(最耗费资源的任务)

创建RDD

方式一: 用.parallelize(...)集合(元素list或array)

data = sc.parallelize([('a',1),('b',2),('c',3),('d',5),('e',5)])

方式二: 读入外部文件

  • 支持多文件系统中读取:如NTFS、FAT、HFS+(Mac OS Extended),或者如HDFS、S3、Cassandra这类的分布式文件系统,还有其他类文件系统。
  • 指出多种数据格式:如文本、parquet、JSON、Hive tables(Hive表)以及使用JDBC驱动程序可读取的关系数据库中的数据。(注意:Spark可以自动处理压缩数据集)

��Tip1:读取的方式不同,持有对象表达方式也不同。从文件中读取的数据表示为MapPartitionsRDD;使用集合方法的数据表示为ParallelCollectionRDD

��**Tip2:**RDD是无schema的数据结构(和DataFrame不同),所以我们几乎可以混用任何数据结构:tuple、dict、list和spark等都能支持。如果对数据集使用.collect()方法,将把RDD对所有元素返回给驱动程序,驱动程序将其序列化成了一个列表。

data_from_file = sc.textFile("hdfs://master:9000/pydata/VS14MORT.txt.gz",4) # 这里表示4个分区
def extractInformation(row):import reimport numpy as npselected_indices = [2,4,5,6,7,9,10,11,12,13,14,15,16,17,18,19,21,22,23,24,25,27,28,29,30,32,33,34,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,58,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,81,82,83,84,85,87,89]'''Input record schemaschema: n-m (o) -- xxxn - position fromm - position too - number of charactersxxx - description1. 1-19 (19) -- reserved positions2. 20 (1) -- resident status3. 21-60 (40) -- reserved positions4. 61-62 (2) -- education code (1989 revision)5. 63 (1) -- education code (2003 revision)6. 64 (1) -- education reporting flag7. 65-66 (2) -- month of death8. 67-68 (2) -- reserved positions9. 69 (1) -- sex10. 70 (1) -- age: 1-years, 2-months, 4-days, 5-hours, 6-minutes, 9-not stated11. 71-73 (3) -- number of units (years, months etc)12. 74 (1) -- age substitution flag (if the age reported in positions 70-74 is calculated using dates of birth and death)13. 75-76 (2) -- age recoded into 52 categories14. 77-78 (2) -- age recoded into 27 categories15. 79-80 (2) -- age recoded into 12 categories16. 81-82 (2) -- infant age recoded into 22 categories17. 83 (1) -- place of death18. 84 (1) -- marital status19. 85 (1) -- day of the week of death20. 86-101 (16) -- reserved positions21. 102-105 (4) -- current year22. 106 (1) -- injury at work23. 107 (1) -- manner of death24. 108 (1) -- manner of disposition25. 109 (1) -- autopsy26. 110-143 (34) -- reserved positions27. 144 (1) -- activity code28. 145 (1) -- place of injury29. 146-149 (4) -- ICD code30. 150-152 (3) -- 358 cause recode31. 153 (1) -- reserved position32. 154-156 (3) -- 113 cause recode33. 157-159 (3) -- 130 infant cause recode34. 160-161 (2) -- 39 cause recode35. 162 (1) -- reserved position36. 163-164 (2) -- number of entity-axis conditions37-56. 165-304 (140) -- list of up to 20 conditions57. 305-340 (36) -- reserved positions58. 341-342 (2) -- number of record axis conditions59. 343 (1) -- reserved position60-79. 344-443 (100) -- record axis conditions80. 444 (1) -- reserve position81. 445-446 (2) -- race82. 447 (1) -- bridged race flag83. 448 (1) -- race imputation flag84. 449 (1) -- race recode (3 categories)85. 450 (1) -- race recode (5 categories)86. 461-483 (33) -- reserved positions87. 484-486 (3) -- Hispanic origin88. 487 (1) -- reserved89. 488 (1) -- Hispanic origin/race recode'''record_split = re\.compile(r'([\s]{19})([0-9]{1})([\s]{40})([0-9\s]{2})([0-9\s]{1})([0-9]{1})([0-9]{2})' + r'([\s]{2})([FM]{1})([0-9]{1})([0-9]{3})([0-9\s]{1})([0-9]{2})([0-9]{2})' + r'([0-9]{2})([0-9\s]{2})([0-9]{1})([SMWDU]{1})([0-9]{1})([\s]{16})([0-9]{4})' +r'([YNU]{1})([0-9\s]{1})([BCOU]{1})([YNU]{1})([\s]{34})([0-9\s]{1})([0-9\s]{1})' +r'([A-Z0-9\s]{4})([0-9]{3})([\s]{1})([0-9\s]{3})([0-9\s]{3})([0-9\s]{2})([\s]{1})' + r'([0-9\s]{2})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + r'([A-Z0-9\s]{7})([\s]{36})([A-Z0-9\s]{2})([\s]{1})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([\s]{1})([0-9\s]{2})([0-9\s]{1})' + r'([0-9\s]{1})([0-9\s]{1})([0-9\s]{1})([\s]{33})([0-9\s]{3})([0-9\s]{1})([0-9\s]{1})')try:rs = np.array(record_split.split(row))[selected_indices]except:rs = np.array(['-99'] * len(selected_indices))return rsdata_file = data_from_file.map(extractInformation)
data_file.map(lambda row: row).take(1)
data_file.cache()
data_file.is_cached
True

全局作用域和局部作用域

Spark可以在两种模式下运行:本地和集群。本地运行Spark代码时和目前使用的python没有说明不同。然而他如果将相同的代码部署到集群,便可能会导致大量的困扰,这就需要了解Spark是怎么在集群上执行工作的。这里有一篇文章介绍的很详细。参考:Spark运行原理详解

在集群模式下,提交任务时任务发送给了Master节点。该驱动程序节点为任务创建DAG,并且决定哪一个执行者(Worker)节点运行特定的任务。然后该驱动程序知识工作者执行它们的任务,并且在结束时将结果返回给驱动程序。然而在这之前,驱动程序为每一个任务的终止做准备:驱动程序中有一组变量和方法,以变工作者在RDD上执行任务。

这组变量和方法在执行者的上下问本质上是静态的,每个执行器从驱动程序中获取的一份变量和方法的副本。这意味着运行任务时,如果执行者改变这些变量或覆盖这些方法,它不影响任何其他执行者的副本或者驱动程序的变量和方法。这可能会导致一些意想不到的行为和运行错误,这些行为和错误通常都很难被追踪到。

转换

转换操作可以调整数据集。包括映射、筛选、链接、转换数据集中的值。

.map()转换

data_2014 = data_file.map(lambda x: x[16])
data_2014.take(10)
['2014', '2014', '2014', '2014', '2014', '2014', '2014', '2014', '2014', '-99']

.filter()转换

data_filter = data_file.filter(lambda x: x[16] == '2014' and x[21] == '0')
print(data_filter.count())
data_file.take(2)
22[array(['1', '  ', '2', '1', '01', 'M', '1', '087', ' ', '43', '23', '11','  ', '4', 'M', '4', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I64 ','238', '070', '   ', '24', '01', '11I64  ', '       ', '       ','       ', '       ', '       ', '       ', '       ', '       ','       ', '       ', '       ', '       ', '       ', '       ','       ', '       ', '       ', '       ', '       ', '01','I64  ', '     ', '     ', '     ', '     ', '     ', '     ','     ', '     ', '     ', '     ', '     ', '     ', '     ','     ', '     ', '     ', '     ', '     ', '     ', '01', ' ',' ', '1', '1', '100', '6'], dtype='<U40'),array(['1', '  ', '2', '1', '01', 'M', '1', '058', ' ', '37', '17', '08','  ', '4', 'D', '3', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I250','214', '062', '   ', '21', '03', '11I250 ', '61I272 ', '62E669 ','       ', '       ', '       ', '       ', '       ', '       ','       ', '       ', '       ', '       ', '       ', '       ','       ', '       ', '       ', '       ', '       ', '03','I250 ', 'E669 ', 'I272 ', '     ', '     ', '     ', '     ','     ', '     ', '     ', '     ', '     ', '     ', '     ','     ', '     ', '     ', '     ', '     ', '     ', '01', ' ',' ', '1', '1', '100', '6'], dtype='<U40')]

.flatMap()转换

.flatMap()方法和.map()工作类似,不同的是flatMap()返回一个扁平的结果而不是一个列表。

data_flat = data_file.flatMap(lambda x: (x[16], int(x[16])+1))
data_flat.take(10)
['2014', 2015, '2014', 2015, '2014', 2015, '2014', 2015, '2014', 2015]

.flatMap()可以用于过滤一些格式不正确的记录。在这个机制下,.flatMap()方法吧每一行看作一个列表对待,然后将所有记录简单的加入到一起,通过传递一个空列表可以丢弃格式不正确的记录。

.distinct()转换

这里用该方法检查性别列表是否只包含了男性和女性验证我们是否准确解释了数据集。

distinct_gender = data_file.map(lambda x: x[5]).distinct()
distinct_gender.collect()
['M', 'F', '-99']

.sample() 转换

该方法返回数据集的随机样本。第一个参数withReplacement指定采样是否应该替换,第二个参数fraction定义返回数据量的百分比,第三个参数是伪随机数产生器的种子seed

为了节省运算时间,这里选取愿数据千分之一的随机数据作为下面的练习数据。

data_sample = data_file.sample(False, 0.001, 666)
data_sample.cache()
PythonRDD[25] at RDD at PythonRDD.scala:48

.leftOuterJoin()转换

  • .leftOuterJoin(): 根据两个数据集中都有得值来连接两个RDD,并返回左侧的RDD记录,而右边的记录副加载两个RDD匹配的地方。
  • .join() :只返回两个RDD之间的关联数值
  • .intersection():返回两个RDD中相等的记录
rdd1 = sc.parallelize([('a',1), ('b',4), ('c',10)])
rdd2 = sc.parallelize([('a',4), ('a',1), ('b',6), ('d',15)])
print("leftOuterJoin: ",rdd1.leftOuterJoin(rdd2).collect())
print("Join: ",rdd1.join(rdd2).collect())
print("intersection: ", rdd1.intersection(rdd2).collect())
leftOuterJoin:  [('c', (10, None)), ('b', (4, 6)), ('a', (1, 1)), ('a', (1, 4))]
Join:  [('b', (4, 6)), ('a', (1, 1)), ('a', (1, 4))]
intersection:  [('a', 1)]

.repartition()转换

重新对数据集进行分区,改变数据集分赛区的数量。此功能应该谨慎并且仅当真正需要的时候使用,因为它会充足数据,导致性能产生巨大的影响。

print(len(rdd2.glom().collect()))
rdd2 = rdd2.repartition(4)
print(len(rdd2.glom().collect()))
3
4

动作

.collect() 动作

返回所有RDD的元素给驱动程序

��同时常用的还有: .collectAsMap()方法

.take() 动作

可以说这事最有用的方法,返回单个数据分区的前n行。

rdd.take(1)
#等同于:
rdd.first()

.reduce() 动作

该方法使用指定的方法减少RDD中的元素。可以用该方法计算RDD总的元素之和:

rdd1.map(lambda x: x[1]).reduce(lambda x, y: x + y)

在每一个分区里,reduce()方法运行求和方法,将改总和返回给最终聚合所在的程序节点。

⚠️警告:
要谨慎注意的是,reduce传递的函数需要时关联的,既满足元素顺序改变结果不变,操作符顺序改变结果不变。如:

rdd = sc.parallelize([1, 2, 0.5, 0.1],1)
rdd.reduce(lambda x, y: x / y)out: 10.0
rdd = sc.parallelize([1, 2, 0.5, 0.1],2)
rdd.reduce(lambda x, y: x / y)out: 0.1

这里我们希望输出结果是10.0,第一个只把RDD放在一个分区,输出结果符合预期。但是在第二个例子中,分了2个区,结果就不对了。因为该方法是在每个分区并行计算的。

.reduceByKey() 动作

该方法和.reduce()方法类似,但是实在key-key基础上运行:

data_key = sc.parallelize([('a',3), ('a',1), ('b',6), ('d',1), ('b',6), ('d',15), ('d',3), ('a',7), ('b', 8)],4)
data_key.reduceByKey(lambda x, y: x+y).collect()
[('b', 20), ('a', 11), ('d', 19)]

.count() 动作

.count() 方法统计出了RDD里所有的元素数量。

rdd.count()

.count() 方法产生入戏方法同样的结果,但不需要把整个数据集移动到驱动程序:

len(rdd.collect()). # ⚠️警告:不要这样做!!

.countByKey() 动作

如果数据集是Ket-Value形式,可以使用.countByKey()方法

data_key.countByKey().items()
dict_items([('a', 3), ('b', 3), ('d', 3)])

.saveAsTextFile() 动作

该方法将RDD保存为文本文件:每个文件一个分区

data_key.saveAsTextFile('hdfs://master:9000/out/data_key.txt')

要读取它的时候需要解析,因为所有行都被视为字符串:

def parseInput(row):import repattern = re.compile(r"\(\'([a-z]+)\',.([0-9]+)\)") # 这里“+”号代表匹配一个或多个匹配字符,否则针对双位数动作操作会报错row_split = pattern.split(row)return (row_split[1], row_split[2])
data_key_read = sc.textFile('hdfs://master:9000/out/data_key.txt')
data_key_read.map(parseInput).collect()
[('a', '3'),('a', '1'),('b', '6'),('d', '1'),('b', '6'),('d', '15'),('d', '3'),('a', '7'),('b', '8')]

��同时还有:
- rdd.saveAsHadoopDataset
- rdd.saveAsSequenceFile
- …
等方法

.foreach() 动作

这个方法对RDD里的每个元素,用迭代方法应用相同的函数;和.map()相比,.foreach()方法按照一个接一个的方式,对每一条记录应用一个定义好的函数。当希望将数据曹村道PySpark本身不支持的数据库是,该方法很有用。

def f(x):print(x)rdd.foreach(f)

小结:

  • RDD是Spark的核心;这些无schema数据结构早Spark中处理的最基本的数据结构。
  • RDD的两种创建方式: parallelize 和 文件读取
  • Spark中的转化是惰性的,只在操作被调用时应用。
  • Scala 和 Python RDD之间一个主要的区别是速度: Python RDD 比 Scala 慢很多!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/293003.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

导师都有哪些“秘密”没有告诉你?

全世界只有3.14 % 的人关注了爆炸吧知识真正决定人与人之间的差距的&#xff0c;其实是我们对事物的见识与内心的格局&#xff0c;见识的深浅决定人生的深浅&#xff0c;格局的大小决定了人生之路是宽是窄。今天给大家推荐几个有深度、有想法的公众号&#xff0c;希望能够给你带…

2021年全球知名开源项目大更新

下面简单回顾 2021 年一些带来较大变化和影响的项目更新信息。PHP 8.1.0 正式发布枚举&#xff0c;只读属性&#xff0c;First-class 可调用语法&#xff0c;新的初始化器&#xff0c;纯交集类型&#xff0c;Never 返回类型&#xff0c;Final 类常量&#xff0c;显式八进制数字…

mysql workbench_爬虫实例:玩转mysql(预备篇)

考完试的第一篇文章&#xff0c;开心~/1.什么是数据库/数据库是“按照数据结构来组织、存储和管理数据的仓库”。是一个长期存储在计算机内的、有组织的、可共享的、统一管理的大量数据的集合。数据库是以一定方式储存在一起、能与多个用户共享、具有尽可能小的冗余度、与应用程…

RHEL6   Kickstart 无人值守安装

方法&#xff1a;FTPTFTPDHCPKickstartPXE从网络引导系统的做法可以不必从硬盘、软盘或者CD&#xff0d;ROM光盘&#xff0c;而是完全通过网络来引导一台计算机。这对于安装来说很方便&#xff0c;因为它意味着你可以坐在桌子旁边&#xff0c;不必走到机器那里插入CD&#xff0…

免安装免配置 还免费的Spark 集群 --Databrickes Spark Clould

http://ihoge.cn/2018/Databrickes.html 摘要&#xff1a;本文带你畅游Databrickes Spark Clould云服务。小白迅速上手大数据Spark开发环境&#xff0c;从此告别集群Bug的烦恼&#xff0c;彻底解放物理机负担让你随时随地想Run就Run&#xfffd;&#xfffd;。 目录&#xff…

C# 运算符的优先级和关联性

表1 显示了 C#运算符的优先级&#xff0c;其中顶部的运算符有最高的优先级&#xff08;即在包含多个运算符的表达式中&#xff0c;最先计算该运算符&#xff09;。除了运算符优先级&#xff0c;对于二元运算符&#xff0c;需要注意运算符是从左向右还是从右向左计算。除了少数运…

char *a 与char a[] 的区别和char** argv与char *argv[]区别

char *a 与char a[] 的区别 char *a "hello" 中的a是指向第一个字符‘a的一个指针 char a[20] "hello" 中数组名a也是执行数组第一个字符‘h’的指针 但二者并不相同&#xff1a; 看实例&#xff1a;把两个字符串相加&#xff1a; 结果&#xff1a; hell…

arduino代码_纯纯小白开发arduino--我的调试经验

arduino 是什么我就不做介绍了。这里的小白并不是说我没有嵌入式开发经验而是说从来没有实际开发过arduino。虽然它在世界范围内都很流行&#xff0c;可是不知为何国内专业做嵌入式开发的人对它大多都嗤之以鼻。我对arduino的想法是&#xff1a;”不管黑猫白猫&#xff0c;抓到…

重要的ui组件——Behavior

v7包下的组件类似CoordinatorLayout推出也有一段时间了&#xff0c;大家使用的时候应该会体会到其中很多的便利&#xff0c;今天这篇文章带大家来了解一个比较重要的ui组件——Behavior。从字面意思上就可以看出它的作用&#xff0c;就是用来规定某些组件的行为的&#xff0c;那…

Spark的基本架构

http://ihoge.cn/2018/IntroductionToSpark.html Spark的基本架构 当单机没有足够的能力和资源来执行大量信息的计算&#xff08;或者低延迟计算&#xff09;&#xff0c;这时就需要一个集群或一组机器将许多机器的资源集中在一起&#xff0c;使我们可以使用全部累积的在一起…

简析TCP的三次握手与四次分手

TCP是什么&#xff1f; 具体的关于TCP是什么&#xff0c;我不打算详细的说了&#xff1b;当你看到这篇文章时&#xff0c;我想你也知道TCP的概念了&#xff0c;想要更深入的了解TCP的工作&#xff0c;我们就继续。它只是一个超级麻烦的协议&#xff0c;而它又是互联网的基础&am…

for循环延时_前端中的事件循环eventloop机制

我们知道 js 是单线程执行的&#xff0c;那么异步的代码 js 是怎么处理的呢&#xff1f;例如下面的代码是如何进行输出的&#xff1a;console.log(1);setTimeout(function() { console.log(2);}, 0);new Promise(function(resolve) { console.log(3); resolve(Date.no…

androidActivity生命周期

Activity生命周期Activity是一个用来提供用户交互界面的组件&#xff0c;它是四大组件之一&#xff0c;对于我们刚刚学习android的菜鸟来说是非常重要的&#xff0c;我们可以将一个屏幕理解为一个Activity&#xff0c;Activity通常是一个全屏的界面&#xff0c;每一个应用程序可…

Autofac实现有条件的DI

Autofac.Annotation框架是我用.netcore写的一个DI框架&#xff0c;基于Autofac参考 Spring注解方式所有容器的注册和装配,切面,拦截器等都是依赖标签来完成。开源地址&#xff1a;https://github.com/yuzd/Autofac.Annotation本期讲的是最新实现的功能有条件的DI有些时候我们想…

公众平台关注用户达到5万即可开通流量主功能 可以推广APP应用

今天微信公众平台发布发布了一些更新&#xff0c;公众帐号的关注用户达到5万&#xff0c;即可开通流量主功能&#xff0c;之前的是要求10万粉丝&#xff0c;这是一个微信开放的信号。广告主可推广苹果商店应用或腾讯开放平台应用。新增卡片和图文广告规格。以下是微信团队的公告…

二进制全排列 java_排列组合算法真厉害,傻瓜都能学会

作者&#xff1a;枕边书来源&#xff1a;https://zhenbianshu.github.io/2019/01/charming_alg_permutation_and_combination.html需求最近工作中碰到一个需求&#xff1a;我们的数据表有多个维度&#xff0c;任意多个维度组合后进行 group by 可能会产生一些”奇妙”的反应&am…