hive load data外部表报错_生产SparkSQL如何读写本地外部数据源及排错

1a9c506d5750be21f862d60003d66c58.gif

https://spark-packages.org/里有很多third-party数据源的package,spark把包加载进来就可以使用了

c144704c812a391fb739c1445ec69230.png

csv格式在spark2.0版本之后是内置的,2.0之前属于第三方数据源

一、读取本地外部数据源

1.直接读取一个json文件

[hadoop@hadoop000 bin]$ ./spark-shell --master local[2] --jars ~/software/mysql-connector-java-5.1.27.jar 
scala> spark.read.load("file:///home/hadoop/app/spark-2.3.1-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json").show

运行报错:

Caused by: java.lang.RuntimeException: file:/home/hadoop/app/spark-2.3.1-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 57, 125, 10]
at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:476)
at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:445)
at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:421)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:519)
... 32 more

查看load方法的源码:

/**
* Loads input in as a `DataFrame`, for data sources that require a path (e.g. data backed by
* a local or distributed file system).
*
* @since 1.4.0
*/
def load(path: String): DataFrame = {
option("path", path).load(Seq.empty: _*) // force invocation of `load(...varargs...)`
}
---------------------------------------------------------
/**
* Loads input in as a `DataFrame`, for data sources that support multiple paths.
* Only works if the source is a HadoopFsRelationProvider.
*
* @since 1.6.0
*/
@scala.annotation.varargs
def load(paths: String*): DataFrame = {
if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
throw new AnalysisException("Hive data source can only be used with tables, you can not " +
"read files of Hive data source directly.")
}
val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
if (classOf[DataSourceV2].isAssignableFrom(cls)) {
val ds = cls.newInstance()
val options = new DataSourceOptions((extraOptions ++
DataSourceV2Utils.extractSessionConfigs(
ds = ds.asInstanceOf[DataSourceV2],
conf = sparkSession.sessionState.conf)).asJava)
// Streaming also uses the data source V2 API. So it may be that the data source implements
// v2, but has no v2 implementation for batch reads. In that case, we fall back to loading
// the dataframe as a v1 source.
val reader = (ds, userSpecifiedSchema) match {
case (ds: ReadSupportWithSchema, Some(schema)) =>
ds.createReader(schema, options)
case (ds: ReadSupport, None) =>
ds.createReader(options)
case (ds: ReadSupportWithSchema, None) =>
throw new AnalysisException(s"A schema needs to be specified when using $ds.")
case (ds: ReadSupport, Some(schema)) =>
val reader = ds.createReader(options)
if (reader.readSchema() != schema) {
throw new AnalysisException(s"$ds does not allow user-specified schemas.")
}
reader
case _ => null // fall back to v1
}
if (reader == null) {
loadV1Source(paths: _*)
} else {
Dataset.ofRows(sparkSession, DataSourceV2Relation(reader))
}
} else {
loadV1Source(paths: _*)
}
}
private def loadV1Source(paths: String*) = {
// Code path for data source v1.
sparkSession.baseRelationToDataFrame(
DataSource.apply(
sparkSession,
paths = paths,
userSpecifiedSchema = userSpecifiedSchema,
className = source,
options = extraOptions.toMap).resolveRelation())
}
------------------------------------------------------
private var source: String = sparkSession.sessionState.conf.defaultDataSourceName
-------------------------------------------------------
def defaultDataSourceName: String = getConf(DEFAULT_DATA_SOURCE_NAME)
--------------------------------------------------------
// This is used to set the default data source
val DEFAULT_DATA_SOURCE_NAME = buildConf("spark.sql.sources.default")
.doc("The default data source to use in input/output.")
.stringConf
.createWithDefault("parquet")

从源码中可以看出,如果不指定format,load默认读取的是parquet文件

scala> val users = spark.read.load("file:///home/hadoop/app/spark-2.3.1-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet")
scala> users.show()
+------+--------------+----------------+
| name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa| null| [3, 9, 15, 20]|
| Ben| red| []|
+------+--------------+----------------+

读取其他格式的文件,必须通过format指定文件格式,如下:

//windows idea环境下
val df1 = spark.read.format("json").option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ").load("hdfs://192.168.137.141:9000/data/people.json")
df1.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+

