python读取oracle数据到hvie parquet_关于sparksql操作hive,读取本地csv文件并以parquet的形式装入hive中...

说明:spark版本:2.2.0

hive版本:1.2.1

需求: 有本地csv格式的一个文件,格式为${当天日期}visit.txt,例如20180707visit.txt,现在需要将其通过spark-sql程序实现将该文件读取并以parquet的格式通过外部表的形式保存到hive中,最终要实现通过传参的形式,将该日期区间内的csv文件批量加载进去,方式有两种:

1、之传入一个参数,说明只加载一天的数据进去

2、传入两个参数,批量加载这两个日期区间的每一天的数据

最终打成jar包,进行运行

步骤如下:

1、初始化配置,先创建sparkSession(spark2.0版本开始将sqlContext、hiveContext同意整合为sparkSession)

//初始化配置

val spark = new sql.SparkSession

.Builder()

.enableHiveSupport()  //操作hive这一步千万不能少

.appName("project_1")

.master("local[2]")

.getOrCreate()

2、先将文件读进来,并转换为DF

val data = spark.read.option("inferSchema", "true").option("header", "false") //这里设置是否处理头信息,false代表不处理,也就是说文件的第一行也会被加载进来,如果设置为true,那么加载进来的数据中不包含第一行,第一行被当作了头信息,也就是表中的字段名处理了

.csv(s"file:///home/spark/file/project/${i}visit.txt")  //这里设置读取的文件,${i}是我引用的一个变量,如果要在双引号之间引用变量的话,括号前面的那个s不能少

.toDF("mac", "phone_brand", "enter_time", "first_time", "last_time", "region", "screen", "stay_time") //将读进来的数据转换为DF,并为每个字段设置字段名

3、将转换后的DF注册为一张临时表

data.createTempView(s"table_${i}")

4、通过spark-sql创建hive外部表,这里有坑

spark.sql(

s"""

|create external table if not exists ${i}visit

|(mac string, phone_brand string, enter_time timestamp, first_time timestamp, last_time timestamp,

|region string, screen string, stay_time int) stored as parquet

|location 'hdfs://master:9000/project_dest/${i}'

""".stripMargin)

这里的见表语句需要特别注意,如果写成如下的方式是错误的:

spark.sql(

s"""

|create external table if not exists ${i}visit

|(mac string, phone_brand string, enter_time timestamp, first_time timestamp, last_time timestamp,

|region string, screen string, stay_time int) row format delimited fields terminated by '\t' stored as parquet

|location /project_dest/${i}'

""".stripMargin)

(1)对于row format delimited fields terminated by '\t'这语句只支持存储文件格式为textFile,对于parquet文件格式不支持

(2)对于location这里,一定要写hdfs的全路径,如果向上面这样写,系统不认识,切记

5、通过spark-sql执行insert语句,将数据插入到hive表中

spark.sql(s"insert overwrite table ${i}visit select * from table_${i}".stripMargin)

至此,即完成了将本地数据以parquet的形式加载至hive表中了,接下来既可以到hive表中进行查看数据是否成功载入

贴一下完整代码:

package _sql.project_1

import org.apache.spark.sql

