Spark---RDD算子(单值类型Value)

文章目录

  • 1.RDD算子介绍
  • 2.转换算子
      • 2.1 Value类型
          • 2.1.1 map
          • 2.1.2 mapPartitions
          • 2.1.3 mapPartitionsWithIndex
          • 2.1.4 flatMap
          • 2.1.5 glom
          • 2.1.6 groupBy
          • 2.1.7 filter
          • 2.1.8 sample
          • 2.1.9 distinct
          • 2.1.10 coalesce
          • 2.1.11 repartition
          • 2.1.12 sortBy

1.RDD算子介绍

RDD算子是用于对RDD进行转换(Transformation)或行动(Action)操作的方法或函数。通俗来讲,RDD算子就是RDD中的函数或者方法,根据其功能,RDD算子可以分为两大类:
转换算子(Transformation): 转换算子用于从一个RDD生成一个新的RDD,但是原始RDD保持不变。常见的转换算子包括map、filter、flatMap等,它们通过对RDD的每个元素执行相应的操作来生成新的RDD。
行动算子(Action): 行动算子触发对RDD的实际计算,并返回计算结果或将结果写入外部存储系统。与转换算子不同,行动算子会导致Spark作业的执行。如collect方法。

2.转换算子

RDD 根据数据处理方式的不同将算子整体上分为:
Value 类型:对一个RDD进行操作或行动,生成一个新的RDD。
双 Value 类型:对两个RDD进行操作或行动,生成一个新的RDD。
Key-Value类型:对键值对进行操作,如reduceByKey((x, y),按照key对value进行合并。

2.1 Value类型

2.1.1 map

将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换。

函数定义
def map[U: ClassTag](f: T => U): RDD[U]

代码实现:

    //建立与Spark框架的连接val rdd = new SparkConf().setMaster("local[*]").setAppName("RDD") //配置文件val sparkRdd = new SparkContext(rdd) //读取配置文件val mapRdd: RDD[Int] = sparkRdd.makeRDD(List(1, 2, 3, 4))//对mapRdd进行转换val mapRdd1 = mapRdd.map(num => num * 2)//对mapRdd1进行转换val mapRdd2 = mapRdd1.map(num => num + "->")mapRdd2.collect().foreach(print)sparkRdd.stop();//关闭连接

在这里插入图片描述

2.1.2 mapPartitions

将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据。

函数定义
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]

Map 算子是分区内一个数据一个数据的执行,类似于串行操作。而 mapPartitions 算子是以分区为单位进行批处理操作。

mapPartitions在处理数据的时候因为是批处理,相对于map来说处理效率较高,但是如果数据量较大的情况下使用mapPartitions可能会造成内存溢出,因为mapPartitions会将分区内的数据全部加载到内存中。此时更推荐使用map。

2.1.3 mapPartitionsWithIndex

将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引。

函数定义
def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]

实现只保留第二个分区的数据

    val mapRdd: RDD[Int] = sparkRdd.makeRDD(List(1, 2, 3, 4),2)val newRdd: RDD[Int] = mapRdd.mapPartitionsWithIndex((index, iterator) => {if (index == 1) iteratorelse Nil.iterator})newRdd.collect().foreach(println)
2.1.4 flatMap

将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射

       //建立与Spark框架的连接val rdd = new SparkConf().setMaster("local[*]").setAppName("RDD") //配置文件val sparkRdd = new SparkContext(rdd) //读取配置文件val rdd1: RDD[List[Int]] = sparkRdd.makeRDD(List(List(1, 2), List(3, 4)))val rdd2: RDD[String] = sparkRdd.makeRDD(List("Hello Java", "Hello Scala"), 2)val frdd1: RDD[Int] =rdd1.flatMap(list=>{list})val frdd2: RDD[String] =rdd2.flatMap(str=>str.split(" "))frdd1.collect().foreach(println)frdd2.collect().foreach(println)sparkRdd.stop();//关闭连接

在这里插入图片描述

2.1.5 glom

将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变,glom函数的作用就是将一组数据转换为数组。

函数定义
def glom(): RDD[Array[T]]

    /建立与Spark框架的连接val rdd = new SparkConf().setMaster("local[*]").setAppName("RDD") //配置文件val sparkRdd = new SparkContext(rdd) //读取配置文件val rdd1: RDD[Any] = sparkRdd.makeRDD(List(1,2,3,4),2)val value: RDD[Array[Any]] = rdd1.glom()value.collect().foreach(data=> println(data.mkString(",")))sparkRdd.stop();//关闭连接

在这里插入图片描述

2.1.6 groupBy

将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为 shuffle。 极限情况下,数据可能被分在同一个分区中

函数定义
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]

	    //按照奇偶分组val rdd1: RDD[Int] = sparkRdd.makeRDD(List(1,2,3,4),2)val value = rdd1.groupBy(num => num % 2)value.collect().foreach(println)//将 List("Hello", "hive", "hbase", "Hadoop")根据单词首写字母进行分组。val rdd2: RDD[String] = sparkRdd.makeRDD(List("Hello", "hive", "hbase", "Hadoop"))val value1: RDD[(Char, Iterable[String])] = rdd2.groupBy(str => {str.charAt(0)})value1.collect().foreach(println)

