Spark算子 - Python

第1关:Transformation - map

# -*- coding: UTF-8 -*-
from pyspark import SparkContextif __name__ == "__main__":#********** Begin **********## 1.初始化 SparkContext,该对象是 Spark 程序的入口sc = SparkContext("local","Simple App")# 2.创建一个1到5的列表ListList = [1,2,3,4,5]# 3.通过 SparkContext 并行化创建 rddrdd = sc.parallelize(List)# 4.使用rdd.collect() 收集 rdd 的元素。print(rdd.collect())"""使用 map 算子,将 rdd 的数据 (1, 2, 3, 4, 5) 按照下面的规则进行转换操作,规则如下:需求:偶数转换成该数的平方奇数转换成该数的立方"""# 5.使用 map 算子完成以上需求rdd_map = rdd.map(lambda x:(x*x if (x%2==0) else x*x*x))# 6.使用rdd.collect() 收集完成 map 转换的元素print(rdd_map.collect())# 7.停止 SparkContextsc.stop()#********** End **********#

第2关:Transformation - mapPartitions

# -*- coding: UTF-8 -*-
from pyspark import SparkContext#********** Begin **********#
def f(iterator):list = []for x in iterator:list.append((x,len(x)))return list
#********** End **********#if __name__ == "__main__":#********** Begin **********## 1.初始化 SparkContext,该对象是 Spark 程序的入口sc = SparkContext("local","Simple App")# 2. 一个内容为("dog", "salmon", "salmon", "rat", "elephant")的列表Listdata = ["dog", "salmon", "salmon", "rat", "elephant"]# 3.通过 SparkContext 并行化创建 rddrdd = sc.parallelize(data)# 4.使用rdd.collect() 收集 rdd 的元素。print(rdd.collect())"""使用 mapPartitions 算子,将 rdd 的数据 ("dog", "salmon", "salmon", "rat", "elephant") 按照下面的规则进行转换操作,规则如下:需求:将字符串与该字符串的长度组合成一个元组,例如:dog  -->  (dog,3)salmon   -->  (salmon,6)"""# 5.使用 mapPartitions 算子完成以上需求partitions = rdd.mapPartitions(f)# 6.使用rdd.collect() 收集完成 mapPartitions 转换的元素print(partitions.collect())# 7.停止 SparkContextsc.stop()#********** End **********#

第3关:Transformation - filter

# -*- coding: UTF-8 -*-
from pyspark import SparkContextif __name__ == "__main__":#********** Begin **********## 1.初始化 SparkContext,该对象是 Spark 程序的入口sc = SparkContext("local","Simple App")# 2.创建一个1到8的列表Listdata = [1,2,3,4,5,6,7,8]# 3.通过 SparkContext 并行化创建 rddrdd = sc.parallelize(data)# 4.使用rdd.collect() 收集 rdd 的元素。print(rdd.collect())"""使用 filter 算子,将 rdd 的数据 (1, 2, 3, 4, 5, 6, 7, 8) 按照下面的规则进行转换操作,规则如下:需求:过滤掉rdd中的奇数"""# 5.使用 filter 算子完成以上需求rdd_filter = rdd.filter(lambda x:x%2==0)# 6.使用rdd.collect() 收集完成 filter 转换的元素print(rdd_filter.collect())# 7.停止 SparkContextsc.stop()#********** End **********#

第4关:Transformation - flatMap

# -*- coding: UTF-8 -*-
from pyspark import SparkContextif __name__ == "__main__":#********** Begin **********## 1.初始化 SparkContext,该对象是 Spark 程序的入口sc = SparkContext("local","Simple App")# 2.创建一个[[1, 2, 3], [4, 5, 6], [7, 8, 9]] 的列表Listdata = [[1,2,3],[4,5,6],[7,8,9]]# 3.通过 SparkContext 并行化创建 rddrdd = sc.parallelize(data)# 4.使用rdd.collect() 收集 rdd 的元素。print(rdd.collect())"""使用 flatMap 算子,将 rdd 的数据 ([1, 2, 3], [4, 5, 6], [7, 8, 9]) 按照下面的规则进行转换操作,规则如下:需求:合并RDD的元素,例如:([1,2,3],[4,5,6])  -->  (1,2,3,4,5,6)([2,3],[4,5],[6])  -->  (1,2,3,4,5,6)"""# 5.使用 filter 算子完成以上需求flat_map = rdd.flatMap(lambda x:x)# 6.使用rdd.collect() 收集完成 filter 转换的元素print(flat_map.collect())# 7.停止 SparkContextsc.stop()#********** End **********#

第5关:Transformation - distinct

