flink写doris时的优化

1.概念

doris并擅长高频小量数据的导入

因为doris每一次数据导入都会在be节点上生成数据文件;如果高频导入小量数据,就会在存储层产生大量的小文件(必然会影响到后续的查询效率,也会对系统产生更多的compaction操作压力)

而flink是实时不断地往doris中插入数据,所以很容易出现上述问题;

怎么办?有两个办法:

  1. 在flink中先做一些按时间开窗后的轻度聚合,降低写入的数据量(在先flink端处理,后续的数据量变少了)
  2. 可以适当调大checkpoint间隔(10分钟),降低插入频率(原因是flink在做完checkpoint才往下游写数据)

方案1:开窗轻度聚合

1.例子

例子:
-- 分钟级聚合
CREATE TABLE doris_sink (window_start TIMESTAMP(3),total_count BIGINT,sum_value DECIMAL(16,2)
) WITH ('connector' = 'doris','table.identifier' = 'db.table','sink.batch.size' = '5000', 'sink.batch.interval' = '60s'
);INSERT INTO doris_sink
SELECTTUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,COUNT(*) AS total_count,SUM(value) AS sum_value
FROM source_table
GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE);

优化效果​​(示例):

时间窗口原始数据量聚合后数据量写入压缩比
1秒10000条/s10000条/s1:1
10秒10000条/s1000条/10s10:1
1分钟10000条/s167条/min60:1

 在flink端部分聚合,再写入doris,数据量变小了,效率自然提高

2.合适的使用场景:

场景特征适用性技术实现要点收益
高并发写入(>1万条/秒)滚动窗口聚合 + 计数窗口降频减少 90% 小文件,避免 -235 错误

1

亚秒级查询需求预计算指标 + 结果表写入查询延迟从秒级降至毫秒级

3

多源数据关联窗口内多流 Join + 聚合避免 Doris 复杂查询,节省 30% CPU

5

精确统计需求需写入原始明细数据-

 (1)高并发写入场景

当上游数据源(如 Kafka)的写入并发量极高(例如每秒 10 万条以上)时,直接写入 Doris 可能导致以下问题:

  1. ​小文件过多​​:频繁写入会产生大量小文件,触发 Doris 的版本合并(Compaction)压力,可能引发错误。
  2. ​资源消耗大​​:高频写入导致 Doris BE 节点的 CPU 和 I/O 资源被 Compaction 任务占用,影响查询性能。

​解决方案​​:
在 Flink 中通过 ​​滚动窗口(如 5 秒窗口)​​ 或 ​​计数窗口(如每 1000 条)​​ 对数据进行预聚合,将多条数据合并为一条统计结果后再写入 Doris。例如:

DataStream<Event> stream = ...;
stream.keyBy(Event::getKey).window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // 5秒滚动窗口.aggregate(new AvgAggregator()) // 聚合逻辑(如计算均值).addSink(new DorisSink());对应到sql 直接开窗5s

此方式可将写入频率降低 10 倍以上,减少 Doris 的写入压力

(2)低延迟查询需求场景

当业务需要​​亚秒级查询响应​​(如实时大屏、风控决策)时,直接写入原始数据可能导致:

  1. ​查询性能下降​​:原始数据量大,Doris 需实时聚合计算,增加查询耗时;
  2. ​存储成本高​​:原始明细数据占用大量存储空间。

解决方案​​:
在 Flink 中按时间窗口(如 1 分钟)预计算关键指标(如 PV、UV、GMV),仅将聚合结果写入 Doris。例如:

  • ​原始数据​​:用户点击事件(每秒 10 万条) → ​​聚合后​​:每分钟 PV 统计值(每秒 1 条)。
    此方式可提升 Doris 查询效率,同时节省存储资源

(3)数据预处理与清洗场景

当原始数据存在以下特征时,适合在 Flink 端聚合:

  1. ​冗余数据多​​:如重复日志、无效埋点;
  2. ​关联计算需求​​:需跨数据源关联(如用户行为数据与订单数据)。

​解决方案​​:
通过 Flink 窗口函数实现:

  • ​去重​​:使用 WindowFunction 过滤重复数据;
  • ​关联计算​​:在窗口内完成多流 Join,输出关联结果。
    例如,在 10 秒窗口内关联用户点击与加购行为,输出转化率指标,避免 Doris 中复杂的多表关联查询

(4)资源受限场景

