04-240606Spark笔记

04-240606Spark笔记

1.行动算子-2

  • save相关算子:

格式:

def saveAsTextFile(path: String): Unit
def saveAsObjectFile(path: String): Unit
def saveAsSequenceFile(path: String,codec: Option[Class[_ <: CompressionCodec]] = None): Unit

例子:

  val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3)))
​rdd.saveAsTextFile("output")rdd.saveAsObjectFile("output1")// saveAsSequenceFile方法要求数据的格式必须为K-V类型rdd.saveAsSequenceFile("output2")

输出结果:

image-20240604225213130

  • foreach

格式:

def foreach(f: T => Unit): Unit = withScope {val cleanF = sc.clean(f)sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}

例子:

    val rdd = sc.makeRDD(List(1,2,3,4))
​//foreach 其实是Driver端内存集合的循环遍历方法rdd.collect().foreach(println) //Driverprintln("***************")// foreach 其实是Executor端内存数据打印rdd.foreach(println)    // Executor// 算子 : Operator(操作)//         RDD的方法和Scala集合对象的方法不一样//         集合对象的方法都是在同一个节点的内存中完成的。//         RDD的方法可以将计算逻辑发送到Executor端(分布式节点)执行//         为了区分不同的处理效果,所以将RDD的方法称之为算子。//        RDD的方法外部的操作都是在Driver端执行的,而方法内部的逻辑代码是在Executor端执行。

输出结果:

image-20240604232824753

2. 序列化

2.1 闭包检测
  • 闭包检测

因为Driver需要给两个Executor共享User方法,共享就需要序列化

案例:

  def main(args: Array[String]): Unit = {
​val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)
​val rdd = sc.makeRDD(List[Int]())
​val user = new User()
​// SparkException: Task not serializable// NotSerializableException: com.atguigu.bigdata.spark.core.rdd.operator.action.Spark07_RDD_Operator_Action$User
​// RDD算子中传递的函数是会包含闭包操作,那么就会进行检测功能// 闭包检测rdd.foreach(num => {println("age = " + (user.age + num))})
​sc.stop()
​}//class User extends Serializable {// 样例类在编译时,会自动混入序列化特质(实现可序列化接口)//case class User() {class User {var age : Int = 30}
  • RDD 的分区器

自己来写分区器:

    def main(args: Array[String]): Unit = {val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")val sc = new SparkContext(sparConf)
​val rdd = sc.makeRDD(List(("nba", "xxxxxxxxx"),("cba", "xxxxxxxxx"),("wnba", "xxxxxxxxx"),("nba", "xxxxxxxxx"),),3)val partRDD: RDD[(String, String)] = rdd.partitionBy( new MyPartitioner )
​partRDD.saveAsTextFile("output")
​sc.stop()}

