一、Sort
Count how many times each word occurs in a text file, then output the words ordered by count in descending order; words with the same count are ordered alphabetically in ascending order.
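Before looking at the Spark job, the ordering rule itself can be illustrated with plain Java collections. This is only a sketch with made-up counts; the real counting is done by the Spark code in section 三.

import java.util.*;

public class OrderingSketch {
    public static void main(String[] args) {
        // Made-up word counts, only to illustrate the ordering rule.
        Map<String, Integer> counts = new HashMap<>();
        counts.put("spark", 5);
        counts.put("data", 4);
        counts.put("can", 4);
        counts.put("stream", 2);

        List<Map.Entry<String, Integer>> entries = new ArrayList<>(counts.entrySet());
        entries.sort((a, b) -> {
            int diff = Integer.compare(b.getValue(), a.getValue()); // count descending
            return diff != 0 ? diff : a.getKey().compareTo(b.getKey()); // word ascending on ties
        });
        entries.forEach(e -> System.out.println(e.getKey()));
        // Prints: spark, can, data, stream
    }
}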
二、Maven configuration
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.mk</groupId>
    <artifactId>spark-test</artifactId>
    <version>1.0</version>
    <name>spark-test</name>
    <url>http://spark.mk.com</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.version>2.11.1</scala.version>
        <spark.version>2.4.4</spark.version>
        <hadoop.version>2.6.0</hadoop.version>
    </properties>

    <dependencies>
        <!-- Scala dependency -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- Spark dependencies -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <artifactId>maven-clean-plugin</artifactId>
                    <version>3.1.0</version>
                </plugin>
                <plugin>
                    <artifactId>maven-resources-plugin</artifactId>
                    <version>3.0.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.0</version>
                </plugin>
                <plugin>
                    <artifactId>maven-surefire-plugin</artifactId>
                    <version>2.22.1</version>
                </plugin>
                <plugin>
                    <artifactId>maven-jar-plugin</artifactId>
                    <version>3.0.2</version>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>
三、Code
// SortApp.java
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

public class SortApp implements SparkConfInfo {

    public static class SortData implements Comparable<SortData>, Serializable {
        private String word;
        private Integer count;

        public SortData(String word, Integer count) {
            this.word = word;
            this.count = count;
        }

        public String getWord() {
            return word;
        }

        public void setWord(String word) {
            this.word = word;
        }

        public Integer getCount() {
            return count;
        }

        public void setCount(Integer count) {
            this.count = count;
        }

        @Override
        public int compareTo(SortData o) {
            if (o == null) {
                return 1;
            }
            // Higher counts sort first (descending).
            int diff = Integer.compare(o.count, this.count);
            if (diff != 0) {
                return diff;
            }
            // Equal counts: order by word ascending, treating null as smallest.
            if (word == o.word) {
                return 0;
            }
            if (word == null) {
                return -1;
            }
            if (o.word == null) {
                return 1;
            }
            return this.word.compareTo(o.word);
        }
    }

    public static void main(String[] args) {
        String filePath = "F:\\test\\log.txt";
        SparkSession sparkSession = new SortApp().getSparkConf("sort");
        List<String> wordCounts = sparkSession.sparkContext()
                .textFile(filePath, 4)
                .toJavaRDD()
                // Split on whitespace and common punctuation (including 。 and ,).
                .flatMap(v -> Arrays.asList(v.split("[(\\s+)(\r?\n),.。'’]")).iterator())
                .filter(v -> v.matches("[a-zA-Z-]+"))
                .map(String::toLowerCase)
                .mapToPair(v -> new Tuple2<>(v, 1))
                .reduceByKey(Integer::sum)
                .map(v -> new SortData(v._1, v._2))
                // SortData.compareTo encodes "count descending, word ascending".
                .sortBy(v -> v, true, 4)
                .map(SortData::getWord)
                .collect();
        wordCounts.forEach(System.out::println);
        sparkSession.stop();
    }
}

// SparkConfInfo.java (separate file, since both types are public)
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

public interface SparkConfInfo {

    default SparkSession getSparkConf(String appName) {
        SparkConf sparkConf = new SparkConf();
        if (System.getProperty("os.name").toLowerCase().contains("win")) {
            // On Windows, run against a local master for development.
            sparkConf.setMaster("local[4]");
            System.out.println("Running Spark in local mode");
        } else {
            sparkConf.setMaster("spark://hadoop01:7077,hadoop02:7077,hadoop03:7077");
            // Local IP of the driver; it must be reachable from the Spark cluster,
            // e.g. both machines on the same LAN.
            sparkConf.set("spark.driver.host", "192.168.150.1");
            // Path of the jar produced by the project build.
            sparkConf.setJars(new String[]{".\\out\\artifacts\\spark_test\\spark-test.jar"});
        }
        return SparkSession.builder()
                .appName(appName)
                .config(sparkConf)
                .getOrCreate();
    }
}
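Because the pom above already declares JUnit 4.11 as a test dependency, the comparator contract of SortData can be checked with a small unit test before running the full job. This is only a sketch; the test class name and its placement under src/test/java are assumptions, not part of the original project.

// SortDataTest.java (assumed to live under src/test/java)
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class SortDataTest {

    @Test
    public void higherCountSortsFirst() {
        SortApp.SortData spark = new SortApp.SortData("spark", 5);
        SortApp.SortData can = new SortApp.SortData("can", 4);
        // "spark" appears more often, so it must sort before "can".
        assertTrue(spark.compareTo(can) < 0);
    }

    @Test
    public void equalCountsFallBackToAlphabeticalOrder() {
        SortApp.SortData can = new SortApp.SortData("can", 4);
        SortApp.SortData data = new SortApp.SortData("data", 4);
        // Same count: alphabetical order decides, so "can" precedes "data".
        assertTrue(can.compareTo(data) < 0);
        assertTrue(data.compareTo(can) > 0);
    }

    @Test
    public void identicalEntriesCompareEqual() {
        SortApp.SortData a = new SortApp.SortData("spark", 5);
        SortApp.SortData b = new SortApp.SortData("spark", 5);
        assertEquals(0, a.compareTo(b));
    }
}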
File contents
Spark Streaming is an extension of the core Spark API that enables scalable,high-throughput, fault-tolerant stream processing of live 。data streams. Data, can be ,ingested from many sources like Kafka, Flume, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window. Finally, processed data can be pushed out to filesystems,Spark Streaming provides a high-level abstraction called discretized stream or DStream, which represents a continuous stream of data. DStreams can be created either from input data streams from sources such as Kafka, Flume, and Kinesis, or by applying high-level operations on other DStreams. Internally, a DStream is represented as a sequence of RDDs.This guide shows you how to start writing Spark Streaming programs with DStreams. You can write Spark Streaming programs in Scala, Java or Python (introduced in Spark 1.2), all of which are presented in this guide. You will find tabs throughout this guide that let you choose between code snippets of different languages. databases, and live dashboards. In fact, you can apply Spark’s machine learning and graph processing algorithms on data streams.
Output
spark
can
of
and
data
you
a
be
in
or
streaming
dstreams
from
guide
high-level
stream
streams
this
algorithms
as
dstream
flume
is
kafka
kinesis
like
live
on
processed
processing
programs
sources
that
to
which
with
abstraction
all
an
api
apply
applying
are
between
by
called
choose
code
complex
continuous
core
created
dashboards
databases
different
discretized
either
enables
expressed
extension
fact
fault-tolerant
filesystems
finally
find
functions
graph
high-throughput
how
input
internally
introduced
java
join
languages
learning
let
machine
many
map
operations
other
out
presented
provides
pushed
python
rdds
reduce
represented
represents
s
scala
scalable
sequence
shows
snippets
sockets
start
such
tabs
tcp
the
throughout
using
will
window
write
writing