Flink学习笔记(二):Flink内存模型

文章目录

  • 1、配置总内存
  • 2、JobManager 内存模型
  • 3、TaskManager 内存模型
  • 4、图形化展示
  • 5、实际案例计算内存分配

1、配置总内存

Flink JVM 进程的进程总内存(Total Process Memory)包含了由 Flink 应用使用的内存(Flink 总内存)以及由运行 Flink 的 JVM 使用的内存。 Flink 总内存(Total Flink Memory)包括 JVM 堆内存(Heap Memory)和堆外内存(Off-Heap Memory)。 其中堆外内存包括直接内存(Direct Memory)和本地内存(Native Memory)。详细的配置参数:https://nightlies.apache.org/flink/flink-docs-release-1.12/zh/deployment/config.html

配置 Flink 进程内存最简单的方法是指定以下两个配置项中的任意一个:

配置项TaskManager 配置参数JobManager 配置参数
Flink 总内存taskmanager.memory.flink.sizejobmanager.memory.flink.size
进程总内存taskmanager.memory.process.sizejobmanager.memory.process.size

Flink 启动需要明确配置:

TaskManagerJobManager
taskmanager.memory.flink.sizejobmanager.memory.flink.size
taskmanager.memory.process.sizejobmanager.memory.process.size
taskmanager.memory.task.heap.size 和
taskmanager.memory.managed.sizejobmanager.memory.heap.size

不建议同时设置进程总内存和 Flink 总内存。 这可能会造成内存配置冲突,从而导致部署失败。 额外配置其他内存部分时,同样需要注意可能产生的配置冲突。
JVM参数

2、JobManager 内存模型

JobManager内存模型
如上图所示,下表中列出了 Flink JobManager 内存模型的所有组成部分,以及影响其大小的相关配置参数。

组成部分配置参数描述
JVM 堆内存jobmanager.memory.heap.sizeJobManager 的 JVM 堆内存。
堆外内存jobmanager.memory.off-heap.sizeJobManager 的堆外内存(直接内存或本地内存)。
JVM Metaspacejobmanager.memory.jvm-metaspace.sizeFlink JVM 进程的 Metaspace。
JVM 开销jobmanager.memory.jvm-overhead.min、jobmanager.memory.jvm-overhead.max、jobmanager.memory.jvm-overhead.fraction用于其他 JVM 开销的本地内存,例如栈空间、垃圾回收空间等。该内存部分为基于进程总内存的受限的等比内存部分。

如配置总内存中所述,另一种配置 JobManager 内存的方式是明确指定 JVM 堆内存的大小(jobmanager.memory.heap.size)。 通过这种方式,用户可以更好地掌控用于以下用途的 JVM 堆内存大小。

3、TaskManager 内存模型

内存模型详解
如上图所示,下表中列出了 Flink TaskManager 内存模型的所有组成部分,以及影响其大小的相关配置参数。

组成部分配置参数描述
框架堆内存(Framework Heap Memory)taskmanager.memory.framework.heap.size用于 Flink 框架的 JVM 堆内存(进阶配置)。
任务堆内存(Task Heap Memory)taskmanager.memory.task.heap.size用于 Flink 应用的算子及用户代码的 JVM 堆内存。
托管内存(Managed memory)taskmanager.memory.managed.size、taskmanager.memory.managed.fraction由 Flink 管理的用于排序、哈希表、缓存中间结果及 RocksDB State Backend 的本地内存。
框架堆外内存(Framework Off-heap Memory)taskmanager.memory.framework.off-heap.size用于 Flink 框架的堆外内存(直接内存或本地内存)(进阶配置)。
任务堆外内存(Task Off-heap Memory)taskmanager.memory.task.off-heap.size用于 Flink 应用的算子及用户代码的堆外内存(直接内存或本地内存)。
网络内存(Network Memory)taskmanager.memory.network.min、taskmanager.memory.network.max、taskmanager.memory.network.fraction用于任务之间数据传输的直接内存(例如网络传输缓冲)。该内存部分为基于 Flink 总内存的受限的等比内存部分。
JVM Metaspacetaskmanager.memory.jvm-metaspace.sizeFlink JVM 进程的 Metaspace。
JVM 开销taskmanager.memory.jvm-overhead.min、taskmanager.memory.jvm-overhead.max、taskmanager.memory.jvm-overhead.fraction用于其他 JVM 开销的本地内存,例如栈空间、垃圾回收空间等。该内存部分为基于进程总内存的受限的等比内存部分。