当 Doris 集群资源(CPU、内存、磁盘)有限时,可通过以下方式优化:

  1. ​降低写入量​​:聚合后数据量减少 50%~90%,降低 Doris 存储和 Compaction 压力;
  2. ​延长 Compaction 周期​​:通过减少小文件数量,允许 Doris 合并任务更高效调度。

​参数调优建议​​:

  • Flink Checkpoint 间隔:从 5 秒调整为 30 秒~1 分钟,减少事务提交频率;
  • Doris Compaction 参数:调低 cumulative_size_based_promotion_min_size_mbytes(默认 64MB → 8MB),加速小文件合并;

方案2:调大 Checkpoint 间隔

生产环境测试数据​​:

Checkpoint间隔吞吐量(events/s)写入延迟(ms)CPU利用率
1分钟12,00050-10075%
5分钟28,00030-8065%
10分钟35,00020-6058%

考一个对checkpoint的理解:flink是在做完checkpoint才往下游写数据?,比如说checkpoint的时间是1分钟,岂不是延迟就是一分钟?

结论:​​数据处理和状态快照是解耦的​​。调整 Checkpoint 间隔只会影响故障恢复时可能丢失的数据量(Recovery Time Objective),​​不会增加数据处理的固有延迟​;

具体例子(以第 N 分钟为例):

  1. ​0:00.000​

    • 用户A点击商品X → Kafka 生产事件
    • Flink 立即消费并处理,PV计数器+1 → 实时写入 Doris

  2. ​0:00.500​

    • CheckpointCoordinator 触发新一轮 Checkpoint
    • Source 算子注入 Barrier 到数据流(特殊标记,不影响正常数据处理)

  3. ​0:00.501-0:02.000​

    • Barrier 随数据流向下游传播
    • PV统计算子 ​​边处理新事件​​ 边接收 Barrier:
    • Doris 持续收到 PV=100 → 101 → 102... 的写入请求
  4. ​0:03.000​

    • 所有算子完成状态快照(耗时约2秒)
    • 快照存储到 HDFS(异步执行,不阻塞主线程)
  5. ​0:06.000​

    • Checkpoint 确认完成,JM 记录元数据;

正常情况:

  • 用户点击后 ​​500ms​​ 内即可在 Doris 查询到最新 PV(实际延迟仅网络+计算耗时)
  • Checkpoint 过程持续 ​​6秒​​,但期间 Doris 收到 ​​60次​​ 数据写入(每秒10次);

故障要恢复情况:

假设在 ​​0:50​​ 发生故障:

  • 从最近 Checkpoint(0:00 开始,0:06 完成)恢复
  • 状态回滚到 PV=100(Checkpoint 时的快照值)
  • ​但 Doris 实际已写入 PV=150​
  • Flink 通过事务机制保证最终 PV=150 + 恢复后新数据 的精确一次语义

这时候聪明的你又发现:

Doris 实际已写入 PV=150​​,相当于以及写入到下游的doris,是怎么让数据回滚的???

原因:Flink 在故障恢复时保证 Doris 已写入的 PV=150 数据不会导致重复计算,核心是通过​​两阶段提交(2PC)机制​​与​​事务性写入​​实现的,所以可以回滚数据

Checkpoint 与事务的阶段性控制​:

Flink 的 Checkpoint 过程与 Sink 的事务提交严格绑定,整个过程分为 ​​预提交(pre-commit)​​ 和 ​​正式提交(commit)​​ 两个阶段

  1. ​预提交阶段​​(Checkpoint 进行中)

    • Flink Sink 将计算结果(如 PV=100→150 的增量)写入 Doris 的​​临时存储位置​​(如临时表或事务日志),但​​未对外可见​​。
    • 此时 Doris 的 PV=150 ​​仅处于预提交状态​​,未实际生效。
  2. ​正式提交阶段​​(Checkpoint 确认完成)

    • 当 JobManager 收到所有算子的 Checkpoint 完成确认后,才会通知 Sink ​​提交事务​​。
    • Doris 将临时数据​​原子性替换为正式数据​​(如重命名临时文件或更新可见标志)

故障恢复时的回滚逻辑​

假设故障发生在 ​​0:50​​(Checkpoint 未完成):

  1. ​未完成的 Checkpoint 事务回滚​

    • Flink 从最近成功的 Checkpoint(PV=100)恢复状态。
    • 同时,Doris 中处于预提交状态的 PV=150 ​​会被自动清理​​(如删除临时表或撤销事务日志)。
  2. ​数据重放与幂等性保障​

    • Flink 会从 Source 端(如 Kafka)​​重放 Checkpoint 后的数据​​(0:06→0:50 的数据)。
    • Doris Sink 在写入时通过​​事务 ID 或唯一键​​实现幂等性,确保相同数据多次写入不会重复累加;