在这里插入图片描述

2.1.7 filter

将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。

函数定义
def filter(f: T => Boolean): RDD[T]

	//获取偶数val dataRDD = sparkRdd.makeRDD(List(1, 2, 3, 4), 1)val value1 = dataRDD.filter(_ % 2 == 0)
2.1.8 sample

函数定义
def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong): RDD[T]

根据指定的规则从数据集中抽取数据

参数具体意义:
1.抽取数据不放回withReplacement: Boolean, 该参数表示抽取不放回,此时采用伯努利算法(false)fraction: Double,该参数表示抽取的几率,范围在[0,1]之间,0:全不取;1:全取;seed: Long = Utils.random.nextLong): RDD[T] 该参数表示随机数种子2.抽取数据放回withReplacement: Boolean, 该参数表示抽取放回,此时采用泊松算法(true)fraction: Double,该参数表示重复数据的几率,范围大于等于 0.表示每一个元素被期望抽取到的次数seed: Long = Utils.random.nextLong): RDD[T] 该参数表示随机数种子
2.1.9 distinct

将数据集中重复的数据去重

def distinct()(implicit ord: Ordering[T] = null): RDD[T]
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

    val dataRDD = sparkRdd.makeRDD(List(1, 2, 3, 4, 1, 2), 6)val value = dataRDD.distinct()

在这里插入图片描述

2.1.10 coalesce

根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率当 spark 程序中,存在过多的小任务的时候,可以通过 coalesce 方法,收缩合并分区,减少分区的个数,减小任务调度成本

def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T]

    //初始Rdd采用6个分区val dataRDD = sparkRdd.makeRDD(List(1, 2, 3, 4, 1, 2), 6)//将分区数量缩减至2个val value = dataRDD.coalesce(2)

在coalesce中默认不开启shuffle,在进行分区缩减的时候,数据不会被打散。
在这里插入图片描述

2.1.11 repartition

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

repartition内部其实执行的是 coalesce 操作,参数 shuffle 的默认值为 true。无论是将分区数多的RDD 转换为分区数少的 RDD,还是将分区数少的 RDD 转换为分区数多的 RDD,repartition操作都可以完成,因为无论如何都会经 shuffle 过程。
在这里插入图片描述

	//将分区数量从2个提升至4个val dataRDD = sparkRdd.makeRDD(List(1, 2, 3, 4, 1, 2), 2)val dataRDD1 = dataRDD.repartition(4)
2.1.12 sortBy

该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原 RDD 的分区数一致。中间存在 shuffle 的过程

def sortBy[K](
f: (T) => K, 该参数表述用于处理的函数
ascending: Boolean = true, 该参数表示是否升序排序
numPartitions: Int = this.partitions.length) 该参数表示设置分区数量
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

    val dataRDD = sparkRdd.makeRDD(List(1, 2, 3, 4, 1, 2), 2)//按照初始数据降序排列val dataRDD1 = dataRDD.sortBy(num => num, false, 4)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/594814.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

力扣225. 用队列实现栈

题目 请你仅使用两个队列实现一个后入先出(LIFO)的栈,并支持普通栈的全部四种操作(push、top、pop 和 empty)。 实现 MyStack 类: void push(int x) 将元素 x 压入栈顶。int pop() 移除并返回栈顶元素。int…

【UEFI基础】EDK网络框架(基础说明)

基础说明 UEFI中的网络框架大致如下: 红框部分是实现UEFI的EDK2开源项目中网络框架自带的实现,红框之外的部分需要网卡设备商提供驱动。UEFI下通常推荐使用最右边的形式,即网卡设备商提供实现了UNDI的网卡驱动。因此UEFI网络框架的另一个形式…

线性代数_对称矩阵

对称矩阵是线性代数中一种非常重要的矩阵结构,它具有许多独特的性质和应用。下面是对称矩阵的详细描述: ### 定义 对称矩阵,即对称方阵,是指一个n阶方阵A,其转置矩阵等于其本身,即A^T A。这意味着方阵A中的…

一种多串口冗余设计解决思路

需求 总共11个串口,现在每个串口接收BUFF分配1024个字节的收发缓冲区。原始方法将11个串口全部进行初始化,分配大串口,由于单片机RAM不足,现在将串口1和串口2固定分配为大串口,串口3-11随机选择2个作为大串口&#xf…

python 知识点

ping ping 不能带协议,如:ping baidu.com 引入包顺序 分三级,第一级是 Python 的内置库,第二级是第三方库,第三级是自己的代码。每一级用一个空行间隔 运算符 keyError:key不存在 列表推导式 创建字典 字…

git 如何撤销历史某次merge

git,如何 撤销某一次历史提交或merge,并保留该版本的后续提交? 场景1: 你有两个功能迭代版本的分支,一个是 15 号上线,一个是25号上线。5号的时候产品突然说,这两个版本一起上,然后…

Moonsong Labs与Web3演变

