56、Flink DataStream 的管理执行配置详解

1)概述
1.执行配置

StreamExecutionEnvironment 包含了 ExecutionConfig,它允许在运行时设置作业特定的配置值。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ExecutionConfig executionConfig = env.getConfig();

以下是可用的配置选项:(默认为粗体)

  • setClosureCleanerLevel()。closure cleaner 的级别默认设置为 ClosureCleanerLevel.RECURSIVE。closure cleaner 删除 Flink 程序中对匿名 function 的调用类的不必要引用。禁用 closure cleaner 后,用户的匿名 function 可能正引用一些不可序列化的调用类。这将导致序列化器出现异常。可设置的值是: NONE:完全禁用 closure cleaner ,TOP_LEVEL:只清理顶级类而不递归到字段中,RECURSIVE:递归清理所有字段。
  • getParallelism() / setParallelism(int parallelism)。为作业设置默认的并行度。
  • getMaxParallelism() / setMaxParallelism(int parallelism)。为作业设置默认的最大并行度。此设置决定最大并行度并指定动态缩放的上限。
  • getNumberOfExecutionRetries() / setNumberOfExecutionRetries(int numberOfExecutionRetries)。设置失败任务重新执行的次数。值为零会有效地禁用容错。-1 表示使用系统默认值(在配置中定义)。该配置已弃用,请改用重启策略。
  • getExecutionRetryDelay() / setExecutionRetryDelay(long executionRetryDelay)。设置系统在作业失败后重新执行之前等待的延迟(以毫秒为单位)。在 TaskManagers 上成功停止所有任务后,开始计算延迟,一旦延迟过去,任务会被重新启动。该配置已被弃用,请改用重启策略 。
  • getExecutionMode() / setExecutionMode()。默认的执行模式是 PIPELINED。设置执行模式以执行程序。执行模式定义了数据交换是以批处理方式还是以流方式执行。
  • enableForceKryo() / disableForceKryo。默认情况下不强制使用 Kryo。强制 GenericTypeInformation 对 POJO 使用 Kryo 序列化器,即使可以将它们作为 POJO 进行分析。在某些情况下,应该优先启用该配置。例如,当 Flink 的内部序列化器无法正确处理 POJO 时。
  • enableForceAvro() / disableForceAvro()。默认情况下不强制使用 Avro。强制 Flink AvroTypeInfo 使用 Avro 序列化器而不是 Kryo 来序列化 Avro 的 POJO。
  • enableObjectReuse() / disableObjectReuse()。默认情况下,Flink 中不重用对象。启用对象重用模式会指示运行时重用用户对象以获得更好的性能。注意可能会导致bug。
  • getGlobalJobParameters() / setGlobalJobParameters()。此方法允许用户将自定义对象设置为作业的全局配置。由于 ExecutionConfig 可在所有用户定义的 function 中访问,因此这是一种使配置在作业中全局可用的简单方法。
  • addDefaultKryoSerializer(Class type, Serializer serializer)。为指定的类型注册 Kryo 序列化器实例。
  • addDefaultKryoSerializer(Class type, Class> serializerClass)。为指定的类型注册 Kryo 序列化器的类。
  • registerTypeWithKryoSerializer(Class type, Serializer serializer)。使用 Kryo 注册指定类型并为其指定序列化器。通过使用 Kryo 注册类型,该类型的序列化将更加高效。
  • registerKryoType(Class type)。如果类型最终被 Kryo 序列化,那么它将在 Kryo 中注册,以确保只有标记(整数 ID)被写入。如果一个类型没有在 Kryo 注册,它的全限定类名将在每个实例中被序列化,从而导致更高的 I/O 成本。
  • registerPojoType(Class type)。将指定的类型注册到序列化栈中。如果该类型最终被序列化为 POJO,那么该类型将注册到 POJO 序列化器中。如果该类型最终被 Kryo 序列化,那么它将在 Kryo 中注册,以确保只有标记被写入。如果一个类型没有在 Kryo 注册,它的全限定类名将在每个实例中被序列化,从而导致更高的I/O成本。

