大数据 - Spark系列《一》- 分区 partition数目设置详解

目录

🐶3.2.1 分区过程

🐶3.2.2 SplitSize计算和分区个数计算

🐶3.2.3 Partition的数目设置

1. 🥙对于数据读入阶段,输入文件被划分为多少个InputSplit就会需要多少初始task.

2. 🥙对于转换算子产生的RDD的分区数

 3. 🥙如果指定了spark.default.parallelism,在进行shuffle之后的新的rdd会和spark.default.parallelism设置的一致

​编辑

4. 🥙repartition和coalesce操作会聚合成指定分区数。

🐶3.2.4 groupBy不一定会Shuffle


🐶3.2.1 分区过程

每一个过程的任务数,对应一个InputSplit,Paritition 输入可能以多个文件的形式存储在HDFS上面,,每个File都包含了很多块(128切分),称为block

当Spark读取这些文件作为输入时,会根据具体数据格式对应的InputFormat进行解析,按照SplitSize切成一个个输入分片。随后将为这些输入分片生成具体的task. InputSplit与Task是一一对应的关系

注意:InputSplit不能跨越文件。

随后这些具体的Task每个都会被分配到集群上的某个节点的某个Executor去执行。

  • 每个节点可以起一个或多个Executor.

  • 每个Executor由若干core组成,每个Executor的每个core一次只能执行一个task.

  • 每个task执行的结果就就是生成了目标rdd的一个partition.

注意:这里的core是虚拟的core而不是机器的物理CPU核,可以理解为Executor的一个工作线程。Task被执行的并发度=Executor数目*每个Executor核数(=core总个数)

🐶3.2.2 SplitSize计算和分区个数计算

🐶3.2.3 Partition的数目设置

1. 🥙对于数据读入阶段,输入文件被划分为多少个InputSplit就会需要多少初始task.
  • 集合

    • (优先等级1)指定分区数

    • (优先等级2)使用 set("spark.default.parallelism","8")

    • (优先等级3)所有的可用核数

  • 文件 根据计算来的任务切片大小和输入路径下的文件大小 ,至少2并行度

  • 数据库 指定的

2. 🥙对于转换算子产生的RDD的分区数
  • 默认和父RDD的分区数一致

  • 有些算子可以调用的时候指定分区个数 distinct groupBy groupByKey

  • 特殊的算子 有特殊规定 union(和) join

val rdd3 = rdd1.intersection(rdd2)  // 取大的
val rdd4 = rdd1.subtract(rdd2) // 前面的RDD分区数
println(rdd1.cartesian(rdd2).getNumPartitions) // 两个分区个数乘积

 注意: 可能产生Shuffle的算子可以指定分区个数的

//可能产生shuffle的操作
distinct(p)     减少
groupBy(_._1 , p)    Shuffle 
groupByKey( p)       Shuffle 
groupByKey(_+_, p)   Shuffle 
join( , p)
 3. 🥙如果指定了spark.default.parallelism,在进行shuffle之后的新的rdd会和spark.default.parallelism设置的一致
package com.doit.com.doit.day0128import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}/*** @日期: 2024/1/30* @Author: Wang NaPao* @Blog: https://blog.csdn.net/weixin_40968325?spm=1018.2226.3001.5343* @Tips: 我是技术大牛* @Description:*//** data/orders.txt
oid01,100,bj
oid02,100,bj
oid03,100,bj
oid04,100,nj
oid05,100,nj
*/object Test06 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("Starting...").setMaster("local[*]").set("spark.default.parallelism", "8")val sc = new SparkContext(conf)//设置spark-submit提交程序时不在控制台打印日志信息Logger.getLogger("org.apache.spark").setLevel(Level.WARN)val rdd1 = sc.textFile("data/orders.txt")//将rdd1的分区设置为2rdd1.repartition(2)println("rdd1 partition为:"+rdd1.getNumPartitions)//将rdd1按照城市分组val rdd2 = rdd1.groupBy(tp=>{val arr = tp.split(",")arr(2)})println("rdd2 partition为:"+rdd2.getNumPartitions)sc.stop()}
}
4. 🥙repartition和coalesce操作会聚合成指定分区数。
println(rdd1.repartition(3).getNumPartitions) // 增加 
println(rdd1.repartition(1).getNumPartitions)  //减少
println(rdd1.coalesce(1, true).getNumPartitions)  //减少
println(rdd1.coalesce(3, true).getNumPartitions)  //增加
// 不允许Shuffle就不能增加分区
println(rdd1.coalesce(3, false).getNumPartitions)  //增加失败
println(rdd1.coalesce(1, false).getNumPartitions)  //减少  不会Shuffle

