Flink Sort-Shuffle 实现简介

简介:Sort-Shuffle 使 Flink 在应对大规模批数据处理任务时更加游刃有余

本文介绍 Sort-Shuffle 如何帮助 Flink 在应对大规模批数据处理任务时更加游刃有余。主要内容包括:

  1. 数据 Shuffle 简介
  2. 引入 Sort-Shuffle 的意义
  3. Flink Sort-Shuffle 实现
  4. 测试结果
  5. 调优参数
  6. 未来展望

Flink 作为批流一体的大数据计算引擎,大规模批数据处理也是 Flink 数据处理能力的重要组成部分。随着 Flink 的版本迭代,其批数据处理能力也在不断增强,sort-shuffle 的引入,使得 Flink 在应对大规模批数据处理任务时更加游刃有余。

一、数据 Shuffle 简介

数据 shuffle 是批数据处理作业的一个重要阶段,在这一阶段中,上游处理节点的输出数据会被持久化到外部存储中,之后下游的计算节点会读取这些数据并进行处理。这些持久化的数据不仅仅是一种计算节点间的数据交换形式,还在错误恢复中发挥着重要作用。

目前,有两种批数据 shuffle 模型被现有的大规模分布式计算系统采用,分别是基于 hash 的方式以及基于 sort 的方式:

  1. 基于 hash 方式的核心思路是将发送给下游不同并发消费任务的数据写到单独的文件中,这样文件本身就成了一个自然的区分不同数据分区的边界;
  2. 基于 sort 方式的核心思路是先将所有分区的数据写在一起,然后通过 sort 来区分不同数据分区的边界。

我们在 Flink 1.12 版本将基于 sort 的批处理 shuffle 实现引入了 Flink 并在后续进行了持续的性能与稳定性优化;到 Flink 1.13 版本,sort-shuffle 已经实现生产可用。

二、引入 Sort-Shuffle 的意义

我们之所以要在 Flink 中引入 sort-shuffle 的实现,一个重要的原因是 Flink 原本的基于 hash 的实现对大规模批作业不可用。这个也是被现有的其他大规模分布式计算系统所证明的:

  1. 稳定性方面:对于高并发批作业,基于 hash 的实现会产生大量的文件,并且会对这些文件进行并发读写,这会消耗很多资源并对文件系统会产生较大的压力。文件系统需要维护大量的文件元数据,会产生文件句柄以及 inode 耗尽等不稳定风险。
  2. 性能方面:对于高并发批作业,并发读写大量的文件意味着大量的随机 IO,并且每次 IO 实际读写的数据量可能是非常少的,这对于 IO 性能是一个巨大的挑战,在机械硬盘上,这使得数据 shuffle 很容易成为批处理作业的性能瓶颈。

通过引入基于 sort 的批数据 shuffle 实现,并发读写的文件数量可以大大降低,有利于实现更好的数据顺序读写,从而能够提高 Flink 大规模批处理作业的稳定性与性能。除此之外,新的 sort-shuffle 实现还可以减小内存缓冲区的消耗。对于基于 hash 的实现,每个数据分区都需要一块读写缓冲区,内存缓冲区消耗和并发成正比。而基于 sort 的实现则可以做到内存缓冲区消耗和作业并发解耦(尽管更大的内存可能会带来更高的性能)。

更为重要的一点是我们实现了新的存储结构与读写 IO 优化,这使得 Flink 的批数据 shuffle 相比于其他的大规模分布式数据处理系统更具优势。下面的章节会更为详细的介绍 Flink 的 sort-shuffle 实现以及所取得的结果。

三、Flink Sort-Shuffle 实现

和其他分布式系统的批数据 sort-shuffle 实现类似,Flink 的整个 shuffle 过程分为几个重要的阶段,包括写数据到内存缓冲区、对内存缓冲区进行排序、将排好序的数据写出到文件以及从文件中读取 shuffle 数据并发送给下游。但是,与其他系统相比,Flink 的实现有一些根本性的不同,包括多段数据存储格式、省掉数据合并流程以及数据读取 IO 调度等。这些都使得 Flink 的实现有着更优秀的表现。