注意:用 registerKryoType() 注册的类型对 Flink 的 Kryo 序列化器实例来说是不可用的。

  • disableAutoTypeRegistration()。自动类型注册在默认情况下是启用的。自动类型注册是将用户代码使用的所有类型(包括子类型)注册到 Kryo 和 POJO 序列化器。
  • setTaskCancellationInterval(long interval)。设置尝试连续取消正在运行任务的等待时间间隔(以毫秒为单位)。当一个任务被取消时,会创建一个新的线程,如果任务线程在一定时间内没有终止,新线程就会定期调用任务线程上的 interrupt() 方法。这个参数是指连续调用 interrupt() 的时间间隔,默认设置为 30000 毫秒,或 30秒

通过 getRuntimeContext() 方法在 Rich* function 中访问到的 RuntimeContext 也允许在所有用户定义的 function 中访问 ExecutionConfig

2.程序打包和分布式运行
a)打包程序

为了能够通过命令行或 web 界面执行打包的 JAR 文件,程序必须使用通过 StreamExecutionEnvironment.getExecutionEnvironment() 获取的 environment。当 JAR 被提交到命令行或 web 界面后,该 environment 会扮演集群环境的角色。如果调用 Flink 程序的方式与上述接口不同,该 environment 会扮演本地环境的角色。

打包程序只要简单地将所有相关的类导出为 JAR 文件,JAR 文件的 manifest 必须指向包含程序入口点(拥有公共 main 方法)的类。实现的最简单方法是将 main-class 写入 manifest 中(比如 main-class: org.apache.flinkexample.MyProgram)。main-class 属性与 Java 虚拟机通过指令 java -jar pathToTheJarFile 执行 JAR 文件时寻找 main 方法的类是相同的。

大多数 IDE 提供了在导出 JAR 文件时自动包含该属性的功能。

b)总结

调用打包后程序的完整流程包括两步:

  • 搜索 JAR 文件 manifest 中的 main-classprogram-class 属性。如果两个属性同时存在,program-class 属性会优先于 main-class 属性。对于 JAR manifest 中两个属性都不存在的情况,命令行和 web 界面支持手动传入入口点类名参数。
  • 系统接着调用该类的 main 方法。
3.并行执行
a)概述

一个 Flink 程序由多个任务 task 组成(转换/算子、数据源和数据接收器)。一个 task 包括多个并行执行的实例,且每一个实例都处理 task 输入数据的一个子集。一个 task 的并行实例数被称为该 task 的 并行度 (parallelism)。

使用 savepoints 时,应该考虑设置最大并行度。当作业从一个 savepoint 恢复时,可以改变特定算子或者整个程序的并行度,并且此设置会限定整个程序的并行度的上限。由于在 Flink 内部将状态划分为了 key-groups,且性能所限不能无限制地增加 key-groups,因此设定最大并行度是有必要的。

b)设置并行度

算子层面

单个算子、数据源和数据接收器的并行度可以通过调用 setParallelism()方法来指定。如下所示:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> text = [...];
DataStream<Tuple2<String, Integer>> wordCounts = text.flatMap(new LineSplitter()).keyBy(value -> value.f0).window(TumblingEventTimeWindows.of(Time.seconds(5))).sum(1).setParallelism(5);wordCounts.print();env.execute("Word Count Example");

执行环境层次

Flink 程序运行在执行环境的上下文中。执行环境为所有执行的算子、数据源、数据接收器 (data sink) 定义了一个默认的并行度。可以显式配置算子层次的并行度去覆盖执行环境的并行度。

可以通过调用 setParallelism() 方法指定执行环境的默认并行度。如果想以并行度3来执行所有的算子、数据源和数据接收器。可以在执行环境上设置默认并行度,如下所示:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);DataStream<String> text = [...];
DataStream<Tuple2<String, Integer>> wordCounts = [...];
wordCounts.print();env.execute("Word Count Example");

客户端层次

将作业提交到 Flink 时可在客户端设定其并行度。客户端可以是 Java 或 Scala 程序,Flink 的命令行接口(CLI)就是一种典型的客户端。

在 CLI 客户端中,可以通过 -p 参数指定并行度,例如:

