Spark-Scala语言实战(9)

之前的文章中,我们学习了如何在spark中使用RDD方法的flatMap,take,union。想了解的朋友可以查看这篇文章。同时,希望我的文章能帮助到你,如果觉得我的文章写的不错,请留下你宝贵的点赞,谢谢。

Spark-Scala语言实战(8)-CSDN博客文章浏览阅读675次,点赞16次,收藏10次。​今天开始的文章,我会带给大家如何在spark的中使用我们的RDD方法,今天学习RDD方法中的flatMap,take,union三种方法。希望我的文章能帮助到大家,也欢迎大家来我的文章下交流讨论,共同进步。https://blog.csdn.net/qq_49513817/article/details/137157697?今天的文章,我会继续带着大家如何在spark的中使用我们的RDD方法。今天学习RDD方法中的filter,distinct,intersection三种方法,并做一道相关例题。

一、知识回顾

昨天我们学习了RDD的三种方法,分别是flatMap,take,union。

flatMap的一般作用是用来切分我们的单词

它会构建一个新的RDD 

take方法是用来获取我们RDD中前n个元素,n可以自行设置

union可以将我们的两个RDD进行合并操作

但使用我们的union方法时,需保证两个RDD的数据类型相同,否则无法运行。

现在,开始今天的学习吧。

二、RDD方法

1.filter

  • filter()方法是一种转换操作,用于过滤RDD中的元素。
  • filter()方法需要一个参数,这个参数是一个用于过滤的函数,该函数的返回值为Boolean类型。
  • filter()方法将返回值为true的元素保留,将返回值为false的元素过滤掉,最后返回一个存储符合过滤条件的所有元素的新RDD
import org.apache.spark.{SparkConf, SparkContext}object p1 {def main(args: Array[String]): Unit = {val conf=new SparkConf().setMaster("local").setAppName("p2")val sc=new SparkContext(conf)val p = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)val rdd = sc.parallelize(p)// 使用filter操作过滤出所有偶数val pp = rdd.filter(x => x % 2 == 0)// 收集结果并打印val ppp = pp.collect()ppp.foreach(println)}
}

可以看到我们的代码创建了一个1到10的数组,也可以看到注释中我们的需求是筛出里面包括的偶数,那么我们运行代码得到的就应该是2,4,6,8,10,现在,运行我们的代码看看是否得到预期的值吧。

可以看到左下角成功输出代码预期值。

2.distinct

  •  distinct()方法是一种转换操作,用于RDD的数据去重,去除两个完全相同的元素,没有参数。
