IDEA编写各种WordCount运行

目录

一、编写WordCount(Spark_scala)提交到spark高可用集群

1.项目结构

2.导入依赖

3.编写scala版的WordCount

4.maven打包

5.运行jar包

​6.查询hdfs的输出结果

二、本地编写WordCount(Spark_scala)读取本地文件

1.项目结构

2.编写scala版的WordCount

3.编辑Edit Configurations配置文件

4.直接本地运行LocalWordCount.scala文件

三、本地编写WordCount(Spark_java版)读取本地文件,并输出到本地

1.项目结构

2.编写java版的WordCount

3.编辑Edit Configurations配置文件

4.运行后查看结果

四、本地编写WordCount(Spark_java_lambda版)读取本地文件,并输出到本地

1.项目结构

2.编写java-lambda版的WordCount

3.编辑Edit Configurations配置文件

4.运行后查看结果


搞了一个晚上加一个白天,总算搞出来了,呼~~

本地编写IDEA之前需要在windows下安装scala、hadoop和spark环境,参考文章如下:

《Scala安装》 《Windows环境部署Hadoop-3.3.2和Spark3.3.2》

一、编写WordCount(Spark_scala)提交到spark高可用集群

首先安装好scala,然后在IDEA创建一个maven项目,开始编写代码

1.项目结构

2.导入依赖

<name>spark-in-action</name><url>http://maven.apache.org</url><!-- 定义的一些常量 --><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><encoding>UTF-8</encoding><spark.version>3.3.2</spark.version><scala.version>2.12.15</scala.version></properties><dependencies><!-- scala的依赖 --><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><!-- spark core 即为spark内核 ,其他高级组件都要依赖spark core --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>${spark.version}</version></dependency></dependencies><!-- 配置Maven的镜像库 --><!-- 依赖下载国内镜像库 --><repositories><repository><id>nexus-aliyun</id><name>Nexus aliyun</name><layout>default</layout><url>http://maven.aliyun.com/nexus/content/groups/public</url><snapshots><enabled>false</enabled><updatePolicy>never</updatePolicy></snapshots><releases><enabled>true</enabled><updatePolicy>never</updatePolicy></releases></repository></repositories><!-- maven插件下载国内镜像库 --><pluginRepositories><pluginRepository><id>ali-plugin</id><url>http://maven.aliyun.com/nexus/content/groups/public/</url><snapshots><enabled>false</enabled><updatePolicy>never</updatePolicy></snapshots><releases><enabled>true</enabled><updatePolicy>never</updatePolicy></releases></pluginRepository></pluginRepositories><build><pluginManagement><plugins><!-- 编译scala的插件 --><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.2</version></plugin><!-- 编译java的插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.5.1</version></plugin></plugins></pluginManagement><plugins><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><executions><execution><id>scala-compile-first</id><phase>process-resources</phase><goals><goal>add-source</goal><goal>compile</goal></goals></execution><execution><id>scala-test-compile</id><phase>process-test-resources</phase><goals><goal>testCompile</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><executions><execution><phase>compile</phase><goals><goal>compile</goal></goals></execution></executions></plugin><!-- 打jar插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.4.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters></configuration></execution></executions></plugin></plugins></build>

3.编写scala版的WordCount

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object WordCount {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("WordCount")// 1.创建SparkContextval sc = new SparkContext(conf)// 2.创建RDD(神奇的大集合,该集合中不装真正要计算的数据,而是装描述信息)val lines: RDD[String] = sc.textFile(args(0))// 3.对RDD进行操作,调用RDD的方法// -------------Transformation (转换算子开始) --------------// 切分压平val words: RDD[String] = lines.flatMap(_.split(" "))// 将单词和1组合放入到元组中val wordAndOne: RDD[(String, Int)] = words.map((_, 1))// 将key相同的数据进行分组聚合val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)// 按照次数排序val sorted: RDD[(String, Int)] = reduced.sortBy(_._2, false) // 降序// -------------Transformation (转换算子结束) --------------// 4.调用Action// 将数据写入到外部的存储系统中sorted.saveAsTextFile(args(1))// 5.释放资源sc.stop()}
}

4.maven打包

这个是胖包,除了项目本身的依赖,还有其他依赖,上面的original是瘦包,只有项目本身的依赖

5.运行jar包

首先启动zk、hdfs和spark高可用集群,这里我搭建的是standalone模式的高可用集群,不是on Yarn的

