【大数据面试知识点】Spark中的累加器

Spark累加器

累加器用来把Executor端变量信息聚合到Driver端,在driver程序中定义的变量,在Executor端的每个task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回driver端进行merge。

累加器一般是放在行动算子中进行操作的。

Spark累加器有哪些特点?

1)累加器在全局唯一的,只增不减,记录全局集群的唯一状态

2)在Executor中修改它,在Driver读取

3)executor级别共享的,广播变量是task级别的共享两个application不可以共享累加器,但是同一个app不同的job可以共享

应用举例

不经过Shuffle实现词频统计

object Spark06_Accumulator {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[*]")val sc = new SparkContext(conf)val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("a", 3), ("b", 4)))// 声明累加器val sumAcc: LongAccumulator = sc.longAccumulator("sumAcc")rdd.foreach {case (word, count) => {// 使用累加器sumAcc.add(count)}}// 累加器的toString方法//println(sumAcc)//取出累加器中的值println(sumAcc.value)sc.stop()}
}

不经过shuffle,计算以H开头的单词出现的次数。

object Spark07_MyAccumulator {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[*]")val sc = new SparkContext(conf)val rdd: RDD[String] = sc.makeRDD(List("Hello", "HaHa", "spark", "scala", "Hi", "Hello", "Hi"))// 创建累加器val myAcc = new MyAccumulator//注册累加器sc.register(myAcc, "MyAcc")rdd.foreach{datas => {// 使用累加器myAcc.add(datas)}}// 获取累加器的结果println(myAcc.value)sc.stop()}
}// 自定义累加器
// 泛型分别为输入类型和输出类型
class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Int]] {// 定义输出数据变量var map: mutable.Map[String, Int] = mutable.Map[String, Int]()// 累加器是否为初始状态override def isZero: Boolean = map.isEmpty// 复制累加器override def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = {val MyAcc = new MyAccumulator// 将此累加器中的数据赋值给新创建的累加器MyAcc.map = this.mapMyAcc}// 重置累加器override def reset(): Unit = {map.clear()}// 累加器添加元素override def add(v: String): Unit = {if (v.startsWith("H")) {// 判断map集合中是否已经存在此元素map(v) = map.getOrElse(v, 0) + 1}}// 合并累加器中的元素override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit = {val map1: mutable.Map[String, Int] = this.mapval map2: mutable.Map[String, Int] = other.value// 合并两个mapmap = map1.foldLeft(map2) {(m, kv) => {m(kv._1) = m.getOrElse(kv._1, 0) + kv._2m}}}// 获取累加器中的值override def value: mutable.Map[String, Int] = {map}
}

参考:Spark累加器的作用和使用-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/589475.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

12. 整数转罗马数字

罗马数字包含以下七种字符: I, V, X, L,C,D 和 M。 字符 数值 I 1 V 5 X 10 L 50 C 100 D 500 M 1000 …

第14课 多维数组

文章目录 前言一、多维数组的定义二、多维数组的初始化三、多维数组的使用(以二维数组为例)1. 矩阵转置问题 三、课后练习1. 求一个m*n矩阵中所有元素的累加和2. 查找并输出一个m*n矩阵中的最小元素以及其在矩阵中的位置3. 将m*n矩阵A复制为m*n矩阵B&…

mysql 锁 事务 脏读 不可重复读 幻读

脏读:一个事务A读取到另外一个事务B没有提交的数据,本质是事务B对其他事务可见,未提交的数据被事务A读取了(读的是别人没提交的数据) 不可重复读:同一个事务A读了一条数据读了两次,两次返回的记录数据不一样。本质是事…

leetcode 贪心(分发糖果、K次取反后最大化的数组和、加油站)

1005.K次取反后最大化的数组和 给定一个整数数组 A,我们只能用以下方法修改该数组:我们选择某个索引 i 并将 A[i] 替换为 -A[i],然后总共重复这个过程 K 次。(我们可以多次选择同一个索引 i。) 以这种方式修改数组后…

LeetCode 1599. 经营摩天轮的最大利润,简单模拟+贪心

一、题目 1、题目描述 你正在经营一座摩天轮,该摩天轮共有 4 个座舱 ,每个座舱 最多可以容纳 4 位游客 。你可以 逆时针 轮转座舱,但每次轮转都需要支付一定的运行成本 runningCost 。摩天轮每次轮转都恰好转动 1 / 4 周。 给你一个长度为 n…

Linux报错:audit: backlog limit exceeded

今天,一台虚拟机上操作昨天打开的连接一直没响应,新打开连接连接不上。SSH校验不通过。 通过IT的后台,可以看到满屏的audit: backlog limit exceeded。 问题原因:audit服务记录的审计事件超出默认(或设置)数量 ,达到或…