自定义的分区器:

    class MyPartitioner extends Partitioner{// 分区数量override def numPartitions: Int = 3
​// 根据数据的key值返回数据所在的分区索引(从0开始)override def getPartition(key: Any): Int = {key match {case "nba" => 0case "wnba" => 1case _ => 2}}}
* 自定义分区器
* 1. 继承Partitioner
* 2. 重写方法

输出结果:

image-20240605170312913

image-20240605170321664

  • RDD 文件读取与保存

案例1:

    def main(args: Array[String]): Unit = {val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")val sc = new SparkContext(sparConf)
​val rdd = sc.textFile("output1")println(rdd.collect().mkString(","))
​val rdd1 = sc.objectFile[(String, Int)]("output2")println(rdd1.collect().mkString(","))
​val rdd2 = sc.sequenceFile[String, Int]("output3")println(rdd2.collect().mkString(","))
​sc.stop()}

输出结果:

image-20240605170535800

案例2:

    def main(args: Array[String]): Unit = {val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")val sc = new SparkContext(sparConf)
​val rdd = sc.makeRDD(List(("a", 1),("b", 2),("c", 3)))
​rdd.saveAsTextFile("output1")rdd.saveAsObjectFile("output2")rdd.saveAsSequenceFile("output3")
​sc.stop()}

输出结果:

image-20240605170643956

1. 数据结构:

image-20240605170954358

  • 累加器

累加器用来把 Executor 端变量信息聚合到 Driver 端。

![image-20240605202228850](E:\Files2\Typictures\image-20240605202228850.png

image-20240605202424331

Acc,累加器可以把Excutor端的数据返回到Driver中去:

image-20240605202543334

案例:

    def main(args: Array[String]): Unit = {
​val sparConf = new SparkConf().setMaster("local").setAppName("Acc")val sc = new SparkContext(sparConf)
​val rdd = sc.makeRDD(List(1,2,3,4))
​// reduce : 分区内计算,分区间计算//val i: Int = rdd.reduce(_+_)//println(i)var sum = 0rdd.foreach(num => {sum += num})println("sum = " + sum)
​sc.stop()
​}
  • 系统累加器

案例:

    def main(args: Array[String]): Unit = {
​val sparConf = new SparkConf().setMaster("local").setAppName("Acc")val sc = new SparkContext(sparConf)
​val rdd = sc.makeRDD(List(1,2,3,4))
​// 获取系统累加器// Spark默认就提供了简单数据聚合的累加器val sumAcc = sc.longAccumulator("sum")
​//sc.doubleAccumulator//sc.collectionAccumulator
​rdd.foreach(num => {// 使用累加器sumAcc.add(num)})
​// 获取累加器的值println(sumAcc.value)
​sc.stop()
​}

累加器的一些特殊情况:

少加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行
多加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行
一般情况下,累加器会放置在行动算子进
    def main(args: Array[String]): Unit = {
​val sparConf = new SparkConf().setMaster("local").setAppName("Acc")val sc = new SparkContext(sparConf)
​val rdd = sc.makeRDD(List(1,2,3,4))
​// 获取系统累加器// Spark默认就提供了简单数据聚合的累加器val sumAcc = sc.longAccumulator("sum")
​//sc.doubleAccumulator//sc.collectionAccumulator
​val mapRDD = rdd.map(num => {// 使用累加器sumAcc.add(num)num})
​// 获取累加器的值// 少加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行// 多加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行// 一般情况下,累加器会放置在行动算子进行操作mapRDD.collect()mapRDD.collect()println(sumAcc.value)
​sc.stop()
​}
  • 自定义累加器

分布式共享只写变量

案例:

    def main(args: Array[String]): Unit = {
​val sparConf = new SparkConf().setMaster("local").setAppName("Acc")val sc = new SparkContext(sparConf)
​val rdd = sc.makeRDD(List("hello", "spark", "hello"))
​// 累加器 : WordCount// 创建累加器对象val wcAcc = new MyAccumulator()// 向Spark进行注册sc.register(wcAcc, "wordCountAcc")
​rdd.foreach(word => {// 数据的累加(使用累加器)wcAcc.add(word)})
​// 获取累加器累加的结果println(wcAcc.value)
​sc.stop()
​}/*自定义数据累加器:WordCount
​1. 继承AccumulatorV2, 定义泛型IN : 累加器输入的数据类型 StringOUT : 累加器返回的数据类型 mutable.Map[String, Long]
​2. 重写方法(6)*/class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {
​private var wcMap = mutable.Map[String, Long]()
​// 判断是否初始状态override def isZero: Boolean = {wcMap.isEmpty}
​override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {new MyAccumulator()}
​override def reset(): Unit = {wcMap.clear()}
​// 获取累加器需要计算的值override def add(word: String): Unit = {val newCnt = wcMap.getOrElse(word, 0L) + 1wcMap.update(word, newCnt)}
​// Driver合并多个累加器override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
​val map1 = this.wcMapval map2 = other.value
​map2.foreach{case ( word, count ) => {val newCount = map1.getOrElse(word, 0L) + countmap1.update(word, newCount)}}}
​// 累加器结果override def value: mutable.Map[String, Long] = {wcMap}}
  • 广播变量

实现原理:

广播变量用来高效分发较大的对象。在多个并行操作中使用同一个变量,但是 Spark 会为每个任务

分别发送。

案例:

    def main(args: Array[String]): Unit = {
​val sparConf = new SparkConf().setMaster("local").setAppName("Acc")val sc = new SparkContext(sparConf)
​val rdd1 = sc.makeRDD(List(("a", 1),("b", 2),("c", 3)))
//        val rdd2 = sc.makeRDD(List(
//            ("a", 4),("b", 5),("c", 6)
//        ))val map = mutable.Map(("a", 4),("b", 5),("c", 6))
​
​
​// join会导致数据量几何增长,并且会影响shuffle的性能,不推荐使用//val joinRDD: RDD[(String, (Int, Int))] = rdd1.join(rdd2)//joinRDD.collect().foreach(println)// (a, 1),    (b, 2),    (c, 3)// (a, (1,4)),(b, (2,5)),(c, (3,6))rdd1.map {case (w, c) => {val l: Int = map.getOrElse(w, 0)(w, (c, l))}}.collect().foreach(println)
​
​
​sc.stop()
​}

join会导致数据量几何增长,并且会影响shuffle的性能,不推荐使用

image-20240606162528164

Spark 中的广播变量就可以将闭包的数据保存到Executor的内存中

Spark 中的广播变量不能更改 : 分布式共享只读变量

image-20240606162609035

封装广播变量1

案例:

    def main(args: Array[String]): Unit = {
​val sparConf = new SparkConf().setMaster("local").setAppName("Acc")val sc = new SparkContext(sparConf)
​val rdd1 = sc.makeRDD(List(("a", 1),("b", 2),("c", 3)))val map = mutable.Map(("a", 4),("b", 5),("c", 6))
​// 封装广播变量val bc: Broadcast[mutable.Map[String, Int]] = sc.broadcast(map)
​rdd1.map {case (w, c) => {// 方法广播变量val l: Int = bc.value.getOrElse(w, 0)(w, (c, l))}}.collect().foreach(println)
​
​
​sc.stop()
​}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/23547.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Python报错】已解决NameError: name ‘Image‘ is not defined

解决Python报错&#xff1a;NameError: name ‘Image’ is not defined 在使用Python进行图像处理时&#xff0c;我们经常使用Pillow库&#xff08;PIL的一个分支&#xff09;。如果你在尝试创建或处理图像时遇到了NameError: name Image is not defined的错误&#xff0c;这通…

关于python包导入问题的重思考

将顶层目录直接设置为一个包 像这样&#xff0c;每一个文件从顶层包开始导入 这样可以解决我的问题&#xff0c;但是要注意的时&#xff0c;要避免使用出现上下级出现同名包的情况&#xff0c;比如&#xff1a; AutoServer--AutoServer--__init__.py--__init__.py这种情况下…

绿联云NAS一些探索(1):SSH、包管理器探测、安装docker-compose等

绿联云NAS一些探索SSH、包管理器探测、安装docker-compose等 - 文章信息 - Author: 李俊才 (jcLee95) Visit me at CSDN: https://jclee95.blog.csdn.netMy WebSite&#xff1a;http://thispage.tech/Email: 291148484163.com. Shenzhen ChinaAddress of this article:https:…

AI图书推荐:《如何利用ChatGPT在线赚钱》

这本书《如何利用ChatGPT在线赚钱》&#xff08;$100m ChatGPT_ How To Make Money Online With ChatGPT -- Sharp, Biily -- 2023 &#xff09;主要阐述如何利用ChatGPT这一强大的语言模型工具在互联网上创造收入。 以下是各章节内容的概要&#xff1a; **引言** - 介绍了Chat…

STM32F103单片机工程移植到航顺单片机HK32F103注意事项

一、简介 作为国内MCU厂商中前三阵营之一的航顺芯片&#xff0c;建立了世界首创超低功耗7nA物联网、万物互联核心处理器浩瀚天际10X系列平台&#xff0c;接受代理商/设计企业/方案商定制低于自主研发十倍以上成本&#xff0c;接近零风险自主品牌产品&#xff0c;芯片设计完成只…

编译等底层知识

目录 一. GCC命令语句大全 二. GCC编译4个阶段 三. makefile的使用 四. CMake 五. GNU工具链开发流程图 六. Keil中的地址段 七. 静态库和动态库 一. GCC命令语句大全 -c只编译源文件&#xff0c;生成目标文件&#xff08;.o 文件&#xff09;&#xff0c;不进行链接。…

CC++内存管理【new和delete操作符的详细分析】【常见面试题】

C/C内存管理 1.C/C内存分布 我们先来看一段代码&#xff0c;来了解一下C/C中的数据内存分布。 # include <stdlib.h>int globalVar 1; static int staticGlobalVar 1; // 比globalVar还要先销毁,同一个文件下后定义的先析构 // 全局变量存在 数据段&#xff08;静态…

VSCode+Vite+Vue3断点调试

目录 lunch.json创建 vite.config.ts 打断点运行 lunch.json创建 首先&#xff0c;点击VSCode左上角&#xff0c;甲壳虫运行的按钮&#xff0c;然后点击运行与调试&#xff0c;选择chrome浏览器&#xff0c;修改成一下配置。 { // 使用 IntelliSense 了解相关属性。 // 悬停…

codeforces round 949 div2

A Turtle and Piggy Are Playing a Game 题目&#xff1a; 思路&#xff1a;输出2的幂次b使得2^b为最大的不超过x的数 代码&#xff1a; #include <iostream>using namespace std;const int N 2e5 10;void solve() {int l, r;cin >> l >> r;if(r % 2) …

vscode 运行和调试

vscode使用断点 1.安装并激活扩展 Debugger for Chrome (弃用 --> JavaScript Debugger)Debugger for Firefox 2. 配置config文件 打开 config/index.js 并找到 devtool property。将其更新为&#xff1a; 如果你使用的是 Vue CLI 2&#xff0c;请设置并更新 config/in…

SpringBoot Redis读写与数据序列化 RedisTemplate 与 StringRedisTemplate 防转字节

介绍 RedisTemplate 对象在底层默认会转成字节&#xff0c;造成了内存的开销很大&#xff0c;这是他底层进行处理的,造成可读性差&#xff0c;如需要转成简单的字符串存储需要进行序列化的配置。 RedisTemplate 配置类 Configuration public class RedisConfig {Beanpublic …

OpenGL系列(五)纹理贴图

概述 OpenGL纹理是一种在三维图形中应用纹理映射的技术。纹理是一张图像&#xff0c;可以应用到三维模型的表面上&#xff0c;从而使得模型看起来更加真实和具有细节。通过纹理映射&#xff0c;可以将图像的像素值与三维模型的顶点进行匹配&#xff0c;从而为模型的表面增加细节…

Java并发编程之由于静态变量错误使用可能导致的并发问题

Java并发编程之由于静态变量错误使用可能导致的并发问题 1.1 前言1.2 业务背景1.3 问题分析1.4 为什么呢&#xff1f;1.5 修复方案2 演示示例源码下载 1.1 前言 我们知道在 Java 后端服务开发中&#xff0c;如果出现并发问题一般都是由于在多个线程中使用了共享的变量导致的。…

JVM相关:Java内存区域

Java 虚拟机&#xff08;JVM)在执行 Java 程序的过程中会把它管理的内存划分成若干个不同的数据区域。 Java运行时数据区域是指Java虚拟机&#xff08;JVM&#xff09;在执行Java程序时&#xff0c;为了管理内存而划分的几个不同作用域。这些区域各自承担特定的任务&#xff0c…

Day23 自定义对话框服务

​本章节实现了,自定义对话框服务的功能 当现有的对话框服务无法满足特定需求时,我们可以采用自定义对话框的解决方案,以更好地满足一些特殊需求。 一.自定义对话框主机服务步骤 在Models 文件夹中,再建立一个 IDialogHostService 接口类,继承自 IDialogService 对话框服…

绝对实用Linux命令行下的文件夹逐层创建术,从小白到大神的必学技能

哈喽&#xff0c;大家好&#xff0c;我是木头左&#xff01; 基础篇&#xff1a;初识Linux文件系统 在深入了解如何在Linux中逐层创建文件夹之前&#xff0c;需要对Linux的文件系统有一个基本的认识。Linux文件系统以其树状结构而著称&#xff0c;其中/&#xff08;根目录&…

SIMBA方法解读

目录 预处理scRNA-seqscATAC-seq 图构建&#xff08;5种场景&#xff09;scRNA-seq分析scATAC-seq分析多模态分析批次整合多模态整合 图学习SIMBA空间中查询实体识别TF-target genes 预处理 scRNA-seq 过滤掉在少于三个细胞中表达的基因。原始计数按文库大小标准化&#xff0…

DDS自动化测试落地方案 | 怿星科技携最新技术亮相是德科技年度盛会

5月28日&#xff0c;怿星科技作为是德科技的重要合作伙伴亮相Keysight World Tech Day 2024。在此次科技盛会上&#xff0c;怿星科技不仅展示了领先的DDS自动化测试解决方案等前沿技术&#xff0c;还分享了在“周期短、任务重”的情况下&#xff0c;如何做好软件开发和测试验证…

前端开发之性能优化

本文章 对各大学习技术论坛知识点&#xff0c;进行总结、归纳自用学习&#xff0c;共勉&#x1f64f; 文章目录 1. [CDN](https://www.bootcdn.cn/)2.懒加载3.缓存4.图片压缩5.图片分割6.sprite7.Code Splitting8.gzip9.GPU加速10.Ajax11.Tree Shaking12.Resource Hints 1. CD…

YOLO系列模型 pt文件转化为ONNX导出

文章目录 啥是onnx怎么导出导出之后 啥是onnx Microsoft 和合作伙伴社区创建了 ONNX 作为表示机器学习模型的开放标准。许多框架&#xff08;包括 TensorFlow、PyTorch、scikit-learn、Keras、Chainer、MXNet 和 MATLAB&#xff09;的模型都可以导出或转换为标准 ONNX 格式。 在…