Flink问题解决及性能调优-【Flink根据不同场景状态后端使用调优】

Flink 实时groupby聚合场景操作时,由于使用的是rocksdb状态后端,发现CPU的高负载卡在rocksdb的读写上,导致上游算子背压特别大。通过调优使用hashmap状态后端代替rocksdb状态后端,使吞吐量有了质的飞跃(20倍的性能提升),并分析整理。

实例代码

--SET table.exec.state.ttl=86400s; --24 hour,默认: 0 ms
SET table.exec.state.ttl=2592000s; --30 days,默认: 0 msCREATE TABLE kafka_table (mid bigint,db string,sch string,tab string,opt string,ts bigint,ddl string,err string,src map<string,string>,cur map<string,string>,cus map<string,string>,account_id AS IF(cur['account_id'] IS NOT NULL , cur['account_id'], src ['account_id']),publish_time AS IF(cur['publish_time'] IS NOT NULL , cur['publish_time'], src ['publish_time']),msg_status AS IF(cur['msg_status'] IS NOT NULL , cur['msg_status'], src ['msg_status']),send_type AS IF(cur['send_type'] IS NOT NULL , cur['send_type'], src ['send_type'])--event_time as cast(IF(cur['update_time'] IS NOT NULL , cur['update_time'], src ['update_time']) AS TIMESTAMP(3)), -- TIMESTAMP(3)/TIMESTAMP_LTZ(3)--WATERMARK FOR event_time AS event_time - INTERVAL '1' MINUTE     --SECOND
) WITH ('connector' = 'kafka','topic' = 't1','properties.bootstrap.servers' = 'xx.xx.xx.xx:9092','properties.group.id' = 'g1','scan.startup.mode' = 'earliest-offset',  --group-offsets/earliest-offset/latest-offset--  'properties.enable.auto.commit',= 'true' -- default:false, 如果为false,则在发生checkpoint时触发offset提交'format' = 'json'
);CREATE TABLE es_sink(send_type      STRING,account_id     STRING,publish_time   STRING,grouping_id       INTEGER,init           INTEGER,init_cancel    INTEGER,push          INTEGER,succ           INTEGER,fail           INTEGER,init_delete    INTEGER,update_time    STRING,PRIMARY KEY (group_id,send_type,account_id,publish_time) NOT ENFORCED
)
with ('connector' = 'elasticsearch-6','index' = 'es_sink','document-type' = 'es_sink','hosts' = 'http://xxx:9200','format' = 'json','filter.null-value'='true','sink.bulk-flush.max-actions' = '1000','sink.bulk-flush.max-size' = '10mb'
);CREATE view  tmp as
selectsend_type,account_id,publish_time,msg_status,case when UPPER(opt) = 'INSERT' and msg_status='0'  then 1 else 0 end AS init,case when UPPER(opt) = 'UPDATE' and send_type='1' and msg_status='4' then 1 else 0 end AS init_cancel,case when UPPER(opt) = 'UPDATE' and msg_status='3' then 1 else 0 end AS push,case when UPPER(opt) = 'UPDATE' and (msg_status='1' or msg_status='5') then 1 else 0 end AS succ,case when UPPER(opt) = 'UPDATE' and (msg_status='2' or msg_status='6') then 1 else 0 end AS fail,case when UPPER(opt) = 'DELETE' and send_type='1' and msg_status='0' then  1 else 0 end AS init_delete,event_time,opt,ts
FROM kafka_table
where (UPPER(opt) = 'INSERT' and msg_status='0' )
or        (UPPER(opt) = 'UPDATE' and msg_status in ('1','2','3','4','5','6'))
or        (UPPER(opt) = 'DELETE' and send_type='1' and msg_status='0');--send_type=1          send_type=0
--初始化->0             初始化->0
--取消->4
--推送->3               推送->3
--成功->1               成功->5
--失败->2               失败->6CREATE view  tmp_groupby as
selectCOALESCE(send_type,'N') AS send_type
,COALESCE(account_id,'N') AS account_id
,COALESCE(publish_time,'N') AS publish_time
,case when send_type is null and account_id is null and publish_time is null then 1when send_type is not null and account_id is null and publish_time is null then 2when send_type is not null and account_id is not null and publish_time is null then 3when send_type is not null and account_id is not null and publish_time is not null then 4end grouping_id
,sum(init) as init
,sum(init_cancel) as init_cancel
,sum(push) as push
,sum(succ) as succ
,sum(fail) as fail
,sum(init_delete) as init_delete
from tmp
--GROUP BY GROUPING SETS ((send_type,account_id,publish_time), (send_type,account_id),(send_type), ())
GROUP BY ROLLUP (send_type,account_id,publish_time); --等同于以上INSERT INTO es_sink
selectsend_type,account_id,publish_time,grouping_id,init,init_cancel,push,succ,fail,init_delete,CAST(LOCALTIMESTAMP AS STRING) as update_time
from tmp_groupby

