03-240605-Spark笔记

03-240605

1. 行动算子-1

  • reduce

聚合

格式:

def reduce(f: (T, T) => T): T

例子:

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)
​val rdd = sc.makeRDD(List(1,2,3,4))
​// TODO - 行动算子
​//reduceval i: Int = rdd.reduce(_+_)println(i)

输出结果:

10

  • collect

采集

格式:

def collect(): Array[T]

例子:

        // collect : 方法会将不同分区的数据按照分区顺序采集到Driver端内存中,形成数组val ints: Array[Int] = rdd.collect()println(ints.mkString(","))

输出结果:

1,2,3,4

  • count

计数

格式:

def count(): Long

例子:

        // count : 数据源中数据的个数val cnt = rdd.count()println(cnt)

运行结果:

4

  • first

获取数据源的第一个数据

格式:

def first(): T

例子:

        // first : 获取数据源中数据的第一个val first = rdd.first()println(first)

输出结果:

1

  • take

获取数据源的N个数据

格式:

def take(num: Int): Array[T]

例子:

        // take : 获取N个数据val ints: Array[Int] = rdd.take(3)println(ints.mkString(","))

输出结果:

1,2,3

  • takeOrdered

数据排序后.再取第N个数据

格式:

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

例子:

        // takeOrdered : 数据排序后,取N个数据val rdd1 = sc.makeRDD(List(4,2,3,1))val ints1: Array[Int] = rdd1.takeOrdered(3)println(ints1.mkString(","))

输出结果:

1,2,3

  • aggregate

给定初始值,初始值参与分区内与分区间的计算

格式:

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

例子:

        val rdd = sc.makeRDD(List(1,2,3,4),2)//10 + 13 + 17 = 40// aggregateByKey : 初始值只会参与分区内计算// aggregate : 初始值会参与分区内计算,并且和参与分区间计算val result = rdd.aggregate(10)(_+_._+_)println(result)

输出结果:

40

  • fold

折叠操作,aggregate的简化版操作

格式:

def fold(zeroValue: T)(op: (T, T) => T): T

例子:

        //10 + 13 + 17 = 40// aggregateByKey : 初始值只会参与分区内计算// aggregate : 初始值会参与分区内计算,并且和参与分区间计算//val result = rdd.aggregate(10)(_+_, _+_)val result = rdd.fold(10)(_+_)println(result)

输出结果:

40

  • countByKey 与 countByValue

都是统计每种Key或者Value出现的个数

格式:

def countByKey(): Map[K, Long]

例子:

image-20240604213641365

        val rdd = sc.makeRDD(List(("a", 1),("a", 2),("a", 3)))//val intToLong: collection.Map[Int, Long] = rdd.countByValue()//println(intToLong)val stringToLong: collection.Map[String, Long] = rdd.countByKey()println(stringToLong)

输出结果:

Map(a -> 3)

  • WordCount 不同的实现方式:

运用9种不同的方式实现WordCount

  1. 使用groupBy:

    // groupBydef wordcount1(sc : SparkContext): Unit = {val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))val words = rdd.flatMap(_.split(" "))val group: RDD[(String, Iterable[String])] = words.groupBy(word=>word)val wordCount: RDD[(String, Int)] = group.mapValues(iter=>iter.size)}
  1. 使用groupByKey:

    // groupByKeydef wordcount2(sc : SparkContext): Unit = {val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))val words = rdd.flatMap(_.split(" "))val wordOne = words.map((_,1))val group: RDD[(String, Iterable[Int])] = wordOne.groupByKey()val wordCount: RDD[(String, Int)] = group.mapValues(iter=>iter.size)}
  1. 使用reduceByKey:

    // reduceByKeydef wordcount3(sc : SparkContext): Unit = {val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))val words = rdd.flatMap(_.split(" "))val wordOne = words.map((_,1))val wordCount: RDD[(String, Int)] = wordOne.reduceByKey(_+_)}
  1. 使用aggregateByKey

    // aggregateByKeydef wordcount4(sc : SparkContext): Unit = {val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))val words = rdd.flatMap(_.split(" "))val wordOne = words.map((_,1))val wordCount: RDD[(String, Int)] = wordOne.aggregateByKey(0)(_+_, _+_)}
  1. 使用foldByKey:

    // foldByKeydef wordcount5(sc : SparkContext): Unit = {val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))val words = rdd.flatMap(_.split(" "))val wordOne = words.map((_,1))val wordCount: RDD[(String, Int)] = wordOne.foldByKey(0)(_+_)}
  1. 使用combineByKey:

    // combineByKeydef wordcount6(sc : SparkContext): Unit = {val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))val words = rdd.flatMap(_.split(" "))val wordOne = words.map((_,1))val wordCount: RDD[(String, Int)] = wordOne.combineByKey(v=>v,(x:Int, y) => x + y,(x:Int, y:Int) => x + y)}
  1. 使用countByKey:

    // countByKeydef wordcount7(sc : SparkContext): Unit = {val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))val words = rdd.flatMap(_.split(" "))val wordOne = words.map((_,1))val wordCount: collection.Map[String, Long] = wordOne.countByKey()}
  1. 使用countByValue:

    // countByValuedef wordcount8(sc : SparkContext): Unit = {val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))val words = rdd.flatMap(_.split(" "))val wordCount: collection.Map[String, Long] = words.countByValue()}
  1. 使用reduce:

    def wordcount91011(sc : SparkContext): Unit = {val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))val words = rdd.flatMap(_.split(" "))// 【(word, count),(word, count)】// word => Map[(word,1)]val mapWord = words.map(word => {mutable.Map[String, Long]((word,1))})val wordCount = mapWord.reduce((map1, map2) => {map2.foreach{case (word, count) => {val newCount = map1.getOrElse(word, 0L) + countmap1.update(word, newCount)}}map1})println(wordCount)}

