spark-sql建表语句限制_第三篇|Spark SQL编程指南

在《第二篇|Spark Core编程指南》一文中,对Spark的核心模块进行了讲解。本文将讨论Spark的另外一个重要模块--Spark SQL,Spark SQL是在Shark的基础之上构建的,于2014年5月发布。从名称上可以看出,该模块是Spark提供的关系型操作API,实现了SQL-on-Spark的功能。对于一些熟悉SQL的用户,可以直接使用SQL在Spark上进行复杂的数据处理。通过本文,你可以了解到:

  • Spark SQL简介
  • DataFrame API&DataSet API
  • Catalyst Optimizer优化器
  • Spark SQL基本操作
  • Spark SQL的数据源
  • RDD与DataFrame相互转换
  • Thrift  server与Spark SQL CLI

Spark SQL简介

Spark SQL是Spark的其中一个模块,用于结构化数据处理。与基本的Spark RDD API不同,Spark SQL提供的接口为Spark提供了有关数据结构和正在执行的计算的更多信息,Spark SQL会使用这些额外的信息来执行额外的优化。使用SparkSQL的方式有很多种,包括SQL、DataFrame API以及Dataset API。值得注意的是,无论使用何种方式何种语言,其执行引擎都是相同的。实现这种统一,意味着开发人员可以轻松地在不同的API之间来回切换,从而使数据处理更加地灵活。

DataFrame API&DataSet API

DataFrame API

DataFrame代表一个不可变的分布式数据集合,其核心目的是让开发者面对数据处理时,只关心要做什么,而不用关心怎么去做,将一些优化的工作交由Spark框架本身去处理。DataFrame是具有Schema信息的,也就是说可以被看做具有字段名称和类型的数据,类似于关系型数据库中的表,但是底层做了很多的优化。创建了DataFrame之后,就可以使用SQL进行数据处理。

用户可以从多种数据源中构造DataFrame,例如:结构化数据文件,Hive中的表,外部数据库或现有RDD。DataFrame API支持Scala,Java,Python和R,在Scala和Java中,row类型的DataSet代表DataFrame,即Dataset[Row]等同于DataFrame。

DataSet API

DataSet是Spark 1.6中添加的新接口,是DataFrame的扩展,它具有RDD的优点(强类型输入,支持强大的lambda函数)以及Spark SQL的优化执行引擎的优点。可以通过JVM对象构建DataSet,然后使用函数转换(mapflatMapfilter)。值得注意的是,Dataset API在Scala和 Java中可用,Python不支持Dataset API。

另外,DataSet API可以减少内存的使用,由于Spark框架知道DataSet的数据结构,因此在持久化DataSet时可以节省很多的内存空间。

Catalyst Optimizer优化器

在Catalyst中,存在两种类型的计划:

  • 逻辑计划(Logical Plan):定义数据集上的计算,尚未定义如何去执行计算。每个逻辑计划定义了一系列的用户代码所需要的属性(查询字段)和约束(where条件),但是不定义该如何执行。具体如下图所示:

e1b94350043e12401f3398d749cc6264.png

  • 物理计划(Physical Plan):物理计划是从逻辑计划生成的,定义了如何执行计算,是可执行的。举个栗子:逻辑计划中的JOIN会被转换为物理计划中的sort merge JOIN。需要注意,Spark会生成多个物理计划,然后选择成本最低的物理计划。具体如下图所示:

3c66fc44e9bda7c0da5642a1a8ef7553.png

在Spark SQL中,所有的算子操作会被转换成AST(abstract syntax tree,抽象语法树),然后将其传递给Catalyst优化器。该优化器是在Scala的函数式编程基础会上构建的,Catalyst支持基于规则的(rule-based)和基于成本的(cost-based)优化策略。

Spark SQL的查询计划包括4个阶段(见下图):

3381cbdede1a4f3331bbf99ce699c0f7.png

  • 1.分析
  • 2.逻辑优化
  • 3.物理计划
  • 4.生成代码,将查询部分编译成Java字节码

注意:在物理计划阶段,Catalyst会生成多个计划,并且会计算每个计划的成本,然后比较这些计划的成本的大小,即基于成本的策略。在其他阶段,都是基于规则的的优化策略。

分析