option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")必须带上,不然报错

Exception in thread "main" java.lang.IllegalArgumentException: Illegal pattern component: XXX

2.读取CSV格式文件

//源文件内容如下:
[hadoop@hadoop001 ~]$ hadoop fs -text /data/people.csv
name;age;job
Jorge;30;Developer
Bob;32;Developer

//windows idea环境下
val df2 = spark.read.format("csv")
.option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
.option("sep",";")
.option("header","true") //use first line of all files as header
.option("inferSchema","true")
.load("hdfs://192.168.137.141:9000/data/people.csv")
df2.show()
df2.printSchema()
//输出结果:
+-----+---+---------+
| name|age| job|
+-----+---+---------+
|Jorge| 30|Developer|
| Bob| 32|Developer|
+-----+---+---------+
root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- job: string (nullable = true)
-----------------------------------------------------------
//如果不指定option("sep",";")
+------------------+
| name;age;job|
+------------------+
|Jorge;30;Developer|
| Bob;32;Developer|
+------------------+
//如果不指定option("header","true")
+-----+---+---------+
| _c0|_c1| _c2|
+-----+---+---------+
| name|age| job|
|Jorge| 30|Developer|
| Bob| 32|Developer|
+-----+---+---------+

读取csv格式文件还可以自定义schema

val peopleschema = StructType(Array(
StructField("hlwname",StringType,true),
StructField("hlwage",IntegerType,true),
StructField("hlwjob",StringType,true)))
val df2 = spark.read.format("csv").option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ").option("sep",";")
.option("header","true")
.schema(peopleschema)
.load("hdfs://192.168.137.141:9000/data/people.csv")
//打印测试
df2.show()
df2.printSchema()
输出结果:
+-------+------+---------+
|hlwname|hlwage| hlwjob|
+-------+------+---------+
| Jorge| 30|Developer|
| Bob| 32|Developer|
+-------+------+---------+
root
|-- hlwname: string (nullable = true)
|-- hlwage: integer (nullable = true)
|-- hlwjob: string (nullable = true)

二、将读取的文件以其他格式写出

//将上文读取的users.parquet以json格式写出
scala> users.select("name","favorite_color").write.format("json").save("file:///home/hadoop/tmp/parquet2json/")
[hadoop@hadoop000 ~]$ cd /home/hadoop/tmp/parquet2json
[hadoop@hadoop000 parquet2json]$ ll
total 4
-rw-r--r--. 1 hadoop hadoop 56 Sep 24 10:15 part-00000-dfbd9ba5-598f-4e0c-8e81-df85120333db-c000.json
-rw-r--r--. 1 hadoop hadoop 0 Sep 24 10:15 _SUCCESS
[hadoop@hadoop000 parquet2json]$ cat part-00000-dfbd9ba5-598f-4e0c-8e81-df85120333db-c000.json
{"name":"Alyssa"}
{"name":"Ben","favorite_color":"red"}

//将上文读取的people.json以csv格式写出
df1.write.format("csv")
.mode("overwrite")
.option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
.save("hdfs://192.168.137.141:9000/data/formatconverttest/")
------------------------------------------
[hadoop@hadoop001 ~]$ hadoop fs -text /data/formatconverttest/part-00000-6fd65eff-d0d3-43e5-9549-2b11bc3ca9de-c000.csv
,Michael
30,Andy
19,Justin
//发现若没有.option("header","true"),写出的csv丢失了首行的age,name信息
//若不指定.option("sep",";"),默认逗号为分隔符

此操作的目的在于学会类型转换,生产上最开始进来的数据大多都是text,json等行式存储的文件,一般都要转成ORC,parquet列式存储的文件,加上压缩,能把文件大小减小到10%左右,大幅度减小IO和数据处理量,提高性能
此时如果再执行一次save,路径不变,则会报错:

scala> users.select("name","favorite_color").write.format("json").save("file:///home/hadoop/tmp/parquet2json/")
org.apache.spark.sql.AnalysisException: path file:/home/hadoop/tmp/parquet2json already exists.;
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:109)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
.........................................................

可以通过设置savemode来解决这个问题

c19e62c3724b6504d2ea3a48de42de50.png默认是errorifexists

