spark中结合源码理解reduceByKey、groupByKey、combineByKey等几个ByKey算子的区别

源码版本:pyspark==3.1.2

  • 1.combineByKey
  • 2.reduceByKey
  • 3.groupByKey
  • 4.aggregateByKey
  • 5.foldByKey
  • 总结

1.combineByKey

    def combineByKey(self, createCombiner, mergeValue, mergeCombiners,numPartitions=None, partitionFunc=portable_hash):"""Generic function to combine the elements for each key using a customset of aggregation functions.Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combinedtype" C.Users provide three functions:- `createCombiner`, which turns a V into a C (e.g., createsa one-element list)- `mergeValue`, to merge a V into a C (e.g., adds it to the end ofa list)- `mergeCombiners`, to combine two C's into a single one (e.g., mergesthe lists)To avoid memory allocation, both mergeValue and mergeCombiners are allowed tomodify and return their first argument instead of creating a new C.In addition, users can control the partitioning of the output RDD.Notes-----V and C can be different -- for example, one might group an RDD of type(Int, Int) into an RDD of type (Int, List[Int]).Examples-------->>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])>>> def to_list(a):...     return [a]...>>> def append(a, b):...     a.append(b)...     return a...>>> def extend(a, b):...     a.extend(b)...     return a...>>> sorted(x.combineByKey(to_list, append, extend).collect())[('a', [1, 2]), ('b', [1])]"""if numPartitions is None:numPartitions = self._defaultReducePartitions()serializer = self.ctx.serializermemory = self._memory_limit()agg = Aggregator(createCombiner, mergeValue, mergeCombiners)def combineLocally(iterator):merger = ExternalMerger(agg, memory * 0.9, serializer)merger.mergeValues(iterator)return merger.items()locally_combined = self.mapPartitions(combineLocally, preservesPartitioning=True)shuffled = locally_combined.partitionBy(numPartitions, partitionFunc)def _mergeCombiners(iterator):merger = ExternalMerger(agg, memory, serializer)merger.mergeCombiners(iterator)return merger.items()return shuffled.mapPartitions(_mergeCombiners, preservesPartitioning=True)

重点看代码54~67行,代码60行为shuffle过程,在shuffle之前也就是代码59行,self.mapPartitions(combineLocally, preservesPartitioning=True),在map阶段对每个分区执行本地combine合并,传入的参数为54行定义的combineLocally方法,该方法中merger.mergeValues(iterator)定义了数据的merge方式,点进去看:

    def mergeValues(self, iterator):""" Combine the items by creator and combiner """# speedup attribute lookupcreator, comb = self.agg.createCombiner, self.agg.mergeValuec, data, pdata, hfun, batch = 0, self.data, self.pdata, self._partition, self.batchlimit = self.memory_limitfor k, v in iterator:d = pdata[hfun(k)] if pdata else datad[k] = comb(d[k], v) if k in d else creator(v)c += 1if c >= batch:if get_used_memory() >= limit:self._spill()limit = self._next_limit()batch /= 2c = 0else:batch *= 1.5if get_used_memory() >= limit:self._spill()

