SparkSQL与Hive整合 、SparkSQL函数操作

SparkSQL与Hive整合

SparkSQL和Hive的整合,是一种比较常见的关联处理方式,SparkSQL加载Hive中的数据进行业务处理,同时将计算结果落地回Hive中。

整合需要注意的地方

1)需要引入hive的hive-site.xml,添加classpath目录下面即可,或者放到$SPARK_HOME/conf。

2)为了能够正常解析hive-site.xml中hdfs路径,需要将hdfs-site.xml和core-site.xml到classpath下面。整合编码如下:

object Hive_Support {def main(args: Array[String]): Unit = {//创建sparkSql程序入口val spark: SparkSession = SparkSession.builder().appName("demo").master("local[*]").enableHiveSupport().getOrCreate()//调用sparkContextval sc: SparkContext = spark.sparkContext//设置日志级别sc.setLogLevel("WARN")//导包import spark.implicits._//查询hive当中的表spark.sql("show tables").show()//创建表spark.sql("CREATE TABLE person (id int, name string, age int) row format delimited fields terminated by ' '")//导入数据spark.sql("load data local inpath'./person.txt' into table person")//查询表当中数据spark.sql("select * from person").show()}
}

SparkSQL函数操作

函数的定义

SQL中函数,其实说白了就是各大编程语言中的函数,或者方法,就是对某一特定功能的封装,通过它可以完成较为复杂的统计。这里的函数的学习,就基于Hive中的函数来学习。

函数的分类

函数的分类方式非常多,主要从功能和实现方式上进行区分。

实现方式上分类

1)UDF(User Defined function)用户自定义函数:一路输入,一路输出,比如year,date_add, instr。

2)UDAF(User Defined aggregation function)用户自定义聚合函数:多路输入,一路输出,常见的聚合函数:count、sum、collect_list。

3)UDTF(User Defined table function)用户自定义表函数:一路输入,多路输出,explode。

4)开窗函数:row_number(),sum/max/min over。

用户自定义函数

当系统提供的这些函数,满足不了我们的需要的话,就只能进行自定义相关的函数,一般自定义的函数两种,UDF和UDAF。

1)UDF:一路输入,一路输出,完成就是基于scala函数。

通过模拟获取字符串长度的udf来学习自定义udf操作。

