Delta lake with Java--数据增删改查

之前写的关于spark sql 操作delta lake表的,总觉得有点混乱,今天用Java结合真实的数据来进行一次数据的CRUD操作,所涉及的数据来源于Delta lake up and running配套的 GitGitHub - benniehaelen/delta-lake-up-and-running: Companion repository for the book 'Delta Lake Up and Running'

要实现的效果是新建表,导入数据,然后对表进行增删改查操作,具体代码如下:

package detal.lake.java;import io.delta.tables.DeltaTable;
import org.apache.spark.sql.SparkSession;import java.text.SimpleDateFormat;
import io.delta.tables.DeltaTable;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.HashMap;public class DeltaLakeCURD {//将字符串转换成java.sql.Timestamppublic static java.sql.Timestamp strToSqlDate(String strDate, String dateFormat) {SimpleDateFormat sf = new SimpleDateFormat(dateFormat);java.util.Date date = null;try {date = sf.parse(strDate);} catch (Exception e) {e.printStackTrace();}java.sql.Timestamp dateSQL = new java.sql.Timestamp(date.getTime());return dateSQL;}public static void main(String[] args) {SparkSession spark = SparkSession.builder().master("local[*]").appName("delta_lake").config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension").config("spark.databricks.delta.autoCompact.enabled", "true").config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog").getOrCreate();SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");String savePath="file:///D:\\\\bigdata\\\\detla-lake-with-java\\\\YellowTaxi";String csvPath="D:\\bookcode\\delta-lake-up-and-running-main\\data\\YellowTaxisLargeAppend.csv";String tableName = "taxidb.YellowTaxis";spark.sql("CREATE DATABASE IF NOT EXISTS taxidb");//定义表DeltaTable.createIfNotExists(spark).tableName(tableName).addColumn("RideId","INT").addColumn("VendorId","INT").addColumn("PickupTime","TIMESTAMP").addColumn("DropTime","TIMESTAMP").location(savePath).execute();//加载csv数据并导入delta表var df=spark.read().format("delta").table(tableName);var schema=df.schema();System.out.println(schema.simpleString());var df_for_append=spark.read().option("header","true").schema(schema).csv(csvPath);System.out.println("记录总行数:"+df_for_append.count());System.out.println("导入数据,开始时间"+  sdf.format(new Date()));df_for_append.write().format("delta").mode(SaveMode.Overwrite).saveAsTable(tableName);System.out.println("导入数据,结束时间" + sdf.format(new Date()));DeltaTable deltaTable = DeltaTable.forName(spark,tableName);//插入数据List<Row> list = new ArrayList<Row>();list.add(RowFactory.create(-1,-1,strToSqlDate("2023-01-01 10:00:00","yyyy-MM-dd HH:mm:ss"),strToSqlDate("2023-01-01 10:00:00","yyyy-MM-dd HH:mm:ss")));List<StructField> structFields = new ArrayList<>();structFields.add(DataTypes.createStructField("RideId", DataTypes.IntegerType, true));structFields.add(DataTypes.createStructField("VendorId", DataTypes.IntegerType, true));structFields.add(DataTypes.createStructField("PickupTime", DataTypes.TimestampType, true));structFields.add(DataTypes.createStructField("DropTime", DataTypes.TimestampType, true));StructType structType = DataTypes.createStructType(structFields);var yellowTaxipDF=spark.createDataFrame(list,structType); //建立需要新增数据并转换成dataframeSystem.out.println("插入数据,开始时间"+  sdf.format(new Date()));yellowTaxipDF.write().format("delta").mode(SaveMode.Append).saveAsTable(tableName);System.out.println("插入数据,结束时间"+  sdf.format(new Date()));System.out.println("插入后数据");deltaTable.toDF().select("*").where("RideId=-1").show(false);//更新数据System.out.println("更新前数据");deltaTable.toDF().select("*").where("RideId=999994").show(false);System.out.println("更新数据,开始时间"+  sdf.format(new Date()));deltaTable.updateExpr("RideId = 999994",new HashMap<String, String>() {{put("VendorId", "250");}});System.out.println("更新数据,结束时间"+  sdf.format(new Date()));System.out.println("更新后数据");deltaTable.toDF().select("*").where("RideId=999994").show(false);//查询数据System.out.println("查询数据,开始时间"+  sdf.format(new Date()));var selectDf= deltaTable.toDF().select("*").where("RideId=1");selectDf.show(false);System.out.println("查询数据,结束时间" + sdf.format(new Date()));//删除数据System.out.println("删除数据,开始时间"+  sdf.format(new Date()));deltaTable.delete("RideId=1");System.out.println("删除数据,结束时间"+  sdf.format(new Date()));deltaTable.toDF().select("*").where("RideId=1").show(false);}
}

