java spark wordcount_提交任务到spark(以wordcount为例)

1、首先需要搭建好hadoop+spark环境,并保证服务正常。本文以wordcount为例。

2、创建源文件,即输入源。hello.txt文件,内容如下:

tom jerry

henry jim

suse lusy

注:以空格为分隔符

3、然后执行如下命令:

hadoop fs -mkdir -p /Hadoop/Input(在HDFS创建目录)

hadoop fs -put hello.txt /Hadoop/Input(将hello.txt文件上传到HDFS)

hadoop fs -ls /Hadoop/Input (查看上传的文件)

hadoop fs -text /Hadoop/Input/hello.txt (查看文件内容)

4、用spark-shell先测试一下wordcount任务。

(1)启动spark-shell,当然需要在spark的bin目录下执行,但是这里我配置了环境变量。

25776956e8862e03ead60f6692dbdc07.png

(2)然后直接输入scala语句:

val file=sc.textFile("hdfs://hacluster/Hadoop/Input/hello.txt")

val rdd = file.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)

rdd.collect()

rdd.foreach(println)

b9b271ebe5c1b13358f509cf869bf212.png

ok,测试通过。

5、Scala实现单词计数

1 packagecom.example.spark2

3 /**4 * User: hadoop

5 * Date: 2017/8/17 0010

6 * Time: 10:20

7*/

8 importorg.apache.spark.SparkConf9 importorg.apache.spark.SparkContext10 importorg.apache.spark.SparkContext._11

12 /**13 * 统计字符出现次数

14*/

15object ScalaWordCount {16def main(args: Array[String]) {17 if (args.length < 1) {18 System.err.println("Usage: ")19 System.exit(1)20}21

22 val conf = newSparkConf()23 val sc = newSparkContext(conf)24 val line = sc.textFile(args(0))25

26 line.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).collect().foreach(println)27

28sc.stop()29}30 }

6、用java实现wordcount

packagecom.example.spark;importjava.util.Arrays;importjava.util.List;importjava.util.regex.Pattern;importorg.apache.spark.SparkConf;importorg.apache.spark.api.java.JavaPairRDD;importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.JavaSparkContext;importorg.apache.spark.api.java.function.FlatMapFunction;importorg.apache.spark.api.java.function.Function2;importorg.apache.spark.api.java.function.PairFunction;importscala.Tuple2;public final classWordCount {private static final Pattern SPACE = Pattern.compile(" ");public static void main(String[] args) throwsException {if (args.length < 1) {

System.err.println("Usage: JavaWordCount ");

System.exit(1);

}

SparkConf conf= new SparkConf().setAppName("JavaWordCount");

JavaSparkContext sc= newJavaSparkContext(conf);

JavaRDD lines = sc.textFile(args[0],1);

JavaRDD words = lines.flatMap(new FlatMapFunction() {private static final long serialVersionUID = 1L;

@Overridepublic Iterablecall(String s) {returnArrays.asList(SPACE.split(s));

}

});

JavaPairRDD ones = words.mapToPair(new PairFunction() {private static final long serialVersionUID = 1L;

@Overridepublic Tuple2call(String s) {return new Tuple2(s, 1);

}

});

JavaPairRDD counts = ones.reduceByKey(new Function2() {private static final long serialVersionUID = 1L;

@OverridepublicInteger call(Integer i1, Integer i2) {return i1 +i2;

}

});

List> output =counts.collect();for (Tuple2, ?>tuple : output) {

System.out.println(tuple._1()+ ": " +tuple._2());

}

sc.stop();

}

}

7、IDEA打包。

(1)File ---> Project Structure

2cf6bbe15f510af05bec2636c5ef65ee.png

a8e80253319b167f21d79e7afb56df7e.png

0d13148ed4549b0c5a6c6389cbefaab0.png

点击ok,配置完成后,在菜单栏中选择Build->Build Artifacts...,然后使用Build等命令打包。打包完成后会在状态栏中显示“Compilation completed successfully...”的信息,去jar包输出路径下查看jar包,如下所示。

c17b7e6d0a7b5ae70416954ae1cdc060.png

将这个wordcount.jar上传到集群的节点上,scp wordcount.jar root@10.57.22.244:/opt/   输入虚拟机root密码。

8、运行jar包。

本文以spark on yarn模式运行jar包。

