Spark RDD使用教程

文章目录

    • 1、创建RDD的三种方式
    • 2、Spark对RDD的操作
      • 2.1、Transformations(转换)
      • 2.2、Actions(动作)

1、创建RDD的三种方式

Spark提供三种创建RDD方式:集合、本地文件、HDFS文件。详细可以查看RDD和pair RDD文档
使用例子SparkRdd.java

ppackage com.penngo.rdd;import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;/****/
public class SparkRdd {public static void main(String[] args) {//windows下调试spark需要使用https://github.com/steveloughran/winutilsSystem.setProperty("hadoop.home.dir", "D:\\hadoop\\hadoop-3.3.1");System.setProperty("HADOOP_USER_NAME", "root");SparkSession spark = SparkSession.builder().appName("List2Rdd").master("local[*]").getOrCreate();JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());// 集合转成RDDList<Integer> data = Arrays.asList(1, 2, 3, 4, 5,1,2);JavaRDD<Integer> rdd1 = sc.parallelize(data);int sum1 = rdd1.reduce((Function2<Integer, Integer, Integer>) (integer, integer2) -> integer + integer2);System.out.println("sum1============"+sum1);// 读取本地文件/hadoop文件转成RDDString path = "D:\\project\\data.txt";//path = "hdfs://testspark:9000/data.txt"JavaRDD<String> rddLine = sc.textFile(path, 2);JavaRDD<Integer> rdd2 =  rddLine.flatMap((FlatMapFunction<String, Integer>) s -> {String[] strs = s.split(",");List<Integer> list = new ArrayList<>();for(String str:strs){list.add(Integer.valueOf(str.trim()));}return list.iterator();});int sum2 = rdd2.reduce((Function2<Integer, Integer, Integer>) (integer, integer2) -> integer + integer2);System.out.println("sum2============"+sum2);JavaPairRDD<String, Integer> pairs = rdd2.mapToPair(s -> new Tuple2("num_" + s, 1));JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);counts.foreach(tp2->{System.out.println(tp2._1 + "=" + tp2._2);});spark.stop();}
}

2、Spark对RDD的操作

Spark对RDD的操作可以整体分为两类:
Transformation(转换):表示是针对RDD中数据的转换操作,主要会针对已有的RDD创建一个新的RDD:常见的有map、flatMap、filter等等。
Action(执行)表示是触发任务执行的操作,主要对RDD进行最后的操作,比如遍历、reduce、保存到文件等,并且还可以把结果返回给Driver程序

2.1、Transformations(转换)

Transformation(转换)

Meaning(含义)

map(func)

返回一个新的 distributed dataset(分布式数据集),它由每个 source(数据源)中的元素应用一个函数 func 来生成。

filter(func)

返回一个新的 distributed dataset(分布式数据集),它由每个 source(数据源)中应用一个函数 func 且返回值为 true 的元素来生成。

flatMap(func)与 map 类似,但是每一个输入的 item 可以被映射成 0 个或多个输出的 items(所以 func 应该返回一个 Seq 而不是一个单独的 item
mapPartitions(func)与 map 类似,但是单独的运行在在每个 RDD 的 partition(分区,block)上,所以在一个类型为 T 的 RDD 上运行时 func 必须是 Iterator<T> => Iterator<U> 类型。
mapPartitionsWithIndex(func)与 mapPartitions 类似,但是也需要提供一个代表 partition 的 index(索引)的 interger value(整型值)作为参数的 func,所以在一个类型为 T 的 RDD 上运行时 func 必须是 (Int, Iterator<T>) => Iterator<U> 类型。
sample(withReplacement, fraction, seed)样本数据,设置是否放回(withReplacement)、采样的百分比(fraction)、使用指定的随机数生成器的种子(seed)。
union(otherDataset)返回一个新的 dataset,它包含了 source dataset(源数据集)和 otherDataset(其它数据集)的并集
intersection(otherDataset)

返回一个新的 RDD,它包含了 source dataset(源数据集)和 otherDataset(其它数据集)的

distinct([numTasks]))返回一个新的 dataset,它包含了 source dataset(源数据集)中去重的元素。
groupByKey([numTasks])

在一个 (K, V) pair 的 dataset 上调用时,返回一个 (K, Iterable<V>) pairs 的 dataset

注意 : 如果分组是为了在每一个 key 上执行聚合操作(例如,sum 或 average),此时使用 reduceByKey 或 aggregateByKey 来计算性能会更好。
注意 : 默认情况下,并行度取决于父 RDD 的分区数。可以传递一个可选的 numTasks 参数来设置不同的任务数。

reduceByKey(func, [numTasks])

在一个 (K, V) pair 的 dataset 上调用时,返回一个 (K, Iterable<V>) pairs 的 dataset,它的值会针对每一个 key 使用指定的 reduce 函数 func 来聚合,它必须为 (V,V) => V 类型。像 groupByKey 一样,可通过第二个可选参数来配置 reduce 任务的数量。

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])在一个 (K, V) pair 的 dataset 上调用时,返回一个 (K, Iterable<V>) pairs 的 dataset,它的值会针对每一个 key 使用指定的 combine 函数和一个中间的 “zero” 值来聚合,它必须为 (V,V) => V 类型。为了避免不必要的配置,可以使用一个不同与 input value 类型的 aggregated value 类型。
sortByKey([ascending], [numTasks])在一个 (K, V) pair 的 dataset 上调用时,其中的 K 实现了 Ordered,返回一个按 keys 升序或降序的 (K, V) pairs  dataset。
join(otherDataset, [numTasks])在一个 (K, V) 和 (K, W) 类型的 dataset 上调用时,返回一个 (K, (V, W)) pairs 的 dataset,它拥有每个 key 中所有的元素对。Outer joins 可以通过 leftOuterJoin,rightOuterJoin fullOuterJoin 来实现。
cogroup(otherDataset, [numTasks])在一个 (K, V) 和的 dataset 上调用时,返回一个 (K, (Iterable<V>, Iterable<W>)) tuples 的 dataset。这个操作也调用了 groupWith
cartesian(otherDataset)在一个 T 和 U 类型的 dataset 上调用时,返回一个 (T, U) pairs 类型的 dataset(所有元素的 pairs,即笛卡尔积)。
pipe(command, [envVars])通过使用 shell 命令来将每个 RDD 的分区给 Pipe。例如,一个 Perl 或 bash 脚本。RDD 的元素会被写入进程的标准输入(stdin),并且 lines(行)输出到它的标准输出(stdout)被作为一个字符串型 RDD 的 string 返回。
coalesce(numPartitions)

