Python学习从0到1 day26 第三阶段 Spark ④ 数据输出

半山腰太挤了,你该去山顶看看

                                        —— 24.11.10

一、输出为python对象

1.collect算子

功能:

将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象

语法:

rdd.collect()

返回值是一个list列表

示例:

from pyspark import SparkConf,SparkContext
import osconf = SparkConf().setMaster("local").setAppName("test_spark")
os.environ['PYSPARK_PYTHON'] = "E:/python.learning/pyt/scripts/python.exe"
sc = SparkContext(conf = conf)Set = {"小明","小红","小强"}
Tuple = ("小明","小红","小强")set_rdd = sc.parallelize(Set)
tuple_rdd = sc.parallelize(Tuple)print(set_rdd.collect())
print(tuple_rdd.collect())


2.reduce算子

功能:

对RDD数据集按照你传入的逻辑进行聚合

语法:

rdd.reduce(func)rdd = sc.parallelize(range(1 , 10))
# 将rdd的数据进行累加求和
print(rdd.reduce(lambda a , b : a + b))

返回值等同于计算函数的返回值

示例:

from pyspark import SparkContext,SparkConf
import os
import jsonos.environ['PYSPARK_PYTHON'] = "E:/python.learning/pyt/scripts/python.exe"conf = SparkConf().setMaster("local").setAppName("test_spark")
sc = SparkContext(conf = conf)List = [1,2,3,4,5,6,7,8,9]
rdd = sc.parallelize(List)
print(rdd.reduce(lambda x, y : x + y))


3.take算子

功能:

取RDD的前N个元素,组合成list返回

语法:

sc.parallelize([3,2,1,4,5,6]).take(5)    # [3,2,1,4,5]

 返回前n个元素组成的list

示例:

from pyspark import SparkContext,SparkConf
import os
import jsonos.environ['PYSPARK_PYTHON'] = "E:/python.learning/pyt/scripts/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
List = (1,2,3,4,5,6,7,8,9)
rdd = sc.parallelize(List)
res = rdd.take(4)
print("前四个元素为:"+res)


4.count算子

功能:

计算RDD有多少条数据

语法:

sc.parallelize([3,2,1,4,5,6]).count()

返回值是一个数字

示例:

from pyspark import SparkConf,SparkContext
import os
import jsonos.environ['PYSPARK_PYTHON'] = "E:/python.learning/pyt/scripts/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)rdd = sc.parallelize(["yyh","hl","grq","zxj","cby","wfe","mrr","qjy"])
print(rdd.count())


二、输出到文件中

1.saveAsTextFile算子

功能:

RDD的数据写入文本文件

支持本地写出、 hdfs等文件系统

语法:

rdd = sc.parallelize([1,2,3,4,5])
rdd.saveAsTextFile("../data/output/test.txt")

2.配置Hadoop相关依赖

调用保存文件的算子,需要配置Hadoop依赖

① 下载Hadoop安装包

http://archive.apache.org/dist/hadoop/common/hadoop-3.0.0/hadoop-3.0.0.tar.gz

② 解压到电脑任意位置

③ 在Python代码中使用os模块配置:

os.environ['HADOOP HOME']='HADOOP解压文件夹路径'
E:\python.learning\hadoop分布式相关\hadoop-3.0.0

④ 下载winutils.exe,并放入Hadoop解压文件夹的bin目录内

https://raw.githubusercontent.com/steveloughran/winutils/master/hadoop-3.0.0/bin/winutils.exe

⑤ 下载hadoop.dll,并放入:C:/Windows/System32 文件夹内

https://raw.githubusercontent.com/steveloughran/winutils/master/hadoop-3.0.0/bin/hadoop.dll


3.代码示例

from pyspark import SparkConf,SparkContext
import osconf = SparkConf().setMaster("local").setAppName("test_spark")
os.environ['PYSPARK_PYTHON'] = "E:/python.learning/pyt/scripts/python.exe"
sc = SparkContext(conf = conf)# 准备RDD1
rdd1 = sc.parallelize([1,2,3,4,5])# 准备RDD2
rdd2 = sc.parallelize([("Hello, 3"),("Spark", 5),("Hi", 7)])# 准备RDD3
rdd3 = sc.parallelize([[1, 3, 5],[6, 7, 9],[11, 13, 11]])# 输出到文件中
rdd1.saveAsTextFile("E:\python.learning\hadoop分布式相关\data\output1/rdd1")
rdd2.saveAsTextFile("E:\python.learning\hadoop分布式相关\data\output2/rdd2")
rdd3.saveAsTextFile("E:\python.learning\hadoop分布式相关\data\output3/rdd3")