执行命令运行javawordcount:spark-submit --master yarn-client --class com.example.spark.WordCount --executor-memory 1G --total-executor-cores 2 /opt/wordcount.jar hdfs://hacluster/aa/hello.txt

执行命令运行scalawordcount:spark-submit --master yarn-client --class com.example.spark.ScalaWordCount --executor-memory 1G --total-executor-cores 2 /opt/wordcount.jar hdfs://hacluster/aa/hello.txt

本文以java的wordcount为演示对象,如下图:

144e516c905529fabc30ecb8cb06c00e.png

以上是直接以spark-submit方式提交任务,下面介绍一种以java web的方式提交。

9、以Java Web的方式提交任务到spark。

用spring boot搭建java web框架,实现代码如下:

1)新建maven项目spark-submit

2)pom.xml文件内容,这里要注意spark的依赖jar包要与scala的版本相对应,如spark-core_2.11,这后面2.11就是你安装的scala的版本

4.0.0

org.springframework.boot

spring-boot-starter-parent

1.4.1.RELEASE

wordcount

spark-submit

1.0-SNAPSHOT

com.example.spark.SparkSubmitApplication

UTF-8

1.8

3.4

2.1.0

org.apache.commons

commons-lang3

${commons.version}

org.apache.tomcat.embed

tomcat-embed-jasper

provided

org.springframework.boot

spring-boot-starter-data-jpa

org.springframework.boot

spring-boot-starter-data-redis

org.springframework.boot

spring-boot-starter-test

test

com.jayway.jsonpath

json-path

org.springframework.boot

spring-boot-starter-web

spring-boot-starter-tomcat

org.springframework.boot

org.springframework.boot

spring-boot-starter-jetty

org.eclipse.jetty.websocket

*

org.springframework.boot

spring-boot-starter-jetty

provided

javax.servlet

jstl

org.eclipse.jetty

apache-jsp

provided

org.springframework.boot

spring-boot-starter-data-solr

org.springframework.boot

spring-boot-starter-data-jpa

org.springframework.boot

spring-boot-starter-web

javax.servlet

jstl

org.apache.spark

spark-core_2.11

${org.apache.spark-version}

org.apache.spark

spark-sql_2.11

${org.apache.spark-version}

org.apache.spark

spark-hive_2.11

${org.apache.spark-version}

org.apache.spark

spark-streaming_2.11

${org.apache.spark-version}

org.apache.hadoop

hadoop-client

2.7.3

org.apache.spark

spark-streaming-kafka_2.11

1.6.3

org.apache.spark

spark-graphx_2.11

${org.apache.spark-version}

org.apache.maven.plugins

maven-assembly-plugin

3.0.0

com.fasterxml.jackson.core

jackson-core

2.6.5

com.fasterxml.jackson.core

jackson-databind

2.6.5

com.fasterxml.jackson.core

jackson-annotations

2.6.5

war

spring-snapshots

Spring Snapshots

https://repo.spring.io/snapshot

true

spring-milestones

Spring Milestones

https://repo.spring.io/milestone

false

maven2

http://repo1.maven.org/maven2/

spring-snapshots

Spring Snapshots

https://repo.spring.io/snapshot

true

spring-milestones

Spring Milestones

https://repo.spring.io/milestone

false

maven-war-plugin

src/main/webapp

org.mortbay.jetty

jetty-maven-plugin

spring.profiles.active

development

org.eclipse.jetty.server.Request.maxFormContentSize

600000

true

/

7080

(3)SubmitJobToSpark.java

packagecom.example.spark;importorg.apache.spark.deploy.SparkSubmit;/***@authorkevin

**/

public classSubmitJobToSpark {public static voidsubmitJob() {

String[] args= new String[] { "--master", "yarn-client", "--name", "test java submit job to spark", "--class", "com.example.spark.WordCount", "/opt/wordcount.jar", "hdfs://hacluster/aa/hello.txt"};

SparkSubmit.main(args);

}

}

(4)SparkController.java

packagecom.example.spark.web.controller;importjavax.servlet.http.HttpServletRequest;importjavax.servlet.http.HttpServletResponse;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.stereotype.Controller;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RequestMethod;importorg.springframework.web.bind.annotation.ResponseBody;importcom.example.spark.SubmitJobToSpark;

@Controller

@RequestMapping("spark")public classSparkController {private Logger logger = LoggerFactory.getLogger(SparkController.class);

@RequestMapping(value= "sparkSubmit", method ={ RequestMethod.GET, RequestMethod.POST })

@ResponseBodypublicString sparkSubmit(HttpServletRequest request, HttpServletResponse response) {

logger.info("start submit spark tast...");

SubmitJobToSpark.submitJob();return "hello";

}

}

