[spark] RDD 编程指南(翻译)

Overview

从高层次来看,每个 Spark 应用程序都包含一个driver program,该程序运行用户的main方法并在集群上执行各种并行操作。

Spark 提供的主要抽象是 resilient distributed dataset(RDD),它是跨集群节点分区的元素集合,可以并行操作。RDD 是通过从 Hadoop 文件系统中的文件开始创建的。用户还可以要求 Spark 将 RDD 持久保存在内存中,从而使其能够在并行操作中高效地重用。最后,RDD 会自动从节点故障中恢复。

Spark 中提供的第二抽象是 shared variables ,他可以用在并行操作中。默认情况下,当 Spark 将函数作为一组任务(task)在不同节点上并行运行时,它会将函数中使用的每个变量的副本携带给每个任务。有时,变量需要在任务之间共享或者在driver program和任务之间共享。Spark 支持两种类型的 shared variables,一是 broadcast variables 可用于在所有节点的内存中缓存一个值,二是 accumulators

,他是仅“added”的变量,例如counters和sums。

Resilient Distributed Datasets (RDDs)

Spark围绕 RDD 的概念展开,RDD是可以并行操作的元素的容错集合。有两种方法可以创建RDD:在driver program中并行化现有集合,或者引用外部存储系统中的数据集,例如共享文件系统、HDFS、HBase或任何提供Hadoop InputFormat的数据源。

Parallelized Collections

并行化集合是通过在驱动程序中的现有集合上调用JavaSparkContext的并行化方法创建的。集合的元素被复制以形成可以并行操作的分布式数据集。例如,以下是如何创建一个包含数字1到5的并行化集合:

List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);

一旦创建,分布式数据集(distData)就可以并行操作。并行集合的一个重要参数是将数据集划分为的分区数量。 Spark 将为集群的每个分区(partition)运行一个任务(task),任务将分配给节点执行。可以手动指定分区数或使用默认值。

RDD Operations

RDD 支持两种类型的操作:

  • transformations(从现有dataset创建新dataset)。例如,map 是一种transformations,它将每个dataset每个元素传递给函数并返回表示结果的新 RDD
  • actions(在对dataset运行计算后将值返回给driver program)。例如,reduce 是一个使用某个函数聚合 RDD 的所有元素并将最终结果返回给driver program的操作

Spark中的所有transformations都是 lazy 的,因为它们不会立即计算结果。相反,它们只记住应用于某些基本 dataset(例如文件)的transformations。只有当操作需要将结果返回给driver program时,transformations 才会被计算。这样的设计使得Spark能够更高效地运行。例如,我们可以意识到,通过map创建的dataset将在reduce中使用,并且只将reduce的结果返回给driver program,而不是更大的映射dataset。

默认情况下,每次对transform后的RDD运行操作时,都可能会被重新计算。但是,您也可以使用持久(或缓存)方法将RDD持久化在内存中,在这种情况下,Spark将保留集群中的元素,以便在您下次查询时更快地访问它。还支持在磁盘上持久化RDD,或跨多个节点复制。如下图所示,如果不cache/persist 任何内容,那么每次您需要输出时(当您调用诸如“count”之类的操作时),都会从磁盘读取数据并完成操作。您可以在读取后进行缓存,然后所有其他操作都会跳过读取并从缓存的数据开始。

在这里插入图片描述

为了说明 RDD 基础知识,请看下面的简单程序:

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);

第一行定义了来自外部文件的基本RDD。该数据集没有加载到内存中,也没有以其他方式对其进行操作:行只是指向文件的指针。第二行将lineLengths定义为map transformation的结果。同样,由于 lazy,lineLengths不会立即计算。最后,我们运行 reduce,这是一个操作。此时,Spark将计算分解为在不同机器上运行的任务,每台机器都运行其 map 和本地数据的 reduce,只将其答案返回给driver program。

如果 lineLengths 可能被再次使用,可以增加下面代码

lineLengths.persist(StorageLevel.MEMORY_ONLY());