Unresolved Logical plan --> Logical plan。Spark SQL的查询计划首先起始于由SQL解析器返回的AST,或者是由API构建的DataFrame对象。在这两种情况下,都会存在未处理的属性引用(某个查询字段可能不存在,或者数据类型错误),比如查询语句:SELECT col FROM sales,关于字段col的类型,或者该字段是否是一个有效的字段,只有等到查看该sales表时才会清楚。当不能确定一个属性字段的类型或者没能够与输入表进行匹配时,称之为未处理的。Spark SQL使用Catalyst的规则以及Catalog对象(能够访问数据源的表信息)来处理这些属性。首先会构建一个Unresolved Logical Plan树,然后作用一系列的规则,最后生成Logical Plan。

逻辑优化

Logical plan --> Optimized Logical Plan。逻辑优化阶段使用基于规则的优化策略,比如谓词下推、投影裁剪等。经过一些列优化过后,生成优化的逻辑计划Optimized Logical Plan。

物理计划

Optimized Logical Plan -->physical Plan。在物理计划阶段,Spark SQL会将优化的逻辑计划生成多个物理执行计划,然后使用Cost Model计算每个物理计划的成本,最终选择一个物理计划。在这个阶段,如果确定一张表很小(可以持久化到内存),Spark SQL会使用broadcast join。

需要注意的是,物理计划器也会使用基于规则的优化策略,比如将投影、过滤操作管道化一个Spark的map算子。此外,还会将逻辑计划阶段的操作推到数据源端(支持谓词下推、投影下推)。

代码生成

查询优化的最终阶段是生成Java字节码,使用Quasi quotes来完成这项工作的。

经过上面的分析,对Catalyst Optimizer有了初步的了解。关于Spark的其他组件是如何与Catalyst Optimizer交互的呢?具体如下图所示:

8d575fae536219154fd1634c1f1e98ce.png

如上图所示:ML Pipelines, Structured streaming以及 GraphFrames都使用了DataFrame/Dataset APIs,并且都得益于 Catalyst optimiser。

Quick Start

创建SparkSession

SparkSession是Dataset与DataFrame API的编程入口,从Spark2.0开始支持。用于统一原来的HiveContext和SQLContext,为了兼容两者,仍然保留这两个入口。通过一个SparkSession入口,提高了Spark的易用性。下面的代码展示了如何创建一个SparkSession:

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()
//导入隐式转换,比如将RDD转为DataFrame
import spark.implicits._

创建DataFrame

创建完SparkSession之后,可以使用SparkSession从已经存在的RDD、Hive表或者其他数据源中创建DataFrame。下面的示例使用的是从一个JSON文件数据源中创建DataFrame:

/**
* {"name":"Michael"}
* {"name":"Andy", "age":30}
* {"name":"Justin", "age":19}
*/
val df = spark.read.json("E://people.json")
//输出DataFrame的内容
df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

DataFrame基本操作

创建完DataFrame之后,可以对其进行一些列的操作,具体如下面代码所示:

// 打印该DataFrame的信息
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// 查询name字段
df.select("name").show()
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// 将每个人的age + 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// 查找age大于21的人员信息
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// 按照age分组,统计每种age的个数
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+

在程序中使用SQL查询

上面的操作使用的是**DSL(domain-specific language)**方式,还可以直接使用SQL对DataFrame进行操作,具体如下所示:

// 将DataFrame注册为SQL的临时视图
// 该方法创建的是一个本地的临时视图,生命周期与其绑定的SparkSession会话相关
// 即如果创建该view的session结束了,该view也就消失了
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

Global Temporary View

上面使用的是Temporary views的方式,该方式是Spark Session范围的。如果将创建的view可以在所有session之间共享,可以使用Global Temporary View的方式创建view,具体如下:

// 将DataFrame注册为全局临时视图(global temporary view)
// 该方法创建的是一个全局的临时视图,生命周期与其绑定的Spark应用程序相关,
// 即如果应用程序结束,会自动被删除
// 全局临时视图是可以跨Spark Session的,系统保留的数据库名为`global_temp`
// 当查询时,必须要加上全限定名,如`SELECT * FROM global_temp.view1`
df.createGlobalTempView("people")

// 全局临时视图默认的保留数据库为:`global_temp` 
spark.sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// 全局临时视图支持跨Spark Session会话
spark.newSession().sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

创建DataSet

DataSet与RDD很类似,但是,RDD使用的Java的序列化器或者Kyro序列化,而DataSet使用的是Encoder对在网络间传输的对象进行序列化的。创建DataSet的示例如下:

case class Person(name: String, age: Long)
// 创建DataSet
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+

// 通过导入Spark的隐式转换spark.implicits._
// 可以自动识别数据类型
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // 返回: Array(2, 3, 4)

// 通过调用as方法,DataFrame可以转为DataSet,
val path = "E://people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

RDD与DataFrame相互转换

Spark SQL支持两种不同的方式将RDD转换为DataFrame。第一种是使用反射来推断包含特定类型对象的RDD的模式,这种基于反射的方式可以提供更简洁的代码,如果在编写Spark应用程序时,已经明确了schema,可以使用这种方式。第二种方式是通过可编程接口来构建schema,然后将其应用于现有的RDD。此方式编写的代码更冗长,此种方式创建的DataFrame,直到运行时才知道该DataFrame的列及其类型。

下面案例的数据集如下people.txt:

Tom, 29
Bob, 30
Jack, 19

通过反射的方式

Spark SQL的Scala接口支持自动将包含样例类的RDD转换为DataFrame。样例类定义表的schema。通过反射读取样例类的参数名称,并映射成column的名称。

object RDD2DF_m1 {
  //创建样例类
  case class  Person(name: String, age: Int)
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("RDD2DF_m1")
      .master("local")
      .getOrCreate()
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
    runRDD2DF(spark)
  }

  private def runRDD2DF(spark: SparkSession) = {
    //导入隐式转换,用于RDD转为DataFrame
    import spark.implicits._
    //从文本文件中创建RDD,并将其转换为DataFrame
    val peopleDF = spark.sparkContext
      .textFile("file:///E:/people.txt")
      .map(_.split(","))
      .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
      .toDF()
    //将DataFrame注册成临时视图
    peopleDF.createOrReplaceTempView("people")
    // 运行SQL语句
    val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
    // 使用字段索引访问列
    teenagersDF.map(teenager => "Name: " + teenager(0)).show()
    // +----------+
    // |     value|
    // +----------+
    // |Name: Jack|
    // +----------+

    // 通过字段名访问列
    teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
    // +------------+
    // |       value|
    // +------------+
    // |Name: Jack|
    // +------------+
  }
}

通过构建schema的方式

通过构建schema的方式创建DataFrame主要包括三步:

  • 1.从原始RDD创建Row类型的RDD
  • 2.使用StructType,创建schema
  • 3.通过createDataFrame方法将schema应用于Row类型的RDD
object RDD2DF_m2 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("RDD2DF_m1")
      .master("local")
      .getOrCreate()
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
    runRDD2DF(spark)
  }

  private def runRDD2DF(spark: SparkSession) = {
    //导入隐式转换,用于RDD转为DataFrame
    import spark.implicits._
    //创建原始RDD
    val peopleRDD = spark.sparkContext.textFile("E:/people.txt")
    //step 1 将原始RDD转换为ROW类型的RDD
    val rowRDD = peopleRDD
      .map(_.split(","))
      .map(attributes => Row(attributes(0), attributes(1).trim.toInt))
    //step 2 创建schema
    val schema = StructType(Array(
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)
    ))
    //step 3 创建DF
    val peopleDF = spark.createDataFrame(rowRDD, schema)
    // 将DataFrame注册成临时视图
    peopleDF.createOrReplaceTempView("people")
    // 运行SQL语句
    val results = spark.sql("SELECT name FROM people")
    // 使用字段索引访问列
    results.map(attributes => "Name: " + attributes(0)).show()
    // +----------+
    // |     value|
    // +----------+
    // | Name: Tom|
    // | Name: Bob|
    // | Name: Jack|
    // +----------+
  }
}

Spark SQL的数据源

Spark SQL支持通过DataFrame接口对各种数据源进行操作,可以使用关系转换以及临时视图对DataFrame进行操作。常见的数据源包括以下几种:

文件数据源

  • Parquet文件
  • JSON文件
  • CSV文件
  • ORC文件
