浅学实战:探索PySpark实践,解锁大数据魔法!

文章目录

  • Spark和PySpark概述
    • 1.1 Spark简介
    • 1.2 PySpark简介
  • 二 基础准备
    • 2.1 PySpark库的安装
    • 2.2 构建SparkContext对象
    • 2.3 SparkContext和SparkSession
    • 2.4 构建SparkSession对象
    • 2.5 PySpark的编程模型
  • 三 数据输入
    • 3.1 RDD对象
    • 3.2 Python数据容器转RDD对象
    • 3.3 读取文件转RDD对象
  • 四 数据计算
    • 4.1 map算子
    • 4.2 flatMap算子
    • 4.3 reduceByKey算子
    • 4.4 使用SparkSession可能出现的问题
    • 4.5 filter算子
    • 4.6 distinct算子
    • 4.7 sortBy算子
    • 4.8 综合案例
  • 五 数据输出
    • 5.1 输出为python对象
      • 5.1.1 collect算子
      • 5.1.2 reduce算子
      • 5.1.3 take算子
      • 5.1.4 count算子
    • 5.2 输出到文件
      • 5.2.1 环境准备
      • 5.2.2 saveAsTextFile算子
      • 5.2.3 coalesce/repartition调整分区
  • 六 综合案例:搜索引擎日志分析
    • 6.1 需求
    • 6.2 参考代码
    • 6.3 输出结果

Spark和PySpark概述

1.1 Spark简介

在这里插入图片描述

  • PySpark是Apache Spark的Python库,Apache Spark是一个开源的大数据处理框架。Apache Spark旨在进行大规模数据处理和分析,并提供统一的API,用于批处理、实时流处理、机器学习和图处理。

  • PySpark允许开发人员使用Python编程语言与Spark交互,使更多习惯于Python而不是传统的Scala或Java的数据工程师、数据科学家和分析师能够使用Spark。

  • 简单来说,Spark是一款分布式的计算框架,用于调度成百上千的服务器集群,计算TB、PB乃至EB级别的海量数据

1.2 PySpark简介

  • PySpark是由Spark官方开发的Python语言第三方库。
    Python开发者可以使用pip程序快速的安装PySpark并像其它三方库那样直接使用。
    在这里插入图片描述

二 基础准备

2.1 PySpark库的安装

  • PySpark同样可以使用pip程序进行安装。

  • 在”CMD”命令提示符程序内,输入:

pip install pyspark

或使用国内代理镜像网站(清华大学源)

pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark

在这里插入图片描述

2.2 构建SparkContext对象

  • 想要使用PySpark库完成数据处理,首先需要构建一个执行环境入口对象。
  • PySpark的执行环境入口对象是:SparkContext 的类对象
  • 使用PySpark库来创建和管理一个Spark应用程序的基本示例
# 导入必要的模块
from pyspark import SparkConf, SparkContext
# 创建SparkConf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于SparkConf类对象创建SparkContext对象
sc = SparkContext(conf=conf)
# 打印PySpark的运行版本
print(sc.version)
# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()
  1. 导入必要的模块
from pyspark import SparkConf, SparkContext
  • pyspark是PySpark库的主要模块,SparkConfSparkContext是两个重要的类,用于配置和管理Spark应用程序
  1. 创建SparkConf对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
  • SparkConf对象用于配置Spark应用程序的各种属性
  • .setMaster("local[*]")设置了Spark的运行模式为本地模式,其中"local[*]"表示使用所有可用的本地处理核心
  • .setAppName("test_spark_app")设置了应用程序的名称为"test_spark_app"
  1. 创建SparkContext对象
sc = SparkContext(conf=conf)
  • 基于前面创建的SparkConf对象,创建SparkContext对象,它是与Spark集群通信的主要入口点。
  • 通过此对象,您可以在集群上创建RDD(弹性分布式数据集)并执行各种操作。
  1. 当前SparkContext对象所使用的Spark版本
print(sc.version)
  1. 停止SparkContext对象
sc.stop()

2.3 SparkContext和SparkSession

  • SparkContextSparkSession 都是 Apache Spark 中用于创建和管理 Spark 应用程序的关键组件,但它们在 Spark 版本 2.0 之后的引入中扮演了不同的角色。

SparkContext:

  • SparkContext(简称为 sc)是 Spark 的早期上下文对象,它是连接 Spark 集群的主要入口点。在 Spark 2.0 之前的版本中,开发人员使用 SparkContext 来创建 RDD、广播变量和累加器等,以及执行各种操作。它充当了应用程序与 Spark 集群之间的中间人,负责管理与集群的通信,任务调度,数据分布等。

  • 在 Spark 2.0 之后,随着 SparkSession 的引入,SparkContext 仍然可用,但更多的功能被集成到了 SparkSession 中,因此在新的应用程序中更常用 SparkSession

SparkSession:

  • SparkSession 是在 Spark 2.0 中引入的新概念,它是创建 Spark 应用程序的主要入口点。SparkSession 继承自 SparkContext,并将许多原来与 SparkContext 相关的功能整合到一个对象中,包括创建 RDD、DataFrame 和 Dataset,以及执行 SQL 查询和 Spark 应用程序的配置。它还提供了一个更统一的 API,使得在 Spark 中处理不同类型的数据更加便捷。

  • 通过 SparkSession,可以使用 DataFrame API 来处理结构化数据,进行 SQL 查询,还可以使用 RDD API 进行更底层的操作。此外,SparkSession 也提供了与 Hive 集成的功能,可以在 Spark 中执行 Hive 查询。

