Spark常见算子汇总

创建RDD

在Spark中创建RDD的方式分为三种:

  1. 从外部存储创建RDD
  2. 从集合中创建RDD
  3. 从其他RDD创建

textfile

调用SparkContext.textFile()方法,从外部存储中读取数据来创建 RDD

parallelize

调用SparkContext 的 parallelize()方法,将一个存在的集合,变成一个RDD

makeRDD

方法一

/** Distribute a local Scala collection to form an RDD.** This method is identical to `parallelize`.*/def makeRDD[T: ClassTag](seq: Seq[T],numSlices: Int = defaultParallelism): RDD[T] = withScope {parallelize(seq, numSlices)}

方法二:分配一个本地Scala集合形成一个RDD,为每个集合对象创建一个最佳分区。

/*** Distribute a local Scala collection to form an RDD, with one or more* location preferences (hostnames of Spark nodes) for each object.* Create a new partition for each collection item.*/def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope {assertNotStopped()val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMapnew ParallelCollectionRDD[T](this, seq.map(_._1), math.max(seq.size, 1), indexToPrefs)}

 举例

scala> val rdd = sc.parallelize(1 to 6, 2)
val rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:1scala> rdd.collect()
val res4: Array[Int] = Array(1, 2, 3, 4, 5, 6)scala> val seq = List(("American Person", List("Tom", "Jim")), ("China Person", List("LiLei", "HanMeiMei")), ("Color Type", List("Red", "Blue")))
val seq: List[(String, List[String])] = List((American Person,List(Tom, Jim)), (China Person,List(LiLei, HanMeiMei)), (Color Type,List(Red, Blue)))scala> val rdd2 = sc.makeRDD(seq)
val rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at makeRDD at <console>:1scala> rdd2.partitions.size
val res0: Int = 3scala> rdd2.foreach(println)
American Person
Color Type
China Personscala> val rdd1 = sc.parallelize(seq)
val rdd1: org.apache.spark.rdd.RDD[(String, List[String])] = ParallelCollectionRDD[1] at parallelize at <console>:1scala> rdd1.partitions.size
val res1: Int = 2scala> rdd2.collect()
val res2: Array[String] = Array(American Person, China Person, Color Type)scala> rdd1.collect()
val res3: Array[(String, List[String])] = Array((American Person,List(Tom, Jim)), (China Person,List(LiLei, HanMeiMei)), (Color Type,List(Red, Blue)))scala> var lines = sc.textFile("/root/tmp/a.txt",3)
var lines: org.apache.spark.rdd.RDD[String] = /root/tmp/a.txt MapPartitionsRDD[4] at textFile at <console>:1scala> lines.collect()
val res6: Array[String] = Array(a,b,c)scala> lines.partitions.size
val res7: Int = 3

转换算子

flatMap

map

reduceByKey

groupByKey

举例

scala> var lines = sc.textFile("/root/tmp/a.txt",3)
var lines: org.apache.spark.rdd.RDD[String] = /root/tmp/a.txt MapPartitionsRDD[13] at textFile at <console>:1scala> lines.flatMap(x=>x.split(",")).map(x=>(x,1)).reduceByKey((a,b)=>a+b).foreach(println)
(c,2)
(b,1)
(d,1)
(a,2)scala> lines.collect()
val res27: Array[String] = Array(a,b,c, c, a,d)scala> lines.map(_.split(",")).collect()
val res25: Array[Array[String]] = Array(Array(a, b, c), Array(c), Array(a, d))scala> lines.flatMap(_.split(",")).collect()
val res26: Array[String] = Array(a, b, c, c, a, d)

行动算子

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/203705.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

这些Java并发容器,你都了解吗?

文章目录 前言并发容器1.ConcurrentHashMap 并发版 HashMap示例 2.CopyOnWriteArrayList 并发版 ArrayList示例 3.CopyOnWriteArraySet 并发 Set示例 4.ConcurrentLinkedQueue 并发队列 (基于链表)示例 5.ConcurrentLinkedDeque 并发队列 (基于双向链表)示例 6.ConcurrentSkipL…