注:如果输出路径的文件存在,代码将会报错


4.运行结果

创建几个文件取决于Hadoop上的分区数量

解决方式:修改rdd的分区


5.修改rdd分区为1个

方式1

Sparkconf对象设置属性全局并行度为1:

from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "E:/python.learning/pyt/scripts/python.exe"
os.environ['HADOOP_HOME'] = "E:\python.learning\hadoop分布式相关\hadoop-3.0.0"
conf = SparkConf().setMaster("local").setAppName("test_spark")
conf.set("spark.default.parallelize", "1")
sc = SparkContext(conf = conf)# 准备RDD1
rdd1 = sc.parallelize([1,2,3,4,5])# 准备RDD2
rdd2 = sc.parallelize([("Hello, 3"),("Spark", 5),("Hi", 7)])# 准备RDD3
rdd3 = sc.parallelize([[1, 3, 5],[6, 7, 9],[11, 13, 11]])# 输出到文件中
rdd1.saveAsTextFile("E:\python.learning\hadoop分布式相关\data\output1/rdd1")
rdd2.saveAsTextFile("E:\python.learning\hadoop分布式相关\data\output2/rdd2")
rdd3.saveAsTextFile("E:\python.learning\hadoop分布式相关\data\output3/rdd3")

方式2

创建RDD的时候设置 parallelize方法传入numSlices参数为1:

rdd1 = sc.parallelize([1,2,3,4,5],1)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/60508.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

DNS解析库

DNS解析库 dnsDNS的解析库以及域名的详解解析库dns解析的端口dns域名的长度限制流程优先级在现实环境中实现内网的dns解析 练习(Ubuntu内网实现DNS解析)主服务器备服务器 dns 域名系统,域名和ip地址互相映射的一个分布式的数据库&#xff0c…

kafka 生产经验——数据积压(消费者如何提高吞吐量)

bit --> byte --> kb -->mb -->gb --> tb --> pb --> eb -> zb -->yb

【记录】公司管理平台部署:容器化部署

前置条件 技能要求 了解Docker基本使用和常用命令。会写Dockerfile文件。会写docker-compose文件环境要求 云服务器,已安装好安装Docker本机 IntelliJ IDEA 2022.1.3配置 配置服务器SSH连接 进入 Settings -> Tools -> SSH Configurations 点击加号创建SSH连接配置 填…

从零开始 blender插件开发

blender 插件开发 文章目录 blender 插件开发环境配置1. 偏好设置中开启相关功能2. 命令行打开运行脚本 API学习专有名词1. bpy.data 从当前打开的blend file中,加载数据。2. bpy.context 可用于获取活动对象、场景、工具设置以及许多其他属性。3. bpy.ops 用户通常…

el-table 行列文字悬浮超出屏幕宽度不换行的问题

修改前的效果 修改后的效果 ui框架 element-plus 在网上找了很多例子都没找到合适的 然后这个东西鼠标挪走就不显示 控制台也不好调试 看了一下El-table的源码 他这个悬浮文字用的el-prpper 包着的 所以直接改 .el-table .el-propper 设置为max-width:1000px 就可以了 吐槽一…

Tcp中的流量控制,拥塞控制,超时重传时间的选择,都附带相应例子说明

端口号的了解 通常进行通信时,发送方使用任意端口,指定接收方为指定端口,因为接收方在接收到后的需要根据发送方指定的接收方端口号,来选择使用哪一个服务进程进行处理。 端口号还可以分类为两个大类: TCP和UDP报文的…

Nextflow最佳实践:如何在云上高效处理大规模数据集

1. Nextflow 软件架构介绍 Nextflow 是一个用于简化数据驱动计算流程的工具,可以在各种计算环境中轻松部署。它采用了分布式计算和容器技术,实现了高度模块化、可重复性和可扩展性。NextFlow 的软件架构主要包括以下几个部分: 用户界面&…