其中,第四行中 creator, comb 分别是combineByKey方法中传入的参数createCombiner和mergeValue,第5行代码中c是一个计数器,记录上一次溢写磁盘到现在为止本地数据合并的条数,data初始值为空字典,pdata初始值为空数组,hfun是一个根据key 哈希取余的方法,用来获取key的分区编号,通过第9第10行代码可以看到,map阶段在第一次merge时是将所有<K, V>对保存到一个字典,同时根据我们在调用combineByKey方法时传入的mergeValue参数对字典中相同的key更新value值。在初次执行到代码15行self._spill()也就是第一次溢写磁盘时,点进去查看溢写过程:

    def _spill(self):"""dump already partitioned data into disks.It will dump the data in batch for better performance."""global MemoryBytesSpilled, DiskBytesSpilledpath = self._get_spill_dir(self.spills)if not os.path.exists(path):os.makedirs(path)used_memory = get_used_memory()if not self.pdata:# The data has not been partitioned, it will iterator the# dataset once, write them into different files, has no# additional memory. It only called when the memory goes# above limit at the first time.# open all the files for writingstreams = [open(os.path.join(path, str(i)), 'wb')for i in range(self.partitions)]for k, v in self.data.items():h = self._partition(k)# put one item in batch, make it compatible with load_stream# it will increase the memory if dump them in batchself.serializer.dump_stream([(k, v)], streams[h])for s in streams:DiskBytesSpilled += s.tell()s.close()self.data.clear()self.pdata.extend([{} for i in range(self.partitions)])else:for i in range(self.partitions):p = os.path.join(path, str(i))with open(p, "wb") as f:# dump items in batchself.serializer.dump_stream(iter(self.pdata[i].items()), f)self.pdata[i].clear()DiskBytesSpilled += os.path.getsize(p)self.spills += 1gc.collect()  # release the memory as much as possibleMemoryBytesSpilled += max(used_memory - get_used_memory(), 0) << 20

第一次溢写时第13行代码中的self.pdata还是一个空数组,所以会先执行if not self.pdata:下的这个分支,循环遍历<k, V>对,获取对应的分区编号并写入各自的分区文件中,同时将pdata数组中存入和分区个数相等的空字典。

这里解释一下为什么要将pdata数组中存入空字典,这就得回到mergeValues方法源码中的第9第10行代码,上面说了初始时会将所有的<K, V>对保存到一个字典,然后在溢写时逐条判断分区再写入,而第一次溢写之后通过增加空字典,后续将所有哈希取余结果相等的key保存到同一个字典,该字典在数组中的下标对应的就是这个字典里面所有的key被划分的分区编号

然后在第二次执行_spill方法溢写时就会走else的这个分支,从_spill的源码中可以看到,第二次溢写时就是将pdata中的字典逐个批量的写入到对应的分区文件了。

再回到 combineByKey 的源码,在map端分区内预处理和shuffle之后,return返回的结果是shuffled.mapPartitions(_mergeCombiners, preservesPartitioning=True),这里传入的参数_mergeCombiners就是代码62行定义的_mergeCombiners方法,该方法中merger.mergeCombiners(iterator)定义了reduce中各map分区之间的数据合并方式,点进去看mergeCombiners方法的源码:

    def mergeCombiners(self, iterator, limit=None):""" Merge (K,V) pair by mergeCombiner """if limit is None:limit = self.memory_limit# speedup attribute lookupcomb, hfun, objsize = self.agg.mergeCombiners, self._partition, self._object_sizec, data, pdata, batch = 0, self.data, self.pdata, self.batchfor k, v in iterator:d = pdata[hfun(k)] if pdata else datad[k] = comb(d[k], v) if k in d else vif not limit:continuec += objsize(v)if c > batch:if get_used_memory() > limit:self._spill()limit = self._next_limit()batch /= 2c = 0else:batch *= 1.5if limit and get_used_memory() >= limit:self._spill()

可以看到mergeCombiners和上面mergeValues的逻辑基本一致,区别在于第6行中comb的值为self.agg.mergeCombiners,也就是在调用combineByKey方法时我们传入的mergeCombiners参数。

到此,combineByKey的源码已经解读完了,通过这个过程可以知道两点:

  1. combineByKey在map阶段会在每个分区内数据预处理,shuffle阶段传输的其实是预处理之后的结果。
  2. combineByKey因为mergeValue、mergeCombiners参数我们可以自定义传入,所以适合处理map端和reduce端数据处理逻辑不相同的业务场景。

2.reduceByKey