scala> users.select("name","favorite_color").write.format("json").mode("overwrite").save("file:///home/hadoop/tmp/parquet2json/")

作者:若泽数据—白面葫芦娃92 

原文:https://www.jianshu.com/p/6fde69ea56bc


回归原创文章:

若泽数据2018视频集合

Flink生产最佳实践,2018年12月刚出炉

我去过端午、国庆生产项目线下班,你呢?

2019元旦-线下项目第11期圆满结束

大数据生产预警平台项目之文章汇总

学习大数据的路上,别忘了多给自己鼓掌

明年毕业的我,拿了大数据30万的offer!

最全的Flink部署及开发案例

我司Kafka+Flink+MySQL生产完整案例代码

代码 | Spark读取mongoDB数据写入Hive普通表和分区表

我司Spark迁移Hive数据到MongoDB生产案例代码

2019高级班&线下班报名咨询请加

ac1101e83996ece4ec18067ec255747b.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/507223.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ffmpeg命令_使用ffmpeg命令为多个短视频修改视频备注说明

今天主要给大家讲一下使用视频剪辑高手中的ffmpeg命令为多个短视频修改备注说明的详细步骤,有需要和感兴趣的宝贝们可以跟随小编一起来试试。收集视频将需要剪辑的短视频保存到同一文件夹上进入软件双击进入视频剪辑高手,选择“批量剪辑视频”功能添加视…

从事python需要掌握哪些知识和技能_零基础想转行从事Python?需要掌握如下技能...

零基础python能找到工作吗?需要掌握哪些技能?对于大部分零基础学编程半路出家的人来说,无非是想改变现状换一门新职业,所谓技术大牛不过是比小白们更早接触编程罢了,选择好自己有兴趣的职业技能,并为之学习…

java byte 判断相等_你真的了解Java中quot;==quot;和equals()的区别?

部分面试资料链接:https://pan.baidu.com/s/1qDb2YoCopCHoQXH15jiLhA密码:jsam想获得全部面试必看资料,关注公众号,大家可以在公众号后台回复“知乎”即可。“判断两个事物是否相等”,是编程中最常见的操作之一,在Java中&#xff…

数据通信原理_同网段主机通信原理

本篇文章介绍数据通信中最基础,最关键的原理之一,两台通网段的主机如何通信。获得更多技术资料和免费学习视频,加入讨论群:752160765适合两台普通电脑之间,两台服务器之间,两台手机之间,电脑和打…

java jdk 未知错误_解决JAVA JDK安装出错的最常见问题,帮你排除困扰

一般来说,安装JAVA JDK的整个流程是很简单的,只要按照提示进行操作即可,就不会出现问题。但是呢,有小伙伴反映说,之前安装了JAVA JDK,进行卸载重装的时候出现错误提示,“正在进行另一Java安装”…

定义const变量是不可以赋值_JavaScript的声明方法和作用范围,常见的结构赋值类型和使用场景...

链接:https://juejin.im/post/5d9bf530518825427b27639d声明const命令:声明常量 let命令:声明变量作用作用域全局作用域函数作用域:function() {}块级作用域:{}作用范围var 命令在全局代码中执行const命令和let命令只能…

java社区活跃度_Java并发编程-活跃度问题

在讲问题前,我先说明一下什么是活跃度?一个并发应用及时执行的能力称作活跃度。我主要讲死锁问题,顺带介绍一下饥饿,弱响应性和活锁。死锁死锁这个词大家都听过,我先来罗列一下产生死锁的四个必要条件:(1) …

java8 di_java8 多个list对象用lambda求差集操作

业务场景:调用同步接口获取当前全部有效账户,数据库已存在部分账户信息,因此需要筛选同步接口中已存在本地的帐户。调用接口获取的数据集合List list response.getData().getItems();本地查询出来的账户集合List towList accountRepository…

怎么抓python程序的包_如何在AWS上部署python应用程序

如何在AWS上部署python应用程序,学姐呕心沥血亲自总结,亲测有效,比网上看网上大把大把的文档要快得多!作者:蕾学姐亚马逊云计算服务(Amazon Web Services,缩写为AWS),由亚…

【Hadoop】Zookeeper架构/特点