private def runBasicDataSourceExample(spark: SparkSession): Unit = {
    /**
      * 读取parquet文件数据源,并将结果写入到parquet文件
      */

    val usersDF = spark
      .read
      .load("E://users.parquet")
    usersDF.show()
    // 将DF保存到parquet文件
    usersDF
      .select("name", "favorite_color")
      .write
      .mode(SaveMode.Overwrite)
      .save("E://namesAndFavColors.parquet")
    /**
      * 读取json文件数据源,并将结果写入到parquet文件
      */

    val peopleDF = spark
      .read
      .format("json")
      .load("E://people.json")
    peopleDF.show()
    // 将DF保存到parquet文件
    peopleDF
      .select("name", "age")
      .write
      .format("parquet")
      .mode(SaveMode.Overwrite)
      .save("E://namesAndAges.parquet")

    /**
      * 读取CSV文件数据源
      */
    val peopleDFCsv = spark.read.format("csv")
      .option("sep", ";")
      .option("inferSchema", "true")
      .option("header", "true")
      .load("E://people.csv")

    /**
      * 将usersDF写入到ORC文件
      */
    usersDF.write.format("orc")
      .option("orc.bloom.filter.columns", "favorite_color")
      .option("orc.dictionary.key.threshold", "1.0")
      .option("orc.column.encoding.direct", "name")
      .mode(SaveMode.Overwrite)
      .save("E://users_with_options.orc")

    /**
      * 将peopleDF保存为持久化表,一般保存为Hive中
      */
    peopleDF
      .write
      .option("path","E://warehouse/people_bucketed") // 保存路径
      .bucketBy(42, "name")           // 按照name字段分桶
      .sortBy("age")                  // 按照age字段排序
      .saveAsTable("people_bucketed")

    /**
      * 将userDF保存为分区文件,类似于Hive分区表
      */
    usersDF
      .write
      .partitionBy("favorite_color")  // 分区字段
      .format("parquet")        // 文件格式
      .mode(SaveMode.Overwrite) // 保存模式
      .save("E://namesPartByColor.parquet")

    /**
      *
      */
    usersDF
      .write
      .option("path","E://warehouse/users_partitioned_bucketed") // 保存路径
      .partitionBy("favorite_color")  // 分区
      .bucketBy(42, "name")           // 分桶
      .saveAsTable("users_partitioned_bucketed")

    spark.sql("DROP TABLE IF EXISTS people_bucketed")
    spark.sql("DROP TABLE IF EXISTS users_partitioned_bucketed")
  }

保存模式

Scala/JavaMeaning
SaveMode.ErrorIfExists(default)如果目标文件已经存在,则报异常
SaveMode.Append如果目标文件或表已经存在,则将结果追加进去
SaveMode.Overwrite如果目标文件或表已经存在,则覆盖原有的内容
SaveMode.Ignore类似于SQL中的CREATE TABLE IF NOT EXISTS,如果目标文件或表已经存在,则不做任何操作

保存为持久化表

DataFrame可以被保存为Hive的持久化表,值得注意的是,这种方式并不依赖与Hive的部署,也就是说Spark会使用Derby创建一个默认的本地Hive metastore,与createOrReplaceTempView不同,该方式会直接将结果物化。

对于基于文件的数据源( text, parquet, json等),在保存的时候可以指定一个具体的路径,比如 df.write.option("path", "/some/path").saveAsTable("t")(存储在指定路径下的文件格式为parquet)当表被删除时,自定义的表的路径和表数据不会被移除。如果没有指定具体的路径,spark默认的是warehouse的目录(/user/hive/warehouse),当表被删除时,默认的表路径也会被删除。

Hive数据源

见下面小节:Spark SQL集成Hive

JDBC数据源

Spark SQL还包括一个可以使用JDBC从其他数据库读取数据的数据源。与使用JdbcRDD相比,应优先使用此功能。这是因为结果作为DataFrame返回,它们可以在Spark SQL中轻松处理或与其他数据源连接。JDBC数据源也更易于使用Java或Python,因为它不需要用户提供ClassTag。

可以使用Data Sources API将远程数据库中的表加载为DataFrame或Spark SQL临时视图。用户可以在数据源选项中指定JDBC连接属性。user并且password通常作为用于登录数据源的连接属性提供。除连接属性外,Spark还支持以下不区分大小写的选项:

属性名称解释
url要连接的JDBC URL
dbtable读取或写入的JDBC表
query指定查询语句
driver用于连接到该URL的JDBC驱动类名
partitionColumn, lowerBound, upperBound如果指定了这些选项,则必须全部指定。另外, numPartitions必须指定
numPartitions表读写中可用于并行处理的最大分区数。这也确定了并发JDBC连接的最大数量。如果要写入的分区数超过此限制,我们可以通过coalesce(numPartitions)在写入之前进行调用将其降低到此限制
queryTimeout默认为0,查询超时时间
fetchsizeJDBC的获取大小,它确定每次要获取多少行。这可以帮助提高JDBC驱动程序的性能
batchsize默认为1000,JDBC批处理大小,这可以帮助提高JDBC驱动程序的性能。
isolationLevel事务隔离级别,适用于当前连接。它可以是一个NONEREAD_COMMITTEDREAD_UNCOMMITTEDREPEATABLE_READ,或SERIALIZABLE,对应于由JDBC的连接对象定义,缺省值为标准事务隔离级别READ_UNCOMMITTED。此选项仅适用于写作。
sessionInitStatement在向远程数据库打开每个数据库会话之后,在开始读取数据之前,此选项将执行自定义SQL语句,使用它来实现会话初始化代码。
truncate这是与JDBC writer相关的选项。当SaveMode.Overwrite启用时,就会清空目标表的内容,而不是删除和重建其现有的表。默认为false
pushDownPredicate用于启用或禁用谓词下推到JDBC数据源的选项。默认值为true,在这种情况下,Spark将尽可能将过滤器下推到JDBC数据源。
object JdbcDatasetExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("JdbcDatasetExample")
      .master("local") //设置为本地运行
      .getOrCreate()
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
    runJdbcDatasetExample(spark)
  }

  private def runJdbcDatasetExample(spark: SparkSession): Unit = {
    //注意:从JDBC源加载数据
    val jdbcPersonDF = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://localhost/mydb")
      .option("dbtable", "person")
      .option("user", "root")
      .option("password", "123qwe")
      .load()
    //打印jdbcDF的schema
    jdbcPersonDF.printSchema()
    //打印数据
    jdbcPersonDF.show()

    val connectionProperties = new Properties()
    connectionProperties.put("user", "root")
    connectionProperties.put("password", "123qwe")
    //通过.jdbc的方式加载数据
    val jdbcStudentDF = spark
      .read
      .jdbc("jdbc:mysql://localhost/mydb", "student", connectionProperties)
    //打印jdbcDF的schema
    jdbcStudentDF.printSchema()
    //打印数据
    jdbcStudentDF.show()
    // 保存数据到JDBC源
    jdbcStudentDF.write
      .format("jdbc")
      .option("url", "jdbc:mysql://localhost/mydb")
      .option("dbtable", "student2")
      .option("user", "root")
      .option("password", "123qwe")
      .mode(SaveMode.Append)
      .save()

    jdbcStudentDF
      .write
      .mode(SaveMode.Append)
      .jdbc("jdbc:mysql://localhost/mydb", "student2", connectionProperties)

  }
}

Spark SQL集成Hive

Spark SQL还支持读取和写入存储在Apache Hive中的数据。但是,由于Hive具有大量依赖项,因此这些依赖项不包含在默认的Spark发布包中。如果可以在类路径上找到Hive依赖项,Spark将自动加载它们。请注意,这些Hive依赖项也必须存在于所有工作节点(worker nodes)上,因为它们需要访问Hive序列化和反序列化库(SerDes)才能访问存储在Hive中的数据。

将hive-site.xml,core-site.xml以及hdfs-site.xml文件放在conf/下

在使用Hive时,必须实例化一个支持Hive的SparkSession,包括连接到持久性Hive Metastore,支持Hive 的序列化、反序列化(serdes)和Hive用户定义函数。没有部署Hive的用户仍可以启用Hive支持。如果未配置hive-site.xml,则上下文(context)会在当前目录中自动创建metastore_db,并且会创建一个由spark.sql.warehouse.dir配置的目录,其默认目录为spark-warehouse,位于启动Spark应用程序的当前目录中。请注意,自Spark 2.0.0以来,该在hive-site.xml中的hive.metastore.warehouse.dir属性已被标记过时(deprecated)。使用spark.sql.warehouse.dir用于指定warehouse中的默认位置。可能需要向启动Spark应用程序的用户授予写入的权限。

下面的案例为在本地运行(为了方便查看打印的结果),运行结束之后会发现在项目的目录下E:\IdeaProjects\myspark创建了spark-warehouse和metastore_db的文件夹。可以看出没有部署Hive的用户仍可以启用Hive支持,同时也可以将代码打包,放在集群上运行。

object SparkHiveExample {


  case class Record(key: Int, value: String)

