Spark---累加器和广播变量

文章目录

  • 1.累加器实现原理
  • 2.自定义累加器
  • 3.广播变量

1.累加器实现原理

累加器用来把 Executor 端变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后,传回 Driver 端进行 merge。

    //建立与Spark框架的连接val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") //配置文件val context = new SparkContext(wordCount) //读取配置文件val dataRdd: RDD[Int] = context.makeRDD(List(1, 2, 3, 4),2)var sum=0dataRdd.foreach(num=>sum+=num)println(sum)context.stop()

运行结果:
在这里插入图片描述
我们预期是想要实现数据的累加,开始数据从Driver被传输到了Executor中进行计算,但是每个分区在累加数据完成之后并没有将计算结果返回到Driver端,所以导致最后的结果与预期的不一致。
在这里插入图片描述
对上述代码使用累加器

    val dataRdd: RDD[Int] = context.makeRDD(List(1, 2, 3, 4))val sum = context.longAccumulator("sum")dataRdd.foreach(num=>{//使用累加器sum.add(num)})//获取累加器的值println(sum.value)

运行结果:
在这里插入图片描述
由此可见,在使用了累加器之后,每个Executor在开始都会获得这个累加器变量,每个Executor在执行完成后,累加器会将每个Executor中累加器变量的值聚合到Driver端。
在这里插入图片描述

Spark提供了多种类型的累加器,以下是其中的一些:
在这里插入图片描述

2.自定义累加器

用户可以通过继承AccumulatorV2来自定义累加器。需求:自定义累加器实现WordCount案例。

AccumulatorV2[IN,OUT]中:
IN:输入数据的类型
OUT:输出数据类型

在这里插入图片描述
WordCount案例实现完整代码:

package bigdata.wordcount.leijiaqiimport bigdata.wordcount.leijiaqi
import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}import scala.collection.mutable/*** 使用累加器完成WordCount案例*/
object Spark_addDemo {def main(args: Array[String]): Unit = {//建立与Spark框架的连接val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") //配置文件val context = new SparkContext(wordCount) //读取配置文件val dataRDD: RDD[String] = context.textFile("D:\\learnSoftWare\\IdeaProject\\Spark_Demo\\Spark_Core\\src\\main\\com.mao\\datas\\1.txt")//创建累加器对象val wordCountAccumulator = new WordCountAccumulator//向Spark中进行注册context.register(wordCountAccumulator,"wordCountAccumulator")//实现累加dataRDD.foreach(word => {wordCountAccumulator.add(word)})//获取累加结果,打印在控制台上println(wordCountAccumulator.value)//关闭链接context.stop()}}class WordCountAccumulator extends  AccumulatorV2[String,mutable.Map[String,Long]]
{//定义一个map用于存储累加后的结果var map: mutable.Map[String, Long] =mutable.Map[String,Long]()//累加器是否为初始状态override def isZero: Boolean = {map.isEmpty}//复制累加器override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {new WordCountAccumulator()}//重置累加器override def reset(): Unit = {map.clear()}//向累加器添加数据INoverride def add(word: String): Unit = {// 查询 map 中是否存在相同的单词// 如果有相同的单词,那么单词的数量加 1// 如果没有相同的单词,那么在 map 中增加这个单词val newValue = map.getOrElse(word, 0L) + 1map.update(word,newValue)}//合并累加器override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {var map1=this.mapvar map2=other.value//合并两个mapmap2.foreach({case (word,count)=>{val newValue = map1.getOrElse(word,0L)+countmap1.update(word,newValue)}})}//返回累加器的结果(OUT)override def value: mutable.Map[String, Long] = this.map
}
}

运行结果:
在这里插入图片描述

3.广播变量

在Apache Spark中,广播变量(Broadcast Variables)是一种特殊类型的变量,用于优化数据共享。当Spark应用程序在集群中的多个节点上运行时,每个节点都需要访问相同的数据集。如果数据集很大,那么将这些数据发送到每个节点可能会非常耗时和低效。 广播变量提供了一种方法,可以在集群中的所有节点上共享数据集的一个只读副本 ,从而避免了在每个节点上重复发送数据。

在这里插入图片描述
广播变量也叫分布式只读变量,它可以将闭包数据发送到每个Executor的内存中来达到共享的目的,Executor其实就相当于一个JVM,在启动的时候会自动分配内存。

    //建立与Spark框架的连接val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") //配置文件val context = new SparkContext(wordCount) //读取配置文件val rdd1 = context.makeRDD(List(("a", 1), ("b", 2), ("c", 3), ("d", 4)), 4)val list = List(("a", 4), ("b", 5), ("c", 6), ("d", 7))// 声明广播变量val broadcast: Broadcast[List[(String, Int)]] = context.broadcast(list)val resultRDD: RDD[(String, (Int, Int))] = rdd1.map {case (key, num) => {var num2 = 0// 使用广播变量for ((k, v) <- broadcast.value) {if (k == key) {num2 = v}}(key, (num, num2))}}resultRDD.collect().foreach(print)context.stop()

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/629480.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

浅谈敏捷开发的思维

什么是敏捷 Agile&#xff08;敏捷&#xff09;来源于敏捷宣言&#xff0c;宣言明确指出&#xff0c;“敏捷”&#xff1a; 不是一种方法论也不是开发软件的具体方法更不是一个框架或者过程 “敏捷”是一套价值观&#xff08;理念&#xff09;和原则&#xff0c;帮助团队在软…

PPO 跑CartPole-v1

gym-0.26.2 cartPole-v1 参考动手学强化学习书中的代码,并做了一些修改 代码 import gym import torch import torch.nn as nn import torch.nn.functional as F import numpy as np import matplotlib.pyplot as plt from tqdm import tqdmclass PolicyNet(nn.Module):def __…

HTML--JavaScript--语法基础

变量与常量 这个基本上没啥问题 变量命名规则&#xff1a; 变量由字母、数字、下划线、$组成&#xff0c;且变量第一个字符不能为数字 变量不能是系统关键字和保留字 语法&#xff1a; var 变量名 值&#xff1b;所有Javacript变量都由var声明 定义赋值字符串&#xff1a; …

GaussDB(DWS)查询优化技术大揭秘

GaussDB(DWS)查询优化技术大揭秘 大数据时代&#xff0c;数据量呈爆发式增长&#xff0c;经常面临百亿、千亿数据查询场景&#xff0c;当数据仓库数据量较大、SQL语句执行效率低时&#xff0c;数据仓库性能会受到影响。本文将深入讲解在GaussDB(DWS)中如何进行表结构设计&#…

【Web】websocket应用的是哪个协议

&#x1f34e;个人博客&#xff1a;个人主页 &#x1f3c6;个人专栏&#xff1a;Web ⛳️ 功不唐捐&#xff0c;玉汝于成 前言 在当今互联网时代&#xff0c;实时性和即时通讯成为网络应用日益重要的一部分。WebSocket 协议作为一种创新性的通信协议&#xff0c;极大地改善了…

C语言——编译和链接

&#xff08;图片由AI生成&#xff09; 0.前言 C语言是最受欢迎的编程语言之一&#xff0c;以其接近硬件的能力和高效性而闻名。理解C语言的编译和链接过程对于深入了解其运行原理至关重要。本文将详细介绍C语言的翻译环境和运行环境&#xff0c;重点关注编译和链接的各个阶段…

Architecture Lab:预备知识2【汇编call/leave/ret指令、CS:APP练习4.4】

chap4的练习4.4&#xff08;page.255&#xff09;让用Y86-64实现rsum&#xff08;递归求数组元素之和&#xff09;&#xff0c;提示为&#xff1a;先得到x86-64汇编代码&#xff0c;然后转换成Y86-64的 这是rsum的c实现&#xff1a; long rsum(long *start, long count) {if …

【面试合集】说说微信小程序的发布流程?

面试官&#xff1a;说说微信小程序的发布流程&#xff1f; 一、背景 在中大型的公司里&#xff0c;人员的分工非常仔细&#xff0c;一般会有不同岗位角色的员工同时参与同一个小程序项目。为此&#xff0c;小程序平台设计了不同的权限管理使得项目管理者可以更加高效管理整个团…

微软推出付费版Copilot

关注卢松松&#xff0c;会经常给你分享一些我的经验和观点。 微软已经超越苹果&#xff0c;成了全球市值最高的公司&#xff0c;其他公司都因为AI大裁员&#xff0c;而微软正好相反&#xff0c;当然这个原因很简单&#xff1a;就是微软强制把AI全面接入到系统里来了。而Copilot…

Mac系统下,保姆级Jenkins自动化部署Android

一、Jenkins自动化部署 1、安装jenkins 官网&#xff1a;macOS Installers for Jenkins LTS 选择macOS brew install jenkins-lts 安装最新: brew install jenkins-lts 启动jenkins服务: brew services start jenkins-lts 重启jenkins服务: brew services restart jenkin…

web开发学习笔记(2.js)

1.引入 2.js的两种引入方式 3.输出语句 4.全等运算符 5.定义函数 6.数组 7.数组属性 8.字符串对象的对应方法 9.自定义对象 10.json对象 11.bom属性 12.window属性 13.定时刷新时间 14.跳转网址 15.DOM文档对象模型 16.获取DOM对象&#xff0c;根据DOM对象来操作网页 如下图…

基于杂交PSO算法的风光储微网日前优化调度(MATLAB实现)

微网中包含&#xff1a;风电、光伏、储能、微型燃气轮机&#xff0c;以最小化电网购电成本、光伏风机的维护成本、蓄电池充放电维护成本、燃气轮机运行成本及污染气体治理成本为目标&#xff0c;综合考虑&#xff1a;功率平衡约束、燃气轮机爬坡约束、电网交换功率约束、储能装…

【GCC】6 接收端实现:周期构造RTCP反馈包

基于m98代码。GCC涉及的代码,可能位于:webrtc/modules/remote_bitrate_estimator webrtc/modules/congestion_controller webrtc/modules/rtp_rtcp/source/rtcp_packet/transport_feedback.cc webrtc 之 RemoteEstimatorProxy 对 remote_bitrate_estimator 的 RemoteEstimato…

Vue 富文本实现内容项目倒序

应用场景&#xff1a; 比如写计划和待办事项&#xff0c;内容少还好&#xff0c;内容多了最新的内容就放在下面了&#xff0c;每次打开要滚动到最后才能看到&#xff0c;这时可以使用倒序把最新的排在最前面。 倒序前&#xff1a; 倒序后&#xff1a; 倒序代码&#xff1a; …

设计模式⑥ :访问数据结构

文章目录 一、前言二、Visitor 模式1. 介绍2. 应用3. 总结 三、Chain of Responsibility 模式1. 介绍2. 应用3. 总结 参考内容 一、前言 有时候不想动脑子&#xff0c;就懒得看源码又不像浪费时间所以会看看书&#xff0c;但是又记不住&#xff0c;所以决定开始写"抄书&q…

ElasticSearch概述+SpringBoot 集成ES

ES概述 开源的、高扩展的、分布式全文检索引擎【站内搜索】 解决问题 1.搜索词是一个整体时&#xff0c;不能拆分&#xff08;mysql整体连续&#xff09; 2.效率会低&#xff0c;不会用到索引&#xff08;mysql索引失效&#xff09; 解决方式 进行数据的存储&#xff08;只存储…

【51单片机系列】继电器使用

文章来源&#xff1a;《零起点学Proteus单片机仿真技术》。 本文是关于继电器使用相关内容。 继电器广泛应用在工业控制中&#xff0c;通过继电器对其他大电流的电器进行控制。 继电器控制原理图如下。继电器部分包括控制线圈和3个引脚&#xff0c;A引脚接电源&#xff0c;B引…

排序算法9----计数排序(C)

计数排序是一种非比较排序&#xff0c;不比较大小 。 1、思想 计数排序又称为鸽巢原理&#xff0c;是对哈希直接定址法的变形应用。 2、步骤 1、统计数据&#xff1a;统计每个数据出现了多少次。&#xff08;建立一个count数组&#xff0c;范围从[MIN,MAX],MAX代表arr中…

.Net 8.0 Web API Controllers 添加到 windows 服务

示例源码下载&#xff1a;https://download.csdn.net/download/hefeng_aspnet/88747022 创建 Windows 服务的方法之一是从工作线程服务模板开始。 但是&#xff0c;如果您希望能够让它托管 API 控制器&#xff08;也许是为了查看它正在运行的进程的状态&#xff09;&#xff0…

深入浅出Spring AOP

第1章&#xff1a;引言 大家好&#xff0c;我是小黑&#xff0c;咱们今天要聊的是Java中Spring框架的AOP&#xff08;面向切面编程&#xff09;。对于程序员来说&#xff0c;理解AOP对于掌握Spring框架来说是超级关键的。它像是魔法一样&#xff0c;能让咱们在不改变原有代码的…