flink sql 优化

文章目录

  • 一、参数方面
  • 二、资源方面
  • 三、总结


提示:实时flink sql 参考很多网上方法与自己实践方法汇总(版本:flink1.13+)

一、参数方面

  • flink sql参数配置
//关闭详细算子链(默认为true),true后job性能会略微有提升。false则可以展示更详细的DAG图方便地位性能结点   ###有用的参数
pipeline.operator-chaining: 'true'
//指定时区  ###实用的参数
table.local-time-zone: Asia/Shanghai
//对flink sql是否要敏感大小(建议false,不区分大小写。默认为true)
table.identifier-case-sensitive: 'false'
//开启 miniBatch
table.exec.mini-batch.enabled: 'true'
//批量输出的间隔时间
table.exec.mini-batch.allow-latency: 5s
//防止 OOM 设置每个批次最多缓存数据的条数
table.exec.mini-batch.size: '500'
//提交批次数据大小
batchSize: '127108864'
//刷数据间隔
flushIntervalMs: '60000'
//几个flush线程
numFlushThreads: '5'
// 写odps时压缩 :https://help.aliyun.com/zh/flink/developer-reference/maxcompute-connector
compressAlgorithm: snappy
//开启异步状态后端
state.backend.async: 'true'
//状态后端开启增量(默认就是true 增量)
state.backend.incremental: 'true'
//作业链与处理槽共享组(默认为false),开启后在针对某个操作算子增加并行度和cu等资源时,不与其他槽位共享资源,单独增加额外资源  ###有用的参数
table.exec.split-slot-sharing-group-per-vertex: 'true'
//Checkpoint间隔时间,单位为毫秒 默认180秒 ###如果作业量大,可以适当调大间隔时间。性能方便略有提升
execution.checkpointing.interval: 180s
//State数据的生命周期,单位为毫秒。默认36小时
table.exec.state.ttl: 129600000
//Checkpoint生成超时时间(默认值10分钟),当Checkpoint生成时间超过10分钟,flink会把创建生成的Checkpoint杀掉,重新再创建生成Checkpoint。如果观察自己的job生成时间过长减少被杀死Checkpoint可以调大下面时间   ###有用的参数
execution.checkpointing.timeout	:10min
  • datastream代码配置
// 初始化 table environment
TableEnvironment tEnv = ...
// 获取 tableEnv 的配置对象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 设置参数:
// 开启 miniBatch
configuration.setString("table.exec.mini-batch.enabled", "true");
// 批量输出的间隔时间
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
// 防止 OOM 设置每个批次最多缓存数据的条数,可以设为 2 万条
configuration.setString("table.exec.mini-batch.size", "20000");
// 开启 LocalGlobal(job有聚合函数使用)
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
// 开启 Split Distinct (job有聚合函数使用)
configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");
// 第一层打散的 bucket 数目 (job有聚合函数使用)
configuration.setString("table.optimizer.distinct-agg.split.bucket-num", "1024");
// TopN 的缓存条数 (job有分组top使用)
configuration.setString("table.exec.topn.cache-size", "200000");
// 指定时区
configuration.setString("table.local-time-zone", "Asia/Shanghai");
  • flink sql 简单作业优化实验截图
    1).调大checkpoint生成时间
    在这里插入图片描述
    在这里插入图片描述
    2).去掉参数:pipeline.operator-chaining: ‘false’
    在这里插入图片描述
    在这里插入图片描述
    3).加攒批参数
    在这里插入图片描述
    在这里插入图片描述
    4).由于full GC导致job性能过差(排查)
    在这里插入图片描述
    查看gc日志:
    在这里插入图片描述
    在这里插入图片描述
    解决方案:对taskmanager增加内存(jobmanager略,因为它很少会出现频繁full gc)。

