分布式的计算框架之Spark(python第三方库视角学习PySpark)

基本介绍

Apache Spark是专为大规模数据处理而设计的快速通用的计算引擎 。现在形成一个高速发展应用广泛的生态系统。

特点介绍

Spark 主要有三个特点:

首先,高级 API 剥离了对集群本身的关注,Spark 应用开发者可以专注于应用所要做的计算本身。

其次,Spark 很快,支持交互式计算和复杂算法。

最后,Spark 是一个通用引擎,可用它来完成各种各样的运算,包括 SQL 查询、文本处理、机器学习等,而在 Spark 出现之前,我们一般需要学习各种各样的引擎来分别处理这些需求。(来源百度百科)

park对python语言的支持--->PySpark

Spark对Python语言的支持,重点体现在Python的第三方库: PySpark

PySpark是由Spark官方开发的Python语言第三方库Python开发者可以使用pip程序快速的安装PySpark并像其它三方库那样直接使用。

基础准备

PySpark库的安装

在命令行中输入:pip install pyspark

使用国内代理镜像网站:pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark

PySpark执行环境入口对象的构建

想要使用PySpark库完成数据处理,首先需要构建一个执行环境入口对象。

PySpark的执行环境入口对象是:类SparkContext的类对象

如何通过代码获得类对象,代码如下:

from pyspark import SparkConf, SparkContext# 创建SparkConf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")  # 链式调用,不管调用什么样的方法,返回的都是同一个对象
# 基于SparkConf类对象创建SparkContext对象
sc = SparkContext(conf=conf)
# 打印pyspark的运行版本
print(sc.version)
# 停止SparkContext对象的运行(停止pyspark程序)
sc.stop()

image.png

PySpark的编程模型

PySpark的编程,主要分为如下三大步骤:

image.png

数据输入

RDD对象

PySpark支持多种数据的输入,在输入完成后,都会得到一个:RDD类的对象

RDD全称为: 弹性分布式数据集(Resilient Distributed Datasets)

PySpark针对数据的处理,都是以RDD对象作为载体即:

(1)数据存储在RDD内

(2)各类数据的计算方法,也都是RDD的成员方法

(3)RDD的数据计算方法,返回值依旧是RDD对象

Python数据容器转RDD对象

PySpark支持通过SparkContext对象的parallelize成员方法,将list、tuple、set、dict、str转换为PySpark的RDD对象。

from pyspark import SparkConf, SparkContext# 创建SparkConf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")  # 链式调用,不管调用什么样的方法,返回的都是同一个对象
# 基于SparkConf类对象创建SparkContext对象
sc = SparkContext(conf=conf)
rdd1 = sc.parallelize([1, 2, 3, 4, 5])  # 列表
rdd2 = sc.parallelize((1, 2, 3, 4, 5))  # 元组
rdd3 = sc.parallelize("study python")  # 字符串
rdd4 = sc.parallelize({1, 2, 3, 4, 5})  # 集合
rdd5 = sc.parallelize({"key1": "value1", "key2": "value2", "key3": "value3"})  # 字典# 如果要查看RDD里面有什么内容,需要用collect()方法
print(rdd1.collect())
print(rdd2.collect())
print(rdd3.collect())
print(rdd4.collect())
print(rdd5.collect())# # 停止SparkContext对象的运行(停止pyspark程序)
sc.stop()

image.png

注意:

(1)字符串会被拆分出1个个的字符,存入RDD对象字典

(2)仅有key会被存入RDD对象

读取文件转RDD对象

PySpark支持通过SparkContext入口对象,来读取文件并构建出RDD对象

前面逻辑都是一样的,只是调用方法不一样,需要使用sc.textFile

from pyspark import SparkConf, SparkContext# 创建SparkConf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")  # 链式调用,不管调用什么样的方法,返回的都是同一个对象# 基于SparkConf类对象创建SparkContext对象
sc = SparkContext(conf=conf)rdd = sc.textFile("G:\资料\2011年1月销售数据.txt")# 如果要查看RDD里面有什么内容,需要用collect()方法
print(rdd.collect())# # 停止SparkContext对象的运行(停止pyspark程序)
sc.stop()

image.png

数据计算

PySpark的数据计算,都是依赖RDD对象内置丰富的“成员方法(算子)”来进行的

因为spark是一个分布式程序,内部运行机制比较复杂,暂不讨论,我们只需要知道在python中运行spark程序时,需要额外增加以下代码,否则会报错:spark找不到python程序

# 添加'PYSPARK_PYTHON'和'PYSPARK_DRIVER_PYTHON'的执行环境
os.environ['PYSPARK_PYTHON'] = sys.executable  # "E:/python/python.exe"
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable  # "E:/python/python.exe"

