FlinkSQL ChangeLog

01 Changelog相关优化规则

0101 运行upsert-kafka作业

登录sql-client,创建一个upsert-kafka的sql作业(注意,这里发送给kafka的消息必须带key,普通只有value的消息无法解析,这里的key即是主键的值)

CREATE TABLE pageviews_per_region (user_region STRING,pv STRING,PRIMARY KEY (user_region) NOT ENFORCED  -- 设置主键
) WITH ('connector' = 'upsert-kafka','topic' = 'pageviews_per_region','properties.bootstrap.servers' = 'xxxxxx:9092','key.format' = 'csv','value.format' = 'csv'
);select * from pageviews_per_region;

发送消息带key和消费消息显示key方式如下

kafka-console-producer.sh --broker-list xxxxxx:9092 --topic pageviews_per_region --property "parse.key=true" --property "key.separator=:"
key1:value1,value1
key2:value2,value2kafka-console-consumer.sh --bootstrap-server xxxxxx:9092 --topic pageviews_per_region --from-beginning --property print.key=true

作业的DAG图如下
在这里插入图片描述

0102 StreamPhysicalChangelogNormalize

DAG图中有一个ChangelogNormalize,代码中搜索到对应的类是StreamPhysicalChangelogNormalize,这是一个对changelog数据做规范化的类,注释如下

