Flink:快速掌握批处理数据源的创建方法

Flink 社区最近 “基于FLIP-27” 设计了新的 Source 框架 。一些连接器(API)已迁移到这个新框架。本文介绍了如何使用这个新框架创建批处理源。 它是在为Cassandra实现Flink 批处理源时构建的。如果您有兴趣贡献或迁移连接器,这篇文章非常适合!

1.实现Source组件

Source架构如图:

1.1 Source框架

Cassandra 源示例:

https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSource.java

源接口仅在所有其他组件之间起“粘合”作用。它的作用是实例化所有这些并定义源Boundedness 。我们还在这里进行源配置以及用户配置验证。

1.2 SourceReader

Cassandra SourceReader 示例:

https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSourceReader.java

如上图所示,SourceReader 的实例( 在本文的后续部分中我们将其简称为阅读器)在任务管理器中并行运行,以读取划分为Split 的实际数据。阅读器从SplitEnumerator请求拆分,并将生成的拆分结果返回给它们。

Flink 提供了负责所有线程的SourceReaderBase实现。对于大多数情况,Flink 还为此类提供了有用的扩展:

SingleThreadMultiplexSourceReaderBase :

https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.html

该类已配置了线程模型:每个SplitReader 实例使用一个线程读取拆分(但任务管理器中存在多个 SplitReader 实例)。

我们在 SourceReader 类中接下来要做的事情是:

  • 提供 SplitReader 供应者;

  • 创建一个记录发射器;

  • 为 SplitReaders 创建共享资源(会话等)。由于 SplitReader 供应者是在 super() 调用的 SourceReader 构造函数中创建的,因此使用 SourceReader 工厂创建共享资源并将它们传递给供应者是一个好主意;

  • 实现start():这里我们应该要求枚举器进行第一次分割;

  • 重写SourceReaderBase 父类中的close() 以释放任何创建的资源(例如共享资源);

  • 实现initializedState(),以从Split 创建可变的SplitState;

  • 实现toSplitType() ,以从可变的 SplitState 创建 Split;

  • 实现onSplitFinished():这里,因为它是一个批处理源(有限数据),我们应该要求Enumerator进行下一次分割。

1.3 Split和SplitState

Cassandra Split示例:

https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplit.java

SourceSplit表示源数据的一个分区。拆分的定义取决于我们正在读取的后端。例如,它可以是(分区开始,分区结束)元组或(偏移量,分割大小)元组。

在任何情况下,Split对象都应该被视为不可变对象:对它的任何更新都应该在相关的SplitState上完成。拆分状态是将存储在Flink检查点内的状态。一个检查点可能发生在两次获取一次分裂之间。因此,如果我们正在读取拆分,我们必须在拆分状态中存储读取进程的当前状态。这个当前状态需要是可序列化的(因为它将成为检查点的一部分),并且后端源可以从中恢复。这样,在故障转移的情况下,读取可以从中断的地方恢复。因此,我们确保不会有重复或丢失的数据。

例如,如果记录的读取顺序在后端是确定的,那么拆分状态可以存储n条已经读取的记录,以便在故障转移后在n+1处重新启动。

1.4 SplitEnumerator和SplitEnumeratorState

SplitEnumerator负责创建拆分并将其提供给阅读器。只要有可能,最好是惰性地生成分割,这意味着每次读取器向枚举数请求分割时,枚举数都会按需生成一个并将其分配给阅读器。

为此,我们实现了SplitEnumerator handleSplitRequest() 方法。延迟拆分生成比拆分发现更可取,在拆分发现中,我们预先生成所有拆分并存储它们,等待将它们分配给阅读器。实际上,在某些情况下,分割的数量可能非常大,并且会消耗大量内存,这可能会在分散阅读器的情况下产生问题。该框架通过实现addReader()提供了对阅读器注册进行操作的能力。但是,由于我们要进行延迟分割生成,因此在那里我们没有什么可做的。在某些情况下,生成拆分的成本太高,因此我们可以预先生成一批(不是全部)拆分来分摊这个成本。需要考虑批处理分割的 数量/大小,以避免消耗过多的内存。

长话短说,Source实现的棘手部分是拆分源数据。最好的平衡是不要有太多的分割(这会导致太多的内存消耗),也不要太少(这会导致次优的并行性)。满足这种平衡的一个好方法是预先评估源数据的大小,并允许用户指定拆分将占用的最大内存。这样他们就可以根据任务管理器上的可用内存配置此参数。这个参数是可选的,所以Source程序需要提供一个默认值。此外,源代码需要控制用户提供的max-split-size不能太小,否则会导致太多的分割。一般的经验法则是给用户一些自由,来保护他们免受不必要的行为。对于这些安全措施,刚性阈值不能很好地工作,因为当突然超过阈值时,Source可能开始失效。

