实验五 Spark SQL编程初级实践

Spark SQL编程初级实践

  • Spark SQL基本操作

将下列JSON格式数据复制到Linux系统中,并保存命名为employee.json。

{ "id":1 , "name":" Ella" , "age":36 }

{ "id":2, "name":"Bob","age":29 }

{ "id":3 , "name":"Jack","age":29 }

{ "id":4 , "name":"Jim","age":28 }

{ "id":4 , "name":"Jim","age":28 }

{ "id":5 , "name":"Damon" }

{ "id":5 , "name":"Damon" }

为employee.json创建DataFrame,并写出Scala语句完成下列操作:

  1. 查询所有数据;
  2. 查询所有数据,并去除重复的数据;
  3. 查询所有数据,打印时去除id字段;
  4. 筛选出age>30的记录;
  5. 将数据按age分组;
  6. 将数据按name升序排列;
  7. 取出前3行数据;
  8. 查询所有记录的name列,并为其取别名为username;
  9. 查询年龄age的平均值;
  10. 查询年龄age的最小值。

  • 编程实现将RDD转换为DataFrame

源文件内容如下(包含id,name,age):

1,Ella,36

2,Bob,29

3,Jack,29

将数据复制保存到Linux系统中,命名为employee.txt,实现从RDD转换得到DataFrame,并按“id:1,name:Ella,age:36”的格式打印出DataFrame的所有数据。请写出程序代码。

  • 编程实现利用DataFrame读写MySQL的数据

(1)在MySQL数据库中新建数据库sparktest,再创建表employee,包含如表6-2所示的两行数据。

表6-2 employee表原有数据

id

name

gender

Age

1

Alice

F

22

2

John

M

25

(2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入如表6-3所示的两行数据到MySQL中,最后打印出age的最大值和age的总和。

表6-3 employee表新增数据

id

name

gender

age

3

Mary

F

26

4

Tom

M

23

实验一 :Spark SQL基本操作

1)
// 导入必要的库
import org.apache.spark.sql.SparkSession// 创建SparkSession
val spark = SparkSession.builder().appName("Spark SQL Basic Operations").getOrCreate()// 读取JSON文件创建DataFrameval df = spark.read.json("file:///home/hadoop/employee.json")// (1) 查询所有数据
df.show()
(2)查询所有数据,并去除重复的数据
df.distinct().show()(3)
查询所有数据,打印时去除id字段
df.drop("id").show()(4)
筛选出age>30的记录
df.filter("age > 30").show()(5)
将数据按age分组
df.groupBy("age").count().show()(6)
将数据按name升序排列
df.orderBy("name").show()(7)
取出前3行数据
df.limit(3).show()(8)
查询所有记录的name列,并为其取别名为username
df.select($"name".alias("username")).show()(9)
查询年龄age的平均值
df.selectExpr("avg(age)").show()(10)
查询年龄age的最小值
df.selectExpr("min(age)").show()

实验二 :编程实现将RDD转换为DataFrame

编程代码:

