Previous post: Big Data - MapReduce - Processing and Practicing with JSON-Formatted Data - CSDN Blog
16.7 Using JSON in Spark
As in the previous post, we use the same data to compute the average rating of each movie:
{"mid":1,"rate":6,"uid":"u001","ts":15632433243}
{"mid":1,"rate":4,"uid":"u002","ts":15632433263}
{"mid":1,"rate":5,"uid":"u003","ts":15632403263}
{"mid":1,"rate":3,"uid":"u004","ts":15632403963}
{"mid":1,"rate":4,"uid":"u004","ts":15632403963}
{"mid":2,"rate":5,"uid":"u001","ts":15632433243}
{"mid":2,"rate":4,"uid":"u002","ts":15632433263}
{"mid":2,"rate":5,"uid":"u003","ts":15632403263}
{"mid":2,"rate":3,"uid":"u005","ts":15632403963}
{"mid":2,"rate":7,"uid":"u005","ts":15632403963}
{"mid":2,"rate":6,"uid":"u005","ts":15632403963}
{"mid":3,"rate":2,"uid":"u001","ts":15632433243}
{"mid":3,"rate":1,"uid":"u002","ts":15632433263}
{"mid":3,"rate":3,"uid":"u005","ts":15632403963}
{"mid":3,"rate":8,"uid":"u005","ts":15632403963}
{"mid":3,"rate":7,"uid":"u005","ts":15632403963}
Spark code:

/**
 * Test02.scala
 *
 * Scala code for calculating the average rating of each movie.
 */
package com.doit.day0130

import com.doit.day0126.Movie
import com.alibaba.fastjson.JSON
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Test02 {
  def main(args: Array[String]): Unit = {
    // Create a SparkConf and set the application name and run mode
    val conf = new SparkConf().setAppName("Starting...").setMaster("local[*]")
    // Create the SparkContext from the SparkConf
    val sc = new SparkContext(conf)

    // Read the data file "movie.json" as an RDD of lines
    val rdd1 = sc.textFile("data/movie.json")

    // Parse each line into a Movie object, producing a new RDD
    val rdd2: RDD[Movie] = rdd1.map(line => {
      // Use the fastjson parser to convert the line into a Movie
      val mv = JSON.parseObject(line, classOf[Movie])
      mv
    })

    // Group the records by movie ID
    val rdd3: RDD[(Int, Iterable[Movie])] = rdd2.groupBy(_.mid)

    // For each movie, sum the ratings, count them, and compute the average
    val rdd4 = rdd3.map(tp => {
      // Movie ID
      val mid = tp._1
      // Sum of the ratings
      val sum = tp._2.map(_.rate).sum
      // Number of ratings
      val size = tp._2.size
      // Average rating
      (mid, 1.0 * sum / size)
    })

    // Print each movie's average rating
    rdd4.foreach(println)

    // Release the SparkContext
    sc.stop()
  }
}
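With the sample data above, the job prints the following tuples (in arbitrary order, since foreach runs per partition on the executors):

(1,4.4)
(2,5.0)
(3,4.2)

Note that groupBy materializes every Movie object for a given movie on one task before the average is computed. A common alternative is to reduce (sum, count) pairs instead, which lets Spark combine partial results map-side before the shuffle. The following is a minimal sketch reusing rdd2 from above; the name avgByMid is introduced here for illustration.

// Average rating per movie without grouping whole Movie objects:
// map each rating to (mid, (rate, 1)), add the pairs, then divide.
val avgByMid = rdd2
  .map(m => (m.mid, (m.rate, 1)))
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
  .mapValues { case (sum, cnt) => 1.0 * sum / cnt }

avgByMid.foreach(println)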