03-第一个Spark程序WordCount

Scala版

1)创建项目

增加 Scala 插件

Spark 由 Scala 语言开发的,咱们当前使用的 Spark 版本为 3.2.0,默认采用的 Scala 编译版本为 2.13,所以后续开发时。我们依然采用这个版本。开发前请保证 IDEA 开发工具中含有 Scala 开发插件

创建Maven工程

创建Maven Project工程,GAV如下:

GroupIdArtifactIdVersion
com.clear.sparkbigdata-spark_2.131.0

创建Maven Module工程,GAV如下:

GroupIdArtifactIdVersion
com.clear.sparkspark-core1.0

POM

<repositories><!-- 指定仓库的位置,依次为aliyun、cloudera、jboss --><repository><id>aliyun</id><url>http://maven.aliyun.com/nexus/content/groups/public/</url></repository><repository><id>cloudera</id><url>https://repository.cloudera.com/artifactory/cloudera-repos/</url></repository><repository><id>jboss</id><url>https://repository.jboss.com/nexus/content/groups/public/</url></repository>
</repositories><properties><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><scala.version>2.13.5</scala.version><scala.binary.version>2.13</scala.binary.version><spark.version>3.2.0</spark.version><hadoop.version>3.1.3</hadoop.version>
</properties><dependencies><!-- 依赖Scala语言--><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><!-- Spark Core 依赖 --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_${scala.binary.version}</artifactId><version>${spark.version}</version></dependency><!-- Hadoop Client 依赖 --><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version></dependency>
</dependencies><build><outputDirectory>target/classes</outputDirectory><testOutputDirectory>target/test-classes</testOutputDirectory><resources><resource><directory>${project.basedir}/src/main/resources</directory></resource></resources><plugins><!-- maven 编译插件--><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.10.1</version><configuration><source>${maven.compiler.source}</source><target>${maven.compiler.target}</target><encoding>UTF-8</encoding></configuration></plugin><!-- 该插件用于将 Scala 代码编译成 class 文件 --><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.2</version><executions><execution><!-- 声明绑定到 maven 的 compile 阶段 --><goals><goal>compile</goal><goal>testCompile</goal></goals></execution></executions></plugin></plugins>
</build>
<dependencies><!-- spark-core依赖--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.13</artifactId><version>3.2.0</version></dependency>
</dependencies>
<build><plugins><!-- 该插件用于将 Scala 代码编译成 class 文件 --><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.2</version><executions><execution><!-- 声明绑定到 maven 的 compile 阶段 --><goals><goal>compile</goal><goal>testCompile</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.1.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins>
</build>

配置文件

在src/main/resources目录下放置如下三个文件,可以从服务器中拷贝:

  • core-site.xml
  • hdfs-site.xml
  • log4j.properties

3)代码编写

package com.clear.sparkimport org.apache.spark.{SparkConf, SparkContext}/*** 使用Scala语言使用SparkCore编程实现词频统计:WordCount* 从HDFS上读取文件,统计WordCount,将结果保存在HDFS上*/
object SparkWordCount {def main(args: Array[String]): Unit = {// todo 创建SparkContext对象,需要传递SparkConf对象,设置应用配置信息val conf = new SparkConf().setAppName("词频统计").setMaster("local[2]")val sc = new SparkContext(conf)// todo 读取数据,封装数据到RDDval inputRDD = sc.textFile("/opt/data/wc/README.md")// 分析数据,调用RDD算子val resultRDD = inputRDD.flatMap(line => line.split("\\s+")).map(word => (word, 1)).reduceByKey((tmp, item) => tmp + item)// 保存数据,将最终RDD结果数据保存至外部存储系统resultRDD.foreach(tuple => println(tuple))resultRDD.saveAsTextFile(s"/opt/data/wc-${System.nanoTime()}")// 应用程序结束,关闭资源sc.stop()}
}

4)测试

[nhk@kk01 wordcount]$ $SPARK_HOME/bin/spark-submit --class com.clear.WordCount /opt/data/wordcount/spark-core-scala-1.0.jar 

Java版

1)POM

<dependencies><!-- spark-core依赖--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.13</artifactId><version>3.2.0</version><scope>provided</scope></dependency>
</dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-jar-plugin</artifactId><version>2.4</version><configuration><archive><manifest><!-- mainClass标签填写主程序入口--><mainClass>com.clear.demo1.CreateFileUtil</mainClass><addClasspath>true</addClasspath><classpathPrefix>lib/</classpathPrefix></manifest></archive><classesDirectory></classesDirectory></configuration></plugin><!-- 复制依赖文件到编译目录中 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-dependency-plugin</artifactId><version>3.1.1</version><executions><execution><id>copy-dependencies</id><phase>package</phase><goals><goal>copy-dependencies</goal></goals><configuration><outputDirectory>${project.build.directory}/lib</outputDirectory></configuration></execution></executions></plugin></plugins>
</build>

