Spark RDD(弹性分布式数据集)是Spark中的核心抽象,它代表一个不可变、分区的分布式数据集合。下面是一些常用的RDD算子:
转换算子:
-
map(func):对RDD中的每个元素应用给定的函数,返回一个新的RDD。
-
filter(func):对RDD中的每个元素应用给定的函数,返回满足条件的元素组成的新的RDD。
-
flatMap(func):对RDD中的每个元素应用给定的函数并返回一个迭代器,将所有迭代器的元素组合成一个新的RDD。
-
distinct():去除RDD中的重复元素,返回一个包含唯一元素的新的RDD。
-
groupByKey():对具有相同键的元素进行分组,返回一个键值对的RDD。
-
sortByKey():按照键对RDD中的元素进行排序,返回一个键值对的RDD。
-
join(otherRDD):将两个RDD按照键进行连接操作,返回一个键值对的RDD。
-
union(otherRDD):将两个RDD进行合并,返回一个包含两个RDD所有元素的新的RDD。
-
aggregateByKey(zeroValue)(seqOp, combOp):对每个键的元素进行聚合操作,返回一个键值对的RDD。
行动算子:
-
collect():将RDD中的所有元素以数组的形式返回到驱动程序。
-
count():返回RDD中的元素数量。
-
first():返回RDD中的第一个元素。
-
take(n):返回RDD中的前n个元素。
-
reduce(func):使用给定的函数对RDD中的元素进行归约操作,返回一个元素。
-
foreach(func):对RDD中的每个元素应用给定的函数。
-
saveAsTextFile(path):将RDD中的元素保存为文本文件。
-
saveAsObjectFile(path):将RDD中的元素保存为序列化的对象文件。
进行shuffle操作的一些常见算子:
-
groupByKey()
: 将具有相同键的元素分组到一起,并创建一个键值对的RDD。这个操作会导致数据的重新洗牌,将具有相同键的数据移动到同一个分区。 -
reduceByKey()
: 通过对具有相同键的值进行reduce操作来合并数据,并创建一个键值对的RDD。这个操作也会导致数据的重新洗牌,将具有相同键的数据移动到同一个分区。 -
sortByKey()
: 根据键对RDD进行排序。这个操作需要将数据重新洗牌,将具有相同键的数据移动到同一个分区。 -
join()
: 在两个键值对的RDD之间执行内连接操作。这个操作会对两个RDD进行重新洗牌,并将具有相同键的数据移动到同一个分区。 -
cogroup()
: 将具有相同键的两个RDD的数据进行分组,并返回键值对的RDD。这个操作会对两个RDD进行重新洗牌,将具有相同键的数据移动到同一个分区。 -
distinct()
: 去除RDD中的重复元素,并返回一个新的RDD。这个操作需要将数据进行重新洗牌,以确保在整个数据集上去重。
groupByKey()和reduceByKey()区别:
如果只需要将具有相同键的值分组起来,而不进行聚合计算,可以使用groupByKey()
。
而如果需要对具有相同键的值进行聚合计算,并返回一个键值对的RDD,可以使用reduceByKey()
。
在性能方面,尽量使用reduceByKey()
来减少数据的传输和处理开销。
1.map(func):
# 创建RDD
rdd = sparkContext.parallelize([1, 2, 3, 4, 5])# 对RDD中的每个元素进行平方操作
squaredRDD = rdd.map(lambda x: x**2)# 输出结果
print(squaredRDD.collect()) # [1, 4, 9, 16, 25]
2.filter(func):
# 创建RDD
rdd = sparkContext.parallelize([1, 2, 3, 4, 5])# 过滤RDD中的偶数元素
filteredRDD = rdd.filter(lambda x: x % 2 == 0)# 输出结果
print(filteredRDD.collect()) # [2, 4]
3.flatMap(func):
# 创建RDD
rdd = sparkContext.parallelize([1, 2, 3, 4, 5])# 对RDD中的每个元素进行重复操作
flatMapRDD = rdd.flatMap(lambda x: [x, x, x])# 输出结果
print(flatMapRDD.collect()) # [1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4, 5, 5, 5]
4.distinct():
# 创建RDD
rdd = sparkContext.parallelize([1, 2, 2, 3, 4, 4, 5])# 去除RDD中的重复元素
distinctRDD = rdd.distinct()# 输出结果
print(distinctRDD.collect()) # [1, 2, 3, 4, 5]
5.reduce(func):
# 创建RDD
rdd = sparkContext.parallelize([1, 2, 3, 4, 5])# 对RDD中的元素求和
sum = rdd.reduce(lambda x, y: x + y)# 输出结果
print(sum) # 15
6.groupByKey():
# 创建键值对的RDD
rdd = sparkContext.parallelize([(1, 'a'), (2, 'b'), (1, 'c'), (2, 'd')])# 按键进行分组
groupedRDD = rdd.groupByKey()# 输出结果
for key, values in groupedRDD.collect():print(key, list(values))
# 1 ['a', 'c']
# 2 ['b', 'd']
7.sortByKey():
# 创建键值对的RDD
rdd = sparkContext.parallelize([(1, 'c'), (2, 'b'), (3, 'a')])# 按键进行排序
sortedRDD = rdd.sortByKey()# 输出结果
print(sortedRDD.collect()) # [(1, 'c'), (2, 'b'), (3, 'a')]
8.join(otherRDD):
# 创建两个键值对的RDD
rdd1 = sparkContext.parallelize([(1, 'a'), (2, 'b')])
rdd2 = sparkContext.parallelize([(1, 'c'), (2, 'd')])# 按键进行连接
joinedRDD = rdd1.join(rdd2)# 输出结果
print(joinedRDD.collect()) # [(1, ('a', 'c')), (2, ('b', 'd'))]
9.union(otherRDD):
# 创建两个RDD
rdd1 = sparkContext.parallelize([1, 2, 3])
rdd2 = sparkContext.parallelize([4, 5, 6])# 合并两个RDD
unionRDD = rdd1.union(rdd2)# 输出结果
print(unionRDD.collect()) # [1, 2, 3, 4, 5, 6]
10.aggregateByKey(zeroValue)(seqOp, combOp):
# 创建键值对的RDD
rdd = sparkContext.parallelize([(1, 2), (1, 4), (2, 3), (2, 5)])# 对每个键的元素进行求和操作
sumRDD = rdd.aggregateByKey(0, lambda x, y: x + y, lambda a, b: a + b)# 输出结果
print(sumRDD.collect()) # [(1, 6), (2, 8)]