  def main(args: Array[String]) {


    val spark = SparkSession
      .builder()
      .appName("Spark Hive Example")
      .config("spark.sql.warehouse.dir", "e://warehouseLocation")
      .master("local")//设置为本地运行
      .enableHiveSupport()
      .getOrCreate()


    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
    import spark.implicits._
    import spark.sql
    //使用Spark SQL 的语法创建Hive中的表
    sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
    sql("LOAD DATA LOCAL INPATH 'file:///e:/kv1.txt' INTO TABLE src")

    // 使用HiveQL查询
    sql("SELECT * FROM src").show()
    // +---+-------+
    // |key|  value|
    // +---+-------+
    // |238|val_238|
    // | 86| val_86|
    // |311|val_311|
    // ...

    // 支持使用聚合函数
    sql("SELECT COUNT(*) FROM src").show()
    // +--------+
    // |count(1)|
    // +--------+
    // |    500 |
    // +--------+

    // SQL查询的结果是一个DataFrame,支持使用所有的常规的函数
    val sqlDF = sql("SELECT key, value FROM src WHERE key  0 ORDER BY key")

    // DataFrames是Row类型的, 允许你按顺序访问列.
    val stringsDS = sqlDF.map {
      case Row(key: Int, value: String) => s"Key: $key, Value: $value"
    }
    stringsDS.show()
    // +--------------------+
    // |               value|
    // +--------------------+
    // |Key: 0, Value: val_0|
    // |Key: 0, Value: val_0|
    // |Key: 0, Value: val_0|
    // ...

    //可以通过SparkSession使用DataFrame创建一个临时视图
    val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
    recordsDF.createOrReplaceTempView("records")

    //可以用DataFrame与Hive中的表进行join查询
    sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
    // +---+------+---+------+
    // |key| value|key| value|
    // +---+------+---+------+
    // |  2| val_2|  2| val_2|
    // |  4| val_4|  4| val_4|
    // |  5| val_5|  5| val_5|
    // ...

    //创建一个Parquet格式的hive托管表,使用的是HQL语法,没有使用Spark SQL的语法("USING hive")
    sql("CREATE TABLE IF NOT EXISTS hive_records(key int, value string) STORED AS PARQUET")

    //读取Hive中的表,转换成了DataFrame
    val df = spark.table("src")
    //将该DataFrame保存为Hive中的表,使用的模式(mode)为复写模式(Overwrite)
    //即如果保存的表已经存在,则会覆盖掉原来表中的内容
    df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
    // 查询表中的数据
    sql("SELECT * FROM hive_records").show()
    // +---+-------+
    // |key|  value|
    // +---+-------+
    // |238|val_238|
    // | 86| val_86|
    // |311|val_311|
    // ...

    // 设置Parquet数据文件路径
    val dataDir = "/tmp/parquet_data"
    //spark.range(10)返回的是DataSet[Long]
    //将该DataSet直接写入parquet文件
    spark.range(10).write.parquet(dataDir)
    // 在Hive中创建一个Parquet格式的外部表
    sql(s"CREATE EXTERNAL TABLE IF NOT EXISTS hive_ints(key int) STORED AS PARQUET LOCATION '$dataDir'")
    // 查询上面创建的表
    sql("SELECT * FROM hive_ints").show()
    // +---+
    // |key|
    // +---+
    // |  0|
    // |  1|
    // |  2|
    // ...

    // 开启Hive动态分区
    spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
    spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
    // 使用DataFrame API创建Hive的分区表
    df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl")

    //分区键‘key’将会在最终的schema中被移除
    sql("SELECT * FROM hive_part_tbl").show()
    // +-------+---+
    // |  value|key|
    // +-------+---+
    // |val_238|238|
    // | val_86| 86|
    // |val_311|311|
    // ...

    spark.stop()

  }
}

Thrift  server与Spark SQL CLI

可以使用JDBC/ODBC或者命令行访问Spark SQL,通过这种方式,用户可以直接使用SQL运行查询,而不用编写代码。

Thrift JDBC/ODBC server

Thrift JDBC/ODBC server与Hive的HiveServer2向对应,可以使用Beeline访问JDBC服务器。在Spark的sbin目录下存在start-thriftserver.sh脚本,使用此脚本启动JDBC/ODBC服务器:

./sbin/start-thriftserver.sh

使用beeline访问JDBC/ODBC服务器,Beeline会要求提供用户名和密码,在非安全模式下,只需输入用户名和空白密码即可

beeline> !connect jdbc:hive2://localhost:10000

b03893c98610f110cae372b66644143b.png

Spark SQL CLI

