Flink CDC 提取记录变更时间作为事件时间和 Hudi 表的 precombine.field 以及1970-01-01 取值问题

《大数据平台架构与原型实现:数据中台建设实战》博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧二维码进入京东手机购书页面。

CDC 数据中的记录变更时间标记着这条记录在数据库中执行对应操作(创建/更新/删除)的时间,可以说是天然的“事件时间”,特别是对于那些本身没有记录时间字段的表来说就更加合适了。Flink 官方文档 也建议在使用 CDC 的情况下,优先使用 CDC 中的这个时间字段,这个时间更加精准。

与此同时,在定义 Hudi 表时,precombine.field 也是一个非常重要的配置,显然 CDC 数据中的记录变更时间是最合适的,没有之一。

CDC 数据中的记录变更时间属于元数据范畴,以 Flink CDC 的 MySQL 数据库为例,它提供四种元数据的抽取:

KeyDataTypeDescription
table_nameSTRING NOT NULLName of the table that contain the row.
database_nameSTRING NOT NULLName of the database that contain the row.
op_tsTIMESTAMP_LTZ(3) NOT NULLIt indicates the time that the change was made in the database. If the record is read from snapshot of the table instead of the binlog, the value is always 0.
row_kindSTRING NOT NULLIt indicates the row kind of the changelog,Note: The downstream SQL operator may fail to compare due to this new added column when processing the row retraction if the source operator chooses to output the ‘row_kind’ column for each record. It is recommended to use this metadata column only in simple synchronization jobs. ‘+I’ means INSERT message, ‘-D’ means DELETE message, ‘-U’ means UPDATE_BEFORE message and ‘+U’ means UPDATE_AFTER message.

其中的 op_ts 就是我们想要的,也就是:CDC 数据中的记录变更时间。我们可以在定义数据表时声明这个列,Flink CDC 可以将其提取出来作为普通字段供下游使用,就像下表中这样:

CREATE TABLE IF NOT EXISTS orders_mysql_cdc (`order_number` INT NOT NULL,`order_date` DATE NOT NULL,`purchaser` INT NOT NULL,`quantity` INT NOT NULL,`product_id` INT NOT NULL,`op_ts` TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,PRIMARY KEY (`order_number`) NOT ENFORCED
) WITH ('connector' = 'mysql-cdc',...
);

注意,在定义 Flink CDC 源表时,op_ts 的数据类型是 TIMESTAMP_LTZ(3),不是 TIMESTAMP(3),写入下游表时,可以是 TIMESTAMP(3)

当我们初次使用这个 op_ts 字段时,你会发现,写入到的数据库的数据全部都是 1970-01-01 00:00:00.000,就像下面这样:

在这里插入图片描述

你可能会认为是哪里出错了,实际上,这是 Flink CDC 特别设计的,也是合理的,Flink CDC 官方文档的解释是:

If the record is read from snapshot of the table instead of the binlog, the value is always 0.

我们知道,Flink CDC ( 2.0+ ) 的一个显著特征是:它是全量 + 增量的一体化读取!全量就是经常说的历史数据,增量就是实时的数据,控制 Flink CDC 是从全部历史数据开始同步整个数据库还是从只当下的 binlog 中同步近期增量数据的配置项是:scan.startup.mode ( 官方文档 ),该配置项支持 5 种配置,而默认配置(initial)就是以当前分界点,数据中的现有数据使用全量方式读取(也叫快照读取),此后的数据从 binlog 中读取,这样就和上面描述的 op_ts 字段的取值吻合上了:

当 Flink CDC 使用全量方式读取表中的历史数据时,op_ts 字段全部取值为 0,即 1970-01-01 00:00:00.000,当 Flink CDC 使用增量方式读取 binlog 数据时,op_ts 字段的取值为数据发生变更的实际时间

这种设计还是非常合理的,因为,Flink CDC 本身在使用快照方式读取时,就没有任何变更时间可以读取,这个时间只在 binlog 中才有,而这对下游也不会造成太大的影响,因为此时的数据都是 insert-only 的数据,同一主键也不会出现两条记录,至少对 Hudi 表是没有影响的。

