Flink1.14 SourceReader概念入门讲解与源码解析 (三)

目录

SourceReader 概念

SourceReader 源码方法

void start();

InputStatus pollNext(ReaderOutput output) throws Exception;

List snapshotState(long checkpointId);

CompletableFuture isAvailable();

void addSplits(List splits);

参考


SourceReader 概念

SourceReader是一个运行在Task Manager上的组件,主要是负责读取 SplitEnumerator 分配的source split。

SourceReader 提供了一个拉动式(pull-based)处理接口。Flink任务会在循环中不断调用 pollNext(ReaderOutput) 轮询来自 SourceReader 的记录。 pollNext(ReaderOutput) 方法的返回值指示 SourceReader 的状态。

  • MORE_AVAILABLE - SourceReader 有可用的记录。
  • NOTHING_AVAILABLE - SourceReader 现在没有可用的记录,但是将来可能会有记录可用。
  • END_OF_INPUT - SourceReader 已经处理完所有记录,到达数据的尾部。这意味着 SourceReader 可以终止任务了。

pollNext(ReaderOutput) 会使用 ReaderOutput 作为参数,为了提高性能且在必要情况下, SourceReader 可以在一次 pollNext() 调用中返回多条记录。例如:有时外部系统的工作系统的工作粒度为块。而一个块可以包含多个记录,但是 source 只能在块的边界处设置 Checkpoint。在这种情况下, SourceReader 可以一次将一个块中的所有记录通过 ReaderOutput 发送至下游。

然而,除非有必要,SourceReader 的实现应该避免在一次 pollNext(ReaderOutput) 的调用中发送多个记录。这是因为对 SourceReader 轮询的任务线程工作在一个事件循环(event-loop)中,且不能阻塞。

在创建 SourceReader 时,相应的 SourceReaderContext 会提供给 Source,而 Source 则会将对应的上下文传递给 SourceReader 实例。 SourceReader 可以通过 SourceReaderContext 将 SourceEvent 传递给相应的 SplitEnumerator 。 Source 的一个典型设计模式是让 SourceReader 发送它们的本地信息给 SplitEnumerator,后者则会全局性地做出决定。

SourceReader API 是一个底层(low-level)API,允许用户自行处理分片,并使用自己的线程模型来获取和移交记录。为了帮助实现 SourceReader,Flink 提供了 SourceReaderBase 类,可以显著减少编写 SourceReader 所需要的工作量。

强烈建议连接器开发人员充分利用 SourceReaderBase 而不是从头开始编写 SourceReader

这里简单说一下,如何通过 Source 创建 DataStream ,有两种方法(感觉上没啥区别):

  • env.fromSource
  • env.addSource
