Apache SeaTunnel脚本升级及参数调优实战

最近作者针对实时数仓的Apache SeaTunnel同步链路,完成了双引擎架构升级与全链路参数深度调优,希望本文能够给大家有所启发,欢迎批评指正!

在这里插入图片描述

Apache SeaTunnel 版本 :2.3.9

Doris版本:2.0.6

MySQL JDBC Connector : 8.0.28

架构升级

  • 批处理链路:JDBC并行度进行提升,基于ID分区实现分片读取,结合批量参数(fetch_size=10000+batch_size=5000)使全量同步吞吐量大幅增加

  • 实时增量链路:引入MySQL-CDC组件,通过initial快照模式+chunk.size.rows=8096实现全量/增量平滑切换,事件延迟压降至500ms内

稳定性增强

  • 资源管控:JDBC连接池动态扩容(max_size=20)+ CDC限流策略(rows_per_second=1000),源库CPU峰值负载下降40%

  • 容错机制:Doris两阶段提交(enable-2pc=true)配合检查点(checkpoint.interval=10s),故障恢复时间缩短80%

写入优化

  • 缓冲区三级联控(buffer-size=10000+buffer-count=3+flush.interval=5s)提升Doris写入批次质量

  • Tablet粒度控制(request_tablet_size=5)使BE节点负载均衡度提升

实战演示

同步之前创建Doris表

在这里插入图片描述

-- DROP TABLE IF EXISTS ods.ods_activity_info_full;
CREATE TABLE ods.ods_activity_info_full
(`id`            VARCHAR(255) COMMENT '活动id',`k1`            DATE NOT NULL   COMMENT '分区字段',`activity_name` STRING COMMENT '活动名称',`activity_type` STRING COMMENT '活动类型',`activity_desc` STRING COMMENT '活动描述',`start_time`    STRING COMMENT '开始时间',`end_time`      STRING COMMENT '结束时间',`create_time`   STRING COMMENT '创建时间'
)ENGINE=OLAP  -- 使用Doris的OLAP引擎,适用于高并发分析场景UNIQUE KEY(`id`,`k1`)  -- 唯一键约束,保证(id, k1)组合的唯一性(Doris聚合模型特性)
COMMENT '活动信息全量表'
PARTITION BY RANGE(`k1`) ()  -- 按日期范围分区(具体分区规则由动态分区配置决定)
DISTRIBUTED BY HASH(`id`)  -- 按id哈希分桶,保证相同id的数据分布在同一节点
PROPERTIES
("replication_allocation" = "tag.location.default: 1",  -- 副本分配策略:默认标签分配1个副本"is_being_synced" = "false",          -- 是否处于同步状态(通常保持false)"storage_format" = "V2",             -- 存储格式版本(V2支持更高效压缩和索引)"light_schema_change" = "true",      -- 启用轻量级schema变更(仅修改元数据,无需数据重写)"disable_auto_compaction" = "false", -- 启用自动压缩(合并小文件提升查询性能)"enable_single_replica_compaction" = "false", -- 禁用单副本压缩(多副本时保持数据一致性)"dynamic_partition.enable" = "true",            -- 启用动态分区"dynamic_partition.time_unit" = "DAY",          -- 按天创建分区"dynamic_partition.start" = "-60",             -- 保留最近60天的历史分区"dynamic_partition.end" = "3",                 -- 预先创建未来3天的分区"dynamic_partition.prefix" = "p",              -- 分区名前缀(如p20240101)"dynamic_partition.buckets" = "32",            -- 每个分区的分桶数(影响并行度)"dynamic_partition.create_history_partition" = "true", -- 自动创建缺失的历史分区"bloom_filter_columns" = "id,activity_name",  -- 为高频过滤字段(id/名称)创建布隆过滤器,加速WHERE查询"compaction_policy" = "time_series",          -- 按时间序合并策略优化时序数据(适合活动时间字段)"enable_unique_key_merge_on_write" = "true",  -- 唯一键写时合并(实时更新场景减少读放大)"in_memory" = "false"                        -- 关闭全内存存储(仅小表可开启)
);
配置SeaTunnel JDBC同步脚本

在这里插入图片描述

