Saprk排序

1、基础排序算子sortBy和sortByKey

在Spark中存在两种对RDD进行排序的函数,分别是 sortBy和sortByKey函数。sortBy是对标准的RDD进行排序,它是从Spark0.9.0之后才引入的。而sortByKey函数是对PairRDD进行排序,也就是有Key和Value的RDD。下面将分别对这两个函数的实现以及使用进行说明。

1.1 sortBy

sortBy是在RDD下的

该函数最多可以传三个参数: 第一个参数是一个函数,该函数的也有一个带T泛型的参数,返回类型和RDD中元素的类型是一致的; 第二个参数是ascending,这参数决定排序后RDD中的元素是升序还是降序,默认是true,也就是升序; 第三个参数是numPartitions,该参数决定排序后的RDD的分区个数,默认排序后的分区个数和排序之前的个数相等,即为this.partitions.size。

从sortBy函数的实现可以看出,第一个参数是必须传入的,而后面的两个参数可以不传入。而且sortBy函数函数的实现依赖于sortByKey函数,关于sortByKey函数后面会进行说明。keyBy函数也是RDD类中进行实现的,它的主要作用就是将传进来的每个元素作用于f(x)中,并返回tuples类型的元素,也就变成了Key-Value类型的RDD了

1.2 sortByKey

sortByKey是在OrderedRDDFunctions类下的。sortByKey函数作用于Key-Value形式的RDD,并对Key进行排序

该函数返回的RDD一定是ShuffledRDD类型的,因为对源RDD进行排序,必须进行Shuffle操作,而Shuffle操作的结果RDD就是ShuffledRDD。其实这个函数的实现很优雅,里面用到了RangePartitioner,它可以使得相应的范围Key数据分到同一个partition中,然后内部用到了mapPartitions对每个partition中的数据进行排序,而每个partition中数据的排序用到了标准的sort机制,避免了大量数据的shuffle。

在转换调用sortByKey方法时,会从上下文中提取Ordering[K],private val ordering = implicitly[Ordering[K]]其中implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](rdd: RDD[(K, V)])这里的k:Ordering意思是指k必须是可以转换成ordering的子类。下面有2种方式

//1、定义K的排序时使用隐式值
implicit val ord: Ordering[Person] = new Ordering[Person] {
override def compare(x: Person, y: Person): Int = { 
x.name.compareTo(y.name)
}}sc.textFile("").foreachRDD(rdd => rdd.map(msg => (Person(msg), msg)).sortByKey())//2、key类实现Ordered[k]接口中的compare方法
case class Person(name:String) extends Ordered[Person]{override def compare(that: Person): Int = {this.name.compare(that.name)} def run(): Unit ={println("Person...")}
}sc.textFile("").foreachRDD(rdd => rdd.map(msg => (Person(msg), msg)).sortByKey())

2、二次排序

二次排序就是指排序的时候考虑2个维度。如我们排序的时候按照第一个列降序排序,有一种情况,第一列的Key相同怎么排,这个时候可能就需要借助二次排序考虑第二列。

比较明智的方法是自定义排序的key,不采用其他的方式是因为现在二次排序,后面可能3次,5次等,采用自定义key的方式只要重新复写自定义的key,就能用sortByKey。

spark要实现比较,就要实现Orderd这个排序的接口,另外一般也会序列化Serializable。sortByKey会根据Key进行排序,但是如果二次排序的话sortByKey不知道key是什么构建的想法就是,基于已有的数据自定义二次排序的key,sortByKey基于这个自定义的key进行比较。我们用mapToPair重新构造内容,加了自定义的key,value的内容就是已有的内容,根据排序然后把自定义的key去掉。

