第 5 章 Spark Shuffle 解析
- 5.1 Shuffle 的核心要点
- 1. 数据分区:
- 2.数据传输:
- 3. 数据排序:
- 4.数据聚合:
- 5. 数据重分发:
- 6.数据持久化:
- 5.1.1 ShuffleMapStage 与 ResultStage
- 5.2 HashShuffle 解析
- 5.2.1 未优化的 HashShuffle
- 5.2.2 优化后的 HashShuffle
- 5.3 SortShuffle 解析
- 5.3.1 普通 SortShuffle
- 5.3.2 bypass SortShuffle
5.1 Shuffle 的核心要点
Shuffle 是在数据处理和计算过程中的一个重要操作,主要用于打乱数据的顺序和重新分配数据。在大数据处理和分布式计算中,Shuffle 是实现数据传输、数据聚合和任务分发的关键步骤之一。以下是 Shuffle 的核心要点:
1. 数据分区:
Shuffle 首先将原始数据根据某种规则(如哈希函数)划分为多个数据分区,以便后续的数据处理和计算操作。每个数据分区通常包含一部分原始数据,且可以被并行处理。
数据分区是在分布式计算中将数据划分为多个逻辑块或分区的过程。数据分区是为了将大规模数据集拆分成小块,以便在计算节点上并行处理和分配计算任务。数据分区的目的是实现数据的并行处理、负载均衡和提高计算性能。
在数据分区过程中,数据根据一定的规则或策略被分配到不同的分区中。每个分区通常包含一部分数据,可以由一个或多个计算节点进行处理。常见的数据分区方法包括:
-
哈希分区(Hash Partitioning):根据数据的哈希值将数据均匀地分配到不同的分区中。这样可以确保具有相同哈希值的数据项在同一个分区中,从而便于后续的聚合和处理操作。
-
范围分区(Range Partitioning):根据数据的排序或范围条件将数据划分为不同的分区。例如,将数据按照数值的大小或时间的先后顺序进行范围分区。
-
列分区(Column Partitioning):根据数据的列值将数据分配到不同的分区中。例如,在关系型数据库中,可以根据表的某个列进行分区,将具有相同列值的数据放在同一个分区中。
-
轮流分区(Round-robin Partitioning):按照轮询的方式将数据依次分配到不同的分区中。每个分区依次接收一个数据项,直到所有数据项都被分配完毕。
-
自定义分区(Custom Partitioning):根据特定的业务需求,自定义实现数据分区策略。例如,根据地理位置、用户ID等自定义规则进行数据分区。
数据分区的目标是将数据均匀地分配到不同的计算节点上,以便实现并行处理和负载均衡。通过合理的数据分区策略,可以提高分布式计算的性能和效率,并更好地利用计算资源。在具体的分布式计算框架中,可以根据需求和场景选择合适的数据分区方法和分区策略。
2.数据传输:
在 Shuffle 过程中,数据分区需要从计算节点传输到不同的节点,以便进行后续的数据处理或计算。数据传输可能发生在网络中,涉及节点间的数据交换。
数据传输是指在计算系统中将数据从一个节点传输到另一个节点的过程。在分布式计算和数据处理中,数据传输是一个重要的环节,涉及计算节点之间的数据交换和通信。
数据传输可以发生在以下场景中:
-
数据分发:在分布式计算中,数据需要从一个节点传输到多个节点,以便并行处理。这种数据传输通常发生在任务启动时,将输入数据分发给各个计算节点。
-
Shuffle传输:Shuffle是在数据处理过程中的一个重要操作,涉及数据重新分区、排序和合并。在Shuffle过程中,数据需要从Map节点传输到Reduce节点,以进行数据重分区和分组操作。
-
数据合并:在一些情况下,多个节点上的计算结果需要进行合并,以生成最终的计算结果。这涉及将数据从各个节点传输到一个节点,并进行合并操作。
数据传输可以通过多种方式进行,包括但不限于:
-
网络传输:数据可以通过计算节点之间的网络进行传输。这可能涉及节点之间的数据交换、消息传递和网络通信。
-
磁盘传输:数据可以通过将数据写入磁盘,并在其他节点上读取磁盘上的数据来进行传输。这通常用于将数据从一个节点传输到另一个节点,特别是在Shuffle过程中。
-
内存传输:如果计算节点之间有共享的内存或高速连接,数据可以通过内存进行传输。这种方式通常比网络传输更快,并且在某些情况下可以减少数据的序列化和反序列化开销。
在设计和实现分布式计算系统时,需要考虑数据传输的效率和性能。合理的数据传输策略和机制可以减少数据传输的开销、降低网络延迟,并提高整体的计算性能和效率。这包括优化网络带宽利用、数据压缩、缓存机制和并行传输等技术。
数据持久化是指将数据存储到长期存储介质(如磁盘、数据库、文件系统等)中,以便在程序运行结束后仍然能够访问和使用数据的过程。
数据持久化的目的是确保数据在程序运行结束后的持久性和可靠性,以便后续的读取、查询、分析或其他操作。通过数据持久化,数据可以长期保存,随时可以被读取和处理,即使在系统重启或程序重新运行后也能保留数据状态。
3. 数据排序:
在 Shuffle 过程中,对于需要排序的数据,会对每个数据分区内的数据进行排序。排序操作确保后续的聚合和计算操作能够有效地处理数据。
数据排序是将一组数据按照指定的排序规则进行重新排列的过程。在计算和数据处理中,数据排序是常见的操作,用于整理数据、提取有序子集或为后续计算提供有序输入。
数据排序通常基于某个特定的排序键或排序规则,按照指定的顺序对数据进行排列。常见的排序规则包括升序(从小到大)和降序(从大到小)。排序键可以是任何可比较的数据类型,如整数、浮点数、字符串等。
在排序过程中,通常使用排序算法来实现。常见的排序算法包括冒泡排序、插入排序、选择排序、快速排序、归并排序等。这些算法在时间复杂度和空间复杂度上有所差异,适用于不同规模和类型的数据集。
数据排序的目的主要有两个:
-
数据整理:排序可以将数据按照指定的顺序进行整理,使其更易于处理和分析。有序的数据可以提供更好的查询性能和更高效的数据操作。
-
数据分析和计算:在一些计算场景中,有序的数据可以更有效地进行计算。例如,排序后的数据可以更好地利用二分查找、归并操作和其他分析算法。
数据排序在各种领域和应用中都有广泛的应用。在数据库系统中,排序用于执行ORDER BY查询、索引维护和连接操作。在大数据处理和分布式计算中,排序常用于数据预处理、Shuffle阶段和结果合并等操作。
对于大规模的数据集,数据排序可能需要考虑性能、资源消耗和分布式环境下的并行处理。一些优化技术,如外部排序、并行排序和分布式排序,可以用于处理大规模数据的排序需求。
总之,数据排序是对数据按照指定规则进行重新排列的过程,它在数据处理和计算中起到重要的作用,提供了有序数据的基础。
4.数据聚合:
在 Shuffle 过程中,根据业务需求,可以进行数据的合并和聚合操作。例如,对相同键的数据进行合并,计算键对应的总和、平均值等统计量。
数据聚合是将多个数据项合并为一个或多个汇总结果的过程。在计算和数据处理中,数据聚合是一种常见的操作,用于计算统计信息、生成摘要信息或将数据集合并为更小的集合。
数据聚合可以应用于不同类型的数据,包括数值数据、文本数据和结构化数据等。聚合操作可以根据具体需求进行不同的计算,如求和、平均值、最大值、最小值、计数、频率统计、分组统计等。
以下是一些常见的数据聚合操作:
-
求和(Sum):将一组数值相加,得到总和。
-
平均值(Average):将一组数值相加后除以数量,得到平均值。
-
最大值(Maximum):从一组数值中找到最大值。
-
最小值(Minimum):从一组数值中找到最小值。
-
计数(Count):统计一组数据中的元素数量。
-
频率统计(Frequency Count):统计一组数据中不同元素的出现次数。
-
分组统计(Group By):根据某个特定的属性对数据进行分组,然后对每个组进行聚合操作。
数据聚合可以应用于不同的数据处理场景,包括数据分析、数据挖掘、报告生成和业务决策等。在大规模数据处理和分布式计算中,数据聚合通常涉及对分布在不同节点上的数据进行并行计算和合并。
对于大规模数据集和复杂的聚合操作,需要考虑性能、资源消耗和分布式环境下的并行计算。一些优化技术,如局部聚合、合并树、混合聚合等,可以用于提高聚合操作的性能和效率。
总之,数据聚合是将多个数据项合并为一个或多个汇总结果的过程。它在数据处理和计算中起到重要的作用,用于计算统计信息、生成摘要信息或将数据集合并为更小的集合。
5. 数据重分发:
在 Shuffle 过程中,经过排序和聚合后,需要将数据重新分发到不同的计算节点上,以便进一步的计算操作。数据重分发通常涉及将数据分发到正确的节点,以满足后续计算任务的需求。
数据重分发是在分布式计算中将数据从一个节点重新分发到另一个节点的过程。它通常发生在Shuffle阶段,用于重新分配和组合数据,以便进行后续的计算和处理。
数据重分发的主要目的是将具有相同key的数据项聚集到同一个节点上,以便进行分组、合并或聚合操作。它是实现分布式计算中数据重组和数据交换的重要环节。
数据重分发的过程如下:
-
Map阶段:在Map阶段,输入数据被映射为(key, value)对,并根据指定的分区器将它们分配到不同的分区中。每个分区的数据被分散存储在计算节点的内存或磁盘中。
-
Shuffle阶段:
- 数据分区:在Shuffle阶段,根据分区器的规则,计算节点上的每个分区会被划分为多个数据块(chunk)。每个数据块包含具有相同key的数据项。
- 数据重分发:在数据重分发过程中,各个分区的数据项将根据key的哈希值或其他分区规则重新分配到不同的节点上。这通常涉及数据的网络传输或磁盘写入和读取操作。
-
Reduce阶段:在Reduce阶段,重新分发的数据项到达相应的节点,并被组合到相同key的数据集中。这样可以在每个节点上进行后续的分组、合并或聚合操作。
数据重分发的实现方式可以根据具体的分布式计算框架和算法进行选择和优化。在分布式计算系统中,数据重分发的效率和性能对整体计算的速度和可靠性有重要影响。因此,需要考虑网络带宽、数据传输延迟、数据量、节点负载等因素,选择合适的数据重分发策略和机制。一些优化技术,如数据压缩、数据本地化、数据合并等,可以用于提高数据重分发的效率和性能。
6.数据持久化:
在 Shuffle 过程中,为了保持数据的可靠性和容错性,通常需要将数据持久化存储,以便在出现故障或需要恢复时使用。
Shuffle 的性能和效率对于大数据处理和分布式计算来说非常重要。优化 Shuffle 过程可以提高整体的计算性能和效率,减少数据传输和网络开销,并避免数据倾斜和资源浪费问题。
需要根据具体的数据处理框架和场景来实现 Shuffle 操作。一些常见的大数据处理框架,如Apache Hadoop、Apache Spark等,提供了内置的 Shuffle 支持,以简化和优化数据处理过程中的 Shuffle 操作。
数据持久化是指将数据存储到长期存储介质(如磁盘、数据库、文件系统等)中,以便在程序运行结束后仍然能够访问和使用数据的过程。
数据持久化的目的是确保数据在程序运行结束后的持久性和可靠性,以便后续的读取、查询、分析或其他操作。通过数据持久化,数据可以长期保存,随时可以被读取和处理,即使在系统重启或程序重新运行后也能保留数据状态。
数据持久化的方式和方法可以根据具体的应用场景和需求进行选择,常见的数据持久化方式包括:
-
关系型数据库:使用关系型数据库管理系统(如MySQL、Oracle、PostgreSQL等)进行数据的持久化存储和管理。关系型数据库提供结构化的数据存储和丰富的查询功能,适用于需要事务支持和复杂数据模型的应用。
-
NoSQL数据库:使用NoSQL数据库(如MongoDB、Redis、Cassandra等)进行数据的持久化存储。NoSQL数据库提供灵活的数据模型和高吞吐量的数据访问,适用于大规模数据存储和快速读写的应用场景。
-
文件系统:将数据以文件的形式存储在文件系统中,可以使用文本文件、JSON文件、XML文件等格式进行存储。文件系统提供简单的数据持久化方式,适用于小型数据集或简单数据结构的应用。
-
分布式存储系统:使用分布式存储系统(如Hadoop HDFS、Amazon S3、Google Cloud Storage等)进行大规模数据的分布式持久化存储。分布式存储系统提供高容量、高可靠性和可扩展性的数据存储解决方案。
-
内存数据库:将数据存储在内存中进行快速访问和处理,常见的内存数据库包括Redis、Memcached等。内存数据库提供低延迟的数据访问和高性能的数据处理,适用于对实时性要求较高的应用。
在选择数据持久化方式时,需要考虑数据的性质、访问模式、数据量、性能需求、可靠性要求等因素。不同的数据持久化方式有不同的优势和限制,需要根据具体的业务需求和系统要求进行权衡和选择。
5.1.1 ShuffleMapStage 与 ResultStage
ShuffleMapStage和ResultStage是Apache Spark中两个关键的概念,用于表示Spark中的任务执行流程和数据流转过程。
- ShuffleMapStage:
ShuffleMapStage是Spark中的一个执行阶段(Stage),用于执行包含Shuffle操作的任务。在Spark的执行过程中,当需要进行Shuffle操作(如reduceByKey、groupBy等)时,会触发一个ShuffleMapStage。ShuffleMapStage将数据进行重新分区和排序,并将结果写入磁盘上的中间文件(shuffle文件)。
ShuffleMapStage包含一组任务(Tasks),每个任务负责处理输入数据的一个分区,并生成Shuffle输出数据的一部分。每个任务在一个计算节点上执行,可以并行处理多个分区,以提高执行效率。执行ShuffleMapStage的任务之间通常是相互独立的,可以并行执行。
- ResultStage:
ResultStage是Spark中的另一个执行阶段(Stage),用于执行不包含Shuffle操作的任务。ResultStage接收ShuffleMapStage的输出数据(中间文件),并根据任务的计算逻辑进行计算和处理。ResultStage的输入数据已经根据键进行了分组和排序,可以直接进行后续的聚合、过滤或其他操作。
ResultStage中的任务(Tasks)执行并行处理数据,每个任务负责处理一部分输入数据,可以在不同的计算节点上并行执行。任务之间相互独立,可以同时进行计算和处理。
ShuffleMapStage和ResultStage是Spark中任务执行流程中的两个关键阶段。ShuffleMapStage负责处理包含Shuffle操作的任务,并生成Shuffle输出数据;而ResultStage接收ShuffleMapStage的输出数据,进行进一步的计算和处理。这两个阶段的协同工作实现了数据的重新分区、排序和计算过程,以提供高性能和高效率的数据处理能力。
- 在划分 stage 时,最后一个 stage 称为 finalStage,它本质上是一个 ResultStage 对象,前
面的所有 stage 被称为 ShuffleMapStage。 - ShuffleMapStage 的结束伴随着 shuffle 文件的写磁盘。
- ResultStage 基本上对应代码中的 action 算子,即将一个函数应用在 RDD 的各个 partition
的数据集上,意味着一个 job 的运行结束。
5.2 HashShuffle 解析
HashShuffle是一种常见的Shuffle实现策略,用于在分布式计算中重新分配和组合数据。在HashShuffle中,数据的重新分发和分组是基于哈希函数的。
HashShuffle的过程如下:
-
Map阶段:在Map阶段,输入数据被映射为(key, value)对,并根据指定的分区器将它们分配到不同的分区中。每个分区的数据被分散存储在计算节点的内存中。
-
Shuffle阶段:
- 数据分区:在Shuffle阶段,计算节点上的每个分区会被划分为多个数据块(chunk)。每个数据块包含相同key的数据项。
- 数据排序:对于每个数据块,数据项根据key进行排序。排序的目的是将具有相同key的数据项相邻存储,以方便后续的分组操作。
-
Reduce阶段:在Reduce阶段,数据项被重新组合,具有相同key的数据项被分组到同一个计算节点上的Reducer任务中进行处理。Reducer任务对每个key的数据项进行聚合、计算等操作,生成最终的结果。
HashShuffle的优点是简单且高效,适用于处理大规模数据集。它通过使用哈希函数将数据重新分配到不同的计算节点上,并通过数据块和排序操作提高了后续分组操作的效率。然而,HashShuffle也有一些限制,例如需要足够的内存来存储数据块和排序操作可能引入的磁盘IO开销。
值得注意的是,Spark和其他分布式计算框架通常提供多种Shuffle实现策略,例如SortShuffle、TungstenSort等,以根据不同的场景和需求选择最合适的Shuffle策略。这些策略可能在性能、内存消耗和网络开销等方面有所不同,开发人员可以根据具体的应用场景选择合适的Shuffle实现方式。
- 这里我们先明确一个假设前提:每个 Executor 只有 1 个 CPU core,也就是说,无论这个 Executor 上分配多少个 task 线程,同一时间都只能执行一个 task 线程。
- 如下图中有 3 个 Reducer,从 Task 开始那边各自把自己进行 Hash 计算(分区器:hash/numreduce 取模),分类出 3 个不同的类别,每个 Task 都分成 3 种类别的数据,想把不同的数据汇聚然后计算出最终的结果,所以 Reducer 会在每个 Task 中把属于自己类别的数据收集过来,汇聚成一个同类别的大集合,每 1 个 Task 输出 3 份本地文件,这里有 4 个Mapper Tasks,所以总共输出了 4 个 Tasks x 3 个分类文件 = 12 个本地小文件。
5.2.1 未优化的 HashShuffle
未优化的HashShuffle是指在分布式计算中使用基本的HashShuffle实现策略,没有进行额外的优化措施。它可以被视为最简单和最基本的Shuffle实现方式,但可能存在性能瓶颈和资源利用不足的问题。
未优化的HashShuffle的特点如下:
-
数据传输:在未优化的HashShuffle中,所有的数据都会通过网络传输,包括从Mapper节点到Reducer节点的数据传输和中间数据的传输。这可能导致网络带宽和延迟成为性能瓶颈,并且可能造成资源的浪费。
-
数据持久化:未优化的HashShuffle通常需要将中间数据写入磁盘,以便在Reducer节点上进行排序和合并操作。这可能导致磁盘IO成为瓶颈,并且可能导致额外的存储开销。
-
数据排序:在未优化的HashShuffle中,中间数据的排序操作可能是基于磁盘的,即需要将数据读取到内存中进行排序。这可能会导致大量的磁盘IO操作和额外的计算开销。
-
资源利用:未优化的HashShuffle可能没有充分利用计算节点的资源,如内存和CPU。由于数据传输、磁盘IO和排序操作的开销,可能导致资源利用率低下,降低了整体的计算性能和效率。
对于大规模的数据集和复杂的计算任务,未优化的HashShuffle可能无法满足性能和可扩展性的要求。因此,在实际的分布式计算环境中,通常需要进行Shuffle的优化,以减少数据传输、磁盘IO和排序操作的开销,提高计算性能和资源利用率。这可以通过采用更高级的Shuffle实现策略(如SortShuffle、TungstenSort)、调整Shuffle的参数和配置,以及使用缓存和内存管理等技术来实现。
5.2.2 优化后的 HashShuffle
优化后的HashShuffle是指在分布式计算中对HashShuffle进行了一系列的优化措施,以提高性能、减少资源消耗和优化数据传输过程。以下是一些常见的HashShuffle优化技术:
-
拷贝优化:为了减少数据传输的网络开销,可以采用拷贝优化技术。在Map阶段,可以在Map任务本地生成中间结果,并将其拷贝到Reducer任务所在的节点上。这样可以减少数据的传输量,提高数据传输的效率。
-
聚合优化:在Shuffle阶段,可以进行本地聚合操作,即在Mapper节点上对具有相同键的数据项进行局部聚合。这样可以减少中间数据量,并减少数据传输的开销。
-
压缩优化:在数据传输过程中,可以使用压缩技术对数据进行压缩。压缩后的数据可以减少网络传输的带宽消耗,并降低数据传输的延迟。
-
内存优化:在Shuffle阶段,可以针对中间数据的排序和合并操作进行内存优化。例如,使用基于磁盘的排序算法,避免将所有数据加载到内存中进行排序,以减少内存消耗。
-
磁盘IO优化:对于磁盘IO操作,可以采用批量写入和异步IO等技术,减少磁盘读写的次数,提高IO的效率。
-
基于内存的Shuffle:使用内存进行数据传输和排序操作,可以极大地提高Shuffle的性能。这可以通过使用堆外内存或内存映射文件来实现。
-
动态分区优化:根据数据分布和负载情况,动态调整分区策略和分区数量,以实现负载均衡和提高计算性能。
-
数据倾斜处理:对于可能出现的数据倾斜问题,可以采用一些技术,如数据重分区、局部聚合、采样和随机化等,以均衡数据分布并提高计算效率。
这些优化措施可以根据具体的分布式计算框架和应用场景进行选择和应用。通过对HashShuffle的优化,可以提高数据处理的性能和效率,减少资源的消耗,并提升分布式计算系统的整体可靠性和可扩展性。
- 优化的 HashShuffle 过程就是启用合并机制,合并机制就是复用 buffer,开启合并机制的配置是spark.shuffle.consolidateFiles。该参数默认值为 false,将其设置为 true 即可开启优化机制。通常来说,如果我们使用 HashShuffleManager,那么都建议开启这个选项。
- 这里还是有 4 个 Tasks,数据类别还是分成 3 种类型,因为 Hash 算法会根据你的 Key 进行分类,在同一个进程中,无论是有多少过 Task,都会把同样的 Key 放在同一个 Buffer里,然后把 Buffer 中的数据写入以 Core 数量为单位的本地文件中,(一个 Core 只有一种类型的 Key 的数据),每 1 个 Task 所在的进程中,分别写入共同进程中的 3 份本地文件,这里有 4 个 Mapper Tasks,所以总共输出是 2 个 Cores x 3 个分类文件 = 6 个本地小文件。
5.3 SortShuffle 解析
SortShuffle是Apache Spark中的一种Shuffle实现策略,用于在分布式计算中重新分配和组合数据。SortShuffle通过排序操作对数据进行重新分区和组合,以提高数据传输和聚合操作的效率。
SortShuffle的过程如下:
-
Map阶段:在Map阶段,输入数据被映射为(key, value)对,并根据指定的分区器将它们分配到不同的分区中。每个分区的数据被分散存储在计算节点的内存中。
-
Shuffle阶段:
- 数据分区:在Shuffle阶段,计算节点上的每个分区会被划分为多个数据块(chunk)。每个数据块包含相同key的数据项。
- 数据排序:对于每个数据块,数据项根据key进行排序。排序的目的是将具有相同key的数据项相邻存储,以方便后续的分组操作。
-
Reduce阶段:在Reduce阶段,数据项被重新组合,具有相同key的数据项被分组到同一个计算节点上的Reducer任务中进行处理。Reducer任务对每个key的数据项进行聚合、计算等操作,生成最终的结果。
SortShuffle相较于未优化的HashShuffle具有以下优点:
-
数据局部性:SortShuffle通过排序操作,将具有相同key的数据项在内存中相邻存储。这样可以提高数据的局部性,减少数据传输的网络开销。
-
内存效率:SortShuffle利用排序操作,可以将具有相同key的数据项紧密存储,从而减少内存的使用量。这可以提高内存的利用效率,并减少额外的存储开销。
-
聚合效率:由于具有相同key的数据项已经相邻存储,Reduce阶段的聚合操作可以更加高效地进行。Reducer任务可以逐个读取具有相同key的数据项,并在内存中进行聚合计算,而无需频繁地访问磁盘。
尽管SortShuffle提供了优化的数据传输和聚合操作,但它也存在一些限制。在数据倾斜的情况下,具有相同key的数据项可能不均匀地分布在不同的节点上,从而导致性能不均衡。此外,SortShuffle可能在内存不足的情况下导致大量的磁盘IO操作。
在实际应用中,可以根据数据集的特点和应用需求选择合适的Shuffle实现策略。除了SortShuffle,还有其他的Shuffle实现策略可供选择,如HashShuffle、TungstenSort等。选择合适的Shuffle策略可以根据数据的大小、计算负载、可用资源等因素进行评估和权衡。
5.3.1 普通 SortShuffle
普通的SortShuffle是指在分布式计算中基于排序的Shuffle实现方式,用于重新分配和组合数据。它通过对数据进行排序操作,以提高数据传输和聚合操作的效率。
普通SortShuffle的过程如下:
-
Map阶段:在Map阶段,输入数据被映射为(key, value)对,并根据指定的分区器将它们分配到不同的分区中。每个分区的数据被分散存储在计算节点的内存中。
-
Shuffle阶段:
- 数据分区:在Shuffle阶段,计算节点上的每个分区会被划分为多个数据块(chunk)。每个数据块包含相同key的数据项。
- 数据排序:对于每个数据块,数据项根据key进行排序。排序的目的是将具有相同key的数据项相邻存储,以方便后续的分组操作。
-
Reduce阶段:在Reduce阶段,数据项被重新组合,具有相同key的数据项被分组到同一个计算节点上的Reducer任务中进行处理。Reducer任务对每个key的数据项进行聚合、计算等操作,生成最终的结果。
普通SortShuffle的主要特点是基于全排序的方式进行数据的重新分区和排序。它可以通过并行化和排序优化来提高数据的传输效率和聚合效率,减少数据传输的网络开销和磁盘IO操作。
然而,普通SortShuffle也存在一些潜在的限制。由于需要对所有数据项进行全局排序,因此它对内存的需求较高,可能会导致内存溢出或额外的磁盘交换开销。在处理大规模数据集或数据倾斜的情况下,普通SortShuffle可能会面临性能瓶颈和资源消耗过大的问题。
为了克服这些限制,一些优化的SortShuffle实现策略被提出,如TungstenSort、SortMergeShuffle等。这些优化策略通常使用更高效的排序算法、内存管理技术和数据分区策略,以提高性能、降低资源消耗和应对特殊情况,如数据倾斜。
在实际应用中,选择合适的Shuffle实现策略需要根据具体的场景和需求进行评估和权衡,以获得最佳的性能和可靠性。
- 在该模式下,数据会先写入一个数据结构,reduceByKey 写入 Map,一边通过 Map 局部聚合,一遍写入内存。Join 算子写入 ArrayList 直接写入内存中。然后需要判断是否达到阈值,如果达到就会将内存数据结构的数据写入到磁盘,清空内存数据结构。
- 在溢写磁盘前,先根据 key 进行排序,排序过后的数据,会分批写入到磁盘文件中。默认批次为 10000 条,数据会以每批一万条写入到磁盘文件。写入磁盘文件通过缓冲区溢写的方式,每次溢写都会产生一个磁盘文件, 也就是说一个 Task 过程会产生多个临时文件。
- 最后在每个 Task 中,将所有的临时文件合并,这就是 merge 过程,此过程将所有临时文件读取出来,一次写入到最终文件。 意味着一个 Task 的所有数据都在这一个文件中。同时单独写一份索引文件,标识下游各个Task的数据在文件中的索引,start offset和end offset。
5.3.2 bypass SortShuffle
Bypass SortShuffle是一种优化的Shuffle实现策略,旨在减少不必要的排序操作和数据传输开销。它适用于某些特定的场景,其中不需要对Shuffle的输出进行排序或合并。
在传统的Shuffle过程中,数据在Map阶段被划分为多个分区,并在Shuffle阶段进行排序和数据传输。然后,在Reduce阶段对具有相同key的数据项进行合并和处理。这种方式在某些情况下可能会导致不必要的开销,尤其是在不需要排序和合并的情况下。
Bypass SortShuffle通过跳过排序和合并操作来减少这些开销,从而提高性能和效率。它在Map阶段生成分区文件,并在Reduce阶段直接读取分区文件,而不需要排序和合并。
以下是Bypass SortShuffle的主要步骤:
-
Map阶段:在Map阶段,输入数据被映射为(key, value)对,并根据指定的分区器将其分配到不同的分区中。每个分区的数据被写入磁盘上的分区文件中,而不进行排序。
-
Shuffle阶段:在Shuffle阶段,Reducer任务根据分区文件的位置和分区信息,直接读取分区文件,并对具有相同key的数据项进行处理。这个过程不涉及排序和合并操作,因为数据已经按分区存储。
Bypass SortShuffle的优点是减少了排序和合并操作的开销,提高了Shuffle过程的性能和效率。它特别适用于一些场景,如对已经预先有序的数据进行处理或需要保留原始数据顺序的情况。
需要注意的是,Bypass SortShuffle并不适用于所有的场景,尤其是需要排序和合并操作的情况。在选择Shuffle实现策略时,需要根据具体的需求和场景来评估和权衡各种选项,以获得最佳的性能和结果。
bypass 运行机制的触发条件如下:
- shuffle reduce task 数量小于等于 spark.shuffle.sort.bypassMergeThreshold 参数的值,默认
为 200。 - 不是聚合类的 shuffle 算子(比如 reduceByKey)。此时 task 会为每个 reduce 端的 task 都创建一个临时磁盘文件,并将数据按 key 进行hash 然后根据 key 的 hash 值,将 key 写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。该过程的磁盘写机制其实跟未经优化的 HashShuffleManager 是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的 HashShuffleManager 来说,shuffle read 的性能会更好。而该机制与普通 SortShuffleManager 运行机制的不同在于:不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write 过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。