Spark-06:Spark 共享变量

目录

1.广播变量(broadcast variables)

2.累加器(accumulators)


      在分布式计算中,当在集群的多个节点上并行运行函数时,默认情况下,每个任务都会获得函数中使用到的变量的一个副本。如果变量很大,这会导致网络传输占用大量带宽,并且在每个节点上都占用大量内存空间。为了解决这个问题,Spark引入了共享变量的概念。

        共享变量允许在多个任务之间共享数据,而不是为每个任务分别复制一份变量。这样可以显著降低网络传输的开销和内存占用。Spark提供了两种类型的共享变量:广播变量(broadcast variables)和累加器(accumulators)。

1.广播变量(broadcast variables)

        通常情况下,Spark程序运行时,通常会将数据以副本的形式分发到每个执行器(Executor)的任务(Task)中,但当变量较大时,这会导致大量的内存和网络开销。通过使用广播变量,Spark将变量只发送一次到每个节点,并在多个任务之间共享这个副本,从而显著降低了内存占用和网络传输的开销。

Scala 实现:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

Java 实现:

Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});broadcastVar.value();
// returns [1, 2, 3]

2.累加器(accumulators)

        累加器是Spark中的一种特殊类型的共享变量,主要用来把Executor端变量信息聚合到Driver端。在Driver程序中定义的变量,在Executor端的每个task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行merge。累加器支持的数据类型仅限于数值类型,包括整数和浮点数等。

Scala 实现:

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 sscala> accum.value
res2: Long = 10

Java 实现:

LongAccumulator accum = jsc.sc().longAccumulator();sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 saccum.value();
// returns 10

        内置累加器功能有限,但可以通过继承AccumulatorV2来创建自己的类型。AccumulatorV2抽象类有几个方法必须重写:reset用于将累加器重置为零,add用于向累加器中添加另一个值,merge用于将另一个相同类型的累加器合并到此累加器。

自定义累加器Scala实现:

package com.yichenkeji.demo.sparkscalaimport org.apache.spark.util.AccumulatorV2class CustomAccumulator extends AccumulatorV2[Int, Int]{//初始化累加器的值private var sum = 0override def isZero: Boolean = sum == 0override def copy(): AccumulatorV2[Int, Int] = {val newAcc = new CustomAccumulator()newAcc.sum = sumnewAcc}override def reset(): Unit = sum = 0override def add(v: Int): Unit = sum += voverride def merge(other: AccumulatorV2[Int, Int]): Unit = sum += other.valueoverride def value: Int = sum
}

自定义累加器Java实现:

package com.yichenkeji.demo.sparkjava;import org.apache.spark.util.AccumulatorV2;public class CustomAccumulator extends AccumulatorV2<Integer, Integer> {// 初始化累加器的值private Integer sum = 0;@Overridepublic boolean isZero() {return sum == 0;}@Overridepublic AccumulatorV2<Integer, Integer> copy() {CustomAccumulator customAccumulator = new CustomAccumulator();customAccumulator.sum = this.sum;return customAccumulator;}@Overridepublic void reset() {this.sum = 0;}@Overridepublic void add(Integer v) {this.sum += v;}@Overridepublic void merge(AccumulatorV2<Integer, Integer> other) {this.sum += ((CustomAccumulator) other).sum;}@Overridepublic Integer value() {return sum;}
}

自定义累加器的使用:

package com.yichenkeji.demo.sparkjava;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;import java.util.Arrays;
import java.util.List;public class AccumulatorTest {public static void main(String[] args) {//1.初始化SparkContext对象SparkConf sparkConf = new SparkConf().setAppName("Spark Java").setMaster("local[*]");JavaSparkContext sc = new JavaSparkContext(sparkConf);CustomAccumulator customAccumulator = new CustomAccumulator();//注册自定义累加器才能使用sc.sc().register(customAccumulator);sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).foreach(x -> customAccumulator.add(x));System.out.println(customAccumulator.value());//5.停止SparkContextsc.stop();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/171583.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Android 相机库CameraView源码解析 (一) : 预览

1. 前言 这段时间&#xff0c;在使用 natario1/CameraView 来实现带滤镜的预览、拍照、录像功能。 由于CameraView封装的比较到位&#xff0c;在项目前期&#xff0c;的确为我们节省了不少时间。 但随着项目持续深入&#xff0c;对于CameraView的使用进入深水区&#xff0c;逐…

【LeetCode】挑战100天 Day17(热题+面试经典150题)

【LeetCode】挑战100天 Day17&#xff08;热题面试经典150题&#xff09; 一、LeetCode介绍二、LeetCode 热题 HOT 100-192.1 题目2.2 题解 三、面试经典 150 题-193.1 题目3.2 题解 一、LeetCode介绍 LeetCode是一个在线编程网站&#xff0c;提供各种算法和数据结构的题目&…

java stream流常用方法

filter(Predicate predicate)&#xff1a;根据指定条件过滤元素。 List<Integer> numbers Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); List<Integer> evenNumbers numbers.stream().filter(n -> n % 2 0).collect(Collectors.toList()); System.out.pr…

VS2010 VS2015环境编译boost库

VS2010下安装boost库 去www.boost.org下载最新的boost&#xff0c;我下载了boost_1_46_1.7z&#xff08;我放在D:/cpp目录下&#xff09;解压到当前文件夹打开VS2010->VS TOOLS->VS命令提示CD D:/cpp/boost_1_46_1输入bootstrap&#xff0c;便生成bjam.exe文件输入bjam …

[建议收藏] 一个网站集合所有最新最全的AI工具

今天给大家推荐一个宝藏的AI工具合集网站&#xff0c;有了这个网站&#xff0c;你们再也不用去其他地方找AI工具了。 名称&#xff1a;AI-BOT工具集 这个网站精选1000AI工具&#xff0c;并持续每天更新添加&#xff0c;包括AI写作、AI绘画、AI音视频处理、AI平面设计、AI自动编…

Atcoder Beginner Contest 330——A~F题

A - Counting Passes Description Problem Statement N N N people labeled 1 , 2 , … , N 1,2,\dots,N 1,2,…,N took an exam, and person i i i scored A i A_i Ai​ points. Only those who scored at least L L L points pass this exam. Determine how many peopl…

SpringBoot:邮件发送

官网文档&#xff1a;39. Sending Email (spring.io)。 Sending Email Spring框架提供了JavaMailSender实例&#xff0c;用于发送邮件。 如果SpringBoot项目中包含了相关的启动器&#xff0c;那么就会自动装配一个Bean实例到项目中。 在SpringBoot项目中引入如下Email启动器&a…

Add, Divide and Floor(cf round 158 div2)

题目&#xff1a;给你一个整数数组 a1,a2,…,an 。在一次操作中&#xff0c;你可以选择一个整数 x &#xff0c;并用 (a[i]x)/2 替换 ai ( (a[i]x)/2表示将 y(a[i]x)/2舍入为最接近的整数(下取整)。 ⌊y⌋ 表示将 y 舍入为最接近的整数&#xff09;来替换从 1 到 n 的所有 i。…

【数据分享】2019-2023年我国区县逐月新房房价数据(Excel/Shp格式)

房价是一个城市发展程度的重要体现&#xff0c;一个城市的房价越高通常代表这个城市越发达&#xff0c;对于人口的吸引力越大&#xff01;因此&#xff0c;房价数据是我们在各项城市研究中都非常常用的数据&#xff01;之前我们分享过2019-2023年我国地级市逐月房价数据&#x…

Spring Boot 项目中读取 YAML 文件中的数组、集合和 HashMap

在 Spring Boot 项目中&#xff0c;我们经常使用 YAML 文件来配置应用程序的属性。在这篇博客中&#xff0c;我将模拟如何在 Java 的 Spring Boot 项目中读取 YAML 文件中的数组、集合和 HashMap。 1. 介绍 YAML&#xff08;YAML Aint Markup Language&#xff09;是一种人类…

OpenMp并行编程

目录 介绍编译用法>OpenMp parallel>OpenMp for>OpenMp private、firstprivate、lastprivate>OpenMp section>OpenMp reduction>OpenMp single>OpenMp master>OpenMp barrier OpenMp的API函数 介绍 OpenMp是一种并行编程模型&#xff0c;旨在简化多线…

【Spring集成MyBatis】MyBatis注解开发

文章目录 1. MyBatis的常用注解2. 基于注解的MyBatis增删改查增删改查完整代码加载映射关系测试代码 3. MyBatis的注解实现复杂映射开发一对一操作的实现一对一操作实现的第二种方式一对多操作的实现多对多操作实现 1. MyBatis的常用注解 2. 基于注解的MyBatis增删改查 使用注…

Linux加强篇004-Vim编辑器与Shell命令脚本

目录 前言 1. Vim文本编辑器 1.1 编写简单文档 1.2 配置主机名称 1.3 配置网卡信息 1.4 配置软件仓库 2. 编写Shell脚本 2.1 编写简单的脚本 2.2 接收用户的参数 2.3 判断用户的参数 3. 流程控制语句 3.1 if条件测试语句 3.2 for条件循环语句 3.3 while条件循环语…

【开源】基于JAVA的高校学院网站

项目编号&#xff1a; S 020 &#xff0c;文末获取源码。 \color{red}{项目编号&#xff1a;S020&#xff0c;文末获取源码。} 项目编号&#xff1a;S020&#xff0c;文末获取源码。 目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 学院院系模块2.2 竞赛报名模块2.3 教…

Postman如何使用(三):使用数据文件

数据文件是非常强大的方式使用不同的测试数据来测试我们的API&#xff0c;以检查它们是否在各种情况下都能正常运行。我们可以认为数据文件是“Collection Runner”中每个请求的参数。下面&#xff0c;我们通过一个例子来说明如何使用数据文件。 这篇文章需要结合下面两个文件进…

史上最全前端知识点+高频面试题合集,十二大专题,命中率高达95%

前言&#xff1a; 下面分享一些关于阿里&#xff0c;美团&#xff0c;深信服等公司的面经&#xff0c;供大家参考一下。大家也可以去收集一些其他的面试题&#xff0c;可以通过面试题来看看自己有哪里不足。也可以了解自己想去的公司会问什么问题&#xff0c;进行有针对的复习。…

PowerShell基础

1. Tab键补全 有时候不记得指令全称&#xff0c;只记得开头几个字母&#xff0c;使用Tab键可显式建议选项&#xff0c;再次按Tab可以往后翻&#xff0c;ShiftTab可以往前翻。 2. 查看指令类型 Get-Command -Name Get-Alias 指令是遵循一定的格式规范的&#xff0c;如动词加名…

css之选择第一个或最后一个元素、第n个标签、选择偶数或奇数标签、选择最后n个标签、等差数列标签的选择、first、last、nth、child

MENU first-child选择列表中的第一个标签last-child选择列表中的最后一个标签nth-child(n)选择列表中的第n个标签nth-child(2n)选择列表中的偶数位标签nth-child(2n-1)选择列表中的奇数位标签nth-child(nm)选择从第m个到最后一个标签nth-child(-nm)选择从第1个到第m个nth-last-…

Python与设计模式--桥梁模式

11-Python与设计模式–桥梁模式 一、画笔与形状 在介绍原型模式的一节中&#xff0c;我们举了个图层的例子&#xff0c;这一小节内容&#xff0c;我们同样以类似画图的例子&#xff0c; 说明一种结构类设计模式&#xff1a;桥梁模式。 在一个画图程序中&#xff0c;常会见到这…

《数据结构与算法之美》读书笔记2

链表操作的技巧 1.理解指针 将摸个变量赋值给指针&#xff0c;实际上就是将这个变量的地址赋给指针&#xff0c;或者&#xff0c;指针中存储了这个变量的地址&#xff0c;指向了这个变量&#xff0c;所以可以通过指针找到这个变量。 2.内存泄漏或指针丢失 删除链表节点时&a…