/*** Stream physical RelNode which normalizes a changelog stream which maybe an upsert stream or a* changelog stream containing duplicate events. This node normalize such stream into a regular* changelog stream that contains INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE records without* duplication.*/
class StreamPhysicalChangelogNormalize(

功能就是转成对应的exec节点

override def translateToExecNode(): ExecNode[_] = {val generateUpdateBefore = ChangelogPlanUtils.generateUpdateBefore(this)new StreamExecChangelogNormalize(unwrapTableConfig(this),uniqueKeys,generateUpdateBefore,InputProperty.DEFAULT,FlinkTypeFactory.toLogicalRowType(getRowType),getRelDetailedDescription)
}

0103 StreamPhysicalTableSourceScanRule

StreamPhysicalChangelogNormalize是在优化规则StreamPhysicalTableSourceScanRule当中创建的,如下流式的FlinkLogicalTableSourceScan会应用该规则

class StreamPhysicalTableSourceScanRuleextends ConverterRule(classOf[FlinkLogicalTableSourceScan],FlinkConventions.LOGICAL,FlinkConventions.STREAM_PHYSICAL,"StreamPhysicalTableSourceScanRule") {

创建StreamPhysicalChangelogNormalize,也就是转为changelog的条件如下

if (isUpsertSource(resolvedSchema, table.tableSource) ||isSourceChangeEventsDuplicate(resolvedSchema, table.tableSource, config)
) {

isUpsertSource判断是否为upsert流,判断逻辑如下

public static boolean isUpsertSource(ResolvedSchema resolvedSchema, DynamicTableSource tableSource) {if (!(tableSource instanceof ScanTableSource)) {return false;}ChangelogMode mode = ((ScanTableSource) tableSource).getChangelogMode();boolean isUpsertMode =mode.contains(RowKind.UPDATE_AFTER) && !mode.contains(RowKind.UPDATE_BEFORE);boolean hasPrimaryKey = resolvedSchema.getPrimaryKey().isPresent();return isUpsertMode && hasPrimaryKey;
}

其中ChangelogMode在各自数据源实现类的getChangelogMode接口中定义,如JDBC只支持insert

@Override
public ChangelogMode getChangelogMode() {return ChangelogMode.insertOnly();
}

isSourceChangeEventsDuplicate判断不是upsert的更改流,判断逻辑如下

public static boolean isSourceChangeEventsDuplicate(ResolvedSchema resolvedSchema,DynamicTableSource tableSource,TableConfig tableConfig) {if (!(tableSource instanceof ScanTableSource)) {return false;}ChangelogMode mode = ((ScanTableSource) tableSource).getChangelogMode();boolean isCDCSource =!mode.containsOnly(RowKind.INSERT) && !isUpsertSource(resolvedSchema, tableSource);boolean changeEventsDuplicate =tableConfig.get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE);boolean hasPrimaryKey = resolvedSchema.getPrimaryKey().isPresent();return isCDCSource && changeEventsDuplicate && hasPrimaryKey;
}

综合来说要走StreamPhysicalChangelogNormalize这一条调用链,就不能是insertOnly的数据源,但目前大部分Flink实现的数据源包括Iceberg都是insertOnly的

0104 更新模式

Flink相关的更新模式类有如下几个:RowKind、ChangelogMode、UpdateKind

  • RowKind

RowKind是定义更新流每条数据的类型,其中对于更新有;两条数据,一条删除旧数据,一条插入新数据

/** Insertion operation. */
INSERT("+I", (byte) 0),/*** Update operation with the previous content of the updated row.** <p>This kind SHOULD occur together with {@link #UPDATE_AFTER} for modelling an update that* needs to retract the previous row first. It is useful in cases of a non-idempotent update,* i.e., an update of a row that is not uniquely identifiable by a key.*/
UPDATE_BEFORE("-U", (byte) 1),/*** Update operation with new content of the updated row.** <p>This kind CAN occur together with {@link #UPDATE_BEFORE} for modelling an update that* needs to retract the previous row first. OR it describes an idempotent update, i.e., an* update of a row that is uniquely identifiable by a key.*/
UPDATE_AFTER("+U", (byte) 2),/** Deletion operation. */
DELETE("-D", (byte) 3);
  • ChangelogMode

ChangelogMode定义数据源的更新模式,主要三种,就是包含不同的RowKind的类型

private static final ChangelogMode INSERT_ONLY =ChangelogMode.newBuilder().addContainedKind(RowKind.INSERT).build();private static final ChangelogMode UPSERT =ChangelogMode.newBuilder().addContainedKind(RowKind.INSERT).addContainedKind(RowKind.UPDATE_AFTER).addContainedKind(RowKind.DELETE).build();private static final ChangelogMode ALL =ChangelogMode.newBuilder().addContainedKind(RowKind.INSERT).addContainedKind(RowKind.UPDATE_BEFORE).addContainedKind(RowKind.UPDATE_AFTER).addContainedKind(RowKind.DELETE).build();
  • UpdateKind

UpdateKind是针对update这种更新类型细分

/** NONE doesn't represent any kind of update operation. */
NONE,/*** This kind indicates that operators should emit update changes just as a row of {@code* RowKind#UPDATE_AFTER}.*/
ONLY_UPDATE_AFTER,/*** This kind indicates that operators should emit update changes in the way that a row of {@code* RowKind#UPDATE_BEFORE} and a row of {@code RowKind#UPDATE_AFTER} together.*/
BEFORE_AND_AFTER

02 StreamExecChangelogNormalize

StreamExecChangelogNormalize的处理流程中根据是否启用table.exec.mini-batch.enabled分为微批处理和单数据的流处理

微批处理使用ProcTimeMiniBatchDeduplicateKeepLastRowFunction,流式使用ProcTimeDeduplicateKeepLastRowFunction,两者的核心差别就是微批会缓存数据使用一个for循环处理

这两个函数除了StreamPhysicalChangelogNormalize这一条链路外,还有StreamExecDeduplicate这一条链路,对应StreamPhysicalRankRule,是一个排序的东西

for (Map.Entry<RowData, RowData> entry : buffer.entrySet()) {RowData currentKey = entry.getKey();RowData currentRow = entry.getValue();ctx.setCurrentKey(currentKey);if (inputInsertOnly) {processLastRowOnProcTime(currentRow,generateUpdateBefore,generateInsert,state,out,isStateTtlEnabled,equaliser);} else {processLastRowOnChangelog(currentRow, generateUpdateBefore, state, out, isStateTtlEnabled, equaliser);}
}
  • processLastRowOnProcTime

对数据按照时间语义进行去重,将当前数据作为最新,这个函数只针对insert only的数据

static void checkInsertOnly(RowData currentRow) {Preconditions.checkArgument(currentRow.getRowKind() == RowKind.INSERT);
}

整套处理逻辑就是对数据根据场景修改数据的RowKind类型

} else {if (generateUpdateBefore) {preRow.setRowKind(RowKind.UPDATE_BEFORE);out.collect(preRow);}currentRow.setRowKind(RowKind.UPDATE_AFTER);out.collect(currentRow);
}
  • processLastRowOnChangelog

这个函数就是按Key去重,本质上也是针对数据修改RowKind

核心的一块功能就是更新的时候要将前一个数据修改为UPDATE_BEFORE

} else {if (generateUpdateBefore) {preRow.setRowKind(RowKind.UPDATE_BEFORE);out.collect(preRow);}currentRow.setRowKind(RowKind.UPDATE_AFTER);out.collect(currentRow);
}

