【数据开发】pyspark入门与RDD编程

【数据开发】pyspark入门与RDD编程

文章目录

      • 1、pyspark介绍
      • 2、RDD与基础概念
      • 3、RDD编程
        • 3.1 Transformation/Action
        • 3.2 数据开发流程与环节

1、pyspark介绍

pyspark的用途

  • 机器学习
  • 专有的数据分析。
  • 数据科学
  • 使用Python和支持性库的大数据。

spark与pyspark的关系

  • spark是一种计算引擎,类似于hadoop架构下mapreduce,与mapreduce不同的是将计算的结果存入hdfs分布式文件系统。spark则是写入内存中,像mysql一样可以实现实时的计算,包括SQL查询。
  • spark不单单支持传统批量处理应用,更支持交互式查询、流式计算、机器学习、图计算等各种应用,
  • spark是由scala语言开发,具备python的接口,就是pyspark。
  • 简单理解,Pyspark是Spark的Python API,它允许Python开发者使用Python语言编写Spark程序,并且可以与其他Python库和工具集成。

pysql和pyspark的区别

  • 不同语言的支持范围:pysql只负责提交sql,pyspark则适合sql处理不了的逻辑
  • 第三方py库支持:pysql不支持依赖第三方py库,pyspark可以支持依赖
  • 返回数据条数限制:pysql单节点运行py逻辑,仅支持2w条数据;pyspark用分布式集群去执行复杂的逻辑,能支持全量数据。

pyspark安装与使用

  • 数据环境:pip install pyspark
  • 安装环境:Anaconda+Jupyter Notebooks
  • 其他:Hive/TDW库等相关的包

Spark提供了6大组件:
Spark Core
Spark SQL
Spark Streaming
Spark MLlib
Spark GraphX
SparkR
这些组件解决了使用Hadoop时碰到的特定问题。
在这里插入图片描述

在这里插入图片描述

2、RDD与基础概念

RDD模型是什么?

  • RDD ( Resilient Distributed Dataset )叫做弹性分布式数据集,是Spark的一种数据抽象集合

  • 它可以被执行在分布式的集群上进行各种操作,而且有较强的容错机制。RDD可以被分为若干个分区,每一个分区就是一个数据集片段,从而可以支持分布式计算。

  • 通俗点来讲,可以将 RDD 理解为一个分布式对象集合,本质上是一个只读的分区记录集合。每个 RDD 可以分成多个分区,每个分区就是一个数据集片段。一个 RDD 的不同分区可以保存到集群中的不同结点上,从而可以在集群中的不同结点上进行并行计算。

  • 更多可以参考, 参考2, 参考3
    在这里插入图片描述

RDD运行时相关的关键名词:

  • Client、Job、Master、Worker、Driver、Stage、Task以及Executor
  • Client:指的是客户端进程,主要负责提交job到Master;
  • Job:Job来自于我们编写的程序,Application包含一个或者多个job,job包含各种RDD操作;
  • Master:指的是Standalone模式中的主控节点,负责接收来自Client的job,并管理着worker,可以给worker分配任务和资源(主要是driver和executor资源);
  • Worker:指的是Standalone模式中的slave节点,负责管理本节点的资源,同时受Master管理,需要定期给Master回报heartbeat(心跳),启动Driver和Executor;
  • Driver:指的是 job(作业)的主进程,一般每个Spark作业都会有一个Driver进程,负责整个作业的运行,包括了job的解析、Stage的生成、调度Task到Executor上去执行;
  • Stage:中文名 阶段,是job的基本调度单位,因为每个job会分成若干组Task,每组任务就被称为 Stage;
  • Task:任务,指的是直接运行在executor上的东西,是executor上的一个线程;
  • Executor:指的是 执行器,顾名思义就是真正执行任务的地方了,一个集群可以被配置若干个Executor,每个Executor接收来自Driver的Task,并执行它(可同时执行多个Task)。
  • 更多可以参考

