Spark-Scala语言实战(9)

之前的文章中,我们学习了如何在spark中使用RDD方法的flatMap,take,union。想了解的朋友可以查看这篇文章。同时,希望我的文章能帮助到你,如果觉得我的文章写的不错,请留下你宝贵的点赞,谢谢。

Spark-Scala语言实战(8)-CSDN博客文章浏览阅读675次,点赞16次,收藏10次。​今天开始的文章,我会带给大家如何在spark的中使用我们的RDD方法,今天学习RDD方法中的flatMap,take,union三种方法。希望我的文章能帮助到大家,也欢迎大家来我的文章下交流讨论,共同进步。https://blog.csdn.net/qq_49513817/article/details/137157697?今天的文章,我会继续带着大家如何在spark的中使用我们的RDD方法。今天学习RDD方法中的filter,distinct,intersection三种方法,并做一道相关例题。

一、知识回顾

昨天我们学习了RDD的三种方法,分别是flatMap,take,union。

flatMap的一般作用是用来切分我们的单词

它会构建一个新的RDD 

take方法是用来获取我们RDD中前n个元素,n可以自行设置

union可以将我们的两个RDD进行合并操作

但使用我们的union方法时,需保证两个RDD的数据类型相同,否则无法运行。

现在,开始今天的学习吧。

二、RDD方法

1.filter

  • filter()方法是一种转换操作,用于过滤RDD中的元素。
  • filter()方法需要一个参数,这个参数是一个用于过滤的函数,该函数的返回值为Boolean类型。
  • filter()方法将返回值为true的元素保留,将返回值为false的元素过滤掉,最后返回一个存储符合过滤条件的所有元素的新RDD
import org.apache.spark.{SparkConf, SparkContext}object p1 {def main(args: Array[String]): Unit = {val conf=new SparkConf().setMaster("local").setAppName("p2")val sc=new SparkContext(conf)val p = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)val rdd = sc.parallelize(p)// 使用filter操作过滤出所有偶数val pp = rdd.filter(x => x % 2 == 0)// 收集结果并打印val ppp = pp.collect()ppp.foreach(println)}
}

可以看到我们的代码创建了一个1到10的数组,也可以看到注释中我们的需求是筛出里面包括的偶数,那么我们运行代码得到的就应该是2,4,6,8,10,现在,运行我们的代码看看是否得到预期的值吧。

可以看到左下角成功输出代码预期值。

2.distinct

  •  distinct()方法是一种转换操作,用于RDD的数据去重,去除两个完全相同的元素,没有参数。
