一、创建DataSet
使用SparkSession,应用程序可以从现有的RDD,Hive表的或Spark数据源创建DataFrame 。
(1)基于JSON的内容创建一个DataFrame
//hdfs
Dataset<Row> df = spark.read().json("hdfs://master:9000/test.json");//rdd
RDD<String> jsonRDD = ...
Dataset<Row> df = spark.read().json(jsonRDD);//dataset
Dataset<String> jsonDataset = ...
Dataset<Row> df = spark.read().json(dataSet);
(2)基于parquet的内容创建一个DataFrame
//hdfs
Dataset<Row> df = spark.read().parquet("hdfs://master:9000/test.parquet");
(3)基于orc的内容创建一个DataFrame
//hdfs
Dataset<Row> df = spark.read().parquet("hdfs://master:9000/test.orc");
(4)基于txt的内容创建一个DataFrame
//hdfs 创建只有value列的数据
Dataset<Row> df = spark.read().txt("hdfs://master:9000/test.txt");
(5)基于cvs的内容创建一个DataFrame
//hdfs
Dataset<Row> df = spark.read().cvs("hdfs://master:9000/test.cvs");
(6)基于jdbc的内容创建一个DataFrame
Dataset<Row> df1 = spark.read().format("jdbc").option("url", "jdbc:mysql://localhost:3306/man").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "man").option("user", "root").option("password","admin").load();
df1.show();Properties properties = new Properties();
properties.put("user", "root");
properties.put("password","admin");
properties.put("driver", "com.mysql.jdbc.Driver");
Dataset<Row> df2 = spark.read().jdbc("jdbc:mysql://localhost:3306/man", "man", properties);
df2.show();
(7)基于textFile的内容创建一个DataSet
//hdfs
Dataset<String> ds = spark.read().textFile("hdfs://master:9000/test.txt");
(8)rdd创建DataSet
//反射推断StructType
JavaRDD<Person> peopleRDD = ...
Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);//编程方式指定StructType
String schemaString = ...
List<StructField> fields = new ArrayList<>();
for (String fieldName : schemaString.split(" ")) {StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);fields.add(field);
}
StructType schema = DataTypes.createStructType(fields);
JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {String[] attributes = record.split(",");return RowFactory.create(attributes[0], attributes[1].trim());
});
Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);
二、DataSet操作
(1)schema结构
df.printSchema();
StructType type = df.schema();
(2)map一对一映射操作
//dataframe格式转换
Dataset<Row> df1 = df.map(v-> v, RowEncoder.apply(df.schema()));
df1.show();//dataframe格式转换
StructField structField = new StructField("name", DataTypes.StringType, true, null);
StructType structType = new StructType(new StructField[]{structField});
Dataset<Row> df2 = df.map(v-> new GenericRowWithSchema(new Object[]{v.getAs("name")}, structType), RowEncoder.apply(structType));
df2.show();//dataSet格式转换
Dataset<String> dfs = df.map(v-> v.getAs("name"), Encoders.STRING());
dfs.show();
(3)flatMap一对多映射操作
//dataSet格式转换
Dataset<String> dfs = df.flatMap(v-> Arrays.asList((String)v.getAs("name")).iterator(), Encoders.STRING());
dfs.show();
(4)filter过滤操作
Dataset<Row> df1 = df.filter(new Column("name").$eq$eq$eq("mk"));
Dataset<Row> df2 = df.filter(new Column("name").notEqual("mk"));
(5)withColumn加列或者覆盖
Dataset<Row> df1 = df.withColumn("name1", functions.col("name"));
df1.show();
Dataset<Row> df2 = df.withColumn("name", functions.lit("a"));
df2.show();
Dataset<Row> df3 = df.withColumn("name", functions.concat(functions.col("name"), functions.lit("zzz")));
df3.show();
(6)select选择列
Dataset<Row> df1 = df.select(functions.concat(functions.col("name"), functions.lit("zzz")).as("name1"));
df1.show();
Dataset<Row> df2 = df.select(functions.col("name"), functions.concat(functions.col("name"), functions.lit("zzz")).as("name1"));
df2.show();
(7)selectExpr表达式选择列
Dataset<Row> df1 = df.selectExpr("name", "'a' as name1");
df1.show();
(8)groupBy agg分组统计
Dataset<Row> df1 = df.groupBy(functions.col("name")).agg(functions.expr("count(1)").as("c"), functions.expr("max(desc)").as("desc"));
df1.show();
(9)drop删除列
Dataset<Row> df1 = df.drop("name");
df1.show();
(10)distinct去重
Dataset<Row> df1 = df.distinct();
df1.show();
(11)dropDuplicates 根据字段去重
Dataset<Row> df1 = df.dropDuplicates("name");
df1.show();
(12)summary统计count、mean、stddev、min、max、25%、50%、75%,支持统计类型过滤
Dataset<Row> df1 = df.summary("count");
df1.show();
(13)describe统计count、mean、stddev、min、max,支持列过滤
Dataset<Row> df1 = df.describe("name");
df1.show();
(14)sort 排序
Dataset<Row> df1 = df.sort(functions.col("name").asc());
df1.show();
(15)limit 分页
Dataset<Row> df1 = df.limit(1);
df1.show();
三、DataSet连接
(1)join连接
Dataset<Row> df1 = df.as("a").join(df.as("b"), functions.col("a.name").notEqual(functions.col("b.name")), "left_outer");
df1.show();Dataset<Row> df2 = df.as("a").join(df.as("b"), functions.col("a.name").notEqual(functions.col("b.name")));
df2.show();
(2)crossJoin笛卡尔连接
Dataset<Row> df1 = df.as("a").crossJoin(df.as("b"));
df1.show();
四、DataSet集合运算
(1)except差集
Dataset<Row> df1 = df.except(df.filter("name='mk'"));
df1.show();
(2)union并集,根据列位置合并行,列数要一致
Dataset<Row> df1 = df.union(df.filter("name='mk'"));
df1.show();
(3)unionByName并集,根据列名合并行,不同名报错,列数要一致
Dataset<Row> df1 = df.unionByName(df.filter("name='mk'"));
df1.show();
(4)intersect交集
Dataset<Row> df1 = df.intersect(df.filter("name='mk'"));
df1.show();
五、DataSet分区
repartition(numPartitions:Int):RDD[T]
coalesce(numPartitions:Int,shuffle:Boolean=false):RDD[T]
两个都是RDD的分区进行重新划分,repartition只是coalesce接口中shuffle为true的简易实现
假设RDD有N个分区,需要重新划分成M个分区
1、N<M。一般情况下N个分区有数据分布不均匀的状况,利用HashPartitioner函数将数据重新分区为M个,这时需要将shuffle设置为true。
2、如果N>M并且N和M相差不多,(假如N是100,M是10)那么就可以将N个分区中的若干个分区合并成一个新的分区,最终合并为M个分区,这时可以将shuff设置为false。
在shuffl为false的情况下,如果M>N时,coalesce为无效的,不进行shuffle过程,父RDD和子RDD之间是窄依赖关系。
3、如果N>M并且两者相差悬殊,这时如果将shuffle设置为false,父子RDD是窄依赖关系,他们同处在一个Stage中,就可能造成spark程序的并行度不够,从而影响性能。
如果在M为1的时候,为了使coalesce之前的操作有更好的并行度,可以讲shuffle设置为true。
DataSet的coalesce是Repartition shuffle=false的简写方法
Dataset<Row> df1 = df.coalesce(1);
Dataset<Row> df2 = df.repartition(1);