StreamTask数据流:StreamTask能力概述、Flink处理网络数据逻辑

文章目录

    • 一. StreamTask核心组件与能力
    • 二. OneInputStreamTask接入网络数据并处理
    • 三. 处理数据
      • 1. StreamElement类别
      • 2. 业务数据处理逻辑
    • 四. 小结

先来看数据是如何经过网络写入下游Task节点并通过算子进行处理的,这里以OneInputStreamTask为例进行说明。

一. StreamTask核心组件与能力

如代码OneInputStreamTask.init()方法包含了初始化StreamTask主要核心组件的逻辑。

OneInputStreamTask
public void init() throws Exception {StreamConfig configuration = getConfiguration();int numberOfInputs = configuration.getNumberOfInputs();if (numberOfInputs > 0) {// 创建CheckpointedInputGateCheckpointedInputGate inputGate = createCheckpointedInputGate();TaskIOMetricGroup taskIOMetricGroup = getEnvironment().getMetricGroup().getIOMetricGroup();taskIOMetricGroup.gauge("checkpointAlignmentTime", inputGate::getAlignmentDurationNanos);// 创建DataOutput组件DataOutput<IN> output = createDataOutput();StreamTaskInput<IN> input = createTaskInput(inputGate, output);// 创建StreamOneInputProcessorinputProcessor = new StreamOneInputProcessor<>(input,output,getCheckpointLock(),operatorChain);}headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge::getValue);
  1. 创建CheckpointedInputGate:CheckpointedInputGate是对InputGate进行封装,实现对CheckpointBarrier对齐的功能。此组件可以接入上游Task实例写入指定InputChannel中的Buffer数据。
  2. 创建DataOutput组件:在StreamTaskInput中会将 接入的数据 通过DataOutput组件输出到算子链的HeaderOperator中。
  3. 创建StreamTaskInput组件:用于接收数据,将InputGate和DataOutput作为内部成员,完成对数据的接入和输出。
  4. 创建StreamOneInputProcessor数据处理器:此组件会被Task线程模型调度并执行,实现周期性地从StreamTaskInput组件中读取数据元素并处理。

小结:

OneInputStreamTask初始化过程中,包括创建StreamTaskInput和DataOutput组件。

 
 
接下来了解StreamTask如何利用StreamTaskInput和DataOutput完成数据元素的接收并发送到算子链中进行处理。

二. OneInputStreamTask接入网络数据并处理

StreamTask.processInput()方法定义了处理数据的主要流程。

  1. 数据最终会通过MailboxProcessor调度与执行
  2. 调用StreamOneInputProcessor.processInput()方法完成数据元素的获取和处理
  3. 调度StreamOneInputProcessor组件,串联并运行StreamTaskInput组件、DataOutput组件和OperatorChain组件,最终完成数据元素的处理操作。
StreamTask.processInput()
protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {InputStatus status = inputProcessor.processInput();// 上游如果还有数据,则继续等待执行if (status == InputStatus.MORE_AVAILABLE && recordWriter.isAvailable()) {return;}// 上游如果没有数据,则发送控制消息到控制器if (status == InputStatus.END_OF_INPUT) {controller.allActionsCompleted();return;}CompletableFuture<?> jointFuture = getInputOutputJointFuture(status);MailboxDefaultAction.Suspension suspendedDefaultAction = controller.suspendDefaultAction();jointFuture.thenRun(suspendedDefaultAction::resume);
}

 
接下来详细看StreamOneInputProcessor.processInput()

emitNext():通过StreamTaskNetworkInput接收数据元素,并返回InputStatus判断数据元素是否全部消费完毕。emitNext()会将DataOutput作为参数传递到方法内部,用于将数据元素输出到算子链中

public InputStatus processInput() throws Exception {InputStatus status = input.emitNext(output);if (status == InputStatus.END_OF_INPUT) {synchronized (lock) {operatorChain.endHeadOperatorInput(1);}}return status;
}

StreamTaskNetworkInput.emitNext():处理数据逻辑。


//BufferOrEvent代表数据元素可以是Buffer类型,也可以是事件类型,
//比如CheckpointBarrier、TaskEvent等事件。public InputStatus emitNext(DataOutput<T> output) throws Exception {while (true) {// 从Deserializer中获取数据元素if (currentRecordDeserializer != null) {DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);// 如果DeserializationResult对应的Buffer数据已经被消费,则回收Buffer if (result.isBufferConsumed()) {currentRecordDeserializer.getCurrentBuffer().recycleBuffer();currentRecordDeserializer = null;}// 如果result是完整的数据元素,则调用processElement()方法进行处理if (result.isFullRecord()) {processElement(deserializationDelegate.getInstance(), output);return InputStatus.MORE_AVAILABLE;}}// 从checkpointedInputGate中拉取数据//如果bufferOrEvent为空,则判断checkpointedInputGate是否已经关闭,如果已经关闭了则直接返回END_OF_INPUT状态,否则返回NOTHING_AVAILABLE状态。Optional<BufferOrEvent> bufferOrEvent = checkpointedInputGate.pollNext();// 如果有数据则调用processBufferOrEvent()方法进行处理if (bufferOrEvent.isPresent()) {processBufferOrEvent(bufferOrEvent.get());} else {// 如果checkpointedInputGate已关闭,则返回END_OF_INPUTif (checkpointedInputGate.isFinished()) {checkState(checkpointedInputGate.getAvailableFuture().isDone(), "Finished BarrierHandler should be available");if (!checkpointedInputGate.isEmpty()) {throw new IllegalStateException("Trailing data in checkpoint barrier handler.");}return InputStatus.END_OF_INPUT;}return InputStatus.NOTHING_AVAILABLE;}}
}

 

三. 处理数据

1. StreamElement类别

StreamElement具体类别有StreamRecord、StreamStatus以及Watermark,其中StreamRecord就是需要处理的业务数据,Watermark则是上游传递下来的Watermark事件。

//StreamTaskNetworkInput.processElement()
private void processElement(StreamElement recordOrMark, DataOutput<T> output) throws Exception {// StreamRecord类型if (recordOrMark.isRecord()){output.emitRecord(recordOrMark.asRecord());// Watermark类型} else if (recordOrMark.isWatermark()) {statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), lastChannel);// LatencyMarker类型} else if (recordOrMark.isLatencyMarker()) {output.emitLatencyMarker(recordOrMark.asLatencyMarker());// StreamStatus类型} else if (recordOrMark.isStreamStatus()) {statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), lastChannel);} else {throw new UnsupportedOperationException("Unknown type of StreamElement");}
}

 

2. 业务数据处理逻辑

对于业务数据,调用output.emitRecord(recordOrMark.asRecord())方法进行数据元素的输出操作,然后通过DataOutput输出到算子链中进行处理。

如下方法调用operator处理,实际就是在创建StreamTaskNetworkOutput时指定的算子链HeaderOperator

OneInputStreamTask.StreamTaskNetworkOutput.emitRecord()
public void emitRecord(StreamRecord<IN> record) throws Exception {synchronized (lock) {//累加器计算消费数量numRecordsIn.inc();//通过算子链处理operator.setKeyContextElement1(record);operator.processElement(record);}
}

 

四. 小结

Flink从InputGate中拉取数据元素并进行反序列化操作,转换成StreamElement类型后,再调用StreamTaskNetworkOutput.emitRecord()方法将数据元素推送到OperatorChain的HeaderOperator中进行处理。

 
 
《Flink设计与实现:核心原理与源码解析》 – 张利兵

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/722075.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

go 程序被意外kill后出现僵尸进程解决方案

go 管理自身子进程(防止僵尸进程出现) 写这篇文章是因为最近有同事竟然会知道异步启动子进程&#xff0c;不会关闭&#xff0c;最后导致导致僵尸进程出现&#xff0c;而且由于子进程会随着业务的使用越开越多&#xff0c;主进程一旦被kill掉就会不得不手动一个一个kill。 大概…

AWS虚拟机迁移到Azure上的实战操作

将一台虚拟机从AWS迁移到Azure涉及几个关键步骤,包括准备工作、虚拟机的备份与导出、格式转换、上传到Azure以及在Azure上创建新的虚拟机实例。以下是详细的步骤和示例: 一、 准备阶段 Azure和AWS的免费账户请参考下面的链接: 想学习云计算么?教你如何免费白嫖微软和AWS…

004-CSS-左右经典布局

左右经典布局 方案一&#xff1a;弹性盒子布局方案二&#xff1a;绝对定位 padding方案三&#xff1a;绝对定位 margin方案四&#xff1a;行内块布局 calc方案五&#xff1a;浮动 BFC 方案一&#xff1a;弹性盒子布局 &#x1f4a1; Tips&#xff1a;左侧子盒子宽度固定&a…

Python并发编程:协程-greenlet模块

一 greenlet模块 如果我们在单个线程内有20个任务&#xff0c;要想实现在多个任务之间切换&#xff0c;使用yield生成器的方式过于麻烦&#xff08;需要先得到初始化一次的生成器&#xff0c;然后再调用send。。。非常麻烦&#xff09;&#xff0c;而使用greenlet模块可以非常简…

鸿蒙Harmony应用开发—ArkTS声明式开发(通用属性:多态样式)

设置组件不同状态下的样式。 说明&#xff1a; 从API Version 8开始支持。后续版本如有新增内容&#xff0c;则采用上角标单独标记该内容的起始版本。 从API Version 11开始支持另一种写法attributeModifier&#xff0c;可根据开发者需要动态设置属性。 stateStyles stateStyl…

16:00面试,16:06就出来了,问的问题过于变态了。。。

从小厂出来&#xff0c;没想到在另一家公司又寄了。 到这家公司开始上班&#xff0c;加班是每天必不可少的&#xff0c;看在钱给的比较多的份上&#xff0c;就不太计较了。没想到2月一纸通知&#xff0c;所有人不准加班&#xff0c;加班费不仅没有了&#xff0c;薪资还要降40%…

微信报修小程序源码

源码获取方式&#xff1a; 1、搜一搜 万能工具箱合集 然后点击资料库&#xff0c;即可获取资源 一、先看Demo&#xff08;已更新至4.0.0&#xff09; 想看界面图片的&#xff0c;辛苦你爬一下楼&#xff0c;点击下方查看资源&#xff0c;进入官方demo 二、功能介绍 1、当前版…

什么是AJAX?它的运用场景有哪些?

文章目录 前言一、什么是AJAX二、AJAX原理是什么三、为什么需要AJAX四、AJAX的使用五、AJAX的应用场景 前言 AJAX 即 Asynchronous Javascript And XML&#xff08;异步JavaScript和XML&#xff09;&#xff0c;是指一种创建交互式网页应用的网页开发技术。 AJAX 是一种用于创…

LLM(十一)| Claude 3:Anthropic发布最新超越GPT-4大模型

2024年3月4日&#xff0c;Anthropic发布最新多模态大模型&#xff1a;Claude 3系列&#xff0c;共有Haiku、Sonnet和Opus三个版本。 Opus在研究生水平专家推理、基础数学、本科水平专家知识、代码等10个维度&#xff0c;超过OpenAI的GPT-4。 Haiku模型更注重效率&#xff0c;能…

正则表达式判断IP地址(python)

正则表达式判断IP地址&#xff08;python&#xff09; 分情况&#xff1a; 1位数&#xff1a;0 | 1-9 2位数&#xff1a;[1-9][0-9] 3位数&#xff1a; 1开头&#xff1a;1[0-9]{2} 2开头&#xff1a;2[0-4][0-9] | 25[0-5] 1、2位数&#xff1a;0 | [1-9][0-9]? 3位数&#x…

稀碎从零算法笔记Day7-LeetCode:罗马数字转整数

题型&#xff1a;字符串转化、找规律 链接&#xff1a;13. 罗马数字转整数 - 力扣&#xff08;LeetCode&#xff09; 来源&#xff1a;LeetCode 题目描述 给定一个数组 prices &#xff0c;它的第 i 个元素 prices[i] 表示一支给定股票第 i 天的价格。 你只能选择 某一天 …

SpringCloud远程调用Feign

一&#xff0c;什么是Feign Feign是一个声明式的http客户端底层还是基于HTTP实现&#xff0c;是SpringCloud的核心组件之一&#xff0c;实现了微服务之间的远程调用。 二&#xff0c;Feign的使用步骤 1&#xff0c;引入依赖 引入Spring Cloud start Feign的依赖 &#xff0c…

关于我使用numpy.random.choice()遇到坑这件事

做仿真时经常使用到随机数&#xff0c;下面是一个场景&#xff1a;使用np.random.choice([0,1],p[0.5,0.5],size1)去进行随机的二选一&#xff0c;假设需要随机选择1000次&#xff0c;为了保证结果的稳健性&#xff0c;对前述过程重复50次&#xff0c;为了保证可复现性&#xf…

collection及迭代遍历

Collection是单列集合的祖宗接口&#xff0c;它的功能是全部单列集合都可以继承使用的。 package myCollection;import java.util.ArrayList; import java.util.Collection;public class A01CollectionDemo1 {public static void main(String[] args) {//collection是一个接口…

《PyTorch深度学习实践》第十一讲卷积神经网络进阶

一、 1、卷积核超参数选择困难&#xff0c;自动找到卷积的最佳组合。 2、1x1卷积核&#xff0c;不同通道的信息融合。使用1x1卷积核虽然参数量增加了&#xff0c;但是能够显著的降低计算量(operations) 3、Inception Moudel由4个分支组成&#xff0c;要分清哪些是在Init里定义…

基于springboot+vue的精简博客系统

博主主页&#xff1a;猫头鹰源码 博主简介&#xff1a;Java领域优质创作者、CSDN博客专家、阿里云专家博主、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战&#xff0c;欢迎高校老师\讲师\同行交流合作 ​主要内容&#xff1a;毕业设计(Javaweb项目|小程序|Pyt…

Nginx配置文件的整体结构

一、Nginx配置文件的整体结构 从图中可以看出主要包含以下几大部分内容&#xff1a; 1. 全局块 该部分配置主要影响Nginx全局&#xff0c;通常包括下面几个部分&#xff1a; 配置运行Nginx服务器用户&#xff08;组&#xff09; worker process数 Nginx进程PID存放路径 错误…

Linux 防火墙 操作命令【实用】

防火墙操作&#xff1a; 描述命令查看防火墙状态systemctl status firewalld、firewall-cmd --state暂时关闭防火墙systemctl stop firewalld永久关闭防火墙systemctl disable firewalld开启防火墙systemctl start firewalld开放指定端口firewall-cmd --zonepublic --add-port…

Java集合5-HashSet

HashSet&#xff1a;基于哈希表实现的集合&#xff0c;用于存储不重复的元素。 HashSet<String> set new HashSet<>(); set.add("Item 1"); set.add("Item 2");HashSet 是一个不允许存储重复元素的集合&#xff0c;它的实现比较简单&#xf…

080|为什么阿里的价值观值得你关注?

在阿里巴巴20周年年会现场&#xff0c;万众瞩目之下&#xff0c;马云和张勇完成了阿里巴巴董事长职务的交接。 不过你也知道&#xff0c;这次接棒在一年前就已经公布了&#xff0c;在年会上只是一个仪式。在20周年年会过后&#xff0c;我找到了互联网圈的资深媒体人阳淼&#…