RDD任务调度的原理(血缘关系)

  • DAG有向无环图,Spark就是借用了DAG对RDD之间的关系进行了建模,用来描述RDD之间的因果依赖关系。因为在一个Spark作业调度中,多个作业任务之间也是相互依赖的,有些任务需要在一些任务执行完成了才可以执行的。
  • RDD 的最重要的特性之一就是血缘关系(Lineage ),它描述了一个 RDD 是如何从父 RDD 计算得来的。如果某个 RDD 丢失了,则可以根据血缘关系,从父 RDD 计算得来。
    在这里插入图片描述

Spark的部署模式有哪些

  • 主要有local模式、Standalone模式、Mesos模式、YARN模式。
  • 更多可以参考

Shuffle操作是什么

  • Shuffle指的是数据从Map端到Reduce端的数据传输过程,Shuffle性能的高低直接会影响程序的性能。因为Reduce task需要跨节点去拉在分布在不同节点上的Map task计算结果,这一个过程是需要有磁盘IO消耗以及数据网络传输的消耗的,所以需要根据实际数据情况进行适当调整。
  • 另外,Shuffle可以分为两部分,分别是Map阶段的数据准备与Reduce阶段的数据拷贝处理,在Map端我们叫Shuffle Write,在Reduce端我们叫Shuffle Read。

RDD操作与惰性执行

  • 对于 Spark 处理的大量数据而言,会将数据切分后放入RDD作为Spark 的基本数据结构,开发者可以在 RDD 上进行丰富的操作,之后 Spark 会根据操作调度集群资源进行计算。

  • RDD 的操作分为转化(Transformation)操作和行动(Action)操作。转化操作就是从一个 RDD 产生一个新的 RDD,而行动操作就是进行实际的计算。

  • RDD 的操作是惰性的,当 RDD 执行转化操作的时候,实际计算并没有被执行,只有当 RDD 执行行动操作时才会促发计算任务提交,从而执行相应的计算操作。

RDD缓存优化

  • 因为惰性求值的,而有时候希望能多次使用同一个 RDD。如果简单地对 RDD 调用行动操作,Spark 每次都会重算 RDD 及它的依赖,这样就会带来太大的消耗。为了避免多次计算同一个 RDD,可以让 Spark 对数据进行持久化。
  • Spark 可以使用 persist 和 cache 方法将任意 RDD 缓存到内存、磁盘文件系统中。缓存是容错的,如果一个 RDD 分片丢失,则可以通过构建它的转换来自动重构。被缓存的 RDD 被使用时,存取速度会被大大加速。一般情况下,Executor 内存的 60% 会分配给 cache,剩下的 40% 用来执行任务。

3、RDD编程

3.1 Transformation/Action

在这里插入图片描述

转化操作:
在这里插入图片描述

行动操作:
在这里插入图片描述

两大类示例:

import os
import pyspark
from pyspark import SparkContext, SparkConfconf = SparkConf().setAppName("test_SamShare").setMaster("local[4]")
sc = SparkContext(conf=conf)# 使用 parallelize方法直接实例化一个RDD
rdd = sc.parallelize(range(1,11),4) # 这里的 4 指的是分区数量
rdd.take(100)
# [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]"""
----------------------------------------------Transform算子解析
----------------------------------------------
"""
# 以下的操作由于是Transform操作,因为我们需要在最后加上一个collect算子用来触发计算。
# 1. map: 和python差不多,map转换就是对每一个元素进行一个映射
rdd = sc.parallelize(range(1, 11), 4)
rdd_map = rdd.map(lambda x: x*2)
print("原始数据:", rdd.collect())
print("扩大2倍:", rdd_map.collect())
# 原始数据: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# 扩大2倍: [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]# 2. flatMap: 这个相比于map多一个flat(压平)操作,顾名思义就是要把高维的数组变成一维
rdd2 = sc.parallelize(["hello SamShare", "hello PySpark"])
print("原始数据:", rdd2.collect())
print("直接split之后的map结果:", rdd2.map(lambda x: x.split(" ")).collect())
print("直接split之后的flatMap结果:", rdd2.flatMap(lambda x: x.split(" ")).collect())
# 直接split之后的map结果: [['hello', 'SamShare'], ['hello', 'PySpark']]
# 直接split之后的flatMap结果: ['hello', 'SamShare', 'hello', 'PySpark']# 3. filter: 过滤数据
rdd = sc.parallelize(range(1, 11), 4)
print("原始数据:", rdd.collect())
print("过滤奇数:", rdd.filter(lambda x: x % 2 == 0).collect())
# 原始数据: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# 过滤奇数: [2, 4, 6, 8, 10]# 4. distinct: 去重元素
rdd = sc.parallelize([2, 2, 4, 8, 8, 8, 8, 16, 32, 32])
print("原始数据:", rdd.collect())
print("去重数据:", rdd.distinct().collect())
# 原始数据: [2, 2, 4, 8, 8, 8, 8, 16, 32, 32]
# 去重数据: [4, 8, 16, 32, 2]# 5. reduceByKey: 根据key来映射数据
from operator import add
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
print("原始数据:", rdd.collect())
print("原始数据:", rdd.reduceByKey(add).collect())
# 原始数据: [('a', 1), ('b', 1), ('a', 1)]
# 原始数据: [('b', 1), ('a', 2)]# 6. mapPartitions: 根据分区内的数据进行映射操作
rdd = sc.parallelize([1, 2, 3, 4], 2)
def f(iterator):yield sum(iterator)
print(rdd.collect())
print(rdd.mapPartitions(f).collect())
# [1, 2, 3, 4]
# [3, 7]# 7. sortBy: 根据规则进行排序
tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
print(sc.parallelize(tmp).sortBy(lambda x: x[0]).collect())
print(sc.parallelize(tmp).sortBy(lambda x: x[1]).collect())
# [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
# [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]# 8. subtract: 数据集相减, Return each value in self that is not contained in other.
x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
y = sc.parallelize([("a", 3), ("c", None)])
print(sorted(x.subtract(y).collect()))
# [('a', 1), ('b', 4), ('b', 5)]# 9. union: 合并两个RDD
rdd = sc.parallelize([1, 1, 2, 3])
print(rdd.union(rdd).collect())
# [1, 1, 2, 3, 1, 1, 2, 3]# 10. interp: 取两个RDD的交集,同时有去重的功效
rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5, 2, 3])
rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
print(rdd1.interp(rdd2).collect())
# [1, 2, 3]# 11. cartesian: 生成笛卡尔积
rdd = sc.parallelize([1, 2])
print(sorted(rdd.cartesian(rdd).collect()))
# [(1, 1), (1, 2), (2, 1), (2, 2)]# 12. zip: 拉链合并,需要两个RDD具有相同的长度以及分区数量
x = sc.parallelize(range(0, 5))
y = sc.parallelize(range(1000, 1005))
print(x.collect())
print(y.collect())
print(x.zip(y).collect())
# [0, 1, 2, 3, 4]
# [1000, 1001, 1002, 1003, 1004]
# [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]# 13. zipWithIndex: 将RDD和一个从0开始的递增序列按照拉链方式连接。
rdd_name = sc.parallelize(["LiLei", "Hanmeimei", "Lily", "Lucy", "Ann", "Dachui", "RuHua"])
rdd_index = rdd_name.zipWithIndex()
print(rdd_index.collect())
# [('LiLei', 0), ('Hanmeimei', 1), ('Lily', 2), ('Lucy', 3), ('Ann', 4), ('Dachui', 5), ('RuHua', 6)]# 14. groupByKey: 按照key来聚合数据
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
print(rdd.collect())
print(sorted(rdd.groupByKey().mapValues(len).collect()))
print(sorted(rdd.groupByKey().mapValues(list).collect()))
# [('a', 1), ('b', 1), ('a', 1)]
# [('a', 2), ('b', 1)]
# [('a', [1, 1]), ('b', [1])]# 15. sortByKey:
tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
print(sc.parallelize(tmp).sortByKey(True, 1).collect())
# [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]# 16. join:
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("a", 3)])
print(sorted(x.join(y).collect()))
# [('a', (1, 2)), ('a', (1, 3))]# 17. leftOuterJoin/rightOuterJoin
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2)])
print(sorted(x.leftOuterJoin(y).collect()))
# [('a', (1, 2)), ('b', (4, None))]"""
----------------------------------------------Action算子解析
----------------------------------------------
"""
# 1. collect: 指的是把数据都汇集到driver端,便于后续的操作
rdd = sc.parallelize(range(0, 5))
rdd_collect = rdd.collect()
print(rdd_collect)
# [0, 1, 2, 3, 4]# 2. first: 取第一个元素
sc.parallelize([2, 3, 4]).first()
# 2# 3. collectAsMap: 转换为dict,使用这个要注意了,不要对大数据用,不然全部载入到driver端会爆内存
m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
m
# {1: 2, 3: 4}# 4. reduce: 逐步对两个元素进行操作
rdd = sc.parallelize(range(10),5)
print(rdd.reduce(lambda x,y:x+y))
# 45# 5. countByKey/countByValue:
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
print(sorted(rdd.countByKey().items()))
print(sorted(rdd.countByValue().items()))
# [('a', 2), ('b', 1)]
# [(('a', 1), 2), (('b', 1), 1)]# 6. take: 相当于取几个数据到driver端
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
print(rdd.take(5))
# [('a', 1), ('b', 1), ('a', 1)]# 7. saveAsTextFile: 保存rdd成text文件到本地
text_file = "./data/rdd.txt"
rdd = sc.parallelize(range(5))
rdd.saveAsTextFile(text_file)# 8. takeSample: 随机取数
rdd = sc.textFile("./test/data/hello_samshare.txt", 4)  # 这里的 4 指的是分区数量
rdd_sample = rdd.takeSample(True, 2, 0)  # withReplacement 参数1:代表是否是有放回抽样
rdd_sample# 9. foreach: 对每一个元素执行某种操作,不生成新的RDD
rdd = sc.parallelize(range(10), 5)
accum = sc.accumulator(0)
rdd.foreach(lambda x: accum.add(x))
print(accum.value)
# 45
3.2 数据开发流程与环节

创建会话

import pyspark
from pyspark.sql import SparkSession session = SparkSession.builder.appName('First App').getOrCreate()
sessionspark_session = SparkSession.builder.getOrCreate()
table_data = [row.asDict() for row in spark_session.sql('''SELECT xxx,id,xxx,xxx,from aaa.bbb where ds = {} and xxx != 0'''.format(newds).collect()]

读取数据

# 支持csv格式jdbc加载选项选项orc脚本模式表文本
data = session.read.csv('Datasets/titanic.csv')
# 检索带有标题的数据集
data = session.read.option.('header', 'true').csv('Datasets/titanic.csv')  
data.show()# 类似于pandas中使用info() 
data.printSchema()# 文本
data=sc.textFile("file://home/README.md")
data.saveAsTextFile(outputFile)# json
import json
data=input.map(lambdax:json.loads(x))
data.filter(lambda x:x["lovesPandas"]).map(lambda x:json.dumps(x)).saveAsTextFile(outputFile)

数据过滤

select:
用于过滤符合条件的行,返回一个新的dataframe,支持选择一个或多个列。
df.select("列名")
df.select(col("列名"))
df.select(expr("sql表达式"))
dataframe.select(column_name) 
dataframe.select(column_1, column_2, .., column_N) collect:
返回一个list,每个元素为Row。可以通过df.collect()方法将dataframe转换为可遍历的dict列表。filter:
data = data.filter(data['Survived'] == 1)
data.show()

数据清洗

# 添加一列
data = data.withColumn('Age_after_3_y', data['Age']+3)# 删除一列
dataframe = dataframe.drop('column_name in strings')
data = data.drop('Age_after_3_y')
dataframe.show()# 改列名
data = data.na.drop(how = 'any', thresh = 2)
data.show()

多表计算

join:类似于sqljoin操作
df1.join(df2, 'on的列名', 'join选项')
其中join选项默认是'inner',可以自行选择其他join方法如:left, full, right, leftouter...

编程参考:1,2,3,4,5

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/664528.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

简单实践 java spring boot 自动配置模拟

1.概要 1.1 需求,自己写一个redis-spring-boot-starter模拟自动配置 自动配置就是在引入*-starter坐标后,可以已经spring框架的规则实现一些Bean的自动注入,并设置一些参数的默认值,且也可以在引入的工程中修改这些配置的值。这…

金蝶云星空本地构建部署包时报错

文章目录 金蝶云星空本地构建部署包时报错报错内容原因分析 金蝶云星空本地构建部署包时报错 报错内容 描述 C:\Windows\Microsoft.NET\Framework\v4.0.30319\Microsoft.Common.targets(2769,5): error MSB3086: 任务未能使用 SdkToolsPath“”或注册表项“HKEY_LOCAL_MACHIN…

通过低代码开发实现数据可视化应用的简易指南

随着数据分析和决策变得越来越重要,数据可视化应用的需求也不断增长。低代码开发平台为开发人员提供了一种快速构建数据可视化应用的途径,本文将介绍如何利用低代码平台实现数据可视化应用的方法和步骤。 在当今数据驱动的时代,企业和组织需要…

面试150 二进制求和 位运算

Problem: 67. 二进制求和 文章目录 思路复杂度Code 思路 👨‍🏫 参考 复杂度 时间复杂度: O ( n ) O(n) O(n) 空间复杂度: O ( n ) O(n) O(n) Code class Solution {public String addBinary(String a, String b){StringBuilder ans new Stri…

MS Access 函数参考手册(MS Access 日期函数、MS Access 其他函数)

目录 MS Access 日期函数 MS Access Date() 函数 MS Access DateAdd() 函数 MS Access DateDiff() 函数 MS Access DatePart() 函数 MS Access DateSerial() 函数 MS Access DateValue() 函数 MS Access Day() 函数 MS Access Format() 函数 MS Access Hour() 函数 …

pyspark学习-spark.sql.functions 聚合函数

https://spark.apache.org/docs/3.4.1/api/python/reference/pyspark.sql/functions.html 1. approx_count_distinct和count_distinct #approx_count_distinct(col:ColumnOrName,rsd:Optionnal[float]None) """ 作用:返回列col的近似不同计数,返回…