/**

* Author Mr. Guo

* Create 2018/9/4 - 9:04

* ┌───┐ ┌───┬───┬───┬───┐ ┌───┬───┬───┬───┐ ┌───┬───┬───┬───┐ ┌───┬───┬───┐

* │Esc│ │ F1│ F2│ F3│ F4│ │ F5│ F6│ F7│ F8│ │ F9│F10│F11│F12│ │P/S│S L│P/B│ ┌┐ ┌┐ ┌┐

* └───┘ └───┴───┴───┴───┘ └───┴───┴───┴───┘ └───┴───┴───┴───┘ └───┴───┴───┘ └┘ └┘ └┘

* ┌───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───────┐ ┌───┬───┬───┐ ┌───┬───┬───┬───┐

* │~ `│! 1│@ 2│# 3│$ 4│% 5│^ 6│& 7│* 8│( 9│) 0│_ -│+ =│ BacSp │ │Ins│Hom│PUp│ │N L│ / │ * │ - │

* ├───┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─────┤ ├───┼───┼───┤ ├───┼───┼───┼───┤

* │ Tab │ Q │ W │ E │ R │ T │ Y │ U │ I │ O │ P │{ [│} ]│ | \ │ │Del│End│PDn│ │ 7 │ 8 │ 9 │ │

* ├─────┴┬──┴┬──┴┬──┴┬──┴┬──┴┬──┴┬──┴┬──┴┬──┴┬──┴┬──┴┬──┴─────┤ └───┴───┴───┘ ├───┼───┼───┤ + │

* │ Caps │ A │ S │ D │ F │ G │ H │ J │ K │ L │: ;│" '│ Enter │ │ 4 │ 5 │ 6 │ │

* ├──────┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴────────┤ ┌───┐ ├───┼───┼───┼───┤

* │ Shift │ Z │ X │ C │ V │ B │ N │ M │< ,│> .│? /│ Shift │ │ ↑ │ │ 1 │ 2 │ 3 │ │

* ├─────┬──┴─┬─┴──┬┴───┴───┴───┴───┴───┴──┬┴───┼───┴┬────┬────┤ ┌───┼───┼───┐ ├───┴───┼───┤ E││

* │ Ctrl│ │Alt │ Space │ Alt│ │ │Ctrl│ │ ← │ ↓ │ → │ │ 0 │ . │←─┘│

* └─────┴────┴────┴───────────────────────┴────┴────┴────┴────┘ └───┴───┴───┘ └───────┴───┴───┘

**/

object Spark_Sql_Load_Data_To_Hive {

//初始化配置

val spark = new sql.SparkSession

.Builder()

.enableHiveSupport()

.appName("project_1")

.master("local[2]")

.getOrCreate()

//设置日志的级别

spark.sparkContext.setLogLevel("WARN")

def main(args: Array[String]): Unit = {

try {

if (args.length != 1) {

data_load(args(0).toInt)

} else if (args.length != 2) {

for (i

data_load(i)

}

} else {

System.err.println("Usage: or ")

System.exit(1)

}

}catch {

case ex:Exception => println("Exception")

}finally{

spark.stop()

}

}

def data_load(i:Int): Unit = {

println(s"*******data_${i}********")

val data = spark.read.option("inferSchema", "true").option("header", "false")

.csv(s"file:///home/spark/file/project/${i}visit.txt")

.toDF("mac", "phone_brand", "enter_time", "first_time", "last_time", "region", "screen", "stay_time")

data.createTempView(s"table_${i}")

spark.sql("use project_1".stripMargin)

spark.sql(

s"""

|create external table if not exists ${i}visit

|(mac string, phone_brand string, enter_time timestamp, first_time timestamp, last_time timestamp,

|region string, screen string, stay_time int) stored as parquet

|location 'hdfs://master:9000/project_dest/${i}'

""".stripMargin)

spark

.sql(s"insert overwrite table ${i}visit select * from table_${i}".stripMargin)

}

}

6、打成jar包(我的IDEA版本是2017.3版本)

25ec1f2de5aa9cd0b314f77bba70174f.png

如果没有上面这一栏,点击View,然后勾选Toolbar即可

345f198b4d41fd933b6770569d3a3735.png

296b879ad9bdb023f282da941a287965.png

点击ok

1200a74dee6403e4438231ffd01c66b4.png

52e461b2ebb3e41e77c1ca7e1816e9cd.png

09dceeb3c81af5ca128911fd6fd3aa94.png

b72e33b9d4e3f79fc0f75ad7f55211d5.png

此时这里会成成这么一个文件,是编译之后的class文件

de79c6bcec51bccba4d86dd444094756.png

到这个目录下会找到这么一个jar包

9f4c1654764e6b4c195fb60e0d7c872e.png

找到该文件夹,上传到服务器,cd到该目录下运行命令:

spark-submit --class spark._sql.project_1.Conn_hive --master spark://master:7077 --executor-memory 2g --num-executors 3 /spark_maven_project.jar 20180901 20180910

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/244442.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

el-date-picker设置默认日期_程序员必备:Java 日期处理的十个坑

