Today I convert the table from yesterday's data-manipulation exercise into a partitioned table, following the approach in Delta Lake: Up and Running, and use the partitioned table to update data. I also compare queries against the partitioned table and the non-partitioned table; the results show that queries on the partitioned table are faster. Straight to the code:
import io.delta.tables.DeltaTable;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;

public class DeltaLakePartitionCURD {
    // Convert a string to java.sql.Timestamp
    public static java.sql.Timestamp strToSqlDate(String strDate, String dateFormat) {
        SimpleDateFormat sf = new SimpleDateFormat(dateFormat);
        Date date = null;
        try {
            date = sf.parse(strDate);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return new java.sql.Timestamp(date.getTime());
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("delta_lake")
                .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
                .config("spark.databricks.delta.autoCompact.enabled", "true")
                .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
                .getOrCreate();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        String savePath = "D:\\bigdata\\detla-lake-with-java\\YellowTaxiPartitioned";
        String csvPath = "D:\\bookcode\\delta-lake-up-and-running-main\\data\\YellowTaxisLargeAppend.csv";
        String tableName = "taxidb.YellowTaxiPartitioned";
        String savePath2 = "D:\\bigdata\\detla-lake-with-java\\YellowTaxi";

        spark.sql("CREATE DATABASE IF NOT EXISTS taxidb");

        // Define the table, partitioned by VendorId
        DeltaTable.createIfNotExists(spark)
                .tableName(tableName)
                .addColumn("RideId", "INT")
                .addColumn("VendorId", "INT")
                .addColumn("PickupTime", "TIMESTAMP")
                .addColumn("DropTime", "TIMESTAMP")
                .partitionedBy("VendorId")
                .location(savePath)
                .execute();

        // Load the CSV data and import it into the Delta table
        var df = spark.read().format("delta").table(tableName);
        var schema = df.schema();
        System.out.println(schema.simpleString());
        var df_for_append = spark.read().option("header", "true").schema(schema).csv(csvPath);
        System.out.println("Total number of rows: " + df_for_append.count());
        System.out.println("Import start time: " + sdf.format(new Date()));
        df_for_append.write().format("delta").mode(SaveMode.Overwrite).saveAsTable(tableName);
        System.out.println("Import end time: " + sdf.format(new Date()));
        DeltaTable deltaTable = DeltaTable.forName(spark, tableName);
        deltaTable.optimize().executeZOrderBy("RideId");

        // Insert data
        List<Row> list = new ArrayList<Row>();
        list.add(RowFactory.create(-1, -1,
                strToSqlDate("2023-01-01 10:00:00", "yyyy-MM-dd HH:mm:ss"),
                strToSqlDate("2023-01-01 10:00:00", "yyyy-MM-dd HH:mm:ss")));
        // Build the new rows and convert them into a DataFrame
        var yellowTaxipDF = spark.createDataFrame(list, schema);
        System.out.println("Insert start time: " + sdf.format(new Date()));
        yellowTaxipDF.write().format("delta").mode(SaveMode.Append).saveAsTable(tableName);
        System.out.println("Insert end time: " + sdf.format(new Date()));
        System.out.println("Data after insert:");
        deltaTable.toDF().select("*").where("RideId=-1").show(false);

        // Update data by overwriting the VendorId=-1 partition (replaceWhere)
        System.out.println("Update start time: " + sdf.format(new Date()));
        spark.read().format("delta").load(savePath)
                .where("VendorId==-1 and RideId==-1")
                .withColumn("PickupTime", functions.lit("2023-11-01 10:00:00").cast(DataTypes.TimestampType))
                .write().format("delta")
                .option("replaceWhere", "VendorId==-1")
                .mode(SaveMode.Overwrite)
                .saveAsTable(tableName);
        System.out.println("Update end time: " + sdf.format(new Date()));
        System.out.println("Data after update:");
        deltaTable.toDF().select("*").where("RideId=-1").show(false);

        // Update data with DeltaTable.update
        System.out.println("Data before update:");
        deltaTable.toDF().select("*").where("RideId=-1").show(false);
        System.out.println("Update start time: " + sdf.format(new Date()));
        deltaTable.update(
                functions.col("RideId").equalTo("-1"),
                new HashMap<String, Column>() {{
                    put("PickupTime", functions.lit("2023-01-01 10:00:00").cast(DataTypes.TimestampType));
                }});
        System.out.println("Update end time: " + sdf.format(new Date()));
        System.out.println("Data after update:");
        deltaTable.toDF().select("*").where("RideId=-1").show(false);

        // Query data: non-partitioned table vs. partitioned table
        System.out.println("Non-partitioned table query start time: " + sdf.format(new Date()));
        spark.read().format("delta").load(savePath2).where("VendorId==4 and RideId==859744").show(false);
        System.out.println("Non-partitioned table query end time: " + sdf.format(new Date()));
        System.out.println("Partitioned table query start time: " + sdf.format(new Date()));
        spark.read().format("delta").load(savePath).where("VendorId==4 and RideId==859744").show(false);
        System.out.println("Partitioned table query end time: " + sdf.format(new Date()));
    }
}
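To double-check that the table really was created with VendorId as its partition column, a quick metadata query can be run before timing anything. This is just a minimal sketch that assumes the same SparkSession and table name as in the code above; DESCRIBE DETAIL on a Delta table reports its partition columns and file counts:

// Sketch: inspect the Delta table metadata to confirm the partition column.
// Assumes the "spark" session and "taxidb.YellowTaxiPartitioned" table from the code above.
spark.sql("DESCRIBE DETAIL taxidb.YellowTaxiPartitioned")
        .select("partitionColumns", "numFiles", "sizeInBytes")
        .show(false);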
Final run results:
As the red box in the last screenshot shows, the query against the partitioned table is indeed a bit faster than the one against the non-partitioned table.
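The speedup comes from partition pruning: because VendorId is the partition column, the filter VendorId==4 lets Spark skip the files of every other partition, while the non-partitioned table has to scan more data. One hedged way to see this for yourself (a sketch reusing savePath and savePath2 from the code above) is to print the physical plans and compare them; in my experience the partitioned read lists VendorId under the scan's PartitionFilters, whereas the non-partitioned read pushes it down only as a data filter:

// Sketch: compare the physical plans of the two reads to observe partition pruning.
spark.read().format("delta").load(savePath)
        .where("VendorId==4 and RideId==859744")
        .explain();   // partitioned table: expect VendorId in PartitionFilters
spark.read().format("delta").load(savePath2)
        .where("VendorId==4 and RideId==859744")
        .explain();   // non-partitioned table: VendorId appears only as an ordinary filter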