函数整体借用的是Flink的state功能,从状态中获取前面的数据,所以对状态缓存由要求;另外针对非删除型的数据,如果TTL没有开的话,就不会更新前面的数据

if (!isStateTtlEnabled && equaliser.equals(preRow, currentRow)) {// currentRow is the same as preRow and state cleaning is not enabled.// We do not emit retraction and update message.// If state cleaning is enabled, we have to emit messages to prevent too early// state eviction of downstream operators.return;
}

03 初始RowKind来源

前面的流程里,在进行changelog转换的时候,数据是已经存在一个RowKind的值了,这一章追踪初始RowKind的来源

基于Flink-27的设计,Kafka数据源处理任务有一个KafkaRecordEmitter,emitRecord当中做数据的反序列化

deserializationSchema.deserialize(consumerRecord, sourceOutputWrapper);

最终走到DeserializationSchema.deserialize完成最终的反序列化

default void deserialize(byte[] message, Collector<T> out) throws IOException {T deserialize = deserialize(message);if (deserialize != null) {out.collect(deserialize);}
}

这里message是一个二进制数组,实际是Kafka的数据类型ConsumerRecord。根据SQL当中的配置,value反序列化使用的是csv,所以走到CsvRowDataDeserializationSchema当中处理

final JsonNode root = objectReader.readValue(message);
return (RowData) runtimeConverter.convert(root);

这里读出来的root是数据的key,convert的转化的实现类是CsvToRowDataConverters,其createRowConverter接口当中创建了转化函数,函数中将数据转化为了Flink的数据类型GenericRowData

GenericRowData row = new GenericRowData(arity);

GenericRowData的定义当中,有初始化RowKind,就是insert

public GenericRowData(int arity) {this.fields = new Object[arity];this.kind = RowKind.INSERT; // INSERT as default
}

04 补充

0401 delete

按照官方说法,发送一个空消息就会产生delete

 Also, null values are interpreted in a special way: a record with a null value represents a “DELETE.

