spark应用程序转换_Spark—RDD编程常用转换算子代码实例

Spark—RDD编程常用转换算子代码实例

Spark rdd 常用 Transformation 实例:

1、def map[U: ClassTag](f: T => U): RDD[U]   将函数应用于RDD的每一元素,并返回一个新的RDD

packagetop.ruandbimportorg.apache.spark.{SparkConf, SparkContext}

object RddTestextendsApp{

val sparkConf= new SparkConf().setAppName("RddTest").setMaster("local[2]")

val sc= newSparkContext(sparkConf)//map

var source = sc.parallelize(1 to 10)

source.collect().foreach(e=>print(e+","))//1 2 3 4 5 6 7 8 9 10

var sourceMap = source.map(_*10)

sourceMap.collect().foreach(e=>print(e+","))//10 20 30 40 50 60 70 80 90 100

}

2、def filter(f: T => Boolean): RDD[T] 通过提供的产生boolean条件的表达式来返回符合结果为True新的RDD

//filter

var source = sc.parallelize(1 to 10)

source.collect().foreach(e=>print(e+" "))//1 2 3 4 5 6 7 8 9 10

var sourceMap = source.filter(_.

sourceMap.collect().foreach(e=>print(e+" "))//1 2 3 4

3、def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]   将函数应用于RDD中的每一项,对于每一项都产生一个集合,并将集合中的元素压扁成一个集合。

//flatMap

var source = sc.parallelize(1 to 5)

source.collect().foreach(e=>print(e+" "))//1 2 3 4 5

var sourceMap = source.flatMap(x=>(1to x))

sourceMap.collect().foreach(e=>print(e+" "))//1 1 2 1 2 3 1 2 3 4 1 2 3 4 5

4、def mapPartitions[U: ClassTag]( f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]    将函数应用于RDD的每一个分区,每一个分区运行一次,函数需要能够接受Iterator类型,然后返回Iterator。

packagetop.ruandbimportorg.apache.spark.{SparkConf, SparkContext}

object RddTest {

def main(args: Array[String]): Unit={

val sparkConf= new SparkConf().setAppName("RddTest").setMaster("local[2]")

val sc= newSparkContext(sparkConf)//mapPartitions

var source = sc.parallelize(List(("lucy", "female"), ("jack", "male"), ("jams", "male")))

source.collect().foreach(e=> print(e + " "))//(lucy,female) (jack,male) (jams,male)

var sourceMap =source.mapPartitions(partitionsFun)

sourceMap.collect().foreach(e=> print(e + " ")) //jams jack

}

def partitionsFun(iter:Iterator[(String,String)]): Iterator[String]={

var males=List[String]()while(iter.hasNext){

val next=iter.next()

next match {case (_,"male") => males =next._1::malescase _ =>}

}returnmales.iterator

}

}

5、def mapPartitionsWithIndex[U: ClassTag]( f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]  将函数应用于RDD中的每一个分区,每一个分区运行一次,函数能够接受 一个分区的索引值 和一个代表分区内所有数据的Iterator类型,需要返回Iterator类型。

packagetop.ruandbimportorg.apache.spark.{SparkConf, SparkContext}

object RddTest {

def main(args: Array[String]): Unit={

val sparkConf= new SparkConf().setAppName("RddTest").setMaster("local[2]")

val sc= newSparkContext(sparkConf)//mapPartitionsWithIndex

var source = sc.parallelize(List(("lucy", "female"), ("jack", "male"), ("jams", "male")))

source.collect().foreach(e=> print(e + " "))//(lucy,female) (jack,male) (jams,male)

var sourceMap =source.mapPartitionsWithIndex(partitionsFunWithIndex)

sourceMap.collect().foreach(e=> print(e + " ")) //[1]jams [1]jack

}

def partitionsFunWithIndex(index:Int,iter:Iterator[(String,String)]): Iterator[String]={

var males=List[String]()while(iter.hasNext){

val next=iter.next()

next match {case (_,"male") => males="["+index+"]"+next._1 :: malescase _ =>}

}

males.iterator

}

}

6、def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T] 在RDD中移seed为种子返回大致上有fraction比例个数据样本RDD,withReplacement表示是否采用放回式抽样。

