有没有专门做字体排版设的网站/营销方案网站

有没有专门做字体排版设的网站,营销方案网站,凡客优品家居,51模板网ppt大数据分析与应用实验任务十 实验目的: 通过实验掌握spark SQL的基本编程方法; 熟悉RDD到DataFrame的转化方法; 通过实验熟悉spark SQL管理不同数据源的方法。 实验任务: 进入pyspark实验环境,在桌面环境打开jup…

大数据分析与应用实验任务十

实验目的:

  • 通过实验掌握spark SQL的基本编程方法;

  • 熟悉RDD到DataFrame的转化方法;

  • 通过实验熟悉spark SQL管理不同数据源的方法。

实验任务:

进入pyspark实验环境,在桌面环境打开jupyter notebook,或者打开命令行窗口,输入pyspark,完成下列任务:

实验一、参考教材5.3-5.6节各个例程编写代码,逐行理解并运行。
1. DataFrame 的创建

在编写独立应用程序时,可以通过如下语句创建一个 SparkSession 对象:

from pyspark import SparkContext,SparkConf 
from pyspark.sql import SparkSession 
sparklzy = SparkSession.builder.config(conf = SparkConf()).getOrCreate()

读取在“/usr/local/spark/examples/src/main/resources/”目录下的样例数据 people.json

dfluozhongye = spark.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")
dfluozhongye.show()

image-20231130112410232

2. DataFrame 的保存
peopleDFlzy = spark.read.format("json").load("file:///usr/local/spark/examples/src/main/resources/people.json") peopleDFlzy.select("name", "age").write.format("json").save("file:///root/Desktop/luozhongye/newpeople.json")peopleDFlzy.select("name").write.format("text").save("file:///root/Desktop/luozhongye/newpeople.txt")

image-20231130112652757

如果要再次读取 newpeople.json 中的数据生成 DataFrame,可以直接使用 newpeople.json 目录名称,而不需要使用 part-00000-3db90180-ec7c-4291-ad05-df8e45c77f4d.json 文件(当然,使用这个文件也可以),代码如下:

peopleDFlzy = spark.read.format("json").load("file:///root/Desktop/luozhongye/newpeople.json") 
peopleDFlzy.show()

image-20231130112748692

3. DataFrame 的常用操作

创建好DataFrame以后,可以执行一些常用的DataFrame操作,包括printSchema()、select()、filter()、groupBy()和 sort()等。在执行这些操作之前,先创建一个 DataFrame:

dflzy=spark.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")
(1) printSchema()

可以使用 printSchema()操作打印出 DataFrame 的模式(Schema)信息

dflzy.printSchema()

image-20231130112956579

(2) select()

select()操作的功能是从 DataFrame 中选取部分列的数据。

# select()操作选取了 name和 age 这两个列,并且把 age 这个列的值增加 1。
dflzy.select(dflzy['name'],dflzy['age']+1,).show()

image-20231130113032027

(3) filter()

filter()操作可以实现条件查询,找到满足条件要求的记录。

# 用于查询所有 age 字段的值大于 20 的记录。
dflzy.filter(dflzy["age"]>20)

image-20231130113111358

(4) groupBy()

groupBy()操作用于对记录进行分组。

# 根据 age 字段进行分组,并对每个分组中包含的记录数量进行统计
dflzy.groupBy("age").count().show()

image-20231130113200672

(5) sort()

sort()操作用于对记录进行排序。

# 表示根据 age 字段进行降序排序;
dflzy.sort(dflzy["age"].desc()).show()
# 表示根据 age 字段进行降序排序,当 age 字段的值相同时,再根据 name 字段的值进行升序排序
dflzy.sort(dflzy["age"].desc(),dflzy["name"].asc()).show()

image-20231130113655460

4. 从 RDD 转换得到 DataFrame
(1) 利用反射机制推断 RDD 模式

把 “/usr/local/spark/examples/src/main/resources/”目录下的people.txt 加载到内存中生成一个 DataFrame,并查询其中的数据。完整的代码及其执行过程如下:

from pyspark.sql import Rowpeople = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt").map(lambda line: line.split(",")).map(lambda p: Row(name=p[0], age=int(p[1])))
schemaPeople = spark.createDataFrame(people)
# 必须注册为临时表才能供下面的查询使用
schemaPeople.createOrReplaceTempView("people")
personsDF = spark.sql("select name,age from people where age > 20")
# DataFrame 中的每个元素都是一行记录,包含 name 和 age 两个字段,分别用 p.name 和 p.age 来获取值
personsRDD = personsDF.rdd.map(lambda p: "Name: " + p.name + "," + "Age: " + str(p.age))
personsRDD.foreach(print)