前言整理了Java日期处理的十个坑&#xff0c;希望对大家有帮助。一、用Calendar设置时间的坑反例&#xff1a;Calendar c Calendar.getInstance();c.set(Calendar.HOUR, 10);System.out.println(c.getTime());运行结果&#xff1a;Thu Mar 26 22:28:05 GMT08:00 2020解析&…

scope python_Python标准库Scope

作者&#xff1a;zhbzz2007 出处&#xff1a;http://www.cnblogs.com/zhbzz2007 欢迎转载&#xff0c;也请保留这段声明。谢谢&#xff01;1 模块简介你一定在很多计算机科学课程上听说过作用域。它很重要&#xff0c;如果你不理解它的工作原理&#xff0c;那么就会出现一些令人…

java命令_JAVA与模式之命令模式

在阎宏博士的《JAVA与模式》一书中开头是这样描述命令(Command)模式的&#xff1a;命令模式属于对象的行为模式。命令模式又称为行动(Action)模式或交易(Transaction)模式。命令模式把一个请求或者操作封装到一个对象中。命令模式允许系统使用不同的请求把客户端参数化&#xf…

android 16进制 全透明_你有几种实现方案Android 设备唯一标识?

前言项目开发中&#xff0c;多少会遇到这种需求&#xff1a;获得设备唯一标识DeviceId&#xff0c;用于&#xff1a;1.标识一个唯一的设备&#xff0c;做数据精准下发或者数据统计分析&#xff1b;2.账号与设备绑定&#xff1b;3.....分析这类文章&#xff0c;网上有许多资料&a…

链表的数据域怎么使用结构体_一步一步教你从零开始写C语言链表

为什么要学习链表&#xff1f;链表主要有以下几大特性&#xff1a;1、解决数组无法存储多种数据类型的问题。2、解决数组中&#xff0c;元素个数无法改变的限制(C99的变长数组&#xff0c;C也有变长数组可以实现)。3、数组移动元素的过程中&#xff0c;要对元素进行大范围的移动…

python计算bmi_Python编程语言:如何用Python编程来判断体重指数BMI是否健康

上一篇小编分享了自己学习Python语言有关字符串和模块time使用的相关知识&#xff0c;这一篇小编分享给大家的是比较有趣的运用&#xff0c;那就是如何用Python编程来表示自己体重BMI。 用Python程序来算出我们自己的BMI指数&#xff0c;来判断我们自己的健康情况&#xff0c;首…

drbd实现mysql地热备_heartheartbeat+drbd+mysql主库热备

1 环境主机名网卡磁盘mastereth0 桥接模式 eth0(192.168.1.10) 自定义模式(VMnet2)(192.168.2.10)VIP 192.168.1.200/210系统盘20G外接磁盘slaveeth0 桥接模式(192.168.1.20) eth1 自定义模式(VMnet2)(192.168.2.20)VIP 192.168.1.200/210系统盘20G外接磁盘server3eth0 桥接模式…

dba的前景_运维、测试、程序员,这些技术岗位哪个更有前景?

在一个初具规模的互联网公司&#xff0c;从业务方面出发&#xff0c;有很多岗位类型&#xff0c;比如运营、客服、市场、产品、设计、技术等等。在这些大类下面&#xff0c;还要细分各种小类&#xff0c;以技术为例&#xff0c;可分为前端(客户端)、后端、测试、运维、DBA等等&…

mysql深度解析_百万级数据下的mysql深度解析

mysql 作为一款非常优秀的免费数据库被广泛的使用&#xff0c;平时我们开发的项目数据过百万的时候不多。最近花了大量的时间来深入的研究mysql百万级数据情况下的优化。 遇到了很多问题并解决了他们&#xff0c;特此分享给大家。欢迎提出您的宝贵意见&#xff01;一、百万级数…

python异步实现方式_Python通过yield实现异步

改写程序first函数等待long函数返回值的方式为yield,代码如下&#xff1a; import _thread import time gen None def long(): print (long execute) def fun(): time.sleep(5) result long end gen.send(result) _thread.start_new_thread(fun,()) def first(): print (firs…

免安装版的mysql步骤_mysql免安装版的安装方法及步骤

