Flink CDC系列之:学习理解核心概念——Transform

Flink CDC系列之:学习理解核心概念——Transform

  • Transform
  • 参数
  • 元数据字段
  • 函数
    • 比较函数
    • 逻辑函数
    • 字符串函数
    • 时间函数
    • 条件函数
  • 示例
    • 添加计算列
    • 参考元数据列
    • 使用通配符投影所有字段
    • 添加过滤规则
    • 重新分配主键
    • 重新分配分区键
    • 指定表创建配置
    • 分类映射
    • 用户定义函数
    • 已知限制

Transform

Transform模块帮助用户根据表中的数据列进行数据列的删除和扩展。
此外,它还可以帮助用户在同步过程中过滤一些不必要的数据。

参数

为了描述转换规则,可以使用以下参数:

参数含义可选/必需
source-table源表id,支持正则表达式必需
projection投影规则,支持类似SQL中select子句的语法可选
filter过滤规则,支持类似SQL中where子句的语法可选
primary-keysSink 表主键,以逗号分隔可选
partition-keys接收表分区键,以逗号分隔可选
table-options用于自动创建表时配置表创建语句可选
description变换规则说明可选

可以在一个管道 YAML 文件中声明多个规则。

元数据字段

字段定义

有一些隐藏列用于访问元数据信息。它们仅在转换规则中明确引用时才会生效。

字段数据类型描述
namespace_nameString包含该行的命名空间的名称。
schema_nameString包含该行的架构的名称。
table_nameString包含该行的表的名称。
data_event_typeString数据改变事件的操作类型。

元数据关系

在这里插入图片描述

函数

Flink CDC 使用 Calcite 解析表达式,并使用 Janino 脚本通过函数调用来评估表达式。

比较函数

FunctionJanino Code描述
value1 = value2valueEquals(value1, value2)如果 value1 等于 value2,则返回 TRUE;如果 value1 或 value2 为 NULL,则返回 FALSE。
value1 <> value2!valueEquals(value1, value2)如果 value1 不等于 value2,则返回 TRUE;如果 value1 或 value2 为 NULL,则返回 FALSE。
value1 > value2value1 > value2如果 value1 大于 value2,则返回 TRUE;如果 value1 或 value2 为 NULL,则返回 FALSE。
value1 >= value2value1 >= value2如果 value1 大于或等于 value2,则返回 TRUE;如果 value1 或 value2 为 NULL,则返回 FALSE。
value1 < value2value1 < value2如果 value1 小于 value2,则返回 TRUE;如果 value1 或 value2 为 NULL,则返回 FALSE。
value1 <= value2value1 <= value2如果 value1 小于或等于 value2,则返回 TRUE;如果 value1 或 value2 为 NULL,则返回 FALSE。
value IS NULLnull == value如果值为 NULL,则返回 TRUE。
value IS NOT NULLnull != value如果值不为 NULL,则返回 TRUE。
value1 BETWEEN value2 AND value3betweenAsymmetric(value1, value2, value3)如果 value1 大于或等于 value2 且小于或等于 value3,则返回 TRUE。
value1 NOT BETWEEN value2 AND value3notBetweenAsymmetric(value1, value2, value3)如果 value1 小于 value2 或大于 value3,则返回 TRUE。
string1 LIKE string2like(string1, string2)如果 string1 与模式 string2 匹配,则返回 TRUE。
string1 NOT LIKE string2notLike(string1, string2)如果 string1 与模式 string2 不匹配,则返回 TRUE。
value1 IN (value2 [, value3]* )in(value1, value2 [, value3]*)如果 value1 存在于给定列表 (value2, value3, …) 中,则返回 TRUE。
value1 NOT IN (value2 [, value3]* )notIn(value1, value2 [, value3]*)如果 value1 不存在于给定列表 (value2, value3, …) 中,则返回 TRUE。

逻辑函数

FunctionJanino Code描述
boolean1 OR boolean2boolean1
boolean1 AND boolean2boolean1 && boolean2如果 BOOLEAN1 和 BOOLEAN2 都为 TRUE,则返回 TRUE。
NOT boolean!boolean如果布尔值为 FALSE,则返回 TRUE;如果布尔值为 TRUE,则返回 FALSE。
boolean IS FALSEfalse == boolean如果布尔值为 FALSE,则返回 TRUE;如果布尔值为 TRUE,则返回 FALSE。
boolean IS NOT FALSEtrue == boolean如果 BOOLEAN 为 TRUE,则返回 TRUE;如果 BOOLEAN 为 FALSE,则返回 FALSE。
boolean IS TRUEtrue == boolean如果 BOOLEAN 为 TRUE,则返回 TRUE;如果 BOOLEAN 为 FALSE,则返回 FALSE。
boolean IS NOT TRUEfalse == boolean如果布尔值为 FALSE,则返回 TRUE;如果布尔值为 TRUE,则返回 FALSE。