./bin/flink run -p 10 ../examples/*WordCount-java*.jar

在 Java/Scala 程序中,可以通过如下方式指定并行度:

try {PackagedProgram program = new PackagedProgram(file, args);InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123");Configuration config = new Configuration();Client client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader());// set the parallelism to 10 hereclient.run(program, 10, true);} catch (ProgramInvocationException e) {e.printStackTrace();
}

系统层次

可以通过设置 Flink 配置文件中的 parallelism.default 参数,在系统层次来指定所有执行环境的默认并行度。

c)设置最大并行度

最大并行度可以在所有设置并行度的地方进行设定(客户端和系统层次除外)。与调用 setParallelism() 方法修改并行度相似,可以通过调用 setMaxParallelism() 方法来设定最大并行度。

默认的最大并行度等于将 operatorParallelism + (operatorParallelism / 2) 值四舍五入到大于等于该值的一个整型值,并且这个整型值是 2 的幂次方,注意默认最大并行度下限为 128,上限为 32768

为最大并行度设置一个非常大的值将会降低性能,因为一些 state backends 需要维持内部的数据结构,而这些数据结构将会随着 key-groups 的数目而扩张(key-group 是状态重新分配的最小单元)。

从之前的作业恢复时,改变该作业的最大并发度将会导致状态不兼容。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/40225.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

《企业实战分享 · 内存溢出分析》

&#x1f4e2; 大家好&#xff0c;我是 【战神刘玉栋】&#xff0c;有10多年的研发经验&#xff0c;致力于前后端技术栈的知识沉淀和传播。 &#x1f497; &#x1f33b; 近期刚转战 CSDN&#xff0c;会严格把控文章质量&#xff0c;绝不滥竽充数&#xff0c;如需交流&#xff…

用PyQt5打造炫酷界面:深入解析pyqt5-custom-widgets

在PyQt5中&#xff0c;使用自定义小部件可以为应用程序增添更多实用性和时尚感。pyqt5-custom-widgets是一个开源项目&#xff0c;提供了一系列有用且时尚的自定义小部件&#xff0c;如开关按钮、动画按钮等。本文将详细介绍pyqt5-custom-widgets的安装和使用方法。 安装 可以…

权限维持Linux---监控功能Strace后门命令自定义Alias后门

免责声明:本文仅做技术交流与学习... 目录 监控功能Strace后门 1、记录 sshd 明文 监控 筛选查看 2、记录sshd私钥 命令自定义Alias后门 1、简单粗鲁实现反弹&#xff1a; 靶机替换命令 攻击机监听上线 2.升级(让命令正常) 将反弹命令进行base64编码 替换alias命令 …

【Linux】--help,man page , info page

我们知道Linux有很多的命令&#xff0c;那LInux要不要背命令&#xff1f; 答案是背最常用的那些就行了 那有的时候我们想查询一些命令的详细用法该怎么办呢&#xff1f; 这里我给出3种方法 1.--help --help的使用方法很简单啊 要查询的命令 --help 我们看个例子 这里我只…

java版企业工程管理系统源码:全方位的项目管理解决方案

工程管理系统是一款专注于建设工程项目全生命周期管理的软件。它覆盖了项目从策划、设计、施工到竣工的每一个阶段&#xff0c;提供全方位的管理功能。系统采用模块化设计&#xff0c;包括系统管理、系统设置、项目管理、合同管理、预警管理、竣工管理、质量管理、统计报表和工…

李白的酒量之谜

在中国古典文学的璀璨星空中&#xff0c;李白的名字犹如一颗耀眼的星辰&#xff0c;其卓越的文学成就与独特的人生经历引得无数后人仰望。特别是“李白斗酒诗百篇”&#xff0c;这句话不仅高度概括了李白的诗歌才华和其对酒精的热爱&#xff0c;也使得后人对李白的酒量产生了浓…

6月30日功能测试Day10

3.4.4拼团购测试点 功能位置&#xff1a;营销-----拼团购 后台优惠促销列表管理可以添加拼团&#xff0c;查看拼团活动&#xff0c;启动活动&#xff0c;编辑活动&#xff0c;删除活动。 可以查看拼团活动中已下单的订单以状态 需求分析 功能和添加拼团 商品拼团活动页 3…

Pytorch中方法对象和属性,例如size()和shape

文章目录 方法对象和属性的基本概念方法对象属性示例说明总结 常见的方法对象和属性常见的方法对象常见的属性总结示例 方法对象和属性的基本概念 方法对象&#xff08;method object&#xff09;和属性&#xff08;attribute&#xff09;是面向对象编程中的两个重要概念。让我…

python使用pywebview集成vue3和element-plus开发桌面系统框架

随着web技术越来越成熟&#xff0c;就连QQ的windows客户端都用web技术来开发&#xff0c;所以在未来&#xff0c;web技术来开发windows桌面软件也会越来越多&#xff0c;所以在此发展驱动之下&#xff0c;将最近流程的python与web技术相结合&#xff0c;使用vue3和element-plus…

图像增强 目标检测 仿射变换 图像处理 扭曲图像

1.背景 在目标检测中&#xff0c;需要进行图像增强。这里的代码模拟了旋转、扭曲图像的功能&#xff0c;并且在扭曲的时候&#xff0c;能够同时把标注的结果也进行扭曲。 这里忽略了读取xml的过程&#xff0c;假设图像IMG存在对应的标注框&#xff0c;且坐标为左上、右下两个…

[C++初阶]vector的初步理解

一、标准库中的vector类 1.vector的介绍 1. vector是表示可变大小数组的序列容器 &#xff0c; 和数组一样&#xff0c;vector可采用的连续存储空间来存储元素。也就是意味着可以采用下标对vector的元素进行访问&#xff0c;和数组一样高效。但是又不像数组&#xff0c;它的大…

灾后恢复与勒索恢复的区别

灾难恢复通常侧重于物理基础设施&#xff0c;如硬盘和网络设备&#xff0c;而勒索软件恢复则涉及数据完整性和防范网络威胁。 BackBox 产品管理副总裁 Amar Ramakrishnan表示&#xff0c;在灾难中&#xff0c;企业可能面临硬件受损等问题&#xff0c;但在网络安全事件中&#…

Java学习高级一

修饰符 static 类变量的应用场景 成员方法的分类 成员变量的执行原理 成员方法的执行原理 Java之 main 方法 类方法的常见应用场景 代码块 设计模式 单例设计模式 饿汉式单例设计模式 懒汉式单例设计模式 继承 权限修饰符

小红书 达芬奇:生活问答 AI 机器人

小红书去年 9 月开始内测的生活问答 AI 机器人&#xff1a;达芬奇&#xff0c;现在可以在小红书 APP 上用了 得益于小红书平台的特性&#xff0c;该助手擅长吃、住、宠、喝、学等等各类生活知识&#xff0c;目前还在搞活动&#xff0c;写评测笔记最高得 666 元

为什么不能在foreach中删除元素

文章目录 快速失败机制&#xff08;fail-fast&#xff09;for-each删除元素为什么报错原因分析逻辑分析 如何正确的删除元素remove 后 breakfor 循环使用 Iterator 总结 快速失败机制&#xff08;fail-fast&#xff09; In systems design, a fail-fast system is one which i…

合肥高新区建设世界领先科技园区政策(商务部分)申报奖励补贴和条件材料、时间指南

一、合肥高新区建设世界领先科技园区政策&#xff08;商务部分&#xff09;申报主体 &#xff08;更多政策可以查看小编主页方式&#xff09; 工商、税务、统计关系均在合肥高新区&#xff0c;并在高新区持续经营。申请项目在高新区内实施、且符合政策要求的具有独立法人资格…

网络基础:EIGRP

EIGRP&#xff08;Enhanced Interior Gateway Routing Protocol&#xff09;是由思科开发的一种高级距离矢量路由协议&#xff0c;结合了距离矢量和链路状态路由协议的优点&#xff1b;EIGRP具有快速收敛、高效带宽利用、负载均衡等特点&#xff0c;适用于各种规模的网络。EIGR…

python sklearn机械学习-数据预处理

&#x1f308;所属专栏&#xff1a;【机械学习】✨作者主页&#xff1a; Mr.Zwq✔️个人简介&#xff1a;一个正在努力学技术的Python领域创作者&#xff0c;擅长爬虫&#xff0c;逆向&#xff0c;全栈方向&#xff0c;专注基础和实战分享&#xff0c;欢迎咨询&#xff01; 您…

【设计模式】策略模式(定义 | 特点 | Demo入门讲解)

文章目录 定义策略模式的结构 QuickStart | DemoStep1 | 策略接口Step2 | 策略实现Step3 | 上下文服务类Step4 | 客户端 策略模式的特点优点缺点 定义 策略模式Strategy是一种行为模式&#xff0c;它能定义一系列算法&#xff0c;并将每种算法分别放入到独立的类中&#xff0c…

书籍表达式得到期望结果的组成种数

题目 给定一个只由0(假)、1(真)、&(逻辑与)、|(逻辑或)和^(异或)五种组成的字符串express&#xff0c;再给定一个布尔值desired。返回express能有多少种组合方式。可以达到desired的结果。 举例 express“1^0|0|1”,desiredfalse. 只有1^((0|0)|1)和1^(0|(0|1))的组合可…