问题调优

由于使用的是rocksdb状态后端,发现CPU的高负载卡在rocksdb的读写上,导致上游算子背压特别大,如下图:
在这里插入图片描述
改使用hashmap状态后端以后,当前环节的CPU负载大大缓解,上游背压消失,吞吐量有20以上的提升,如下:
在这里插入图片描述

分析问题

Flink为我们预置了两种状态后端HashMap和RocksDB,

  • HashMap状态后端是将状态数据存储在SubTask的内存中,访问速度更快,但是受限于SubTask内存大小
  • RocksDB状态后端是将状态数据存储在SubTask的磁盘中,存储容量更大,但是访问速度会慢于HashMap状态后端

通过比较这两种不同类型状态后端,用户可以根据业务场景中的状态的大小、状态的访问性能等条件来衡量并选择将状态数据存储到内存中还是本地的磁盘中。
举例来说,有的应用场景中的Flink作业要保存数百亿条状态数据,那么就需要在SubTask本地保存大量的状态数据,这种场景下RocksDB状态后端显然更合适;而有的应用场景中的Flink作业只需要保存数百万条状态数据,但是对于状态的访问和更新频次很高,那么在这种应用场景下,需要保障状态数据访问的高效性,hashmap状态后端显然是更好的选择。

注意:

  • 如果我们没有通过上述两种方法来设置作业的状态后端,那么Flink默认的状态后端就是HashMap状态后端
  • 从Flink 1.13版本开始,Flink统一了不同状态后端的Savepoint的二进制格式,因此我们可以使用一种状态后端生成Savepoint并且使用另一种状态后端进行恢复,这可以帮助我们在极致的状态访问性能(HashMap状态后端)以及支持大容量的状态存储(RocksDB状态后端)之间进行灵活切换。

状态后端的配置

HashMap状态后端的配置

  • 通过作业代码设置单个Flink作业的状态后端
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 指定状态后端为HashMap
env.setStateBackend(new HashMapStateBackend());
// Checkpoint快照文件存储的目录
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://namenode:50010/flink/checkpoints"));
  • 通过flink-conf.yaml设置状态后端,sql方式一般通过这种方式配置
# 状态后端的类型
state.backend: hashmap
# Checkpoint快照文件存储的目录
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints

HashMap状态后端的使用建议:
将托管内存(Managed Memory)设为0,托管内存是Flink分配的本地堆外内存,应用场景通常在RocksDB状态后端下分配给RocksDB来存储状态数据的,因此在使用HashMap状态后端的情况下,我们可以将托管内存设置为0来将更多的内存提供给HashMap状态后端使用。可以通过以下3种方式来在flink-conf.yaml中设置托管内存。

  • 通过taskmanager.memory.managed.size指定托管内存的大小。
  • 通过taskmanager.memory.managed.fraction指定托管内存在Flink总内存中的占比,默认值为0.4。
  • 当同时指定二者时,会优先采用taskmanager.memory.managed.size,若二者均未指定,会根据taskmanager.memory.managed.fraction的默认值0.4计算得到托管内存的大小。

通过Flink Web UI查看状态后端配置及内存使用情况,如下图:
在这里插入图片描述托管内存(Managed Memory)不为0时:
在这里插入图片描述
托管内存(Managed Memory)为0时(强烈建议):
在这里插入图片描述

RocksDB状态后端的配置

  • 通过作业代码设置单个Flink作业的状态后端
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置状态后端为RocksDB
env.setStateBackend(new EmbeddedRocksDBStateBackend());
// 设置状态后端为RocksDB,并且设置为增量Checkpoint
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
// Checkpoint快照文件存储的目录
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://namenode:50010/flink/checkpoints"));

