2023_Spark_实验十二:Spark高级算子使用

掌握Spark高级算子在代码中的使用
相同点分析
三个函数的共同点,都是Transformation算子。惰性的算子。
不同点分析
map函数是一条数据一条数据的处理,也就是,map的输入参数中要包含一条数据以及其他你需要传的参数。
mapPartitions函数是一个partition数据一起处理,也即是说,mapPartitions函数的输入是一个partition的所有数据构成的“迭代器”,然后函数里面可以一条一条的处理,在把所有结果,按迭代器输出。也可以结合yield使用效果更优。
rdd的mapPartitions是map的一个变种,它们都可进行分区的并行处理。
两者的主要区别是调用的粒度不一样:map的输入变换函数是应用于RDD中每个元素,而mapPartitions的输入函数是应用于每个分区。
mapPartitionsWithIndex函数,其实和mapPartitions函数区别不大,因为mapPartitions背后调的就是mapPartitionsWithIndex函数,只是一个参数被close了。mapPartitionsWithIndex的函数可以或得partition索引号;
假设一个rdd有10个元素,分成3个分区。如果使用map方法,map中的输入函数会被调用10次;而使用mapPartitions方法的话,其输入函数会只会被调用3次,每个分区调用1次。
mapPartitionsWithIndex则是带上分区下标进行操作。
1、mapPartitionWithIndex

import org.apache.spark.{SparkConf,SparkContext}
object mapPartitionWithIndex {def getPartInfo:(Int,Iterator[Int]) => Iterator[String] = (index:Int,iter:Iterator[Int]) =>{
iter.map(x =>"[ PartId " + index +", elems: " + x + " ]")
}def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("RddMapPartitionsWithIndexDemo")
val sc = new SparkContext(conf)val rdd1 = sc.parallelize(List(1,2,3,4,5,9,6,7,8),numSlices =3 )
val rdd2 = rdd1.mapPartitionsWithIndex(getPartInfo)
rdd2.collect().foreach(println)}}

2、aggregate
首先我们来创建一个 RDD

scala> val rdd1 = sc.parallelize(1 to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24scala> rdd1.collect
warning: there was one feature warning; re-run with -feature for details
res24: Array[Int] = Array(1, 2, 3, 4, 5)

这个 RDD 仅有 1 个分片,包含 5 个数据: 1, 2, 3, 4, 5 。
 
然后我们来应用一下 aggregate 方法。
 
在使用 aggregate 之前,我们还是先定义两个要给 aggregate 当作输入参数的函数吧。
scala> 
// Entering paste mode (ctrl-D to finish)
def pfun1(p1: Int, p2: Int): Int = {
p1 * p2
}// Exiting paste mode, now interpreting.
pfun1: (p1: Int, p2: Int)Intscala>scala> 
// Entering paste mode (ctrl-D to finish)
def pfun2(p3: Int, p4: Int): Int = {
p3 + p4
}// Exiting paste mode, now interpreting.
pfun2: (p3: Int, p4: Int)Intscala>
接着是第 2 个函数。就不再解释什么了。
 
然后终于可以开始应用我们的 aggregate 方法了。
scala> rdd1.aggregate(3)(pfun1, pfun2)
res25: Int = 363scala>
输出结果是 363 !这个结果是怎么算出来的呢?
 
首先我们的 zeroValue 即初值是 3 。然后通过上面小节的介绍,我们知道首先会应用 pfun1 函数,因为我们这个 RDD 只有 1 个分片,所以整个运算过程只会有一次 pfun1 函数调用。它的计算过程如下:
 
首先用初值 3 作为 pfun1 的参数 p1 ,然后再用 RDD 中的第 1 个值,即 1 作为 pfun1 的参数 p2 。由此我们可以得到第一个计算值为 3 * 1 = 3 。接着这个结果 3 被当成 p1 参数传入,RDD 中的第 2 个值即 2 被当成 p2 传入,由此得到第二个计算结果为 3 * 2 = 6 。以此类推,整个 pfun1 函数执行完成以后,得到的结果是  3 * 1 * 2 * 3 * 4 * 5 = 360 。这个 pfun1 的应用过程有点像是 “在 RDD 中滑动计算” 。
在 aggregate 方法的第 1 个参数函数 pfun1 执行完毕以后,我们得到了结果值 360 。于是,这个时候就要开始执行第 2 个参数函数 pfun2 了。
 
pfun2 的执行过程与 pfun1 是差不多的,同样会将 zeroValue 作为第一次运算的参数传入,在这里即是将 zeroValue 即 3 当成 p3 参数传入,然后是将 pfun1 的结果 360 当成 p4 参数传入,由此得到计算结果为 363 。因为 pfun1 仅有一个结果值,所以整个 aggregate 过程就计算完毕了,最终的结果值就是 363 

import org.apache.spark.{SparkConf, SparkContext}object RddAggregateDemo {def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("RddDemos")
val sc = new SparkContext(conf)
val rdd1 = sc.parallelize(List("12","23","345",""),numSlices = 2)
//下面代码等价与rdd1.aggregate("")(x+y) => math.min(x.length,y.length)),(x,y)=>x+y))
val result = rdd1.aggregate("")(func1,func2)
println(result)
}
//(x,y)=>math.min(x.Length,y.length)
def func1:(String,String) => String =(x:String,y:String) =>{
println("<x: " + x + ",x.len: " + x.length + ">,<y: " + y +",y.len: " + y.length + ">")
val ret =math.min(x.length,y.length).toString
println("func1 ret:" +ret)
ret
}
//(x+y) => x+y
def func2:(String,String)=>String=(x:String,y:String) =>{
println("========" +(x+y))
x+y
}
}