packagetop.ruandbimportorg.apache.spark.{SparkConf, SparkContext}

object RddTest {

def main(args: Array[String]): Unit={

val sparkConf= new SparkConf().setAppName("RddTest").setMaster("local[2]")

val sc= newSparkContext(sparkConf)//sample

var source = sc.parallelize(1 to 10)

source.collect().foreach(e=> print(e + " "))//1 2 3 4 5 6 7 8 9 10

var sourceMap = source.sample(true,0.4,2)

sourceMap.collect().foreach(e=> print(e + " ")) //1 2 2

}

}

7、def union(other: RDD[T]): RDD[T]  将两个RDD中的元素进行合并,返回一个新的RDD

//union

var source = sc.parallelize(1 to 3)

source.collect().foreach(e=> print(e + " "))//1 2 3

var rdd = sc.parallelize(6 to 9)

var sourceMap=source.union(rdd)

sourceMap.collect().foreach(e=> print(e + " "))//1 2 3 6 7 8 9

8、def intersection(other: RDD[T]): RDD[T]  将两个RDD做交集,返回一个新的RDD

//intersection

var source = sc.parallelize(1 to 8)

source.collect().foreach(e=> print(e + " "))//1 2 3 4 5 6 7 8

var rdd = sc.parallelize(6 to 9)

var sourceMap=source.intersection(rdd)

sourceMap.collect().foreach(e=> print(e + " "))//6 8 7

9、def distinct(): RDD[T]  将当前RDD进行去重后,返回一个新的RDD

//distinct

var source = sc.parallelize(List(1,1,2,2,3,3,4,4,5,5))

source.collect().foreach(e=> print(e + " "))//1 1 2 2 3 3 4 4 5 5

var sourceMap =source.distinct()

sourceMap.collect().foreach(e=> print(e + " "))//4 2 1 3 5

10、def partitionBy(partitioner: Partitioner): RDD[(K, V)]  根据设置的分区器重新将RDD进行分区,返回新的RDD

//partitionBy

var source = sc.parallelize(List((1,"111"),(2,"222"),(3,"333"),(4,"444")),4)

source.collect().foreach(e=> print(e + " "))

print("分区数:"+source.partitions.size)//(1,111) (2,222) (3,333) (4,444) 分区数:4

var sourceMap = source.partitionBy(new HashPartitioner(2))

sourceMap.collect().foreach(e=> print(e + " "))

print("分区数:"+sourceMap.partitions.size)//(2,222) (4,444) (1,111) (3,333) 分区数:2

11、def reduceByKey(func: (V, V) => V): RDD[(K, V)]   根据Key值将相同Key的元组的值用func进行计算,返回新的RDD

//reduceByKey

var source = sc.parallelize(List(("hello",1),("world",1),("hello",1),("world",1)))

source.collect().foreach(e=> print(e + " "))//(hello,1) (world,1) (hello,1) (world,1)

var sourceMap = source.reduceByKey((x,y)=>x+y)

sourceMap.collect().foreach(e=> print(e + " "))//(hello,2) (world,2)

12、def groupByKey(): RDD[(K, Iterable[V])]   将相同Key的值进行聚集,输出一个(K, Iterable[V])类型的RDD

//groupByKey

var source = sc.parallelize(List(("hello",1),("world",1),("hello",1),("world",1)))

source.collect().foreach(e=> print(e + " "))//(hello,1) (world,1) (hello,1) (world,1)

var sourceMap =source.groupByKey()

sourceMap.collect().foreach(e=> print(e + " "))//(hello,CompactBuffer(1, 1)) (world,CompactBuffer(1, 1))

13、def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]   根据key分别使用CreateCombiner和mergeValue进行相同key的数值聚集,通过mergeCombiners将各个分区最终的结果进行聚集。

packagetop.ruandbimportorg.apache.spark.{ SparkConf, SparkContext}

