Spark 中的 RDD 分区的设定规则与高阶函数、Lambda 表达式详解

Spark 的介绍与搭建:从理论到实践_spark环境搭建-CSDN博客

Spark 的Standalone集群环境安装与测试-CSDN博客

PySpark 本地开发环境搭建与实践-CSDN博客

Spark 程序开发与提交:本地与集群模式全解析-CSDN博客

Spark on YARN:Spark集群模式之Yarn模式的原理、搭建与实践-CSDN博客

Spark 中 RDD 的诞生:原理、操作与分区规则-CSDN博客

目录

一、RDD 分区的设定规则

(一)parallelize 获取 rdd 时的分区设定

(二)通过外部读取数据 - textFile 时的分区设定

(三)子 RDD 分区数

(四)RDD分区的设定规则

二、高阶函数及 Lambda 表达式

(一)复习 Python 函数语法

(二)需求示例与高阶函数概念引入

一个简单的需求场景

高阶函数的定义

(三)Lambda 表达式与高阶函数的结合使用

使用 Lambda 表达式优化代码

更复杂的需求示例

在集合操作中的应用

三、总结


        在大数据处理领域,Apache Spark 是一个强大的开源分布式计算框架。它提供了丰富的功能和灵活的编程接口,其中弹性分布式数据集(RDD)是其核心概念之一。RDD 的分区设定规则对于数据处理的性能和资源利用至关重要,同时,高阶函数和 Lambda 表达式的运用能让我们在 Spark 编程中更加简洁高效地处理数据。本文将深入探讨 RDD 分区的设定规则以及高阶函数和 Lambda 表达式的相关知识。

一、RDD 分区的设定规则

(一)parallelize 获取 rdd 时的分区设定

方式一:并行化集合:parallelize
没有指定:spark.default.parallelism参数值决定
指定分区:指定几个,就是几个分区
list_rdd = sc.parallelize(data,numSlices=2)

总结:假如指定了分区数,分区数就是这个,假如没有指定spark.default.parallelism。

(二)通过外部读取数据 - textFile 时的分区设定

没有指定:spark.default.parallelism和2取最小值,具体计算并行度的公式:
   min(spark.default.parallelism,2) 
指定分区:最小分区数,最少有这么多分区,具体的分区数可以根据HDFS分片规则来 
hdfs的一片是128M或者128*1.1 = 140.8M
file_rdd =sc.textFile("../datas/function_data/filter.txt", minPartitions=2)

# 假如你这个data.txt = 500M ,此时的分区数是:4  因为 500 =  128+ 128+ 128+ 116
    rdd3 = sc.textFile("hdfs://bigdata01:9820/datas/wordcount/data.txt", minPartitions=2)
    rdd3.foreach(lambda x: print(x))

spark.default.parallelism 参数

        这个参数在 RDD 分区设定中起着关键作用,它用于指定没有父 RDD 的 RDD 的分区数。在不同的运行模式下,其取值规则有所不同。

Local mode(本地模式)

在本地模式下,分区数取决于本地机器的 CPU 核数。这是因为在本地环境中,资源主要受限于本地计算机的硬件配置。例如,如果本地机器是 4 核,那么分区数可能就是 4(具体可能还会受到其他相关设置的微调)。这种基于本地 CPU 核数的分区设定,能够充分利用本地计算资源,实现一定程度的并行计算。

Mesos fine grained mode(Mesos 细粒度模式)

在 Mesos 细粒度模式下,分区数的默认值是 8。Mesos 是一种集群资源管理框架,在这种特定的模式下,Spark 有其默认的分区设定策略。这个默认值 8 是经过实践和设计考虑的,旨在在 Mesos 细粒度管理环境下平衡数据处理的效率和资源利用。

其他模式(Others)

在其他模式下,分区数是集群中所有参与运算的设备的所有核数与 2 相比较,取较大值。这种设定方式考虑了集群的整体计算能力。例如,如果集群中有 10 个节点,每个节点有 2 核,那么总核数是 20,大于 2,所以分区数就是 20。这种策略保证了在不同规模的集群中都能有合适的并行度,避免了因核数过少而导致的处理效率低下问题,同时也防止了因核数过多而可能引起的资源过度分配问题。

(三)子 RDD 分区数

        子 RDD 的分区数与父 RDD 以及所使用的转换操作有关。在一些转换操作中,分区数可能保持不变,而在另一些操作中,分区数可能会根据数据的重新分布规则而改变。比如,在某些聚合操作后,分区数可能会减少,而在数据拆分操作后,分区数可能会增加。了解子 RDD 分区数的变化规律对于优化 Spark 作业的性能和资源利用有着重要意义。

(四)RDD分区的设定规则