在reduce之前,这会导致lineLengths在第一次计算后保存在内存中。

Understanding closures

关于Spark,更难的事情之一是理解跨集群执行代码时变量和方法的范围和生命周期。修改超出其范围的变量的RDD操作可能是混淆的常见来源。在下面的示例中,我们将查看使用foreach()增加计数器的代码,但其他操作也可能出现类似的问题。

Example

考虑下面简单的计算,将RDD元素sum。根据是否在同一JVM中执行,它的行为可能会有所不同。一个常见的例子是在本地模式下运行Spark(–master=local[n])与将Spark应用程序部署到集群(例如,通过Spark提交到YARN):

int counter = 0;
JavaRDD<Integer> rdd = sc.parallelize(data);// Wrong: Don't do this!!
rdd.foreach(x -> counter += x);println("Counter value: " + counter);
Local vs. cluster modes

上述代码的行为是未定义的,可能无法按预期工作。为了执行作业,Spark将RDD操作的处理分解为任务,每个任务都由执行器执行。在执行之前,Spark计算任务的 closures。closures 是执行器在RDD上执行计算(在本例中为foreach())时必须可见的变量和方法。此closures 被序列化并发送给每个执行器。

发送给每个excutor的closure中的变量现在是副本,因此,当在foreach函数中引用counter时,它不再是driver program上的count。driver program的内存中仍然有一个counter,但不再对excutor可见!

在本地模式下,在某些情况下,foreach 函数实际上将在与driver program相同的 JVM 中执行,并且将引用相同的原始counter,并且可能会更新它。

为了确保在这些场景中定义良好的行为,应该使用 Accumulator.。Spark中的 Accumulator专门用于提供一种机制,用于在集群中跨worker node 执行时安全地更新变量.

一般来说,closure——像循环或本地定义的方法这样的构造——不应该被用来改变一些全局状态。Spark不定义或保证对从闭包外部引用的对象的更改行为。执行此操作的一些代码可能在本地模式下工作,但这只是偶然的,这样的代码在分布式模式下不会按预期运行。如果需要一些全局聚合,请使用Accumulator。

Printing elements of an RDD

另一个常见的习惯用法是尝试使用rdd. foreach(println)或rdd.map(println)打印出RDD的元素。在单台机器上,这将生成预期的输出并打印RDD的所有元素。但是,在集群模式下,excutor 调用的stdout的输出现在写入执行程序的stdout,而不是driver program上的stdout,因此driver program上的stdout不会显示这些!要在驱动程序上打印所有元素,可以使用 collect() 首先将RDD带到 driver program 节点,因此使用:rdd.collect().foreach(println).

Working with Key-Value Pairs

虽然大多数Spark操作适用于包含任何类型对象的RDD,但一些特殊操作仅适用于键值对的RDD。最常见的是分布式“shuffle”操作,例如通过键对元素进行分组或聚合,reduceByKey和sortByKey等。

Shuffle operations

Spark中的某些操作会触发称为shuffle的事件。shuffle是Spark重新分配数据的机制,以便在分区之间以不同的方式分组。这通常涉及跨executor和机器复制数据,这使得shuffle成为一项复杂且成本高昂的操作。

为了理解在shuffle过程中会发生什么,我们可以考虑一个例子,这个例子中有一个reduceByKey 操作,它生成一个新的RDD,其中一个键的所有值都被组合成一个tuple,这个tuple就是键和对与该键相关的所有值执行一个reduce函数的结果。挑战在于,单个键的所有值不一定都位于同一分区,甚至同一台机器上,但它们必须位于同一位置才能计算结果。

对于大多数操作,Spark不会自动地将数据重新分布到特定的节点或分区以满足特定操作的需要。相反,每个任务通常只处理一个分区内的数据。然而,对于像reduceByKey这样的操作,Spark需要将具有相同键(key)的所有值(value)聚合在一起以进行计算。这意味着,如果这些值分布在不同的分区中,Spark必须执行一个全局的重组操作(all-to-all operation),这个过程被称为shuffle。在shuffle过程中,Spark会执行以下步骤:

  1. 读取所有分区的数据,以找出每个键对应的所有值。
  2. 将具有相同键的值跨分区传输到相同的节点,以便可以对它们进行聚合。
  3. 在每个节点上,对每个键的所有值进行最终的聚合计算,得到每个键的最终结果。