(1)map方法

功能:将RDD的数据一条条处理,返回新的RDD

语法:rdd.map(fun) # 需要传入一个函数

需求:给[1,2,3,4,5]每个数字乘以10

代码如下:

import sys
from pyspark import SparkConf, SparkContext
import os# 添加'PYSPARK_PYTHON'和'PYSPARK_DRIVER_PYTHON'的执行环境
os.environ['PYSPARK_PYTHON'] = sys.executable  # "E:/python/python.exe"
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable  # "E:/python/python.exe"# 创建SparkConf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")  # 链式调用,不管调用什么样的方法,返回的都是同一个对象
# 基于SparkConf类对象创建SparkContext对象
sc = SparkContext(conf=conf)
# 需求:通过map方法将全部数据都乘以10
rdd = sc.parallelize([1, 2, 3, 4, 5])
# 把rdd中的每一个数据都调用匿名函数(也可通过def重新定义函数)去处理
rdd2 = rdd.map(lambda x: x * 10)
print(rdd2.collect())
# 停止SparkContext对象的运行(停止pyspark程序)
sc.stop()

image.png

(2)flatMap方法

功能:对RDD执行map操作,然后进行解除嵌套操作

# 嵌套的list
list = [[1,2,3],[4,5,6],[7,8,9]]# 解除了嵌套
list = [1,2,3,4,5,6,7,8,9]

示例代码:

import sys
from pyspark import SparkConf, SparkContext
import os# 添加'PYSPARK_PYTHON'和'PYSPARK_DRIVER_PYTHON'的执行环境
os.environ['PYSPARK_PYTHON'] = sys.executable  # "E:/python/python.exe"
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable  # "E:/python/python.exe"# 创建SparkConf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")  # 链式调用,不管调用什么样的方法,返回的都是同一个对象
# 基于SparkConf类对象创建SparkContext对象
sc = SparkContext(conf=conf)rdd = sc.parallelize(["hello python", "hello word", "hello friend"])
# 需求:将RDD中的每一个单词都提取出来
rdd2 = rdd.flatMap(lambda x: x.split(" "))
print(rdd2.collect())

image.png

(3)reduceByKey方法

功能:针对KV型RDD(二元元组),自动按照key分组,然后根据你提供的聚合逻辑,完成组内数据(value)的聚合操作。

语法:

rdd.reduceByKey(func)
# func:(V,V)-> V
# 接受2个传入参数(类型要一致),返回一个返回值,类型和传入要求一致。

示例代码:

# 前面创建RDD对象不做赘述了
# 准备一个RDD
rdd = sc.parallelize([('男', 99), ('男', 80), ('男', 70), ('女', 100), ('女', 85)])
# 求男生和女生两个组的成绩之和
rdd2 = rdd.reduceByKey(lambda a, b: a + b)
print(rdd2.collect())

image.png

案例1

对下面txt文件中的单词进行计数统计

image.png

代码如下:

import sys
from pyspark import SparkConf, SparkContext
import os# 添加'PYSPARK_PYTHON'和'PYSPARK_DRIVER_PYTHON'的执行环境
os.environ['PYSPARK_PYTHON'] = sys.executable  # "E:/python/python.exe"
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable  # "E:/python/python.exe"# 创建SparkConf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")  # 链式调用,不管调用什么样的方法,返回的都是同一个对象
# 基于SparkConf类对象创建SparkContext对象
sc = SparkContext(conf=conf)# 读取数据文件
rdd = sc.textFile("G:/hello.txt")
# 读取全部单词
word_rdd = rdd.flatMap(lambda x: x.split(" "))
# 将所有单词都转换成二元元组,单词为key,value设置为1
word_one_rdd = word_rdd.map(lambda word: (word, 1))
# 分组并求和传承传承
result_rdd = word_one_rdd.reduceByKey(lambda a, b: a + b)
print(result_rdd.collect())

image.png

(4)filter方法

功能:过滤想要的数据进行保留

语法:

rdd.filter(func)
# func: (T)-->bool  传入1个参数进来随意类型,返回值必须是true or false

示例代码:

# 前面创建RDD对象不做赘述了
# 准备一个RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])
# 对RDD的数据进行过滤,偶数返回true,保留偶数
rdd2 = rdd.filter(lambda num: num % 2 == 0)
print(rdd2.collect())

image.png

(5)distinct方法

功能:对RDD数据进行去重,返回新RDD

语法:rdd.distinct()

实例代码:

