深入理解Flink Mailbox线程模型

文章目录

  • 整体设计
  • processMail
    • 1.Checkpoint Tigger
    • 2.ProcessingTime Timer Trigger
  • processInput
  • 兼容SourceStreamTask

整体设计

Mailbox线程模型通过引入阻塞队列配合一个Mailbox线程的方式,可以轻松修改StreamTask内部状态的修改。Checkpoint、ProcessingTime Timer的相关操作(Runnable任务),会以Mail的形式保存到Mailbox内的阻塞队列中。StreamTask在invoke阶段的runMailboxLoop时期,就会轮询Mailbox来处理队列中保存的Mail,Mail处理完毕后才会对DataStream上的数据元素执行处理逻辑。

MailboxProcessor的能力就是负责拉取、处理Mail,以及执行MailboxDefaultAction(默认动作,即processInput()方法中对DataStream上的普通消息的处理逻辑,包括:处理Event、barrier、Watermark等)

/*** 开始轮询Mailbox内的Mail,Checkpoint和ProcessingTime Timer的触发事件会以Runnable的形式(作为Mail)添加到Mailbox的队列中,等待“Mailbox线程”去处理*/
public void runMailboxLoop() throws Exception {// 获取最新的TaskMailbox:主要用于存储提交的Mail,并提供获取接口。// TaskMailbox有2个队列://        1.queue:阻塞队列,通过ReentrantLock控制队列中的读写操作//        2.batch:非阻塞队列,调用createBatch()方法会将queue中的Mail转存到batch中,这样读操作就能通过tryTakeFromBatch()方法从batch队列中批量获取Mail,且只能被Mailbox线程消费final TaskMailbox localMailbox = mailbox;// 检查当前线程是否为Mailbox线程,即StreamTask运行时所在的主线程Preconditions.checkState(localMailbox.isMailboxThread(),"Method must be executed by declared mailbox thread!");// 确认Mailbox的状态:必须为OPENassert localMailbox.getState() == TaskMailbox.State.OPEN : "Mailbox must be opened!";// 创建MailboxController实例:可以控制Mailbox的循环、临时暂停和恢复MailboxDefaultAction(默认动作)final MailboxController defaultActionContext = new MailboxController(this);/*** 核心:事件循环* processMail()方法会检测Mailbox中是否还有Mail需要处理,新Mail会(在ReentrantLock的保护下)被添加到queue队列并转存到batch队列中。* MailboxProcessor处理完batch队列中的全部Mail后(执行作为Mail的Runnable#run()方法),才会进入到while循环内,执行MailboxDefaultAction的默认动作,* 即调用StreamTask#processInput()方法,对读取到的数据(Event、Barrier、Watermark等)进行处理*/while (processMail(localMailbox)) {mailboxDefaultAction.runDefaultAction(defaultActionContext); // lock is acquired inside default action as needed}
}

可以看出,对Mail和MailboxDefaultAction的处理,是由唯一的Mailbox线程负责的。

processMail

在while轮询时,首先会processMail

/*** 处理Mailbox中的Mail:Checkpoint、ProcessingTime Timer的触发事件,会以Runnable的形式作为Mail保存在Mailbox的queue队列中,* 并在ReentrantLock的保护下,将queue队列中的新Mail转移到batch队列中。MailboxProcessor会根据queue队列、batch队列内的Mail情况,* 决定处理Mail or processInput。只有当TaskMailbox内的所有Mail全都处理完毕后,MailboxProcessor才会去processInput()*/
private boolean processMail(TaskMailbox mailbox) throws Exception {/*** 新Mail写入queue队列,TaskMailbox会将queue队列中的新Mail转移到batch队列中,MailboxProcessor会根据queue队列、batch队列内的Mail情况,* 判断执行Mail的run() or processInput()。只有当TaskMailbox内的所有Mail全部处理完成后,MailboxProcessor才会去processInput()。*/if (!mailbox.createBatch()) {return true;}Optional<Mail> maybeMail;/*** 能走到这,说明queue队列中的Mail已被全部转移至batch队列。现在要从batch队列中获取到Mail并执行(它作为Runnable的run()方法),* 直到batch队列中的所有Mail全都处理完毕*/while (isMailboxLoopRunning() && (maybeMail = mailbox.tryTakeFromBatch()).isPresent()) {maybeMail.get().run();}/**如果默认操作处于Unavailable状态,那就先阻塞住,直到它重新回归available状态*/while (isDefaultActionUnavailable() && isMailboxLoopRunning()) {mailbox.take(MIN_PRIORITY).run();}// 返回Mailbox是否还在Loopreturn isMailboxLoopRunning();
}