疑问:针对数据回滚的场景,doris能查询到 PV=150的数据吗

Doris 默认的隔离级别保证查询只能看到已提交的数据,所以查看不到PV=150的数据

其他调优手段:

1、开启 MiniBatch 聚合

table.exec.mini-batch.enabled = true
table.exec.mini-batch.size = 5000

2、配置 Doris 批量写入

sink.batch.size = 5000
sink.max-retries = 5 --最大可重试5次

3、异步 Compaction 调优

ALTER TABLE doris_table SET ("compaction_policy" = "time_series");

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/77098.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ElementNotInteractableException原因及解决办法

在自动化测试中,ElementNotInteractableException是一个常见的异常,它通常发生在尝试与网页上的某个元素进行交互(例如点击、输入等操作)时,但由于该元素当前不可交互。这可能由多种原因引起,以下是一些常见的原因及其解决方法: 元素未完全加载 如果尝试与页面上的元素交…

如何从 GitHub 镜像仓库到极狐GitLab?

最近 GitHub 封禁中国用户的事情闹得沸沸扬扬,虽然官方发布的报道说中国用户被限制登录是因为配置错误导致,已经撤回了更新,中国用户已经可以正常使用。但是这就像横在国内开发者和企业头上的“达摩克利斯之剑”。为了避免 GitHub 不可用而带来的影响,国内开发者和企业可以…

服务器安装nacos

1.下载依赖 docker pull nacos/nacos-server:v2.4.3安装 docker run -d --name nacos-server -p 8848:8848 -e MODEstandalone nacos/nacos-server:v2.4.3把nacos中的data 文件和conf 文件copy到自己服务的文件夹 docker cp nacos-server:/home/nacos/data /home/admin1/…

Matter协议暗战:苹果、谷歌、亚马逊的智能家居霸权争夺

原文地址&#xff1a;Matter协议暗战&#xff1a;苹果、谷歌、亚马逊的智能家居霸权争夺 一、Matter 协议&#xff1a;巨头联手打造的 “智能家居联合国” 1.1 从 CHIP 到 Matter&#xff1a;标准统一的十年长跑 智能家居发展多年&#xff0c;却始终被 “孤岛效应” 困扰。各…

软件设计师2009-2022历年真题与答案解析(附pdf下载)

软考在即&#xff0c;现在给大家分享一下软件设计师2009-2022真题与答案解析 pdf全套&#xff0c;文末提供大家免费下载&#xff0c;大家都知道在软考备考过程中&#xff0c;拥有一套全面且实用的考试资料对于考生来说至关重要。目录如下&#xff1a; 历年真题及详解2004-2019 …

基于EasyX库开发的球球大作战游戏

目录 球球大作战 一、开发环境 二、流程图预览 三、代码逻辑 1、初始化时间 2、设置开始界面大小 3、设置开始界面 4、让玩家选择速度 5、设置玩家小球、人机小球、食物的属性 6、一次性把图绘制到界面里 7、进入死循环 8、移动玩家小球 9、移动人机 10、食物刷新…

aslist和list的区别

‌Arrays.asList和List的主要区别在于它们的固定长度和不可变性、与原始数组的关系、性能以及使用场景。 一、固定长度和不可变性 ‌Arrays.asList‌&#xff1a;通过Arrays.asList方法创建的List是一个固定长度的List&#xff0c;其长度与原始数组相同。这意味着你不能通过添…

大模型预标注和自动化标注在OCR标注场景的应用

OCR&#xff0c;即光学字符识别&#xff0c;简单来说就是利用光学设备去捕获图像并识别文字&#xff0c;最终将图片中的文字转换为可编辑和可搜索的文本。在数字化时代&#xff0c;OCR&#xff08;光学字符识别&#xff09;技术作为处理图像中文字信息的关键手段&#xff0c;其…

stm32工程,拷贝到另一台电脑编译,错误提示头文件找不到cannot open source input file “core_cm4.h”

