【Python-Spark(大规模数据)】

Python-Spark(大规模数据)

  • ■ Spark
  • ■ PySparl编程模型
  • ■ 基础准备
  • ■ 数据输入
  • ■ RDD的map成员方法的使用
  • ■ RDD的flatMap成员方法的使用
  • ■ RDD的reduceByKey成员方法的使用
  • ■ 单词计数统计
  • ■ RDD的filter成员方法的使用
  • ■ RDD的distinct成员方法的使用
  • ■ RDD的sortBy成员方法的使用
  • ■ 案例:JSON商品统计
  • ■ 将RDD输出为Python对象
  • ■ 将RDD输出到文件中
  • ■ PySpark综合案例
  • ■ PySpark综合案例

■ Spark

Apache Spark 是用于大规模数据处理的统一分析引擎。
PySpark是由Spark官方开发的Python语言第三方库。

■ PySparl编程模型

  • 通过SparkContext对象,完成数据输入
  • 输入数据后得到RDD对象,对RDD对象进行迭代计算
  • 最终通过RDD对象的成员方法,完成数据输出工作
    在这里插入图片描述

■ 基础准备

# 导包
from pyspark import SparkConf, SparkContext
# 创建SparkConf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于SparkConf类对象创建SparkContext对象
sc = SparkContext(conf=conf)
# 打印PySpark的运行版本
print(sc.version)
# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

■ 数据输入

"""
演示通过PySpark代码加载数据,即数据输入
"""
from pyspark import SparkConf, SparkContextconf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)# # 通过parallelize方法将Python对象加载到Spark内,成为RDD对象
# rdd1 = sc.parallelize([1, 2, 3, 4, 5])
# rdd2 = sc.parallelize((1, 2, 3, 4, 5))
# rdd3 = sc.parallelize("abcdefg")
# rdd4 = sc.parallelize({1, 2, 3, 4, 5})
# rdd5 = sc.parallelize({"key1": "value1", "key2": "value2"})
#
# # 如果要查看RDD里面有什么内容,需要用collect()方法
# print(rdd1.collect())
# print(rdd2.collect())
# print(rdd3.collect())
# print(rdd4.collect())
# print(rdd5.collect())# 用过textFile方法,读取文件数据加载到Spark内,成为RDD对象
rdd = sc.textFile("D:/hello.txt")
print(rdd.collect())
rdd.map()
sc.stop()

■ RDD的map成员方法的使用

"""
演示RDD的map成员方法的使用
"""
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:/dev/python/python310/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)# 准备一个RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])
# 通过map方法将全部数据都乘以10
# def func(data):
#     return data * 10rdd2 = rdd.map(lambda x: x * 10).map(lambda x: x + 5)print(rdd2.collect())
# (T) -> U
# (T) -> T# 链式调用

■ RDD的flatMap成员方法的使用

"""
演示RDD的flatMap成员方法的使用
"""
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:/dev/python/python310/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)# 准备一个RDD
rdd = sc.parallelize(["itheima itcast 666", "itheima itheima itcast", "python itheima"])# 需求,将RDD数据里面的一个个单词提取出来
rdd2 = rdd.flatMap(lambda x: x.split(" "))
print(rdd2.collect())

■ RDD的reduceByKey成员方法的使用

"""
演示RDD的reduceByKey成员方法的使用
"""
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:/dev/python/python310/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)# 准备一个RDD
rdd = sc.parallelize([('男', 99), ('男', 88), ('女', 99), ('女', 66)])
# 求男生和女生两个组的成绩之和
rdd2 = rdd.reduceByKey(lambda a, b: a + b)
print(rdd2.collect())

■ 单词计数统计

"""
完成练习案例:单词计数统计
"""# 1. 构建执行环境入口对象
from pyspark import SparkContext, SparkConf
import os
os.environ['PYSPARK_PYTHON'] = "D:/dev/python/python310/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 2. 读取数据文件
rdd = sc.textFile("D:/hello.txt")
# 3. 取出全部单词
word_rdd = rdd.flatMap(lambda x: x.split(" "))
# 4. 将所有单词都转换成二元元组,单词为Key,value设置为1
word_with_one_rdd = word_rdd.map(lambda word: (word, 1))
# 5. 分组并求和
result_rdd = word_with_one_rdd.reduceByKey(lambda a, b: a + b)
# 6. 打印输出结果
print(result_rdd.collect())

■ RDD的filter成员方法的使用