Spark SQL CLI是在本地模式下运行Hive Metastore服务并执行从命令行输入的查询的便捷工具。请注意,Spark SQL CLI无法与Thrift JDBC服务器通信。

要启动Spark SQL CLI,只需要在Spark的bin目录中运行以下命令:

./spark-sql 

e6bbc2bc7b2a758c4c6a5019260d1cbf.png

总结

本文主要对Spark SQL进行了阐述,主要包括Spark SQL的介绍、DataFrame&DataSet API基本使用、Catalyst Optimizer优化器的基本原理、Spark SQL编程、Spark SQL数据源以及与Hive集成、Thrift  server与Spark SQL CLI。下一篇将分享Spark Streaming编程指南。

ac749644a54eef2debd2c455e11d7767.png

e8d43a6d46090660f6921f398c81c747.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/359452.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

4固定在底部_礼堂椅厂家教你如何固定座椅

礼堂椅厂家众所周知,当人们离开时,礼堂或刷房中使用的座椅会自动翻转到垂直位置,因此行和行之间有一条大通道让人走路。 在现有技术中,通过以下方法翻转礼堂椅:在两个支腿之间设置固定轴,并且在支座底部的两…

python时间去掉t_Python的set集合详解

Python 还包含了一个数据类型 —— set (集合)。 集合是一个无序不重复元素的集。基本功能包括关系测试和消除重复元素。 集合对象还支持 union(联合),intersection(交),difference&…

快乐学习 Ionic Framework+PhoneGap 手册1-3 {面板切换}

编程的快乐和乐趣&#xff0c;来自于能成功运行程序并运用到项目中,会在后面案例&#xff0c;实际运用到项目当中与数据更新一起说明 从面板切换开始&#xff0c;请看效果图和代码&#xff0c;这只是一个面板切换的效果 Index HTML Code <!DOCTYPE html> <html ng-ap…

去掉左边0_SLAM从0到1——11. 视觉里程计VO内容框架

「本文是之前学习VO 部分整理的思维导图&#xff0c;笔记写入之后均折叠了起来&#xff0c;正文中采用markdown格式展开&#xff0c;可看到笔记内容」放上来的目的其实是方便自己查阅笔记&#xff0c;同样给有需要的同学提供一点思路。整体上的结构分为5部分&#xff0c;包括常…

教程:如何实现Java OAuth 2.0以使用GitHub和Google登录

将Google和GitHub OAuth登录添加到Java应用程序的指南 我们添加到Takipi的最新功能之一是3rd party登录。 如果您像我一样懒惰&#xff0c;那么我想您也希望跳过填写表单和输入新密码的操作 。 只要有权限&#xff0c;许多人都希望使用第三方登录&#xff0c;只要他们要求的权…

柔性太阳能电池pdf_房车旅行如何做到电力无忧,那就选择一套合适的太阳能供电系统吧...

“旅行途中房车电力够不够用&#xff1f;”是众多车友在购买房车时会考虑的因素之一。而房车外部供电方式一般有三种&#xff1a;电网供电、发电机发电和太阳能发电&#xff0c;其中太阳能发电因其结构简单、体积小且轻、易安装、维护简单、寿命长不易损坏、一次性投资、循环利…

POJ 3617

题意&#xff1a;给定长度为N的字符串S&#xff0c;现要构造一个字符串T&#xff08;起初为空串&#xff09;。任意进行一下的一种操作&#xff1a; 1>从S的头部删除一个字符&#xff0c;加到T的尾部 2>从S的尾部删除一个字符&#xff0c;加到T的尾部 目的使T的字典序最小…

echarts的词云图表类型有哪些_数据可视化之常见12种图表类型分析

数据可视化有众多展现方式&#xff0c;不同的数据类型要选择适合的展现方法&#xff0c;今天友创云天就整理分析了几种常见的类型&#xff0c;给大家提供参考。1.饼图饼图是一个划分为几个扇形的圆形统计图表。每个扇形的弧长&#xff08;以及圆心角和面积&#xff09;大小&…

使用Spring Boot和注释支持配置Spring JMS应用程序

1.简介 在以前的文章中&#xff0c;我们学习了如何使用Spring JMS配置项目。 如果查看有关使用Spring JMS进行消息传递的文章介绍 &#xff0c;您会注意到它是使用XML配置的。 本文将利用Spring 4.1版本中引入的改进 &#xff0c;并仅使用Java config来配置JMS项目。 在这个示…