里面涉及spark的TimestampType类型,如何将字符串输入到TimestampType列,找了几个小时才找到答案,具体参考了如下连接,原来直接将string转成java.sql.Timestamp即可,于是在网上找了一个方法,实现了转换,转换代码非原创,也是借鉴其他大牛的。

scala - How to create TimestampType column in spark from string - Stack Overflow

最后运行结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/7532.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【JAVA |基础】运算符、程序逻辑控制以及方法的使用

目录 一、前言 二、操作符 1.算术运算符 2.赋值运算符 3.比较运算符 4.逻辑运算符 5.条件&#xff08;三目、三元&#xff09;运算符 6.位运算符(都是基于二进制来计算) 三、 程序逻辑控制 1.顺序结构 2.分支结构 if语句 Switch语句 3.循环结构 while语句 for循环…

Hive3.0新特性:Materialized Views 物化视图

Materialized Views 物化视图 在 Apache Hive 3.0 中引入了物化视图&#xff08;Materialized Views&#xff09;的支持&#xff0c;它们是预先计算并缓存了查询结果的数据结构&#xff0c;以提高查询性能和降低延迟。物化视图通过将查询的结果存储在物理表中来实现&#xff0…

算法提高之玉米田

算法提高之玉米田 核心思想&#xff1a;状态压缩dp 将图存入g数组 存的时候01交换一下方便后面判断即g数组中0为可以放的地方 state中1为放的地方 这样只要state为1 g为0就可以判断不合法 #include <iostream>#include <cstring>#include <algorithm>#includ…

桥接模式类图与代码

欲开发一个绘图软件&#xff0c;要求使用不同的绘图程序绘制不同的图形。以绘制直线和圆形为例&#xff0c;对应的绘图程序如表 7.7 所示。 根据绘图软件的扩展性要求&#xff0c;该绘图软件将不断扩充新的图形和新的绘图程序。为了避免出现类爆炸的情况&#xff0c;现采用桥接…

Application exit(Out of memory)

Qt for WebAssembly 开发的网页&#xff0c;在 iOS 设备上打开会提示&#xff1a;Out of memory 如图&#xff1a; 解决办法&#xff1a; 环境&#xff1a;Qt 6.7.0 WebAssembly multi-threaded Emscripten Compiler 3.1.50 在CMakeLists.txt 中增加&#xff1a; set_tar…

使用Docker安装MySQL5.7.36

拉取镜像并查看 docker pull mysql:5.7.36拉取成功后查看&#xff08;非必须&#xff09; docker images创建并设置宿主机 mysql 配置文件目录和数据文件目录 创建相关文件夹将容器中的mysql数据保存到本地&#xff0c;这样即使容器被删除&#xff0c;数据也不会丢失。 mkd…

Python + selenium如何截图!

废话不多说&#xff0c;直接进入正题 一、直接截取网页全屏 截全屏的时候&#xff0c;我们用到的内置方法为save_screenshot("demo1.png") from selenium import webdriver from time import sleepclass test:driver webdriver.Chrome()driver.maximize_window()…

《架构思维:从程序员到CTO》:通往顶级架构师之路

&#x1f482; 个人网站:【 摸鱼游戏】【神级代码资源网站】【工具大全】&#x1f91f; 一站式轻松构建小程序、Web网站、移动应用&#xff1a;&#x1f449;注册地址&#x1f91f; 基于Web端打造的&#xff1a;&#x1f449;轻量化工具创作平台&#x1f485; 想寻找共同学习交…

PCIE协议-1

1. PCIe结构拓扑 一个结构由点对点的链路组成&#xff0c;这些链路将一组组件互相连接 - 图1-2展示了一个结构拓扑示例。该图展示了一个称为层级结构的单一结构实例&#xff0c;由一个根复合体&#xff08;Root Complex, RC&#xff09;、多个端点&#xff08;I/O设备&#xf…

ubuntu20部署3d高斯

3d高斯的链接&#xff1a;https://github.com/graphdeco-inria/gaussian-splatting 系统环境 ubuntu20的系统环境&#xff0c;打算只运行训练的代码&#xff0c;而不去进行麻烦的可视化&#xff0c;可视化直接在windows上用他们预编译好的exe去可视化。&#xff08;因为看的很…

