RDD 编程

文章目录

    • 1. RDD 创建
    • 2. RDD转换
    • 3. RDD动作
    • 4. 持久化
    • 5. 分区
    • 6. 文件数据读写
      • 6.1 本地
      • 6.2 hdfs
      • 6.3 Json文件
      • 6.4 Hbase

学习自 MOOC Spark编程基础

1. RDD 创建

  • 从文件创建
Welcome to____              __/ __/__  ___ _____/ /___\ \/ _ \/ _ `/ __/  '_//___/ .__/\_,_/_/ /_/\_\   version 2.1.0/_/Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_131)
Type in expressions to have them evaluated.
Type :help for more information.scala> val lines = sc.textFile("file:///home/hadoop/workspace/word.txt")
lines: org.apache.spark.rdd.RDD[String] = file:////home/hadoop/workspace/word.txt MapPartitionsRDD[1] at textFile at <console>:24
  • 从 hdfs 创建
scala> val lines = sc.textFile("hdfs://localhost:9000/user/word.txt")
lines: org.apache.spark.rdd.RDD[String] = hdfs://localhost:9000/user/word.txt MapPartitionsRDD[3] at textFile at <console>:24
scala> val lines = sc.textFile("/user/word.txt")
lines: org.apache.spark.rdd.RDD[String] = /user/word.txt MapPartitionsRDD[9] at textFile at <console>:24
  • 通过并行集合创建
scala> val array = Array(1,2,3,4,5)
array: Array[Int] = Array(1, 2, 3, 4, 5)scala> val rdd = sc.parallelize(array)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:26

2. RDD转换

  • filter(func),过滤
scala> val linesWithSpark = lines.filter(line=>line.contains("spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at filter at <console>:26
  • map(func) , 映射
scala> val rdd2 = rdd.map(x => x+10)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[14] at map at <console>:28
scala> val words = lines.map(line => line.split(" "))
words: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[15] at map at <console>:26

输出: n 个元素,每个元素是一个 String 数组

  • flatMap(func)
scala> val words = lines.flatMap(line => line.split(" "))
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at flatMap at <console>:26

输出:所有单词

  • groupByKey(), reduceByKey(func)
    按 key 合并,得到 value list,后者还可以根据 func 对 value list 进行操作

3. RDD动作

spark 遇到 RDD action 时才会真正的开始执行,遇到转换的时候,只是记录下来,并不真正执行

  • count() ,统计 rdd 元素个数
  • collect(),以数组形式返回所有的元素
  • first(),返回第一个元素
  • take(n),返回前 n 个元素
  • reduce(func),聚合
  • foreach(func),遍历
scala> val rdd = sc.parallelize(Array(1,2,3,4,5))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24scala> rdd.count()
res0: Long = 5scala> rdd.first()
res1: Int = 1scala> rdd.take(3)
res2: Array[Int] = Array(1, 2, 3)scala> rdd.reduce((a,b)=>a+b)
res3: Int = 15scala> rdd.collect()
res4: Array[Int] = Array(1, 2, 3, 4, 5)scala> rdd.foreach(elem => println(elem))

4. 持久化

  • persist(),对一个 rdd 标记为持久化,遇到第一个 rdd动作 时,才真正持久化
scala> val list = List("Hadoop","Spark","Hive")
list: List[String] = List(Hadoop, Spark, Hive)scala> val rdd1 = sc.parallelize(list)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at <console>:26scala> println(rdd1.count())
3scala> println(rdd1.collect().mkString("--"))
Hadoop--Spark--Hivescala> rdd1.cache() # 缓存起来,后续用到rdd1的时候,不用从头开始计算了
res10: rdd1.type = ParallelCollectionRDD[1] at parallelize at <console>:26

5. 分区

  • 提高并行度
  • 减小通信开销

分区原则:分区个数尽量 = 集群CPU核心数

  • 创建rdd时指定分区数量 sc.textFile(path, partitionNum)
scala> val arr = Array(1,2,3,4,5)
arr: Array[Int] = Array(1, 2, 3, 4, 5)scala> val rdd = sc.parallelize(arr, 2) # 2 个分区
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26
  • 更改分区数量
scala> rdd.partitions.size
res0: Int = 2scala> val rdd1 = rdd.repartition(1)
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at repartition at <console>:28scala> rdd1.partitions.size
res1: Int = 1

  • wordCount 例子
scala> val lines = sc.| textFile("/user/word.txt") # 读取文件
lines: org.apache.spark.rdd.RDD[String] = /user/word.txt MapPartitionsRDD[6] at textFile at <console>:25scala> val wordCount = lines.flatMap(line => line.split(" ")).| map(word => (word, 1)).reduceByKey((a, b) => a+b)
wordCount: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[9] at reduceByKey at <console>:27scala> wordCount.collect() # 收集
res2: Array[(String, Int)] = Array((love,2), (spark,1), (c++,1), (i,2), (michael,1))scala> wordCount.foreach(println) # 打印
(spark,1)
(c++,1)
(i,2)
(michael,1)
(love,2)
  • 求平均值例子
scala> val rdd = sc.parallelize(Array(("spark",2),("hadoop",3),("hadoop",7),("spark",3)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24scala> rdd.mapValues(x => (x, 1)).reduceByKey((x,y)=>(x._1+y._1, x._2+y._2)).mapValues(x => (x._1/x._2)).collect()
res0: Array[(String, Int)] = Array((spark,2), (hadoop,5))     

6. 文件数据读写

6.1 本地

scala> val textFile = sc.| textFile("file:///home/hadoop/workspace/word.txt")
textFile: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/workspace/word.txt MapPartitionsRDD[5] at textFile at <console>:25scala> textFile.| saveAsTextFile("file:///home/hadoop/workspace/writeword")# 后面跟的是一个目录,而不是文件名
ls /home/hadoop/workspace/writeword/
part-00000  part-00001  _SUCCESShadoop@dblab-VirtualBox:/usr/local/spark/bin$ cat /home/hadoop/workspace/writeword/part-00000
i love programming
it is very interesting
  • 再次读取写入的文件(会把目录下所有文件读取)
scala> val textFile = sc.textFile("file:///home/hadoop/workspace/writeword")
textFile: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/workspace/writeword MapPartitionsRDD[9] at textFile at <console>:24

6.2 hdfs

scala> val textFile = | sc.textFile("hdfs://localhost:9000/user/word.txt")
textFile: org.apache.spark.rdd.RDD[String] = hdfs://localhost:9000/user/word.txt MapPartitionsRDD[11] at textFile at <console>:25scala> textFile.first()
res6: String = i love programming

保存到 hdfs (默认 当前用户的目录前缀 /user/用户名/

scala> textFile.saveAsTextFile("writeword")

查看 hdfs

hadoop@dblab-VirtualBox:/usr/local/hadoop/bin$ ./hdfs dfs -ls -R /user/
drwxr-xr-x   - hadoop supergroup          0 2021-04-22 16:01 /user/hadoop
drwxr-xr-x   - hadoop supergroup          0 2021-04-21 22:48 /user/hadoop/.sparkStaging
drwx------   - hadoop supergroup          0 2021-04-21 22:48 /user/hadoop/.sparkStaging/application_1618998320460_0002
-rw-r--r--   1 hadoop supergroup      73189 2021-04-21 22:48 /user/hadoop/.sparkStaging/application_1618998320460_0002/__spark_conf__.zip
-rw-r--r--   1 hadoop supergroup  120047699 2021-04-21 22:48 /user/hadoop/.sparkStaging/application_1618998320460_0002/__spark_libs__4686608713384839717.zip
drwxr-xr-x   - hadoop supergroup          0 2021-04-22 16:01 /user/hadoop/writeword
-rw-r--r--   1 hadoop supergroup          0 2021-04-22 16:01 /user/hadoop/writeword/_SUCCESS
-rw-r--r--   1 hadoop supergroup         42 2021-04-22 16:01 /user/hadoop/writeword/part-00000
-rw-r--r--   1 hadoop supergroup         20 2021-04-22 16:01 /user/hadoop/writeword/part-00001
drwxr-xr-x   - hadoop supergroup          0 2017-11-05 21:57 /user/hive
drwxr-xr-x   - hadoop supergroup          0 2017-11-05 21:57 /user/hive/warehouse
drwxr-xr-x   - hadoop supergroup          0 2017-11-05 21:57 /user/hive/warehouse/hive.db
-rw-r--r--   1 hadoop supergroup         62 2021-04-21 20:06 /user/word.txt

6.3 Json文件

hadoop@dblab-VirtualBox:/usr/local/hadoop/bin$ cat /usr/local/spark/examples/src/main/resources/people.json 
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
scala> val jsonStr = sc.| textFile("file:///usr/local/spark/examples/src/main/resources/people.json")
jsonStr: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/examples/src/main/resources/people.json MapPartitionsRDD[14] at textFile at <console>:25scala> jsonStr.foreach(println)
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
  • 解析 json 文件
scala.util.parsing.json.JSON
JSON.parseFull(jsonString : String)
返回 Some or None 

编写程序

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import scala.util.parsing.json.JSON
object JSONRead{def main(args:Array[String]){val inputFile = "file:///usr/local/spark/examples/src/main/resources/people.json"val conf = new SparkConf().setAppName("JSONRead")val sc = new SparkContext(conf)val jsonStrs = sc.textFile(inputFile)val res = jsonStrs.map(s => JSON.parseFull(s))res.foreach({ r => r match {case Some(map:Map[String, Any]) => println(map)case None => println("parsing failed")case other => println("unknown data structure: " + other)}})}
}   

使用 sbt 编译打包为 jar,spark-submit --class "JSONRead" <路径 of jar>(有待实践操作)
参考: 使用Intellij Idea编写Spark应用程序(Scala+SBT) http://dblab.xmu.edu.cn/blog/1492-2/

6.4 Hbase

hadoop@dblab-VirtualBox:/usr/local/hbase/bin$ ./hbase shell
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/hbase/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 1.1.5, r239b80456118175b340b2e562a5568b5c744252e, Sun May  8 20:29:26 PDT 2016hbase(main):001:0> disable "student"
0 row(s) in 3.0730 secondshbase(main):002:0> drop "student"
0 row(s) in 1.3530 secondshbase(main):003:0> create "student","info"
0 row(s) in 1.3570 seconds=> Hbase::Table - student
hbase(main):004:0> put "student","1","info:name","michael"
0 row(s) in 0.0920 secondshbase(main):005:0> put "student","1","info:gender","M"
0 row(s) in 0.0410 secondshbase(main):006:0> put "student","1","info:age","18"
0 row(s) in 0.0080 seconds

也需要编写程序,sbt 编译打包

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/472328.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

centos 限制只能访问某个目录的php文件

vi /etc/php.ini #编辑 open_basedir .:/tmp/ #在380行 设置表示允许访问当前目录(即PHP脚本文件所在之目录)和/tmp/目录,可以防止php木马跨站,如果改了之后安装程序有问题(例如&#xff1a;织梦内容管理系统)&#xff0c;可以注销此行&#xff0c; 或者直接写上程序的目录/da…

python saltstack web_saltstack web uiweb平台界面

拾壹015/04/2016 下午 3:254楼2016-04-15 15:18:15,632 [cherrypy.error ][ERROR ][10998] [15/Apr/2016:15:18:15] ENGINE TypeError(“argument of type ‘NoneType’ is not iterable”,)Traceback (most recent call last):File “/usr/lib/python2.6/site-packages/cherry…

楼宇自控ba系统 服务器,楼宇自控BA系统

1、系统介绍楼宇自控系统 (Building Automation System) 针对楼宇内各种机电设备进行集中管理和监控。楼宇控制系统主要包括空调新风机组、送排风机、集水坑与排水泵、电梯、变配电、照明等。在整个楼宇范围内&#xff0c;通过整套楼宇自动控制系统及其内置最优化控制程序和预设…

用python解决生活问题_Python解决生活问题之闹钟程序的实现

昨天下班回家忘了带手机充电器&#xff0c;手机熄火没闹钟了&#xff0c;可现实是迟到30分钟以内要罚100RMB&#xff0c;超过30分钟算旷工要扣除3天工资&#xff0c;想想这代价&#xff0c;好吧&#xff0c;还是自己动手写一个闹钟程序吧&#xff01; 系统环境&#xff1a; Lin…

推荐系统可利用的特征

学自 极客时间 《深度学习推荐系统实战》 推荐系统就是利用“用户信息”&#xff0c;“物品信息”&#xff0c;“场景信息”这三大部分有价值数据&#xff0c;通过构建推荐模型得出推荐列表的工程系统 特征其实是对某个行为过程相关信息的抽象表达 构建特征原则&#xff1a;尽…

搭建git服务器

在远程仓库一节中&#xff0c;我们讲了远程仓库实际上和本地仓库没啥不同&#xff0c;纯粹为了7x24小时开机并交换大家的修改。 GitHub就是一个免费托管开源代码的远程仓库。但是对于某些视源代码如生命的商业公司来说&#xff0c;既不想公开源代码&#xff0c;又舍不得给GitHu…

回拨系统服务器,CISCO接入服务器回拨功能的实现

很多人都在实践中配置过CISCO的接入服务器&#xff0c;象2511、2620或者是5300。接入服务器提供了廉价的通过电话拨号远程访问企业网的方式。但是接入服务器提供的回拨功能却很少有人用到。回拨的过程是用户拨通接入服务器&#xff0c;输入用户名/密码&#xff0c;通过认证后&a…

python3时间戳转换成时间_Python3 日期与时间戳相互转换

开发中经常会对时间格式处理&#xff0c;对于时间数据&#xff0c;比如2019-02-28 10:23:29&#xff0c;有时需要日期与时间戳进行相互转换&#xff0c;在Python3中主要用到time模块&#xff0c;相关的函数如下&#xff1a;其中unix_time函数是正常时间转unix时间戳&#xff0c…

iis无法读取配置文件_SpringBoot 有很多读取配置文件的方法,你知道几个? 静态方法读取呢?...

SpringBoot 如何在静态方法中读取配置文件的值在Spring中呢有很多读取配置文件值的相关注解,读取这些配置文件都是依赖于Spring的方式。我发现的读取配置文件的方式有好几种。1、Value 注解2、ConfigurationProperties 和 EnableConfigurationProperties&#xff08;Compent&am…

LeetCode 1837. K 进制表示下的各位数字总和

文章目录1. 题目2. 解题1. 题目 给你一个整数 n&#xff08;10 进制&#xff09;和一个基数 k &#xff0c;请你将 n 从 10 进制表示转换为 k 进制表示&#xff0c;计算并返回转换后各位数字的 总和 。 转换后&#xff0c;各位数字应当视作是 10 进制数字&#xff0c;且它们的…

ajax 请求svg,jQuery append 到AJAX加载的SVG问题

我已成功通过AJAX从外部文件加载了一些svg:$("#svg").load(svgUrl " svg", function() {// do stuff});我的HTML看起来像这样:...我可以看到图形很好。现在&#xff0c;我想向已加载的svg添加一些其他svg元素。我将脚本更改为:$("#svg").load(s…

微信开发学习日记(一):快速阅读5本书,掌握整体背景

2015年1月开始学习微信开发。已经有多年开发经验了&#xff0c;从网上文章来看&#xff0c;微信开发主要是接口&#xff0c;然后是业务逻辑&#xff0c;不是很难。所以&#xff0c;我比较强调学习效率。一天学一点&#xff0c;是不能满足我的快速学习欲望的。在京东上&#xff…

python json序列化对象_Python学习之json序列化

一、什么是序列化在我们存储数据或者网络传输数据的时候&#xff0c;需要对我们的对象进行处理&#xff0c;把对象处理成方便存储和传输的数据格式。这个过程叫序列化&#xff0c;不同的序列化结果也不同&#xff0c;但目的是一样的&#xff0c;都是为了存储和传输在Python中三…

c语言中把一个数缩小十倍_C语言实例第04期,在控制台打印出著名的杨辉三角...

点击上方“C语言中文社区”&#xff0c;选择“设为星标★”技术干货第一时间送达&#xff01;往期回顾&#xff1a;C语言实例第01期&#xff0c;十进制数转换二进制数C语言实例第02期&#xff0c;判断某一年是否为闰年C语言实例第03期&#xff0c;使用*打印平行四边形实例代码/…

LeetCode 1838. 最高频元素的频数(二分查找)

文章目录1. 题目2. 解题1. 题目 元素的 频数 是该元素在一个数组中出现的次数。 给你一个整数数组 nums 和一个整数 k 。 在一步操作中&#xff0c;你可以选择 nums 的一个下标&#xff0c;并将该下标对应元素的值增加 1 。 执行最多 k 次操作后&#xff0c;返回数组中最高频…

wafer小程序服务器,Wafer - 企业级微信小程序全栈方案

Wafer 服务端 SDK - C#本项目是 Wafer 组成部分&#xff0c;以 SDK 的形式为业务服务器提供以下服务&#xff1a;SDK 获取本项目遵守 MIT 协议&#xff0c;可以直接下载 SDK 源码进行修改、编译和发布。如果使用自动部署并选择 C# 语言&#xff0c;则分配的业务服务器里已经部署…

Android学习按键事件监听与Command模式

Android学习按键事件监听与Command模式 - Dufresne - 博客园 Android学习按键事件监听与Command模式 一 Command模式 意图&#xff1a; 将一个请求封装为一个对象&#xff0c;从而使你可用不同的请求对客户进行参数化&#xff1b; 对请求排队或记录请求日志&#xff0c;以及支持…

postek二次开发_20190626_二次开发BarTender打印机_C#代码_一边读取TID_一边打印_打印机POSTEK...

demo代码如下:private void btnPrint_Click(object sender, EventArgs e){if (this.btnPrint.Text "停止打印"){SetBtnPrintUIEnable();return;}//禁用界面上的相关按钮SetBtnPrintUIDisable();var dt new DataTable();new Task(() >{///开始的打印//1. 获取选中…

fileinputstream_从Java中的FileInputStream读取字节

以下示例显示了如何从Java中的FileInputStream读取字节。import java.io.File;import java.io.FileInputStream;public class fileInputStream {public static void main(String[] args) {byte[] data new byte[1024]; //allocates memory for 1024 bytes//be careful about h…

LeetCode 1839. 所有元音按顺序排布的最长子字符串(滑动窗口)

文章目录1. 题目2. 解题1. 题目 当一个字符串满足如下条件时&#xff0c;我们称它是 美丽的 &#xff1a; 所有 5 个英文元音字母&#xff08;a &#xff0c;e &#xff0c;i &#xff0c;o &#xff0c;u&#xff09;都必须 至少 出现一次。这些元音字母的顺序都必须按照 字…