5).全量Checkpoint与增量Checkpoint的大小一致,是否正常?
如果您在使用Flink的情况下,观察到全量Checkpoint与增量Checkpoint的大小一致:

  • 检查增量快照是否正常配置并生效。
  • 是否为特定情况。在特定情况下,这种现象是正常的,例如:
    a.在数据注入前(18:29之前),作业没有处理任何数据,此时Checkpoint只包含了初始化的源(Source)状态信息。由于没有其他状态数据,此时的Checkpoint实际上是一个全量Checkpoint。
    b.在18:29时注入了100万条数据。假设数据在接下来的Checkpoint间隔时间(3分钟)内被完全处理,并且期间没有其他数据注入,此时发生的第一个增量Checkpoint将会包含这100万条数据产生的所有状态信息。
    在这种情况下,全量Checkpoint和增量Checkpoint的大小一致是符合预期的。因为第一个增量Checkpoint需要包含全量数据状态,以确保能够从该点恢复整个状态,导致它实际上也是一个全量Checkpoint。

增量Checkpoint通常是从第二个Checkpoint开始体现出来的,在数据稳定输入且没有大规模的状态变更时,后续的增量Checkpoint应该显示出大小上的差异,表明系统正常地只对状态的增量部分进行快照。如果仍然一致,则需要进一步审查系统状态和行为,确认是否存在问题。

二、资源方面

当上面添加配置性能还是不行是,可以增加资源。

  • 添加cu
    一般对taskmanager添加cup,默认给1的整数倍,例如1,2,3等。jobmanager 基本不咋干活(业务数据处理),不用添加资源,之前给很少cup即可
  • 添加内存
    一般默认每个taskmanager给4G内存,后面再对它增加资源。jobmanager不用增加内存
  • 槽位(slot)
    每个 TaskManager Slot 数给1个( TaskManager 只能同时执行一个 Subtask),性能比较好(一般简单的job没有大量的回撤流的情况下)。
    A.如果开3个并行度,每个taskmanager1个槽位:1个槽位 乘 3个并行度 乘 每个taskmanager分配的资源+job manager资源=job的总使用资源
    B.如果开3个并行度,每个taskmanager3个槽位:1个槽位 乘 每个taskmanager分配的资源+job manager资源=job的总使用资源
  • 并行度
    在 TaskManager Slot 数给1个情况下(此方案性能比较好),增加并行度可以提升处理性能。但taskmanager资源(内存和cpu)也会成倍增加

*上面只是建议给taskmanager 1cup,4Gb内存起,原因现在很多平台大多是云虚拟资源,这样分配性能较好,同时也是养成良好习惯。

三、总结

不是所有job资源越堆越多好。有时作业的复杂或数据的特殊情况(外部系统性能除外,例如写数据库),增加资源只会让job性能越来越差或报错(亲身经历job性能差,特别痛苦,一直加资源性能还是差或运行报错)。需要不断找根源问题,多使用不同方法测试才能找到适合job的处理性能。

  • 如果优化很多次后job性能还是很差(资源给的很多性能还是不理想)(略增加一些资源)
    可以将一个job拆分两个job(将占用比较多的业务数据(50%更好)在新的job单独处理)
  • 性能优化一直无法提升,要么看业务要么看job的性能瓶颈业务(业务牺牲)
  • 要么flink只做业务写表,离线负责处理业务写其他表(时效牺牲)

  • 调优举例(真实案例,折腾了很久):
    背景:(flink 双流join) 默认资源配置(taskmanager 1cpu,4Bb内存,1个槽位,1个并行度)
    数据有堆积,且越堆越多,写入性能弱(每秒十几条写入),CP(checkpoint有时失败,但很大,生成很慢),业务处理简单,单日数量在1700万条数据。
  • 后面开始对此job加资源,加并行度,加各种优化配置,增加CP生成时间等等。
    开启job运行后生成CP一直失败(生成CP更大,之前200多兆,改后生成700-800兆还没有生成,生成变慢,生成时间变成)
    在这里插入图片描述
    即使加大CP生产时间和CP校验时间,CP依旧是失败。
    CP一直失败导致处理性能极差(CP在生成时整个job几乎都在停止),如下截图
    在这里插入图片描述
    后面是各种调优尝试都不能,发现问题是flink在双流join时,有大量的回撤流.如果撤回数据较多的话 , 就会造成这个节点的state大 从而导致SinkMaterializer节点压力大(自己结合UI监控图观察得到)。
  • 后面经过很多次调优将并行度改为3,每个 TaskManager Slot 数 给3个,其他不变,性能有提升,CP生成也变快了
    在这里插入图片描述
  • 又做调整将taskmanager 给3cup,内存给15Gb,开5个并行度,每个 TaskManager Slot 数 给5个。
    目的:将5个并行度放到一个槽位,资源也没有使用多少。
    测试后发现CP比上面3个并发的增量存储要大(意料之中),CP生成特别快,已将数据堆压十几个小时的数据全部追上。