🐶3.2.4 groupBy不一定会Shuffle

Shuffle:上游一个分区的数据可能被下游所有分区引用

package com.doit.com.doit.day0128import org.apache.spark.SparkContext.jarOfObject
import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}/*** @日期: 2024/1/29* @Author: Wang NaPao* @Blog: https://blog.csdn.net/weixin_40968325?spm=1018.2226.3001.5343* @Tips: 我是技术大牛* @Description:*/object Test03 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("doe").setMaster("local[*]")val sc = new SparkContext(conf)val rdd1 = sc.makeRDD(List("a b c d e f g"), 2)val rdd2: RDD[String] = rdd1.flatMap(_.split("\\s+"))val wordOne = rdd2.map(line=>{println("aaaaaa")(line,1)})   //2//对数据使用HashPartitioner在分区 2val rdd3 = wordOne.partitionBy(new HashPartitioner(3))rdd3.mapPartitionsWithIndex((p,iter)=>{iter.map(e=>(p,e))}).foreach(println)//底层默认是HashPartition分区 2val rdd4: RDD[(String, Iterable[(String, Int)])] = rdd3.groupBy(_._1, 3)val rdd5: RDD[(Int, (String, Iterable[(String, Int)]))] = rdd4.mapPartitionsWithIndex((p, iter) => {iter.map(e => (p, e))})rdd5.foreach(println)sc.stop()}
}

 结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/662089.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

中国文化之光:微博数据的探索与可视化分析

大家好,我是八块腹肌的小胖 下面我们针对主题“中国文化”相关的微博数据进行爬取 使用LDA、情感分析、情感演化、词云等可视化操作进行相关的展示 1、导包 第一步我们开始导包工作 下面这段代码,首先,pandas被请来了,因为它是…

2024年美赛 (A题MCM)| 海蟒鳗鱼 |数学建模完整代码+建模过程全解全析

当大家面临着复杂的数学建模问题时,你是否曾经感到茫然无措?作为2022年美国大学生数学建模比赛的O奖得主,我为大家提供了一套优秀的解题思路,让你轻松应对各种难题。 让我们来看看美赛的A题! 完整内容可以在文章末尾领…

Camunda流程引擎概念

💖专栏简介 ✔️本专栏将从Camunda(卡蒙达) 7中的关键概念到实现中国式工作流相关功能。 ✔️文章中只包含演示核心代码及测试数据,完整代码可查看作者的开源项目snail-camunda ✔️请给snail-camunda 点颗星吧😘 💖流程定义 …

服务器C盘突然满了,是什么问题

随着时代的发展、互联网的普及,加上近几年云计算服务的诞生以及大规模普及,对于服务器的使用目前是非常普遍的,用户运维的主要对象一般也主要是服务器方面。在日常使用服务器的过程中,我们也会遇到各式各样的问题。最近就有遇到用…

【2024美赛C题】网球大佬带你无背景压力分析解题思路!

2024美赛数学建模c题思路分享 加群可以享受定制等更多服务,或者搜索B站:数模洛凌寺 联络组织企鹅:936670395 以下是C题老师的解题思路(企鹅内还会随时更新文档): 1背景介绍 2024MCM问题C:网…

LeetCode:42. 接雨水

42. 接雨水 1)题目2)思路3)代码4)结果 1)题目 给定 n 个非负整数表示每个宽度为 1 的柱子的高度图,计算按此排列的柱子,下雨之后能接多少雨水。 示例 1: 输入:height …

强化学习 - Monte Carlo Tree Search (MCTS)

什么是机器学习 强化学习中的Monte Carlo Tree Search (MCTS) 是一种用于决策制定和搜索的算法,特别在不确定环境下表现出色。 1. 强化学习背景 在强化学习中,一个智能体通过与环境的交互学习,以便在某个任务上获得最大的奖励。MCTS是一种…

2024美国大学生数学建模美赛选题建议+初步分析

总的来说&#xff0c;去年算是美赛环境题元年&#xff0c;去年的开放度是较高的&#xff0c;今年每种赛题类型相对而言平均了起来 提示&#xff1a;DS C君认为的难度&#xff1a;E<BCF<AD&#xff0c;开放度&#xff1a;DBCE<A<F。 以下为A-F题选题建议及初步分析…

【ArcGIS Pro】从0开始

1.导入excel&#xff0c;需要安装驱动程序 安装用于 Microsoft Excel 文件的驱动程序 https://pro.arcgis.com/zh-cn/pro-app/latest/help/data/excel/prepare-to-work-with-excel-in-arcgis-pro.htm 2.修改投影坐标系 点到地图图标上&#xff0c;右键才能设置坐标系。 3.…

