Spark-Scala语言实战(7)

在之前的文章中,我们学习了如何在IDEA中导入jars包,并做了一道例题,了解了RDD。想了解的朋友可以查看这篇文章。同时,希望我的文章能帮助到你,如果觉得我的文章写的不错,请留下你宝贵的点赞,谢谢。

Spark-Scala语言实战(6)-CSDN博客文章浏览阅读695次,点赞15次,收藏24次。今天我会给大家带来如何在IDEA中导入jars包,以及使用SparkRDD,并正确使用它们同时也会给大家讲解一道实训题例。希望在本篇文章中,大家有所收获。也欢迎朋友们到评论区下一起交流学习,共同进步。https://blog.csdn.net/qq_49513817/article/details/137121524?spm=1001.2014.3001.5502

今天开始的文章,我会带给大家如何在spark的中使用我们的RDD方法,今天学习RDD方法中的map,sortby,collect三种方法。

目录

一、知识回顾

二、RDD方法

1.map

2.sortby

3.collect

拓展-RDD和DStream

1.RDD和DStream的区别

2.RDD和DStream的联系


一、知识回顾

导入jars包的过程在上一篇文章中以及讲解的很清楚了,图文一步一步带着做。

主要就是进入Libraries 添加java,然后选择spark的jars文件夹即可

如果还有不懂的朋友可以直接评论问我。

在就是文件的这几行代码

import org.apache.spark.{SparkConf, SparkContext}val conf=new SparkConf().setMaster("local").setAppName("123456")val sc=new SparkContext(conf)

这是配置与方法,记住它们的作用。

现在,开始今天的学习吧

二、RDD方法

1.map

  • map()方法是一种基础的RDD转换操作,可以对RDD中的每一个数据元素通过某种函数进行转换并返回新的RDD
  • map()方法是转换操作,不会立即进行计算。
  • 转换操作是创建RDD的第二种方法,通过转换已有RDD生成新的RDD。因为RDD是一个不可变的集合,所以如果对RDD数据进行了某种转换,那么会生成一个新的RDD

例:

import org.apache.spark.{SparkConf, SparkContext}  // 定义一个名为p1的Scala对象  
object p1 {  // 定义main方法,作为程序的入口点  def main(args: Array[String]): Unit = {  // 创建一个Spark配置对象,并设置运行模式为"local"(本地模式),应用程序名称为"p2"  val conf = new SparkConf().setMaster("local").setAppName("p2")  // 使用Spark配置对象创建一个SparkContext对象,SparkContext是Spark功能的入口点  val sc = new SparkContext(conf)  // 创建一个包含整数的列表,并使用parallelize方法将其转换为RDD  val ppp = sc.parallelize(List(1, 2, 3, 4, 5))  // 使用map操作将RDD中的每个元素乘以2,并返回一个新的RDD  val ppppp = ppp.map(x => x * 2)  //oreach方法遍历并打印每个元素  ppppp.collect().foreach(println)  }  
}

可以看到我们输出的在原列表上*2,达到了代码预期效果

2.sortby

  • sortBy()方法用于对标准RDD进行排序,有3个可输入参数,说明如下。
  • 1个参数是一个函数f:(T) => K,左边是要被排序对象中的每一个元素,右边返回的值是元素中要进行排序的值。
  • 2个参数是ascending,决定排序后RDD中的元素是升序的还是降序的,默认是true,即升序排序,如果需要降序排序那么需要将参数的值设置为false
  • 3个参数是numPartitions,决定排序后的RDD的分区个数,默认排序后的分区个数和排序之前的分区个数相等,即this.partitions.size
  • 第一个参数是必须输入的,而后面的两个参数可以不输入。

例:

import org.apache.spark.{SparkConf, SparkContext}  object p1 {   def main(args: Array[String]): Unit = {     val conf = new SparkConf().setMaster("local").setAppName("p2")  // 使用配置好的conf对象创建一个SparkContext对象sc。   val sc = new SparkContext(conf)  // 使用SparkContext的parallelize方法将包含整数的序列转换成一个RDD。  // 这个RDD现在可以在Spark上并行处理。  val ppp = sc.parallelize(Seq(5, 1, 9, 3, 7))  // 对ppp RDD中的元素进行排序。  // 使用sortBy方法,并传递一个函数x => x作为参数,表示按照元素本身的值进行排序(升序)。  val pppp = ppp.sortBy(x => x)   // 这将返回一个包含RDD所有元素的数组,存储在ppppp中。  val ppppp = pppp.collect()  // 使用foreach方法遍历数组ppppp中的每个元素,并使用println函数打印它们。  ppppp.foreach(println)  }  
}

看下输出可以看到我们的元素已经排序了

3.collect

  • collect()方法是一种行动操作,可以将RDD中所有元素转换成数组并返回到Driver端,适用于返回处理后的少量数据。
  • 因为需要从集群各个节点收集数据到本地,经过网络传输,并且加载到Driver内存中,所以如果数据量比较大,会给网络传输造成很大的压力。
  • 因此,数据量较大时,尽量不使用collect()方法,否则可能导致Driver端出现内存溢出问题。

例:

import org.apache.spark.{SparkConf, SparkContext}object p1 {def main(args: Array[String]): Unit = {val conf=new SparkConf().setMaster("local").setAppName("p2")val sc=new SparkContext(conf)val pp = sc.parallelize(Seq(1, 2, 3, 4, 5))val ppp = pp.collect()ppp.foreach(println)}
}

collect的作用是将RDD中的数据收集到驱动程序中,所以这里运行看不出区别。

拓展-RDD和DStream

在上一篇文章中,我们了解到了RDD,那么DStream是什么呢,我们先来了解一下:

DStream(离散流)是Spark Streaming提供的一种高级抽象,代表了一个持续不断的数据流。DStream的内部实际上是一系列持续不断产生的RDD,每个RDD包含特定时间间隔的数据。DStream的创建可以通过输入数据源如Kafka、Flume,或者通过对其他DStream应用高阶函数如map、reduce、join、window来实现。

1.RDD和DStream的区别

RDDDStream
定义弹性分布式数据集,是Spark中最基本的数据处理模型。离散流,是Spark Streaming提供的一种高级抽象,代表一个持续不断的数据流。
数据结构静态的、不可变的数据集,可以划分为多个分区。动态的、连续的数据流,内部由一系列RDD组成。
数据处理方式批处理,适用于静态数据的处理和分析。流处理,适用于实时数据流的处理和分析。
时间维度无特定的时间维度,主要关注数据的分区和处理。具有时间维度,每个RDD代表一段时间内的数据。
操作方式对整个RDD进行操作,结果生成新的RDD。对DStream进行操作,结果生成新的DStream,底层转换为RDD操作。
应用场景大规模数据的批处理任务,如机器学习、数据挖掘等。实时数据流处理任务,如日志分析、实时监控等。
容错性具有容错性,数据丢失可以自动恢复。继承了RDD的容错性特点。
与Spark的关系Spark的核心组件,用于构建各种数据处理和分析任务。Spark Streaming的核心组件,用于处理实时数据流。

2.RDD和DStream的联系

RDDDStream
基础构建单元RDD是Spark的基本数据处理单元。DStream基于RDD构建,每个时间间隔内的数据对应一个RDD。
计算模型RDD支持分布式计算模型,数据被划分为多个分区进行并行处理。DStream继承了RDD的计算模型,对流数据进行分布式处理。
容错性RDD具有容错性,可以自动恢复丢失的数据。DStream同样具有容错性,因为它基于RDD构建。
操作方式RDD提供了一系列转换操作(如map、reduce)和动作操作(如collect、save)。DStream也提供了类似的操作,这些操作最终会转换为底层RDD的操作。
数据处理能力RDD适用于批处理任务,可以对大规模数据集进行处理和分析。DStream适用于实时流处理任务,可以对连续的数据流进行实时分析和处理。
底层实现DStream内部实际上是由一系列RDD组成的,每个RDD代表一段时间内的数据。DStream的操作最终会转换为RDD的操作,利用RDD的分布式计算能力。
扩展性RDD可以通过自定义操作进行扩展,支持更多的数据处理场景。DStream同样可以通过自定义操作和转换函数进行扩展,以满足特定的实时处理需求

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/780560.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux 操作系统 022-串口/U盘/共享文件夹

Linux 操作系统 022-串口/U盘/共享文件夹 本节关键字:Linux、centos、串口、U盘、共享文件夹 本节相关指令:echo、cat、mkdir、mount 1、串口 #(1) 查看串口是否可用,可以对串口发送数据比如: $ echo helloworld >/dev/ttyS…

167-两数之和II

题目 给你一个下标从 1 开始的整数数组 numbers &#xff0c;该数组已按 非递减顺序排列 &#xff0c;请你从数组中找出满足相加之和等于目标数 target 的两个数。如果设这两个数分别是 numbers[index1] 和 numbers[index2] &#xff0c;则 1 < index1 < index2 < num…

04-JavaScript函数

函数&#xff08;重点&#xff09; 1.为什么使用函数? 用函数来解决代码重用的问题。 2.函数的意义 函数其实就是封装&#xff0c;把可以重复使用的代码放到函数中&#xff0c;如果需要多次使用同一段代码&#xff0c;就可以把封装成一个函数。这样的话&#xff0c;在你需…

Redis中处理处理没有ACK确认的Stream

系列文章目录 文章目录 系列文章目录前言前言 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站,这篇文章男女通用,看懂了就去分享给你的码吧。 Stream是一个只能追加内容的数据类型。也就是说Stream这种数据类型,我们…

vue的创建、启动以及目录结构详解

vue的创建、启动以及目录结构详解目录 一. vue项目的创建 二. vue的目录结构 三. src的目录结构 四. vue项目的启动 4.1 方法1 4.2 方法2 一. vue项目的创建 创建一个工程化的Vue项目&#xff0c;执行命令&#xff1a;npm init vuelatest 注意&#xff1a;如果你在这个目…

pdf在浏览器上无法正常加载的问题

一、背景 觉得很有意思给大家分享一下。事情是这样的&#xff0c;开发给我反馈说&#xff0c;线上环境接口请求展示pdf异常&#xff0c;此时碰巧我前不久正好在ingress前加了一层nginx&#xff0c;恰逢此时内心五谷杂陈&#xff0c;思路第一时间便放在了改动项。捣鼓了好久无果…

银河麒麟、统信UOS、Centos、欧拉以及红帽系linux服务器版本安装Mysql

查看当前目录home/xiaolin 创建mysql文件夹&#xff1a;mkidr mysql 检查系统自带的mysql安装包&#xff1a;rpm -qa | grep mariadb或者rpm -qa | grep mysql 请卸载通过 rpm -e --nodeps mariadb-libs-5.5.56-2.el7.x86_64命令装卸 mariadb 进入mysql文件夹&#xff1a;cd m…

Android 多层级列表实现

方法一&#xff1a; Element.java &#xff1a; package com.chy.ydy.tools.treeutil; /*** TreeView 元素* */ public class Element {/** 文字内容 */private String contentText;/** 在tree中的层级 */private int level;/** 元素的id */private int id;/** 父元素的id */…

动态链接dlopen/dlclose/..

dlopen&#xff0c;dlsym,dlclose可以在不去link shared library的前提下&#xff0c;在runtime时调用shared library里面的函数.这样可以实现shared library的覆盖或是省略编译阶段的链接检查.但dlopen/dlclose要谨慎使用,尤其是有些写的不是很好的shared library. 动态链接函…

搜索与图论——Prim算法求最小生成树

在最小生成树问题里&#xff0c;正边和负边都没问题 朴素版prim算法 时间复杂度O(n^2) 生成树&#xff1a;每一次选中的t点&#xff0c;它和集合的距离对应的那条边&#xff0c;就是生成树的一条边 算法流程和dijkstra算法非常相似 #include<iostream> #include<cs…

OKCC的API资源管理平台怎么用?

API资源管理平台&#xff0c;重点是“资源”管理平台&#xff0c;不是API接口管理平台。 天天讯通推出的API资源管理平台&#xff0c;类似昆石的VOS系统&#xff0c;区别是VOS是SIP资源管理系统&#xff0c;我们的API资源管理平台是API资源管理系统&#xff08;AXB、AX、回拨AP…

【御控物联】JavaScript JSON结构转换(7):数组To数组——键值互换属性重组

文章目录 一、JSON结构转换是什么&#xff1f;二、案例之《JSON数组 To JSON数组》三、代码实现四、在线转换工具五、技术资料 一、JSON结构转换是什么&#xff1f; JSON结构转换指的是将一个JSON对象或JSON数组按照一定规则进行重组、筛选、映射或转换&#xff0c;生成新的JS…

【Spring Cache】基于注解的缓存框架 简化redis代码

文章目录 一、介绍二、常用注解三、快速入门3.1 EnableCaching3.2 CachePut3.3 Cacheable3.4 CacheEvict 一、介绍 Spring Cache 是一个框架&#xff0c;实现了基于注解的缓存功能&#xff0c;只需要简单地加一个注解&#xff0c;就能实现缓存功能。 Spring Cache 提供了一层…

带你认识线程

线程的概念 前言&#xff1a; 一个程序运行起来&#xff0c;就会对应一个进程&#xff0c;例如&#xff0c;启动一个 Java 程序&#xff0c;就会创建一个 Java 进程。进程也被称为系统分配资源的基本单位。 一个进程可以包含一个线程&#xff0c;也可以包含多个线程&#xff…

政安晨:【Keras机器学习实践要点】(九)—— 保存、序列化和导出模型

目录 介绍 如何保存和加载模型 保存一个Keras模型 装回模型 设置 保存 例子&#xff1a; 自定义对象 向 load_model() 传递自定义对象 使用自定义对象范围 模型序列化 APIs 内存模型克隆 任意对象序列化和反序列化 保存模型权重 内存中的权重传递接口 无状态层…

MYSQL中update的low_priority

low_priority&#xff0c;低优先级 UPDATE [LOW_PRIORITY] tbl_name SET col_name1expr1,col_name2expr2,... mysql中update用low_priority让update不锁定表 MySQL允许你改变语句调度的优先级&#xff0c;它可以使来自多个客户端的查询更好地协作&#xff0c;这样单个客户端就…

蓝桥杯算法基础(32):素数,埃式筛法,快速幂,斐波那契与矩阵幂运算

素数 有些人认为一个人一生中有三个周期&#xff0c;从他或她出生的那一天开始。 这三个周期是身体周期&#xff0c;情感周期的和智力的周期&#xff0c;他们有周期的长度为23&#xff0c;28&#xff0c; 和33天。每一个周期都有一个高峰。在一个周期的高峰期&#xff0c; 一个…

新能源充电桩站场视频汇聚系统建设方案及技术特点分析

随着新能源汽车的普及&#xff0c;充电桩作为新能源汽车的基础设施&#xff0c;其安全性和可靠性越来越受到人们的关注。为了更好地保障充电桩的安全运行与站场管理&#xff0c;TSINGSEE青犀&触角云推出了一套新能源汽车充电桩视频汇聚管理与视频监控方案。 方案采用高清摄…

Springboot之RESTful风格

概述 Restful风格与传统的有一些不同&#xff0c;传统的资源请求中只有Get以及Post两种方式来传递参数&#xff0c;而Restful风格将资源请求按照CRUD增删改查这基本的数据操作分成了四个基本传递方式。其中&#xff0c;Put 和Delete是从Post中分离出来的&#xff0c;可以浅显的…

2024年03月CCF-GESP编程能力等级认证C++编程七级真题解析

本文收录于专栏《C++等级认证CCF-GESP真题解析》,专栏总目录:点这里。订阅后可阅读专栏内所有文章。 一、单选题(每题 2 分,共 30 分) 第 1 题 下列关于排序的说法,正确的是( )。 A. 冒泡排序是最快的排序算法之一。 B. 快速排序通常是不稳定的。 C. 最差情况,N 个元素…