# 前面创建RDD对象不做赘述了
# 准备一个RDD
rdd = sc.parallelize([1, 1, 2, 2, 3, 3, 4, 5, 6, 7, 7, 8, 9])
# 对RDD的数据进行去重
rdd2 = rdd.distinct()
print(rdd2.collect())

image.png

(6)stortBy方法

功能:对RDD数据进行排序,基于你指定的排序依据

语法:

rdd.sortBy(func,ascending=False, numPartitions=1)
# func: (T) -> U: 告知按照rdd中的哪个数据进行排序,比如 lambda x: [1] 表按照rdd中的第二列元素进行排序
# ascending True升序 False 降序
# numPartitions: 用多少分区排序

示例代码:

以上对单词进行计数统计的案例1中,对结果按照单词出现的次数从大到小进行排序

# 对结果进行排序
final_rdd = result_rdd.sortBy(lambda x: x[1], ascending=False, numPartitions=1)
print(final_rdd.collect())

image.png

案例2

需求:对以下文件使用spark读取文件进行计算:

(1)各个城市销售额排名,从大到小

(2)全部城市,有哪些商品类别在售卖

(3)北京市有哪些商品类别在售卖

image.png

案例代码:

import sys
from pyspark import SparkConf, SparkContext
import os
import json# 添加'PYSPARK_PYTHON'和'PYSPARK_DRIVER_PYTHON'的执行环境
os.environ['PYSPARK_PYTHON'] = sys.executable  # "E:/python/python.exe"
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable  # "E:/python/python.exe"# 创建SparkConf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")  # 链式调用,不管调用什么样的方法,返回的都是同一个对象
# 基于SparkConf类对象创建SparkContext对象
sc = SparkContext(conf=conf)# (1)城市销售额排名
# 读取数据文件
file_rdd = sc.textFile("G:/orders.txt")
# 读取文件中单个json字符串
json_str_rdd = file_rdd.flatMap(lambda x: x.split("|"))
# 将单个字符串转换为字典
dict_rdd = json_str_rdd.map(lambda x: json.loads(x))
# 取出城市和销售额数据(城市,销售额)
city_money_rdd = dict_rdd.map(lambda x: (x['areaName'], int(x['money'])))
# 按照城市分组,按照销售额累计
city_result_rdd = city_money_rdd.reduceByKey(lambda a, b: a + b)
# 按照销售额累计结果进行排序
result1_rdd = city_result_rdd.sortBy(lambda x: x[1], ascending=False, numPartitions=1)
print("各个城市销售额排名,从大到小:", result1_rdd.collect())
# (2)全部城市,有哪些商品类别在售卖
category_rdd = dict_rdd.map(lambda x: x['category']).distinct()
print("全部城市,有哪些商品类别在售卖:", category_rdd.collect())
# (3)北京市有哪些商品类别在售卖
beijing_data_rdd = dict_rdd.filter(lambda x: x['areaName'] == '北京')
beijing_category = beijing_data_rdd.map(lambda x: x['category']).distinct()
print('北京市有哪些商品类别在售卖:', beijing_category.collect())

image.png

数据输出

RDD的结果输出为Python对象的各类方法

(1)collect方法

功能:将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象

语法:rdd.collect(),返回值是一个list

此方法我们前面一直在使用,不做赘述

(2)reduce方法

功能:对RDD数据集按照你传入的逻辑进行聚合

语法:rdd.reduce(func),两参数传入,1个返回值,返回值和参数要求类型一致

示例代码:

rdd = sc.parallelize([1, 2, 3, 4, 5])
print(rdd.reduce(lambda a, b: a + b))
# 输出结果为两两相加的值:15

(3)take方法

功能:取RDD的前n个元素,组成list返回

语法:rdd.take(num),num代表前几个元素

示例代码:

rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
print(rdd.take(5))
# 输出结果为:[1, 2, 3, 4, 5]

(4)count方法

功能:计算RDD有多少条数据,返回值是一个数字

用法:rdd.count()

示例代码:

rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
print(rdd.count())
# 输出结果为:10

注意:

关于Spark的方法(算子)还有很多很多,我们目前不是深入学习python语言,同时也没有学习分布式,只是简单学习对自动化测试打下基础,所以本篇文章只介绍了最基础的几个。

将RDD的内容输出到文件中

(5)saveAsTextFile方法

功能:将RDD的数据写入文本文件中,支持本地写出,hdfs(Hadoop分布式文件系统)等文件系统。

语法: rdd.saveAsTextFile("../data/output/test.txt")

想要这个方法正常运行,还需要配置Hadoop依赖,自行百度配置

运行之后,内容会存在多个分区中,输出的结果是一个文件夹,有几个分区就输出多少个结果文件

