1. Create a Maven project
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.wakedata</groupId>
    <artifactId>code</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <spark.version>3.4.1</spark.version>
        <scala.version>2.12.14</scala.version>
    </properties>

    <dependencies>
        <!-- Scala dependency -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- Spark Core is the Spark engine; the other higher-level components all depend on it -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>

    <build>
        <!-- Scala source directories to compile -->
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <!-- Scala plugin -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <!--<arg>-make:transitive</arg>-->
                                <!-- Scala 2.11 / NetBeans: the argument above is not supported -->
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <!-- Maven packaging plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- class containing the main method (the object defined in section 3) -->
                                    <mainClass>sparkCore.wordcount_01</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
2. Directory structure
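A layout consistent with the pom.xml in section 1 and the code in section 3 would look roughly like the sketch below. The directories follow from `sourceDirectory`, `testSourceDirectory`, the `sparkCore` package, and the `./data/words.txt` path; the root folder name (taken from the `artifactId`) and the source file name are assumptions.

```text
code/
├── pom.xml
├── data/
│   └── words.txt
└── src/
    ├── main/
    │   └── scala/
    │       └── sparkCore/
    │           └── wordcount_01.scala
    └── test/
        └── scala/
```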
3. Code implementation
package sparkCore

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 1. Create a SparkContext
 * 2. Create an RDD
 * 3. Call Transformation operators on the RDD
 * 4. Call an Action
 * 5. Release resources
 */
object wordcount_01 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local")
    // Create the SparkContext; it is used to create RDDs
    val sc: SparkContext = new SparkContext(conf)
    // Writing a Spark program means programming against an abstract large
    // collection (the RDD) by calling its high-level API
    // Use the SparkContext to create an RDD
    val lines: RDD[String] = sc.textFile("./data/words.txt")
    // Split each line into words and flatten
    val words: RDD[String] = lines.flatMap(_.split(" "))
    // Pair each word with 1 in a tuple
    val wordsAndOne: RDD[(String, Int)] = words.map((_, 1))
    // Group and aggregate; reduceByKey aggregates locally first, then globally
    val reduced: RDD[(String, Int)] = wordsAndOne.reduceByKey(_ + _)
    // Sort by count, descending
    val sorted: RDD[(String, Int)] = reduced.sortBy(_._2, false)
    // Print the result, one (word, count) pair per line
    sorted.foreach(line => println(line))
    // Release resources
    sc.stop()
  }
}
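To see what each transformation produces without starting Spark, the same steps can be mirrored on an ordinary Scala collection. This is only a sketch: the object name `WordCountLocalSketch` and the input lines are made up for illustration, `groupBy` plus a sum stands in for `reduceByKey` (which exists only on Spark pair RDDs), and ties are broken by word so the order is deterministic, which the Spark version does not guarantee.

```scala
object WordCountLocalSketch {
  // Mirrors the RDD pipeline on a plain Seq (no Spark involved).
  def count(lines: Seq[String]): Seq[(String, Int)] = {
    // Split each line into words and flatten
    val words = lines.flatMap(_.split(" "))
    // Pair each word with 1
    val wordsAndOne = words.map((_, 1))
    // Stand-in for reduceByKey(_ + _): group by word, then sum the ones
    val reduced = wordsAndOne
      .groupBy(_._1)
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }
      .toSeq
    // Sort by count descending; the word is an added tie-breaker
    reduced.sortBy { case (word, n) => (-n, word) }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical input, standing in for the contents of words.txt
    val lines = Seq("spark hadoop spark", "hadoop spark flink")
    count(lines).foreach(println)
    // prints:
    // (spark,3)
    // (hadoop,2)
    // (flink,1)
  }
}
```

The intermediate values have the same shape as in the Spark job: a sequence of words, then `(word, 1)` pairs, then one `(word, count)` pair per distinct word.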
Output: