Flink cdc3.0动态变更表结构——源码解析

文章目录

  • 前言
  • 源码解析
    • 1. 接收schema变更事件
    • 2. 发起schema变更请求
    • 3. schema变更请求具体处理
    • 4. 广播刷新事件并阻塞
    • 5. 处理FlushEvent
    • 6. 修改sink端schema
  • 结尾

前言

上一篇Flink cdc3.0同步实例 介绍了最新的一些功能和问题,本篇来看下新功能之一的动态变更表结构的具体实现。
在 Flink 中,应用程序由流数据流组成,这些数据流是由用户定义的Operators进行转换。
在这里插入图片描述
Flink CDC 3.0 框架中流动的数据类型被称为Event,代表外部系统产生的变更事件。每个事件都标有发生更改的表 ID 。事件分为SchemaChangeEventDataChangeEvent,分别代表表结构和数据的变化。处理schema变更的Operators对应图中的SchemaOperator
在这里插入图片描述
(以下代码使用Flink Release 3.0.0)

源码解析

1. 接收schema变更事件

我们以添加字段触发的AddColumnEvent为例,它实现了SchemaChangeEventSchemaOperator 当接收到有AddColumnEvent 事件时,会在processElement 中调用handleSchemaChangeEvent处理。
在这里插入图片描述

2. 发起schema变更请求

说明下这里的response实际是直接返回的new SchemaChangeResponse(true), 由于构造的shouldSendFlushEvent 直接传入true, 所以后续也会进入if条件。我们接着requestSchemaChange 方法看
在这里插入图片描述
由于知道response是直接创建的已知结果,因此responseFuture.get() 也不会阻塞。我们接着来看toCoordinator.sendRequestToCoordinator(getOperatorID(), new SerializedValue<>(request));的实现
在这里插入图片描述

3. schema变更请求具体处理

通过几层的调用,上述变更请求会走到 SchemaRegistryhandleCoordinationRequest(CoordinationRequest request),我们的请求是SchemaChangeRequest,所以会调用requestHandler.handleSchemaChangeRequest(schemaChangeRequest);
在这里插入图片描述
这里可以看到response 是直接创建的SchemaChangeResponse(true)。 接着schemaManager.applySchemaChange(request.getSchemaChangeEvent());注册新的schema。

在这里插入图片描述
另外还有个重点,在startToWaitForReleaseRequest方法中会重置responseFuture, 原本的response通过return返回了。而PendingSchemaChange中的response重置,主要就是为了等schema变更完成设计。(主线程会再次发起请求调用responseFuture.get() ,忽略这里会不理解后面为什么会阻塞)
在这里插入图片描述

4. 广播刷新事件并阻塞

回到第二部分,因为response是一个明确对象没有阻塞,返回后会直接广播FlushEventschemaChangeEvent(再次发起schemaChangeEvent不是很理解)。之后requestReleaseUpstream 请求调用responseFuture.get()会阻塞,因为response在第三步已经重置为new CompletableFuture<>(), 利用的1.8的特性。这也是收到变更事件后要保证sink端变更才能发放数据。
在这里插入图片描述

5. 处理FlushEvent

FlushEvent 由什么Operator处理,在官方架构图中其实没有指出,但是图标可以看出是通过sink端完成,我们可以找到DataSinkWriterOperator类,有对FlushEvent的处理。
在这里插入图片描述
实际调用SchemaRegistry::handleEventFromOperator方法,重点在requestHandler.flushSuccess(flushSuccessEvent.getTableId(), flushSuccessEvent.getSubtask());
在这里插入图片描述
其中applySchemaChange 就是在具体的sink端变更,下面会展开。 当变更完成后会执行waitFlushSuccess.getResponseFuture().complete(wrap(new ReleaseUpstreamResponse()));,实际就通知第4部分的response这里处理完了,可以正常放开数据流。
在这里插入图片描述

6. 修改sink端schema

每个sink端有自定义的metadataApplier
在这里插入图片描述
我们以DorisMetadataApplier为例,applyAddColumnEvent 会构造addFieldSchema,然后在schemaChangeManager 中转换为对应的sql执行。
在这里插入图片描述