例如,如果我们强制分割的数量低于并行度的两倍,如果作业经常在一个不断增长的表上运行,那么在某个时刻,将会有越来越多的max-split-size的分割,并且将超过阈值。当然,需要在不读取实际数据的情况下评估源数据的大小。Cassandra连接器就是这样做的。

另一个重要的话题是状态。如果作业管理器失败,则拆分枚举器需要恢复。对于分割,我们需要为枚举器提供一个状态,它将成为检查点的一部分。恢复后,将重建枚举数并接收一个枚举数状态,以恢复其先前的状态。在检查点上,当调用SplitEnumerator snapshotState()时,枚举数返回其状态。状态必须包含恢复故障转移后枚举器停止的位置所需的所有内容。在延迟分割生成场景中,状态将包含生成下一个分割所需的所有内容。例如,它可以是下一个分裂的开始偏移量,分裂大小,仍然生成的分裂的数量等等,但是SplitEnumeratorState也必须包含一个分裂的列表,不是发现的分裂的列表,而是要重新分配的分裂的列表。实际上,每当reader失败时,如果它在最后一个检查点之后被分配了分片,那么检查点就不会包含这些分片。因此,在恢复时,阅读器将不再分配分片。有一个回调来处理这种情况:addSplitsBack()。在这里,分配给故障读取器的分片可以放回枚举器状态,以便以后重新分配给阅读器。这里没有内存大小风险,因为要重新分配的分片数量非常低。

以上是关于分裂的更重要的话题。还有两个方法需要实现:用于资源创建/处置的常用start() /close()方法。关于start()的实现,Flink连接器框架提供了enumeratorContext callAsync()实用程序来异步运行长时间的处理,比如拆分准备或拆分发现(如果不可能生成延迟拆分)。实际上,start()方法在源协调器线程中运行,我们不希望长时间阻塞它。

1.5 SplitReader 

Cassandra SplitReader示例:

https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSplitReader.java

这个类负责读取框架调用handleSplitsChanges()时接收到的实际分片。拆分阅读器的主要部分是fetch()实现,我们读取接收到的所有分片,并将读取的记录作为RecordsBySplits对象返回。该对象包含分割id到所属记录的映射,以及已完成分割的id。需要考虑的要点:

  • fetch调用必须是非阻塞的。如果其代码中的任何调用是同步的并且可能很长,则必须提供fetch()的转义。当框架调用wakeUp()时,我们应该通过设置一个AtomicBoolean来中断获取。

  • Fetch调用需要是可重入的:一个已经读过的分片不能被重读。我们应该将其从分割列表中删除,并在返回的RecordsBySplits中将其id添加到已完成的分割(以及空分割)中。

实现者提前退出fetch()方法是完全可以的。此外,失败可能会中断获取。在这两种情况下,框架稍后都会再次调用fetch()。在这种情况下,fetch方法必须使用已经讨论过的拆分状态从停止读取的位置恢复读取。如果由于后端约束而无法恢复对分割的读取,那么唯一的解决方案就是自动读取分割(要么根本不读取分割,要么完全读取分割)。这样,在读取中断的情况下,不会输出任何内容,并且可以在下一次读取调用时从开始重新读取分割,从而没有重复。但是,如果完全读取分割,则需要考虑以下几点:

  • 我们应该确保总的拆分内容(来自源的记录)适合内存,例如通过指定以字节为单位的最大拆分大小(请参阅SplitEnumarator)。

  • 分裂状态变得无用,只需要一个分裂类。

1.6 RecordEmitter

 Cassandra RecordEmitter示例:

https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraRecordEmitter.java

SplitReader以实现者为每条记录提供的中间记录格式的形式读取记录。它可以是后端返回的原始格式,也可以是允许事后提取实际记录的任何格式。该格式不是源程序期望的最终输出格式。它包含转换为记录输出格式所需的所有内容。我们需要实现RecordEmitter#emitRecord()来完成这个转换。一个好的模式是用一个映射函数初始化RecordEmitter。实现必须是幂等的。实际上,这种方法可能会在中途中断。在这种情况下,稍后将再次将同一组记录传递给记录发射器。

1.7 Serializers

Cassandra SplitSerializer和SplitEnumeratorStateSerializer示例:

https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.javahttps://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializer.java

