用idea工具scala 和 Java开发 spark案例:WordCount

目录

一 环境准备

二 scala代码编写

三 java 代码编写


一 环境准备

        创建一个 maven 工程

        添加下列依赖

    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-mllib_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-graphx_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>${mysql.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version></dependency>

        原本就下载过这些依赖的没必要再下一遍,可以用之前的,比如 json,mysql,mysq 这里版本是 mysql 5 ,不一样的注意修改

        

二 scala代码编写

        首先准备好数据,即一个 txt 文本里面加一些单词,可以放在 hdfs 或本地或其它地方,读取的时候注意改代码,这里是读取 hdfs 上的 txt 文本,注意改成自己的地址

         新建一个 scala 的 object,编写代码:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}object WordCountDemo {def main(args: Array[String]): Unit = {val conf : SparkConf = new SparkConf().setMaster("local[*]").setAppName("wordCount")val sc : SparkContext = SparkContext.getOrCreate(conf)var spark : SparkSession = SparkSession.builder().config(conf).getOrCreate()//    val rdd1: RDD[String] = sc.textFile("hdfs://101.200.63.3:9000/kb23/tmp/*.txt")
//    val rdd2: RDD[String] = rdd1.flatMap(x => x.split(" "))
//    val rdd3: RDD[(String, Int)] = rdd2.map(x => (x, 1))
//    val result: RDD[(String, Int)] = rdd3.reduceByKey(_ + _)val result2: RDD[(String, Int)] = sc.textFile("hdfs://101.200.63.3:9000/kb23/tmp/*.txt").flatMap(x=>x.split(" ")).map(x=>(x,1)).reduceByKey((x,y)=>x+y)//打印到 console//    result2.glom().collect.foreach(x=>println(x.toList))//保存到 hdfsresult2.saveAsTextFile("hdfs://101.200.63.3:9000/kb23/sparkoutput/wordcount")}}

        这里稍微解释一下代码中的一些函数:

        map:转换函数,数据集合中每个元素进行一次我们定义的方法

        flatMap: 与map类似,但是映射为0个或多个

        collect:以数组的形式返回数据集中的所有元素 

        glom:将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变。

 

        云服务器的朋友可能有的报错

22/05/0305:48:53 WARN DFSClient: Failed to connect to /10.0.24.10:9866 for block, add to deadNodes and continue. org.apache.hadoop.net.ConnectTimeoutException: 60000 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending remote=/10.0.24.10:9866]
org.apache.hadoop.net.ConnectTimeoutException: 60000 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending remote=/10.0.24.10:9866]

        出现这种错误看字面意思就很容易明白,这是本地与 datanode 通信时,namenode 给的是 datanode 的内网 ip,所以本地找不到

        解决方法也很简单,设置一下让 namenode 传过来的是服务器名而不是 ip

        在 idea 中,resource 文件夹中添加文件 hdfs-site.xml

        hdfs-site.xml内容:

<!-- datanode 通信是否使用域名,默认为false,改为true --><property><name>dfs.client.use.datanode.hostname</name><value>true</value><description>Whether datanodes should use datanode hostnames whenconnecting to other datanodes for data transfer.</description></property>

