SparkSQL学习03-数据读取与存储

文章目录

    • 1 数据的加载
      • 1.1 方式一:spark.read.format
        • 1.1.1读取json数据
        • 1.1.2 读取jdbc数据
      • 1.2 方式二:spark.read.xxx
        • 1.2.1 读取json数据
        • 1.2.2 读取csv数据
        • 1.2.3 读取txt数据
        • 1.2.4 读取parquet数据
        • 1.2.5 读取orc数据
        • 1.2.6 读取jdbc数据
    • 2 数据的保存
      • 2.1 方式一:spark.write.format
        • 2.1.1 读取orc数据
      • 2.2 方式二:spark.write.xxx
        • 2.2.1 写入到jdbc数据库中

SparkSQL提供了通用的保存数据和数据加载的方式。这里的通用指的是使用相同的API,根据不同的参数读取和保存不用格式的数据。SparkSQL默认读取和保存的文件格式为parquet,parquet是一种能够有效存储嵌套数据的列式存储格式。

1 数据的加载

SparkSQL提供了两种方式可以加载数据

1.1 方式一:spark.read.format

  • spark.read.format读取数据文件格式.load加载数据路径”
  • 数据文件格式包括csv、jdbc、json、orc、parquet和textFile。
  • 需要注意:在读取jdbc时需要在format和load之间添加多个option进行相应的JDBC参数设置【url、user、password.tablename】load中不用传递路经空参数即可
  • 数据源为Parquet文件时,SparkSQL可以方便的执行所有的操作,不需要使用format
1.1.1读取json数据

json数据:
在这里插入图片描述
读取代码:

package _02SparkSQL
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}object _06SparkReadData {def main(args: Array[String]): Unit = {val session = SparkSession.builder().appName("SparkReadData").master("local[*]").getOrCreate()//使用第一种范式加载数据var frame: DataFrame = session.read.format("json").load("data/people.json")frame.printSchema()/*** 运行结果:root|-- age: long (nullable = true)|-- height: double (nullable = true)|-- name: string (nullable = true)|-- province: string (nullable = true)*/frame.show()/*** 运行结果:+---+------+-------+--------+|age|height|   name|province|+---+------+-------+--------+| 10| 168.8|Michael|    广东|| 30| 168.8|   Andy|    福建|| 19| 169.8| Justin|    浙江|| 32| 188.8| 王启峰|    广东|| 10| 168.8|   John|    河南|| 19| 179.8|   Domu|    浙江|+---+------+-------+--------+* */}
}
1.1.2 读取jdbc数据

读取代码:

package _02SparkSQL
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}object _06SparkReadData {def main(args: Array[String]): Unit = {val session = SparkSession.builder().appName("SparkReadData").master("local[*]").getOrCreate()// 如果读取的JDBC操作(即读取mysql中的数据)val frame = session.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/mydb1").option("dbtable","location_info").option("user","root").option("password","123456").load()frame.printSchema()}
}

1.2 方式二:spark.read.xxx

  • 上述的书写方式太过项,所以SparksQL推出了更加便捷的方式spark.read.xxx加载数据路径”)
  • XXX包括csv、jdbc、json、orc、parquet和text
  • 需要注意:在读取jdbc时方法参数为三个分别为【url、tablename、properties对象】,其中properties对象中存储的是【user,password】
1.2.1 读取json数据

json数据:
在这里插入图片描述
读取代码:

