实时数仓: Hudi 表管理、Flink 性能调优或治理工具脚本

1. Hudi 表管理

1.1 Hudi 表基础管理

创建 Hudi 表
在 HDFS 上创建一个 Hudi 表(以 Merge-on-Read 为例):

CREATE TABLE real_time_dw.dwd_order_fact (order_id STRING,user_id STRING,product_id STRING,amount DOUBLE,order_date STRING,update_time TIMESTAMP
)
PARTITIONED BY (order_date)
STORED AS PARQUET
TBLPROPERTIES ('type'='MERGE_ON_READ','primaryKey'='order_id','preCombineField'='update_time'
);
1.2 数据操作

插入/更新数据
利用 Hudi 写入工具(如 Spark)进行批量或实时插入更新:

from pyspark.sql import SparkSession
from datetime import datetimespark = SparkSession.builder \.appName("Hudi Example") \.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \.getOrCreate()# 加载数据
data = [{"order_id": "1", "user_id": "101", "product_id": "201", "amount": 99.99, "order_date": "2025-01-01", "update_time": datetime.now()},{"order_id": "2", "user_id": "102", "product_id": "202", "amount": 199.99, "order_date": "2025-01-01", "update_time": datetime.now()}
]
df = spark.createDataFrame(data)# 写入 Hudi
hudi_options = {"hoodie.table.name": "dwd_order_fact","hoodie.datasource.write.recordkey.field": "order_id","hoodie.datasource.write.precombine.field": "update_time","hoodie.datasource.write.partitionpath.field": "order_date","hoodie.datasource.write.operation": "upsert","hoodie.datasource.write.table.type": "MERGE_ON_READ","hoodie.datasource.hive.sync.enable": "true","hoodie.datasource.hive.database": "real_time_dw","hoodie.datasource.hive.table": "dwd_order_fact","hoodie.datasource.hive.partition_fields": "order_date"
}df.write.format("hudi").options(**hudi_options).mode("append").save("hdfs://path/to/hudi/dwd_order_fact")
1.3 Hudi 表维护

表清理

  • 配置清理策略,清理过期版本:
    hoodie.cleaner.commits.retained=10
    hoodie.cleaner.policy=KEEP_LATEST_COMMITS
    
    保留最近 10 个提交版本。

表压缩

  • 针对 MOR 表,定期运行 compaction 任务:
    spark-submit --class org.apache.hudi.utilities.HoodieCompactor \--master yarn \--table-path hdfs://path/to/hudi/dwd_order_fact \--table-name dwd_order_fact
    

元数据管理

  • 更新 Hive 元数据:
    MSCK REPAIR TABLE real_time_dw.dwd_order_fact;
    

2. Flink 性能调优

2.1 Checkpoint 性能优化

增量 Checkpoint
启用 RocksDB 增量检查点,减少状态存储大小:

env.getCheckpointConfig().enableIncrementalCheckpoints(true);

异步快照
减少 Checkpoint 对性能的影响:

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(60000); // 60秒超时
env.getCheckpointConfig().setPreferCheckpointForRecovery(true); // 优先使用Checkpoint恢复
2.2 Watermark 优化

如果数据有延迟,可以允许一定的 out-of-order 数据处理:

WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)) // 最大延迟5秒.withTimestampAssigner((event, timestamp) -> event.getEventTime());
2.3 状态管理优化

状态后端选择

  • 优先选择 RocksDB 状态后端,支持更大的状态数据:
    env.setStateBackend(new RocksDBStateBackend("hdfs://path/to/checkpoints", true));
    

TTL(Time-to-Live)设置

  • 自动清理无用状态:
    stateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.hours(1)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).build());
    
2.4 Task Slot 配置

根据并发优化 TaskManager:

  • 每个 TaskManager 提供更多 slots:
    taskmanager.numberOfTaskSlots: 4
    

3. 治理工具脚本

3.1 数据质量治理(Great Expectations)

脚本自动化
以下 Python 脚本可以实现自动化数据校验(如字段非空和值域校验):

from great_expectations.core.batch import BatchRequest
from great_expectations.data_context import DataContextcontext = DataContext()batch_request = BatchRequest(datasource_name="my_s3_datasource",data_connector_name="default_runtime_data_connector_name",data_asset_name="dwd_order_fact",runtime_parameters={"path": "s3://path/to/hudi/dwd_order_fact/"},batch_identifiers={"default_identifier_name": "2025-01-01"}
)validator = context.get_validator(batch_request=batch_request)# 非空校验
validator.expect_column_values_to_not_be_null("order_id")
# 值域校验
validator.expect_column_values_to_be_in_set("order_status", ["CREATED", "PAID", "SHIPPED", "CANCELLED"])
# 保存结果
validator.save_expectation_suite("order_fact_suite")context.run_validation_operator("action_list_operator",assets_to_validate=[validator]
)
3.2 数据权限管理(Apache Ranger)