Decrease(降低)RDD 中 partitions(分区)的数量为 numPartitions。对于执行过滤后一个大的 dataset 操作是更有效的。

repartition(numPartitions)Reshuffle(重新洗牌)RDD 中的数据以创建或者更多的 partitions(分区)并将每个分区中的数据尽量保持均匀。该操作总是通过网络来 shuffles 所有的数据。
repartitionAndSortWithinPartitions(partitioner)根据给定的 partitioner(分区器)对 RDD 进行重新分区,并在每个结果分区中,按照 key 值对记录排序。这比每一个分区中先调用 repartition 然后再 sorting(排序)效率更高,因为它可以将排序过程推送到 shuffle 操作的机器上进行。

2.2、Actions(动作)

Action

意思

reduce(func)使用函数 func 聚合数据集(dataset)中的元素,这个函数 func 输入为两个元素,返回为一个元素。这个函数应该是可交换(commutative )和关联(associative)的,这样才能保证它可以被并行地正确计算。
collect()在驱动程序中,以一个数组的形式返回数据集的所有元素。这在返回足够小(sufficiently small)的数据子集的过滤器(filter)或其他操作(other operation)之后通常是有用的。
count()返回数据集中元素的个数。
first()返回数据集中的第一个元素(类似于 take(1))。
take(n)将数据集中的前 n 个元素作为一个数组返回。
takeSample(withReplacementnum[seed])对一个数据集随机抽样,返回一个包含 num 个随机抽样(random sample)元素的数组,参数 withReplacement 指定是否有放回抽样,参数 seed 指定生成随机数的种子。
takeOrdered(n, [ordering])返回 RDD 按自然顺序(natural order)或自定义比较器(custom comparator)排序后的前 n 个元素。
saveAsTextFile(path)将数据集中的元素以文本文件(或文本文件集合)的形式写入本地文件系统、HDFS 或其它 Hadoop 支持的文件系统中的给定目录中。Spark 将对每个元素调用 toString 方法,将数据元素转换为文本文件中的一行记录。

