【大数据学习 | Spark-Core】Spark的分区器(HashPartitioner和RangePartitioner)

之前学过的kv类型上面的算子

groupby groupByKey reduceBykey sortBy sortByKey join[cogroup left inner right] shuffle的

mapValues keys values flatMapValues 普通算子,管道形式的算子

shuffle的过程是因为数据产生了打乱重分,分组、排序、join等算子需要将数据重新排版。

shuffle的过程是上游的数据处理完毕写出到自己的磁盘上,然后下游的数据从磁盘上面拉取。

重新排版打乱重分是需要存在规则的。

中间数据的流向规则叫做分区器 partitioner,这个分区器一般是存在于shuffle类算子中的,我们可以这么说,shuffle类算子一定会带有分区器,分区器也可以单独存在,人为定义分发规则。

groupBy groupBykey reduceBykey 自带的分区器HashPartitioner。

sortby sortBykey rangePartitioner

hashPartitioner

规则 按照key的hashCode %下游分区 = 分区编号

处理key-value类型数据,如果key为0,就分配去0号分区。否则调用nonNegativeMod函数。

保证取余的结果为正向结果。

hash取余的方式,不管数据分发到下游的什么分区中,最终结果都是相同的数据放入到一起。

演示结果:

scala> val arr = Array(1,2,3,4,5,6,7,8,9)
arr: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)scala> sc.makeRDD(arr,3)
res78: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[73] at makeRDD at <console>:27scala> res78.mapPartitionsWithIndex((index,it)=> it.map((index,_)))
res79: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[74] at mapPartitionsWithIndex at <console>:26scala> res79.collect
res80: Array[(Int, Int)] = Array((0,1), (0,2), (0,3), (1,4), (1,5), (1,6), (2,7), (2,8), (2,9))scala> res78.map(t=>(t,t))
res81: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[75] at map at <console>:26scala> res78.partitioner
res82: Option[org.apache.spark.Partitioner] = Nonescala> res81.reduceByKey(_+_)
res84: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[76] at reduceByKey at <console>:26scala> res84.partitioner
res85: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@3)scala> res84.mapPartitionsWithIndex((index,it)=> it.map((index,_)))
res88: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[77] at mapPartitionsWithIndex at <console>:26scala> res88.collect
res89: Array[(Int, (Int, Int))] = Array((0,(6,6)), (0,(3,3)), (0,(9,9)), (1,(4,4)), (1,(1,1)), (1,(7,7)), (2,(8,8)), (2,(5,5)), (2,(2,2)))

演示的逻辑,就是按照key.hashcode进行分区,int类型的hashcode值是自己的本身。

并且hash分区器的规则致使我们可以任意的修改下游的分区数量。

scala> res81.reduceByKey(_+_,100)
res91: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[78] at reduceByKey at <console>:26scala> res91.partitions.size
res92: Int = 100scala> res81.reduceByKey(_+_,2)
res93: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[79] at reduceByKey at <console>:26scala> res93.partitions.size
res94: Int = 2

rangePartitioner

hashPartitioner规则非常简单,直接规定来一个数据按照hashcode规则的分配,规则比较简答,但是会出现数据倾斜。

range分区规则中存在两个方法。

rangeBounds界限,在使用这个分区器之前先做一个界限划定。

首先使用水塘抽样算法,在未知的数据集中抽取能够代表整个数据集的样本,根据样本进行规则设定。

然后在使用getPartitions。

首先存在水塘抽样,规定数据的流向以后再执行整体逻辑,会先触发计算。

sortBykey是转换类的算子,不会触发计算。

但是我们发现它触发计算了,因为首先在计算之前先进行水塘抽样,能够规定下游的数据规则,然后再进行数据的计算。

scala> arr
res101: Array[Int] = Array(1, 9, 2, 8, 3, 7, 4, 6, 5)scala> arr.map(t=> (t,t))
res102: Array[(Int, Int)] = Array((1,1), (9,9), (2,2), (8,8), (3,3), (7,7), (4,4), (6,6), (5,5))scala> sc.makeRDD(res102)
res104: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[94] at makeRDD at <console>:27scala> res104.sortByKey()
res105: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[97] at sortByKey at <console>:26scala> res105.partitioner
res106: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.RangePartitioner@fe1f9dea)scala> res104.sortByKey(true,3)
res107: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[100] at sortByKey at <console>:26scala> res107.mapPartitionsWithIndex((index,it)=> it.map((index,_)))
res109: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[101] at mapPartitionsWithIndex at <console>:26scala> res109.collect
res110: Array[(Int, (Int, Int))] = Array((0,(1,1)), (0,(2,2)), (0,(3,3)), (1,(4,4)), (1,(5,5)), (1,(6,6)), (2,(7,7)), (2,(8,8)), (2,(9,9)))

range分区器,它是先做抽样然后指定下游分区的数据界限。