object RddTest {

def main(args: Array[String]): Unit={

val sparkConf= new SparkConf().setAppName("RddTest").setMaster("local[2]")

val sc= newSparkContext(sparkConf)//combineByKey 计算平均成绩

var scores = Array(("lucy", 89), ("jack", 77), ("lucy", 100), ("james", 65), ("jack", 99),

("james", 44))

var input=sc.parallelize(scores);

input.collect().foreach(e=> print(e + " "))//(lucy,89) (jack,77) (lucy,100) (james,65) (jack,99) (james,44)

var output = input.combineByKey((v) => (v, 1),

(acc: (Int, Int), v)=> (acc._1 + v, acc._2 + 1),

(acc1: (Int, Int), acc2: (Int, Int))=> (acc1._1 + acc2._1, acc1._2 +acc2._2))

output.collect().foreach(e=> print(e + " "))//(james,(109,2)) (jack,(176,2)) (lucy,(189,2))

var result = output.map{case (key,value) => (key,value._1/value._2.toDouble)}

result.collect().foreach(e=> print(e + " "))//(james,54.5) (jack,88.0) (lucy,94.5)

}

}

14、def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,

combOp: (U, U) => U): RDD[(K, U)]   通过seqOp函数将每一个分区里面的数据和初始值迭代带入函数返回最终值,comOp将每一个分区返回的最终值根据key进行合并操作。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/470223.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

struts2中一些常用的写法 记录

