Spark编程实验三:Spark SQL编程

目录

一、目的与要求

二、实验内容

三、实验步骤

1、Spark SQL基本操作

2、编程实现将RDD转换为DataFrame

3、编程实现利用DataFrame读写MySQL的数据

四、结果分析与实验体会


一、目的与要求

1、通过实验掌握Spark SQL的基本编程方法;
2、熟悉RDD到DataFrame的转化方法;
3、熟悉利用Spark SQL管理来自不同数据源的数据。

二、实验内容

1、Spark SQL基本操作

        将下列JSON格式数据复制到Linux系统中,并保存命名为employee.json。

{ "id":1 , "name":"Ella" , "age":36 }

{ "id":2, "name":"Bob","age":29 }

{ "id":3 , "name":"Jack","age":29 }

{ "id":4 , "name":"Jim","age":28 }

{ "id":5 , "name":"Damon" }

{ "id":5 , "name":"Damon" }

为employee.json创建DataFrame,并写出Python语句完成下列操作:
(1)查询所有数据;
(2)查询所有数据,并去除重复的数据;
(3)查询所有数据,打印时去除id字段;
(4)筛选出age>30的记录;
(5)将数据按age分组;
(6)将数据按name升序排列;
(7)取出前3行数据;
(8)查询所有记录的name列,并为其取别名为username;
(9)查询年龄age的平均值;
(10)查询年龄age的最小值。

2、编程实现将RDD转换为DataFrame

        源文件内容如下(包含id,name,age):

1,Ella,36

2,Bob,29

3,Jack,29

将数据复制保存到Linux系统中,命名为employee.txt,实现从RDD转换得到DataFrame,并按“id:1,name:Ella,age:36”的格式打印出DataFrame的所有数据。请写出程序代码。

3、编程实现利用DataFrame读写MySQL的数据

(1)在MySQL数据库中新建数据库sparktest,再创建表employee,包含如表所示的两行数据。

