Spark的数据输入、数据计算、数据输出

PySpark的编程,主要氛围三大步骤:1)数据输入、2)数据处理计算、3)数据输出
1)数据输入:通过SparkContext对象,晚上数据输入
2)数据处理计算:输入数据后得到RDD对象,对RDD对象进行迭代计算
3)数据输出:最终通过RDD对象的成员方法,完成数据输出工作

安装pyspark

pip install pyspark
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark   # 清华大学镜像

想要使用PySpark库完成数据处理,首先需要构建一个执行环境入口对象 PySpark的执行环境入口对象是:类SparkContext的类对象

构建PySpark执行环境入口对象:

# 导包
from pyspark import SparkConf, SparkContext# 创建SparkConf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")  # 链式调用的写法# 基于SparkConf类对象创建parkContext类对象
sc = SparkContext(conf=conf)# 打印PySpark的允许版本
print(sc.version)# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

RDD: 弹性分布式数据集

1. python数据容器 转RDD对象
通过SpaarkContext对象的parallelize成员方法,将python数据容器转换为PySpark的RDD对象
2. 读取文件数据 转RDD对象
通过SpaarkContext入口对象textFile()方法,来读取文件,来构建出RDD对象

通过PySpark代码加载数据,即数据输入:

from pyspark import SparkConf, SparkContextconf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)# 通过parallelize方法将python数据容器加载到spark内,成为RDD对象
rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize((1, 2, 3, 4, 5))
rdd3 = sc.parallelize("abcdefg")
rdd4 = sc.parallelize({1, 2, 3, 4, 5})
rdd5 = sc.parallelize({"key1": "value1", "key2": "value"})# 如果要查看RDD里面有什么内容,需要用collect()方法
print(rdd1.collect())  # [1, 2, 3, 4, 5]
print(rdd2.collect())  # [1, 2, 3, 4, 5]
print(rdd3.collect())  # ['a', 'b', 'c', 'd', 'e', 'f', 'g']
print(rdd4.collect())  # [1, 2, 3, 4, 5]
print(rdd5.collect())  # ['key1', 'key2']# 通过textFile方法,读取文件数据加载到spark内,成为RDD对象
rdd6 = sc.textFile("./test.txt")
print(rdd6.collect())  # ['123456', '123456', '123456']sc.stop()

数据计算:

PySpark的数据计算,都是基于RDD对象来进行的,那么如何进行呢?
依赖,RDD对象内置丰富的 成员方法(算子)

map方法:

对RDD内的元素逐个处理,并返回一个新的RDD;接受一个处理函数,,可用lambda匿名函数快速编写

from pyspark import SparkConf, SparkContext# 添加python解释器路径
# import os
# os.environ['PYSPARK_PYTHON'] = "python.exe"  # python解释器路径# 创建SparkConf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")# 基于SparkConf类对象创建sparkContext类对象
sc = SparkContext(conf=conf)# 准备一个RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])# 通过map方法将全部数据都乘以10
# def func(data):
#     return data * 10# rdd2 = rdd.map(func)  # (T) -> U : 表示func函数必须有一个参数和一个返回值
# print(rdd2.collect())  # [10, 20, 30, 40, 50]# 匿名函数
# rdd2 = rdd.map(lambda x: x * 10)
# print(rdd2.collect())# 链式调用
rdd3 = rdd.map(lambda x: x * 10).map(lambda x: x + 5)
print(rdd3.collect())  # [15, 25, 35, 45, 55]sc.stop()

flatmap算子:

对rdd执行map操作,然后进行解除嵌套操作

from pyspark import SparkConf, SparkContextconf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)# 准备一个RDD
rdd = sc.parallelize(["itheima itcast 666", "itheima itheima itcast", "python itheima"])# 需求:将RDD数据里面的一个个单词提取出来
rdd2 = rdd.flatMap(lambda x: x.split(" "))
print(rdd2.collect())  # ['itheima', 'itcast', '666', 'itheima', 'itheima', 'itcast', 'python', 'itheima']

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/107923.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MybatisPlus多表关联分页返回结果异常