需要引入依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_2.11</artifactId><version>1.14.6</version>
</dependency>
  • 通过flink-conf.yaml设置状态后端,sql方式一般通过这种方式配置
# 配置状态后端的类型
state.backend: rocksdb
# 设置增量Checkpoint
state.backend.incremental: true
# 配置Checkpoint快照文件的目录
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints

RocksDB状态后端的使用建议:
通过上文知道,托管内存(Managed Memory)通常在RocksDB状态后端下分配给RocksDB来存储状态数据的,因此需要适当调大托管内存。

  • 通过taskmanager.memory.managed.size指定托管内存的大小。

  • 通过taskmanager.memory.managed.fraction调大托管内存在Flink总内存中的占比,例如:0.8。

  • 状态数据大小:由于JNI API是构建在字节数组之上的,因此每个key和value最大只支持231字节,而在ListState这种数据结构中,可能会出现value超过231字节的情况,这时获取状态数据会失败,在使用时需要注意

  • RocksDB状态后端增量快照
    RocksDB状态后端是目前唯一支持增量快照(增量Checkpoint)的状态后端。与增量快照相反的是全量快照,全量快照很好理解,在Checkpoint执行时,Flink作业将当前所有的状态数据全部备份到远程文件系统中,这就是全量快照。而在生产环境中,大多数Flink作业两次快照的间隔中发生变化的状态数据只占整体状态数据的一小部分,基于这个特点,增量快照诞生了,增量快照的特点在于每一次快照要持久化的数据只包含自上一次快照完成之后发生变化(被修改)的状态数据,所以可以显著减少持久化快照文件的大小以及执行快照的耗时。增量ck与全量ck的区别,如下图:
    在这里插入图片描述
    在这里插入图片描述

  • 定时器状态数据的存储
    在Flink的窗口类应用中,定时器是用于触发窗口计算的核心组件,为了在作业异常时保证注册的定时器不被丢失,定时器会被存储到键值状态中。
    在Flink作业中,用于存储定时器的数据结构是一个支持去重的优先队列。当我们配置RocksDB作为状态后端时,默认情况下定时器将存储在RocksDB中,但是这样的存储方式容易导致Flink作业出现性能问题。原因主要有两个,第一个原因是去重优先队列是一个复杂的数据结构,Flink作业访问RocksDB会存在性能问题,第二个原因是算子对于定时器的访问是比较频繁的,这会加大Flink作业处理数据的时延。
    以事件时间为例,默认情况下Flink作业的Watermark生成器会每隔200ms抽取一次Watermark,而每当时间窗口算子的Watermark发生更新,都要访问优先队列判断当前是否有定时器要触发,所以如果将去重优先队列存储在RocksDB中,频繁的访问定时器将会严重影响作业性能。
    如果我们将定时器的状态数据存储在JVM堆上就可以有效提升访问性能了,因此Flink提供了配置来实现将定时器的状态数据单独存储在JVM堆上,而只使用RocksDB存储其他键值状态,配置方式是将flink-conf.yaml文件中的state.backend.rocksdb.timer-service.factory配置项设置为heap(默认为rocksdb),如下图:
    在这里插入图片描述

  • 通过Flink Web UI查看状态后端配置及内存使用情况,如下图:
    在这里插入图片描述
    在这里插入图片描述

状态后端的使用注意事项

区分键值状态和算子状态

由于算子状态数据只会存储在SubTask内存中,因此在生产环境中要严格区分键值状态和算子状态的使用场景,避免因为将算子状态当做键值状态使用而导致出现内存溢出的问题。如下图:
在这里插入图片描述

ValueState<HashMap<String, String>>和MapState<String, String>的选型

如标题所示,作为初学者来说,如果要在键值状态中存储Map<String, String>数据结构的状态,可能会认为使用ValueState<HashMap<String, String>>或者使用MapState<String, String>都是可行的。

如果我们选择使用HashMap状态后端,那么两种方式的性能上不会有很大差异,但是如果我们选择使用RocksDB状态后端,则推荐使用MapState<String, String>,避免使用ValueState<HashMap<String, String>>。因为ValueState<HashMap<String, String>>在将数据写入RocksDB时,是将一整个HashMap<String, String>序列化为字节数组之后写入的。同样,在读取时,也是先读取到字节数组,然后反序列化为一整个HashMap<String, String>后,再给用户使用。所以每次访问和更新ValueState时,实际上都是对HashMap<String, String>这个集合类的大对象做序列化以及反序列化,而这是一个及其耗费资源的过程,很容易就会导致Flink作业产生性能瓶颈,所以极不推荐在ValueState中存储大对象。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/649714.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2024年,你是否还在迷茫?