一文看懂ERP、SCM、SRM、WMS、TMS、进销存管理系统

经常有人来私信问我ERP、SCM、SRM、WMS、TMS、进销存管理系统等等,它们听起来都很专业,但到底各自是什么?承担着怎样的角色呢?它们具体都有哪些功能?相互之间又存在怎样的关联,对企业而言又意味着什么呢&am…

深度学习——优化算法、激活函数、归一化、正则化

文章目录 🌺深度学习面试八股汇总🌺优化算法方法梯度下降 (Gradient Descent, GD)动量法 (Momentum)AdaGrad (Adaptive Gradient Algorithm)RMSProp (Root Mean Square Propagation)Adam (Adaptive Moment Estimation)AdamW 优化算法总结 经验和实践建议…

YOLOv11实战宠物狗分类

本文采用YOLOv11作为核心算法框架,结合PyQt5构建用户界面,使用Python3进行开发。YOLOv11以其高效的特征提取能力,在多个图像分类任务中展现出卓越性能。本研究针对5种宠物狗数据集进行训练和优化,该数据集包含丰富的宠物狗图像样本…

星期-时间范围选择器 滑动选择时间 最小粒度 vue3

星期-时间范围选择器 功能介绍属性说明事件说明实现代码使用范例 根据业务需要,实现了一个可选择时间范围的周视图。用户可以通过鼠标拖动来选择时间段,并且可以通过快速选择组件来快速选择特定的时间范围。 如图: 功能介绍 时间范围选择&…

云岚到家 秒杀抢购

目录 秒杀抢购业务特点 常用技术方案 抢券 抢券界面 进行抢券 我的优惠券列表 活动查询 系统设计 活动查询分析 活动查询界面显示了哪些数据? 面向高并发如何提高活动查询性能? 如何保证缓存一致性? 数据流 Redis数据结构设计 如…

JavaWeb常见注解

1.Controller 在 JavaWeb 开发中,Controller是 Spring 框架中的一个注解,主要用于定义控制器类(Controller),是 Spring MVC 模式的核心组件之一。它表示该类是一个 Spring MVC 控制器,用来处理 HTTP 请求并…

光伏储能微电网协调控制器

安科瑞 Acrel-Tu1990 1. 产品介绍 ACCU-100微电网协调控制器是一款专为微电网、分布式发电和储能系统设计的智能协调控制设备。该装置能够兼容包括光伏系统、风力发电、储能系统以及充电桩等多种设备的接入。它通过全天候的数据采集与分析,实时监控光伏、风能、储…

【C++课程学习】:继承:默认成员函数

🎁个人主页:我们的五年 🔍系列专栏:C课程学习 🎉欢迎大家点赞👍评论📝收藏⭐文章 目录 构造函数 🍩默认构造函数(这里指的是编译器生成的构造函数)&#…

泷羽sec学习打卡-Linux基础2

声明 学习视频来自B站UP主 泷羽sec,如涉及侵权马上删除文章 笔记的只是方便各位师傅学习知识,以下网站只涉及学习内容,其他的都与本人无关,切莫逾越法律红线,否则后果自负 关于Linux的那些事儿-Base2 一、Linux-Base2linux有哪些目录呢?不同目录下有哪些具体的文件呢…

TCP拥塞控制

TCP拥塞控制(Congestion Control) 什么是拥塞控制? 拥塞控制(Congestion Control)主要针对整个网络中的数据传输速率进行调节,防止过多的数据注入网络中,这样可以使网络中的路由器或链路不致于过载,以避免…

Unity教程(十八)战斗系统 攻击逻辑

Unity开发2D类银河恶魔城游戏学习笔记 Unity教程(零)Unity和VS的使用相关内容 Unity教程(一)开始学习状态机 Unity教程(二)角色移动的实现 Unity教程(三)角色跳跃的实现 Unity教程&…

自动驾驶合集(更新中)

文章目录 车辆模型控制路径规划 车辆模型 车辆模型基础合集 控制 控制合集 路径规划 规划合集

网站架构知识之Ansible进阶(day022)

1.handler触发器 应用场景:一般用于分发配置文件时候,如果配置文件有变化,则重启服务,如果没有变化,则不重启服务 案列01:分发nfs配置文件,若文件发生改变则重启服务 2.when判断 用于给ans运…