创建/opt/soft/spark-3.2.3/submit目录,将jar包上传到该目录下

提交命令

[root@node141 submit]# ../bin/spark-submit --master spark://node141:7077 --class cn.doitedu.day01.WordCount --executor-memory 1g --total-executor-cores 4 ./spark-in-action-1.0.jar hdfs://node141:9000/words.txt hdfs://node141:9000/out-1

--master spark://node141:7077     spark的master节点

--class cn.doitedu.day01.WordCount   运行的类名

--executor-memory 1g   占用的内存

--total-executor-cores 4  占用的核数

./spark-in-action-1.0.jar  运行的jar包地址

hdfs://node141:9000/words.txt   代码中args[0]对应的参数

hdfs://node141:9000/out-1  代码中args[1]对应的参数

6.查询hdfs的输出结果

二、本地编写WordCount(Spark_scala)读取本地文件

1.项目结构

2.编写scala版的WordCount

package cn.doitedu.day01import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}/*** 使用本地模式运行Spark程序(开发调试的时候使用)*/
object LocalWordCount {def main(args: Array[String]): Unit = {// 指定当前用户为rootSystem.setProperty("HADOOP_USER_NAME", "root")val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]") // 本地模式,*表示根据当前机器的核数开多个线程// 1.创建SparkContextval sc = new SparkContext(conf)// 2.创建RDD(神奇的大集合,该集合中不装真正要计算的数据,而是装描述信息)val lines: RDD[String] = sc.textFile(args(0))// 3.对RDD进行操作,调用RDD的方法// -------------Transformation (转换算子开始) --------------// 切分压平val words: RDD[String] = lines.flatMap(line => {val words = line.split(" ")println(words)  // debugwords})// 将单词和1组合放入到元祖中val wordAndOne: RDD[(String, Int)] = words.map((_, 1))// 将key相同的数据进行分组聚合val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)// 按照次数排序val sorted: RDD[(String, Int)] = reduced.sortBy(_._2, false) // 降序// -------------Transformation (转换算子结束) --------------// 4.调用Action// 将数据写入到外部的存储系统中sorted.saveAsTextFile(args(1))// 5.释放资源sc.stop()}
}

3.编辑Edit Configurations配置文件

4.直接本地运行LocalWordCount.scala文件

查看运行结果

三、本地编写WordCount(Spark_java版)读取本地文件,并输出到本地

1.项目结构

2.编写java版的WordCount

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;import java.util.Arrays;
import java.util.Iterator;public class JavaWordCount {public static void main(String[] args) {SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local[*]");JavaSparkContext jsc = new JavaSparkContext(conf);JavaRDD<String> lines = jsc.textFile(args[0]);JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {@Overridepublic Iterator<String> call(String lines) throws Exception {String[] words = lines.split("\\s+");return Arrays.asList(words).iterator();}});JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(new PairFunction<String, String, Integer>() {@Overridepublic Tuple2<String, Integer> call(String word) throws Exception {return Tuple2.apply(word, 1);}});JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey(new Function2<Integer, Integer, Integer>() {@Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1 + v2;}});// 将原来的kv顺序颠倒  (flink,3)  ----> (3,flink)JavaPairRDD<Integer, String> swapped = reduced.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {@Overridepublic Tuple2<Integer, String> call(Tuple2<String, Integer> tp) throws Exception {return tp.swap(); // 交换}});JavaPairRDD<Integer, String> sorted = swapped.sortByKey(false);JavaPairRDD<String, Integer> result = sorted.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {@Overridepublic Tuple2<String, Integer> call(Tuple2<Integer, String> tp) throws Exception {return tp.swap();}});result.saveAsTextFile(args[1]);jsc.stop();}
}

3.编辑Edit Configurations配置文件

4.运行后查看结果

四、本地编写WordCount(Spark_java_lambda版)读取本地文件,并输出到本地

1.项目结构

2.编写java-lambda版的WordCount

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;import java.util.Arrays;public class JavaLambdaWordCount {public static void main(String[] args) {SparkConf conf = new SparkConf().setAppName("JavaLambdaWordCount").setMaster("local[*]");JavaSparkContext jsc = new JavaSparkContext(conf);JavaRDD<String> lines = jsc.textFile(args[0]);JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(w -> Tuple2.apply(w, 1));
//        JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey((a, b) -> a + b);JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey(Integer::sum);JavaPairRDD<Integer, String> swapped = reduced.mapToPair(Tuple2::swap);// 排序JavaPairRDD<Integer, String> sorted = swapped.sortByKey(false);// 调回来JavaPairRDD<String, Integer> result = sorted.mapToPair(Tuple2::swap);result.saveAsTextFile(args[1]);jsc.stop();}
}

3.编辑Edit Configurations配置文件

4.运行后查看结果

好啦~~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/737510.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SSM整合项目(删除家居 + 分页查询)

1.删除家居 1.需求分析 2.编写Service层 1.FurnService.java 添加方法 //删除家居public void del(Integer id);2.FurnServiceImpl.java 实现方法 Overridepublic void del(Integer id) {furnMapper.deleteByPrimaryKey(id);}3.单元测试 Testpublic void del() {furnService.…

微信公众号调用沙箱支付

沙箱支付 登录支付宝开放平台&#xff0c;选择底部沙箱支付 下载密钥生成工具 生成应用私钥与公钥&#xff0c;上传沙箱支付&#xff0c;获得支付宝公钥 配置支付通知与支付回调地址 SpringBoot配置 yml文件 这里的地址必须与沙箱配置的一样 controller package com.zq…

植物病害识别:YOLO水稻病害识别数据集(11000多张,yolo标注)

YOLO水稻病害识别数据集&#xff0c;包含叶斑病&#xff0c;褐斑病&#xff0c;细菌性枯萎病&#xff0c;东格鲁病毒病4个常见病害类别&#xff0c;共11000多张图像&#xff0c;yolo标注完整&#xff0c;可直接训练。 适用于CV项目&#xff0c;毕设&#xff0c;科研&#xff0c…

第十五届蓝桥杯-UART接收不定长指令的处理

学习初衷&#xff1a; 不仅仅为了比赛&#xff01; 目录 一、问题引入 二、UART常用的三种工作模式 1.UART工作在中断模式 2.UART工作在DMA模式下 3.uart工作在接收转空闲的模式下 三、获取指令中需要的数据 四、printf函数的实现 一、问题引入 问题引入&#xff1a;请…

【Java】仓库管理系统 SpringBoot+LayUI+DTree(源码)【独一无二】

&#x1f449;博__主&#x1f448;&#xff1a;米码收割机 &#x1f449;技__能&#x1f448;&#xff1a;C/Python语言 &#x1f449;公众号&#x1f448;&#xff1a;测试开发自动化【获取源码商业合作】 &#x1f449;荣__誉&#x1f448;&#xff1a;阿里云博客专家博主、5…

深度学习Top10算法

自2006年深度学习概念被提出以来&#xff0c;20年快过去了&#xff0c;深度学习作为人工智能领域的一场革命&#xff0c;已经催生了许多具有影响力的算法。以下是深度学习top10算法&#xff0c;它们在创新性、应用价值和影响力方面都具有重要的地位。 1、深度神经网络&#xf…

鸿蒙开发(四)-低代码开发

鸿蒙开发(四)-低代码开发 本文主要介绍下鸿蒙下的低代码开发。 鸿蒙低代码是指在鸿蒙操作系统进行应用开发时&#xff0c;采用简化开发流程和减少编码量的方式来提高开发效率。 1&#xff1a;开启低代码开发 首先我们打开DevEco Studio .然后创建工程。 如图所示&#xff…

定制红酒:如何根据客户需求调整红酒口感与风格

在云仓酒庄洒派&#xff0c;云仓酒庄洒派深知不同消费者对于红酒的口感与风格有着不同的喜好和需求。因此&#xff0c;云仓酒庄洒派根据消费者的具体要求&#xff0c;灵活调整红酒的口感与风格&#xff0c;以满足他们的期望。 首先&#xff0c;云仓酒庄洒派会与消费者进行深入的…

AHU 汇编 实验三

实验名称&#xff1a;实验三 串操作指令 二、实验内容&#xff1a; 在数据段定义缓冲区&#xff0c;从键盘接收两串字符到两个缓冲区&#xff0c;将第二串中与第一串字符不一致的字符显示在屏幕。 实验过程&#xff1a; 源代码&#xff1a; data segmentmess1 db 16,?,16…

数据分析-Pandas如何画图验证数据随机性

数据分析-Pandas如何画图验证数据随机性 数据分析和处理中&#xff0c;难免会遇到各种数据&#xff0c;那么数据呈现怎样的规律呢&#xff1f;不管金融数据&#xff0c;风控数据&#xff0c;营销数据等等&#xff0c;莫不如此。如何通过图示展示数据的规律&#xff1f; 数据表…

【字典合集】SecLists-更全面的渗透测试字典 v2024.1

下路路径 SecLists-更全面的渗透测试字典 v2024.1 简介 SecLists 是一个致力于收集各种安全字典的开源项目。这些字典包括但不限于&#xff1a;密码字典、用户名字典、网络扫描结果、漏洞利用载荷、web shells、可用于渗透测试的Payloads、以及其他各种安全相关的字典。 这…

Docker初体验之安装部署和镜像加速(openeuler版)

安装部署&#xff1a; 本人使用的为openeuler版本&#xff0c;无法使用二进制进行安装&#xff08;使用二进制安装时&#xff0c;无法使用docker中的补全命令&#xff0c;需要重新进行配置&#xff09;在此使用yum直接进行安装。 [rootlocalhost ~]# yum install docker 镜像…

在VMvare中虚拟机安装centos7和初始设置

下载镜像 阿里云的镜像站&#xff1a;https://mirrors.aliyun.com/centos/7/isos/x86_64/ 创建虚拟机过程 虚拟机创建过程比较简单&#xff0c;以下在VMvare16中进行安装 点击左上角&#xff0c;文件-新建虚拟机&#xff1a; 选择典型 选择刚刚下载好的镜像 输入虚拟机…

如何判断DNS解析故障?

DNS解析负责将域名解析到对应的IP地址&#xff0c;从而实现用户通过域名访问站点的效果。因此DNS解析是整个互联网中非常关键和基础的一个环节&#xff0c;但也是众多网站运营者和管理者经常忽视的一个环节。所以在出现DNS解析错误时&#xff0c;很多人都会感到手足无措&#x…

【Echarts】曲线图上方显示数字以及自定义值,标题和副标题居中,鼠标上显示信息以及自定义信息

欢迎来到《小5讲堂》 大家好&#xff0c;我是全栈小5。 这是《前端》系列文章&#xff0c;每篇文章将以博主理解的角度展开讲解&#xff0c; 特别是针对知识点的概念进行叙说&#xff0c;大部分文章将会对这些概念进行实际例子验证&#xff0c;以此达到加深对知识点的理解和掌握…

第七次作业

IPSEC VPPN实验配置 目标&#xff1a;在FW5和FW3之间建立一条IPSEC通道&#xff0c;保证10.0.2.0/24网段可以正常访问到192.168.1.0/24 1.FW1和FW2进行双机热备&#xff08;之前实验没保存&#xff0c;可看上个实验&#xff09; 还有一些配置前面实验有。 2.场景选择点对点…

探究精酿啤酒的秘密:原料中的天然酵母与纯净水质

在啤酒的世界中&#xff0c;Fendi Club精酿啤酒以其与众不同的口感和深远的余味吸引了全球的啤酒爱好者。而这一切&#xff0c;都归功于其选用的上好原料&#xff0c;特别是天然酵母和纯净水质。 天然酵母是啤酒的灵魂。与工业生产的啤酒酵母不同&#xff0c;天然酵母富含丰富的…

分布式之Ribbon使用以及原理

Ribbon使用以及原理 1、负载均衡的两种方式 服务器端负载均衡 传统的方式前端发送请求会到我们的的nginx上去&#xff0c;nginx作为反向代理&#xff0c;然后路由给后端的服务器&#xff0c;由于负载均衡算法是nginx提供的&#xff0c;而nginx是部署到服务器端的&#xff0c;所…

20240310-1-Java后端开发知识体系

Java 基础 知识体系 Questions 1. HashMap 1.8与1.7的区别 1.71.8底层结构数组链表数组链表/红黑树插入方式头插法尾插法计算hash值4次位运算5次异或运算1次位运算1次异或运算扩容、插入先扩容再插入先插入再扩容扩容后位置计算重新hash原位置或原位置旧容量 (1) 扩容因子…

论文笔记:Evaluating the Performance of Large Language Models on GAOKAO Benchmark

1 论文思路 采用zero-shot prompting的方式&#xff0c;将试题转化为ChatGPT的输入 对于数学题&#xff0c;将公式转化为latex输入 主观题由专业教师打分 2 数据 2010~2022年&#xff0c;一共13年间的全国A卷和全国B卷 3 结论 3.1 不同模型的zeroshot 高考总分 3.2 各科主…