SparkSQL函数综合实践

文章目录

  • 1. 实战概述
  • 2. 实战步骤
    • 2.1 创建项目
    • 2.2 添加依赖
    • 2.3 设置源目录
    • 2.4 创建日志属性文件
    • 2.5 创建hive配置文件
    • 2.6 创建数据分析对象
      • 2.6.1 导入相关类
      • 2.6.2 创建获取Spark会话方法
      • 2.6.3 创建表方法
      • 2.6.4 准备数据文件
      • 2.6.5 创建加载数据方法
      • 2.6.6 创建薪水排行榜方法
      • 2.6.7 创建主方法
      • 2.6.8 查看完整代码
    • 2.7 启动metastore服务
    • 2.8 运行程序,查看结果
    • 2.8 在Spark Shell里运行程序
  • 3. 实战小结

1. 实战概述

  • 通过使用 Spark 和 Hive 进行数据分析,展示了从项目创建、依赖配置、数据加载到查询分析的完整流程。通过创建 Hive 表、加载 JSON 数据并使用 Spark SQL 查询每个城市工资最高的前 N 名员工,实现了数据的高效处理与分析。实战涵盖了 SparkSession 初始化、Hive 表操作、数据加载及窗口函数的使用,适用于大数据处理场景。

2. 实战步骤

2.1 创建项目

  • 设置项目基本信息
    在这里插入图片描述
  • 单击【Create】按钮,生成项目基本骨架
    在这里插入图片描述
  • java目录改成scala目录
    在这里插入图片描述

2.2 添加依赖

  • pom.xml文件里添加相关依赖
    在这里插入图片描述
  • 刷新项目依赖
    在这里插入图片描述

2.3 设置源目录

  • pom.xml里设置源目录
    在这里插入图片描述

2.4 创建日志属性文件

  • resources里创建log4j2.properties文件
    在这里插入图片描述
rootLogger.level = ERROR
rootLogger.appenderRef.stdout.ref = consoleappender.console.type = Console
appender.console.name = console
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex

2.5 创建hive配置文件

  • resources里创建hive-site.xml文件
    在这里插入图片描述
  • bigdata1云主机上执行命令:$HIVE_HOME/conf/hive-site.xml,拷贝其内容到resources里的hive-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration><property><name>javax.jdo.option.ConnectionURL</name><value>jdbc:mysql://bigdata1:3306/metastore?useSSL=false</value></property><property><name>javax.jdo.option.ConnectionDriverName</name><value>com.mysql.jdbc.Driver</value></property><property><name>javax.jdo.option.ConnectionUserName</name><value>root</value></property><property><name>javax.jdo.option.ConnectionPassword</name><value>123456</value></property><property><name>hive.metastore.warehouse.dir</name><value>/user/hive/warehouse</value></property><property><name>hive.server2.thrift.port</name><value>10000</value></property><property><name>hive.server2.thrift.bind.host</name><value>bigdata1</value></property><property><name>hive.metastore.uris</name><value>thrift://bigdata1:9083</value></property><property><name>hive.metastore.event.db.notification.api.auth</name><value>false</value></property><property><name>hive.metastore.schema.verification</name><value>false</value></property><property><name>hive.server2.active.passive.ha.enable</name><value>true</value></property>
</configuration>

2.6 创建数据分析对象

  • 添加scala-sdk到项目
    在这里插入图片描述

  • 单击【Add to Modules…】菜单项
    在这里插入图片描述

  • 单击【OK】按钮即可

  • 创建net.huawei.sql
    在这里插入图片描述

  • net.huawei.sql包里创建DataAnalysis对象
    在这里插入图片描述

2.6.1 导入相关类

  • 导入三个类:SparkConfSparkSessionDataFrame
    在这里插入图片描述

2.6.2 创建获取Spark会话方法

  • 创建getSparkSession()方法
    在这里插入图片描述
// 获取SparkSession对象                                      
def getSparkSession(): SparkSession = {                  // 创建SparkConf对象                                       val conf = new SparkConf()                             conf.setMaster("local[*]")                             conf.setAppName("DataAnalysis")                        conf.set("dfs.client.use.datanode.hostname", "true")   // 创建SparkSession对象                                    SparkSession.builder()                                 .config(conf)                                        .enableHiveSupport()                                 .getOrCreate()                                       
}                                                                           

2.6.3 创建表方法

  • 创建createTable()方法
    在这里插入图片描述
// 创建表                                                   
def createTable(spark: SparkSession): Unit = {           spark.sql(                                             s"""                                                 |CREATE TABLE IF NOT EXISTS salary_info           |  (city string, name string, salary double)      |  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','  |""".stripMargin                                  )                                                      
}                                                        