// fromSource 这个返回的是source
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Source mySource = new MySource(....);DataStream<Integer> stream = env.fromSource(mySource,WatermarkStrategy.noWatermarks(),// 无水标"MySourceName");
..// addSource 这个返回的是Source function
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<..> stream = env.addSource(new MySource(...));

SourceReader 源码方法

void start();

判断是否有splits了,如果当前没有已经分配的splits了就发送请求获取。

/** Start the reader. */void start();// FileSourceReader的实现@Overridepublic void start() {// we request a split only if we did not get splits during the checkpoint restoreif (getNumberOfCurrentlyAssignedSplits() == 0) {context.sendSplitRequest(); // 发送split的读取请求给SplitEnumerator,在handleSplitRequest方法中被调用}}

InputStatus pollNext(ReaderOutput<T> output) throws Exception;

主要负责拉取下一个可读取的记录到SourceOutput,确保这个方法是非阻塞的,并且最好一次调用只输出一条数据。

/*** Poll the next available record into the {@link SourceOutput}.** <p>The implementation must make sure this method is non-blocking.** <p>Although the implementation can emit multiple records into the given SourceOutput, it is* recommended not doing so. Instead, emit one record into the SourceOutput and return a {@link* InputStatus#MORE_AVAILABLE} to let the caller thread know there are more records available.** @return The InputStatus of the SourceReader after the method invocation.*/InputStatus pollNext(ReaderOutput<T> output) throws Exception;// FileSourceReader读取数据的pollNext方法位于父类SourceReaderBase中
@Override
public InputStatus pollNext(ReaderOutput<T> output) throws Exception {// make sure we have a fetch we are working on, or move to the next// 获取当前从fetcher中读取到的一批split// RecordsWithSplitIds代表了从fetcher拉取到SourceReader的数据// RecordsWithSplitIds可以包含多个split,但是对于FileRecords而言,只代表一个splitRecordsWithSplitIds<E> recordsWithSplitId = this.currentFetch;if (recordsWithSplitId == null) {// 如果没有,获取下一批splitrecordsWithSplitId = getNextFetch(output);if (recordsWithSplitId == null) {// 如果还没有获取到,需要检查后续是否还会有数据到来。return trace(finishedOrAvailableLater());}}// we need to loop here, because we may have to go across splitswhile (true) {// Process one record.// 从split中获取下一条记录final E record = recordsWithSplitId.nextRecordFromSplit();if (record != null) {// emit the record.// 如果获取到数据// 记录数量计数器加1numRecordsInCounter.inc(1);// 发送数据到Output// currentSplitOutput为当前split对应的下游output// currentSplitContext.state为reader的读取状态recordEmitter.emitRecord(record, currentSplitOutput, currentSplitContext.state);LOG.trace("Emitted record: {}", record);// We always emit MORE_AVAILABLE here, even though we do not strictly know whether// more is available. If nothing more is available, the next invocation will find// this out and return the correct status.// That means we emit the occasional 'false positive' for availability, but this// saves us doing checks for every record. Ultimately, this is cheaper.// 总是发送MORE_AVAILABLE// 如果真的没有可用数据,下次调用会返回正确的状态return trace(InputStatus.MORE_AVAILABLE);} else if (!moveToNextSplit(recordsWithSplitId, output)) {// 如果本次fetch的split已经全部被读取(本批没有更多的split),读取下一批数据// The fetch is done and we just discovered that and have not emitted anything, yet.// We need to move to the next fetch. As a shortcut, we call pollNext() here again,// rather than emitting nothing and waiting for the caller to call us again.return pollNext(output);}// else fall through the loop}
}

getNextFetch方法获取下一批 split 。

@Nullable
private RecordsWithSplitIds<E> getNextFetch(final ReaderOutput<T> output) {// 检查fetcher是否有错误splitFetcherManager.checkErrors();LOG.trace("Getting next source data batch from queue");// elementsQueue中缓存了fetcher线程获取的split// 从这个队列中拿出一批splitfinal RecordsWithSplitIds<E> recordsWithSplitId = elementsQueue.poll();// 如果队列中没有数据,并且接下来这批split已被读取完毕,返回nullif (recordsWithSplitId == null || !moveToNextSplit(recordsWithSplitId, output)) {// No element available, set to available later if needed.return null;}// 更新当前的fetchcurrentFetch = recordsWithSplitId;return recordsWithSplitId;
}

finishedOrAvailableLater 方法检查后续是否还有数据,返回对应的状态。

private InputStatus finishedOrAvailableLater() {// 检查所有的fetcher是否都已关闭final boolean allFetchersHaveShutdown = splitFetcherManager.maybeShutdownFinishedFetchers();// 如果reader不会再接收更多的split,或者所有的fetcher都已关闭// 返回NOTHING_AVAILABLE,将来可能会有记录可用。if (!(noMoreSplitsAssignment && allFetchersHaveShutdown)) {return InputStatus.NOTHING_AVAILABLE;}if (elementsQueue.isEmpty()) {// 如果缓存队列中没有数据,返回END_OF_INPUT// We may reach here because of exceptional split fetcher, check it.splitFetcherManager.checkErrors();return InputStatus.END_OF_INPUT;} else {// We can reach this case if we just processed all data from the queue and finished a// split,// and concurrently the fetcher finished another split, whose data is then in the queue.// 其他情况返回MORE_AVAILABLEreturn InputStatus.MORE_AVAILABLE;}
}

moveToNextSplit 方法前往读取下一个split。

private boolean moveToNextSplit(RecordsWithSplitIds<E> recordsWithSplitIds, ReaderOutput<T> output) {// 获取下一个split的IDfinal String nextSplitId = recordsWithSplitIds.nextSplit();if (nextSplitId == null) {// 如果没获取到,则当前获取过程结束LOG.trace("Current fetch is finished.");finishCurrentFetch(recordsWithSplitIds, output);return false;}// 获取当前split上下文// Map<String, SplitContext<T, SplitStateT>> splitStates它保存了split ID和split的状态currentSplitContext = splitStates.get(nextSplitId);checkState(currentSplitContext != null, "Have records for a split that was not registered");// 获取当前split对应的output// SourceOperator在从SourceCoordinator获取到分片后会为每个分片创建一个OUtput,currentSplitOutput是当前分片的输出currentSplitOutput = currentSplitContext.getOrCreateSplitOutput(output);LOG.trace("Emitting records from fetch for split {}", nextSplitId);return true;
}

List<SplitT> snapshotState(long checkpointId);

主要是负责创建 source 的 checkpoint 。

/*** Checkpoint on the state of the source.** @return the state of the source.*/List<SplitT> snapshotState(long checkpointId);public List<SplitT> snapshotState(long checkpointId) {List<SplitT> splits = new ArrayList();this.splitStates.forEach((id, context) -> {splits.add(this.toSplitType(id, context.state));});return splits;}

CompletableFuture<Void> isAvailable();

     /*** Returns a future that signals that data is available from the reader.** <p>Once the future completes, the runtime will keep calling the {@link* #pollNext(ReaderOutput)} method until that methods returns a status other than {@link* InputStatus#MORE_AVAILABLE}. After that the, the runtime will again call this method to* obtain the next future. Once that completes, it will again call {@link* #pollNext(ReaderOutput)} and so on.** <p>The contract is the following: If the reader has data available, then all futures* previously returned by this method must eventually complete. Otherwise the source might stall* indefinitely.** <p>It is not a problem to have occasional "false positives", meaning to complete a future* even if no data is available. However, one should not use an "always complete" future in* cases no data is available, because that will result in busy waiting loops calling {@code* pollNext(...)} even though no data is available.** @return a future that will be completed once there is a record available to poll.*/// 创建一个future,表明reader中是否有数据可被读取// 一旦这个future进入completed状态,Flink一直调用pollNext(ReaderOutput)方法直到这个方法返回除InputStatus#MORE_AVAILABLE之外的内容// 在这之后,会再次调isAvailable方法获取下一个future。如果它completed,再次调用pollNext(ReaderOutput)。以此类推public CompletableFuture<Void> isAvailable() {return this.currentFetch != null ? FutureCompletingBlockingQueue.AVAILABLE : this.elementsQueue.getAvailabilityFuture();}

void addSplits(List<SplitT> splits);

    /*** Adds a list of splits for this reader to read. This method is called when the enumerator* assigns a split via {@link SplitEnumeratorContext#assignSplit(SourceSplit, int)} or {@link* SplitEnumeratorContext#assignSplits(SplitsAssignment)}.** @param splits The splits assigned by the split enumerator.*/// 添加一系列splits,以供reader读取。这个方法在SplitEnumeratorContext#assignSplit(SourceSplit, int)或者SplitEnumeratorContext#assignSplits(SplitsAssignment)中调用void addSplits(List<SplitT> splits);

其中,SourceReaderBase类的实现,fetcher的作用是从拉取split缓存到SourceReader中。

@Override
public void addSplits(List<SplitT> splits) {LOG.info("Adding split(s) to reader: {}", splits);// Initialize the state for each split.splits.forEach(s ->splitStates.put(s.splitId(), new SplitContext<>(s.splitId(), initializedState(s))));// Hand over the splits to the split fetcher to start fetch.splitFetcherManager.addSplits(splits);
}

addSplits 方法将fetch任务交给 SplitFetcherManager 处理,它的 addSplits 方法如下:

@Override
public void addSplits(List<SplitT> splitsToAdd) {// 获取正在运行的fetcherSplitFetcher<E, SplitT> fetcher = getRunningFetcher();if (fetcher == null) {// 如果没有,创建出一个fetcherfetcher = createSplitFetcher();// Add the splits to the fetchers.// 将这个创建出的fetcher加入到running fetcher集合中fetcher.addSplits(splitsToAdd);// 启动这个fetcherstartFetcher(fetcher);} else {// 如果获取到了正在运行的fetcher,调用它的addSplits方法fetcher.addSplits(splitsToAdd);}
}

最后我们查看SplitFetcheraddSplits方法:

public void addSplits(List<SplitT> splitsToAdd) {// 将任务包装成AddSplitTask,通过splitReader兼容不同格式数据的读取方式// 将封装好的任务加入到队列中enqueueTask(new AddSplitsTask<>(splitReader, splitsToAdd, assignedSplits));// 唤醒fetcher任务,使用SplitReader读取数据// Split读取数据并缓存到elementQueue的逻辑位于FetcherTask,不再具体分析wakeUp(true);
}

参考

数据源 | Apache Flink

Flink 源码之新 Source 架构 - 简书

Flink新Source架构(下) - 知乎

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/108939.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用UniApp实现视频数组自动下载与播放功能:一步步指导

&#x1f337;&#x1f341; 博主猫头虎 带您 Go to New World.✨&#x1f341; &#x1f984; 博客首页——猫头虎的博客&#x1f390; &#x1f433;《面试题大全专栏》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33a; &a…

类加载的过程总结以及双亲委派模型[JVM]

类加载过程 类一共有七个生命周期:加载->验证->准备->解析->初始化->使用->卸载 加载&#xff08;加载字节码文件&#xff0c;生成.class对象&#xff09; 加载是类加载的第一个阶段。 加载阶段的任务是在类文件从磁盘加载到内存中&#xff0c;通常是从cl…

Aroid问题笔记 - ViewPager嵌套RecyclerView,降低ViewPager灵敏度

点击跳转>Unity3D特效百例点击跳转>案例项目实战源码点击跳转>游戏脚本-辅助自动化点击跳转>Android控件全解手册点击跳转>Scratch编程案例点击跳转>软考全系列 &#x1f449;关于作者 专注于Android/Unity和各种游戏开发技巧&#xff0c;以及各种资源分享&…

【网络协议】聊聊DHCP和PXE 工作原理

DHCP 动态主机配置协议 对于每个主机来说&#xff0c;只要连接了网络&#xff0c;那么就会配置一个IP地址&#xff0c;那么这个IP地址&#xff0c;如果是手动配置的话&#xff0c;对于公司内部的人员来说都要找IT进行配置&#xff0c;这个太浪费人力物力了&#xff0c;所以解决…

React18入门(第四篇)——React中的4种CSS使用方式,CSS Module、CSS-in-Js详解

文章目录 一、普通方式使用CSS1.1 元素内联 style1.2 引入 CSS 文件1.3 类名插件 -- Classnames1.4 注意事项 二、CSS Module2.1 普通 CSS 的问题2.2 CSS Module 的特点2.3 简单使用 三、使用 sass3.1 sass 简介3.2 使用 四、CSS-in-JS4.1 CSS-in-JS 简介4.2 CSS-in-JS 常用工具…

【JVM】对象内存布局

对象内存布局 文章目录 对象内存布局1. 对象的内存布局2. 对象标记(Mark Word)3. 类元信息(类型指针)4. 实例数据和对象填充 1. 对象的内存布局 在Hotspot虚拟机里&#xff0c;对象在堆内存中的存储布局可以划分为三个部分&#xff1a;对象头(Header)、实例数据(Instance Data…

SpringBoot面试题5:SpringBoot Starter的工作原理是什么?

该文章专注于面试,面试只要回答关键点即可,不需要对框架有非常深入的回答,如果你想应付面试,是足够了,抓住关键点 面试官:SpringBoot Starter的工作原理是什么? Spring Boot Starter 是一种便捷的方式来为 Spring Boot 应用程序引入一组特定功能的依赖项。它简化了项目…

SparkContext 与 SparkContext 之间的区别是什么

SparkContext 是 Spark 的入口点&#xff0c;它是所有 Spark 应用程序的主要接口&#xff0c;用于创建 RDD、累加器、广播变量等&#xff0c;并管理与 Spark 集群的连接。在一个 Spark 应用程序中只能有一个 SparkContext。 而 SparkSession 是 Spark 2.0 新增的 API&#xff0…

7-13 p070找出全部子串位置

7-13 p070找出全部子串位置 分数 5 作者 吴敏华 单位 首都师范大学 输入两个串s1,s2&#xff0c;找出s2在s1中所有出现的位置。 前后两个子串的出现不能重叠。例如’aa’在 aaaa 里出现的位置只有0,2 输入格式: 第一行是整数n 接下来有n行&#xff0c;每行两个不带空格的字符…

简述快速失败(fail-fast)和安全失败(fail-safe)的区别 ?

1&#xff1a;快速失败&#xff08;fail-fast&#xff09;: 在用迭代器遍历一个集合对象时&#xff0c;如果遍历过程中对集合对象的内容进行了修改&#xff08;增加、删除、修改&#xff09;&#xff0c;则会抛出Concurrent Modification Exception。 原理&#xff1a;迭代器在…

Kotlin注释

一、设置注释样式 按需配置 二、单行多行注释 fun main() {// 单行注释println("单行注释") //单行注释/** 多行注释* */println("多行注释") }

c++ fstream 文件追加模式

目录 c 覆盖模式&#xff1a; c 追加模式&#xff1a; c 覆盖模式&#xff1a; #include <fstream>int main() {std::ofstream file("example.txt");if (file.is_open()) {file << "Hello, World!";file.close();}return 0; }在这个例子中&a…

Hive引擎MR、Tez、Spark

Hive引擎包括&#xff1a;默认MR、Tez、Spark 不更换引擎hive默认的就是MR。 MapReduce&#xff1a;是一种编程模型&#xff0c;用于大规模数据集&#xff08;大于1TB&#xff09;的并行运算。 Hive on Spark&#xff1a;Hive既作为存储元数据又负责SQL的解析优化&#xff0…

python中matrix()矩阵和array()数组(待完善)

参考&#xff1a;python矩阵中matrix()和array()函数区别-CSDN博客 区别&#xff1a; 维度&#xff1a;ndarray可以是多维的&#xff0c;包括1D、2D、3D等&#xff0c;而matrix只能是2维的&#xff0c;也就是矩阵。数据类型&#xff1a;ndarray的数据类型可以不一致&#xf…

ELK + Filebeat 分布式日志管理平台部署

ELK Filebeat 分布式日志管理平台部署 1、前言1.1日志分析的作用1.2需要收集的日志1.3完整日志系统的基本特征 2、ELK概述2.1ELK简介2.2为什么要用ELK?2.3ELK的组件 3、ELK组件详解3.1Logstash3.1.1简介3.1.2Logstash命令常用选项3.1.3Logstash 的输入和输出流3.1.4Logstash配…

【LeetCode】34. 在排序数组中查找元素的第一个和最后一个位置

1 问题 给你一个按照非递减顺序排列的整数数组 nums&#xff0c;和一个目标值 target。请你找出给定目标值在数组中的开始位置和结束位置。 如果数组中不存在目标值 target&#xff0c;返回 [-1, -1]。 你必须设计并实现时间复杂度为 O(log n) 的算法解决此问题。 示例 1&a…

Qt入门之深入了解QWidget类

文章目录 一、QWidget简介一、QWidget的基本特性&#xff1a;1.1 绘图功能1.2 事件处理1.3布局管理 三、QWidget的子类1. QMainWindow&#xff08;主窗口类&#xff09;2. QPushButton&#xff08;按钮类&#xff09;&#xff1a;3. QLabel&#xff08;标签类&#xff09;&…

密码学三 btc 钱包 节点 挖矿 51%攻击 双花攻击

03-BTC-数据结构_哔哩哔哩_bilibili 哈希指针并解释 比特币的每个区块都包含一个区块头和区块体两部分。 在区块头中,有一个字段是用于存储前一个区块的哈希值,我们把这个存储前一个区块哈希值的字段称为“哈希指针”。 这个哈希指针的作用是将本区块指向前一个区块,连接起整…

CentOS有IP地址,连接不上Xshell或使用Xshell时突然断开

问题原因&#xff1a;未在电脑主机的网络中进行IP地址配置 解决办法&#xff1a; 1.打开控制面板&#xff0c;选择‘网络与共享中心’ 2.选择“更改适配器设置” 3.右键点击以太网3“属性” 4.选择协议版本4&#xff0c;点击属性 5.IP地址填写CentOS的IP地址&#xff1a;192.…

Epoch、批量大小、迭代次数

梯度下降 它是 机器学习中使用的迭代 优化算法&#xff0c;用于找到最佳结果&#xff08;曲线的最小值&#xff09;。 坡度 是指 斜坡的倾斜度或倾斜度 梯度下降有一个称为 学习率的参数。 正如您在上图&#xff08;左&#xff09;中看到的&#xff0c;最初步长较大&#…