总结:

  • SparkContext 是早期的 Spark 上下文,负责连接 Spark 集群和管理任务。它在 Spark 2.0 后仍然可用,但在新应用程序中更常用 SparkSession
  • SparkSession 是 Spark 2.0 引入的主要上下文,继承了 SparkContext 的功能,并在其基础上整合了更多功能,用于创建和管理 Spark 应用程序,处理不同类型的数据和执行查询。

2.4 构建SparkSession对象

  • 以下是创建SparkSession的简单示例:
from pyspark.sql import SparkSession
# 设置本地python解释器 请注意修改为自己的python解释器目录
import os
os.environ['PYSPARK_PYTHON'] = "C:\environment\Python3.11.4\python.exe"# 创建一个SparkSession
spark = SparkSession.builder \.appName("PySparkExample") \.getOrCreate()# 将数据加载到DataFrame中
data = [("Alice", 25), ("Bob", 30), ("Charlie", 22)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, columns)# 执行DataFrame操作
df.show()

2.5 PySpark的编程模型

  • SparkContext类对象,是PySpark编程中一切功能的入口。
    PySpark的编程,主要分为如下三大步骤:
    在这里插入图片描述
  1. 数据输入:通过SparkContext完成数据读取
  2. 数据计算:读取到的数据转换为RDD对象,调用RDD的成员方法完成计算
  3. 数据输出:调用RDD的数据输出相关成员方法,将结果输出到list、元组、字典、文本文件、数据库等

在这里插入图片描述

三 数据输入

3.1 RDD对象

  • PySpark支持多种数据的输入,在输入完成后,都会得到一个:RDD类的对象
  • RDD全称为:弹性分布式数据集(Resilient Distributed Datasets)
  • PySpark针对数据的处理,都是以RDD对象作为载体,即:
    • 数据存储在RDD内
    • 各类数据的计算方法,也都是RDD的成员方法
    • RDD的数据计算方法,返回值依旧是RDD对象
      在这里插入图片描述

3.2 Python数据容器转RDD对象

  • PySpark支持通过SparkContext对象的parallelize成员方法,将:list、tuple、set、dict、str转换为PySpark的RDD对象
from pyspark import SparkConf, SparkContextconf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)# 通过parallelize方法将Python对象加载到Spark内,成为RDD对象
rdd1 = sc.parallelize([1, 2, 3, 4, 5]) #list
rdd2 = sc.parallelize((1, 2, 3, 4, 5)) #元组
rdd3 = sc.parallelize("abcdefg")       #字符串
rdd4 = sc.parallelize({1, 2, 3, 4, 5}) #集合
rdd5 = sc.parallelize({"key1": "value1", "key2": "value2"}) # 字典# 如果要查看RDD里面有什么内容,需要用collect()方法
print(rdd1.collect())
print(rdd2.collect())
print(rdd3.collect())
print(rdd4.collect())
print(rdd5.collect())
sc.stop()

3.3 读取文件转RDD对象

  • PySpark也支持通过SparkContext入口对象,来读取文件,来构建出RDD对象。
"""
演示通过PySpark代码加载数据,即数据输入
"""
from pyspark import SparkConf, SparkContextconf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)#用过textFile方法,读取文件数据加载到Spark内,成为RDD对象
rdd = sc.textFile("C:/hello.txt")
print(rdd.collect())
rdd.map()
sc.stop()

四 数据计算

  • PySpark的数据计算,都是基于RDD对象来进行,依赖RDD对象内置丰富的:成员方法(算子)进行实现

4.1 map算子

  • 功能:map算子,是将RDD的数据一条条处理(处理的逻辑基于map算子中接收的处理函数)返回新的RDD

  • map 是 PySpark 中的一个转换操作,用于对 RDD 中的每个元素应用一个函数,并返回一个新的 RDD,其中包含应用函数后的结果。

  • 链式调用:对于返回值是新RDD的算子,可以通过链式调用的方式多次调用算子

  • 其基本语法如下:

new_rdd = old_rdd.map(lambda x: function(x))
  • old_rdd: 原始的 RDD,即要进行转换的数据集。

  • new_rdd: 经过 map 转换后生成的新 RDD。

  • lambda x: function(x): 这是一个匿名函数(lambda 函数),它定义了应用于每个元素 x 的转换操作。你可以根据需要定义任何适当的操作和函数。

  • x: 表示 RDD 中的每个元素,function(x) 就是将该函数应用于元素 x 的结果

  • 使用 map 算子将 RDD 中的每个元素进行平方处理

from pyspark.sql import SparkSession
# 设置本地python解释器 请注意修改为自己的python解释器目录
import os
os.environ['PYSPARK_PYTHON'] = "C:\environment\Python3.11.4\python.exe"
# 创建 SparkSession
spark = SparkSession.builder.appName("RDDMapExample").getOrCreate()# 创建一个 RDD
data = [1, 2, 3, 4, 5]
rdd = spark.sparkContext.parallelize(data)# 使用 map 算子对 RDD 中的每个元素进行平方
squared_rdd = rdd.map(lambda x: x**2)
# 支持链式调用
# squared_rdd = rdd.map(lambda x: x**2).map(lambda x: x + 5)
# 收集结果并打印
result = squared_rdd.collect()
print(result)# 停止 SparkSession
spark.stop()
  • 使用 map 转换操作对一个包含字符串的 RDD 进行转换,将每个字符串转换为大写形式
from pyspark.sql import SparkSession
# 设置本地python解释器 请注意修改为自己的python解释器目录
import os
os.environ['PYSPARK_PYTHON'] = "C:\environment\Python3.11.4\python.exe"# 创建 SparkSession
spark = SparkSession.builder.appName("MapExample").getOrCreate()# 创建一个包含字符串的 RDD
data = ["hello", "world", "pyspark", "map"]
rdd = spark.sparkContext.parallelize(data)# 使用 map 转换操作将每个字符串转换为大写形式
upper_case_rdd = rdd.map(lambda x: x.upper())# 收集结果并打印
result = upper_case_rdd.collect()
print(result)# 停止 SparkSession
spark.stop()