2024年&#xff0c;你是否还在迷茫&#xff1f; 别担心&#xff01;鸿蒙来了&#xff0c;这个未来技术的制高点&#xff0c;为你提供了答案&#xff01; 诸多大厂疯抢、24年预计鸿蒙相关的岗位需求将达到百万级、就业均薪达到19K&#xff0c;全国高校开课…… 种种现象都在表…

VirtualBox安装Ubuntu22.04

目录 1、新建虚拟机 1.1、设置内存大小 1.2、创建虚拟硬盘 2、虚拟机设置 2.1、设置启动顺序​编辑 2.2、选择iso镜像文件 2.3、设置网络(桥接网卡) 3、启动 3.1、设置语言环境 3.2、系统更新安装(不更新) 3.3、选择键盘布局(默认即可) 3.4、选择安装类型 3.5、网…

硬件知识(1) 手机的长焦镜头

#灵感# 手机总是配备好几个镜头&#xff0c;研究一下 目录 手机常配备的摄像头&#xff0c;及效果举例 长焦的焦距 焦距的定义和示图&#xff1a; IPC的焦距和适用场景&#xff1a; 手机常配备的摄像头&#xff0c;及效果举例 以下是小米某个手机的摄像头介绍&#xff1a…

EXCEL VBA抓取网页JSON数据并解析

EXCEL VBA抓取网页JSON数据并解析 链接地址&#xff1a; https://api.api68.com/CQShiCai/getBaseCQShiCaiList.do?lotCode10036&date2024-01-26 Sub test() On Error Resume Next Sheet.Select Sheet1.Cells.ClearContents [a1:g1] Split("preDrawIssue|preDrawTi…

用Visual Studio Code创建JavaScript运行环境【2024版】

用Visual Studio Code创建JavaScript运行环境 JavaScript 的历史 JavaScript 最初被称为 LiveScript&#xff0c;由 Netscape&#xff08;Netscape Communications Corporation&#xff0c;网景通信公司&#xff09;公司的布兰登艾奇&#xff08;Brendan Eich&#xff09;在 …

mysql 存储过程学习

存储过程介绍 1.1 SQL指令执行过程 从SQL执行的流程中我们分析存在的问题: 1.如果我们需要重复多次执行相同的SQL&#xff0c;SQL执行都需要通过连接传递到MySQL&#xff0c;并且需要经过编译和执行的步骤; 2.如果我们需要执行多个SQL指令&#xff0c;并且第二个SQL指令需要…

Topaz Video AI:无损放大,让你的视频更清晰!

在当今的数字时代&#xff0c;视频内容的重要性越来越受到人们的关注。无论是在社交媒体上分享生活片段&#xff0c;还是在商业领域中制作宣传视频&#xff0c;人们都希望能够展现出更高质量的视频内容。 然而&#xff0c;由于各种原因&#xff0c;我们经常会面临一个问题&…

C++版QT:分割窗口

目录 mainwindow.h mainwindow.cpp main.cpp Qt的分割窗口功能允许用户将一个窗口分割成多个区域&#xff0c;每个区域可以独立地显示不同的内容。这种功能在许多应用程序中非常有用&#xff0c;例如编辑器、浏览器和IDE等。 理解Qt的分割窗口&#xff0c;需要从以下几个方面…

音频格式之AAC:(2)AAC封装格式ADIF,ADTS,LATM,extradata及AAC ES存储格式

系列文章目录 音频格式的介绍文章系列&#xff1a; 音频编解码格式介绍(1) ADPCM&#xff1a;adpcm编解码原理及其代码实现 音频编解码格式介绍(2) MP3 &#xff1a;音频格式之MP3&#xff1a;(1)MP3封装格式简介 音频编解码格式介绍(2) MP3 &#xff1a;音频格式之MP3&#x…

IDEA jdk版本切换问题

打开 IntelliJ IDEA 的 Project Structure&#xff08;快捷键通常是 Ctrl Alt Shift S&#xff09;。 转到 Project Settings > Modules。 选择相应的模块&#xff0c;然后在 Sources 标签页下&#xff0c;查看 Language level 是否设置为 自己需要的jdk版本语言。 接…