(2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入如表所示的三行数据到MySQL中,最后打印出age的最大值和age的总和。

三、实验步骤

1、Spark SQL基本操作

        将下列JSON格式数据复制到Linux系统中,并保存命名为employee.json。

{ "id":1 , "name":"Ella" , "age":36 }

{ "id":2, "name":"Bob","age":29 }

{ "id":3 , "name":"Jack","age":29 }

{ "id":4 , "name":"Jim","age":28 }

{ "id":5 , "name":"Damon" }

{ "id":5 , "name":"Damon" }

为employee.json创建DataFrame,并写出Python语句完成下列操作:

>>> spark=SparkSession.builder.getOrCreate()
>>> df = spark.read.json("file:///home/zhc/mycode/sparksql/employee.json")

(1)查询所有数据;

>>> df.show()

(2)查询所有数据,并去除重复的数据;

>>> df.distinct().show()

(3)查询所有数据,打印时去除id字段;

>>> df.drop("id").show()

(4)筛选出age>30的记录;

>>> df.filter(df.age > 30).show()

(5)将数据按age分组;

>>> df.groupBy("age").count().show()

(6)将数据按name升序排列;

>>> df.sort(df.name.asc()).show()

(7)取出前3行数据;

>>> df.take(3)

(8)查询所有记录的name列,并为其取别名为username;

>>> df.select(df.name.alias("username")).show()

(9)查询年龄age的平均值;

>>> df.agg({"age": "mean"}).show()

(10)查询年龄age的最小值。

>>> df.agg({"age": "min"}).show()

2、编程实现将RDD转换为DataFrame

        源文件内容如下(包含id,name,age):

1,Ella,36

2,Bob,29

3,Jack,29

将数据复制保存到Linux系统中,命名为employee.txt,实现从RDD转换得到DataFrame,并按“id:1,name:Ella,age:36”的格式打印出DataFrame的所有数据。请写出程序代码。

首先,在“/home/zhc/mycode/sparksql”目录下创建文件employee.txt

​​​​​​​[root@bigdata sparksql]# vi employee.txt

然后,在该目录下新建一个py文件命名为rddtodf.py,然后写入如下py程序:

[root@bigdata sparksql]# vi rddtodf.py
#/home/zhc/mycode/sparksql/rddtodf.py
from pyspark.conf import SparkConf
from pyspark.sql.session import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import Row
from pyspark.sql import SQLContext
if __name__ == "__main__":sc = SparkContext("local","Simple App")spark=SparkSession(sc)peopleRDD = spark.sparkContext.textFile("file:home/zhc/mycode/sparksql/employee.txt")rowRDD = peopleRDD.map(lambda line : line.split(",")).map(lambda attributes : Row(int(attributes[0]),attributes[1],int(attributes[2]))).toDF()rowRDD.createOrReplaceTempView("employee")personsDF = spark.sql("select * from employee")personsDF.rdd.map(lambda t : "id:"+str(t[0])+","+"Name:"+t[1]+","+"age:"+str(t[2])).foreach(print)

最后,运行该程序:

[root@bigdata sparksql]# python3 rddtodf.py

3、编程实现利用DataFrame读写MySQL的数据

(1)在MySQL数据库中新建数据库sparktest,再创建表employee,包含如表所示的两行数据。

(2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入如表所示的三行数据到MySQL中,最后打印出age的最大值和age的总和。

首先,启动mysql服务并进入到mysql数据库中:

[root@bigdata sparksql]# systemctl start mysqld.service
[root@bigdata sparksql]# mysql -u root -p

然后开始接下来的操作。

(1)在MySQL数据库中新建数据库sparktest,再创建表employee,包含如表所示的两行数据。

mysql> create database sparktest;
mysql> use sparktest;
mysql> create table employee (id int(4), name char(20), gender char(4), age int(4));
mysql> insert into employee values(1,'Alice','F',22);
mysql> insert into employee values(2,'John','M',25);

(2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入如表所示的三行数据到MySQL中,最后打印出age的最大值和age的总和。

首先,在“/home/zhc/mycode/sparksql”目录下面新建一个py程序并命名为mysqltest.py。

[root@bigdata sparksql]# vi mysqltest.py

接着,写入如下py程序: 

#/home/zhc/mycode/sparksql/mysqltest.py
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
#下面设置模式信息
schema = StructType([StructField("id",IntegerType(),True),StructField("name", StringType(), True),StructField("gender", StringType(), True),StructField("age",IntegerType(), True)])
employeeRDD = spark.sparkContext.parallelize(["3 Mary F 26","4 Tom M 23","5 zhanghc M 21"]).map(lambda x:x.split(" "))
#下面创建Row对象,每个Row对象都是rowRDD中的一行
rowRDD = employeeRDD.map(lambda p:Row(int(p[0].strip()), p[1].strip(), p[2].strip(), int(p[3].strip())))
#建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
employeeDF = spark.createDataFrame(rowRDD, schema)
#写入数据库
prop = {}
prop['user'] = 'root'
prop['password'] = 'MYsql123!'
prop['driver'] = "com.mysql.jdbc.Driver"
employeeDF.write.jdbc("jdbc:mysql://localhost:3306/sparktest?useSSL=false",'employee','append', prop)
employeeDF.collect()
employeeDF.agg({"age": "max"}).show()
employeeDF.agg({"age": "sum"}).show()

然后,直接运行该py程序即可得到结果:

[root@bigdata sparksql]# python3 mysqltest.py

最后,到MySQL Shell中,即可查看employee表中的所有信息。

mysql> select * from employee;

四、结果分析与实验体会

        Spark SQL是Apache Spark中用于处理结构化数据的模块。它提供了一种类似于SQL的编程接口,可以用于查询和分析数据。通过实验掌握了Spark SQL的基本编程方法,SparkSession支持从不同的数据源加载数据,并把数据转换成DataFrame,并且支持把DataFrame转换成SQLContext自身中的表,然后使用SQL语句来操作数据。
        在使用Spark SQL之前,需要创建一个SparkSession对象。可以使用SparkSession的read方法加载数据。可以使用DataFrame的createOrReplaceTempView方法将DataFrame注册为一个临时视图。可以使用SparkSession的sql方法执行SQL查询。除了使用SQL查询外,还可以使用DataFrame的API进行数据操作和转换。可以使用DataFrame的write方法将数据写入外部存储。在使用完SparkSession后,应该调用其close方法来关闭SparkSession。
        最后,还掌握了RDD到DataFrame的转化方法,并可以利用Spark SQL管理来自不同数据源的数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/579062.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

代码随想录算法训练营第二十七天 | 回溯算法part4

力扣题目 用时&#xff1a;未知 1、93.复原IP地址 2、78.子集 3、90.子集II 力扣题目记录 93.复原IP地址 这个题和上一个题差不多&#xff0c;只是4段数字&#xff0c;只有三个句点&#xff0c;所以第四段要单独处理判断子串是否合法 class Solution { private:vector<s…

Qt/QML编程学习之心得:实现一个图片浏览器(十八)

QML中有个重要控件,经常使用就是image,通常可以用它来显示一张图片。如果想结合openfiledialog来让image显示图片,也就是做一个简易的图片浏览器,怎么弄呢? DefaultFileDialog.qml: import QtQuick 2.0 import QtQuick.Dialogs 1.0FileDialog {id: fileDialogtitle: &qu…

2024免费的数据恢复软件EasyRecovery14自己操作就能恢复的方法

而今天小编为大家还是带来了同系列软件easyrecovery14&#xff0c;这是easyrecovery数据恢复软件中的技术员版本&#xff0c;不仅包含家庭版和专业版的所有功能&#xff0c;而且还旨在简化技术人员的数据恢复过程。软件拥有强大的数据恢复功能&#xff0c;支持使用的恢复场景有…

KNN与KD树博客总结

目录 总结小结&#xff1a; 总结 原始篇&#xff1a;KNN算法及其优缺点算法思想改进篇&#xff1a;KD树&#xff08;KNN的plus版算法实现第一篇&#xff1a;平衡二叉树的构建&#xff08;递归算法实现第二篇&#xff1a;KD树的构建&#xff08;递归算法实现第三篇&#xff1a;…

CentOS 7 设置网络

CentOS 7 设置网络 正常情况 ①登陆进去之后使用下面的命令修改文件 echo ONBOOTyes >> /etc/sysconfig/network-scripts/ifcfg-ens33②如果是虚拟机重启后使用如下命令进行查看IP地址 ip addr注&#xff1a;到这里如果显示有两部分&#xff0c;则代表网络设置成功&a…

华为设备VRP系统管理

为了满足企业业务对网络的需求&#xff0c;网络设备中的系统文件需要不断进行升级。另外&#xff0c;网络设备中的配置文件也需要时常进行备份&#xff0c;以防设备故障或其他灾害给业务带来损害。在升级和备份系统文件或配置文件时&#xff0c;经常会使用FTP和TFTP来传输文件。…

服务器系统时间不同步如何处理

在分布式计算环境中&#xff0c;服务器系统时间的同步至关重要。然而&#xff0c;由于各种原因&#xff0c;服务器系统时间不同步的问题时有发生,这可能会导致严重的问题&#xff0c;如日志不准确、证书验证失败等。下面我们可以一起探讨下造成服务器系统时间不同的原因以及解决…

【Vue2+3入门到实战】(5)Vue基础之Computed计算属性 详细示例

目录 一、今日学习目标1.computed计算属性 二、computed计算属性1.概念2.语法3.注意4.案例5.代码准备 三、computed计算属性 VS methods方法1.computed计算属性2.methods计算属性3.计算属性的优势4.总结 四、计算属性的完整写法五、综合案例-成绩案例六、Computed计算属性总结 …

【测试开发】测试分类相关知识

文章目录 目录 文章目录 前言 一、测试分类 1.按照测试对象划分&#xff1a; 界面测试&#xff1a; 可靠性测试&#xff1a; 容错性测试&#xff1a; 文档测试 &#xff1a; 兼容性测试 &#xff1a; 易用性测试 &#xff1a; 安装卸载测试 &#xff1a; 安全测试 &#xff1a;…

揭秘Pod状态与生命周期管理的秘密(中)

上一篇文章中主要介绍了Pod的基础概念与使用、删除。本文将带你一起学习Pod的几种容器(Init、Pause) 点击 这里 可以查看所有相关文章。 Init 容器 本文讲解 Init 容器的基本概念&#xff0c;这是一种专用的容器&#xff0c;在应用程序容器启动之前运行&#xff0c;用来包含…

Spring Boot 3.2 新特性之 HTTP Interface

SpringBoot 3.2引入了新的 HTTP interface 用于http接口调用&#xff0c;采用了类似 openfeign 的风格。 具体的代码参照 示例项目 https://github.com/qihaiyan/springcamp/tree/master/spring-http-interface 一、概述 HTTP Interface 是一个类似于 openfeign 的同步接口调…

住宅代理妙用:网络抓取的必备工具

什么是住宅代理&#xff1f; 要准确理解什么是住宅代理&#xff0c;首先需要了解什么是住宅IP。IP 地址是连接到网络时分配给单个设备的唯一标识符。这允许设备或端点直接相互通信&#xff0c;而无需跨线。 住宅IP是指分配给特定设备&#xff08;例如计算机、手机、平板电脑等…

新版IDEA中Git的使用(二)

说明&#xff1a;前面介绍了在新版IDEA中Git的基本操作&#xff0c;本文介绍关于分支合并、拉取等操作&#xff1b; 例如&#xff0c;现在有一个项目&#xff0c;分支如下&#xff1a; main&#xff1a;主分支&#xff1b; dev&#xff1a;开发分支&#xff1b; test&#x…

CNVD原创漏洞审核和处理流程

一、CNVD原创漏洞审核归档和发布主流程 &#xff08;一&#xff09;审核和归档流程 审核流程分为一级、二级、三级审核&#xff0c;其中一级审核主要对提交的漏洞信息完整性进行审核&#xff0c;漏洞符合可验证&#xff08;通用型漏洞有验证代码信息或多个互联网实例、事件型…

CCRC信息安全认证指什么?

CCRC信息安全认证&#xff0c;全称为中国网络安全审查技术与认证中心&#xff08;China Information Security Certification Center&#xff09;认证&#xff0c;它是中国信息安全领域的一项权威资质认证。该认证由中央机构编制委员会办公室批准成立&#xff0c;隶属于国家市场…

k8s的二进制部署1

k8s的二进制部署&#xff1a;源码包部署 k8smaster01&#xff1a;192.168.176.61 kube-apiserver kube-controller-manager kube-scheduler etcd k8smaster01&#xff1a;192.168.176.62 kube-apiserver kube-controller-manager kube-scheduler node节点01&#xff1a;192.…

promise的使用和实例方法

前言 异步,是任何编程都无法回避的话题。在promise出现之前,js中也有处理异步的方案,不过还没有专门的api能去处理链式的异步操作。所以,当大量的异步任务逐个执行,就变成了传说中的回调地狱。 function asyncFn(fn1, fn2, fn3) {setTimeout(() > {//处理第一个异步任务fn1…

WPF RelativeSource

RelativeSource 类在 WPF 中提供了以下几种模式&#xff1a; RelativeSource Self&#xff1a;指定当前元素作为相对源。可以在当前元素的属性中绑定到自身的属性。 示例&#xff1a; <TextBlock Text"{Binding Text, RelativeSource{RelativeSource Self}}" /&…

element步骤条<el-steps>使用具名插槽自定义

element步骤条使用具名插槽自定义 步骤条使用具名插槽: <el-steps direction"vertical" :active"1"><el-step><template slot"description">//在此处可以写你的插槽内容</template>/el-step> </el-steps>步骤…

STM32 cubeMX 直流电机控制风扇转动

本文使用的是 HAL 库。 文章目录 前言一、直流电机介绍二、直流电机原理图三、直流电机控制方法四、STM32CubeMX 配置直流电机五、代码编写总结 前言 实验开发板&#xff1a;STM32F051K8。所需软件&#xff1a;keil5 &#xff0c; cubeMX 。实验目的&#xff1a;了解 直流电机…