1. 按照该博客进行多表关联分页查询: https://blog.csdn.net/code_ang/article/details/116448694 2.在实际测试过程中,发现异常,分页返回的结果时而正确,时而错误。 count函数满足预期 count函数不满足预期 只是count了主表的…

竞赛 深度学习YOLOv5车辆颜色识别检测 - python opencv

文章目录 1 前言2 实现效果3 CNN卷积神经网络4 Yolov56 数据集处理及模型训练5 最后 1 前言 🔥 优质竞赛项目系列,今天要分享的是 🚩 **基于深度学习YOLOv5车辆颜色识别检测 ** 该项目较为新颖,适合作为竞赛课题方向&#xff0…

C++之基于Winsock2封装UDPServer与UDPClient

文章目录 Socket过程UDPServer.hUDPServer.cppUDPClient.hUDPClient.cppmain.cppCMakeLists.txt测试截图 Socket过程 UDPServer UDPClient UDPServer.h #ifndef UDPSERVER_H_INCLUDED #define UDPSERVER_H_INCLUDED#include <iostream> #include <string> #inclu…

Rust-是否使用Rc<T>

Rust的所有权机制&#xff0c;数据允许通过借用的方式&#xff0c;在函数的上下文中传递数据。如果离开数据作用的有效范围&#xff0c;这个借用就会失效&#xff0c;编译就会报错。这也是我们不会将借用(引用&#xff09;作为函数的返回值的原因。下面的代码编译失败。 fn cr…

SAP-FI模块 处理自动生成会计凭证增强

2、固定资产业务过渡科目摘要增强功能-MIGO ENHANCEMENT 2 ZEHENC_SAPMF05A. "active version * FI 20221215&#xff1a;固定资产业务过渡科目摘要增强功能 WAIT UP TO 1 SECONDS.READ TABLE xbseg WITH KEY hkont 1601990001. IF sy-subrc 0.DATA: lt_bkdf TYPE …

flink教程

文章目录 来自于尚硅谷教程1. Flink概述1.1 特点1.2 与SparkStreaming对比 2. Flink部署2.1 集群角色2.2 部署模式2.3 Standalone运行模式2.3.1 本地会话模式部署2.3.2 应用模式 2.4 YARN运行模式2.4.1 会话模式部署2.4.2 应用模式部署 2.5 历史服务 3. 系统架构3.1 并行度3.2 …

C# 与 C/C++ 的交互

什么是平台调用 (P/Invoke) P/Invoke 是可用于从托管代码访问非托管库中的结构、回调和函数的一种技术。 托管代码与非托管的区别 托管代码和非托管代码的主要区别是内存管理方式和对计算机资源的访问方式。托管代码通常运行在托管环境中&#xff0c;如 mono 或 java 虚拟机等…

【C++】缺省参数与函数重载

&#x1f440;樊梓慕&#xff1a;个人主页 &#x1f3a5;个人专栏&#xff1a;《C语言》《数据结构》《蓝桥杯试题》《LeetCode刷题笔记》《实训项目》《C》 &#x1f31d;每一个不曾起舞的日子&#xff0c;都是对生命的辜负 前言 本篇文章博主将带你学习缺省参数与函数重载&…

.Net Core 6 运行环境手动安装流程

安装.NET Core 6 概述 在开始之前&#xff0c;我们首先需要了解一下整个安装过程的流程。下面的表格将展示安装.NET Core 6的步骤以及每一步需要做的事情。 步骤 动作 说明 1 下载.NET Core 6 SDK 从官方网站下载.NET Core 6 SDK安装包 2 安装.NET Core 6 SDK …

Unnatural Instructions: Tuning Language Models with (Almost) No Human Labor

本文是LLM系列文章&#xff0c;针对《Unnatural Instructions: Tuning Language Models with (Almost) No Human Labor》的翻译。 TOC 摘要 指令调优使预训练的语言模型能够从推理时间的自然语言描述中执行新的任务。这些方法依赖于以众包数据集或用户交互形式进行的大量人工…

win10如何取消文件夹分组