前端通过nginx,访问一个文件夹里面的全部数据,nginx 咋配置

目录 1 问题2 实现 1 问题 前端通过nginx,访问一个文件夹里面的全部数据&#xff0c;nginx 咋配置 2 实现 location /logs {alias /mnt/www/logs/;autoindex on; }

【QT+QGIS跨平台编译】之二十二:【FontConfig+Qt跨平台编译】(一套代码、一套框架,跨平台编译)

文章目录 一、FontConfig介绍二、文件下载三、文件分析四、pro文件五、编译实践 一、FontConfig介绍 FontConfig 是一个用于配置和定制字体的库&#xff0c;广泛应用于基于X Window系统的操作系统中&#xff0c;尤其是在Linux和Unix-like系统中。它为应用程序提供了一种统一的…

JavaWeb前端——HTML/CSS

HTML/CSS概述 HTML&#xff1a;学习标签&#xff0c;CSS&#xff1a;学习样式 HTML 1. 不区分大小写。 2. 属性可以使用单引号/双引号 3. 在记事本/编辑器中编写html语言&#xff0c;通过浏览器解析渲染语言 4. 语法结构松散&#xff08;编写时要尽量严谨&#xff09; VSc…

搭建 idea 插件仓库私服

正常情况下&#xff0c;我们开发的 idea 插件会发布到 idea 官方商城中&#xff0c;这样用户就可以在 idea 的 Marketplace 中搜索安装。 但是在企业内部&#xff0c;有可能我们开发了很多内部插件&#xff0c;而不能发布到公共市场中&#xff0c;这种情况下我们就需要搭建一个…

探索微服务治理:从发展到实践构建高效稳定的系统|负载均衡技术解析

二、微服务治理的相关技术 微服务治理涉及多个方面&#xff0c;包括服务注册与发现、负载均衡、容错处理、服务配置管理等&#xff0c;这些技术共同确保微服务架构的稳定运行。 2、负载均衡 负载均衡作为服务治理中的核心技术之一&#xff0c;对于提高系统的可用性、性能和扩…

网络空间测绘在安全领域的应用(上)

近年来&#xff0c;网络空间测绘已经跻身为网络通信技术、网络空间安全、地理学等多学科融合的前沿领域。 该领域聚焦于构建网络空间信息的“全息地图”&#xff0c;致力于建立面向全球网络的实时观测、准确采样、映射和预测的强大基础设施。 通过采用网络探测、数据采集、信…

华为FreeClip耳机可以调节音量大小吗?附教程!

不会只有我一个人吧&#xff1f;都用华为FreeClip耳机一段时间了&#xff0c;才发现它竟然不支持在耳机上直接调节音量&#xff0c;也是没谁了&#xff01;但是后来自己摸索了一下&#xff0c;发现了华为FreeClip耳机原来是几个简单有效的调节音量大小的方法滴~不得不说&#x…

在Android Studio中配置OpenCV

在Android Studio中配置OpenCV 1 下载OpenCV2 导入OpenCV模块3 修改配置4 增加依赖5 拷贝libopencv_java.so6 Activity中加入代码1 下载OpenCV 下载OpenCV的Android包并解压。 2 导入OpenCV模块 在Android应用中,导入OpenCV模块。 导入目录时选择Opencv Android中的sdk目…

TiDB架构设计和实践:高性能分布式数据库解决方案

摘要&#xff1a;TiDB是一个开源的分布式NewSQL数据库&#xff0c;具备强大的水平扩展能力和高性能查询能力。本文将介绍TiDB的架构设计和实践经验&#xff0c;帮助读者了解如何利用TiDB构建可靠、高性能的分布式数据库系统。 正文&#xff1a; ### 1. 引言 随着互联网规模的…

鸿蒙ArkUI下拉列表组件

鸿蒙ArkUI下拉列表组件&#xff0c;官方提供的只是基础使用&#xff0c;在使用过程非常不方便&#xff0c;我们进行了组件的封装。 import {IDynamicObject} from ./IType /*** 自定义颜色*/ Component export default struct DiygwSelect{//绑定的值Link Watch(onValue) val…

如何做好员工离职风险防范和离职危机处理工作

员工退出与离职是企业发展中都会面临的一个普遍现象&#xff0c;这种现象本身没有什么问题&#xff0c;但是如果企业退出与离职管理不善&#xff0c;就会增加企业的管理成本&#xff0c;影响企业的正常经营活动。该电子科技有限公司在发展中也遇到员工离职管理不善带来的问题。…