SparkSQL DataFrame基础篇

SparkSQL DataFrame基础篇
SparkSQL DataFrame进阶篇

SparkSQL DataFrame基础篇

Spark 2.2及以后的SparkSession替换了Spark以前版本中的SparkContext和SQLContext,为Spark集群提供了唯一的入口点。
val spark =SparkSession.builder().appName(“SparkExample”).getOrCreate()
为了向后兼容,SparkSession对象包含SparkContext和SQLContext对象。当使用交互式Spark shell时,创建一个SparkSession类型对象名为spark。因此该文档里所有的SQLContext在spark2.2+中都可以替换成spark

基于反射机制创建DataFrame

//students.txt
160201,Michael,17
160101,Andy,23
160301,justin,23
160202,John,22
160102,Herry,17
160203,Brewster,18
160302,Brice,20
160303,Justin,25
160103,Jerry,22
160304,Tom,24

不结合hive,使用spark实例(适用于spark1+)
val spark=new org.apache.spark.sql.spark(sc)
import spark.implicits._

结合hive后,访问hive时使用HiveContext实例
val hiveContext=new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext.implicits._

这里与Spark2.2+有所不同

case class Student(id: String, name : String, age: Int)
val rdd=sc.textFile("hdfs://master:9000/sqldata/students.txt").map(_.split(",")).map(p => Student(p(0), p(1), p(2).trim.toInt))
val students = rdd.toDF()
students.registerTempTable("tb_students")
val youngstudents=spark.sql("SELECT name FROM tb_students WHERE age>=19 AND age<=22")
youngstudents.show

基于编程创建DataFrame

import org.apache.spark.sql.types._   
import org.apache.spark.sql._         
import org.apache.spark._val students=sc.textFile("hdfs://master:9000/sqldata/students.txt")val schemaString="id name age"val schema=StructType(schemaString.split(" ").map(fieldname=> StructField(fieldname, StringType, true)))val rowRDD=students.map(_.split(",")).map(p=>Row(p(0), p(1), p(2).trim))val studentsDataFrame=spark.createDataFrame(rowRDD, schema) studentsDataFrame.registerTempTable("tb_students")val names=spark.sql("SELECT name FROM tb_students")names.show

基于DataFrame创建Json文件

1.创建DataFrame

import spark.implicits._
case class Student(id: String, name : String, age: Int)
val rdd=sc.textFile("hdfs://master:9000/sqldata/students.txt").map(_.split(",")).map(p => Student(p(0), p(1), p(2).trim.toInt))
val students = rdd.toDF()

2.另存为json类型文件

//students.save("hdfs://master:9000/sqldata/students.json")//无法执行spark1.4及以后,dataframe中的save方法不建议使用,有的直接被弃用,
使用DataFrame中的write方法返回一个DataFrameWriter类型对象,再使用里面的save方法、format().save()、parquet()等方法students.write.save("hdfs://master:9000/sqldata/students.json")
students.write.format("json").save("hdfs://master:9000/sqldata/s1.json")//Spark 1.4 DataFrameWriter中方法format、save
students.write.json("hdfs://master:9000/sqldata/s2.json")//Spark 1.4 DataFrameWriter中方法json

基于DataFrame创建Parquet文件

1.创建DataFrame

import spark.implicits._
case class Student(id: String, name : String, age: Int)
val rdd=sc.textFile("hdfs://master:9000/sqldata/students.txt").map(_.split(",")).map(p => Student(p(0), p(1), p(2).trim.toInt))
val students = rdd.toDF()

2.另存为parquet类型文件

//students.save("hdfs://master:9000/sqldata/students.parquet")//无法执行spark1.4及以后,dataframe中的save方法不建议使用,有的直接被弃用,
使用DataFrame中的write方法返回一个DataFrameWriter类型对象,再使用里面的save方法、format().save()、parquet()等方法students.write.save("hdfs://master:9000/sqldata/students.parquet")
students.write.format("parquet").save("hdfs://master:9000/sqldata/s1.parquet")
students.write.parquet("hdfs://master:9000/sqldata/s2.parquet")

