spark sql 上个月_Spark学习之路 (十八)SparkSQL简单使用

一、SparkSQL的进化之路

1.0以前:

Shark

1.1.x开始:

SparkSQL(只是测试性的)  SQL

1.3.x:

SparkSQL(正式版本)+Dataframe

1.5.x:

SparkSQL 钨丝计划

1.6.x:

SparkSQL+DataFrame+DataSet(测试版本)

x:

SparkSQL+DataFrame+DataSet(正式版本)

SparkSQL:还有其他的优化

StructuredStreaming(DataSet)

二、认识SparkSQL

2.1 什么是SparkSQL?

spark SQL是spark的一个模块,主要用于进行结构化数据的处理。它提供的最核心的编程抽象就是DataFrame。

2.2 SparkSQL的作用

提供一个编程抽象(DataFrame) 并且作为分布式 SQL 查询引擎

DataFrame:它可以根据很多源进行构建,包括:结构化的数据文件,hive中的表,外部的关系型数据库,以及RDD

2.3 运行原理

将 Spark SQL 转化为 RDD, 然后提交到集群执行

2.4 特点

(1)容易整合

(2)统一的数据访问方式

(3)兼容 Hive

(4)标准的数据连接

2.5 SparkSession

SparkSession是Spark 2.0引如的新概念。SparkSession为用户提供了统一的切入点,来让用户学习spark的各项功能。

在spark的早期版本中,SparkContext是spark的主要切入点,由于RDD是主要的API,我们通过sparkcontext来创建和操作RDD。对于每个其他的API,我们需要使用不同的context。例如,对于Streming,我们需要使用StreamingContext;对于sql,使用sqlContext;对于Hive,使用hiveContext。但是随着DataSet和DataFrame的API逐渐成为标准的API,就需要为他们建立接入点。所以在spark2.0中,引入SparkSession作为DataSet和DataFrame API的切入点,SparkSession封装了SparkConf、SparkContext和SQLContext。为了向后兼容,SQLContext和HiveContext也被保存下来。