package _02SparkSQL
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}object _06SparkReadData {def main(args: Array[String]): Unit = {val session = SparkSession.builder().appName("SparkReadData").master("local[*]").getOrCreate()//【推荐使用】第二种方式进行读取操作val frame = session.read.json("data/people.json")frame.printSchema()/**root|-- age: long (nullable = true)|-- height: double (nullable = true)|-- name: string (nullable = true)|-- province: string (nullable = true)*/frame.show()/**+---+------+-------+--------+|age|height|   name|province|+---+------+-------+--------+| 10| 168.8|Michael|    广东|| 30| 168.8|   Andy|    福建|| 19| 169.8| Justin|    浙江|| 32| 188.8| 王启峰|    广东|| 10| 168.8|   John|    河南|| 19| 179.8|   Domu|    浙江|+---+------+-------+--------+ */}}
1.2.2 读取csv数据

csv数据:
在这里插入图片描述
读取代码:

package _02SparkSQL
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}object _06SparkReadData {def main(args: Array[String]): Unit = {val session = SparkSession.builder().appName("SparkReadData").master("local[*]").getOrCreate()val frame = session.read.csv("data/country.csv")frame.printSchema()/**root|-- _c0: string (nullable = true)|-- _c1: string (nullable = true)|-- _c2: string (nullable = true)*/frame.show()/**+---+----------------+---+|_c0|             _c1|_c2|+---+----------------+---+|  1|            中国|  1||  2|      阿尔巴尼亚|ALB||  3|      阿尔及利亚|DZA||  4|          阿富汗|AFG||  5|          阿根廷|ARG||  6|阿拉伯联合酋长国|ARE||  7|          阿鲁巴|ABW||  8|            阿曼|OMN||  9|        阿塞拜疆|AZE|| 10|        阿森松岛|ASC|| 11|            埃及|EGY|| 12|      埃塞俄比亚|ETH|| 13|          爱尔兰|IRL|| 14|        爱沙尼亚|EST|| 15|          安道尔|AND|| 16|          安哥拉|AGO|| 17|          安圭拉|AIA|| 18|安提瓜岛和巴布达|ATG|| 19|        澳大利亚|AUS|| 20|          奥地利|AUT|+---+----------------+---+*/}}
1.2.3 读取txt数据

txt数据:
在这里插入图片描述
读取代码:

package _02SparkSQL
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}object _06SparkReadData {def main(args: Array[String]): Unit = {val session = SparkSession.builder().appName("SparkReadData").master("local[*]").getOrCreate()val frame = session.read.text("data/dailykey.txt")frame.printSchema()/**root|-- value: string (nullable = true)* */frame.show()/**+--------------------+|               value|+--------------------+|2018-11-13\ttom\t...||2018-11-13\ttom\t...||2018-11-13\tjohn\...||2018-11-13\tlucy\...||2018-11-13\tlucy\...||2018-11-13\tjohn\...||2018-11-13\tricha...||2018-11-13\tricha...||2018-11-13\tricha...||2018-11-14\ttom\t...||2018-11-14\ttom\t...||2018-11-14\ttom\t...|+--------------------+* */}}
1.2.4 读取parquet数据
package _02SparkSQL
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}object _06SparkReadData {def main(args: Array[String]): Unit = {val session = SparkSession.builder().appName("SparkReadData").master("local[*]").getOrCreate()val frame = session.read.parquet("data/users.parquet")frame.printSchema()/**root|-- name: string (nullable = true)|-- favorite_color: string (nullable = true)|-- favorite_numbers: array (nullable = true)|    |-- element: integer (containsNull = true)*/frame.show()/*+------+--------------+----------------+|  name|favorite_color|favorite_numbers|+------+--------------+----------------+|Alyssa|          null|  [3, 9, 15, 20]||   Ben|           red|              []|+------+--------------+----------------+*/}
}
1.2.5 读取orc数据
package _02SparkSQL
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}object _06SparkReadData {def main(args: Array[String]): Unit = {val session = SparkSession.builder().appName("SparkReadData").master("local[*]").getOrCreate()val frame = session.read.orc("data/student.orc")frame.printSchema()/**root|-- id: string (nullable = true)|-- name: string (nullable = true)|-- age: string (nullable = true)|-- gender: string (nullable = true)|-- course: string (nullable = true)|-- score: string (nullable = true)*/frame.show()/**+---+------+---+------+-------+-----+| id|  name|age|gender| course|score|+---+------+---+------+-------+-----+| 12|  张三| 25|    男|chinese|   50|| 12|  张三| 25|    男|   math|   60|| 12|  张三| 25|    男|english|   70|| 12|  李四| 20|    男|chinese|   50|| 12|  李四| 20|    男|   math|   50|| 12|  李四| 20|    男|english|   50|| 12|  王芳| 19|    女|chinese|   70|| 12|  王芳| 19|    女|   math|   70|| 12|  王芳| 19|    女|english|   70|| 13|张大三| 25|    男|chinese|   60|| 13|张大三| 25|    男|   math|   60|| 13|张大三| 25|    男|english|   70|| 13|李大四| 20|    男|chinese|   50|| 13|李大四| 20|    男|   math|   60|| 13|李大四| 20|    男|english|   50|| 13|王小芳| 19|    女|chinese|   70|| 13|王小芳| 19|    女|   math|   80|| 13|王小芳| 19|    女|english|   70|+---+------+---+------+-------+-----+*/}
}
1.2.6 读取jdbc数据
package _02SparkSQL
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}object _06SparkReadData {def main(args: Array[String]): Unit = {val session = SparkSession.builder().appName("SparkReadData").master("local[*]").getOrCreate()// 读取jdbc文件val properties = new Properties()properties.put("user","root")properties.put("password","123456")val frame = session.read.jdbc("jdbc:mysql://localhost:3306/mydb1","location-info",properties)frame.printSchema()frame.show()}
}

2 数据的保存

SparkSQL提供了两种方式可以保存数据

2.1 方式一:spark.write.format

  • spark.write.format(“保存数据格式”).mode(“存储格式”).save(“存储数据路径”)
  • 数据文件格式包括csv、jdbc、json、orc、parquet和textFile。
  • 保存数据可以使用SaveMode,用来指明如何处理数据,使用mode()方法来设置
  • SaveMode是一个枚举类,其中的常量包括:
scala/javaAny LanguageMeaning
SaveMode.ErrorifExists(default)“error”(default)如果文件已经存在,则抛出异常
SaveMode.Append“append”如果文件已经存在,则追加
SaveMode.Overwrite“overwrite”如果文件已经存在,则覆盖
SaveMode.Ignore“ignore”如果文件已经存在,则忽略

需要注意:在读取jdbc时需要在format和save之间添加多个option进行相应的JDBC参数设置【url、user、password、tablename】save中不用传递路经空参数即可,可以不用设置mode

数据源为Parquet文件时,SparkSQL可以方便的执行所有的操作,不需要使用format

2.1.1 读取orc数据
package _02SparkSQL
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}object _07SparkWriteData {def main(args: Array[String]): Unit = {//提供SparkSession对象val session = SparkSession.builder().appName("SparkWriteData").master("local").getOrCreate()//先读取数据var frame: DataFrame = session.read.orc("data/student.orc")//保存到某个路径下,OWstudent为文件夹,不需要文件名frame.write.format("json").mode(SaveMode.Overwrite).save("data/OWstudent")session.stop()}
}

最后结果为:
在这里插入图片描述

2.2 方式二:spark.write.xxx

上述的书写方式太过繁项,所以SparksQL推出了更加便捷的方式:

  • spark.write.xxx(“保存数据路径”)
  • XXX包括csv、jdbc、json、orc、parquet和text
  • 需要注意:在保存jdbc时方法参数为三个分别为【url、tablename、properties对象】,其中properties对象中存储的是【user,password】
  • mode可以选择性设置
2.2.1 写入到jdbc数据库中
package _02SparkSQL
import java.util.Propertiesimport org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}object _07SparkWriteData {def main(args: Array[String]): Unit = {//提供SparkSession对象val session = SparkSession.builder().appName("SparkWriteData").master("local").getOrCreate()//先读取数据var frame: DataFrame = session.read.orc("data/student.orc")val properties = new Properties()properties.put("user","root")properties.put("password","123456")frame.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/mydb1","student",properties)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/698823.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SmartX 携手 openGauss 社区发布联合方案评测与性能最佳实践

近日,北京志凌海纳科技有限公司(以下简称 “SmartX”)携手 openGauss 社区完成了 openGauss 数据库基于 SmartX 超融合平台(SMTX OS)和 SmartX 分布式存储平台(SMTX ZBS)的性能测试和调优。 结果…

MySQL数据库基础(十三):关系型数据库三范式介绍

文章目录 关系型数据库三范式介绍 一、什么是三范式 二、数据冗余 三、范式的划分 四、一范式 五、二范式 六、三范式 七、总结 关系型数据库三范式介绍 一、什么是三范式 设计关系数据库时,遵从不同的规范要求,设计出合理的关系型数据库&…

紫光同创初使用

芯片PGC2KG-6LPG144 1、安装好软件接,加载license,有两个,与电脑MAC地址绑定的 2、正常使用后,新建个工程,配置管脚Tools→UCE 3、程序中有些信号被软件认为是时钟信号,会报错(时钟输入I0约束在非专用时钟…

【LeetCode刷题笔记】242.有效的字母异位词

创作不易&#xff0c;本篇文章如果帮助到了你&#xff0c;还请点赞 关注支持一下♡>&#x16966;<)!! 主页专栏有更多知识&#xff0c;如有疑问欢迎大家指正讨论&#xff0c;共同进步&#xff01; 更多算法知识专栏&#xff1a;算法分析&#x1f525; 给大家跳段街舞感谢…

Spring基础之AOP和代理模式

文章目录 理解AOPAOP的实现原理 AOP代理模式静态代理动态代理1-JDK动态代理2-CGLIB动态代理 总结 理解AOP OOP - - Object Oriented Programming 面向对象编程 AOP - - Aspect Oriented Programming 面向切面编程 AOP是Spring提供的关键特性之一。AOP即面向切面编程&#xff0…

Jenkins邮件通知配置(7)

1、安装插件&#xff1a; Email Extension&#xff0c;Email Extension Template&#xff0c;这两个插件可以帮助我们进行邮件的编写发送以及格式化 2、配置jenkins中链接腾讯企业邮箱 先配置发送服务&#xff0c;然后在具体工程中设置接收者 基础信息&#xff1a; POP3/S…

SWIFT:自我认知微调

文档:https://github.com/modelscope/swift/blob/main/docs/source/LLM/%E8%87%AA%E6%88%91%E8%AE%A4%E7%9F%A5%E5%BE%AE%E8%B0%83%E6%9C%80%E4%BD%B3%E5%AE%9E%E8%B7%B5.md ​​​​​​代码: Swift是如何把自我认知数据集融合到训练集中呢? 1:相关的3个参数

企业级大数据安全架构(十一)Kerberos接入dophinscheduler

作者&#xff1a;楼高 建议将dophinscheduler集成到Ambari安装部署&#xff0c;在Ambari上面开启kerberos 1.安装准备 编译 从GitHub获取dolphinscheduler-1.3.9源码 git clone https://github.com/apache/dolphinscheduler.git -b 1.3.9-releasehttps://github.com/apache/…

多输入回归预测|GWO-CNN-LSTM|灰狼算法优化的卷积-长短期神经网络回归预测(Matlab)

目录 一、程序及算法内容介绍&#xff1a; 基本内容&#xff1a; 亮点与优势&#xff1a; 二、实际运行效果&#xff1a; 三、算法介绍&#xff1a; 灰狼优化算法&#xff1a; 卷积神经网络-长短期记忆网络&#xff1a; 四、完整程序下载&#xff1a; 一、程序及算法内容…

ChatGPT/GPT4科研应用与AI绘图及论文写作

2023年随着OpenAI开发者大会的召开&#xff0c;最重磅更新当属GPTs&#xff0c;多模态API&#xff0c;未来自定义专属的GPT。微软创始人比尔盖茨称ChatGPT的出现有着重大历史意义&#xff0c;不亚于互联网和个人电脑的问世。360创始人周鸿祎认为未来各行各业如果不能搭上这班车…

重看LeakCanary

LeakCanary是我很久之前看的东西了&#xff0c;我当时侯对它的印象就是它可以用来检测内存泄漏&#xff0c;具体原理就是将弱引用对象延迟个5s然后看是否被回收,如果没有被回收,那么就说明发生了内存泄漏,其他的也没有仔细地看 现在就详细地梳理一遍这个流程&#xff1a; 1.L…

微服务篇之分布式事务

一、Seata架构 Seata事务管理中有三个重要的角色&#xff1a; TC (Transaction Coordinator) - 事务协调者&#xff1a;维护全局和分支事务的状态&#xff0c;协调全局事务提交或回滚。 TM (Transaction Manager) - 事务管理器&#xff1a;定义全局事务的范围、开始全局事务、…

docker学习总结

docker 1.初识Docker1.1.什么是Docker1.1.1.应用部署的环境问题1.1.2.Docker解决依赖兼容问题1.1.3.Docker解决操作系统环境差异1.1.4.小结 1.2.Docker和虚拟机的区别1.3.Docker架构1.3.1.镜像和容器1.3.2.DockerHub1.3.3.Docker架构1.3.4.小结 1.4.安装Docker 2.Docker的基本操…

Kubernetes Prometheus 系列|Prometheus介绍和使用|Prometheus+Grafana集成

目录 第1章Prometheus 入门1.1 Prometheus 的特点1.1.1 易于管理1.1.2 监控服务的内部运行状态1.1.3 强大的数据模型1.1.4 强大的查询语言 PromQL1.1.5 高效1.1.6 可扩展1.1.7 易于集成1.1.8 可视化1.1.9 开放性 1.2 Prometheus 的架构1.2.1 Prometheus 生态圈组件1.2.2 架构理…

Go 数据库编程精粹:database/sql 实用技巧解析

Go 数据库编程精粹&#xff1a;database/sql 实用技巧解析 简介database/sql 库的基础知识核心概念连接池驱动事务 环境配置 建立数据库连接连接到数据库示例&#xff1a;连接 MySQL 数据库连接池管理 执行查询和处理结果基本查询执行多行查询执行单行查询 结果处理处理多行结果…

基于Java SSM框架实现问卷调查系统项目【项目源码】

基于java的SSM框架实现问卷调查系统演示 B/S结构 BROWSER/SERVER程序架构方式是使用电脑中安装的各种浏览器来进行访问和使用的&#xff0c;相比C/S的程序结构不需要进行程序的安装就可以直接使用。BROWSER/SERVER架构的运行方式是在远程的服务器上进行安装一个&#xff0c;然…

普中51单片机学习(DS1302)

DS1302时钟 DS1302实时时钟具有能计算2100年之前的秒、分、时、日、日期、星期、月、年的能力&#xff0c;还有闰年调整的能力。内部含有31个字节静态RAM&#xff0c;可提供用户访问。采用串行数据传送方式&#xff0c;使得管脚数量最少&#xff0c;简单SPI 3线接口。工作电压…

4.8 Verilog过程连续赋值

关键词&#xff1a;解除分配&#xff0c;强制&#xff0c;释放 过程连续赋值是过程赋值的一种。赋值语句能够替换其他所有wire 或 reg 的赋值&#xff0c;改写wire 或 reg 类型变量的当前值。 与过程赋值不同的是&#xff0c;过程连续赋值表达式能被连续的驱动到wire 或 reg …

C++——基础语法(2):函数重载

4. 函数重载 函数重载就是同一个函数名可以重复被定义&#xff0c;即允许定义相同函数名的函数。但是相同名字的函数怎么在使用的时候进行区分呢&#xff1f;所以同一个函数名的函数之间肯定是要存在不同点的&#xff0c;除了函数名外&#xff0c;还有返回类型和参数两部分可以…

本地配置多个git账户及ll设置

本地配置多个git账户 清除全局配置将命令行&#xff0c;切换到ssh目录生成GitLab和Gitee的公钥、私钥去对应的代码仓库添加 SSH Keys添加私钥ll设置 管理密钥验证仓库配置关于gitgitee.com: Permission denied (publickey) 清除全局配置 此步骤可以不做&#xff0c;经测试不影…