1. 设计目标

在 Flink sort-shuffle 的整个实现过程中,我们把下面这些点作为主要的设计目标加以考量:

1.1 减少文件数量

正如上面所讨论的,基于 hash 的实现会产生大量的文件,而减少文件的数量有利于提高稳定性和性能。Sort-Spill-Merge 的方式被分布式计算系统广泛采纳以达到这一目标,首先将数据写入内存缓冲区,当内存缓冲区填满后对数据进行排序,排序后的数据被写出到一个文件中,这样总的文件数量是:(总数据量 / 内存缓冲区大小),从而文件数量被减少。当所有数据写出完成后,将产生的文件合并成一个文件,从而进一步减少文件数量并增大每个数据分区的大小(有利于顺序读取)。

相比于其他系统的实现,Flink 的实现有一个重要的不同,即 Flink 始终向同一个文件中不断追加数据,而不会写多个文件再进行合并,这样的好处始终只有一个文件,文件数量实现了最小化。

1.2 打开更少的文件

同时打开的文件过多会消耗更多的资源,同时容易导致文件句柄不够用的问题,导致稳定性变差。因此,打开更少的文件有利于提升系统的稳定性。对于数据写出,如上所述,通过始终向同一个文件中追加数据,每个并发任务始终只打开一个文件。对于数据读取,虽然每个文件都需要被大量下游的并发任务读取,Flink 依然通过只打开文件一次,并在这些并发读取任务间共享文件句柄实现了每个文件只打开一次的目标。

1.3 最大化顺序读写

文件的顺序读写对文件的 IO 性能至关重要。通过减少 shuffle 文件数量,我们已经在一定程度上减少了随机文件 IO。除此之外,Flink 的批数据 sort-shuffle 还实现了更多 IO 优化来最大化文件的顺序读写。在数据写阶段,通过将要写出的数据缓冲区聚合成更大的批并通过 wtitev 系统调用写出从而实现了更好的顺序写。在数据读取阶段,通过引入读取 IO 调度,总是按照文件的偏移顺序服务数据读取请求从而最大限度的实现的文件的顺序读。实验表明这些优化极大的提升了批数据 shuffle 的性能。

1.4 减少读写 IO 放大

传统的 sort-spill-merge 方式通过将生成的多个文件合并成一个更大的文件从增大读取数据块的大小。这种实现方案虽然带来了好处,但也有一些不足,最终要的一点便是读写 IO 放大,对于计算节点间的数据 shuffle 而言,在不发生错误的情况下,本身只需要写入和读取数据一次,但是数据合并使得相同的数据被读写多次,从而导致 IO 总量变多,并且存储空间的消耗也会变大。

Flink 的实现通过不断向同一个文件中追加数据以及独特的存储结构规避了文件和并的过程,虽然单个数据块的大小小于和并后的大小,但由于规避了文件合并的开销再结合 Flink 独有的 IO 调度,最终可以实现比 sort-spill-merge 方案更高的性能。

1.5 减少内存缓冲区消耗

类似于其他分布式计算系统中 sort-shuffle 的实现,Flink 利用一块固定大小的内存缓冲区进行数据的缓存与排序。这块内存缓冲区的大小是与并发无关的,从而使得上游 shuffle 数据写所需要的内存缓冲区大小与并发解耦。结合另一个内存管理方面的优化 FLINK-16428 可以同时实现下游 shuffle 数据读取的内存缓冲区消耗并发无关化,从而可以减少大规模批作业的内存缓冲区消耗。(注:FLINK-16428 同时适用于批作业与流作业)

2. 实现细节

2.1 内存数据排序

在 shuffle 数据的 sort-spill 阶段,每条数据被首先序列化并写入到排序缓冲区中,当缓冲区被填满后,会对缓冲区中的所有二进制数据按照数据分区的顺序进行排序。此后,排好序的数据会按照数据分区的顺序被写出到文件中。虽然,目前并没有对数据本身进行排序,但是排序缓冲区的接口足够的泛化,可以实现后续潜在的更为复杂的排序要求。排序缓冲区的接口定义如下:

public interface SortBuffer {*/** Appends data of the specified channel to this SortBuffer. \*/*boolean append(ByteBuffer source, int targetChannel, Buffer.DataType dataType) throws IOException;*/** Copies data in this SortBuffer to the target MemorySegment. \*/*BufferWithChannel copyIntoSegment(MemorySegment target);long numRecords();long numBytes();boolean hasRemaining();void finish();boolean isFinished();void release();boolean isReleased();}

在排序算法上,我们选择了复杂度较低的 bucket-sort。具体而言,每条序列化后的数据前面都会被插入一个 16 字节的元数据。包括 4 字节的长度、4 字节的数据类型以及 8 字节的指向同一数据分区中下一条数据的指针。结构如下图所示:

当从缓冲区中读取数据时,只需要按照每个数据分区的链式索引结构就可以读取到属于这个数据分区的所有数据,并且这些数据保持了数据写入时的顺序。这样按照数据分区的顺序读取所有的数据就可以达到按照数据分区排序的目标。

2.2 文件存储结构

如前所述,每个并行任务产生的 shuffle 数据会被写到一个物理文件中。每个物理文件包含多个数据区块(data region),每个数据区块由数据缓冲区的一次 sort-spill 生成。在每个数据区块中,所有属于不同数据分区(data partition,由下游计算节点不同并行任务消费)的数据按照数据分区的序号顺序进行排序聚合。下图展示了 shuffle 数据文件的详细结构。其中(R1,R2,R3)是 3 个不同的数据区块,分别对应 3 次数据的 sort-spill 写出。每个数据块中有 3 个不同的数据分区,分别将由(C1,C2,C3)3 个不同的并行消费任务进行读取。也就是说数据 B1.1,B2.1 及 B3.1 将由 C1 处理,数据 B1.2,B2.2 及 B3.2 将由 C2 处理,而数据 B1.3,B2.3 及 B3.3 将由 C3 处理。

类似于其他的分布式处理系统实现,在 Flink 中,每个数据文件还对应一个索引文件。索引文件用来在读取时为每个消费者索引属于它的数据(data partition)。索引文件包含和数据文件相同的 data region,在每个 data region 中有与 data partition 相同数量的索引项,每个索引项包含两个部分,分别对应到数据文件的偏移量以及数据的长度。作为一个优化。Flink 为每个索引文件缓存最多 4M 的索引数据。数据文件与索引文件的对应关系如下:

2.3 读取 IO 调度

为了进一步提高文件 IO 性能,基于上面的存储结构,Flink 进一步引入了 IO 调度机制,类似于磁盘调度的电梯算法,Flink 的 IO 调度总是按照 IO 请求的文件偏移顺序进行调度。更具体来说,如果数据文件有 n 个 data region,每个 data region 有 m 个 data partition,同时有 m 个下游计算任务读取这一数据文件,那么下面的伪代码展示了 Flink 的 IO 调度算法的工作流程:

*// let data_regions as the data region list indexed from 0 to n - 1**// let data_readers as the concurrent downstream data readers queue indexed from 0 to m - 1*for (data_region in data_regions) {data_reader = poll_reader_of_the_smallest_file_offset(data_readers);if (data_reader == null)break;reading_buffers = request_reading_buffers();if (reading_buffers.isEmpty())break;read_data(data_region, data_reader, reading_buffers);}   

2.4 数据广播优化

数据广播是指发送相同的数据给下游计算节点的所有并行任务,一个常见的应用场景是 broadcast-join。Flink 的 sort-shuffle 实现对这一过程进行了优化,使得在包括内存排序缓冲区和 shuffle 文件中,广播数据只保存一份,这可以大大提升数据广播的性能。更具体来说,当写入一条广播数据到排序缓冲区时,这条数据只会被序列化并且拷贝一次,同样在将数据写出到 shuffle 文件时,也只会写一份数据。在索引文件中,对于不同 data partition 的数据索引项,他们均指向数据文件中的同一块数据。下图展示了数据广播优化的所有细节:

2.5 数据压缩

数据压缩是一个简单而有效的优化手段,测试结果显示数据压缩可以提高 TPC-DS 总体性能超过 30%。类似于 Flink 的基于 hash 的批处理 shuffle 实现,数据压缩是以网络缓冲区(network buffer)为单位进行的,数据压缩不跨 data partition,也就是说发给不同下游并行任务的数据分开压缩,压缩发生在数据排序后写出前,下游消费任务在收到数据后进行解压。下图展示了数据压缩的整个流程:

四、测试结果

1. 稳定性

新的 sort-shuffle 的实现极大的提高 Flink 运行批处理作业的稳定性。除了解决了潜在的文件句柄以及 inode 耗尽的不稳定问题外,还解决了一些 Flink 原有 hash-shuffle 存在的已知问题,如 FLINK-21201(创建过多文件导致主线程阻塞),FLINK-19925(在网络 netty 线程中执行 IO 操作导致网络稳定性受到影响)等。

2. 性能

我们在 1000 规模的并发下运行了 TPC-DS 10T 数据规模的测试,结果表明,相比于 Flink 原本的批数据 shuffle 实现,新的数据 shuffle 实现可以实现 2-6 倍的性能提升,如果排除计算时间,只统计数据 shuffle 时间可以是先最高 10 倍的性能提升。下表展示了性能提升的详细数据:

JobsTime Used for Sort-Shuffle (s)Time Used for Hash-Shuffle (s)Speed up Factor
q4.sql98653715.45
q11.sql3487982.29
q14b.sql88321292.51
q17.sql2697812.90
q23a.sql41811992.87
q23b.sql3768432.24
q25.sql4138732.11
q29.sql35410382.93
q31.sql2234982.23
q50.sql2155502.56
q64.sql2174422.04
q74.sql2709623.56
q75.sql1667134.30
q93.sql2045402.65

在我们的测试集群上,每块机械硬盘的数据读取以及写入带宽可以达到 160MB/s:

Disk NameSDISDJSDK
Writing Speed (MB/s)189173186
Reading Speed (MB/s)112154158

注:我们的测试环境配置如下,由于我们有较大的内存,所以一些 shuffle 数据量小的作业实际数据 shuffle 仅为读写内存,因此上面的表格仅列出了一些 shuffle 数据量大,性能提升明显的查询:

Number of NodesMemory Size Per NodeCores Per NodeDisks Per Node
12About 400G963

五、调优参数

在 Flink 中,sort-shuffle 默认是不开启的,想要开启需要调小这个参数的配置:taskmanager.network.sort-shuffle.min-parallelism。这个参数的含义是如果数据分区的个数(一个计算任务并发需要发送数据给几个下游计算节点)低于这个值,则走 hash-shuffle 的实现,如果高于这个值则启用 sort-shuffle。实际应用时,在机械硬盘上,可以配置为 1,即使用 sort-shuffle。

Flink 没有默认开启数据压缩,对于批处理作业,大部分场景下是建议开启的,除非数据压缩率低。开启的参数为 taskmanager.network.blocking-shuffle.compression.enabled

对于 shuffle 数据写和数据读,都需要占用内存缓冲区。其中,数据写缓冲区的大小由 taskmanager.network.sort-shuffle.min-buffers 控制,数据读缓冲区由 taskmanager.memory.framework.off-heap.batch-shuffle.size 控制。数据写缓冲区从网络内存中切分出来,如果要增大数据写缓冲区可能还需要增大网络内存总大小,以避免出现网络内存不足的错误。数据读缓冲区从框架的 off-heap 内存中切分出来,如果要增大数据读缓冲区,可能还需要增大框架的 off-heap 内存,以避免出现 direct 内存 OOM 错误。一般而言更大的内存缓冲区可以带来更好的性能,对于大规模批作业,几百兆的数据写缓冲区与读缓冲区是足够的。

