Spark---RDD(Key-Value类型转换算子)

文章目录

  • 1.RDD Key-Value类型
      • 1.1 partitionBy
      • 1.2 reduceByKey
      • 1.3 groupByKey
          • reduceByKey和groupByKey的区别
          • 分区间和分区内
      • 1.4 aggregateByKey
          • 获取相同key的value的平均值
      • 1.5 foldByKey
      • 1.6 combineByKey
      • 1.7 sortByKey
      • 1.8 join
      • 1.9 leftOuterJoin
      • 1.10 cogroup

1.RDD Key-Value类型

Key-Value类型的算子即对键值对进行操作。

1.1 partitionBy

将数据按照指定的 Partitioner(分区器) 重新进行分区。Spark 默认的分区器为HashPartitioner,Spark除了默认的分区器外,常见的分区器还有:RangePartitioner、Custom Partitioner、SinglePartitioner等。

函数定义:
def partitionBy(partitioner: Partitioner): RDD[(K, V)]

	//使用HashPartitioner分区器并设置分区个数为2val data1: RDD[(Int, String)] = sparkRdd.makeRDD(Array((1, "aaa"), (2, "bbb"), (3, "ccc")), 3)data1.partitionBy(new HashPartitioner(2));data1.collect().foreach(println)

在这里插入图片描述

1.2 reduceByKey

可以将数据按照相同的 Key 对 Value 进行聚合

函数定义:
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]

	//将数据按照相同的key对value进行聚合val data1: RDD[(String, Int)] = sparkRdd.makeRDD(List(("a", 1), ("b", 2), ("c", 3),("a",4),("b",5)))val data2: RDD[(String, Int)] = data1.reduceByKey((x: Int, y: Int) => {x + y})data2.collect().foreach(println)

在这里插入图片描述

1.3 groupByKey

将数据源的数据根据 key 对 value 进行分组

函数定义:
def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

    val dataRDD1 = sparkRdd.makeRDD(List(("a", 1), ("b", 2), ("c", 3),("a",4),("b",5)))val data1 = dataRDD1.groupByKey()//指定分区个数为2val data2 = dataRDD1.groupByKey(2)//指定分区器和分区个数val data3 = dataRDD1.groupByKey(new HashPartitioner(2))data1.collect().foreach(println)println("-------------------->")data2.collect().foreach(println)println("-------------------->")data3.collect().foreach(println)

在这里插入图片描述

reduceByKey和groupByKey的区别

从功能的角度来看:reduceByKey包含了分组和聚合功能,而groupByKey只包含了分组功能。
从shuffle的角度来看:为了避免占用过多的内存空间,reduceByKey和groupByKey在执行的过程中,都会执行shuffle操作,将数据打散写入到磁盘的临时文件中,而reduceByKey在进行shuffle前会对数据进行预聚合的操作,致使shuffle的效率得到的提升,因为减少了落盘的数据量。但是groupByKey在shuffle前不会进行预聚合操作。所以,reduceByKey在进行分组的时候,效率相对groupByKey来说较高。

reduceByKey:
在这里插入图片描述

groupByKey:

在这里插入图片描述

分区间和分区内

分区间: 顾名思义,分区间就是指的多个分区之间的操作。如reduceByKey在shuffle操作后将不同分区的数据传输在同一个分区中进行聚合。
分区内: 分区内字面意思指的是单个分区内之间的操作。如reduceByKey的预聚合功能就是在分区内完成

1.4 aggregateByKey

将数据根据不同的规则进行分区内计算和分区间计算,如reduceByKey中分区间和分区内都是聚合操作,而使用aggregateByKey可以设置分区间和分区内执行不同的操作。

函数定义:
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)]

    //取出每个分区内相同 key 的最大值然后分区间相加// aggregateByKey 算子是函数柯里化,存在两个参数列表// 1. 第一个参数列表中的参数表示初始值// 2. 第二个参数列表中含有两个参数// 2.1 第一个参数表示分区内的计算规则// 2.2 第二个参数表示分区间的计算规则val data1 = sparkRdd.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)),2)val data2 = data1.aggregateByKey(0)((x,y)=>{Math.max(x,y)},(x,y)=>{x+y})data2.collect().foreach(println)**

