IDEA编写各种WordCount运行

目录

一、编写WordCount(Spark_scala)提交到spark高可用集群

1.项目结构

2.导入依赖

3.编写scala版的WordCount

4.maven打包

5.运行jar包

​6.查询hdfs的输出结果

二、本地编写WordCount(Spark_scala)读取本地文件

1.项目结构

2.编写scala版的WordCount

3.编辑Edit Configurations配置文件

4.直接本地运行LocalWordCount.scala文件

三、本地编写WordCount(Spark_java版)读取本地文件,并输出到本地

1.项目结构

2.编写java版的WordCount

3.编辑Edit Configurations配置文件

4.运行后查看结果

四、本地编写WordCount(Spark_java_lambda版)读取本地文件,并输出到本地

1.项目结构

2.编写java-lambda版的WordCount

3.编辑Edit Configurations配置文件

4.运行后查看结果


搞了一个晚上加一个白天,总算搞出来了,呼~~

本地编写IDEA之前需要在windows下安装scala、hadoop和spark环境,参考文章如下:

《Scala安装》 《Windows环境部署Hadoop-3.3.2和Spark3.3.2》

一、编写WordCount(Spark_scala)提交到spark高可用集群

首先安装好scala,然后在IDEA创建一个maven项目,开始编写代码

1.项目结构

2.导入依赖

<name>spark-in-action</name><url>http://maven.apache.org</url><!-- 定义的一些常量 --><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><encoding>UTF-8</encoding><spark.version>3.3.2</spark.version><scala.version>2.12.15</scala.version></properties><dependencies><!-- scala的依赖 --><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><!-- spark core 即为spark内核 ,其他高级组件都要依赖spark core --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>${spark.version}</version></dependency></dependencies><!-- 配置Maven的镜像库 --><!-- 依赖下载国内镜像库 --><repositories><repository><id>nexus-aliyun</id><name>Nexus aliyun</name><layout>default</layout><url>http://maven.aliyun.com/nexus/content/groups/public</url><snapshots><enabled>false</enabled><updatePolicy>never</updatePolicy></snapshots><releases><enabled>true</enabled><updatePolicy>never</updatePolicy></releases></repository></repositories><!-- maven插件下载国内镜像库 --><pluginRepositories><pluginRepository><id>ali-plugin</id><url>http://maven.aliyun.com/nexus/content/groups/public/</url><snapshots><enabled>false</enabled><updatePolicy>never</updatePolicy></snapshots><releases><enabled>true</enabled><updatePolicy>never</updatePolicy></releases></pluginRepository></pluginRepositories><build><pluginManagement><plugins><!-- 编译scala的插件 --><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.2</version></plugin><!-- 编译java的插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.5.1</version></plugin></plugins></pluginManagement><plugins><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><executions><execution><id>scala-compile-first</id><phase>process-resources</phase><goals><goal>add-source</goal><goal>compile</goal></goals></execution><execution><id>scala-test-compile</id><phase>process-test-resources</phase><goals><goal>testCompile</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><executions><execution><phase>compile</phase><goals><goal>compile</goal></goals></execution></executions></plugin><!-- 打jar插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.4.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters></configuration></execution></executions></plugin></plugins></build>

3.编写scala版的WordCount

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object WordCount {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("WordCount")// 1.创建SparkContextval sc = new SparkContext(conf)// 2.创建RDD(神奇的大集合,该集合中不装真正要计算的数据,而是装描述信息)val lines: RDD[String] = sc.textFile(args(0))// 3.对RDD进行操作,调用RDD的方法// -------------Transformation (转换算子开始) --------------// 切分压平val words: RDD[String] = lines.flatMap(_.split(" "))// 将单词和1组合放入到元组中val wordAndOne: RDD[(String, Int)] = words.map((_, 1))// 将key相同的数据进行分组聚合val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)// 按照次数排序val sorted: RDD[(String, Int)] = reduced.sortBy(_._2, false) // 降序// -------------Transformation (转换算子结束) --------------// 4.调用Action// 将数据写入到外部的存储系统中sorted.saveAsTextFile(args(1))// 5.释放资源sc.stop()}
}

4.maven打包

这个是胖包,除了项目本身的依赖,还有其他依赖,上面的original是瘦包,只有项目本身的依赖

5.运行jar包

首先启动zk、hdfs和spark高可用集群,这里我搭建的是standalone模式的高可用集群,不是on Yarn的