(1)local模式
默认并行度取决于本地机器的核数,即
local: 没有指定CPU核数,则所有计算都运行在一个线程当中,没有任何并行计算
local[K]:指定使用K个Core来运行计算,比如local[2]就是运行2个Core来执行
local[*]: 自动帮你按照CPU的核数来设置线程数。比如CPU有4核,Spark帮你自动设置4个线程计算

        

(2)集群模式
集群模式包含Stanalone、Yarn模式,Mesos的默认并行度为8
默认并行度取决于所有executor上的总核数与2的最大值,比如集群模式的设置如下:
--num-executors 5
--executor-cores 2
上面配置Executor的数量为5,每个Executor的CPU Core数量为2,
executor上的总核数10,则默认并行度为Max(10,2)=10。


注意,上面只是默认并行度(defaultParallelism)的取值,并不一定是RDD最终的分区数。具体来说,对于从集合中创建的RDD,其最终分区数等于defaultParallelism,但是从外部存储系统的数据集创建创建的RDD,其最终的分区数:需要文件的总大小计算得到。

二、高阶函数及 Lambda 表达式

(一)复习 Python 函数语法

        在深入探讨高阶函数和 Lambda 表达式之前,我们先来复习一下 Python 函数的语法。函数是一段可重复使用的代码块,用于完成特定的任务。在 Python 中,我们使用def关键字来定义函数,例如:

创建
def 函数名(参数):代码逻辑返回值:return
调用
返回值 = 函数名(参数)def function_name(parameters):# 函数体return resultdef add(a,b)return a + bx = add(1,2)
print(x) # x = 3

        函数可以接受参数,在函数体中进行计算,并返回一个结果。参数可以有默认值,函数也可以没有返回值(此时return语句可以省略)。

(二)需求示例与高阶函数概念引入

一个简单的需求场景

        假设我们有一个列表list1 = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],我们想要对这个列表中的每个元素进行平方和次方运算。我们可以定义一个函数,然后使用map函数来将函数应用到list1的每个元素上:

import mathlist1 = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
print(*list1)def pingFang(x):return x * xdef liFang(x):return math.pow(x, 3)def getNum(_list, flag):if flag == 2:return map(pingFang, _list)if flag == 3:return map(liFang, _list)# 此时的map是python的一个方法,不是spark里面一个算子RDD
# map(函数,后面是一个迭代器): 循环将一个迭代器中的元素拿过来,传递个前面的函数,计算得到一个map对象
# lambda 其实是一个匿名函数,主要用于一个方法中需要传递一个函数,这个函数只用一次,并且函数中的代码只有一行。
def getNum2(_list, flag):if flag == 2:return map(lambda x: x * x, _list)if flag == 3:return map(lambda x: math.pow(x, 3), _list)for e in getNum2(list1, 2):print(e)print("--" * 20)
for e in getNum2(list1, 3):print(e)

高阶函数的定义

        高阶函数是一种特殊的函数,它的某个参数是一个函数。就像我们上面提到的map函数,它接受一个函数(这里是comp)和一个可迭代对象(这里是list1)作为参数。一般来说,作为参数的函数(这里的comp)被称为参数函数。

        函数和算子在概念上有一些区别,函数通常是指一般的方法,它们在单机环境下执行,不能并行执行。而算子在分布式计算环境中,如 Spark 中,数据是分布式存储的,计算也是分布式进行的。

(三)Lambda 表达式与高阶函数的结合使用

使用 Lambda 表达式优化代码

        对于前面的平方运算示例,我们可以使用 Lambda 表达式来简化代码。Lambda 表达式是一种匿名函数,它的语法形式为lambda parameters: expression。使用 Lambda 表达式来实现列表元素平方运算的代码如下:

# 举例说明:
list1 = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
def comp(x):return x*x
rs = map(comp, list1)
print(*rs)# 使用lambda 优化一下:
list1 = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rs = map(lambda x: x*x, list1)
print(*rs)

这里的lambda x: x * x就是一个匿名函数,它替代了我们之前定义的comp函数。

更复杂的需求示例

        我们来看一个更复杂一点的需求。给定一个值,我们可以求这个值的 2 次方或者 3 次方。首先是正常的写法:

# 正常思路
import mathdef getPingFang(num):return num ** 2def getLiFang(num):return math.pow(num, 3)def getNum(num, flat):if flat == 2:return getPingFang(num)if flat == 3:return getLiFang(num)print(getNum(10, 2))
print(getNum(10, 3))

这种写法相对比较繁琐,尤其是当函数体比较简单的时候。我们可以使用高阶函数和 Lambda 表达式来优化。我们定义一个高阶函数getNum2

def getNum2(fun, num):return fun(num)