reduceByKey的源码就很简单了

    def reduceByKey(self, func, numPartitions=None, partitionFunc=portable_hash):"""Merge the values for each key using an associative and commutative reduce function.This will also perform the merging locally on each mapper beforesending results to a reducer, similarly to a "combiner" in MapReduce.Output will be partitioned with `numPartitions` partitions, orthe default parallelism level if `numPartitions` is not specified.Default partitioner is hash-partition.Examples-------->>> from operator import add>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])>>> sorted(rdd.reduceByKey(add).collect())[('a', 2), ('b', 1)]"""return self.combineByKey(lambda x: x, func, func, numPartitions, partitionFunc)

就一句话:return self.combineByKey(lambda x: x, func, func, numPartitions, partitionFunc),底层调用的还是combineByKey方法,同时将我们传入的参数func同时作为combineByKey方法的mergeValue和mergeCombiners参数值。

这表明:

  1. reduceByKey同样会在map端按照我们传入的func对分区内数据预处理。
  2. map端与reduce端数据处理逻辑一致。

3.groupByKey

groupByKey的源码实现:

    def groupByKey(self, numPartitions=None, partitionFunc=portable_hash):"""Group the values for each key in the RDD into a single sequence.Hash-partitions the resulting RDD with numPartitions partitions.Notes-----If you are grouping in order to perform an aggregation (such as asum or average) over each key, using reduceByKey or aggregateByKey willprovide much better performance.Examples-------->>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])>>> sorted(rdd.groupByKey().mapValues(len).collect())[('a', 2), ('b', 1)]>>> sorted(rdd.groupByKey().mapValues(list).collect())[('a', [1, 1]), ('b', [1])]"""def createCombiner(x):return [x]def mergeValue(xs, x):xs.append(x)return xsdef mergeCombiners(a, b):a.extend(b)return amemory = self._memory_limit()serializer = self._jrdd_deserializeragg = Aggregator(createCombiner, mergeValue, mergeCombiners)def combine(iterator):merger = ExternalMerger(agg, memory * 0.9, serializer)merger.mergeValues(iterator)return merger.items()locally_combined = self.mapPartitions(combine, preservesPartitioning=True)shuffled = locally_combined.partitionBy(numPartitions, partitionFunc)def groupByKey(it):merger = ExternalGroupBy(agg, memory, serializer)merger.mergeCombiners(it)return merger.items()return shuffled.mapPartitions(groupByKey, True).mapValues(ResultIterable)

可以看到在代码41行的shuffle执行之前在各分区内同样进行了一次mapPartition操作,参数combine就是35行定义的combine方法,该方法里在创建merger 对象时传入的参数agg在代码33行被创建。而在创建agg对象时,传入的参数createCombiner、mergeValue、mergeCombiners其实就是代码20~29行定义的三个方法。所以代码37行的mergeValues其实就是在调用20行的createCombiner和23行的mergeValue,也就是在map阶段将每个分区内的数据根据key分组,将相同key的值存放到一个列表中,也就是由<K1,V1>,<K1, V2>, … 转换成了<K1,[V1, V2, …]>。同样reduce端shuffled.mapPartitions(groupByKey, True).mapValues(ResultIterable)中的mapPartitions的参数groupByKey来自43行定义的groupByKey方法,该方法内部merger.mergeCombiners(it)调用的就是代码27行定义的mergeCombiners方法,可以看出reduce端的处理就是把不同map拉过来的数据,将key相同的value列表直接extend合并。

从这个过程可以看出:groupByKey就是简单的将所有数据根据key分组,在map端没有数据预聚合之类的操作,只是将相同key的value统一保存到一个列表中。在shuffle过程中传输的是<K1,[V1, V2, …]>这样的数据结构。

4.aggregateByKey

    def aggregateByKey(self, zeroValue, seqFunc, combFunc, numPartitions=None,partitionFunc=portable_hash):"""Aggregate the values of each key, using given combine functions and a neutral"zero value". This function can return a different result type, U, than the typeof the values in this RDD, V. Thus, we need one operation for merging a V intoa U and one operation for merging two U's, The former operation is used for mergingvalues within a partition, and the latter is used for merging values betweenpartitions. To avoid memory allocation, both of these functions areallowed to modify and return their first argument instead of creating a new U."""def createZero():return copy.deepcopy(zeroValue)return self.combineByKey(lambda v: seqFunc(createZero(), v), seqFunc, combFunc, numPartitions, partitionFunc)

