Spark编程实验三:Spark SQL编程

目录

一、目的与要求

二、实验内容

三、实验步骤

1、Spark SQL基本操作

2、编程实现将RDD转换为DataFrame

3、编程实现利用DataFrame读写MySQL的数据

四、结果分析与实验体会


一、目的与要求

1、通过实验掌握Spark SQL的基本编程方法;
2、熟悉RDD到DataFrame的转化方法;
3、熟悉利用Spark SQL管理来自不同数据源的数据。

二、实验内容

1、Spark SQL基本操作

        将下列JSON格式数据复制到Linux系统中,并保存命名为employee.json。

{ "id":1 , "name":"Ella" , "age":36 }

{ "id":2, "name":"Bob","age":29 }

{ "id":3 , "name":"Jack","age":29 }

{ "id":4 , "name":"Jim","age":28 }

{ "id":5 , "name":"Damon" }

{ "id":5 , "name":"Damon" }

为employee.json创建DataFrame,并写出Python语句完成下列操作:
(1)查询所有数据;
(2)查询所有数据,并去除重复的数据;
(3)查询所有数据,打印时去除id字段;
(4)筛选出age>30的记录;
(5)将数据按age分组;
(6)将数据按name升序排列;
(7)取出前3行数据;
(8)查询所有记录的name列,并为其取别名为username;
(9)查询年龄age的平均值;
(10)查询年龄age的最小值。

2、编程实现将RDD转换为DataFrame

        源文件内容如下(包含id,name,age):

1,Ella,36

2,Bob,29

3,Jack,29

将数据复制保存到Linux系统中,命名为employee.txt,实现从RDD转换得到DataFrame,并按“id:1,name:Ella,age:36”的格式打印出DataFrame的所有数据。请写出程序代码。

3、编程实现利用DataFrame读写MySQL的数据

(1)在MySQL数据库中新建数据库sparktest,再创建表employee,包含如表所示的两行数据。

