Spark SQL DSL

1、 Spark sql   -- 代替hive的(并非完全代替)  

(1) Spark sql 和 hive 区别 :

     两者都是写sql的,区别是计算引擎不一样  

 hive        -- 计算引擎是MapReduce ,是通过MR做计算的

 Spark sql   -- 计算引擎是Saprk Core,是通过Spark Core做计算的

     Spark sql 功能比 hive 强大 :   并非只能写sql

 hive只能在shell行写sql

 spark可以在代码中写sql  

(2) Spark sql结构 :

1、 Data Source API(读数据) :   可以读取 csv(文本文件)、 json、 jdbc 等各种各样的数据做处理

2、 Data Frame API(提供了两个API):

        Dataframe DSL      -- 写代码      (DSL :  类SQL语法,与SQL差不多,但它是代码)

    Spark SQL and HQL  -- 写SQL

(3) DataFrame :   数据框(二维的表结构,类似hive的一张表)

    写SQL的前提 :  有表

DataFrame 是基于 RDD 做了封装, 在上面提供了 列名和列类型 的概念,即表的结构的概念。

          可以基于 DataFrame 去写 SQL 。

2、 写Spark SQL :   

在spark sql中, shuffle之后分区数不是由前面的RDD决定的,而是有默认值, 默认200个。 可以指定参数修改。

    (1) 导入Spark SQL依赖         -- 在Spark项目的pom文件中加入

<!--  Spark sql核心依赖  -->

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-sql_2.11</artifactId>

<version>2.4.5</version>

        </dependency>

(2) 在Spark项目下创建sql包    -- 新的模块一定要新建新的包  

*项目名称一定要小写,多个单词之间用-分割 : s1-v1_1.2   

*包名也要小写,一般是公司域名倒写 : com.shujia.spark

(3) 创建Spark sql环境 :

    val spark: SparkSession = SparkSession

.builder()    // 构建

.appName("wordCount")

.master("local")

// 设置 sparkSQL 在 shuffle 之后 DF 的分区数,默认是200

            .config("spark.sql.shuffle.partitions", 1)

.getOrCreate()   // 当前环境有SparkSession就获取, 反之则创建

(4) 返回值不再是 RDD,  而是 DataFrame (DF)

    查看数据不再是 foreach(),  而是 show()

(5) 针对于sql语句有多行的情况, 可以使用 """ """ 格式书写

val wordCountDF = spark.sql(

"""

|select word,count(1) as c from (

|select explode(split(line,',')) word from lines

|) as d

|group by word

|""".stripMargin)       // stripMargin :  删除"|"  并合并以上sql语句

(6) 创建 DataFrame 的方式:

 1、  读取 csv 格式的数据创建 DF

    val studentDF: DataFrame = spark

    .read

  .format("csv")

  .option("sep", ",")     //列的分割方式

  .schema("id STRING, name STRING, age INT, gender STRING, clazz STRING")  // 指定字段名和字段类型, 必须按照数据顺序指定

  .load("data/students.txt")     //指定读取的路径

 2、  读取 json 格式的数据构建 DF

      (spark 会自动解析json格式)

val studentJsonDF: DataFrame = spark

  .read

  .format("json")

  .load("data/a.json")

         3、  读取 jdbc 数据构建 DF

              (通过网络远程读取 mysql 中的数据,  需要添加mysql依赖)

  

    val jdbcDF: DataFrame = spark.read

  .format("jdbc")

  .option("url", "jdbc:mysql://master:3306")

  .option("dbtable", "bigdata.students")

  .option("user", "root")

  .option("password", "123456")

  .load()

4、  读取 parquet 格式的数据构建 DF

             (parquet格式的数据中自带 列名 和 列类型,

             parquet会对数据进行压缩, 体积变小, 解压和压缩需要时间)

// 保存一个parquet格式的文件

studentDF

  .write

  .format("parquet")

  .mode(SaveMode.Overwrite)

  .save("data/parquet")

// 读取parquet格式的数据

val parquetDF: DataFrame = spark

  .read

  .format("parquet")

  .load("data/parquet")

3、 DSL语法   -- 类sql语法

    // spark sql 中必须要导入隐式转换, 才可以使用 $方法 获取列对象

import spark.implicits._

//导入 DSL 所有的函数

    import org.apache.spark.sql.functions._       

(1) show   :   查看前面20条数据,  相当于action算子

                   action算子  -- 每一个Action算子都会触发一个job

(2) select :   选择字段,  和 sql 中 select 是一样

(3) $ : 是一个方法,作用是通过列名获取列的对象

studentDF.select($"id", $"age" + 2 as "age").show()

(4) where :  过滤数据

    = : 赋值    == : 判断    === : 等于

(5) group by :   分组