修改RDD分区为1个

方式1:SparkConf对象设置conf.set("spark.default.parallelism", "1")

方式2:创建RDD的时候,sc.parallelize方法传入numSlices参数为1

# 方式1,SparkConf对象设置属性全局并行度为1:
conf = SparkConf().setMaster("Tocal[*]").setAppName("test_spark")
conf.set("spark.default.parallelism", "1")
sc = SparkContext(conf=conf)
# 方式2, 创建RDD的时候设置(parallelize方法传入numSlices参数为1)
rdd1 = sc.parallelize([1, 2, 3, 4, 5], numSlices=1)
rdd1 = sc.parallelize([1, 2, 3, 4, 5], 1)

行动吧,在路上总比一直观望的要好,未来的你肯定会感 谢现在拼搏的自己!如果想学习提升找不到资料,没人答疑解惑时,请及时加入扣群: 320231853,里面有各种软件测试+开发资料和技术可以一起交流学习哦。

最后感谢每一个认真阅读我文章的人,礼尚往来总是要有的,虽然不是什么很值钱的东西,如果你用得到的话可以直接拿走:

这些资料,对于【软件测试】的朋友来说应该是最全面最完整的备战仓库,这个仓库也陪伴上万个测试工程师们走过最艰难的路程,希望也能帮助到你!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/818284.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

transformer在生物基因DNA的应用:DNABERT、DNABERT-2

参考: https://www.youtube.com/watch?vmk-Se29QPBA&t1388s 写明这些训练模型可以最终训练好可以进行DNA特征向量的提取,应用与后续1、DNABERT https://github.com/jerryji1993/DNABERT 主要思路就是把DNA序列当成连续文本数据,直接用…

ES11-12