我们可以看到,有些内存部分的大小可以直接通过一个配置参数进行设置,有些则需要根据多个参数进行调整。通常情况下,不建议对框架堆内存和框架堆外内存进行调整。 除非你非常肯定 Flink 的内部数据结构及操作需要更多的内存。 这可能与具体的部署环境及作业结构有关,例如非常高的并发度。 此外,Flink 的部分依赖(例如 Hadoop)在某些特定的情况下也可能会需要更多的直接内存或本地内存。

4、图形化展示

JobManager 内存直观展示
Job
TaskManager 内存直观展示
task
树状图表示:
树状图

5、实际案例计算内存分配

如果是 Flink On YARN 模式下:

taskmanager.memory.process.size = 4096 MB = 4G
taskmanager.memory.network.fraction = 0.15
taskmanager.memory.managed.fraction = 0.45

然后根据以上参数,就可以计算得到各部分的内存大小:

taskmanager.memory.jvm-overhead = 4096 * 0.1 = 409.6 MB
taskmanager.memory.flink.size = 4096 - 409.6 - 256 = 3430.4 MB
taskmanager.memory.network = 3430.4 * 0.15 = 514.56 MB
taskmanager.memory.managed = 3430.4 * 0.45 = 1543.68 MB
taskmanager.memory.task.heap.size = 3430.4 - 128 * 2 - 1543.68 - 514.56 = 1116.16 MB

实际案例

Flink 启动参考配置参数:

