【无标题】spark编程

Value类型:

9) distinct

➢ 函数签名

def distinct()(implicit ord: Ordering[T] = null): RDD[T]

def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

➢ 函数说明

将数据集中重复的数据去重

 

val dataRDD = sparkContext.makeRDD(List(
 1,2,3,4,1,2
))
val dataRDD1 = dataRDD.distinct()
val dataRDD2 = dataRDD.distinct(2)

 

10) coalesce

➢ 函数签名

def coalesce(numPartitions: Int, shuffle: Boolean = false,

partitionCoalescer: Option[PartitionCoalescer] = Option.empty) 

(implicit ord: Ordering[T] = null)

: RDD[T]

➢ 函数说明

根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率

当 spark 程序中,存在过多的小任务的时候,可以通过 coalesce 方法,收缩合并分区,减少分区的个数,减小任务调度成本

val dataRDD = sparkContext.makeRDD(List(
 1,2,3,4,1,2
),6)
val dataRDD1 = dataRDD.coalesce(2)

 

11) repartition

➢ 函数签名

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

➢ 函数说明

该操作内部其实执行的是 coalesce 操作,参数 shuffle 的默认值为 true。无论是将分区数多的RDD 转换为分区数少的 RDD,还是将分区数少的 RDD 转换为分区数多的 RDD,repartition操作都可以完成,因为无论如何都会经 shuffle 过程。

val dataRDD = sparkContext.makeRDD(List(
 1,2,3,4,1,2
),2)
val dataRDD1 = dataRDD.repartition(4)

 

12) sortBy

➢ 函数签名

def sortBy[K](

f: (T) => K,

ascending: Boolean = true,

numPartitions: Int = this.partitions.length) 

(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

➢ 函数说明

该操作用于排序数据。在排序之前,可以将数据通过f 函数进行处理,之后按照 f 函数处理的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原 RDD 的分区数一致。中间存在 shuffle 的过程。

val dataRDD = sparkContext.makeRDD(List(
 1,2,3,4,1,2
),2)
val dataRDD1 = dataRDD.sortBy(num=>num, false, 4)
val dataRDD2 = dataRDD.sortBy(num=>num, true, 4)

 

 

双Value类型:

13) intersection

➢ 函数签名

def intersection(other: RDD[T]): RDD[T]

➢ 函数说明

对源 RDD 和参数 RDD 求交集后返回一个新的 RDD

val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.intersection(dataRDD2)

 

14) union

➢ 函数签名

def union(other: RDD[T]): RDD[T]

➢ 函数说明

对源 RDD 和参数 RDD 求并集后返回一个新的 RDD(重复数据不会去重)

val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.union(dataRDD2)

 

15) subtract

➢ 函数签名

def subtract(other: RDD[T]): RDD[T]

➢ 函数说明

以源 RDD 元素为主,去除两个 RDD 中重复元素,将源RDD的其他元素保留下来。(求差集)

val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.subtract(dataRDD2)

 

16) zip

➢ 函数签名

def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]

➢ 函数说明

将两个 RDD 中的元素,以键值对的形式进行合并。其中,键值对中的 Key 为第 1 个 RDD

中的元素,Value 为第 2 个 RDD 中的相同位置的元素。

val dataRDD1 = sparkContext.makeRDD(List("a","b","c","d"))
val dataRDD2 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD = dataRDD1.zip(dataRDD2)

flatMap

➢ 函数签名

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]

➢ 函数说明

将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射。

val dataRDD = sparkContext.makeRDD(List(
 List(1,2),List(3,4)
),1)
val dataRDD1 = dataRDD.flatMap(
 list => list
)

 

map和flatMap的区别:

 

map会将每一条输入数据映射为一个新对象。

 

flatMap包含两个操作:会将每一个输入对象输入映射为一个新集合,然后把这些新集合连成一个大集合。

partitionBy

➢ 函数签名

def partitionBy(partitioner: Partitioner): RDD[(K, V)]

➢ 函数说明

将数据按照指定 Partitioner 重新进行分区。Spark 默认的分区器是 HashPartitioner

val rdd: RDD[(Int, String)] =
 sc.makeRDD(Array((1,"aaa"),(2,"bbb"),(3,"ccc")),3)

val rdd2: RDD[(Int, String)] =
 rdd.partitionBy(new HashPartitioner(2))

函数说明

将数据源的数据根据 key 对 value 进行分组

val dataRDD1 =
 sc.makeRDD(List(("a",1),("b",2),("c",3),("a",4)))
val dataRDD2 = dataRDD1.groupByKey()
val dataRDD3 = dataRDD1.groupByKey(2)
val dataRDD4 = dataRDD1.groupByKey(new HashPartitioner(2))

可以将数据按照相同的 Key 对 Value 进行聚合