可能导致随机播放的操作包括repartition operations like repartition and coalesce‘ByKey operations (except for counting) like groupByKey and reduceByKey, and join operations like cogroup and join.

Performance Impact

Shuffle是一项昂贵的操作,因为它涉及磁盘I/O、数据序列化和网络I/O。为了组织shuffle的数据,Spark生成一组任务——map 任务来组织数据,以及一组reduce任务来聚合数据。这个术语来自MapReduce,与Spark的map和reduce操作没有直接关系。

从内部来看,单个map任务的结果保存在内存中直到内存放不下。然后,根据目标分区对它们进行排序并写入单个文件。在reduce端,任务读取相关的排序block

某些shuffle操作可能会消耗大量的堆内存,因为它们使用内存中的数据结构在传输数据之前或之后组织数据。具体来说,reduceByKey和aggregateByKey在map端创建这些结构,而’ByKey操作在reduce端生成这些结构。当数据在内存放不下时,spark会将这些数据spill到磁盘,从而导致磁盘IO的额外开销和垃圾回收机制的增加。

Shuffle还会在磁盘上生成大量中间文件。从Spark 1.3开始,这些文件将被保留,直到相应的RDD不再使用并被垃圾收集掉。这样做是为了在重新计算lineage时不需要重新创建随机文件。垃圾收集可能只在很长一段时间后发生,如果应用程序保留对这些RDD的引用,或者如果GC不经常启动。这意味着长时间运行的Spark作业可能会消耗大量磁盘空间。临时存储目录在配置Spark上下文时由park. local.dir配置参数指定。

RDD Persistence

Spark中最重要的功能之一是跨操作在内存中持久化(或缓存)dataset。当您持久化RDD时,每个节点都将其计算的任何分区存储在内存中,并在该dataset(或从该dataset派生的dataset)的其他操作中重用它们。这使得未来的操作更快(通常超过10倍)。缓存是迭代算法和快速交互使用的关键工具。

您可以使用RDD上的persist() cache()方法将其标记为持久化。第一次在操作中计算时,它将保存在节点的内存中。Spark的缓存是容错的——如果RDD的任何分区丢失,它将使用最初创建它的转换自动重新计算。

此外,每个持久化的RDD都可以使用不同的storage level,来存储,例如,允许您将数据集持久化在磁盘上,将其持久化在内存中,但作为序列化的Java对象(以节省空间),跨节点复制它。cache()方法是使用默认存储级别的简写,即StorageLevel.MEMORY_ONLY(在内存中存储反序列化的对象)。

Spark还会在shuffle操作中自动持久化一些中间数据(例如,reduceByKey),即使用户没有调用persist。这样做是为了避免在shuffle期间节点发生故障时重新计算整个input。如果用户计划重用新生成的RDD,我们仍然建议他们在生成的RDD上调用persist。

Which Storage Level to Choose?

Spark的存储级别旨在在内存使用和CPU效率之间提供不同的权衡。我们建议通过以下过程来选择一个:

  • 如果您的RDD适合默认存储级别(MEMORY_ONLY),请保持这样。这是CPU效率最高的选项,允许RDD上的操作尽可能快地运行。
  • 如果没有,请尝试使用MEMORY_ONLY_SER并选择一个快速序列化库,以使对象更加节省空间,但访问速度仍然相当快。(Java和Scala)
  • 不要spill到磁盘,除非计算数据集的函数很重,或者它们过滤了大量数据。否则,重新计算分区可能与从磁盘读取分区一样快。
  • 如果您想要快速故障恢复(例如,如果使用Spark处理来自Web应用程序的请求),请使用replicated 的存储级别。所有存储级别都通过重新计算丢失的数据提供完全的容错能力,但复制的存储级别允许您继续在RDD上运行任务,而无需等待重新计算丢失的分区。