结尾

以上就是这两天对源码跟进的记录,后续思考使用local环境Debug中间过程。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/670591.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

新零售的升维体验,摸索华为云GaussDB如何实现数据赋能

新零售商业模式 商业模式通常是由客户价值、企业资源和能力、盈利方式三个方面构成。其最主要的用途是为实现客户价值最大化。 商业模式通过把能使企业运行的内外各要素整合起来&#xff0c;从而形成一个完整的、高效率的、具有独特核心竞争力的运行系统&#xff0c;并通过最…

Windows显示空的可移动磁盘的解决方案

123  大家好,我是爱编程的喵喵。双985硕士毕业,现担任全栈工程师一职,热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。现为CSDN博客专家、人工智能领域优质创作者。喜欢通过博客创作的方式…

寒武纪显卡实现高维向量的softmax并行优化

关于寒武纪编程可以参考本人之前的文章添加链接描述&#xff0c;添加链接描述&#xff0c;添加链接描述 高维向量softmax的基础编程 高维向量的softmax实现更加复杂&#xff0c;回忆之前在英伟达平台上实现高维向量的softmax函数&#xff0c;比如说我们以形状为[1,2,3,4,5,6]…

Unity_ShaderGraph节点问题

Unity_ShaderGraph节点问题 Unity版本&#xff1a;Unity2023.1.19 为什么在Unity2023.1.19的Shader Graph中找不见PBR Master节点&#xff1f; 以下这个PBR Maste从何而来&#xff1f;

linux下 Make 和 Makefile构建你的项目

Make 和 Makefile构建你的项目 介绍 在软件开发中&#xff0c;构建项目是一个必不可少的步骤。make 是一个强大的自动化构建工具&#xff0c;而 Makefile 是 make 工具使用的配置文件&#xff0c;用于描述项目的构建规则和依赖关系。本篇博客将介绍 make 和 Makefile 的基本概…

【成品论文】2024美赛B题完整成品论文23页+3小问matlab代码+数据集汇总

2024 年美国大学生数学建模竞赛&#xff08;2024 美赛&#xff09;B 题&#xff1a; 2024 MCM 问题 B: 搜寻潜水艇 题目翻译&#xff1a; Maritime Cruises Mini-Submarines (MCMS)是一家总部位于希腊的公司&#xff0c;专门制造能够携 带人类到达海洋最深处的潜水艇。潜水艇是…

【Kubernetes】在k8s1.24及以上版本基于containerd容器运行时测试pod从harbor拉取镜像

基于containerd容器运行时测试pod从harbor拉取镜像 1、安装高版本containerd2、安装docker3、登录harbor上传镜像4、从harbor拉取镜像 1、安装高版本containerd 集群中各个节点都要操作 yum remove containerd.io -y yum install containerd.io-1.6.22* -y cd /etc/containe…

SpringBoot实战第三天

今天主要完成了&#xff1a; 新增棋子分类 棋子分类列表 获取棋子分类详情 更新棋子分类 更新棋子分类和添加棋子分类_分组校验 新增棋子 新增棋子参数校验 棋子分类列表查询(条件分页) 先给出分类实体类 Data public class Category {private Integer id;//主键IDNot…

[UI5 常用控件] 06.Splitter,ResponsiveSplitter

文章目录 前言1. Splitter1.1 属性 2. ResponsiveSplitter 前言 本章节记录常用控件Splitter,ResponsiveSplitter。主要功能是分割画面布局。 其路径分别是&#xff1a; sap.ui.layout.Splittersap.ui.layout.ResponsiveSplitter 1. Splitter 1.1 属性 orientation &#x…

DevOps落地笔记-17|度量指标:寻找真正的好指标?

前面几个课时端到端地介绍了软件开发全生命周期中涉及的最佳实践&#xff0c;经过上面几个步骤&#xff0c;企业在进行 DevOps 转型时技术方面的问题解决了&#xff0c;这个时候我们还缺些什么呢&#xff1f;事实上很多团队和组织在实施 DevOps 时都专注于技术&#xff0c;而忽…