C语言:内存函数(memcpy memmove memset memcmp使用)

和黛玉学编程呀------------- 后续更新的节奏就快啦 memcpy使用和模拟实现 使用 void * memcpy ( void * destination, const void * source, size_t num ) 1.函数memcpy从source的位置开始向后复制num个字节的数据到destination指向的内存位置。 2.这个函数在遇到 \0 的时候…

确保分布式系统的稳定性:深入理解接口幂等性

确保分布式系统的稳定性:深入理解接口幂等性 在分布式系统中,网络波动、系统故障或用户操作可能导致同一个请求被多次发送至服务器,如果服务器对每个重复的请求都作出新的响应,就可能导致数据的不一致或业务逻辑的错误。为了解决…

常用抓包软件集合(Fiddler、Charles)

1. Fiddler 介绍:Fiddler是一个免费的HTTP和HTTPS调试工具,支持Windows平台。它可以捕获HTTP和HTTPS流量,并提供了丰富的调试和分析功能。优点:易于安装、易于使用、支持多种扩展、可以提高开发效率。缺点:只支持Wind…

龙芯3A6000_统信UOS_麒麟KYLINOS上创建密钥对加解密文件

原文链接:龙芯3A6000|统信UOS/麒麟KYLINOS上创建密钥对加解密文件 大家好!在当今数字化时代,数据安全变得越来越重要。为了帮助大家更好地保护自己的数据,今天我为大家带来一篇关于在统信UOS和麒麟KYLINOS操作系统上创建和使用密钥…