2. 序列化

算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor端执行。

  • RDD序列化

案例:

object Spark01_RDD_Serial {def main(args: Array[String]): Unit = {val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")val sc = new SparkContext(sparConf)val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello spark", "hive", "atguigu"))val search = new Search("h")//search.getMatch1(rdd).collect().foreach(println)search.getMatch2(rdd).collect().foreach(println)sc.stop()}// 查询对象// 类的构造参数其实是类的属性, 构造参数需要进行闭包检测,其实就等同于类进行闭包检测class Search(query:String){def isMatch(s: String): Boolean = {s.contains(this.query)}// 函数序列化案例def getMatch1 (rdd: RDD[String]): RDD[String] = {rdd.filter(isMatch)}// 属性序列化案例def getMatch2(rdd: RDD[String]): RDD[String] = {val s = queryrdd.filter(x => x.contains(s))}}
}

输出结果:

image-20240605133427336

  • Kryo序列化框架

Kryo 速度是 Serializable 的 10 倍。当 RDD 在 Shuffle 数据的时候,简单数据类型、数组和字符串类型已经在 Spark 内部使用 Kryo 来序列化。

了解一下就行

案例:

 def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("SerDemo").setMaster("local[*]")// 替换默认的序列化机制.set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")// 注册需要使用 kryo 序列化的自定义类.registerKryoClasses(Array(classOf[Searcher]))val sc = new SparkContext(conf)val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello atguigu", 