1.对日期进行处理 Date current new Date(); java.text.SimpleDateFormat sdf new java.text.SimpleDateFormat( "yyyy-MM-dd HH:mm:ss"); String time sdf.format(current); 或者: 插入当前时间:pstmt.setDate(4,new java.sql.Date(new ja…

React SSR

一、为什么需要SSR呢? 单页面富应用的局限: 之前我们开发的应用程序,如果直接请求可以看到上面几乎没有什么内容。 但是为什么我们页面可以看到大量的内容呢? 因为当我们请求下来静态资源之后会执行JS,JS会去请求数…

linux 命令 读phy_PHY LINUX (转载整理)

每每分析网络问题的时候,总要梳理层次关系,本想自己写一个关于PHY的文档,找到网上有人写的一篇比较好,所以转载下来,仅供初学者参考。以太网 MAC(链路层)PHY(物理层/RTL8201F,88E1111);集成型DM9000,RTL813…

Laravel框架开发规范-修订版

1.PHP编码规范 1.1 标签 PHP 程序可以使用<?php ?>或<? ?>来界定PHP代码 在HTML 页面中嵌入纯变量时&#xff0c;使用<? ?>这样的形式 纯PHP类文件&#xff0c;文件开始标签使用<?php&#xff0c;闭合标签?>必须省略 1.2 编码 PHP文件必须使用…

vue显示两位小数的方法_Vue toFixed保留两位小数的3种方式

Vue toFixed保留两位小数的3种方式第一种&#xff1a;直接写在js里面&#xff0c;这是最简单的val.toFixed(2)第二种&#xff1a;在ElementUi表格中使用第三种&#xff1a;在取值符号中使用 {{}}定义一个方法towNumber(val) {return val.toFixed(2)}使用{{ towNumber(row.equiV…

Tyvj 1176 火焰巨魔的惆怅

Tyvj 1176 火焰巨魔的惆怅 背景 TYVJ2月月赛第一道巨魔家族在某天受到了其他种族的屠杀&#xff0c;作为一个英雄&#xff0c;他主动担任了断后的任务&#xff0c;但是&#xff0c;在巨魔家族整体转移过后&#xff0c;火焰巨魔却被困住了&#xff0c;他出逃的方式也只有召唤小火…

Vue3 组件通信学习笔记

一、父子组件之间通信 父子组件之间如何进行通信呢&#xff1f; 父组件传递给子组件&#xff1a;通过props属性&#xff1b;子组件传递给父组件&#xff1a;通过$emit触发事件&#xff1b; 1.1 父组件传递给子组件 在开发中很常见的就是父子组件之间通信&#xff0c;比如父…

js调用vlc_如何使用HTML5或JavaScript查看RTSP流,而不使用Real Player插件上的VLC插件等插件?...

The idea is to develop a cross-platform, standalone application that could play a video, streamed over RTSP, using HTML5 or JavaScript or any other web technology.解决方案RTSP is a protocol on the same level as HTTP. Its impossible to do RTSP via HTTP.The …

HDU-3729 二分匹配 匈牙利算法

题目大意&#xff1a;学生给出其成绩区间&#xff0c;但可能出现矛盾情况&#xff0c;找出合理组合使没有说谎的人尽可能多&#xff0c;并按maximum lexicographic规则输出组合。 //用学生去和成绩匹配&#xff0c;成绩区间就是学生可以匹配的成绩#include <iostream> #i…

Vue3 slot插槽——(默认插槽、具名插槽、作用域插槽)

一、认识插槽Slot 在开发中&#xff0c;我们会经常封装一个个可复用的组件&#xff1a; 前面我们会通过props传递给组件一些数据&#xff0c;让组件来进行展示&#xff1b;但是为了让这个组件具备更强的通用性&#xff0c;我们不能将组件中的内容限制为固定的div、span等等这…

dbcc dbreindex server sql_DBCC DBREINDEX重建索引提高SQL Server性能

DBCC DBREINDEX重建索引提高SQL Server性能[转载]大多数SQL Server表需要索引来提高数据的访问速度&#xff0c;如果没有索引&#xff0c;SQL Server要进行表格扫描读取表中的每一个记录才能找到索要的数据。索引可以分为簇索引和非簇索引&#xff0c;簇索引通过重排表中的数据…

Vue动态组件和组件缓存

一、切换组件案例 比如我们现在想要实现了一个功能&#xff1a; 点击一个tab-bar&#xff0c;切换不同的组件显示&#xff1b; 这个案例我们可以通过两种不同的实现思路来实现&#xff1a; 方式一&#xff1a;通过v-if来判断&#xff0c;显示不同的组件&#xff1b; 方式二…

hidl 原理分析_一个 health service 不生效问题引出的一点知识

从 Android P 开始&#xff0c;Google 开始推荐厂家再定制一个 health 。前不久遇到一个定制 health 中的信息未成功反应到 Framework 的问题&#xff0c;在分析解决问题的过程中&#xff0c;学习到了一点新知识&#xff0c;所以就在这篇文章里根据解决问题的流程做一个小小的记…

比较list集合相等的方法

1. 内容相同,不管顺序! public static boolean isListContentEquals(List<String> src, List<String> des){return src.containsAll(des) && des.containsAll(src);} 2.内容和顺序都完全一样 方法1: public static boolean isListsCompletelyEqual(List<…

Webpack的代码分包Vue3中定义异步组件分包refs的使用

一、默认的打包过程&#xff1a; 默认情况下&#xff0c;在构建整个组件树的过程中&#xff0c;因为组件和组件之间是通过模块化直接依赖的&#xff0c;那么webpack在打包时就会将组件模块打包到一起&#xff08;比如一个app.js文件中&#xff09;&#xff1b; 这个时候随着项…

经典JS

用apply把document当作this传入getId函数&#xff0c;帮助“修正”this; document.getElementById (function (func) {return function () {return func.apply(document, arguments);} })(document.getElementById);//调用 var getId document.getElementById; var div getI…

组件的v-model Mixin extends

一、组件的v-model 前面我们在input中可以使用v-model来完成双向绑定&#xff1a; 这个时候往往会非常方便&#xff0c;因为v-model默认帮助我们完成了两件事&#xff1b;v-bind:value的数据绑定 和 input的事件监听&#xff1b; 如果我们现在封装了一个组件&#xff0c;其…

springmvcget中文乱码_解决SpringMVC Controller 接收页面传递的中文参数出现乱码的问题...

新配置一个spring的MVC项目&#xff0c;发现对Get请求的中文参数出现了乱码&#xff1a;查看了SpingMVC中关于编码的配置(在web.xml中)&#xff0c;如下&#xff1a;encodingFilterorg.springframework.web.filter.CharacterEncodingFilterencodingutf-8forceEncodingtrueencod…

SQL学习之SELECT子句顺序

下面来总计下之前的随笔中所说过的所有的SELECT子句的顺序。 子句            说明            是否必须使用 SELECT 要返回的列或者表达式 是 FROM 从中检索数据的表 …

Vue3过渡动画实现

一、认识动画 在开发中&#xff0c;我们想要给一个组件的显示和消失添加某种过渡动画&#xff0c;可以很好的增加用户体验&#xff1a; React框架本身并没有提供任何动画相关的API&#xff0c;所以在React中使用过渡动画我们需要使用一个第三方库react-transition-group&…