Spark---转换算子、行动算子、持久化算子

一、转换算子和行动算子

1、Transformations转换算子

1)、概念

Transformations类算子是一类算子(函数)叫做转换算子,如map、flatMap、reduceByKey等。Transformations算子是延迟执行,也叫懒加载执行。

2)、Transformation类算子

filter :过滤符合条件的记录数,true保留,false过滤掉

map:将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素。特点:输入一条,输出一条数据。

flatMap:先map后flat。与map类似,每个输入项可以映射为0到多个输出项。

sample:随机抽样算子,根据传进去的小数按比例进行又放回或者无放回的抽样。

reduceByKey:将相同的Key根据相应的逻辑进行处理。

sortByKey/sortBy:作用在K,V格式的RDD上,对Key进行升序或者降序排序。

2、Action行动算子

1)、概念:

Action类算子也是一类算子(函数)叫做行动算子,如foreach,collect,count等。Transformations类算子是延迟执行,Action类算子是触发执行。一个application应用程序中有几个Action类算子执行,就有几个job运行。

2)、Action类算子

count:返回数据集中的元素数。会在结果计算完成后回收到Driver端。

take(n):返回一个包含数据集前n个元素的集合。

first:first=take(1),返回数据集中的第一个元素。

foreach:循环遍历数据集中的每个元素,运行相应的逻辑。

collect:将计算结果回收到Driver端。

3)、demo:动态统计出现次数最多的单词个数,过滤掉。

  • 一千万条数据量的文件,过滤掉出现次数多的记录,并且其余记录按照出现次数降序排序。

假设有一个records.txt文件

hello Spark
hello HDFS
hello hadoop
hello linux
hello Spark
hello Spark
hello Spark1
hello Spark
hello Spark
hello Spark2
hello Spark
hello Spark
hello Spark
hello Spark3
hello Spark
hello HDFS
hello hadoop
hello linux
hello Spark
hello Spark
hello Spark4
hello Spark
hello Spark
hello Spark5
hello Spark
hello Spark

代码处理:

package com.bjsxt.demo;import java.util.Arrays;
import java.util.List;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;import scala.Tuple2;
/*** 动态统计出现次数最多的单词个数,过滤掉。* @author root**/
public class Demo1 {public static void main(String[] args) {SparkConf conf = new SparkConf();conf.setMaster("local").setAppName("demo1");JavaSparkContext jsc = new JavaSparkContext(conf);JavaRDD<String> lines = jsc.textFile("./records.txt");JavaRDD<String> flatMap = lines.flatMap(new FlatMapFunction<String, String>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Iterable<String> call(String t) throws Exception {return Arrays.asList(t.split(" "));}});JavaPairRDD<String, Integer> mapToPair = flatMap.mapToPair(new PairFunction<String,String, Integer>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, Integer> call(String t) throws Exception {return new Tuple2<String, Integer>(t, 1);}});JavaPairRDD<String, Integer> sample = mapToPair.sample(true, 0.5);final List<Tuple2<String, Integer>> take = sample.reduceByKey(new Function2<Integer,Integer,Integer>(){/*** */private static final long serialVersionUID = 1L;@Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1+v2;}}).mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Tuple2<Integer, String> call(Tuple2<String, Integer> t)throws Exception {return new Tuple2<Integer, String>(t._2, t._1);}}).sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer,String>, String, Integer>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, Integer> call(Tuple2<Integer, String> t)throws Exception {return new Tuple2<String, Integer>(t._2, t._1);}}).take(1);System.out.println("take--------"+take);JavaPairRDD<String, Integer> result = mapToPair.filter(new Function<Tuple2<String,Integer>, Boolean>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Boolean call(Tuple2<String, Integer> v1) throws Exception {return !v1._1.equals(take.get(0)._1);}}).reduceByKey(new Function2<Integer,Integer,Integer>(){/*** */private static final long serialVersionUID = 1L;@Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1+v2;}}).mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Tuple2<Integer, String> call(Tuple2<String, Integer> t)throws Exception {return new Tuple2<Integer, String>(t._2, t._1);}}).sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer,String>, String, Integer>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, Integer> call(Tuple2<Integer, String> t)throws Exception {return new Tuple2<String, Integer>(t._2, t._1);}});result.foreach(new VoidFunction<Tuple2<String,Integer>>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic void call(Tuple2<String, Integer> t) throws Exception {System.out.println(t);}});jsc.stop();}
}