暗区突围pc端资格发放了吗 暗区突围pc测试资格怎么获取

暗区突围pc端资格发放了吗 暗区突围pc测试资格怎么获取 暗区突围是一款很火爆的第一人称射击网游&#xff0c;现在终于要上线PC端啦&#xff01;小伙伴们是不是已经迫不及待想要体验电脑上的硬核射击快感了&#xff1f;暗区突围pc端资格已经陆续发放&#xff0c;想要参与PC端…

TC8002D 是一颗带关断模式的音频功放IC

一、一般概述 TC8002D是一颗带关断模式的音频功放IC。在5V输入电压下工作时&#xff0c;负载(3Ω)上的平均功率 为3 W&#xff0c;且失真度不超过10%。而对于手提设备而言&#xff0c;当VDD作用于关断端时&#xff0c;TC8002D将会进入关断模式&#xff0c;此时的功耗极…

探索淘宝API接口对接(属性规格丨sku价格丨详情图丨优惠券等):打造智能电商解决方案

一、引言 随着电子商务的快速发展&#xff0c;越来越多的企业和开发者希望通过自动化和智能化的方式接入电商平台&#xff0c;以实现更高效的数据交互和业务流程。淘宝作为中国最大的电商平台之一&#xff0c;其提供的API接口成为了众多企业和开发者关注的焦点。本文将探讨淘宝…

【spring】Bean的生命周期回调函数和Bean的循环依赖

目录 1、Bean的生命周期 2、Bean的生命周期回调函数 2.1、初始化的生命周期回调 2.2、销毁的生命周期回调 3、Bean的循环依赖 1、Bean的生命周期 spring的bean的生命周期主要是创建bean的过程&#xff0c;一个bean的生命周期主要是4个步骤&#xff1a;实例化&#xff0c;…

视频剪辑图文实例:一键操作,轻松实现视频批量片头片尾减时

视频剪辑是现代媒体制作中不可或缺的一环&#xff0c;而批量处理视频更是许多专业人士和爱好者的常见需求。在剪辑过程中&#xff0c;调整视频的片头片尾时长可以显著提升视频的质量和观感。本文将通过图文实例的方式&#xff0c;向您展示如何一键操作&#xff0c;轻松实现视频…

直播录屏怎么录?分享3种方法

随着网络直播的兴起&#xff0c;直播录屏已成为众多网友记录精彩瞬间、分享有趣内容的重要工具。直播录屏不仅能帮助我们回顾和保存直播中的精彩片段&#xff0c;还能为创作者提供更多的素材和灵感。 本文将为大家介绍3种直播录屏的方法&#xff0c;帮助大家能够更好地利用这一…

【IEEE独立出版|往届均已成功检索】ISPDS 2024诚邀投稿参会

第五届信息科学与并行、分布式处理国际学术会议&#xff08;ISPDS 2024&#xff09; 2024 5th International Conference on Information Science, Parallel and Distributed Systems 2024年5月31-6月2日 | 中国广州NEWS&#xff1a;会议已在格林威治大学官网上线会议已经上线到…

学术咸鱼入门指南(2)

巧用思维导图阅读文献 化整为零&#xff1a;读文献&#xff0c;从拆分文章的结构开始 大家在初步接触自己学科的论文时&#xff0c;要了解清楚基本的范式&#xff0c;日后读起来就比较顺了。 科研论文的第一部分&#xff0c;是文章的标题&#xff0c;摘要和关键词&#xff0…

【MySQL】连接查询(JOIN 关键字)—— 图文详解:内连接、外连接、左连接、左外连接、右连接、右外连接

文章目录 连接查询驱动表连接查询分类 内连接&#xff08;INNER JOIN&#xff09;内连接 —— 等值连接内连接 —— 自然连接&#xff08;NATURAL JOIN&#xff09;内连接 —— 交叉连接&#xff08;笛卡尔积&#xff09; 外连接&#xff08;OUTER JOIN&#xff09;外连接 ——…

nodejs里面的 http 模块介绍和使用

Node.js的HTTP模块是一个核心模块&#xff0c;它提供了很多功能来创建HTTP服务器和发送HTTP请求。 http.Server是一个基于事件的http服务器&#xff0c;内部是由c实现的&#xff0c;接口是由JavaScript封装。 http.request是一个http客户端工具。 用户向服务器发送数据。 创…