【pyspark速成专家】3_Spark之RDD编程1

目录

​编辑

一,创建RDD

二,常用Action操作

三,常用Transformation操作


一,创建RDD

创建RDD主要有两种方式,一个是textFile加载本地或者集群文件系统中的数据,

第二个是用parallelize方法将Driver中的数据结构并行化成RDD。

#从本地文件系统中加载数据
file = "./data/hello.txt"
rdd = sc.textFile(file,3)
rdd.collect()['hello world','hello spark','spark love jupyter','spark love pandas','spark love sql']#从集群文件系统中加载数据
#file = "hdfs://localhost:9000/user/hadoop/data.txt"
#也可以省去hdfs://localhost:9000
#rdd = sc.textFile(file,3)#parallelize将Driver中的数据结构生成RDD,第二个参数指定分区数
rdd = sc.parallelize(range(1,11),2)
rdd.collect()[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

二,常用Action操作

Action操作将触发基于RDD依赖关系的计算。

collect

rdd = sc.parallelize(range(10),5) 
#collect操作将数据汇集到Driver,数据过大时有超内存风险
all_data = rdd.collect()
all_data[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

take

#take操作将前若干个数据汇集到Driver,相比collect安全
rdd = sc.parallelize(range(10),5) 
part_data = rdd.take(4)
part_data[0, 1, 2, 3]

takeSample

#takeSample可以随机取若干个到Driver,第一个参数设置是否放回抽样
rdd = sc.parallelize(range(10),5) 
sample_data = rdd.takeSample(False,10,0)
sample_data[7, 8, 1, 5, 3, 4, 2, 0, 9, 6]

first

#first取第一个数据
rdd = sc.parallelize(range(10),5) 
first_data = rdd.first()
print(first_data)0

count

#count查看RDD元素数量
rdd = sc.parallelize(range(10),5)
data_count = rdd.count()
print(data_count)10

reduce

#reduce利用二元函数对数据进行规约
rdd = sc.parallelize(range(10),5) 
rdd.reduce(lambda x,y:x+y)45

foreach

#foreach对每一个元素执行某种操作,不生成新的RDD
#累加器用法详见共享变量
rdd = sc.parallelize(range(10),5) 
accum = sc.accumulator(0)
rdd.foreach(lambda x:accum.add(x))
print(accum.value)45

countByKey

#countByKey对Pair RDD按key统计数量
pairRdd = sc.parallelize([(1,1),(1,4),(3,9),(2,16)]) 
pairRdd.countByKey()defaultdict(int, {1: 2, 3: 1, 2: 1})

saveAsTextFile

#saveAsTextFile保存rdd成text文件到本地
text_file = "./data/rdd.txt"
rdd = sc.parallelize(range(5))
rdd.saveAsTextFile(text_file)#重新读入会被解析文本
rdd_loaded = sc.textFile(text_file)
rdd_loaded.collect()['2', '3', '4', '1', '0']

三,常用Transformation操作

Transformation转换操作具有懒惰执行的特性,它只指定新的RDD和其父RDD的依赖关系,只有当Action操作触发到该依赖的时候,它才被计算。

map

#map操作对每个元素进行一个映射转换
rdd = sc.parallelize(range(10),3)
rdd.collect()[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]rdd.map(lambda x:x**2).collect()[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

filter

#filter应用过滤条件过滤掉一些数据
rdd = sc.parallelize(range(10),3)
rdd.filter(lambda x:x>5).collect()[6, 7, 8, 9]

flatMap

#flatMap操作执行将每个元素生成一个Array后压平
rdd = sc.parallelize(["hello world","hello China"])
rdd.map(lambda x:x.split(" ")).collect()[['hello', 'world'], ['hello', 'China']]rdd.flatMap(lambda x:x.split(" ")).collect()['hello', 'world', 'hello', 'China']

sample

#sample对原rdd在每个分区按照比例进行抽样,第一个参数设置是否可以重复抽样
rdd = sc.parallelize(range(10),1)
rdd.sample(False,0.5,0).collect()[1, 4, 9]

distinct

#distinct去重
rdd = sc.parallelize([1,1,2,2,3,3,4,5])
rdd.distinct().collect()[4, 1, 5, 2, 3]

subtract

#subtract找到属于前一个rdd而不属于后一个rdd的元素
a = sc.parallelize(range(10))
b = sc.parallelize(range(5,15))
a.subtract(b).collect()[0, 1, 2, 3, 4]

union

#union合并数据
a = sc.parallelize(range(5))
b = sc.parallelize(range(3,8))
a.union(b).collect()[0, 1, 2, 3, 4, 3, 4, 5, 6, 7]

intersection

#intersection求交集
a = sc.parallelize(range(1,6))
b = sc.parallelize(range(3,9))
a.intersection(b).collect()[3, 4, 5]

cartesian

#cartesian笛卡尔积
boys = sc.parallelize(["LiLei","Tom"])
girls = sc.parallelize(["HanMeiMei","Lily"])
boys.cartesian(girls).collect()[('LiLei', 'HanMeiMei'),('LiLei', 'Lily'),('Tom', 'HanMeiMei'),('Tom', 'Lily')]

sortBy

#按照某种方式进行排序
#指定按照第3个元素大小进行排序
rdd = sc.parallelize([(1,2,3),(3,2,2),(4,1,1)])
rdd.sortBy(lambda x:x[2]).collect()[(4, 1, 1), (3, 2, 2), (1, 2, 3)]

zip

#按照拉链方式连接两个RDD,效果类似python的zip函数
#需要两个RDD具有相同的分区,每个分区元素数量相同rdd_name = sc.parallelize(["LiLei","Hanmeimei","Lily"])
rdd_age = sc.parallelize([19,18,20])rdd_zip = rdd_name.zip(rdd_age)
print(rdd_zip.collect())[('LiLei', 19), ('Hanmeimei', 18), ('Lily', 20)]

zipWithIndex

#将RDD和一个从0开始的递增序列按照拉链方式连接。
rdd_name =  sc.parallelize(["LiLei","Hanmeimei","Lily","Lucy","Ann","Dachui","RuHua"])
rdd_index = rdd_name.zipWithIndex()
print(rdd_index.collect())[('LiLei', 0), ('Hanmeimei', 1), ('Lily', 2), ('Lucy', 3), ('Ann', 4), ('Dachui', 5), ('RuHua', 6)]

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/14486.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

fortran77 初始化矩阵 打印矩阵 模版 备拷

1,源码 SUBROUTINE INIT_MATRIX(A, m, n, lda)DOUBLE PRECISION A(*)CALL SRAND(2024)DO i1, mDO j1, nA(i lda*(j-1)) RAND() RAND() C WRITE(*, (F8.4)) A(i)END DOEND DOENDSUBROUTINE PRINT_MATRIX(A, m, n, lda)DOUBLE PREC…

【Vue3】封装axios请求(cli和vite)

原文作者:我辈李想 版权声明:文章原创,转载时请务必加上原文超链接、作者信息和本声明。 Vue 【Vue3】env环境变量的配置和使用(区分cli和vite) 文章目录 Vue前言一、常见用法二、vue3cli封装接口1..env配置2..dev(开…

ADC协议详解

文章目录 简介工作流程原理图时序图 优点与缺点 简介 模数转换器(ADC,Analog-to-Digital Converter)是一种将模拟信号转换为数字信号的电子设备。模拟信号通常表示物理测量的连续变化,如声音、温度、压力等,而数字信号…

codewars check_same_case 题解

题目 编写一个函数来检查两个给定的字符是否大小写相同。 如果任何字符不是字母,则返回-1如果两个字符大小写相同,则返回1如果两个字符都是字母且大小写不同,则返回0 例子 a并g返回1A并C返回1b并G返回0B并g返回00并?返回-1题解 1 此题主…

AI大模型与产品策略:产品经理的致胜之道

随着AI大模型的快速进化,其生态的构建,已经从C端过度到了B端。 作为产品经理,我们应该及时响应大趋势,在产品策略上融入AI大模型模块,深度挖掘AI大模型的应用价值,这才是作为PM在现阶段最有价值的地方。 …

想学接口测试,不知道那个工具适合?

引言: 接口测试在软件开发中扮演着至关重要的角色,它可以帮助我们验证系统的功能、性能和安全性。而选择适合的工具是进行接口测试的重要一步。本文将从零开始,为你详细介绍如何选择合适的工具,并提供规范的指导。 一、了解接口…

初识C语言——第二十八天

代码练习1&#xff1a; 用函数的方式实现9*9乘法表 void print_table(int n) {int i 0;int j 0;for (i 1; i< n; i){for (j 1; j< i; j){printf("%d*%d%-3d ", i, j, i * j);}printf("\n");}}int main() {int n 0;scanf("%d", &a…

汉明码(海明码)的计算的规则

一.汉明码的由来 1.汉明码&#xff08;Hamming Code&#xff09;&#xff0c;是在电信领域的一种线性调试码&#xff0c;以发明者理查德卫斯里汉明的名字命名。汉明码在传输的消息流中插入验证码&#xff0c;当计算机存储或移动数据时&#xff0c;可能会产生数据位错误&#x…

【VUE】 如何关闭ESlint的自动修复功能

问题描述例如&#xff1a;原书写代码ESLint自动修复报错如下 方案一、在文件中添加屏蔽警告的代码html代码中JavaScript代码中 方案二、关闭ESLint的自动修复功能1、VSCode 扩展找到 ESLint 插件2、在设置中找到在 settings,json 中编辑3、将"autoFix": true改为&quo…

4.双指针+递归

一、双指针编程技巧 方法参数传递数组 将数组通过方法参数传递&#xff0c;方法操作的数组和main方法中的数组指向同一块内存区域&#xff0c;意味着方法操作数组&#xff0c;同时会引起main方法中数组的改变以引用的方式作为方法参数进行传递的 元素交换 定义临时变量temp&a…

第十二节 SpringBoot Starter 系列结束语

感谢阅读&#xff0c;到这里&#xff0c;本系列课程就结束了。 一、为什么选择 SpringBoot Starter SpringBoot 近年来已经成为 Java 应用的必备框架&#xff1b; 而 SpringBoot starter 模式已经成为各大中间件集成到 SpringBoot 应用的首选方式&#xff0c;通过引入 xxx-st…

C++ | Leetcode C++题解之第101题对称二叉树

题目&#xff1a; 题解&#xff1a; class Solution { public:bool check(TreeNode *u, TreeNode *v) {queue <TreeNode*> q;q.push(u); q.push(v);while (!q.empty()) {u q.front(); q.pop();v q.front(); q.pop();if (!u && !v) continue;if ((!u || !v) ||…

爬虫基础1

一、爬虫的基本概念 1.什么是爬虫&#xff1f; 请求网站并提取数据的自动化程序 2.爬虫的分类 2.1 通用爬虫&#xff08;大而全&#xff09; 功能强大&#xff0c;采集面广&#xff0c;通常用于搜索引擎&#xff1a;百度&#xff0c;360&#xff0c;谷歌 2.2 聚焦爬虫&#x…

Android App启动流程和源码详解

前言 之前看了些App启动流程的文章&#xff0c;但是看得很浅显&#xff0c;隔了没多久就忘了&#xff0c;自己抓耳挠腮的终于看完了&#xff0c;看得头疼哦。因为很多是个人理解&#xff0c;大哥们主打一个7分信&#xff0c;2分思考&#xff0c;1分怀疑哈。 主要看的源码是An…

pytorch-20_1 LSTM在股价数据集上的预测实战

LSTM在股价数据集上的预测实战 使用完整的JPX赛题数据&#xff0c;并向大家提供完整的lstm流程。 导包 import numpy as np #数据处理 import pandas as pd #数据处理 import matplotlib as mlp import matplotlib.pyplot as plt #绘图 from sklearn.preprocessing import M…

人类交互4 感觉输入和运动输出

人类感觉系统概述 人类感觉系统是由多个感觉器官和神经系统组成&#xff0c;负责感知外部世界的各种刺激和信息。人类感觉系统包括以下几个主要部分&#xff1a; 视觉系统&#xff1a;视觉系统由眼睛、视神经和大脑视觉皮层组成&#xff0c;负责感知光线、颜色和形状&#xff…

datasheet芯片数据手册—新手入门学习(二)【8-18】

参考芯片手册已经上传&#xff0c;可自行下载 因为芯片参考手册内容比较多&#xff0c;故再一次介绍本文内容主要讲解章节。 目录 8、内容介绍 命令真值表 9、Command Definitions 10、READ Operations &#xff08;1&#xff09;页面读取操作 &#xff08;2&#xff…

YTM32的flash应用答疑-详解写保护功能

YTM32的flash应用答疑-详解写保护功能 文章目录 YTM32的flash应用答疑-详解写保护功能IntroductionPrincipleOperation & DemonstrationDemo #1 验证基本的写保护功能Demo #2 编程CUS_NVR设定EFM_ADDR_PROT初值Demo #3 启用写保护后试试块擦除操作 Conclusion Introduction…

报名倒计时两周|2024 OpenTiny 开源之夏项目直播解读回顾

5月16日&#xff0c;OpenTiny 开源社区成功举办了以《OpenTiny 开源之夏项目解读直播》为主题的直播活动。此次直播中&#xff0c;华为云的高级前端工程师曾令卡、华为云的高级前端工程师伍其和与10位开源之夏技术专家携手组成项目导师团&#xff0c;面向广大开发者一同深入探讨…

Java类和对象(五)—— 抽象类、接口、Object类和内部类

抽象类 在继承体系下&#xff0c;父类有些方法可能是要被重写的&#xff0c;如果我们事先就知道某些方法需要重写的话&#xff0c;我们可以不用在父类里面具体实现这个方法&#xff0c;这时候我们会用到抽象方法&#xff0c;这时候我们会用到关键字abstract关键字来修饰 publ…