package com.quinto.sort/*** 自定义二次排序的key*/
class SecondarySortKey(val first:Int,val second:Int) extends Ordered[SecondarySortKey] with Serializable {override def compare(that: SecondarySortKey): Int = {if(this.first - that.first!=0){this.first - that.first}else{this.second - that.second}}
}
package com.quinto.sortimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object SecondarySortApp {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("SecondarySort").setMaster("local[*]")val sc = new SparkContext(conf)val lines = sc.textFile("E:\\test\\a.txt")val sorted = lines.map(lines=> ( new SecondarySortKey(lines.split(" ")(0).toInt,lines.split(" ")(1).toInt),lines))sorted.sortByKey(false).map(line => line._2).collect().foreach(println)}
}

3、TopN

3.1 基础TopN

排序后直接使用take算子取出前几个,take算子后返回的是数组,不是rdd,不能用collect。

package com.quinto.sortimport org.apache.spark.{SparkConf, SparkContext}object BasicYopNApp {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("BasicYopNApp").setMaster("local[*]")val sc = new SparkContext(conf)val lines = sc.textFile("E:\\test\\a.txt")//①生成k-v键值对方便sortByKey进行排序(Int已经实现了排序比较的接口)②降序排序③过滤出排序内容本身⑤获取排名前5放入元素内容,元素内容构成一个Arraylines.map(line=>(line.toInt,line)).sortByKey(false).map(line=>line._2).take(5).foreach(println)}
}

3.2 分组TopN

分组排序就是有不同类型的数据,不同数据中每一种类型数据里面的TopN。

object GroupSortTopN {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName(s"${GroupSortTopN.getClass.getSimpleName}").setMaster("local[*]")val sc = new SparkContext(conf)val lines = sc.textFile("file:/E:/data/test.txt")//按照科目进行排序val course2Info:RDD[(String, String)] = lines.map(line => {val spaceIndex = line.indexOf(" ")val course = line.substring(0, spaceIndex)val info = line.substring(spaceIndex + 1)(course, info)})//按照科目排序,指的是科目内排序,不是科目间的排序,所以需要把每个科目的信息汇总val course2Infos:RDD[(String, Iterable[String])] = course2Info.groupByKey()//按照key进行分组//分组内的排序val sorted:RDD[(String, mutable.TreeSet[String])] = course2Infos.map{case (course, infos) => {val topN = mutable.TreeSet[String]()(new Ordering[String](){override def compare(x: String, y: String) = {val xScore = x.split("\\s+")(1)val yScore = y.split("\\s+")(1)yScore.compareTo(xScore)}})for(info <- infos) {topN.add(info)}(course, topN.take(5))}}sorted.foreach(println)sc.stop()}
}

3.3 优化分组TopN

上述在编码过程当中使用groupByKey,我们说着这个算子的性能很差,因为没有本地预聚合,所以应该在开发过程当中尽量避免使用,能用其它代替就代替。使用combineByKey模拟

object GroupSortByCombineByKeyTopN {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName(s"${GroupSortByCombineByKeyTopN.getClass.getSimpleName}").setMaster("local[*]")val sc = new SparkContext(conf)val lines = sc.textFile("file:/E:/data/test.txt")//按照科目进行排序val course2Info:RDD[(String, String)] = lines.map(line => {val spaceIndex = line.indexOf(" ")val course = line.substring(0, spaceIndex)val info = line.substring(spaceIndex + 1)(course, info)})//按照科目排序,指的是科目内排序,不是科目间的排序,所以需要把每个科目的信息汇总val sorted= course2Info.combineByKey(createCombiner, mergeValue, mergeCombiners)sorted.foreach(println)sc.stop()}def createCombiner(info:String): mutable.TreeSet[String] = {val ts = new mutable.TreeSet[String]()(new Ordering[String](){override def compare(x: String, y: String) = {val xScore = x.split("\\s+")(1)val yScore = y.split("\\s+")(1)yScore.compareTo(xScore)}})ts.add(info)ts}def mergeValue(ab:mutable.TreeSet[String], info:String): mutable.TreeSet[String] = {ab.add(info)if(ab.size > 5) {ab.take(5)} else {ab}}def mergeCombiners(ab:mutable.TreeSet[String], ab1: mutable.TreeSet[String]): mutable.TreeSet[String] = {for (info <- ab1) {ab.add(info)}if(ab.size > 5) {ab.take(5)} else {ab}}
}

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/473670.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ACwing 3. 完全背包问题(DP)

