SparkSQL函数综合实践

文章目录

  • 1. 实战概述
  • 2. 实战步骤
    • 2.1 创建项目
    • 2.2 添加依赖
    • 2.3 设置源目录
    • 2.4 创建日志属性文件
    • 2.5 创建hive配置文件
    • 2.6 创建数据分析对象
      • 2.6.1 导入相关类
      • 2.6.2 创建获取Spark会话方法
      • 2.6.3 创建表方法
      • 2.6.4 准备数据文件
      • 2.6.5 创建加载数据方法
      • 2.6.6 创建薪水排行榜方法
      • 2.6.7 创建主方法
      • 2.6.8 查看完整代码
    • 2.7 启动metastore服务
    • 2.8 运行程序,查看结果
    • 2.8 在Spark Shell里运行程序
  • 3. 实战小结

1. 实战概述

  • 通过使用 Spark 和 Hive 进行数据分析,展示了从项目创建、依赖配置、数据加载到查询分析的完整流程。通过创建 Hive 表、加载 JSON 数据并使用 Spark SQL 查询每个城市工资最高的前 N 名员工,实现了数据的高效处理与分析。实战涵盖了 SparkSession 初始化、Hive 表操作、数据加载及窗口函数的使用,适用于大数据处理场景。

2. 实战步骤

2.1 创建项目

  • 设置项目基本信息
    在这里插入图片描述
  • 单击【Create】按钮,生成项目基本骨架
    在这里插入图片描述
  • java目录改成scala目录
    在这里插入图片描述

2.2 添加依赖

  • pom.xml文件里添加相关依赖
    在这里插入图片描述
  • 刷新项目依赖
    在这里插入图片描述

2.3 设置源目录

  • pom.xml里设置源目录
    在这里插入图片描述

2.4 创建日志属性文件

  • resources里创建log4j2.properties文件
    在这里插入图片描述
rootLogger.level = ERROR
rootLogger.appenderRef.stdout.ref = consoleappender.console.type = Console
appender.console.name = console
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex

2.5 创建hive配置文件

  • resources里创建hive-site.xml文件
    在这里插入图片描述
  • bigdata1云主机上执行命令:$HIVE_HOME/conf/hive-site.xml,拷贝其内容到resources里的hive-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration><property><name>javax.jdo.option.ConnectionURL</name><value>jdbc:mysql://bigdata1:3306/metastore?useSSL=false</value></property><property><name>javax.jdo.option.ConnectionDriverName</name><value>com.mysql.jdbc.Driver</value></property><property><name>javax.jdo.option.ConnectionUserName</name><value>root</value></property><property><name>javax.jdo.option.ConnectionPassword</name><value>123456</value></property><property><name>hive.metastore.warehouse.dir</name><value>/user/hive/warehouse</value></property><property><name>hive.server2.thrift.port</name><value>10000</value></property><property><name>hive.server2.thrift.bind.host</name><value>bigdata1</value></property><property><name>hive.metastore.uris</name><value>thrift://bigdata1:9083</value></property><property><name>hive.metastore.event.db.notification.api.auth</name><value>false</value></property><property><name>hive.metastore.schema.verification</name><value>false</value></property><property><name>hive.server2.active.passive.ha.enable</name><value>true</value></property>
</configuration>

2.6 创建数据分析对象

  • 添加scala-sdk到项目
    在这里插入图片描述

  • 单击【Add to Modules…】菜单项
    在这里插入图片描述

  • 单击【OK】按钮即可

  • 创建net.huawei.sql
    在这里插入图片描述

  • net.huawei.sql包里创建DataAnalysis对象
    在这里插入图片描述

2.6.1 导入相关类

  • 导入三个类:SparkConfSparkSessionDataFrame
    在这里插入图片描述

2.6.2 创建获取Spark会话方法

  • 创建getSparkSession()方法
    在这里插入图片描述
// 获取SparkSession对象                                      
def getSparkSession(): SparkSession = {                  // 创建SparkConf对象                                       val conf = new SparkConf()                             conf.setMaster("local[*]")                             conf.setAppName("DataAnalysis")                        conf.set("dfs.client.use.datanode.hostname", "true")   // 创建SparkSession对象                                    SparkSession.builder()                                 .config(conf)                                        .enableHiveSupport()                                 .getOrCreate()                                       
}                                                                           

2.6.3 创建表方法

  • 创建createTable()方法
    在这里插入图片描述
// 创建表                                                   
def createTable(spark: SparkSession): Unit = {           spark.sql(                                             s"""                                                 |CREATE TABLE IF NOT EXISTS salary_info           |  (city string, name string, salary double)      |  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','  |""".stripMargin                                  )                                                      
}                                                        