该方法底层调用的还是combineByKey方法,不同的是combineByKey方法的第一个位置参数createCombiner的值为lambda v: seqFunc(createZero(), v),也就是首先用我们给定的分区内计算规则seqFunc对我们传递进来的初始值zeroValue和当前<K, V>对中的value进行了一次seqFunc计算,所以每个分区的处理结果其实是包含了初始值zeroValue在内的计算结果。

和reduceBykey方法对比,reduceBykey方法在底层调用combineByKey方法时第一个参数为lambda x: x,也就是将<K, V>中的V原样返回。aggregateByKey方法在底层调用combineByKey方法时,第一个参数lambda v: seqFunc(createZero(), v),也就是将<K, V>中的V和传递进来的初始值zeroValue计算并返回计算后的结果。以add加法举例来说,reduceBykey各分区在map端合并后的结果为分区内所有V的和,而aggregateByKey各分区在map合并后的结果为分区内所有V的和+初始值zeroValue。

从在调用该方法时传递的参数也可以看到,combineByKey的mergeValue其实就是aggregateByKey的seqFunc,combineByKey的mergeCombiners是aggregateByKey的combFunc,所以aggregateByKey的适用场景也是map端处理逻辑和reduce端处理逻辑不一致的业务场景。

5.foldByKey

    def foldByKey(self, zeroValue, func, numPartitions=None, partitionFunc=portable_hash):"""Merge the values for each key using an associative function "func"and a neutral "zeroValue" which may be added to the result anarbitrary number of times, and must not change the result(e.g., 0 for addition, or 1 for multiplication.).Examples-------->>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])>>> from operator import add>>> sorted(rdd.foldByKey(0, add).collect())[('a', 2), ('b', 1)]"""def createZero():return copy.deepcopy(zeroValue)return self.combineByKey(lambda v: func(createZero(), v), func, func, numPartitions,partitionFunc)

有没有发现和上面的aggregateByKey极其的相似,唯一区别在于调用combineByKey时,combineByKey的参数mergeValue和mergeCombiners都是foldByKey中传递的func,这就很明显了,在map阶段的预处理需要加初始值,且map和reduce中的处理逻辑一致时可用aggregateByKey也可用foldByKey;若map和reduce中的处理逻辑不一致,则只能用aggregateByKey。

总结

1.上面这么多的…ByKey,其实最底层的就是combineByKey,这个方法中的三个参数所表示的含义:

1.createCombiner:定义分区内第一条被处理的数据的转换规则
2.mergeValue:定义各map分区内数据之间的计算规则。(发生在shuffle之前)
3.mergeCombiners:定义不同的map分区之间的数据合并规则。(发生在shuffle之后)

2.使用场景:

(1)combineByKey:reduceByKey、aggregateByKey、foldByKey的底层调用方法,会在map端根据传入的规则对分区内数据进行预处理。注意这里的预处理未必就是聚合,这取决于我们传入的规则。且map阶段的处理规则和reduce阶段的处理规则可以不一致。
(2)reduceByKey:底层调用combineByKey,所以会存在map端的预处理,且map阶段的处理规则和reduce阶段的处理规则一致。
(3)groupByKey:map阶段和reduce阶段的处理逻辑都是将相同key的value存放到一个列表。map和reduce两端都不涉及到数据聚合操作。
(4)aggregateByKey:可以理解为,map阶段带初始值的combineByKey操作。
(5)foldByKey:等价于map阶段带初始值的reduceByKey操作。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/86031.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

详解TCP/IP协议第四篇:数据在网络中传输方式的分类概述