在这里插入图片描述
注意:最终的结果会受到设置的初始值的影响,返回结果的值的类型和初始值保持一致。

获取相同key的value的平均值
    val data1:RDD[(String,Int)] = sparkRdd.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("b", 4),("b",5),("a",6)),2)//设置初始值,初始值为一个元组,元组第一个元素表示value,第二个表示出现次数,初始默认都为0val data2:RDD[(String,(Int,Int))] = data1.aggregateByKey((0,0))((t, v)=> {(t._1 + v, t._2 + 1)} ,//分区内计算(t1, t2) => {(t1._1 + t2._1, t1._2 + t2._2)}//分区间计算)//和除以次数求出平均值val data3 = data2.mapValues({case (sum, count) => sum / count})data3.collect().foreach(println)

在这里插入图片描述

1.5 foldByKey

当分区内和分区间的计算规则相同的时候,aggregateByKey 就可以简化为 foldByKey

函数定义:
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

    val dataRDD1 = sparkRdd.makeRDD(List(("a",1),("b",2),("a",3)))val dataRDD2 = dataRDD1.foldByKey(0)(_+_)dataRDD2.collect().foreach(println)

在这里插入图片描述

1.6 combineByKey

最通用的对 key-value 型 rdd 进行聚集操作的聚集函数(aggregation function)。类似于aggregate(),combineByKey()允许用户返回值的类型与输入不一致。

函数定义:

def combineByKey[C](
createCombiner: V => C,//对数据进行转换
mergeValue: (C, V) => C, //分区内合并
mergeCombiners: (C, C) => C): RDD[(K, C)] //分区间合并

//将数据 List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98))求每个 key 的平均值
val rddSource: RDD[(String, Int)] = sparkRdd.makeRDD(List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)),2)
val combinRdd: RDD[(String, (Int, Int))] = rddSource.combineByKey(((x:Int)=>{(x,1)}),//对每个value进行转换,转换后为(value,1),第一个元素为值,第二个元素为出现的次数((t1:(Int,Int),v)=>{(t1._1+v,t1._2+1)}),//分区内合并((t1,t2)=>{(t1._1+t2._1,t1._2+t2._2)})//分区间合并
)//mapValues算子是在key保持不变的时候对value进行操作val mapRdd: RDD[(String, Int)] = combinRdd.mapValues({case ((sum: Int, count: Int)) => sum / count})mapRdd.collect().foreach(println)

在这里插入图片描述
由此看出,combineByKey和aggreateByKey的不同之处在于,combineByKey可以不设置初始值,只需要对第一个元素进行转换,转换到合适的计算格式即可。

1.7 sortByKey

在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口(特质),返回一个按照 key 进行排序的

函数定义:

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
: RDD[(K, V)]

//升序排序val dataRDD1 = sparkRdd.makeRDD(List(("a",1),("b",2),("c",3)))val sortRdd: RDD[(String, Int)] = dataRDD1.sortByKey()sortRdd.collect().foreach(print)

在这里插入图片描述
sortByKey默认为升序排序,如果想要降序排序,只需要将sortByKey第一个参数修改为false即可。
在这里插入图片描述

1.8 join

在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的(K,(V,W))的 RDD

函数定义:

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]

	//join操作相当于数据库中的内连接,在连接的时候自动去除两边的悬浮元组val rdd0: RDD[(Int, String)] = sparkRdd.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))val rdd1: RDD[(Int, Int)] = sparkRdd.makeRDD(Array((1, 4), (2, 5), (3, 6)))rdd0.join(rdd1).collect().foreach(print)//修改rdd1,使其少了key=3的这个元素val rdd0: RDD[(Int, String)] = sparkRdd.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))val rdd1: RDD[(Int, Int)] = sparkRdd.makeRDD(Array((1, 4), (2, 5)))rdd0.join(rdd1).collect().foreach(print)

在这里插入图片描述
在这里插入图片描述

1.9 leftOuterJoin

类似于 SQL 语句的左外连接

函数定义:

def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]

    val rdd0: RDD[(Int, String)] = sparkRdd.makeRDD(List((1, "a"), (2, "b")))val rdd1: RDD[(Int, Int)] = sparkRdd.makeRDD(List((1, 4), (2, 5),(3, 6)))val rddRes = rdd0.leftOuterJoin(rdd1)rddRes.collect().foreach(print)