基于Json创建DataFrame

//read方法是spark中的方法,返回值类型是DataFrameReader,以往的jsonFile方法已经不建议使用 
val students=spark.read.json("hdfs://master:9000/sqldata/s2.json")//read.json()是最通用的一种方法s1.json,s2.json都可读     
students.registerTempTable("tb_students")
spark.sql("select * from tb_students").show
/*
val students=spark.read.load("hdfs://master:9000/sqldata/students.json")//可执行,但换做读s1.json,s2.json不可读
val students=spark.read.load("hdfs://master:9000/sqldata/s1.json") //不可执行,load默认读取parquet类型文件
val students=spark.jsonFile("hdfs://master:9000/sqldata/students.json") //jsonFile不可用
*/

基于Parquet创建DataFrame

//read方法是spark中的方法,返回值类型是DataFrameReader,以往的parquetFile方法已经不建议使用 
val students=spark.read.parquet("hdfs://master:9000/sqldata/students.parquet") //最通用的一种方法,students.parquet,s1.parquet,s2.parquet都可读
students.registerTempTable("tb_students")
spark.sql("select * from tb_students").show
/*
val students=spark.read.load("hdfs://master:9000/sqldata/students.parquet")
val students=spark.load.parquet("hdfs://master:9000/sqldata/s1.parquet")//此处不可执行因为load非SparkSession中方法,Spark1.X中load方法可执行,默认读取parquet文件
val students=spark.parquetFile("hdfs://master:9000/sqldata/students.parquet")//parquetFile不可用
*/

DataFrame的其他操作

students.head    students.head(3)students.show(3)students.columnsstudents.dtypesstudents.printSchema
//更详细的Schema信息

withColumn

students.withColumn("bonus", students("age")*50).show
students.withColumn("bonus", $"age"*50).show//可以运行
students.withColumn("bonus", "age"*50).show//不可运行

withColumnRenamed

val newstudents=students.withColumnRenamed("age", "newage")newstudents.printSchema

select

case class Student(id: String, name : String, age: Int)
val rdd=sc.textFile("hdfs://master:9000/sqldata/students.txt").map(_.split(",")).map(p => Student(p(0), p(1), p(2).trim.toInt))
val students = rdd.toDF()students.select("age", "name").show
students.select( $"age", $"name").show
students.selectExpr("age+1", "name", "abs(age)").show
students.selectExpr("age+1 as newage1", "name as newName", "sqrt(age) as newage2").show

filter

