一、统计socket单词数
侦听TCP套接字的数据服务器接收到的文本数据中的单词数。
二、maven配置
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.mk</groupId><artifactId>spark-test</artifactId><version>1.0</version><name>spark-test</name><url>http://spark.mk.com</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><scala.version>2.11.1</scala.version><spark.version>2.4.4</spark.version><hadoop.version>2.6.0</hadoop.version></properties><dependencies><!-- scala依赖--><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><!-- spark依赖--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.11</version><scope>test</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.10</version></dependency>
</dependencies><build><pluginManagement><plugins><plugin><artifactId>maven-clean-plugin</artifactId><version>3.1.0</version></plugin><plugin><artifactId>maven-resources-plugin</artifactId><version>3.0.2</version></plugin><plugin><artifactId>maven-compiler-plugin</artifactId><version>3.8.0</version></plugin><plugin><artifactId>maven-surefire-plugin</artifactId><version>2.22.1</version></plugin><plugin><artifactId>maven-jar-plugin</artifactId><version>3.0.2</version></plugin></plugins></pluginManagement></build>
</project>
三、编程代码
public class SocketApp implements SparkConfInfo {public static void main(String[] args) throws InterruptedException {JavaStreamingContext streamingContext = new SocketApp().getStreamingContext("SocketApp", 5);JavaReceiverInputDStream<String> lines = streamingContext.socketTextStream("localhost", 8891);JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split("\\s+")).stream().filter(v->v.length()>0).iterator());JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(Integer::sum);wordCounts.foreachRDD(v->{v.foreach(s-> System.out.println(s._1+":" + s._2));System.out.println("---------------------------");});streamingContext.start();streamingContext.awaitTermination();}
}public interface SparkConfInfo {default JavaStreamingContext getStreamingContext(String appName, int second){SparkConf sparkConf = getSparkConf();sparkConf.setAppName(appName);JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(second));return jssc;}default SparkSession getSparkSession(String appName){SparkConf sparkConf = getSparkConf();SparkSession session = SparkSession.builder().appName(appName).config(sparkConf).config(sparkConf).getOrCreate();return session;}default SparkConf getSparkConf() {SparkConf sparkConf = new SparkConf();if(System.getProperty("os.name").toLowerCase().contains("win")) {sparkConf.setMaster("local[4]");System.out.println("使用本地模拟是spark");}else{sparkConf.setMaster("spark://hadoop01:7077,hadoop02:7077,hadoop03:7077");sparkConf.set("spark.driver.host","192.168.150.1");//本地ip,必须与spark集群能够相互访问,如:同一个局域网sparkConf.setJars(new String[] {".\\out\\artifacts\\spark_test\\spark-test.jar"});//项目构建生成的路径}return sparkConf;}
}
输入内容
Tom Lucy
Tom Jack
Jone Lucy
Jone Jack
Lucy Mary
Lucy Ben
Jack Alice
Jack Jesse
Terry Alice
Terry Jesse
Philip Terry
Philip Alma
Mark Terry
Mark Alma
输出结果
Mark:2
Tom:2
Jesse:2
Philip:2
Alice:2
Jone:2
Terry:4
Alma:2
Ben:1
Lucy:4
Mary:1
Jack:4
---------------------------