Vue学习计划-Vue2--Vue核心(五)条件、列表渲染、表单数据

1. 条件渲染 v-if v-if“表达式”v-else-if “表达式”v-else “表达式” 适用于&#xff1a;切换频率较低的场景 特点&#xff1a;不显示dom元素&#xff0c;直接被删除 注意&#xff1a;v-if和v-else-if、v-else一起使用&#xff0c;但要求结构不能被打断 v-if和template一…

Android笔记(十七):PendingIntent简介

PendingIntent翻译成中文为“待定意图”&#xff0c;这个翻译很好地表示了它的涵义。PendingIntent描述了封装Intent意图以及该意图要执行的目标操作。PendingIntent封装Intent的目标行为的执行是必须满足一定条件&#xff0c;只有条件满足&#xff0c;才会触发意图的目标操作。…

Kotlin 中的 also 和 run:选择正确的作用域函数

在 Kotlin 中&#xff0c;also 和 run 是两个十分有用的作用域函数。 虽然它们在功能上相似&#xff0c;但各自有独特的用途和适用场景。 一、分析&#xff1a; also&#xff1a;在对象的上下文中执行给定的代码块&#xff0c;并返回对象本身。它的参数是一个接收对象并返回…

分布式分布式事务分布式锁分布式ID

目录 分布式分布式系统设计理念目标设计思路中心化去中心化 基本概念分布式与集群NginxRPC消息中间件&#xff08;MQ&#xff09;NoSQL&#xff08;非关系型数据库&#xff09; 分布式事务1 事务2 本地事务3 分布式事务4 本地事务VS分布式事务5 分布式事务场景6 CAP原理7 CAP组…

ChatGPT发展历程

ChatGPT是一个在2020年成立的在线聊天平台&#xff0c;它的发展历程如下&#xff1a; 初期阶段&#xff1a;2020年&#xff0c;在全球疫情爆发的情况下&#xff0c;ChatGPT创始人开始思考如何为人们提供一个快捷、安全、便利的在线聊天平台。他们选择使用GPT&#xff08;生成对…

(2/2)敏捷实践指南 Agile Practice Guide ([美] Project Management institute 著)

附录 A1 - 《PMBOK指南》映射 表A1显示了第六版《PMBOK指南》中定义的项目管理过程组与知识领域之间的对应关系 本附录说明了如何利用混合和敏捷方法处理《PMBOK指南》知识领域&#xff08;请参见表A1-2&#xff09;中所述的属性&#xff0c;其中涵盖了相同和不同的属性&…

conda 安装教程分享

大家好&#xff0c;我是微赚淘客系统的小编&#xff0c;也是冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01;今天我将向大家介绍如何使用conda进行安装。 首先&#xff0c;让我们来了解一下conda。conda是Anaconda发行版的一部分&#xff0c;它是一个开源的包管理系…

为什么那些删库跑路的人都极其下流

为什么那些删库跑路的人都极其下流&#xff1f;因为真的下流。注意&#xff0c;我不是针对跑路者的人品&#xff0c;遇到不公正待遇不敢反抗本身就比下流还下流&#xff0c;我说的是这种对抗方式太多低等。 干不好是能力问题&#xff0c;你不干是态度问题。记住我的话&#xf…

golang版aes-cbc-pkcs7加密解密base64hex字符串输入输出

最近项目中golang项目中使用aes加解密&#xff0c;做个记录方便以后使用 aes-cbc-pkcs7加密解密base64输入输出 type AesBase64 struct {key []byte // 允许16,24,32字节长度iv []byte // 只允许16字节长度 }func NewAesBase64(key []byte, iv []byte) *AesBase64 {return &…

C#网络应用程序(Web页面浏览器、局域网聊天程序)