object UDF_Demo {def main(args: Array[String]): Unit = {//创建sparkSql程序入口val spark: SparkSession = SparkSession.builder().appName("demo").master("local[*]").getOrCreate()//调用sparkContextval sc: SparkContext = spark.sparkContext//设置日志级别sc.setLogLevel("WARN")//导包import spark.implicits._//加载文件val personDF: DataFrame = spark.read.json("E:\\data\\people.json")//展示数据//personDF.show()//注册成为一张表personDF.createOrReplaceTempView("t_person")//赋予什么功能val fun = (x:String)=>{"Name:"+x}//没有addName这个函数,就注册它spark.udf.register("addName",fun)//查询spark.sql("select name,addName(name) from t_person").show()//释放资源spark.stop()}}

2)开窗函数:over()开窗函数是按照某个字段分组,然后查询出另一字段的前几个的值,相当于分组取topN。

row_number() over (partitin by XXX order by XXX)

rank() 跳跃排序,有两个第二名是,后边跟着的是第四名

dense_rank()  连续排序,有两个第二名是,后边跟着的是第三名

row_number() 连续排序,两个值相同排序也是不同

在使用聚合函数后,会将多行变成一行,而over()开窗函数其实就是给每个分组的数据,按照其排序的顺序,打上一个分组内的行号,直接将所有列信息显示出来。在使用聚合函数后,如果要显示其它的列必须将列加入到group by中,而使用开窗函数后,可以不使用group by。

代码如下:

case class StudentScore(name:String,clazz:Int,score:Int)
object SparkSqlOverDemo {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("sparksqlover")val sc = new SparkContext(conf)val spark = SparkSession.builder().config(conf).getOrCreate()val arr01 = Array(("a",1,88),("b",1,78),("c",1,95),("d",2,74),("e",2,92),("f",3,99),("g",3,99),("h",3,45),("i",3,53),("j",3,78))import spark.implicits._val scoreRDD = sc.makeRDD(arr01).map(x=>StudentScore(x._1,x._2,x._3)).toDSscoreRDD.createOrReplaceTempView("t_score")//查询t_score表数据spark.sql("select * from t_score").show()//使用开窗函数查找topN,rank() 跳跃排序,有两个第二名是,后边跟着的是第四名spark.sql("select name,clazz,score, rank() over( partition by clazz order by score desc ) rownum from t_score ").show()//讲使用开窗函数后的查询结果作为一张临时表,这个临时表有每个班的成绩排名,再取前三名spark.sql("select * from (select name,clazz,score, rank() over( partition by clazz order by score desc ) rownum from t_score) t1 where rownum <=3 ").show()}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/7986.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【web网页制作】html+css旅游家乡河南开封主题网页制作(4页面)【附源码】

HTMLCSS家乡河南主题网页目录 &#x1f354;涉及知识&#x1f964;写在前面&#x1f367;一、网页主题&#x1f333;二、页面效果Page1 首页Page2 开封游玩Page 3 开封美食Page4 留言 &#x1f308; 三、网页架构与技术3.1 脑海构思3.2 整体布局3.3 技术说明书 &#x1f40b;四…

springboot lua检查redis库存

需求 最近需求需要实现检查多个马戏场次下的座位等席对应库存渠道的库存余量&#xff0c;考虑到性能&#xff0c;决定采用Lua脚本实现库存检查。 数据结构 库存层级结构 redis库存hash类型结构 实现 lua脚本 --- 字符串分割为数组 local function split(str, char)local…

IIoT:数据融合在工业物联网中的应用——青创智通

工业物联网解决方案-工业IOT-青创智通 随着科技的不断发展&#xff0c;工业物联网&#xff08;IIoT&#xff09;已经逐渐渗透到各个行业&#xff0c;为企业的生产和管理带来了前所未有的便利。 然而&#xff0c;与此同时&#xff0c;海量的数据也为企业带来了挑战。如何将这些…

linux的firmware和hal层

linux的firmware和hal层 在Linux中&#xff0c;固件&#xff08;firmware&#xff09;和硬件抽象层&#xff08;Hardware Abstraction Layer&#xff0c;HAL&#xff09;是两个不同的概念。固件是运行在硬件设备上的程序&#xff0c;它们通常被用来控制硬件的操作。而HAL是一种…

【数学建模】天然肠衣搭配问题

2011高教社杯全国大学生数学建模竞赛D题 天然肠衣&#xff08;以下简称肠衣&#xff09;制作加工是我国的一个传统产业&#xff0c;出口量占世界首位。肠衣经过清洗整理后被分割成长度不等的小段&#xff08;原料&#xff09;&#xff0c;进入组装工序。传统的生产方式依靠人工…

Node.js v20.12.2版本执行npm run dev 报openssl 错误2024最新修复方案

Node.js v20.12.2版本执行npm run dev 报openssl 错误2024最新修复方案 故障描述修复方案 故障描述 ERROR SyntaxError: missing ) after argument list C:\Users\xxx\Documents\workspace\WebstormProjects\xxxx\node_modules\webpack\lib\util\createHash.js:135return new …

基于springboot实现可盈保险合同管理系统项目【项目源码+论文说明】

基于springboot实现可盈保险合同管理系统演示 摘要 现代经济快节奏发展以及不断完善升级的信息化技术&#xff0c;让传统数据信息的管理升级为软件存储&#xff0c;归纳&#xff0c;集中处理数据信息的管理方式。本可盈保险合同管理系统就是在这样的大环境下诞生&#xff0c;其…

C语言总结五:操作符(压缩版)

一&#xff0c;操作符分类 算术操作符&#xff0c;移位操作符&#xff0c;位操作符&#xff0c;赋值操作符&#xff0c;单目操作符&#xff0c;关系操作符&#xff0c;逻辑操作符&#xff0c;条件操作符&#xff0c;逗号表达式&#xff0c;下标引用&#xff0c;函数调用&#…

MySQL中GROUP_CONCAT与JSON_OBJECT、GROUP BY的巧妙结合:打造高效JSON数组汇总

在数据库操作中&#xff0c;经常遇到需要将同一组内的多行数据汇总为一个结构化的输出&#xff0c;特别是在处理一对多关系时。MySQL 5.7及以上版本引入了对JSON的支持&#xff0c;使得这一过程变得更加灵活和高效。本文将以一个实例深入探讨如何利用GROUP_CONCAT结合JSON_OBJE…

C#里如何设置输出路径,不要net7.0-windows

官网介绍&#xff1a; 更改生成输出目录 - Visual Studio (Windows) | Microsoft Learn <PropertyGroup> <AppendTargetFrameworkToOutputPath>false</AppendTargetFrameworkToOutputPath> <AppendRuntimeIdentifierToOutputPath>false</Appen…

面试题: malloc与new的区别

malloc, free是C语言中的库函数&#xff0c; new, delete是C中的运算符new自动计算分配内存的大小&#xff0c;malloc需要手动计算分配内存的大小new返回对象类型的指针&#xff0c;malloc返回的是void*类型&#xff0c;需要显式类型转换new分配失败抛出异常&#xff0c;malloc…

大数据Scala教程从入门到精通第四篇:Scala语言特点

一&#xff1a;Scala语言特点 Scala是一门头Java虚拟机(JVM)为运行环境并将面向对象和函数式编程的最佳特性结合在一起的静态类型编程语言(静态语言需要提前编译的如:Java、c、c等&#xff0c;动态语言如:js) 1)&#xff1a;Scala是一门多范式的编程语言&#xff0c;Scala支持…

[COCI2022-2023#1] Berilij 题解

推荐在 cnblogs 上阅读。 Solution P9030 [COCI2022-2023#1] Berilij 本题解转载翻译自官方题解&#xff1a;COCI 2022/2023 CONTEST 1 Part 1 让我们定义图形 G G G&#xff0c;顶点代表飞船&#xff0c;边代表两艘飞船外部接触的情况。此外&#xff0c;让边的边权成为它…

AI大模型程序员小白入门 - 关于如何更好地学习算法

关于本书 本项目旨在打造一本开源免费、新手友好的数据结构与算法入门教程。 全书采用动画图解,内容清晰易懂、学习曲线平滑,引导初学者探索数据结构与算法的知识地图。源代码可一键运行,帮助读者在练习中提升编程技能,了解算法工作原理和数据结构底层实现。提倡读者互助学…

git bash各分支修改内容不同但合并后不显示冲突问题

在跟着廖雪峰老师的git学习时&#xff0c;按部就班的执行明后&#xff0c;发现 而不是出现原文的结果 解决方法&#xff1a; 切换位feature分支&#xff0c;再合并 git switch feature1 git merge master 此时我们发现&#xff1a; 后面再跟着原文敲就可以了

双指针类型解题汇总

1 最接近的三数之和 给定一个包括 n 个整数的数组 nums 和 一个目标值 target。找出 nums 中的三个整数&#xff0c;使得它们的和与 target 最接近。返回这三个数的和。假定每组输入只存在唯一答案。 示例&#xff1a;输入&#xff1a;nums [-1,2,1,-4], target 1 输出&am…

每日一题5:Pandas-修改列

一、每日一题 一家公司决定增加员工的薪水。 编写一个解决方案&#xff0c;将每个员工的薪水乘以2来 修改 salary 列。 返回结果格式如下示例所示。 解答&#xff1a; import pandas as pddef modifySalaryColumn(employees: pd.DataFrame) -> pd.DataFrame:employees.loc[…

如何更好地使用Kafka? - 故障时解决

要确保Kafka在使用过程中的稳定性&#xff0c;需要从kafka在业务中的使用周期进行依次保障。主要可以分为&#xff1a;事先预防&#xff08;通过规范的使用、开发&#xff0c;预防问题产生&#xff09;、运行时监控&#xff08;保障集群稳定&#xff0c;出问题能及时发现&#…

Databend 开源周报第 143 期

Databend 是一款现代云数仓。专为弹性和高效设计&#xff0c;为您的大规模分析需求保驾护航。自由且开源。即刻体验云服务&#xff1a;https://app.databend.cn 。 Whats On In Databend 探索 Databend 本周新进展&#xff0c;遇到更贴近你心意的 Databend 。 了解 Databend …

Redis学习(十)|使用消息队列的重试机制实现 MySQL 和 Redis 的数据一致性

文章目录 介绍原理整体方案实现步骤示例代码总结其他&#xff1a;Kafka 重试策略配置1. 生产者重试策略配置2. 消费者重试策略配置 介绍 在分布式系统中&#xff0c;保持 MySQL 和 Redis 之间的数据一致性是至关重要的。为了确保数据的一致性&#xff0c;我们通常采取先更新数…