env {# 环境配置parallelism = 8                     # 增加并行度以提高吞吐量job.mode = "STREAMING"              # 使用流式处理模式进行实时同步checkpoint.interval = 10000         # 检查点间隔,单位毫秒# 限流配置 - 避免对源数据库造成过大压力read_limit.bytes_per_second = 10000000  # 每秒读取字节数限制,约10MB/sread_limit.rows_per_second = 1000       # 每秒读取行数限制# 本地检查点配置execution.checkpoint.data-uri = "file:///opt/seatunnel/checkpoints"execution.checkpoint.max-concurrent = 1  # 最大并发检查点数# 性能优化参数execution.buffer-timeout = 5000          # 缓冲超时时间(毫秒)execution.jvm-options = "-Xms4g -Xmx8g -XX:+UseG1GC -XX:MaxGCPauseMillis=100"
}source {MySQL-CDC {# 基本连接配置# server-id = 5652-5657             # MySQL复制客户端的唯一ID范围username = "root"                # 数据库用户名password = ""                # 数据库密码table-names = ["gmall.activity_info"]  # 要同步的表base-url = "jdbc:mysql://192.168.241.128:3306/gmall?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai"# CDC 特有配置schema-changes.enabled = true     # 启用架构变更捕获server-time-zone = "Asia/Shanghai"  # 服务器时区# 性能优化配置snapshot.mode = "initial"         # 初始快照模式snapshot.fetch.size = 10000       # 快照获取大小chunk.size.rows = 8096            # 分块大小,用于并行快照connection.pool.size = 10         # 连接池大小# 高级配置include.schema.changes = true     # 包含架构变更事件scan.startup.mode = "initial"     # 启动模式:initial(全量+增量)scan.incremental.snapshot.chunk.size = 8096  # 增量快照分块大小debezium.min.row.count.to.stream.results = 1000  # 流式结果的最小行数# 容错配置connect.timeout = 30000           # 连接超时时间(毫秒)connect.max-retries = 3           # 最大重试次数# 输出表名result_table_name = "mysql_cdc_source"}
}# 可选的转换逻辑,如果需要对数据进行处理
transform {Sql {source_table_name = "mysql_cdc_source"result_table_name = "doris_sink_data"# 根据需要转换字段,这里添加了一个分区字段k1query = """selectid,formatdatetime(create_time,'yyyy-MM-dd') as k1,activity_name,activity_type,activity_desc,start_time,end_time,create_timefrom mysql_cdc_source"""}
}sink {Doris {# 基本连接配置source_table_name = "doris_sink_data"  # 或直接使用 "mysql_cdc_source"fenodes = "192.168.241.128:8030"username = "root"password = ""table.identifier = "ods.ods_activity_info_full"  # Doris目标表# 事务和标签配置sink.enable-2pc = "true"          # 启用两阶段提交,确保一致性sink.label-prefix = "cdc_sync"    # 导入标签前缀# 写入模式配置sink.properties {format = "json"read_json_by_line = "true"column_separator = "\t"         # 列分隔符line_delimiter = "\n"           # 行分隔符max_filter_ratio = "0.1"        # 允许的最大错误率# CDC特有配置 - 处理不同操作类型# 使用Doris的UPSERT模式处理CDC事件merge_type = "MERGE"            # 合并类型:APPEND或MERGEdelete_enable = "true"          # 启用删除操作}# 性能优化配置sink.buffer-size = 10000          # 缓冲区大小sink.buffer-count = 3             # 缓冲区数量sink.flush.interval-ms = 5000     # 刷新间隔sink.max-retries = 3              # 最大重试次数sink.parallelism = 8              # 写入并行度# Doris连接优化doris.config = {format = "json"read_json_by_line = "true"request_connect_timeout_ms = "5000"  # 连接超时request_timeout_ms = "30000"         # 请求超时request_tablet_size = "5"            # 每个请求的tablet数量}}
}
配置SeaTunnel MySQLCDC 同步脚本

在这里插入图片描述

env {parallelism = 8job.mode = "BATCH"checkpoint.interval = 30000# 本地文件系统检查点execution.checkpoint.data-uri = "file:///opt/seatunnel/checkpoints"execution.buffer-timeout = 5000# JVM 参数优化execution.jvm-options = "-Xms4g -Xmx8g -XX:+UseG1GC -XX:MaxGCPauseMillis=100"
}source {Jdbc {result_table_name = "mysql_seatunnel"url = "jdbc:mysql://192.168.241.128:3306/gmall?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&useSSL=false&rewriteBatchedStatements=true&useServerPrepStmts=true&cachePrepStmts=true"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 30user = "gmall"password = "gmall"# 使用分区并行读取query = "select id, activity_name, activity_type, activity_desc, start_time, end_time, create_time from gmall.activity_info"partition_column = "id"partition_num = 8# 连接池配置connection_pool {max_size = 20min_idle = 5max_idle_ms = 60000}# 批处理配置fetch_size = 10000batch_size = 5000is_exactly_once = true}
}transform {Sql {source_table_name = "mysql_seatunnel"result_table_name = "seatunnel_doris"query = """select id, formatdatetime(create_time,'yyyy-MM-dd') as k1,  activity_name, activity_type, activity_desc, start_time, end_time, create_time from mysql_seatunnel"""}
}sink {Doris {source_table_name = "seatunnel_doris"fenodes = "192.168.241.128:8030"username = "root"password = ""table.identifier = "ods.ods_activity_info_full"sink.enable-2pc = "true"sink.label-prefix = "test_json"# 优化Doris写入配置sink.properties {format = "json"read_json_by_line = "true"column_separator = "\t"line_delimiter = "\n"max_filter_ratio = "0.1"}# 批量写入配置sink.buffer-size = 10000sink.buffer-count = 3sink.flush.interval-ms = 5000sink.max-retries = 3sink.parallelism = 8doris.config = {format = "json"read_json_by_line = "true"request_connect_timeout_ms = "5000"request_timeout_ms = "30000"request_tablet_size = "5"}}
}

最终Apache Doris数据:
在这里插入图片描述

本文完!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/74220.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++ 时间操作:获取有史以来的天数与文件计数器

C 时间操作:获取有史以来的天数与文件计数器 在C中,时间操作是一个非常重要的功能,尤其是在需要处理日期、时间戳或定时任务时。本文将介绍如何利用C的时间操作功能,实现以下两个目标: 获取从Unix纪元时间&#xff0…

Python Bug修复案例分析:Python 中常见的 IndentationError 错误 bug 的修复

在 Python 编程的世界里,代码的可读性和规范性至关重要。Python 通过强制使用缩进来表示代码块的层次结构,这一独特的设计理念使得代码更加清晰易读。然而,正是这种对缩进的严格要求,导致开发者在编写代码时,稍有不慎就…

【论文笔记】Transformer

Transformer 2017 年,谷歌团队提出 Transformer 结构,Transformer 首先应用在自然语言处理领域中的机器翻译任务上,Transformer 结构完全构建于注意力机制,完全丢弃递归和卷积的结构,这使得 Transformer 结构效率更高…

CI/CD(三) 安装nfs并指定k8s默认storageClass

一、NFS 服务端安装(主节点 10.60.0.20) 1. 安装 NFS 服务端 sudo apt update sudo apt install -y nfs-kernel-server 2. 创建共享目录并配置权限 sudo mkdir -p /data/k8s sudo chown nobody:nogroup /data/k8s # 允许匿名访问 sudo chmod 777 /dat…

【QA】单件模式在Qt中有哪些应用?

单例设计模式确保一个类仅有一个实例,并提供一个全局访问点来获取该实例。在 Qt 框架中,有不少类的设计采用了单例模式,以下为你详细介绍并给出相应代码示例。 1. QApplication QApplication 是 Qt GUI 应用程序的核心类,每个 Q…

存储过程触发器习题整理1

46、{blank}设有商品表(商品号,商品名,单价)和销售表(销售单据号,商品号,销售时间,销售数量,销售单价)。其中,商品号代表一类商品,商品号、单价、销售数量和销售单价均为整型。请编写…

基于ChatGPT、GIS与Python机器学习的地质灾害风险评估、易发性分析、信息化建库及灾后重建高级实践

第一章、ChatGPT、DeepSeek大语言模型提示词与地质灾害基础及平台介绍【基础实践篇】 1、什么是大模型? 大模型(Large Language Model, LLM)是一种基于深度学习技术的大规模自然语言处理模型。 代表性大模型:GPT-4、BERT、T5、Ch…

单表达式倒计时工具:datetime的极度优雅(智普清言)

一个简单表达式,也可以优雅自成工具。 笔记模板由python脚本于2025-03-22 20:25:49创建,本篇笔记适合任意喜欢学习的coder翻阅。 【学习的细节是欢悦的历程】 博客的核心价值:在于输出思考与经验,而不仅仅是知识的简单复述。 Pyth…

最优编码树的双子性

现在看一些书,不太愿意在书上面做一些标记,也没啥特殊的原因。。哈哈。 树的定义 无环连通图,极小连通图,极大无环图。 度 某个节点,描述它的度,一般默认是出度,分叉的边的条数。或者说孩子…

MiB和MB

本文来自腾讯元宝 MiB 和 ​MB 有区别,尽管它们都用于表示数据存储的单位,但它们的计算方式不同,分别基于不同的进制系统。 1. ​MiB(Mebibyte)​ ​MiB 是基于二进制的单位,使用1024作为基数。1 MiB 102…

Labview和C#调用KNX API 相关东西

叙述:完全没有听说过KNX这个协议...................我这次项目中也是简单的用了一下没有过多的去研究 C#调用示例工程链接(labview调用示例在 DEBUG文件夹里面) 通过网盘分享的文件:KNX调用示例.zip 链接: https://pan.baidu.com/s/1NQUEYM11HID0M4ksetrTyg?pwd…

损失函数理解(二)——交叉熵损失

损失函数的目的是为了定量描述不同模型(例如神经网络模型和人脑模型)的差异。 交叉熵,顾名思义,与熵有关,先把模型换成熵这么一个数值,然后用这个数值比较不同模型之间的差异。 为什么要做这一步转换&…

Kubernetes的Replica Set和ReplicaController有什么区别

ReplicaSet 和 ReplicationController 是 Kubernetes 中用于管理应用程序副本的两种资源,它们有类似的功能,但 ReplicaSet 是 ReplicationController 的增强版本。 以下是它们的主要区别: 1. 功能的演进 ReplicationController 是 Kubernete…

信息系统运行管理员教程3--信息系统设施运维

第3章 信息系统设施运维 信息系统设施是支撑信息系统业务活动的信息系统软硬件资产及环境。 第1节 信息系统设施运维的管理体系 信息系统设施运维的范围包含信息系统涉及的所有设备及环境,主要包括基础环境、硬件设备、网络设备、基础软件等。 信息系统设施运维…

如何通过Python实现自动化任务:从入门到实践

在当今快节奏的数字化时代,自动化技术正逐渐成为提高工作效率的利器。无论是处理重复性任务,还是管理复杂的工作流程,自动化都能为我们节省大量时间和精力。本文将以Python为例,带你从零开始学习如何实现自动化任务,并通过一个实际案例展示其强大功能。 一、为什么选择Pyt…

Spring Boot 与 MyBatis Plus 整合 KWDB 实现 JDBC 数据访问

​ 引言 本文主要介绍如何在 IDEA 中搭建一个使用 Maven 管理的 Spring Boot 应用项目工程,并结合在本地搭建的 KWDB 数据库(版本为:2.0.3)来演示 Spring Boot 与 MyBatis Plus 的集成,以及对 KWDB 数据库的数据操作…

Java锁等待唤醒机制

在 Java 并发编程中,锁的等待和唤醒机制至关重要,通常使用 wait()、notify() 和 notifyAll() 来实现线程间的协调。本文将详细介绍这些方法的用法,并通过示例代码加以说明。 1. wait()、notify() 与 notifyAll() 在 Java 中,Obj…

 UNIX网络编程笔记:TCP客户/服务器程序示例

服务器实例 有个著名的项目&#xff0c;tiny web&#xff0c;本项目将其改到windows下&#xff0c;并使用RAII重构&#xff0c;编写过程中对于内存泄漏确实很头疼&#xff0c;还没写完&#xff0c;后面会继续更&#xff1a; #include <iostream> #include <vector&g…

AI Agent开发大全第四课-提示语工程:从简单命令到AI对话的“魔法”公式

什么是提示语工程&#xff1f;一个让AI“听话”的秘密 如果你曾经尝试过用ChatGPT或者其他大语言模型完成任务&#xff0c;那么你一定遇到过这样的情况&#xff1a;明明你的问题是清晰的&#xff0c;但答案却离题万里&#xff1b;或者你认为自己提供的信息足够详尽&#xff0c…

系统架构设计知识体系总结

1.技术选型 1.什么是技术选型&#xff1f; 技术选型是指评估和选择在项目或系统开发中使用的最合适的技术和工具的过程。这涉及考虑基于其能力、特性、与项目需求的兼容性、可扩展性、性能、维护和其他因素的各种可用选项。技术选型的目标是确定与项目目标相符合、能够有效解…