使用kafka producer控制台发送空消息无法解析

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map due to end-of-inputat [Source: UNKNOWN; byte offset: #UNKNOWN]

官方说法是kafka的控制台版本对 null的支持问题,需要3.2以上版本
https://issues.apache.org/jira/browse/FLINK-27663?jql=project%20%3D%20FLINK%20AND%20text%20~%20%22upsert-kafka%22

空值处理逻辑在DynamicKafkaDeserializationSchema.deserialize当中
这里根据输入的数据是否空值进行分支处理;非空值时走的就是前三章的逻辑,也就是这里是前三章逻辑的入口

if (record.value() == null && upsertMode) {// collect tombstone messages in upsert mode by handoutputCollector.collect(null);
} else {valueDeserialization.deserialize(record.value(), outputCollector);
}

空值时走到OutputProjectionCollector.emitRow,这里会设置初始类型为DELETE

if (physicalValueRow == null) {if (upsertMode) {rowKind = RowKind.DELETE;} else {throw new DeserializationException("Invalid null value received in non-upsert mode. Could not to set row kind for output record.");}
} else {rowKind = physicalValueRow.getRowKind();
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/724029.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

EC600模块通过AT指令接入阿里云物联网平台并发布属性

摘要&#xff1a;本文介绍一下如何通过EC600模块的AT指令&#xff0c;将设备属性值发送到阿里云物联网平台的方法。 这个模块供电可以是 5-16V 和电脑通过USB串口连接&#xff0c;4线即可。未来集成到自己的系统中的时候&#xff0c;可以直接发送指令即可。 使用的软件是FreeAT…

【Vue3】Hooks:一种全新的组件逻辑组织方式

&#x1f497;&#x1f497;&#x1f497;欢迎来到我的博客&#xff0c;你将找到有关如何使用技术解决问题的文章&#xff0c;也会找到某个技术的学习路线。无论你是何种职业&#xff0c;我都希望我的博客对你有所帮助。最后不要忘记订阅我的博客以获取最新文章&#xff0c;也欢…

STM32作为SPI slave与主机异步通信

背景 最近被测试提了个BUG&#xff0c;说某款产品在用户按下前面板的按键后&#xff0c;对应的按键灯没有亮起来。前面板跟主机是通过SPI口通信&#xff0c;前面板是从机&#xff0c;从机想要主动发送消息&#xff0c;需要通过GPIO中断来通知主机&#xff1a; 上图前面板是ST…

阿里云不杀熟,云服务器优惠价格99元一年,新老用户均可购买

2024阿里云服务器优惠活动政策整理&#xff0c;阿里云99计划ECS云服务器2核2G3M带宽99元一年、2核4G5M优惠价格199元一年&#xff0c;轻量应用服务器2核2G3M服务器61元一年、2核4G4M带宽165元1年&#xff0c;云服务器4核16G10M带宽26元1个月、149元半年&#xff0c;云服务器8核…

14 程序地址空间

背景 kernel 2.6.32 32位平台 空间布局图 如何理解地址划分 地址划分&#xff0c;本质是调整地址空间的定义start和end&#xff0c;内存中定义了管理每个区域范围的结构体&#xff0c;叫mm_struct&#xff0c;每个进程都有一个这个结构体指针变量 验证上面划分的结构&#…

十四经脉最全总结(什么是经脉、十二经脉动态图、经脉走向、人体和手足哪面为阳,哪面为阴?)

目录 一.什么是经脉二.人体和手足哪面为阳&#xff1f;哪面为阴&#xff1f;三.任督二脉3.1 任脉3.2 督脉 四.十二经脉4.1 什么是 厥阴&#xff0c;少阴&#xff0c;太阴&#xff1b;少阳&#xff0c;阳明&#xff0c;太阳&#xff1f;4.2 十二经脉总分布4.3 手三阴经1.手厥阴心…

4、pod运维replicationCtroller、replicaSet、DeamonSet、Job、Cronjob

1、kubenetes 会自动重新运行失败的pod应用 pod运行失败&#xff0c;会自动重启&#xff0c;但是节点失败&#xff0c;pod会被移除&#xff0c; 除非配置了relicationController来管理资源 2、保持pod的健康存活 配置探针&#xff0c;发送http请求 3、查看前一个pod的运行日…

mysql-视图,创建表,存储过程,循环,判断实操命令

数据库操作命令在IDEA工具database的console命令 数据库表结构与视图 -- 查询隔离级别 select transaction_isolation;-- 设置隔离级别 set session transaction isolation level read committed ; set session transaction isolation level REPEATABLE READ ;start transacti…

蚂蚁感冒c++

题目 思路 “两蚂蚁碰面会掉头&#xff0c;若其中一只蚂蚁感冒了&#xff0c;会把感冒传染给碰到的蚂蚁”&#xff0c;这句话看作是“两蚂蚁碰面会互相穿过&#xff0c;只是把感冒的状态传给了另一只蚂蚁”&#xff0c;因为哪只蚂蚁感冒了并不是题目的重点&#xff0c;重点是有…

如何在Word里一次性给全部汉字加拼音?

word是大家日常使用频率较高的工作软件&#xff0c;功能性很强&#xff0c;有上乘的文档格式设置工具&#xff0c;利用它可更轻松、高效地组织和编写文档&#xff0c;熟练运用word&#xff0c;在职场上很重要。那么word如何添加拼音呢?下面给大家介绍一下吧。 方法一&#xf…

线性dp P4310-绝世好题/P4933 大师【日记】

1.绝世好题&#xff08;P4310&#xff09; 绝世好题https://www.luogu.com.cn/problem/P4310 比较考验思维的一道题目&#xff0c;码量和理解难度都不大&#xff0c;重在思维。 一开始看错题&#xff0c;以为是求子串&#xff08;还在想为啥考的纯位运算枚举&#xff0c;whe…

vue iis 配置

下载安装两个IIS模块 1). 传送门&#xff1a;URL Rewrite 2). 传送门&#xff1a;Application Request Routing 注 : 只有在 服务器的主页 有Application Request Routing 部署VUE网站 生成网站 在VUE项目打包生成出发布文件,即文件夹 dist,此处忽略 复制到你需要存放网站的…

Skywalking官方的实战模拟项目Live-Demo

Skywalking 官方的实战模拟项目Live-Demo Live-Demo 是 Skywalking 官方的实战模拟项目&#xff0c;其中包含4个子模块项目 projectA访问projectB、projectC两个SpringBoot项目 projectB访问本地的H2数据库 projectC访问www.baidu.com并同时向一台Kafka消息队列写入数据 proje…

入门指南:使用uni-app构建跨平台应用

入门指南&#xff1a;使用uni-app构建跨平台应用 &#x1f31f; 前言 欢迎来到我的小天地&#xff0c;这里是我记录技术点滴、分享学习心得的地方。&#x1f4da; &#x1f6e0;️ 技能清单 编程语言&#xff1a;Java、C、C、Python、Go前端技术&#xff1a;Jquery、Vue.js、R…

六、软考-系统架构设计师笔记-软件工程基础知识

1、软件工程 软件工程是将系统化的、严格约束的、可量化的方法应用于软件的开发、运行和维护&#xff0c;即将工程化应用于软件并对上述方法的研究。 软件要经历从需求分析、软件设计、软件开发、运行维护&#xff0c;直至被淘汰这样的全过程&#xff0c;这个过程称为软件的生…

Android使用OpenGL和FreeType绘制文字

Open GL主要是渲染图形的&#xff0c;有时候需要绘制文字&#xff0c;网上搜了一下&#xff0c;基本思路都是把文字转成位图&#xff0c;再使用Open GL纹理进行渲染。加载纹理在特定阶段才能成功&#xff08;在onSurfaceCreated中加载&#xff09;&#xff0c;这样就无法动态的…

部署LVS负载均衡架构

目录 一、ipvsadm 工具 二、NAT模式下部署LVS负载均衡 1、部署NFS共享存储服务器 1.1 安装NFS软件 1.2 新建共享目录和站点文件 1.3 设置共享策略 2、部署节点服务器1 2.1 安装并启动nginx软件 2.2 挂载共享目录到网页站点目录 2.3 修改网关 3、部署节点服务器2 3.…

在ABAP中创建一个简单的守护进程

原文地址&#xff1a;Create a simple Daemon in ABAP 目录 一、ABAP语言中的守护进程是什么&#xff1f;二、ABAP 守护进程框架 (ADF)三、ABAP 守护进程类四、创建一个简单的ABAP守护进程步骤1&#xff1a;创建一个新的ABAP Daemon类步骤2&#xff1a;实现ON_ACCEPT方法第三步…

「滚雪球学Java」:GUI编程(章节汇总)

咦咦咦&#xff0c;各位小可爱&#xff0c;我是你们的好伙伴——bug菌&#xff0c;今天又来给大家普及Java SE相关知识点了&#xff0c;别躲起来啊&#xff0c;听我讲干货还不快点赞&#xff0c;赞多了我就有动力讲得更嗨啦&#xff01;所以呀&#xff0c;养成先点赞后阅读的好…

Kosmos-1: 通用接口架构下的多模态大语言模型

Kosmos-1: 通用接口架构下的多模态大语言模型 FesianXu 20230513 at Baidu Search Team 前言 在大规模语言模型&#xff08;Large Language Model, LLM&#xff09;看似要带来新一番人工智能变革浪潮之际&#xff0c;越来越多尝试以LLM作为通用接口去融入各种任务的工作&#…