4.2 flatMap算子

  • flatMap 是 PySpark 中的另一个转换操作,类似于 map,但是它会将每个输入元素映射为零个或多个输出元素,然后将所有输出元素组合成一个扁平的 RDD。

  • flatMap 可以用于在一个 RDD 中对每个元素应用一个函数,并生成多个结果元素,最终将这些结果组合成一个新的 RDD。简单说,对rdd执行map操作,然后进行解除嵌套操作
    在这里插入图片描述

  • flatMap 的语法如下:

    new_rdd = old_rdd.flatMap(lambda x: function(x))
    
    • old_rdd 是要进行转换的原始 RDD,
    • function 是一个应用于每个元素的函数,x 是 RDD 中的每个元素。flatMap 算子会将 function(x) 应用于每个元素,并返回一个或多个结果元素,然后将所有结果元素扁平化为一个新的 RDD。
  • 使用 flatMap 转换操作对一个包含单词列表的 RDD 进行转换,将每个单词拆分为字符,并生成一个扁平的字符列表

from pyspark.sql import SparkSession
# 设置本地python解释器 请注意修改为自己的python解释器目录
import os
os.environ['PYSPARK_PYTHON'] = "C:\environment\Python3.11.4\python.exe"# 创建 SparkSession
spark = SparkSession.builder.appName("FlatMapExample").getOrCreate()# 创建一个包含单词列表的 RDD
data = ["hello world", "pyspark flatMap", "example"]
rdd = spark.sparkContext.parallelize(data)# 使用 flatMap 转换操作将每个单词拆分为字符
character_list_rdd = rdd.flatMap(lambda line: line.split())# 收集结果并打印
result = character_list_rdd.collect()
print(result)
# ['hello', 'world', 'pyspark', 'flatMap', 'example']# 停止 SparkSession
spark.stop()

4.3 reduceByKey算子

  • reduceByKey 是 PySpark 中的一个转换操作,用于对键值对形式的 RDD 进行聚合计算。它按照键对 RDD 中的值(键值对中的键)进行分组,并应用一个指定的聚合函数来合并每个键对应的值。
  • 这个操作在执行分布式计算时特别有用,因为它能够将具有相同键的数据分布在同一个分区上,从而减少数据传输和处理开销。
  • reduceByKey 的基本语法如下:

    new_rdd = old_rdd.reduceByKey(lambda x, y: function(x, y))
    
    • old_rdd 是原始的键值对形式的 RDD
    • function 是一个应用于每对值的聚合函数
    • xy 是每对值的两个元素。
    • reduceByKey 算子将具有相同键的值进行聚合,将聚合函数应用于每对值,并返回一个新的键值对形式的 RDD。
  • 使用 reduceByKey 转换操作对一个包含单词计数的 RDD 进行聚合计算

    from pyspark.sql import SparkSession
    # 设置本地python解释器 请注意修改为自己的python解释器目录
    import os
    os.environ['PYSPARK_PYTHON'] = "C:\environment\Python3.11.4\python.exe"# 创建 SparkSession
    spark = SparkSession.builder.appName("ReduceByKeyExample").getOrCreate()# 创建一个包含单词的 RDD
    data = ["hello", "world", "hello", "pyspark", "world"]
    rdd = spark.sparkContext.parallelize(data)# 将单词映射为键值对,并使用 reduceByKey 进行单词计数
    word_counts_rdd = rdd.map(lambda word: (word, 1)).reduceByKey(lambda x, y: x + y)# 收集结果并打印
    result = word_counts_rdd.collect()
    print(result)# 停止 SparkSession
    spark.stop()
    
    • 输出结果
    [('hello', 2), ('world', 2), ('pyspark', 1)]
    
  • 注意:SparkContextSpark 的基础上下文,而 SparkSession 是在 Spark 2.0 之后引入的高级概念,用于在 Spark 中创建和管理 DataFrameDataset

4.4 使用SparkSession可能出现的问题

Please install psutil to have better support with spilling
  • 需要安装 psutil 库,以便在 PySpark 中更好地监控和管理系统资源使用,包括内存、磁盘和网络资源。
  • 要安装 psutil,在Python 环境中使用以下命令:
    pip install psutil
    

4.5 filter算子

  • filter :用于从 RDD 中筛选出满足指定条件的元素,并返回一个包含筛选结果的新 RDD。

  • filter 方法的基本语法如下:

    new_rdd = old_rdd.filter(lambda x: condition(x))
    
    • old_rdd 是要进行筛选的原始 RDD,condition 是一个用于筛选元素的函数,x 是 RDD 中的每个元素。
    • filter 方法将 condition(x) 应用于每个元素,并将满足条件的元素组成一个新的 RDD。
  • 使用 filter 方法从包含数字的 RDD 中筛选出偶数

from pyspark.sql import SparkSession
# 设置本地python解释器 请注意修改为自己的python解释器目录
import os
os.environ['PYSPARK_PYTHON'] = "C:\environment\Python3.11.4\python.exe"
# 创建 SparkSession
spark = SparkSession.builder.appName("FilterExample").getOrCreate()# 创建一个包含数字的 RDD
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd = spark.sparkContext.parallelize(data)# 使用 filter 方法筛选出偶数
even_rdd = rdd.filter(lambda x: x % 2 == 0)# 收集结果并打印
result = even_rdd.collect()
print(result) # [2, 4, 6, 8, 10]# 停止 SparkSession
spark.stop()