image-20231130113902840

(2)使用编程方式定义 RDD 模式

利用 Spark SQL 查询 people.txt

from pyspark.sql.types import *
from pyspark.sql import Row# 下面生成“表头”
schemaString = "name age"
fields = [StructField(field_name, StringType(), True) for field_name inschemaString.split(" ")]
schema = StructType(fields)
# 下面生成“表中的记录
lines = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
parts = lines.map(lambda x: x.split(","))
people = parts.map(lambda p: Row(p[0], p[1].strip()))
# 下面把“表头”和“表中的记录”拼装在一起
schemaPeople = spark.createDataFrame(people, schema)
# 注册一个临时表供后面的查询使用
schemaPeople.createOrReplaceTempView("people")
results = spark.sql("SELECT name,age FROM people")
results.show()

image-20231130114042174

实验二、完成p113页实验内容第1题(spark SQL基本操作),另注意自行修改题目中的数据。
1. Spark SQL 基本操作

将下列 JSON 格式数据复制到 Linux 系统中,并保存命名为 employee.json。

{ "id":1 , "name":" Ella" , "age":36 } 
{ "id":2, "name":"Bob","age":29 } 
{ "id":3 , "name":"Jack","age":29 } 
{ "id":4 , "name":"Jim","age":28 } 
{ "id":4 , "name":"Jim","age":28 } 
{ "id":5 , "name":"Damon" } 
{ "id":5 , "name":"Damon" }
{ "id":6 , "name":"罗忠烨" }

为 employee.json 创建 DataFrame,并编写 Python 语句完成下列操作:

from pyspark.sql import SparkSession# 创建 SparkSession
spark = SparkSession.builder.appName("SparkSQLBasicOperations").getOrCreate()# 读取 JSON 文件并创建 DataFrame
employee_dflzy = spark.read.json("/root/Desktop/luozhongye/employee.json")

(1)查询所有数据;

employee_dflzy.show()

image-20231130114320725

(2)查询所有数据,并去除重复的数据;

employee_dflzy.dropDuplicates().show()

image-20231130114352878

(3)查询所有数据,打印时去除 id 字段;

employee_dflzy.select("name", "age").show()

image-20231130114426647

(4)筛选出 age>30 的记录;

employee_dflzy.filter(employee_dflzy["age"] > 30).show()

image-20231130114455985

(5)将数据按 age 分组;

employee_dflzy.groupBy("age").count().show()

image-20231130114520305

(6)将数据按 name 升序排列;

employee_dflzy.orderBy("name").show()

image-20231130114547358

(7)取出前 3 行数据;

employee_dflzy.limit(3).show()

image-20231130114612796

(8)查询所有记录的 name 列,并为其取别名为 username;

employee_dflzy.select("name").withColumnRenamed("name", "username").show()

image-20231130114635538

(9)查询年龄 age 的平均值;

employee_dflzy.agg({"age": "avg"}).show()

image-20231130114703990

(10)查询年龄 age 的最小值。

employee_dflzy.agg({"age": "min"}).show()

image-20231130114736776

(11)停止 SparkSession

spark.stop()
2. 编程实现将 RDD 转换为 DataFrame

源文件employee.txt内容如下(包含 id,name,age):

1,Ella,36 
2,Bob,29 
3,Jack,29

实现从 RDD 转换得到 DataFrame,并按“id:1,name:Ella,age:36”的格式打印出 DataFrame 的所有数据。请写出程序代码。

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType# 创建 SparkSession
spark = SparkSession.builder.appName("RDDtoDataFrame").getOrCreate()# 读取文本文件并创建 RDD
rdd = spark.sparkContext.textFile("/root/Desktop/luozhongye/employee.txt")# 定义数据模式
schema = StructType([StructField("id", IntegerType(), True),StructField("name", StringType(), True),StructField("age", IntegerType(), True)
])# 将 RDD 转换为 DataFrame
employee_df = rdd.map(lambda line: line.split(",")).map(lambda x: (int(x[0]), x[1], int(x[2]))).toDF(schema=schema)# 打印 DataFrame 的所有数据
employee_df.show(truncate=False)# 停止 SparkSession
spark.stop()

image-20231130120104859