# -*- coding: UTF-8 -*-
from pyspark import SparkContextif __name__ == "__main__":#********** Begin **********## 1.初始化 SparkContext,该对象是 Spark 程序的入口sc = SparkContext("local","Simple App")# 2.创建一个内容为(1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1)的列表Listdata = [1,2,3,4,5,6,5,4,3,2,1]# 3.通过 SparkContext 并行化创建 rddrdd = sc.parallelize(data)# 4.使用rdd.collect() 收集 rdd 的元素print(rdd.collect())"""使用 distinct 算子,将 rdd 的数据 (1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1) 按照下面的规则进行转换操作,规则如下:需求:元素去重,例如:1,2,3,3,2,1  --> 1,2,31,1,1,1,     --> 1"""# 5.使用 distinct 算子完成以上需求a = rdd.distinct()# 6.使用rdd.collect() 收集完成 distinct 转换的元素print(a.collect())# 7.停止 SparkContextsc.stop()#********** End **********#

第6关:Transformation - sortBy

# -*- coding: UTF-8 -*-
from pyspark import SparkContextif __name__ == "__main__":# ********** Begin **********## 1.初始化 SparkContext,该对象是 Spark 程序的入口sc = SparkContext("local", "Simple App")# 2.创建一个内容为(1, 3, 5, 7, 9, 8, 6, 4, 2)的列表Listdata = [1,3,5,7,9,8,6,4,2]# 3.通过 SparkContext 并行化创建 rddrdd = sc.parallelize(data)# 4.使用rdd.collect() 收集 rdd 的元素print(rdd.collect())"""使用 sortBy 算子,将 rdd 的数据 (1, 3, 5, 7, 9, 8, 6, 4, 2) 按照下面的规则进行转换操作,规则如下:需求:元素排序,例如:5,4,3,1,2  --> 1,2,3,4,5"""# 5.使用 sortBy 算子完成以上需求a = rdd.sortBy(lambda x:x)# 6.使用rdd.collect() 收集完成 sortBy 转换的元素print(a.collect())# 7.停止 SparkContextsc.stop()#********** End **********#

第7关:Transformation - sortByKey

# -*- coding: UTF-8 -*-
from pyspark import SparkContextif __name__ == "__main__":# ********** Begin **********## 1.初始化 SparkContext,该对象是 Spark 程序的入口sc = SparkContext("local", "Simple App")# 2.创建一个内容为[(B',1),('A',2),('C',3)]的列表Listdata = [("B",1),("A",2),("C",3)]# 3.通过 SparkContext 并行化创建 rddrdd = sc.parallelize(data)# 4.使用rdd.collect() 收集 rdd 的元素print(rdd.collect())"""使用 sortByKey 算子,将 rdd 的数据 ('B', 1), ('A', 2), ('C', 3) 按照下面的规则进行转换操作,规则如下:需求:元素排序,例如:[(3,3),(2,2),(1,1)]  -->  [(1,1),(2,2),(3,3)]"""# 5.使用 sortByKey 算子完成以上需求a = rdd.sortByKey()# 6.使用rdd.collect() 收集完成 sortByKey 转换的元素print(a.collect())# 7.停止 SparkContextsc.stop()# ********** End **********#

第8关:Transformation - mapValues

# -*- coding: UTF-8 -*-
from pyspark import SparkContextif __name__ == "__main__":# ********** Begin **********## 1.初始化 SparkContext,该对象是 Spark 程序的入口sc = SparkContext("local", "Simple App")# 2.创建一个内容为[("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5)]的列表Listdata = [("1",1),("2",2),("3",3),("4",4),("5",5)]# 3.通过 SparkContext 并行化创建 rddrdd = sc.parallelize(data)# 4.使用rdd.collect() 收集 rdd 的元素print(rdd.collect())"""使用 mapValues 算子,将 rdd 的数据 ("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5) 按照下面的规则进行转换操作,规则如下:需求:元素(key,value)的value进行以下操作:偶数转换成该数的平方奇数转换成该数的立方"""# 5.使用 mapValues 算子完成以上需求a = rdd.mapValues(lambda x:x*x if x%2==0 else x*x*x)# 6.使用rdd.collect() 收集完成 mapValues 转换的元素print(a.collect())# 7.停止 SparkContextsc.stop()# ********** End **********#

第9关:Transformations - reduceByKey

# -*- coding: UTF-8 -*-
from pyspark import SparkContextif __name__ == "__main__":# ********** Begin **********## 1.初始化 SparkContext,该对象是 Spark 程序的入口sc = SparkContext("local", "Simple App")# 2.创建一个内容为[("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)]的列表Listdata = [("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)]# 3.通过 SparkContext 并行化创建 rddrdd = sc.parallelize(data)# 4.使用rdd.collect() 收集 rdd 的元素print(rdd.collect())"""使用 reduceByKey 算子,将 rdd 的数据[("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)] 按照下面的规则进行转换操作,规则如下:需求:元素(key-value)的value累加操作,例如:(1,1),(1,1),(1,2)  --> (1,4)(1,1),(1,1),(2,2),(2,2)  --> (1,2),(2,4)"""# 5.使用 reduceByKey 算子完成以上需求a = rdd.reduceByKey(lambda x,y:x+y)# 6.使用rdd.collect() 收集完成 reduceByKey 转换的元素print(a.collect())# 7.停止 SparkContextsc.stop()# ********** End **********#