然后我们可以这样使用它:

print(getNum2(getPingFang, 10))
print(getNum2(getLiFang, 10))

我们还可以使用 Lambda 表达式:

print(getNum2(lambda x: x ** 2, 10))
print(getNum2(lambda x: math.pow(x, 3), 10))

在集合操作中的应用

        假设我们有一个列表list = [1, 3, 4, 45, 56, 8],我们想要求这个列表中每个数的平方和立方。我们可以使用map这个高阶函数结合 Lambda 表达式来实现:

a = map(lambda x: math.pow(x, 2), list)
b = map(lambda x: math.pow(x, 3), list)
print(*a)
print(*b)

        这里map函数将 Lambda 表达式所定义的函数应用到列表的每个元素上,分别得到平方和立方的结果集。

三、总结

        在 Spark 编程中,理解 RDD 分区的设定规则对于优化数据处理性能至关重要。不同的获取 RDD 方式和运行模式下,分区数的设定都有其特定的规则。同时,高阶函数和 Lambda 表达式是提高代码简洁性和效率的有力工具。通过合理运用高阶函数和 Lambda 表达式,我们可以在处理数据集合时更加灵活和高效。无论是简单的数学运算还是复杂的数据分析场景,这些知识都能帮助我们更好地利用 Spark 的强大功能。在实际的大数据处理项目中,深入掌握这些概念并灵活运用,可以使我们的 Spark 作业更加高效地运行,提高数据处理的速度和质量,为企业和组织从海量数据中获取有价值的信息提供有力支持。希望本文能帮助读者更好地理解和应用这些重要的 Spark 编程知识点。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/60468.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Spring学习笔记_30——事务接口PlatformTransactionManager

