Spark编程-键值对RDD(K,V)创建及常用操作

简述

        SparkRDD中可以包含任何类型的对象,在实际应用中,“键值对”是一种比较常见的RDD元素类型,分组和聚合操作中经常会用到,尤其是groupByKey和reduceByKey。
        Spark操作中经常会用到“键值对RDD”(Pair RDD),用于完成聚合计算。普通RDD里面存储的数据类型是Int、String等,而“键值对RDD”里面存储的数据类型是“键值对”。

生产环境用到的操作

        以下为我在生产环境用到的操作

WordCount:

        统计文本中每个单词出现的次数,使用Pair RDD将每个单词作为键,将出现次数作为值,然后进行reduceByKey操作进行聚合。

分组聚合:

        将具有相同键的元素分组在一起,并对每个键的值进行聚合操作,如groupByKey、reduceByKey等。

数据连接和关联:

        使用键值对进行数据的连接和关联操作,如join、cogroup等。

数据预处理:

        对数据进行分组、排序、过滤等预处理操作,如groupBy、sortByKey、filter等。

数据分析和统计:

        使用Pair RDD进行数据分析和统计操作,如计算平均值、求和、最大值、最小值等。 通过Pair RDD,可以更方便地处理键值对数据,实现更灵活和复杂的数据处理和分析需求。

Pair RDD的创建方式

第一种:从文件中加载数据创建pairRDD

//测试数据,自己编的,文件名为personID
591,2021,15448329898,北京,彩信
592,2022,15648029823,河北,微信
593,2022,16742329894,山西,电话
594,2020,17748529893,海南,微信
595,2020,19048729896,大连,QQ

代码及运行结果

scala> val lines = sc.textFile("file:///data/testdata/personID.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///data/testdata/personID.txt MapPartitionsR                                    DD[1] at textFile at <console>:23scala> val pairRDD = lines.flatMap(elem => (elem + 1))
pairRDD: org.apache.spark.rdd.RDD[Char] = MapPartitionsRDD[2] at flatMap at <console>:23scala> val pairRDD = lines.flatMap(line => line.split(",")).map(word => (word,1))
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at map at <console>:                                    23scala> pairRDD.foreach(println)
(591,1)0:>                                                          (0 + 1) / 1]
(2023,1)
(15448329898,1)
(北京,1)
(彩信,1)
(592,1)
......

从代码执行结果来看:

 返回的结果是键值对类型的RDD,即RDD[(String, Int)]。从pairRDD.foreach(println)执行的打印输出结果也可以看到,都是由(单词,1)这种形式的键值对。

第二种:通过数组Array或集合List创建pairRDD

案例:

//使用array数组
scala> val array = Array("spark", "hadoop", "flink", "hive")
array: Array[String] = Array(spark, hadoop, flink, hive)
scala> val rdd = sc.parallelize(array)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at <console>:24
scala> val pairRDD = rdd.map(word =>(word,1))
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[6] at map at <console>:23
scala> pairRDD.foreach(println)
(spark,1)
(hadoop,1)
(flink,1)
(hive,1)//使用list集合
scala> val list = List("hadoop","spark","hive")
list: List[String] = List(hadoop, spark, hive)scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[7] at parallelize at <console>:24scala> val pairRDD = rdd.map(word => (word,1))
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[8] at map at <console>:23              ^scala> pairRDD.foreach(println)
(hadoop,1)
(spark,1)
(hive,1)

常用键值对转换操作

常用的键值对转换操作包括reduceByKey()、groupByKey()、sortByKey()、join()、cogroup()等

reduceByKey(func)

功能:使用func函数合并具有相同键的值。注意,这里强调合并相同键。

比如,reduceByKey((a,b) => a+b),有五个键值对(nlp,1)
        (nlp,1)
        (spark,1)
        (nlp,1)
        (hadoop,1)
        (hadoop,1)

对具有相同key的键值对进行合并后的结果就是:

        (spark,1)
        (hadoop,2)
        (nlp,3)
我们对上面第二种方式创建List集合得到的pairRDD进行reduceByKey()操作,代码如下:

scala> val list = List("nlp","nlp","spark","nlp","hadoop","hadoop")
list: List[String] = List(nlp, nlp, spark, nlp, hadoop, hadoop)scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[9] at parallelize at <console>:24scala> val pairRDD = rdd.map(word => (word,1))
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[10] at map at <console>:23scala> pairRDD.foreach(println)
(nlp,1)
(nlp,1)
(spark,1)
(nlp,1)
(hadoop,1)
(hadoop,1)scala> pairRDD.reduceByKey((a,b) => a + b).foreach(println)
(spark,1)
(hadoop,2)
(nlp,3)