目录 一、创建Web页面浏览器 1.示例源码 2.生成效果 二、局域网聊天程序 1.类 2.服务器端 3.客户端 一、创建Web页面浏览器 TextBox 控件用来输入要浏览的网页地址&#xff0c;Button控件用来执行浏览网页操作&#xff0c; WebBrowser控件用来显示要浏览的网页。这个控…

Matlab 曲线动态绘制

axes(handles.axes1); % 选定所画坐标轴 figure也可 h1 animatedline; h1.Color b; h1.LineWidth 2; h1.LineStyle -; % 线属性设置 for i 1 : length(x)addpoints(h1,x(i),y(i)); % x/y为待绘制曲线数据drawnow;pause(0.01); % 画点间停顿 end 示例&#xff1a; figure…

exynos4412—中断处理

一、什么是中断 一种硬件上的通知机制&#xff0c;用来通知CPU发生了某种需要立即处理的事件 分为&#xff1a; 内部中断 CPU执行程序的过程中&#xff0c;发生的一些硬件出错、运算出错事件&#xff08;如分母为0、溢出等等&#xff09;&#xff0c;不可屏蔽外部中断 外设发…

scitb包1.6版本发布,一个为制作专业统计表格而生的R包

目前&#xff0c;本人写的scitb包1.6版本已经正式在R语言官方CRAN上线&#xff0c;scitb包是一个为生成专业化统计表格而生的R包。 可以使用以下代码安装 install.packages("scitb")安装过旧版本的从新安装一次就可以升级了,根据粉丝的建议&#xff0c;增加了Overal…

RocketMQ-RocketMQ集群实践(搭建)

搭建RocketMQ可视化管理服务 下载可视化客户端源码下载 | RocketMQ 这里只提供了源码&#xff0c;并没有提供直接运行的jar包。将源码下载下来后&#xff0c;需要解压并进入对应的目录&#xff0c;使用maven进行编译。(需要提前安装maven客户端) mvn clean package -Dmaven.t…

手动部署1个Cloud Run service

什么是Cloud Run 来自chatgpt&#xff1a; Google Cloud Run 是一项全托管的服务器托管平台&#xff0c;它允许您在容器化的环境中运行无服务器应用程序。Cloud Run 提供了一种简单而灵活的方式来构建、部署和扩展应用程序&#xff0c;无需管理底层基础设施。 以下是 Cloud …

sql常用语法练习

表名word namecontinentareapopulationgdpAfghanistanAsia6522302550010020343000000AlbaniaEurope28748283174112960000000AlgeriaAfrica238174137100000188681000000AndorraEurope468781153712000000AngolaAfrica124670020609294100990000000.... 基础练习 基础查询 1、阅…

RocketMq实战(待完善)

目录 生产者 发送消息固定步骤 发送模式 1. 单向发送 2. 同步发送 3. 异步发送 生产消息完整代码 消费者 消费消息固定步骤 简单消费代码示例 消息模型 广播消息 顺序消息 延迟消息 批量消息 事务消息 生产者 发送消息固定步骤 1.创建消息生产者producer&#…

操作系统的运行机制+中断和异常

一、CPU状态 在CPU设计和生产的时候就划分了特权指令和非特叔指令&#xff0c;因此CPU执行一条指令前就能断出其类型 CPU有两种状态&#xff0c;“内核态”和“用户态” 处于内核态时&#xff0c;说明此时正在运行的是内核程序&#xff0c;此时可以执行特权指令。 处于用户态…

Jenkins+Maven+Gitlab+Tomcat 自动化构建打包,部署

环境准备Jenkins工具、环境、插件配置全局变量配置安装插件Deploy to containerMaven Integration plugin配置国内mvn源 创建maven项目 环境准备 1、安装服务 Jenkins工具、环境、插件配置 全局变量配置 Manage Jenkins>tools>JDK 安装 安装插件 Deploy to contai…