在这里插入图片描述
*上面案例思想:
A.减少CP生成时间。flink才能快速处理数据(提交完已处理的偏移量数据,快速进行下一轮的新数据)。
B.在有回撤流,需要状态(自己观察在一个并发时CP较大几百兆,一般join情况出现的比较多)将多个并发尽量放到一个slot,减少数据传输和交换(一个槽位共享状态)。其他简单的job没有或很少回撤流的情况下可以只建一个槽位。
C.增加并行度会导致CP增大。原因之前一个线程一个CP,现在是多个线程有自己的状态(可能会有重复数据状态),多个状态合在一起CP就大了。
在这里插入图片描述

参考:文档1

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/832180.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

go mod

常用命令 初始化模块 go mod init 模块名下载 go.mod 文件中指明的所有依赖 go mod download github.com/gin-gonic/ginv1.9.(依赖路径)依赖对其(使引用的都是所依赖的) go mod tidy编辑go.mod go mod edit go mod edit -require"github.com/g…

jvm 马士兵 01 JVM简介,class文件结构

01.JVM是什么 JVM是一个跨平台的标准 JVM只识别class文件,符合JVM规范的class文件都可以被识别 u1 是一个字节 u2是两个字节

5款采用AMD Instinct MI300芯片的超酷AI和HPC服务器

我们收集了戴尔科技、联想、超微和技嘉的五款超酷人工智能和高性能计算服务器,这些服务器使用 AMD 的 Instinct MI300 芯片,该芯片于几个月前推出,旨在挑战 Nvidia 在人工智能计算领域的主导地位。 AMD 正在凭借其 Instinct MI300 加速器芯片…

新手必看!场外个股期权的权利金估算公式

场外个股期权的权利金估算公式 场外个股期权的权利金估算公式通常涉及多个因素,这些因素共同决定了权利金的具体数额。虽然具体的估算公式可能因不同的交易平台、交易规则和标的资产而有所差异,但一般来说,权利金的计算会考虑以下几个关键要…

毕业单纯的钻研嵌入式知识有前景吗?

毕业之后单纯地去钻研嵌入式知识到底有没有前景呢?不可否认的是,嵌入式领域有着较高的薪资待遇,并且还存在着巨大的上升空间。然而,要学习嵌入式开发并非易事,其中存在着诸多挑战。其中一个挑战就是需要深入理解计算机…

前端奇怪面试题总结