YHZ012 Python 隐式类型转换

资源编号:YHZ012 配套视频:https://www.bilibili.com/video/BV1zy4y1Z7nk?p13 🐣 隐式类型转换 在隐式类型转换中,Python 会自动将一种数据类型转换为另一种数据类型,不需要我们去干预。 以下实例中,我们对…

生产问题一:redis锁处理幂等性失效

伪代码: Transactional(rollbackFor Exception.class)public void add(User user) {String key "key";RLock lock redissonClient.getLock(key);lock.lock();try {long count userMapper.selectCount(user);if (count 0) {userMapper.insert(user);…

边缘计算网关在温室大棚智能控制系统应用,开启农业新篇章

项目需求 ●目前大棚主要通过人为手动控温度、控水、控光照、控风,希望通过物联网技术在保障产量的前提下,提高作业效率,降低大棚总和管理成本。 ●释放部分劳动力,让农户有精力管理更多大棚,进而增加农户收入。 ●…

Python进行批量字符替换的3种方法

一、问题的提出 之前,我写过一篇如何在word中计算数学算式: 如何用Python批量计算Word中的算式-CSDN博客 为了计算算式,就需要对算式进行格式化,把不规则的算式转换成规则的算式,这时就会涉及到一些字符的批量替换。…

解决SpringBoot中出现的跨域请求问题

When allowCredentials is true, allowedOrigins cannot contain the special value "*"since that cannot be set on the “Access-Control-Allow-Origin” response header. To allow credentials to a set of origins, list them explicitly or consider using “…

如何在 Linux 中配置 firewalld 规则

什么是FirewallD “firewalld”是firewall daemon。它提供了一个动态管理的防火墙,带有一个非常强大的过滤系统,称为 Netfilter,由 Linux 内核提供。 FirewallD 使用zones和services的概念,而 iptables 使用chain和rules。与 ip…

【LLM-RAG】知识库问答 | 检索 | embedding

note RAG流程(写作论文中的background:公式设定、emb、召回内容、召回基准)(工作中的思路:嵌入模型、向量存储、向量存储检索器、LLM、query改写、RAG评测方法)仅为个人关于RAG的一些零碎总结,…

【网络面试(4)】协议栈和套接字及连接阶段的三次握手原理

1. 协议栈 一直对操作系统系统的内核协议栈理解的比较模糊,借着这一篇博客做一下简单梳理, 我觉得最直白的理解就是,内核协议栈就是操作系统中的一个网络控制软件,就是一段程序代码,它负责和网卡驱动程序交互&#xff…

Docker 从入门到实践:Docker介绍

前言 在当今的软件开发和部署领域,Docker已经成为了一个不可或缺的工具。Docker以其轻量级、可移植性和标准化等特点,使得应用程序的部署和管理变得前所未有的简单。无论您是一名开发者、系统管理员,还是IT架构师,理解并掌握Dock…

7.11全排列(LC46-M)

算法: 排列和组合很像,但是有顺序。 还是用回溯算法。 与组合不同之处(无startindex,有used数组): 首先排列是有序的,也就是说 [1,2] 和 [2,1] 是两个集合。 可以看出元素1在[1,2]中已经使…

大学物理II-作业1【题解】

1.【单选题】——考查高斯定理 下面关于高斯定理描述正确的是(D )。 A.高斯面上的电场强度是由高斯面内的电荷激发的 B.高斯面上的各点电场强度为零时,高斯面内一定没有电荷 C.通过高斯面的电通量为零时,高斯面上各点电场强度…

基于被囊群算法优化的Elman神经网络数据预测 - 附代码

基于被囊群算法优化的Elman神经网络数据预测 - 附代码 文章目录 基于被囊群算法优化的Elman神经网络数据预测 - 附代码1.Elman 神经网络结构2.Elman 神经用络学习过程3.电力负荷预测概述3.1 模型建立 4.基于被囊群优化的Elman网络5.测试结果6.参考文献7.Matlab代码 摘要&#x…

2023-12-15 LeetCode每日一题(反转二叉树的奇数层)

2023-12-15每日一题 一、题目编号 2415. 反转二叉树的奇数层二、题目链接 点击跳转到题目位置 三、题目描述 给你一棵 完美 二叉树的根节点 root ,请你反转这棵树中每个 奇数 层的节点值。 例如,假设第 3 层的节点值是 [2,1,3,4,7,11,29,18] &…

lambda表达式和包装器

正文开始前给大家推荐个网站,前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站。 我们在使用库里的排序算法时如果排序的是自定义类型或者库里默认的排序不能满足我们则需求&…