策略 JSON 配置
以下为权限策略 JSON 文件的示例,适用于 Ranger API 批量添加策略:

{"policyName": "dwd_order_fact_policy","serviceType": "hive","resources": {"database": {"values": ["real_time_dw"],"isExcludes": false,"isRecursive": false},"table": {"values": ["dwd_order_fact"],"isExcludes": false,"isRecursive": false}},"policyItems": [{"accesses": [{"type": "select", "isAllowed": true}],"users": ["bi_user"],"groups": ["BI_Group"]},{"accesses": [{"type": "select", "isAllowed": true}, {"type": "insert", "isAllowed": true}],"users": ["etl_user"],"groups": ["ETL_Team"]}]
}

通过 Ranger REST API 部署该策略:

curl -u admin:admin -H "Content-Type: application/json" -X POST -d @policy.json http://<RANGER_HOST>:6080/service/public/v2/api/policy
3.3 数据血缘治理(Apache Atlas)

Flink 血缘注册脚本
通过 REST API 自动将 Flink 作业的输入输出血缘关系上传到 Atlas:

curl -X POST http://<ATLAS_HOST>:21000/api/atlas/v2/entity \
-H "Content-Type: application/json" \
-d '{"entity": {"typeName": "process","attributes": {"name": "flink_order_job","inputs": [{"typeName": "kafka_topic", "uniqueAttributes": {"qualifiedName": "order_topic"}}],"outputs": [{"typeName": "hdfs_path", "uniqueAttributes": {"qualifiedName": "hdfs://path/to/hudi/dwd_order_fact"}}]}}
}'

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/66295.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CSS——1.优缺点

<!DOCTYPE html> <html><head><meta charset"UTF-8"><title></title><link rel"stylesheet" type"text/css" href"1-02.css"/></head><body><!--css&#xff1a;层叠样式表…

攻防世界 - Web - Level 3 | simple_js

关注这个靶场的其它相关笔记&#xff1a;攻防世界&#xff08;XCTF&#xff09; —— 靶场笔记合集-CSDN博客 0x01&#xff1a;考点速览 本题考察的是 JS 的代码审计&#xff0c;以下是你需要了解的知识点&#xff1a; String.fromCharCode() -> 将接收的 Unicode 编码转换…

关系分类(RC)模型和关系抽取(RE)模型的区别

目标不同 关系分类模型&#xff1a;对给定的实体对在给定句子中预测其关系类型。两阶段&#xff08;RC&#xff09; 关系抽取模型&#xff1a;从句子中识别出所有潜在实体对&#xff0c;并为其预测关系类型。一阶段&#xff08;NERRE&#xff09; 训练/预测阶段输入输出数据不…

永磁同步电机控制算法--最大转矩电流比控制(牛顿迭代法)

一、原理介绍 搭建了基于牛顿迭代法的MTPA双闭环矢量控制系统 二、仿真验证 在MATLAB/simulink里面验证所提算法&#xff0c;采用和实验中一致的控制周期1e-4&#xff0c;电机部分计算周期为1e-6。仿真模型如下所示&#xff1a; 对直接公式计算法和牛顿迭代法进行仿真对比验…

数据结构—树的定义与性质

目录 1.树的定义 2.基本术语 3.树的性质 1.树的定义 树是n&#xff08;n≥0&#xff09;个结点的有限集。 n0时&#xff0c;称为空树。 &#xff08;1&#xff09;树有且只有一个特定的结点&#xff0c;称为根节点。 &#xff08;2&#xff09;当n>1时&#xff0c;其余…

vue数据请求通用方案:axios的options都有哪些值

Axios 是一个基于 promise 的 HTTP 库&#xff0c;可以用在浏览器和 Node.js 中。 在使用 Axios 发送请求时&#xff0c;可以通过传递一个配置对象来指定请求的各种选项。 以下是一些常用的 Axios 配置选项及其说明&#xff1a; 1.url: &#xff08;必需&#xff09;请求的 …

选择器(结构伪类选择器,伪元素选择器),PxCook软件,盒子模型