文章目录1. 题目2. 解题1. 题目 有 N 种物品和一个容量是 V 的背包&#xff0c;每种物品都有无限件可用。 第 i 种物品的体积是 vi&#xff0c;价值是 wi。 求解将哪些物品装入背包&#xff0c;可使这些物品的总体积不超过背包容量&#xff0c;且总价值最大。 输出最大价值。…

Crontab定时任务访问url实例

以下操作均是在ubuntu 下操作的&#xff1a; 1、进入crontab文件的编写状态&#xff1a; crontab -e 2、第一次进入编写crontab文件的界面&#xff0c;系统会提示选择相应的编辑器&#xff0c;一般我们选择vi编辑器就可以了&#xff1a;选择/usr/bin/vim.tiny 12345Select a…

ACwing 4. 多重背包问题 I(DP)

文章目录1. 题目2. 解题1. 题目 有 N 种物品和一个容量是 V 的背包。 第 i 种物品最多有 si 件&#xff0c;每件体积是 vi&#xff0c;价值是 wi。 求解将哪些物品装入背包&#xff0c;可使物品体积总和不超过背包容量&#xff0c;且价值总和最大。 输出最大价值。 输入格式…

数据算法与结构基本知识

数据结构与算法作用 没有看过数据结构和算法&#xff0c;有时面对问题可能会没有任何思路&#xff0c;不知如何下手去解决&#xff1b;大部分时间可能解决了问题&#xff0c;可是对程序运行的效率和开销没有意识&#xff0c;性能低下&#xff1b;有时会借助别人开发的利器暂时…

Master HA源码解析

1、Master HA概述 Spark在生产上做HA一般采用的是通过zookeeper的方式&#xff0c;配置3个master的话是比较可靠的方式。采用zookeeper做HA的话zookeeper会保存整个Spark程序运行时候的元数据&#xff08;包括Workers&#xff0c;Drivers&#xff0c;Applications&#xff0c;…

DNS坑爹呢?!

昨天下午3点多&#xff0c;大量网民反映无法上网。多家DNS服务商通过微博透露&#xff0c;在1月21日下午3点20分左右&#xff0c;全国所有通用顶级域的根出现异常&#xff0c;导致部分国内网民无法访问.com域名网站&#xff0c;对中国互联网造成严重影响。 昨天下午有事出去&am…

数据结构顺序表基本流程

生活中很多事物是有顺序关系的&#xff0c;如班级座位从前到后是按排的顺序&#xff0c;从左到右是按列的顺序&#xff0c;可以很方便的定位到某一个位置&#xff0c;但如果座位是散乱的&#xff0c;就很难定位。 在程序中&#xff0c;经常需要将一组&#xff08;通常是同为某…

Spark2.x RPC解析

1、概述 在Spark中很多地方都涉及网络通信&#xff0c;比如Spark各个组件间的消息互通、用户文件与Jar包的上传、节点间的Shuffle过程、Block数据的复制与备份等。Spark 2.0 之后&#xff0c;master 和worker 之间完全不使用akka 通信&#xff0c;改用netty实现。因为使用Akka…

LeetCode 1629. 按键持续时间最长的键

文章目录1. 题目2. 解题1. 题目 LeetCode 设计了一款新式键盘&#xff0c;正在测试其可用性。测试人员将会点击一系列键&#xff08;总计 n 个&#xff09;&#xff0c;每次一个。 给你一个长度为 n 的字符串 keysPressed &#xff0c;其中 keysPressed[i] 表示测试序列中第 …

数据结构中的栈

整理衣服时&#xff0c;先放冬天的衣服&#xff0c;后放夏天的衣服&#xff0c;这样夏天的衣服就在上面&#xff0c;方便夏季取用。 栈&#xff08;stack&#xff09;&#xff0c;有些地方称为堆栈&#xff0c;是一种容器&#xff0c;可存入数据元素、访问元素、删除元素&…