此外,作为一个“额外收获”,你会发现:op_ts 这个字段本身恰好标记了一条记录是通过全量同步进来的,还是增量同步进来的!


补充:以下是 Flink CDC 官方文档对 scan.startup.mode 5 种同步模式的解释:

The config option scan.startup.mode specifies the startup mode for MySQL CDC consumer. The valid enumerations are:

  • initial (default): Performs an initial snapshot on the monitored database tables upon first startup, and continue to read the latest binlog.
  • earliest-offset: Skip snapshot phase and start reading binlog events from the earliest accessible binlog offset.
  • latest-offset: Never to perform snapshot on the monitored database tables upon first startup, just read from the end of the binlog which means only have the changes since the connector was started.
  • specific-offset: Skip snapshot phase and start reading binlog events from a specific offset. The offset could be specified with binlog filename and position, or a GTID set if GTID is enabled on server.
  • timestamp: Skip snapshot phase and start reading binlog events from a specific timestamp.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/707581.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何使用ArcGIS Pro为栅格图添加坐标信息

在某些时候,我们从网上获取的资源是一张普通的栅格图,没有任何的坐标信息,如果想要和带坐标信息的数据一起使用就需要先添加坐标信息,在GIS上,我们把这个过程叫做地理配准,这里为大家介绍一下地理配准的方法…

golang的反射探索

1、golang中反射常用的场景 1》类型检查—通用类包或者函数的时候,在运行时可以动态的获取任意对象的类型信息 2》动态调用方法—运行时动态的选择使用哪个方法 3》结构体标签处理—结构体字段一般是通过tag来注解。运行时可以通过反射读取tag。常用于解析配置文件&…

雾锁王国Enshrouded服务器CPU内存配置怎么选择?

雾锁王国/Enshrouded服务器CPU内存配置如何选择?阿里云服务器网aliyunfuwuqi.com建议选择8核32G配置,支持4人玩家畅玩,自带10M公网带宽,1个月90元,3个月271元,幻兽帕鲁服务器申请页面 https://t.aliyun.com…

使用 Go 语言读取文件内容并进行反序列化

在现代软件开发过程中,经常需要读取配置文件或数据文件,并将这些文件的内容转换成程序可以理解和操作的数据结构。对于使用Go语言的开发者来说,标准库中提供的一系列工具和包能够帮助完成从文件读取到数据反序列化的整个流程,特别…

通过shell编写内存监视的脚本来介绍一些基本shell脚本操作

目录 知识概览 总体脚本编写 date awk grep bc 知识概览 总体脚本编写 #!/bin/bash#定义日志的文件名和日期 cdate$(date %Y%m%d%H%M%S) logfile"/tmp/memlog_{$0}.log"#拿到ip ip_addr$(ip add|grep "ens33$"|awk {print $2})#总内存和使用的内存 m…

成为大佬之路--linux软件安装使用第000000018篇--linux安装nacos

官网 Nacos官网 | Nacos 官方社区 | Nacos 下载 | Nacos 安装包 Releases alibaba/nacos GitHub 安装 1.创建目录 mkdir -p /opt/nacos cd /opt/nacos 2.下载安装包 wget https://github.com/alibaba/nacos/releases/download/1.4.7/nacos-server-1.4.7.tar.gz 3.解压…

如何使用Fastapi上传文件?先从请求体数据讲起

文章目录 1、请求体数据2、form表单数据3、小文件上传1.单文件上传2.多文件上传 4、大文件上传1.单文件上传2.多文件上传 1、请求体数据 前面我们讲到,get请求中,我们将请求数据放在url中,其实是非常不安全的,我们更愿意将请求数…

springboot/ssm高校疫情防控系统Java校园疫情防控管理平台web

springboot/ssm高校疫情防控系统Java校园疫情防控管理平台web 基于springboot(可改ssm)vue项目 开发语言:Java 框架:springboot/可改ssm vue JDK版本:JDK1.8(或11) 服务器:tomcat 数据库:…

第三百七十二回