Zookeeper 中的角色主要有以下三类: Zookeeper需要保证高可用性和强一致性为了支持更多的客户端,需要增加更多Server,但是Server增多,意味着投票阶段延迟增大,会影响整个系统的性能。所以在3.3.0中ZK引入的新角色&…

wpf 按钮样式_键盘 | 01.在程序集间引用样式

设置Button和TextBox的特定颜色和字体的样式和默认样式,并在程序集间引用。从零开始用WPF/C#开发一个键盘指示器项目完整开源、免费,不依赖第三方库编译好的先行版程序在微信公众号(香辣恐龙蛋)下载。文章同时发布在微信公众号(香辣恐龙蛋)、B站(香辣恐龙…

怎么去除表中的系统导出的字符_EXCEL非常有用的字符函数LEN、LENB,财务工作者的必备利器...

LEN函数与LENB函数是比较常用的函数,在实际中应用那是相当广的,尤其在财务工作中的使用频率是相当的高。我就一起看看实际工作中哪些地方用到了它。我们先简单说下它的用法,很简单,LEN(text)、LENB(text),两个用法是一…

java软尾山地车碳_JAVA FURIA 27.5入门软尾山地车评测

铝合金车架、前后100mm避震行程、超短把立、长款燕把、27.5轮径……之前跟大家讲过,在这台车上你能拥有全避震车型所应该具备的所有基础元素。2个月的时间已经过去,这台车到底怎么样呢?我们一起来看一下。优点:质量靠谱&#xff0…

java 模拟路由表_Router的路由表

Router中使用routers字段表示路由表,这是一个数组,每个元组的类型是[desnination,nexthop],其中destination表示目的网段(cidr),nexthop表示下一跳的IP,举例如下:“routes”:[ { “destination”:”10.50.10.0/24” “…

无符号有符号乘法_刘帅嵌入式系统-乘法指令

ARM有两类乘法指令:一类为32位的乘法指令,即乘法操作的结果为32位;另一类为64位的乘法指令,即乘法操作的结果为64位。两类指令共有以下6条。MUL:32位乘法指令MLA:32位带加数的乘法指令SMULL:64位…

php导出csv_原生PHP实现导出csv格式Excel文件的方法示例【附源码下载】

本文实例讲述了原生PHP实现导出csv格式Excel文件的方法。分享给大家供大家参考,具体如下:效果图源码分析index.phprequire_once "./Export.php";//测试数据$headerList [列名1,列名2,列名3];$data [[值1,值2,值3],[值11,值22,值33],[值111,值…

python 颜色_如何使用python中matplotlib库分析图像颜色

用代码分析图像可能很困难。你如何使代码“理解”图像的上下文?通常,使用AI分析图像的第一步 是找到主要颜色。在如何使用python中matplotlib库分析图像颜色中,我们将使用matplotlib的 image类在图像中找到主色 。查找主导色也是你可以使用第三方API进行…

cnn 准确率无法提升_清华类脑芯片再登Nature: 全球首款基于忆阻器的CNN存算一体芯片...

传统计算架构中计算与存储在不同电路单元中完成,造成大量数据搬运功耗增加和额外延迟,被认为是冯诺依曼计算架构的核心瓶颈。人类的大脑却并非如此,而是直接在记忆体里计算。被认为具有「存算一体」潜力的忆阻器,因而成为类脑计算…

oracle 几个字段中某个字段大于0其他字段不再进行统计?_如何深入理解MySQL 8.0直方图?...

MySQL8.0 新功能直方图,继承于Oracle ,MairaDB的实现方式。 那下面从mysql角度认识下,直方图是什么。先看下官方直方图的实现方式。 从上图上可以看到原来是ANALYZE命令。先了解一下MySQL里 ANALYZE命令到底有什么用。ANALYZE在MySQL里提交一…

godaddy修改php版本,Godaddy美国主机Plesk面板修改PHP版本教程

由于不同的需求,我们站长朋友们建站所用的网站程序也不尽相同,有PHP、ASP和ASP.NET等。Godaddy美国主机作为全球最大域名主机商,当之无愧成为站长使用最多的主机。不少朋友应该都知道不同的网站程序对于PHP和ASP的版本要求又不一样&#xff0…