Spark【RDD编程(二)RDD编程基础】

前言

接上午的那一篇,下午我们学习剩下的RDD编程,RDD操作中的剩下的转换操作和行动操作,最好把剩下的RDD编程都学完。

Spark【RDD编程(一)RDD编程基础】

RDD 转换操作

6、distinct

对 RDD 集合内部的元素进行去重,然后把去重后的其他元素放到一个新的 RDD 集合内。

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object RDDTransForm {def main(args: Array[String]): Unit = {// 创建SparkContext对象val conf = new SparkConf()conf.setAppName("spark core rdd transform").setMaster("local")val sc = new SparkContext(conf)// 通过并行集合创建RDD对象val arr = Array("Spark","Flink","Spark","Storm")val rdd1: RDD[String] = sc.parallelize(arr)val rdd2: RDD[String] = rdd1.distinct()rdd2.foreach(println)//关闭SparkContextsc.stop()}
}

运行输出:

Flink
Spark
Storm

可以看到,重复的元素"Spark"被去除掉。 

7、union

对 两个 RDD 集合进行并集运算,并返回新的 RDD集合,虽然是并集运算,但整个过程不会把重复的元素去除掉。
// 通过并行集合创建RDD对象val arr1 = Array("Spark","Flink","Storm")val arr2 = Array("Spark","Flink","Hadoop")val rdd1: RDD[String] = sc.parallelize(arr1)val rdd2: RDD[String] = sc.parallelize(arr2)val rdd3: RDD[String] = rdd1.union(rdd2)rdd3.foreach(println)

运行结果:

Spark
Flink
Storm
Spark
Flink
Hadoop
可以看到,重复的元素"Spark"和"Flink"没有被去除。

8、intersection

对两个RDD 集合进行交集运算。

// 通过并行集合创建RDD对象val arr1 = Array("Spark","Flink","Storm")val arr2 = Array("Spark","Flink","Hadoop")val rdd1: RDD[String] = sc.parallelize(arr1)val rdd2: RDD[String] = sc.parallelize(arr2)val rdd3: RDD[String] = rdd1.intersection(rdd2)rdd3.foreach(println)

运行结果:

Spark
Flink

"Spark"和"Flink"是两个RDD集合都有的。 

9、subtract

对两个RDD 集合进行差集运算,并返回新的RDD 集合。

rdd1.substract(rdd2) 返回的是 rdd1有而rdd2中没有的元素,并不会把rdd2中有rdd1中没有的元素也包进来。

// 通过并行集合创建RDD对象val arr1 = Array("Spark","Flink","Storm")val arr2 = Array("Spark","Flink","Hadoop")val rdd1: RDD[String] = sc.parallelize(arr1)val rdd2: RDD[String] = sc.parallelize(arr2)val rdd3: RDD[String] = rdd1.subtract(rdd2)rdd3.foreach(println)

运算结果:

Storm

"Storm"是rdd1中有的二rdd2中没有的,并不会返回"Hadoop"。 

10、zip

把两个 RDD 集合中的元素以键值对的形式进行合并,所以需要确保两个RDD 集合的元素个数必须是相同的。

// 通过并行集合创建RDD对象val arr1 = Array("Spark","Flink","Storm")val arr2 = Array(1,3,5)val rdd1: RDD[String] = sc.parallelize(arr1)val rdd2: RDD[Int] = sc.parallelize(arr2)val rdd3: RDD[(String,Int)] = rdd1.zip(rdd2)rdd3.foreach(println)

运行结果:

(Spark,1)
(Flink,3)
(Storm,5)

RDD 行动操作

RDD 的行动操作是真正触发计算的操作,计算过程十分简单。

1、count

返回 RDD 集合中的元素数量。

2、collect

以数组的形式返回 RDD 集合中所有元素。

3、first

返回 RDD 集合中的第一个元素。

4、take(n)

返回 RDD 集合中前n个元素。

5、reduce(func)

以规则函数func对RDD集合中的元素进行循环处理,比如将所有元素加到一起或乘起来。

6、foreach

对RDD 集合进行遍历,输出RDD集合中所有元素。

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object RDDAction {def main(args: Array[String]): Unit = {// 创建SparkContext对象val conf = new SparkConf()conf.setAppName("spark core rdd transform").setMaster("local")val sc = new SparkContext(conf)//通过并行集合创建 RDD 对象val arr: Array[Int] = Array(1,2,3,4,5)val rdd: RDD[Int] = sc.parallelize(arr)val size: Long = rdd.count()val nums: Array[Int] = rdd.collect()val value: Int = rdd.first()val res: Array[Int] = rdd.take(3)val sum: Int = rdd.reduce((v1, v2) => v1 + v2)println("size = " + size)println("The all elements are ")nums.foreach(println)println("The first element in rdd is " + value)println("The first three elements are ")res.foreach(println)println("sum is " + sum)rdd.foreach(print)//关闭SparkContextsc.stop()}}

运行结果:

size = 5
The all elements are 
1
2
3
4
5
The first element in rdd is 1
The first three elements are 
1
2
3
sum is 15
12345
Process finished with exit code 0

文本长度计算案例

计算 data 目录下的文件字节数(文本总长度)。

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object FileLength {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("spark core rdd transform").setMaster("local")val sc = new SparkContext(conf)val rdd1: RDD[String] = sc.textFile("data")val rdd2: RDD[Int] = rdd1.map(line => line.length)val fileLength: Int = rdd2.reduce((len1, len2) => len1 + len2)println("File length is " + fileLength)sc.stop()}
}

持久化

在Spark 中,RDD采用惰性机制,每次遇到行动操作,就会从头到尾开始执行计算,这对于迭代计算代价是很大的,因为迭代计算经常需要多次重复使用相同的一组数据。

  • 使用cache() 方法将需要持久化的RDD对象持久化进缓存中
  • 使用unpersist() 方法将持久化rdd从缓存中释放出来
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object RDDCache {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("spark core rdd transform").setMaster("local")val sc = new SparkContext(conf)val list = List("Hadoop","Spark","Hive","Flink")val rdd: RDD[String] = sc.parallelize(list)rdd.cache()println(rdd.count())  //第一次行动操作println(rdd.collect.mkString(",")) //第二次行动操作rdd.unpersist() //把这个持久化的rdd从缓存中移除,释放内存空间sc.stop()}
}

分区

分区的作用

        RDD 是弹性分布式数据集,通过 RDD 都很大,会被分成多个分区,分别保存在不同的节点上。进行分区的好处:  

  1. 增加并行度。一个RDD不分区直接进行计算的话,不能充分利用分布式集群的计算优势;如果对RDD集合进行分区,由于一个文件保存在分布式系统中不同的机器节点上,可以就近利用本分区的机器进行计算,从而实现多个分区多节点同时计算,并行度更高。
  2. 减少通信开销。通过数据分区,对于一些特定的操作(如join、reduceByKey、groupByKey、leftOuterJoin等),可以大幅度降低网络传输。

分区的原则

        使分区数量尽量等于集群中CPU核心数目。可以通过设置配置文件中的 spark.default.parallelism 这个参数的值,来配置默认的分区数目。

设置分区的个数 

1、创建 RDD对象时指定分区的数量

1.1、通过本地文件系统或HDFS加载

sc.textFile(path,partitionNum)

1.2、通过并行集合加载 

 对于通过并行集合来创建的RDD 对象,如果没有在参数中指定分区数量,默认分区数目为 min(defaultParallelism,2) ,其中defaultParallelism就是配置文件中的spark.default.parallelism。如果是从HDFS中读取文件,则分区数目为文件分片的数目。

2、使用repartition()方法重新设置分区个数

val rdd2 = rdd1.repartition(1)    //重新设置分区为1

自定义分区函数

继承 org.apache.spark.Partitioner 这个类,并实现下面3个方法:

  1. numPartitions: Int ,用于返回创建出来的分区数。
  2. getPartition(key: Any),用于返回给定键的分区编号(0~paratitionNum-1)。
  3. equals(),Java中判断相等想的标准方法。

注意:Spark 的分区函数针对的是(key,value)类型的RDD,也就是说,RDD中的每个元素都是(key,value)类型的,然后函数根据 key 对RDD 元素进行分区。所以,当要对一些非(key,value)类型的 RDD 进行自定义分区时,需要首先把 RDD 元素转换为(key,value)类型,然后再使用分区函数。

案例

将奇数和偶数分开写到不同的文件中去。

import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}class MyPartitioner(numParts: Int = 2) extends Partitioner{//覆盖默认的分区数目override def numPartitions: Int = numParts//覆盖默认的分区规则override def getPartition(key: Any): Int = {if (key.toString.toInt%2==0) 1 else 0}
}
object MyPartitioner{def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("partitioner").setMaster("local")val sc: SparkContext = new SparkContext(conf)val data: Array[Int] = (1 to 100).toArrayval rdd: RDD[Int] = sc.parallelize(data,5)val savePath:String = System.getProperty("user.dir")+"/data/rdd/out"rdd.map((_,1)).partitionBy(new MyPartitioner()).map(_._1).saveAsTextFile(savePath)sc.stop()}
}