室内主题元素分析图_2020届室内设计专业优秀毕业设计作品展(五)

“环”食疗养生空间概念设计△建筑外立面▲LOGO前 言每当人们提及健康时&#xff0c;人们的反应往往是运动、睡眠和饮食。现代的青年上班族&#xff0c;又因为快节奏的生活&#xff0c;工作压力大&#xff0c;饮食的不规律&#xff0c;生活不良习性的增加&#xff0c;导致了各…

C++输入cin详解

C输入cin详解 输入原理&#xff1a; 程序的输入都建有一个缓冲区&#xff0c;即输入缓冲区。一次输入过程是这样的&#xff0c;当一次键盘输入结束时会将输入的数据存入输入缓冲区&#xff0c;而cin函数直接从输入缓冲区中取数据。正因为cin函数是直接从缓冲区取数据的&#xf…

时间序列的截尾和拖尾_R语言:时间序列(一)

01 解决什么问题在社会活动中经常可见按照时间顺序记录下来的随机事件观察值&#xff0c;例如每年死亡人数序列&#xff0c;每年糖尿病发病人数序列&#xff0c;医院门诊每日诊治病例数序列。这类数据的特性是相邻时间点的观察值之间具有明显的相关性&#xff0c;这一特性不同于…

JVM崩溃时:如何调查最严重错误的根本原因

当应用程序崩溃时&#xff0c;您可以学到什么&#xff1f; 我认为&#xff0c;“后见之明是20 /”是最喜欢的短语之一托马斯罗梅尔 &#xff0c;工程ZeroTurnaround的副总裁。 好吧&#xff0c;我实际上不确定在他的短语中占什么位置&#xff0c;但是我已经听过他几次说了。 鉴…

常用个人密码管理软件

http://www.williamlong.info/archives/3100.html转载于:https://www.cnblogs.com/svennee/p/4099358.html

查看网口命令_20个常用Linux命令

今天总结几个非常常用的Linux命令,其中有几个在面试中很可能问相关命令的原理,比如后台运行命令。希望对大家有所帮助,最好自己去尝试在Linux操作系统中实践一下。 1、查看目录以及权限 在windows中,使用dir查看当前目录中文件。在Linux中使用ls(list)查看当前目录文件。 w…

中统计字符串长度的函数_SQL Server中的字符串分割函数

您是否知道从SQL Server 2016开始&#xff0c;系统就内置STRING_SPLIT函数&#xff0c;该函数用于将字符串分隔的变量拆分为一个可用列表。 对于经常需要分割字符串的技术人员&#xff0c;建议您查看此功能。 STRING_SPLIT是一个表值函数&#xff0c;它返回由定界符分隔的字符串…

JBoss BPM Suite快速指南–将外部数据模型导入BPM项目

您正在从事一个大型项目&#xff0c;在企业中开发规则&#xff0c;事件和流程以满足关键业务需求。 部分要求指出&#xff0c;某个业务部门将提供您的数据模型供您利用。 不会在JBoss BPM Suite数据建模器中设计此数据模型&#xff0c;但是在从业务中心仪表板处理规则&#x…

卸载 流程_一款适合于windows端的卸载神器 彻底清理残留软件

今天给大家介绍的是一款适合于Windows端的软件卸载神器---Uninstall&#xff0c;可以彻底清理残留软件。它的卸载流程是这样的&#xff0c;首先会使用软件本身的默认卸载程序进行卸载&#xff0c;卸载完成后再次扫描软件残留的一些残余文件及注册表之类的&#xff0c;可以完美的…

十三水算法php_基于PHP+Redis令牌桶限流

一 、场景描述在开发接口服务器的过程中&#xff0c;为了防止客户端对于接口的滥用&#xff0c;保护服务器的资源&#xff0c; 通常来说我们会对于服务器上的各种接口进行调用次数的限制。比如对于某个 用户&#xff0c;他在一个时间段&#xff08;interval&#xff09;内&…

Java REST JAX-RS 2.0 –如何处理日期,时间和时间戳记数据类型

无论是X-Form-Urlencoded还是JSON HTTP发布到REST资源端点&#xff0c;对于与日期或时间相关的数据都没有特定的“数据类型”。 大多数开发人员会将这些数据发布为“字符串”&#xff0c;或者只是将它们转换为Unix时间戳值&#xff08;例如1435061152&#xff09;。 但是&#…