SparkSession实质上是SQLContext和HiveContext的组合(未来可能还会加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成的。

特点:

----为用户提供一个统一的切入点使用Spark 各项功能

----允许用户通过它调用 DataFrame 和 Dataset 相关 API 来编写程序

----减少了用户需要了解的一些概念,可以很容易的与 Spark 进行交互

----与 Spark 交互之时不需要显示的创建 SparkConf, SparkContext 以及 SQlContext,这些对象已经封闭在 SparkSession 中

2.7 DataFrames

在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。

fd7700dea28cebb2f59dbc8870376a03.png

三、RDD转换成为DataFrame

使用spark1.x版本的方式

测试数据目录:/home/hadoop/apps/spark/examples/src/main/resources(spark的安装目录里面)

people.txt

076b335c3e972cd096e6bdf69c328bb1.png

3.1 方式一:通过 case class 创建 DataFrames(反射)

//定义case class,相当于表结构

case class People(var name:String,varage:Int)objectTestDataFrame1 {

def main(args: Array[String]): Unit={

val conf= new SparkConf().setAppName("RDDToDataFrame").setMaster("local")

val sc= newSparkContext(conf)

val context= newSQLContext(sc)//将本地的数据读入 RDD, 并将 RDD 与 case class 关联

val peopleRDD = sc.textFile("E:\\666\\people.txt")

.map(line=> People(line.split(",")(0), line.split(",")(1).trim.toInt))

import context.implicits._//将RDD 转换成 DataFrames

val df =peopleRDD.toDF//将DataFrames创建成一个临时的视图

df.createOrReplaceTempView("people")//使用SQL语句进行查询

context.sql("select * from people").show()

}

}

运行结果

11a95a897b025d57b0a5a3203350c0be.png

3.2 方式二:通过 structType 创建 DataFrames(编程接口)

objectTestDataFrame2 {

def main(args: Array[String]): Unit={

val conf= new SparkConf().setAppName("TestDataFrame2").setMaster("local")

val sc= newSparkContext(conf)

val sqlContext= newSQLContext(sc)

val fileRDD= sc.textFile("E:\\666\\people.txt")//将 RDD 数据映射成 Row,需要 import org.apache.spark.sql.Row

val rowRDD: RDD[Row] = fileRDD.map(line =>{

val fields= line.split(",")

Row(fields(0), fields(1).trim.toInt)

})//创建 StructType 来定义结构

val structType: StructType =StructType(//字段名,字段类型,是否可以为空

StructField("name", StringType, true) ::

StructField("age", IntegerType, true) :: Nil

)/**

* rows: java.util.List[Row],

* schema: StructType

**/val df: DataFrame=sqlContext.createDataFrame(rowRDD,structType)

df.createOrReplaceTempView("people")

sqlContext.sql("select * from people").show()

}

}

运行结果

5946850b8ed4690e9126f3d58c2a5973.png

3.3 方式三:通过 json 文件创建 DataFrames

objectTestDataFrame3 {

def main(args: Array[String]): Unit={

val conf= new SparkConf().setAppName("TestDataFrame2").setMaster("local")

val sc= newSparkContext(conf)

val sqlContext= newSQLContext(sc)

val df: DataFrame= sqlContext.read.json("E:\\666\\people.json")

df.createOrReplaceTempView("people")

sqlContext.sql("select * from people").show()

}

}

cc945f520ecc85287760ed27937d8dd3.png

四、DataFrame的read和save和savemode

4.1 数据的读取

objectTestRead {

def main(args: Array[String]): Unit={

val conf= new SparkConf().setAppName("TestDataFrame2").setMaster("local")

val sc= newSparkContext(conf)

val sqlContext= newSQLContext(sc)//方式一

val df1 = sqlContext.read.json("E:\\666\\people.json")

val df2= sqlContext.read.parquet("E:\\666\\users.parquet")//方式二

val df3 = sqlContext.read.format("json").load("E:\\666\\people.json")

val df4= sqlContext.read.format("parquet").load("E:\\666\\users.parquet")//方式三,默认是parquet格式

val df5 = sqlContext.load("E:\\666\\users.parquet")

}

}

4.2 数据的保存

objectTestSave {

def main(args: Array[String]): Unit={

val conf= new SparkConf().setAppName("TestDataFrame2").setMaster("local")

val sc= newSparkContext(conf)

val sqlContext= newSQLContext(sc)

val df1= sqlContext.read.json("E:\\666\\people.json")//方式一

df1.write.json("E:\\111")

df1.write.parquet("E:\\222")//方式二

df1.write.format("json").save("E:\\333")

df1.write.format("parquet").save("E:\\444")//方式三

df1.write.save("E:\\555")

}

}

4.3 数据的保存模式

使用mode

df1.write.format("parquet").mode(SaveMode.Ignore).save("E:\\444")

0b0ebbafe1f9b8f329f3df086ed28116.png

五、数据源

5.1 数据源只json

参考4.1

5.2 数据源之parquet

参考4.1

5.3 数据源之Mysql

objectTestMysql {

def main(args: Array[String]): Unit={

val conf= new SparkConf().setAppName("TestMysql").setMaster("local")

val sc= newSparkContext(conf)

val sqlContext= newSQLContext(sc)

val url= "jdbc:mysql://192.168.123.102:3306/hivedb"val table= "dbs"val properties= newProperties()

properties.setProperty("user","root")

properties.setProperty("password","root")//需要传入Mysql的URL、表明、properties(连接数据库的用户名密码)

val df =sqlContext.read.jdbc(url,table,properties)

df.createOrReplaceTempView("dbs")

sqlContext.sql("select * from dbs").show()

}

}

运行结果

3589515a2253d153729db3ea1b5740fc.png

5.4 数据源之Hive

(1)准备工作

在pom.xml文件中添加依赖

org.apache.spark

spark-hive_2.11

2.3.0

开发环境则把resource文件夹下添加hive-site.xml文件,集群环境把hive的配置文件要发到$SPARK_HOME/conf目录下

93d5da4fc1572a0d00c73f551453b9a6.png

javax.jdo.option.ConnectionURL

jdbc:mysql://localhost:3306/hivedb?createDatabaseIfNotExist=true

JDBC connect string for a JDBC metastore

javax.jdo.option.ConnectionDriverName

com.mysql.jdbc.Driver

Driver class name for a JDBC metastore

javax.jdo.option.ConnectionUserName

root

username to use against metastore database

javax.jdo.option.ConnectionPassword

root

password to use against metastore database

hive.metastore.warehouse.dir

/hive/warehouse

hive default warehouse, if nessecory, change it

(2)测试代码

object TestHive {

def main(args: Array[String]): Unit = {

val conf = new SparkConf().setMaster("local").setAppName(this.getClass.getSimpleName)

val sc = new SparkContext(conf)

val sqlContext = new HiveContext(sc)

sqlContext.sql("select * from myhive.student").show()

}

}

运行结果

a622f51a405d1aaaa88d835c72834c3b.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/406298.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Mac 生成SSH Key并配置到GitLab(单个)

Mac 生成SSH Key并配置到GitLab(单个)

一封写给自己的信

亲爱的自己,从今天起为了自己骄傲的活着吧,好好爱自己,没有人会心疼你,除了你妈妈。亲爱的自己,不要太在意一些人太在乎一些事,顺其自然以最佳心态面对,因为这世界就是这么不公平,往…

GetClientRect相当于GetWindowRect和ScreenToClient区别

From: http://www.cnblogs.com/yongtaiyu/archive/2011/05/18/2049554.html GetWindowRect是取得窗口在屏幕坐标系下的RECT坐标(包括客户区和非客户区),这样可以得到窗口的大小和相对屏幕左上角(0,0)的位置。 GetClientRect取得窗口客户区(…

汇编中call printf参数压栈时错误理解

EAX, ECX,EDX,EBX均可以32bit,16bit,8bit访问&#xff0c;如下所示: <-------------------EAX------------------------>|<----------------------|-----------|----------->|             |<---------AX--------->|             |&…

ajax传输json数据格式乱码_解决Ajax加载JSon数据中文乱码问题

一、问题描述使用zTree的异步刷新父级菜单时&#xff0c;服务器返回中文乱码&#xff0c;但项目中使用了SpringMvc&#xff0c;已经对中文乱码处理&#xff0c;为什么还会出现呢&#xff1f;此处为的异步请求的配置&#xff1a;Java代码async: {enable: true,url: basePath /s…

在命令提示符下输入的命令

ASSOC 显示或修改文件扩展名关联。ATTRIB 显示或更改文件属性。BREAK 设置或清除扩展式 CTRLC 检查。BOOTCFG 设置 boot.ini 文件的属性以便控制启动加载。CACLS 显示或修改文件的访问控制列表(ACL)。CALL 从另一个批处理…

(转载)WebSphere MQ安装过程

参考文档&#xff1a; http://www.ibm.com/developerworks/cn/linux/linux-speed-start/l-ss-mq/ 转载于:https://www.cnblogs.com/lichmama/p/4312577.html

Vue.config.productionTip = false 是什麽意思

Vue.config.productionTip false 是什麽意思

non-aggregates cannot be initialized with initializer list

From: http://blog.csdn.net/sp_daiyq/article/details/7008990 我定义了一个结构体&#xff0c;示意如下&#xff1a; [cpp] view plaincopyStruct A { int x; CString test; }; 然后我定义一个变量同时对其进行串行初始化&#xff1a; A a {0, "hello&q…

xxljob 配置具体定时任务_记一次xxl-job定时任务没有触发的问题

当初选了xxl-job就是因为它的触发机制比较靠谱,到点准时发,而且有日志可以看。 昨天突然发现部署在一台本地机器上的xxl-job到点并没有触发,且没有任何日志。通过管理页面查询触发日志,发现日志还是有的,只是和筛选条件不甚匹配。比如选取了昨天的日志,结果集中包含了今天…

TOMCAT启动完成但是ECLIPSE仍然显示starting....

最近重新部署了一个TOMCAT服务&#xff0c;但是启动碰到个问题&#xff0c;虽然TOMCAT控制台已显示启动成功&#xff0c;但是ECLIPSE右下角仍然一直显示STARTING&#xff0c;最后TOMCAT超时&#xff0c;启动失败。之前以为是拷贝工程的问题&#xff0c;但其实是SERVER配置的问题…

StatusCodeError: 400 - “{\“code\“:40000,\“error\“:\“错误 Error: 登录用户不是该小程序的开发者

StatusCodeError: 400 - “{\“code\“:40000,\“error\“:\“错误 Error: 登录用户不是该小程序的开发者

例解 autoconf 和 automake 生成 Makefile 文件

From: http://www.ibm.com/developerworks/cn/linux/l-makefile/ 简介&#xff1a; 本文介绍了在 linux 系统中&#xff0c;通过 Gnu autoconf 和 automake 生成 Makefile 的方法。主要探讨了生成 Makefile 的来龙去脉及其机理&#xff0c;接着详细介绍了配置 Configure.in 的方…

爱慕内衣信息化颠覆流程重构供应链

通过IT&#xff0c;爱慕内衣进行了供应链流程的颠覆和重构。在北京慕集团首席信息官赵先生脑海中&#xff0c;一直有着这样一幅“大图景”&#xff1a;只要需要&#xff0c;老总在自己的办公室里能看到每一寸原料的采购情况&#xff0c;每一件成衣的生产和销售情况&#xff0c;…

mysql中 课程1比课程2成绩高_小菜菜mysql练习解读分析1——查询 01 课程比 02 课程成绩高的学生的信息及课程分数...

查询" 01 "课程比" 02 "课程成绩高的学生的信息及课程分数好的&#xff0c;第一道题&#xff0c;刚开始做&#xff0c;就栽了个跟头&#xff0c;爽歪歪&#xff0c;至于怎么栽跟头的——需要分析题目&#xff0c;查询的是&#xff0c;查询的是(1)学生的信息…

专门讲讲这个MYSQL授权当中的with grant option的作用

对象的owner将权限赋予某个用户(如:testuser1) grant select ,update on bd_corp to testuser1 [with grant option ]1.如果带了 with grant option 那么用户testuser1可以将select ,update权限传递给其他用户( 如testuser2)grant select,update on bd_corp to testuser22.如果…