students.filter("age>20").show
students.filter($"age">20).show
students.filter($"age">23 && $"name"==="Justin").show  //可以执行
students.filter('age>20).show //可以执行
students.filter("age>23 && name===Justin").show        //不可以执行,字符串中不能用逻辑运算符

where

students.where('age>20).show   
students.where($"age">20).show 
students.where($"age">23 && $"name"==="Justin").show
students.where('age>23 && 'name==="Justin").show
students.where("age>23 && name===Justin").show      //不可以执行,字符串中不能用逻辑运算符

orderBy

students.orderBy("age", "id").show(5)
students.orderBy(students("age")).show(5)
students.orderBy($"age").show(5)

groupBy

newstudents.groupBy('course_id).mean("score").orderBy('course_id).showmax("col")
min("col")
mean("col")
sum("col")
该四种方法只适用于数值型的GroupedData对象

sort

students.sort("age").show(5)
students.sort($"age".desc).show(5)
students.sort("age".desc).show(5)//不可执行

toDF

val newstudents=students.toDF("newid", "newage", "newname")
newstudents.printSchema

join

case class Score(id:String,course_id:String,score:Int)
val scores_rdd=sc.textFile("hdfs://master:9000/sqldata/scores.txt").map(_.split(",")).map(p => Score(p(0), p(1), p(2).trim.toInt))
val scores = scores_rdd.toDF()
students.join(scores, students("id" )===scores("id"), "outer").show

案例练习

Spark SQL基本案例1

scores.txt

scores 学号、课程编号、成绩
160201,1,60
160101,2,90
160301,1,70
160202,3,70
160102,3,50
160102,4,95
160302,1,98
160303,2,57
160103,3,64
160304,3,50
160201,2,77
160101,3,57
160301,3,72
160202,2,80
160102,2,58
160102,3,97
160302,4,91
160303,1,67
160103,2,62
160304,4,71

students.txt

160201,Michael,17
160101,Andy,23
160301,justin,23
160202,John,22
160102,Herry,17
160203,Brewster,18
160302,Brice,20
160303,Justin,25
160103,Jerry,22
160304,Tom,24

1.利用反射机制创建students.txt对应的DataFrame,其中包含id、name、age三个字段。

val rdd=sc.textFile("hdfs://master:9000/sqldata/students.txt").map(_.split(",")).map(p=>Student(p(0),p(1),p(2).toInt))
students = rdd.toDF
students.show

2.查看所有学生的姓名。

students.select('name).show

3.查询所有学生的年龄,并按照年龄降序排序。

students.select('age).show
students.sort("age".desc).show()

4.查询年龄小于19或年龄大于21的所有学生。

students.where($"age">21 || $"age"<19).show

5.添加scholarship字段,每个学生的scholarship项是其龄*20。

val newstudents = students.withColumn("scholarship",$"age"*20)
newstudents.show(5)

6.将添加scholarship字段后的DataFrame分别以Parquet和JSON格式保存至HDFS上。

newstudents.write.format("json").save("hdfs://master:9000/sqldata/newstudents.json")
newstudents.write.format("parquet").save("hdfs://master:9000/sqldata/newstudents.parquet")

7.利用SQL语句实现案例1中2-4。

students.registerTempTable("tb_students")spark.sql("SELECT name FROM tb_students WHERE age>=19 AND age<=21")

8.利用自定义接口创建scores.txt对应的DataFrame,其中包含id、course_id、score三个字段。

val scores_rdd=sc.textFile("hdfs://master:9000/sqldata/scores.txt").map(_.split(",")).map(p => Score(p(0), p(1), p(2).trim.toInt))val scores = scores.toDF()
val newstudents = students.join(scores, students("id" )==scores("id"), "outer")
newstudents.show

9.按照课程编号分组,查看每门课的平均成绩,并按课程编号升序排序。

newstudents.groupBy('course_id).mean("score").orderBy('course_id).show

10.按照学生编号分组,查看个学生的姓名和其所有课程的平均成绩,并在统计结果中筛选出平均成绩大于72的同学。

val newstudents = students.join(scores, students("id" )==scores("id"), "outer")
newstudents.groupBy($"name").mean("score").where($"avg(score)">72).orderBy("name").show

Spark SQL基本案例2

students.txt

学号/姓名/性别/年龄/学年/系别/
160201,Michael,male,37,2012,2,2
160101,Rose,female,33,2011,1,1
160301,justin,male,23,2013,3,3
160202,John,male,22,2012,2,2
160102,Lucy,female,27,2011,2,1
160203,Brewster,male,37,2012,1,2
160302,Susan,female,30,2013,1,3
160303,Justin,male,23,2013,3,3
160103,John,male,22,2011,2,1
160304,Lucy,female,27,2013,3,3

departments.txt

Computer,1
Math,2
Art,3

projects.txt

学号/创新项目编号/学年/
160201,XC2014001,2014,16,64,2
160101,XC201213,2012,32,48,3
160301,RW201103,2011,32,48,3
160202,XC2014002,2014,16,64,1
160102,XC2013002,2013,16,64,2
160102,XC2012011,2012,16,48,2
160302,RW201401,2014,32,32,2
160304,RW201503,2015,16,32,1

scores.txt

160201,90
160101,83
160301,80
160202,70
160102,67
160102,89
160302,91
160303,58
160103,64

scholarship.txt

160201,2013,2000
160201,2014,3000
160102,2013,2000
160101,2013,2000
160301,2014,2000
160302,2014,2000
160302,2015,3000
160302,2016,4000

1.查询创建的五个表的概要信息。

2.查询各院系学生总数。

3.查询各院系学生奖学金的总和并排序。

4.查询各院系学生的平均学分绩值并排序。

5.统计各学院每年学生参与创新项目所获得的创新学分数总数。

SparkSQL DataFrame基础篇
SparkSQL DataFrame进阶篇

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/293025.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ch340串口驱动_关于串口下载问题和超时

串口下载适用于mini、精英、战舰、探索者、阿波罗429不适用于阿波罗767&#xff0c;H743&#xff0c;号令者1052保证板子在独立供电状态下&#xff0c;电源灯处于亮灯状态下&#xff0c;USB线接板子上USB_232&#xff0c;RXD 和 PA9(STM32 的 TXD)TXD 和 PA10(STM32的 RXD)通过…

tcpdump抓取HTTP包

http://blog.csdn.net/kofandlizi/article/details/8106841 cpdump -XvvennSs 0 -i eth0 tcp[20:2]0x4745 or tcp[20:2]0x4854 0x4745 为"GET"前两个字母"GE" 0x4854 为"HTTP"前两个字母"HT" 说明&#xff1a; 通常情况下:一…

cobbler工作流分析

官网 http://cobbler.github.io/ 介绍 Cobbler是一个快速网络安装linux的服务&#xff0c;而且在经过调整也可以支持网络安装windows。该工具使用python开发&#xff0c;小巧轻便&#xff0c;使用简单的命令即可完成PXE网络安装环境的配置&#xff0c;同时还可以管理DHCP、DNS、…

数学到底有多难难难难?看完这个,瞬间觉得智商都提高了!

▲ 点击查看数学家陈省身曾说过&#xff0c;我们每个人一生都花了很多时间学数学&#xff0c;但我们其实只是学会了计算&#xff0c;而不是数学。不知道你有没有听说过这样一句话&#xff1a;想要学好数学&#xff0c;就要靠刷题。尽管我们不得不承认&#xff0c;确实需要不断地…

Redis常用概念简介

Redis支持五种数据类型1、string是redis最基本的类型&#xff0c;你可以理解成与Memcached一模一样的类型&#xff0c;一个key对应一个value。string类型是二进制安全的。意思是redis的string可以包含任何数据。比如jpg图片或者序列化的对象 。string类型是Redis最基本的数据类…

.net Mvc Controller 接收 Json/post方式 数组 字典 类型 复杂对象

原文地址&#xff1a;http://www.cnblogs.com/fannyatg/archive/2012/04/16/2451611.html ------------------------------------------------------------------------------------------------------------------ Asp.net Mvc Controller Json数组接收数组字典 类型 复杂对象…

SparkSQL DataFrame进阶篇

SparkSQL DataFrame基础篇 SparkSQL DataFrame进阶篇 1.创建SparkSession【2.0】和 SQLContext实例【1.x】 1.创建SparkSession【2.0】 ///spark2.0后&#xff0c;用sparksession代替sparkcontext和sqlcontext的创建 val spark SparkSession.builder().appName("SparkS…

matlab机械臂工作空间代码_【ROS-Moveit!】机械臂控制探索(3)——基于python的API示例代码分析...

本文参考Moveit!官方文档。系统&#xff1a;ubuntu 18.04 / 16.04ROS&#xff1a;Melodic / Kinetic概述基于python的运动组API是最简单的MoveIt!用户接口。其中提供了用户常用的大量功能封装&#xff0c;例如&#xff1a;设置目标关节控制或笛卡尔空间位置创建运动规划移动机器…

ubuntu如何杀死进程

一、得到所有进程 先用命令查询出所有进程 ps -ef 二、杀死进程 我们使用ps -ef命令之后&#xff0c;就会得到一些列进程信息&#xff0c;有进程pid什么的&#xff0c;如果你要杀死莫个进程的话&#xff0c;直接使用命令kill pid

Oracle to_char() to_date() to_number()函数

TO_CHAR 是把日期或数字转换为字符串TO_DATE 是把字符串转换为数据库中得日期类型转换函数TO_NUMBER 将字符转化为数字 TO_CHAR 使用TO_CHAR函数处理数字 TO_CHAR(number, 格式) TO_CHAR(salary,’$99,999.99’); 使用TO_CHAR函数处理日期 TO_CHAR(date,’格式’);  TO_NUM…

妄想性仮想人格障害 新手教程 +改动器

记得上次 HM 教程以后 我如今 继续写 Teatime 新作新手教程 首先我说下 此游戏 模式 AI2机械迷城 一共同拥有5关,结局从文本分析应该至少有两个 本文不是教你怎么玩通结局 安装没什么说的,APP就能够还不明确的上网找找教程 进入游戏后 首先要注意 上面的时间,一个是MM 生命 另外…

如何在宝塔面板启用 ASP.NET CORE 网站并自动申请 HTTPS 证书

要想在 Linux 上部署 ASP.NET CORE 网站&#xff0c;除了使用“宇内流云”大神的 Jexus 之外就是使用 Nginx 对 ASP.NET CORE 网站进行反向代理。常规的做法是在服务器上部署完成 ASP.NET 运行环境、网站程序之后&#xff0c;使用 Supervisor 对网站程序进程进行守护。网站程序…

竟然有如何奇葩的如厕方式......

1 密集恐惧症一下子都好啦&#xff08;素材来源网络&#xff0c;侵删&#xff09;▼2 请选择适合您的如厕方式&#xff08;素材来源网络&#xff0c;侵删&#xff09;▼3 为什么当年的粉丝没有现在像这样互掐&#xff08;素材来源网络&#xff0c;侵删&#xff09;▼4 怕什么…

crm开源系统 tp框架_八个开源的 Spring Boot 前后端分离项目,一定要收藏!

点击蓝色字关注我们前后端分离已经在慢慢走进各公司的技术栈&#xff0c;不少公司都已经切换到这个技术栈上面了。即使贵司目前没有切换到这个技术栈上面&#xff0c;也非常建议大家学习一下前后端分离开发&#xff0c;以免在公司干了两三年&#xff0c;SSH 框架用的滚瓜烂熟&a…

Spark运行原理剖析

http://ihoge.cn/2018/Spark%20Scheduler.html Apache Spark是一个开源的&#xff0c;强大的分布式查询和处理引擎。它提供MapReduce的灵活性和可扩展性&#xff0c;但速度明显更高。 Spark的核心是根据RDD来实现的&#xff0c;Spark Scheduler则为Spark核心实现的重要一环…

硬盘检测工具Smartmontools安装、部署、使用

在服务器管理的实际环境中&#xff0c;硬盘是最容易出现问题及发生故障的硬件&#xff0c;而且硬盘中存储着大量重要的数据&#xff0c;万一出现故障所造成的损失也是无法估计的&#xff0c;轻则需要化费大量的时间与精力去做数据恢复&#xff0c;重则硬盘报废&#xff0c;里面…

【转】Python可变长度的函数参数

http://www.pythoner.com/4.html转载于:https://www.cnblogs.com/liangnote/p/3964062.html

那些奇奇怪怪的男性用品......

1 原来浣熊竟是我自己&#xff08;素材来源网络&#xff0c;侵删&#xff09;▼2 能不能对全靠运气&#xff08;素材来源网络&#xff0c;侵删&#xff09;▼3 万万没想到是这个结局&#xff08;素材来源网络&#xff0c;侵删&#xff09;▼4 真实版出猪车&#xff08;素材来…

Android之getCacheDir()和getFilesDir()方法区别

getCacheDir()和getFilesDir()方法区别. Activity提供了getCacheDir()和getFilesDir()方法: getCacheDir()方法用于获取/data/data//cache目录getFilesDir()方法用于获取/data/data//files目录//将文件写入SD卡内:获取SDCard的状态&#xff1a;Environment.getExtemalStorageSt…

.Net Minimal API 介绍

Minimal APIs 是.Net 6 中新增的模板&#xff0c;借助 C# 10 的一些特性以最少的代码运行一个 Web 服务。本文脱离 VS 通过 VS Code&#xff0c;完成一个简单的 Minimal Api 项目的开发。创建项目新建一个文件夹&#xff0c;用来管理我们的项目文件&#xff0c;文件夹内启动命令…