/home/dev/soft/flink/bin/flink run \-m yarn-cluster \-yD akka.ask.timeout='360 s' \-yD akka.framesize=20485760b \-yD blob.fetch.backlog=1000 \-yD blob.fetch.num-concurrent=500 \-yD blob.fetch.retries=50 \-yD blob.storage.directory=/data1/flinkdir \-yD env.java.opts.jobmanager='-XX:ErrorFile=/tmp/java_error_%p.log -XX:+PrintGCDetails -XX:-OmitStackTraceInFastThrow -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=512m -XX:+UseG1GC -XX:MaxGCPauseMillis=300 -XX:InitiatingHeapOccupancyPercent=50 -XX:+ExplicitGCInvokesConcurrent -XX:+AlwaysPreTouch -XX:AutoBoxCacheMax=20000 -XX:G1HeapWastePercent=5 -XX:G1ReservePercent=25 -Dfile.encoding=UTF-8' \-yD env.java.opts.taskmanager='-XX:ErrorFile=/tmp/java_error_%p.log -XX:+PrintGCDetails -XX:-OmitStackTraceInFastThrow -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:MetaspaceSize=256m -XX:MaxMetaspaceSize=1024m -XX:+UseG1GC -XX:MaxGCPauseMillis=300 -XX:InitiatingHeapOccupancyPercent=50 -XX:+ExplicitGCInvokesConcurrent -XX:+AlwaysPreTouch -XX:AutoBoxCacheMax=20000 -Dsun.security.krb5.debug=false -Dfile.encoding=UTF-8' \-yD env.java.opts='-XX:ErrorFile=/tmp/java_error_%p.log -XX:+PrintGCDetails -XX:-OmitStackTraceInFastThrow -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:MetaspaceSize=256m -XX:MaxMetaspaceSize=1024m -Dfile.encoding=UTF-8' \-yD execution.attached=false \-yD execution.buffer-timeout='1000 ms' \-yD execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION \-yD execution.checkpointing.interval='30 min' \-yD execution.checkpointing.max-concurrent-checkpoints=1 \-yD execution.checkpointing.min-pause='2 min' \-yD execution.checkpointing.mode=EXACTLY_ONCE \-yD execution.checkpointing.timeout='28 min' \-yD execution.checkpointing.tolerable-failed-checkpoints=8 \-yD execution.checkpointing.unaligned=true \-yD execution.checkpointing.unaligned.forced=true \-yD heartbeat.interval=60000 \-yD heartbeat.rpc-failure-threshold=5 \-yD heartbeat.timeout=340000 \-yD io.tmp.dirs=/data1/flinkdir \-yD jobmanager.heap.size=1024m \-yD jobmanager.memory.jvm-metaspace.size=268435456b \-yD jobmanager.memory.jvm-overhead.max=1073741824b \-yD jobmanager.memory.jvm-overhead.min=1073741824b \-yD jobmanager.memory.network.fraction=0.2 \-yD jobmanager.memory.network.max=6GB \-yD jobmanager.memory.off-heap.size=134217728b \-yD jobmanager.memory.process.size='18360 mb' \-yD metrics.reporter.promgateway.deleteOnShutdown=true \-yD metrics.reporter.promgateway.factory.class=org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory \-yD metrics.reporter.promgateway.filter.includes=\*:dqc\*,uptime,taskSlotsTotal,numRegisteredTaskManagers,taskSlotsAvailable,numberOfFailedCheckpoints,numRestarts,lastCheckpointDuration,Used,Max,Total,Count,Time:gauge,meter,counter,histogram \-yD metrics.reporter.promgateway.groupingKey="yarn=${yarn};hdfs=${hdfs};job_name=TEST-broadcast-${jobName//./-}-${provId}" \-yD metrics.reporter.promgateway.host=172.17.xxxx.xxxx \-yD metrics.reporter.promgateway.interval='60 SECONDS' \-yD metrics.reporter.promgateway.jobName="TEST-broadcast-${jobName//./-}-${provId}" \-yD metrics.reporter.promgateway.port=10080 \-yD metrics.reporter.promgateway.randomJobNameSuffix=true \-yD pipeline.name="TEST-broadcast-${jobName//./-}-${provId}" \-yD pipeline.object-reuse=true \-yD rest.flamegraph.enabled=true \-yD rest.server.numThreads=20 \-yD restart-strategy.failure-rate.delay='60 s' \-yD restart-strategy.failure-rate.failure-rate-interval='3 min' \-yD restart-strategy.failure-rate.max-failures-per-interval=3 \-yD restart-strategy=failure-rate \-yD security.kerberos.krb5-conf.path=/home/dev/kerberos/krb5.conf \-yD security.kerberos.login.contexts=Client,KafkaClient \-yD security.kerberos.login.keytab=/home/dev/kerberos/xxxx.keytab \-yD security.kerberos.login.principal=xxxx \-yD security.kerberos.login.use-ticket-cache=false \-yD state.backend.async=true \-yD state.backend=hashmap \-yD state.checkpoints.dir=hdfs://xxxx/flink/checkpoint/${jobName//.//}/$provId \-yD state.checkpoint-storage=filesystem \-yD state.checkpoints.num-retained=3 \-yD state.savepoints.dir=hdfs://xxxx/flink/savepoint/${jobName//.//}/$provId \-yD table.exec.hive.fallback-mapred-writer=false \-yD task.manager.memory.segment-size=4mb \-yD taskmanager.memory.framework.off-heap.size=1GB \-yD taskmanager.memory.managed.fraction=0.2 \-yD taskmanager.memory.network.fraction=0.075 \-yD taskmanager.memory.network.max=16GB \-yD taskmanager.memory.process.size='50 gb' \-yD taskmanager.network.netty.client.connectTimeoutSec=600 \-yD taskmanager.network.request-backoff.max=120000 \-yD taskmanager.network.retries=100 \-yD taskmanager.numberOfTaskSlots=10 \-yD web.timeout=900000 \-yD web.upload.dir=/data1/flinkdir \-yD yarn.application.name="TEST-broadcast-${jobName//./-}-${provId}" \-yD yarn.application.queue=$yarnQueue \-yD yarn.application-attempts=10 \

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/97639.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

iTunes更新iOS17出现发生未知错误4000的原因和解决方案