(6) agg :   分组之后进行聚合计算

            只能在分组后使用, 即一般跟在group函数后面

studentDF

  .groupBy($"clazz")

  // 分组之后做聚合计算   -- 可以写多个

  .agg(count($"clazz") as "c", avg($"age") as "avgAge")

  .show()

(7) join :   表关联

(8) 开窗函数     --  统计每个班级总分前2的学生   

    withColumn  :   给 DF 增加新的列

joinDF

  // 按照 id 和 班级 分组

  .groupBy($"id", $"clazz")

  // 对分数求和

  .agg(sum($"sco") as "sumSco")

  // 使用开窗函数                  -- row_number() over (partition by clazz order by sumSco desc)

//    .select($"id", $"clazz", $"sumSco", row_number() over Window.partitionBy($"clazz").orderBy($"sumSco".desc) as "r")

  // 在前面 DF 的基础上增加列 ( 上面的简写, 省去写 $"id", $"clazz", $"sumSco" 步骤, 直接将 "r" 加在 "sumSco" 后面  )

  .withColumn("r", row_number() over Window.partitionBy($"clazz").orderBy($"sumSco".desc))

  // 取 班级前2

  .where($"r" <= 2).show()

(9) orderBy :  排序

DSL 语法 与 SQL 的异同 :

1、 DSL 和 SQL 功能相同, 但写法不同, 代码更简洁

    2、 DSL 不需要做 子查询

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/58294.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Go语言捕获和处理异常

在Go语言中&#xff0c;异常处理机制与其他语言&#xff08;如Java或Python&#xff09;有所不同。Go并没有传统意义上的异常机制&#xff0c;而是通过返回值来处理错误。不过&#xff0c;Go语言也提供了 panic 和 recover 机制来处理运行时错误。本文将介绍这两种机制的使用。…

vivo 轩辕文件系统:AI 计算平台存储性能优化实践

在早期阶段&#xff0c;vivo AI 计算平台使用 GlusterFS 作为底层存储基座。随着数据规模的扩大和多种业务场景的接入&#xff0c;开始出现性能、维护等问题。为此&#xff0c;vivo 转而采用了自研的轩辕文件系统&#xff0c;该系统是基于 JuiceFS 开源版本开发的一款分布式文件…

Java | Leetcode Java题解之第520题检测大写字母

题目&#xff1a; 题解&#xff1a; class Solution {public boolean detectCapitalUse(String word) {// 若第 1 个字母为小写&#xff0c;则需额外判断第 2 个字母是否为小写if (word.length() > 2 && Character.isLowerCase(word.charAt(0)) && Charact…

如何封装一个可取消的 HTTP 请求?

前言 你可能会好奇什么样的场景会需要取消 HTTP 请求呢&#xff1f; 确实在实际的项目开发中&#xff0c;可能会很少有这样的需求&#xff0c;但是不代表没有&#xff0c;比如&#xff1a; 假如要实现上述这个公告栏&#xff0c;每点击一个 tab 按钮就会切换展示容器容器中…

图的最短路径算法-迪杰斯特拉(Dijkstra)算法与弗洛伊德(Frolyd)算法(更新中)

一、最短路径算法&#xff08;Shortest Path&#xff09; 最短路径问题是图论研究中的一个经典算法问题&#xff0c;旨在寻找图&#xff08;由结点和路径组成的&#xff09;中两结点之间的最短路径。 最短路径不一定是经过边最少的路径&#xff0c;但在这些最短路径中&#x…

JSON文件转YOLO文件示例

文章目录 前言一、步骤指南二、代码实现1.类别名称到ID的映射2.边界框转换函数3.JSON解码函数4.主程序 前言 将JSON标注文件转换为YOLO格式通常涉及从JSON文件中提取图像尺寸、对象类别和边界框坐标&#xff0c;并将这些信息格式化为YOLO格式所需的格式。YOLO格式通常要求每行…

ubuntu编译ffmpeg

配置 运行环境&#xff1a;vmware ubuntu 20.04 时间&#xff1a;2024年10月24日 权限问题&#xff1a;由于ubuntu权限问题 建议使用root权限编译&#xff0c;且~是根据用户组来进行定位的。 环境配置更新 cd ~ && \ mkdir ffmpeg_sources ffmpeg_build bin &…

EasyExcel自定义下拉注解的三种实现方式

文章目录 一、简介二、关键组件1、ExcelSelected注解2、ExcelDynamicSelect接口&#xff08;仅用于方式二&#xff09;3、ExcelSelectedResolve类4、SelectedSheetWriteHandler类 三、实际应用总结 一、简介 在使用EasyExcel设置下拉数据时&#xff0c;每次都要创建一个SheetWr…

【vs2022】windows可用的依赖预编译库