第10关:Actions - 常用算子

# -*- coding: UTF-8 -*-
from pyspark import SparkContextif __name__ == "__main__":# ********** Begin **********## 1.初始化 SparkContext,该对象是 Spark 程序的入口sc = SparkContext("local","Simple App")# 2.创建一个内容为[1, 3, 5, 7, 9, 8, 6, 4, 2]的列表Listdata = [1, 3, 5, 7, 9, 8, 6, 4, 2]# 3.通过 SparkContext 并行化创建 rddrdd = sc.parallelize(data)# 4.收集rdd的所有元素并print输出print(rdd.collect())# 5.统计rdd的元素个数并print输出print(rdd.count())# 6.获取rdd的第一个元素并print输出print(rdd.first())# 7.获取rdd的前3个元素并print输出print(rdd.take(3))# 8.聚合rdd的所有元素并print输出print(rdd.reduce(lambda x,y:x+y))# 9.停止 SparkContextsc.stop()# ********** End **********#

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/147630.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

c++23中的新功能之十九继承的CTAD

一、继承 在c编程中,一定是脱离不开继承的。而继承中有很多小的细节需要关注,比如如何显示的使用父类的构造函数?如何通过构造函数来推导模板参数等等。这些小的细节,其实都是在实际应用中对一些相关技术的不断完善。 二、继承的…

微服务实战系列之Nacos

导语 欢迎来到 “Nacos” 的世界! Nacos /nɑ:kəʊs/ 是 Dynamic Naming and Configuration Service的首字母简称,一个更易于构建云原生应用的动态服务发现、配置管理和服务管理平台。 Nacos 致力于帮助您发现、配置和管理微服务。Nacos 提供了一组简单…

Ubuntu环境下以编译源码的方式安装Vim

目录 1. Ubuntu环境 2. 下载编译vim 2.1 效果截图 3. 配置环境变量 1. Ubuntu环境 Linux chris-166 6.2.0-36-generic #37~22.04.1-Ubuntu SMP PREEMPT_DYNAMIC Mon Oct 9 15:34:04 UTC 2 x86_64 x86_64 x86_64 GNU/Linux 2. 下载编译vim // 源码下载 chris_166chris-16…

文件传输客户端 SecureFX mac中文版支持多种协议

SecureFX mac是一款功能强大的文件传输客户端,可在 Mac 操作系统上使用。它由 VanDyke Software 公司开发,旨在为用户提供安全、可靠、高效的文件传输服务。 SecureFX 支持多种协议,包括 SFTP、SCP、FTP、FTP over SSL/TLS 和 HTTP/S。它使用…

联想系列台式机Win11系统改Win7系统BIOS设置步骤

联想最新一代的台式机默认操作系统Win11,采用UEFIGPT启动模式,并且开启了安全启动功能,一般用户不能直接将Win11改成Win7,如果需要更改操作系统,是需要再BIOS菜单中关闭安全启动功能的,并且把启动模式设置成…

2018年五一杯数学建模C题江苏省本科教育质量综合评价解题全过程文档及程序

2019年五一杯数学建模 C题 江苏省本科教育质量综合评价 原题再现 随着中国的改革开放,国家的综合实力不断增强,中国高等教育发展整体已进入世界中上水平。作为一个教育大省,江苏省的本科教育发展在全国名列前茅,而江苏省13个地级…

Django测试环境搭建及ORM查询(创建外键|跨表查询|双下划线查询 )

文章目录 一、表查询数据准备及测试环境搭建模型层前期准备测试环境搭建代码演示 二、ORM操作相关方法三、ORM常见的查询关键字四、ORM底层SQL语句五、双下划线查询数据查询(双下划线)双下划线小训练Django ORM __双下划线细解 六、ORM外键字段创建基础表…

Linux常用命令——bye命令

在线Linux命令查询工具 bye 命令用于中断FTP连线并结束程序。。 补充说明 bye命令在ftp模式下,输入bye即可中断目前的连线作业,并结束ftp的执行。 语法 bye实例 bye在线Linux命令查询工具

蓝眼开源云盘部署全过程(手动安装)