2.6.4 准备数据文件

  • 在项目根目录创建data目录,在里面创建salary.json文件
    在这里插入图片描述
{"city": "北京", "name": "陈燕文", "salary": 5000.0}
{"city": "上海", "name": "李伟强", "salary": 8000.0}
{"city": "广州", "name": "王丽娜", "salary": 5500.0}
{"city": "北京", "name": "赵建国", "salary": 5200.0}
{"city": "上海", "name": "孙志强", "salary": 5300.0}
{"city": "广州", "name": "方云龙", "salary": 6800.0}
{"city": "北京", "name": "周晓峰", "salary": 6400.0}
{"city": "上海", "name": "吴雅婷", "salary": 5100.0}
{"city": "广州", "name": "郑文杰", "salary": 5600.0}
{"city": "上海", "name": "王海涛", "salary": 7500.0}
{"city": "北京", "name": "李雪梅", "salary": 5800.0}
{"city": "广州", "name": "童玉明", "salary": 7800.0}

2.6.5 创建加载数据方法

  • 创建loadData()方法
    在这里插入图片描述
// 加载数据                                                                          
def loadData(spark: SparkSession, inputPath: String, tableName: String): Unit = {val fileDF: DataFrame = spark.read.format("json").load(inputPath)              fileDF.write.insertInto(tableName)                                             
}                                                                                

2.6.6 创建薪水排行榜方法

  • 创建salaryTopN()方法
    在这里插入图片描述
// 查询工资topN                                                                           
def salaryTopN(spark: SparkSession, topN: Int): Unit = {                              spark.sql(                                                                          s"""                                                                              |SELECT                                                                        |  city, name, salary                                                          |FROM                                                                          |  (                                                                           |    SELECT                                                                    |      city, name, salary,                                                     |      row_number() OVER (PARTITION BY city ORDER BY salary DESC) AS row_num   |    FROM                                                                      |      salary_info                                                             |  ) salary_rank                                                               |WHERE row_num <= $topN                                                        |""".stripMargin                                                               ).show()                                                                            
}                                                                                     
  • 代码说明salaryTopN 方法用于查询每个城市工资最高的前 topN 名员工。通过 row_number() 窗口函数按城市分组并按工资降序排序,生成行号 row_num,然后筛选出行号小于等于 topN 的记录。最终结果展示每个城市工资最高的前 topN 名员工的姓名和工资。

2.6.7 创建主方法

  • 通过 getSparkSession() 获取 SparkSession 实例,使用 createTable() 在 Hive 中创建表,调用 loadData() 加载数据并写入 Hive 表,通过 salaryTopN() 查询每个城市工资最高的前 N 名员工信息,最后释放资源。
    在这里插入图片描述
// 主方法                                                   
def main(args: Array[String]): Unit = {                  // 获取SparkSession对象                                    val spark = getSparkSession()                          // 创建表                                                 createTable(spark)                                     // 加载数据                                                loadData(spark, "data/salary.json", "salary_info")     // 查询工资top3                                            salaryTopN(spark, 3)                                   
}                                                        

2.6.8 查看完整代码

package net.huawei.sqlimport org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}/*** 功能:数据分析对象* 作者:华卫* 日期:2025年01月21日*/
object DataAnalysis {// 获取SparkSession对象def getSparkSession(): SparkSession = {// 创建SparkConf对象val conf = new SparkConf()conf.setMaster("local[*]")conf.setAppName("DataAnalysis")conf.set("dfs.client.use.datanode.hostname", "true")// 创建SparkSession对象SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()}// 创建表def createTable(spark: SparkSession): Unit = {spark.sql(s"""|CREATE TABLE IF NOT EXISTS salary_info|  (city string, name string, salary double)|  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','|""".stripMargin)}// 加载数据def loadData(spark: SparkSession, inputPath: String, tableName: String): Unit = {val fileDF: DataFrame = spark.read.format("json").load(inputPath)fileDF.write.insertInto(tableName)}// 查询工资topNdef salaryTopN(spark: SparkSession, topN: Int): Unit = {spark.sql(s"""|SELECT|  city, name, salary|FROM|  (|    SELECT|      city, name, salary,|      row_number() OVER (PARTITION BY city ORDER BY salary DESC) AS row_num|    FROM|      salary_info|  ) salary_rank|WHERE row_num <= $topN|""".stripMargin).show()}// 主方法def main(args: Array[String]): Unit = {// 获取SparkSession对象val spark = getSparkSession()// 创建表createTable(spark)// 加载数据loadData(spark, "data/salary.json", "salary_info")// 查询工资top3salaryTopN(spark, 3)}
}

