RDD 编程

文章目录

    • 1. RDD 创建
    • 2. RDD转换
    • 3. RDD动作
    • 4. 持久化
    • 5. 分区
    • 6. 文件数据读写
      • 6.1 本地
      • 6.2 hdfs
      • 6.3 Json文件
      • 6.4 Hbase

学习自 MOOC Spark编程基础

1. RDD 创建

  • 从文件创建
Welcome to____              __/ __/__  ___ _____/ /___\ \/ _ \/ _ `/ __/  '_//___/ .__/\_,_/_/ /_/\_\   version 2.1.0/_/Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_131)
Type in expressions to have them evaluated.
Type :help for more information.scala> val lines = sc.textFile("file:///home/hadoop/workspace/word.txt")
lines: org.apache.spark.rdd.RDD[String] = file:////home/hadoop/workspace/word.txt MapPartitionsRDD[1] at textFile at <console>:24
  • 从 hdfs 创建
scala> val lines = sc.textFile("hdfs://localhost:9000/user/word.txt")
lines: org.apache.spark.rdd.RDD[String] = hdfs://localhost:9000/user/word.txt MapPartitionsRDD[3] at textFile at <console>:24
scala> val lines = sc.textFile("/user/word.txt")
lines: org.apache.spark.rdd.RDD[String] = /user/word.txt MapPartitionsRDD[9] at textFile at <console>:24
  • 通过并行集合创建
scala> val array = Array(1,2,3,4,5)
array: Array[Int] = Array(1, 2, 3, 4, 5)scala> val rdd = sc.parallelize(array)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:26

2. RDD转换

  • filter(func),过滤
scala> val linesWithSpark = lines.filter(line=>line.contains("spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at filter at <console>:26
  • map(func) , 映射
scala> val rdd2 = rdd.map(x => x+10)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[14] at map at <console>:28
scala> val words = lines.map(line => line.split(" "))
words: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[15] at map at <console>:26

输出: n 个元素,每个元素是一个 String 数组

  • flatMap(func)
scala> val words = lines.flatMap(line => line.split(" "))
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at flatMap at <console>:26

输出:所有单词

  • groupByKey(), reduceByKey(func)
    按 key 合并,得到 value list,后者还可以根据 func 对 value list 进行操作

3. RDD动作

spark 遇到 RDD action 时才会真正的开始执行,遇到转换的时候,只是记录下来,并不真正执行

  • count() ,统计 rdd 元素个数
  • collect(),以数组形式返回所有的元素
  • first(),返回第一个元素
  • take(n),返回前 n 个元素
  • reduce(func),聚合
  • foreach(func),遍历
scala> val rdd = sc.parallelize(Array(1,2,3,4,5))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24scala> rdd.count()
res0: Long = 5scala> rdd.first()
res1: Int = 1scala> rdd.take(3)
res2: Array[Int] = Array(1, 2, 3)scala> rdd.reduce((a,b)=>a+b)
res3: Int = 15scala> rdd.collect()
res4: Array[Int] = Array(1, 2, 3, 4, 5)scala> rdd.foreach(elem => println(elem))

4. 持久化

  • persist(),对一个 rdd 标记为持久化,遇到第一个 rdd动作 时,才真正持久化
scala> val list = List("Hadoop","Spark","Hive")
list: List[String] = List(Hadoop, Spark, Hive)scala> val rdd1 = sc.parallelize(list)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at <console>:26scala> println(rdd1.count())
3scala> println(rdd1.collect().mkString("--"))
Hadoop--Spark--Hivescala> rdd1.cache() # 缓存起来,后续用到rdd1的时候,不用从头开始计算了
res10: rdd1.type = ParallelCollectionRDD[1] at parallelize at <console>:26

5. 分区

  • 提高并行度
  • 减小通信开销

分区原则:分区个数尽量 = 集群CPU核心数

  • 创建rdd时指定分区数量 sc.textFile(path, partitionNum)
scala> val arr = Array(1,2,3,4,5)
arr: Array[Int] = Array(1, 2, 3, 4, 5)scala> val rdd = sc.parallelize(arr, 2) # 2 个分区
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26
  • 更改分区数量
scala> rdd.partitions.size
res0: Int = 2scala> val rdd1 = rdd.repartition(1)
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at repartition at <console>:28scala> rdd1.partitions.size
res1: Int = 1

  • wordCount 例子
scala> val lines = sc.| textFile("/user/word.txt") # 读取文件
lines: org.apache.spark.rdd.RDD[String] = /user/word.txt MapPartitionsRDD[6] at textFile at <console>:25scala> val wordCount = lines.flatMap(line => line.split(" ")).| map(word => (word, 1)).reduceByKey((a, b) => a+b)
wordCount: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[9] at reduceByKey at <console>:27scala> wordCount.collect() # 收集
res2: Array[(String, Int)] = Array((love,2), (spark,1), (c++,1), (i,2), (michael,1))scala> wordCount.foreach(println) # 打印
(spark,1)
(c++,1)
(i,2)
(michael,1)
(love,2)
  • 求平均值例子
scala> val rdd = sc.parallelize(Array(("spark",2),("hadoop",3),("hadoop",7),("spark",3)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24scala> rdd.mapValues(x => (x, 1)).reduceByKey((x,y)=>(x._1+y._1, x._2+y._2)).mapValues(x => (x._1/x._2)).collect()
res0: Array[(String, Int)] = Array((spark,2), (hadoop,5))     

6. 文件数据读写

6.1 本地

scala> val textFile = sc.| textFile("file:///home/hadoop/workspace/word.txt")
textFile: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/workspace/word.txt MapPartitionsRDD[5] at textFile at <console>:25scala> textFile.| saveAsTextFile("file:///home/hadoop/workspace/writeword")# 后面跟的是一个目录,而不是文件名
ls /home/hadoop/workspace/writeword/
part-00000  part-00001  _SUCCESShadoop@dblab-VirtualBox:/usr/local/spark/bin$ cat /home/hadoop/workspace/writeword/part-00000
i love programming
it is very interesting
  • 再次读取写入的文件(会把目录下所有文件读取)
scala> val textFile = sc.textFile("file:///home/hadoop/workspace/writeword")
textFile: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/workspace/writeword MapPartitionsRDD[9] at textFile at <console>:24

6.2 hdfs

scala> val textFile = | sc.textFile("hdfs://localhost:9000/user/word.txt")
textFile: org.apache.spark.rdd.RDD[String] = hdfs://localhost:9000/user/word.txt MapPartitionsRDD[11] at textFile at <console>:25scala> textFile.first()
res6: String = i love programming

保存到 hdfs (默认 当前用户的目录前缀 /user/用户名/

scala> textFile.saveAsTextFile("writeword")

查看 hdfs

hadoop@dblab-VirtualBox:/usr/local/hadoop/bin$ ./hdfs dfs -ls -R /user/
drwxr-xr-x   - hadoop supergroup          0 2021-04-22 16:01 /user/hadoop
drwxr-xr-x   - hadoop supergroup          0 2021-04-21 22:48 /user/hadoop/.sparkStaging
drwx------   - hadoop supergroup          0 2021-04-21 22:48 /user/hadoop/.sparkStaging/application_1618998320460_0002
-rw-r--r--   1 hadoop supergroup      73189 2021-04-21 22:48 /user/hadoop/.sparkStaging/application_1618998320460_0002/__spark_conf__.zip
-rw-r--r--   1 hadoop supergroup  120047699 2021-04-21 22:48 /user/hadoop/.sparkStaging/application_1618998320460_0002/__spark_libs__4686608713384839717.zip
drwxr-xr-x   - hadoop supergroup          0 2021-04-22 16:01 /user/hadoop/writeword
-rw-r--r--   1 hadoop supergroup          0 2021-04-22 16:01 /user/hadoop/writeword/_SUCCESS
-rw-r--r--   1 hadoop supergroup         42 2021-04-22 16:01 /user/hadoop/writeword/part-00000
-rw-r--r--   1 hadoop supergroup         20 2021-04-22 16:01 /user/hadoop/writeword/part-00001
drwxr-xr-x   - hadoop supergroup          0 2017-11-05 21:57 /user/hive
drwxr-xr-x   - hadoop supergroup          0 2017-11-05 21:57 /user/hive/warehouse
drwxr-xr-x   - hadoop supergroup          0 2017-11-05 21:57 /user/hive/warehouse/hive.db
-rw-r--r--   1 hadoop supergroup         62 2021-04-21 20:06 /user/word.txt

6.3 Json文件

hadoop@dblab-VirtualBox:/usr/local/hadoop/bin$ cat /usr/local/spark/examples/src/main/resources/people.json 
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
scala> val jsonStr = sc.| textFile("file:///usr/local/spark/examples/src/main/resources/people.json")
jsonStr: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/examples/src/main/resources/people.json MapPartitionsRDD[14] at textFile at <console>:25scala> jsonStr.foreach(println)
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
  • 解析 json 文件
scala.util.parsing.json.JSON
JSON.parseFull(jsonString : String)
返回 Some or None 

编写程序

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import scala.util.parsing.json.JSON
object JSONRead{def main(args:Array[String]){val inputFile = "file:///usr/local/spark/examples/src/main/resources/people.json"val conf = new SparkConf().setAppName("JSONRead")val sc = new SparkContext(conf)val jsonStrs = sc.textFile(inputFile)val res = jsonStrs.map(s => JSON.parseFull(s))res.foreach({ r => r match {case Some(map:Map[String, Any]) => println(map)case None => println("parsing failed")case other => println("unknown data structure: " + other)}})}
}   

使用 sbt 编译打包为 jar,spark-submit --class "JSONRead" <路径 of jar>(有待实践操作)
参考: 使用Intellij Idea编写Spark应用程序(Scala+SBT) http://dblab.xmu.edu.cn/blog/1492-2/

6.4 Hbase

hadoop@dblab-VirtualBox:/usr/local/hbase/bin$ ./hbase shell
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/hbase/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 1.1.5, r239b80456118175b340b2e562a5568b5c744252e, Sun May  8 20:29:26 PDT 2016hbase(main):001:0> disable "student"
0 row(s) in 3.0730 secondshbase(main):002:0> drop "student"
0 row(s) in 1.3530 secondshbase(main):003:0> create "student","info"
0 row(s) in 1.3570 seconds=> Hbase::Table - student
hbase(main):004:0> put "student","1","info:name","michael"
0 row(s) in 0.0920 secondshbase(main):005:0> put "student","1","info:gender","M"
0 row(s) in 0.0410 secondshbase(main):006:0> put "student","1","info:age","18"
0 row(s) in 0.0080 seconds

也需要编写程序,sbt 编译打包

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/472328.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

用python解决生活问题_Python解决生活问题之闹钟程序的实现

昨天下班回家忘了带手机充电器&#xff0c;手机熄火没闹钟了&#xff0c;可现实是迟到30分钟以内要罚100RMB&#xff0c;超过30分钟算旷工要扣除3天工资&#xff0c;想想这代价&#xff0c;好吧&#xff0c;还是自己动手写一个闹钟程序吧&#xff01; 系统环境&#xff1a; Lin…

推荐系统可利用的特征

学自 极客时间 《深度学习推荐系统实战》 推荐系统就是利用“用户信息”&#xff0c;“物品信息”&#xff0c;“场景信息”这三大部分有价值数据&#xff0c;通过构建推荐模型得出推荐列表的工程系统 特征其实是对某个行为过程相关信息的抽象表达 构建特征原则&#xff1a;尽…

iis无法读取配置文件_SpringBoot 有很多读取配置文件的方法,你知道几个? 静态方法读取呢?...

SpringBoot 如何在静态方法中读取配置文件的值在Spring中呢有很多读取配置文件值的相关注解,读取这些配置文件都是依赖于Spring的方式。我发现的读取配置文件的方式有好几种。1、Value 注解2、ConfigurationProperties 和 EnableConfigurationProperties&#xff08;Compent&am…

LeetCode 1837. K 进制表示下的各位数字总和

文章目录1. 题目2. 解题1. 题目 给你一个整数 n&#xff08;10 进制&#xff09;和一个基数 k &#xff0c;请你将 n 从 10 进制表示转换为 k 进制表示&#xff0c;计算并返回转换后各位数字的 总和 。 转换后&#xff0c;各位数字应当视作是 10 进制数字&#xff0c;且它们的…

微信开发学习日记(一):快速阅读5本书,掌握整体背景

2015年1月开始学习微信开发。已经有多年开发经验了&#xff0c;从网上文章来看&#xff0c;微信开发主要是接口&#xff0c;然后是业务逻辑&#xff0c;不是很难。所以&#xff0c;我比较强调学习效率。一天学一点&#xff0c;是不能满足我的快速学习欲望的。在京东上&#xff…

c语言中把一个数缩小十倍_C语言实例第04期,在控制台打印出著名的杨辉三角...

点击上方“C语言中文社区”&#xff0c;选择“设为星标★”技术干货第一时间送达&#xff01;往期回顾&#xff1a;C语言实例第01期&#xff0c;十进制数转换二进制数C语言实例第02期&#xff0c;判断某一年是否为闰年C语言实例第03期&#xff0c;使用*打印平行四边形实例代码/…

LeetCode 1838. 最高频元素的频数(二分查找)

文章目录1. 题目2. 解题1. 题目 元素的 频数 是该元素在一个数组中出现的次数。 给你一个整数数组 nums 和一个整数 k 。 在一步操作中&#xff0c;你可以选择 nums 的一个下标&#xff0c;并将该下标对应元素的值增加 1 。 执行最多 k 次操作后&#xff0c;返回数组中最高频…

wafer小程序服务器,Wafer - 企业级微信小程序全栈方案

Wafer 服务端 SDK - C#本项目是 Wafer 组成部分&#xff0c;以 SDK 的形式为业务服务器提供以下服务&#xff1a;SDK 获取本项目遵守 MIT 协议&#xff0c;可以直接下载 SDK 源码进行修改、编译和发布。如果使用自动部署并选择 C# 语言&#xff0c;则分配的业务服务器里已经部署…

Android学习按键事件监听与Command模式

Android学习按键事件监听与Command模式 - Dufresne - 博客园 Android学习按键事件监听与Command模式 一 Command模式 意图&#xff1a; 将一个请求封装为一个对象&#xff0c;从而使你可用不同的请求对客户进行参数化&#xff1b; 对请求排队或记录请求日志&#xff0c;以及支持…

fileinputstream_从Java中的FileInputStream读取字节

以下示例显示了如何从Java中的FileInputStream读取字节。import java.io.File;import java.io.FileInputStream;public class fileInputStream {public static void main(String[] args) {byte[] data new byte[1024]; //allocates memory for 1024 bytes//be careful about h…

LeetCode 1839. 所有元音按顺序排布的最长子字符串(滑动窗口)

文章目录1. 题目2. 解题1. 题目 当一个字符串满足如下条件时&#xff0c;我们称它是 美丽的 &#xff1a; 所有 5 个英文元音字母&#xff08;a &#xff0c;e &#xff0c;i &#xff0c;o &#xff0c;u&#xff09;都必须 至少 出现一次。这些元音字母的顺序都必须按照 字…

特征处理

学自 极客时间 《深度学习推荐系统实战》 特征分为两类&#xff1a; 类别特征&#xff08;性别、地理位置、季节、天气、风格&#xff09;数值型特征&#xff08;年龄、收入、点击量、点击率&#xff09; 类别特征经过 One-Hot 编码后放入特征向量、或者 多个的特征值采用 M…

WIN 10 安装 Hadoop 2.7.7 + Spark 2.4.7 记录

文章目录0. 常规解压安装&#xff0c;并添加环境变量1. 下载并覆盖 bin 文件夹2. 使VERSION文件的clusterID一致3. 贴下单机配置4. 测试 Hadoop5. 安装Spark环境&#xff1a;win 10 java 1.8.0_281 Scala 2.11.11 Hadoop 2.7.7 Spark2.4.70. 常规解压安装&#xff0c;并添加…

canal mysql从库_canal中间件|数据增量同步解决方案

上一文中提到延时双删等策略实现数据一致性的时候&#xff0c;可能存在删除缓存失败的情况&#xff0c;就会出现缓存和数据库不一致的问题。为了应对删除缓存失败而导致数据不一致的问题&#xff0c;可以通过回溯数据库日志文件&#xff0c;提供一个保障的重试机制即可。流程如…

dbscan聚类算法matlab_密度聚类DBSCAN、HDBSCAN(转)

&#xfeff;# 密度聚类DBSCAN、HDBSCANDBSCANDBSCAN&#xff08;Density-Based Spatial Clustering of Applications with Noise&#xff0c;具有噪声的基于密度的聚类方法&#xff09;是一种基于密度的空间聚类算法。该算法将具有足够密度的区域划分为簇&#xff0c;并在具有…

Spark IDEA 编程环境配置

文章目录1. 下载资料准备2. 建立项目、添加环境设置3. 第一个 Spark 程序学自&#xff1a;Spark机器学习实战 https://book.douban.com/subject/35280412/ 记录一下踩过的坑&#xff1a; 环境&#xff1a;win 10 java 1.8.0_281 Scala 2.11.11 Hadoop 2.7.7 Spark2.4.7 …

天池 在线编程 订单分配(回溯)

文章目录1. 题目2. 解题1. 题目 描述 打车派单场景, 假定有N个订单&#xff0c;待分配给N个司机。 每个订单在匹配司机前&#xff0c;会对候选司机进行打分&#xff0c;打分的结果保存在N*N的矩阵score&#xff0c;其中score[i][j]代表订单 i 派给司机 j 的分值。 假定每个订单…

2015年北京户口全攻略

最新统计数据指出 截至2013年底&#xff0c;北京市常住人口为2114.8万人&#xff0c;其中常住外来人口为802.7万人。和“北京户口”捆绑的字眼&#xff0c;历来是身份、福利&#xff0c;以至于幸福感、安全感。那么北京户口有什么好处&#xff1f;外来人口如何落户北京呢&#…

水晶报表 jar包版本过低_工具类学习-UReport报表设计器整合

dada-report报表整合UReport报表设计器工具结合日常工作和学习实践&#xff0c;针对传统报表子站面临的问题&#xff0c;尝试借助UReport报表设计器解决1.可在现有工程基础上引入Ureport2报表设计器Ureport报表设计器是一个基于WEB的在线报表设计器&#xff0c;其具有良好的界面…

LeetCode 1844. 将所有数字用字符替换

文章目录1. 题目2. 解题1. 题目 给你一个下标从 0 开始的字符串 s &#xff0c;它的 偶数 下标处为小写英文字母&#xff0c;奇数 下标处为数字。 定义一个函数 shift(c, x) &#xff0c;其中 c 是一个字符且 x 是一个数字&#xff0c;函数返回字母表中 c 后面第 x 个字符。 …