环境概述: 系统-Centos7.4 数据库-MySQL8 云盘系统-Tank4.0.1 前提:操作系统已完成安装,有外部网络。 一.安装数据库 cd到合适的目录进行下载安装操作,期间不要切换出去。 wget https://dev.mysql.com/get/mysql80-community-r…

Linux使用ifconifg命令,没有显示ens33

Linux使用ifconifg命令,没有显示ens33 1.问题2.步骤2.1 查看虚拟机的组件是否启动了2.2 修改网络配置文件 ONBOOT修改为yes2.3 重启网络2.4 修改网络服务配置 3.解决 1.问题 打开虚拟机准备使用xshell连接时发现连接失败,在机器上查看ip发现ens33不现实…

C++项目案例圆和点的关系 (涉及知识点:头文件定义类,cpp文件实现类,类和作用域,linux编译运行c++项目)

一.项目描述 点与圆有三种关系&#xff1a; 点在圆外 点在圆上 点在圆内计算点到圆心的距离就能判断点在圆的哪个地方。二.项目结构 三.include文件 3.1 Circle类的声明 Circle.h // 防止头文件重复包含 #pragma once // #include<iostream> #include "Point.h&…

互联网上门预约洗衣洗鞋店小程序;

拽牛科技干洗店洗鞋店软件&#xff0c;方便快捷&#xff0c;让你轻松洗衣。只需在线预约洗衣洗鞋服务&#xff0c;附近的门店立即上门取送&#xff0c;省心省力。轻松了解品牌线下门店&#xff0c;通过列表形式展示周围门店信息&#xff0c;自动选择最近门店为你服务。简单填写…

小数背包问题

问题描述 有一个背包&#xff0c;背包容量是C,有N(1<N≤1000)个物品&#xff0c;每个物品右对应的价值val和重量weight。 要求尽可能让装入背中的物品总价值最大&#xff0c;但不能超过总容量。其中物品可以分割成任意大小。 小数背包与01背包 小数背包问题0 1背包问题物品…

SpringSecurity5|12.实现RememberMe 及 实现原理分析

security/day08 这个功能大家还熟悉么&#xff1f;我们在登录网站的时候&#xff0c;除了让你输入用户名和密码&#xff0c;还会有个勾选框&#xff1a; 记住我&#xff01;&#xff01;&#xff01;不是让大家记住我哈。 值得一提的是&#xff0c;Spring Security 也提供了这个…

设计模式篇---装饰模式

文章目录 概念结构实例总结 概念 装饰模式&#xff1a;动态的给一个对象增加一些额外的职责。就扩展功能而言&#xff0c;装饰模式提供了 一种比使用子类更加灵活的替代方案。 装饰模式是一种对象结构型模式&#xff0c;它以对客户透明的方式动态地给一个对象附加上更多的责任…

asp.net学生成绩评估系统VS开发sqlserver数据库web结构c#编程计算机网页项目

一、源码特点 asp.net 学生成绩评估系统 是一套完善的web设计管理系统&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。 系统运行视频连接&#xff1a;https://www.bilibili.com/video/BV1Wz4y1A7CG/ 二、功能介绍 本系统使用Microsof…

Mysql主从搭建

Mysql主从搭建 1.Mysql下载1.1 查看操作系统2.2 下载mysql安装包 2.Mysql安装2.1 解压2.2 目录重命名2.3 创建data&#xff0c;存储文件2.4 创建用户组2.5 授权用户2.6 配置环境变量2.7 编辑my.cnf2.8 创建相关目录和文件2.9 初始化数据库2.10 复制mysql.server到/etc/init.d/下…

要做好解决方案工程师,这些核心技能是必须要掌握的。

要做好解决方案工程师&#xff0c;以下是一些比较中肯的建议&#xff1a; 1、了解客户需求&#xff1a;解决方案工程师需要深入了解客户的需求和挑战&#xff0c;以便为他们提供定制化的解决方案。通过与客户交流、调研市场趋势等方式&#xff0c;了解客户的业务需求和目标&…

适用于 Windows 的 10 个最佳视频转换器:快速转换高清视频

您是否遇到过由于格式不兼容而无法在您的设备上播放视频或电影的情况&#xff1f;您想随意播放从您的相机、GoPro 导入的视频&#xff0c;还是以最合适的格式将它们上传到媒体网站&#xff1f;您的房间里是否有一堆 DVD 光盘&#xff0c;想将它们转换为数字格式以便于播放&…

软件测试人员提问常用的ChatGPT通用提示词模板

如何设计有效的软件测试用例&#xff1f; 如何运用自动化测试工具进行软件测试&#xff1f; 如何进行软件的功能测试、性能测试和安全测试&#xff1f; 如何评估软件测试的质量和覆盖范围&#xff1f; 软件测试有哪些常见的缺陷和错误&#xff0c;如何识别和解决&#xff1…