3、aggregateByKey
通过scala集合以并行化方式创建一个RDD
scala> val pairRdd = sc.parallelize(List((“cat”,2),(“cat”,5),(“mouse”,4),(“cat”,12),(“dog”,12),(“mouse”,2)),2)
pairRdd 这个RDD有两个区,一个区中存放的是:
(“cat”,2),(“cat”,5),(“mouse”,4)
另一个分区中存放的是:
(“cat”,12),(“dog”,12),(“mouse”,2)
然后,执行下面的语句
scala > pairRdd.aggregateByKey(100)(math.max(_ , _), _ + _ ).collect
结果:
res0: Array[(String,Int)] = Array((dog,100),(cat,200),(mouse,200)
下面是以上语句执行的原理详解:
aggregateByKey的意思是:按照key进行聚合
第一步:将每个分区内key相同数据放到一起
分区一
(“cat”,(2,5)),(“mouse”,4)
分区二
(“cat”,12),(“dog”,12),(“mouse”,2)
第二步:局部求最大值
对每个分区应用传入的第一个函数,math.max(_ , _),这个函数的功能是求每个分区中每个key的最大值
这个时候要特别注意,aggregateByKe(100)(math.max(_ , _),_+_)里面的那个100,其实是个初始值
在分区一中求最大值的时候,100会被加到每个key的值中,这个时候每个分区就会变成下面的样子
分区一
(“cat”,(2,5,100)),(“mouse”,(4,100))
然后求最大值后变成:
(“cat”,100), (“mouse”,100)
分区二
(“cat”,(12,100)),(“dog”,(12.100)),(“mouse”,(2,100))
求最大值后变成:
(“cat”,100),(“dog”,100),(“mouse”,100)
第三步:整体聚合
将上一步的结果进一步的合成,这个时候100不会再参与进来
最后结果就是:
(dog,100),(cat,200),(mouse,200)
对PairRDD中相同的Key值进行聚合操作,在聚合过程中同样使用了一个中立的初始值。和aggregate函数类似,aggregateByKey返回值的类型不需要和RDD中value的类型一致。因为aggregateByKey是对相同Key中的值进行聚合操作,所以aggregateByKey'函数最终返回的类型还是PairRDD,对应的结果是Key和聚合后的值,而aggregate函数直接返回的是非RDD的结果。
import org.apache.spark.{SparkConf, SparkContext}object RddAggregateByKeyDemo {
def main(args: Array[String]): Unit = {
val conf =new SparkConf().setMaster("local").setAppName("RddAggregateByKeyDemo")
val sc = new SparkContext(conf)
val pairRDD = sc.parallelize(List(("cat",2),("cat",5),("mouse",4),("cat",12),("dog",12),("mouse",2)),numSlices = 2)
val rdd = pairRDD.aggregateByKey(100)(func2,func2)
val resultArray = rdd.collect
resultArray.foreach(println)
sc.stop()}
def func2(index:Int,iter:Iterator[(String,Int)]):Iterator[String] = {
iter.map(x=>"[partID:" + index + ",val: " +x +"]")
}//(x,y)=>math.max(x+y)
def func1:(Int,Int) => Int =(x:Int,y:Int) =>{
println("<x: " + x + "," +",y:"+ y + ">")
val ret =math.max(x,y)
println("func1 max:" +ret)
ret
}
//(x+y) => x+y
def func2:(Int,Int)=>Int=(x:Int,y:Int) =>{
println("========func2 x :" + x + ",y:"+y)
println("========func2 ret =====" +(x+y))
x+y
}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/83648.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

网络编程day03(UDP中的connect函数、tftp)

今日任务&#xff1a;tftp的文件上传下载&#xff08;服务端已经准备好&#xff09; 服务端&#xff08;已上传&#xff09; 客户端&#xff1a; 代码&#xff1a; #include <stdio.h> #include <string.h> #include <stdlib.h> #include <sys/types.h…

编译工具:CMake(八) | cmake 常用指令

编译工具&#xff1a;CMake&#xff08;八&#xff09; | cmake 常用指令 基本指令 基本指令 ADD_DEFINITIONS向 C/C编译器添加-D 定义&#xff0c;比如:ADD_DEFINITIONS(-DENABLE_DEBUG-DABC)&#xff0c;参数之间用空格分割。 如果你的代码中定义了#ifdef ENABLE_DEBUG #end…

Java 调用 GitLabAPI 获取仓库里的文件件 提交记录

1. 需求 项目组 需要做统计&#xff0c;获取每个开发人员的代码提交次数&#xff0c;提交时间&#xff0c;提交人等等&#xff0c;因代码在GitLab上管理&#xff0c;所以需要调用GitLabAPI来获取。 2. 开发 API官网&#xff1a;https://docs.gitlab.com/ee/api/ 2.1 创建自…

java Spring Boot验证码美化,白色背景 随机四个数 每个字随机颜色

我前文 Spring Boot2.7生成用于登录的图片验证码讲述了生成验证码的方法 但是这样生成验证码 非常难看 比较说 验证码是要展示到web程序中的 这样让用户看着 属实不太好 我们可以将接口改成 GetMapping(value "/captcha", produces MediaType.IMAGE_PNG_VALUE) …

RocketMQ 发送事务消息

文章目录 事务的相关理论事务ACID特性CAP 理论BASE 理论 事务消息应用场景MQ 事务消息处理处理逻辑 RocketMQ 事务消息处理流程官网事务消息流程图 rocketmq-client-java 示例&#xff08;gRPC 协议&#xff09;创建事务主题生产者消费者 rocketmq-client 示例&#xff08;Remo…

代码随想录Day1 数组基础

本文详细说明和思路来源于: 代码随想录 视频讲解: 手把手带你撕出正确的二分法 | 二分查找法 | 二分搜索法 | LeetCode&#xff1a;704. 二分查找_哔哩哔哩_bilibili Leetcode T 704 题目链接 704. 二分查找 - 力扣&#xff08;LeetCode&#xff09; 题目概述1: 思路: 1.因…

基于微信小程序的高校宿舍信息管理系统设计与实现(源码+lw+部署文档+讲解等)

前言 &#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师、全栈领域优质创作者&#xff0c;博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java、小程序技术领域和毕业项目实战✌&#x1f497; &#x1f447;&#x1f3fb;…

局域网下共享文件夹全流程

请注意&#xff1a;配置共享文件夹以便他人无需输入账户和密码访问可能带来安全风险。请确保你明白这一点并在适当的网络环境中操作。 以下说明是基于 Windows 系统的&#xff1a; 步骤 1&#xff1a;共享文件夹 找到你想要共享的文件夹&#xff0c;右击选择“属性”。 转到…

Docker从认识到实践再到底层原理(六-1)|Docker容器基本介绍+命令详解

前言 那么这里博主先安利一些干货满满的专栏了&#xff01; 首先是博主的高质量博客的汇总&#xff0c;这个专栏里面的博客&#xff0c;都是博主最最用心写的一部分&#xff0c;干货满满&#xff0c;希望对大家有帮助。 高质量博客汇总 然后就是博主最近最花时间的一个专栏…

RabbitMQ学习总结(11)—— RabbitMQ 核心概念与架构

MQ 技术概述 什么是 MQ MQ(message queue),从字面意思上看,本质是个队列,FIFO 先入先出,只不过队列中存放的内容是 message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常见的上下游 “逻辑解耦+物理解耦” 的消息通信服务。使用…

网页的快捷方式打开自动全屏--Chrome、Firefox 浏览器相关设置

Firefox 的全屏方式与 Chrome 不同&#xff0c;Chrome 自带全屏模式以及APP模式&#xff0c;通过简单的参数即可设置&#xff0c;而Firefox暂时么有这个功能&#xff0c;Firefox 的全屏功能可以通过全屏插件实现。 全屏模式下&#xff0c;按 F11 不会退出全屏&#xff0c;鼠标…

grafana对指标进行组合计算

在使用Grafana配置图表看板时&#xff0c;我们可能需要对多个查询条件筛选出来的结果进行计算&#xff0c;把计算结果生成最终的图表。此时需要用到transform功能【add field from calculation】&#xff1a;

flutter iOS 缺少通知权限,缺少位置权限

App Store Connect 亲爱的开发者, 我们发现了一个或多个问题与您的应用程序&#xff0c;“hayya附近的朋友Chat&Meet”1.0.3(1)最近的交付。您的交付是成功的&#xff0c;但您可能希望在您的下一次交付纠正以下问题: ITMS-90078:缺少推送通知授权-你的应用程序似乎注册了…

pytorch的卷积层池化层和非线性变化 和机器学习线性回归

卷积层&#xff1a; 两个输出的情况 就会有两个通道 可以改变通道数的 最简单的神经网络结构&#xff1a; nn.Mudule就是继承父类 super执行的是 先执行父类函数里面的 forward执行的就是前向网络&#xff0c;就是往前推进的&#xff0c;当然也有反向转播&#xff0c;那就是…

福建福州大型钢结构件3D扫描全尺寸三维测量平面度平行度检测-CASAIM中科广电

高精度三维扫描技术已经在大型工件制造领域发挥着重要作用&#xff0c;特别是在质量检测环节&#xff0c;高效、高精度&#xff0c;可以轻松实现全尺寸三维测量。本期&#xff0c;我们要分享的应用是在大型钢结构件的关键部位尺寸及形位公差检测。 钢结构件&#xff0c;是将多…

fabic.js Quadratic Curve /可控制的曲线

需要绘制一条可控制的贝塞尔曲线&#xff0c;发现fabic官网中一个demo有点类似。感兴趣的可以移步官网查看demo。 官网的demo是对于html 而言的&#xff0c;放在vue中需要变换一下&#xff0c;具体代码如下&#xff1a; <template><div class"dashboard-contai…

个人所思所想录

&#x1f9d1;‍&#x1f4bb;作者名称&#xff1a;DaenCode &#x1f3a4;作者简介&#xff1a;CSDN实力新星&#xff0c;后端开发两年经验&#xff0c;曾担任甲方技术代表&#xff0c;业余独自创办智源恩创网络科技工作室。会点点Java相关技术栈、帆软报表、低代码平台快速开…

Java版分布式微服务云开发架构 Spring Cloud+Spring Boot+Mybatis 电子招标采购系统功能清单

项目说明 随着公司的快速发展&#xff0c;企业人员和经营规模不断壮大&#xff0c;公司对内部招采管理的提升提出了更高的要求。在企业里建立一个公平、公开、公正的采购环境&#xff0c;最大限度控制采购成本至关重要。符合国家电子招投标法律法规及相关规范&#xff0c;以及审…

在VMware虚拟机中固定CentOS系统ip(使用桥接模式)

目录 一、前置说明二、前置准备2.1、切换虚拟机网络为桥接模式2.2、查看本机网络信息 三、配置CentOS系统IP3.1、进入系统输入ip addr 查看本机网络配置名称3.2、查看网络配置目录&#xff0c;网络配置文件名称3.3、修改网络配置文件 ifcfg-ens33 固定IP3.4、重启网络 一、前置…

linux下解决tomcat错误问题

错误一&#xff1a; Linux下Tomcat启动报错&#xff1a;Neither the JAVA_HOME nor the JRE_HOME environment variable is defined 原因&#xff1a;可能是Linux环境变了&#xff0c;需要在catalina.sh文件里指定JDK路径 解决方式&#xff1a; 在/bin/catalina.sh配置文件中加…