FlinkSQL【分组聚合-多维分析-性能调优】应用实例分析

FlinkSQL处理如下实时数据需求:
实时聚合不同 类型/账号/发布时间 的各个指标数据,比如:初始化/初始化后删除/初始化后取消/推送/成功/失败 的指标数据。要求实时产出指标数据,数据源是mysql cdc binlog数据。

代码实例

--SET table.exec.state.ttl=86400s; --24 hour,默认: 0 ms
SET table.exec.state.ttl=2592000s; --30 days,默认: 0 ms
--MiniBatch 聚合
SET table.exec.mini-batch.enabled = true;
SET table.exec.mini-batch.allow-latency = 1s;
SET table.exec.mini-batch.size = 10000;
--Local-Global 聚合
SET table.optimizer.agg-phase-strategy = TWO_PHASE;CREATE TABLE kafka_table (mid bigint,db string,sch string,tab string,opt string,ts bigint,ddl string,err string,src map<string,string>,cur map<string,string>,cus map<string,string>,account_id AS IF(cur['account_id'] IS NOT NULL , cur['account_id'], src ['account_id']),publish_time AS IF(cur['publish_time'] IS NOT NULL , cur['publish_time'], src ['publish_time']),msg_status AS IF(cur['msg_status'] IS NOT NULL , cur['msg_status'], src ['msg_status']),send_type AS IF(cur['send_type'] IS NOT NULL , cur['send_type'], src ['send_type'])--event_time as cast(IF(cur['update_time'] IS NOT NULL , cur['update_time'], src ['update_time']) AS TIMESTAMP(3)), -- TIMESTAMP(3)/TIMESTAMP_LTZ(3)--WATERMARK FOR event_time AS event_time - INTERVAL '1' MINUTE     --SECOND
) WITH ('connector' = 'kafka','topic' = 't1','properties.bootstrap.servers' = 'xx.xx.xx.xx:9092','properties.group.id' = 'g1','scan.startup.mode' = 'earliest-offset',  --group-offsets/earliest-offset/latest-offset--  'properties.enable.auto.commit',= 'true' -- default:false, 如果为false,则在发生checkpoint时触发offset提交'format' = 'json'
);CREATE TABLE es_sink(send_type      STRING,account_id     STRING,publish_time   STRING,grouping_id       INTEGER,init           INTEGER,init_cancel    INTEGER,push          INTEGER,succ           INTEGER,fail           INTEGER,init_delete    INTEGER,update_time    STRING,PRIMARY KEY (group_id,send_type,account_id,publish_time) NOT ENFORCED
)
with ('connector' = 'elasticsearch-6','index' = 'es_sink','document-type' = 'es_sink','hosts' = 'http://xxx:9200','format' = 'json','filter.null-value'='true','sink.bulk-flush.max-actions' = '1000','sink.bulk-flush.max-size' = '10mb'
);CREATE view  tmp as
selectsend_type,account_id,publish_time,msg_status,case when UPPER(opt) = 'INSERT' and msg_status='0'  then 1 else 0 end AS init,case when UPPER(opt) = 'UPDATE' and send_type='1' and msg_status='4' then 1 else 0 end AS init_cancel,case when UPPER(opt) = 'UPDATE' and msg_status='3' then 1 else 0 end AS push,case when UPPER(opt) = 'UPDATE' and (msg_status='1' or msg_status='5') then 1 else 0 end AS succ,case when UPPER(opt) = 'UPDATE' and (msg_status='2' or msg_status='6') then 1 else 0 end AS fail,case when UPPER(opt) = 'DELETE' and send_type='1' and msg_status='0' then  1 else 0 end AS init_delete,event_time,opt,ts
FROM kafka_table
where (UPPER(opt) = 'INSERT' and msg_status='0' )
or        (UPPER(opt) = 'UPDATE' and msg_status in ('1','2','3','4','5','6'))
or        (UPPER(opt) = 'DELETE' and send_type='1' and msg_status='0');--send_type=1          send_type=0
--初始化->0             初始化->0
--取消->4
--推送->3               推送->3
--成功->1               成功->5
--失败->2               失败->6CREATE view  tmp_groupby as
selectCOALESCE(send_type,'N') AS send_type
,COALESCE(account_id,'N') AS account_id
,COALESCE(publish_time,'N') AS publish_time
,case when send_type is null and account_id is null and publish_time is null then 1when send_type is not null and account_id is null and publish_time is null then 2when send_type is not null and account_id is not null and publish_time is null then 3when send_type is not null and account_id is not null and publish_time is not null then 4end grouping_id
,sum(init) as init
,sum(init_cancel) as init_cancel
,sum(push) as push
,sum(succ) as succ
,sum(fail) as fail
,sum(init_delete) as init_delete
from tmp
--GROUP BY GROUPING SETS ((send_type,account_id,publish_time), (send_type,account_id),(send_type), ())
GROUP BY ROLLUP (send_type,account_id,publish_time); --等同于以上INSERT INTO es_sink
selectsend_type,account_id,publish_time,grouping_id,init,init_cancel,push,succ,fail,init_delete,CAST(LOCALTIMESTAMP AS STRING) as update_time
from tmp_groupby