ffmpeg 、x264 、x265 等。obs是基于qt6+vs2022+64bit obs的官网传统构建已经不用了obs的s2022构建OBS Deps Build 2024-09-12FFmpeg4.4 库,x64 可用。

每天五分钟深度学习pytoroch:基于pytorch搭建逻辑回归算法模型

本文重点 前面我们学习了线性回归模型的搭建,无论是基于pytorch还是不基于pytorch,以上的模型都是回归模型,本文我们将使用pytorch搭建逻辑回归模型,逻辑回归模型是一个经典的分类问题。 模型搭建 class LogisticRegression(nn.Module) : def __init__(self) :super (Lo…

嵌入式软件 Bug 排查与调试技巧

目录 1、准备工作 2、打印调试 实现步骤 注意事项 3、断点调试 4、观察点调试 5、远程调试 6、内存分析 内存泄漏检测 栈溢出检测 7、异常处理 8、性能分析 9、逻辑分析仪 10、示波器 11、常见bug类型 12、调试策略 1、准备工作 硬件工具准备 调试器:例如 J - …

玩转Docker | 使用Docker部署推箱子网页小游戏

玩转Docker | 使用Docker部署推箱子网页小游戏 一、项目介绍项目简介项目预览 二、系统要求环境要求环境检查Docker版本检查检查操作系统版本 三、部署推箱子网页小游戏下载镜像创建容器检查容器状态检查服务端口安全设置 四、访问推箱子网页小游戏五、总结 一、项目介绍 项目…

什么是服务器?服务器与客户端的关系?本地方访问不了网址与服务器访问不了是什么意思?有何区别

服务器是一种高性能的计算机&#xff0c;它通过网络为其他计算机&#xff08;称为客户端&#xff09;提供服务。这些服务可以包括文件存储、打印服务、数据库服务或运行应用程序等。服务器通常具有强大的处理器、大量的内存和大容量的存储空间&#xff0c;以便能够处理多个客户…

Iperius Backup(数据备份软件) v8.3.0 中文免费版

下载&#xff1a; 【1】https://pan.quark.cn/s/19ef716c02d5 【2】https://drive.uc.cn/s/197acba8d8d94?public1 Iperius Backup是一款专业的备份还原软件&#xff0c;功能强大&#xff0c;支持DAT备份、LTO备份、NAS备份、磁带备份、RDX驱动器、USB备份&#xff0c;满足用…

SOES(EtherCAT)从站API梳理

1. void ESC_config (esc_cfg_t * cfg); 功能&#xff1a;配置EtherCAT从站。参数&#xff1a;esc_cfg_t *cfg 指向配置结构体的指针&#xff0c;该结构体包含从站的配置参数。解释&#xff1a;该函数用于初始化或更新从站的配置&#xff0c;如通信参数、同步管理器设置等。 …

Java Lock Condition 总结

前言 相关系列 《Java & Lock & 目录》&#xff08;持续更新&#xff09;《Java & Lock & Condition & 源码》&#xff08;学习过程/多有漏误/仅作参考/不再更新&#xff09;《Java & Lock & Condition & 总结》&#xff08;学习总结/最新最准…

K8S测试pod内存和CPU资源不足

只设置requests参数 mysql主从pod启动后监控 读压测之后 同时设置limits和requests&#xff0c;只调低内存值 监控 压力测试 同时设置limits和requests&#xff0c;只调低CPU值 初始状态 开始压测 结论 对于CPU&#xff0c;如果pod中服务使用CPU超过设置的limits&…

谷歌云GCP基础概念讲解

概览 云的基础是虚拟化&#xff1a;服务器&#xff0c;存储&#xff0c;网络。服务器是远程计算机的逻辑分区。存储是物理硬盘的逻辑划分。网络则是虚拟私有云。 谷歌是唯一一个拥有全球私有基础设施的公司&#xff1b;他们的谷歌云基础设施没有任何一部分通过公共互联网。换句…

绿盟科技发布三季度报告,收入略增,亏损收窄,经营性净现金流同比翻倍

10月30日&#xff0c;绿盟科技发布2024年三季度报告。2024年公司前三季度实现营业收入12.74亿元&#xff0c;同比增长5.57%&#xff1b;毛利率59.50%&#xff0c;同比增长4.76个百分点&#xff1b;期间费用总额同比下降7.68%&#xff1b;公司实现归属于上市公司股东的净利润-3.…

【云原生】云原生后端详解:架构与实践

目录 引言一、云原生后端的核心概念1.1 微服务架构1.2 容器化1.3 可编排性1.4 弹性和可伸缩性 二、云原生后端的架构示意图三、云原生后端的最佳实践3.1 使用服务网格3.2 监控与日志管理3.3 CI/CD 流水线3.4 安全性 总结参考资料 引言 随着云计算的迅猛发展&#xff0c;云原生…