"""
演示RDD的filter成员方法的使用
"""
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:/dev/python/python310/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)# 准备一个RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])
# 对RDD的数据进行过滤
rdd2 = rdd.filter(lambda num: num % 2 == 0)print(rdd2.collect())

■ RDD的distinct成员方法的使用

"""
演示RDD的distinct成员方法的使用
"""
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:/dev/python/python310/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)# 准备一个RDD
rdd = sc.parallelize([1, 1, 3, 3, 5, 5, 7, 8, 8, 9, 10])
# 对RDD的数据进行去重
rdd2 = rdd.distinct()print(rdd2.collect())

■ RDD的sortBy成员方法的使用

"""
演示RDD的sortBy成员方法的使用
"""
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:/dev/python/python310/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)# 1. 读取数据文件
rdd = sc.textFile("D:/hello.txt")
# 2. 取出全部单词
word_rdd = rdd.flatMap(lambda x: x.split(" "))
# 3. 将所有单词都转换成二元元组,单词为Key,value设置为1
word_with_one_rdd = word_rdd.map(lambda word: (word, 1))
# 4. 分组并求和
result_rdd = word_with_one_rdd.reduceByKey(lambda a, b: a + b)
# 5. 对结果进行排序
final_rdd = result_rdd.sortBy(lambda x: x[1], ascending=True, numPartitions=1)
print(final_rdd.collect())

■ 案例:JSON商品统计

"""
完成练习案例:JSON商品统计
需求:
1. 各个城市销售额排名,从大到小
2. 全部城市,有哪些商品类别在售卖
3. 北京市有哪些商品类别在售卖
"""
from pyspark import SparkConf, SparkContext
import os
import json
os.environ['PYSPARK_PYTHON'] = 'D:/dev/python/python310/python.exe'
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)# TODO 需求1: 城市销售额排名
# 1.1 读取文件得到RDD
file_rdd = sc.textFile("D:/orders.txt")
# 1.2 取出一个个JSON字符串
json_str_rdd = file_rdd.flatMap(lambda x: x.split("|"))
# 1.3 将一个个JSON字符串转换为字典
dict_rdd = json_str_rdd.map(lambda x: json.loads(x))
# 1.4 取出城市和销售额数据
# (城市,销售额)
city_with_money_rdd = dict_rdd.map(lambda x: (x['areaName'], int(x['money'])))
# 1.5 按城市分组按销售额聚合
city_result_rdd = city_with_money_rdd.reduceByKey(lambda a, b: a + b)
# 1.6 按销售额聚合结果进行排序
result1_rdd = city_result_rdd.sortBy(lambda x: x[1], ascending=False, numPartitions=1)
print("需求1的结果:", result1_rdd.collect())
# TODO 需求2: 全部城市有哪些商品类别在售卖
# 2.1 取出全部的商品类别
category_rdd = dict_rdd.map(lambda x: x['category']).distinct()
print("需求2的结果:", category_rdd.collect())
# 2.2 对全部商品类别进行去重
# TODO 需求3: 北京市有哪些商品类别在售卖
# 3.1 过滤北京市的数据
beijing_data_rdd = dict_rdd.filter(lambda x: x['areaName'] == '北京')
# 3.2 取出全部商品类别
result3_rdd = beijing_data_rdd.map(lambda x: x['category']).distinct()
print("需求3的结果:", result3_rdd.collect())
# 3.3 进行商品类别去重

■ 将RDD输出为Python对象

"""
演示将RDD输出为Python对象
"""from pyspark import SparkConf, SparkContext
import os
import json
os.environ['PYSPARK_PYTHON'] = 'D:/dev/python/python310/python.exe'
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)# 准备RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])# collect算子,输出RDD为list对象
rdd_list: list = rdd.collect()
print(rdd_list)
print(type(rdd_list))
# reduce算子,对RDD进行两两聚合
num = rdd.reduce(lambda a, b: a + b)
print(num)
# take算子,取出RDD前N个元素,组成list返回
take_list = rdd.take(3)
print(take_list)
# count,统计rdd内有多少条数据,返回值为数字
num_count = rdd.count()
print(f"rdd内有{num_count}个元素")sc.stop()

■ 将RDD输出到文件中

