Flink推测机制

1、配置

    execution.batch.speculative.enabled:false,推测机制开关,必须在AdaptiveBatchScheduler模式下使用

    execution.batch.speculative.max-concurrent-executions:2,同时最多几次执行

    execution.batch.speculative.block-slow-node-duration:1分钟,慢速节点会如黑名单,控制在黑名单中的时长

    slow-task-detector.check-interval:1秒,慢任务检查间隔

    slow-task-detector.execution-time.baseline-lower-bound:1分钟,慢任务检测基线的下限

    slow-task-detector.execution-time.baseline-ratio:0.75,开始检测慢任务基线的任务完成率,即有75%任务完成后,开始计算剩下的任务是否为慢任务

    slow-task-detector.execution-time.baseline-multiplier:1.5,慢任务基线乘数

2、SpeculativeScheduler

    推测机制在AdaptiveBatchScheduler模式下使用,在AdaptiveBatchSchedulerFactory当中,创建调度器时,如果开启了推测机制,会创建SpeculativeScheduler

if (enableSpeculativeExecution) {return new SpeculativeScheduler(log,jobGraph,ioExecutor,jobMasterConfiguration,

2.1、启动

    调度器启动时有三个操作:1、注册指标;2、父类通用的启动流程,会有算子的一些初始化;3、启动慢任务检测任务

protected void startSchedulingInternal() {registerMetrics(jobManagerJobMetricGroup);super.startSchedulingInternal();slowTaskDetector.start(getExecutionGraph(), this, getMainThreadExecutor());
}

2.2、SlowTaskDetector

    SlowTaskDetector负责检测慢任务,实现类是ExecutionTimeBasedSlowTaskDetector,基于schedule进行检测

this.scheduledDetectionFuture =mainThreadExecutor.schedule(() -> {listener.notifySlowTasks(findSlowTasks(executionGraph));scheduleTask(executionGraph, listener, mainThreadExecutor);},checkIntervalMillis,TimeUnit.MILLISECONDS);

    核心是findSlowTasks,首先是获取需要校验的拓扑集

private List<ExecutionJobVertex> getJobVerticesToCheck(final ExecutionGraph executionGraph) {return IterableUtils.toStream(executionGraph.getVerticesTopologically()).filter(ExecutionJobVertex::isInitialized).filter(ejv -> ejv.getAggregateState() != ExecutionState.FINISHED).filter(ejv -> getFinishedRatio(ejv) >= baselineRatio).collect(Collectors.toList());
}

    getFinishedRatio就是获取完成任务数超过基线比率的,就是拓扑集中完成任务数和总任务数的比值

private double getFinishedRatio(final ExecutionJobVertex executionJobVertex) {checkState(executionJobVertex.getTaskVertices().length > 0);long finishedCount =Arrays.stream(executionJobVertex.getTaskVertices()).filter(ev -> ev.getExecutionState() == ExecutionState.FINISHED).count();return (double) finishedCount / executionJobVertex.getTaskVertices().length;
}

    接下来是获取基线和在基线基础上计算慢速任务的,接口是getBaseline和findExecutionsExceedingBaseline,本质就是执行时间和基线的对比,注意这里不仅用到了时间,还用到了输入字节数,所以慢任务的检测可能是基于吞吐来的

private ExecutionTimeWithInputBytes getBaseline(final ExecutionJobVertex executionJobVertex, final long currentTimeMillis) {final ExecutionTimeWithInputBytes weightedExecutionTimeMedian =calculateFinishedTaskExecutionTimeMedian(executionJobVertex, currentTimeMillis);long multipliedBaseline =(long) (weightedExecutionTimeMedian.getExecutionTime() * baselineMultiplier);return new ExecutionTimeWithInputBytes(multipliedBaseline, weightedExecutionTimeMedian.getInputBytes());
}return Double.compare((double) executionTime / Math.max(inputBytes, Double.MIN_VALUE),(double) other.getExecutionTime()/ Math.max(other.getInputBytes(), Double.MIN_VALUE));

2.3、notifySlowTasks

    获取慢速任务以后,SlowTaskDetector会触发监听器,监听器的处理实现在SpeculativeScheduler的notifySlowTasks接口

    首先把节点加入黑名单

// add slow nodes to blocklist before scheduling new speculative executions
blockSlowNodes(slowTasks, currentTimestamp);

    这边会检测任务是否支持推测,默认是支持

if (!executionVertex.isSupportsConcurrentExecutionAttempts()) {continue;
}

    基于时间戳,对慢任务新建Execution

final Collection<Execution> attempts =IntStream.range(0, newSpeculativeExecutionsToDeploy).mapToObj(i ->executionVertex.createNewSpeculativeExecution(currentTimestamp)).collect(Collectors.toList());

    之后会进行一系列的配置,加入监控

setupSubtaskGatewayForAttempts(executionVertex, attempts);
verticesToDeploy.add(executionVertexId);
newSpeculativeExecutions.addAll(attempts);

    最后发起调度

executionDeployer.allocateSlotsAndDeploy(newSpeculativeExecutions,executionVertexVersioner.getExecutionVertexVersions(verticesToDeploy));

3、任务结束

    任务结束主要核心在DefaultExecutionGraph的jobFinished,判断在上层ExecutionJobVertex.executionVertexFinished,这里是通过任务并行度来判断的,所有子任务完成则认为job完成

void executionVertexFinished() {checkState(isInitialized());numExecutionVertexFinished++;if (numExecutionVertexFinished == parallelismInfo.getParallelism()) {getGraph().jobVertexFinished();}
}

    这个的调用是由Execution触发的,也就是每个子任务完成会去调用一次

if (transitionState(current, FINISHED)) {try {finishPartitionsAndUpdateConsumers();updateAccumulatorsAndMetrics(userAccumulators, metrics);releaseAssignedResource(null);vertex.getExecutionGraphAccessor().deregisterExecution(this);} finally {vertex.executionFinished(this);}return;
}

    最终一个jobVertex(对应Job的一个任务,任务根据并行度有子任务)完成的时候会通知所有子任务完成

public void jobVertexFinished() {assertRunningInJobMasterMainThread();final int numFinished = ++numFinishedJobVertices;if (numFinished == numJobVerticesTotal) {FutureUtils.assertNoException(waitForAllExecutionsTermination().thenAccept(ignored -> jobFinished()));}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/43917.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

6.MkDocs附录

安装插件 在 MkDocs 中&#xff0c;插件通常是通过 pip​ 工具安装的。你可以使用以下步骤来安装和配置 MkDocs 插件。 1.使用 pip​ 命令安装你需要的插件。例如 pip install pymdown-extensions‍ 2.更新 mkdocs.yml​ 文件。 ‍ 3.使用 mkdocs serve​ 命令本地预览你…

CentOS6禁止锁屏

在电源中设置后还是会锁屏, 原因是有屏幕保护程序 电源管理都 “从不” 一些AI的回答 在CentOS 6系统中&#xff0c;如果你想要禁用锁屏功能&#xff0c;可以编辑/etc/kbd/config文件。这个文件通常包含了键盘相关的设置&#xff0c;包括密码策略和屏幕锁定选项。 首先打开终…

支持向量机 (support vector machine,SVM)

支持向量机 &#xff08;support vector machine&#xff0c;SVM&#xff09; flyfish 支持向量机是一种用于分类和回归的机器学习模型。在分类任务中&#xff0c;SVM试图找到一个最佳的分隔超平面&#xff0c;使得不同类别的数据点在空间中被尽可能宽的间隔分开。 超平面方…

三级_网络技术_12_路由设计技术基础

1.R1、R2是一个自治系统中采用RIP路由协议的两个相邻路由器&#xff0c;R1的路由表如下图(a)所示&#xff0c;当R1收到R2发送的如下图(b)的(V.D)报文后&#xff0c;R1更新的4个路由表项中距离值从上到下依次为0、3、3、4 那么&#xff0c;①②③④可能的取值依次为()。 0、4、…

在 VitePress 中安装 mermaid 画 UML,并推荐在线 mermaid 编辑网址

介绍 在 VitePress 中如果想要画流程图&#xff0c;饼图&#xff0c;UML类图等一系列图的话&#xff0c;VitePress 原生是不支持的&#xff0c;但是我们可以使用 Mermaid 的vitepress插件&#xff0c;名字是 vitepress-plugin-mermaid。下面介绍如何安装和使用 插件的 Github…

dify/api/models/workflow.py文件中的数据表

源码位置&#xff1a;dify/api/models/workflow.py Workflow 表结构 字段英文名数据类型字段中文名字备注idStringUUIDIDUUID生成tenant_idStringUUID工作区ID非空app_idStringUUID应用ID非空typeString工作流类型非空versionString版本非空graphText工作流画布配置JSON格式&…

【LeetCode】12. 小张刷题计划

稳住&#xff0c;能赢&#xff01;没有经验的同学在面试岗位的时候&#xff0c;总是显得手忙脚乱&#xff0c;所以多练习&#xff0c;把技能提升&#xff0c;眼界提升&#xff0c;接着心态放平和&#xff0c;不要慌张&#xff0c;把面试题目读懂读透彻就会大大提升赢的概率。 1…

List、Map、Set 接口在Java中的存取元素特点

List、Map、Set 接口在Java中的存取元素特点 1、List 接口2、Map 接口3、Set 接口4、总结 &#x1f496;The Begin&#x1f496;点点关注&#xff0c;收藏不迷路&#x1f496; 在Java中&#xff0c;List、Map和Set是三个最常用的集合接口。它们各自有不同的特点和用途&#xff…

详解Java的内部类

一、基本介绍 一个类的内部又完整的嵌套了另一个类结构。被嵌套的类称为内部类(inner class)嵌套其他类的类称为外部类(outer class)。它是我们类的第五大成员&#xff0c;内部类最大的特点就是可以直接访问私有属性&#xff0c;并且可以体现类与类之间的包含关系。 二、内部类…

【小贪】深度学习常用Pytorch, Numpy对比及常用语法

近期致力于总结科研或者工作中用到的主要技术栈&#xff0c;从技术原理到常用语法&#xff0c;这次查缺补漏当作我的小百科。主要技术包括&#xff1a; ✅数据库常用&#xff1a;MySQL, Hive SQL, Spark SQL✅大数据处理常用&#xff1a;Pyspark, Pandas⚪ 图像处理常用&#…

Maven的基本使用

引入依赖 1.引入Maven仓库存在的依赖&#xff0c;直接引入&#xff0c;刷新Maven <dependency><groupId>org.springframework</groupId><artifactId>spring-webmvc</artifactId><version>5.2.12.RELEASE</version> </dependency…

Redis代替Session实现共享

集群的session共享问题 session共享问题&#xff1a;多台tomcat并不共享session存储空间&#xff0c;当请求切换到不同的tomcat服务时导致数据丢失的问题。 session的替代方案&#xff1a; 数据共享内存存储key、value结构 将redis替换session可以解决session共享问题

为什么root密码正确在登录系统时仍然报错permission denied

guanzwanguanzwan-mac ~ % ssh rootoci8 rootoci8’s password: Permission denied, please try again. rootoci8’s password: Permission denied, please try again. 使用正确的密码一直无法登录. 最后发现是sshd 服务禁止root用户用密码登录 在/etc/ssh/sshd_config配置文…

从天空到地面:无人机航拍推流直播技术在洞庭湖决口封堵中的全方位支援

据新闻报道&#xff0c;受持续强降雨影响&#xff0c;湖南省华容县团洲垸洞庭湖一线堤防发生管涌险情&#xff0c;随后出现决口。截至7月8日20时左右&#xff0c;226米长的洞庭湖一线堤防决口已累计进占208米&#xff0c;目前剩余18米&#xff0c;有望在今晚或9日凌晨实现合龙。…

7.9实验室总结 SceneBuilder的使用方法+使用javafx等

由于下错了东西&#xff0c;所以一直运行不出来&#xff0c;今天一直在配置环境&#xff0c;配置好了才学&#xff0c;所以没学多少&#xff0c;看了网课学习了SceneBuilder的使用方法还有了解了javafx是怎么写项目的&#xff0c;&#xff0c; 学习了怎么跳转页面&#xff1a;…

溶解氧(DO)理论指南(2)

转载自梅特勒官网资料&#xff0c;仅用于学习交流&#xff0c;侵权则删&#xff01; 溶解氧理论指南 2 DO电极类型2.1 氧化还原化学简介2.2 原电池法溶解氧电极2.3 极谱法溶解氧电极2.3 光学法溶解氧电极 2 DO电极类型 O2是一种高活性分子&#xff0c;因为它是通过光合作用连…

静态搜索iOS动态链接函数的调用位置

静态搜索iOS动态链接函数的调用位置 可执行文件格式mach-O,是在苹果的操作系统 macOS 和 iOS 上使用的一种二进制文件格式。 在一些iOS安全扫描中&#xff0c;可能存在需要获取函数具体调用位置的需求&#xff0c;能指导用户更精确的定位漏洞。 现在以NSLog函数为例&#xff…

Ensp配置防火墙的web界面

Ensp配置防火墙的web界面 准备工作新建网卡配置网卡 启动防火墙配置防火墙注意事项和错误如果云里面没有网卡选项防火墙启动不了没有web界面启动不了没有web界面 准备工作 新建网卡 我用的是win10系统&#xff0c;新建网卡 先右键管理 再点击设备管理器 --- 再网络适配器 接…

PostgreSQL 中如何处理数据的并发更新冲突解决?

文章目录 一、并发更新冲突的场景二、PostgreSQL 中的并发控制机制&#xff08;一&#xff09; 封锁机制&#xff08;二&#xff09; 事务隔离级别 三、并发更新冲突的解决方法&#xff08;一&#xff09; 重试机制&#xff08;二&#xff09; 使用乐观并发控制&#xff08;三&…

密封方法知识点

密封方法的基本概念 用密封关键字sealed 修饰的重写函数 作用&#xff1a;让虚方法或者抽象方法之后不能再被重写 特点&#xff1a;和override一起出现 实例 abstract class Animal {public string name;public abstract void Eat();public virtual void Speak(){Console.…