"atguigu", "hahah"), 2)val searcher = new Searcher("hello")val result: RDD[String] = searcher.getMatchedRDD1(rdd)result.collect.foreach(println)}
}
case class Searcher(val query: String) {def isMatch(s: String) = {s.contains(query)}def getMatchedRDD1(rdd: RDD[String]) = {rdd.filter(isMatch) }def getMatchedRDD2(rdd: RDD[String]) = {val q = queryrdd.filter(_.contains(q))}

Kryo绕过了Java的序列化机制,Kryo比Java序列化小,适合大数据传输、存储

  • RDD 血缘关系

toDebugString查看血缘关系

image-20240605135251669

多个连续的RDD的依赖关系,称之为血缘关系

演示:

image-20240605135543501

关于如何将RDD间的关系保存下来:

image-20240605135759577

血缘关系演示:

image-20240605140042850

image-20240605140110991

image-20240605140124464

  • RDD的依赖关系

dependencies查看依赖关系

image-20240605140238899

OneToOne依赖(窄依赖)

image-20240605140706460

窄依赖我们形象的比喻为独生子女。

image-20240605141335525

Shuffle依赖(宽依赖):

image-20240605140820212

宽依赖我们形象的比喻为多生。

image-20240605141533442

  • RDD 阶级划分

image-20240605141721424

image-20240605142320693

  • RDD 任务划分

image-20240605142405972

源码演示:

image-20240605142747592

  • RDD 的持久化

这样的复用在底层不是很好用:

image-20240605143051395

image-20240605143137900

应该这样:

image-20240605143221932

image-20240605143241039

image-20240605143253432

放在内存中 mapRDD.cache()

放在磁盘中 mapRDD.persist()

Cache缓存:

image-20240605143410471

  • RDD CheckPoint 检查点

image-20240605143502870

image-20240605143516625

checkpoint 需要落盘,需要指定检查点保存路径

检查点路径保存的文件,当作业执行完毕后,不会被删除

一般保存路径都是在分布式存储系统: HDFS

  • checkpoint、Cache、Persist的区别:

以上三个都可以存储,关于他们的区别:

cache : 将数据临时存储在内存中进行数据重用

会在血缘关系中添加新的依赖。一旦出现问题,可以重新读取数据

persist : 将数据临时存储在硬盘文件中进行数据重用

涉及到磁盘IO,性能较低,但是数据安全

如果作业执行完毕,临时保存的数据文件就会丢失

checkpoint : 将数据长久地保存在磁盘文件中进行数据重用

涉及到磁盘IO,性能较低,但是数据安全

为了保证数据安全,所以一般情况下,会独立执行作业

为了能够提高效率,一般情况下,是需要和cache联合使用

执行过程中,会切断血缘关系,重新建立新的血缘关系

checkpoint等同于改变数据源

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/851461.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基础IO (Linux文件操作)

目录 1.文件操作 2.文件描述符 3.缓冲区 4.系统的缓冲区 1.文件操作 在C语言学习中,我们就已经使用了一些文件操作相关的接口,在学习IO之前,我们首先要复习一些以前讲过的概念, 1. 空文件也要在磁盘中占用空间,因为…

OBS 录屏软件 for Mac 视频录制和视频实时交流软件 安装

Mac分享吧 文章目录 效果一、准备工作二、开始安装注意事项:包内有两个版本及圆形图片,请根据自身需要版本进行安装演示为:MacBook Pro M3芯片1、双击运行软件,将其从左侧拖入右侧文件夹中(最终目的:安装进…

python教程

python解释器的安装 https://www.python.org/ftp/python/3.12.4/python-3.12.4-amd64.exe jetbrains官网 英文 PyCharm 专业的版本 Thank you for downloading PyCharm! 社区 Thank you for downloading PyCharm! 中文 PyCharm 专业的版本 感谢您下载PyCharm&#xff01…

[大模型]GLM-4-9B-Chat WebDemo 部署

环境准备 在autodl平台中租一个4090等24G显存的显卡机器,如下图所示镜像选择PyTorch–>2.1.0–>3.10(ubuntu22.04)–>12.1 接下来打开刚刚租用服务器的JupyterLab, 图像 并且打开其中的终端开始环境配置、模型下载和运行演示。 pip换源和安装…

【图论应用】使用多路图(multigraph)对上海地铁站点图建模,并解决最短路径问题

文章目录 1 前言2 导包导入数据集3 创建多路图,导入节点和边信息3 绘制线路图4 计算最短路径 1 前言 最近正在学习图神经网络,先pick up了一些最基础的图论知识并学习了一些好玩的应用。 本文启发于B站视频(BV1LY411R7HJ)&#…

【python】flask 框架

python flask 框架 flask是一个轻量级的python后端框架 (Django, tornado, flask) 官网:欢迎来到 Flask 的世界 — Flask中文文档(3.0.x) 安装:pip install Flask -i https://pypi.douban.com 常识: http,默认端口号为80; https,默认端口号…

国资e学快速学习实战教程

大家好,我是爱编程的喵喵。双985硕士毕业,现担任全栈工程师一职,热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。现为CSDN博客专家、人工智能领域优质创作者。喜欢通过博客创作的方式对所学的…

OSPF LSA头部详解

LSA概述 LSA是OSPF的本质 , 对于网工来说能否完成OSPF的排错就是基于OSPF的LSDB掌握程度 . 其中1/2类LAS是负责区域内部的 类似于设备的直连路由 . 加上对端的设备信息 3 类LSA是区域间的 指的是Area0和其他Area的区域间关系 , 设计多区域的初衷就是避免大型OSPF环境LSA太多…

Go模板页面浏览器显示HTML源码问题

<!--* Title: This is a file for ……* Author: JackieZheng* Date: 2024-06-09 17:00:01* LastEditTime: 2024-06-09 17:01:12* LastEditors: Please set LastEditors* Description:* FilePath: \\GoCode\\templates\\index.html --> <!DOCTYPE html> <html …

【RAG】浅尝基于多头注意力机制思想设计的Multi-Head RAG(多头RAG)

一、动机 现有RAG设计和评估方法&#xff0c;没有方案或评估方法明确针对具有多方面性的问题。下面解释一下多方面性的问题&#xff1a; "多方面性的问题"是指那些需要理解和整合多个不同领域或主题的知识和信息才能得到完整和准确回答的问题。这类问题的特点在于它…

Characters 2 01(卡通可爱人物动画模型)

● 包裹● - 26名男子; - 29个女孩。 ● 使用地点 ● - 游戏。针对游戏引擎优化的模型; -乘法; 广告和营销; - 虚拟现实/增强现实。 ● 特点 ● - 你可以很容易地改变物体的颜色 - 使用UV贴图; - 对象逻辑位置的枢轴; - 模型具有逻辑名称。 ● 几何学● 62个独特的资产(…

Objective-C 学习笔记 | Block 对象

Objective-C 学习笔记 | Block 对象 Objective-C 学习笔记 | Block 对象编写并使用 Block 对象Block 对象的返回值匿名 Block 对象外部变量在 Block 对象中使用 self在 Block 对象中无意使用 self修改外部变量 Objective-C 学习笔记 | Block 对象 Block 对象类似于匿名函数&am…

xLua(一) 环境安装笔记

为了方便查阅记录一下xLua的安装地址及方法 1.登录地址下载: https://github.com/Tencent/xLua 2.解压文件 将文件中的这些内容拷贝到项目中的Asset文件夹中 注意 : 工程项目路径不得含有中文 3.将Tools复制到Asset同级目录下 4.导入后会发现有Bug,需要导入工程 5.还有另…

Java:九九乘法表,打印三角形

文章目录 九九乘法表打印三角形改进:控制行数的三角形有空格的三角形 九九乘法表 package com.zhang; /* 打印九九乘法表*/ public class Test8 {public static void main(String[] args) {//i是竖着的 j是横着的for (int i 1; i < 9; i) {for(int j 1; j < 9; j) {i…

IP协议(二)

TOC 一: 网段划分 同一个局域网的主机,要按一定的规则分配IP地址 把一个IP地址分为两部分: 前半部分 ,网络号 >用来表示局域网后半部分,主机号 > 用来区分同一个局域网中的不同主机 同一个局域网内部&#xff0c;主机之间的IP &#xff0c; 网络号相同&#xff0c;主…

FuTalk设计周刊-Vol.039

&#x1f525;AI漫谈 热点捕手 1、AI视频生成工具大PK | Runway Gen-2、Pika、Moonvalley和W.A.L.T的文字生视频对比评测 AI届的学术大牛李飞飞最近推出了用于生成逼真视频的扩散模型W.A.L.T。效果很不错&#xff0c;不过目前还未开放公网的访问。于是我萌生了一个想法&#…

气体流量的换算

测量气体流量时&#xff0c;往往需要进行温压补偿。我们可以选择Nm:/h和m3/h作为测量单位&#xff0c;二者之间如何换算呢?在标准状态下&#xff0c;即温度为0℃℃(273.15K)和压力为1个标准大气压(101.325kPa)时&#xff0c;气体的体积被称为Nm3(标方)&#xff0c;N代表标准条…

rust asyn和await pin unpin加精!!!

15-探讨为什么Pin在Rust异步编程中如此重要 | Databend_哔哩哔哩_bilibili 能不能Pin住&#xff0c;取决于T是否实现了Unpin&#xff0c;如果实现了Unpin&#xff0c;那么Pin不住 Pin不能pin住u32等基础变量 编译器为async和await生成结构体实现了!Unpin 结构体中使用引用要…

HTML+CSS 交互式开关按钮

效果演示 实现了一个交互式开关按钮的效果,包括一个标签和两个选项(Yes和No),当用户点击其中一个选项时,按钮会发生动画效果,同时选中的选项会被高亮显示。整个按钮的样式采用了渐变背景色、圆角边框、阴影等元素,使得按钮看起来更加美观。 Code HTML <!DOCTYPE ht…

下拉框数据被遮挡 且 后续数据无法下拉的 解决方法

目录 前言1. 问题所示2. 原理分析3. 解决方法3.1 添加空白版2.2 调整z-index2.3 父容器的溢出属性2.4 调整样式属性4. 效果图前言 小程序使用的是Uniapp,原理都差不多,索性标题就不标注Uniapp(小程序) 对于该问题调试了一个晚上,最终解决,对此记录下来 1. 问题所示 执…