val dataRDD1 = sc.makeRDD(List(("a",1),("b",2),("c",3),("a",4)))
val dataRDD2 = dataRDD1.reduceByKey(_+_)
val dataRDD3 = dataRDD1.reduceByKey(_+_, 2)

将数据根据不同的规则进行分区内计算和分区间计算val dataRDD1 =
 sc.makeRDD(List(("a",1),("b",2),("c",3),("a",4)))
val dataRDD2 =
 dataRDD1.aggregateByKey(0)(_+_,_+_)

val dataRDD1 =
 sc.makeRDD(List(("a",1),("b",2),("c",3),("a",4)))
val dataRDD2 = dataRDD1.foldByKey(0)(_+_)

现有数据 List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)),求每个key的总值及每个key对应键值对的个数

val list: List[(String, Int)] = List(("a", 88), ("b", 95), ("a", 91), ("b", 93),("a", 95), ("b", 98))
val input: RDD[(String, Int)] = sc.makeRDD(list, 2)
val combineRDD: RDD[(String, (Int, Int))] = input.combineByKey(
 (_, 1), //a=>(a,1)
 (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), //acc_1为数据源的value,acc_2为key出现的次数,二者进行分区内部的计算
 (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) //将分区内部计算的结果进行分区间的汇总计算,得到每个key的总值以及每个key出现的次数
)

在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口(特质),返回一个按照 key 进行排序

val dataRDD1 = sc.makeRDD(List(("a",1),("b",2),("c",3)))
val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(true)
val sortRDD2: RDD[(String, Int)] = dataRDD1.sortByKey(false)

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/75719.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

GPT-2 语言模型 - 模型训练

本节代码是一个完整的机器学习工作流程,用于训练一个基于GPT-2的语言模型。下面是对这段代码的详细解释: 文件目录如下 1. 初始化和数据准备 设置随机种子 random.seed(1002) 确保结果的可重复性。 定义参数 test_rate 0.2 context_length 128 tes…

架构师面试(二十九):TCP Socket 编程

问题 今天考察网络编程的基础知识。 在基于 TCP 协议的网络 【socket 编程】中可能会遇到很多异常,在下面的相关描述中说法正确的有哪几项呢? A. 在建立连接被拒绝时,有可能是因为网络不通或地址错误或 server 端对应端口未被监听&#x…

HTTP实现心跳模块

HTTP实现心跳模块 使用轻量级的cHTTP库cpp-httplib重现实现HTTP心跳模块 头文件HttplibHeartbeat.h #ifndef HTTPLIB_HEARTBEAT_H #define HTTPLIB_HEARTBEAT_H#include <string> #include <thread> #include <atomic> #include <chrono> #include …

openharmony—release—4.1开发环境搭建(踩坑记录)

环境开发需要分别在window以及ubuntu下进行相应设置 一、window 1.安装DevEco Device Tool OpenAtom OpenHarmony 二、ubuntu 1.将Ubuntu Shell环境修改为bash ls -l /bin/sh 2.打开终端工具&#xff0c;执行如下命令&#xff0c;输入密码&#xff0c;然后选择No&#xff0…

Go学习系列文章声明

本次学习是基于B站的视频&#xff0c;【Udemy高分热门付费课程】Golang&#xff1a;完整开发者指南&#xff08;基础知识和高级特性&#xff09;中英文字幕_哔哩哔哩_bilibili 本人会尝试输出视频中的内容&#xff0c;如有错误欢迎指出 next page: Go installation process

error: RPC failed; HTTP 408 curl 22 The requested URL returned error: 408

在git push时报错&#xff1a;error: RPC failed; HTTP 408 curl 22 The requested URL returned error: 408 原因&#xff1a;可能是推送的文件太大&#xff0c;要么是缓存不够&#xff0c;要么是网络不行。 解决方法&#xff1a; 将本地 http.postBuffer 数值调整到500MB&…

Android.bp中添加条件判断编译方式

背景&#xff1a; 马哥学员朋友以前在vip群里&#xff0c;有问道如何在Android.bp中添加条件判断&#xff0c;在工作中经常需要一套代码兼容发货目标版本&#xff0c;即代码都是公共的一套&#xff0c;但是需要用这一套代码集成到各个产品设备上 但是这个产品设备可能面临比…

swift ui基础

一个朴实无华的目录 今日学习内容&#xff1a;1.三种布局&#xff08;可以相互包裹&#xff09;1.1 vstack&#xff08;竖直&#xff09;&#xff1a;先写的在上面1.1 hstack&#xff08;水平&#xff09;&#xff1a;先写的在左边1.1 zstack&#xff08;前后&#xff09;&…

第16届蓝桥杯单片机模拟试题Ⅲ