saveAsSequenceFile(path
(Java and Scala)

将数据集中的元素以 Hadoop SequenceFile 的形式写入到本地文件系统、HDFS 或其它 Hadoop 支持的文件系统指定的路径中。该操作可以在实现了 Hadoop 的 Writable 接口的键值对(key-value pairs)的 RDD 上使用。在 Scala 中,它还可以隐式转换为 Writable 的类型(Spark 包括了基本类型的转换,例如 IntDoubleString 等等)。

saveAsObjectFile(path
(Java and Scala)

使用 Java 序列化(serialization)以简单的格式(simple format)编写数据集的元素,然后使用 SparkContext.objectFile() 进行加载。
countByKey()仅适用于(K,V)类型的 RDD 。返回具有每个 key 的计数的 (K , Int)对 的 hashmap
foreach(func)对数据集中每个元素运行函数 func 。这通常用于副作用(side effects),例如更新一个累加器(Accumulator)或与外部存储系统(external storage systems)进行交互。注意:修改除 foreach() 之外的累加器以外的变量(variables)可能会导致未定义的行为(undefined behavior)。详细介绍请阅读 理解闭包(Understanding closures) 部分。

参考自官方文档

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/198332.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

7.Vue UI库

7.Vue UI库 7.1移动端常用的UI库 &#xff08;1&#xff09; Vant&#xff1a;Vant 4 - A lightweight, customizable Vue UI library for mobile web apps.A lightweight, customizable Vue UI library for mobile web apps.https://vant-ui.github.io/vant/#/zh-CN &#xf…

提升--22---ReentrantReadWriteLock读写锁

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 ReadWriteLock----读写锁1.读写锁介绍线程进入读锁的前提条件&#xff1a;线程进入写锁的前提条件&#xff1a;而读写锁有以下三个重要的特性&#xff1a; Reentran…

React 好用的工具库

1、html-react-parser HTML 到 React 解析器&#xff0c;适用于服务器 &#xff08;Node.js&#xff09; 和客户端&#xff08;浏览器&#xff09;&#xff0c;适用于React节点修改过滤等需求 解析器将 HTML 字符串转换为一个或多个 React 元素。可以将一个元素替换为另一个元素…

个体诊所电子处方系统哪个好用,推荐一款可以自由设置配方模板能够填写病历可以查询历史病历的门诊处方笺软件

一、前言 1、功能实用&#xff0c;操作简单&#xff0c;不会电脑也会操作&#xff0c;软件免安装&#xff0c;已内置数据库。 2、中医西医均可用此软件开电子处方&#xff0c;支持病历记录查询&#xff0c;药品进出库管理&#xff0c;支持配方模板一键导入电子处方。 二、电子…

理解数据库事务和回滚:概念、实例与Python脚本实现

文章目录 概念银行案例实践创建数据Python脚本中的事务回滚 结语&#xff1a;保障数据安全与完整性的关键 概念 事务&#xff08;Transaction&#xff09;: 数据库中的事务是一组不可分割的操作集合&#xff0c;它们要么全部成功&#xff0c;要么全部失败。这个概念保证了数据…

Amazon CodeWhisperer 正式可用, 并面向个人开发者免费开放

文章作者&#xff1a;深度-围观 北京——2023年4月18日&#xff0c;亚马逊云科技宣布&#xff0c;实时 AI 编程助手 Amazon CodeWhisperer 正式可用&#xff0c;同时推出的还有供所有开发人员免费使用的个人版&#xff08;CodeWhisperer Individual&#xff09;。CodeWhisperer…

SAP ABAP Tree Control 对象与ALV Grid 对象关联

Tree Control 对象与ALV Grid 对象关联 在双击 Tree 对象时&#xff0c;变更ALV Trid 对象的显示&#xff0c;实现界面如图9-11 所示。 Screen 设计界面如图9-12 所示。 主程序&#xff1a; REPORT ytest36. DATA: ok_code TYPE sy-ucomm,save_ok TYPE sy-ucomm. DATA: wa_co…

【C++】map和set的使用及注意事项

map和set的使用及注意事项 1.关联式容器2. 键值对3.set3.1接口介绍3.1.1构造3.1.2迭代器3.1.3容量3.1.4修改 3.2set使用及注意事项 4.multiset5.map6.multimap349. 两个数组的交集 1.关联式容器 在初阶阶段&#xff0c;我们已经接触过STL中的部分容器&#xff0c;比如&#xf…

vue项目解决计算后浮点数精度问题

1.1 问题描述 计算出的结果本来应该为13.8386&#xff0c;但是这里因为js精度问题&#xff0c;导致后边多了一串的0000001。 1.2 使用场景 求和&#xff0c;每个物品的单价*数量 1.3 解决办法 引入第三方库Decimal 1.4 vue项目中Decimal安装步骤 1.4.1 安装Decimal np…

C语言枚举详解,typedef简介(能看懂文字就能明白系列)

系列文章目录 C语言基础专栏 笔记详解 &#x1f31f; 个人主页&#xff1a;古德猫宁- &#x1f308; 信念如阳光&#xff0c;照亮前行的每一步 文章目录 系列文章目录&#x1f308; *信念如阳光&#xff0c;照亮前行的每一步* 前言一、枚举类型的声明枚举常量三、枚举类型的优…

索尼PMW580视频帧EC碎片重组开启方法

索尼PMW580视频帧EC碎片重组开启方法 索尼PMW-580摄像机生成的MXF文件存在严重的碎片化&#xff0c;目前CHS零壹视频恢复程序MXF版、专业版、高级版已经支持重组结构体正常的碎片&#xff0c;同时也支持对于结构体破坏或者覆盖后仅存在音视频帧EC数据的重组&#xff0c;需要注…

开关电源有哪些EMI整改?|深圳比创达电子EMC

某控制产品在进行辐射发射测试时&#xff0c;发现测试结果超标&#xff0c;辐射发射测试结果如下图所示&#xff1a; 控制产品在去掉发射源之前&#xff0c;就在各外部端口采取了各种滤波措施&#xff0c;结果并无明显作用&#xff0c;即使把所有相关外部引线全部拿走(只剩下电…

快速学习PyQt5的动画和图形效果

Pyqt5相关文章: 快速掌握Pyqt5的三种主窗口 快速掌握Pyqt5的2种弹簧 快速掌握Pyqt5的5种布局 快速弄懂Pyqt5的5种项目视图&#xff08;Item View&#xff09; 快速弄懂Pyqt5的4种项目部件&#xff08;Item Widget&#xff09; 快速掌握Pyqt5的6种按钮 快速掌握Pyqt5的10种容器&…

菜鸟学习日记(python)——运算符

我们进行运算时&#xff0c;需要两类数据&#xff0c;操作数和运算符&#xff0c;例如&#xff1a;ab就是一个运算&#xff0c;它的操作数是a和b&#xff0c;运算符是‘’ 在python中运算符包括以下几大类&#xff1a; 算数运算符比较&#xff08;关系&#xff09;运算符赋值…

spark无法执行pi_如何验证spark搭建完毕

在配置yarn环境下的spark时&#xff0c;执行尚硅谷的以下命令发现报错&#xff0c;找不到这个也找不到那个&#xff0c;尚硅谷的代码是 bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn \ --deploy-mode cluster \ ./examples/jars/spark-exam…

Android AIDL实现开放系统级API 提供三方app调用

需求场景 当上层app需要调用一些系统底层的资源以及属性操作&#xff08;比如Settings.system 属性的读写&#xff09;&#xff0c;甚至驱动节点的读写操作&#xff0c;上层app毫无疑问是没有权限的&#xff0c;所以就需要我们在framework 系统层做一个中转和代理&#xff0c;也…

java 下载文件,复制文件

1&#xff0c;java通过浏览器下载文件 ApiOperation(value "导出", notes "", response String.class)GetMapping("/export")public HttpServletResponse export(String path, HttpServletResponse response) { // String path "…

Linux C语言 42-进程间通信IPC之网络通信(套接字)

Linux C语言 42-进程间通信IPC之网络通信&#xff08;套接字&#xff09; 本节关键字&#xff1a;C语言 进程间通信 网络通信 套接字 TCP UDP 相关库函数&#xff1a;socket、bind、listen、accept、send、recv、sendto、recvfrom 参考之前的文章 Linux C语言 30-套接字操作…

k8s部署单机模式的minio

k8s部署单机模式的minio 一、说明二、yaml内容三、步骤3.1 创建资源3.2 查看启动日志3.2 查看svc并访问控制台 一、说明 项目使用minio&#xff0c;准备在k8s环境部署一套minio试用。 1.关于minio的原理和概念参考: https://mp.weixin.qq.com/s?__bizMzI3MDM5NjgwNg&mid…

国防科技大博士招生入学考试【50+论文主观题】

目录 回答模板大意创新和学术价值启发 论文分类&#xff08;根据问题/场景分类&#xff09;数学问题Efficient Multiset Synchronization&#xff08;高效的多集同步【简单集合/可逆计数Bloom过滤器】&#xff09;大意创新和学术价值启发 An empirical study of Bayesian netwo…