2.6.4 准备数据文件

  • 在项目根目录创建data目录,在里面创建salary.json文件
    在这里插入图片描述
{"city": "北京", "name": "陈燕文", "salary": 5000.0}
{"city": "上海", "name": "李伟强", "salary": 8000.0}
{"city": "广州", "name": "王丽娜", "salary": 5500.0}
{"city": "北京", "name": "赵建国", "salary": 5200.0}
{"city": "上海", "name": "孙志强", "salary": 5300.0}
{"city": "广州", "name": "方云龙", "salary": 6800.0}
{"city": "北京", "name": "周晓峰", "salary": 6400.0}
{"city": "上海", "name": "吴雅婷", "salary": 5100.0}
{"city": "广州", "name": "郑文杰", "salary": 5600.0}
{"city": "上海", "name": "王海涛", "salary": 7500.0}
{"city": "北京", "name": "李雪梅", "salary": 5800.0}
{"city": "广州", "name": "童玉明", "salary": 7800.0}

2.6.5 创建加载数据方法

  • 创建loadData()方法
    在这里插入图片描述
// 加载数据                                                                          
def loadData(spark: SparkSession, inputPath: String, tableName: String): Unit = {val fileDF: DataFrame = spark.read.format("json").load(inputPath)              fileDF.write.insertInto(tableName)                                             
}                                                                                

2.6.6 创建薪水排行榜方法

  • 创建salaryTopN()方法
    在这里插入图片描述
// 查询工资topN                                                                           
def salaryTopN(spark: SparkSession, topN: Int): Unit = {                              spark.sql(                                                                          s"""                                                                              |SELECT                                                                        |  city, name, salary                                                          |FROM                                                                          |  (                                                                           |    SELECT                                                                    |      city, name, salary,                                                     |      row_number() OVER (PARTITION BY city ORDER BY salary DESC) AS row_num   |    FROM                                                                      |      salary_info                                                             |  ) salary_rank                                                               |WHERE row_num <= $topN                                                        |""".stripMargin                                                               ).show()                                                                            
}                                                                                     
  • 代码说明salaryTopN 方法用于查询每个城市工资最高的前 topN 名员工。通过 row_number() 窗口函数按城市分组并按工资降序排序,生成行号 row_num,然后筛选出行号小于等于 topN 的记录。最终结果展示每个城市工资最高的前 topN 名员工的姓名和工资。

2.6.7 创建主方法

  • 通过 getSparkSession() 获取 SparkSession 实例,使用 createTable() 在 Hive 中创建表,调用 loadData() 加载数据并写入 Hive 表,通过 salaryTopN() 查询每个城市工资最高的前 N 名员工信息,最后释放资源。
    在这里插入图片描述
// 主方法                                                   
def main(args: Array[String]): Unit = {                  // 获取SparkSession对象                                    val spark = getSparkSession()                          // 创建表                                                 createTable(spark)                                     // 加载数据                                                loadData(spark, "data/salary.json", "salary_info")     // 查询工资top3                                            salaryTopN(spark, 3)                                   
}                                                        

2.6.8 查看完整代码

package net.huawei.sqlimport org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}/*** 功能:数据分析对象* 作者:华卫* 日期:2025年01月21日*/
object DataAnalysis {// 获取SparkSession对象def getSparkSession(): SparkSession = {// 创建SparkConf对象val conf = new SparkConf()conf.setMaster("local[*]")conf.setAppName("DataAnalysis")conf.set("dfs.client.use.datanode.hostname", "true")// 创建SparkSession对象SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()}// 创建表def createTable(spark: SparkSession): Unit = {spark.sql(s"""|CREATE TABLE IF NOT EXISTS salary_info|  (city string, name string, salary double)|  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','|""".stripMargin)}// 加载数据def loadData(spark: SparkSession, inputPath: String, tableName: String): Unit = {val fileDF: DataFrame = spark.read.format("json").load(inputPath)fileDF.write.insertInto(tableName)}// 查询工资topNdef salaryTopN(spark: SparkSession, topN: Int): Unit = {spark.sql(s"""|SELECT|  city, name, salary|FROM|  (|    SELECT|      city, name, salary,|      row_number() OVER (PARTITION BY city ORDER BY salary DESC) AS row_num|    FROM|      salary_info|  ) salary_rank|WHERE row_num <= $topN|""".stripMargin).show()}// 主方法def main(args: Array[String]): Unit = {// 获取SparkSession对象val spark = getSparkSession()// 创建表createTable(spark)// 加载数据loadData(spark, "data/salary.json", "salary_info")// 查询工资top3salaryTopN(spark, 3)}
}