其他配置

  • flink集群参数
state.backend: rocksdb
state.backend.incremental: true
state.backend.rocksdb.ttl.compaction.filter.enabled: true
state.backend.rocksdb.localdir: /export/io_tmp_dirs/rocksdb
state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
state.savepoints.dir: hdfs://namenode-host:port/flink-savepoints
rest.flamegraph.enabled: true
pipeline.operator-chaining: false
taskmanager.memory.managed.fraction: 0.7
taskmanager.memory.network.min: 128 mb
taskmanager.memory.network.max: 128 mb
taskmanager.memory.framework.off-heap.size: 32mb
taskmanager.memory.task.off-heap.size: 32mb
taskmanager.memory.jvm-metaspace.size: 256mb
taskmanager.memory.jvm-overhead.fraction: 0.03
  • 检查点配置
    在这里插入图片描述

  • job运行资源
    管理节点(JM) 1 个, 节点规格 1 核 4 GB内存, 磁盘 10Gi
    运行节点(TM)10 个, 节点规格 1 核 4 GB内存, 磁盘 80Gi
    单TM槽位数(Slot): 1
    默认并行度:8

  • es mapping

#POST app_cust_syyy_private_domain_syyy_group_msg/app_cust_syyy_private_domain_syyy_group_msg/_mapping
{"app_cust_syyy_private_domain_syyy_group_msg": {"properties": {"send_type": {"type": "keyword","ignore_above": 256},"account_id": {"type": "keyword"},"publish_time": {"type": "keyword","fields": {"text": {"type": "keyword"},"date": {"type": "date","format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis","ignore_malformed":"true" # 忽略错误的各式}}},"grouping_id": {"type": "integer"},"init": {"type": "integer"},"init_cancel": {"type": "integer"},"query": {"type": "integer"},"succ": {"type": "integer"},"fail": {"type": "integer"},"init_delete": {"type": "integer"},"update_time": {"type": "date","format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"}}}
}

性能调优

是否开启【MiniBatch 聚合】和【Local-Global 聚合】对分组聚合场景影响巨大,尤其是在数据量大的场景下。

  • 如果未开启,在分组聚合,数据更新状态时,每条数据都会触发聚合运算,进而更新StateBackend (尤其是对于 RocksDB StateBackend,火焰图上反映就是一直在update rocksdb),造成上游算子背压特别大。此外,生产中非常常见的数据倾斜会使这个问题恶化,并且容易导致 job 发生反压。
    在这里插入图片描述

  • 在开启【MiniBatch 聚合】和【Local-Global 聚合】后,配置如下:

--MiniBatch 聚合
SET table.exec.mini-batch.enabled = true;
SET table.exec.mini-batch.allow-latency = 1s;
SET table.exec.mini-batch.size = 10000;
--Local-Global 聚合
SET table.optimizer.agg-phase-strategy = TWO_PHASE;

开启配置好会在DAG上添加两个环节MiniBatchAssignerLocalGroupAggregate
在这里插入图片描述

对结果的影响

开启了【MiniBatch 聚合】和【Local-Global 聚合】后,一天处理不完的数据,在10分钟内处理完毕

输出结果

在这里插入图片描述在这里插入图片描述

参考:
Group Aggregation
Streaming Aggregation Performance Tuning

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/628779.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

解决kali beef启动失败解问题

只限于出现这个提示的时候使用 卸载 ruby apt remove ruby 卸载 beef apt remove beef-xss 重新安装ruby apt-get install ruby apt-get install ruby-dev libpcap-dev gem install eventmachine 重新安装beef apt-get install beef-xss 弄完以上步骤如果还是不行就重启kali再试…

cmake构建动态库实例(cmakelist)

文章目录 一、开发实例1.1 代码目录1.2 代码内容1.2.1 CMakeLists.txt1.2.2 mylib.cpp1.2.2 mylib.h1.2 编译二、动态库使用方法一、动态链接源代码构建运行方法二:dlopen方式一、开发实例 通过cmake构建静态开发实例如下: 1.1 代码目录 代码目录结构如下:

nestjs 装饰器

1、装饰器定义 装饰器是一种特殊的类型声明&#xff0c;它可以附加在类、方法、属性、参数上边 需开启tsconfig.json中 "experimentalDecorators":true 生成tsconfig.json文件 tsc -init 2、类装饰器 // 类装饰器 主要是通过符号添加装饰器 // 装饰器会自动把cl…

CCC数字钥匙标准3.0版本解读(23)

文章目录 17.8 数据结构定义17.8.1 请求报头17.8.2 响应报头17.8.3 Key类型17.8.4 EncryptedDataContainer17.8.5 未加密的uiBundle17.8.6 数字钥匙状态17.8.7 manageKey操作类型17.8.8 钥匙操作类型17.8.9 通知的事件类型17.8.10 事件通知的事件数据17.8.11 未加密的时间通知数…

【MySQL】MySQL表的增删查改以及聚合函数/group by句子的使用

文章目录 一、创建--Create1.单行数据 全列插入2.多行数据 指定列插入3.插入否则更新4.替换 -- replace 二、读取--Retrieve1.SELECT列1.1全列查询1.2指定列查询1.3查询字段为表达式1.4为查询结果指定别名1.5 结果去重 -- distinct 2.WHERE 条件3.结果排序4.筛选分页结果 三、…

《数据结构》学习笔记

1.算法分析的两个主要任务&#xff1a;正确性&#xff08;不变性 单调性&#xff09; 复杂度。 2.复杂度分析的主要方法&#xff1a; 迭代&#xff1a;级数求和&#xff1b;递归&#xff1a;递归跟踪 递推方程猜测 验证 3.级数&#xff1a; &#xff08;1&#xff09;算…

LLM之RAG实战(十六)| 使用Llama-2、PgVector和LlamaIndex构建LLM Rag Pipeline

近年来&#xff0c;大型语言模型&#xff08;LLM&#xff09;取得了显著的进步&#xff0c;然而大模型缺点之一是幻觉问题&#xff0c;即“一本正经的胡说八道”。其中RAG&#xff08;Retrieval Augmented Generation&#xff0c;检索增强生成&#xff09;是解决幻觉比较有效的…

智能小程序小部件(Widget)表单组件属性说明+代码明细

在 Tuya MiniApp Tools 中&#xff0c;新建项目并选择小部件(Widget)对应模板即可自动创建小部件(Widget)项目。 button 按钮&#xff0c;用于强调操作并引导用户去点击。 属性说明 属性名类型默认值必填说明sizestringdefault否按钮的大小typestringdefault否按钮的样式类…

opencv_角点检测

文章内容 一个opencv检测角点的程序 运行效果 #include <opencv2/opencv.hpp> #include <opencv2/highgui/highgui.hpp> #include <opencv2/imgproc/imgproc.hpp> #include <iostream>using namespace cv; using namespace std;void detectCorners(M…

低代码-详情页组件设计

详情页数据结构定义 layout:{// 按钮数据buttonLayout:{headButton:[], // 页头按钮footButton:[] // 页脚按钮},// 详情页表单配置config:{}, // 配置组件列表detailLayout:[]}默认行为 进表单初始化&#xff0c;只展示表单属性&#xff0c;隐藏通用、数据、事件tab项。 配…

低代码自动化平台| 游戏规则改变者

自动化测试对于软件开发公司起着非常重要的作用。它在公司及其客户之间建立了对优质产品的信任。此外&#xff0c;它还使软件开发人员更加自信&#xff0c;因为他们可以在其他模块上工作&#xff0c;而不必担心应用程序的任何现有功能是否存在错误。在软件测试中融入自动化是必…

数据分析-Pandas如何整合多张数据表

数据分析-Pandas如何整合多张数据表 数据表&#xff0c;时间序列数据在数据分析建模中很常见&#xff0c;例如天气预报&#xff0c;空气状态监测&#xff0c;股票交易等金融场景。数据分析过程中重新调整&#xff0c;重塑数据表是很重要的技巧&#xff0c;此处选择Titanic数据…

多测师肖sir___ui自动化测试po框架(升级)

ui自动化测试po框架&#xff08;升级&#xff09; po框架 一、ui自动化po框架介绍 &#xff08;1&#xff09;PO是Page Object的缩写&#xff08;pom模型&#xff09; &#xff08;2&#xff09;业务流程与页面元素操作分离的模式&#xff0c;可以简单理解为每个页面下面都有一…

【linux】visudo

碎碎念 visudo命令是用来修改一个叫做 /etc/sudoers 的文件的&#xff0c;用来设置哪些 用户 和 组 可以使用sudo命令。并且使用visudo而不是使用 vi /etc/sudoers 的原因在于&#xff1a;visudo自带了检查功能&#xff0c;可以判断是否存在语法问题&#xff0c;所以更加安全 …

深入探讨 Go 语言中的 Map 类型(续)

深入探讨 Go 语言中的 Map 类型&#xff08;续&#xff09; 在上一篇博客中&#xff0c;我们已经讨论了 Go 语言中 map 类型的基本概念、特性以及最佳实践。本篇继续深入&#xff0c;讨论一些更高级的 map 用法和技巧&#xff0c;以及一些注意事项。 更高级的 Map 用法 1. m…

7.评价预测模型——C指数,NRI,IDI计算

目录 基本知识 1. C指数 2. NRI、IDI 二分类资料 1. C指数 C指数计算 比较两个模型C指数 2. NRI 3. IDI 生存资料 1. rms包拟合的生存曲线 C指数 比较两个模型的C指数 2. survival包拟合的生存曲线 C指数 NRI计算 IDI 基本知识 1. C指数 C指数&#xff1a; …

给VScode 挪挪窝

给VSCode 挪挪窝 vscode platoformio 写一些代码&#xff0c;导致笔记本c盘满满当当的&#xff0c; 挪了几次都不成功&#xff0c;今天学了一招&#xff0c;给这俩挪到另外一个盘去 复制文件 cp -R c:\users\user\.vscode d:\work\tools\PIO\.vscode cp -R c:\users\user…

stm32 - 基础架构

stm32 - 基础架构 基础架构外设概念系统结构引脚定义晶振工程 基础架构 外设概念 NVIC &#xff08;内核外设&#xff09; SysTick &#xff08;内核外设&#xff09; 其他是片上外设 系统结构 内核引出三条总线 ICode 指令总线&#xff1a; 连接Flash闪存&#xff08;编写的…

C# wpf 获取控件刷新的时机

文章目录 前言一、为何要获取刷新时机&#xff1f;例子一、隐藏控件后截屏例子二、修改控件大小后做计算 二、如何实现&#xff1f;1.使用动画2.使用TaskCompletionSource 三、完整代码四、使用示例1、隐藏工具条截屏2、修改宽高后获取ActualWidth、ActualHeight 总结 前言 做…

【算法题】58. 最后一个单词的长度

题目 给你一个字符串 s&#xff0c;由若干单词组成&#xff0c;单词前后用一些空格字符隔开。返回字符串中 最后一个 单词的长度。 单词 是指仅由字母组成、不包含任何空格字符的最大子字符串。 示例 1&#xff1a; 输入&#xff1a;s "Hello World" 输出&#…