有不少人使用iTunes更新iOS 17时出现「无法更新iPhone发生未知的错误4000」的错误提示,不仅不知道iTunes升级失败的原因,也无从解决iPhone无法更新4000的问题。 小编今天就分享iPhone更新iOS系统出现4000错误提示的原因和对应的解决方案。 为什么iPhone…

MySQL Cluster 简介

文章目录 1.简介2.组成参考文献 1.简介 MySQL Cluster 是官方推出的基于 NDB(Network DataBase)存储引擎的高可用和可伸缩的分布式数据库系统。 以下是 MySQL NDB Cluster 的主要特点和能力: 高可用:MySQL Cluster 具有内置的高…

Python大数据之PySpark(七)SparkCore案例

文章目录 SparkCore案例PySpark实现SouGou统计分析 总结后记 SparkCore案例 PySpark实现SouGou统计分析 jieba分词: pip install jieba 从哪里下载pypi 三种分词模式 精确模式,试图将句子最精确地切开,适合文本分析;默认的方…

洗地机怎么选?2023年洗地机推荐

洗地机结合洗地、拖地、扫地的功能,在日常生活中备受关注,他能帮助我们更加节省时间和节省体力,但是面对参差不齐的洗地机市场如何选到适合自己的呢,下文整理了几款非常值得入手的性价比型号,供大家选择参考。 一、CE…

C++Day2

#include <iostream>using namespace std;class Rect { private:int width;int height; public:void init(int w,int h){width w;height h;}void set_w(int w){width w;}void set_h(int h){height h;}void show(){cout << "矩形的周长为&#xff1a;"…

Java数组:没错,不装了我就是书架。

&#x1f451;专栏内容&#xff1a;Java⛪个人主页&#xff1a;子夜的星的主页&#x1f495;座右铭&#xff1a;前路未远&#xff0c;步履不停 目录 一、数组的概念1、什么是数组&#xff1f;2、数组的创建3、数组的初始化Ⅰ、动态初始化Ⅱ、静态初始化 二、数组的使用1、数组中…

Windows系统上使用CLion远程开发Linux程序

CLion远程开发Linux程序 情景说明Ubuntu配置CLion配置同步 情景说明 在Windows系统上使用CLion开发Linux程序&#xff0c;安装CLion集成化开发环境时会自动安装cmake、mingw&#xff0c;代码提示功能也比较友好。 但是在socket开发时&#xff0c;包含sys/socket.h头文件时&am…

【Java-LangChain:使用 ChatGPT API 搭建系统-4】评估输入-分类

第三章&#xff0c;评估输入-分类 如果您正在构建一个允许用户输入信息的系统&#xff0c;首先要确保人们在负责任地使用系统&#xff0c;以及他们没有试图以某种方式滥用系统&#xff0c;这是非常重要的。 在本章中&#xff0c;我们将介绍几种策略来实现这一目标。 我们将学习…

【yolo系列:YOLOV7改进-添加EIOU,SIOU,AlphaIOU,FocalEIOU.】

yolo系列文章目录 在YoloV7中添加EIoU,SIoU,AlphaIoU,FocalEIoU,Wise-IoU. 2023-2-7 更新 yolov7添加Wise-IoUB站链接 重磅&#xff01;&#xff01;&#xff01;&#xff01;&#xff01; YOLO系列模型改进损失函数 文章目录 yolo系列文章目录一、初始的yolov7损失函数二、首…

7346-2015 控制电机基本外形结构型式

声明 本文是学习GB-T 7346-2015 控制电机基本外形结构型式.pdf而整理的学习笔记,分享出来希望更多人受益,如果存在侵权请及时联系我们 1 范围 本标准规定了控制电机的机座号、外形及安装尺寸、轴伸型式、出线方式、标记及铭牌。 本标准适用于各类控制电机(以下简称电机),其…

NFT Insider#110:The Sandbox与TB Media Global合作,YGG Web3游戏峰会阵容揭晓