我们需要为以下情况提供单例序列化器:

  • 拆分:当将拆分从枚举器发送到读取器时,以及当检查读取器的当前状态时,拆分被序列化

  • SplitEnumeratorState:序列化器用于SplitEnumerator#snapshotState()的结果。

对于两者,我们都需要实现SimpleVersionedSerializer。在一些重要的地方需要注意:

  • 在Flink中禁止使用Java序列化,主要是出于迁移考虑。我们应该使用ObjectOutputStream手动编写对象的字段。当一个类不被ObjectOutputStream(不是String, Integer, Long…)支持时,我们应该将对象的大小以字节为单位写入Integer,然后写入转换为byte[]的对象。类似的方法用于序列化集合。首先写入集合的元素数量,然后序列化所有包含的对象。当然,对于反序列化,我们以相同的顺序进行完全相同的读取。

  • 可能会有很多拆分,所以我们应该缓存SplitSerializer中使用的OutputStream。我们可以使用。

ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE = ThreadLocal. withinitial (() -> new DataOutputSerializer(64));

初始流大小取决于拆分的大小。

2.测试&&总结

本文收集了实现领域的反馈,因为javadoc无法涵盖高性能和可维护源的所有实现细节。希望你喜欢这篇文章,并且它给了你为Flink项目贡献一个新连接器的愿望!

Flink:快速掌握批处理数据源的创建方法

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/645556.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2017年认证杯SPSSPRO杯数学建模D题(第二阶段)教室的合理设计全过程文档及程序

2017年认证杯SPSSPRO杯数学建模 D题 教室的合理设计 原题再现&#xff1a; 某培训机构租用了一块如图&#xff08;见附件&#xff09;所示的场地&#xff0c;由于该机构开设了多种门类的课程&#xff0c;所以需要将这块场地通过加入一些隔墙来分割为多个独立的教室和活动区。…

sheng的学习笔记-神经网络

基础知识 基础知识-什么是分类问题 分类问题是根据已有数据&#xff0c;判断结果是正的还是负的&#xff08;1或者0&#xff09;,比如&#xff1a; • 根据肿瘤大小&#xff0c;判断肿瘤是良性的还是恶性的 • 根据客户交易行为&#xff0c;判断是否是恶意用户 • 根据邮件情况…

SAP EXCEL上传如何实现指定读取某一个sheet页(ALSM_EXCEL_TO_INTERNAL_TABLE)

如何读取指定的EXCEL sheet 页签&#xff0c;比如要读取下图中第二个输出sheet页签 具体实现方法如下&#xff1a; 拷贝标准的函数ALSM_EXCEL_TO_INTERNAL_TABLE封装成一个自定义函数ZCALSM_EXCEL_TO_INTERNAL_TABLE 在自定义函数导入参数页签新增一个参数SHEET_NAME 在源代码…

热门技术问答 | 请 GaussDB 用户查收

近年来&#xff0c;Navicat 与华为云 GaussDB 展开一系列技术合作&#xff0c;为 GaussDB 用户提供面向管理开发工具的生态工具。Navicat 现已完成 GaussDB 主备版&#xff08;单节点、多节点&#xff09;和分布式数据库的多项技术对接。Navicat 通过工具的流畅性和实用性&…

k8s 部署 Nginx 并代理到tomcat

一、已有信息 [rootmaster nginx]# kubectl get nodes -o wide [rootmaster nginx]# kubectl get svc NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE kubernetes ClusterIP 10.96.0.1 <none> 443/TCP 2…

jQuery遍历(树遍历)

1、.children&#xff08;&#xff09;: 获得匹配元素集合中每个元素的直接子元素&#xff0c;选择器选择性筛选。 <!DOCTYPE html> <html><head><meta charset"utf-8"><title></title><script src"jQuery.js"&g…

《WebKit 技术内幕》学习之八(2):硬件加速机制

2 Chromium的硬件加速机制 2.1 GraphicsLayer的支持 GraphicsLayer对象是对一个渲染后端存储中某一层的抽象&#xff0c;同众多其他WebKit所定义的抽象类一样&#xff0c;在WebKit移植中&#xff0c;它还需要具体的实现类来支持该类所要提供的功能。为了完成这一功能&#x…

认识与探索大模型时代的RPA应用及进化(上)

AI Agent当前仍然处于技术爬坡与实验阶段&#xff0c;特别是在企业领域&#xff0c;真正的成熟应用还处于广泛探索与原型验证阶段&#xff0c;离成熟还尚待时日。而同时另外一种在最近几年广受欢迎的自动化解决方案-RPA&#xff08;机器人流程自动化&#xff09;也在LLM时代不断…

