Classic Spark Examples
- Join operation example
- Secondary sort example
Join Operation Example
Requirements
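(The original requirement text is not preserved here; the following is inferred from the code below.) Given a ratings file and a movies file, compute the average rating of every movie, join the averages with the movie titles, and output only the movies whose average rating is greater than 4.0.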
Data Description
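(Inferred from the parsing code below; the exact field layout is an assumption.) Both inputs are "::"-delimited text files in the MovieLens style: a ratings record carries the movie id in its second field and the rating in its third field (roughly userId::movieId::rating::timestamp), and a movies record carries the movie id in its first field and the title in its second field (roughly movieId::title::genres).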
The code is as follows:
package base.charpter7

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

/**
 * @projectName sparkGNU2023
 * @package base.charpter7
 * @className base.charpter7.Join
 * @description Join the ratings file with the movies file and keep movies whose average rating is above 4.0
 * @author pblh123
 * @date 2023/11/28 17:25
 * @version 1.0
 */
object Join {

  def main(args: Array[String]): Unit = {
    // 1. Create the SparkSession and SparkContext
    if (args.length != 4) {
      println("usage is Join <master> <rating> <movie> <output>")
      System.exit(5)
    }
    val murl = args(0)
    val ratingfile = args(1)
    val movingfile = args(2)
    val outputfile = args(3)

    val spark: SparkSession = new SparkSession.Builder()
      .appName(s"${this.getClass.getSimpleName}")
      .master(murl)
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext

    // 2. Main logic
    // Delete the output directory if it already exists
    val conf: Configuration = new Configuration()
    val fs: FileSystem = FileSystem.get(conf)
    if (fs.exists(new Path(outputfile))) {
      println(s"Output directory $outputfile already exists")
      fs.delete(new Path(outputfile), true) // recursive delete
      println(s"Output directory $outputfile has been deleted")
    } else println(s"Output directory $outputfile does not exist")

    // ratings ETL: extract (movieid, rating) pairs
    val ratingrdd: RDD[String] = sc.textFile(ratingfile, 1)
    val rating: RDD[(Int, Double)] = ratingrdd.map(line => {
      val fields: Array[String] = line.split("::")
      (fields(1).toInt, fields(2).toDouble)
    })
    // average rating per movie: (movieid, avg_rating)
    val movieScores: RDD[(Int, Double)] = rating.groupByKey().map(x => {
      val avg = x._2.sum / x._2.size
      (x._1, avg)
    })

    // movies ETL
    val movierdd: RDD[String] = sc.textFile(movingfile)
    // movieid, (movieid, title)
    val movieskey: RDD[(Int, (Int, String))] = movierdd.map(line => {
      val fields: Array[String] = line.split("::")
      (fields(0).toInt, fields(1))
    }).keyBy(tup => tup._1)

    // movieid, (movieid, avg_rating)
    val sskey: RDD[(Int, (Int, Double))] = movieScores.keyBy(tup => tup._1)

    // movieid, ((movieid, avg_rating), (movieid, title))
    val joinres: RDD[(Int, ((Int, Double), (Int, String)))] = sskey.join(movieskey)

    // keep movies rated above 4.0 on average: (movieid, avg_rating, title)
    val res: RDD[(Int, Double, String)] = joinres
      .filter(f => f._2._1._2 > 4.0)
      .map(f => (f._1, f._2._1._2, f._2._2._2))
    //    val res: RDD[(Int, Double, String)] = sskey.join(movieskey)
    //      .filter(f => f._2._1._2 > 4.0)
    //      .map(f => (f._1, f._2._1._2, f._2._2._2))

    res.take(5).foreach(println)
    res.saveAsTextFile(outputfile)

    // 3. Stop the SparkContext and SparkSession
    sc.stop()
    spark.stop()
  }
}
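As a side note, groupByKey collects every rating for a movie before averaging. A minimal alternative sketch that computes the same (movieid, avg_rating) pairs with aggregateByKey is shown below; it is not part of the original program, and movieScoresAlt is an illustrative name.

    // Alternative sketch (not in the original code): per-movie average without groupByKey.
    // Assumes the same rating: RDD[(Int, Double)] built above.
    val movieScoresAlt: RDD[(Int, Double)] = rating
      .aggregateByKey((0.0, 0))(                    // accumulator = (sum of ratings, count)
        (acc, r) => (acc._1 + r, acc._2 + 1),       // fold one rating into the accumulator
        (a, b) => (a._1 + b._1, a._2 + b._2))       // merge accumulators across partitions
      .mapValues { case (sum, cnt) => sum / cnt }   // average = sum / count

This avoids materializing all ratings of a movie in one place, which matters when some movies have a very large number of ratings.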
Run results
Secondary Sort Example
Requirements and data description:
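(Inferred from the code below.) Each input line contains two integers separated by a space. The goal is a secondary sort: order the lines by the first integer in descending order and, when the first integers are equal, by the second integer in descending order, then print the sorted lines.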
Code implementation
The SecondarySortKey class (SecondarySortKey.scala)
package base.charpter7

/**
 * @projectName sparkGNU2023
 * @package base.charpter7
 * @className base.charpter7.SecondarySortKey
 * @description Composite sort key: compare by the first field, and by the second field when the first fields are equal
 * @author pblh123
 * @date 2023/11/29 17:01
 * @version 1.0
 */
class SecondarySortKey(val first: Int, val second: Int) extends Ordered[SecondarySortKey] with Serializable {

  override def compare(that: SecondarySortKey): Int = {
    if (this.first - that.first != 0) {
      this.first - that.first
    } else {
      this.second - that.second
    }
  }
}
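A quick illustration of how the composite ordering behaves (the values below are made up for demonstration and are not from the original text):

    val a = new SecondarySortKey(3, 5)
    val b = new SecondarySortKey(3, 9)
    val c = new SecondarySortKey(4, 1)
    println(a < b)  // true: the first fields tie (3 == 3), so 5 < 9 decides
    println(b < c)  // true: 3 < 4 decides, the second field is not consulted

Note that the subtraction-based compare can overflow for Int values near the extremes; Integer.compare(this.first, that.first) would be a safer choice if such inputs are possible.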
The SecondarySortApp object (SecondarySortApp.scala)
package base.charpter7

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

/**
 * @projectName sparkGNU2023
 * @package base.charpter7
 * @className base.charpter7.SecondarySortApp
 * @description Sort the input lines by the first field and then by the second field, both in descending order
 * @author pblh123
 * @date 2023/11/29 17:04
 * @version 1.0
 */
object SecondarySortApp {

  def main(args: Array[String]): Unit = {
    // 1. Create the SparkSession and SparkContext
    if (args.length != 2) {
      println("Two arguments are required: <master> <input>")
      System.exit(5)
    }
    val musrl: String = args(0)
    val spark: SparkSession = new SparkSession.Builder()
      .appName(s"${this.getClass.getSimpleName}")
      .master(musrl)
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext

    // 2. Main logic
    // Read the input text file
    val inputfile: String = args(1)
    val lines: RDD[String] = sc.textFile(inputfile, 1)

    // Perform the secondary sort: wrap each line in a composite key, then sort descending
    val pairRDDwithSort: RDD[(SecondarySortKey, String)] = lines.map(line => {
      val strings: Array[String] = line.split(" ")
      (new SecondarySortKey(strings(0).toInt, strings(1).toInt), line)
    })
    val pairRDDwithSort2: RDD[(SecondarySortKey, String)] = pairRDDwithSort.sortByKey(false)

    // Drop the keys and keep the original lines in sorted order
    val sortedRes: RDD[String] = pairRDDwithSort2.map(sortedline => sortedline._2)
    sortedRes.collect().foreach(println)

    // 3. Stop the SparkContext and SparkSession
    sc.stop()
    spark.stop()
  }
}
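For reference, the same descending secondary sort can also be expressed without a custom key class, because Scala already supplies an implicit Ordering for (Int, Int) tuples. A minimal sketch under that assumption (sortedAlt is an illustrative name):

    // Sketch (not in the original code): secondary sort via the built-in tuple ordering.
    val sortedAlt: RDD[String] = lines
      .map(line => { val f = line.split(" "); ((f(0).toInt, f(1).toInt), line) })
      .sortByKey(ascending = false)   // tuple ordering compares the first element, then the second
      .map(_._2)

The custom SecondarySortKey shown in the text generalizes more easily when the comparison logic is not a simple field-by-field tuple order.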
Configuration parameters
Run results