引言&#xff1a;NFT Insider由NFT收藏组织WHALE Members、BeepCrypto出品&#xff0c;浓缩每周NFT新闻&#xff0c;为大家带来关于NFT最全面、最新鲜、最有价值的讯息。每期周报将从NFT市场数据&#xff0c;艺术新闻类&#xff0c;游戏新闻类&#xff0c;虚拟世界类&#xff0…

数据结构面试常问问题--保研及考研复试

前言&#xff1a; Hello大家好&#xff0c;我是Dream。今年保研上岸山东大学人工智能专业 &#xff08;经验贴&#xff09;&#xff0c;现在将我自己的专业课备考知识点整理出来&#xff0c;分享给大家&#xff0c;希望可以帮助到大家&#xff01;这是重点知识总结&#xff0c;…

为什么短视频离不开美颜SDK?短视频领域的秘密武器

在当今的社交媒体时代&#xff0c;短视频已经成为了人们获取信息、娱乐和社交的重要方式。无论是抖音、快手&#xff0c;还是Instagram、TikTok&#xff0c;短视频都以其独特的魅力吸引着数亿用户。而在这些短视频的背后&#xff0c;有一款名为“美摄美颜SDK”的秘密武器&#…

软件项目验收测试报告-软件项目验收流程

对甲方而言&#xff0c;项目验收是正式接受项目成果&#xff0c;将项目从建设转为运营。对于乙方来说&#xff0c;则意味着项目的结束&#xff0c;项目资源的释放。 项目验收是项目收尾的重要环节&#xff0c;依据招投标文件、合同对测评相关要求内容、项目章程和项目过程中的…

国庆出游远程实测:ToDesk 、TeamViewer、AnyDesk远程控制软件稳定性

ToDesk 、TeamViewer、AnyDesk远程控制软件稳定性 【前言】【实测软件】【测试环境】【实操体验】1. 软件安装2. 登录速度3. 文件传输4. 操作延迟5. 画面清晰度6. 安全防护 【本文小结】 【前言】 随着科技的不断发展&#xff0c;远程控制软件已成为我们生活中不可或缺的一部分…

7344-2015 交流伺服电动机通用技术条件

声明 本文是学习GB-T 7344-2015 交流伺服电动机通用技术条件.pdf而整理的学习笔记,分享出来希望更多人受益,如果存在侵权请及时联系我们 1 范围 本标准规定了交流伺服电动机的分类、技术要求和试验方法、检验规则、交付准备。 本标准适用于两相交流伺服电动机(以下简称电机…

7321-2017 定形耐火制品试样制备方法

声明 本文是学习GB-T 7321-2017 定形耐火制品试样制备方法.pdf而整理的学习笔记,分享出来希望更多人受益,如果存在侵权请及时联系我们 1 范围 本标准规定了定形耐火制品制样的定义、制样部位的确定原则和试样的制备。 本标准适用于定形耐火制品试样的制备。 2 规范性引用文…

宝塔反代openai官方API接口详细教程,502 Bad Gateway问题解决

一、前言 宝塔反代openai官方API接口详细教程&#xff0c;实现国内使用ChatGPT502 Bad Gateway问题解决&#xff0c; 此方法最简单快捷&#xff0c;没有复杂步骤&#xff0c;不容易出错&#xff0c;即最简单&#xff0c;零代码、零部署的方法。 二、实现前提 一台海外VPS服务…

[补题记录] Atcoder Beginner Contest 297(F)

URL&#xff1a;https://atcoder.jp/contests/abc297 目录 F Problem/题意 Thought/思路 Code/代码 F Problem/题意 给一个 H * W 的矩形&#xff0c;在其中任意放置 K 个点&#xff0c;由这 K 个点构成的最小矩形带来的贡献为该矩形的面积&#xff0c;这 K 个点构成一种…

1.6 IntelliJ IDEA开发工具

前言&#xff1a; ### 1.6 IntelliJ IDEA开发工具笔记 - **背景**&#xff1a; - 使用基础文本编辑器如记事本编写Java代码虽然可行&#xff0c;但存在效率低下且难以调试的问题。 - 集成开发环境 (IDE) 可以有效地提高Java程序的开发效率。 - **常见Java IDE**&#xf…