4.6 distinct算子

  • distinct :对RDD数据进行去重操作,并返回一个包含这些不重复元素的新 RDD。

  • distinct 方法的基本语法如下:

    new_rdd = old_rdd.distinct()
    
    • old_rdd 是要进行操作的原始 RDD,distinct 方法将会返回一个新的 RDD,其中包含了所有不重复的元素。
  • 使用 distinct 方法从包含重复元素的 RDD 中获取不重复的元素

from pyspark.sql import SparkSession
# 设置本地python解释器 请注意修改为自己的python解释器目录
import os
os.environ['PYSPARK_PYTHON'] = "C:\environment\Python3.11.4\python.exe"# 创建 SparkSession
spark = SparkSession.builder.appName("DistinctExample").getOrCreate()# 创建一个包含重复元素的 RDD
data = [1, 2, 3, 2, 4, 3, 5, 6, 1]
rdd = spark.sparkContext.parallelize(data)# 使用 distinct 方法获取不重复的元素
distinct_rdd = rdd.distinct()# 收集结果并打印
result = distinct_rdd.collect()
print(result) # [1, 2, 3, 4, 5, 6]# 停止 SparkSession
spark.stop()

4.7 sortBy算子

  • sortBy :用于对 RDD 中的元素根据指定的排序依据进行排序,并返回一个新的包含排序结果的 RDD。

  • sortBy 方法的基本语法如下:

    new_rdd = old_rdd.sortBy(lambda x: key_function(x), ascending=True,numPartitions=1)
    
    • old_rdd 是要进行排序的原始 RDD
    • key_function 是一个用于从每个元素中提取排序关键字的函数
    • x 是 RDD 中的每个元素。
    • ascending 参数指定排序顺序,为 True 表示升序,为 False 表示降序。
    • numPartitions (可选参数),用于指定输出 RDD 的分区数量,全局排序需要设置分区数为1

  • 使用 sortBy 方法对包含数字的 RDD 进行升序排序
from pyspark.sql import SparkSession# 创建 SparkSession
spark = SparkSession.builder.appName("SortByExample").getOrCreate()# 创建一个包含数字的 RDD
data = [5, 2, 8, 1, 3, 7, 4, 6]
rdd = spark.sparkContext.parallelize(data)# 使用 sortBy 方法对 RDD 进行升序排序
sorted_rdd = rdd.sortBy(lambda x: x, ascending=True)# 收集结果并打印
result = sorted_rdd.collect()
print(result) # [1, 2, 3, 4, 5, 6, 7, 8]# 停止 SparkSession
spark.stop()

4.8 综合案例

{"id":1,"timestamp":"2019-05-08T01:03.00Z","category":"平板电脑","areaName":"北京","money":"1450"}|{"id":2,"timestamp":"2019-05-08T01:01.00Z","category":"手机","areaName":"北京","money":"1450"}|{"id":3,"timestamp":"2019-05-08T01:03.00Z","category":"手机","areaName":"北京","money":"8412"}
{"id":4,"timestamp":"2019-05-08T05:01.00Z","category":"电脑","areaName":"上海","money":"1513"}|{"id":5,"timestamp":"2019-05-08T01:03.00Z","category":"家电","areaName":"北京","money":"1550"}|{"id":6,"timestamp":"2019-05-08T01:01.00Z","category":"电脑","areaName":"杭州","money":"1550"}
{"id":7,"timestamp":"2019-05-08T01:03.00Z","category":"电脑","areaName":"北京","money":"5611"}|{"id":8,"timestamp":"2019-05-08T03:01.00Z","category":"家电","areaName":"北京","money":"4410"}|{"id":9,"timestamp":"2019-05-08T01:03.00Z","category":"家具","areaName":"郑州","money":"1120"}
{"id":10,"timestamp":"2019-05-08T01:01.00Z","category":"家具","areaName":"北京","money":"6661"}|{"id":11,"timestamp":"2019-05-08T05:03.00Z","category":"家具","areaName":"杭州","money":"1230"}|{"id":12,"timestamp":"2019-05-08T01:01.00Z","category":"书籍","areaName":"北京","money":"5550"}
{"id":13,"timestamp":"2019-05-08T01:03.00Z","category":"书籍","areaName":"北京","money":"5550"}|{"id":14,"timestamp":"2019-05-08T01:01.00Z","category":"电脑","areaName":"北京","money":"1261"}|{"id":15,"timestamp":"2019-05-08T03:03.00Z","category":"电脑","areaName":"杭州","money":"6660"}
{"id":16,"timestamp":"2019-05-08T01:01.00Z","category":"电脑","areaName":"天津","money":"6660"}|{"id":17,"timestamp":"2019-05-08T01:03.00Z","category":"书籍","areaName":"北京","money":"9000"}|{"id":18,"timestamp":"2019-05-08T05:01.00Z","category":"书籍","areaName":"北京","money":"1230"}
{"id":19,"timestamp":"2019-05-08T01:03.00Z","category":"电脑","areaName":"杭州","money":"5551"}|{"id":20,"timestamp":"2019-05-08T01:01.00Z","category":"电脑","areaName":"北京","money":"2450"}
{"id":21,"timestamp":"2019-05-08T01:03.00Z","category":"食品","areaName":"北京","money":"5520"}|{"id":22,"timestamp":"2019-05-08T01:01.00Z","category":"食品","areaName":"北京","money":"6650"}
{"id":23,"timestamp":"2019-05-08T01:03.00Z","category":"服饰","areaName":"杭州","money":"1240"}|{"id":24,"timestamp":"2019-05-08T01:01.00Z","category":"食品","areaName":"天津","money":"5600"}
{"id":25,"timestamp":"2019-05-08T01:03.00Z","category":"食品","areaName":"北京","money":"7801"}|{"id":26,"timestamp":"2019-05-08T01:01.00Z","category":"服饰","areaName":"北京","money":"9000"}
{"id":27,"timestamp":"2019-05-08T01:03.00Z","category":"服饰","areaName":"杭州","money":"5600"}|{"id":28,"timestamp":"2019-05-08T01:01.00Z","category":"食品","areaName":"北京","money":"8000"}|{"id":29,"timestamp":"2019-05-08T02:03.00Z","category":"服饰","areaName":"杭州","money":"7000"}
  • 需求,复制以上内容到文件中,使用Spark读取文件进行计算:
    1. 各个城市销售额排名,从大到小
    2. 全部城市,有哪些商品类别在售卖
    3. 北京市有哪些商品类别在售卖

  • 参考代码:
"""
完成练习案例:JSON商品统计
需求:
1. 各个城市销售额排名,从大到小
2. 全部城市,有哪些商品类别在售卖
3. 北京市有哪些商品类别在售卖
"""
from pyspark import SparkConf, SparkContext
import os
import json
os.environ['PYSPARK_PYTHON'] = 'D:/dev/python/python310/python.exe'
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)# TODO 需求1: 城市销售额排名
# 1.1 读取文件得到RDD
file_rdd = sc.textFile("D:/orders.txt")
# 1.2 取出一个个JSON字符串
json_str_rdd = file_rdd.flatMap(lambda x: x.split("|"))
# 1.3 将一个个JSON字符串转换为字典
dict_rdd = json_str_rdd.map(lambda x: json.loads(x))
# 1.4 取出城市和销售额数据
# (城市,销售额)
city_with_money_rdd = dict_rdd.map(lambda x: (x['areaName'], int(x['money'])))
# 1.5 按城市分组按销售额聚合
city_result_rdd = city_with_money_rdd.reduceByKey(lambda a, b: a + b)
# 1.6 按销售额聚合结果进行排序
result1_rdd = city_result_rdd.sortBy(lambda x: x[1], ascending=False, numPartitions=1)
print("需求1的结果:", result1_rdd.collect())
# TODO 需求2: 全部城市有哪些商品类别在售卖
# 2.1 取出全部的商品类别
category_rdd = dict_rdd.map(lambda x: x['category']).distinct()
print("需求2的结果:", category_rdd.collect())
# 2.2 对全部商品类别进行去重
# TODO 需求3: 北京市有哪些商品类别在售卖
# 3.1 过滤北京市的数据
beijing_data_rdd = dict_rdd.filter(lambda x: x['areaName'] == '北京')
# 3.2 取出全部商品类别并进行商品类别去重
result3_rdd = beijing_data_rdd.map(lambda x: x['category']).distinct()
print("需求3的结果:", result3_rdd.collect())

五 数据输出

5.1 输出为python对象

5.1.1 collect算子

在这里插入图片描述

  • 功能:将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象

  • collect 操作会触发实际的计算操作,因此在处理大量数据时需要谨慎使用,以避免资源耗尽或内存溢出的问题。

  • collect 操作的基本语法

    result = rdd.collect()
    
    • rdd 是要执行 collect 操作的 RDD
    • result 是一个包含所有元素的本地 Python 列表。
  • 使用 collect 操作将 RDD 中的元素收集到本地列表并打印

    from pyspark.sql import SparkSession# 创建 SparkSession
    spark = SparkSession.builder.appName("CollectExample").getOrCreate()# 创建一个包含数字的 RDD
    data = [1, 2, 3, 4, 5]
    rdd = spark.sparkContext.parallelize(data)# 使用 collect 操作将 RDD 中的元素收集到本地列表
    result = rdd.collect()# 打印结果
    print(result)# 停止 SparkSession
    spark.stop()
    
  • 请注意,collect 操作将所有数据从分布式计算节点收集到驱动程序节点,因此适用于数据量较小的情况。对于大规模数据集,使用 collect 可能会导致驱动程序节点的内存溢出。如果需要对数据进行分析、汇总或展示,建议使用其他适当的操作,如 takefirstreduce 等,以避免在内存方面的问题。

5.1.2 reduce算子

  • reduce 是一种动作(action)操作,用于对 RDD 中的元素进行累积计算。它可以将一个二元操作应用于 RDD 中的所有元素,从而将元素逐个合并以得到一个单一的结果。

  • reduce 操作是一个逐步的、迭代的过程,它从左到右依次将每个元素与累积值进行二元操作,直到得到最终结果。

  • reduce 操作的基本语法如下:

    result = rdd.reduce(lambda x, y: binary_function(x, y))
    
    • rdd 是要执行 reduce 操作的 RDD
    • binary_function 是一个二元操作的函数,xy 分别是 RDD 中的两个元素
    • reduce 操作将会从左到右依次将每对元素应用二元操作,得到一个最终的结果。
  • 使用 reduce 操作计算 RDD 中所有元素的累加和

from pyspark.sql import SparkSession# 创建 SparkSession
spark = SparkSession.builder.appName("ReduceExample").getOrCreate()# 创建一个包含数字的 RDD
data = [1, 2, 3, 4, 5]
rdd = spark.sparkContext.parallelize(data)# 使用 reduce 操作计算 RDD 中所有元素的累加和
total_sum = rdd.reduce(lambda x, y: x + y)# 打印结果
print("Total sum:", total_sum)# 停止 SparkSession
spark.stop()

5.1.3 take算子

  • take 是一种动作(action)操作,用于从 RDD 中获取指定数量的元素,并返回一个包含这些元素的本地 Python 列表。与 collect 不同,take 操作仅获取指定数量的元素,而不是整个 RDD 的所有元素。

  • take 操作的基本语法如下:

    result = rdd.take(num_elements)
    
    • rdd 是要执行 take 操作的 RDD
    • num_elements 是要获取的元素数量
    • result 是一个包含获取的元素的本地 Python 列表。