"""
演示将RDD输出到文件中
"""from pyspark import SparkConf, SparkContext
import os
import json
os.environ['PYSPARK_PYTHON'] = 'D:/dev/python/python310/python.exe'
os.environ['HADOOP_HOME'] = "D:/dev/hadoop-3.0.0"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")sc = SparkContext(conf=conf)# 准备RDD1
rdd1 = sc.parallelize([1, 2, 3, 4, 5], numSlices=1)# 准备RDD2
rdd2 = sc.parallelize([("Hello", 3), ("Spark", 5), ("Hi", 7)], 1)# 准备RDD3
rdd3 = sc.parallelize([[1, 3, 5], [6, 7, 9], [11, 13, 11]], 1)# 输出到文件中
rdd1.saveAsTextFile("D:/output1")
rdd2.saveAsTextFile("D:/output2")
rdd3.saveAsTextFile("D:/output3")

■ PySpark综合案例

"""
演示PySpark综合案例
"""from pyspark import SparkConf, SparkContext
import os
import json
os.environ['PYSPARK_PYTHON'] = 'D:/dev/python/python310/python.exe'
os.environ['HADOOP_HOME'] = "D:/dev/hadoop-3.0.0"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
conf.set("spark.default.parallelism", "1")
sc = SparkContext(conf=conf)# 读取文件转换成RDD
file_rdd = sc.textFile("D:/search_log.txt")
# TODO 需求1: 热门搜索时间段Top3(小时精度)
# 1.1 取出全部的时间并转换为小时
# 1.2 转换为(小时, 1) 的二元元组
# 1.3 Key分组聚合Value
# 1.4 排序(降序)
# 1.5 取前3
result1 = file_rdd.map(lambda x: (x.split("\t")[0][:2], 1)).\reduceByKey(lambda a, b: a + b).\sortBy(lambda x: x[1], ascending=False, numPartitions=1).\take(3)
print("需求1的结果:", result1)# TODO 需求2: 热门搜索词Top3
# 2.1 取出全部的搜索词
# 2.2 (词, 1) 二元元组
# 2.3 分组聚合
# 2.4 排序
# 2.5 Top3
result2 = file_rdd.map(lambda x: (x.split("\t")[2], 1)).\reduceByKey(lambda a, b: a + b).\sortBy(lambda x: x[1], ascending=False, numPartitions=1).\take(3)
print("需求2的结果:", result2)# TODO 需求3: 统计黑马程序员关键字在什么时段被搜索的最多
# 3.1 过滤内容,只保留黑马程序员关键词
# 3.2 转换为(小时, 1) 的二元元组
# 3.3 Key分组聚合Value
# 3.4 排序(降序)
# 3.5 取前1
result3 = file_rdd.map(lambda x: x.split("\t")).\filter(lambda x: x[2] == '黑马程序员').\map(lambda x: (x[0][:2], 1)).\reduceByKey(lambda a, b: a + b).\sortBy(lambda x: x[1], ascending=False, numPartitions=1).\take(1)
print("需求3的结果:", result3)# TODO 需求4: 将数据转换为JSON格式,写出到文件中
# 4.1 转换为JSON格式的RDD
# 4.2 写出为文件
file_rdd.map(lambda x: x.split("\t")).\map(lambda x: {"time": x[0], "user_id": x[1], "key_word": x[2], "rank1": x[3], "rank2": x[4], "url": x[5]}).\saveAsTextFile("D:/output_json")

■ PySpark综合案例