2.7 启动metastore服务

  • 执行命令:hive --service metastore &
    在这里插入图片描述

2.8 运行程序,查看结果

  • 运行DataAnalysis对象
    在这里插入图片描述
  • hive客户端,查看创建的c
    在这里插入图片描述
  • 查看salary_info表的内容
    在这里插入图片描述
  • 在HDFS上查看salary_info表对应的目录
    在这里插入图片描述
  • 下载文件,查看内容
    在这里插入图片描述

2.8 在Spark Shell里运行程序

  • salary.json上传到HDFS的/data目录
    在这里插入图片描述

  • 在spark shell里执行命令::paste,粘贴代码
    在这里插入图片描述

  • Ctrl + D,查看结果
    在这里插入图片描述

3. 实战小结

  • 本次实战通过使用 Spark 和 Hive 进行数据分析,展示了从项目创建、依赖配置、数据加载到查询分析的完整流程。首先,我们创建了 Hive 表并加载了 JSON 数据,随后通过 Spark SQL 查询每个城市工资最高的前 N 名员工。实战中,我们使用了 SparkSession 初始化、Hive 表操作、数据加载及窗口函数等技术,实现了数据的高效处理与分析。通过本次实战,我们掌握了 Spark 和 Hive 的基本操作,并学会了如何在大数据场景下进行数据分析和处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/67827.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Flutter中PlatformView在鸿蒙中的使用

Flutter中PlatformView在鸿蒙中的使用 概述在Flutter中的处理鸿蒙端创建内嵌的鸿蒙视图创建PlatformView创建PlatformViewFactory创建plugin&#xff0c;注册platformview注册插件 概述 集成平台视图&#xff08;后称为平台视图&#xff09;允许将原生视图嵌入到 Flutter 应用…

逆波兰表达式求值(力扣150)

这道题也是一道经典的栈应用题。为什么这样说呢&#xff1f;我们可以发现&#xff0c;当我们遍历到运算符号的时候&#xff0c;我们就需要操控这个运算符之前的两个相邻的数。这里相邻数不仅仅指最初数组里相邻的数&#xff0c;在进行了运算之后&#xff0c;得到的结果与后面的…

ElasticSearch DSL查询之排序和分页

一、排序功能 1. 默认排序 在 Elasticsearch 中&#xff0c;默认情况下&#xff0c;查询结果是根据 相关度 评分&#xff08;score&#xff09;进行排序的。我们之前已经了解过&#xff0c;相关度评分是通过 Elasticsearch 根据查询条件与文档内容的匹配程度自动计算得出的。…

《汽车维修技师》是什么级别的期刊?是正规期刊吗?能评职称吗?

​问题解答&#xff1a; 问&#xff1a;《汽车维修技师》是不是核心期刊&#xff1f; 答&#xff1a;不是&#xff0c;是知网收录的正规学术期刊。 问&#xff1a;《汽车维修技师》级别&#xff1f; 答&#xff1a;省级。主管单位&#xff1a;北方联合出版传媒&#xff08;…

产品经理面试题总结2025【其一】

一、产品理解与定位 1、你如何理解产品经理这个角色&#xff1f; 作为一名互联网产品经理&#xff0c;我理解这个角色的核心在于成为产品愿景的制定者和执行的推动者。具体来说&#xff0c;产品经理是连接市场、用户和技术团队之间的桥梁&#xff0c;负责理解市场需求、用户痛…

数学基础 --线性代数之理解矩阵乘法

理解矩阵乘法的解析 矩阵乘法&#xff08;Matrix Multiplication&#xff09;是线性代数中的核心操作之一。在数学、几何和工程实际中&#xff0c;它不仅是一种代数运算规则&#xff0c;还承载着丰富的几何和映射意义。本文将从多个角度深入解析矩阵乘法&#xff0c;帮助读者理…

C#高级:用Csharp操作鼠标和键盘