使用 take 操作从 RDD 中获取指定数量的元素

from pyspark.sql import SparkSession# 创建 SparkSession
spark = SparkSession.builder.appName("TakeExample").getOrCreate()# 创建一个包含数字的 RDD
data = [1, 2, 3, 4, 5]
rdd = spark.sparkContext.parallelize(data)# 使用 take 操作从 RDD 中获取前三个元素
selected_elements = rdd.take(3)# 打印结果
print(selected_elements) # [1, 2, 3]# 停止 SparkSession
spark.stop()
  • 注意:take 操作仅在集群中获取指定数量的元素,而不会将整个 RDD 的所有元素都收集到驱动程序节点上。这使得 take 在需要获取一小部分数据样本时非常有用,而不会耗尽内存。

5.1.4 count算子

  • count 是一种动作(action)操作,用于计算 RDD 中元素的数量。它返回一个整数,表示 RDD 中的元素个数。

  • count 操作的基本语法如下:

    num_elements = rdd.count()
    
    • rdd 是要执行 count 操作的 RDD
    • num_elements 是包含 RDD 中元素数量的整数。
  • 使用 count 操作计算 RDD 中元素的数量

from pyspark.sql import SparkSession# 创建 SparkSession
spark = SparkSession.builder.appName("CountExample").getOrCreate()# 创建一个包含数字的 RDD
data = [1, 2, 3, 4, 5]
rdd = spark.sparkContext.parallelize(data)# 使用 count 操作计算 RDD 中元素的数量
element_count = rdd.count()# 打印结果
print("Number of elements:", element_count) # Number of elements: 5# 停止 SparkSession
spark.stop()
  • 注意,count 操作会触发对 RDD 的完整扫描,以确定元素的数量。对于大型数据集,可能需要一些时间来计算数量。

5.2 输出到文件

5.2.1 环境准备

  • 这里使用的是Hadoop3.3.5,并使用兼容版本版本的winutils
  • 调用保存文件的算子,需要配置Hadoop依赖
  1. 下载Hadoop安装包,解压到电脑任意位置
    • 可进入Hadoop按照下面的步骤进行操作,也可直接点击连接进行跳转
      在这里插入图片描述
      在这里插入图片描述
      在这里插入图片描述
      在这里插入图片描述

在这里插入图片描述

  1. 在Python代码中使用os模块配置:os.environ[‘HADOOP_HOME’] = ‘HADOOP解压文件夹路径’
    感谢cdarlint的贡献
  2. 下载winutils.exe,并放入Hadoop解压文件夹的bin目录内
    在这里插入图片描述
    在这里插入图片描述
  3. 下载hadoop.dll,并放入:C:/Windows/System32 文件夹内
    • 下载方法:点击文件,跳转页面,点击右侧下载按钮即可
      在这里插入图片描述
      在这里插入图片描述

5.2.2 saveAsTextFile算子

  • saveAsTextFile 是一种动作(action)操作,用于将 RDD 中的内容保存到文本文件中。

  • saveAsTextFile 操作的基本语法如下:

    rdd.saveAsTextFile(output_path)
    
    • rdd 是要保存的 RDD
    • output_path 是指定的输出目录路径,用于存储 RDD 的内容。当执行 saveAsTextFile 操作时,PySpark 将会将 RDD 中的每个元素转换为字符串,并将这些字符串逐行写入到文本文件中。

  • 修改rdd分区为1个

    • 方式1,SparkConf对象设置属性全局并行度为1:
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
    conf.set("spark.default.parallelism","1")
    sc = SparkContext(conf=conf)
    
    • 方式2,创建RDD的时候设置(parallelize方法传入numSlices参数为1)
rdd1 = sc.parallelize([1, 2, 3, 4, 5], numSlices=1)
rdd2 = sc.parallelize([("Hello", 3), ("Spark", 5), ("Hi", 7)], 1)
rdd3 = sc.parallelize([[1, 3, 5], [6, 7, 9], [11, 13, 11]], 1)
  • 使用 saveAsTextFile 操作将 RDD 中的元素保存到文本文件
    • 注意,saveAsTextFile 操作会自动分区数据并将数据保存到多个文件中,每个分区一个文件。一般默认分区数和cpu核心数相同
    from pyspark import SparkConf, SparkContext
    import os
    import json
    os.environ['PYSPARK_PYTHON'] = 'C:/environment/Python3.11.4/python.exe'
    os.environ['HADOOP_HOME'] = "C:/environment/hadoop-3.3.5"
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
    # conf.set("spark.default.parallelism","1")
    sc = SparkContext(conf=conf)# 准备RDD1 numSlices设置分区为1
    rdd1 = sc.parallelize([1, 2, 3, 4, 5], numSlices=1)# 准备RDD2 numSlices设置分区为1
    rdd2 = sc.parallelize([("Hello", 3), ("Spark", 5), ("Hi", 7)], 1)# 准备RDD3 numSlices设置分区为1
    rdd3 = sc.parallelize([[1, 3, 5], [6, 7, 9], [11, 13, 11]], 1)# 输出到文件中
    rdd1.saveAsTextFile("C:/output1")
    rdd2.saveAsTextFile("C:/output2")
    rdd3.saveAsTextFile("C:/output3")
    

在这里插入图片描述

