Scala 开发 Spark 程序

看spark和scala版本

运行spark-shell

Welcome to

____ __

/ __/__ ___ _____/ /__

_\ \/ _ \/ _ `/ __/ '_/

/___/ .__/\_,_/_/ /_/\_\ version 1.6.0

/_/

 

Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181)

得到spark版本1.6.0,scala版本2.10.5

 

下载安装特定版本的scala

https://www.scala-sbt.org/download.html

 

 

下载特定版本的spark包

https://archive.apache.org/dist/spark/

spark-1.6.0.tgz

其中examples是示例代码

 

开发

idea的scala插件安装参见Scala 写第一个程序HelloWorld:https://blog.csdn.net/whq12789/article/details/89453424。

idea新建Project,选择Scala——sbt,输入名称目录,注意版本选择与spark的scala相同版本。

build.sbt添加依赖,此处版本要求与spark版本相同

libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.6.0"

src/main/scala右击新建Package:com.whq.test

右击包,新建Scala Class选择Kind为Object,名称为Hi,代码如下

package com.whq.testimport org.apache.spark.{SparkConf, SparkContext}object HdfsWordCount {def main(args: Array[String]) {if (args.length < 2) {System.err.println("Usage: HdfsWordCount <directory>")System.exit(1)}val inputFile=args(0)val outputFile=args(1)val conf = new SparkConf().setAppName("wordCount")// Create a Scala Spark Context.val sc = new SparkContext(conf)// Load our input data.val input =  sc.textFile(inputFile)// Split up into words.val words = input.flatMap(line => line.split(" "))// Transform into word and count.val counts = words.map(word => (word, 1)).reduceByKey{case (x, y) => x + y}// Save the word count back out to a text file, causing evaluation.counts.saveAsTextFile(outputFile)}}

统计分组信息。

右侧sbt点sbt tasks中的package,打包后的文件拷贝到服务器上准备运行。

 

su - hdfs

vi hello.txt添加内容

you,jumpi,jumpyou,jumpi,jumpjump

 

提交到spark目录下

hdfs dfs -put hello.txt /spark/

清空目标目录

hdfs dfs -rm -r /spark/out

 

提交运行

spark-submit --master yarn --deploy-mode client --class com.whq.test.HdfsWordCount sparksbt_2.10-0.1.jar /spark/hello.txt /spark/out

 

结果会输出到hdfs的/spark/out目录中

查看结果命令

hdfs dfs -cat /spark/out/_SUCCESS
hdfs dfs -cat /spark/out/part-00000
hdfs dfs -cat /spark/out/part-00001

 

 

Spark常用接口

常用接口

Spark主要使用到如下这几个类:

  • SparkContext:是Spark的对外接口,负责向调用该类的java应用提供Spark的各种功能,如连接Spark集群,创建RDD等。
  • SparkConf:Spark应用配置类,如设置应用名称,执行模式,executor内存等。
  • RDD(Resilient Distributed Dataset):用于在Spark应用程序中定义RDD的类。
  • PairRDDFunctions:为key-value对的RDD数据提供运算操作,如groupByKey。
  • Broadcast: 广播变量类。广播变量允许保留一个只读的变量,缓存在每一台机器上,而非每个任务保存一份拷贝。
  • StorageLevel: 数据存储级别,有内存(MEMORY_ONLY),磁盘(DISK_ONLY),内存+磁盘(MEMORY_AND_DISK)等。

RDD上支持两种类型的操作: :transformation和action,这两种类型的常用方法如下。

表 transformation

方法

说明

map(func)

对调用map的RDD数据集中的每个element都使用func。

filter(func)

对RDD中所有元素调用func,返回f为true的元素。

flatMap(func)

先对RDD所有元素调用func,然后将结果扁平化。

sample(withReplacement,faction,seed)

抽样。

union(otherDataset)

返回一个新的dataset,包含源dataset和给定dataset的元素的集合。

distinct([numTasks])

去除重复元素。

groupByKey(numTasks)

返回(K,Iterable[V]),将key相同的value组成一个集合。

reduceByKey(func,[numTasks])

对key相同的value调用func。

sortByKey([ascending],[numTasks])

按照key来进行排序,是升序还是降序,ascending是boolean类型。

join(otherDataset,[numTasks])

当有两个KV的dataset(K,V)和(K,W),返回的是(K,(V,W))的dataset,numTasks为并发的任务数。

cogroup(otherDataset,[numTasks])

当有两个KV的dataset(K,V)和(K,W),返回的是(K,Seq[V],Seq[W])的dataset,numTasks为并发的任务数。

cartesian(otherDataset)

笛卡尔积。

表 action

API

说明

reduce(func)

对RDD中的元素调用func。

collect()

返回包含RDD中所有元素的一个数组。

count()

返回的是dataset中的element的个数。

first()

返回的是dataset中的第一个元素。

take(n)

返回前n个elements。

takeSample(withReplacement,num,seed)

takeSample(withReplacement,num,seed)对dataset随机抽样,返回有num个元素组成的数组。withReplacement表示是否使用replacement。

saveAsTextFile(path)

把dataset写到一个text file中,或者hdfs,或者hdfs支持的文件系统中,spark把每条记录都转换为一行记录,然后写到file中。

saveAsSequenceFile(path)

只能用在key-value对上,然后生成SequenceFile写到本地或者hadoop文件系统。

countByKey()

对每个key出现的次数做统计。

foreach(func)

在数据集的每一个元素上,运行函数func。

countByValue()

对RDD中每个元素出现的次数进行统计。

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/416957.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Scala 写第一个程序HelloWorld

idea安装scala插件 Settings——Plugins——搜索Scala&#xff0c;进行安装 下载scala-2.12.8.zip并解压 https://www.scala-lang.org/download/ 所有版本列表 https://www.scala-lang.org/download/all.html 新建sbt项目&#xff08;类似maven&#xff09; 选择Scale——…

Spark 连接 HBase 入库及查询操作

本实例采用Scala开发&#xff0c;实现了RDD数据两种方式入库到HBase&#xff0c;从HBase中读取数据并print输出。 build.sbt name : "SparkSbt"version : "0.1"scalaVersion : "2.10.5"libraryDependencies "org.apache.spark" %% &…

C++程序设计语言(特别版) -- 一个桌面计算器

前言 这里要介绍各种语句和表达式&#xff0c;将通过一个桌面计算器的程序做些事情&#xff0c;该计算器提供四种座位浮点数的中缀运算符的标准算术运算。这个计算器由四个部分组成&#xff1a;一个分析器&#xff0c;一个输入函数&#xff0c;一个符号表和一个驱动程序。分析器…

前端学习(2364):图片的上传

message.vue <template><view><button type"primary" click"chooseImg">上传图片</button><image v-for"item in imgArr" :src"item" mode""></image></view> </template>…

superset docker方式安装

superset是优秀的数据可视化开源项目&#xff0c;为用户提供了丰富的图表视觉效果&#xff0c;基于python开发。但仅可以以图表、表格形式展示结果&#xff0c;不支持页面组装等高级BI功能。 准备环境docker 然后执行命令 docker pull amancevice/superset docker run --det…

※交换排序(1)——快速排序(quick sort)

快速排序使用分治策略(Divide and Conquer)来把一个序列分为两个子序列。步骤为&#xff1a; 从序列中挑出一个元素&#xff0c;作为"基准"(pivot). 把所有比基准值小的元素放在基准前面&#xff0c;所有比基准值大的元素放在基准的后面&#xff08;相同的数可以到任…

前端学习(2365):图片的预览

message.vue <template><view><button type"primary" click"chooseImg">上传图片</button><image v-for"item in imgArr" :src"item" click"previewImg(item)"></image></view>…

superset 图标调整

superset docker方式安装 系统为debian 9 安装路径为 /usr/local/lib/python3.6/site-packages/superset/ web页面左上角图标路径为&#xff1a; static/assets/images/superset-logo2x.png 采用Flask框架 页面在views/中&#xff0c;core.py中 #欢迎页面 expose(/welco…

第五周思维导图

转载于:https://www.cnblogs.com/java1765415329/p/6686733.html

superset 时区问题Timestamp subtraction must have the same timezones or no timezones

superset绑定数据源后&#xff0c;切换到图表展示时报错&#xff1a; Timestamp subtraction must have the same timezones or no timezones File "/usr/local/lib/python3.6/site-packages/superset/utils/core.py", line 362, in datetime_to_epoch return (dtt…

使用Kotlin开发Android应用 - 环境搭建 (1)

一. 在Android Studio上安装Kotlin插件 按快捷键Command, -> 在Preferences界面找到Plugins -> 点击Browse repositories ... -> 输入Kotlin -> 安装Kotlin (老版本需要安装 Kotlin 和Kotlin Extensions For Android两个插件, 最新版本的Kotlin插件包含了Android E…

前端学习(2367):两种方式导航跳转和传参

<template><view><button type"primary" click"chooseImg">上传图片</button><image v-for"item in imgArr" :src"item" click"previewImg(item)"></image><navigator url"../…

sqlserver启用登陆审计

客户端管理工具进入后&#xff1a;安全性——审核——新建审核——审核目标&#xff08;就是存储位置&#xff09;选application log&#xff08;windows系统应用日志&#xff09;或者选File&#xff08;文件目录中会产生一堆日志文件&#xff0c;设置最大滚动更新文件数、最大…

java基础——equals及==的区别

①数值比较【必须为整数&#xff0c;小数因为精度问题不能通过这个比较&#xff0c;只能依靠d1-d2<0.0000000001这样的比较两者的不同】&#xff0c;值相等就行。 ②引用比较&#xff0c;引用的对象有父子关系或者是同一类的才能比较&#xff0c;只有当指向同一个引用才相等…

前端学习(2368):编程式导航

<template><view><button type"primary" click"chooseImg">上传图片</button><image v-for"item in imgArr" :src"item" click"previewImg(item)"></image><navigator url"../…

centos7 firewall防火墙实现映射其他机器的端口

vi /etc/sysctl.conf 增加一行 net.ipv4.ip_forward 1 使上述修改生效 sysctl -p firewall-cmd --add-port5432/tcp --permanent #开放端口 firewall-cmd --add-forward-portport5432:prototcp:toaddr10.0.197.189:toport5432 --permanent #添加端口映射 10.0.197.189的54…

Navicat Premium 未保存和已执行SQL存储位置

未保存备份存放目录地址&#xff1a; C:\Users\{登录用户名}\Documents\Navicat\MySQL\servers\{MySQL连接名称}\{数据库名称} SQL执行历史文件地址&#xff1a; C:\Users\{登录用户名}\Documents\Navicat\Premium\logs\LogHistory.txt