SparkSQL学习03-数据读取与存储

文章目录

    • 1 数据的加载
      • 1.1 方式一:spark.read.format
        • 1.1.1读取json数据
        • 1.1.2 读取jdbc数据
      • 1.2 方式二:spark.read.xxx
        • 1.2.1 读取json数据
        • 1.2.2 读取csv数据
        • 1.2.3 读取txt数据
        • 1.2.4 读取parquet数据
        • 1.2.5 读取orc数据
        • 1.2.6 读取jdbc数据
    • 2 数据的保存
      • 2.1 方式一:spark.write.format
        • 2.1.1 读取orc数据
      • 2.2 方式二:spark.write.xxx
        • 2.2.1 写入到jdbc数据库中

SparkSQL提供了通用的保存数据和数据加载的方式。这里的通用指的是使用相同的API,根据不同的参数读取和保存不用格式的数据。SparkSQL默认读取和保存的文件格式为parquet,parquet是一种能够有效存储嵌套数据的列式存储格式。

1 数据的加载

SparkSQL提供了两种方式可以加载数据

1.1 方式一:spark.read.format

  • spark.read.format读取数据文件格式.load加载数据路径”
  • 数据文件格式包括csv、jdbc、json、orc、parquet和textFile。
  • 需要注意:在读取jdbc时需要在format和load之间添加多个option进行相应的JDBC参数设置【url、user、password.tablename】load中不用传递路经空参数即可
  • 数据源为Parquet文件时,SparkSQL可以方便的执行所有的操作,不需要使用format
1.1.1读取json数据

json数据:
在这里插入图片描述
读取代码:

package _02SparkSQL
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}object _06SparkReadData {def main(args: Array[String]): Unit = {val session = SparkSession.builder().appName("SparkReadData").master("local[*]").getOrCreate()//使用第一种范式加载数据var frame: DataFrame = session.read.format("json").load("data/people.json")frame.printSchema()/*** 运行结果:root|-- age: long (nullable = true)|-- height: double (nullable = true)|-- name: string (nullable = true)|-- province: string (nullable = true)*/frame.show()/*** 运行结果:+---+------+-------+--------+|age|height|   name|province|+---+------+-------+--------+| 10| 168.8|Michael|    广东|| 30| 168.8|   Andy|    福建|| 19| 169.8| Justin|    浙江|| 32| 188.8| 王启峰|    广东|| 10| 168.8|   John|    河南|| 19| 179.8|   Domu|    浙江|+---+------+-------+--------+* */}
}
1.1.2 读取jdbc数据

读取代码:

package _02SparkSQL
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}object _06SparkReadData {def main(args: Array[String]): Unit = {val session = SparkSession.builder().appName("SparkReadData").master("local[*]").getOrCreate()// 如果读取的JDBC操作(即读取mysql中的数据)val frame = session.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/mydb1").option("dbtable","location_info").option("user","root").option("password","123456").load()frame.printSchema()}
}

1.2 方式二:spark.read.xxx

  • 上述的书写方式太过项,所以SparksQL推出了更加便捷的方式spark.read.xxx加载数据路径”)
  • XXX包括csv、jdbc、json、orc、parquet和text
  • 需要注意:在读取jdbc时方法参数为三个分别为【url、tablename、properties对象】,其中properties对象中存储的是【user,password】
1.2.1 读取json数据

json数据:
在这里插入图片描述
读取代码:

package _02SparkSQL
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}object _06SparkReadData {def main(args: Array[String]): Unit = {val session = SparkSession.builder().appName("SparkReadData").master("local[*]").getOrCreate()//【推荐使用】第二种方式进行读取操作val frame = session.read.json("data/people.json")frame.printSchema()/**root|-- age: long (nullable = true)|-- height: double (nullable = true)|-- name: string (nullable = true)|-- province: string (nullable = true)*/frame.show()/**+---+------+-------+--------+|age|height|   name|province|+---+------+-------+--------+| 10| 168.8|Michael|    广东|| 30| 168.8|   Andy|    福建|| 19| 169.8| Justin|    浙江|| 32| 188.8| 王启峰|    广东|| 10| 168.8|   John|    河南|| 19| 179.8|   Domu|    浙江|+---+------+-------+--------+ */}}
1.2.2 读取csv数据

csv数据:
在这里插入图片描述
读取代码:

package _02SparkSQL
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}object _06SparkReadData {def main(args: Array[String]): Unit = {val session = SparkSession.builder().appName("SparkReadData").master("local[*]").getOrCreate()val frame = session.read.csv("data/country.csv")frame.printSchema()/**root|-- _c0: string (nullable = true)|-- _c1: string (nullable = true)|-- _c2: string (nullable = true)*/frame.show()/**+---+----------------+---+|_c0|             _c1|_c2|+---+----------------+---+|  1|            中国|  1||  2|      阿尔巴尼亚|ALB||  3|      阿尔及利亚|DZA||  4|          阿富汗|AFG||  5|          阿根廷|ARG||  6|阿拉伯联合酋长国|ARE||  7|          阿鲁巴|ABW||  8|            阿曼|OMN||  9|        阿塞拜疆|AZE|| 10|        阿森松岛|ASC|| 11|            埃及|EGY|| 12|      埃塞俄比亚|ETH|| 13|          爱尔兰|IRL|| 14|        爱沙尼亚|EST|| 15|          安道尔|AND|| 16|          安哥拉|AGO|| 17|          安圭拉|AIA|| 18|安提瓜岛和巴布达|ATG|| 19|        澳大利亚|AUS|| 20|          奥地利|AUT|+---+----------------+---+*/}}
1.2.3 读取txt数据

txt数据:
在这里插入图片描述
读取代码:

package _02SparkSQL
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}object _06SparkReadData {def main(args: Array[String]): Unit = {val session = SparkSession.builder().appName("SparkReadData").master("local[*]").getOrCreate()val frame = session.read.text("data/dailykey.txt")frame.printSchema()/**root|-- value: string (nullable = true)* */frame.show()/**+--------------------+|               value|+--------------------+|2018-11-13\ttom\t...||2018-11-13\ttom\t...||2018-11-13\tjohn\...||2018-11-13\tlucy\...||2018-11-13\tlucy\...||2018-11-13\tjohn\...||2018-11-13\tricha...||2018-11-13\tricha...||2018-11-13\tricha...||2018-11-14\ttom\t...||2018-11-14\ttom\t...||2018-11-14\ttom\t...|+--------------------+* */}}
1.2.4 读取parquet数据
package _02SparkSQL
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}object _06SparkReadData {def main(args: Array[String]): Unit = {val session = SparkSession.builder().appName("SparkReadData").master("local[*]").getOrCreate()val frame = session.read.parquet("data/users.parquet")frame.printSchema()/**root|-- name: string (nullable = true)|-- favorite_color: string (nullable = true)|-- favorite_numbers: array (nullable = true)|    |-- element: integer (containsNull = true)*/frame.show()/*+------+--------------+----------------+|  name|favorite_color|favorite_numbers|+------+--------------+----------------+|Alyssa|          null|  [3, 9, 15, 20]||   Ben|           red|              []|+------+--------------+----------------+*/}
}
1.2.5 读取orc数据
package _02SparkSQL
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}object _06SparkReadData {def main(args: Array[String]): Unit = {val session = SparkSession.builder().appName("SparkReadData").master("local[*]").getOrCreate()val frame = session.read.orc("data/student.orc")frame.printSchema()/**root|-- id: string (nullable = true)|-- name: string (nullable = true)|-- age: string (nullable = true)|-- gender: string (nullable = true)|-- course: string (nullable = true)|-- score: string (nullable = true)*/frame.show()/**+---+------+---+------+-------+-----+| id|  name|age|gender| course|score|+---+------+---+------+-------+-----+| 12|  张三| 25|    男|chinese|   50|| 12|  张三| 25|    男|   math|   60|| 12|  张三| 25|    男|english|   70|| 12|  李四| 20|    男|chinese|   50|| 12|  李四| 20|    男|   math|   50|| 12|  李四| 20|    男|english|   50|| 12|  王芳| 19|    女|chinese|   70|| 12|  王芳| 19|    女|   math|   70|| 12|  王芳| 19|    女|english|   70|| 13|张大三| 25|    男|chinese|   60|| 13|张大三| 25|    男|   math|   60|| 13|张大三| 25|    男|english|   70|| 13|李大四| 20|    男|chinese|   50|| 13|李大四| 20|    男|   math|   60|| 13|李大四| 20|    男|english|   50|| 13|王小芳| 19|    女|chinese|   70|| 13|王小芳| 19|    女|   math|   80|| 13|王小芳| 19|    女|english|   70|+---+------+---+------+-------+-----+*/}
}
1.2.6 读取jdbc数据
package _02SparkSQL
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}object _06SparkReadData {def main(args: Array[String]): Unit = {val session = SparkSession.builder().appName("SparkReadData").master("local[*]").getOrCreate()// 读取jdbc文件val properties = new Properties()properties.put("user","root")properties.put("password","123456")val frame = session.read.jdbc("jdbc:mysql://localhost:3306/mydb1","location-info",properties)frame.printSchema()frame.show()}
}

2 数据的保存

SparkSQL提供了两种方式可以保存数据

2.1 方式一:spark.write.format

  • spark.write.format(“保存数据格式”).mode(“存储格式”).save(“存储数据路径”)
  • 数据文件格式包括csv、jdbc、json、orc、parquet和textFile。
  • 保存数据可以使用SaveMode,用来指明如何处理数据,使用mode()方法来设置
  • SaveMode是一个枚举类,其中的常量包括:
scala/javaAny LanguageMeaning
SaveMode.ErrorifExists(default)“error”(default)如果文件已经存在,则抛出异常
SaveMode.Append“append”如果文件已经存在,则追加
SaveMode.Overwrite“overwrite”如果文件已经存在,则覆盖
SaveMode.Ignore“ignore”如果文件已经存在,则忽略

需要注意:在读取jdbc时需要在format和save之间添加多个option进行相应的JDBC参数设置【url、user、password、tablename】save中不用传递路经空参数即可,可以不用设置mode

数据源为Parquet文件时,SparkSQL可以方便的执行所有的操作,不需要使用format

2.1.1 读取orc数据
package _02SparkSQL
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}object _07SparkWriteData {def main(args: Array[String]): Unit = {//提供SparkSession对象val session = SparkSession.builder().appName("SparkWriteData").master("local").getOrCreate()//先读取数据var frame: DataFrame = session.read.orc("data/student.orc")//保存到某个路径下,OWstudent为文件夹,不需要文件名frame.write.format("json").mode(SaveMode.Overwrite).save("data/OWstudent")session.stop()}
}

最后结果为:
在这里插入图片描述

2.2 方式二:spark.write.xxx

上述的书写方式太过繁项,所以SparksQL推出了更加便捷的方式:

  • spark.write.xxx(“保存数据路径”)
  • XXX包括csv、jdbc、json、orc、parquet和text
  • 需要注意:在保存jdbc时方法参数为三个分别为【url、tablename、properties对象】,其中properties对象中存储的是【user,password】
  • mode可以选择性设置
2.2.1 写入到jdbc数据库中
package _02SparkSQL
import java.util.Propertiesimport org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}object _07SparkWriteData {def main(args: Array[String]): Unit = {//提供SparkSession对象val session = SparkSession.builder().appName("SparkWriteData").master("local").getOrCreate()//先读取数据var frame: DataFrame = session.read.orc("data/student.orc")val properties = new Properties()properties.put("user","root")properties.put("password","123456")frame.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/mydb1","student",properties)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/698823.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SmartX 携手 openGauss 社区发布联合方案评测与性能最佳实践

近日,北京志凌海纳科技有限公司(以下简称 “SmartX”)携手 openGauss 社区完成了 openGauss 数据库基于 SmartX 超融合平台(SMTX OS)和 SmartX 分布式存储平台(SMTX ZBS)的性能测试和调优。 结果…

JavaScript中的可选链——通过示例解释

JavaScript开发经常涉及导航嵌套对象,这可能很麻烦且容易出错,特别是在处理null或undefined值时。可选链是现代JavaScript语法中的一个改革性特性。 在本文中,我们将通过实际示例探讨可选链,演示它如何简化代码并使开发更加高效。…

MySQL数据库基础(十三):关系型数据库三范式介绍

文章目录 关系型数据库三范式介绍 一、什么是三范式 二、数据冗余 三、范式的划分 四、一范式 五、二范式 六、三范式 七、总结 关系型数据库三范式介绍 一、什么是三范式 设计关系数据库时,遵从不同的规范要求,设计出合理的关系型数据库&…

代码随想录算法训练营第五十九天|

583. 两个字符串的删除操作 本题和动态规划:115.不同的子序列 相比,其实就是两个字符串都可以删除了,情况虽说复杂一些,但整体思路是不变的。 代码随想录 class Solution {public int minDistance(String word1, String word2) {…

流畅的Python(十一)-从协议到抽象基类

一、核心要义 主要讨论Python中的接口,所谓接口就是类实现或继承的一套公开(按照定义,受保护的属性和私有属性不在接口中)属性和方法,包括特殊方法,如__getitem__或__add__等。Python有两套规范接口的方式: 1. 鸭子类型和协议,这…

几种后端开发中常用的语言。

几种后端开发中常用的语言。 C/C 语言 C 语言最初是用于系统开发工作,特别是组成操作系统的程序。由于 C 语言所产生的代码运行速度与汇编语言编写的代码运行速度几乎一样,所以采用 C 语言作为系统开发语言。目前,C 语言是最广泛使用的系统…

MongoDB聚合运算符:$atan2

$atan2用来计算反正切&#xff0c;返回指定表达式的反正切值&#xff0c;与$antan的区别主要是参数不同。 语法 { $atan2: [<expression1>, <expression1>] }<expression>为可被解析为数值的表达式$atan2返回弧度&#xff0c;使用$radiansToDegrees运算符可…

数据结构与算法-常用排序算法

一、常用排序说明 当涉及排序算法时&#xff0c;理解每个算法的工作原理、时间复杂度和空间复杂度是至关重要的。下面对常用排序算法进行详细说明&#xff1a; 1、冒泡排序&#xff08;Bubble Sort&#xff09;&#xff1a; 工作原理&#xff1a;比较相邻的元素并交换&am…

python bug与debug

一、什么是bug&#xff08;软件缺陷&#xff09;&#xff1f; 产品说明书中规定要做的事情&#xff0c;而软件没有实现。 产品说明书中规定不要做的事情&#xff0c;而软件确实现了。 产品说明书中没有提到过的事情&#xff0c;而软件确实现了。 产品说明书中没有提到但是必…

跨语言的序列化与反序列化

在Java中实现跨语言的序列化与反序列化通常可以采用以下几种方式 使用标准的跨语言序列化格式 可以选择使用一些标准的跨语言序列化格式,例如JSON、XML、Protocol Buffers(ProtoBuf)等。这些格式都是跨语言的,可以方便地在不同的编程语言之间进行数据交换。在Java中,可以…

紫光同创初使用

芯片PGC2KG-6LPG144 1、安装好软件接&#xff0c;加载license,有两个&#xff0c;与电脑MAC地址绑定的 2、正常使用后&#xff0c;新建个工程&#xff0c;配置管脚Tools→UCE 3、程序中有些信号被软件认为是时钟信号&#xff0c;会报错&#xff08;时钟输入I0约束在非专用时钟…

LeetCode--代码详解 78.子集

78.子集 题目 给你一个整数数组 nums &#xff0c;数组中的元素 互不相同 。返回该数组所有可能的子集&#xff08;幂集&#xff09;。 解集 不能 包含重复的子集。你可以按 任意顺序 返回解集。 示例 1&#xff1a; 输入&#xff1a;nums [1,2,3] 输出&#xff1a;[[],[1…

Python爬虫-使用代理伪装IP

爬虫系列&#xff1a;http://t.csdnimg.cn/WfCSx 前言 我们在做爬虫的过程中经常会遇到这样的情况&#xff0c;最初爬虫正常运行&#xff0c;正常抓取数据&#xff0c;一切看起来都是那么的美好&#xff0c;然而一杯茶的功夫可能就会出现错误&#xff0c;比如 403 Forbidden&…

【LeetCode刷题笔记】242.有效的字母异位词

创作不易&#xff0c;本篇文章如果帮助到了你&#xff0c;还请点赞 关注支持一下♡>&#x16966;<)!! 主页专栏有更多知识&#xff0c;如有疑问欢迎大家指正讨论&#xff0c;共同进步&#xff01; 更多算法知识专栏&#xff1a;算法分析&#x1f525; 给大家跳段街舞感谢…

Spring基础之AOP和代理模式

文章目录 理解AOPAOP的实现原理 AOP代理模式静态代理动态代理1-JDK动态代理2-CGLIB动态代理 总结 理解AOP OOP - - Object Oriented Programming 面向对象编程 AOP - - Aspect Oriented Programming 面向切面编程 AOP是Spring提供的关键特性之一。AOP即面向切面编程&#xff0…

Jenkins邮件通知配置(7)

1、安装插件&#xff1a; Email Extension&#xff0c;Email Extension Template&#xff0c;这两个插件可以帮助我们进行邮件的编写发送以及格式化 2、配置jenkins中链接腾讯企业邮箱 先配置发送服务&#xff0c;然后在具体工程中设置接收者 基础信息&#xff1a; POP3/S…

SWIFT:自我认知微调

文档:https://github.com/modelscope/swift/blob/main/docs/source/LLM/%E8%87%AA%E6%88%91%E8%AE%A4%E7%9F%A5%E5%BE%AE%E8%B0%83%E6%9C%80%E4%BD%B3%E5%AE%9E%E8%B7%B5.md ​​​​​​代码: Swift是如何把自我认知数据集融合到训练集中呢? 1:相关的3个参数

设计模式学习笔记 - 面向对象 - 6.为什么要基于接口而非实现编程?有必要为每个类都定义接口吗?

前言 “基于接口而非实现编程”这个原则非常重要&#xff0c;是一种非常有效的提高代码质量的手段&#xff0c;在平时的开发中经常被用到。 如何解读原则中的“接口”二字 要理解“基于接口而非实现编程”的关键就是要理解其中的“接口”二字&#xff0c;我们可以理解为编程语…

学习数据节构和算法的第14天

题目讲解 链表的移除 #include <stdio.h> #include <stdlib.h> // 定义链表节点结构体 typedef struct Node {int data; // 节点数据struct Node* next; // 指向下一个节点的指针 } Node; // 初始化链表节点 Node* initNode(int data) {Node* n…

mfc 疑难杂症之一

病情&#xff1a; 1.xxxx处的第一机会异常: 0xC0000005: 读取位置 0x00000004 时发生访问冲突。 2.不定时程序闪退 访问违例 程序定位到 main处 0x76DCB5B2 处(位于 Tetris.exe 中)引发的异常: Microsoft C 异常: CResourceException&#xff0c;位于内存位置 0x008FF0D4 处…