3、Spark代码流程

1)、创建SparkConf对象

可以设置Application name。

可以设置运行模式。

可以设置Spark application的资源需求。

2)、创建SparkContext对象

3)、基于Spark的上下文创建一个RDD,对RDD进行处理。

4)、应用程序中要有Action类算子来触发Transformation类算子执行。

5)、关闭Spark上下文对象SparkContext。

二、Spark持久化算子

1、控制算子

1)、概念

控制算子有三种,cache,persist,checkpoint,以上算子都可以将RDD持久化,持久化单位是partition。cache和persist都是懒执行的。必须有一个action类算子触发执行。checkpoint算子不仅能将RDD持久化到磁盘,还能切断RDD之间的依赖关系。

2)、cache

默认将RDD的数据持久化到内存中。cache是懒执行。

注意:chche()=persist()=persist(StorageLevel.Memory_Only)

测试cache文件:

测试代码:

1.SparkConf conf = new SparkConf();
2.conf.setMaster("local").setAppName("CacheTest");
3.JavaSparkContext jsc = new JavaSparkContext(conf);
4.JavaRDD<String> lines = jsc.textFile("persistData.txt");
5.
6.lines = lines.cache();
7.long startTime = System.currentTimeMillis();
8.long count = lines.count();
9.long endTime = System.currentTimeMillis();
10.System.out.println("共"+count+ "条数据,"+"初始化时间+cache时间+计算时间="+ 
11.(endTime-startTime));
12.
13.long countStartTime = System.currentTimeMillis();
14.long countrResult = lines.count();
15.long countEndTime = System.currentTimeMillis();
16.System.out.println("共"+countrResult+ "条数据,"+"计算时间="+ (countEndTime-
17.countStartTime));
18.
19.jsc.stop();

persist:

可以指定持久化的级别。最常用的是MEMORY_ONLY和MEMORY_AND_DISK。”_2“表示有副本数。

持久化级别如下:

2、cache和persist的注意事项

1)、cache和persist都是懒执行,必须有一个action类算子触发执行。

2)、cache和persist算子的返回值可以赋值给一个变量,在其他job中直接使用这个变量就是使用持久化的数据了。持久化的单位是partition。

3)、cache和persist算子后不能立即紧跟action算子。

4)、cache和persist算子持久化的数据当applilcation执行完成之后会被清除。

错误:rdd.cache().count() 返回的不是持久化的RDD,而是一个数值了。

3、checkpoint

checkpoint将RDD持久化到磁盘,还可以切断RDD之间的依赖关系。checkpoint目录数据当application执行完之后不会被清除。
  • persist(StorageLevel.DISK_ONLY)与Checkpoint的区别?

1)、checkpoint需要指定额外的目录存储数据,checkpoint数据是由外部的存储系统管理,不是Spark框架管理,当application完成之后,不会被清空。

2)、cache() 和persist() 持久化的数据是由Spark框架管理,当application完成之后,会被清空。

3)、checkpoint多用于保存状态。

  • checkpoint 的执行原理:

1)、当RDD的job执行完毕后,会从finalRDD从后往前回溯。

2)、当回溯到某一个RDD调用了checkpoint方法,会对当前的RDD做一个标记。

3)、Spark框架会自动启动一个新的job,重新计算这个RDD的数据,将数据持久化到HDFS上。

  • 优化:对RDD执行checkpoint之前,最好对这个RDD先执行cache,这样新启动的job只需要将内存中的数据拷贝到HDFS上就可以,省去了重新计算这一步。
  • 使用:
1.SparkConf conf = new SparkConf();
2.conf.setMaster("local").setAppName("checkpoint");
3.JavaSparkContext sc = new JavaSparkContext(conf);
4.sc.setCheckpointDir("./checkpoint");
5.JavaRDD<Integer> parallelize = sc.parallelize(Arrays.asList(1,2,3));
6.parallelize.checkpoint();
7.parallelize.count();
8.sc.stop();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/161432.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Jina AI 的 8K 向量模型上线 AWS Marketplace,支持本地部署!

在当前多模态 AI 和大模型技术风头正劲的背景下&#xff0c;Jina AI 始终领跑于创新前沿&#xff0c;技术领先。2023 年 10 月 30 日&#xff0c;Jina AI 隆重推出 jina-embeddings-v2&#xff0c;这是全球首款支持 8192 输入长度的开源向量大模型&#xff0c;其性能媲美 OpenA…

汇编-PROC定义子过程(函数)

过程定义 过程用PROC和ENDP伪指令来声明&#xff0c; 并且必须为其分配一个名字(有效的标识符) 。目前为止&#xff0c; 我们所有编写的程序都包含了一个main过程&#xff0c; 例如&#xff1a; 当要创建的过程不是程序的启动过程时&#xff0c; 就用RET指令来结束它。RET强制…

Bean依赖注入注解开发

value Value("xfy")private String userName;private String userName;Value("xiao")public void setUserName(String userName) {this.userName userName;} Autowired // 根据类型进行注入 如果同一类型的Bean有多个&#xff0c;尝试根基名字进行二次…

AIGC,ChatGPT AI绘画 Midjourney 注册流程详细步骤

AI 绘画,Midjourney完成高清图片绘制,轻松掌握AI工具。 前期准备: ① 一个能使用的谷歌账号 ② 可以访问外网 Midjourney注册 1.进入midjourney官网https://www.midjourney.com 点击左下角”Join the Beta”,就可以注册,第一次使用的小伙伴会弹出提示,只需要点击Acc…

2019年12月 Scratch(三级)真题解析#中国电子学会#全国青少年软件编程等级考试

Scratch等级考试(1~4级)全部真题・点这里 一、单选题(共25题,每题2分,共50分) 第1题 怎样修改图章的颜色? A:只需要一个数字来设置颜色 B:设置RGB的值 C:在画笔中设置颜色、饱和度、亮度 D:在外观中设置或修改角色颜色特效 答案:D 在外观中设置或修改角色颜色特…

【深度学习】脸部修复,CodeFormer,论文,实战

代码&#xff1a; https://github.com/sczhou/CodeFormer 论文&#xff1a;https://arxiv.org/abs/2206.11253 Towards Robust Blind Face Restoration with Codebook Lookup Transformer 文章目录 论文摘要1 引言2 相关工作**4 实验****4.1 数据集****4.2 实验设置和指标***…

【ArrayList是如何扩容(ArrayList、LinkedList、与Vector的区别)】

ArrayList、LinkedList、与Vector的区别 解读ArrayList 是一个可改变大小的数组LinkedList 是一个双向链表Vector 属强同步类 拓展知识面ArrayList是如何扩容&#xff1f;如何利用List实现LRU&#xff1f; 解读 List主要有ArrayList、LinkedList与Vector几种实现。这三者都实现…

[论文笔记] Scaling Laws for Neural Language Models

概览: 一、总结 计算量、数据集大小、模型参数量大小的幂律 与 训练损失呈现 线性关系。 三个参数同时放大时,如何得到最佳的性能? 更大的模型 需要 更少的样本 就能达到相同的效果。 </

开源WIFI继电器之源代码

源代码:WiFiRelay: 基于ESP8285的WiFi继电器代码

笔记本外接显示器的一些基本操作

1>&#xff0c;安装问题直接问客服&#xff0c;正常情况是将显示屏接上电源&#xff0c;然后用先将显示屏和笔记本的HDMI接口连接即可。 按下组合键 win p ,选择 “复制”。 2>&#xff0c;接上显示屏后&#xff0c;原笔记本无声音&#xff1f; 1、找到笔记本电脑右下…

Doris 建表示例(七)