groupByKey()

功能:对具有相同键的值进行分组。注意,这里强调对相同的键分成一组。

比如,groupByKey((a,b) => a+b),有五个键值对(nlp,1)
        (nlp,1)
        (spark,1)
        (nlp,1)
        (hadoop,1)
        (hadoop,1)

我们对上面第二种方式创建得到的pairRDD进行groupByKey()操作,代码如下:

scala> pairRDD.groupByKey()
res17: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[13] at groupByKey at <console>:24
// 分组后,value被保存到Iterable[Int]中scala> pairRDD.groupByKey().foreach(println)
(spark,CompactBuffer(1))
(hadoop,CompactBuffer(1, 1))
(nlp,CompactBuffer(1, 1, 1))

keys

功能:会把键值对RDD中的key返回形成一个新的RDD。

scala> pairRDD.keys
res20: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at keys at <console>:24scala> pairRDD.keys.foreach(println)
nlp
nlp
spark
nlp
hadoop
hadoop

可以对返回的key的集合进行操作,比如说写入一个List集合中

scala> val prirRDDkeysList = pairRDD.keys.collect().toList
prirRDDkeysList: List[String] = List(nlp, nlp, spark, nlp, hadoop, hadoop)scala> val prirRDDkeysArray = pairRDD.keys.collect()
prirRDDkeysArray: Array[String] = Array(nlp, nlp, spark, nlp, hadoop, hadoop)

values

功能: 把键值对RDD中的value返回形成一个新的RDD。

scala> pairRDD.foreach(println)
(nlp,1)
(nlp,1)
(spark,1)
(nlp,1)
(hadoop,1)
(hadoop,1)scala> pairRDD.values.foreach(println)
1
1
1
1
1
1

        将得到的值保存到数组或集合中

scala> val prirRDDValuesList = pairRDD.values.collect().toList
prirRDDValuesList: List[Int] = List(1, 1, 1, 1, 1, 1)scala> val prirRDDValueArray = pairRDD.values.collect()
prirRDDValueArray: Array[Int] = Array(1, 1, 1, 1, 1, 1)

注意

        为什么会报错value collect is not a member of Unit ,因为foreach方法返回的是Unit类型,它没有collect方法。

scala> val prirRDDValuesList = pairRDD.values.foreach(println).collect().toList 
:26: error: value collect is not a member of Unit 
val prirRDDValuesList = pairRDD.values.foreach(println).collect().toList

工作中使用collect()导致的内存不足调优:

        当处理大数据集时,可以考虑使用Spark的分布式计算能力来处理数据,而不是将所有数据收集到驱动程序中。这样可以避免内存不足的问题。

        我使用collect方法将这个RDD中的元素收集到驱动程序,并返回一个数组。如果pairRDD中的数据量很大,collect操作可能会导致内存不足的问题,建议在处理大数据集时,谨慎使用collect方法。我们可以用很多方法来避免:

  1. 使用RDD转换操作:可以使用各种RDD转换操作,如mapfilterreduceByKey等,对数据集进行转换和聚合操作。这些操作在分布式环境下进行,可以利用集群中的多个节点进行计算。
  2. 使用RDD的collecttake方法:如果只需要获取部分数据,可以使用collect方法将数据收集到驱动程序中,确保数据量不会导致内存不足,可以使用take方法获取RDD中的前几个元素。
  3. 使用RDD的sample方法:可以使用sample方法对数据进行采样,从而获取数据集的一个子集。这样可以在处理大数据集时降低计算和内存的压力。
  4. 使用Spark SQL或DataFrame:如果数据集结构化且存储在支持Spark SQL的数据源中,可以使用Spark SQL或DataFrame API进行数据操作和分析。这些API提供了更高级的数据操作和查询功能。
  5. 使用持久化存储:如果需要将处理结果保存下来或供其他程序使用,可以将结果存储在持久化存储系统中,如HDFS或数据库。这样可以避免将所有数据收集到驱动程序中。 利用集群中的计算资源进行并行计算,避免将所有数据收集到驱动程序中,可以使用RDD转换操作、采样、分页获取等技术来处理数据。

sortByKey()

功能:是返回一个根据键排序的RDD。