2.7 启动metastore服务

  • 执行命令:hive --service metastore &
    在这里插入图片描述

2.8 运行程序,查看结果

  • 运行DataAnalysis对象
    在这里插入图片描述
  • hive客户端,查看创建的c
    在这里插入图片描述
  • 查看salary_info表的内容
    在这里插入图片描述
  • 在HDFS上查看salary_info表对应的目录
    在这里插入图片描述
  • 下载文件,查看内容
    在这里插入图片描述

2.8 在Spark Shell里运行程序

  • salary.json上传到HDFS的/data目录
    在这里插入图片描述

  • 在spark shell里执行命令::paste,粘贴代码
    在这里插入图片描述

  • Ctrl + D,查看结果
    在这里插入图片描述

3. 实战小结

  • 本次实战通过使用 Spark 和 Hive 进行数据分析,展示了从项目创建、依赖配置、数据加载到查询分析的完整流程。首先,我们创建了 Hive 表并加载了 JSON 数据,随后通过 Spark SQL 查询每个城市工资最高的前 N 名员工。实战中,我们使用了 SparkSession 初始化、Hive 表操作、数据加载及窗口函数等技术,实现了数据的高效处理与分析。通过本次实战,我们掌握了 Spark 和 Hive 的基本操作,并学会了如何在大数据场景下进行数据分析和处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/67827.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ElasticSearch DSL查询之排序和分页

一、排序功能 1. 默认排序 在 Elasticsearch 中&#xff0c;默认情况下&#xff0c;查询结果是根据 相关度 评分&#xff08;score&#xff09;进行排序的。我们之前已经了解过&#xff0c;相关度评分是通过 Elasticsearch 根据查询条件与文档内容的匹配程度自动计算得出的。…

《汽车维修技师》是什么级别的期刊?是正规期刊吗?能评职称吗?

​问题解答&#xff1a; 问&#xff1a;《汽车维修技师》是不是核心期刊&#xff1f; 答&#xff1a;不是&#xff0c;是知网收录的正规学术期刊。 问&#xff1a;《汽车维修技师》级别&#xff1f; 答&#xff1a;省级。主管单位&#xff1a;北方联合出版传媒&#xff08;…

【中国电信-安全大脑产品介绍】

座右铭&#xff1a;人生的道路上无论如何选择总会有遗憾的&#xff01; 文章目录 前言一、安全大脑介绍二、中国电信-安全大脑产品分类1.防护版2.审计版 三、安全大脑-部署方案总结 前言 安全占据我们日常生活中首要地位&#xff0c;它时时刻刻提醒着我们出入平安。当然网络安…

洛谷P8837