"""
演示PySpark综合案例
"""from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = '/export/server/anaconda3/bin/python'
os.environ['HADOOP_HOME'] = "/export/server/hadoop-3.3.1"
conf = SparkConf().setAppName("spark_cluster")
conf.set("spark.default.parallelism", "24")
sc = SparkContext(conf=conf)# 读取文件转换成RDD
file_rdd = sc.textFile("hdfs://m1:8020/data/search_log.txt")
# TODO 需求1: 热门搜索时间段Top3(小时精度)
# 1.1 取出全部的时间并转换为小时
# 1.2 转换为(小时, 1) 的二元元组
# 1.3 Key分组聚合Value
# 1.4 排序(降序)
# 1.5 取前3
result1 = file_rdd.map(lambda x: (x.split("\t")[0][:2], 1)).\reduceByKey(lambda a, b: a + b).\sortBy(lambda x: x[1], ascending=False, numPartitions=1).\take(3)
print("需求1的结果:", result1)# TODO 需求2: 热门搜索词Top3
# 2.1 取出全部的搜索词
# 2.2 (词, 1) 二元元组
# 2.3 分组聚合
# 2.4 排序
# 2.5 Top3
result2 = file_rdd.map(lambda x: (x.split("\t")[2], 1)).\reduceByKey(lambda a, b: a + b).\sortBy(lambda x: x[1], ascending=False, numPartitions=1).\take(3)
print("需求2的结果:", result2)# TODO 需求3: 统计黑马程序员关键字在什么时段被搜索的最多
# 3.1 过滤内容,只保留黑马程序员关键词
# 3.2 转换为(小时, 1) 的二元元组
# 3.3 Key分组聚合Value
# 3.4 排序(降序)
# 3.5 取前1
result3 = file_rdd.map(lambda x: x.split("\t")).\filter(lambda x: x[2] == '黑马程序员').\map(lambda x: (x[0][:2], 1)).\reduceByKey(lambda a, b: a + b).\sortBy(lambda x: x[1], ascending=False, numPartitions=1).\take(1)
print("需求3的结果:", result3)# TODO 需求4: 将数据转换为JSON格式,写出到文件中
# 4.1 转换为JSON格式的RDD
# 4.2 写出为文件
file_rdd.map(lambda x: x.split("\t")).\map(lambda x: {"time": x[0], "user_id": x[1], "key_word": x[2], "rank1": x[3], "rank2": x[4], "url": x[5]}).\saveAsTextFile("hdfs://m1:8020/output/output_json")

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/3658.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LANGUAGE-DRIVEN SEMANTIC SEGMENTATION

环境不易满足,不建议复现

详解js中的console对象

对于前端开发而言,console对象大家肯定都很熟悉,最常用的 console.log() 是开发调试必用的 但是对于console对象的其他方法,相对而言使用的就比较少了。下面详细介绍一下: 谷歌浏览器输出console对象: 值得一提的是不…

JAVA MQTT 发布主题请求,订阅主题接收,订阅主题回复,发布主题再接收回复,三步走