Removing Data

Spark会自动监视每个节点上的缓存使用情况,并以最近最少使用(LRU)的方式删除旧的数据分区。如果您想手动删除RDD而不是等待它从缓存中删除,请使用RDD.unpersist()方法。请注意,此方法默认不阻塞。要在释放资源之前阻塞,请在调用此方法时指定blocking=true。

Shared Variables

通常,当传递给Spark操作(如map或reduce)的函数在远程集群节点上执行时,它会在函数中使用的所有变量的单独副本上工作。这些变量被复制到每台机器上,并且远程机器上的变量的更新不会传播回driver program。跨任务支持通用的读写共享变量将是低效的。然而,Spark确实为两种常见的使用模式提供了两种有限类型的共享变量:broadcast variables and accumulators.

Broadcast Variables

广播变量允许程序员将只读变量缓存在每台机器上,而不是将其副本与task一起发送。例如,它们可以用来以有效的方式为每个节点提供大型输入数据集的副本,减少了数据传输的开销从task粒度下降到节点粒度。Spark还尝试使用有效的广播算法来分发广播变量,以降低通信成本。

Spark action通过一组stage执行,由分布式“shuffle”操作分隔。Spark自动广播每个stage内task所需的公共数据。以这种方式广播的数据以序列化形式缓存,并在运行每个任务之前进行反序列化。这意味着显式创建广播变量仅在跨多个stage的task需要相同数据或以反序列化形式缓存数据很重要时才有用。

广播变量是通过调用SparkContext.broadcast(v)从变量v创建的。广播变量是v的包装器,可以通过调用value方法访问它的值。下面的代码显示了这一点:

Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});broadcastVar.value();
// returns [1, 2, 3]

创建广播变量后,应该在集群上运行的任何函数中使用它而不是值v,这样v就不会多次发送到节点。此外,对象v在广播后不应该被修改,以确保所有节点都获得相同的广播变量值(例如,如果变量稍后传送到新加入的节点)。

要释放广播变量复制到执行器上的资源,请调用.unpersist()。如果广播之后再次使用,它将被重新广播。要永久释放广播变量使用的所有资源,请调用.destroy()。之后广播变量就不能使用了。请注意,这些方法默认情况下不会阻塞。要阻塞直到资源被释放,请在调用它们时指定blocking=true。

Accumulators

Accumulators是仅通过关联和交换运算“added”的变量,因此可以有效地支持并行。它们可用于实现计数器(如在 MapReduce 中)或求和。 Spark 原生支持数字类型的累加器,程序员可以添加对新类型的支持。

作为用户,您可以创建命名或未命名的累加器。如下图所示,修改该累加器的阶段将在Web UI中显示一个命名累加器(在本例中为计数器)。Spark在“任务”表中显示由任务修改的每个累加器的值。

Accumulators in the Spark UI

然后,在集群上运行的task可以使用add方法add到Accumulators。但是,他们无法读取其值。只有driver program可以使用其value方法读取累加器的值。

LongAccumulator accum = jsc.sc().longAccumulator();sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 saccum.value();
// returns 10

对于仅在action中执行的Accumulators更新,Spark保证每个任务对Accumulators的更新只会应用一次,即重新启动的任务不会更新值。在transformations中,用户应该知道,如果重新执行任务或作业阶段,每个任务的更新可能会应用不止一次。

累加器不会改变 Spark 的惰性求值模型。如果它们是在 RDD 的操作中更新的,则只有当 RDD 作为action的一部分进行计算时,它们的值才会更新。因此,在像 map() 这样的惰性转换中进行累加器更新时,不能保证执行。下面的代码片段演示了这个属性:

LongAccumulator accum = jsc.sc().longAccumulator();
data.map(x -> { accum.add(x); return f(x); });
// Here, accum is still 0 because no actions have caused the `map` to be computed.

reference