作者:Derek Yoo 创建Moonsong Labs的理由 我们创建了Moonsong Labs,其使命是创建推动Web3采用的软件基础设施协议。我们的动力来自这样一个观念,即Web3使人类相互交往更加透明、高效和公正。这无疑是一个值得努力实现的目标,但更…

变量和函数提升(js的问题)

• js解释执行 • 变量和函数提升 变量声明提前,函数声明提前 • 变量声明提前:值停留在本地 • 函数声明提前:整个函数体提前 如果是var赋值声明的函数,变量提前,函数体停留在本地 1、变量提…

Flutter 中的 Stream:异步编程的利器

在Flutter中,异步编程是非常重要的一部分,特别是在处理用户输入、网络请求或其他涉及时间的操作时。Flutter提供了一种强大的工具,称为Stream,用于简化异步编程的过程。 什么是 Stream? Stream是一种用于处理异步数据…

从0到1实战微服务架构之打造在线营销平台(一)

目录 一、前言 二、在线营销平台涉及的微服务 2.1营销中心 2.2商品中心 2.3库存中心 2.4订购中心 2.5用户中心 三、技术栈 3.1 微服务框架 3.2 中间件的使用 3.3 持续集成 3.4 系统部署 3.5 系统监控 四、总结 一、前言 随着人们生活水平的提高,消费…

【REST2SQL】03 GO读取JSON文件

REST2SQL需要一些配置信息,用JSON文件保存,比如config.json 1 创建config.json配置文件 {"hostPort":"localhost:5217","connString":"oracle://blma:5217127.0.0.1:1521/CQYH","_oracle":"ora…

linux 清空nat,linux 命令iptables -t nat

iptables -t nat -vnL 用详细方式列出 nat 表所有链的所有规则,只显示 IP 地址和端口号 iptables -L 粗略列出 filter 表所有链及所有规则 iptables -t nat -vxnL PREROUTING 用详细方式列出 nat 表 PREROUTING 链的所有规则以及详细数字,不反解 ipt…

ubuntu 执行apt-get update报错

系统是Ubuntu22.04 执行apt-get update 遇到如下情况 E: 无法下载 https://mirrors.tuna.tsinghua.edu.cn/ubuntu/dists/jammy/main/binary-arm64/Packages 404 Not Found [IP: 101.6.15.130 443] E: 无法下载 https://mirrors.tuna.tsinghua.edu.cn/ubuntu/dists/jammy-upda…

Kali/Debian Linux 安装Docker Engine

0x01 卸载旧版本 在安装Docker Engine之前,需要卸载已经安装的可能有冲突的软件包。一些维护者在他们的仓库提供的Docker包可能是非Docker官方发行版,须先卸载这些软件包,然后才能安装Docker官方正式发行的Docker Engine版本。 要卸载的软件…

RocketMQ5.0新组件Proxy

前言 RocketMQ 4.x 版本之前,一套完整的 MQ 服务包含的组件有:Namesrv、Broker、Consumer、Producer。 RocketMQ 5.0 版本之后,官方引入了一个新的组件:Proxy,它的作用是什么呢? 架构对比 RocketMQ 4.x …

Day22 二叉树part08 235.二叉搜索树的最近公共祖先 701.二叉搜索树中的插入操作 450.删除二叉搜索树中的节点

二叉树part08 235.二叉搜索树的最近公共祖先 701.二叉搜索树中的插入操作 450.删除二叉搜索树中的节点 235. 二叉搜索树的最近公共祖先 方法一:递归法(利用二叉搜索树性质) class Solution { public:TreeNode* lowestCommonAncestor(TreeN…

OpenCV-14图片的四则运算和图片的融合

一、图片的四则运算 1. 加法运算 通过使用API add来执行图像的加法运算 cv2.add(src1, src2)需要再其中传入两张图片。 图片就是矩阵,图片的加法运算就是矩阵的加法运算。 因此加法运算中要求两张图的shape必须是相同的。 首…

基于SpringBoot的旅游网站

目录 前言 开发环境以及工具 项目功能介绍 用户端: 管理端: 详细设计 用户端首页 登录页面 管理端页面 源码获取 前言 本项目是一个基于IDEA和Java语言开发基于SpringBoot的旅游网站。应用包含管理端和用户端等多个功能模块。 改革开放以来&am…

用HTML的原生语法实现两个div子元素在同一行中排列

代码如下&#xff1a; <div id"level1" style"display: flex;"><div id"level2-1" style"display: inline-block; padding: 10px; border: 1px solid #ccc; margin: 5px;">这是第一个元素。</div><div id"…

漏洞复现-任我行CRM系统SmsDataList接口SQL注入漏洞(附漏洞检测脚本)

免责声明 文章中涉及的漏洞均已修复&#xff0c;敏感信息均已做打码处理&#xff0c;文章仅做经验分享用途&#xff0c;切勿当真&#xff0c;未授权的攻击属于非法行为&#xff01;文章中敏感信息均已做多层打马处理。传播、利用本文章所提供的信息而造成的任何直接或者间接的…