mysql免安装版的安装方法及步骤发布时间&#xff1a;2020-07-15 14:07:18来源&#xff1a;亿速云阅读&#xff1a;83作者&#xff1a;清晨小编给大家分享一下mysql免安装版的安装方法及步骤&#xff0c;希望大家阅读完这篇文章后大所收获&#xff0c;下面让我们一起去探讨吧&am…

jupyter notebook怎么写python代码_如何在Jupyter Notebook中使用Python虚拟环境?

如何在使用Jupyter Notebook时&#xff0c;解决Python虚拟环境间的切换问题&#xff1f;本文一步步帮你拆解。希望你能够避免踩坑的痛苦&#xff0c;把更多的时间花在愉快的编程上。 痛点 Python目前有两个主版本并存&#xff0c;这很让人苦恼。 一般人对于软件&#xff0c;总是…

cmd检查java_如何通过cmd查看java环境

展开全部JAVA环境变量设置一、下载JDK。62616964757a686964616fe4b893e5b19e31333363376561下载后是一个可执行程序&#xff0c;双击安装&#xff0c;安装路径为C:\Program Files\Java\jdk1.6.0_22\(当然&#xff0c;其他路径也可以)&#xff0c;如下图&#xff1a;二、设置环境…

ios 检测是否联网_秋招|阿里 iOS 五轮面经分享,已收到阿里的意向书

作者&#xff1a;aaaaaazzzz链接&#xff1a;https://www.nowcoder.com/discuss/302113来源&#xff1a;牛客网感觉牛客很少看到iOS的面经了&#xff0c;今天收到了阿里的意向书&#xff0c;来分享下面经&#xff0c;希望大家都Offer~&#xff01;总体感觉面试官都非常好&#…

java spring框架 注解_spring框架之注解的使用

原标题&#xff1a;spring框架之注解的使用今天是刘小爱自学Java的第122天。感谢你的观看&#xff0c;谢谢你。学习内容安排如下&#xff1a;Spring注解的使用。JavaWeb项目的搭建。Spring的Web集成。本来还计划学Spring的junit测试集成的&#xff0c;结果又没时间了。一、Spri…

idea 代码格式化插件_IDEA非常棒的插件,阿里巴巴约定成文的代码公约规范

无规矩&#xff0c;不方圆。每个人都有自己的编码风格&#xff0c;每个公司也有自己的代码规范。规范的代码&#xff0c;无论是自己日常维护&#xff0c;还是以后接盘者来接盘&#xff0c;都能快速定位上手&#xff0c;大大提高效率。作为一个IDEA万年爱好者&#xff0c;这些最…

java der pem_JAVA解析各种编码密钥对(DER、PEM、openssh公钥)

一、DER编码密钥对先说下DER编码&#xff0c;是因为JCE本身是支持DER编码密钥对的解析的&#xff0c;可以参见PKCS8EncodedKeySpec和X509EncodedKeySpec.DER编码是ASN.1编码规则中的一个子集&#xff0c;具体格式如何编排没有去了解&#xff0c;但最终呈现肯定的是一堆有规律的…

switch最大选项数目_随时随地学习C语言之3—if和switch哪个效率高?

之前学习C语言的时候&#xff0c;我经常有一个疑问&#xff0c;既然有if-else if-else结构的多分支选择语句&#xff0c;C语言为何还要制定switch这种多分支选择语句呢&#xff1f;直到两年前在分析ARM平台C语言反汇编代码的时候&#xff0c;才终于明白了switch-case这种结构存…

android java 退出程序_android开发两种退出程序方式(killProcess,System.exit)

KillProcess&#xff1a;在android中我们如果想要程序的进程结束可以这样写&#xff1a;android.os.Process.killProcess(android.os.Process.myPid());这样就可以从操作系统中结束掉当前程序的进程。注意&#xff1a;android中所有的activity都在主进程中&#xff0c;在Androi…

java 验证码_Java - 验证码 - 由Kaptcha组件实现

本文是基于SpringBoot整合Kaptcha验证码实现Kaptcha 是一个可高度配置的实用验证码生成工具&#xff0c;在项目开发中能够非常方便实现验证码先来看一个由 Kaptcha 制作的验证码效果图快速进入如何进行配置与实现的第1步&#xff1a;配置 Kaptcha 的依赖库com.github.penggle …