5.2.3 coalesce/repartition调整分区

  • saveAsTextFile 操作默认会将数据分区并保存到多个文件中,每个分区一个文件,以便更好地利用分布式存储。如果希望将数据保存为单个文本文件,可以使用 coalescerepartition 操作来调整 RDD 的分区数量,将所有数据合并到一个分区中,然后再使用 saveAsTextFile 操作。

  • 演示:使用 coalescerepartition 的示例,以将数据保存为单个文本文件

    • 使用 coalesce
    from pyspark.sql import SparkSession# 创建 SparkSession
    spark = SparkSession.builder.appName("SaveSingleTextFileExample").getOrCreate()# 创建一个包含数字的 RDD
    data = [1, 2, 3, 4, 5]
    rdd = spark.sparkContext.parallelize(data, numSlices=4)  # 指定初始分区数为 4# 将数据合并到一个分区
    single_partition_rdd = rdd.coalesce(1)# 将合并后的数据保存为单个文本文件
    output_path = "single_text_file"
    single_partition_rdd.saveAsTextFile(output_path)# 停止 SparkSession
    spark.stop()
    
    • 使用 repartition
    from pyspark.sql import SparkSession# 创建 SparkSession
    spark = SparkSession.builder.appName("SaveSingleTextFileExample").getOrCreate()# 创建一个包含数字的 RDD
    data = [1, 2, 3, 4, 5]
    rdd = spark.sparkContext.parallelize(data, numSlices=4)  # 指定初始分区数为 4# 将数据重新分区为一个分区
    single_partition_rdd = rdd.repartition(1)# 将重新分区后的数据保存为单个文本文件
    output_path = "single_text_file"
    single_partition_rdd.saveAsTextFile(output_path)# 停止 SparkSession
    spark.stop()
    

六 综合案例:搜索引擎日志分析

6.1 需求

在这里插入图片描述
读取文件转换成RDD,并完成:

  • 打印输出:热门搜索时间段(小时精度)Top3
  • 打印输出:热门搜索词Top3
  • 打印输出:统计黑马程序员关键字在哪个时段被搜索最多
  • 将数据转换为JSON格式,写出为文件

6.2 参考代码

from pyspark import SparkConf, SparkContext
import os
import json
os.environ['PYSPARK_PYTHON'] = 'C:/environment/Python3.11.4/python.exe'
os.environ['HADOOP_HOME'] = "C:/environment/hadoop-3.3.5"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
conf.set("spark.default.parallelism", "1")
sc = SparkContext(conf=conf)# 读取文件转换成RDD
file_rdd = sc.textFile("C:/search_log.txt")
# TODO 需求1: 热门搜索时间段Top3(小时精度)
# 1.1 取出全部的时间并转换为小时
# 1.2 转换为(小时, 1) 的二元元组
# 1.3 Key分组聚合Value
# 1.4 排序(降序)
# 1.5 取前3
result1 = file_rdd.map(lambda x: x.split("\t")[0][:2])\.map(lambda x:(x, 1))\.reduceByKey(lambda a, b: a + b)\.sortBy(lambda x: x[1], ascending=False, numPartitions=1)\.take(3)
print("需求1的结果:", result1)# TODO 需求2: 热门搜索词Top3
# 2.1 取出全部的搜索词
# 2.2 (词, 1) 二元元组
# 2.3 分组聚合
# 2.4 排序
# 2.5 Top3
result2 = file_rdd.map(lambda x: (x.split("\t")[2], 1)).\reduceByKey(lambda a, b: a + b).\sortBy(lambda x: x[1], ascending=False, numPartitions=1).\take(3)
print("需求2的结果:", result2)# TODO 需求3: 统计黑马程序员关键字在什么时段被搜索的最多
# 3.1 过滤内容,只保留黑马程序员关键词
# 3.2 转换为(小时, 1) 的二元元组
# 3.3 Key分组聚合Value
# 3.4 排序(降序)
# 3.5 取前1
result3 = file_rdd.map(lambda x: x.split("\t")).\filter(lambda x: x[2] == '黑马程序员').\map(lambda x: (x[0][:2], 1)).\reduceByKey(lambda a, b: a + b).\sortBy(lambda x: x[1], ascending=False, numPartitions=1).\take(1)
print("需求3的结果:", result3)# TODO 需求4: 将数据转换为JSON格式,写出到文件中
# 4.1 转换为JSON格式的RDD
# 4.2 写出为文件
file_rdd.map(lambda x: x.split("\t")).\map(lambda x: {"time": x[0], "user_id": x[1], "key_word": x[2], "rank1": x[3], "rank2": x[4], "url": x[5]}).\saveAsTextFile("C:/output_json")

6.3 输出结果

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/14 13:18:50 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
需求1的结果: [('20', 3479), ('23', 3087), ('21', 2989)]
需求2的结果: [('scala', 2310), ('hadoop', 2268), ('博学谷', 2002)]
需求3的结果: [('22', 245)]

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/40980.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

IDEA的常用设置,让你更快速的编程

一、前言 在使用JetBrains的IntelliJ IDEA进行软件开发时,了解和正确配置一些常用设置是非常重要的。IDEA的强大功能和定制性使得开发过程更加高效和舒适。 在本文中,我们将介绍一些常用的IDEA设置,帮助您更好地利用IDEA进行开发。这些设置包…

Java面向对象——封装以及this关键字