字符串函数

FunctionJanino Code描述
string1string2
CHAR_LENGTH(string)charLength(string)返回 STRING 中的字符数。
UPPER(string)upper(string)返回大写的字符串。
LOWER(string)lower(string)返回小写的字符串。
TRIM(string1)trim(‘BOTH’,string1)返回删除两侧空格的字符串。
REGEXP_REPLACE(string1, string2, string3)regexpReplace(string1, string2, string3)返回 STRING1 中的字符串,其中所有与正则表达式 STRING2 匹配的子字符串均被 STRING3 连续替换。例如,‘foobar’.regexpReplace(‘oo
SUBSTRING(string FROM integer1 [ FOR integer2 ])substring(string,integer1,integer2)返回从位置 INT1 开始、长度为 INT2(默认到末尾)的 STRING 子字符串。
CONCAT(string1, string2,…)concat(string1, string2,…)返回连接 string1、string2、… 的字符串。例如,CONCAT(‘AA’, ‘BB’, ‘CC’) 返回 ‘AABBCC’。

时间函数

FunctionJanino Code描述
LOCALTIMElocaltime()返回本地时区的当前SQL时间,返回类型为TIME(0)。
LOCALTIMESTAMPlocaltimestamp()返回当前SQL本地时区的时间戳,返回类型为TIMESTAMP(3)。
CURRENT_TIMEcurrentTime()返回本地时区的当前 SQL 时间,这是 LOCAL_TIME 的同义词。
CURRENT_DATEcurrentDate()返回本地时区的当前 SQL 日期。
CURRENT_TIMESTAMPcurrentTimestamp()返回当前SQL本地时区的时间戳,返回类型为TIMESTAMP_LTZ(3)。
NOW()now()返回本地时区的当前 SQL 时间戳,这是 CURRENT_TIMESTAMP 的同义词。
DATE_FORMAT(timestamp, string)dateFormat(timestamp, string)将时间戳转换为日期格式字符串指定格式的字符串值。格式字符串与 Java 的 SimpleDateFormat 兼容。
TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2)timestampDiff(timepointunit, timepoint1, timepoint2)返回时间点 1 和时间点 2 之间的时间点单位的(有符号)数。间隔的单位由第一个参数指定,该参数应为以下值之一:SECOND、MINUTE、HOUR、DAY、MONTH 或 YEAR。
TO_DATE(string1[, string2])toDate(string1[, string2])将格式为 string2 的日期字符串 string1(默认为“yyyy-MM-dd”)转换为日期。
TO_TIMESTAMP(string1[, string2])toTimestamp(string1[, string2])将格式为 string2 的日期时间字符串 string1(默认情况下为:“yyyy-MM-dd HH:mm:ss”)转换为不带时区的时间戳。

条件函数

FunctionJanino Code描述
CASE value WHEN value1_1 [, value1_2]* THEN RESULT1 (WHEN value2_1 [, value2_2 ]* THEN result_2)* (ELSE result_z) END嵌套三元表达式当值第一次包含在(值 X_1、值 X_2、…)中时,返回 resultX。当没有值匹配时,如果提供了 result_z,则返回 result_z,否则返回 NULL。
CASE WHEN condition1 THEN result1 (WHEN condition2 THEN result2)* (ELSE result_z) END嵌套三元表达式第一个条件满足时返回resultX,不满足条件时,若有条件则返回结果,否则返回NULL。
COALESCE(value1 [, value2]*)coalesce(Object… objects)返回第一个不为 NULL 的参数。如果所有参数均为 NULL,则也返回 NULL。返回类型是其所有参数中限制最少的通用类型。如果所有参数也均为可空,则返回类型为可空。
IF(condition, true_value, false_value)condition ? true_value : false_value如果条件满足,则返回 true_value,否则返回 false_value。例如,IF(5 > 3, 5, 3) 返回 5。

示例

添加计算列

求值表达式可用于生成新列。例如,如果我们想基于数据库 mydb 中的表 web_order 附加两个计算列,我们可以定义一个转换规则,如下所示:

transform:- source-table: mydb.web_orderprojection: id, order_id, UPPER(product_name) as product_name, localtimestamp as new_timestampdescription: append calculated columns based on source table

参考元数据列

我们可以在投影表达式中引用元数据列。例如,给定数据库 mydb 中的表 web_order,我们可以定义转换规则如下:

transform:- source-table: mydb.web_orderprojection: id, order_id, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ identifier_namedescription: access metadata columns from source table

使用通配符投影所有字段

通配符 (*) 可用于引用表中的所有字段。例如,给定数据库 mydb 中的两个表 web_order 和 app_order,我们可以定义转换规则如下:

transform:- source-table: mydb.web_orderprojection: \*, UPPER(product_name) as product_namedescription: project fields with wildcard character from source table- source-table: mydb.app_orderprojection: UPPER(product_name) as product_name, *description: project fields with wildcard character from source table

注意:当表达式开头出现 * 字符时,需要使用转义反斜杠。

添加过滤规则

使用引用列在数据库mydb中的表web_order中添加过滤规则时,我们可以定义一个转换规则如下:

transform:- source-table: mydb.web_orderfilter: id > 10 AND order_id > 100description: filtering rows from source table

计算列也可以用于过滤条件。例如,给定数据库 mydb 中的表 web_order,我们可以定义转换规则如下:

transform:- source-table: mydb.web_orderprojection: id, order_id, UPPER(province) as new_province filter: new_province = 'SHANGHAI'description: filtering rows based on computed columns

重新分配主键

我们可以在转换规则中重新分配主键。例如,给定数据库 mydb 中的表 web_order,我们可以定义转换规则如下:

transform:- source-table: mydb.web_orderprojection: id, order_idprimary-keys: order_iddescription: reassign primary key example

还支持复合主键:

transform:- source-table: mydb.web_orderprojection: id, order_id, UPPER(product_name) as product_nameprimary-keys: order_id, product_namedescription: reassign composite primary keys example

重新分配分区键

我们可以在转换规则中重新分配分区键。例如,给定数据库 mydb 中的表 web_order,我们可以定义转换规则如下:

transform:- source-table: mydb.web_orderprojection: id, order_id, UPPER(product_name) as product_namepartition-keys: product_namedescription: reassign partition key example

指定表创建配置

可以在转换规则中定义额外选项,这些选项将在创建下游表时应用。给定数据库 mydb 中的表 web_order,我们可以定义转换规则如下:

transform:- source-table: mydb.web_orderprojection: id, order_id, UPPER(product_name) as product_nametable-options: comment=web orderdescription: auto creating table options example

提示:table-options的格式为key1=value1,key2=value2。

分类映射

可以定义多个转换规则来对输入数据行进行分类并应用不同的处理。只有第一个匹配的转换规则才会应用。例如,我们可以定义如下转换规则:

transform:- source-table: mydb.web_orderprojection: id, order_idfilter: UPPER(province) = 'SHANGHAI'description: classification mapping example- source-table: mydb.web_orderprojection: order_id as id, id as order_idfilter: UPPER(province) = 'BEIJING'description: classification mapping example

用户定义函数

用户定义函数 (UDF) 可用于转换规则。

如果满足以下条件,类可用作 UDF:

  • 实现 org.apache.flink.cdc.common.udf.UserDefinedFunction 接口
  • 具有无参数的公共构造函数
  • 至少有一个名为 eval 的公共方法

它还可以:

  • 覆盖 getReturnType 方法以指示其返回 CDC 类型
  • 覆盖 open 和 close 方法以执行一些初始化和清理工作

例如,这是一个有效的 UDF 类:

public class AddOneFunctionClass implements UserDefinedFunction {public Object eval(Integer num) {return num + 1;}@Overridepublic DataType getReturnType() {return DataTypes.INT();}@Overridepublic void open() throws Exception {// ...}@Overridepublic void close() throws Exception {// ...}
}

为了简化从 Flink SQL 到 Flink CDC 的迁移,Flink ScalarFunction 也可以用作转换 UDF,但有一些限制:

  • 不支持具有带参数的构造函数的 ScalarFunction。
  • ScalarFunction 中的 Flink 样式类型提示将被忽略。
  • 不会调用打开/关闭生命周期钩子。

可以通过添加用户定义函数块来注册 UDF 类:

pipeline:user-defined-function:- name: addoneclasspath: org.apache.flink.cdc.udf.examples.java.AddOneFunctionClass- name: formatclasspath: org.apache.flink.cdc.udf.examples.java.FormatFunctionClass

请注意,给定的类路径必须是完全限定的,并且相应的 jar 文件必须包含在 Flink /lib 文件夹中,或者使用 flink-cdc.sh --jar 选项传递。

正确注册后,UDF 可以在投影和过滤表达式中使用,就像内置函数一样:

transform:- source-table: db.\.*projection: "*, inc(inc(inc(id))) as inc_id, format(id, 'id -> %d') as formatted_id"filter: inc(id) < 100

已知限制

  • 目前,转换不适用于路由规则。它将在未来版本中得到支持。
  • 计算列不能引用最终投影结果中不存在的修剪列。这将在未来版本中修复。
  • 不支持具有不同架构的表的常规匹配。如有必要,需要编写多个规则。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/58698.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

每天五分钟深度学习pytorch:基于pytorch搭建普通全连接神经网络

本文重点 本文我们通过pytorch搭建普通的全连接神经网络,这里我们就不介绍什么是全连接神经网络了,如果不知道的可以看我的机器学习专栏,或者深度学习专栏,它们对全连接神经网络都进行了简单的介绍。 代码 import torch from torch import nn class ThreeNet(nn.Module)…

故障诊断 | MTF-TLSSA-DarkNet-GRU-MSA迁移学习故障识别程序(t分布+莱维飞行改进麻雀优化)

故障诊断 | 故障诊断实例代码 目录 故障诊断 | 故障诊断实例代码效果一览基本介绍程序设计参考资料 效果一览 基本介绍 利用了迁移学习和多项技术改进&#xff0c;包括麻雀搜索法、DarkNet19、GRU、多头注意力机制等&#xff0c;以提高故障识别的准确性和效率 模型框架&#x…

【一起python】使用python实现学生管理系统

文章目录 &#x1f4dd;前言&#x1f320;主函数man&#x1f309;菜单menu&#x1f309;添加学生信息&#x1f309;展示目前学生信息&#x1f309;查找学生&#x1f309;删除同学信息&#x1f309;退出程序 &#x1f320;python完整代码&#x1f6a9;总结 &#x1f4dd;前言 &…

前缀和_560. 和为 K 的子数组

560. 和为 K 的子数组 #include <unordered_map> class Solution { public:int subarraySum(vector<int>& nums, int k) {int nnums.size();unordered_map<int,int> hs;int sum0,re0;hs[0]1;for(int i0;i<n;i){sumnums[i];if(hs.count(sum-k)) rehs[s…

(转载)Tools for Learning LLVM TableGen

前提 最近在学习有关llvm的东西&#xff0c;其中TableGen占了一部分&#xff0c;所以想特意学习下TableGen相关的语法。这里找到了LLVM官网的一篇介绍TableGen的博客&#xff0c;学习并使用机器翻译为中文。在文章的最后也添加了一些学习TableGen的资源。 原文地址&#xff1…

Python酷库之旅-第三方库Pandas(182)

目录 一、用法精讲 841、pandas.api.types.is_complex函数 841-1、语法 841-2、参数 841-3、功能 841-4、返回值 841-5、说明 841-6、用法 841-6-1、数据准备 841-6-2、代码示例 841-6-3、结果输出 842、pandas.api.types.is_float函数 842-1、语法 842-2、参数 …

CSS基础学习篇——选择器

学习文档连接&#xff1a;CSS层叠样式表 1.全局选择器&#xff1a;* * {margin: 0;padding: 0;font-size: 18px; }2.类&#xff08;clss&#xff09;选择器&#xff0c;以 . 开头 .container {display: flex;justify-content: space-around;align-items: center;width: 1200…

Marin说PCB之电源的Surface Current Density知多少?

小编我是一位资深的国漫迷&#xff0c;像什么仙逆&#xff0c;斗破&#xff0c;斗罗&#xff0c;完美世界&#xff0c;遮天&#xff0c;凡人修仙传&#xff0c;少年歌行等&#xff0c;为了可以看这些视频小编我不惜花费了攒了很多年的私房钱去开了这个三个平台的会员啊&#xf…

Oracle视频基础1.3.3练习

1.3.3 检查数据库启动情况 ps -ef | grep oracle启动数据库 sqlplus /nolog conn / as sysdba修改 fast_start_mttr_target 参数为初始值-50&#xff0c;缺省 scope 和 sid&#xff0c;查看修改结果 show parameter fast; alter system set parameter 250; show parameter fa…

CSS flex布局- 最后一个元素占满剩余可用高度转载

效果图 技术要点 height父元素必须有一个设定的高度flex-grow: 1 flex 盒子模型内的该元素将会占据父容器中剩余的空间F12检查最后一行的元素&#xff0c;高度就已经改变了&#xff1b;

基于vue框架的的驾校预约管理系统设计d5tq3(程序+源码+数据库+调试部署+开发环境)系统界面在最后面。

系统程序文件列表 项目功能&#xff1a;用户,驾校教练,车辆信息,报名信息,学员信息,考试预约,教学课程,教练评价,考试成绩,练车预约,报修申请,维修信息,课程类型,车辆类型 开题报告内容 基于Vue框架的驾校预约管理系统设计开题报告 一、项目背景与意义 随着驾驶培训行业的快…

shell脚本编写注意细节 ==、=等的区别

文章目录 和的使用主要区别小结 Shell脚本要注意的细节1. 变量引用和空格处理2. [[ ... ]] vs [ ... ]3. 使用-n和-z来检测变量是否为空4. 整数运算和浮点运算5. 避免使用反引号执行命令6. for循环中的命令替换7. 使用trap来清理临时文件8. 避免使用硬编码路径9. 使用-eq、-lt、…

Docker Swarm简介

注意事项 Swarm 模式是用于管理 Docker 守护进程集群的一项高级特性。如果你打算将 Swarm 用作生产运行时环境&#xff0c;那就使用 Swarm 模式。如果你不打算使用 Swarm 进行部署&#xff0c;可改用 Docker Compose。如果正在为 Kubernetes 部署进行开发&#xff0c;可以考虑使…

接口测试及常用接口测试工具(postman/jmeter)详解

&#x1f345; 点击文末小卡片 &#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 首先&#xff0c;什么是接口呢&#xff1f; 接口一般来说有两种&#xff0c;一种是程序内部的接口&#xff0c;一种是系统对外的接口。 系统对外的接口&#…

CVE-2024-51567 CyberPanel upgrademysqlstatus 远程命令执行

该漏洞源于upgrademysqlstatus接口未做身份验证和参数过滤&#xff0c;未授权的攻击者可以通过此接口执行任意命令获取服务器权限&#xff0c;从而造成数据泄露、服务器被接管等严重的后果。 影响版本 CyberPanel v2.3.5CyberPanel v2.3.6 目前官方已有可更新版本&#xff0…

Kaggle “Reducing Commercial Aviation Fatalities” 比赛 生理数据分析

1、背景 Kaggle在2018 年 12 月 20 日举办“Reducing Commercial Aviation Fatalities” 比赛&#xff0c;通过收集飞行员的生理数据&#xff0c;判断飞行员何时会遇到麻烦吗&#xff1f;该比赛主要分析飞行员的问题&#xff0c;因为航班多、时间不固定&#xff0c;飞行员会出…

无套路领取《AI应用开发专栏》

最近有些时间没有更新技术文章了&#xff0c;都在忙着写《AI应用开发入门》专栏&#xff0c;专栏已整理放到了github上&#xff0c;有兴趣的小伙伴可以移步github阅读&#xff0c;地址见文末。 1、为什么写这个文档 之前陆续写了一些零散的AI相关的技术文章&#xff0c;也有不…

Webserver(1.8)操作函数

目录 文件属性操作函数access函数chmod函数chown函数truncate函数 目录操作函数mkdir函数rmdir函数rename函数chdir函数*getcwd函数 目录遍历函数*opendir函数*readdir函数closedir函数 dup、dup2函数dupdup2 fcntl函数 文件属性操作函数 access函数 判断某个文件是否有某个权…

基础IO -- 标准错误输出stderr

目录 1&#xff09;为什么要有 fd 为 2 的 stderr 2&#xff09;使2和1重定向到一个文件中 这里我们谈一下以前只是了解过的stderr 通过两段代码&#xff0c;显然&#xff0c;我们可以知道两个FILE*都是指向显示器的 对于重定向&#xff0c;只有stdout才会将打印的数据重定向…

基于Multisim的四位抢答器设计与仿真

四位选手进行抢答比赛&#xff0c;用基本门电路及集成逻辑器件构成四人抢答器。选手编号分别为1&#xff0c;2&#xff0c;3&#xff0c;4号&#xff0c;用S1&#xff0c;S2&#xff0c;S3&#xff0c;S4四个按钮作为抢答按钮&#xff0c;S0按钮为总清零按钮。当四人中任何一个…