我们在代码中创建RDD 对象的时候,我们指定了分区默认的数量为 5,然后我们使用我们自定义的分区,观察会不会覆盖掉默认的分区数量: 

运行结果:

我们可以看到,除了校验文件,一共生成了两个文件,其中一个保存了1~100的所有奇数,一个保存了1~100的所有偶数; 

综合案例

在上一篇博客中,我们已经做过WordCount了,但是明显篇幅比较长,这里我们简化后只需要两行代码:

    //使用本地文件作为数据加载创建RDD 对象val rdd: RDD[String] = sc.textFile("data/word.txt")//RDD("Hadoop is good","Spark is better","Spark is fast")val res_rdd: RDD[(String,Int)] = rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)//flatMap://RDD(Array("Hadoop is good"),Array("Spark is better"),Array("Spark is fast"))//RDD("Hadoop","is",good","Spark","is","better","Spark","is","fast"))

运行结果:

(Spark,2)
(is,3)
(fast,1)
(good,1)
(better,1)
(Hadoop,1)

总结

至此,我们RDD基础编程部分就结束了,但是RDD编程还没有结束,接下来我会继续学习键值对RDD、数据读写,最后总结性低做一个大的综合案例。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/74290.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

酷克数据与华为合作更进一步 携手推出云数仓联合解决方案