scala> pairRDD.sortByKey().foreach(println)
(hadoop,1)
(hadoop,1)
(nlp,1)
(nlp,1)
(nlp,1)
(spark,1)

mapValues(func)  (常用)

功能:对键值对RDD中的每个value都应用一个函数,但是,key不会发生变化。

        即我只对键值对RDD的value部分进行处理,而不是同时对key和value进行处理。例如,对四个键值对("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5)构成的pairRDD,如果执行pairRDD.mapValues(x => x+1),就会得到一个新的键值对RDD,它包含下面四个键值对("spark",2)、("spark",3)、("hadoop",4)和("hadoop",6)。

scala> pairRDD.mapValues(a => a*2).foreach(println)
(nlp,2)
(nlp,2)
(spark,2)
(nlp,2)
(hadoop,2)
(hadoop,2)

join  (常用)

功能:对于给定的两个输入数据集(K,V1)和(K,V2),只有在两个数据集中都存在的key才会被输出,最终得到一个(K,(V1,V2))类型的数据集。

        join(连接)操作是键值对常用的操作。“连接”(join)这个概念来自于关系数据库领域,因此,join的类型也和关系数据库中的join一样,包括内连接(join)、左外连接(leftOuterJoin)、右外连接(rightOuterJoin)等。最常用的情形是内连接,所以,join就表示内连接。
        对于内连接,对于给定的两个输入数据集(K,V1)和(K,V2),只有在两个数据集中都存在的key才会被输出,最终得到一个(K,(V1,V2))类型的数据集。

        比如,pairRDD1是一个键值对集合{("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5)},pairRDD2是一个键值对集合{("spark","fast")},那么,pairRDD1.join(pairRDD2)的结果就是一个新的RDD,这个新的RDD是键值对集合{("spark",1,"fast"),("spark",2,"fast")}。

案例代码:

scala> val paRDD1 = sc.parallelize(Array(("spark",2),("hadoop",3),("spark",1),("hive",4),("hadoop",2)))
paRDD1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[28] at parallelize at <console>:26scala> val paRDD2 = sc.parallelize(Array(("spark","nicetry"),("hadoop","good"),("spark",234),("hive",2314),("hadoop","ohho")))
paRDD2: org.apache.spark.rdd.RDD[(String, Any)] = ParallelCollectionRDD[29] at parallelize at <console>:26scala> paRDD1.join(paRDD2).foreach(println)
(spark,(2,nicetry))
(spark,(2,234))
(spark,(1,nicetry))
(spark,(1,234))
(hive,(4,2314))
(hadoop,(3,good))
(hadoop,(3,ohho))
(hadoop,(2,good))
(hadoop,(2,ohho))

    eg:现在来看林子雨教授讲解的是真清晰,温故而知新。

一个完整实例-计算每种图书的每天平均销量

思路

计算一天中各种类图书卖出去的平均值,键值对的key表示图书名称,value表示某天图书销量,请计算每个键对应的平均值,也就是计算每种图书的每天平均销量

步骤

1、构建数组,包含对应键值对,调用parallelize方法生成 RDD

2、针对构建得到的rdd,我们调用mapValues()函数,把rdd中的每个每个键值对(key,value)的value部分进行修改,把value转换成键值对(value,1),其中,数值1表示这个key在rdd中出现了1次,为什么要记录出现次数呢?因为,我们最终要计算每个key对应的平均值,所以,必须记住这个key出现了几次,最后用value的总和除以key的出现次数,就是这个key对应的平均值。

(注:collect()是一个行动操作,功能是以数组的形式返回数据集中的所有元素,当我们要实时查看一个RDD中的元素内容时,就可以调用collect()函数。)

3、调用reduceByKey()函数,此处必须要十分准确地理解reduceByKey()函数的功能 => 合并具有相同键的值。

        reduceByKey(func)的功能是使用func函数合并具有相同键的值。这里的func函数就是Lamda表达式(x,y) => (x._1+y._1,x._2 + y._2),这个表达式中,x和y都是value,而且是具有相同key的两个键值对所对应的value。

4、 计算最终结果。对得到的几个键值对构成的RDD执行mapValues()操作,得到每种书的每天平均销量。mapValues,key不变,只对值记性操作。value会被赋值给Lamda表达式x => (x._1 / x._2中的x,x的值就是(22,2),x._1就是22,表示hadoop书总销量是22,x._2就是2,表示2天,因此,hadoop书籍的每天平均销量就是x._1 / x._2,也就是11。mapValues()输出的一个键值对就是("hadoop",11),其他同理。

代码

//构建书籍及销量
scala> val books = sc.parallelize(Array(("book1",5),("book2",10),("book3",8),("book1",6),("book2",12)))
books: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:23
// 统计
scala> val sum_books = books.mapValues(x => (x,1)).foreach(println)
(book1,(5,1))
(book2,(10,1))
(book3,(8,1))
(book1,(6,1))
(book2,(12,1))
sum_books: Unit = ()
//计算出现次数,value中,前面是总数,后面是天数,如(11,2),表示2天卖出11本
scala> val average_books = books.mapValues(x => (x,1)).reduceByKey((x,y) => (x._1 + y._1 , x._2 + y._2)).foreach(println)
(book1,(11,2))
(book3,(8,1))
(book2,(22,2))
average_books: Unit = ()//平均值统计
scala> val average_books = books.mapValues(x => (x,1)).reduceByKey((x,y) => (x._1 + y._1 , x._2 + y._2)).mapValues(x => x._1 / x._2).foreach(println)
(book1,5)
(book3,8)
(book2,11)
average_books: Unit = ()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/3913.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CSS样式

1.高度和宽度 .c1{height:300px;width:500px;}注意事项&#xff1a; 宽度支持百分比&#xff0c;高度不支持。行内标签&#xff1a;默认无效会计标签&#xff1a;默认有效&#xff08;霸道&#xff0c;右侧区域空白&#xff0c;也不给你用&#xff09; 2.块级和行内标签 块…

【Django学习】(十四)自定义action_router

之前我们的视图类可以继承GenericViewSet或者ModelViewSet&#xff0c;我们不用再自定义通用的action方法&#xff0c;但是有时候我们需要自定义action&#xff0c;我们该如何设计呢&#xff1f; 自定义action 1、手写视图逻辑 1.1、先在视图集里自定义action方法&#xff0…

GO语言泛型

set一般没什么不方便的 但是使用GET 需要使用类型断言,将取出来的数据转为预期数据, 空接口本身是一个装箱,会产生内存逃逸和多一部分空间. 于是1.17GO使用泛型. 泛型实现: 分析可执行文件后:发现 也就是泛型会为每个数据类型都生产一套代码,导致可执行文件大小增加,并且使用…

uni-app中a标签下载文件跳转后左上角默认返回键无法继续返回

1.首先使用的是onBackPress //跟onShow同级别 onBackPress(option){ uni.switchTab({ url:/pages/....... return true }) }发现其在uni默认头部中使用是可以的 但是h5使用了"navigationStyle":"custom"后手机默认的返回并不可以&#xff0c; 2.经过查询…

LCD-STM32液晶显示中英文-(5.字符编码)

目录 字符编码 字符编码说明参考网站 字符编码 ASCII编码 ASCII编码介绍 ASCII编码表 中文编码 1. GB2312标准 区位码 2. GBK编码 3. GB18030 各个标准的对比说明 4. Big5编码 字符编码 字符编码说明参考网站 字符编码及转换测试&#xff1a;导航菜单 - 千千秀字 …

智迪科技在创业板上市:市值约31亿元,谢伟明和黎柏松为实控人

7月17日&#xff0c;珠海市智迪科技股份有限公司&#xff08;下称“智迪科技”&#xff0c;SZ:301503&#xff09;在深圳证券交易所创业板上市。本次上市&#xff0c;智迪科技的发行价为31.59元/股&#xff0c;发行数量为2000万股&#xff0c;募资总额约为6.32亿元&#xff0c;…

onnx如何改变输入的维度

最近遇到一个难题&#xff0c;就算在用行为识别onnx转rknn的时候提示维度不对&#xff0c;因为行为识别模型是5维的。而rknn只支持4维。 我们先加载模型看一下它的input和node 可以看出模型的input[1]是一个全连接&#xff0c;因此我们可以直接修改他的input[0] input hel…

Kafka 入门到起飞系列 - 生产者发送消息流程解析

生产者通过producerRecord 对象封装消息主题、消息的value&#xff08;内容&#xff09;、timestamp(时间戳)等 生产者通过send()方法发送消息&#xff0c;send()方法会经过如下几步 1. 首先将消息交给拦截器&#xff08;Interceptor&#xff09;处理, 拦截器对生产者而言&…

静态数码管——FPGA

文章目录 前言一、数码管1、数码管简介2、共阴极数码管or共阳极数码管3、共阴极与共阳极的真值表 二、系统设计1、模块框图2、RTL视图 三、源码1、seg_led_static模块2、time_count模块3、top_seg_led_static(顶层文件) 四、效果五、总结六、参考资料 前言 环境&#xff1a; 1、…

数字化时代,智能文件工具让办公升级

无论是在办公室还是在学校&#xff0c;文件管理是我们日常工作中不可或缺的一环。传统的文件整理方式可能需要花费大量的时间和精力&#xff0c;而且常常容易出现混乱和遗漏。然而&#xff0c;随着科技的不断进步&#xff0c;我们现在有幸生活在一个数字化时代&#xff0c;因此…

如何正确有效的学习java前端(合集)

大量阅读 我是一个劲头十足的读者。所以&#xff0c;我的第一个关于学习JavaScript的技巧就是关于阅读&#xff0c;这绝不是巧合。书籍和其他的资源(如文章)可以在很大程度上帮助你学习JavaScript。通过实践学习&#xff0c;书籍是我学习新学科最喜欢的方式。在学习JavaScript的…

测试用例(2)

项目管理工具 主要用tapd&#xff0c;jira少用 acp 敏捷项目管理证书 task:故事&#xff0c;一个故事有开始也有结束&#xff0c;那么在项目管理里面&#xff0c;会把每个任务按照一个task来看&#xff0c;那么这个task也可以叫story&#xff0c;具体指的就是任务有开始有结…

ChatGPT火热之下的冷思考

作为一款基于人工智能的自然语言处理(NLP)​​聊天机器人​​程序&#xff0c;ChatGPT通过大量来自互联网的文本进行训练&#xff0c;并使用深度学习和机器学习算法来理解用户的问题并提供准确的回答。并且&#xff0c;ChatGPT还内置了情感分析、关键字提取和实体识别等功能&am…

Beyond Compare 代码比较工具

一、下载 官网下载地址&#xff1a; https://www.scootersoftware.com/download.php 选择 Windows 系统&#xff0c;简体中文版本&#xff0c;点击下载。 下载完成 二、安装 步骤1&#xff1a;双击安装包 步骤2&#xff1a;进入安装向导&#xff0c;点击下一步 步骤3&a…

在LLM的支持下使游戏NPC具有记忆化的方法

问题 使用GPT这样的LLM去处理游戏中的NPC和玩家的对话是个很好的点子&#xff0c;那么如何处理记忆化的问题呢。 因为LLM的输入tokens是有限制的&#xff0c;所以伴随着问题的记忆context是有窗口大小限制的&#xff0c;将所有的记忆输入LLM并不现实。 所以这里看到了stanfo…

Zookeeper

作为分布式中间件&#xff0c;zookeeper有以下几个重要功能 服务注册服务监听 &#xff1a;观察者模式&#xff0c;有服务上线或下线可以感知&#xff0c;并进行响应回调处理服务拉取配置中心CP特性数据存储方式为标准的文件结构 安装zk需要java环境&#xff0c;可参考 linux…

面试中关于自动化测试的认识

目录 一、什么是自动化测试&#xff0c;自动化测试的优势是什么&#xff1f; 二、什么样的项目比较适合做自动化测试&#xff0c;什么样的不适合做自动化测试&#xff1f; 三、在制定自动化测试计划的时候一般要考虑哪些点&#xff1f; 四、编写自动化脚本时的一些规范&…

怎么给pdf文件加密?pdf文档如何加密

在数字化时代&#xff0c;保护个人和机密信息的重要性越来越受到关注。PDF&#xff08;Portable Document Format&#xff09;是一种广泛使用的文件格式&#xff0c;用于共享和存储各种类型的文档。然而&#xff0c;由于其易于编辑和复制的特性&#xff0c;保护PDF文件中的敏感…

xss跨站脚本攻击总结

XSS(跨站脚本攻击) 跨站脚本攻击&#xff08;Cross Site Scripting&#xff09;&#xff0c;为了不和层叠样式表&#xff08;Cascading Style Sheets &#xff09;CSS的缩写混淆&#xff0c;故将跨站脚本攻击缩写为XSS。恶意攻击者往Web页面里插入恶意Script代码&#xff0c;当…

css基本样式的使用

1、高度和宽度 .c1{height: 300px;width: 500px; }注意事项&#xff1a; 宽度&#xff0c;支持百分比行内标签&#xff0c;默认无效块级标签&#xff0c;默认有效&#xff08;即使右侧空白&#xff0c;也不给你占用&#xff09; 块级和行内标签 css样式 标签&#xff1a; di…