5)将项目spark-submit打成war包部署到Master节点tomcat上,访问如下请求:

http://10.57.22.244:9090/spark/sparkSubmit

在tomcat的log中能看到计算的结果。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/556533.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

redis的四大特性和原理

一、redis的过期 A.应用场景 cookie自动过期&#xff0c;限时优惠价格&#xff0c;限制每分钟的访问次数 B.实现方式 setex(String key, int seconds, String value) expire key time #秒 pexpire key time #毫秒 expireat key time #秒 pexpireat key time #毫秒 C.实…

默认文献工具_工具分享??超好用的SCI外文文献下载工具

第一步&#xff1a;打开工具第二步&#xff1a;查找目标文献的DOI号&#xff08;知网、谷歌学术等等...&#xff09;&#xff0c;然后复制。第三步&#xff1a;粘贴到工具里面&#xff0c;然后点击「立即下载」第四步&#xff1a;点击立即下载之后&#xff0c;会自动打开默认浏…

Redis之Redis的事务

1.Redis的事务是什么 Redis 事务的本质是一组命令的集合,事务支持一次执行多个命令&#xff0c;一个事务中所有命令都会被序列化。(redis事务就是一次性、顺序性、排他性的执行一个队列中的一系列命令). 1.1reids事务的特点 事务同命令一样都是Redis最小的执行单位&#xff0…

idea 设置内存_IDEA新特性:提前知道代码怎么走!

作者&#xff1a;简简单单OnlineZuozuo原文链接&#xff1a;https://blog.csdn.net/qq_15071263/article/details/104186309新特性IDEA - 2020.1 版本针对调试器和代码分析器的改进&#xff0c;值得期待1、对于调试器的加强&#xff1a;数据流分析辅助2、调试加强&#xff1a;属…

Redis的内部运作机制——Redis详解

本文将分五个部分来分析和总结Redis的内部机制&#xff0c;分别是&#xff1a;Redis数据库、Redis客户端、Redis事件、Redis服务器的初始化步骤、Redis命令的执行过程。 首先介绍一下Redis服务器的状态结构。Redis使用一个类型为“redisServer”的数据结构来保存整个Redis服务…

selenium之 chromedriver与chrome版本映射表_NLP实战篇之tf2训练与评估

本文是基于tensorflow2.2.0版本&#xff0c;介绍了模型的训练与评估。主要介绍了tf.keras的内置训练过程&#xff0c;包括compile、fit&#xff0c;其中compile中包含优化器、loss与metrics的使用&#xff0c;内置api中还包含了很多辅助工具&#xff0c;在Callback中进行介绍&a…

java会被rust替代吗_自从尝了 Rust,Java 突然不香了

Rust 是软件行业中相对而言比较新的一门编程语言&#xff0c;如果从语法上来比较&#xff0c;该语言与 C 其实非常类似&#xff0c;但从另一方面而言&#xff0c;Rust 能更高效地提供许多功能来保证性能和安全。而且&#xff0c;Rust 还能在无需使用传统的垃圾收集系统的情况下…

redis单线程原理___Redis为何那么快-----底层原理浅析

redis单线程原理 redis单线程问题 单线程指的是网络请求模块使用了一个线程&#xff08;所以不需考虑并发安全性&#xff09;&#xff0c;即一个线程处理所有网络请求&#xff0c;其他模块仍用了多个线程。 1. 为什么说redis能够快速执行 (1) 绝大部分请求是纯粹的内存操作…

asm 查看 数据文件 修改 时间_Oracle的ASM介绍及管理

Oracle的ASM介绍及管理Oracle经历过的文件系统历史操作系统--逻辑卷管理器(LVM)&#xff1a;管理文件相对容易&#xff0c;性能较差裸设备:管理文件相对困难&#xff0c;性能好OCFS(Oracle Cluster File System)&#xff1a;是ORACLE数据库文件系统ASM(Automatic Storage Manag…

深入理解 Redis Template及4种序列化方式__spring boot整合redis实现RedisTemplate三分钟快速入门

概述 使用Spring 提供的 Spring Data Redis 操作redis 必然要使用Spring提供的模板类 RedisTemplate&#xff0c; 今天我们好好的看看这个模板类 。 RedisTemplate 看看4个序列化相关的属性 &#xff0c;主要是 用于 KEY 和 VALUE 的序列化 。 举个例子&#xff0c;比如说我们…

java仿聊天室项目总结_Java团队课程设计-socket聊天室(Day4总结篇)

Java团队课程设计-socket聊天室(Day4总结篇)团队名称&#xff1a;ChatRoom项目git地址&#xff1a;git提交记录(仅截取部分)&#xff1a;面向对象设计包图、类图包图UML类图总结&#xff1a;首先总结一下这几天遇到的问题和解决方案使用ObjectInputStream/ObjectOutputStream的…

python基础代码技巧_Python 代码优化技巧(二)

Python 是一种脚本语言&#xff0c;相比 C/C 这样的编译语言&#xff0c;在效率和性能方面存在一些不足&#xff0c;但是可以通过代码调整来提高代码的执行效率。本文整理一些代码优化技巧。 代码优化基本原则代码正常运行后优化。 很多人一开始写代码就奔着性能优化的目标&…

rpm 讲解

CentOS7主要有rpm和yum这两种包软件的管理。两种包的管理各有用处&#xff0c;其中主要区别是&#xff1a;YUM使用简单但需要联网&#xff0c;YUM会去网上的YUM包源去获取所需要的软件包。而RPM的需要的操作经度比较细&#xff0c;需要我们做的事情比较多。 软件包的安装和卸是…

java顺序表冒泡排序_冒泡排序就这么简单 - Java3y的个人空间 - OSCHINA - 中文开源技术交流社区...

冒泡排序就这么简单在我大一的时候自学c语言和数据结构&#xff0c;我当时就接触到了冒泡排序(当时使用的是C语言编写的)。现在大三了&#xff0c;想要在暑假找到一份实习的工作&#xff0c;又要回顾一下数据结构与算法的知识点了。排序对我们来说是一点也不陌生了&#xff0c;…

python 多线程和协程结合_如何让 python 处理速度翻倍?内含代码

阿里妹导读&#xff1a;作为在日常开发生产中非常实用的语言&#xff0c;有必要掌握一些python用法&#xff0c;比如爬虫、网络请求等场景&#xff0c;很是实用。但python是单线程的&#xff0c;如何提高python的处理速度&#xff0c;是一个很重要的问题&#xff0c;这个问题的…

python批量生成图_利用Python批量生成任意尺寸的图片

实现效果 通过源图片&#xff0c;在当前工作目录的/img目录下生成1000张&#xff0c;分别从1*1到1000*1000像素的图片。 效果如下&#xff1a;目录结构 实现示例 # -*- coding: utf-8 -*- import threading from PIL import Image image_size range(1, 1001) def start(): for…

Mysql 如果有多个可选条件怎么加索引_MySQL|mysql-索引

1、索引是什么 1.1索引简介 索引是表的目录&#xff0c;是数据库中专门用于帮助用户快速查询数据的一种数据结构。类似于字典中的目录&#xff0c;查找字典内容时可以根据目录查找到数据的存放位置&#xff0c;以及快速定位查询数据。对于索引&#xff0c;会保存在额外的文件…

Spring-bean的循环依赖以及解决方式___Spring源码初探--Bean的初始化-循环依赖的解决

本文主要是分析Spring bean的循环依赖&#xff0c;以及Spring的解决方式。 通过这种解决方式&#xff0c;我们可以应用在我们实际开发项目中。 什么是循环依赖&#xff1f;怎么检测循环依赖Spring怎么解决循环依赖Spring对于循环依赖无法解决的场景Spring解决循环依赖的方式我们…

Spring中bean的作用域与生命周期

在Spring中&#xff0c;那些组成应用程序的主体及由Spring IoC容器所管理的对象&#xff0c;被称之为bean。简单地讲&#xff0c;bean就是由IoC容器初始化、装配及管理的对象&#xff0c;除此之外&#xff0c;bean就与应用程序中的其他对象没有什么区别了。而bean的定义以及bea…

Spring循环依赖的三种方式

引言&#xff1a;循环依赖就是N个类中循环嵌套引用&#xff0c;如果在日常开发中我们用new 对象的方式发生这种循环依赖的话程序会在运行时一直循环调用&#xff0c;直至内存溢出报错。下面说一下Spring是如果解决循环依赖的。 第一种&#xff1a;构造器参数循环依赖 Spring容…