在这里插入图片描述

1.10 cogroup

在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable,Iterable))类型的 RDD,即先对

函数定义:

def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]

    val rdd0: RDD[(Int, String)] = sparkRdd.makeRDD(List((1, "a"), (2, "b"),(3,"c")))val rdd1: RDD[(Int, Int)] = sparkRdd.makeRDD(List((1, 4), (2, 5)))val rddRes = rdd0.cogroup(rdd1)rddRes.collect().foreach(print)

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/610105.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

鸿蒙南向开发—PWM背光(OpenHarmony技术)

背光驱动模型也是基于HDF框架开发的,整个框架如下: 现在以RK3568为例,来看看PWM背光整个驱动,这里使用的是PWM占空比控制的背光,默认基于hdf的pwm驱动已经OK! 需要注意的是:这里是基于HDF实现的…

安卓Android Studioy读写NXP ICODE2 15693标签源码

本示例使用的发卡器&#xff1a;https://item.taobao.com/item.htm?spma1z10.5-c-s.w4002-21818769070.11.4391789eCLwm3t&id615391857885 <?xml version"1.0" encoding"utf-8"?> <androidx.constraintlayout.widget.ConstraintLayout xm…

低轨卫星最低轨道高度及发展意义

一、天与空的区别 “空”指的是地球表面到大气层之内的高度范围&#xff0c;通过气球、飞机、飞艇等航空器可达&#xff1b;而大气层之外的空间&#xff0c;才可以称之为“天”&#xff0c;一般需要通过火箭才可到达。 1960年第53届巴塞罗那国际航空联合大会决议规定&#xf…

阿里云 WindowsServer 使用之 配置 SQL Server 允许远程连接

阿里云 WindowsServer 使用之 配置 SQL Server 允许远程连接 第一步&#xff1a;安装 SQL Server 数据库 这是一个很详细的安装教程&#xff0c;可以参考一下 安装SQL Server详细教程 需要注意&#xff1a;安装实例时&#xff0c;建议在‘身份验证模式’直接选择“混合模式”…

编码技巧(二) element-ui table中根据状态控制是否可以勾选

项目中使用element-ui时,表格中的数据有不同的状态,需要对某个状态的数据进行 勾选操作 如图所示: 只有id为12的符合条件可以进行勾选 <el-table-column type="selection" header-align="center" :selectable="selectable" align="c…

Maven 基础总结篇

Maven 基础总结篇 Maven是专门用于管理和构建Java项目的工具&#xff0c;它的主要功能有&#xff1a; 提供了一套标准化的项目结构&#xff1a;用于解决不同IDE&#xff08;例如eclipse与IDEA&#xff09;不同的项目结构的问题 提供了一套标准化的构建流程&#xff08;编译&…

Redis底层原理

持久化 Redis虽然是个内存数据库,但是Redis支持RDB和AOF两种持久化机制,将数据写往磁盘,可以有效地避免因进程退出造成的数据丢失问题,当下次重启时利用之前持久化的文件即可实现数据恢复。 RDB RDB持久化是把当前进程数据生成快照保存到硬盘的过程。所谓内存快照,就是…

网络爬虫中的代理IP应用与高效管理策略探析

在网络爬虫技术日益普及的今天&#xff0c;面对目标网站对访问频率、IP地址等的严格限制&#xff0c;如何合理、有效地利用和管理代理IP资源成为了一项至关重要的任务。本文将深入探讨代理IP在爬虫项目中的应用&#xff0c;并提出一套科学高效的管理策略。 一、代理IP在网络爬…

分布式I/O应用于智慧停车场的方案介绍

客户案例背景 目前车位检测技术有磁电技术、超声波技术、红外线技术、图像识别车位技术。考虑到例如电磁干扰、信号干扰等的环境因素影响&#xff0c;通常会采用组合使用的方式进行&#xff0c;如采用不同的传感器、应用不同的协议等&#xff0c;以便提高车位检测的准确性和实时…

xilinix 7系列器件生成已加密文件和已经过身份验证的文件

注释 &#xff1a;如需了解更多信息&#xff0c;请参阅《使用加密确保 7 系列 FPGA 比特流的安全》(XAPP1239)。 要生成加密比特流&#xff0c;请在 Vivado IDE 中打开已实现的设计。在主工具栏中&#xff0c;依次选择“Flow” → “Bitstream Settings”&#xff08;流程 >…

Go语言学习笔记(二)

Go语言的学习资源 以下是一些推荐的Go语言学习资源的链接&#xff1a; Go语言教程&#xff1a;https://golang.org/doc/Go by Example&#xff1a;Go by ExampleGolang Tutorials&#xff1a;https://golangtutorials.com/Go语言第一课&#xff08;慕课网&#xff09;&#x…

每周三提前预知:绝地求生27.2版本最早1月10日上线,交易所系统、召唤掩体等新功能上线

嗨&#xff0c;我是闲游盒 27.2新版本预计最早1月10日上线&#xff0c;届时会停机更新约9小时&#xff0c;大家注意合理安排游戏时间! 这次更新带来了很多荣都地图的新玩法&#xff0c;主打的交易所系统即将上线! PUBG官方已经发布了预告 交易所系统 而这次的交易所系统玩法…

常见排序算法及其稳定性分析

前言&#xff1a; 排序算法可以说是每一个程序员在学习数据结构和算法时必须要掌握的知识点&#xff0c;同样也是面试过程中可能会遇到的问题&#xff0c;在早些年甚至还会考冒泡排序。由此可见呢&#xff0c;掌握一些常见的排序算法是一个程序员的基本素养。虽然现在的语言标…

k8s的node亲和性和pod亲和性和反亲和性 污点 cordon drain

node亲和性和pod亲和性和反亲和性 污点 cordon drain 集群调度: schedule的调度算法 预算策略 过滤出合适的节点 优先策略 选择部署的节点 nodeName:硬匹配&#xff0c;不走调度策略&#xff0c;node01 nodeSelector:根据节点的标签选择&#xff0c;会走调度的算法 只…

PSoc62™开发板之PWM呼吸灯

实验目的 利用PWM动态调节输出功率达到控制LED呼吸变化的效果 实验准备 PSoc62™开发板&#xff08;开发板已经板载LED&#xff09; 板载资源 板载有多少pwm 创建工程例程&#xff0c;在libraries/HAL_Drivers/drv_pwm.h中查看BSP支持的pwm数量及对应的GPIO&#xff0c;可…

pgsql中epoch用法

问题描述 提示&#xff1a;这里描述项目中遇到的问题&#xff1a; 昨天又被叫回来加班,説是数据问题,又回来加班搞,到了以后发现数据没问题,那就是查询接口的事了,写查询接口的人用时间戳去查询,明明直接可以直接用日期查询,非得改成时间戳查询,结果还是有问题,接下来复盘一下…

centos安装gradle

1.将gradle.zip拷到centos 解压 2.配置环境变量 vim /etc/profile 在最后添加 export GRADLE_HOME/zx/gradle-8.5 export PATH$PATH:$GRADLE_HOME/bin:${PATH} 之后source /etc/profile gradle -version 安装成功

RK3566环境搭建

环境&#xff1a;vmware16&#xff0c;ubuntu 18.04 获取SDK前需要安装 sudo apt update sudo apt install -y repo git python 下载完成后先验证一下MD5码 md5sum rk356x_linux_release_v1.3.0b_20221213_split_dir/*firefly_split* 解压 rk3566ubuntu:/path/to$ mkdir ~…

【7-zip密码】7-Zip如何取消文件加密的密码

7z压缩包设置了密码&#xff0c;解压的时候就需要输入正确的密码才能顺利解压出文件&#xff0c;正常当我们解压文件或者删除密码的时候&#xff0c;虽然方法多&#xff0c;但是都需要输入正确的密码才能完成。忘记密码就无法进行操作。 那么&#xff0c;忘记了7z压缩包的密码…

Linux网络命令

文章目录 Linux网络网络配置命令1、ifconfig&#xff1a;查看网络接口信息&#xff08;显示所有活动网卡&#xff09;1.1 常用命令格式1.2 命令格式&#xff08;图文详解&#xff09;1.2.1 临时修改网卡名称1.2.2 永久修改网卡名称1.2.3 永久修改单个网卡 2、hostname&#xff…