数据结构中的队列

生活中很多时候需要排队来维持秩序&#xff0c;如等公交、取票、办理银行业务等。 队列&#xff08;queue&#xff09;是只允许在一端进行插入操作&#xff0c;而在另一端进行删除操作的线性表。 队列是一种先进先出的&#xff08;First In First Out&#xff09;的线性表&am…

SparkContext解析

1、SparkContext概述 Spark的程序编写是基于SparkContext的&#xff0c;体现在2方面&#xff1a;①Spark编程的核心基础&#xff08;RDD&#xff09;&#xff0c;第一个RDD是由SparkContext创建的&#xff1b;②Spark程序的调度优化也是基于SparkContext&#xff0c;RDD在一开…

LeetCode 1630. 等差子数组

文章目录1. 题目2. 解题1. 题目 如果一个数列由至少两个元素组成&#xff0c;且每两个连续元素之间的差值都相同&#xff0c;那么这个序列就是 等差数列 。更正式地&#xff0c;数列 s 是等差数列&#xff0c;只需要满足&#xff1a;对于每个有效的 i &#xff0c; s[i1] - s[…

LeetCode 1631. 最小体力消耗路径(DFS + 二分查找)

文章目录1. 题目2. 解题1. 题目 你准备参加一场远足活动。给你一个二维 rows x columns 的地图 heights &#xff0c;其中 heights[row][col] 表示格子 (row, col) 的高度。 一开始你在最左上角的格子 (0, 0) &#xff0c;且你希望去最右下角的格子 (rows-1, columns-1) &…

Spark资源调度分配

1、任务调度与资源调度 任务调度&#xff1a;是指通过DAGScheduler&#xff0c;TaskScheduler&#xff0c;SchedulerBackend等进行的作业调度。 资源调度&#xff1a;是指应用程序获取资源。 任务调度是在资源调度的基础上&#xff0c;没有资源调度&#xff0c;那么任务调度…

两个栈实现队列与两个队列实现栈

1. 两个栈实现队列 实现一 思路 s1是入栈的&#xff0c;s2是出栈的。 入队列&#xff0c;直接压到s1是就行了出队列&#xff0c;先把s1中的元素全部出栈压入到s2中&#xff0c;弹出s2中的栈顶元素&#xff1b;再把s2的所有元素全部压回s1中 实现二 思路 s1是入栈的&#xff0c…

ACwing 5. 多重背包问题 II(二进制拆分+DP)

文章目录1. 题目2. 解题1. 题目 有 N 种物品和一个容量是 V 的背包。 第 i 种物品最多有 si 件&#xff0c;每件体积是 vi&#xff0c;价值是 wi。 求解将哪些物品装入背包&#xff0c;可使物品体积总和不超过背包容量&#xff0c;且价值总和最大。 输出最大价值。 输入格式…

排序:冒泡排序与选择排序

冒泡排序 冒泡排序&#xff08;英语&#xff1a;Bubble Sort&#xff09;是一种简单的排序算法。它重复地遍历要排序的数列&#xff0c;一次比较两个元素&#xff0c;如果他们的顺序错误就把他们交换过来。遍历数列的工作是重复地进行直到没有再需要交换&#xff0c;也就是说该…

Spark Master的注册机制与状态管理

目录 1、Master接收注册的主要对象 2、Master接收Worker的注册 3、Master接收Driver的注册 4、Master处理Driver状态变化 5、Master接收Application的注册 6、Master处理Executor状态变化 1、Master接收注册的主要对象 Master主要接受注册的对象是&#xff1a;Applicatio…

排序:插入排序与希尔排序

插入排序 插入排序&#xff08;英语&#xff1a;Insertion Sort&#xff09;是一种简单直观的排序算法。它的工作原理是通过构建有序序列&#xff0c;对于未排序数据&#xff0c;在已排序序列中从后向前扫描&#xff0c;找到相应位置并插入。插入排序在实现上&#xff0c;在从…