【日常聊聊】开源软件影响力

🍎个人博客:个人主页 🏆个人专栏:JAVA ⛳️ 功不唐捐,玉汝于成 目录 前言 正文 方向一:开源软件如何推动技术创新 方向二:开源软件的商业模式 方向三:开源软件的安全风险 方…

UDP和TCP的区别和联系

传输层:定义传输数据的协议端口号,以及流控和差错校验。 协议有:TCP、UDP等 UDP和TCP的主要区别包括以下几个方面: 1、连接性与无连接性:TCP是面向连接的传输控制协议,而UDP提供无连接的数据报服务。这意…

为期 90 天的免费数据科学认证(KNIME)

从 2 月 1 日开始,KNIME 官方将免费提供 KNIME 认证 90 天。 无论您是刚刚迈入数据科学领域、已经掌握了一些技术,还是正在构建预测模型,都可以参加为期 90 天的 KNIME 认证挑战赛,完成尽可能多的认证并获得数据科学技能免费认证。…

C#验证字符串是否纯字母:用正则表达式 vs 用Char.IsLetter方法加遍历

目录 一、使用的方法 1.使用正则表达式 2.使用Char.IsLetter方法 二、实例 1. 源码 2.生成效果 一、使用的方法 1.使用正则表达式 使用正则表达式可以验证用户输入的字符串是否为字母。匹配的正则表达式可以是:^[A-Za-z]$、^[A-Za-z]{1,}$、^[A-Za-z]*$。 …

