Spark SQL 中DataFrame DSL的使用

在上一篇文章中已经大致说明了DataFrame APi,下面我们具体介绍DataFrame DSL的使用。DataFrame DSL是一种命令式编写Spark SQL的方式,使用的是一种类sql的风格语法。

文章链接:

一、单词统计案例引入

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}object Demo2DSLWordCount {def main(args: Array[String]): Unit = {/*** 在新版本的spark中,如果想要编写spark sql的话,需要使用新的spark入口类:SparkSession*/val sparkSession: SparkSession = SparkSession.builder().master("local").appName("wc spark sql").getOrCreate()/*** spark sql和spark core的核心数据类型不太一样** 1、读取数据构建一个DataFrame,相当于一张表*/val linesDF: DataFrame = sparkSession.read.format("csv") //指定读取数据的格式.schema("line STRING") //指定列的名和列的类型,多个列之间使用,分割.option("sep", "\n") //指定分割符,csv格式读取默认是英文逗号.load("spark/data/words.txt") // 指定要读取数据的位置,可以使用相对路径/*** DSL: 类SQL语法 api  介于代码和纯sql之间的一种api** spark在DSL语法api中,将纯sql中的函数都使用了隐式转换变成一个scala中的函数* 如果想要在DSL语法中使用这些函数,需要导入隐式转换**///导入Spark sql中所有的sql隐式转换函数import org.apache.spark.sql.functions._//导入另一个隐式转换,后面可以直接使用$函数引用字段进行处理import sparkSession.implicits._//    linesDF.select(explode(split($"line","\\|")) as "word")
//      .groupBy($"word")
//      .count().show()val resultDF: DataFrame = linesDF.select(explode(split($"line", "\\|")) as "word").groupBy($"word").agg(count($"word") as "counts")/*** 保存数据*/resultDF.repartition(1).write.format("csv").option("sep","\t").mode(SaveMode.Overwrite).save("spark/data/sqlout2")}}

注意:show()可以指定两个参数,第一个参数为展现的条数,不指定默认展示前20条数据,第二个参数默认为false,代表的是如果数据过长展示就会不完全,可以指定为true,使得数据展示完整,比如 : show(200,truncate = false)

二、数据源获取

查看官方文档:Data Sources - Spark 3.5.1 Documentation,看到DataFrame支持多种数据源的获取。

 1、csv-->json

    val sparkSession: SparkSession = SparkSession.builder().master("local[2]").appName("多种类型数据源读取演示").config("spark.sql.shuffer.partitions", 1) //指定分区数为1,默认分区数是200个.getOrCreate()//导入spark sql中所有的隐式转换函数import org.apache.spark.sql.functions._//导入sparkSession下的所有隐式转换函数,后面可以直接使用$函数引用字段import sparkSession.implicits._/*** 读csv格式的文件-->写到json格式文件中*///1500100967,能映秋,21,女,文科五班val studentsDF: DataFrame = sparkSession.read.format("csv").schema("id String,name String,age Int,gender String,clazz String").option("sep", ",").load("spark/data/student.csv")studentsDF.write.format("json").mode(SaveMode.Overwrite).save("spark/data/students_out_json.json")

2、json-->parquet

val sparkSession: SparkSession = SparkSession.builder().master("local").appName("").config("spark.sql.shuffer.partitions", 1) //指定分区数为1,默认分区数是200个.getOrCreate()//导入spark sql中所有的隐式转换函数//导入sparkSession下的所有隐式转换函数,后面可以直接使用$函数引用字段/*** 读取json数据格式,因为json数据有键值对,会自动的将健作为列名,值作为列值,不需要手动的设置表结构*///1500100967,能映秋,21,女,文科五班//方式1://    val studentsJsonDF: DataFrame = sparkSession.read//      .format("json")//      .load("spark/data/students_out_json.json/part-00000-3f086bb2-23d9-4904-9814-3a34b21020ab-c000.json")//方式2:实际上也是调用方式1,只是更简洁了// def json(paths: String*): DataFrame = format("json").load(paths : _*)val studebtsReadDF: DataFrame = sparkSession.read.json("spark/data/students_out_json.json/part-00000-3f086bb2-23d9-4904-9814-3a34b21020ab-c000.json")studebtsReadDF.write.format("parquet").mode(SaveMode.Overwrite).save("spark/data/students_parquet")

3、parquet-->csv

    val sparkSession: SparkSession = SparkSession.builder().master("local").appName("").config("spark.sql.shuffer.partitions", 1) //指定分区数为1,默认分区数是200个.getOrCreate()//导入Spark sql中所有的sql隐式转换函数import org.apache.spark.sql.functions._//导入另一个隐式转换,后面可以直接使用$函数引用字段进行处理import sparkSession.implicits._/*** parquet:压缩的比例由信息熵决定,通俗的说就是数据的重复程度决定*/val studebtsReadDF: DataFrame = sparkSession.read.format("parquet").load("spark/data/students_parquet/part-00000-8b815a03-97f7-4d71-8b71-4e7e30f60995-c000.snappy.parquet")studebtsReadDF.write.format("csv").mode(SaveMode.Overwrite).save("spark/data/students_csv")

三、DataFrame DSL API的使用

1、select


import org.apache.spark.sql.{DataFrame, SparkSession}object Demo1Select {def main(args: Array[String]): Unit = {val sparkSession: SparkSession = SparkSession.builder().master("local").appName("select函数演示").getOrCreate()//导入Spark sql中所有的sql隐式转换函数import org.apache.spark.sql.functions._//导入另一个隐式转换,后面可以直接使用$函数引用字段进行处理import sparkSession.implicits._val studentsDF: DataFrame = sparkSession.read.format("csv").schema("id String,name String,age String,gender String,clazz String").option("sep", ",").load("spark/data/student.csv")/*** select函数*///方式1:只能查询原有字段,不能对字段做出处理,比如加减、起别名之类studentsDF.select("id", "name", "age")//方式2:弥补了方式1的不足studentsDF.selectExpr("id","name","age+1 as new_age")//方式3:使用隐式转换函数中的$将字段变为一个对象val stuDF: DataFrame = studentsDF.select($"id", $"name", $"age")//3.1使用对象对字段进行处理
//    stuDF.select($"id", $"name", $"age",$"age".+(1) as "new_age").show()       //不可使用未变为对象的字段stuDF.select($"id", $"name", $"age",$"age" + 1 as "new_age")                 // +是函数,可以等价于该语句//3.2可以在select中使用sql函数studentsDF.select($"id", $"name", $"age", substring($"id", 0, 2))}
}

2、where

    /*** where函数:过滤数据*///方式1:直接将sql中的where语句以字符串形式传参studentsDF.where("clazz='文科一班' and gender='男'")//方式2:使用$列对象形式过滤/*** 注意在此种方式下:等于和不等于符号与我们平常使用的有所不同* 等于:===* 不等于:=!=*/studentsDF.where($"clazz" === "文科一班" and $"gender"=!="男").show()

3、groupBy和agg

    /*** groupby:分组函数     agg:聚合函数* 注意:* 1、groupby与agg函数通常都是一起使用* 2、分组聚合之后的结果DataFrame中只会包含分组字段与聚合字段* 3、分组聚合之后select中无法出现不是分组的字段*///需求:根据班级分组,求每个班级的人数和平均年龄studentsDF.groupBy($"clazz").agg(count($"clazz") as "clazz_number",avg($"age") as "avg_age").show()

4、join

/*** 5、join:表关联*/val subjectDF1: DataFrame = sparkSession.read.format("csv").option("sep", ",").schema("id String,subject_id String,score Int").load("spark/data/score.csv")val subjectDF2: DataFrame = sparkSession.read.format("csv").option("sep", ",").schema("sid String,subject_id String,score Int").load("spark/data/score.csv")//关联场景1:所关联的字段名字一样studentsDF.join(subjectDF1,"id")//关联场景2:所关联的字段名字不一样studentsDF.join(subjectDF2,$"id"===$"sid","inner")
//    studentsDF.join(subjectDF2,$"id"===$"sid","left").show()/*** 上面两种关联场景默认inner连接方式(内连接),可以指定参数选择连接方式,比如左连接、右连接、全连接之类* * @param joinType Type of join to perform. Default `inner`. Must be one of:* *                 `inner`, `cross`, `outer`, `full`, `fullouter`,`full_outer`, `left`,* *                 `leftouter`, `left_outer`, `right`, `rightouter`, `right_outer`.*/

5、开窗

    /*** 开窗函数* 1、ROW_NUMBER():为分区中的每一行分配一个唯一的序号。序号是根据ORDER BY子句定义的顺序分配的* 2、RANK()和DENSE_RANK():为分区中的每一行分配一个排名。RANK()在遇到相同值时会产生间隙,而DENSE_RANK()则不会。**///需求:统计每个班级总分前三的学生val stu_scoreDF: DataFrame = studentsDF.join(subjectDF2, $"id" === $"sid")//方式1:在select中使用row_number() over Window.partitionBy().orderBy()stu_scoreDF.groupBy($"clazz", $"id").agg(sum($"score") as "sum_score").select($"clazz", $"id", $"sum_score", row_number() over Window.partitionBy($"clazz").orderBy($"sum_score".desc) as "score_rank").where($"score_rank" <= 3)//方式2:使用withcolumn()函数,会新增一列,但是要预先指定列名stu_scoreDF.repartition(1).groupBy($"clazz", $"id").agg(sum($"score") as "sum_score").withColumn("score_rank",row_number() over Window.partitionBy($"clazz").orderBy($"sum_score".desc)).where($"score_rank" <= 3).show()

注意:

      DSL API 不直接对应 SQL 的关键字执行顺序(如 SELECT、FROM、WHERE、GROUP BY 等),但可以按照构建逻辑查询的方式来组织代码,使其与 SQL 查询的逻辑结构相似。

在构建 Spark DataFrame 转换和操作时,常用流程介绍:

  1. 选择数据源:使用 spark.read 或从其他 DataFrame 派生。
  2. 转换:使用各种转换函数(如 selectfiltermapflatMapjoin 等)来修改 DataFrame。
  3. 聚合:使用 groupBy 和聚合函数(如 sumavgcount 等)对数据进行分组和汇总。
  4. 排序:使用 orderBy 或 sort 对数据进行排序。
  5. 输出:使用 showcollectwrite 等函数将结果输出到控制台、收集到驱动程序或写入外部存储。

四、RDD与DataFrame的转换

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}object RddToDf {def main(args: Array[String]): Unit = {val sparkSession: SparkSession = SparkSession.builder().appName("Rdd与Df之间的转换").master("local").config("spark.sql.shuffle.partitions", 1).getOrCreate()import org.apache.spark.sql.functions._import sparkSession.implicits._val sparkContext: SparkContext = sparkSession.sparkContextval idNameRdd: RDD[(String, String)] = sparkContext.textFile("spark/data/student.csv").map(_.split(",")).map {case Array(id: String, name: String, _, _, _) => (id, name)}/*** Rdd-->DF* 因为在Rdd中不会存储文件的结构(schema)信息,所以要指定字段*/val idNameDF: DataFrame = idNameRdd.toDF("id", "name")idNameDF.createOrReplaceTempView("idNameTb")sparkSession.sql("select id,name from idNameTb").show()/*** DF-->Rdd*/val idNameRdd2: RDD[Row] = idNameDF.rddidNameRdd2.foreach(println)}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/14488.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Xinstall助力实现App间直接跳转,提升用户体验

在移动互联网时代&#xff0c;App已成为我们日常生活中不可或缺的一部分。然而&#xff0c;在使用各类App时&#xff0c;我们经常会遇到需要在不同App之间切换的情况&#xff0c;这时如果能够直接跳转&#xff0c;将会大大提升用户体验。而Xinstall正是这样一款能够帮助开发者实…

OpenCV 获取 RTSP 摄像头视频流保存至本地

介绍 Java OpenCV 是一个强大的开源计算机视觉库&#xff0c;它提供了丰富的图像处理和分析功能&#xff0c;越来越多的应用需要使用摄像头来获取实时视频流进行处理和分析。 在 Java 中使用 OpenCV 打开摄像头的基本步骤如下&#xff1a; 确保已经安装了OpenCV库使用 OpenC…

Raylib 绘制自定义字体的一种套路

Raylib 绘制自定义字体是真的难搞。我的需求是程序可以加载多种自定义字体&#xff0c;英文中文的都有。 我调试了很久成功了&#xff01; 很有用的参考&#xff0c;建议先看一遍&#xff1a; 瞿华&#xff1a;raylib绘制中文内容 个人笔记&#xff5c;Raylib 的字体使用 - …

W801 实现获取天气情况

看了小安派&#xff08;AiPi-Eyes 天气站&#xff09;的源码&#xff0c;感觉用W801也可以实现。 一、部分源码 main.c #include "wm_include.h" #include "Lcd_Driver.h"void UserMain(void) {printf("\n user task \n");Lcd_Init();Lcd_Clea…

MySQL主从复制(五):读写分离

一主多从架构主要应用场景&#xff1a;读写分离。读写分离的主要目标是分摊主库的压力。 读写分离架构 读写分离架构一 架构一结构图&#xff1a; 这种结构模式下&#xff0c;一般会把数据库的连接信息放在客户端的连接层&#xff0c;由客户端主动做负载均衡。也就是说由客户…

RabbitMQ 消息队列安装及入门

市面常见消息队列中间件对比 技术名称吞吐量 /IO/并发时效性&#xff08;类似延迟&#xff09;消息到达时间可用性可靠性优势应用场景activemq万级高高高简单易学中小型企业、项目rabbitmq万级极高&#xff08;微秒&#xff09;高极高生态好&#xff08;基本什么语言都支持&am…

leetcode124 二叉树中的最大路径和-dp

题目 二叉树中的 路径 被定义为一条节点序列&#xff0c;序列中每对相邻节点之间都存在一条边。同一个节点在一条路径序列中 至多出现一次 。该路径 至少包含一个 节点&#xff0c;且不一定经过根节点。 路径和 是路径中各节点值的总和。 给你一个二叉树的根节点 root &…

【Crypto】Rabbit

文章目录 一、Rabbit解题感悟 一、Rabbit 题目提示很明显是Rabbit加密&#xff0c;直接解 小小flag&#xff0c;拿下&#xff01; 解题感悟 提示的太明显了

redis核心面试题二(实战优化)

文章目录 10. redis配置mysql实战优化[重要]11. redis之缓存击穿、缓存穿透、缓存雪崩12. redis实现分布式session 10. redis配置mysql实战优化[重要] // 最初实现OverrideTransactionalpublic Product createProduct(Product product) {productRepo.saveAndFlush(product);je…

MQTT 5.0 报文解析 05:DISCONNECT

欢迎阅读 MQTT 5.0 报文系列 的第五篇文章。在上一篇中&#xff0c;我们已经介绍了 MQTT 5.0 的 PINGREQ 和 PINGRESP 报文。现在&#xff0c;我们将介绍下一个控制报文&#xff1a;DISCONNECT。 在 MQTT 中&#xff0c;客户端和服务端可以在断开网络连接前向对端发送一个 DIS…

手把手教你搭建一个花店小程序商城

如果你是一位花店店主&#xff0c;想要为你的生意搭建一个精美的小程序商城&#xff0c;以下是你将遵循的五个步骤。 步骤1&#xff1a;登录乔拓云平台进入后台 首先&#xff0c;你需要登录乔拓云平台的后台管理页面。你可以在电脑或移动设备上的浏览器中输入乔拓云的官方网站…

2024.5.26 机器学习周报

目录 引言 Abstract 文献阅读 1、题目 2、引言 3、创新点 4、Motivation 5、naive Lite-HRNet 6、Lite-HRNet 7、实验 深度学习 解读SAM(Segment Anything Model) 1、SAM Task 2、SAM Model 2.1、Patch Embedding 2.2、Positiona Embedding 2.3、Transformer …

互联网医院开发:引领智慧医疗新时代

随着科技的迅猛发展和互联网的普及&#xff0c;传统医疗模式正在迎来一场深刻的变革。互联网医院的崛起&#xff0c;打破了时间和空间的限制&#xff0c;为患者和医疗机构带来了更加便捷、高效、安全的医疗服务体验。本文将从技术角度深入探讨互联网医院的开发&#xff0c;包括…

多线程(八)

一、wait和notify 等待 通知 机制 和join的用途类似,多个线程之间随机调度,引入 wait notify 就是为了能够从应用层面上,干预到多个不同线程代码的执行顺序.( 这里说的干预,不是影响系统的线程调度策略 内核里的线程调度,仍然是无序的. 相当于是在应用程序…

Pod容器资源限制和探针

目录 一、资源限制 1.Pod和容器的资源请求和限制 2.CPU 资源单位 案例一 案例二 二、健康检查&#xff0c;又称为探针&#xff08;Probe&#xff09; 1.探针的三种规则 2.Probe支持三种检查方法 3.探测获得的三种结果 案例一&#xff1a;exec 案例二&#xff1a;htt…

OneMO同行 心级服务:中移物联OneMO模组助力客户终端寒冷环境下的稳定运行

中移物联OneMO模组以客户为中心&#xff0c;基于中国移动心级服务要求&#xff0c;开展“OneMO同行 心级服务 标定一流”高标服务主题活动&#xff0c;升级“服务内容““服务方式”和“服务意识”&#xff0c;为行业客户提供全新的服务体验。 近日&#xff0c;某车载监控设备…

ACM实训第十七天

Is It A Tree? 问题 考试时应该做不出来&#xff0c;果断放弃 树是一种众所周知的数据结构&#xff0c;它要么是空的(null, void, nothing)&#xff0c;要么是一个或的集合满足以下属性的节点之间有向边连接的节点较多。 •只有一个节点&#xff0c;称为根节点&#xff0c;它…

【Crypto】摩丝

文章目录 一、摩斯解题感悟 一、摩斯 很明显莫尔斯密码 iloveyou还挺浪漫 小小flag&#xff0c;拿下 解题感悟 莫尔斯密码这种题还是比较明显的

智能锁千千万,谁是你的NO.1,亲身实测凯迪仕传奇大师K70旗舰新品

智能锁千千万&#xff0c;谁是你的NO.1。欢迎来到智哪儿评测室&#xff0c;这次我们为大家带来了凯迪仕传奇大师K70系列的一款重磅新品。 在科技的浪潮中&#xff0c;家居安全领域正经历着前所未有的变革。智能锁越来越成为家的安全守护神&#xff0c;以及智能生活的得力助手。…

Android 11 Framework实时监听Activity堆栈变化

核心类 Framework中有一个类SystemActivityMonitoringService专门用于监控Activity堆栈变化&#xff0c;属于隐藏Api&#xff0c;应用侧无法调用。此类位于 packages/services/Car/service/src/com/android/car/SystemActivityMonitoringService.java 方法 void registerTa…