问题描述 最近不知道把哪里碰了&#xff0c;win10文件夹显示的文件都是按照日期分组了&#xff0c;很讨厌。如下图所示 修改方法 1、文件夹空白处-右击 2、分组依据(P)-选择(无)(N) 下面是操作好之后的效果图 结束 -----华丽的分割线&#xff0c;以下是凑字数&#xff0c;大…

超声波清洗机需要注意什么?不能错过的超声波清洗机

超声波清洗机在当今社会已经越来越受到人们的欢迎&#xff0c;它利用超声波的振动来清洁物品表面&#xff0c;能够快速、高效地清除污垢、油脂等。但是&#xff0c;在购买超声波清洗机时&#xff0c;需要注意哪些问题呢&#xff1f;本文将为您介绍购买超声波清洗机需要注意的几…

2023/10/15总结

学习总结 最近开始写项目了&#xff0c;然后写的过程中遇到了跨域问题。 为什么会出现跨域问题 由于浏览器的同源策略限制。同源策略是一种约定&#xff0c;它是浏览器最核心也是最基本的安全功能。如果缺少了同源策略&#xff0c;那么浏览器的正常功能可能都会收到影响。所谓…

gitlab 维护

一 环境信息 二 日常维护 2.1 gitlab mirror 2.1.1 常见方法 社区版本gitab mirror 只能push&#xff0c;默认限制了局域网内mirror 需要修改admin/setting/network(网络)/outbound(出站请求) 勾选允许局域网即可。 2.1.2 疑难问题 内网有三个gitlab A: GITLAB 12 B\C GI…

hash join的基本原理是怎样的?

我们知道数据库里面两表关联主要有三种常见的关联方式&#xff0c;即 nested loop joinhash joinmerge join nested loop join在OLTP交易场景占比是最多的&#xff0c;常用于关联字段为主键或索引字段的情况&#xff0c;通过主键或索引以及loop的方式&#xff0c;A表可以快速…

【特纳斯电子】基于单片机的火灾监测报警系统-仿真设计

视频及资料链接&#xff1a;基于单片机的火灾监测报警系统-仿真设计 - 电子校园网 (mcude.com) 编号&#xff1a; T0152203M-FZ 设计简介&#xff1a; 本设计是基于单片机的火灾监测报警系统&#xff0c;主要实现以下功能&#xff1a; 1.通过OLED显示温度、烟雾、是否有火…

摩尔信使MThings的设备高级参数

摩尔信使MThings支持三级参数管理方案&#xff0c;依次为&#xff1a;数据级、设备级、通道级。 设备级参数不仅包含设备名称、设备地址等常用信息&#xff0c;同时提供了诸多高级参数&#xff0c;其同样是为了满足不同用户应用场景中所面临的差异化需求&#xff0c;以更加灵活…

PostGIS是否有方法能将一个Polygon面切割成若干份小的Polygon面,且每一份的面积差不多大

问题 PostGIS是否有方法能将一个Polygon面切割成若干份小的Polygon面&#xff0c;且每一份的面积差不多大&#xff1f;其实并没有现成的方法&#xff0c;但是通过灵活运用postgis函数可以快速实现这样的功能&#xff0c;总共只要简单的5步就可以了&#xff0c;下文具体说明。二…

【数据结构C/C++】优先(级)队列

文章目录 什么是优先队列&#xff1f;堆排序代码实现408考研各数据结构C/C代码&#xff08;Continually updating&#xff09; 什么是优先队列&#xff1f; 下面的内容来自于百度百科。 如果我们给每个元素都分配一个数字来标记其优先级&#xff0c;不妨设较小的数字具有较高的…

JAVAEE初阶相关内容第十四弹--网络初识

写在前&#xff1a; 这一部分开启网络部分的相关知识&#xff0c;这一弹内容初始网络将主要进行网络相关知识的简单介绍&#xff0c;以及着重介绍协议、协议分层、OSI七层模型、TCP/IP五层模型、封装和分用。 需要认识协议&#xff0c;并知道协议的效果是什么&#xff1b;知道…