【C语言】数组的应用:扫雷游戏(包含扩展和标记功能)附完整源代码

这个代码还是比较长的,为了增加可读性,我们还是把他的功能分装到了test.c,game.c,game.h里面。 扫雷游戏的规则相信大家来阅读本文之前已经知晓了,如果点到雷就输了,如果不是雷,点到的格子会显…

Pytorch-统计学方法、分布函数、随机抽样、线性代数运算、矩阵分解

Tensor中统计学相关的函数 torch.mean() #返回平均值 torch.sum() #返回总和 torch.prod() #计算所有元素的积 torch.max() # 返回最大值 torch.min() # 返回最小值 torch.argmax() #返回最大值排序的索引值 torch.argmin() #返回最小值排序的索引值 torch.std() #返回标准差 …

BEV感知算法学习

BEV感知算法学习 3D目标检测系列 Mono3D(Monocular 3D Object Detection for Autonomous Driving) 流程: 通过在地平面上假设先验,在3D空间中对具有典型物理尺寸的候选边界框进行采样;然后我们将这些方框投影到图像平面上,从而避…

JAVA 栈的实现

洗盘子 时间限制:1.000S 空间限制:128MB 题目描述 在餐厅里,洗盘子的工作需要使用到栈这种数据结构。 假设你手里有一个盘子堆放区。现在需要模拟洗盘子的过程,每个盘子都有一个编号。 盘子堆放区操作说明: 1…

在 Windows 10 上使用 Visual Studio 2022 进行 C++ 桌面开发

工具下载链接:https://pan.quark.cn/s/c70b23901ccb 环境介绍 在今天的快速发展的软件开发行业中,选择合适的开发环境是非常关键的一步。对于C开发人员来说,Visual Studio 2022(VS2022)是一个强大的集成开发环境&…

C#中检查空值的最佳实践

C#中检查空值的最佳实践 在C#编程中,处理空值是一项基础且重要的任务。正确地检查变量是否为null可以帮助我们避免NullReferenceException,这是C#最常见的运行时错误之一。本文将探讨为什么使用is关键字进行空值检查是一种优于使用的做法。 操作符&…