源码解析Flink源节点数据读取是如何与checkpoint串行执行

文章目录

        • 源码解析Flink源节点数据读取是如何与checkpoint串行执行
          • Checkpoint阶段
            • StreamTask类变量actionExecutor的实现和初始化
            • 小结
          • 数据读取阶段
            • 小结
          • 总结

源码解析Flink源节点数据读取是如何与checkpoint串行执行

Flink版本:1.13.6

前置知识:源节点的Checkpoint是由Checkpointcoordinate触发,具体是通过RPC调用TaskManager中对应的Task的StreamTask类的performChecpoint方法执行Checkpoint。

本文思路:本文先分析checkpoint阶段,然后再分析数据读取阶段,最后得出结论:源节点Checkpoint时和源节点读取数据时,都需要抢SourceStreamTask类中lock变量的锁,最终实现串行执行checkpoint与写数据

Checkpoint阶段

Checkpoint在StreamTask的performCheckpoint方法中执行,该方法调用过程如下

// 在StreamTask类中 执行checkpoint操作
private boolean performCheckpoint(CheckpointMetaData checkpointMetaData,CheckpointOptions checkpointOptions,CheckpointMetricsBuilder checkpointMetrics )throws Exception {if (isRunning) {//使用actionExecutor 同步触发checkpointactionExecutor.runThrowing(() -> {....//经过一系列检查subtaskCheckpointCoordinator.checkpointState(checkpointMetaData,checkpointOptions,checkpointMetrics,operatorChain,this::isRunning);});return true;} else {....}}

从上述代码可以看出,Checkpoint执行是由actionExecutor执行器执行

StreamTask类变量actionExecutor的实现和初始化

StreamTask类变量actionExecution的实现

通过代码注释可以知道该执行器的实现是StreamTaskActionExecutor.SynchronizedStreamTaskActionExecutor;从SynchronizedStreamTaskActionExecutor源代码可知,该执行器每次执行都需要获得mutex对象锁

  /*** All actions outside of the task {@link #mailboxProcessor mailbox} (i.e. performed by another* thread) must be executed through this executor to ensure that we don't have concurrent method* calls that void consistent checkpoints.** <p>CheckpointLock is superseded by {@link MailboxExecutor}, with {@link* StreamTaskActionExecutor.SynchronizedStreamTaskActionExecutor* SynchronizedStreamTaskActionExecutor} to provide lock to {@link SourceStreamTask}.*/
private final StreamTaskActionExecutor actionExecutor;class SynchronizedStreamTaskActionExecutor implements StreamTaskActionExecutor {private final Object mutex;public SynchronizedStreamTaskActionExecutor(Object mutex) {this.mutex = mutex;}@Overridepublic void run(RunnableWithException runnable) throws Exception {synchronized (mutex) {runnable.run();}}
}

StreamTask变量actionExecution初始化

actionExecutor变量在StreamTask中定义,在构造方法中初始化;该构造方法由SourceStreamTask调用,并传入SynchronizedStreamTaskActionExecutor对象,代码如下所示

//   SourceStreamTask的方法
private SourceStreamTask(Environment env, Object lock) throws Exception {//调用的StreamTask构造函数,传入SynchronizedStreamTaskActionExecutor对象super(env,null,FatalExitExceptionHandler.INSTANCE,//初始化actionExecutorStreamTaskActionExecutor.synchronizedExecutor(lock));//将lock对象赋值给类变量lockthis.lock = Preconditions.checkNotNull(lock);this.sourceThread = new LegacySourceFunctionThread();getEnvironment().getMetricGroup().getIOMetricGroup().setEnableBusyTime(false);
}//  StreamTask的方法
protected StreamTask(Environment environment,@Nullable TimerService timerService,Thread.UncaughtExceptionHandler uncaughtExceptionHandler,//初始化actionExecutorStreamTaskActionExecutor actionExecutor)throws Exception {this(environment,timerService,uncaughtExceptionHandler,actionExecutor,new TaskMailboxImpl(Thread.currentThread()));
}protected StreamTask(Environment environment,@Nullable TimerService timerService,Thread.UncaughtExceptionHandler uncaughtExceptionHandler,StreamTaskActionExecutor actionExecutor,TaskMailbox mailbox)throws Exception {super(environment);this.configuration = new StreamConfig(getTaskConfiguration());this.recordWriter = createRecordWriterDelegate(configuration, environment);//初始化actionExecutorthis.actionExecutor = Preconditions.checkNotNull(actionExecutor);this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor);.......}
小结

actionExecutor执行器每次执行都需要获得mutex对象,mutex对象就是SourceStreamTask类中的lock对象;即算子每次执行Checkpoint时都需要获得SourceStreamTask类中lock对象锁才能进行

数据读取阶段

在执行Checkpoint时控制读取源端,则控制点必定是在调用SourceContext的collect方法时

@Override
public void run(SourceContext<String> ctx) throws Exception {int i = 0;while (true) {//在这个方法里处理ctx.collect(String.valueOf(i));}
}

点击collection查看实现,选择NonTimestampContext查看代码,collect()实现如下

@Override
public void collect(T element) {synchronized (lock) {output.collect(reuse.replace(element));}
}

所以这里控制数据读取发送是通过lock来控制,lock是如何初始化的?

通过NonTimestampContext构造方法可以定位到StreamSourceContexts->getSourceContext方法;

public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext(TimeCharacteristic timeCharacteristic,ProcessingTimeService processingTimeService,Object checkpointLock,StreamStatusMaintainer streamStatusMaintainer,Output<StreamRecord<OUT>> output,long watermarkInterval,long idleTimeout) {final SourceFunction.SourceContext<OUT> ctx;switch (timeCharacteristic) {....case ProcessingTime://初始化NonTimestampContextctx = new NonTimestampContext<>(checkpointLock, output);break;default:throw new IllegalArgumentException(String.valueOf(timeCharacteristic));}return ctx;
}

向上追踪,在StreamSource类中调用getSourceContext:

public void run(final Object lockingObject,final StreamStatusMaintainer streamStatusMaintainer,final Output<StreamRecord<OUT>> collector,final OperatorChain<?, ?> operatorChain)throws Exception {....this.ctx =StreamSourceContexts.getSourceContext(timeCharacteristic,getProcessingTimeService(),lockingObject,streamStatusMaintainer,collector,watermarkInterval,-1);....}
// 再向上最终run方法的调用点->是由内部方法run调用
public void run(final Object lockingObject,final StreamStatusMaintainer streamStatusMaintainer,final OperatorChain<?, ?> operatorChain)throws Exception {run(lockingObject, streamStatusMaintainer, output, operatorChain);
}//再向上最终run方法的调用点->SourceStreamTask 调用run 然后再代用mainOpterator run方法
@Override
public void run() {try {// 使用的是类变量lockmainOperator.run(lock, getStreamStatusMaintainer(), operatorChain);if (!wasStoppedExternally && !isCanceled()) {synchronized (lock) {operatorChain.setIgnoreEndOfInput(false);}}completionFuture.complete(null);} catch (Throwable t) {// Note, t can be also an InterruptedExceptioncompletionFuture.completeExceptionally(t);}
}
小结

所以在源端写数据时,必须获得SourceStreamTask中的类变量lock的锁才能进行写数据;类变量lock刚好和执行器时同一个对象

总结

flink的source算子在Checkpoint时,是通过锁对象SourceStreamTask.lock,来控制源端数据产生和Checkpoint的有序进行

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/28837.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

三天从零快速入门React

前言 React 官网文档比较完善&#xff0c;本文更注重结合实际项目中常见的问题&#xff0c;来介绍 React 的用法 Fun Facts ReactVueAngularNPM weekly downloads &#xff08;由于 cnpm 无法查看包&#xff0c;数据不全&#xff09;12,635,9662,662,666823,653Dependents59…

AI深度学习部署全记录

AI部署流程&#xff0c;以PyTorch为例&#xff1a; 1.Torch.Model->ONNX->ONNXSIM->TensortRT->落地 2.Torch.Model->Pt->ONNX->ONNXRunTime->落地 3.Torch.Model->Pt->Libtorch->落地 4.Torch.Model->PNNX->TensorRT->落地 5.…

sql刷题

文章目录 section A1 各部门工资最高的员工&#xff08;难度&#xff1a;中等&#xff09;2 换座位&#xff08;难度&#xff1a;中等&#xff09;3 分数排名&#xff08;难度&#xff1a;中等&#xff09;4 连续出现的数字&#xff08;难度&#xff1a;中等&#xff09;5 树节…

GD32F103VE串口中断发送和接收

GD32F103VE串口中断发送和接收&#xff0c;本程序基于RS485完成测试&#xff0c;实现将收到的数据&#xff0c;再发送出去。 #include "USART1_Interrupt.h" #include "stdio.h" //getchar(),putchar(),scanf(),printf(),puts(),gets(),sprintf() #inclu…

什么是 API 安全?学习如何防止攻击和保护数据

随着 API 技术的普及&#xff0c;API 安全成为了一个越来越重要的问题。本文将介绍什么是 API 安全&#xff0c;以及目前 API 面临的安全问题和相应的解决方案。 什么是 API 安全 API 安全是指保护 API 免受恶意攻击和滥用的安全措施。API 安全通常包括以下几个方面&#xff1…

Zabbix监控华为交换机DHCP接口地址池

一、背景 最近工作中遇到一个因为DHCP地址池满载、导致用户无法获取到IP地址的故障&#xff0c;所以在想通过zabbix 监控DHCP地址池的状态、当DHCP 地址池数量小于某个值时触发zabbix告警。 网上找了一下没有相关的文档、和对应的OID值、于是用Python 脚本的方式实现 二、实现效…

电视盒子哪个牌子好?拆机达人揭晓电视盒子品牌排行榜

老赵每天会对各种类型的数码产品进行拆机&#xff0c;对硬件、品控这块非常熟悉&#xff0c;近期很多朋友问我电视盒子哪个牌子好&#xff0c;我整理了目前市面上硬件、软件都表现不错的电视盒子品牌排行榜&#xff0c;看看目前最值得入手的电视盒子都有哪些。 第一&#xff1a…

无涯教程-Perl - getnetent函数

描述 此函数从/etc/networks文件获取下一个条目,返回-($name,$aliases,$addrtype,$net) 如果/etc/networks文件为空,则它将不返回任何内容,并且调用将失败。 语法 以下是此函数的简单语法- getnetent返回值 此函数在错误时返回undef,否则在标量context中返回网络地址,在错…

高质量api接口对接及Python示例代码

当我们需要将不同系统或服务进行对接时&#xff0c;接口对接是一种常见的解决方案。我将介绍如何使用Python进行接口对接&#xff0c;并提供示例代码。 首先&#xff0c;我们需要导入Python的requests库&#xff0c;它是一个常用的HTTP请求库&#xff0c;可以方便地发送HTTP请求…

第九次作业

1. SSL工作过程是什么&#xff1f; 当客户端向一个 https 网站发起请求时&#xff0c;服务器会将 SSL 证书发送给客户端进行校验&#xff0c;SSL 证书中包含一个公钥。校验成功后&#xff0c;客户端会生成一个随机串&#xff0c;并使用受访网站的 SSL 证书公钥进行加密&#xf…

提升城市管理效率,软件机器人助力自动化处理投诉、建议、举报

在现代城市管理中&#xff0c;市民的投诉、建议和举报等事项是不可忽视的重要环节。然而&#xff0c;传统的处理方式往往需要大量的人力和时间&#xff0c;效率较低。为了提升城市管理部门的服务质量和效率&#xff0c;引入软件机器人成为一种可行的选择。 博为小帮软件机器人可…

Python京东商品详情页数据采集方法,京东 API 接口介绍

京东详情接口 API 是开放平台提供的一种 API 接口&#xff0c;它可以帮助开发者获取商品的详细信息&#xff0c;包括商品的标题、描述、图片等信息。在电商平台的开发中&#xff0c;详情接口 API 是非常常用的 API&#xff0c;因此本文将详细介绍详情接口 API 的使用。 一、京…

运算符重载---1

运算符重载---1 //运算符重载//内置类型可以直接使用运算符运算&#xff0c;编译器知道要如何运算。 //但自定义类型无法直接使用运算符&#xff0c;因为编译器不知道要如何运算。如果想支持&#xff0c;自己实现运算符重载即可。// C为了增强 代码的可读性 引入了运算符重载&a…

手搓单链表

文章目录 前言一、链表和顺序表的区别二、什么是单链表单链表分类单链表的结构 三、带头不循环单链表1.单链表的结构体2.带头不循环单链表的初始化和销毁3.带头不循环单链表的头插&#xff0c;尾插和打印4.带头不循环单链表的头删和尾删5.带头不循环单链表的查找&#xff0c;指…

进程通信常见方式

目录 通信通信概述 通信的主要方式 进程同步机制--低级进程通信 高级通信工具 共享存储器系统(Shared-Memory System&#xff09; 管道(pipe)通信系统 客户机-服务器系统(Client-Server system)---套接字&#xff08;Socket&#xff09; 客户机-服务器系统(Client-Serv…

国内什么牌子的ipad手写笔好用?适合绘画电容笔推荐

对于那些想要用ipad来学习的人来说&#xff0c;苹果Pencil是必不可少的。但是&#xff0c;Apple Pencil的价格真的太贵了&#xff0c;以至于很多人都买不起。所以&#xff0c;最好的办法就是选用一支平替的电容笔。本人从前几年就开始使用iPad&#xff0c;同时本身也是一位数码…

冠达管理:创新药概念强势拉升,康希诺大涨超15%

立异药概念9日盘中强势拉升&#xff0c;到发稿&#xff0c;昊帆生物“20cm”涨停&#xff0c;康希诺大涨超15%&#xff0c;翰宇药业涨近13%&#xff0c;德展健康涨停&#xff0c;泰格医药、药石科技涨超7%。 康希诺昨日晚间公告&#xff0c;8月7日&#xff0c;公司与 AstraZene…

【三维重建】【深度学习】windows10下instant-nsr-pl代码Pytorch实现

【三维重建】【深度学习】windows10下instant-nsr-pl代码Pytorch实现 提示:基于 Instant-NGP 和 Pytorch-Lightning 框架的神经表面重建 文章目录 【三维重建】【深度学习】windows10下instant-nsr-pl代码Pytorch实现前言instant-nsr-pl模型运行下载源码并安装环境训练instant-…

那些没人教你的Jmeter 循环断言,百度不到的,收藏一下吧

前言 对于使用jmeter工具完成接口测试的测试工程师而言。在工作中&#xff0c;或者在面试中&#xff0c;都会遇到一个问题。 CSV文档做了一大笔测试数据后&#xff0c;怎么去校验这个结果呢&#xff1f; 现在大部分测试工程师可能都是通过人工的方法去查看结果&#xff0c;十几…

[国产MCU]-BL602开发实例-ADC数据采样

ADC数据采样 文章目录 ADC数据采样1、ADC介绍2、ADC驱动API3、ADC使用示例模数转换器(analog-to-digital converter,通常称为ADC)是一种模拟与数字转换器,支持12路外部模拟输入和若干内部模拟信号选择。 BL602的ADC支持以下四种模式:单次单通道转换、连续单通道转换、单次…