Delta lake with Java--数据增删改查

之前写的关于spark sql 操作delta lake表的,总觉得有点混乱,今天用Java结合真实的数据来进行一次数据的CRUD操作,所涉及的数据来源于Delta lake up and running配套的 GitGitHub - benniehaelen/delta-lake-up-and-running: Companion repository for the book 'Delta Lake Up and Running'

要实现的效果是新建表,导入数据,然后对表进行增删改查操作,具体代码如下:

package detal.lake.java;import io.delta.tables.DeltaTable;
import org.apache.spark.sql.SparkSession;import java.text.SimpleDateFormat;
import io.delta.tables.DeltaTable;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.HashMap;public class DeltaLakeCURD {//将字符串转换成java.sql.Timestamppublic static java.sql.Timestamp strToSqlDate(String strDate, String dateFormat) {SimpleDateFormat sf = new SimpleDateFormat(dateFormat);java.util.Date date = null;try {date = sf.parse(strDate);} catch (Exception e) {e.printStackTrace();}java.sql.Timestamp dateSQL = new java.sql.Timestamp(date.getTime());return dateSQL;}public static void main(String[] args) {SparkSession spark = SparkSession.builder().master("local[*]").appName("delta_lake").config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension").config("spark.databricks.delta.autoCompact.enabled", "true").config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog").getOrCreate();SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");String savePath="file:///D:\\\\bigdata\\\\detla-lake-with-java\\\\YellowTaxi";String csvPath="D:\\bookcode\\delta-lake-up-and-running-main\\data\\YellowTaxisLargeAppend.csv";String tableName = "taxidb.YellowTaxis";spark.sql("CREATE DATABASE IF NOT EXISTS taxidb");//定义表DeltaTable.createIfNotExists(spark).tableName(tableName).addColumn("RideId","INT").addColumn("VendorId","INT").addColumn("PickupTime","TIMESTAMP").addColumn("DropTime","TIMESTAMP").location(savePath).execute();//加载csv数据并导入delta表var df=spark.read().format("delta").table(tableName);var schema=df.schema();System.out.println(schema.simpleString());var df_for_append=spark.read().option("header","true").schema(schema).csv(csvPath);System.out.println("记录总行数:"+df_for_append.count());System.out.println("导入数据,开始时间"+  sdf.format(new Date()));df_for_append.write().format("delta").mode(SaveMode.Overwrite).saveAsTable(tableName);System.out.println("导入数据,结束时间" + sdf.format(new Date()));DeltaTable deltaTable = DeltaTable.forName(spark,tableName);//插入数据List<Row> list = new ArrayList<Row>();list.add(RowFactory.create(-1,-1,strToSqlDate("2023-01-01 10:00:00","yyyy-MM-dd HH:mm:ss"),strToSqlDate("2023-01-01 10:00:00","yyyy-MM-dd HH:mm:ss")));List<StructField> structFields = new ArrayList<>();structFields.add(DataTypes.createStructField("RideId", DataTypes.IntegerType, true));structFields.add(DataTypes.createStructField("VendorId", DataTypes.IntegerType, true));structFields.add(DataTypes.createStructField("PickupTime", DataTypes.TimestampType, true));structFields.add(DataTypes.createStructField("DropTime", DataTypes.TimestampType, true));StructType structType = DataTypes.createStructType(structFields);var yellowTaxipDF=spark.createDataFrame(list,structType); //建立需要新增数据并转换成dataframeSystem.out.println("插入数据,开始时间"+  sdf.format(new Date()));yellowTaxipDF.write().format("delta").mode(SaveMode.Append).saveAsTable(tableName);System.out.println("插入数据,结束时间"+  sdf.format(new Date()));System.out.println("插入后数据");deltaTable.toDF().select("*").where("RideId=-1").show(false);//更新数据System.out.println("更新前数据");deltaTable.toDF().select("*").where("RideId=999994").show(false);System.out.println("更新数据,开始时间"+  sdf.format(new Date()));deltaTable.updateExpr("RideId = 999994",new HashMap<String, String>() {{put("VendorId", "250");}});System.out.println("更新数据,结束时间"+  sdf.format(new Date()));System.out.println("更新后数据");deltaTable.toDF().select("*").where("RideId=999994").show(false);//查询数据System.out.println("查询数据,开始时间"+  sdf.format(new Date()));var selectDf= deltaTable.toDF().select("*").where("RideId=1");selectDf.show(false);System.out.println("查询数据,结束时间" + sdf.format(new Date()));//删除数据System.out.println("删除数据,开始时间"+  sdf.format(new Date()));deltaTable.delete("RideId=1");System.out.println("删除数据,结束时间"+  sdf.format(new Date()));deltaTable.toDF().select("*").where("RideId=1").show(false);}
}