在一起,共迎新机遇!8月25-26日,2023华为数据存储用户精英论坛在西宁召开。酷克数据作为国内云原生数据仓库的代表企业,也是华为重要的生态合作伙伴,受邀参与本次论坛,并展示了云数仓领域最新前沿技术以及联…

JavaScript设计模式(五)——发布订阅模式、桥接模式、组合模式

个人简介 👀个人主页: 前端杂货铺 🙋‍♂️学习方向: 主攻前端方向,正逐渐往全干发展 📃个人状态: 研发工程师,现效力于中国工业软件事业 🚀人生格言: 积跬步…

前后端分离项目-基于springboot+vue的it职业生涯规划系统的设计与实现(内含代码+文档+报告)

it职业生涯规划系统在jdk环境中,使用Java语言进行编码,使用Mysql创建数据表保存本系统产生的数据。系统可以提供信息显示和相应服务。总之,it职业生涯规划系统集中管理信息,有着保密性强,效率高,存储空间大…

高忆管理:六连板捷荣技术或难扛“华为概念股”大旗

在本钱商场上名不见经传的捷荣技术(002855.SZ)正扛起“华为概念股”大旗。 9月6日,捷荣技术已拿下第六个连续涨停板,短短七个生意日,股价累积涨幅逾越90%。公司已连发两份股票生意异动公告。 是炒作,还是…

Linux命令200例:mkfs用于创建文件系统

🏆作者简介,黑夜开发者,CSDN领军人物,全栈领域优质创作者✌。CSDN专家博主,阿里云社区专家博主,2023年6月csdn上海赛道top4。 🏆数年电商行业从业经验,历任核心研发工程师&#xff0…

Pytest系列-fixture的详细使用和结合conftest.py的详细使用(3)

介绍 前面一篇讲了setup、teardown可以实现在执行用例前或结束后加入一些操作,但这种都是针对整个脚本全局生效的。 Fixture是pytest的非常核心功能之一,在不改变被装饰函数的前提下对函数进行功能增强,经常用于自定义测试用例前置和后置工作…

恭贺弘博创新2023下半年软考(中/高级)认证课程顺利举行

为迎接2023年下半年软考考试,弘博创新于2023年9月2日举行了精品的软考中/高级认证课程,线下线上学员都积极参与学习。 在课程开始之前,弘博创新的老师为学员们提供了详细的学习资料和准备建议,以确保学员们在课程中能够跟上老师的…

【实践篇】Redis最强Java客户端(三)之Redisson 7种分布式锁使用指南

文章目录 0. 前言1. Redisson 7种分布式锁使用指南1.1 简单锁:1.2 公平锁:1.3 可重入锁:1.4 红锁:1.5 读写锁:1.6 信号量:1.7 闭锁: 2. Spring boot 集成Redisson 验证分布式锁3. 参考资料4. 源…