六、未来展望

还有一些后续的优化工作,包括但不限于:

1)网络连接复用,这可以提高网络的建立的性能与稳定性,相关 Jira 包括 FLINK-22643 以及 FLINK-15455;

2)多磁盘负载均衡,这有利于解决负载不均的问题,相关 Jira 包括 FLINK-21790 以及 FLINK-21789;

3)实现远程数据 shuffle 服务,这有利于进一步提升批数据 shuffle 的性能与稳定性;

4)允许用户选择磁盘类型,这可以提高易用性,用户可以根据作业的优先级选择使用 HDD 或者 SSD。

原文链接

本文为阿里云原创内容,未经允许不得转载。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/511524.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

「现代C++设计魅力」虚函数继承-thunk技术初探

简介:工作中使用LLDB调试器调试这一段C多继承程序的时候,发现通过lldb print(expression命令的别名) 命令获取的指针地址和实际理解的C的内存模型的地址不一样。那么到底是什么原因呢? 作者 | 扬阜 来源 | 阿里技术公众号 一 问题背景 1 实…

万物互联时代到来,锐捷发布场景化无线零漫游方案

数字化和万物互联时代到来,物联网与 IoT 设备发展迅猛,以往只在办公区域主要由手机等移动设备使用的无线网络,正在接入更多核心业务生产、物流仓储等各类的生产设备。据分析机构 IDC 预测,无线网络优先是当下智能园区网络建设投资…

阿里云田涛涛:高效智能的云,CloudOps让运维更简单

简介:CloudOps:以应用为中心的自动化运维新趋势 12月21日,在阿里云弹性计算年度峰会上,阿里云弹性计算体验与控制系统负责人田涛涛发表了主题为《高效智能的云,CloudOps让运维更简单》的演讲,深度解读了云上运维新趋势…

打造南沙“强芯”,南沙首届IC Nansha大会召开

6月25日,2022 中国南沙国际集成电路产业论坛在广州南沙召开。本次峰会由广州南沙经济技术开发区管理委员会、广州市工业和信息化局主办;支持单位为广州湾区半导体产业集团有限公司、广东省集成电路行业协会、广州市半导体协会;广东省半导体及…

OpenAI开发者大会简介

文章目录 GPT-4 Turbo 昨天晚上 OpenAI的首届开发者大会召开 Sam Altman也做了公开演讲,应该说 这是继今年春天发布GPT-4之后 OpenAI在AI行业又创造的一个不眠夜 过去一年 ChatGPT绝对是整个科技领域最热的词汇 OpenAI 也依靠ChatGPT取得了惊人的成绩 ChatG…

阿里云贾少天:大规模云服务器高效使用及管理实践

简介:本篇内容分享了大规模云服务器高效使用及管理最佳实践。 2021年10月22日,在云栖大会的《云上运维最佳实践》分论坛,阿里云高级技术专家贾少天发表了主题为“大规模云服务器高效使用及管理最佳实践”的演讲,本篇内容根据他的…

发现新视界——视觉计算将如何改变生产方式

简介:本篇内容将从3个部分为读者介绍关于视觉计算如何改变生产方式,进一步阐述可视化业务方面的挑战及阿里云视觉计算的解决方案与优势。 编者按:在2021年10月举办的云栖大会的《数字孪生&Cloud XR技术助力产研创新论坛》上,…

容器监控指南:三剑客轻松实现 Docker 容器监控

作者 | Milan Mahat在本指南中,我们将学习如何使用 docker-compose 在容器中设置 cAdvisor,将其与 prometheus 连接,并通过 grafana 监控服务器的容器。CAdvisor 是一种流行的工具,用于收集容器的信息。它是 prometheus 和 grafan…

N个技巧,编写更高效 Dockerfile|云效工程师指北