三 java 代码编写

        这里原数据存储在本地,文件名为 input.txt

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;import java.util.Arrays;
import java.util.Map;public class WordCount {public static void main(String[] args) {// 创建SparkConf对象SparkConf conf = new SparkConf().setAppName("WordCount").setMaster("local");// 创建JavaSparkContext对象JavaSparkContext sc = new JavaSparkContext(conf);// 读取文本文件JavaRDD<String> lines = sc.textFile("input.txt");// 计算单词出现次数JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());JavaRDD<String> filteredWords = words.filter(word -> !word.isEmpty());JavaPairRDD<String, Integer> wordCounts = filteredWords.mapToPair(word -> new Tuple2<>(word, 1)).reduceByKey((x, y) -> x + y);Map<String, Integer> wordCountsMap = wordCounts.collectAsMap();// 输出结果for (Map.Entry<String, Integer> entry : wordCountsMap.entrySet()) {System.out.println(entry.getKey() + ": " + entry.getValue());}// 关闭JavaSparkContext对象sc.close();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/103805.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Android Studio for Platform (ASfP) 使用教程

文章目录 编写脚本下载源代码lunch 查看版本 归纳的很清楚&#xff0c;下载Repo并下载源码->可以参考我的 Framework入门のPiex 6P源码(下载/编译/刷机) 启动图标&#xff08;重启生效&#xff09; [Desktop Entry] EncodingUTF-8 NameAndroidStudio …

大模型微调学习

用好大模型的层次&#xff1a;1. 提示词工程(prompt engineering); 2. 大模型微调(fine tuning)为什么要对大模型微调&#xff1a; 1. 大模型预训练成本非常高&#xff1b; 2. 如果prompt engineering的效果达不到要求&#xff0c;企业又有比较好的自有数据&#xff0c;能够通过…

C++使用两个栈实现双端队列——F1 B1 B2 B3 B4 B5 PF PF PB PB

用两个栈v1,v2分别表示左栈和右栈&#xff0c;即可实现双端队列。 当从前面插入元素时&#xff0c;v1.push() 当从后面插入元素时&#xff0c;v2.push() 当两个栈都不空的时候&#xff0c;不管前面后面出栈&#xff0c;都直接Pop 当有一个栈空&#xff0c;比如左栈v1空了&am…

scapy构造ND报文

控制报文之&#xff1a;找邻居报文 什么是ND报文 ND报文是指网络中的 Neighbor Discovery&#xff08;ND&#xff09;控制报文。Neighbor Discovery 是 IPv6 网络中的一种协议&#xff0c;它用于管理网络节点之间的邻居关系、地址解析、路由缓存维护和自动配置等任务。ND 协议…

Django实现音乐网站 ⒆

使用Python Django框架做一个音乐网站&#xff0c; 本篇主要为排行榜功能及音乐播放器部分功能实现。 目录 推荐排行榜优化 设置歌手、单曲跳转链接 排行榜列表渲染优化 视图修改如下&#xff1a; 模板修改如下&#xff1a; 单曲详情修改 排行榜列表 设置路由 视图处理…

【Mysql】Mysql的启动选项和系统变量(二)

概述 在Mysql的设置项中一般都有各自的默认值&#xff0c;比方说mysql 5.7服务器端允许同时连入的客户端的默认数量是 151 &#xff0c;表的默认存储引擎是 InnoDB &#xff0c;我们可以在程序启动的时候去修改这些默认值&#xff0c;对于这种在程序启动时指定的设置项也称之为…

MySQL建表操作和用户权限

1.创建数据库school&#xff0c;字符集为utf8 mysql> create database school character set utf8; 2.在school数据库中创建Student和Score表 mysql> create table school.student( -> Id int(10) primary key, -> Stu_id int(10) not null, -> C_n…

【Python 零基础入门】 Numpy

【Python 零基础入门】第六课 Numpy 概述什么是 Numpy?Numpy 与 Python 数组的区别并发 vs 并行单线程 vs 多线程GILNumpy 在数据科学中的重要性 Numpy 安装Anaconda导包 ndarraynp.array 创建数组属性np.zeros 创建np.ones 创建 数组的切片和索引基本索引切片操作数组运算 常…

C# 使用 RSA 加密算法生成证书签名产生“The system cannot find the file specified”异常

使用 C# 中 RSA&#xff08;System.Security.Cryptography.RSA&#xff09; 加密算法生成证书签名进行身份验证&#xff0c;在 VS2022 开发工具本地运行应用程序一切正常。 但将应用程序部署到远程服务器&#xff08;如&#xff1a;Azure App Services&#xff09;&#xff0c…

alsa音频pcm设备之i2c调试

i2cdetect 列举 I2C bus i2cdetect -l ls /dev/i2c* 列出I2C bus i2c-7 上面连接的所有设备,并得到i2c设备地址 i2cdetect -y 7 发现i2c设备的位置显示为UU或表示设备地址的数值,UU表示设备在driver中被使用. I2cdump i2c设备大量register的值 i2cdump -y 7 0x40 I2cset设置…

ROS中的图像数据

无论是USB摄像头还是RGBD摄像头&#xff0c;发布的图像数据格式多种多样&#xff0c;在处理这些数据之前&#xff0c;我们首先需要了解这些数据的格式。 二维图像数据 连接USB摄像头到PC端的USB接口&#xff0c;通过以下命令启动摄像头&#xff1a; roslaunch usb_cam usb_ca…

运行的 akrun 会打印信息到控制台,如何取消打印 -- chatGPT

gpt: 如果运行的程序 akrun 打印信息到控制台&#xff0c;但您希望取消或禁止它的输出&#xff0c;可以尝试以下方法&#xff1a; 1. **重定向输出到空文件**&#xff1a;您可以将程序的标准输出重定向到一个空文件&#xff0c;从而禁止信息输出到控制台。执行以下命令&#…

springboot 集成 zookeeper 问题记录

springboot 集成 zookeeper 问题记录 环境 springboot - 2.7.8 dubbo - 3.1.11 dubbo-dependencies-zookeeper-curator5 - 3.1.11 模拟真实环境&#xff0c;将 windows 上的 zookeeper 迁移到虚拟机 linux 的 docker 环境 failed to connect to zookeeper server 迁移到…

Spring Boot中的Redis自动配置与使用

Spring Boot中的Redis自动配置与使用 Redis是一种高性能的开源内存数据库&#xff0c;常用于缓存、会话管理和消息队列等场景。Spring Boot提供了自动配置来简化在Spring应用程序中使用Redis的过程。本文将介绍Spring Boot中的Redis自动配置是什么以及如何使用它来轻松集成Red…

分权分域有啥内容?

目前的系统有什么问题&#xff1f; 现在我们的系统越来越庞大&#xff0c;可是每一个人进来的查看到的内容完全一样&#xff0c;没有办法灵活的根据不同用户展示不同的数据 例如我们有一个系统&#xff0c;期望不同权限的用户可以看到不同类型的页面&#xff0c;同一个页面不…

动态盘转换为基本盘

问题描述 不小心将磁盘0&#xff08;C和D是基本盘&#xff0c;蓝颜色&#xff09;改成了动态盘&#xff08;C和D是动态盘&#xff0c;橘黄色&#xff09;&#xff1f;如何修改回来呢&#xff1f; 解决方案&#xff1a; 使用DiskGenius将动态磁盘转换为基本磁盘 操作之前一定…

Linux友人帐之系统管理与虚拟机相关

一、虚拟机相关操作 1.1虚拟机克隆 虚拟机克隆是指将一个已经安装好的虚拟机复制出一个或多个完全相同的副本&#xff0c;包括虚拟机的配置、操作系统、应用程序等&#xff0c;从而节省安装和配置的时间和资源。 虚拟机克隆的主要用途有&#xff1a; 创建多个相同或相似的虚拟…

并发编程——1.java内存图及相关内容

这篇文章&#xff0c;我们来讲一下java的内存图及并发编程的预备内容。 首先&#xff0c;我们来看一下下面的这两段代码&#xff1a; 下面&#xff0c;我们给出上面这两段代码在运行时的内存结构图&#xff0c;如下图所示&#xff1a; 下面&#xff0c;我们来具体的讲解一下。…

CubeMX+BabyOS 使用方法

MCU&#xff1a;STM32G030F 编译器&#xff1a;MDK 托管工具&#xff1a;Sourcetree CubeMX创建工程 BabyOS克隆 添加子模块 git submodule add https://gitee.com/notrynohigh/BabyOS.git BabyOS 切换dev 分支 查看当前分支 git branch -a 切换本地分支到dev git che…

关于网络协议的若干问题(一)

1、当网络包到达一个网关的时候&#xff0c;可以通过路由表得到下一个网关的 IP 地址&#xff0c;直接通过 IP 地址找就可以了&#xff0c;为什么还要通过本地的 MAC 地址呢&#xff1f; 答&#xff1a;IP报文端到端的传输过程中&#xff0c;在没有NAT情况下&#xff0c;目的地…