spark sql 上个月_Spark学习之路 (十八)SparkSQL简单使用

一、SparkSQL的进化之路

1.0以前:

Shark

1.1.x开始:

SparkSQL(只是测试性的)  SQL

1.3.x:

SparkSQL(正式版本)+Dataframe

1.5.x:

SparkSQL 钨丝计划

1.6.x:

SparkSQL+DataFrame+DataSet(测试版本)

x:

SparkSQL+DataFrame+DataSet(正式版本)

SparkSQL:还有其他的优化

StructuredStreaming(DataSet)

二、认识SparkSQL

2.1 什么是SparkSQL?

spark SQL是spark的一个模块,主要用于进行结构化数据的处理。它提供的最核心的编程抽象就是DataFrame。

2.2 SparkSQL的作用

提供一个编程抽象(DataFrame) 并且作为分布式 SQL 查询引擎

DataFrame:它可以根据很多源进行构建,包括:结构化的数据文件,hive中的表,外部的关系型数据库,以及RDD

2.3 运行原理

将 Spark SQL 转化为 RDD, 然后提交到集群执行

2.4 特点

(1)容易整合

(2)统一的数据访问方式

(3)兼容 Hive

(4)标准的数据连接

2.5 SparkSession

SparkSession是Spark 2.0引如的新概念。SparkSession为用户提供了统一的切入点,来让用户学习spark的各项功能。

在spark的早期版本中,SparkContext是spark的主要切入点,由于RDD是主要的API,我们通过sparkcontext来创建和操作RDD。对于每个其他的API,我们需要使用不同的context。例如,对于Streming,我们需要使用StreamingContext;对于sql,使用sqlContext;对于Hive,使用hiveContext。但是随着DataSet和DataFrame的API逐渐成为标准的API,就需要为他们建立接入点。所以在spark2.0中,引入SparkSession作为DataSet和DataFrame API的切入点,SparkSession封装了SparkConf、SparkContext和SQLContext。为了向后兼容,SQLContext和HiveContext也被保存下来。

