flink cdc的source数据流如何配置事件时间,如何设置时间语义,分配时间戳并生成水位线

在 Flink CDC 中为 Source 数据流配置事件时间需要结合时间语义设置时间戳分配水位线生成三个核心步骤。以下是具体配置方法及注意事项:


1. 设置时间语义

Flink 默认使用处理时间(Processing Time),需显式指定事件时间语义:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // 设置为事件时间

若使用 Flink 1.12+ 版本,事件时间已是默认语义,但仍建议显式设置以避免混淆。


2. 分配时间戳

(1) 从 CDC 数据中提取时间戳

CDC 数据(如 MySQL Binlog)通常包含变更时间字段(如 update_time),需通过 TimestampAssigner 提取:

DataStream<ChangeEvent> cdcStream = env.addSource(MySqlSource.create(...));DataStream<ChangeEvent> timedStream = cdcStream.assignTimestampsAndWatermarks(WatermarkStrategy.<ChangeEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((event, recordTimestamp) -> event.getTimestamp() // 从事件中提取时间戳(毫秒))
);

关键点

  • 字段选择:优先使用业务字段(如订单创建时间)或数据库的 update_time 作为事件时间戳。
  • 类型转换:若时间戳为字符串(如 "2023-10-01 12:00:00"),需先转换为毫秒值。

(2) 通过 DDL 定义时间属性(Table API)

若使用 Flink SQL/Table API,可在 DDL 中直接定义时间属性:

CREATE TABLE orders (id INT,order_time TIMESTAMP(3),WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ('connector' = 'mysql-cdc',...
);

此方式通过 WATERMARK 语句隐式分配时间戳并生成水位线。


3. 生成水位线

水位线用于处理乱序事件,需根据业务容忍的延迟设置策略:

(1) 固定延迟策略(BoundedOutOfOrderness)

WatermarkStrategy.<ChangeEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner(...);

此策略允许最大 5 秒的乱序延迟,适用于大多数业务场景。

(2) 单调递增策略(MonotonousTimestamps)

WatermarkStrategy.<ChangeEvent>forMonotonousTimestamps();

若数据严格有序(如 Kafka 分区有序),可直接使用此策略。

(3) 自定义水位线生成器

对于复杂逻辑(如动态调整延迟),需实现 WatermarkGenerator 接口:

public class CustomWatermarkStrategy implements WatermarkGenerator<ChangeEvent> {@Overridepublic void onEvent(ChangeEvent event, long eventTimestamp, WatermarkOutput output) {// 动态计算最大事件时间maxTimestamp = Math.max(maxTimestamp, eventTimestamp);}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {output.emitWatermark(new Watermark(maxTimestamp - 5000)); // 延迟5秒}
}

4. CDC 源的特殊处理

(1) MySQL CDC 的时间戳提取

MySQL Binlog 中的 ts_sec 字段表示事务提交时间,可将其作为事件时间戳:

.withTimestampAssigner((event, recordTimestamp) -> event.getSource().get("ts_sec") // 提取Binlog中的时间戳字段
)

(2) 处理无时间戳的 CDC 数据

若 CDC 数据无时间戳字段,可回退到处理时间或摄取时间:

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // 切换为处理时间

5. 注意事项

  1. 水位线生成位置:尽量在 Source 后第一个算子分配时间戳,避免因并行度变化导致乱序。
  2. 水位线间隔调整:默认 200ms 生成一次,可通过 env.getConfig().setAutoWatermarkInterval(1000) 调整为 1 秒。
  3. 状态 TTL:若 CDC 数据量极大,需设置状态 TTL 防止 OOM:
    StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.days(1)).build();
    

完整示例(DataStream API)

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// 定义 MySQL CDC Source
MySqlSource<ChangeEvent> source = MySqlSource.<ChangeEvent>builder().hostname("localhost").port(3306).databaseList("mydb").tableList("mydb.orders").username("user").password("pass").deserializer(new JsonDebeziumDeserializationSchema()).build();// 分配时间戳与水位线
DataStream<ChangeEvent> stream = env.fromSource(source,WatermarkStrategy.<ChangeEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((event, ts) -> event.getUpdateTime()),"MySQL Source"
);// 后续窗口处理
stream.keyBy(event -> event.getOrderId()).window(TumblingEventTimeWindows.of(Time.minutes(5))).aggregate(...);

通过以上配置,Flink CDC 数据流即可正确使用事件时间语义,处理乱序数据并触发窗口计算。具体策略需根据业务延迟容忍度和数据特征调整。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/75884.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++ 指针类型转换全面解析与最佳实践

文章目录 C 指针类型转换全面解析与最佳实践1. 隐式转换基类和派生类指针 2. 显式转换(1) static_cast(2) dynamic_cast(3) reinterpret_cast(4) const_cast 3. C 风格转换4. 常见问题与注意事项5. 总结最佳实践 C 指针类型转换全面解析与最佳实践 在 C 中&#xff0c;指针类型…

批量将 txt/html/json/xml/csv 等文本拆分成多个文件

我们的文本文件太大的时候&#xff0c;我们通常需要对文本文件进行拆分&#xff0c;比如按多少行一个文件将一个大的文本文件拆分成多个小的文本文件。这样我们在打开或者传输的时候都比较方便。今天就给大家介绍一种同时对多个文本文件进行批量拆分的方法&#xff0c;可以快速…

ARM 汇编启动代码详解:从中断向量表到中断处理

ARM 汇编启动代码详解&#xff1a;从中断向量表到中断处理 引言 在嵌入式系统开发中&#xff0c;ARM 处理器&#xff08;如 Cortex-A 系列&#xff09;的启动代码是系统初始化和运行的基础。启动代码通常包括中断向量表的创建、初始化硬件状态&#xff08;如关闭缓存和 MMU&a…

4.7学习总结 可变参数+集合工具类Collections+不可变集合

可变参数&#xff1a; 示例&#xff1a; public class test {public static void main(String[] args) {int sumgetSum(1,2,3,4,5,6,7,8,9,10);System.out.println(sum);}public static int getSum(int...arr){int sum0;for(int i:arr){sumi;}return sum;} } 细节&#xff1a…

2023年蓝桥杯第十四届CC++大学B组真题及代码

目录 1A&#xff1a;日期统计 解析代码_暴力_正解 2B&#xff1a;01串的熵 解析代码_暴力_正解 3C&#xff1a;冶炼金属 解析代码_暴力_正解 4D&#xff1a;飞机降落 解析代码_暴力dfs_正解 5E&#xff1a;接龙数列 解析代码_dp_正解 6F&#xff1a;岛屿个数 解析代…

rom定制系列------小米10pro机型定制解锁固件 原生安卓15批量线刷固件 操作解析与界面预览

注意;固件用于自己机型忘记密码或者手机号注销等出现设备锁 过保修期 售后无视的机型&#xff0c;勿用于非法途径 目前有粉丝联系&#xff0c;自己的机型由于手机号注销导致手机更新系统后出现设备锁界面。另外也没有解锁bl。目前无法使用手机。经过询问是小米10pro机型。根据…

信息学奥赛一本通 1861:【10NOIP提高组】关押罪犯 | 洛谷 P1525 [NOIP 2010 提高组] 关押罪犯

【题目链接】 ybt 1861&#xff1a;【10NOIP提高组】关押罪犯 洛谷 P1525 [NOIP 2010 提高组] 关押罪犯 【题目考点】 1. 图论&#xff1a;二分图 2. 二分答案 3. 种类并查集 【解题思路】 解法1&#xff1a;种类并查集 一个囚犯是一个顶点&#xff0c;一个囚犯对可以看…

我的NISP二级之路-01

目录 一.SSE-CMM系统安全工程-能力成熟度模型(Systems Security Engineering - Capability Maturity Model) 二.ISMS 即信息安全管理体系(Information Security Management System),是一种基于风险管理的、系统化的管理体系 三.Kerberos协议 1. 用户登录与 AS 请求 2…

WEB安全--内网渗透--利用Net-NTLMv2 Hash

一、前言 在前两篇文章中分析了NTLM协议中Net-NTLMv2 Hash的生成、如何捕获Net-NTLMv2 Hash&#xff0c;现在就来探讨一下在内网环境中&#xff0c;如何利用Net-NTLMv2 Hash进行渗透。 二、Net-NTLM Hash的破解 工具&#xff1a;hashcat 原理&#xff1a;利用其内部的字典对…

如何正确使用 `apiStore` 进行 API 管理

在现代前端开发中&#xff0c;API 管理是一个非常重要的环节。apiStore 是一个基于 Pinia 的状态管理工具&#xff0c;它可以帮助我们更高效地管理和调用 API。本文将详细介绍如何正确使用 apiStore&#xff0c;包括如何创建 API 配置文件、在组件中使用 apiStore 以及如何配置…

瓦片数据合并方法

影像数据 假如有两份影像数据 1.全球底层影像0-5级别如下&#xff1a; 2.局部高清影像数据级别9-14如下&#xff1a; 合并方法 将9-14文件夹复制到全球底层0-5的目录下 如下&#xff1a; 然后合并xml文件 使得Tileset设置到最高级&#xff08;包含所有级别&#xff09;&…

C++中的类和对象(上)

1 类的定义 1.1 类定义的格式 1 class为定义类的关键字&#xff0c;Stack为类的名字&#xff0c;{}中为类的主体&#xff0c;注意类定义结束时后面分号不能省 略》。类体中内容称为类的成员&#xff1a;类中的变量称为类的属性或成员变量; 类中的函数称为类的方法或者成员函数…

【Tauri2】013——前端Window Event与创建Window

前言 【Tauri2】012——on_window_event函数-CSDN博客https://blog.csdn.net/qq_63401240/article/details/146909801?spm1001.2014.3001.5501 前面介绍了on_window_event&#xff0c;这个在Builder中的方法&#xff0c;里面有许多事件&#xff0c;比如Moved&#xff0c;Res…

【问题处理】webpack4升webpack5,报错Uncaught ReferrnceError: process is not defined

问题 正在做webpack4升webpack5&#xff0c;项目构建项目成功后在浏览器打开时报错 Uncaught ReferrnceError: process is not defined。 原因 webpack 5 不再自动 polyfill Node.js 的核心模块。 如果你在浏览器运行的代码中使用它&#xff0c;需要从 NPM 中安装兼容模块…

软件工程师减肥计划

一、目标设定 在 3 个月内减轻体重 5-7kg&#xff0c;改善身体代谢水平和体脂率&#xff0c;增强身体活力和精神状态&#xff0c;以更好地适应工作强度。 二、饮食调整 &#xff08;一&#xff09;基本原则 控制热量摄入&#xff0c;保证每天摄入热量低于消耗热量 500-800 …

即时访问成为降低风险的关键

云计算和软件即服务 (SaaS) 解决方案的广泛采用从根本上重塑了企业的数字格局。 不同行业的组织越来越多地利用云固有的可扩展性和成本效益来推动创新和简化运营。 这种向基于云的环境的转变也带来了一系列新的复杂安全挑战&#xff0c;需要仔细考虑并制定强有力的缓解策略。…

[环境配置] 1. 开发环境搭建

开发环境搭建 本文档将详细介绍如何搭建深度学习开发环境&#xff0c;包括 Python 环境配置、IDE 选择与配置以及虚拟环境管理。 也会介绍一下最近比较流行的 uv 工具。它是一个用 Rust 编写的极其快速的 Python 包和项目管理工具。 uv 是一个非常强大的工具&#xff0c;它可…

rust 同时处理多个异步任务,并在一个任务完成退出

use std::thread; use tokio::{sync::mpsc,time::{sleep, Duration}, };async fn check_for_one() {// 该函数会每秒打印一次 "write"loop {println!("write");sleep(Duration::from_secs(1)).await;} }async fn start_print_task() -> Result<(), (…

“群芳争艳”:CoreData 4 种方法计算最大值的效率比较(上)

概览 在 CoreData 支持的 App 中&#xff0c;一种常见操作就是计算数据库表中指定字段的最大值&#xff08;或最小值&#xff09;。就是这样一种看起来“不足挂齿”的任务&#xff0c;可能稍不留神就会“马失前蹄”。 在实际的代码中&#xff0c;我们怎样才能既迅速又简洁的…

skynet网络包库(lua-netpack.c)的作用解析

目录 网络包库&#xff08;lua-netpack.c&#xff09;的作用解析1. 数据包的分片与重组2. 网络事件处理3. 内存管理4. 数据打包与解包 动态库&#xff08;.so&#xff09;在 Lua 中的使用1. 编译为动态库2. Lua 中加载与调用(1) 加载模块(2) 核心方法(3) 使用示例 3. 注意事项 …