文章目录
- 1、创建RDD的三种方式
- 2、Spark对RDD的操作
- 2.1、Transformations(转换)
- 2.2、Actions(动作)
1、创建RDD的三种方式
Spark提供三种创建RDD方式:集合、本地文件、HDFS文件。详细可以查看RDD和pair RDD文档
使用例子SparkRdd.java
ppackage com.penngo.rdd;import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;/****/
public class SparkRdd {public static void main(String[] args) {//windows下调试spark需要使用https://github.com/steveloughran/winutilsSystem.setProperty("hadoop.home.dir", "D:\\hadoop\\hadoop-3.3.1");System.setProperty("HADOOP_USER_NAME", "root");SparkSession spark = SparkSession.builder().appName("List2Rdd").master("local[*]").getOrCreate();JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());// 集合转成RDDList<Integer> data = Arrays.asList(1, 2, 3, 4, 5,1,2);JavaRDD<Integer> rdd1 = sc.parallelize(data);int sum1 = rdd1.reduce((Function2<Integer, Integer, Integer>) (integer, integer2) -> integer + integer2);System.out.println("sum1============"+sum1);// 读取本地文件/hadoop文件转成RDDString path = "D:\\project\\data.txt";//path = "hdfs://testspark:9000/data.txt"JavaRDD<String> rddLine = sc.textFile(path, 2);JavaRDD<Integer> rdd2 = rddLine.flatMap((FlatMapFunction<String, Integer>) s -> {String[] strs = s.split(",");List<Integer> list = new ArrayList<>();for(String str:strs){list.add(Integer.valueOf(str.trim()));}return list.iterator();});int sum2 = rdd2.reduce((Function2<Integer, Integer, Integer>) (integer, integer2) -> integer + integer2);System.out.println("sum2============"+sum2);JavaPairRDD<String, Integer> pairs = rdd2.mapToPair(s -> new Tuple2("num_" + s, 1));JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);counts.foreach(tp2->{System.out.println(tp2._1 + "=" + tp2._2);});spark.stop();}
}
2、Spark对RDD的操作
Spark对RDD的操作可以整体分为两类:
Transformation(转换):表示是针对RDD中数据的转换操作,主要会针对已有的RDD创建一个新的RDD:常见的有map、flatMap、filter等等。
Action(执行)表示是触发任务执行的操作,主要对RDD进行最后的操作,比如遍历、reduce、保存到文件等,并且还可以把结果返回给Driver程序
2.1、Transformations(转换)
Transformation(转换) | Meaning(含义) |
---|---|
map(func) | 返回一个新的 distributed dataset(分布式数据集),它由每个 source(数据源)中的元素应用一个函数 func 来生成。 |
filter(func) | 返回一个新的 distributed dataset(分布式数据集),它由每个 source(数据源)中应用一个函数 func 且返回值为 true 的元素来生成。 |
flatMap(func) | 与 map 类似,但是每一个输入的 item 可以被映射成 0 个或多个输出的 items(所以 func 应该返回一个 Seq 而不是一个单独的 item) |
mapPartitions(func) | 与 map 类似,但是单独的运行在在每个 RDD 的 partition(分区,block)上,所以在一个类型为 T 的 RDD 上运行时 func 必须是 Iterator<T> => Iterator<U> 类型。 |
mapPartitionsWithIndex(func) | 与 mapPartitions 类似,但是也需要提供一个代表 partition 的 index(索引)的 interger value(整型值)作为参数的 func,所以在一个类型为 T 的 RDD 上运行时 func 必须是 (Int, Iterator<T>) => Iterator<U> 类型。 |
sample(withReplacement, fraction, seed) | 样本数据,设置是否放回(withReplacement)、采样的百分比(fraction)、使用指定的随机数生成器的种子(seed)。 |
union(otherDataset) | 返回一个新的 dataset,它包含了 source dataset(源数据集)和 otherDataset(其它数据集)的并集。 |
intersection(otherDataset) | 返回一个新的 RDD,它包含了 source dataset(源数据集)和 otherDataset(其它数据集)的交集。 |
distinct([numTasks])) | 返回一个新的 dataset,它包含了 source dataset(源数据集)中去重的元素。 |
groupByKey([numTasks]) | 在一个 (K, V) pair 的 dataset 上调用时,返回一个 (K, Iterable<V>) pairs 的 dataset。 注意 : 如果分组是为了在每一个 key 上执行聚合操作(例如,sum 或 average),此时使用 reduceByKey 或 aggregateByKey 来计算性能会更好。 |
reduceByKey(func, [numTasks]) | 在一个 (K, V) pair 的 dataset 上调用时,返回一个 (K, Iterable<V>) pairs 的 dataset,它的值会针对每一个 key 使用指定的 reduce 函数 func 来聚合,它必须为 (V,V) => V 类型。像 groupByKey 一样,可通过第二个可选参数来配置 reduce 任务的数量。 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | 在一个 (K, V) pair 的 dataset 上调用时,返回一个 (K, Iterable<V>) pairs 的 dataset,它的值会针对每一个 key 使用指定的 combine 函数和一个中间的 “zero” 值来聚合,它必须为 (V,V) => V 类型。为了避免不必要的配置,可以使用一个不同与 input value 类型的 aggregated value 类型。 |
sortByKey([ascending], [numTasks]) | 在一个 (K, V) pair 的 dataset 上调用时,其中的 K 实现了 Ordered,返回一个按 keys 升序或降序的 (K, V) pairs 的 dataset。 |
join(otherDataset, [numTasks]) | 在一个 (K, V) 和 (K, W) 类型的 dataset 上调用时,返回一个 (K, (V, W)) pairs 的 dataset,它拥有每个 key 中所有的元素对。Outer joins 可以通过 leftOuterJoin, rightOuterJoin 和fullOuterJoin 来实现。 |
cogroup(otherDataset, [numTasks]) | 在一个 (K, V) 和的 dataset 上调用时,返回一个 (K, (Iterable<V>, Iterable<W>)) tuples 的 dataset。这个操作也调用了 groupWith。 |
cartesian(otherDataset) | 在一个 T 和 U 类型的 dataset 上调用时,返回一个 (T, U) pairs 类型的 dataset(所有元素的 pairs,即笛卡尔积)。 |
pipe(command, [envVars]) | 通过使用 shell 命令来将每个 RDD 的分区给 Pipe。例如,一个 Perl 或 bash 脚本。RDD 的元素会被写入进程的标准输入(stdin),并且 lines(行)输出到它的标准输出(stdout)被作为一个字符串型 RDD 的 string 返回。 |
coalesce(numPartitions) | Decrease(降低)RDD 中 partitions(分区)的数量为 numPartitions。对于执行过滤后一个大的 dataset 操作是更有效的。 |
repartition(numPartitions) | Reshuffle(重新洗牌)RDD 中的数据以创建或者更多的 partitions(分区)并将每个分区中的数据尽量保持均匀。该操作总是通过网络来 shuffles 所有的数据。 |
repartitionAndSortWithinPartitions(partitioner) | 根据给定的 partitioner(分区器)对 RDD 进行重新分区,并在每个结果分区中,按照 key 值对记录排序。这比每一个分区中先调用 repartition 然后再 sorting(排序)效率更高,因为它可以将排序过程推送到 shuffle 操作的机器上进行。 |
2.2、Actions(动作)
Action | 意思 |
---|---|
reduce(func) | 使用函数 func 聚合数据集(dataset)中的元素,这个函数 func 输入为两个元素,返回为一个元素。这个函数应该是可交换(commutative )和关联(associative)的,这样才能保证它可以被并行地正确计算。 |
collect() | 在驱动程序中,以一个数组的形式返回数据集的所有元素。这在返回足够小(sufficiently small)的数据子集的过滤器(filter)或其他操作(other operation)之后通常是有用的。 |
count() | 返回数据集中元素的个数。 |
first() | 返回数据集中的第一个元素(类似于 take(1))。 |
take(n) | 将数据集中的前 n 个元素作为一个数组返回。 |
takeSample(withReplacement, num, [seed]) | 对一个数据集随机抽样,返回一个包含 num 个随机抽样(random sample)元素的数组,参数 withReplacement 指定是否有放回抽样,参数 seed 指定生成随机数的种子。 |
takeOrdered(n, [ordering]) | 返回 RDD 按自然顺序(natural order)或自定义比较器(custom comparator)排序后的前 n 个元素。 |
saveAsTextFile(path) | 将数据集中的元素以文本文件(或文本文件集合)的形式写入本地文件系统、HDFS 或其它 Hadoop 支持的文件系统中的给定目录中。Spark 将对每个元素调用 toString 方法,将数据元素转换为文本文件中的一行记录。 |
saveAsSequenceFile(path) | 将数据集中的元素以 Hadoop SequenceFile 的形式写入到本地文件系统、HDFS 或其它 Hadoop 支持的文件系统指定的路径中。该操作可以在实现了 Hadoop 的 Writable 接口的键值对(key-value pairs)的 RDD 上使用。在 Scala 中,它还可以隐式转换为 Writable 的类型(Spark 包括了基本类型的转换,例如 Int、Double、String 等等)。 |
saveAsObjectFile(path) | 使用 Java 序列化(serialization)以简单的格式(simple format)编写数据集的元素,然后使用 SparkContext.objectFile() 进行加载。 |
countByKey() | 仅适用于(K,V)类型的 RDD 。返回具有每个 key 的计数的 (K , Int)对 的 hashmap。 |
foreach(func) | 对数据集中每个元素运行函数 func 。这通常用于副作用(side effects),例如更新一个累加器(Accumulator)或与外部存储系统(external storage systems)进行交互。注意:修改除 foreach() 之外的累加器以外的变量(variables)可能会导致未定义的行为(undefined behavior)。详细介绍请阅读 理解闭包(Understanding closures) 部分。 |
参考自官方文档