Spark 【RDD编程(一)RDD编程基础】

RDD

简介

        在Spark中,RDD是弹性分布式数据集(Resilient Distributed Dataset)的缩写。通俗来讲,RDD是一种抽象的数据结构,用于表示分布式计算中的数据集合。它是Spark中最基本的数据模型,可以看作是一个不可变的、可分区、可并行处理的数据集合。这个数据集的全部或部分可以缓存在内存中,可在多次计算中重用。
        RDD是由一系列的记录(或元素)组成的,这些记录可以分散存储在集群的多个节点上,每个节点上的数据可以被并行处理。RDD提供了一系列的操作函数,例如map、reduce、filter等,可以对数据进行转换和计算。RDD的特点是具有容错性和弹性,即使在节点故障的情况下,也能自动恢复数据和计算过程。 

RDD编程基础

1、RDD 创建

Spark 通过 textFile() 从文件系统(本地系统、HDFS、集合)中加载数据来创建RDD。

1.1、从文件系统中加载数据创建 RDD

import org.apache.spark.{SparkConf, SparkContext}object CreateRddByFileScala {def main(args: Array[String]): Unit = {//创建SparkContext对象val conf = new SparkConf()conf.setAppName("CreateRddByFileScala").setMaster("local")val sc = new SparkContext(conf)//windowsval path = "D:\\test\\data"//linux
//    val path = "file:///usr/local/test/data/"//读取文件数据,可以在textFile中生成的RDD分区数量val rdd = sc.textFile(path,2)//获取每一行数据的长度,计算文件内数据的总长度val length = rdd.map(_.length).reduce(_+_)println(length)//关闭SparkContextsc.stop()}}

1.2、从HDFS中加载数据

只需要修改路径如下:

    val path = "hadoop101:9000/test/"//读取文件数据,可以在textFile中生成的RDD分区数量val rdd = sc.textFile(path,2)

1.3、通过并行集合(数组)创建RDD

调用 SparkContext 的 parallelize() 方法,通过一个已经存在的集合(数组)来创建RDD。

//创建SparkContextval conf = new SparkConf()conf.setAppName("CreateRddByArrayScala").setMaster("local")  //local表示在本地执行val sc = new SparkContext(conf)//创建集合val arr = Array(1,2,3,4,5)//基于集合创建RDDval rdd = sc.parallelize(arr)

2、RDD 操作

        RDD 的操作包括两种类型:转换操作和行动操作。其中,转换操作主要有map()、filter()、groupBy()、join()等,对RDD而言,每次转换都会产生一个新的RDD,供下一次操作使用。而行动操作(如count()、collect()等)返回的一般都是一个值。

2.1、转换操作

        RDD 的真个转换过程是采用惰性机制的,也就是说,整个转换过程只记录了转换的轨迹,并不会真正的运算,只有遇到行动操作才会触发从头到尾的真正计算。

1、filter(f: String => Boolean)

用法和Scala中的filter一致。

输入文档:

Hadoop is good
Spark is better
Spark is fast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object RDDAction {def main(args: Array[String]): Unit = {// 创建 SparkContext 对象val conf = new SparkConf()conf.setAppName("filter-test").setMaster("local")val sc = new SparkContext(conf)// 通过加载数据创建RDD对象val rdd: RDD[String] = sc.textFile("data/word.txt")//filter 的参数是一个匿名函数 要求返回一个Boolean 类型的值 true-留下 false-过滤val lineWithSpark: RDD[String] = rdd.filter(line => {line.contains("Spark")})lineWithSpark.foreach(println)// 关闭sc对象sc.stop()}
}

 运行结果:
 

Spark is better
Spark is fast

2、map()

同样和Scala中的map()用法一致。

//省略创建AparkContext对象的代码...// 使用并行集合创建 RDDval arr = Array(1,2,3,4,5)val rdd1: RDD[Int] = sc.parallelize(arr)//转换操作val rdd2 = rdd1.map(num => num*2)rdd2.foreach(println)

运行结果:

2
4
6
8
10
//使用本地文件作为数据加载创建RDD 对象val rdd1: RDD[String] = sc.textFile("data/word.txt")val rdd2: RDD[Array[String]] = rdd1.map(line => {line.split(" ")})

解析:

输入:

Hadoop is good 
Spark is better 
Spark is fast 

Spark 读取进来后,就变成了 RDD("Hadoop is good","Spark is better","Spark is fast"),我们知道,Scala中要进行扁平化操作的话,对象必须是一个多维数组,所以我们要通过 map() 对读取进来的格式进行处理,处理后的格式:RDD(Array("Hadoop is good"),Array("Spark is better"),Array("Spark is fast"))

RDD("Hadoop is good","Spark is better","Spark is fast") => RDD(Array("Hadoop is good"),Array("Spark is better"),Array("Spark is fast"))

3、flatMap()

和Scala中用法基本一样。

//使用本地文件作为数据加载创建RDD 对象val rdd1: RDD[String] = sc.textFile("data/word.txt")val rdd2: RDD[String] = rdd1.flatMap(line => line.split(" "))

flatMap 的过程:

RDD("Hadoop is good","Spark is better","Spark is fast")
先进行 map() => 
RDD(Array("Hadoop is good"),Array("Spark is better"),Array("Spark is fast"))
在进行 flatten =>
RDD("Hadoop","is",good","Spark","is","better","Spark","is","fast"))

扁平化后我们的数据又变为了一维集合的数据结构(RDD)了。

4、groupByKey()

        这个函数十分重要,上面我们得到了关于每次单词的一个RDD集合,现在我们要进行wordcount 的话肯定还需要对相同的键进行一个分类,这样会生成一个RDD集合(key:String,valut_list:Interable[Int])。

我们同样基于上面的结果进行操作:

val rdd3: RDD[(String, Int)] = rdd2.map(word => {(word, 1)})//RDD(("Hadoop",1),("is",1),("good",1),("Spark",1),("is",1),("better",1),("Spark",1),("is",1),("fast",1)))val rdd4: RDD[(String, Iterable[Int])] = rdd3.groupByKey()//RDD(("Hadoop",1),("is",1,1,1),("good",1),("Spark",1,1),("better",1),("fast",1)))

5、reduceByKey()

需要注意的是,reduceByKey是对(key:String,value:Int)这种相同键值对元素的合并,而不是对上面groupByKey()的结果(key:String,value_list:Interable[Int])进行操作,这个粗心让我找了半天。

//rdd5和6效果都一样val rdd5: RDD[(String,Int)] = rdd4.map(t => {(t._1, t._2.size)})//RDD(("Hadoop",1),("is",3),("good",1),("Spark",2),("better",1),("fast",1)))//    rdd3.reduceByKey((v1,v2)=>v1+v2)  //v1 v2代表发现key相同的键值对的值 参数按照顺序在函数体中只出现了一次 那么可以用下划线代替val rdd6: RDD[(String, Int)] = rdd3.reduceByKey(_ + _)//RDD(("Hadoop",1),("is",3),("good",1),("Spark",2),("better",1),("fast",1)))//打印结果rdd6.foreach(println)

运行结果:
 

(Spark,2)
(is,3)
(fast,1)
(good,1)
(better,1)
(Hadoop,1)Process finished with exit code 0

总结

剩下的RDD转换操作下午再新开一篇,以及RDD的行动操作篇、持久化、分区和综合实例后续更新。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/64926.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

TiDB x 安能物流丨打造一栈式物流数据平台

作者:李家林 安能物流数据库团队负责人 本文以安能物流作为案例,探讨了在数字化转型中,企业如何利用 TiDB 分布式数据库来应对复杂的业务需求和挑战。 安能物流作为中国领先的综合型物流集团,需要应对大规模的业务流程&#xff…

JVM解密: 解构类加载与GC垃圾回收机制

文章目录 一. JVM内存划分二. 类加载机制1. 类加载过程2. 双亲委派模型 三. GC垃圾回收机制1. 找到需要回收的内存1.1 哪些内存需要回收?1.2 基于引用计数找垃圾(Java不采取该方案)1.3 基于可达性分析找垃圾(Java采取方案) 2. 垃圾回收算法2.1 标记-清除算法2.2 标记…

Hugging Face--Transformers

pipeline 在这里插入图片描述 AutoClass AutoClass 是一个能够通过预训练模型的名称或路径自动查找其架构的快捷方式. 你只需要为你的任务选择合适的 AutoClass 和它关联的预处理类。 AutoTokenizer AutoModel 保存模型 自定义模型构建 Trainer - PyTorch优化训练循环 参考资…

net start MongoDB 启动MongoDB服务时, 出现没有响应控制功能的解决方案

问题描述 管理员权限打开cmd后,输入net start MongoDB启动MongoDB服务,显示服务没有响应控制功能 检查 1、系统环境变量PATH中,MongoDB的bin文件夹路径是否正确 2、打开注册表,在HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\…

SpringCloud(十)——ElasticSearch简单了解(三)数据聚合和自动补全

文章目录 1. 数据聚合1.1 聚合介绍1.2 Bucket 聚合1.3 Metrics 聚合1.4 使用 RestClient 进行聚合 2. 自动补全2.1 安装补全包2.2 自定义分词器2.3 自动补全查询2.4 拼音自动补全查询2.5 RestClient 实现自动补全2.5.1 建立索引2.5.2 修改数据定义2.5.3 补全查询2.5.4 解析结果…

鸿鹄企业工程项目管理系统 Spring Cloud+Spring Boot+前后端分离构建工程项目管理系统源代码

鸿鹄工程项目管理系统 Spring CloudSpring BootMybatisVueElementUI前后端分离构建工程项目管理系统 1. 项目背景 一、随着公司的快速发展,企业人员和经营规模不断壮大。为了提高工程管理效率、减轻劳动强度、提高信息处理速度和准确性,公司对内部工程管…

如何在 iPhone 上检索已删除的短信

我厌倦了垃圾短信。当我例行公事地删除 iPhone 上的这些不需要的消息时,当我分散注意力时,我通过点击错误的按钮清除了所有消息。这些被删除的消息中包含两条团购验证信息。有什么办法可以从 iPhone 检索我的消息吗? 有时我们可能会不小心删…

jupyter常用的方法以及快捷键

选中状态 蓝色 按enter 进入编辑状态 编辑状态 绿色 按Esc 进入选中状态 Code模式运行是运行代码 Markdown模式运行是进入预览状态 - - - 是文本格式的一种精简的语法形式 Raw NBConvert 是默认文本状态 - - - 输入什么样 展示什么样 Y - - - 切换code模式 M - - - 切换Markdo…

9、监测数据采集物联网应用开发步骤(7)

源码将于最后一遍文章给出下载 监测数据采集物联网应用开发步骤(6) 串口(COM)通讯开发 本章节测试使用了 Configure Virtual Serial Port Driver虚拟串口工具和本人自写的串口调试工具,请自行baidu下载对应工具 在com.zxy.common.Com_Para.py中添加如下内容 #RS…

HOperatorSet.Connection 有内存泄漏或缓存

开发环境 Win7 VS2002 halcon12, 直接运行Debug的exe 宽高5000,单格1*1的棋盘占用内存 手动释放region regionConnect private void butTemp_Click(object sender, EventArgs e) { butTemp.Enabled false; HOperatorS…

[CISCN 2019初赛]Love Math

文章目录 前言考点解题过程 前言 感慨自己实力不够,心浮气躁根本做不来难题。难得这题对我还很有吸引力,也涉及很多知识。只能说我是受益匪浅,总的来说加油吧ctfer。 考点 利用php动态函数的特性利用php中的数学函数实现命令执行利用php7的特…

音频——I2S 标准模式(二)

I2S 基本概念飞利浦(I2S)标准模式左(MSB)对齐标准模式右(LSB)对齐标准模式DSP 模式TDM 模式 文章目录 I2S format时序图逻辑分析仪抓包 I2S format 飞利浦 (I2S) 标准模式 数据在跟随 LRCLK 传输的 BCLK 的第二个上升沿时传输 MSB,其他位一直到 LSB 按顺序传传输依…

Linux(实操篇三)

Linux实操篇 Linux(实操篇三)1. 常用基本命令1.7 搜索查找类1.7.1 find查找文件或目录1.7.2 locate快速定位文件路径1.7.3 grep过滤查找及"|"管道符 1.8 压缩和解压类1.8.1 gzip/gunzip压缩1.8.2 zip/unzip压缩1.8.3 tar打包 1.9 磁盘查看和分区类1.9.1 du查看文件和…

【C#】泛型

【C#】泛型 泛型是什么 泛型是将类型作为参数传递给类、结构、接口和方法,这些参数相当于类型占位符。当我们定义类或方法时使用占位符代替变量类型,真正使用时再具体指定数据类型,以此来达到代码重用目的。 泛型特点 提高代码重用性一定…

高阶MySQL语句

数据准备 create table ky30 (id int,name varchar(10) primary key not null ,score decimal(5,2),address varchar(20),hobbid int(5)); insert into ky30 values(1,liuyi,80,beijing,2); insert into ky30 values(2,wangwu,90,shengzheng,2); insert into ky30 values(3,lis…

3D步进式漫游能够在哪些行业应用?

VR技术一直以来都是宣传展示领域中的热门话题,在VR全景技术的不断发展下,3D步进式漫游技术也逐渐覆盖各行各业,特别是在建筑、房产、博物馆、企业等领域应用更加广泛,用户通过这种技术能够获得更加直观、生动、详细的展示体验&…

【大数据】Apache Iceberg 概述和源代码的构建

Apache Iceberg 概述和源代码的构建 1.数据湖的解决方案 - Iceberg1.1 Iceberg 是什么1.2 Iceberg 的 Table Format 介绍1.3 Iceberg 的核心思想1.4 Iceberg 的元数据管理1.5 Iceberg 的重要特性1.5.1 丰富的计算引擎1.5.2 灵活的文件组织形式1.5.3 优化数据入湖流程1.5.4 增量…

零基础学Python:元组(Tuple)详细教程

前言 嗨喽,大家好呀~这里是爱看美女的茜茜呐 Python的元组与列表类似, 不同之处在于元组的元素不能修改, 元组使用小括号,列表使用方括号, 元组创建很简单,只需要在括号中添加元素,并使用逗号隔开即可 👇 👇 👇 更…

数据结构--树4.2.1(二叉树)

目录 一、二叉树的存储结构 二、二叉树的遍历 一、二叉树的存储结构 顺序存储结构:二叉树的顺序存储结构就是用一维数组存储二叉树中的各个结点,并且结点的存储位置能体现结点之间的逻辑关系。 链式存储结构:二叉树每个结点最多只有两个孩…

渗透测试漏洞原理之---【任意文件上传漏洞】

文章目录 1、任意文件上传概述1.1、漏洞成因1.2、漏洞危害 2、WebShell解析2.1、Shell2.2、WebShell2.2.1、大马2.2.2、小马2.2.3、GetShell 3、任意文件上传攻防3.1、毫无检测3.1.1、源代码3.1.2、代码审计3.1.3、靶场试炼 3.2、黑白名单策略3.2.1、文件检测3.2.2、后缀名黑名…