建表语法 使用 CREATE TABLE 命令建立一个表(Table)。更多详细参数可以查看&#xff1a; HELP CREATE TABLE; 建表语法&#xff1a; CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [database.]table_name(column_definition1[, column_definition2, ...][, index_definition1[, i…

阿里云99元服务器ECS经济型e实例性能如何?测评来了

阿里云服务器优惠99元一年&#xff0c;配置为云服务器ECS经济型e实例&#xff0c;2核2G配置、3M固定带宽和40G ESSD Entry系统盘&#xff0c;CPU采用Intel Xeon Platinum架构处理器&#xff0c;2.5 GHz主频&#xff0c;3M带宽下载速度384KB/秒&#xff0c;上传速度1028KB/秒&am…

人工智能对我们的生活影响

目录 前言 一、人工智能的领域 二、人工智能的应用 三、对人工智能的看法 总结 &#x1f308;嗨&#xff01;我是Filotimo__&#x1f308;。很高兴与大家相识&#xff0c;希望我的博客能对你有所帮助。 &#x1f4a1;本文由Filotimo__✍️原创&#xff0c;首发于CSDN&#x1f4…

运算与表达式模板(第一节)

目录 前言 一、表达式模板简介 为什么引入表达式模板&#xff1f; 缓式求值&#xff08;Memoization&#xff09; 关系 好处 前言 一个深度学习框架的初步实现为例&#xff0c;讨论如何在一个相对较大的项目中深入应用元编程&#xff0c;为系统优化提供更多的可能。 以…

阿里云服务器ECS经济型e实例优惠99元性能怎么样?

阿里云服务器ECS经济型e实例优惠99元性能怎么样&#xff1f;阿里云服务器优惠99元一年&#xff0c;配置为云服务器ECS经济型e实例&#xff0c;2核2G配置、3M固定带宽和40G ESSD Entry系统盘&#xff0c;CPU采用Intel Xeon Platinum架构处理器&#xff0c;2.5 GHz主频&#xff0…

使用NPOI处理EXCEL文件:例1-关于优化的一些问题

记得有一次处理Excel文件对比&#xff0c;自己前后使用VBA和NPOI对比了下效率。由于涉及到页面的渲染和刷新&#xff0c;二者的处理速度差了个数量级&#xff08;10多秒和几十分钟的差别&#xff09;。当然使用NPOI操作时也做了一定优化。印象这么深刻这次一有需求就想到了NPOI…

千云物流 - 使用k8s负载均衡openelb

openelb的介绍 具体根据官方文档进行安装官方文档,这里作为测试环境的安装使用. OpenELB 是一个开源的云原生负载均衡器实现,可以在基于裸金属服务器、边缘以及虚拟化的 Kubernetes 环境中使用 LoadBalancer 类型的 Service 对外暴露服务。OpenELB 项目最初由 KubeSphere 社区…

redis的性能管理及集群架构(主从复制、哨兵模式)

一、redis的性能管理 1、内存指标info memory 内存指标&#xff08;重要&#xff09; used_memory:853736 数据占用的内存 used_memory_rss:10551296 redis向操作系统申请的内存 used_memory_peak:853736 redis使用内存的峰值 注&#xff1a;单位&#xff1a;字节 系…

策略模式应用(内窥镜项目播放不同种类的视频)

新旧代码对比 策略模式 基本概念 策略模式是一种行为设计模式&#xff0c;它定义了一系列算法&#xff0c;将每个算法封装起来&#xff0c;并且使它们可以互相替换。策略模式允许客户端选择算法的具体实现&#xff0c;而不必改变客户端的代码。这样&#xff0c;客户端代码就…

中国区域250米归一化植被指数数据集(2000-2022)

中国区域250米归一化植被指数数据集(2000-2022)数据集是中国区域2000至2022年月度归一化植被指数产品&#xff0c;空间分辨率250米&#xff0c;合成方式采用月最大值合成&#xff0c;每年12期&#xff0c;共275期。本产品是基于Aqua/Terra-MODIS卫星传感器MOD13Q1产品以及土地利…