封 装 封装是面向对象编程(OOP)的三大特性之一,它将数据和操作数据的方法组合在一个单元内部,并对外部隐藏其具体实现细节。在Java中,封装是通过类的访问控制修饰符(如 private、protected、public&#x…

Linux MQTT智能家居项目(智能家居界面布局)

文章目录 前言一、创建工程项目二、界面布局准备工作三、正式界面布局总结 前言 一、创建工程项目 1.选择工程名称和项目保存路径 2.选择QWidget 3.添加保存图片的资源文件: 在工程目录下添加Icon文件夹保存图片: 将文件放入目录中: …

网络层协议

网络层协议 IP协议基本概念协议头格式网段划分特殊的IP地址IP地址的数量限制私有IP地址和公网IP地址路由IP协议头格式后续 在复杂的网络环境中确定一个合适的路径 IP协议 承接上文,TCP协议并不会直接将数据传递给对方,而是交付给下一层协议,…

音视频FAQ(三):音画不同步

摘要 本文介绍了音画不同步问题的五个因素:编码和封装阶段、网络传输阶段、播放器中的处理阶段、源内容产生的问题以及转码和编辑。针对这些因素,提出了相应的解决方案,如使用标准化工具、选择强大的传输协议、自适应缓冲等。此外&#xff0…

uniapp微信小程序区分正式版,开发版,体验版

小程序代码区分是正式版,开发版,还是体验版 通常正式和开发环境需要调用不同域名接口,发布时需要手动更换 或者有些东西不想在正式版显示,只在开发版体验版中显示,也需要去手动隐藏 官方没有明确给出判断环境的方法&a…

SciencePub学术 | CCF推荐重点计算机SCIE征稿中

SciencePub学术 刊源推荐: CCF推荐重点计算机SCIE征稿中!信息如下,录满为止: 一、期刊概况: CCF推荐重点SCIE简介 【期刊简介】IF:4.0,JCR2区,中科院3区; 【版面类型】正刊&#…

Swift 基础

工程目录 请点击下面工程名称,跳转到代码的仓库页面,将工程 下载下来 Demo Code 里有详细的注释 点击下载代码:swift-01

记录一下基于jeecg-boot3.0的待办消息移植记录

因为之前没有记录,所以还要看代码进行寻找,比较费劲,所以今天记录一下: 1、后端 SysAnnouncementController 下面函数增加待办的几个显示内容给前端用 具体代码如下: /*** 功能:补充用户数据&#xff0c…

由小波变换模极大值重建信号

给定信号, 令小波变换的尺度 则x(t)的二进小波变换为 令为取模极大值时的横坐标,那么就是模极大值。 目标是由坐标、模极大值及最后一级的低频分量重建信号x(t) 为了重建x(t),假定有一信号集合h(t),该集合中信号的小波变换和x(…

打印出二进制的奇数位和偶数位

void print(int a) {int i0;printf("奇数位:");for(i30;i>0;i-2){printf("%d ",(a>>i)&1);}printf("\n");printf("偶数位:");for(i31;i>1;i-2){printf("%d ",(a>>i)&1);} …

Linux MQTT智能家居(温度,湿度,环境监测,摄像头等界面布局设置)

文章目录 前言一、温度湿度曲线布局二、环境监测界面布局三、摄像头界面布局总结 前言 本篇文章来完成另外三个界面的布局设置。 这里会使用到 feiyangqingyun的一些控件库。 一、温度湿度曲线布局 TempHumtiy.h: #ifndef TEMPHUMTIY_H #define TEMPHUMTIY_H#include <…

使用Python批量将Word文件转为PDF文件

说明&#xff1a;在使用Minio服务器时&#xff0c;无法对word文件预览&#xff0c;如果有需要的话&#xff0c;可以将word文件转为pdf文件&#xff0c;再存储到Minio中&#xff0c;本文介绍如何批量将word文件&#xff0c;转为pdf格式的文件&#xff1b; 安装库 首先&#xff…

Python系统学习1-9-类一之类语法

一、类之初印象 1、类就是空表格&#xff0c;将变量&#xff08;列名&#xff09;和函数&#xff08;行为&#xff09;结合起来 2、创建对象&#xff0c;表达具体行 3、创建类就是创建数据的模板 --操作数据时有提示 --还能再组合数据的行为 --结构更加清晰 4、类的内存分配…

vue项目根据word模版导出word文件

一、安装依赖 //1、docxtemplaternpm install docxtemplater pizzip -S//2、jszip-utilsnpm install jszip-utils -S//3、pizzipnpm install pizzip -S//4、FileSaver npm install file-saver --save二、创建word模版 也就是编辑一个word文档&#xff0c;文档中需要动态取值的…

【JAVA】数组练习

⭐ 作者&#xff1a;小胡_不糊涂 &#x1f331; 作者主页&#xff1a;小胡_不糊涂的个人主页 &#x1f4c0; 收录专栏&#xff1a;浅谈Java &#x1f496; 持续更文&#xff0c;关注博主少走弯路&#xff0c;谢谢大家支持 &#x1f496; 数组练习 1. 数组转字符串2. 数组拷贝3.…

在vue中使用swiper轮播图(搭配watch和$nextTick())

在组件中使用轮播图展示图片信息&#xff1a; 1.下载swiper,5版本为稳定版本 cnpm install swiper5 2.在组件中引入swiper包和对应样式&#xff0c;若多组件使用swiper&#xff0c;可以把swiper引入到main.js入口文件中&#xff1a; import swiper/css/swiper.css //引入swipe…

SpringBoot系列---【SpringBoot在多个profiles环境中自由切换】

SpringBoot在多个profiles环境中自由切换 1.在resource目录下新建dev&#xff0c;prod两个目录&#xff0c;并分别把dev环境的配置文件和prod环境的配置文件放到对应目录下&#xff0c;可以在配置文件中指定激活的配置文件&#xff0c;也可以默认不指定。 2.在pom.xml中最后位置…

07微服务的事务管理机制

一句话导读 在单体应用程序中&#xff0c;事务通常是在单个数据库或单个操作系统中管理的&#xff0c;而在微服务架构中&#xff0c;事务需要跨越多个服务和数据库&#xff0c;这就使得事务管理变得更加复杂和困难。 目录 一句话导读 一、微服务事务管理的定义和意义 二、微…

Layui列表表头去掉复选框改为选择

效果&#xff1a; 代码&#xff1a; // 表头复选框去掉改为选择 $(".layui-table th[data-field"0"] .layui-table-cell").html("<span>选择</span>");