20240125-边界外路径

题目要求 有一个m*n的网格&#xff0c;网格中有一个小球。小球初始位置位[startRow&#xff0c;startColumn]。您可以将小球移动到网格中相邻的四个单元格之一&#xff08;可能会越过网格边界移出网格&#xff09;。最多可以对小球进行maxMove移动。 给定 m、n、maxMove、sta…

uniapp导入uView组件库

目录 准备工作 1. 新建一个项目 2. 导入uview组件库 3. 关于SCSS 配置步骤 1. 引入uView主JS库 2. 在引入uView的全局SCSS 3. 引入uView基础样式 4. 配置easycom组件模式 添加效果实验运行即可成功 准备工作 1. 新建一个项目 2. 导入uview组件库 在进行配置之前&#x…

TensorFlow2实战-系列教程2:神经网络分类任务

&#x1f9e1;&#x1f49b;&#x1f49a;TensorFlow2实战-系列教程 总目录 有任何问题欢迎在下面留言 本篇文章的代码运行界面均在Jupyter Notebook中进行 本篇文章配套的代码资源已经上传 1、Mnist数据集 下载mnist数据集&#xff1a; %matplotlib inline from pathlib imp…

使用代码取大量2*2像素图片各通道均值,存于Excel文件中。

任务是取下图RGB各个通道的均值及标签&#xff08;R, G&#xff0c;B&#xff0c;Label&#xff09;,其中标签由图片存放的文件夹标识。由于2*2像素图片较多&#xff0c;所以将结果放置于Excel表格中&#xff0c;之后使用SVM对他们进行分类。 from PIL import Image import os …

【Linux】查看硬件信息和操作系统信息、安装的应用信息

【Linux】查看硬件信息和操作系统信息、安装的应用信息 一、硬件信息 1.1 CPU信息 cat /proc/cpuinfo #查看 processor : 0 // 逻辑处理器的唯一标识符 physical id : 0 // 硬件上真实存在的CPU siblings : 1 // 一个物理CPU有几个逻辑CPU cpu…

定向减免!函数计算让轻量 ETL 数据加工更简单,更省钱

作者&#xff1a;澈尔、墨飏 业内较为常见的高频短时 ETL 数据加工场景&#xff0c;即频率高时延短&#xff0c;一般均可归类为调用密集型场景。此场景有着高并发、海量调用的特性&#xff0c;往往会产生高额的计算费用&#xff0c;而业内推荐方案一般为攒批处理&#xff0c;业…

ChatGPT+Midjourney+闲鱼赚钱方法实战探索

最近天天在朋友群内看到朋友接单(出售提示词&#xff0c;图片&#xff09;&#xff0c;轻轻松松半小时就赚200-300&#xff0c;特意探索了一下相关玩法&#xff0c;总结出一套ChatGPTMidjourney闲鱼赚钱方法&#xff0c;主打的是易上手&#xff0c;有可操作性&#xff01; 具体…

项目性能优化之用compression-webpack-plugin插件开启gzip压缩

背景&#xff1a;vue项目打包发布后&#xff0c;部分js、css文件体积较大导致页面卡顿&#xff0c;于是使用webpack插件compression-webpack-plugin开启gzip压缩 前端配置vue.config.js 先通过npm下载compression-webpack-plugin包&#xff0c;npm i compression-webpack-plug…

C#使用RabbitMQ-2_详解工作队列模式

简介 &#x1f340;RabbitMQ中的工作队列模式是指将任务分配给多个消费者并行处理。在工作队列模式中&#xff0c;生产者将任务发送到RabbitMQ交换器&#xff0c;然后交换器将任务路由到一个或多个队列。消费者从队列中获取任务并进行处理。处理完成后&#xff0c;消费者可以向…

【图像分割】【深度学习】Windows10下UNet代码Pytorch实现与源码讲解

【图像分割】【深度学习】Windows10下UNet代码Pytorch实现与源码讲解 提示:最近开始在【医学图像分割】方面进行研究,记录相关知识点,分享学习中遇到的问题已经解决的方法。 文章目录 【图像分割】【深度学习】Windows10下UNet代码Pytorch实现与源码讲解前言UNet模型运行环境搭…