一、winform 1.实时获取鼠标位置 public Form1() {InitializeComponent();InitialTime(); }private void InitialTime() {// 初始化 Timer 控件var timer new System.Windows.Forms.Timer();timer.Interval 100; // 设置为 100 毫秒&#xff0c;即每 0.1 秒更新一次timer.…

【中国电信-安全大脑产品介绍】

座右铭&#xff1a;人生的道路上无论如何选择总会有遗憾的&#xff01; 文章目录 前言一、安全大脑介绍二、中国电信-安全大脑产品分类1.防护版2.审计版 三、安全大脑-部署方案总结 前言 安全占据我们日常生活中首要地位&#xff0c;它时时刻刻提醒着我们出入平安。当然网络安…

数据库:MongoDB命令行帮助解释

MongoDB命令&#xff1a; mongodmongosmongoperrormongoexportmongofilesmongoimportmongorestoreMongostat MongoDB包中的核心组件包括: mongod 是 MongoDB 的核心服务器进程&#xff0c;负责数据存储和管理。mongos 是分片集群的路由进程&#xff0c;负责将请求路由到正确…

洛谷P8837

[传智杯 #3 决赛] 商店 - 洛谷 代码区&#xff1a; #include<stdio.h> #include<stdlib.h> int cmp(const void*a,const void *b){return *(int*)b-*(int*)a; } int main(){int n,m;scanf("%d%d",&n,&m);int w[n];int c[m];for(int i0;i<n;…

多线程杂谈:惊群现象、CAS、安全的单例

引言 本文是一篇杂谈&#xff0c;帮助大家了解多线程可能会出现的面试题。 目录 引言 惊群现象 结合条件变量 CAS原子操作&#xff08;cmp & swap&#xff09; 线程控制&#xff1a;两个线程交替打印奇偶数 智能指针线程安全 单例模式线程安全 最简单的单例&…

三分钟简单了解HTML的一些语句

1.图片建议建立一个文件夹如下图所示 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"keywords"><title>魔神羽落</title><style>.testone{background-color: #ff53e…

HCIP笔记4--OSPF域内路由计算

1. 域内LSA 1.1 一类LSA 一类LSA: 路由器直连状态&#xff0c;Router LSA。 串口需要两端配置好IP,才会产生一类LSA; 以太网口只需要一端配置了IP就会直接产生一类LSA。 LSA通用头部 Type: Router 直连路由LS id: 12.1.1.1 路由器router idAdv rtr: 12.1.1.1 通告的路由器&…

k8s基础(7)—Kubernetes-Secret

Secret概述&#xff1a; Secret 是一种包含少量敏感信息例如密码、令牌或密钥的对象。 这样的信息可能会被放在 Pod 规约中或者镜像中。 使用 Secret 意味着你不需要在应用程序代码中包含机密数据。 由于创建 Secret 可以独立于使用它们的 Pod&#xff0c; 因此在创建、查看和…

【leetcode100】验证二叉搜索树

1、题目描述 给你一个二叉树的根节点 root &#xff0c;判断其是否是一个有效的二叉搜索树。 有效 二叉搜索树定义如下&#xff1a; 节点的左子树只包含 小于 当前节点的数。节点的右子树只包含 大于 当前节点的数。所有左子树和右子树自身必须也是二叉搜索树。 示例 1&…

谈谈MySQL中的索引和事务

目录 1. 索引 1.1 索引介绍 1.2 缺陷 1.3 使用 1.3.1 查看索引 1.3.2 创建索引 1.3.3 删除索引 2. 索引底层的数据结构 2.1 B树 3. 事务 3.1 为什么使用事务 3.2 事务的使用 3.3 事务的基本特性 1. 索引 1.1 索引介绍 索引相当于一本书的目录(index), 在一…

2024:CSDN上的收获与蜕变——我的技术成长之旅

2024&#xff1a;CSDN上的收获与蜕变——我的技术成长之旅 前言数据见证&#xff1a;2024年的创作足迹荣誉殿堂&#xff1a;各平台的创作证书与认可社区共建&#xff1a;行业贡献与互动交流展望未来&#xff1a;2025年的目标与计划结语 前言 博主简介&#xff1a;江湖有缘 在技…

博客之星2024年度-技术总结:技术探险家小板的一年的征程

&#x1f525;博客主页&#xff1a; 【小扳_-CSDN博客】 ❤感谢大家点赞&#x1f44d;收藏⭐评论✍ 文章目录 1.0 技术探险家的新一年征程 2.0 数据库管理与优化&#xff1a;MySQL 的魔法森林 2.1 穿越基础概念的迷雾 2.2 实践应用&#xff1a;成为森林的主人 2.3 性能调优&…

【vim】vim怎样直接跳转到某行?

vim怎样直接跳转到某行&#xff1f; 一、使用行号跳转二、使用相对行号跳转三、使用标记跳转 在Vim中直接跳转到某行可以使用以下几种方法&#xff1a; 一、使用行号跳转 在命令模式下&#xff0c;输入冒号:&#xff0c;然后输入你想要跳转的行号&#xff0c;最后按回车键。例…

SentencePiece和 WordPiece tokenization 的含义和区别

SentencePiece和 WordPiece tokenization 的含义和区别 SentencePiece 和 WordPiece 都是常用的分词(tokenization)技术,主要用于自然语言处理(NLP)中的文本预处理,尤其是在处理大规模文本数据时。它们都基于子词(subword)单元,能够将未登录词(out-of-vocabulary, O…