import org.apache.spark.{SparkConf, SparkContext}object p1 {def main(args: Array[String]): Unit = {val conf=new SparkConf().setMaster("local").setAppName("p2")val sc=new SparkContext(conf)val p = Array(1, 2, 2, 3, 4, 4, 5, 5, 5)val pp = sc.parallelize(p)// 使用distinct操作去除重复元素val ppp = pp.distinct()// 收集结果并打印val pppp = ppp.collect()pppp.foreach(println)}
}

可以看到我们的代码给了一组重复数据特别多的数组,那么我们的distinct方法肯定就是要将它进行降重操作了,那么我们现在运行代码来看一下。

可以看到我们预期的降重实现了,但是它的输出顺序特别混乱,这是因为Spark 的分布式计算模型决定了数据在不同分区之间可能会被打乱,并且在执行 distinct 操作时可能会进行重分区。因此,即使你的输入数组  是有序的,经过 distinct 处理后的输出数组很可能不是有序的。

那么要解决这个问题,我们肯定需要手动排序了

在这里我们就可以使用到sorted进行排序。

    val ppppp=pppp.sortedppppp.foreach(println)

把这两行代码加到末尾,运行代码

可以看到输出预期中降重并排序的结果了。 

3.intersection

  •  intersection()方法用于求出两个RDD的共同元素,即找出两个RDD的交集,参数是另一个RDD,先后顺序与结果无关。
import org.apache.spark.{SparkConf, SparkContext}object p1 {def main(args: Array[String]): Unit = {val conf=new SparkConf().setMaster("local").setAppName("p2")val sc=new SparkContext(conf)val p1 = sc.parallelize(Array(1, 2, 3, 4, 5))val p2 = sc.parallelize(Array(4, 5, 6, 7, 8))// 计算两个RDD的交集val ppp = p1.intersection(p2)// 收集结果并打印val ppppp = ppp.collect()ppppp.foreach(println)}
}

 看代码,我们定义了两个数组,那么既然intersection是求交集,那么运行代码输出的肯定是两个数组中的共同元素,即4,5。运行代码

可以看到成功输出我们交集4与5

三、任务实现

现在,我们有两个csv文件,里面有我们大量的薪资信息,我们现在需要做的事情如下: 

  • 输出上半年或下半年实际薪资大于20万元的员工姓名。
  • 首先需要过滤出两个RDD中实际薪资大于20万元的员工姓名。
  • 再将两个RDD得到的员工姓名合并到一个RDD中,对员工姓名进行去重。
  • 即可得到上半年或下半年实际薪资大于20万元的员工姓名。

想要完成它,并不困难,现在我们把文件放在C盘的根目录下,方便寻找,当然这个位置可以自己随便放。

然后编写我们的代码:

import org.apache.spark.{SparkConf, SparkContext}object p1 {def main(args: Array[String]): Unit = {val conf=new SparkConf().setMaster("local").setAppName("p2")val sc=new SparkContext(conf)// 从C盘根目录读取第一个CSV文件val p1 = sc.textFile("C:\\Employee_salary_first_half.csv")// 从C盘根目录读取第二个CSV文件val p2 = sc.textFile("C:\\Employee_salary_second_half.csv")// 使用mapPartitionsWithIndex方法跳过CSV文件的标题行val pp1 = p1.mapPartitionsWithIndex((ix,it) => {if (ix ==0) it.drop(1)it})val pp2 = p2.mapPartitionsWithIndex((ix, it) => {if (ix == 0) it.drop(1)it})// 将pp1中的每一行转换为(员工名, 工资)元组val ppp1 = pp1.map(Line => {val data = Line.split(",");(data(1),data(6).toInt)})//使用逗号分割每行数据, 提取第二列和第七列数据,并将第七列转换为整数val ppp2 = pp2.map(Line => {val data = Line.split(",");(data(1),data(6).toInt)})val pppp1=ppp1.filter(x => x._2 > 200000).map(x => x._1)// 找出ppp1中工资超过200,000的元组,并只保留员工名val pppp2=ppp2.filter(x => x._2 > 200000).map(x => x._1)//x._n,n即使你要找的元素,通过 ._1 来访问第一个元素 a,通过 ._2 来访问第二个元素 b。val ppppp=pppp1.union(pppp2).distinct()//合并并降重ppppp.collect().foreach(println)//逐行打印}
}

我们先读取了两个文件,在将文件的标题行进行跳过,再分割数据找出需要的两行,最后找出工资大于200000的数据打印

来看看运行效果

可以看到我们预期的输出效果达到了。

拓展-方法参数设置

方法参数描述使用例子不同参数/效果
filterfunc对RDD中的每个元素应用函数func,返回True的元素保留,返回False的元素被过滤掉rdd.filter(lambda x: x > 3)通过修改func,可以定义不同的过滤条件,从而保留或过滤掉不同的元素。例如,lambda x: x % 2 == 0会保留偶数。
distinct返回一个包含RDD中所有不同元素的新RDD,去重rdd.distinct()此方法没有参数,它直接返回一个新的RDD,其中包含了原始RDD中的所有不同元素。这对于去除重复项非常有用。
intersectionother返回当前RDD与另一个RDDother的交集,结果中不包含重复元素rdd1.intersection(rdd2)other参数指定了另一个RDD,该方法将返回两个RDD中共有的元素。改变other的值将会影响交集的结果。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/791132.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【ESP32S3 Sense接入语音识别+MiniMax模型+TTS模块语音播报】

【ESP32S3 Sense接入语音识别MiniMax模型TTS模块语音播报】 1. 前言2. 功能模块概述2.1 语音接入2.2 大模型接入2.3 TTS模块接入 3. 先决条件3.1 环境配置3.2 所需零件3.3 硬件连接步骤 4. 核心代码4.1 源码分享4.2 代码解析 5. 上传验证5.1 对话测试5.2 报错 6. 总结 1. 前言 …

C语言杂谈

努力扩大自己,以靠近,以触及自身以外的世界 文章目录 什么是定义?什么是声明?什么是赋值?什么是初始化?什么是生命周期?什么是作用域?全局变量?局部变量?size…

HCIA-RS基础-VLAN路由

目录 VLAN 路由1. 什么是 VLAN 路由2. VLAN 路由的原理及配置3. VLAN 的缺点和 VLAN Trunking4. 单臂路由配置 总结 VLAN 路由 1. 什么是 VLAN 路由 VLAN 路由是指在虚拟局域网(VLAN)之间进行路由转发的过程。传统的 VLAN 配置只能在同一个 VLAN 内进行…

简单认识 node 包的幽灵依赖

幽灵依赖的概念与成因 在 Node.js 的包管理生态系统中,特别是使用 npm(Node Package Manager)作为包管理器时,“幽灵依赖”是指那些没有直接在项目的 dependencies 或 devDependencies 中显式声明,但却能够在项目的依…

LCD1602显示屏

LCD1602显示 概述 LCD1602(Liquid Crystal Display)是一种工业字符型液晶,能够同时显示 1602 即 32 字符(16列两行) 引脚说明 //电源 VSS -- GND VDD -- 5V //对比度 VO -- GND //控制线 RS -- P1.0 RW -- P1.1 E -- P1.4 //背光灯 A -- 5…

LLaMA-Factory+qwen多轮对话微调

LLaMA-Factory地址:https://github.com/hiyouga/LLaMA-Factory/blob/main/README_zh.md qwen地址:https://huggingface.co/Qwen/Qwen-7B-Chat/tree/main 数据准备 数据样例 [ {"id": "x3959", "conversations": [{&qu…

2024年150道高频Java面试题(十六)

31. Java 8 中引入的 Stream API 有哪些用途? Java 8 中引入的 Stream API 是一个强大的新特性,它提供了一种高效且易于使用的处理数据的方法。Stream API 的用途主要包括以下几个方面: 简化集合操作:Stream API 可以对集合对象…

在ChatGPT中,能用DALL·E 3编辑图片啦!

4月3日,OpenAI开始向部分用户,提供在ChatGPT中的DALLE 3图片编辑功能。 DALLE 3是OpenAI在2023年9月20日发布的一款文生图模型,其生成的图片效果可以与Midjourney、leonardo、ideogram等顶级产品媲美,随后被融合到ChatGPT中增强其…

matlab的歧视:simulink不能使用stm32f4系列的ADC?

2023b的matlab,stm32f407芯片,运行内容Using the Analog to Digital Converter Block to Support STMicroelectronics STM32 Processor Based Boards Using the Analog to Digital Converter Block to Support STMicroelectronics STM32 Processor Base…

基于SSM的社区疫情防控管理信息系统

目录 背景 技术简介 系统简介 界面预览 背景 随着时代的进步,计算机技术已经全方位地影响了社会的发展。随着居民生活质量的持续上升,人们对社区疫情防控管理信息系统的期望和要求也在同步增长。在社区疫情防控日益受到广泛关注的背景下&#xff0c…

【漏洞复现】通天星CMSV6车载主动安全监控云平台inspect_file接口处存在任意文件上传漏洞

免责声明:文章来源互联网收集整理,请勿利用文章内的相关技术从事非法测试,由于传播、利用此文所提供的信息或者工具而造成的任何直接或者间接的后果及损失,均由使用者本人负责,所产生的一切不良后果与文章作者无关。该…

电商技术揭秘一:电商架构设计与核心技术

文章目录 引言一、电商平台架构概述1.1 架构设计原则与架构类型选择1.2 传统电商平台架构与现代化架构趋势分析 二、高并发处理与负载均衡2.1 高并发访问特点分析与挑战2.2 负载均衡原理与算法选择 三、分布式数据库与缓存技术3.1 分布式数据库设计与一致性考量3.2 缓存策略与缓…

第四十二章 保护与 IRIS 的 Web 网关连接 - Windows

文章目录 第四十二章 保护与 IRIS 的 Web 网关连接 - WindowsKerberos 的 Windows Web 网关配置 Kerberos 的 UNIX Web 网关配置Kerberos 的 UNIX Web 网关配置基于 SSL/TLS 的身份验证和数据保护 第四十二章 保护与 IRIS 的 Web 网关连接 - Windows Kerberos 密钥表未针对 Wi…

Vue登陆鉴权方案(token)

Vue token 登陆鉴权完整方案-----总结 一、路由拦截 目的:页面跳转时验证,确认是否即将进入的页面需要登陆验证。 router/index.js //给需要验证的页面添加 meta : { requireAuth :true } 如下: export default new Router({scrollBehavior…

基于SpringBoot和Vue的金融融资管理系统的设计和实现【附源码】

1、系统演示视频(演示视频) 2、需要交流和学习请联系

vue弹出的添加信息组件中 el-radio 单选框无法点击问题

情景描述:在弹出的添加信息的组件中的form中有一个单选框,单选框无法进行点击切换 原因如下: 单选框要求有个默认值,因为添加和更新操作复用同一个组件,所以我在初始化时对相关进行了判定,如果为空则赋初始值 结果这样虽然实现了初始值的展示,但是就是如此造成了单选框的无法切…

Java Web面试题(四)

1. JDBJDBC的DriverManager主要用于管理一组JDBC驱动程序。其主要职责包括: JDBC的DriverManager主要用于管理一组JDBC驱动程序。其主要职责包括: 注册数据库驱动程序:在使用JDBC连接数据库之前,必须先注册适用于数据库的驱动程…

代码随想录算法训练营第二十九天(回溯5)|491. 非递减子序列、46. 全排列、47. 全排列 II(JAVA)

文章目录 491. 非递减子序列解题思路源码 46. 全排列解题思路源码 47. 全排列 II解题思路源码 总结 491. 非递减子序列 给你一个整数数组 nums ,找出并返回所有该数组中不同的递增子序列,递增子序列中 至少有两个元素 。你可以按 任意顺序 返回答案。 …

Java基础学习: Forest - 极简 HTTP 调用 API 框架

文章目录 一、介绍参考: 一、介绍 Forest是一个开源的Java HTTP客户端框架,专注于简化HTTP客户端的访问。它是一个高层的、极简的轻量级HTTP调用API框架,通过Java接口和注解的方式,将复杂的HTTP请求细节隐藏起来,使HT…

前端返回 List<Map<String, Object>>中的vaue值里面包含一个Bigdecimal类型,序列化时小数点丢失,如何解决?

🏆本文收录于「Bug调优」专栏,主要记录项目实战过程中的Bug之前因后果及提供真实有效的解决方案,希望能够助你一臂之力,帮你早日登顶实现财富自由🚀;同时,欢迎大家关注&&收藏&&…