实验五 Spark SQL编程初级实践

Spark SQL编程初级实践

  • Spark SQL基本操作

将下列JSON格式数据复制到Linux系统中,并保存命名为employee.json。

{ "id":1 , "name":" Ella" , "age":36 }

{ "id":2, "name":"Bob","age":29 }

{ "id":3 , "name":"Jack","age":29 }

{ "id":4 , "name":"Jim","age":28 }

{ "id":4 , "name":"Jim","age":28 }

{ "id":5 , "name":"Damon" }

{ "id":5 , "name":"Damon" }

为employee.json创建DataFrame,并写出Scala语句完成下列操作:

  1. 查询所有数据;
  2. 查询所有数据,并去除重复的数据;
  3. 查询所有数据,打印时去除id字段;
  4. 筛选出age>30的记录;
  5. 将数据按age分组;
  6. 将数据按name升序排列;
  7. 取出前3行数据;
  8. 查询所有记录的name列,并为其取别名为username;
  9. 查询年龄age的平均值;
  10. 查询年龄age的最小值。

  • 编程实现将RDD转换为DataFrame

源文件内容如下(包含id,name,age):

1,Ella,36

2,Bob,29

3,Jack,29

将数据复制保存到Linux系统中,命名为employee.txt,实现从RDD转换得到DataFrame,并按“id:1,name:Ella,age:36”的格式打印出DataFrame的所有数据。请写出程序代码。

  • 编程实现利用DataFrame读写MySQL的数据

(1)在MySQL数据库中新建数据库sparktest,再创建表employee,包含如表6-2所示的两行数据。

表6-2 employee表原有数据

id

name

gender

Age

1

Alice

F

22

2

John

M

25

(2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入如表6-3所示的两行数据到MySQL中,最后打印出age的最大值和age的总和。

表6-3 employee表新增数据

id

name

gender

age

3

Mary

F

26

4

Tom

M

23

实验一 :Spark SQL基本操作

1)
// 导入必要的库
import org.apache.spark.sql.SparkSession// 创建SparkSession
val spark = SparkSession.builder().appName("Spark SQL Basic Operations").getOrCreate()// 读取JSON文件创建DataFrameval df = spark.read.json("file:///home/hadoop/employee.json")// (1) 查询所有数据
df.show()
(2)查询所有数据,并去除重复的数据
df.distinct().show()(3)
查询所有数据,打印时去除id字段
df.drop("id").show()(4)
筛选出age>30的记录
df.filter("age > 30").show()(5)
将数据按age分组
df.groupBy("age").count().show()(6)
将数据按name升序排列
df.orderBy("name").show()(7)
取出前3行数据
df.limit(3).show()(8)
查询所有记录的name列,并为其取别名为username
df.select($"name".alias("username")).show()(9)
查询年龄age的平均值
df.selectExpr("avg(age)").show()(10)
查询年龄age的最小值
df.selectExpr("min(age)").show()

实验二 :编程实现将RDD转换为DataFrame

编程代码:

import org.apache.spark.sql.{SparkSession, Row}  
import org.apache.spark.sql.types._  object RDDToDataFrameExample {  def main(args: Array[String]): Unit = {  // 创建SparkSession  val spark = SparkSession.builder()  .appName("RDD to DataFrame Example")  .master("local[*]") // 使用本地模式,如果连接到集群请更改这里  .getOrCreate()  import spark.implicits._  // 指定employee.txt文件的位置  val inputFilePath = "file:///home/hadoop/employee.txt"  // 从文本文件读取数据创建RDD  val rdd = spark.sparkContext.textFile(inputFilePath)  // 定义DataFrame的schema  val schema = StructType(Array(  StructField("id", IntegerType, nullable = false),  StructField("name", StringType, nullable = false),  StructField("age", IntegerType, nullable = false)  ))  // 将RDD转换为DataFrame  val dataFrame = spark.createDataFrame(rdd.map { line =>  val parts = line.split(",")  Row(parts(0).toInt, parts(1), parts(2).toInt)  }, schema)  // 显示DataFrame内容  dataFrame.show(false)  // 按照指定格式打印所有数据  dataFrame.collect().foreach { row =>  println(s"id:${row.getAs[Int]("id")},name:${row.getAs[String]("name")},age:${row.getAs[Int]("age")}")  }  // 停止SparkSession  spark.stop()  }  
}

 命令

/usr/local/spark-3.5.1/bin/spark-submit --class "RDDToDataFrameExample" ./target/scala-2.12/simple-project_2.12-1.9.0.jar

 具体操作参考博客

如何安装sbt(sbt在ubuntu上的安装与配置)(有详细安装网站和图解)-CSDN博客

实验三:编程实现利用DataFrame读写MySQL的数据

mysql代码

CREATE DATABASE sparktest;  
USE sparktest;  CREATE TABLE employee (  id INT PRIMARY KEY,  name VARCHAR(50),  gender CHAR(1),  age INT  
);  INSERT INTO employee (id, name, gender, age) VALUES (1, 'Alice', 'F', 22);  
INSERT INTO employee (id, name, gender, age) VALUES (2, 'John', 'M', 25);

如何安装msyql参考博客

 在ubuntu上安装mysql(在线安装需要)-CSDN博客

如何安装mysl驱动程序jar包-CSDN博客

编程代码

import org.apache.spark.sql.{SparkSession, Row}  
import java.util.Properties  
import org.apache.spark.sql.SparkSession  
import org.apache.spark.sql.Dataset  
import org.apache.spark.sql.Row  
import org.apache.spark.sql.functions.max  
import org.apache.spark.sql.functions.sum  object MySQLDataFrameExample {  def main(args: Array[String]): Unit = {  // 创建SparkSession  val spark = SparkSession.builder()  .appName("MySQL DataFrame Example")  .master("local[*]") // 使用本地模式,如果连接到集群请更改这里  .getOrCreate()  import spark.implicits._  // 配置MySQL JDBC连接  val jdbcProperties = new Properties()  jdbcProperties.setProperty("user", "root")  jdbcProperties.setProperty("password", "mysql")  jdbcProperties.setProperty("driver", "com.mysql.cj.jdbc.Driver")  // 定义MySQL的JDBC连接URL  val jdbcUrl = "jdbc:mysql://localhost:3306/sparktest"  // 创建DataFrame以插入数据  val newEmployeeData = Seq(  (3, "Mary", "F", 26),  (4, "Tom", "M", 23)  ).toDF("id", "name", "gender", "age")  // 将DataFrame数据插入到MySQL的employee表中  newEmployeeData.write  .mode("append") // 使用append模式来添加数据,而不是覆盖  .jdbc(jdbcUrl, "employee", jdbcProperties)  // 从MySQL读取employee表的数据  val employeeDF = spark.read  .jdbc(jdbcUrl, "employee", jdbcProperties)  // 打印age的最大值  val maxAge = employeeDF.agg(max("age")).collect()(0).getAs[Int](0)  println(s"Max age: $maxAge")  // 打印age的总和  val sumAge = employeeDF.agg(sum("age")).collect()(0).getAs[Long](0)  println(s"Sum of ages: $sumAge")  // 停止SparkSession  spark.stop()  }  
}

编程详细步骤参考

 如何安装sbt(sbt在ubuntu上的安装与配置)(有详细安装网站和图解)-CSDN博客

 运行命令

/usr/local/spark-3.5.1/bin/spark-submit --jars /home/hadoop/mysql-connector-j-8.3.0/mysql-connector-j-8.3.0.jar  --class "MySQLDataFrameExample" ./target/scala-2.12/simple-project_2.12-1.9.0.jar

产生错误

主要问题都在实验三中,因为实验三中涉及到一个mysql数据库连接

命令更新为

/usr/local/spark-3.5.1/bin/spark-submit --jars /home/hadoop/mysql-connector-j-8.3.0/mysql-connector-j-8.3.0.jar  --class "MySQLDataFrameExample" ./target/scala-2.12/simple-project_2.12-1.9.0.jar

加了一个mysl驱动的jar的引用

如何安装mysql驱动参考博客

如何安装mysl驱动程序jar包-CSDN博客

打包失败

这个问题是代码错误

代码未引入一些包

加上下面这些就可以了

import org.apache.spark.sql.{SparkSession, Row}  

import java.util.Properties  

import org.apache.spark.sql.SparkSession  

import org.apache.spark.sql.Dataset  

import org.apache.spark.sql.Row  

import org.apache.spark.sql.functions.max  

import org.apache.spark.sql.functions.sum  

运行失败

未引入mysl驱动程序

要下载mysql驱动

采用命令引入

/usr/local/spark-3.5.1/bin/spark-submit --jars /home/hadoop/mysql-connector-j-8.3.0/mysql-connector-j-8.3.0.jar  --class "MySQLDataFrameExample" ./target/scala-2.12/simple-project_2.12-1.9.0.jar

参考链接

如何安装sbt(sbt在ubuntu上的安装与配置)(有详细安装网站和图解)-CSDN博客

在ubuntu上安装mysql(在线安装需要)-CSDN博客

在ubuntu上安装mysql(在线安装需要)-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/4422.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

堆的介绍,实现(c语言实现)

目录 堆的概念 堆的性质: 堆的分类 父子结点的下标关系 堆的向下调整算法 ​编辑小堆 大堆 建堆 堆的向上调整算法 小堆 大堆 堆的基本操作 定义堆 初始化堆 销毁堆 打印堆 堆的插入 堆的删除 大堆(Max Heap)的向下调整算法…

CentOS系统服务器装机后常用的操作命令大全

博主猫头虎的技术世界 🌟 欢迎来到猫头虎的博客 — 探索技术的无限可能! 专栏链接: 🔗 精选专栏: 《面试题大全》 — 面试准备的宝典!《IDEA开发秘籍》 — 提升你的IDEA技能!《100天精通鸿蒙》 …

Vim 编辑器中大写键的命令

Vim 编辑器中有很多大写键的命令,这些命令通常用于执行特定的操作或进入特定的模式。 A:在当前行的末尾进入插入模式。B:向后移动一个单词。C:更改从当前光标位置到行尾的内容。进入插入模式。D:删除从当前光标位置到…

【Linux系统编程】基础指令(三)

💞💞 前言 hello hello~ ,这里是大耳朵土土垚~💖💖 ,欢迎大家点赞🥳🥳关注💥💥收藏🌹🌹🌹 💥个人主页&#x…

Redis底层数据结构之IntSet

目录 一、概述二、IntSet结构三、自动升级 redis底层数据结构已完结👏👏👏: ☑️redis底层数据结构之SDS☑️redis底层数据结构之ziplist☑️redis底层数据结构之quicklist☑️redis底层数据结构之Dict☑️redis底层数据结构之Int…

java中switch条件语句的用法、switch的三种语法、switch支持的参数类型

文章目录 一、switch的应用场景二、switch三种语法2.1、switch 标准方式2.2、switch - > 用法2.2、switch yield 用法 三、什么是case穿透?四、示例4.1、标准示例4.2、错误示例4.3、引申用法(多条件合并) 一、switch的应用场景 在分支结构…

Elasticsearch概念 使用docker安装Elasticsearch和kibana

目录 一、Elasticsearch概念 倒排索引和正向索引 正向和倒排 二、ES安装 三、安装 kibana 四、IK分词器 下载ES中文分词器 扩展或停用词条 一、Elasticsearch概念 倒排索引和正向索引 正向索引 就像在mysql数据中搜索非主键字段的内容,就需要逐条数据的去查…

WEB攻防-.NET特性常见漏洞

目录 前置知识: DLL文件 .NET和DLL文件 C#和DLL文件 关系总结 .NET 配置调试-信息泄露 .NET 源码反编译-DLL 反编译与未授权访问 编译DLL文件 反编译DLL文件 注意事项 案例: 验证代码文件有没有可以绕过(Cookie&Session&…

【C++】二叉树的进阶

二叉树的进阶 二叉搜索树概念操作实现创建树形结构拷贝构造函数构造函数析构函数赋值运算符重载循环版本查找插入删除 递归版本查找插入删除 应用K模型KV模型性能分析 二叉树进阶面试题二叉树创建字符串二叉树的分层遍历I最近公共祖先二叉搜索树与双向链表前序遍历与中序遍历构…

toFixed() 保留小数不精准,大数据计算 bignumber.js、big.js

Big.js: Big.js 是另一个类似于 Decimal.js 的高精度计算库,它也可以解决 JavaScript 浮点数计算的精度问题。 npm install big.js const Big require(big.js);const a new Big(0.1); const b new Big(0.2); const result a.plus(b); // 使用 Big.js 进行加法运…

PyCharm 无法运行的解决方案

问题: PyCharm 无法运行,该怎么办? 解决方案: 1. 检查 Python 解释器 确保已为 PyCharm 配置正确的 Python 解释器。打开 PyCharm,转到“文件”>“设置”>“项目”>“Python 解释器”。选择所需的 Python …

英语六级常用词汇2

英语六级常用词汇1 blush [blʌʃ] 脸红(名词、动词):面部因羞愧、激动或尴尬而变红。Example: She began to blush when they complimented her.翻译: 当他们称赞她时,她开始脸红。 pedal [ˈpɛdəl] 踏板&#xff…

数组和指针经典笔试题讲解下

目录 创作不易,如对您帮助,还望一键三连,谢谢!!! 题目一: 题目二: 题目三: 题目四: 题目五: 题目六: 题目七: 创作…

在 Ubuntu 下使用 clash-for-linux-backup

记录一下如何在 Ubuntu(其它带bash的Linux应该都可以)下运行Clash 有人已经制作了方便使用的脚本, 仓库为 https://github.com/Elegycloud/clash-for-linux-backup, 直接使用这个仓库就可以了. clone 这个仓库 https://github.com/Elegycloud/clash-for-linux-backup 到本地 …

震惊!某省图书馆竟然可以注册后直接访问知网并下载文章?

四川省图书馆 使用说明 1.点击进入https://portal.sclib.org/interlibSSO/main/main.jsp 显示如下: 2.关注四川省图书馆公众号并注册 3.点击馆外登录并使用刚注册的用户名密码登录 显示如下: 4.登录成功后跳转至首页并点击cnki即可正常使用

2024年最新一线互联网企业高级软件测试工程师面试题大全

1、功能测试 功能测试是游戏测试中跟“玩游戏”最相关的一个环节。 当然这里的“玩”不是要真的让你感受快乐,而是要通过“玩”游戏,发现存在的问题或不合理的地方。因此,这个“玩”的过程基本不会感受到游戏的乐趣。事实上,每一次…

BIO NIO AIO有什么区别?

通俗易懂地解释这些东西是我的风格, BIO就是阻塞io,就是一个程序在发出io请求之后不能干任何别的事,只能等待请求,不断检测io的状态,只有接受到反馈之后才能干别的事 适用场景: 用作请求少而且连接时间短的情况 NIO就是非阻塞,也就是没有阻塞,怎么没有阻塞了?说白了就是发出…

低代码+定制物资管理:创新解决方案探析

引言 在当今快速变化的商业环境中,企业面临着不断增长的挑战,如提高效率、降低成本、满足客户需求等。为了应对这些挑战,企业需要不断创新并采用先进的技术解决方案。在这样的背景下,低代码开发和定制化物资管理成为了引领企业变…

管理情绪方法中篇【三分法、整理自己的观念合理、人活着要有弹性】

是 VS 非、成功 VS 不成功 二分法 逃出二分法【二合一,三分法】:不公平才是公平,公平才是不公平 有弹性回答: 1、好像还不错 2、到时在看看,有弹性 3、没有意见,我突然想到一个意…

【3GPP】【核心网】【5G】5G核心网协议解析(四)(超详细)

1. 欢迎大家订阅和关注,精讲3GPP通信协议(2G/3G/4G/5G/IMS)知识点,专栏会持续更新中.....敬请期待! 目录 1. NGAP 按流程功能分类 1.1 接口管理过程 1.1.1 NG Setup 1.2.1 NAS消息传输过程 Transport of NAS Messa…