1-ES11-Promise.allSettled Promise.allSettled0)方法返回一个在所有给定的promise都已经fulfilled或rejected后的promise,并带有一个对象数组,每个对象表示对应的promise结果。 简单来说不管成功失败都会调用.then(),然后处理成功和失败的结果 const promises [ …

项目4-图书管理系统2+统一功能处理

1. 拦截器(Interceptor) 我们完成了强制登录的功能, 后端程序根据Session来判断用户是否登录, 但是实现⽅法是比较麻烦的。 所需要处理的内容: • 需要修改每个接⼝的处理逻辑 • 需要修改每个接⼝的返回结果 • 接⼝定义修改, 前端代码也需…

淘宝商品详情API数据采集接口|如何快速采集淘宝商品数据?

如何快速采集淘宝商品数据 无论是谁,如果单凭人工的方式去收集淘宝、天猫等平台的商品数据信息,工作量是巨大的,如果借助有采集软件的第三方公司操作,则可实现对大数据的轻松掌握,但是外包给第三方公司需要支付一定的…

javaee初阶———多线程(三)

T04BF 👋专栏: 算法|JAVA|MySQL|C语言 🫵 小比特 大梦想 此篇文章与大家分享多线程专题第三篇,关于线程安全方面的内容 如果有不足的或者错误的请您指出! 目录 八、线程安全问题(重点)1.一个典型的线程不安全的例子2.出现线程不安全的原因3.解决线程不安…

对于普通人来说ChatGPT3.5和4.0的区别,要不要升级GPT4.0

ChatGPT3.5和4.0在官方给出的有哪些区别呢?简单罗列一下,我个人觉得官方给的都太高级,我们平时普通人很难问ChatGPT这种问题。 官方测试所涉及的能力: 视觉能力代码能力数学计算能力工具使用能力与人的交互能力人类专业考试的能…

Node.js从基础到高级运用】二十三、Node.js中自动重启服务器

引言 在Node.js开发过程中,我们经常需要修改代码后重启服务器来应用这些更改。手动重启不仅效率低下,而且会打断开发流程。幸运的是,有一些工具可以帮助我们自动化这个过程。本文将介绍如何使用nodemon来实现Node.js服务器的自动重启。 什么是…

AR智能眼镜方案_MTK平台安卓主板芯片|光学解决方案

AR眼镜作为一种引人注目的创新产品,其芯片、显示屏和光学方案是决定整机成本和性能的关键因素。在这篇文章中,我们将探讨AR眼镜的关键技术,并介绍一种高性能的AR眼镜方案,旨在为用户带来卓越的体验。 AR眼镜的芯片选型至关重要。一…

数据结构---绪论

一、绪论: 1.什么是数据? 数据是信息的载体,是描述客观事物属性的数,字符及所有能输入到计算机中并被计算机程序识别和处理的符号的集合。数据是计算机程序加工的原料。 数据元素--描述一个个体 数据元素,数据项&am…

React + 项目(从基础到实战) -- 第七期

使用ant design 表单组件,开发登录,注册,搜索功能 React 表单组件 ,受控组件 案列 使用defaultVlue属性 bug : 改变了数据源,但是页面未重新渲染 {/* 表单组件 */}<button onClick{()>{console.log(text);}}>打印</button><button onClick{()>[setText(&…

【InternLM 实战营第二期笔记01】书生·浦语大模型全链路开源体系+InternLM2技术报告

本次课程链接在GitHub上&#xff1a;InternLM/Tutorial at camp2 (github.com) 第一次课程录播链接&#xff1a;书生浦语大模型全链路开源体系_哔哩哔哩_bilibili InternLM2技术报告&#xff1a;arxiv.org/pdf/2403.17297.pdf 一、书生浦语大模型全链路开源体系笔记 Intern…

【读书笔记】自动驾驶与机器人中的SLAM技术——高翔

文章会对本书第五章节及以后章节进行总结概括。每日更新一部分。一起读书吧。 第五章——基础点云处理 重点&#xff1a;点云的相邻关系是许多算法的基础 5.1 激光雷达传感器与点云的数学模型 5.1.1激光雷达传感器的数学模型 雷达有两种&#xff1a;机械旋转式激光雷达&…

python 海龟画图tutle螺旋线

目录 初识turtle模块 基本绘图概念 示例&#xff1a;绘制一个正方形 示例&#xff1a;绘制彩色螺旋线 附录 常用命令 其它命令 在Python编程中&#xff0c;使用turtle模块进行图形绘制是一种非常有趣和富有教育意义的活动。通过控制一个小海龟&#xff08;Turtle&#x…

【产品经理修炼之道】- 厂商银业务之保兑仓

保兑仓 保兑仓是指供应商、购货商、银行签订三方协议&#xff0c;以银行信用为载体&#xff0c;以银行承兑汇票为结算工具&#xff0c;由银行控制货权&#xff0c;供应商受托保管货物并对银行承兑汇票保证金以外部分以货物回购为担保措施&#xff0c;购货商随缴保证金随提货而设…

《QT实用小工具·二十六》运行时间记录

1、概述 源码放在文章末尾 运行时间记录&#xff0c;包含如下功能&#xff1a; 可以启动和停止服务&#xff0c;在需要的时候启动。 可以指定日志文件存放目录。 可以指定时间日志输出间隔。 可以单独追加一条记录到日志文件。 日志为文本格式&#xff0c;清晰明了。 软…

AAAI24 - Model Reuse Tutorial

前言 如果你对这篇文章感兴趣&#xff0c;可以点击「【访客必读 - 指引页】一文囊括主页内所有高质量博客」&#xff0c;查看完整博客分类与对应链接。 该篇 Tutorial 主要对 Model Reuse 当下的进展进行了整理和总结。 The Paradigm Shifts Tutorial 中指出在一些数据量比较…

STM32学习和实践笔记(12):蜂鸣器实验

蜂鸣器主要分为两种&#xff0c;一种是压电式的无源蜂鸣器&#xff0c;一种是电磁式的有源蜂鸣器。 有源和无源是指其内部有没有振荡器。 无源的没有内部振荡器&#xff0c;需要输入1.5-5KHZ的音频信号来驱动压电蜂鸣片发声。 有源的内部有振荡器&#xff0c;因此只需要供给…

PostgreSQL入门到实战-第二十七弹

PostgreSQL入门到实战 PostgreSQL中数据分组操作(二)官网地址PostgreSQL概述PostgreSQL中HAVING命令理论PostgreSQL中HAVING命令实战更新计划 PostgreSQL中数据分组操作(二) 使用PostgreSQL HAVING子句来指定组或聚合的搜索条件 官网地址 声明: 由于操作系统, 版本更新等原因…

基于Python豆瓣电影数据可视化分析系统的设计与实现

大数据可视化项目——基于Python豆瓣电影数据可视化分析系统的设计与实现 2024最新项目 项目介绍 本项目旨在通过对豆瓣电影数据进行综合分析与可视化展示&#xff0c;构建一个基于Python的大数据可视化系统。通过数据爬取收集、清洗、分析豆瓣电影数据&#xff0c;我们提供了…

Docker:使用编排Compose快速部署容器化应用

1、简述 Docker Compose 是 Docker 官方提供的一个工具&#xff0c;用于定义和管理多容器应用。它通过一个简单的 YAML 文件来定义应用的服务、网络、卷等配置&#xff0c;并提供了一组命令来启动、停止、构建和管理应用。使用 Docker Compose 可以让开发人员轻松地在本地开发…