文章目录 前言 一&#xff1a;面向有连接型与面向无连接型 1&#xff1a;大致概念 2&#xff1a;面向有连接型 3&#xff1a;面向无连接型 二&#xff1a;电路交换与分组交换 1&#xff1a;分组交换概念 2&#xff1a;分组交交换过程 三&#xff1a;根据接收端数量分…

@SpringBootApplication注解说明(InsCode AI 创作助手)

SpringBootApplication 是 Spring Boot 中的一个关键注解&#xff0c;用于标识一个类是 Spring Boot 应用程序的主应用程序类。在这篇文章中&#xff0c;我们将详细解释 SpringBootApplication 注解以及它在 Spring Boot 应用程序中的作用。 SpringBootApplication 注解的作用…

【操作系统】聊聊CPU上下文切换实操

如何查看系统的上下文切换情况 上一篇文章我们说了过多的上下文切换&#xff0c;会把CPU时间消耗在寄存器、内核栈以及虚拟内存等数据的保存和恢复上&#xff0c;那么当出现系统的上下文切换过多的时候&#xff0c;我们如果通过监控指标查看呢。 vmstat 是一个常用的系统性能…

免费玩云上大数据--海汼部落实验室

玩大数据遇到的问题 大家好&#xff0c;这次分享一个免费的大数据部署工具&#xff0c;并非是给人家打广告&#xff0c;试过了真的爽。 学习大数据的人都知道&#xff0c;如果用VMware模拟Linux搭建大数据集群的话我们需要很高的内存和硬盘内存&#xff0c;随随便便跑一下mapre…

【云原生】Kubernetes学习笔记

部署 在部署前强调几点 不要使用IPv6, 很多组件都不支持IPv6不要使用最新版本, 最新版本非常不稳定, 甚至可能存在无法运行的bug不要版本更新, 安装后就将版本固定下来, 新的版本可能会引入新功能, 或移除旧功能, 导致Kubernetes无法运行 Kubeadm介绍 K8s是由多个模块构成的…

2023华为杯数模C题——大规模创新类竞赛评审方案研究

B题——大规模创新类竞赛评审方案研究 思路&#xff1a;采用数据分析等手段改进评分算法性能 完成情况(1-2问已经完成) 代码下载 问题一 在每个评审阶段&#xff0c;作品通常都是随机分发的&#xff0c;每份作品需要多位评委独立评审。为了增加不同评审专家所给成绩之间的可比…

解决因为修改SELINUX配置文件出错导致Faild to load SELinux poilcy无法进入CentOS7系统的问题

一、问题 最近学习Kubernetes&#xff0c;需要设置永久关闭SELINUX,结果修改错了一个SELINUX配置参数&#xff0c;关机重新启动后导致无法进入CentOS7系统&#xff0c;卡在启动进度条界面。 二、解决 多次重启后&#xff0c;在启动日志中发现 Faild to load SELinux poilcy…

简单的自托管书签服务NeonLink

什么是 NeonLink &#xff1f; NeonLink 是一个简单且开源的自托管书签服务。它是轻量级的&#xff0c;使用最少的依赖项&#xff0c;并且易于通过 Docker 安装。由于系统要求较低&#xff0c;该应用程序非常适合部署在 RaspberryPI 上。 安装 在群晖上以 Docker 方式安装。 …

【ES6】

ES6 1 ES6简介1.1 什么是ES61.2 为什么使用ES6 2 ES6的新增语法2.1 let2.2 const2.3 let、const、var的区别2.4 解构赋值2.4.1 数组解构2.4.2 对象解构 2.5 箭头函数2.6 剩余参数 3 ES6的内置对象扩展3.1 Array的扩展方法3.1.1 扩展运算符(展开语法)3.1.2 构造函数方法&#xf…

Docker 部署 Bitwarden RS 服务

Bitwarden RS 服务是官方 Bitwarden server API 的 Rust 重构版。因为 Bitwarden RS 必须要通过 https 才能访问, 所以在开始下面的步骤之前, 建议先参考 《Ubuntu Nginx 配置 SSL 证书》 配置好域名和 https 访问。 部署 Bitwarden RS 拉取最新版本的 docker.io/vaultwarden…