LeetCode 49题: 字母异位词分组

题目 给你一个字符串数组,请你将 字母异位词 组合在一起。可以按任意顺序返回结果列表。 字母异位词 是由重新排列源单词的所有字母得到的一个新单词。 示例 1: 输入: strs ["eat", "tea", "tan", "ate", "nat&qu…

Linux中的软件管家——yum

目录 ​编辑 一,软件安装的方式 二,对yum的介绍 1.yum的作用 2,yum的库 三,yum下载软件的操作 1.yumlist 2.yuminstall 3.yumremove 四,yum源的转换 一,软件安装的方式 软件安装的方式大概分为三种…

【韩顺平】Linux基础

目录 1.网络连接三种方式 1.1 桥接模式:虚拟系统可以和外部系统通讯,但是容易造成IP冲突【1-225】 1.2 NAT模式:网络地址转换模式。虚拟系统可以和外部系统通讯,不造成IP冲突。 1.3 主机模式:独立的系统。 2.虚拟机…

C# PSO 粒子群优化算法 遗传算法 随机算法 求解复杂方程的最大、最小值

复杂方程可以自己定义,以下是看别人的题目,然后自己来做 以下是计算结果 private void GetMinResult(out double resultX1, out double min){double x1, result;Random random1 new Random(DateTime.Now.Millisecond* DateTime.Now.Second);min 99999…

C语言柔性数组详解:让你的程序更灵活

柔性数组 一、前言二、柔性数组的用法三、柔性数组的内存分布四、柔性数组的优势五、总结 一、前言 仔细观察下面的代码,有没有看出哪里不对劲? struct S {int i;double d;char c;int arr[]; };还有另外一种写法: struct S {int i;double …

软件与系统安全复习

软件与系统安全复习 课程复习内容 其中 软件与系统安全基础 威胁模型 对于影响系统安全的所有信息的结构化表示 本质上,是从安全的视角解读系统与其环境 用于理解攻击者 什么可信、什么不可信攻击者的动机、资源、能力;攻击造成的影响 具体场景…

了解被测系统(二)接入链路--包括域名解析和Nginx代理

目录 一、接入链路示例 二、域名解析过程 1、相关概念 1.1、域的结构 1.2、DNS是什么? 1.3、DNS根域名服务器 1.4、顶级域名服务器 1.5、权威域名服务器 2、域名解析过程 2.1、检查Hosts文件 2.2、检查本地DNS缓存 2.3、DNS解析--本地DNS服务器 2.4、D…

后端SpringBoot+前端Vue前后端分离的项目(二)

前言:完成一个列表,实现表头的切换,字段的筛选,排序,分页功能。 目录 一、数据库表的设计 ​编辑二、后端实现 环境配置 model层 mapper层 service层 service层单元测试 controller层 三、前端实现 interface接…

合宙Air724UG LuatOS-Air LVGL API控件-滑动条 (Slider)

滑动条 (Slider) 滑动条看起来和进度条是有些是有些像,但不同的是滑动条可以进行数值选择。 示例代码 -- 回调函数 slider_event_cb function(obj, event)if event lvgl.EVENT_VALUE_CHANGED then local val (lvgl.slider_get_value(obj) or "0")..&…

在PHP8中遍历数组-PHP8知识详解

所谓遍历数组就是把数组中的变量值读取出来。遍历数组中的所有元素对程序员来说是经常使用的操作,通过遍历数组可以完成数组元素的查询工作。 这好比你去商场买东西一样,要买什么东西,就去该区域浏览一遍,以便找出适合自己的产品…

【JavaEE】_CSS常用属性值

目录 1. 字体属性 1.1 设置字体家族 font-family 1.2 设置字体大小 font-size 1.3 设置字体粗细 font-weight 1.4 设置字体倾斜 font-style 2. 文本属性 2.1 设置文本颜色 color 2.2 文本对齐 text-align 2.3 文本装饰 text-decoration 2.4 文本缩进 text-indent 2.…

合宙Air724UG LuatOS-Air LVGL API控件-图片 (Image)

图片 (Image) 图片IMG是用于显示图像的基本对象类型,图像来源可以是文件,或者定义的符号。 示例代码 -- 创建图片控件 img lvgl.img_create(lvgl.scr_act(), nil) -- 设置图片显示的图像 lvgl.img_set_src(img, "/lua/luatos.png") -- 图片…