提示 cannot open source input file “core_cm4.h” ,找不到 [ core_cm4.h ] 这个头文件 . 于是我在原电脑工程文件里找也没有找到这个头文件 接下来查看原电脑keil的头文件引入配置,发现只引入了工程文件下的头文件, 那么core_cm4.h到底哪里来的? (到现在我也不清楚怎…

STM32 模块化开发指南 · 第 2 篇 如何编写高复用的外设驱动模块(以 UART 为例)

本文是《STM32 模块化开发实战指南》的第 2 篇,聚焦于“串口驱动模块的设计与封装”。我们将从一个最基础的裸机 UART 初始化开始,逐步实现:中断支持、环形缓冲收发、模块接口抽象与测试策略,构建一个可移植、可扩展、可复用的 UART 驱动模块。 一、模块化 UART 的设计目标…

【NLP 59、大模型应用 —— 字节对编码 bpe 算法】

目录 一、词表的构造问题 二、bpe(byte pair encoding) 压缩算法 算法步骤 示例&#xff1a; 步骤 1&#xff1a;初始化符号表和频率统计 步骤 2&#xff1a;统计相邻符号对的频率 步骤 3&#xff1a;合并最高频的符号对 步骤 4&#xff1a;重复合并直至终止条件 三、bpe在NLP中…

TMS320F28P550SJ9学习笔记15:Lin通信SCI模式结构体寄存器

今日初步认识与配置使用Lin通信SCI模式&#xff0c;用结构体寄存器的方式编程 文章提供完整工程下载、测试效果图 我的单片机平台是这个&#xff1a; LIN通信引脚&#xff1a; LIN通信PIE中断&#xff1a; 这个 PIE Vector Table 表在手册111页&#xff1a; 这是提到LINa的PI…

linux-设置每次ssh登录服务器的时候提醒多久需要修改密码

在 Linux 系统中,你可以通过设置 motd(Message of the Day)或 sshd 配置来在用户通过 SSH 登录时提醒他们密码即将过期。以下是具体步骤: 方法 1: 使用 motd 文件 motd 文件在用户登录时显示,你可以通过脚本动态生成内容,提醒用户密码过期时间。 编辑 /etc/motd 文件:…

matlab求和∑函数方程编程?

matlab求和∑函数方程编程&#xff1f; 一 题目&#xff1a;求下列函数方程式的和 二&#xff1a;代码如下&#xff1a; >> sum_result 0; % 初始化求和变量 for x 1:10 % 设…

electron桌面端开发-打开指定软件和文件

electron桌面端开发 现在越来越多的软件开发已经趋向于简单化&#xff0c;桌面端开发已经不在依赖之前的java、c等主流技术&#xff0c;目前基于node的开发越来越广泛。功能点也越来越多元化。 文章目录 electron桌面端开发前言一、打开文件的方式&#xff1f;二、exec使用步骤…

ShenNiusModularity项目源码学习(17:ShenNius.Admin.Mvc项目分析-2)

ShenNiusModularity项目的后台管理主页面如下图所示&#xff0c;该页面为ShenNius.Admin.Mvc项目的Views\Home\Index.cshtml&#xff0c;使用的是layuimini后台模板&#xff08;参考文献2&#xff09;&#xff0c;在layuimini的GitHub主页中提供有不同样式的页面模版链接&#…

SpringBoot 与 Vue3 实现前后端互联全解析

在当前的互联网时代&#xff0c;前后端分离架构已经成为构建高效、可维护且易于扩展应用系统的主流方式。本文将详细介绍如何利用 SpringBoot 与 Vue3 构建一个前后端分离的项目&#xff0c;展示两者如何通过 RESTful API 实现无缝通信&#xff0c;让读者了解从环境搭建、代码实…

portainer.io篇

Portainer‌是一个轻量级的容器管理工具&#xff0c;支持Docker、Kubernetes、Docker Swarm、ACI和Nomad等多种平台。它提供了一个直观的Web界面&#xff0c;使用户能够轻松地管理和监控容器&#xff0c;包括创建、启动、停止、删除容器&#xff0c;以及查看容器的日志和配置信…

Dockerfile 文件常见命令及其作用

Dockerfile 文件包含一系列命令语句&#xff0c;用于定义 Docker 镜像的内容、配置和构建过程。以下是一些常见的命令及其作用&#xff1a; FROM&#xff1a;指定基础镜像&#xff0c;后续的操作都将基于该镜像进行。例如&#xff0c;FROM python:3.9-slim-buster 表示使用 Pyt…

Android Studio开发知识:从基础到进阶

引言 Android开发作为移动应用开发的主流方向之一&#xff0c;曾吸引了无数开发者投身其中。然而&#xff0c;随着市场饱和和技术迭代&#xff0c;当前的Android开发就业形势并不乐观&#xff0c;竞争日益激烈。尽管如此&#xff0c;掌握扎实的开发技能仍然是脱颖而出的关键。本…