3. 编程实现利用 DataFrame 读写 MySQL 的数据
(1)在 MySQL 数据库中新建数据库 sparktest,再创建表 employee, 包含下表 所示的两行数据。
idnamegenderage
1AliceF22
2JohnM25
-- 创建数据库
CREATE DATABASE IF NOT EXISTS sparktest;-- 切换到 sparktest 数据库
USE sparktest;-- 创建 employee 表
CREATE TABLE IF NOT EXISTS employee (id INT PRIMARY KEY,name VARCHAR(255),gender CHAR(1),age INT
);-- 插入数据
INSERT INTO employee VALUES (1, 'Alice', 'F', 22), (2, 'John', 'M', 25);
(2)配置 Spark 通过 JDBC 连接数据库 MySQL,编程实现利用 DataFrame 插入表 5-3 所示的两行数据到 MySQL 中,最后打印出 age 的最大值和 age 的总和。
idnamegenderage
3MaryF26
4TomM23
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame# 创建 SparkSession
#"/path/to/mysql-connector-java-x.x.xx.jar":实际的 MySQL Connector/J JAR 文件路径。
spark = SparkSession.builder.appName("MySQLDataFrame").config("spark.jars", "/path/to/mysql-connector-java-x.x.xx.jar" 
).getOrCreate()# 读取数据到 DataFrame
employee_data = [(3, 'Mary', 'F', 26), (4, 'Tom', 'M', 23)]
columns = ["id", "name", "gender", "age"]
new_data_df = spark.createDataFrame(employee_data, columns)# 配置 MySQL 连接信息
mysql_url = "jdbc:mysql://localhost:3306/sparktest"
mysql_properties = {"user": "your_username",# 实际的 MySQL 用户名"password": "your_password",# 实际的 MySQL 密码"driver": "com.mysql.cj.jdbc.Driver"
}# 将数据写入 MySQL 表
new_data_df.write.jdbc(url=mysql_url, table="employee", mode="append", properties=mysql_properties)# 从 MySQL 中读取数据到 DataFrame
employee_df = spark.read.jdbc(url=mysql_url, table="employee", properties=mysql_properties)# 打印 DataFrame 的所有数据
employee_df.show()# 打印 age 的最大值和总和
employee_df.agg({"age": "max", "age": "sum"}).show()# 停止 SparkSession
spark.stop()

本文结束欢迎点赞,收藏,有问题可以在评论区讨论!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/184687.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux:docker镜像的创建(5)

1.基于已有镜像创建 步骤: 1.将原始镜像加入容器并运行 2.在原始镜像中部署各种服务 3.退出容器 4.使用下面命令将容器生成新的镜像 现在我们在这个容器里做了一些配置,我们要把他做成自己镜像 docker commit -m "centos7_123" -a "tarr…

20. Matplotlib 数据可视化

目录 1. 简介2. Matplotlib 开发环境2.1 画图2.2 画图接口2.4 线形图2.5 散点图2.6 等高线图2.7 直方图 1. 简介 Matplotlib网址:https://matplotlib.org/ 数据可视化是数据分析中最重要的工作之一。Matploblib是建立在Numpy数组基础上的多平台数据可视化程序库&a…

PostgreSQL 分区表插入数据及报错:子表明明存在却报不存在以及column “xxx“ does not exist 解决方法

PostgreSQL 分区表插入数据及报错:子表明明存在却报不存在以及column “xxx“ does not exist 解决方法 问题1. 分区表需要先创建子表在插入,创建子表立马插入后可能会报错子表不存在;解决: 创建子表及索引后,sleep10毫…

【JavaWeb】会话过滤器监听器

会话&过滤器&监听器 文章目录 会话&过滤器&监听器一、会话1.1 Cookie1.2 Session1.3 三大域对象 二、过滤器三、监听器3.1 application域监听器3.2 session域监听器3.3 request域监听器3.4 session域的两个特殊监听器3.4.1 session绑定监听器3.4.2 钝化活化监听…

医院电子病历编辑器源码(支持云端SaaS服务)

电子病历系统基于云端SaaS服务的方式,采用B/S(Browser/Server)架构提供,采用前后端分离模式开发和部署。使用用户通过浏览器即能访问,无需关注系统的部署、维护、升级等问题,系统充分考虑了模板化、 配置化…

树与二叉树堆:堆的意义

目录 堆的意义: 第一是堆的排序,第二是堆的top k 排行问题 堆的 top k 排行问题: 面对大量数据的top k 问题: 堆排序的实现:——以升序为例 方法一 交换首尾: 建立大堆: 根结点尾结点的…

express+mySql实现用户注册、登录和身份认证

expressmySql实现用户注册、登录和身份认证 注册 注册时需要对用户密码进行加密入库,提高账户的安全性。用户登录时再将密码以相同的方式进行加密,再与数据库中存储的密码进行比对,相同则表示登录成功。 安装加密依赖包bcryptjs cnpm insta…

