一、创建DataSet
DataSet与RDD相似,但是,它们不使用Java序列化或Kryo,而是使用专用的Encoder对对象进行序列化以进行网络处理或传输。虽然编码器和标准序列化都负责将对象转换为字节,但是编码器是动态生成的代码,并使用一种格式,该格式允许Spark执行许多操作,例如过滤,排序和哈希处理,而无需将字节反序列化回对象。
Encoder<Man> manEncoder = Encoders.bean(Man.class);
Dataset<Man> manDS = spark.createDataset(Collections.singletonList(man),manEncoder
);Encoder<String> strEncoder = Encoders.STRING();
Dataset<String> strDS = spark.createDataset(Arrays.asList("a", "b", "c"), strEncoder);
二、RDD转DataSet
Spark SQL支持两种将现有RDD转换为数据集的方法。第一种方法使用反射来推断包含特定对象类型的RDD的架构。这种基于反射的方法可以使代码更简洁,并且在编写Spark应用程序时已经了解架构时可以很好地工作。
创建数据集的第二种方法是通过编程界面,该界面允许您构造模式,然后将其应用于现有的RDD。尽管此方法较为冗长,但可以在运行时才知道列及其类型的情况下构造数据集。
(1)使用反射推断表的模式
Spark SQL支持自动将JavaBean的RDD 转换为DataFrame,使用反射得到Bean信息,定义了表的模式。Spark SQL不支持包含Map字段的JavaBean 。 不过,支持嵌套JavaBean和List或Array字段。
可以通过创建一个实现Serializable并为其所有字段具有getter和setter的类来创建JavaBean。
JavaRDD<Man> manRDD = spark.read().textFile("/test.txt").javaRDD().map(line -> {String[] splits = line.split(",");Man man = new Man();man.setName(splits[0]);man.setDesc(splits[1]);return man;});Dataset<Row> manDF = spark.createDataFrame(manRDD, Man.class);
(2)以编程方式指定架构
当无法提前定义JavaBean类时(例如,记录的结构编码为字符串,或者将解析文本数据集,并且为不同的用户设计不同的字段),Dataset<Row>
可以通过三个步骤以编程方式创建a 。
Row
从原始RDD创建一个的RDD;- 在第1步中创建的RDD中创建
StructType
与Row
s的结构匹配 表示的模式。 Row
通过通过createDataFrame
提供的方法将架构应用于的RDDSparkSession
。
JavaRDD<String> manRDD = ...
String cols = "name,desc";
List<StructField> fields = new LinkedList<>();
for (String fieldName : cols.split(",")) {StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);fields.add(field);
}
StructType schema = DataTypes.createStructType(fields);
JavaRDD<Row> rowRDD = manRDD.map((Function<String, Row>) record -> {String[] splits = record.split(",");return RowFactory.create(splits[0], splits[1].trim());
});Dataset<Row> manDataFrame = spark.createDataFrame(rowRDD, schema);
三、DataSet转RDD
toJavaRDD方法直接将DataSet转RDD
JavaRDD<Row> javaRDD = df.toJavaRDD();