SparkSession实质上是SQLContext和HiveContext的组合(未来可能还会加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成的。

特点:

----为用户提供一个统一的切入点使用Spark 各项功能

----允许用户通过它调用 DataFrame 和 Dataset 相关 API 来编写程序

----减少了用户需要了解的一些概念,可以很容易的与 Spark 进行交互

----与 Spark 交互之时不需要显示的创建 SparkConf, SparkContext 以及 SQlContext,这些对象已经封闭在 SparkSession 中

2.7 DataFrames

在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。

fd7700dea28cebb2f59dbc8870376a03.png

三、RDD转换成为DataFrame

使用spark1.x版本的方式

测试数据目录:/home/hadoop/apps/spark/examples/src/main/resources(spark的安装目录里面)

people.txt

076b335c3e972cd096e6bdf69c328bb1.png

3.1 方式一:通过 case class 创建 DataFrames(反射)

//定义case class,相当于表结构

case class People(var name:String,varage:Int)objectTestDataFrame1 {

def main(args: Array[String]): Unit={

val conf= new SparkConf().setAppName("RDDToDataFrame").setMaster("local")

val sc= newSparkContext(conf)

val context= newSQLContext(sc)//将本地的数据读入 RDD, 并将 RDD 与 case class 关联

val peopleRDD = sc.textFile("E:\\666\\people.txt")

.map(line=> People(line.split(",")(0), line.split(",")(1).trim.toInt))

import context.implicits._//将RDD 转换成 DataFrames

val df =peopleRDD.toDF//将DataFrames创建成一个临时的视图

df.createOrReplaceTempView("people")//使用SQL语句进行查询

context.sql("select * from people").show()

}

}

运行结果

11a95a897b025d57b0a5a3203350c0be.png

3.2 方式二:通过 structType 创建 DataFrames(编程接口)

objectTestDataFrame2 {

def main(args: Array[String]): Unit={

val conf= new SparkConf().setAppName("TestDataFrame2").setMaster("local")

val sc= newSparkContext(conf)

val sqlContext= newSQLContext(sc)

val fileRDD= sc.textFile("E:\\666\\people.txt")//将 RDD 数据映射成 Row,需要 import org.apache.spark.sql.Row

val rowRDD: RDD[Row] = fileRDD.map(line =>{

val fields= line.split(",")

Row(fields(0), fields(1).trim.toInt)

})//创建 StructType 来定义结构

val structType: StructType =StructType(//字段名,字段类型,是否可以为空

StructField("name", StringType, true) ::

StructField("age", IntegerType, true) :: Nil

)/**

* rows: java.util.List[Row],

* schema: StructType

**/val df: DataFrame=sqlContext.createDataFrame(rowRDD,structType)

df.createOrReplaceTempView("people")

sqlContext.sql("select * from people").show()

}

}

运行结果

5946850b8ed4690e9126f3d58c2a5973.png

3.3 方式三:通过 json 文件创建 DataFrames

objectTestDataFrame3 {

def main(args: Array[String]): Unit={

val conf= new SparkConf().setAppName("TestDataFrame2").setMaster("local")

val sc= newSparkContext(conf)

val sqlContext= newSQLContext(sc)

val df: DataFrame= sqlContext.read.json("E:\\666\\people.json")

df.createOrReplaceTempView("people")

sqlContext.sql("select * from people").show()

}

}

cc945f520ecc85287760ed27937d8dd3.png

四、DataFrame的read和save和savemode

4.1 数据的读取

objectTestRead {

def main(args: Array[String]): Unit={

val conf= new SparkConf().setAppName("TestDataFrame2").setMaster("local")

val sc= newSparkContext(conf)

val sqlContext= newSQLContext(sc)//方式一

val df1 = sqlContext.read.json("E:\\666\\people.json")

val df2= sqlContext.read.parquet("E:\\666\\users.parquet")//方式二

val df3 = sqlContext.read.format("json").load("E:\\666\\people.json")

val df4= sqlContext.read.format("parquet").load("E:\\666\\users.parquet")//方式三,默认是parquet格式

val df5 = sqlContext.load("E:\\666\\users.parquet")

}

}

4.2 数据的保存

objectTestSave {

def main(args: Array[String]): Unit={

val conf= new SparkConf().setAppName("TestDataFrame2").setMaster("local")

val sc= newSparkContext(conf)

val sqlContext= newSQLContext(sc)

val df1= sqlContext.read.json("E:\\666\\people.json")//方式一

df1.write.json("E:\\111")

df1.write.parquet("E:\\222")//方式二

df1.write.format("json").save("E:\\333")

df1.write.format("parquet").save("E:\\444")//方式三

df1.write.save("E:\\555")

}

}

4.3 数据的保存模式

使用mode

df1.write.format("parquet").mode(SaveMode.Ignore).save("E:\\444")

0b0ebbafe1f9b8f329f3df086ed28116.png

五、数据源

5.1 数据源只json

参考4.1

5.2 数据源之parquet

参考4.1

5.3 数据源之Mysql

objectTestMysql {

def main(args: Array[String]): Unit={

val conf= new SparkConf().setAppName("TestMysql").setMaster("local")

val sc= newSparkContext(conf)

val sqlContext= newSQLContext(sc)

val url= "jdbc:mysql://192.168.123.102:3306/hivedb"val table= "dbs"val properties= newProperties()

properties.setProperty("user","root")

properties.setProperty("password","root")//需要传入Mysql的URL、表明、properties(连接数据库的用户名密码)

val df =sqlContext.read.jdbc(url,table,properties)

df.createOrReplaceTempView("dbs")

sqlContext.sql("select * from dbs").show()

}

}

运行结果

3589515a2253d153729db3ea1b5740fc.png

5.4 数据源之Hive

(1)准备工作

在pom.xml文件中添加依赖

org.apache.spark

spark-hive_2.11

2.3.0

开发环境则把resource文件夹下添加hive-site.xml文件,集群环境把hive的配置文件要发到$SPARK_HOME/conf目录下

93d5da4fc1572a0d00c73f551453b9a6.png

javax.jdo.option.ConnectionURL

jdbc:mysql://localhost:3306/hivedb?createDatabaseIfNotExist=true

JDBC connect string for a JDBC metastore

javax.jdo.option.ConnectionDriverName

com.mysql.jdbc.Driver

Driver class name for a JDBC metastore

javax.jdo.option.ConnectionUserName

root

username to use against metastore database

javax.jdo.option.ConnectionPassword

root

password to use against metastore database

hive.metastore.warehouse.dir

/hive/warehouse

hive default warehouse, if nessecory, change it

(2)测试代码

object TestHive {

def main(args: Array[String]): Unit = {

val conf = new SparkConf().setMaster("local").setAppName(this.getClass.getSimpleName)

val sc = new SparkContext(conf)

val sqlContext = new HiveContext(sc)

sqlContext.sql("select * from myhive.student").show()

}

}

运行结果

a622f51a405d1aaaa88d835c72834c3b.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/406298.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

GetClientRect相当于GetWindowRect和ScreenToClient区别

From: http://www.cnblogs.com/yongtaiyu/archive/2011/05/18/2049554.html GetWindowRect是取得窗口在屏幕坐标系下的RECT坐标(包括客户区和非客户区),这样可以得到窗口的大小和相对屏幕左上角(0,0)的位置。 GetClientRect取得窗口客户区(…

xxljob 配置具体定时任务_记一次xxl-job定时任务没有触发的问题

当初选了xxl-job就是因为它的触发机制比较靠谱,到点准时发,而且有日志可以看。 昨天突然发现部署在一台本地机器上的xxl-job到点并没有触发,且没有任何日志。通过管理页面查询触发日志,发现日志还是有的,只是和筛选条件不甚匹配。比如选取了昨天的日志,结果集中包含了今天…

TOMCAT启动完成但是ECLIPSE仍然显示starting....

最近重新部署了一个TOMCAT服务,但是启动碰到个问题,虽然TOMCAT控制台已显示启动成功,但是ECLIPSE右下角仍然一直显示STARTING,最后TOMCAT超时,启动失败。之前以为是拷贝工程的问题,但其实是SERVER配置的问题…

例解 autoconf 和 automake 生成 Makefile 文件

From: http://www.ibm.com/developerworks/cn/linux/l-makefile/ 简介: 本文介绍了在 linux 系统中,通过 Gnu autoconf 和 automake 生成 Makefile 的方法。主要探讨了生成 Makefile 的来龙去脉及其机理,接着详细介绍了配置 Configure.in 的方…

爱慕内衣信息化颠覆流程重构供应链

通过IT,爱慕内衣进行了供应链流程的颠覆和重构。在北京慕集团首席信息官赵先生脑海中,一直有着这样一幅“大图景”:只要需要,老总在自己的办公室里能看到每一寸原料的采购情况,每一件成衣的生产和销售情况,…

mysql中 课程1比课程2成绩高_小菜菜mysql练习解读分析1——查询 01 课程比 02 课程成绩高的学生的信息及课程分数...

查询" 01 "课程比" 02 "课程成绩高的学生的信息及课程分数好的,第一道题,刚开始做,就栽了个跟头,爽歪歪,至于怎么栽跟头的——需要分析题目,查询的是,查询的是(1)学生的信息…

window mysql集群视频_windows7实现mysql集群cluster-mysql簇

http://bitar.cn下载:mysql-cluster-gpl-7.4.7-winx64内置了:mysql-5.6.25,ndb-7.4.7文档可参考 mysql参考手册【mysql簇】管理(MGM)节点新建文件夹,存放ndb_mgm.exe 和ndb_mgmd.exe,就是管理(MGM)节点不需要mysql客户端端口监督程…

Human_height

Country/RegionAverage male heightAverage female heightSample population / age rangeMethodologyYearSourceArgentina1.745 m (5 ft 81⁄2 in)1.610 m (5 ft 31⁄2 in)19Measured1998–2001[1]Australia1.748 m (5 ft 9 in)1.634 m (5 ft 41⁄2 in)18Measured1995[2]Austra…

TreeSet()详解

/* * TreeSet存储对象的时候, 可以排序, 但是需要指定排序的算法 * * Integer能排序(有默认顺序), String能排序(有默认顺序), 自定义的类存储的时候出现异常(没有顺序) * * 如果想把自定义类的对象存入TreeSet进行排序, 那么必须实现Comparable接口 * 在类上implement Com…

Python的前景及应用

2019独角兽企业重金招聘Python工程师标准>>> 前景 Python在编程领域的占有率一直处于稳步上升之中,根据最新的数据,Python排名第六。前五名分别是 Java、C、PHP、C 和 VB. 作为一个很年轻的语言,Python的位置已经相当令人振奋了。…

设置允许远程连接MySQL (Ubuntu为例)

设置允许远程连接MySQL (Ubuntu为例) MySQL默认配置是不允许远程连接,为了安全! 然而我需要这么做; 开始改配置; #1 设置用户权限 Type help; or \h for help. Type \c to clear the current input statement.mysql> use mysql#查看当前设置 mysql&…