import org.apache.spark.{SparkConf, SparkContext}object p1 {def main(args: Array[String]): Unit = {val conf=new SparkConf().setMaster("local").setAppName("p2")val sc=new SparkContext(conf)val p = Array(1, 2, 2, 3, 4, 4, 5, 5, 5)val pp = sc.parallelize(p)// 使用distinct操作去除重复元素val ppp = pp.distinct()// 收集结果并打印val pppp = ppp.collect()pppp.foreach(println)}
}

可以看到我们的代码给了一组重复数据特别多的数组,那么我们的distinct方法肯定就是要将它进行降重操作了,那么我们现在运行代码来看一下。

可以看到我们预期的降重实现了,但是它的输出顺序特别混乱,这是因为Spark 的分布式计算模型决定了数据在不同分区之间可能会被打乱,并且在执行 distinct 操作时可能会进行重分区。因此,即使你的输入数组  是有序的,经过 distinct 处理后的输出数组很可能不是有序的。

那么要解决这个问题,我们肯定需要手动排序了

在这里我们就可以使用到sorted进行排序。

    val ppppp=pppp.sortedppppp.foreach(println)

把这两行代码加到末尾,运行代码

可以看到输出预期中降重并排序的结果了。 

3.intersection

  •  intersection()方法用于求出两个RDD的共同元素,即找出两个RDD的交集,参数是另一个RDD,先后顺序与结果无关。
import org.apache.spark.{SparkConf, SparkContext}object p1 {def main(args: Array[String]): Unit = {val conf=new SparkConf().setMaster("local").setAppName("p2")val sc=new SparkContext(conf)val p1 = sc.parallelize(Array(1, 2, 3, 4, 5))val p2 = sc.parallelize(Array(4, 5, 6, 7, 8))// 计算两个RDD的交集val ppp = p1.intersection(p2)// 收集结果并打印val ppppp = ppp.collect()ppppp.foreach(println)}
}

 看代码,我们定义了两个数组,那么既然intersection是求交集,那么运行代码输出的肯定是两个数组中的共同元素,即4,5。运行代码

可以看到成功输出我们交集4与5

三、任务实现

现在,我们有两个csv文件,里面有我们大量的薪资信息,我们现在需要做的事情如下: 

  • 输出上半年或下半年实际薪资大于20万元的员工姓名。
  • 首先需要过滤出两个RDD中实际薪资大于20万元的员工姓名。
  • 再将两个RDD得到的员工姓名合并到一个RDD中,对员工姓名进行去重。
  • 即可得到上半年或下半年实际薪资大于20万元的员工姓名。

想要完成它,并不困难,现在我们把文件放在C盘的根目录下,方便寻找,当然这个位置可以自己随便放。

然后编写我们的代码:

import org.apache.spark.{SparkConf, SparkContext}object p1 {def main(args: Array[String]): Unit = {val conf=new SparkConf().setMaster("local").setAppName("p2")val sc=new SparkContext(conf)// 从C盘根目录读取第一个CSV文件val p1 = sc.textFile("C:\\Employee_salary_first_half.csv")// 从C盘根目录读取第二个CSV文件val p2 = sc.textFile("C:\\Employee_salary_second_half.csv")// 使用mapPartitionsWithIndex方法跳过CSV文件的标题行val pp1 = p1.mapPartitionsWithIndex((ix,it) => {if (ix ==0) it.drop(1)it})val pp2 = p2.mapPartitionsWithIndex((ix, it) => {if (ix == 0) it.drop(1)it})// 将pp1中的每一行转换为(员工名, 工资)元组val ppp1 = pp1.map(Line => {val data = Line.split(",");(data(1),data(6).toInt)})//使用逗号分割每行数据, 提取第二列和第七列数据,并将第七列转换为整数val ppp2 = pp2.map(Line => {val data = Line.split(",");(data(1),data(6).toInt)})val pppp1=ppp1.filter(x => x._2 > 200000).map(x => x._1)// 找出ppp1中工资超过200,000的元组,并只保留员工名val pppp2=ppp2.filter(x => x._2 > 200000).map(x => x._1)//x._n,n即使你要找的元素,通过 ._1 来访问第一个元素 a,通过 ._2 来访问第二个元素 b。val ppppp=pppp1.union(pppp2).distinct()//合并并降重ppppp.collect().foreach(println)//逐行打印}
}

我们先读取了两个文件,在将文件的标题行进行跳过,再分割数据找出需要的两行,最后找出工资大于200000的数据打印

来看看运行效果

可以看到我们预期的输出效果达到了。

拓展-方法参数设置

方法参数描述使用例子不同参数/效果
filterfunc对RDD中的每个元素应用函数func,返回True的元素保留,返回False的元素被过滤掉rdd.filter(lambda x: x > 3)通过修改func,可以定义不同的过滤条件,从而保留或过滤掉不同的元素。例如,lambda x: x % 2 == 0会保留偶数。
distinct返回一个包含RDD中所有不同元素的新RDD,去重rdd.distinct()此方法没有参数,它直接返回一个新的RDD,其中包含了原始RDD中的所有不同元素。这对于去除重复项非常有用。
intersectionother返回当前RDD与另一个RDDother的交集,结果中不包含重复元素rdd1.intersection(rdd2)other参数指定了另一个RDD,该方法将返回两个RDD中共有的元素。改变other的值将会影响交集的结果。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/791132.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【ESP32S3 Sense接入语音识别+MiniMax模型+TTS模块语音播报】

【ESP32S3 Sense接入语音识别MiniMax模型TTS模块语音播报】 1. 前言2. 功能模块概述2.1 语音接入2.2 大模型接入2.3 TTS模块接入 3. 先决条件3.1 环境配置3.2 所需零件3.3 硬件连接步骤 4. 核心代码4.1 源码分享4.2 代码解析 5. 上传验证5.1 对话测试5.2 报错 6. 总结 1. 前言 …

C语言杂谈

努力扩大自己,以靠近,以触及自身以外的世界 文章目录 什么是定义?什么是声明?什么是赋值?什么是初始化?什么是生命周期?什么是作用域?全局变量?局部变量?size…

HCIA-RS基础-VLAN路由

目录 VLAN 路由1. 什么是 VLAN 路由2. VLAN 路由的原理及配置3. VLAN 的缺点和 VLAN Trunking4. 单臂路由配置 总结 VLAN 路由 1. 什么是 VLAN 路由 VLAN 路由是指在虚拟局域网(VLAN)之间进行路由转发的过程。传统的 VLAN 配置只能在同一个 VLAN 内进行…

LCD1602显示屏

LCD1602显示 概述 LCD1602(Liquid Crystal Display)是一种工业字符型液晶,能够同时显示 1602 即 32 字符(16列两行) 引脚说明 //电源 VSS -- GND VDD -- 5V //对比度 VO -- GND //控制线 RS -- P1.0 RW -- P1.1 E -- P1.4 //背光灯 A -- 5…

在ChatGPT中,能用DALL·E 3编辑图片啦!

4月3日,OpenAI开始向部分用户,提供在ChatGPT中的DALLE 3图片编辑功能。 DALLE 3是OpenAI在2023年9月20日发布的一款文生图模型,其生成的图片效果可以与Midjourney、leonardo、ideogram等顶级产品媲美,随后被融合到ChatGPT中增强其…

matlab的歧视:simulink不能使用stm32f4系列的ADC?

2023b的matlab,stm32f407芯片,运行内容Using the Analog to Digital Converter Block to Support STMicroelectronics STM32 Processor Based Boards Using the Analog to Digital Converter Block to Support STMicroelectronics STM32 Processor Base…

基于SSM的社区疫情防控管理信息系统

目录 背景 技术简介 系统简介 界面预览 背景 随着时代的进步,计算机技术已经全方位地影响了社会的发展。随着居民生活质量的持续上升,人们对社区疫情防控管理信息系统的期望和要求也在同步增长。在社区疫情防控日益受到广泛关注的背景下&#xff0c…

【漏洞复现】通天星CMSV6车载主动安全监控云平台inspect_file接口处存在任意文件上传漏洞

免责声明:文章来源互联网收集整理,请勿利用文章内的相关技术从事非法测试,由于传播、利用此文所提供的信息或者工具而造成的任何直接或者间接的后果及损失,均由使用者本人负责,所产生的一切不良后果与文章作者无关。该…

电商技术揭秘一:电商架构设计与核心技术

文章目录 引言一、电商平台架构概述1.1 架构设计原则与架构类型选择1.2 传统电商平台架构与现代化架构趋势分析 二、高并发处理与负载均衡2.1 高并发访问特点分析与挑战2.2 负载均衡原理与算法选择 三、分布式数据库与缓存技术3.1 分布式数据库设计与一致性考量3.2 缓存策略与缓…

基于SpringBoot和Vue的金融融资管理系统的设计和实现【附源码】

1、系统演示视频(演示视频) 2、需要交流和学习请联系

vue弹出的添加信息组件中 el-radio 单选框无法点击问题

情景描述:在弹出的添加信息的组件中的form中有一个单选框,单选框无法进行点击切换 原因如下: 单选框要求有个默认值,因为添加和更新操作复用同一个组件,所以我在初始化时对相关进行了判定,如果为空则赋初始值 结果这样虽然实现了初始值的展示,但是就是如此造成了单选框的无法切…

代码随想录算法训练营第二十九天(回溯5)|491. 非递减子序列、46. 全排列、47. 全排列 II(JAVA)

文章目录 491. 非递减子序列解题思路源码 46. 全排列解题思路源码 47. 全排列 II解题思路源码 总结 491. 非递减子序列 给你一个整数数组 nums ,找出并返回所有该数组中不同的递增子序列,递增子序列中 至少有两个元素 。你可以按 任意顺序 返回答案。 …

前端返回 List<Map<String, Object>>中的vaue值里面包含一个Bigdecimal类型,序列化时小数点丢失,如何解决?

🏆本文收录于「Bug调优」专栏,主要记录项目实战过程中的Bug之前因后果及提供真实有效的解决方案,希望能够助你一臂之力,帮你早日登顶实现财富自由🚀;同时,欢迎大家关注&&收藏&&…

Linux 命令 top 详解

1 top命令介绍 Linux系统中,Top命令主要用于实时运行系统的监控,包括Linux内核管理的进程或者线程的资源占用情况。这个命令对所有正在运行的进程和系统负荷提供不断更新的概览信息,包括系统负载、CPU利用分布情况、内存使用、每个进程的内容…

PS从入门到精通视频各类教程整理全集,包含素材、作业等(7)

PS从入门到精通视频各类教程整理全集,包含素材、作业等 最新PS以及插件合集,可在我以往文章中找到 由于阿里云盘有分享次受限制和文件大小限制,今天先分享到这里,后续持续更新 PS敬伟01——90集等文件 https://www.alipan.com/s…

Golang | Leetcode Golang题解之第7题整数反转

题目&#xff1a; 题解&#xff1a; func reverse(x int) (rev int) {for x ! 0 {if rev < math.MinInt32/10 || rev > math.MaxInt32/10 {return 0}digit : x % 10x / 10rev rev*10 digit}return }

ETL工具-nifi干货系列 第八讲 处理器PutDatabaseRecord 写数据库(详细)

1、本节通过一个小例子来讲解下处理器PutDatabaseRecord&#xff0c;该处理器的作用是将数据写入数据库。 如下流程通过处理器GenerateFlowFile 生成数据&#xff0c;然后通过处理器JoltTransformJSON转换结构&#xff0c;最后通过处理器PutDatabaseRecord将数据写入数据库。如…

keepalived+LVS高可用部署

目录 一.两台设备&#xff08;2.130和2.133&#xff09;作为调度器&#xff0c;前主后备 1.部署keepalived 2.修改配置文件准备启动 3.配置keepalived的系统日志并启动 二.模拟调度器掉点和web服务进程丢失 1.调度器掉点 2.当类似于httpd这种网站服务掉点 三.以三种健康…

C++ 前K个高频单词的六种解法

目录 大堆 小堆 vectorsort vectorstable_sort multimap set/multiset 与GPT的对话 1.对于比较类型中 < 运算符重载的理解 2.map有稳定性的说法吗 ​编辑 3.为什么map和set类的仿函数后面要加const来修饰*this 5.关于名词的理解 6.匿名对象对类要求 7.map和set的…

新手使用GIT上传本地项目到Github(个人笔记)

亲测下面的文章很有用处。 1. 初次使用git上传代码到github远程仓库 - 知乎 (zhihu.com) 2. 使用Git时出现refusing to merge unrelated histories的解决办法 - 知乎