PlatformTransactionManager是Spring框架中事务管理的核心接口,它负责管理事务的创建、提交和回滚等操作。 源码 /** Copyright 2002-2020 the original author or authors.** Licensed under the Apache License, Version 2.0 (the "License");* you m…

力扣 LeetCode 203. 移除链表元素(Day2:链表)

解题思路: 方法一:头节点和非头节点分开处理 方法二:定义一个dummy虚拟节点,后面的节点就可以采用相同的处理方式 注意: cur需要指向要删除的节点的上一个节点,因为要越过这一个被删除的节点 class Sol…

Python小游戏25——黄金矿工

首先,你需要安装Pygame库。 如果你还没有安装,可以使用以下命令进行安装: 【bash】 pip install pygame 【python】代码展示 import pygame import random # 初始化Pygame pygame.init() # 设置屏幕尺寸 screen_width 800 screen_height 60…

鸿蒙5.0版开发:订阅卡死事件(ArkTS)

在HarmonyOS 5.0中,开发者可以通过ArkTS订阅应用的卡死事件,以便在应用出现卡死时进行相应的处理。这不仅可以帮助开发者监控应用的稳定性,还可以在问题发生时快速定位问题原因。本文将详细介绍如何在ArkTS中订阅卡死事件,并提供示…

IEC60870-5-104 协议源码架构详细分析

IEC60870-5-104 协议源码架构 前言一、资源三、目录层级一二、目录层级二config/lib60870_config.hdependencies/READMEexamplesCMakeLists.txtcs101_master_balancedcs104_client_asyncmulti_client_servertls_clienttls_server说明 make这些文件的作用是否需要导入这些文件&a…

turbo c 2.0 画螺栓

代码; #include<graphics.h> void bolt(x0,y0,d,l) int x0,y0,d,l; {int x1,x2,x3,x4,x5,x6,x7,x8;int y1,y2,y3,y4,y5,r1,r2,b,c;if(l>2*d) b2*d;else b1;r11.5*d;r20.38*d;c0.1*d;x1x0-0.7*d;x2x0-0.61*d;x3x0-0.32*d;x4x00.8*d;x5x0l-b;x6x0l-c;x7x0l-0.05*d;x8x0…

网络服务综合项目-博客

一、运行环境&#xff1a; 主机主机名系统服务192.168.31.128Server-WebLinuxWeb192.168.31.129Server-NFS-DNSLinuxNFS 二、基础配置&#xff1a; 配置主机名开启防火墙并配置部分开启selinux并配置服务器之间使用ntp.aliyun.com进行时间同步服务器之间实现ssh免密登录 三…

衡石科技BI如何助力企业实现数字化转型

在当今数字化浪潮席卷全球的背景下&#xff0c;企业纷纷寻求通过数字化转型来降低成本、提升效率、优化决策&#xff0c;从而在激烈的市场竞争中脱颖而出。衡石科技&#xff0c;作为一家专注于商业智能&#xff08;BI&#xff09;与数据分析的创新型企业&#xff0c;通过其先进…

leetcode86:分隔链表

给你一个链表的头节点 head 和一个特定值 x &#xff0c;请你对链表进行分隔&#xff0c;使得所有 小于 x 的节点都出现在 大于或等于 x 的节点之前。 你应当 保留 两个分区中每个节点的初始相对位置。 示例 1&#xff1a; 输入&#xff1a;head [1,4,3,2,5,2], x 3 输出&am…

Android Mobile Network Settings | APN 菜单加载异常

问题 从log看是有创建APN对应的Controller&#xff08;功能逻辑是ok的&#xff09;&#xff0c;但是Mobile Network Settings无法显示&#xff08;UI异常&#xff09;。 日志分析 看似APN 菜单已经创建了&#xff0c;实际上并没有显示。 11-12 07:01:28.150 8773 8773 D Pr…

上海市计算机学会竞赛平台2020年4月月赛丙组永恒的生命游戏

题目背景 2020年4月11日&#xff0c;英国数学家 约翰霍顿康威&#xff08;John Horton Conway&#xff09;因为新型冠状病毒肺炎不幸逝世。他在群论、数论、代数、几何拓扑、理论物理、组合博弈论和几何等领域&#xff0c;都做出了重大贡献。他的离去是人类文明的损失。他最著…

php 之添加图片水印,根据比例计算水印的新尺寸

以下是 _imgWatermark 函数的中文注释和解析。该函数用于在图像上添加水印&#xff1a; function _imgWatermark($src_image, $water_image, $path_image , $position 10, $pct 30, $angle 15) {// 检查源图和水印图文件是否存在if (!is_file($src_image)) {$error 源图不…

表格理解专题(五)表头和数据项定义

在 Excel 中&#xff0c;表头和数据项的区分主要取决于数据的组织结构。简单来说&#xff0c;表头 是用来描述数据含义的标签&#xff0c;而 数据项 是表格中存储的实际值。下面我会列举不同情况下&#xff0c;什么是表头&#xff0c;什么是数据项。 1. 表头的定义 表头通常是…

Day09 C++ 存储类

2024.11.12 C 存储类 一、C 存储类 存储类定义 C 程序中变量/函数的范围&#xff08;可见性&#xff09;和生命周期。这些说明符放置在它们所修饰的类型之前。下面列出 C 程序中可用的存储类&#xff1a; auto&#xff1a;这是默认的存储类说明符&#xff0c;通常可以省略不…

FS8x 功能安全

fail-safe是电独立的和物理隔离的。fail-safe由自己的参考电压和电流提供,有自己的振荡器,有重复的模拟路径以最小化常见的故障,并有LBIST/ABIST来覆盖潜在故障。fail-safe根据设备部件号提供ASIL B或ASIL D遵从性。除非另有规定,fail-safe定时来自故障安全振荡器,其精度为…

项目模块十七:HttpServer模块

一、项目模块设计思路 目的&#xff1a;实现HTTP服务器搭建 思想&#xff1a;设计请求路由表&#xff0c;记录请求方法与对应业务的处理函数映射关系。用户实现请求方法和处理函数添加到路由表&#xff0c;服务器只接受请求并调用用户的处理函数即可。 处理流程&#xff1a; …

内网域环境、工作组、局域网等探针方案

1. 信息收集 1.1 网络收集 了解当前服务器的计算机基本信息&#xff0c;为后续判断服务器角色&#xff0c;网络环境做准备 systeminfo 详细信息 net start 启动服务 tasklist 进程列表 schtasks 计划任务&#xff08;受权限影响&#xff09; 了解当前服务器的网络接口信息…

什么是量化交易

课程大纲 内容初级初识量化&#xff0c;理解量化 初识量化 传统量化和AI量化的区别 量化思想挖掘 量化思想的挖掘及积累技巧 量化代码基础&#xff1a; python、pandas、SQL基础语法 金融数据分析 常用金融分析方式 常用因子分析方式 数据分析实战练习 回测及交易引擎 交易引擎…

OpenHarmony-1.启动流程

OpenHarmony启动流程 1.kernel的启动 流程图如下所示&#xff1a;   OpenHarmony(简称OH)的标准系统的底层系统是linux&#xff0c;所以调用如下代码&#xff1a; linux-5.10/init/main.c: noinline void __ref rest_init(void) {struct task_struct *tsk;int pid;rcu_sch…

简单理解回调函数

回调函数是编程中一个非常重要的概念&#xff0c;它是一种以函数作为参数并在某个事件或条件满足时被调用的函数。这种机制使得程序能够以非线性的方式执行&#xff0c;增加了代码的灵活性和模块化。下面我将详细解释回调函数的几个关键点&#xff1a; 定义和作用 回调函数是一…