创建/opt/soft/spark-3.2.3/submit目录,将jar包上传到该目录下

提交命令

[root@node141 submit]# ../bin/spark-submit --master spark://node141:7077 --class cn.doitedu.day01.WordCount --executor-memory 1g --total-executor-cores 4 ./spark-in-action-1.0.jar hdfs://node141:9000/words.txt hdfs://node141:9000/out-1

--master spark://node141:7077     spark的master节点

--class cn.doitedu.day01.WordCount   运行的类名

--executor-memory 1g   占用的内存

--total-executor-cores 4  占用的核数

./spark-in-action-1.0.jar  运行的jar包地址

hdfs://node141:9000/words.txt   代码中args[0]对应的参数

hdfs://node141:9000/out-1  代码中args[1]对应的参数

6.查询hdfs的输出结果

二、本地编写WordCount(Spark_scala)读取本地文件

1.项目结构

2.编写scala版的WordCount

package cn.doitedu.day01import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}/*** 使用本地模式运行Spark程序(开发调试的时候使用)*/
object LocalWordCount {def main(args: Array[String]): Unit = {// 指定当前用户为rootSystem.setProperty("HADOOP_USER_NAME", "root")val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]") // 本地模式,*表示根据当前机器的核数开多个线程// 1.创建SparkContextval sc = new SparkContext(conf)// 2.创建RDD(神奇的大集合,该集合中不装真正要计算的数据,而是装描述信息)val lines: RDD[String] = sc.textFile(args(0))// 3.对RDD进行操作,调用RDD的方法// -------------Transformation (转换算子开始) --------------// 切分压平val words: RDD[String] = lines.flatMap(line => {val words = line.split(" ")println(words)  // debugwords})// 将单词和1组合放入到元祖中val wordAndOne: RDD[(String, Int)] = words.map((_, 1))// 将key相同的数据进行分组聚合val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)// 按照次数排序val sorted: RDD[(String, Int)] = reduced.sortBy(_._2, false) // 降序// -------------Transformation (转换算子结束) --------------// 4.调用Action// 将数据写入到外部的存储系统中sorted.saveAsTextFile(args(1))// 5.释放资源sc.stop()}
}

3.编辑Edit Configurations配置文件

4.直接本地运行LocalWordCount.scala文件

查看运行结果

三、本地编写WordCount(Spark_java版)读取本地文件,并输出到本地

1.项目结构

2.编写java版的WordCount

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;import java.util.Arrays;
import java.util.Iterator;public class JavaWordCount {public static void main(String[] args) {SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local[*]");JavaSparkContext jsc = new JavaSparkContext(conf);JavaRDD<String> lines = jsc.textFile(args[0]);JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {@Overridepublic Iterator<String> call(String lines) throws Exception {String[] words = lines.split("\\s+");return Arrays.asList(words).iterator();}});JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(new PairFunction<String, String, Integer>() {@Overridepublic Tuple2<String, Integer> call(String word) throws Exception {return Tuple2.apply(word, 1);}});JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey(new Function2<Integer, Integer, Integer>() {@Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1 + v2;}});// 将原来的kv顺序颠倒  (flink,3)  ----> (3,flink)JavaPairRDD<Integer, String> swapped = reduced.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {@Overridepublic Tuple2<Integer, String> call(Tuple2<String, Integer> tp) throws Exception {return tp.swap(); // 交换}});JavaPairRDD<Integer, String> sorted = swapped.sortByKey(false);JavaPairRDD<String, Integer> result = sorted.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {@Overridepublic Tuple2<String, Integer> call(Tuple2<Integer, String> tp) throws Exception {return tp.swap();}});result.saveAsTextFile(args[1]);jsc.stop();}
}

3.编辑Edit Configurations配置文件

4.运行后查看结果

四、本地编写WordCount(Spark_java_lambda版)读取本地文件,并输出到本地

1.项目结构

2.编写java-lambda版的WordCount

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;import java.util.Arrays;public class JavaLambdaWordCount {public static void main(String[] args) {SparkConf conf = new SparkConf().setAppName("JavaLambdaWordCount").setMaster("local[*]");JavaSparkContext jsc = new JavaSparkContext(conf);JavaRDD<String> lines = jsc.textFile(args[0]);JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(w -> Tuple2.apply(w, 1));
//        JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey((a, b) -> a + b);JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey(Integer::sum);JavaPairRDD<Integer, String> swapped = reduced.mapToPair(Tuple2::swap);// 排序JavaPairRDD<Integer, String> sorted = swapped.sortByKey(false);// 调回来JavaPairRDD<String, Integer> result = sorted.mapToPair(Tuple2::swap);result.saveAsTextFile(args[1]);jsc.stop();}
}

3.编辑Edit Configurations配置文件

4.运行后查看结果

好啦~~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/737510.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SSM整合项目(删除家居 + 分页查询)

1.删除家居 1.需求分析 2.编写Service层 1.FurnService.java 添加方法 //删除家居public void del(Integer id);2.FurnServiceImpl.java 实现方法 Overridepublic void del(Integer id) {furnMapper.deleteByPrimaryKey(id);}3.单元测试 Testpublic void del() {furnService.…

微信公众号调用沙箱支付

沙箱支付 登录支付宝开放平台&#xff0c;选择底部沙箱支付 下载密钥生成工具 生成应用私钥与公钥&#xff0c;上传沙箱支付&#xff0c;获得支付宝公钥 配置支付通知与支付回调地址 SpringBoot配置 yml文件 这里的地址必须与沙箱配置的一样 controller package com.zq…

植物病害识别:YOLO水稻病害识别数据集(11000多张,yolo标注)

YOLO水稻病害识别数据集&#xff0c;包含叶斑病&#xff0c;褐斑病&#xff0c;细菌性枯萎病&#xff0c;东格鲁病毒病4个常见病害类别&#xff0c;共11000多张图像&#xff0c;yolo标注完整&#xff0c;可直接训练。 适用于CV项目&#xff0c;毕设&#xff0c;科研&#xff0c…

第十五届蓝桥杯-UART接收不定长指令的处理

学习初衷&#xff1a; 不仅仅为了比赛&#xff01; 目录 一、问题引入 二、UART常用的三种工作模式 1.UART工作在中断模式 2.UART工作在DMA模式下 3.uart工作在接收转空闲的模式下 三、获取指令中需要的数据 四、printf函数的实现 一、问题引入 问题引入&#xff1a;请…

【Java】仓库管理系统 SpringBoot+LayUI+DTree(源码)【独一无二】

&#x1f449;博__主&#x1f448;&#xff1a;米码收割机 &#x1f449;技__能&#x1f448;&#xff1a;C/Python语言 &#x1f449;公众号&#x1f448;&#xff1a;测试开发自动化【获取源码商业合作】 &#x1f449;荣__誉&#x1f448;&#xff1a;阿里云博客专家博主、5…

Java状态模式源码剖析及使用场景

状态模式 一、介绍二、订单处理系统中使用三、Spring 中事务怎么使用状态模式&#xff1f; 一、介绍 状态模式(State Pattern)是一种行为型设计模式,它允许对象在内部状态改变时改变它的行为,对象看起来好像修改了它的类。 状态模式的核心思想是将对象的状态封装成独立的类,使…

深度学习Top10算法

自2006年深度学习概念被提出以来&#xff0c;20年快过去了&#xff0c;深度学习作为人工智能领域的一场革命&#xff0c;已经催生了许多具有影响力的算法。以下是深度学习top10算法&#xff0c;它们在创新性、应用价值和影响力方面都具有重要的地位。 1、深度神经网络&#xf…

鸿蒙开发(四)-低代码开发

鸿蒙开发(四)-低代码开发 本文主要介绍下鸿蒙下的低代码开发。 鸿蒙低代码是指在鸿蒙操作系统进行应用开发时&#xff0c;采用简化开发流程和减少编码量的方式来提高开发效率。 1&#xff1a;开启低代码开发 首先我们打开DevEco Studio .然后创建工程。 如图所示&#xff…

定制红酒:如何根据客户需求调整红酒口感与风格

在云仓酒庄洒派&#xff0c;云仓酒庄洒派深知不同消费者对于红酒的口感与风格有着不同的喜好和需求。因此&#xff0c;云仓酒庄洒派根据消费者的具体要求&#xff0c;灵活调整红酒的口感与风格&#xff0c;以满足他们的期望。 首先&#xff0c;云仓酒庄洒派会与消费者进行深入的…

AHU 汇编 实验三

实验名称&#xff1a;实验三 串操作指令 二、实验内容&#xff1a; 在数据段定义缓冲区&#xff0c;从键盘接收两串字符到两个缓冲区&#xff0c;将第二串中与第一串字符不一致的字符显示在屏幕。 实验过程&#xff1a; 源代码&#xff1a; data segmentmess1 db 16,?,16…

XGB-19:Xgboost常见问题QA

常见问题解答 本文档包含关于XGBoost的常见问题。 如何调整参数 请参阅参数调整指南。 模型描述 请参阅提升树介绍。 大数据集 XGBoost被设计为内存高效。通常&#xff0c;只要数据适合本机内存&#xff0c;它就可以处理问题。这通常意味着数百万个实例。 如果内存不足&…

数据分析-Pandas如何画图验证数据随机性

数据分析-Pandas如何画图验证数据随机性 数据分析和处理中&#xff0c;难免会遇到各种数据&#xff0c;那么数据呈现怎样的规律呢&#xff1f;不管金融数据&#xff0c;风控数据&#xff0c;营销数据等等&#xff0c;莫不如此。如何通过图示展示数据的规律&#xff1f; 数据表…

【字典合集】SecLists-更全面的渗透测试字典 v2024.1

下路路径 SecLists-更全面的渗透测试字典 v2024.1 简介 SecLists 是一个致力于收集各种安全字典的开源项目。这些字典包括但不限于&#xff1a;密码字典、用户名字典、网络扫描结果、漏洞利用载荷、web shells、可用于渗透测试的Payloads、以及其他各种安全相关的字典。 这…

Docker初体验之安装部署和镜像加速(openeuler版)

安装部署&#xff1a; 本人使用的为openeuler版本&#xff0c;无法使用二进制进行安装&#xff08;使用二进制安装时&#xff0c;无法使用docker中的补全命令&#xff0c;需要重新进行配置&#xff09;在此使用yum直接进行安装。 [rootlocalhost ~]# yum install docker 镜像…

如何远程SSH连接在家的服务器主机

当您需要通过SSH远程连接到家里的服务器主机时&#xff0c;以下是更详细的实施步骤&#xff1a; 1. 确保服务器主机已开启SSH服务 安装SSH服务&#xff1a;首先&#xff0c;确保您的服务器主机上安装了SSH服务。根据您的操作系统&#xff0c;您可以使用相应的包管理器来安装。…

LeetCode 174.地下城游戏 Python题解

地下城游戏 # 地下城游戏 """ 恶魔们抓住了公主并将她关在了地下城dungeon的右下角。地下城是由mxn个房间组成的二维网格。我们英勇的骑士最初被安置在左上角的房间里&#xff0c; 他必须穿过地下城并通过对抗恶魔来拯救公主。 骑士的初始健康点数为一个正整数…

在VMvare中虚拟机安装centos7和初始设置

下载镜像 阿里云的镜像站&#xff1a;https://mirrors.aliyun.com/centos/7/isos/x86_64/ 创建虚拟机过程 虚拟机创建过程比较简单&#xff0c;以下在VMvare16中进行安装 点击左上角&#xff0c;文件-新建虚拟机&#xff1a; 选择典型 选择刚刚下载好的镜像 输入虚拟机…

如何判断DNS解析故障?

DNS解析负责将域名解析到对应的IP地址&#xff0c;从而实现用户通过域名访问站点的效果。因此DNS解析是整个互联网中非常关键和基础的一个环节&#xff0c;但也是众多网站运营者和管理者经常忽视的一个环节。所以在出现DNS解析错误时&#xff0c;很多人都会感到手足无措&#x…

【Echarts】曲线图上方显示数字以及自定义值,标题和副标题居中,鼠标上显示信息以及自定义信息

欢迎来到《小5讲堂》 大家好&#xff0c;我是全栈小5。 这是《前端》系列文章&#xff0c;每篇文章将以博主理解的角度展开讲解&#xff0c; 特别是针对知识点的概念进行叙说&#xff0c;大部分文章将会对这些概念进行实际例子验证&#xff0c;以此达到加深对知识点的理解和掌握…

第七次作业

IPSEC VPPN实验配置 目标&#xff1a;在FW5和FW3之间建立一条IPSEC通道&#xff0c;保证10.0.2.0/24网段可以正常访问到192.168.1.0/24 1.FW1和FW2进行双机热备&#xff08;之前实验没保存&#xff0c;可看上个实验&#xff09; 还有一些配置前面实验有。 2.场景选择点对点…