Spark-06:共享变量

目录

1.广播变量(broadcast variables)

2.累加器(accumulators)


      在分布式计算中,当在集群的多个节点上并行运行函数时,默认情况下,每个任务都会获得函数中使用到的变量的一个副本。如果变量很大,这会导致网络传输占用大量带宽,并且在每个节点上都占用大量内存空间。为了解决这个问题,Spark引入了共享变量的概念。

        共享变量允许在多个任务之间共享数据,而不是为每个任务分别复制一份变量。这样可以显著降低网络传输的开销和内存占用。Spark提供了两种类型的共享变量:广播变量(broadcast variables)和累加器(accumulators)。

1.广播变量(broadcast variables)

        通常情况下,Spark程序运行时,通常会将数据以副本的形式分发到每个执行器(Executor)的任务(Task)中,但当变量较大时,这会导致大量的内存和网络开销。通过使用广播变量,Spark将变量只发送一次到每个节点,并在多个任务之间共享这个副本,从而显著降低了内存占用和网络传输的开销。

Scala 实现:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

Java 实现:

Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});broadcastVar.value();
// returns [1, 2, 3]

2.累加器(accumulators)

        累加器是Spark中的一种特殊类型的共享变量,主要用来把Executor端变量信息聚合到Driver端。在Driver程序中定义的变量,在Executor端的每个task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行merge。累加器支持的数据类型仅限于数值类型,包括整数和浮点数等。

Scala 实现:

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 sscala> accum.value
res2: Long = 10

Java 实现:

LongAccumulator accum = jsc.sc().longAccumulator();sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 saccum.value();
// returns 10

        内置累加器功能有限,但可以通过继承AccumulatorV2来创建自己的类型。AccumulatorV2抽象类有几个方法必须重写:reset用于将累加器重置为零,add用于向累加器中添加另一个值,merge用于将另一个相同类型的累加器合并到此累加器。

自定义累加器Scala实现:

package com.yichenkeji.demo.sparkscalaimport org.apache.spark.util.AccumulatorV2class CustomAccumulator extends AccumulatorV2[Int, Int]{//初始化累加器的值private var sum = 0override def isZero: Boolean = sum == 0override def copy(): AccumulatorV2[Int, Int] = {val newAcc = new CustomAccumulator()newAcc.sum = sumnewAcc}override def reset(): Unit = sum = 0override def add(v: Int): Unit = sum += voverride def merge(other: AccumulatorV2[Int, Int]): Unit = sum += other.valueoverride def value: Int = sum
}

自定义累加器Java实现:

package com.yichenkeji.demo.sparkjava;import org.apache.spark.util.AccumulatorV2;public class CustomAccumulator extends AccumulatorV2<Integer, Integer> {// 初始化累加器的值private Integer sum = 0;@Overridepublic boolean isZero() {return sum == 0;}@Overridepublic AccumulatorV2<Integer, Integer> copy() {CustomAccumulator customAccumulator = new CustomAccumulator();customAccumulator.sum = this.sum;return customAccumulator;}@Overridepublic void reset() {this.sum = 0;}@Overridepublic void add(Integer v) {this.sum += v;}@Overridepublic void merge(AccumulatorV2<Integer, Integer> other) {this.sum += ((CustomAccumulator) other).sum;}@Overridepublic Integer value() {return sum;}
}

自定义累加器的使用:

package com.yichenkeji.demo.sparkjava;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;import java.util.Arrays;
import java.util.List;public class AccumulatorTest {public static void main(String[] args) {//1.初始化SparkContext对象SparkConf sparkConf = new SparkConf().setAppName("Spark Java").setMaster("local[*]");JavaSparkContext sc = new JavaSparkContext(sparkConf);CustomAccumulator customAccumulator = new CustomAccumulator();//注册自定义累加器才能使用sc.sc().register(customAccumulator);sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).foreach(x -> customAccumulator.add(x));System.out.println(customAccumulator.value());//5.停止SparkContextsc.stop();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/165670.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

开启数据库审计(db,extended级别或os级别),并将审计文件存放到/home/oracle/audit下

文章目录 开启数据库审计&#xff08;db,extended级别或os级别&#xff09;&#xff0c;并将审计文件存放到/home/oracle/audit下一. 简介二. 配置2.1. 审计是否安装2.2. 审计表空间迁移2.3. 审计参数2.4. 审计级别2.5. 其他审计选项2.6. 审计相关视图 三. 使用3.1. 开启/关闭审…

成为独立开发者有多难

首先自我介绍&#xff1a;我是一名前端开发工程师&#xff0c;7年的前端开发经验。CSDN 九段刀客_js,vue,ReactNative-CSDN博客,80多万的访问量&#xff0c;1万多的粉丝。 相信80%的程序员的终极梦想都是成为一名独立开发者&#xff0c;不用找工作有自己的产品可以有睡后收入。…

深度学习模型训练计算量的估算

深度学习模型训练计算量的估算 方法1&#xff1a;基于网络架构和批处理数量计算算术运算次数前向传递计算和常见层的参数数量全连接层&#xff08;Fully connected layer&#xff09;参数浮点数计算量 CNN参数浮点数计算量 转置CNN参数浮点数计算量 RNN参数浮点数计算量 GRU参数…

刷题学习记录(含2023ISCTFweb题的部分知识点)

[SWPUCTF 2021 新生赛]sql 进入环境 查看源码&#xff0c;发现是get传参且参数为wllm fuzz测试&#xff0c;发现空格&#xff0c;&#xff0c;and被过滤了 同样的也可以用python脚本进行fuzz测试 import requests fuzz{length ,,handler,like,select,sleep,database,delete,h…

java学习part09类的构造器

1. 2.默认构造器 如果没有显式定义任何构造器&#xff0c;系统会默认加一个默认构造器。 如果定义了&#xff0c;则不会有默认构造器。 默认构造器的权限和类的权限一样&#xff0c;类是public构造器就是public&#xff0c;类是缺省默认构造器就是缺省 反编译之后添加的构造…

解决DaemonSet没法调度到master节点的问题

最近在kubernetes部署一个springcloud微服务项目&#xff0c;到了最后一步部署边缘路由&#xff1a;使用nginx-ingress和traefik都可以&#xff0c;必须使用DaemonSet部署&#xff0c;但是发现三个节点&#xff0c;却总共只有两个pod。 换句话说&#xff0c; DaemonSet没法调度…

UML建模图文详解教程05——包图

版权声明 本文原创作者&#xff1a;谷哥的小弟作者博客地址&#xff1a;http://blog.csdn.net/lfdfhl本文参考资料&#xff1a;《UML面向对象分析、建模与设计&#xff08;第2版&#xff09;》吕云翔&#xff0c;赵天宇 著 包图概述 包图(package diagram)是用来描述模型中的…

一个最简单的工业通讯数据分析例子

1.背景 对工业设备的通讯协议进行分析可以帮助我们更好地理解其工作原理和相关技术&#xff0c;并且有助于以下几个方面&#xff1a; 1. 优化工业设备的通讯效率&#xff1a;了解通讯协议的细节可以帮助我们找到通讯效率低下的原因并进行优化&#xff0c;提高设备的通讯效率和…

MySQL 8 配置文件详解与最佳实践

MySQL 8 是一款强大的关系型数据库管理系统&#xff0c;通过适当的配置文件设置&#xff0c;可以充分发挥其性能潜力。在这篇博客中&#xff0c;我们将深入探究 MySQL 8 常用的配置文件&#xff0c;并提供一些建议&#xff0c;帮助您优化数据库性能。 配置文件概览 在 MySQL …

【数据结构】二叉树概念 | 满二叉树 | 完全二叉树

二叉树的概念 二叉树在实践中用的很多。 一棵二叉树是结点的一个有限集合&#xff0c;该集合&#xff1a; 或者为空&#xff1b;由一个根结点加上两棵别称为左子树和右子树的二叉树组成。二叉树最多两个孩子。 这里注意&#xff1a;二叉树并不是度为2的树。 二叉树的度最大值是…

Go lumberjack 日志轮换和管理

在开发应用程序时&#xff0c;记录日志是一项关键的任务&#xff0c;以便在应用程序运行时追踪问题、监视性能和保留审计记录。Go 语言提供了灵活且强大的日志记录功能&#xff0c;可以通过多种方式配置和使用。其中一个常用的日志记录库是 github.com/natefinch/lumberjack&am…

【JAVA】我们该如何规避代码中可能出现的错误?(二)

个人主页&#xff1a;【&#x1f60a;个人主页】 系列专栏&#xff1a;【❤️初识JAVA】 文章目录 前言异常方法&#xff08;Throwable类&#xff09;Throwable类的方法 捕获异常多重捕获块 前言 异常是程序中的一些错误&#xff0c;但并不是所有的错误都是异常&#xff0c;并…

git-3

1.如何让工作区的文件恢复为和暂存区一样&#xff1f; 工作区所作的变更还不及暂存区的变更好&#xff0c;想从暂存区拷贝到工作区&#xff0c;变更工作区(恢复成和暂存区一样的状态)&#xff0c;想到用git checkout -- 文件名 2.怎样取消暂存区部分文件的更改&#xff1f; 如…

无损压缩技巧:减小PDF文件尺寸的有效方法

我们在制作pdf文档的时候&#xff0c;会加入许多内容&#xff0c;文字、图片等等&#xff0c;素材添加的过多之后就会导致pdf文档特别大&#xff0c;在上传或者储存时&#xff0c;就会特别不方便&#xff0c;所以今天就告诉大家一个pdf压缩的方法&#xff0c;使用pdf在线压缩工…

洛谷 P1883 函数

P1883 函数 - 洛谷 | 计算机科学教育新生态 (luogu.com.cn) Error Curves - 洛谷 | 计算机科学教育新生态 (luogu.com.cn) 这两题是一模一样的&#xff0c;过一题水两题。 分析 主要难点在于证明F(x)是一个单峰函数可以被三分&#xff0c;但是我随便画了几个f(x)之后发现好像…

MySQL的Redo Log跟Binlog

文章目录 概要Redo Log日志Redo Log的作用Redo Log的写入机制 Binlog日志Binlog的作用Binlog写入机制 两段提交 概要 Redo Log和Binlog是MySQL日志系统中非常重要的两种机制&#xff0c;也有很多相似之处&#xff0c;本文主要介绍两者细节和区别。 Redo Log日志 Redo Log的作…

Docker+ Jenkins+Maven+git自动化部署

环境&#xff1a;Centos7 JDK1.8 Maven3.3.9 Git 2.40 Docker 20.10.17 准备工作&#xff1a; 安装Docker Centos7默认的yum安装的docker是1.13&#xff0c;版本太低&#xff0c;很多镜像都要Docker版本要求&#xff0c;升级Docker版本。 卸载已安装Docker: yum …

你知道如何实现游戏中的透视效果吗?

引言 游戏中的透视效果可以合理运用CtrlCV实现。 不知道大家有没有这样一段经历&#xff1a;在做Cocos项目时需要一些特定的Shader去做一些特定的效果&#xff0c;例如透视、高光、滤镜等等&#xff0c;想自己写吧&#xff0c;不怎么会啊&#xff0c;网上又找不到&#xff0c…

redis-cluster集群(目的:高可用)

1、特点 集群由多个node节点组成&#xff0c;redis数据分布在这些节点中&#xff0c;在集群中分为主节点和从节点&#xff0c;一个主对应一个从&#xff0c;所有组的主从形成一个集群&#xff0c;每组的数据是独立的&#xff0c;并且集群自带哨兵模式 2、工作原理 集群模式中…

重生之我是一名程序员 37 ——C语言中的栈溢出问题

哈喽啊大家晚上好&#xff01; 今天呢给大家带来一个烧脑的知识——C语言中的栈溢出问题。那什么是栈溢出呢&#xff1f;栈溢出指的是当程序在执行函数调用时&#xff0c;为了保护函数的局部变量和返回地址&#xff0c;将这些数据存储在栈中。如果函数在函数调用时使用了过多的…