试题 代码 sys.h #ifndef __SYS_H__ #define __SYS_H__#include <STC15F2K60S2.H> //sys.c extern unsigned char UI; //界面标志(0湿度界面、1参数界面、2时间界面) extern unsigned char time; //时间间隔(1s~10S) extern bit ssflag; //启动/停止标志…

Node.js中URL模块详解

Node.js 中 URL 模块全部 API 详解 1. URL 类 const { URL } require(url);// 1. 创建 URL 对象 const url new URL(https://www.example.com:8080/path?queryvalue#hash);// 2. URL 属性 console.log(协议:, url.protocol); // https: console.log(主机名:, url.hos…

Java接口性能优化面试问题集锦:高频考点与深度解析

1. 如何定位接口性能瓶颈&#xff1f;常用哪些工具&#xff1f; 考察点&#xff1a;性能分析工具的使用与问题定位能力。 核心答案&#xff1a; 工具&#xff1a;Arthas&#xff08;在线诊断&#xff09;、JProfiler&#xff08;内存与CPU分析&#xff09;、VisualVM、Prometh…

WheatA小麦芽:农业气象大数据下载器

今天为大家介绍的软件是WheatA小麦芽&#xff1a;专业纯净的农业气象大数据系统。下面&#xff0c;我们将从软件的主要功能、支持的系统、软件官网等方面对其进行简单的介绍。 主要内容来源于软件官网&#xff1a;WheatA小麦芽的官方网站是http://www.wheata.cn/ &#xff0c;…

Python10天突击--Day 2: 实现观察者模式

以下是 Python 实现观察者模式的完整方案&#xff0c;包含同步/异步支持、类型注解、线程安全等特性&#xff1a; 1. 经典观察者模式实现 from abc import ABC, abstractmethod from typing import List, Anyclass Observer(ABC):"""观察者抽象基类""…

CST1019.基于Spring Boot+Vue智能洗车管理系统

计算机/JAVA毕业设计 【CST1019.基于Spring BootVue智能洗车管理系统】 【项目介绍】 智能洗车管理系统&#xff0c;基于 Spring Boot Vue 实现&#xff0c;功能丰富、界面精美 【业务模块】 系统共有三类用户&#xff0c;分别是&#xff1a;管理员用户、普通用户、工人用户&…

Windows上使用Qt搭建ARM开发环境

在 Windows 上使用 Qt 和 g++-arm-linux-gnueabihf 进行 ARM Linux 交叉编译(例如针对树莓派或嵌入式设备),需要配置 交叉编译工具链 和 Qt for ARM Linux。以下是详细步骤: 1. 安装工具链 方法 1:使用 MSYS2(推荐) MSYS2 提供 mingw-w64 的 ARM Linux 交叉编译工具链…

Python爬虫教程011:scrapy爬取当当网数据开启多条管道下载及下载多页数据

文章目录 3.6.4 开启多条管道下载3.6.5 下载多页数据3.6.6 完整项目下载3.6.4 开启多条管道下载 在pipelines.py中新建管道类(用来下载图书封面图片): # 多条管道开启 # 要在settings.py中开启管道 class DangdangDownloadPipeline:def process_item(self, item, spider):…

Mysql -- 基础

SQL SQL通用语法&#xff1a; SQL分类&#xff1a; DDL: 数据库操作 查询&#xff1a; SHOW DATABASES&#xff1b; 创建&#xff1a; CREATE DATABASE[IF NOT EXISTS] 数据库名 [DEFAULT CHARSET字符集] [COLLATE 排序规则]&#xff1b; 删除&#xff1a; DROP DATABA…

实操(环境变量)Linux

环境变量概念 我们用语言写的文件编好后变成了程序&#xff0c;./ 运行的时候他就会变成一个进程被操作系统调度并运行&#xff0c;运行完毕进程相关资源被释放&#xff0c;因为它是一个bash的子进程&#xff0c;所以它退出之后进入僵尸状态&#xff0c;bash回收他的退出结果&…

torch.cat和torch.stack的区别

torch.cat 和 torch.stack 是 PyTorch 中用于组合张量的两个常用函数&#xff0c;它们的核心区别在于输入张量的维度和输出张量的维度变化。以下是详细对比&#xff1a; 1. torch.cat (Concatenate) 作用&#xff1a;沿现有维度拼接多个张量&#xff0c;不创建新维度 输入要求…

深入解析@Validated注解:Spring 验证机制的核心工具

一、注解出处与核心定位 1. 注解来源 • 所属框架&#xff1a;Validated 是 Spring Framework 提供的注解&#xff08;org.springframework.validation.annotation 包下&#xff09;。 • 核心定位&#xff1a; 作为 Spring 对 JSR-380&#xff08;Bean Validation 2.0&#…