(2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入如表所示的三行数据到MySQL中,最后打印出age的最大值和age的总和。

三、实验步骤

1、Spark SQL基本操作

        将下列JSON格式数据复制到Linux系统中,并保存命名为employee.json。

{ "id":1 , "name":"Ella" , "age":36 }

{ "id":2, "name":"Bob","age":29 }

{ "id":3 , "name":"Jack","age":29 }

{ "id":4 , "name":"Jim","age":28 }

{ "id":5 , "name":"Damon" }

{ "id":5 , "name":"Damon" }

为employee.json创建DataFrame,并写出Python语句完成下列操作:

>>> spark=SparkSession.builder.getOrCreate()
>>> df = spark.read.json("file:///home/zhc/mycode/sparksql/employee.json")

(1)查询所有数据;

>>> df.show()

(2)查询所有数据,并去除重复的数据;

>>> df.distinct().show()

(3)查询所有数据,打印时去除id字段;

>>> df.drop("id").show()

(4)筛选出age>30的记录;

>>> df.filter(df.age > 30).show()

(5)将数据按age分组;

>>> df.groupBy("age").count().show()

(6)将数据按name升序排列;

>>> df.sort(df.name.asc()).show()

(7)取出前3行数据;

>>> df.take(3)

(8)查询所有记录的name列,并为其取别名为username;

>>> df.select(df.name.alias("username")).show()

(9)查询年龄age的平均值;

>>> df.agg({"age": "mean"}).show()

(10)查询年龄age的最小值。

>>> df.agg({"age": "min"}).show()

2、编程实现将RDD转换为DataFrame

        源文件内容如下(包含id,name,age):

1,Ella,36

2,Bob,29

3,Jack,29

将数据复制保存到Linux系统中,命名为employee.txt,实现从RDD转换得到DataFrame,并按“id:1,name:Ella,age:36”的格式打印出DataFrame的所有数据。请写出程序代码。

首先,在“/home/zhc/mycode/sparksql”目录下创建文件employee.txt

​​​​​​​[root@bigdata sparksql]# vi employee.txt

然后,在该目录下新建一个py文件命名为rddtodf.py,然后写入如下py程序:

[root@bigdata sparksql]# vi rddtodf.py
#/home/zhc/mycode/sparksql/rddtodf.py
from pyspark.conf import SparkConf
from pyspark.sql.session import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import Row
from pyspark.sql import SQLContext
if __name__ == "__main__":sc = SparkContext("local","Simple App")spark=SparkSession(sc)peopleRDD = spark.sparkContext.textFile("file:home/zhc/mycode/sparksql/employee.txt")rowRDD = peopleRDD.map(lambda line : line.split(",")).map(lambda attributes : Row(int(attributes[0]),attributes[1],int(attributes[2]))).toDF()rowRDD.createOrReplaceTempView("employee")personsDF = spark.sql("select * from employee")personsDF.rdd.map(lambda t : "id:"+str(t[0])+","+"Name:"+t[1]+","+"age:"+str(t[2])).foreach(print)

最后,运行该程序:

[root@bigdata sparksql]# python3 rddtodf.py

3、编程实现利用DataFrame读写MySQL的数据

(1)在MySQL数据库中新建数据库sparktest,再创建表employee,包含如表所示的两行数据。

(2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入如表所示的三行数据到MySQL中,最后打印出age的最大值和age的总和。

首先,启动mysql服务并进入到mysql数据库中:

[root@bigdata sparksql]# systemctl start mysqld.service
[root@bigdata sparksql]# mysql -u root -p

然后开始接下来的操作。

(1)在MySQL数据库中新建数据库sparktest,再创建表employee,包含如表所示的两行数据。

mysql> create database sparktest;
mysql> use sparktest;
mysql> create table employee (id int(4), name char(20), gender char(4), age int(4));
mysql> insert into employee values(1,'Alice','F',22);
mysql> insert into employee values(2,'John','M',25);

(2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入如表所示的三行数据到MySQL中,最后打印出age的最大值和age的总和。

首先,在“/home/zhc/mycode/sparksql”目录下面新建一个py程序并命名为mysqltest.py。

[root@bigdata sparksql]# vi mysqltest.py

接着,写入如下py程序: 

#/home/zhc/mycode/sparksql/mysqltest.py
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
#下面设置模式信息
schema = StructType([StructField("id",IntegerType(),True),StructField("name", StringType(), True),StructField("gender", StringType(), True),StructField("age",IntegerType(), True)])
employeeRDD = spark.sparkContext.parallelize(["3 Mary F 26","4 Tom M 23","5 zhanghc M 21"]).map(lambda x:x.split(" "))
#下面创建Row对象,每个Row对象都是rowRDD中的一行
rowRDD = employeeRDD.map(lambda p:Row(int(p[0].strip()), p[1].strip(), p[2].strip(), int(p[3].strip())))
#建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
employeeDF = spark.createDataFrame(rowRDD, schema)
#写入数据库
prop = {}
prop['user'] = 'root'
prop['password'] = 'MYsql123!'
prop['driver'] = "com.mysql.jdbc.Driver"
employeeDF.write.jdbc("jdbc:mysql://localhost:3306/sparktest?useSSL=false",'employee','append', prop)
employeeDF.collect()
employeeDF.agg({"age": "max"}).show()
employeeDF.agg({"age": "sum"}).show()

然后,直接运行该py程序即可得到结果:

[root@bigdata sparksql]# python3 mysqltest.py

最后,到MySQL Shell中,即可查看employee表中的所有信息。

mysql> select * from employee;

四、结果分析与实验体会

        Spark SQL是Apache Spark中用于处理结构化数据的模块。它提供了一种类似于SQL的编程接口,可以用于查询和分析数据。通过实验掌握了Spark SQL的基本编程方法,SparkSession支持从不同的数据源加载数据,并把数据转换成DataFrame,并且支持把DataFrame转换成SQLContext自身中的表,然后使用SQL语句来操作数据。
        在使用Spark SQL之前,需要创建一个SparkSession对象。可以使用SparkSession的read方法加载数据。可以使用DataFrame的createOrReplaceTempView方法将DataFrame注册为一个临时视图。可以使用SparkSession的sql方法执行SQL查询。除了使用SQL查询外,还可以使用DataFrame的API进行数据操作和转换。可以使用DataFrame的write方法将数据写入外部存储。在使用完SparkSession后,应该调用其close方法来关闭SparkSession。
        最后,还掌握了RDD到DataFrame的转化方法,并可以利用Spark SQL管理来自不同数据源的数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/579062.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2024免费的数据恢复软件EasyRecovery14自己操作就能恢复的方法

而今天小编为大家还是带来了同系列软件easyrecovery14,这是easyrecovery数据恢复软件中的技术员版本,不仅包含家庭版和专业版的所有功能,而且还旨在简化技术人员的数据恢复过程。软件拥有强大的数据恢复功能,支持使用的恢复场景有…

KNN与KD树博客总结

目录 总结小结: 总结 原始篇:KNN算法及其优缺点算法思想改进篇:KD树(KNN的plus版算法实现第一篇:平衡二叉树的构建(递归算法实现第二篇:KD树的构建(递归算法实现第三篇:…

CentOS 7 设置网络

CentOS 7 设置网络 正常情况 ①登陆进去之后使用下面的命令修改文件 echo ONBOOTyes >> /etc/sysconfig/network-scripts/ifcfg-ens33②如果是虚拟机重启后使用如下命令进行查看IP地址 ip addr注:到这里如果显示有两部分,则代表网络设置成功&a…

华为设备VRP系统管理

为了满足企业业务对网络的需求,网络设备中的系统文件需要不断进行升级。另外,网络设备中的配置文件也需要时常进行备份,以防设备故障或其他灾害给业务带来损害。在升级和备份系统文件或配置文件时,经常会使用FTP和TFTP来传输文件。…

服务器系统时间不同步如何处理

在分布式计算环境中,服务器系统时间的同步至关重要。然而,由于各种原因,服务器系统时间不同步的问题时有发生,这可能会导致严重的问题,如日志不准确、证书验证失败等。下面我们可以一起探讨下造成服务器系统时间不同的原因以及解决…

【Vue2+3入门到实战】(5)Vue基础之Computed计算属性 详细示例

目录 一、今日学习目标1.computed计算属性 二、computed计算属性1.概念2.语法3.注意4.案例5.代码准备 三、computed计算属性 VS methods方法1.computed计算属性2.methods计算属性3.计算属性的优势4.总结 四、计算属性的完整写法五、综合案例-成绩案例六、Computed计算属性总结 …

揭秘Pod状态与生命周期管理的秘密(中)

上一篇文章中主要介绍了Pod的基础概念与使用、删除。本文将带你一起学习Pod的几种容器(Init、Pause) 点击 这里 可以查看所有相关文章。 Init 容器 本文讲解 Init 容器的基本概念,这是一种专用的容器,在应用程序容器启动之前运行,用来包含…

住宅代理妙用:网络抓取的必备工具

什么是住宅代理? 要准确理解什么是住宅代理,首先需要了解什么是住宅IP。IP 地址是连接到网络时分配给单个设备的唯一标识符。这允许设备或端点直接相互通信,而无需跨线。 住宅IP是指分配给特定设备(例如计算机、手机、平板电脑等…

新版IDEA中Git的使用(二)

说明:前面介绍了在新版IDEA中Git的基本操作,本文介绍关于分支合并、拉取等操作; 例如,现在有一个项目,分支如下: main:主分支; dev:开发分支; test&#x…

CNVD原创漏洞审核和处理流程

一、CNVD原创漏洞审核归档和发布主流程 (一)审核和归档流程 审核流程分为一级、二级、三级审核,其中一级审核主要对提交的漏洞信息完整性进行审核,漏洞符合可验证(通用型漏洞有验证代码信息或多个互联网实例、事件型…

k8s的二进制部署1

k8s的二进制部署:源码包部署 k8smaster01:192.168.176.61 kube-apiserver kube-controller-manager kube-scheduler etcd k8smaster01:192.168.176.62 kube-apiserver kube-controller-manager kube-scheduler node节点01:192.…

promise的使用和实例方法

前言 异步,是任何编程都无法回避的话题。在promise出现之前,js中也有处理异步的方案,不过还没有专门的api能去处理链式的异步操作。所以,当大量的异步任务逐个执行,就变成了传说中的回调地狱。 function asyncFn(fn1, fn2, fn3) {setTimeout(() > {//处理第一个异步任务fn1…

element步骤条<el-steps>使用具名插槽自定义

element步骤条使用具名插槽自定义 步骤条使用具名插槽: <el-steps direction"vertical" :active"1"><el-step><template slot"description">//在此处可以写你的插槽内容</template>/el-step> </el-steps>步骤…

STM32 cubeMX 直流电机控制风扇转动

本文使用的是 HAL 库。 文章目录 前言一、直流电机介绍二、直流电机原理图三、直流电机控制方法四、STM32CubeMX 配置直流电机五、代码编写总结 前言 实验开发板&#xff1a;STM32F051K8。所需软件&#xff1a;keil5 &#xff0c; cubeMX 。实验目的&#xff1a;了解 直流电机…

远程宠物喂食系统:开启宠物护理新纪元

智能化远程宠物喂食系统正在革新宠物业&#xff0c;宠物用品店、宠物托管服务商&#xff0c;宠物健康检测机构、宠物社区运营方等相关企业可能将因此取得更高效的运营&#xff0c;更深入的客户关系。这个系统不仅增强了宠物行业的智能化元素&#xff0c;还通过其精确和有效的管…

软件测试自学还是报班好?

如果你学软件测试&#xff0c;是以就业为目的&#xff0c;而且是以高薪就业为目的&#xff0c;那我们就要去反推&#xff0c;为了这个目标&#xff0c;我们要去做什么事情。 为了“将高薪就业为目的&#xff0c;我们要做什么事情”阐述清楚&#xff0c;本文行文结构如下&#x…

并发代码中的错误处理挑战

克服并发编程中的复杂性 并发编程可能是增加软件系统效率和响应性的强大技术。它使多个工作负载能够同时运行&#xff0c;充分利用了现代多核CPU。然而&#xff0c;强大的能力伴随着巨大的责任&#xff0c;良好的错误管理是并发编程中的主要任务之一。 并发代码的复杂性 并发…

找不到UnityEngine.UI解决方案

重新安装Visual Studio 后&#xff0c;使用unity找不到UnityEngine.UI解决方案 关键是在unity里需要设置一下 这个路径 C:\Program Files\Microsoft Visual Studio\2022\Community\Common7\IDE

【centos】【golang】安装golang

下载安装包 方法1&#xff1a; 打开 https://go.dev/dl/ &#xff1b;点击下载golang的安装包&#xff1b;再使用ssh传到centos上&#xff08;略&#xff09; 方法2&#xff1a;能使用Google就可以这样 wget https://dl.google.com/go/go1.21.5.linux-amd64.tar.gz解压安装包…

快速实现Modbus和Profinet互转的方案

为了快速实现将Modbus信号转换为Profinet信号的畅通无阻&#xff0c;我们可以使用Modbus转Profinet网关&#xff08;XD-MDPN100/200&#xff09;。Modbus转Profinet网关&#xff08;XD-MDPN100/200&#xff09;可以实现快速的协议转换&#xff0c;将Modbus信号转换为Profinet信…