import org.apache.spark.sql.{SparkSession, Row}  
import org.apache.spark.sql.types._  object RDDToDataFrameExample {  def main(args: Array[String]): Unit = {  // 创建SparkSession  val spark = SparkSession.builder()  .appName("RDD to DataFrame Example")  .master("local[*]") // 使用本地模式,如果连接到集群请更改这里  .getOrCreate()  import spark.implicits._  // 指定employee.txt文件的位置  val inputFilePath = "file:///home/hadoop/employee.txt"  // 从文本文件读取数据创建RDD  val rdd = spark.sparkContext.textFile(inputFilePath)  // 定义DataFrame的schema  val schema = StructType(Array(  StructField("id", IntegerType, nullable = false),  StructField("name", StringType, nullable = false),  StructField("age", IntegerType, nullable = false)  ))  // 将RDD转换为DataFrame  val dataFrame = spark.createDataFrame(rdd.map { line =>  val parts = line.split(",")  Row(parts(0).toInt, parts(1), parts(2).toInt)  }, schema)  // 显示DataFrame内容  dataFrame.show(false)  // 按照指定格式打印所有数据  dataFrame.collect().foreach { row =>  println(s"id:${row.getAs[Int]("id")},name:${row.getAs[String]("name")},age:${row.getAs[Int]("age")}")  }  // 停止SparkSession  spark.stop()  }  
}

 命令

/usr/local/spark-3.5.1/bin/spark-submit --class "RDDToDataFrameExample" ./target/scala-2.12/simple-project_2.12-1.9.0.jar

 具体操作参考博客

如何安装sbt(sbt在ubuntu上的安装与配置)(有详细安装网站和图解)-CSDN博客

实验三:编程实现利用DataFrame读写MySQL的数据

mysql代码

CREATE DATABASE sparktest;  
USE sparktest;  CREATE TABLE employee (  id INT PRIMARY KEY,  name VARCHAR(50),  gender CHAR(1),  age INT  
);  INSERT INTO employee (id, name, gender, age) VALUES (1, 'Alice', 'F', 22);  
INSERT INTO employee (id, name, gender, age) VALUES (2, 'John', 'M', 25);

如何安装msyql参考博客

 在ubuntu上安装mysql(在线安装需要)-CSDN博客

如何安装mysl驱动程序jar包-CSDN博客

编程代码

import org.apache.spark.sql.{SparkSession, Row}  
import java.util.Properties  
import org.apache.spark.sql.SparkSession  
import org.apache.spark.sql.Dataset  
import org.apache.spark.sql.Row  
import org.apache.spark.sql.functions.max  
import org.apache.spark.sql.functions.sum  object MySQLDataFrameExample {  def main(args: Array[String]): Unit = {  // 创建SparkSession  val spark = SparkSession.builder()  .appName("MySQL DataFrame Example")  .master("local[*]") // 使用本地模式,如果连接到集群请更改这里  .getOrCreate()  import spark.implicits._  // 配置MySQL JDBC连接  val jdbcProperties = new Properties()  jdbcProperties.setProperty("user", "root")  jdbcProperties.setProperty("password", "mysql")  jdbcProperties.setProperty("driver", "com.mysql.cj.jdbc.Driver")  // 定义MySQL的JDBC连接URL  val jdbcUrl = "jdbc:mysql://localhost:3306/sparktest"  // 创建DataFrame以插入数据  val newEmployeeData = Seq(  (3, "Mary", "F", 26),  (4, "Tom", "M", 23)  ).toDF("id", "name", "gender", "age")  // 将DataFrame数据插入到MySQL的employee表中  newEmployeeData.write  .mode("append") // 使用append模式来添加数据,而不是覆盖  .jdbc(jdbcUrl, "employee", jdbcProperties)  // 从MySQL读取employee表的数据  val employeeDF = spark.read  .jdbc(jdbcUrl, "employee", jdbcProperties)  // 打印age的最大值  val maxAge = employeeDF.agg(max("age")).collect()(0).getAs[Int](0)  println(s"Max age: $maxAge")  // 打印age的总和  val sumAge = employeeDF.agg(sum("age")).collect()(0).getAs[Long](0)  println(s"Sum of ages: $sumAge")  // 停止SparkSession  spark.stop()  }  
}

编程详细步骤参考

 如何安装sbt(sbt在ubuntu上的安装与配置)(有详细安装网站和图解)-CSDN博客

 运行命令

/usr/local/spark-3.5.1/bin/spark-submit --jars /home/hadoop/mysql-connector-j-8.3.0/mysql-connector-j-8.3.0.jar  --class "MySQLDataFrameExample" ./target/scala-2.12/simple-project_2.12-1.9.0.jar

产生错误

主要问题都在实验三中,因为实验三中涉及到一个mysql数据库连接

命令更新为

/usr/local/spark-3.5.1/bin/spark-submit --jars /home/hadoop/mysql-connector-j-8.3.0/mysql-connector-j-8.3.0.jar  --class "MySQLDataFrameExample" ./target/scala-2.12/simple-project_2.12-1.9.0.jar

加了一个mysl驱动的jar的引用

如何安装mysql驱动参考博客

如何安装mysl驱动程序jar包-CSDN博客

打包失败

这个问题是代码错误

代码未引入一些包

加上下面这些就可以了

import org.apache.spark.sql.{SparkSession, Row}  

import java.util.Properties  

import org.apache.spark.sql.SparkSession  

import org.apache.spark.sql.Dataset  

import org.apache.spark.sql.Row  

import org.apache.spark.sql.functions.max  

import org.apache.spark.sql.functions.sum  

运行失败

未引入mysl驱动程序

要下载mysql驱动

采用命令引入

/usr/local/spark-3.5.1/bin/spark-submit --jars /home/hadoop/mysql-connector-j-8.3.0/mysql-connector-j-8.3.0.jar  --class "MySQLDataFrameExample" ./target/scala-2.12/simple-project_2.12-1.9.0.jar

参考链接

如何安装sbt(sbt在ubuntu上的安装与配置)(有详细安装网站和图解)-CSDN博客

在ubuntu上安装mysql(在线安装需要)-CSDN博客

在ubuntu上安装mysql(在线安装需要)-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/4422.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

堆的介绍,实现(c语言实现)

目录 堆的概念 堆的性质: 堆的分类 父子结点的下标关系 堆的向下调整算法 ​编辑小堆 大堆 建堆 堆的向上调整算法 小堆 大堆 堆的基本操作 定义堆 初始化堆 销毁堆 打印堆 堆的插入 堆的删除 大堆(Max Heap)的向下调整算法…

CentOS系统服务器装机后常用的操作命令大全

博主猫头虎的技术世界 🌟 欢迎来到猫头虎的博客 — 探索技术的无限可能! 专栏链接: 🔗 精选专栏: 《面试题大全》 — 面试准备的宝典!《IDEA开发秘籍》 — 提升你的IDEA技能!《100天精通鸿蒙》 …

【Linux系统编程】基础指令(三)

💞💞 前言 hello hello~ ,这里是大耳朵土土垚~💖💖 ,欢迎大家点赞🥳🥳关注💥💥收藏🌹🌹🌹 💥个人主页&#x…

Redis底层数据结构之IntSet

目录 一、概述二、IntSet结构三、自动升级 redis底层数据结构已完结👏👏👏: ☑️redis底层数据结构之SDS☑️redis底层数据结构之ziplist☑️redis底层数据结构之quicklist☑️redis底层数据结构之Dict☑️redis底层数据结构之Int…

java中switch条件语句的用法、switch的三种语法、switch支持的参数类型

文章目录 一、switch的应用场景二、switch三种语法2.1、switch 标准方式2.2、switch - > 用法2.2、switch yield 用法 三、什么是case穿透?四、示例4.1、标准示例4.2、错误示例4.3、引申用法(多条件合并) 一、switch的应用场景 在分支结构…

Elasticsearch概念 使用docker安装Elasticsearch和kibana

目录 一、Elasticsearch概念 倒排索引和正向索引 正向和倒排 二、ES安装 三、安装 kibana 四、IK分词器 下载ES中文分词器 扩展或停用词条 一、Elasticsearch概念 倒排索引和正向索引 正向索引 就像在mysql数据中搜索非主键字段的内容,就需要逐条数据的去查…

WEB攻防-.NET特性常见漏洞

目录 前置知识: DLL文件 .NET和DLL文件 C#和DLL文件 关系总结 .NET 配置调试-信息泄露 .NET 源码反编译-DLL 反编译与未授权访问 编译DLL文件 反编译DLL文件 注意事项 案例: 验证代码文件有没有可以绕过(Cookie&Session&…

【C++】二叉树的进阶

二叉树的进阶 二叉搜索树概念操作实现创建树形结构拷贝构造函数构造函数析构函数赋值运算符重载循环版本查找插入删除 递归版本查找插入删除 应用K模型KV模型性能分析 二叉树进阶面试题二叉树创建字符串二叉树的分层遍历I最近公共祖先二叉搜索树与双向链表前序遍历与中序遍历构…

PyCharm 无法运行的解决方案

问题: PyCharm 无法运行,该怎么办? 解决方案: 1. 检查 Python 解释器 确保已为 PyCharm 配置正确的 Python 解释器。打开 PyCharm,转到“文件”>“设置”>“项目”>“Python 解释器”。选择所需的 Python …

数组和指针经典笔试题讲解下

目录 创作不易,如对您帮助,还望一键三连,谢谢!!! 题目一: 题目二: 题目三: 题目四: 题目五: 题目六: 题目七: 创作…

震惊!某省图书馆竟然可以注册后直接访问知网并下载文章?

四川省图书馆 使用说明 1.点击进入https://portal.sclib.org/interlibSSO/main/main.jsp 显示如下: 2.关注四川省图书馆公众号并注册 3.点击馆外登录并使用刚注册的用户名密码登录 显示如下: 4.登录成功后跳转至首页并点击cnki即可正常使用

2024年最新一线互联网企业高级软件测试工程师面试题大全

1、功能测试 功能测试是游戏测试中跟“玩游戏”最相关的一个环节。 当然这里的“玩”不是要真的让你感受快乐,而是要通过“玩”游戏,发现存在的问题或不合理的地方。因此,这个“玩”的过程基本不会感受到游戏的乐趣。事实上,每一次…

低代码+定制物资管理:创新解决方案探析

引言 在当今快速变化的商业环境中,企业面临着不断增长的挑战,如提高效率、降低成本、满足客户需求等。为了应对这些挑战,企业需要不断创新并采用先进的技术解决方案。在这样的背景下,低代码开发和定制化物资管理成为了引领企业变…

【3GPP】【核心网】【5G】5G核心网协议解析(四)(超详细)

1. 欢迎大家订阅和关注,精讲3GPP通信协议(2G/3G/4G/5G/IMS)知识点,专栏会持续更新中.....敬请期待! 目录 1. NGAP 按流程功能分类 1.1 接口管理过程 1.1.1 NG Setup 1.2.1 NAS消息传输过程 Transport of NAS Messa…

Android 14设置android:importantForAutofill=“no“无效

密码输入框EditText不希望弹出Google的是否保存密码弹出框, 直接设置了android:importantForAutofill"no", android:inputType"textPassword|textNoSuggestions"在安卓12手机上有效,但是在安卓14上面就不行了&#xff0…

记一次JSON.toJSONString()转换时非属性方法空指针异常排查及toJSONString保留null值属性

记一次JSON.toJSONString()转换时非属性方法空指针异常排查及toJSONString保留null值属性 异常详情 有一个类,里面有两个属性和一个类似工具的getRealName()方法如下: getRealName()方法就是获取这个人的真实名字,如果获取不到就以name返回…

堆的介绍,实现,使用(c语言实现)

目录 堆的概念 堆的性质: 堆的分类 父子结点的下标关系 堆的向下调整算法 ​编辑小堆 大堆 建堆 堆的向上调整算法 小堆 大堆 堆的基本操作 定义堆 初始化堆 销毁堆 打印堆 堆的插入 堆的删除 大堆(Max Heap)的向下调整算法…

Java 设计模式(上)

目录 一、单一职责原则 二、开闭原则 三、里氏替换原则 四、迪米特法则 五、接口隔离原则 六、依赖倒置原则 七、工厂方法 八、抽象工厂 九、建造者模式 十、原型模式 十一、单例模式 十二、适配器模式 一、单一职责原则 单一职责原则又称单一功能原则,…

回溯-单词搜索

给定一个 m x n 二维字符网格 board 和一个字符串单词 word 。如果 word 存在于网格中,返回 true ;否则,返回 false 。 单词必须按照字母顺序,通过相邻的单元格内的字母构成,其中“相邻”单元格是那些水平相邻或垂直相…

SpringMVC深解--一起学习吧之架构

SpringMVC的工作原理主要基于请求驱动,它采用了前端控制器模式来进行设计。以下是SpringMVC工作原理的详细解释: 请求接收与分发: 当用户发送一个请求到Web服务器时,这个请求首先会被SpringMVC的前端控制器(Dispatche…