先看效果 一、准备工作 1.官网下载emqx压缩包放到自己的盘符下,不要带中文路径 下载 EMQX 2.在路径的bin中,cmd,启动emqx服务 emqx start 3.访问服务,能打开就证明启动成功,登录的话官网默认的密码账号(…

【C#】Stopwatch计时器

使用Stopwatch检查C#中代码块的执行时间,比如歌曲,图片的下载时间问题 首先,我们可看到Stopwatch 类内部的函数。 根据需求,我们具体可使用到 Start() 开始计时,Stop() 停止计时等 //创建 Stopwatch 实例 Stopwatch …

STM32单片机C语言模块化编程实战:LED控制详解与示例

一、开发环境 硬件:正点原子探索者 V3 STM32F407 开发板 单片机:STM32F407ZGT6 Keil版本:5.32 STM32CubeMX版本:6.9.2 STM32Cube MCU Packges版本:STM32F4 V1.27.1 之前介绍了很多关于点灯的方法,比如…

ARM DMIPS算力说明

ARM DMIPS算力说明 ARM算力参考官网地址 https://en.wikipedia.org/wiki/List_of_ARM_processors Product familyARM architectureProcessorFeatureCache (I / D), MMUTypical MIPS MHzReferenceARM1ARMv1ARM1First implementationNoneARM2ARMv2ARM2ARMv2 added the MUL (mu…

【SSM进阶学习系列丨整合篇】Spring+SpringMVC+MyBatis 框架配置详解

文章目录 一、环境准备1.1、创建数据库和表1.2、导入框架依赖的jar包1.3、修改Maven的编译版本1.4、完善Maven目录1.5、编写项目需要的包1.6、编写实体、Mapper、Service 二、配置MyBatis环境2.1、配置mybatis的主配置文件2.2、编写映射文件2.3、测试环境是否正确 三、配置Spri…

el-table 三角形提示

<template><div><el-table :data"tableData" style"width: 100%"><el-table-column prop"ddd" label"日期2" width"150" /><el-table-column prop"ddd" label"日期2" width…

[C++][算法基础]分组背包问题(动态规划)

有 &#x1d441; 组物品和一个容量是 &#x1d449; 的背包。 每组物品有若干个&#xff0c;同一组内的物品最多只能选一个。 每件物品的体积是 &#xff0c;价值是 &#xff0c;其中 &#x1d456; 是组号&#xff0c;&#x1d457; 是组内编号。 求解将哪些物品装入背包&a…

AI大模型探索之路-训练篇1:大语言模型微调基础认知

文章目录 前言一、微调技术概述二、微调的必要性三、大模型的微调方法四、微调过程中的技术细节五、微调后的模型评估与应用总结 前言 在人工智能的广阔研究领域内&#xff0c;大型预训练语言模型&#xff08;Large Language Models, LLMs&#xff09;已经成为推动技术革新的关…

一、路由基础

1.路由协议的优先级 路由器分别定义了外部优先级和内部优先级&#xff08;越小越优&#xff09; 路由选择顺序&#xff1a;外部优先级>>内部优先级&#xff08;相同时&#xff09; ①外部优先级&#xff1a;用户可以手工为各路由协议配置的优先级 ②内部优先级&#xf…

OmniPlan Pro for Mac v4.8.0中文激活版 项目流程管理工具

OmniPlan Pro for Mac是一款功能强大的项目管理软件&#xff0c;它以其直观的用户界面和丰富的功能&#xff0c;帮助用户轻松管理各种复杂的项目。 OmniPlan Pro for Mac v4.8.0中文激活版 通过OmniPlan Pro&#xff0c;用户可以轻松创建任务&#xff0c;设置任务的开始和结束时…

Pulsar【部署 02】Pulsar可视化工具Manager安装使用

Pulsar Manager 是一个基于 web 的 GUI 管理和监视工具&#xff0c;可帮助管理员和用户管理和监视租户、命名空间、主题、订阅、代理、集群等&#xff0c;并支持对多个环境进行动态配置。 可视化工具Manager安装使用 1.Docker1.1 拉取镜像并启动1.2 设置用户名密码1.3 登录并添…

openstack界面简单修改

openstack Ubuntu主题登录界面修改修改登陆界面背景登录框边缘添加透明效果修改登录界面logo更换站点图片更换项目logo图片 本实验基于VMware17&#xff0c;使用Ubuntu2310搭建openstack-B版 Ubuntu主题 以下配置只对Ubuntu主题生效 登录界面修改 原界面 关闭登录界面域名输…

LTD271次升级 | 网站/小程序可设访问IP的黑白名单 • 官微中心支持PDF等办公文件预览与并分享 • 订单退款显示更详尽明细

1、新增IP访问限制功能&#xff1b; 2、订单新增交易号显示与退款明细显示&#xff1b; 3、自定义地址增加四级地区&#xff1b; 4、Android版App优化文件功能&#xff1b; 5、已知问题修复与优化&#xff1b; 01 官微中心 1) 新增IP限制访问功能 允许或者禁止某些 IP 或…

信创需求激增,国产服务器操作系统赋能数字化转型

信创&#xff0c;即信息技术应用创新&#xff0c;是指在关键领域和环节推进信息技术的自主创新&#xff0c;构建安全可控的信息技术体系。随着数字化转型的加速&#xff0c;信创需求激增&#xff0c;国产服务器操作系统在其中扮演着至关重要的角色。国产服务器操作系统如何赋能…

WPF —— lCommand命令实例

首先在标签页面设置一个Button按钮 <Button Width"100" Height"40" Content"测试" ></Button> 1 创建一个类 继承于ICommand这个接口&#xff0c; 这个接口一般包含三部分&#xff1a; 俩个方法&#xff1a;一个判断指令是不是…

【树莓派】yolov5 Lite,目标检测,树莓派4B,推理v5lite-e_end2end.onnx,摄像头实时目标检测

文章目录 YOLOv5 Lite: 在树莓派上轻松运行目标检测1. 环境配置2. 克隆项目3. 安装依赖项4. 下载模型权重5. 理解end2end的含义6. 示例推理7. 文件介绍8. 把文件弄到树莓派4B执行9. 进一步尝试fp16的onnx&#xff08;行不通&#xff09;10. 视频流检测 这里有大概的环境配置&am…

淘宝图片搜索API接口:技术原理、使用方法与最佳实践指南

淘宝图片搜索API接口技术详解 在数字化时代&#xff0c;图片搜索已经成为一种高效、直观的信息检索方式。淘宝作为国内最大的电商平台&#xff0c;其图片搜索API接口对于提高购物体验和商家运营效率具有重要意义。本文将详细解析淘宝图片搜索API接口的技术原理、使用方法和注意…

WordPress安装报错常见问题

WordPress安装过程很简单&#xff0c;不过还是有些朋友会碰到安装WordPress出错的情况。前不久我们遇到Hostease的客户在安装wordpress的时候遇到安装wordpress出错。显示数据连接错误。 数据库连接失败 数据库连接失败是最常见的错误情况。 添加图片注释&#xff0c;不超过 …