2)代码

package com.clear.wordcount;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;import java.util.Arrays;public class JavaSparkWordCount {public static void main(String[] args) {// 创建 SparkConf 对象配置应用SparkConf conf = new SparkConf().setAppName("JavaSparkWordCount").setMaster("local");// 基于 SparkConf 创建 JavaSparkContext 对象JavaSparkContext jsc = new JavaSparkContext(conf);// 加载文件内容JavaRDD<String> lines = jsc.textFile("file:///opt/data/wordcount/README.md");// 转换为单词 RDDJavaRDD<String> words = lines.flatMap(line ->Arrays.asList(line.split(" ")).iterator());// 统计每个单词出现的次数JavaPairRDD<String, Integer> counts = words.mapToPair(word -> new Tuple2<>(word, 1)).reduceByKey((x, y) -> (x + y));// 输出结果counts.saveAsTextFile("file:///opt/data/wordcount/wc");// 关闭 JavaSparkContext 对象jsc.stop();}
}

3)测试

运行:

[nhk@kk01 wordcount]$ $SPARK_HOME/bin/spark-submit --class com.clear.wordcount.JavaSparkWordCount /opt/data/wordcount/spark-core-demo-1.0.jar 

查看结果:

[nhk@kk01 wc]$ pwd
/opt/data/wordcount/wc
[nhk@kk01 wc]$ ll
total 8
-rw-r--r--. 1 nhk nhk 4591 Jul 30 17:48 part-00000
-rw-r--r--. 1 nhk nhk    0 Jul 30 17:49 _SUCCESS
[nhk@kk01 wc]$ head part-00000 
(package,1)
(For,3)
(Programs,1)
(processing.,2)
(Because,1)
(The,1)
(cluster.,1)
(its,1)
([run,1)
(APIs,1)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/42122.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ebay灯串UL报告 UL588检测标准

季节性和装饰性照明用品即灯串以及配件都是便携式插头连接的临时性商品&#xff0c;最大额定输入电压为 120 伏。 由 ILAC ISO 17025 认证的实验室出具的检测报告&#xff0c;确认每件商品均已经过检测&#xff0c;符合下列要求&#xff1a; 季节性和装饰性照明用品(灯串&…

企业中商业智能BI,常见的工具和技术

商业智能&#xff08;Business Intelligence&#xff0c;简称BI&#xff09;数据可视化是通过使用图表、图形和其他可视化工具来呈现和解释商业数据的过程。它旨在帮助组织更好地理解和分析他们的数据&#xff0c;从而做出更明智的商业决策。 常见的商业智能数据可视化工具和技…

AtcoderABC222场

A - Four DigitsA - Four Digits 题目大意 给定一个整数N&#xff0c;其范围在0到9999之间&#xff08;包含边界&#xff09;。在将N转换为四位数的字符串后&#xff0c;输出它。如果N的位数不足四位&#xff0c;则在前面添加必要数量的零。 思路分析 可以使用输出流的格式设…

鼠标样式和指向

学习抖音&#xff1a; 渡一前端教科频道 图上指针跟着鼠标移动&#xff0c;并且改变方向 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><style>* {padding: 0;margin: 0;}.arrow {position: fixed;width: 3…

Spring Clould 消息队列 - RabbitMQ

视频地址&#xff1a;微服务&#xff08;SpringCloudRabbitMQDockerRedis搜索分布式&#xff09; 初识MQ-同步通讯的优缺点&#xff08;P61&#xff0c;P62&#xff09; 同步和异步通讯 微服务间通讯有同步和异步两种方式&#xff1a; 同步通讯&#xff1a;就像打电话&…

数据库名字添加中文

Jetbrains 可以呀&#xff0c;这个ui 相当棒 from database import Sqlite3Database from googletrans import Translator import csvif __name__ "__main__":TRANS_EN2ZH Falsetranslator Translator()sqlite Sqlite3Database("./drurmu.db")sqlite.r…

x.view(a,b)及x = x.view(x.size(0), -1) 的理解说明

x.view()就是对tensor进行reshape&#xff1a; 我们在创建一个网络的时候&#xff0c;会在Foward函数内看到view的使用。 首先这里是一个简单的网络&#xff0c;有卷积和全连接组成。它的foward函数如下&#xff1a; class NET(nn.Module):def __init__(self,batch_size):sup…

小米交卷大模型,全新小爱同学实测来了

本文源自&#xff1a;量子位 果然只有雷军和小米&#xff0c;能抢走风口上大模型的热度。 在雷军的年度演讲分享中&#xff0c;讲武大求学经历&#xff0c;分享学霸4年大学2年完课经验&#xff1b;讲被《硅谷之火》点燃&#xff0c;勤奋练习写最好的代码&#xff0c;开启第一…

armbian使用1panel快速部署部署springBoot项目后端

文章目录 前言环境准备实现步骤第一步&#xff1a;Armbian安装1panel第二步&#xff1a;安装数据库第三步&#xff1a;查看数据库容器重要信息【重要】查看容器所在的网络查看容器连接地址 第四步&#xff1a;项目配置和打包第五步:构建项目镜像 前言 这里只是简单记录部署spr…

一次性解决office部署问题(即点即用等)

前言 因为之前电脑安装了office2019&#xff0c;后面需要安装Visio&#xff0c;下载安装时报错30204-44,查看发现之前安装的office版本是即点即用版&#xff0c;可能这两者不兼容。网上搜索教程等&#xff0c;最后发现一个工具&#xff1a;Office Tool Plus&#xff0c;可以方便…

【水文学法总结】河道内生态流量计算方法(含MATLAB实现代码)

生态流量&#xff08;Ecological Flow, EF&#xff09; 是指维持河道内生态环境所需要的水流流量。生态流量计算方法众多&#xff0c;主要分为水文学方法、栖息地模拟法、水力学方法、整体法等&#xff0c;各方法多用于计算维持河道生态平衡的最小生态流量&#xff08;Minimum …

LeetCode 141.环形链表

文章目录 &#x1f4a1;题目分析&#x1f4a1;解题思路&#x1f514;接口源码&#x1f4a1;深度思考❓思考1❓思考2 题目链接&#x1f449; LeetCode 141.环形链表&#x1f448; &#x1f4a1;题目分析 给你一个链表的头节点 head &#xff0c;判断链表中是否有环。 如果链表中…

【ES6】—let 声明方式

一、不属于顶层对象window let 关键字声明的变量&#xff0c;不会挂载到window的属性 var a 5 console.log(a) console.log(window.a) // 5 // 5 // 变量a 被挂载到window属性上了 &#xff0c; a window.alet b 6 console.log(b) console.log(window.b) // 6 // undefin…

原生js获取今天、昨天、近7天的时间(年月日时分秒)

有的时候我们需要将今天,昨天,近7天的时间(年月日时分秒)作为参数传递给后端,如下图: 那怎么生成这些时间呢?如下代码里,在methods里的toDay方法、yesterDay方法、weekDay方法分别用于生成今天、昨天和近7天的时间: <template><div class="box"&…

暂停Windows更新的方法,可延后数十万年,简单且有手就行

前言 近年来&#xff0c;Windows更新频率过快&#xff0c;最大只能暂停更新5周&#xff0c;导致用户不厌其烦&#xff0c;从网上找到的暂停更新的方法不是过于繁琐就是毫无效果&#xff0c;或者是暂停的时间有限&#xff0c;无意中发现一个大神的帖子可以通过修改注册表信息以达…

Java定时任务方案

一、Timer import java.util.Timer; import java.util.TimerTask;public class TimerExample {public static void main(String[] args) {Timer timer new Timer();TimerTask task new TimerTask() {Overridepublic void run() {System.out.println("Task executed at:…

uni-app自定义多环境配置,动态修改appid

背景 在企业级项目开发中&#xff0c;一般都会分为开发、测试、预发布、生产等多个环境&#xff0c;在工程化中使用不同的打包命令改变环境变量解决不同环境各种变量需要手动修改的问题&#xff0c;比如接口请求地址&#xff0c;不同环境的请求路径前缀都是不同的。在使用uni-…

Docker中为RabbitMQ安装rabbitmq_delayed_message_exchange延迟队列插件

1、前言 rabbitmq_delayed_message_exchange是一款向RabbitMQ添加延迟消息传递&#xff08;或计划消息传递&#xff09;的插件。 插件下载地址&#xff1a;https://www.rabbitmq.com/community-plugins.html 1、下载插件 首先需要确定我们当前使用的RabbitMQ的版本&#xff0c…

Android隐藏输入法

1、方法一(如果输入法在窗口上已经显示&#xff0c;则隐藏&#xff0c;反之则显示) InputMethodManager imm (InputMethodManager) getSystemService(Context.INPUT_METHOD_SERVICE); imm.toggleSoftInput(0, InputMethodManager.HIDE_NOT_ALWAYS); 2、方法二(view为接受软…