https://spark.apache.org/docs/latest/rdd-programming-guide.html#resilient-distributed-datasets-rdds

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/708228.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SpringBoot之Session新增、删除、获取配置与使用

SpringBoot之Session新增、删除、获取配置与使用 文章目录 SpringBoot之Session新增、删除、获取配置与使用1. SpringBoot版本2. 定义增删查Session的类3. 定义Session的监听器4. 使用 自定义根据sessionId进行session的新增、删除、获取操作 1. SpringBoot版本 <parent>…

详解单例模式(Java语言实现)

1. 概念 保证类只有一个实例&#xff0c;让类自身负责保存它的唯一实例&#xff0c;并且类提供一个访问该实例的方法。 2. 单线程下的单例模式 public class Singleton {private static Singleton instance;private Singleton(){} //private构造方法&#xff0c;其他类无…

XGB-14:DMatrix的文本输入格式

简要描述XGBoost的文本输入格式。然而&#xff0c;对于具有支持的语言环境&#xff08;如Python或R&#xff09;的用户&#xff0c;建议使用该生态系统中的数据解析器。例如&#xff0c;可以使用sklearn.datasets.load_svmlight_file()。 基本输入格式 XGBoost目前支持两种文…

生成对抗网络

生成对抗网络 GAN 什么是GAN GAN含义&#xff1a;生成对抗网络&#xff08;Generative Adversarial Networks&#xff09;&#xff0c;主要做目标判别&#xff0c;应用在图像分类、语义分割、目标检测。 GAN简述&#xff1a;GAN包括生成器Generator(G)、判别模型Discriminat…

【架构之路】糟糕程序员的20个坏习惯,切记要改掉

文章目录 强烈推荐前言&#xff1a;坏习惯:总结&#xff1a;强烈推荐专栏集锦写在最后 强烈推荐 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到网站:人工智能 前言&#xff1a; 优秀的程序员…

关于电脑一天24小时多少度电电脑的一天用电量计算

随着这几年物价的上涨&#xff0c;一些地区的电价越来越高&#xff0c;而我们经常需要使用电脑&#xff0c;那么一台电脑一天24小时用多少度电呢&#xff1f; 如何计算电脑一天的用电量&#xff1f; 让我们跟随小编来了解更多吧。 1、功耗、主机箱功耗 现在的计算机中&#xf…

DTD、XML阐述、XML的两种文档类型约束和DTD的使用

目录 ​编辑 一、DTD 什么是DTD&#xff1f; 为什么要使用 DTD&#xff1f; 内部 DTD 声明 具有内部 DTD 的 XML 文档 外部 DTD 声明 引用外部 DTD 的 XML 文档 二、XML 什么是XML&#xff1f; XML 不执行任何操作 XML 和 HTML 之间的区别 XML 不使用预定义的标记…

js 面试 什么是WebSockets?HTTP和HTTPS有什么不同?web worker是什么?

概念&#xff1a; webSocket 是一种在客户端和服务端之间建立持久连接的协议&#xff0c;它提供全双工通信通道&#xff0c;是服务器可以主动向客户端推送数据&#xff0c;同时也可以接受客户端发送的数据。 1 webSocket与https区别&#xff1f; 在网络通信中&#xff0c;We…

vue-waterfall2 瀑布流,触底加载更多

监听滚动到底部事件&#xff1a; function isScrollToBottom() {const scrollTop document.documentElement.scrollTop || document.body.scrollTop;const scrollHeight document.documentElement.scrollHeight || document.body.scrollHeight;const clientHeight document…

SVN教程-SVN的基本使用

SVN&#xff08;Apache Subversion&#xff09;是一款强大的集中式版本控制系统&#xff0c;它在软件开发项目中扮演着至关重要的角色&#xff0c;用于有效地跟踪、记录和管理代码的演变过程。与分布式系统相比&#xff0c;SVN 的集中式架构使得团队能够更加协同地进行开发&…

如何进行写作的刻意练习