里面涉及spark的TimestampType类型,如何将字符串输入到TimestampType列,找了几个小时才找到答案,具体参考了如下连接,原来直接将string转成java.sql.Timestamp即可,于是在网上找了一个方法,实现了转换,转换代码非原创,也是借鉴其他大牛的。

scala - How to create TimestampType column in spark from string - Stack Overflow

最后运行结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/7532.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java如何实现简单的随机抽奖功能?

在很多应用场景中&#xff0c;我们常常需要从一系列候选者中随机抽取若干名幸运儿。这里&#xff0c;我们将通过一个具体的例子来展示如何在Java中实现这一功能。我们的例子中包含一个比赛活动&#xff0c;活动结束后需要随机抽取投给获胜队伍的用户作为奖品的接收者。 抽奖逻…

【JAVA |基础】运算符、程序逻辑控制以及方法的使用

目录 一、前言 二、操作符 1.算术运算符 2.赋值运算符 3.比较运算符 4.逻辑运算符 5.条件&#xff08;三目、三元&#xff09;运算符 6.位运算符(都是基于二进制来计算) 三、 程序逻辑控制 1.顺序结构 2.分支结构 if语句 Switch语句 3.循环结构 while语句 for循环…

thinkphp5 中控制器的创建和使用方法

在 ThinkPHP 5 中&#xff0c;控制器&#xff08;Controller&#xff09;是用于处理请求、执行逻辑操作并返回响应的类。以下是在 ThinkPHP 5 中创建和使用控制器的基本方法&#xff1a; 1. 创建控制器 在 ThinkPHP 5 中&#xff0c;控制器通常位于 application/index/contro…

6.Docker端口映射与容器互联

文章目录 端口映射与容器互联1、端口映射实现容器访问1.1、从外部访问容器应用1.2 映射所有接口的地址1.3 映射到指定地址的指定端口1.4 映射到指定地址的任意端口1.5 查看映射端口配置 2、互联机制实现容器互访2.1、自定义容器名称2.2、容器互联 端口映射与容器互联 在生产实…

Hive3.0新特性:Materialized Views 物化视图

Materialized Views 物化视图 在 Apache Hive 3.0 中引入了物化视图&#xff08;Materialized Views&#xff09;的支持&#xff0c;它们是预先计算并缓存了查询结果的数据结构&#xff0c;以提高查询性能和降低延迟。物化视图通过将查询的结果存储在物理表中来实现&#xff0…

算法提高之玉米田

算法提高之玉米田 核心思想&#xff1a;状态压缩dp 将图存入g数组 存的时候01交换一下方便后面判断即g数组中0为可以放的地方 state中1为放的地方 这样只要state为1 g为0就可以判断不合法 #include <iostream>#include <cstring>#include <algorithm>#includ…

桥接模式类图与代码

欲开发一个绘图软件&#xff0c;要求使用不同的绘图程序绘制不同的图形。以绘制直线和圆形为例&#xff0c;对应的绘图程序如表 7.7 所示。 根据绘图软件的扩展性要求&#xff0c;该绘图软件将不断扩充新的图形和新的绘图程序。为了避免出现类爆炸的情况&#xff0c;现采用桥接…

Application exit(Out of memory)

Qt for WebAssembly 开发的网页&#xff0c;在 iOS 设备上打开会提示&#xff1a;Out of memory 如图&#xff1a; 解决办法&#xff1a; 环境&#xff1a;Qt 6.7.0 WebAssembly multi-threaded Emscripten Compiler 3.1.50 在CMakeLists.txt 中增加&#xff1a; set_tar…

【Osek网络管理测试】[TG4_TC4]tWaitBusSleep

🙋‍♂️ 【Osek网络管理测试】系列💁‍♂️点击跳转 文章目录 1.环境搭建2.测试目的3.测试步骤4.预期结果5.测试结果1.环境搭建 硬件:VN1630 软件:CANoe 2.测试目的 验证DUT的tWBS时间参数是否符合NM标准 本处规定tWBS在[1350ms,1650ms]范围内符合要求 3.测试步骤…