很核心的一个点就是Mailbox要去createBatch,TaskMailboxImpl提供了具体的实现逻辑。Mailbox引入了2个队列,新Mail被add到Mailbox内的queue队列中(此过程受ReentrantLock保护)。同时为了减少读取queue队列时的同步开销,Mailbox还构建了一个batch队列专门用来后续消费(避免加锁操作)。

/*** 对Deque<Mail>队列的读写,通过ReentrantLock加以保护*/
private final ReentrantLock lock = new ReentrantLock();/*** Internal queue of mails.* 使用Deque(内部队列)保存所有的Mail*/
@GuardedBy("lock")
private final Deque<Mail> queue = new ArrayDeque<>();
/*** 为了减少读取queue队列所造成的同步开销,TaskMailbox会创建一个batch队列,queue队列中的Mail会被转移到batch队列中,* 有效避免了后续消费时的加锁操作*/
private final Deque<Mail> batch = new ArrayDeque<>();@Override
public boolean createBatch() {checkIsMailboxThread();/*** 如果queue队列中没有新Mail,那就要看batch队列是否为空。* 1.如果batch也是空的(Mailbox里已经没有任何Mail了,需要去processInput()了),那processMail()也会return true,* MailboxProcessor就会进入到while循环内部,执行processInput()来处理DataStream上的数据;* 2.如果batch不空,说明MailboxProcessor还需要继续processMail(),即取出Mail执行它(作为Runnable)的run()方法;* 由此可见,Mailbox中的batch队列中的Mail最终一定会被Mailbox线程消耗殆尽(轮询、处理),然后才会去processInput()*/if (!hasNewMail) { // 只要queue队列里还有Mail,hasNewMail就为truereturn !batch.isEmpty();}/**能走到这说明queue队列中仍有新Mail,接下来需要将它的新Mail向batch队列转移,该过程受ReentrantLock保护*/final ReentrantLock lock = this.lock;// 获取锁lock.lock();try {Mail mail;/**每次循环都将queue队列中的First Mail,转移到batch队列中,直至queue队列被消耗殆尽。此时一定return true*/while ((mail = queue.pollFirst()) != null) {batch.addLast(mail);}// 此时queue队列内的所有Mail都被转移到batch队列中了,queue中没有新Mail了hasNewMail = false;// 此时根据batch队列是否为空,MailboxProcessor会判断执行Mail的run() or processInput()return !batch.isEmpty();} finally {// 最终释放锁lock.unlock();}
}

如果Mailbox内的queue队列中仍有新Mail,那就在ReentrantLock的加持下将queue内的Mail全都转移到batch队列中;如果Mailbox内的queue队列中没有新Mail,那就看batch队列的情况了。决断权交给外层的MailboxProcessor,总的来看:

  • 如果batch队列中有Mail,MailboxProcessor会从Mailbox内的batch队列中逐个pollFirst,然后执行(它作为Runnable#run()方法),直到batch队列中的所有Mail全都被“消耗殆尽”为止
  • 如果batch队列中没有Mail,MailboxProcessor此时就没有Mail可处理了,那就直接processInput

1.Checkpoint Tigger

对Checkpoint的触发,是通过MailboxExecutor向Mailbox提交Mail的

/*** 触发执行StreamTask中的Checkpoint操作:异步的通过MailboxExecutor,将“执行Checkpoint”的请求封装成Mail后,* 提交到TaskMailbox中,最终由MailboxProcessor来处理*/
@Override
public Future<Boolean> triggerCheckpointAsync(CheckpointMetaData checkpointMetaData,CheckpointOptions checkpointOptions,boolean advanceToEndOfEventTime) {// 通过MailboxExecutor,将“触发执行Checkpoint”的具体逻辑封装成Mail,提交到Mailbox中,后期会被MailboxProcessor执行return mailboxProcessor.getMainMailboxExecutor().submit(// 触发Checkpoint的具体逻辑() -> triggerCheckpoint(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime),"checkpoint %s with %s",checkpointMetaData,checkpointOptions);
}

triggerCheckpoint操作会被封装成Mail,添加到Mailbox中等待被处理。

@Override
public void execute(@Nonnull final RunnableWithException command,final String descriptionFormat,final Object... descriptionArgs) {try {mailbox.put(new Mail(command, priority, actionExecutor, descriptionFormat, descriptionArgs));} catch (IllegalStateException mbex) {throw new RejectedExecutionException(mbex);}
}

当然,Checkpoint的完成操作,也是同样的套路。

2.ProcessingTime Timer Trigger

/*** 借助Mailbox线程模型,由MailboxExecutor负责将"ProcessingTime Timer触发的消息"封装成Mail提交到TaskMailbox中,后续由MailboxProcessor处理*/
public ProcessingTimeService getProcessingTimeService(int operatorIndex) {Preconditions.checkState(timerService != null, "The timer service has not been initialized.");MailboxExecutor mailboxExecutor = mailboxProcessor.getMailboxExecutor(operatorIndex);// 通过MailboxExecutor将Mail提交到Mailbox中等待处理return new ProcessingTimeServiceImpl(timerService, callback -> deferCallbackToMailbox(mailboxExecutor, callback));
}private ProcessingTimeCallback deferCallbackToMailbox(MailboxExecutor mailboxExecutor, ProcessingTimeCallback callback) {return timestamp -> {mailboxExecutor.execute(() -> invokeProcessingTimeCallback(callback, timestamp),"Timer callback for %s @ %d",callback,timestamp);};
}

processInput

StreamInputProcessor会对输入的数据进行处理、输出,包含:StreamTaskInput + OperatorChain + DataOutput。每次processInput都相当于是在处理一个有界流(外层MailboxProcessor在不断地的轮询),处理完DataStream上的StreamRecord后,会返回InputStatus的枚举值,根据InputStatus值来决定下一步该“何去何从”。

/*** StreamTask的执行逻辑:处理输入的数据,返回InputStatus状态,并根据InputStatus决定是否需要结束当前Task。* 该方法会通过MailboxProcessor调度、执行(作为MailboxProcessor的默认动作),底层调用StreamInputProcessor#processInput()方法*/
protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {/*** 核心:借助StreamInputProcessor完成数据的读取,并交给算子处理,处理完毕后会返回InputStatus。* 每次触发,相当于处理一个有界流,在外层Mailbox拉取Mail才是while循环无限拉取*/InputStatus status = inputProcessor.processInput();/*** case 1:上游如果还有数据 && RecordWriter是可用的,立即返回。意为:继续处理!*/if (status == InputStatus.MORE_AVAILABLE && recordWriter.isAvailable()) {return;}/*** case 2:当状态为END_OF_INPUT,说明本批次的有界流数据已经处理完毕,* 通过MailboxCollector来告诉Mailbox*/if (status == InputStatus.END_OF_INPUT) {controller.allActionsCompleted();return;}/*** case 3:当前有界流中没有数据,但未来可能会有。此时处理线程会被挂起:直到有新的可用数据到来 && RecordWriter可用* 此时会先临时暂停对MailboxDefaultAction的处理,等啥时候又有新数据了,再重新恢复MailboxDefaultAction的处理。*/CompletableFuture<?> jointFuture = getInputOutputJointFuture(status);// 通过MailboxCollector让Mailbox线程暂时停止对MailboxDefaultAction的处理MailboxDefaultAction.Suspension suspendedDefaultAction = controller.suspendDefaultAction();// 等啥时候又有了input、output,RecordWriter也变得可用了以后,再重新继续执行默认操作jointFuture.thenRun(suspendedDefaultAction::resume);
}

MailboxController是MailboxDefaultAction和Mailbox之间交互的桥梁,在StreamTask处理DataStream元素的过程中,会利用MailboxController将处理状态及时通知给Mailbox。如果这批有界流处理完毕,就会通过MailboxController通知Mailbox(本质就是向Mailbox发送一个Mail),进行下一轮的处理。

private void sendControlMail(RunnableWithException mail, String descriptionFormat, Object... descriptionArgs) {mailbox.putFirst(new Mail(mail,Integer.MAX_VALUE /*not used with putFirst*/,descriptionFormat,descriptionArgs));
}

兼容SourceStreamTask

作为DataStream Source是专门用来生产无界流数据的,并不能穿插兼顾Mailbox内Mail的检测。如果仅有一个线程生产无界流数据的话,那将永远无法检测Mailbox内的Mail。作为StreamTask的子类,SourceStreamTask会额外启动另一个独立的LegacySourceFunctionThread线程来执行SourceFunction中的循环(生产无界流),Mailbox线程(主线程)依然负责处理Mail和默认操作。

/*** 专门为Source源生产数据的线程*/
private class LegacySourceFunctionThread extends Thread {private final CompletableFuture<Void> completionFuture;LegacySourceFunctionThread() {this.completionFuture = new CompletableFuture<>();}@Overridepublic void run() {try {// CheckpointLock保证线程安全headOperator.run(getCheckpointLock(), getStreamStatusMaintainer(), operatorChain);completionFuture.complete(null);} catch (Throwable t) {// Note, t can be also an InterruptedExceptioncompletionFuture.completeExceptionally(t);}}public void setTaskDescription(final String taskDescription) {setName("Legacy Source Thread - " + taskDescription);}CompletableFuture<Void> getCompletionFuture() {return isFailing() && !isAlive() ? CompletableFuture.completedFuture(null) : completionFuture;}
}

负责为Source生产无界流数据的LegacySourceFunctionThread线程启动后,不管是启动成功 or 出现异常,都会封装对应的Mail并发送给Mailbox,而Mailbox线程的processMail一直在等待处理Mail。

/*** SourceStreamTask中,一个Thread负责专门生产无界流,另一个MailBox Thread处理Checkpoint、ProcessingTime Timer等事件Mail*/
@Override
protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {/*** 通过MailboxDefaultAction.Controller告诉Mailbox:让MailboxThread先暂停处理MailboxDefaultAction。* TaskMailbox收到该消息后,就会在processMail()中一直等待并处理Mail(在MailboxThread中会一直处理Mail)*/controller.suspendDefaultAction();/**启动LegacySourceFunctionThread线程:专门生产Source无界流数据的,和MailboxThread线程一起运行*/sourceThread.setTaskDescription(getName());sourceThread.start();/**LegacySourceFunctionThread线程启动后,会通知Mailbox,Mailbox会在processMail()中一直等待并处理mail(不会返回,即Mailbox线程会一直处理mail)*/sourceThread.getCompletionFuture().whenComplete((Void ignore, Throwable sourceThreadThrowable) -> {/**LegacySourceFunctionThread线程启动过程中发生的任何异常、以及启动成功,都会以Mail的形式发送给Mailbox*/if (isCanceled() && ExceptionUtils.findThrowable(sourceThreadThrowable, InterruptedException.class).isPresent()) {mailboxProcessor.reportThrowable(new CancelTaskException(sourceThreadThrowable));} else if (!isFinished && sourceThreadThrowable != null) {mailboxProcessor.reportThrowable(sourceThreadThrowable);} else {mailboxProcessor.allActionsCompleted();}});
}

Mailbox主线程和LegacySourceFunctionThread线程线程都在运行,通过CheckpointLock锁来保证线程安全。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/43202.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

@Repeatable的作用以及具体如何使用

文章目录 1. 前言2. 先说结论3. 案例演示 1. 前言 最近无意看到某些注解上有Repeatable&#xff0c;出于比较好奇&#xff0c;因此稍微研究并写下此文章。 2. 先说结论 Repeatable的作用&#xff1a;使被他注释的注解可以在同一个地方重复使用。 具体使用如下&#xff1a; T…

CentOS7源码安装MySQL详细教程

&#x1f60a; 作者&#xff1a; Eric &#x1f496; 主页&#xff1a; https://blog.csdn.net/weixin_47316183?typeblog &#x1f389; 主题&#xff1a;CentOS7源码安装MySQL详细教程 ⏱️ 创作时间&#xff1a; 2023年08月014日 文章目录 1、安装的四种方式2、源码安装…

深度解析:DDoS攻击与先进防御策略

目录 DDoS 介绍 DDoS 攻击理论 DDoS 介绍 DDoS&#xff08;分布式拒绝服务&#xff09;攻击是一种恶意网络活动&#xff0c;旨在通过同时向目标系统发送大量请求或流量&#xff0c;使其无法正常运行或提供服务。攻击者通常利用网络上的多个计算机和设备&#xff0c;形成一个&…

极智嘉x吉利汽车 x京东物流,引领汽车行业智慧物流新变革!

近日&#xff0c;中国领先的汽车制造商吉利汽车携手中国领先的技术驱动的供应链解决方案及物流服务商京东物流、全球仓储机器人引领者极智嘉(Geek)&#xff0c;在西安吉利汽车制造基地RDC仓库率先落地SkyPick上存下拣解决方案&#xff0c;实现了全物流链精益化、智能化、一体化…

Spring-4-掌握Spring事务传播机制

今日目标 能够掌握Spring事务配置 Spring事务管理 1 Spring事务简介【重点】 1.1 Spring事务作用 事务作用&#xff1a;在数据层保障一系列的数据库操作同成功同失败 Spring事务作用&#xff1a;在数据层或业务层保障一系列的数据库操作同成功同失败 1.2 案例分析Spring…

STM32--TIM定时器(2)

文章目录 输出比较PWM输出比较通道参数计算舵机简介直流电机简介TB6612 PWM基本结构PWM驱动呼吸灯PWM驱动舵机PWM控制电机 输出比较 输出比较&#xff0c;简称OC&#xff08;Output Compare&#xff09;。 输出比较的原理是&#xff0c;当定时器计数值与比较值相等或者满足某种…

【数据结构OJ题】有效的括号

原题链接&#xff1a;https://leetcode.cn/problems/valid-parentheses/ 目录 1. 题目描述 2. 思路分析 3. 代码实现 1. 题目描述 2. 思路分析 这道题目主要考查了栈的特性&#xff1a; 题目的意思主要是要做到3点匹配&#xff1a;类型、顺序、数量。 题目给的例子是比较…

【Hibench 】完成 HDP-Spark 性能测试

&#x1f341; 博主 "开着拖拉机回家"带您 Go to New World.✨&#x1f341; &#x1f984; 个人主页——&#x1f390;开着拖拉机回家_Linux,Java基础学习,大数据运维-CSDN博客 &#x1f390;✨&#x1f341; &#x1fa81;&#x1f341; 希望本文能够给您带来一定的…

单片机实训报告

这周我们进行了单片机实训&#xff0c;一周中我们通过七个项目1&#xff1a;P1 口输入/输出 2&#xff1a;继电器控制 3 音频控制 4&#xff1a;子程序设计 5&#xff1a;字符碰头程序设计 6&#xff1a;外部中断 7&#xff1a; 急救车与交通信号灯&#xff0c;练习编写了子程…

mysql 设置 mysql 日志时间与系统时间保持一致

临时设置 mysql> show variables like %log_timestamps%;-----------------------| Variable_name | Value |-----------------------| log_timestamps | UTC |-----------------------1 row in set (0.00 sec)系统是 CST &#xff0c; nysql 是 UTC当UTC时间为0点时&am…

docker的使用方法总结

Docker是一个非常强大的工具&#xff0c;它可以用于创建、部署和运行应用程序。以下是一些docker相关的常用指令&#xff0c; 1、查看docker版本 docker version 2、查看正在运行的Docker容器 docker ps 3、查看所有的docker容器&#xff08;包括没有运行的容器&#xff0…

Python 之 Http 获取网页的 html 数据,并去掉 html 格式等相关信息

Python之 Http 获取网页的 html 数据,并去掉 html 格式等相关信息 目录 Python之 Http 获取网页的 html 数据,并去掉 html 格式等相关信息

SCF金融公链新加坡启动会 创新驱动未来

新加坡迎来一场引人瞩目的金融科技盛会&#xff0c;SCF金融公链启动会于2023年8月13日盛大举行。这一受瞩目的活动将为金融科技领域注入新的活力&#xff0c;并为广大投资者、合作伙伴以及关注区块链发展的人士提供一个难得的交流平台。 在SCF金融公链启动会上&#xff0c; Wil…

级联(数据字典)

二级级联&#xff1a; 一&#xff1a;新建两个Bean 父级&#xff1a; /*** Description 数据字典* Author WangKun* Date 2023/7/25 10:15* Version*/ Data AllArgsConstructor NoArgsConstructor TableName("HW_DICT_KEY") public class DictKey implements Seri…

excel快速选择数据、选择性粘贴、冻结单元格

一、如何快速选择数据 在excel中&#xff0c;希望选择全部数据&#xff0c;通常使用鼠标选择数据然后往下拉&#xff0c;当数据很多时&#xff0c;也可单击单元格使用ctrl A选中全部数据&#xff0c;此外&#xff0c;具体介绍另一种方法。 操作&#xff1a;ctrl shift 方向…

【C++】STL---list

STL---list 一、list 的介绍二、list 的模拟实现1. list 节点类2. list 迭代器类&#xff08;1&#xff09;前置&#xff08;2&#xff09;后置&#xff08;3&#xff09;前置- -、后置- -&#xff08;4&#xff09;! 和 运算符重载&#xff08;5&#xff09;* 解引用重载 和 …

css3新增属性

文章目录 css3新增属性box-shadowborder-radius设置椭圆 position: sticky;渐变背景线性渐变可重复的渐变背景 径向渐变可重复的渐变背景 过渡分属性 动画关键帧与transition的关系demo 变形平移使用 旋转使用 其他使用立体效果perspective元素位于3D空间还是平面中 缩放变形的…

tornado在模板中遍历二维数组

要在Tornado模板中遍历一个二维数组&#xff0c;你可以使用Tornado的模板语法来实现迭代和显示数组中的每个元素。 以下是一个示例&#xff0c;演示如何在Tornado模板中遍历和显示二维数组的内容&#xff1a; template.html: <!DOCTYPE html> <html> <head&g…

小米分享 | 解密面试题:网易面试如何回答“创建线程有哪几种方式?”

大家好&#xff0c;我是你们的小米&#xff01;今天要和大家一起探讨一个在技术面试中常见的问题&#xff1a;创建线程有哪几种方式&#xff1f;这可是个经典面试题哦&#xff01;不过别担心&#xff0c;小米在这里为你详细解析&#xff0c;帮你轻松应对&#xff0c;让你在面试…

深度学习在MRI运动校正中的应用综述

运动是MRI中的主要挑战之一。由于MR信号是在频率空间中获取的&#xff0c;因此除了其他MR成像伪影之外&#xff0c;成像对象的任何运动都会导致重建图像中产生伪影。深度学习被提出用于重建过程的几个阶段的运动校正。广泛的MR采集序列、感兴趣的解剖结构和病理学以及运动模式&…