[传智杯 #3 决赛] 商店 - 洛谷 代码区&#xff1a; #include<stdio.h> #include<stdlib.h> int cmp(const void*a,const void *b){return *(int*)b-*(int*)a; } int main(){int n,m;scanf("%d%d",&n,&m);int w[n];int c[m];for(int i0;i<n;…

多线程杂谈:惊群现象、CAS、安全的单例

引言 本文是一篇杂谈&#xff0c;帮助大家了解多线程可能会出现的面试题。 目录 引言 惊群现象 结合条件变量 CAS原子操作&#xff08;cmp & swap&#xff09; 线程控制&#xff1a;两个线程交替打印奇偶数 智能指针线程安全 单例模式线程安全 最简单的单例&…

三分钟简单了解HTML的一些语句

1.图片建议建立一个文件夹如下图所示 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"keywords"><title>魔神羽落</title><style>.testone{background-color: #ff53e…

HCIP笔记4--OSPF域内路由计算

1. 域内LSA 1.1 一类LSA 一类LSA: 路由器直连状态&#xff0c;Router LSA。 串口需要两端配置好IP,才会产生一类LSA; 以太网口只需要一端配置了IP就会直接产生一类LSA。 LSA通用头部 Type: Router 直连路由LS id: 12.1.1.1 路由器router idAdv rtr: 12.1.1.1 通告的路由器&…

k8s基础(7)—Kubernetes-Secret

Secret概述&#xff1a; Secret 是一种包含少量敏感信息例如密码、令牌或密钥的对象。 这样的信息可能会被放在 Pod 规约中或者镜像中。 使用 Secret 意味着你不需要在应用程序代码中包含机密数据。 由于创建 Secret 可以独立于使用它们的 Pod&#xff0c; 因此在创建、查看和…

【leetcode100】验证二叉搜索树

1、题目描述 给你一个二叉树的根节点 root &#xff0c;判断其是否是一个有效的二叉搜索树。 有效 二叉搜索树定义如下&#xff1a; 节点的左子树只包含 小于 当前节点的数。节点的右子树只包含 大于 当前节点的数。所有左子树和右子树自身必须也是二叉搜索树。 示例 1&…

谈谈MySQL中的索引和事务

目录 1. 索引 1.1 索引介绍 1.2 缺陷 1.3 使用 1.3.1 查看索引 1.3.2 创建索引 1.3.3 删除索引 2. 索引底层的数据结构 2.1 B树 3. 事务 3.1 为什么使用事务 3.2 事务的使用 3.3 事务的基本特性 1. 索引 1.1 索引介绍 索引相当于一本书的目录(index), 在一…

2024:CSDN上的收获与蜕变——我的技术成长之旅

2024&#xff1a;CSDN上的收获与蜕变——我的技术成长之旅 前言数据见证&#xff1a;2024年的创作足迹荣誉殿堂&#xff1a;各平台的创作证书与认可社区共建&#xff1a;行业贡献与互动交流展望未来&#xff1a;2025年的目标与计划结语 前言 博主简介&#xff1a;江湖有缘 在技…

博客之星2024年度-技术总结:技术探险家小板的一年的征程

&#x1f525;博客主页&#xff1a; 【小扳_-CSDN博客】 ❤感谢大家点赞&#x1f44d;收藏⭐评论✍ 文章目录 1.0 技术探险家的新一年征程 2.0 数据库管理与优化&#xff1a;MySQL 的魔法森林 2.1 穿越基础概念的迷雾 2.2 实践应用&#xff1a;成为森林的主人 2.3 性能调优&…

视频m3u8形式播放 -- python and html

hls hls官网地址 创建项目 ts为视频片段 m3u8文件内容 html <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta name"viewport" content"widthdevice-width, initial-scale1.0" …

【知识分享】PCIe5.0 TxRx 电气设计参数汇总

目录 0 引言 1 参考时钟--Refclk 2 发射端通道设计 3 发送均衡技术 4 接收端通道设计 5 接收均衡技术 6 结语 7 参考文献 8 扩展阅读 0 引言 PCI Express Base Specification 5.0的电气规范中&#xff0c;关键技术要点如下&#xff1a; 1. 支持2.5、5.0、8.0、16.0和3…

【HF设计模式】06-命令模式

声明&#xff1a;仅为个人学习总结&#xff0c;还请批判性查看&#xff0c;如有不同观点&#xff0c;欢迎交流。 摘要 《Head First设计模式》第6章笔记&#xff1a;结合示例应用和代码&#xff0c;介绍命令模式&#xff0c;包括遇到的问题、采用的解决方案、遵循的 OO 原则、…

记一次数据库连接 bug

整个的报错如下&#xff1a; com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: Could not create connection to database server. Attempted reconnect 3 times. Giving up. at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Metho…

【游戏设计原理】76 - 惩罚

惩罚是玩家在游戏中得到反馈的一种形式&#xff0c;可以认为是一种负反馈。 除了文中提到的几种惩罚机制&#xff08;“生命/游戏结束/继续”、“枯萎”、“永久死亡”&#xff09;&#xff0c;还有其他一些常见的惩罚类型&#xff0c;它们的设计主要目的是增加游戏的挑战性&a…

Java 基于 SpringBoot+Vue 的二手车交易系统(附源码,部署+文档)

博主介绍&#xff1a;✌程序员徐师兄、7年大厂程序员经历。全网粉丝12w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;…

【Python使用】嘿马python高级进阶全体系教程第11篇:静态Web服务器-面向对象开发,1. 以面向对象的方式开发静态W

本教程的知识点为&#xff1a;操作系统 1. 常见的操作系统 4. 小结 ls命令选项 2. 小结 mkdir和rm命令选项 1. mkdir命令选项 压缩和解压缩命令 1. 压缩格式的介绍 2. tar命令及选项的使用 3. zip和unzip命令及选项的使用 4. 小结 编辑器 vim 1. vim 的介绍 2. vim 的工作模式 …

即现软著工具 - 让软著申请更高效

在软件著作权申请的过程中&#xff0c;开发者常常会遇到代码整理、统计和生成证明文件等繁琐且复杂的任务。为了解决这些问题&#xff0c;提高申请效率和成功率&#xff0c;给大家介绍一款工具&#xff1a;即现软著工具。 即现软著工具&#xff0c;能够快速整理软著申请的程序鉴…