Source version: pyspark==3.1.2
- 1.combineByKey
- 2.reduceByKey
- 3.groupByKey
- 4.aggregateByKey
- 5.foldByKey
- Summary
1.combineByKey
```python
def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
                 numPartitions=None, partitionFunc=portable_hash):
    """
    Generic function to combine the elements for each key using a custom
    set of aggregation functions.

    Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined
    type" C.

    Users provide three functions:

    - `createCombiner`, which turns a V into a C (e.g., creates
      a one-element list)
    - `mergeValue`, to merge a V into a C (e.g., adds it to the end of
      a list)
    - `mergeCombiners`, to combine two C's into a single one (e.g., merges
      the lists)

    To avoid memory allocation, both mergeValue and mergeCombiners are allowed to
    modify and return their first argument instead of creating a new C.

    In addition, users can control the partitioning of the output RDD.

    Notes
    -----
    V and C can be different -- for example, one might group an RDD of type
    (Int, Int) into an RDD of type (Int, List[Int]).

    Examples
    --------
    >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])
    >>> def to_list(a):
    ...     return [a]
    ...
    >>> def append(a, b):
    ...     a.append(b)
    ...     return a
    ...
    >>> def extend(a, b):
    ...     a.extend(b)
    ...     return a
    ...
    >>> sorted(x.combineByKey(to_list, append, extend).collect())
    [('a', [1, 2]), ('b', [1])]
    """
    if numPartitions is None:
        numPartitions = self._defaultReducePartitions()

    serializer = self.ctx.serializer
    memory = self._memory_limit()
    agg = Aggregator(createCombiner, mergeValue, mergeCombiners)

    def combineLocally(iterator):
        merger = ExternalMerger(agg, memory * 0.9, serializer)
        merger.mergeValues(iterator)
        return merger.items()

    locally_combined = self.mapPartitions(combineLocally, preservesPartitioning=True)
    shuffled = locally_combined.partitionBy(numPartitions, partitionFunc)

    def _mergeCombiners(iterator):
        merger = ExternalMerger(agg, memory, serializer)
        merger.mergeCombiners(iterator)
        return merger.items()

    return shuffled.mapPartitions(_mergeCombiners, preservesPartitioning=True)
```
Focus on the body below the docstring. The `partitionBy(numPartitions, partitionFunc)` call is the shuffle step. Immediately before the shuffle, `self.mapPartitions(combineLocally, preservesPartitioning=True)` performs a local combine inside every partition during the map stage; the argument it receives is the locally defined `combineLocally` function, in which `merger.mergeValues(iterator)` defines how the data is merged. Stepping into `mergeValues`:
```python
def mergeValues(self, iterator):
    """ Combine the items by creator and combiner """
    # speedup attribute lookup
    creator, comb = self.agg.createCombiner, self.agg.mergeValue
    c, data, pdata, hfun, batch = 0, self.data, self.pdata, self._partition, self.batch
    limit = self.memory_limit

    for k, v in iterator:
        d = pdata[hfun(k)] if pdata else data
        d[k] = comb(d[k], v) if k in d else creator(v)

        c += 1
        if c >= batch:
            if get_used_memory() >= limit:
                self._spill()
                limit = self._next_limit()
                batch /= 2
                c = 0
            else:
                batch *= 1.5

    if get_used_memory() >= limit:
        self._spill()
```
Here `creator` and `comb` are the `createCombiner` and `mergeValue` arguments passed to `combineByKey`. `c` is a counter recording how many records have been merged locally since the last spill to disk, `data` starts out as an empty dict, `pdata` starts out as an empty list, and `hfun` is a hash-modulo function on the key that yields the key's partition index. The two lines inside the loop, `d = pdata[hfun(k)] if pdata else data` and `d[k] = comb(d[k], v) if k in d else creator(v)`, show that before the first spill the map stage keeps all <K, V> pairs in a single dict and, whenever a key already exists, updates its value with the `mergeValue` function we passed to `combineByKey`. The first time execution reaches `self._spill()`, i.e. the first spill to disk, step into it to see the spill process:
```python
def _spill(self):
    """
    dump already partitioned data into disks.

    It will dump the data in batch for better performance.
    """
    global MemoryBytesSpilled, DiskBytesSpilled
    path = self._get_spill_dir(self.spills)
    if not os.path.exists(path):
        os.makedirs(path)

    used_memory = get_used_memory()
    if not self.pdata:
        # The data has not been partitioned, it will iterator the
        # dataset once, write them into different files, has no
        # additional memory. It only called when the memory goes
        # above limit at the first time.

        # open all the files for writing
        streams = [open(os.path.join(path, str(i)), 'wb')
                   for i in range(self.partitions)]

        for k, v in self.data.items():
            h = self._partition(k)
            # put one item in batch, make it compatible with load_stream
            # it will increase the memory if dump them in batch
            self.serializer.dump_stream([(k, v)], streams[h])

        for s in streams:
            DiskBytesSpilled += s.tell()
            s.close()

        self.data.clear()
        self.pdata.extend([{} for i in range(self.partitions)])

    else:
        for i in range(self.partitions):
            p = os.path.join(path, str(i))
            with open(p, "wb") as f:
                # dump items in batch
                self.serializer.dump_stream(iter(self.pdata[i].items()), f)
            self.pdata[i].clear()
            DiskBytesSpilled += os.path.getsize(p)

    self.spills += 1
    gc.collect()  # release the memory as much as possible
    MemoryBytesSpilled += max(used_memory - get_used_memory(), 0) << 20
```
At the first spill, `self.pdata` is still an empty list, so the `if not self.pdata:` branch runs: it loops over the <K, V> pairs, computes each key's partition index and writes the pair into the corresponding partition file, and then fills `pdata` with as many empty dicts as there are partitions.
Why fill `pdata` with empty dicts? This goes back to the two lines inside the loop of `mergeValues`. As noted above, initially all <K, V> pairs live in a single dict, and at spill time each pair's partition has to be determined record by record before it is written out. Once the empty dicts are in place after the first spill, every key whose hash-modulo result is the same is subsequently stored in the same dict, and that dict's index in the list is exactly the partition number assigned to all the keys it contains.
From the second spill onward, `_spill` takes the `else` branch: as its source shows, it simply dumps the dicts in `pdata` one by one, in batches, into their corresponding partition files.
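To make the partition-dict mechanism concrete, here is a minimal standalone sketch (not PySpark code; `toy_merge` and its parameters are made up for illustration) of how records get merged into per-partition dicts indexed by `hash(key) % num_partitions`:

```python
# Toy illustration (not the PySpark implementation): after the first spill,
# records are merged into one dict per partition, indexed by hash(key) % num_partitions.
def toy_merge(records, create_combiner, merge_value, num_partitions=3):
    pdata = [{} for _ in range(num_partitions)]        # one dict per partition
    for k, v in records:
        d = pdata[hash(k) % num_partitions]            # pick this key's partition dict
        d[k] = merge_value(d[k], v) if k in d else create_combiner(v)
    return pdata

# Keys with the same hash-modulo land in the same dict; the dict's index in the
# list is the partition number of every key it holds.
print(toy_merge([("a", 1), ("b", 1), ("a", 2)],
                create_combiner=lambda v: v,
                merge_value=lambda c, v: c + v))
```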
Back in the `combineByKey` source: after the map-side per-partition pre-processing and the shuffle, the return value is `shuffled.mapPartitions(_mergeCombiners, preservesPartitioning=True)`. The `_mergeCombiners` argument is the locally defined `_mergeCombiners` function, in which `merger.mergeCombiners(iterator)` defines how, on the reduce side, data coming from the different map partitions is merged. Step into the `mergeCombiners` source:
```python
def mergeCombiners(self, iterator, limit=None):
    """ Merge (K,V) pair by mergeCombiner """
    if limit is None:
        limit = self.memory_limit
    # speedup attribute lookup
    comb, hfun, objsize = self.agg.mergeCombiners, self._partition, self._object_size
    c, data, pdata, batch = 0, self.data, self.pdata, self.batch
    for k, v in iterator:
        d = pdata[hfun(k)] if pdata else data
        d[k] = comb(d[k], v) if k in d else v
        if not limit:
            continue

        c += objsize(v)
        if c > batch:
            if get_used_memory() > limit:
                self._spill()
                limit = self._next_limit()
                batch /= 2
                c = 0
            else:
                batch *= 1.5

    if limit and get_used_memory() >= limit:
        self._spill()
```
As you can see, `mergeCombiners` follows essentially the same logic as `mergeValues` above; the difference is that `comb` is now `self.agg.mergeCombiners`, i.e. the `mergeCombiners` argument we passed when calling `combineByKey`.
That completes the walkthrough of the combineByKey source. Two takeaways:
- combineByKey pre-processes the data inside each partition during the map stage, so what travels through the shuffle is the pre-processed result.
- Because we supply `mergeValue` and `mergeCombiners` ourselves, combineByKey suits business scenarios where the map-side and reduce-side processing logic differ (see the sketch below).
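As a concrete illustration of the second point, a minimal sketch (a hypothetical example, assuming a live SparkContext `sc`) that computes a per-key average: the map side builds (sum, count) pairs, while the reduce side merges those pairs across partitions:

```python
# Hypothetical example: per-key average, where map-side and reduce-side logic differ.
rdd = sc.parallelize([("a", 1), ("a", 3), ("b", 10)], 2)

averages = (rdd
    .combineByKey(
        lambda v: (v, 1),                          # createCombiner: V -> C
        lambda acc, v: (acc[0] + v, acc[1] + 1),   # mergeValue: merge a V into a C (map side)
        lambda a, b: (a[0] + b[0], a[1] + b[1]))   # mergeCombiners: merge two C's (reduce side)
    .mapValues(lambda p: p[0] / p[1]))             # sum / count

print(sorted(averages.collect()))                  # [('a', 2.0), ('b', 10.0)]
```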
2.reduceByKey
The reduceByKey source is much simpler:
```python
def reduceByKey(self, func, numPartitions=None, partitionFunc=portable_hash):
    """
    Merge the values for each key using an associative and commutative reduce function.

    This will also perform the merging locally on each mapper before
    sending results to a reducer, similarly to a "combiner" in MapReduce.

    Output will be partitioned with `numPartitions` partitions, or
    the default parallelism level if `numPartitions` is not specified.
    Default partitioner is hash-partition.

    Examples
    --------
    >>> from operator import add
    >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    >>> sorted(rdd.reduceByKey(add).collect())
    [('a', 2), ('b', 1)]
    """
    return self.combineByKey(lambda x: x, func, func, numPartitions, partitionFunc)
```
It boils down to one line: `return self.combineByKey(lambda x: x, func, func, numPartitions, partitionFunc)`. Under the hood it still calls combineByKey, passing the `func` we supply as both the `mergeValue` and the `mergeCombiners` argument.
This shows that:
- reduceByKey likewise pre-processes each partition's data on the map side using the `func` we pass in.
- The map-side and reduce-side processing logic are identical (a small equivalence check follows below).
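A small check (hypothetical, assuming a live SparkContext `sc`) of that expansion on a toy dataset:

```python
from operator import add

# Hypothetical check: reduceByKey(add) matches the combineByKey call it expands to.
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)], 2)

via_reduce = sorted(rdd.reduceByKey(add).collect())
via_combine = sorted(rdd.combineByKey(lambda x: x, add, add).collect())

print(via_reduce)                   # [('a', 2), ('b', 1)]
print(via_reduce == via_combine)    # True
```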
3.groupByKey
The groupByKey implementation:
```python
def groupByKey(self, numPartitions=None, partitionFunc=portable_hash):
    """
    Group the values for each key in the RDD into a single sequence.
    Hash-partitions the resulting RDD with numPartitions partitions.

    Notes
    -----
    If you are grouping in order to perform an aggregation (such as a
    sum or average) over each key, using reduceByKey or aggregateByKey will
    provide much better performance.

    Examples
    --------
    >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    >>> sorted(rdd.groupByKey().mapValues(len).collect())
    [('a', 2), ('b', 1)]
    >>> sorted(rdd.groupByKey().mapValues(list).collect())
    [('a', [1, 1]), ('b', [1])]
    """
    def createCombiner(x):
        return [x]

    def mergeValue(xs, x):
        xs.append(x)
        return xs

    def mergeCombiners(a, b):
        a.extend(b)
        return a

    memory = self._memory_limit()
    serializer = self._jrdd_deserializer
    agg = Aggregator(createCombiner, mergeValue, mergeCombiners)

    def combine(iterator):
        merger = ExternalMerger(agg, memory * 0.9, serializer)
        merger.mergeValues(iterator)
        return merger.items()

    locally_combined = self.mapPartitions(combine, preservesPartitioning=True)
    shuffled = locally_combined.partitionBy(numPartitions, partitionFunc)

    def groupByKey(it):
        merger = ExternalGroupBy(agg, memory, serializer)
        merger.mergeCombiners(it)
        return merger.items()

    return shuffled.mapPartitions(groupByKey, True).mapValues(ResultIterable)
```
As before, a `mapPartitions` pass runs inside each partition before the `partitionBy` shuffle, with the locally defined `combine` function as its argument. Inside `combine`, the `merger` is built with the `agg` object, and `agg` is constructed from the three locally defined functions `createCombiner`, `mergeValue`, and `mergeCombiners`. So the `merger.mergeValues(iterator)` call ends up invoking `createCombiner` and `mergeValue`: during the map stage the data of each partition is grouped by key and the values of identical keys are collected into a list, i.e. <K1, V1>, <K1, V2>, … becomes <K1, [V1, V2, …]>. Likewise, on the reduce side, the `groupByKey` argument of `mapPartitions` in `shuffled.mapPartitions(groupByKey, True).mapValues(ResultIterable)` is the inner `groupByKey` function defined just above the return statement, and its `merger.mergeCombiners(it)` call invokes the local `mergeCombiners`, so the reduce-side processing simply takes the data pulled from the different map partitions and `extend`s together the value lists of identical keys.
From this it is clear that groupByKey simply groups all the data by key: there is no pre-aggregation on the map side, only the collection of each key's values into a list, and what travels through the shuffle is data of the form <K1, [V1, V2, …]> (compare the two calls in the sketch below).
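A short comparison (hypothetical, assuming a live SparkContext `sc`) of what groupByKey materializes versus the pre-aggregated reduceByKey path:

```python
from operator import add

# Hypothetical comparison: groupByKey ships the full value lists through the
# shuffle, while reduceByKey ships per-partition partial sums.
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)], 2)

print(sorted(rdd.groupByKey().mapValues(list).collect()))   # [('a', [1, 1]), ('b', [1])]

# For an aggregation such as a sum, reduceByKey is preferable because the map
# side has already combined the values before the shuffle.
print(sorted(rdd.reduceByKey(add).collect()))               # [('a', 2), ('b', 1)]
```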
4.aggregateByKey
```python
def aggregateByKey(self, zeroValue, seqFunc, combFunc, numPartitions=None,
                   partitionFunc=portable_hash):
    """
    Aggregate the values of each key, using given combine functions and a neutral
    "zero value". This function can return a different result type, U, than the type
    of the values in this RDD, V. Thus, we need one operation for merging a V into
    a U and one operation for merging two U's, The former operation is used for merging
    values within a partition, and the latter is used for merging values between
    partitions. To avoid memory allocation, both of these functions are
    allowed to modify and return their first argument instead of creating a new U.
    """
    def createZero():
        return copy.deepcopy(zeroValue)

    return self.combineByKey(
        lambda v: seqFunc(createZero(), v), seqFunc, combFunc, numPartitions, partitionFunc)
```
Under the hood this method also calls combineByKey. The difference is that combineByKey's first positional argument, `createCombiner`, is `lambda v: seqFunc(createZero(), v)`: the intra-partition function `seqFunc` we supply is first applied to the initial value `zeroValue` and the value of the current <K, V> pair. As a result, each partition's per-key result already incorporates the initial value `zeroValue` (it is folded in once for each key in each partition).
Compare this with reduceByKey: when reduceByKey calls combineByKey, the first argument is `lambda x: x`, which returns the V of each <K, V> pair unchanged. When aggregateByKey calls combineByKey, the first argument is `lambda v: seqFunc(createZero(), v)`, which combines the V with the supplied initial value `zeroValue` and returns the result. Taking addition as an example, with reduceByKey each partition's map-side result for a key is the sum of all of that key's V's in the partition, whereas with aggregateByKey it is that sum plus the initial value `zeroValue`.
The arguments passed in the call also show that combineByKey's `mergeValue` is aggregateByKey's `seqFunc` and combineByKey's `mergeCombiners` is aggregateByKey's `combFunc`, so aggregateByKey likewise suits scenarios where the map-side and reduce-side processing logic differ. The sketch below makes the zeroValue behaviour visible.
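A quick sketch (hypothetical, assuming a live SparkContext `sc`) that uses a deliberately non-neutral zeroValue with two partitions, so the per-partition folding of the zero becomes visible:

```python
from operator import add

# Hypothetical demo: a non-neutral zeroValue shows that the zero is folded in
# once per key per partition on the map side.
rdd = sc.parallelize([("a", 1), ("a", 2), ("b", 3)], 2)
print(rdd.glom().collect())   # inspect the partition layout, e.g. [[('a', 1)], [('a', 2), ('b', 3)]]

# With the layout above, 'a' appears in both partitions and therefore picks up
# the 100 twice (1+100 and 2+100) before the partial results are added together.
print(sorted(rdd.aggregateByKey(100, add, add).collect()))
# e.g. [('a', 203), ('b', 103)]
```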
5.foldByKey
```python
def foldByKey(self, zeroValue, func, numPartitions=None, partitionFunc=portable_hash):
    """
    Merge the values for each key using an associative function "func"
    and a neutral "zeroValue" which may be added to the result an
    arbitrary number of times, and must not change the result
    (e.g., 0 for addition, or 1 for multiplication.).

    Examples
    --------
    >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    >>> from operator import add
    >>> sorted(rdd.foldByKey(0, add).collect())
    [('a', 2), ('b', 1)]
    """
    def createZero():
        return copy.deepcopy(zeroValue)

    return self.combineByKey(lambda v: func(createZero(), v), func, func, numPartitions,
                             partitionFunc)
```
Notice how similar this is to aggregateByKey above. The only difference is that when combineByKey is called, both its `mergeValue` and `mergeCombiners` arguments are the single `func` passed to foldByKey. The rule of thumb is therefore clear: when the map-stage pre-processing needs an initial value and the map and reduce logic are the same, either aggregateByKey or foldByKey will do (see the sketch below); when the map and reduce logic differ, only aggregateByKey fits.
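A brief sketch (hypothetical, assuming a live SparkContext `sc`) of that equivalence when the same function is used on both sides together with a neutral zero:

```python
from operator import add

# Hypothetical check: with a neutral zeroValue and the same function on both
# sides, foldByKey, aggregateByKey, and reduceByKey all agree.
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)], 2)

print(sorted(rdd.foldByKey(0, add).collect()))             # [('a', 2), ('b', 1)]
print(sorted(rdd.aggregateByKey(0, add, add).collect()))   # [('a', 2), ('b', 1)]
print(sorted(rdd.reduceByKey(add).collect()))              # [('a', 2), ('b', 1)]
```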
Summary
1. All of the ...ByKey operators above ultimately rest on combineByKey, whose three function arguments mean:
1. createCombiner: defines how the first value seen for a key within a partition is converted into a combiner.
2. mergeValue: defines how values are merged within each map partition (happens before the shuffle).
3. mergeCombiners: defines how data from different map partitions is merged (happens after the shuffle).
2. When to use which:
(1) combineByKey: the method that reduceByKey, aggregateByKey, and foldByKey call underneath; it pre-processes each partition's data on the map side according to the functions passed in. Note that this pre-processing is not necessarily aggregation (that depends on the functions we supply), and the map-side and reduce-side processing rules may differ.
(2) reduceByKey: calls combineByKey underneath, so the map-side pre-processing exists, and the map-side and reduce-side processing rules are the same.
(3) groupByKey: on both the map and the reduce side, the processing simply collects the values of identical keys into a list; neither side performs any aggregation.
(4) aggregateByKey: can be thought of as a combineByKey whose map stage starts from an initial value.
(5) foldByKey: equivalent to a reduceByKey whose map stage starts from an initial value.