简介:云原生时代下软件的构建和部署离不开容器技术。提到容器,几乎大家下意识都会联想到 Docker 。而 Docker 中有两个非常重要的概念,一个是Image(镜像),一个是Container(容器)。前…

TDA-04D8变送器数据上报阿里云

简介:本文将以TDA-04D8变送器作为采集对象,使用海创微联采集控制系统对TDA-04D8变送器进行采集,然后将设备上的毛重、净重、皮重数据采集上传到阿里云物联网平台,阿里云物联网平台将数据实时可视化。 文章分为3部分: …

http ,怎么优雅的拒绝你

作者 | 奇伢来源 | 奇伢云存储典型问题:服务端优雅的拒绝今天分享一个后端编程的实际经验。这个问题来源于对象 S3 后端协议实现的技巧思考。场景:服务端不想接收 http 的 body 的时候,该怎么优雅的拒绝呢?什么意思?对…

企业物联网平台新版公共实例升级企业实例教程

简介:2021年7月30日企业物联网平台重磅升级,发布的新版公共实例支持一键升级企业版实例,本文将为大家介绍一键升级教程 一、企业版实例,企业用户首选 企业物联网平台 提供设备上云必备的基础服务,用户无需自建物联网…

【全观测系列】Elasticsearch应用性能监控实践

简介:本文介绍了应用性能监控的应用价值以及解决方案等。 1、什么是全观测? 要了解全观测,我们先看看传统运维存在哪些问题。 数据孤岛,分散在不同部门,分析排查故障困难;多个厂商的多种工具&#xff0c…

es实战-使用IK分词器进行词频统计

简介:通过IK分词器分词并生成词云。 本文主要介绍如何通过 IK 分词器进行词频统计。使用分词器对文章的词频进行统计,主要目的是实现如下图所示的词云功能,可以找到文章内的重点词汇。后续也可以对词进行词性标注,实体识别以及对…

IC Nansha|AMD高级副总裁、大中华区总裁潘晓明:制程、架构、平台优化突破计算边界

6月25日,中国南沙国际集成电路产业论坛在广州南沙顺利举行。AMD高级副总裁、大中华区总裁潘晓明出席了本次会议,并在高峰论坛环节中以《高性能计算的未来》为主题发表了演讲。 (AMD高级副总裁、大中华区总裁 潘晓明) 作为一家深耕…

爱数SMART 2022峰会开启,分享数据战略与建设数据驱动型组织方法论

6月28日,爱数SMART 2022线上峰会全球直播正式开启。主论坛上,爱数正式提出了企业制定数据战略以及建设数据驱动型组织的方法论,并推出开源计划与数字伙伴计划2.0,共创数据驱动型组织。 通过清晰的数据战略,从容加速数据…

云原生时代开发者工具变革探索与实践

简介:本篇内容分享了原生时代开发者工具变革探索与实践。 分享人:马洪喜 行云创新CEO 正文:本篇内容将通过三个部分来介绍云原生时代开发者工具变革探索与实践。 一、云原生模块化开发概览 二、软件模块化开发特点 三、ADD产品简介 一、…

喜马拉雅 Apache RocketMQ 消息治理实践

简介:本文通过喜马拉雅的RocketMQ治理实践分享,让大家了解使用消息中间件过程中可能遇到的问题,避免实战中踩坑。 作者:曹融,来自喜马拉雅,从事微服务和消息相关中间件开发。 本文通过喜马拉雅的RocketMQ治…

Docker 容器为什么傲娇?全靠镜像撑腰!

作者 | 飞向星的客机来源 | CSDN博客🌟 前言Docker 镜像是 Docker 容器的基石,容器是镜像的运行实例,有了镜像才能启动容器。Docker 镜像是一个只读的模板,一个独立的文件系统,包括运行一个容器所需的数据,…

HBase读链路分析

简介:HBase的存储引擎是基于LSM-Like树实现的,更新操作不会直接去更新数据,而是使用各种type字段(put,delete)来标记一个新的多版本数据,采用定期compaction的形式来归档合并数据。这种数据结构…