结构为类选择器 伪元素选择器 PxCook 盒子模型 (内外边距&#xff0c;边框&#xff09; 内外边距合并&#xff0c;塌陷问题 元素溢出 圆角 阴影: 模糊半径&#xff1a;越大越模糊&#xff0c;也就是越柔和 案例一&#xff1a;产品卡片 <!DOCTYPE html> <html lang&q…

ThinkPHP 8高效构建Web应用-控制器

【图书介绍】《ThinkPHP 8高效构建Web应用》-CSDN博客 《2025新书 ThinkPHP 8高效构建Web应用 编程与应用开发丛书 夏磊 清华大学出版社教材书籍 9787302678236 ThinkPHP 8高效构建Web应用》【摘要 书评 试读】- 京东图书 使用VS Code开发ThinkPHP项目-CSDN博客 控制器无须特…

模拟出一个三维表面生成表面点,计算体积,并处理边界点

python代码 生成表面点,计算体积,并处理边界点,最终模拟出一个三维表面。 步骤: 初始参数设置: initial_fixed_point:一个初始固定点的坐标。 slop_thre:坡度阈值。 v_thre:体积阈值。 slope_rad:将坡度从度转换为弧度。 step_size:步长。 lam_x, lam_y:泊松分布的…

Elasticsearch 入门教程

掌握Elasticsearch&#xff1a;从入门到入门 一、ES 背景1.1 ElasticSearch 的背景1.2 ElasticSearch 的应用场景 二、ES 简介2.1 ElasticSearch 简介2.2 ElasticSearch 的定义与特点2.3 ElasticSearch 与传统数据库的区别2.4 ElasticSearch 的优势和劣势 三、ES 的核心概念3.1…

Multisim更新:振幅调制器+解调器(含仿真程序+文档+原理图+PCB)

前言 继3年前设计的&#xff1a;Multisim&#xff1a;振幅调制器的设计&#xff08;含仿真程序文档原理图PCB&#xff09;&#xff0c;有读者表示已经不能满足新需求&#xff0c;需要加上新的解调器功能&#x1f602;&#x1f602;&#x1f602;&#xff0c;鸽了很久这里便安排…

区块链方向学习路线

学习路线图 下面是登链社区给出的区块链开发者的学习路线图 学习路线建议 对于一个区块链方向的学习者而言&#xff0c;首先要了解的是区块链理论知识&#xff0c;当你了解了区块链的理论知识之后&#xff0c;下面有三个方向来学习&#xff0c;可以通俗的理解为区块链方向的后…

springboot547产业园区智慧公寓管理系统(论文+源码)_kaic

摘 要 传统办法管理信息首先需要花费的时间比较多&#xff0c;其次数据出错率比较高&#xff0c;而且对错误的数据进行更改也比较困难&#xff0c;最后&#xff0c;检索数据费事费力。因此&#xff0c;在计算机上安装产业园区智慧公寓管理系统软件来发挥其高效地信息处理的作用…

【电源专题】为什么测试电源的SW波形上冲振荡之前的0V电位要先来个小的下降

在同步电源的开关节点SW波形测试中,你可能会发现周期性的SW波形在上升前的一小段时间时间内会有一个小小的下跌,这个下跌会低于0V。那么这个下跌是怎么来的呢? 如下所示为某降压转换器的SW开关节点波形: 其展开后可以看到在上升之前有20ns左右的时间,SW电压是下跌…

操作系统大题整理

专题一 程序代码题&#xff1a;程序设计与分析&#xff0c;主要考的是线程&#xff0c;多线程的并发&#xff1f; 大题第一问&#xff08;1&#xff09;操作系统的结构有哪几种常用的结构&#xff1f; 宏内核&#xff1a;宏内核是将操作系统的主要功能模块都集中在内核的一种结…

web安全常用靶场

这里写自定义目录标题 phpstydy2018pikachuxss-labs phpstydy2018 网盘地址 提取码: nxnw ‌phpStudy是一款专为PHP开发者设计的集成环境工具&#xff0c;主要用于简化PHP开发环境的搭建过程。‌ 它集成了Apache、MySQL、PHP等核心组件&#xff0c;用户只需进行一次性安装&a…

【yolov8自卸卡车-挖掘机-装载机检测】

yolov8自卸卡车-挖掘机-装载机检测 YOLOv8算法介绍数据集和模型下载数据集准备数据配置文件&#xff08;data.yaml&#xff09;安装依赖模型训练步骤 YOLOv8算法介绍 YOLOv8是You Only Look Once&#xff08;YOLO&#xff09;算法家族的最新版本&#xff0c;它是一种单阶段目标…

扩散模型论文概述(二):Google系列工作【学习笔记】

视频链接&#xff1a;扩散模型论文概述&#xff08;二&#xff09;&#xff1a;Google系列工作_哔哩哔哩_bilibili 本视频讲的是Google在图像生成的工作。 同样&#xff0c;第一张图片是神作&#xff0c;总结的太好了&#xff01; 在生成式AI的时代&#xff0c;OpenAI和Google不…

redis常用数据类型

在Redis中&#xff0c;有几种数据类型是非常常用且核心的&#xff0c;它们各自有着独特的应用场景和优势。 1. 字符串&#xff08;String&#xff09;&#xff1a; • 这是Redis中最基本、最常用的数据类型。 • 字符串可以是任何形式的二进制数据&#xff0c;包括文本、图片等…

基于氢氧燃料电池的分布式三相电力系统Simulink建模与仿真

目录 1.课题概述 2.系统仿真结果 3.核心程序与模型 4.系统原理简介 5.完整工程文件 1.课题概述 基于氢氧燃料电池的分布式三相电力系统Simulink建模与仿真&#xff0c;仿真输出燃料电池中氢氧元素含量变化以及生成的H2O变化情况。 2.系统仿真结果 3.核心程序与模型 版本…