vite + vue3 的项目中使用 vitest 做单元测试(仅供参考)

一、配置文件 // vitest.config.tsimport { fileURLToPath } from node:url import { mergeConfig, defineConfig } from vite import { configDefaults } from vitest/config // import viteConfig from ./vite.configimport vue from vitejs/plugin-vue import vueJsx from …

第一百五十二回 自定义组件综合实例:游戏摇杆三

文章目录 内容回顾优化性能示例代码我们在上一章回中介绍了 如何实现游戏摇杆相关的内容,本章回中将继续介绍这方面的知识.闲话休提,让我们一起Talk Flutter吧。 内容回顾 我们在前面章回中介绍了游戏摇杆的概念以及实现方法,并且通过示例代码演示了实现游戏摇杆的整个过程…

Windows安装cuda和cudnn教程最新版(2023年9月)

文章目录 cudacudnn cuda 查看电脑的cuda最高驱动版本&#xff08;适用于N卡电脑-Nvidia&#xff09; winR打开命令行&#xff0c;输入nvidia-smi 右上角cuda -version就是目前支持的最高cuda版本&#xff0c;目前是12.2 nvidia官网下载cuda 下载地址&#xff1a;https://d…

基于eBPF的安卓逆向辅助工具——stackplz

前言 stackplz是一款基于eBPF技术实现的追踪工具&#xff0c;目的是辅助安卓native逆向&#xff0c;仅支持64位进程&#xff0c;主要功能如下&#xff1a; hardware breakpoint 基于pref_event实现的硬件断点功能&#xff0c;在断点处可读取寄存器信息&#xff0c;不会被用户…

C/C++自定义读取ini、cfg配置文件

常见cfg、ini文件如下: [config1] setting192.168.1.1 [config2] setting192.168.1.2 [config3] setting192.168.1.3 示例代码使用 // opt_ini.cpp : 此文件包含 "main" 函数。程序执行将在此处开始并结束。 //#include <iostream> #include "cfg.h"…

短信登录功能如何实现?

简介&#xff1a; 在日常生活中我们登录/注册某些网站/APP是通常可以选择 密码登录和手机号登录。 为什么手机号发送后会有验证码返回呢&#xff1f; 网站如何识别我的验证码是否正确&#xff1f; 如果我的个人网站也想要实现短信登录功能&#xff0c;具体该如何实现&#xff1…

获取文件创建时间

版权声明 本文原创作者&#xff1a;谷哥的小弟作者博客地址&#xff1a;http://blog.csdn.net/lfdfhl Java源码 public void testGetFileTime() {try {String string "E://test.txt";File file new File(string);Path path file.toPath();BasicFileAttributes ba…

RedHat 服务器安装NGINX

参照官方文档&#xff1a;nginx: Linux packages 按顺序操作&#xff1a; 安装前提&#xff1a; sudo yum install yum-utils 设置yum仓库&#xff08;执行命令的时候会自动新建文件&#xff09;&#xff1a; sudo vi /etc/yum.repos.d/nginx.repo 粘贴下面的内容保存退出…

一个电子信息工程学生的历程和内心感想

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 一、我对大学三年专业课程的理解二、我为什么本科选择研究嵌入式这个方向&#xff1f;1.可以把理论变为实际应用——兴趣是最好的老师。2.嵌入式方向可以打的比赛非…

如何把利用paddlepaddle导出的json文件转化为yolo或者voc文件

目录 1. 修改源码&#xff0c;让模型能够生成出对于单个图像的标注。 2. 把数据转为yolo格式 3.把yolo格式转化为xml格式 这两天想偷懒&#xff0c;想让模型先在数据上标一遍&#xff0c;然后我再做修正&#xff0c;主要是图个省事。由于我们主要是利用paddle,模型也是基于p…