写作从来不可能一蹴而就&#xff0c;而是一件需要我们持续坚持、努力的事情。 人如果没有目标就会迷失方向&#xff0c; 既然需要长期的坚持&#xff0c;就需要我们根据自身情况制定每一阶段的目标。 目标的制定要清晰可达&#xff0c;不能模棱两可&#xff0c;要认证对待。 …

基于springboot+vue的二手手机管理系统(前后端分离)

博主主页&#xff1a;猫头鹰源码 博主简介&#xff1a;Java领域优质创作者、CSDN博客专家、阿里云专家博主、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战&#xff0c;欢迎高校老师\讲师\同行交流合作 ​主要内容&#xff1a;毕业设计(Javaweb项目|小程序|Pyt…

【Vue】插槽-slot

&#x1f4dd;个人主页&#xff1a;五敷有你 &#x1f525;系列专栏&#xff1a;Vue ⛺️稳中求进&#xff0c;晒太阳 插槽 作用&#xff1a;让组件内部一些 结构 支持 自定义 插槽的分类&#xff1a; 默认插槽。具名插槽。 基础语法 组件内需要定制的结构部分&…

WEB漏洞 逻辑越权之支付数据篡改安全

水平越权 概述&#xff1a;攻击者尝试访问与他拥有相同权限的用户的资源 测试方法&#xff1a;能否通过A用户操作影响到B用户 案例&#xff1a;pikachu-本地水平垂直越权演示-漏洞成因 1&#xff09;可以看到kobe很多的敏感信息 2&#xff09;burp抓包&#xff0c;更改user…

Codeforces Round 929 (Div. 3)(A,B,C,D,E,F,G)

这场没考什么算法&#xff0c;比较水&#xff0c;难度也不是很高。比赛链接 硬要说的话E有个 前缀和 加 二分&#xff0c;F是数学BFS&#xff0c;G是个构造 A. Turtle Puzzle: Rearrange and Negate 题意&#xff1a; 给你一个由 n n n 个整数组成的数组 a a a 。您必须对…

Unix Domain Socket 比 localhost(127.0.0.1)更快

当本机的进程间通讯时&#xff0c;使用localhost&#xff08;127.0.0.1&#xff09;、本机IP 和 Unix Domain Socket 之间有什么区别以前理解比较模糊&#xff0c;今天看了一篇文章&#xff0c;终于高明白了&#xff0c;就是这篇文章&#xff0c;写的非常好&#xff1a; (65 封…

【ERROR-pip-ubuntu】error: can‘t find Rust compiler

这个错误的关键信息是&#xff1a; error: cant find Rust compiler这表示无法找到 Rust 编译器。 针对这个问题&#xff0c;你可以尝试以下解决方法之一&#xff1a; **安装 Rust 编译器&#xff1a;**根据提示&#xff0c;你可以尝试安装 Rust 编译器。你可以从 Rust 官网&…

vscode安装配置

一、通过Code-Server安装 1.1、脚本安装 curl -fsSL https://code-server.dev/install.sh | sh#!/bin/sh set -eu# code-servers automatic install script. # See https://coder.com/docs/code-server/latest/installusage() {arg0"$0"if [ "$0" sh ];…

uniapp+node.js前后端做帖子模块:获取帖子列表(社区管理平台的小程序)

目录 0前提1.一些准备1.1表帖子表 post帖子评论表 postComment帖子点赞表 postLike 1.2总体思路 2.前端3.后端 &#x1f44d; 点赞&#xff0c;你的认可是我创作的动力&#xff01; ⭐️ 收藏&#xff0c;你的青睐是我努力的方向&#xff01; ✏️ 评论&#xff0c;你的意见是…

IOC 和 AOP

IOC 所谓的IOC&#xff08;inversion of control&#xff09;&#xff0c;就是控制反转的意思。何为控制反转&#xff1f; 在传统的程序设计中&#xff0c;应用程序代码通常控制着对象的创建和管理。例如&#xff0c;一个对象需要依赖其他对象&#xff0c;那么它会直接new出来…