今天要解决的问题是如何使用spark sql 建表,插入数据以及查询数据
1、建立一个类叫 DeltaLakeWithSparkSql1,具体代码如下,例子参考Delta Lake Up & Running第3章内容
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;import java.text.SimpleDateFormat;
import java.util.Date;public class DeltaLakeWithSparkSql1 {public static void main(String[] args) {SparkSession spark = SparkSession.builder().master("local[*]").appName("delta_lake").config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension").config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog").getOrCreate();SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");spark.sql("CREATE DATABASE IF NOT EXISTS taxidb");spark.sql("CREATE TABLE IF NOT EXISTS taxidb.YellowTaxi(" +"RideID INT,"+"PickupTime TIMESTAMP,"+"CabNumber STRING)" +"USING DELTA LOCATION 'file:///D:\\\\bigdata\\\\detla-lake-with-java\\\\YellowTaxi'");// 插入5条记录spark.sql("DESCRIBE TABLE taxidb.YellowTaxi").show();spark.sql("INSERT INTO taxidb.YellowTaxi (RideID,PickupTime,CabNumber) values (1,'2013-10-13 10:13:15','51-96')");spark.sql("INSERT INTO taxidb.YellowTaxi (RideID,PickupTime,CabNumber) values (2,'2013-10-13 10:13:15','51-96')");spark.sql("INSERT INTO taxidb.YellowTaxi (RideID,PickupTime,CabNumber) values (3,'2013-10-13 10:13:15','51-96')");spark.sql("INSERT INTO taxidb.YellowTaxi (RideID,PickupTime,CabNumber) values (4,'2013-10-13 10:13:15','51-96')");spark.sql("INSERT INTO taxidb.YellowTaxi (RideID,PickupTime,CabNumber) values (5,'2013-10-13 10:13:15','51-96')");System.out.println("不分区查询开始时间(含毫秒): " + sdf.format(new Date()));spark.sql("SELECT RideID,PickupTime,CabNumber FROM taxidb.YellowTaxi").show();System.out.println("不分区查询结束时间(含毫秒): " + sdf.format(new Date()));spark.sql("CREATE TABLE IF NOT EXISTS taxidb.YellowTaxiPartitioned(" +"RideID INT,"+"PickupTime TIMESTAMP,"+"CabNumber STRING)" +"USING DELTA PARTITIONED BY(RideID) LOCATION 'file:///D:\\\\bigdata\\\\detla-lake-with-java\\\\YellowTaxiPartitioned'");spark.sql("DESCRIBE TABLE taxidb.YellowTaxiPartitioned").show();var df=spark.read().format("delta").table("taxidb.YellowTaxi");//将数据复制到分区表df.write().format("delta").mode(SaveMode.Overwrite).save("file:///D:\\\\bigdata\\\\detla-lake-with-java\\\\YellowTaxiPartitioned");System.out.println("分区查询开始时间(含毫秒): " + sdf.format(new Date()));spark.sql("SELECT RideID,PickupTime,CabNumber FROM taxidb.YellowTaxiPartitioned").show();System.out.println("分区查询结束时间(含毫秒): " + sdf.format(new Date()));spark.close();}
}
代码主要实现建立一个表名为YellowTaxi,插入5条数据,然后查询YellowTaxi这5条数据,再建立一个表YellowTaxiPartitioned,YellowTaxiPartitioned是分区表。然后从YellowTaxi获取数据并写入到YellowTaxiPartitioned,再查询YellowTaxiPartitioned这5条数据
2、IDEA运行结果如下:
具体文字内容如下,从结果可以看出分区表的查询效率要比不分区表要好,后面建表还是要用分区表。
+----------+---------+-------+
| col_name|data_type|comment|
+----------+---------+-------+
| RideID| int| NULL|
|PickupTime|timestamp| NULL|
| CabNumber| string| NULL|
+----------+---------+-------+
不分区查询开始时间(含毫秒): 2024-05-01 11:29:39.655
+------+-------------------+---------+
|RideID| PickupTime|CabNumber|
+------+-------------------+---------+
| 1|2013-10-13 10:13:15| 51-96|
| 2|2013-10-13 10:13:15| 51-96|
| 4|2013-10-13 10:13:15| 51-96|
| 3|2013-10-13 10:13:15| 51-96|
| 5|2013-10-13 10:13:15| 51-96|
+------+-------------------+---------+
不分区查询结束时间(含毫秒): 2024-05-01 11:29:40.130
+--------------------+---------+-------+
| col_name|data_type|comment|
+--------------------+---------+-------+
| RideID| int| NULL|
| PickupTime|timestamp| NULL|
| CabNumber| string| NULL|
|# Partition Infor...| | |
| # col_name|data_type|comment|
| RideID| int| NULL|
+--------------------+---------+-------+
分区查询开始时间(含毫秒): 2024-05-01 11:29:42.052
+------+-------------------+---------+
|RideID| PickupTime|CabNumber|
+------+-------------------+---------+
| 4|2013-10-13 10:13:15| 51-96|
| 3|2013-10-13 10:13:15| 51-96|
| 1|2013-10-13 10:13:15| 51-96|
| 2|2013-10-13 10:13:15| 51-96|
| 5|2013-10-13 10:13:15| 51-96|
+------+-------------------+---------+
分区查询结束时间(含毫秒): 2024-05-01 11:29:42.198