Spark-Scala语言实战(7)

在之前的文章中,我们学习了如何在IDEA中导入jars包,并做了一道例题,了解了RDD。想了解的朋友可以查看这篇文章。同时,希望我的文章能帮助到你,如果觉得我的文章写的不错,请留下你宝贵的点赞,谢谢。

Spark-Scala语言实战(6)-CSDN博客文章浏览阅读695次,点赞15次,收藏24次。今天我会给大家带来如何在IDEA中导入jars包,以及使用SparkRDD,并正确使用它们同时也会给大家讲解一道实训题例。希望在本篇文章中,大家有所收获。也欢迎朋友们到评论区下一起交流学习,共同进步。https://blog.csdn.net/qq_49513817/article/details/137121524?spm=1001.2014.3001.5502

今天开始的文章,我会带给大家如何在spark的中使用我们的RDD方法,今天学习RDD方法中的map,sortby,collect三种方法。

目录

一、知识回顾

二、RDD方法

1.map

2.sortby

3.collect

拓展-RDD和DStream

1.RDD和DStream的区别

2.RDD和DStream的联系


一、知识回顾

导入jars包的过程在上一篇文章中以及讲解的很清楚了,图文一步一步带着做。

主要就是进入Libraries 添加java,然后选择spark的jars文件夹即可

如果还有不懂的朋友可以直接评论问我。

在就是文件的这几行代码

import org.apache.spark.{SparkConf, SparkContext}val conf=new SparkConf().setMaster("local").setAppName("123456")val sc=new SparkContext(conf)

这是配置与方法,记住它们的作用。

现在,开始今天的学习吧

二、RDD方法

1.map

  • map()方法是一种基础的RDD转换操作,可以对RDD中的每一个数据元素通过某种函数进行转换并返回新的RDD
  • map()方法是转换操作,不会立即进行计算。
  • 转换操作是创建RDD的第二种方法,通过转换已有RDD生成新的RDD。因为RDD是一个不可变的集合,所以如果对RDD数据进行了某种转换,那么会生成一个新的RDD

例:

import org.apache.spark.{SparkConf, SparkContext}  // 定义一个名为p1的Scala对象  
object p1 {  // 定义main方法,作为程序的入口点  def main(args: Array[String]): Unit = {  // 创建一个Spark配置对象,并设置运行模式为"local"(本地模式),应用程序名称为"p2"  val conf = new SparkConf().setMaster("local").setAppName("p2")  // 使用Spark配置对象创建一个SparkContext对象,SparkContext是Spark功能的入口点  val sc = new SparkContext(conf)  // 创建一个包含整数的列表,并使用parallelize方法将其转换为RDD  val ppp = sc.parallelize(List(1, 2, 3, 4, 5))  // 使用map操作将RDD中的每个元素乘以2,并返回一个新的RDD  val ppppp = ppp.map(x => x * 2)  //oreach方法遍历并打印每个元素  ppppp.collect().foreach(println)  }  
}

可以看到我们输出的在原列表上*2,达到了代码预期效果

2.sortby

  • sortBy()方法用于对标准RDD进行排序,有3个可输入参数,说明如下。
  • 1个参数是一个函数f:(T) => K,左边是要被排序对象中的每一个元素,右边返回的值是元素中要进行排序的值。
  • 2个参数是ascending,决定排序后RDD中的元素是升序的还是降序的,默认是true,即升序排序,如果需要降序排序那么需要将参数的值设置为false
  • 3个参数是numPartitions,决定排序后的RDD的分区个数,默认排序后的分区个数和排序之前的分区个数相等,即this.partitions.size
  • 第一个参数是必须输入的,而后面的两个参数可以不输入。

例:

import org.apache.spark.{SparkConf, SparkContext}  object p1 {   def main(args: Array[String]): Unit = {     val conf = new SparkConf().setMaster("local").setAppName("p2")  // 使用配置好的conf对象创建一个SparkContext对象sc。   val sc = new SparkContext(conf)  // 使用SparkContext的parallelize方法将包含整数的序列转换成一个RDD。  // 这个RDD现在可以在Spark上并行处理。  val ppp = sc.parallelize(Seq(5, 1, 9, 3, 7))  // 对ppp RDD中的元素进行排序。  // 使用sortBy方法,并传递一个函数x => x作为参数,表示按照元素本身的值进行排序(升序)。  val pppp = ppp.sortBy(x => x)   // 这将返回一个包含RDD所有元素的数组,存储在ppppp中。  val ppppp = pppp.collect()  // 使用foreach方法遍历数组ppppp中的每个元素,并使用println函数打印它们。  ppppp.foreach(println)  }  
}

看下输出可以看到我们的元素已经排序了

3.collect

  • collect()方法是一种行动操作,可以将RDD中所有元素转换成数组并返回到Driver端,适用于返回处理后的少量数据。
  • 因为需要从集群各个节点收集数据到本地,经过网络传输,并且加载到Driver内存中,所以如果数据量比较大,会给网络传输造成很大的压力。
  • 因此,数据量较大时,尽量不使用collect()方法,否则可能导致Driver端出现内存溢出问题。

例:

import org.apache.spark.{SparkConf, SparkContext}object p1 {def main(args: Array[String]): Unit = {val conf=new SparkConf().setMaster("local").setAppName("p2")val sc=new SparkContext(conf)val pp = sc.parallelize(Seq(1, 2, 3, 4, 5))val ppp = pp.collect()ppp.foreach(println)}
}

collect的作用是将RDD中的数据收集到驱动程序中,所以这里运行看不出区别。

拓展-RDD和DStream

在上一篇文章中,我们了解到了RDD,那么DStream是什么呢,我们先来了解一下:

DStream(离散流)是Spark Streaming提供的一种高级抽象,代表了一个持续不断的数据流。DStream的内部实际上是一系列持续不断产生的RDD,每个RDD包含特定时间间隔的数据。DStream的创建可以通过输入数据源如Kafka、Flume,或者通过对其他DStream应用高阶函数如map、reduce、join、window来实现。

1.RDD和DStream的区别

RDDDStream
定义弹性分布式数据集,是Spark中最基本的数据处理模型。离散流,是Spark Streaming提供的一种高级抽象,代表一个持续不断的数据流。
数据结构静态的、不可变的数据集,可以划分为多个分区。动态的、连续的数据流,内部由一系列RDD组成。
数据处理方式批处理,适用于静态数据的处理和分析。流处理,适用于实时数据流的处理和分析。
时间维度无特定的时间维度,主要关注数据的分区和处理。具有时间维度,每个RDD代表一段时间内的数据。
操作方式对整个RDD进行操作,结果生成新的RDD。对DStream进行操作,结果生成新的DStream,底层转换为RDD操作。
应用场景大规模数据的批处理任务,如机器学习、数据挖掘等。实时数据流处理任务,如日志分析、实时监控等。
容错性具有容错性,数据丢失可以自动恢复。继承了RDD的容错性特点。
与Spark的关系Spark的核心组件,用于构建各种数据处理和分析任务。Spark Streaming的核心组件,用于处理实时数据流。

2.RDD和DStream的联系

RDDDStream
基础构建单元RDD是Spark的基本数据处理单元。DStream基于RDD构建,每个时间间隔内的数据对应一个RDD。
计算模型RDD支持分布式计算模型,数据被划分为多个分区进行并行处理。DStream继承了RDD的计算模型,对流数据进行分布式处理。
容错性RDD具有容错性,可以自动恢复丢失的数据。DStream同样具有容错性,因为它基于RDD构建。
操作方式RDD提供了一系列转换操作(如map、reduce)和动作操作(如collect、save)。DStream也提供了类似的操作,这些操作最终会转换为底层RDD的操作。
数据处理能力RDD适用于批处理任务,可以对大规模数据集进行处理和分析。DStream适用于实时流处理任务,可以对连续的数据流进行实时分析和处理。
底层实现DStream内部实际上是由一系列RDD组成的,每个RDD代表一段时间内的数据。DStream的操作最终会转换为RDD的操作,利用RDD的分布式计算能力。
扩展性RDD可以通过自定义操作进行扩展,支持更多的数据处理场景。DStream同样可以通过自定义操作和转换函数进行扩展,以满足特定的实时处理需求

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/780560.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

04-JavaScript函数

函数(重点) 1.为什么使用函数? 用函数来解决代码重用的问题。 2.函数的意义 函数其实就是封装,把可以重复使用的代码放到函数中,如果需要多次使用同一段代码,就可以把封装成一个函数。这样的话,在你需…

Redis中处理处理没有ACK确认的Stream

系列文章目录 文章目录 系列文章目录前言前言 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站,这篇文章男女通用,看懂了就去分享给你的码吧。 Stream是一个只能追加内容的数据类型。也就是说Stream这种数据类型,我们…

vue的创建、启动以及目录结构详解

vue的创建、启动以及目录结构详解目录 一. vue项目的创建 二. vue的目录结构 三. src的目录结构 四. vue项目的启动 4.1 方法1 4.2 方法2 一. vue项目的创建 创建一个工程化的Vue项目,执行命令:npm init vuelatest 注意:如果你在这个目…

pdf在浏览器上无法正常加载的问题

一、背景 觉得很有意思给大家分享一下。事情是这样的,开发给我反馈说,线上环境接口请求展示pdf异常,此时碰巧我前不久正好在ingress前加了一层nginx,恰逢此时内心五谷杂陈,思路第一时间便放在了改动项。捣鼓了好久无果…

动态链接dlopen/dlclose/..

dlopen,dlsym,dlclose可以在不去link shared library的前提下,在runtime时调用shared library里面的函数.这样可以实现shared library的覆盖或是省略编译阶段的链接检查.但dlopen/dlclose要谨慎使用,尤其是有些写的不是很好的shared library. 动态链接函…

搜索与图论——Prim算法求最小生成树

在最小生成树问题里&#xff0c;正边和负边都没问题 朴素版prim算法 时间复杂度O(n^2) 生成树&#xff1a;每一次选中的t点&#xff0c;它和集合的距离对应的那条边&#xff0c;就是生成树的一条边 算法流程和dijkstra算法非常相似 #include<iostream> #include<cs…

OKCC的API资源管理平台怎么用?

API资源管理平台&#xff0c;重点是“资源”管理平台&#xff0c;不是API接口管理平台。 天天讯通推出的API资源管理平台&#xff0c;类似昆石的VOS系统&#xff0c;区别是VOS是SIP资源管理系统&#xff0c;我们的API资源管理平台是API资源管理系统&#xff08;AXB、AX、回拨AP…

【御控物联】JavaScript JSON结构转换(7):数组To数组——键值互换属性重组

文章目录 一、JSON结构转换是什么&#xff1f;二、案例之《JSON数组 To JSON数组》三、代码实现四、在线转换工具五、技术资料 一、JSON结构转换是什么&#xff1f; JSON结构转换指的是将一个JSON对象或JSON数组按照一定规则进行重组、筛选、映射或转换&#xff0c;生成新的JS…

【Spring Cache】基于注解的缓存框架 简化redis代码

文章目录 一、介绍二、常用注解三、快速入门3.1 EnableCaching3.2 CachePut3.3 Cacheable3.4 CacheEvict 一、介绍 Spring Cache 是一个框架&#xff0c;实现了基于注解的缓存功能&#xff0c;只需要简单地加一个注解&#xff0c;就能实现缓存功能。 Spring Cache 提供了一层…

带你认识线程

线程的概念 前言&#xff1a; 一个程序运行起来&#xff0c;就会对应一个进程&#xff0c;例如&#xff0c;启动一个 Java 程序&#xff0c;就会创建一个 Java 进程。进程也被称为系统分配资源的基本单位。 一个进程可以包含一个线程&#xff0c;也可以包含多个线程&#xff…

政安晨:【Keras机器学习实践要点】(九)—— 保存、序列化和导出模型

目录 介绍 如何保存和加载模型 保存一个Keras模型 装回模型 设置 保存 例子&#xff1a; 自定义对象 向 load_model() 传递自定义对象 使用自定义对象范围 模型序列化 APIs 内存模型克隆 任意对象序列化和反序列化 保存模型权重 内存中的权重传递接口 无状态层…

新能源充电桩站场视频汇聚系统建设方案及技术特点分析

随着新能源汽车的普及&#xff0c;充电桩作为新能源汽车的基础设施&#xff0c;其安全性和可靠性越来越受到人们的关注。为了更好地保障充电桩的安全运行与站场管理&#xff0c;TSINGSEE青犀&触角云推出了一套新能源汽车充电桩视频汇聚管理与视频监控方案。 方案采用高清摄…

甲骨文护城河(MOAT)分析工具-用户指南

甲骨文护城河&#xff08;MOAT&#xff09;分析工具-用户指南 登录后&#xff0c;您可以通过显示的基于web的用户界面访问Moat Analytics Dashboard。 以下是如何通过UI使用护城河的指南的使用目录&#xff1a; 一、主屏幕导航 在面板中创建警报和导出 二、无效流量概述 什…

深入浅出MHA(MySQL Master High Availability)集群:原理、部署与实践

目录 引言 一、MHA集群介绍 &#xff08;一&#xff09;什么是MHA &#xff08;二&#xff09;MHA集群原理 &#xff08;三&#xff09;同步方式 &#xff08;四&#xff09;管理节点与数据节点 二、实现MHA &#xff08;一&#xff09;搭建主从复制环境 1.搭建时间同…

Github profile Readme实现小游戏[github自述游戏]

Github profile Readme常用于个人主页介绍&#xff0c;将它与action自动化流程结合&#xff0c;可以实现一些小游戏 例如&#xff1a;2048、五子棋 2048实现 losehu (RUBO) GitHub 五子棋 https://github.com/losehu/losehu/tree/main 通过python/C编写可执行文件&#xf…

智能网关BL102E采集西门子PLC S7-200 Smart数据上传至Thingsboard

1、WAN口采集西门子PLC的配置 WAN口可以添加很多设备,具体我们用西门子为例来简要配置。 双击WAN,弹出以太网设置,直接把自动获取IP打开,他会根据你的网段自动设置链接! (1)点击`WAN",点击鼠标右键,点击“添加",弹出设备配置框。 (2)设备名称任意填写,如…

《数据结构学习笔记---第七篇》---栈和队列的OJ练习

1. 括号匹配问题。OJ链接 step1:思路分析 &#xff1a; 1.括号匹配&#xff0c;我们首先考虑用栈实现&#xff0c;我们通过符号栈帧的思想知道&#xff0c;求前中后缀表达式的时候用的就是栈帧&#xff0c;操作数栈和符号栈。 2.根据常见的情况 考虑怎么使用栈&#xff0c;首先…

Bun安装与使用

Bun安装与使用。 它目前无法在windows上直接安装使用&#xff0c;必须通过虚拟机安装。 在win10虚拟机中安装 # 查看内核版本 $ uname -srm Linux 6.1.0-10-amd64 x86_64# 安装unzip解压工具 $ sudo apt install unzip# 下载安装脚本并开始安装 curl -fsSL https://bun.sh/ins…

SpringBoot使用Redis

1.Spring是如何集成Redis的&#xff1f; Spring Data Redis 引入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId&…

互联网轻量级框架整合之JavaEE基础

不得不解释得几个概念 JavaEE SUN公司提出来的企业版Java开发中间件&#xff0c;主要用于企业级互联网系统的框架搭建&#xff0c;同时因为Java语言优质的平台无关性、可移植性、健壮性、支持多线程和安全性等优势&#xff0c;其迅速成为构建企业互联网平台的主流技术&#x…