【Linux网络编程三】Udp套接字编程(简易版服务器)

【Linux网络编程三】Udp套接字编程(简易版服务器&#xff09; 一.创建套接字二.绑定网络信息1.构建通信类型2.填充网络信息①网络字节序的port②string类型的ip地址 3.最终绑定 三.读收消息1.服务器端接收消息recvfrom2.服务器端发送消息sendto3.客户端端发送消息sendto4.客户端…

TCP 了解

参考&#xff1a;4.2 TCP 重传、滑动窗口、流量控制、拥塞控制 | 小林coding TCP报文 其中比较重要的字段有&#xff1a;&#xff08;1&#xff09;序号&#xff08;sequence number&#xff09;&#xff1a;Seq序号&#xff0c;占32位&#xff0c;用来标识从TCP源端向目的端发…

利用IP地址精准定位服务

在数字化时代&#xff0c;IP地址已成为连接我们与网络世界的纽带之一。通过IP地址&#xff0c;我们可以追踪用户的位置信息&#xff0c;实现精准定位服务。本文将探讨如何利用IP地址精准定位服务&#xff0c;为个人和企业带来便利和价值。 一、什么是IP地址精准定位服务&#…

【FPGA】高云FPGA之IP核的使用->PLL锁相环

FPGA开发流程 1、设计定义2、设计输入3、分析和综合4、功能仿真5、布局布线6、时序仿真7、IO分配以及配置文件&#xff08;bit流文件&#xff09;的生成8、配置&#xff08;烧录&#xff09;FPGA9、在线调试 1、设计定义 使用高云内置IP核实现多路不同时钟输出 输入时钟50M由晶…

IDEA创建SpringBoot+Mybatis-Plus项目

IDEA创建SpringBootMybatis-Plus项目 一、配置Maven apache-maven-3.6.3的下载与安装&#xff08;详细教程&#xff09; 二、创建SpringBoot项目 在菜单栏选择File->new->project->Spring Initializr&#xff0c;然后修改Server URL为start.aliyun.com&#xff0c…

【图像文本化】Base64编解码OpenCV4中 Mat 对象

学习《OpenCV应用开发&#xff1a;入门、进阶与工程化实践》一书 做真正的OpenCV开发者&#xff0c;从入门到入职&#xff0c;一步到位&#xff01; 前言 很多时候在开发中&#xff0c;需要保存图像为文本形式&#xff0c;以便于存储与传输。最常见的就是把图像文件编码为Ba…

C# CAD交互界面-自定义工具栏(二)

运行环境 vs2022 c# cad2016 调试成功 一、引用 acdbmgd.dllacmgd.dllaccoremgd.dllAutodesk.AutoCAD.Interop.Common.dllAutodesk.AutoCAD.Interop.dll using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.T…

spring boot学习第十篇:elastic search必须使用用户名密码授权后才能访问、在java代码中操作索引

前提条件&#xff1a;安装好了elastic search服务&#xff0c;参考&#xff1a;elastic search入门_ubuntu elasticsearch 密码-CSDN博客 1、配置elastic search必须使用用户名密码授权才能访问 1.1开启x-pack验证 修改config目录下面的elasticsearch.yml文件&#xff0c;添…

VM 虚拟机和容器技术之间有什么区别?

随着云计算技术的不断发展&#xff0c;虚拟机和容器技术作为两种常见的虚拟化技术&#xff0c;被广泛应用于云计算领域。虽然虚拟机和容器技术都是虚拟化技术&#xff0c;但它们之间存在一些重要的区别。本文将详细介绍虚拟机和容器技术的区别&#xff0c;以便读者更好地了解这…

亚信安慧AntDB推动技术创新与满足用户需求

随着互联网技术的迅猛发展&#xff0c;大数据时代的到来&#xff0c;数据库的需求不断增长。在这样的背景下&#xff0c;国产分布式数据库正逐渐崭露头角&#xff0c;AntDB作为其中的重要代表&#xff0c;也积极参与到了这场竞争中。作为国内的技术创新者&#xff0c;AntDB不仅…