面试题总结 不修改下面的代码进行正常结构 这道题考的是迭代器和生成器的概念 let [a,b] {a:1,b:2}答案 对象缺少迭代器,需要手动加上 Object.prototype[Symbol.iterator] function* (){// return Object.values(this)[Symbol.iterator]()return yeild* Object.v…

SpringBootWeb创建

创建spring项目 创建SpringBoot工程定义请求处理类运行常见问题java: 无效的源发行版: XXjava: 无法访问org.springframework.web.bind.annotation.RequestMapping类文件具有错误的版本 61.0, 应为 52.0 创建SpringBoot工程 定义请求处理类 RestController public class HelloC…

毕业设计uniapp+vue有机农产品商城系统 销售统计图 微信小程序

本人在网上找了一下这方面的数据发现农村中的信心普及率很是低农民们都不是怎么会用手机顶多就是打打电话发发短信,平时不太会上网更不会想到通过网络手段去卖出自己的劳作成果—农产品,这无疑大大浪费了农民的劳动成果和国家资源也大大打击了人们的生产…

Centos7配置NFS

环境描述 服务器IP为192.168.200.132客户机IP为192.168.200.143 服务器配置 首先安装软件包 [rootlocalhost ~]# yum install nfs-utils rpcbind //我这里已经安装完毕了建立共享目录 [rootlocalhost ~]# mkdir -p /data/share更改文件夹权限 chmod 777 /data/share编辑配置…

vsftp虚拟用户和ssl加密配置 —— 筑梦之路

为什么要用虚拟用户? 1.增强安全性:使用虚拟用户,可以避免直接使用系统账户进行 FTP 访问,通过使用虚拟用户,可以限制 FTP 用户的访问范围和权限,减少潜在的安全风险。 2.隔离用户和文件:虚拟用户可以被隔…

docker如何关闭证书认证

目录 前言关闭Docker认证的步骤修改pom 前言 当docker认证证书过期了,项目又要马上上线怎么办?重新生成证书,时间来不及,这时最快的方法就是关闭证书认证。 关闭Docker认证的步骤 停止Docker服务 systemctl stop docker编辑Do…

Ts创建的详细过程及配置步骤(傻瓜式配置创建),最后效果展示

一:首先创建一个 空文件夹 二:使用编辑器打开,再创建一个src文件夹,然后按照以下步骤

VMware worksation 17 简易安装Centos8.2、Redhat8.2、Ubuntu16.04

系列文章目录 文章目录 系列文章目录前言一、VMware worksation 17 安装二、安装Centos8.2三、安装RHEL8.2四、安装Ubuntu16.04总结 前言 傻瓜式按照Linux系统,如果觉得简单,可以自定义设置,特别是配置一下磁盘空间大小,对以后排…

2023年上半年系统规划与管理师上午真题及答案解析

1.香农用概率来定量描述信息的公式如下,其中H(x)表示X的( ),Pi是( )出现第i种状态的( )。 A.信息熵 事件 概率 B.总熵 单位 概率 C.信息熵 单位 概率 D.总熵 单位 度量 2.信息传输模型中,( )负责信息的向外传播,( )负责…

数据结构--链表进阶面试题

在链表题目开始之前我们来复习一道数组元素的逆序问题&#xff1a; 给定一个整数数组 nums&#xff0c;将数组中的元素向右轮转 k 个位置&#xff0c;其中 k 是非负数。 提示&#xff1a; 1 < nums.length < 10^5-2^31 < nums[i] < 2^31 - 10 < k < 10^5 思…

wordpress子比主题给文章内容加上密码查看

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、pandas是什么?二、使用步骤1.引入库2.读入数据第三步:文章内添加代码前言 提示:这里可以添加本文要记录的大概内容: 例如:随着人工智能的不断发展,机器学习这门技术也越来越重要,…

Docker容器:Docker-Consul的容器服务更新与发现

目录 前言 一、什么是服务注册与发现 二、 Docker-Consul 概述 1、Consul 概念 2、Consul 提供的一些关键特性 3、Consul 的优缺点 4、传统模式与自动发现注册模式的区别 4.1 传统模式 4.2 自动发现注册模式 5、Consul 核心组件 5.1 Consul-Template组件 5.2 Consu…

一款 NodeJS 版本管理工具 NVM (Windows)

一、简介 Node Version Manager&#xff08;NVM&#xff09;是一种用于管理多个 NodeJS 版本的工具。在日常工作中&#xff0c;我们可能同时在进行多个不同的项目开发&#xff0c;每个项目的需求不同&#xff0c;依赖与不同版本的NodeJS 运行环境。这种情况下&#xff0c;维护…

SAP PP学习笔记11 - PP中的MRP相关概念,参数,配置

上文讲了作业区的概念及配置。 SAP PP学习笔记08 - 作业区&#xff08;工作中心Work Center&#xff09;&#xff0c;作业区Customize-CSDN博客 SAP PP学习笔记09 - 作业区&#xff08;工作中心Work Center&#xff09;Customize2&#xff08;管理码&#xff0c;班次顺序&…

利用github pages建立Serverless个人博客

利用github pages建立Serverless个人博客 概述 使用github pages&#xff0c;可以在github上部署静态网站。利用这个功能&#xff0c;可以很方便地实现个人博客的发布托管。 比如我的个人博客&#xff1a;Buttering’s Blog 对应代码仓库&#xff1a;buttering/EasyBlog: 自…