CompletableFuture详解

目录 介绍 Future介绍 CompletableFuture介绍 CompletableFuture常用的API介绍 常用的静态方法源码解析 runAsync 源码 案例 结果 supplyAsync 源码 案例 结果 规律 CompletableFuture获取返回值方法介绍 返回值区别 代码演示 返回结果 CompletableFuture其…

【Docker】Swarm内部的负载均衡与VIP

在Docker Swarm中,有两种方式可以实现内部的负载均衡:Service VIP和Routing Mesh。 Service VIP(Virtual IP):Service VIP是一种基于VIP的负载均衡方式,它为每个服务分配一个虚拟IP地址。当请求到达Servic…

Word异常退出文档找回怎么操作?4个正确恢复方法!

“刚刚我在用word编辑文档,但是突然word就显示异常了,然后莫名其妙就自动退出了,这可怎么办?我还有机会找回这些文档吗?” 当我们在使用Microsoft Word时,突然遭遇到程序异常退出的情况,可能会让…

TCP 连接建立

1:TCP 三次握手过程是怎样的? 客户端和服务端都处于 CLOSE 状态,服务端主动监听某个端口,处于 LISTEN 状态 第一次握手:客户端带着序号和SYN为1,把第一个 SYN 报文发送给服务端,客户端处于 SYN-…

Elasticsearch:对时间序列数据流进行降采样(downsampling)

降采样提供了一种通过以降低的粒度存储时间序列数据来减少时间序列数据占用的方法。 指标(metrics)解决方案收集大量随时间增长的时间序列数据。 随着数据老化,它与系统当前状态的相关性越来越小。 降采样过程将固定时间间隔内的文档汇总为单…

【论文】Bao:一种用于现代多核嵌入式系统的轻型静态分区管理程序

Bao:一种用于现代多核嵌入式系统的轻型静态分区管理程序 个人学习过程中 Bao Hypervisor 论文翻译(借助翻译工具个人校对),仅供学习使用,由于个人对一些技术专有名词不够熟悉,翻译不当的地方欢迎指出 论文地…

[Docker]十二.Docker consul集群搭建、微服务部署,Consul集群+Swarm集群部署微服务实战

一.Docker consul集群搭建 Consul 是 Go 语言写的开源的服务发现软件, Consul 具有 服务发现、健康检查、 服务治理、微服务熔断处理 等功能,在微服务中讲过如何搭建consul集群,接下来看看在 Dokcer 中如何去创建搭建consul 集群 1.linux上面部署consul集…

Qt 天气预报项目

参考引用 QT开发专题-天气预报 1. JSON 数据格式 1.1 什么是 JSON JSON (JavaScript Object Notation),中文名 JS 对象表示法,因为它和 JS 中对象的写法很类似 通常说的 JSON,其实就是 JSON 字符串,本质上是一种特殊格式的字符串…

【UE】剔除环境颜色

效果 步骤 1. 新建一个空白项目,勾选光线追踪选项 2. 新建一个Basic关卡 3. 添加初学者内容包到内容浏览器 4. 新建一个材质“M_Red” 打开“M_Red”,设置基础颜色为红色 在场景中随便布置一些物品,然后给其中的一个球体设置材质为“M_Red”…

Oracle--索引

文章目录 一、索引是什么?二、索引的原理三、索引的特征四、创建索引的方式五、怎么确认索引六、案列七、复合索引 一、索引是什么? 索引(INDEX)是数据库中用于提高查询效率的一种数据结构。它可以加速数据库表的数据查找、过滤和排序等操作。索引是一…

python 协程

1. 协程 协程,又称微线程,纤程。英文名Coroutine。 https://www.cnblogs.com/coder-qi/p/10163416.html 协程不是计算机提供的,是人为创造的上下文切换技术,也可以被称为微线程。简而言之 其实就是在一个线程中实现代码块相互切…

Lesson 08 string类 (中)

C:渴望力量吗,少年? 文章目录 二、string类的介绍与使用2. 使用(5)string类对象的修改操作 三、拷贝1. 引入2. 浅拷贝3. 深拷贝 总结 二、string类的介绍与使用 2. 使用 (5)string类对象的修改…

Scrum敏捷开发流程及支撑工具

Scrum是一种敏捷开发框架,用于管理复杂的项目。以下这些步骤构成了Scrum敏捷开发流程的核心。通过不断迭代、灵活应对变化和持续反馈,Scrum框架帮助团队快速交付高质量的产品。 以下是Scrum敏捷开发流程的基本步骤: 产品Backlog创建&#xf…