它可以修改分区数量,但是分区数量不能大于元素个数,必须要保证每个分区中都有元素。

scala> res104.sortByKey(true,3)
res111: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[104] at sortByKey at <console>:26scala> res104.sortByKey(true,300)
res112: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[107] at sortByKey at <console>:26scala> res111.partitions.size
res114: Int = 3scala> res112.part

自定义分区器

工作的过程中我们会遇见数据分类的情况,想要根据自己的需求定义分区的规则,让符合规则的数据发送到不同的分区中,这个时候我们就需要自定义分区器了。

定义分区器,让数据发送到不同的分区,从而不同的task任务输出的文件结果内容也不同

# 自己创建数据data/a.txt
hello tom hello jack
hello tom hello jack
hello tom hello jack
hello tom hello jack
hello tom hello jack
# 需求就是将数据按照规则进行分发到不同的分区中
# 存储的时候一个文件存储hello其他的文件存储tom jack

分区器的定义需要实现分区器的接口

class MyPartitioner extends Partitioner{override def numPartitions: Int = ???
// 设定下游存在几个分区override def getPartition(key: Any): Int = ???
// 按照key设定分区的位置
}

整体代码:

package com.hainiu.sparkimport org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}object Test1 {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("parse")conf.setMaster("local[*]")val sc = new SparkContext(conf)val rdd = sc.textFile("data/a.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)val rdd1 = rdd.partitionBy(new MyPartitioner)val fs = FileSystem.get(new Configuration())val out = "data/res"if(fs.exists(new Path(out)))fs.delete(new Path(out),true)rdd1.saveAsTextFile(out)}
}
class MyPartitioner extends Partitioner{override def numPartitions: Int = 2override def getPartition(key: Any): Int = {if(key.toString.equals("hello"))0else1}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/61797.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

计算机网络基础全攻略:探秘网络构建块(1/10)

一、计算机网络基础概念 计算机网络是指将地理位置不同的具有独立功能的多台计算机及其外部设备&#xff0c;通过通信线路和通信设备连接起来&#xff0c;在网络操作系统&#xff0c;网络管理软件及网络通信协议的管理和协调下&#xff0c;实现资源共享和信息传递的计算机系统…

游戏陪玩系统开发功能需求分析

电竞游戏陪玩系统是一种专门为游戏玩家提供陪伴、指导和互动服务的平台。这类系统通常通过专业的陪玩师&#xff08;也称为陪练师&#xff09;为玩家提供一对一或多对一的游戏陪伴服务&#xff0c;帮助玩家提升游戏技能、享受游戏乐趣&#xff0c;甚至解决游戏中的各种问题。电…

关于SpringBoot集成Kafka

关于Kafka Apache Kafka 是一个分布式流处理平台&#xff0c;广泛用于构建实时数据管道和流应用。它能够处理大量的数据流&#xff0c;具有高吞吐量、可持久化存储、容错性和扩展性等特性。 Kafka一般用作实时数据流处理、消息队列、事件架构驱动等 Kafka的整体架构 ZooKeeper:…

Linux 下的IO模型

一&#xff1a;四种IO模 1.1&#xff1a;阻塞式IO&#xff08;最简单&#xff0c;最常用&#xff0c;效率最低&#xff09; 阻塞I/O 模式是最普遍使用的I/O 模式&#xff0c;大部分程序使用的都是阻塞模式的I/O 。 缺省情况下&#xff08;及系统默认状态&#xff09;&#xf…

vue3项目部署在阿里云轻量应用服务器上

文章目录 概要整体部署流程技术细节小结 概要 vue3前端项目部署在阿里云轻量服务器 整体部署流程 首先有一个Vue3前端项目和阿里云应用服务器 确保环境准备 如果是新的服务器&#xff0c;在服务器内运行以下命令更新软件包 sudo apt update && sudo apt upgrade -y …

tcpdump交叉编译

TCPDUMP在Libpcap上开发。 首先需要编译libcap。 网上那么多教程&#xff0c;下载地址都只给了一个英文的官网首页&#xff0c; 你尽可以试试&#xff0c;从里面找到下载地址都要费半天时间。 \color{red}网上那么多教程&#xff0c;下载地址都只给了一个英文的官网首页&#…

KubeSphere 最佳实战:K8s 构建高可用、高性能 Redis 集群实战指南

首发&#xff1a;运维有术。 本指南将逐步引导您完成以下关键任务&#xff1a; 安装 Redis&#xff1a;使用 StatefulSet 部署 Redis。自动或手动配置 Redis 集群&#xff1a;使用命令行工具初始化 Redis 集群。Redis 性能测试&#xff1a;使用 Redis 自带的 Benchmark 工具进…

02 python基础 python解释器安装

首先在网站&#xff1a;Welcome to Python.org进行下载安装python 最新的解释器不一定是最好的&#xff0c;最稳定的才一定是最好的&#xff1b;要关注解释器最后维护 的时间。 一、python的安装 python安装的时候一定要在下载勾选好添加path环境 安装的时候尽量选择好自己的安…

java编程开发基础,正则表达式的使用案例Demo

java编程开发基础,正则表达式的使用案例Demo!实际开发中&#xff0c;经常遇到一些字符串&#xff0c;信息的裁剪和提取操作&#xff0c;正则表达式是经常使用的&#xff0c;下面的案例&#xff0c;可以帮助大家快速的了解和熟悉&#xff0c;正则表达式的使用技巧。 package com…

Windows Pycharm 远程 Spark 开发 PySpark

一、环境版本 环境版本PyCharm2024.1.2 (Professional Edition)Ubuntu Kylin16.04Hadoop3.3.5Hive3.1.3Spark2.4.0 二、Pycharm远程开发 文件-远程-开发 选择 SSH连接&#xff0c;连接虚拟机&#xff0c;选择项目目录即可远程开发

WebGL进阶(十一)层次模型

理论基础&#xff1a; 效果&#xff1a; 源码&#xff1a; <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta name"vie…

【H2O2|全栈】JS进阶知识(九)ES6(5)

目录 前言 开篇语 准备工作 class类 概念 形式 直接继承 概念 优点 案例 重写 概念 案例 关于重载 结束语 前言 开篇语 本系列博客主要分享JavaScript的进阶语法知识&#xff0c;本期为第九期&#xff0c;依然围绕ES6的语法进行展开。 本期内容为&#xff1a…

Prompting LLMs to Solve Complex Tasks: A Review

文章目录 题目简介任务分解未来方向结论 题目 促使 LLM 解决复杂任务&#xff1a; 综述 论文地址&#xff1a;https://www.intjit.org/cms/journal/volume/29/1/291_3.pdf 简介 大型语言模型 (LLM) 的最新趋势显而易见&#xff0c;这体现在大型科技公司的投资以及媒体和在线社…

学会Lambda,让程序Pythonic一点

Lambda是Python里的高阶用法&#xff0c;要把代码写得Pythonic&#xff0c;就需要了解这些高阶用法&#xff0c;想说自己是一名真正的Python程序员&#xff0c;先要把代码写得Pythonic。 今天聊下Lambda的用法&#xff0c;写篇简短的用法说明。 Lambda是匿名函数的意思&#…

加速科技精彩亮相中国国际半导体博览会IC China 2024

11月18日—20日&#xff0c;第二十一届中国国际半导体博览会&#xff08;IC China 2024&#xff09;在北京国家会议中心顺利举办&#xff0c;加速科技携重磅产品及全系测试解决方案精彩亮相&#xff0c;加速科技创始人兼董事长邬刚受邀在先进封装创新发展论坛与半导体产业前沿与…

window11编译pycdc.exe

一、代码库和参考链接 在对python打包的exe文件进行反编译时&#xff0c;会使用到uncompyle6工具&#xff0c;但是这个工具只支持python3.8及以下&#xff0c;针对更高的版本的python则不能反编译。 关于反编译参考几个文章&#xff1a; Python3.9及以上Pyinstaller 反编译教…

【深度学习之回归预测篇】 深度极限学习机DELM多特征回归拟合预测(Matlab源代码)

深度极限学习机 (DELM) 作为一种新型的深度学习算法&#xff0c;凭借其独特的结构和训练方式&#xff0c;在诸多领域展现出优异的性能。本文将重点探讨DELM在多输入单输出 (MISO) 场景下的应用&#xff0c;深入分析其算法原理、性能特点以及未来发展前景。 1、 DELM算法原理及其…

前端-react(class组件和Hooks)

文章主要以Hooks为主,部分涉及class组件方法进行对比 一.了解react 1.管理组件的方式 在React中&#xff0c;有两种主要的方式来管理组件的状态和生命周期&#xff1a;Class 组件和 Hooks。 Class 组件&#xff1a; Class 组件是 React 最早引入的方式&#xff0c;它是基于…

Ollama vs VLLM:大模型推理性能全面测评!

最近在用本地大模型跑实验&#xff0c;一开始选择了ollama,分别部署了Qwen2.5-14B和Qwen2.5-32B&#xff0c;发现最后跑出来的实验效果很差&#xff0c;一开始一直以为prompt的问题&#xff0c;尝试了不同的prompt&#xff0c;最后效果还是一直不好。随后尝试了vllm部署Qwen2.5…

基于深度学习CNN算法的花卉分类识别系统01--带数据集-pyqt5UI界面-全套源码

文章目录 基于深度学习算法的花卉分类识别系统一、项目摘要二、项目运行效果三、项目文件介绍四、项目环境配置1、项目环境库2、环境配置视频教程 五、项目系统架构六、项目构建流程1、数据集2、算法网络Mobilenet3、网络模型训练4、训练好的模型预测5、UI界面设计-pyqt56、项目…