使用Docker安装MySQL5.7.36

拉取镜像并查看 docker pull mysql:5.7.36拉取成功后查看&#xff08;非必须&#xff09; docker images创建并设置宿主机 mysql 配置文件目录和数据文件目录 创建相关文件夹将容器中的mysql数据保存到本地&#xff0c;这样即使容器被删除&#xff0c;数据也不会丢失。 mkd…

Python + selenium如何截图!

废话不多说&#xff0c;直接进入正题 一、直接截取网页全屏 截全屏的时候&#xff0c;我们用到的内置方法为save_screenshot("demo1.png") from selenium import webdriver from time import sleepclass test:driver webdriver.Chrome()driver.maximize_window()…

《架构思维:从程序员到CTO》:通往顶级架构师之路

&#x1f482; 个人网站:【 摸鱼游戏】【神级代码资源网站】【工具大全】&#x1f91f; 一站式轻松构建小程序、Web网站、移动应用&#xff1a;&#x1f449;注册地址&#x1f91f; 基于Web端打造的&#xff1a;&#x1f449;轻量化工具创作平台&#x1f485; 想寻找共同学习交…

PCIE协议-1

1. PCIe结构拓扑 一个结构由点对点的链路组成&#xff0c;这些链路将一组组件互相连接 - 图1-2展示了一个结构拓扑示例。该图展示了一个称为层级结构的单一结构实例&#xff0c;由一个根复合体&#xff08;Root Complex, RC&#xff09;、多个端点&#xff08;I/O设备&#xf…

ubuntu20部署3d高斯

3d高斯的链接&#xff1a;https://github.com/graphdeco-inria/gaussian-splatting 系统环境 ubuntu20的系统环境&#xff0c;打算只运行训练的代码&#xff0c;而不去进行麻烦的可视化&#xff0c;可视化直接在windows上用他们预编译好的exe去可视化。&#xff08;因为看的很…

NLP中常见的tokenize方式及token类型

目录 Tokenizer的细节与计算方式Tokenizer的计算方式各种Tokenizer的优缺点 NLP中常用的Tokens单词Tokens&#xff08;Word Tokens&#xff09;子词Tokens&#xff08;Subword Tokens&#xff09;字符Tokens&#xff08;Character Tokens&#xff09;字节Tokens&#xff08;Byt…

C语言函数

1.函数是什么 在数学里,函数是一种对应关系,而 C 语言里的函数和数学中的函数具有相似点,但是有很大的不同,甚至有些人认为“函数”这个名词不够恰当。准确来说,C 函数的函数是一种子程序,您可以去 Wiki 百科查看对子程序的解释。 所谓的子程序,实际上就是大型程序中的…

[linux] pytorch各种报错

1. matplot lib "fatal IO error 25 (Inappropriate ioctl for device) on X server “localhost:10.0” 解决方案&#xff1a; import matplotlib matplotlib.use(Agg) 2. Error ALSA Carla 0.9.9 报错信息为&#xff1a; 4.24.3-0UE4Release-4.24 518 0 Disabling c…

暗区突围pc端资格发放了吗 暗区突围pc测试资格怎么获取

暗区突围pc端资格发放了吗 暗区突围pc测试资格怎么获取 暗区突围是一款很火爆的第一人称射击网游&#xff0c;现在终于要上线PC端啦&#xff01;小伙伴们是不是已经迫不及待想要体验电脑上的硬核射击快感了&#xff1f;暗区突围pc端资格已经陆续发放&#xff0c;想要参与PC端…

TC8002D 是一颗带关断模式的音频功放IC

一、一般概述 TC8002D是一颗带关断模式的音频功放IC。在5V输入电压下工作时&#xff0c;负载(3Ω)上的平均功率 为3 W&#xff0c;且失真度不超过10%。而对于手提设备而言&#xff0c;当VDD作用于关断端时&#xff0c;TC8002D将会进入关断模式&#xff0c;此时的功耗极…

Redis的几种集群模式

主从复制模式&#xff1a; 主从复制是Redis最简单的集群模式。这个模式主要是为了解决单点故障的问题&#xff0c;所以将数据复制多个副本中&#xff0c;这样即使有一台服务器出现故障&#xff0c;其他服务器依然可以继续提供服务。数据会在一个主节点&#xff08;master&#…