文章目录 1. 概念介绍2. 实现方法2.1 maskFilter2.2 shader 3. 代码与效果3.1 示例代码3.2 运行效果 4. 内容总结 我们在上一章回中介绍了"两种阴影效果"相关的内容,本章回中将介绍如何绘制阴影效果.闲话休提,让我们一起Talk Flutter吧。 1. 概…

LeetCode刷题笔记之二叉树(三)

一、寻找特定节点 1. 404【左叶子之和】 题目: 给定二叉树的根节点 root ,返回所有左叶子之和。代码: class Solution {public int sumOfLeftLeaves(TreeNode root) {//左叶子不止是最左边的叶子,而是二叉树中每个节点的左叶子…

java多线程并发实战,java高并发场景面试题

阶段一:筑基 Java基础掌握不牢,对于一个开发人员来说无疑是非常致命的。学习任何一个技术知识无疑不是从基础开始;在面试的时候,面试官无疑不是从基础开始拷问。 内容包括:Java概述、Java基本语法、Java 执行控制流程、…

html5盒子模型

1.边框的常用属性 border-color 属性 说明 示例 border-top-color 上边框颜色 border-top-color:#369; border-right-color 右边框颜色 border-right-color:#369; border-bottom-color 下边框颜色 border-bottom-color:#fae45b; border-left-color 左边框颜色…

java springmvc/springboot 项目通过HttpServletRequest对象获取请求体body工具类

请求 测试接口 获取到的 获取到打印出的json字符串里有空格这些,在json解析的时候正常解析为json对象了。 工具类代码 import lombok.extern.slf4j.Slf4j; import org.springframework.web.context.request.RequestContextHolder; import org.springframework.we…

【前端开发】前端开发深度解析:HTML、CSS、JavaScript与Vue.js

一、HTML:构建网页的基石 1.1 简介 HTML(HyperText Markup Language,超文本标记语言)是一种用于创建网页的标准标记语言。它使用各种标签(tags)来描述网页上的内容,包括文本、图像、链接、视频…

类的继承extends以及super

类的继承 继承:基于父类的某个扩展,制定出一个新的子类,而这个子类可以继承父类的原有属性和方法 java是如何体现继承的特性的 平板电脑继承了台式机电脑所拥有的功能(如听音乐)的基础上,同时扩展出自己特殊的用途:如…

pikachu之xss获取键盘记录

前备知识 跨域 跨域(Cross-Origin)是指在互联网中,浏览器为了保护用户信息安全而实施的一种安全策略——同源策略(Same-Origin Policy),即浏览器禁止一个域上的文档或者脚本(如通过JavaScript发…

从零开始学习Netty - 学习笔记 -Netty入门-ChannelFuture

5.2.2.Channel Channel 的基本概念 在 Netty 中,Channel 是表示网络传输的开放连接的抽象。它提供了对不同种类网络传输的统一视图,比如 TCP 和 UDP。 Channel 的生命周期 Channel 的生命周期包括创建、激活、连接、读取、写入和关闭等阶段。Netty 中…

QT C++实战:实现用户登录页面及多个界面跳转

主要思路 一个登录界面,以管理员Or普通用户登录管理员:一个管理员的操作界面,可以把数据录入到数据库中。有返回登陆按钮,可以选择重新登陆(管理员Or普通用户普通用户:一个主界面,负责展示视频…

【0267】pg内核初始化 process table(ProcGlobal、PROC_HDR、PGPROC)分析

1. 前言 在postmaster或standalone后端启动期间初始化全局进程表(global process table)。该过程由InitProcGlobal()完成,对于此函数: (1)还创建了支持所请求的后端数量所需的所有每个进程信号量。我们过去只在后端真正启动时才分配信号量,但这很糟糕,因为它会让Postg…

vue组件中data为什么必须是一个函数

查看本专栏目录 关于作者 还是大剑师兰特:曾是美国某知名大学计算机专业研究生,现为航空航海领域高级前端工程师;CSDN知名博主,GIS领域优质创作者,深耕openlayers、leaflet、mapbox、cesium,canvas&#x…