类和对象(友元、运算符重载、继承、多态)---C++

类和对象 4.友元4.1全局函数做友元4.2类做友元4.3成员函数做友元 5.运算符重载5.1 加号运算符重载5.1.1成员函数实现运算符重载5.1.2全局函数实现运算符重载 5.2 左移运算符重载5.2.1全局函数实现运算符重载5.2.2成员函数实现运算符重载 5.3 递增/递减运算符重载5.3.1 前置5.3.…

将vue组件发布成npm包

文章目录 前言一、环境准备1.首先最基本的需要安装nodejs&#xff0c;版本推荐 v10 以上&#xff0c;因为需要安装vue-cli2.安装vue-cli 二、初始化项目1.构建项目2.开发组件/加入组件3. 修改配置文件 三、调试1、执行打包命令2、发布本地连接包3、测试项目 四、发布使用1、注册…

德州仪器(TI):市场形势仍不明朗

TI作为模拟芯片大厂龙头&#xff0c;客户超过100,000家&#xff0c;产品上千万种&#xff0c;前10大客户占公司营收5%&#xff0c;前100大产品占公司营收0.1%。客户群庞大且拥有半导体业界最广的产品范围。因此&#xff0c;TI的市场行情展望对整个产业具参考价值。 根据TI公布…

Mediasoup Demo-v3笔记(一)——框架和Nodejs的基本语法

Medisasop Demo的框架 Nodejs基本语法 后记   个人总结&#xff0c;欢迎转载、评论、批评指正

SSH 解析 | 关键参数 | 安全配置

介绍 SSH&#xff08;Secure Shell&#xff09;是一种用于在计算机网络上进行安全远程访问和执行命令的协议。提供加密通信通道&#xff0c;防止敏感信息在传输过程中被窃听或篡改。SSH还支持文件传输和端口转发等功能&#xff0c;使其成为广泛使用的安全远程管理工具。 1. 安…

使用POI生成word文档的table表格

文章目录 使用POI生成word文档的table表格1. 引入maven依赖2. 生成table的两种方式介绍2.1 生成一行一列的table2.2 生成固定行列的table2.3 table合并列2.4 创建多个table存在的问题 使用POI生成word文档的table表格 1. 引入maven依赖 <dependency><groupId>org.…

“探索C语言操作符的神秘世界:从入门到精通的全方位解析“

各位少年&#xff0c;我是博主那一脸阳光&#xff0c;今天来分享深度解析C语言操作符&#xff0c;C语言操作符能帮我们解决很多逻辑性的问题&#xff0c;减少很多代码量&#xff0c;就好比数学的各种符号&#xff0c;我们现在深度解剖一下他们。 前言 在追求爱情的道路上&…

深入浅出AI落地应用分析:AI视频生成Top 5应用

接下俩会每周集中体验一些通用或者垂直的AI落地应用&#xff0c;主要以一些全球或者国外国内排行较前的产品为研究对象&#xff0c;「AI 产品榜&#xff1a; aicpb.com」以专题的方式在博客进行分享。 一、Loom 二、Runway 产品链接&#xff1a;https://app.runwayml.com/ …

ubuntu 22.04 安装mysql-8.0.34

ubuntu 22.04 安装mysql-8.0.34 1、基础安装配置 更新软件包&#xff1a; sudo apt update查看可用软件包&#xff1a; sudo apt search mysql-server安装最新版本&#xff1a; sudo apt install -y mysql-server或者&#xff0c;安装指定版本&#xff1a; sudo apt inst…

【Python程序开发系列】并发执行协程任务超时的解决方案(案例分析)

一、问题 假如我在利用协程并发执行任务的时候&#xff0c;会出现有些任务特别耗时&#xff0c;从而导致程序运行卡住&#xff0c;我们想跳过这些执行特别耗时的任务&#xff0c;只返回不超时的任务结果该怎么解决&#xff1f; 二、实现过程 2.1 情景 假如我有四个任务需要并…

MySQL--删除数据表(6)

MySQL中删除数据表是非常容易操作的&#xff0c;但是你在进行删除表操作时要非常小心&#xff0c;因为执行删除命令后所有数据都会消失。 语法 以下为删除 MySQL 数据表的通用语法&#xff1a; DROP TABLE table_name ; -- 直接删除表&#xff0c;不检查是否存在 或 DROP…

力(FFT,acwing2313)

题目路径&#xff1a; https://www.acwing.com/problem/content/2315/ 思路&#xff1a;