Flink cdc3.0同步实例(动态变更表结构、分库分表同步)

文章目录

  • 前言
  • 准备
    • flink环境
    • docker构建mysql、doris环境
    • 数据准备
  • 通过 FlinkCDC cli 提交任务
    • 整库同步
    • 同步变更
    • 路由变更
    • 路由表结构不一致无法同步
  • 结尾

前言

最近Flink CDC 3.0发布, 不仅提供基础的数据同步能力。schema 变更自动同步、整库同步、分库分表等增强功能使 Flink CDC 3.0 在更复杂的数据集成与用户业务场景中发挥作用:用户无需在数据源发生 schema 变更时手动介入,大大降低用户的运维成本;只需对同步任务进行简单配置即可将多表、多库同步至下游,并进行合并等逻辑,显著降低用户的开发难度与入门门槛。Flink CDC 3.0 正式发布。
我们今天基于 Flink CDC 3.0 同步 MySQL 到 Doris ,来体验下新上的整库同步、表结构变更同步和分库分表同步的功能。

准备

flink环境

准备 Flink Standalone 集群,下载最新版本 Flink 1.18.0 ,解压后得到 flink-1.18.0 目录。并且设置 FLINK_HOME 为 flink-1.18.0 所在目录。
通过在 conf/flink-conf.yaml 配置文件追加下列参数开启 checkpoint,每隔 3 秒做一次 checkpoint,方便后续观察数据变更。

execution.checkpointing.interval: 3000

使用下面的命令启动 Flink 集群。

./bin/start-cluster.sh

启动成功的话,可以在 http://localhost:8081/ 访问到 Flink Web UI,如下所示:
在这里插入图片描述
多次执行 start-cluster.sh 可以拉起多个 TaskManager,保证Total Task Slots >= 2, 不然提交任务会有资源不足异常,比如我这里执行了3次。 或者是修改 conf/flink-conf.yaml 资源配置。

docker构建mysql、doris环境

如果有安装这两个组件,就可以免去docker,接下来的教程将以 docker-compose 的方式准备所需要的组件。
由于 Doris 的运行需要内存映射支持,需在宿主机执行如下命令:

sysctl -w vm.max_map_count=2000000

docker 镜像启动,使用下面的内容创建一个 docker-compose.yml 文件:

version: '2.1'
services:doris:image: yagagagaga/doris-standaloneports:- "8030:8030"- "8040:8040"- "9030:9030"mysql:image: debezium/example-mysql:1.1ports:- "3306:3306"environment:- MYSQL_ROOT_PASSWORD=123456- MYSQL_USER=mysqluser- MYSQL_PASSWORD=mysqlpw

该 Docker Compose 中包含的容器有:

MySQL: 包含商品信息的数据库 app_db

Doris: 存储从 MySQL 中根据规则映射过来的结果表

docker-compose.yml 所在目录下执行下面的命令来启动本教程需要的组件:

docker-compose up -d

该命令将以 detached 模式自动启动 Docker Compose 配置中定义的所有容器。你可以通过 docker ps 来观察上述的容器是否正常启动了,也可以通过访问 http://localhost:8030/ 来查看 Doris 是否运行正常。

数据准备

进入 MySQL 容器, 或者通过客户端工具连接到mysql

docker-compose exec mysql mysql -uroot -p123456

创建数据库 app_db 和表 orders,products 并插入数据

-- 创建数据库
CREATE DATABASE app_db;USE app_db;-- 创建 orders 表
CREATE TABLE `orders` (
`id` INT NOT NULL,
`price` DECIMAL(10,2) NOT NULL,
PRIMARY KEY (`id`)
);-- 插入数据
INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00);
INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);-- 创建 shipments 表
CREATE TABLE `shipments` (
`id` INT NOT NULL,
`city` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);-- 插入数据
INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing');
INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian');-- 创建 products 表
CREATE TABLE `products` (
`id` INT NOT NULL,
`product` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);

Doris 暂时不支持自动创建数据库,需要先创建写入表对应的数据库。
进入 Doris Web UI。http://localhost:8030/,默认的用户名为 root,默认密码为空。
通过 Web UI 创建 app_db 数据库

create database if not exists app_db;

在这里插入图片描述

通过 FlinkCDC cli 提交任务

下载下面列出的二进制压缩包,并解压得到目录 flink-cdc-3.0.0
flink-cdc-3.0.0-bin.tar.gz flink-cdc-3.0.0 下会包含 bin、lib、log、conf 四个目录。

下载下面列出的 connector 包,并且移动到 lib 目录下

  • MySQL pipeline connector 3.0.0
  • Apache Doris pipeline connector 3.0.0
    在这里插入图片描述

整库同步

编写任务配置 yaml 文件,下面给出了一个整库同步的示例文件 mysql-to-doris.yaml:

################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db.\.*server-id: 5400-5404server-time-zone: UTCsink:type: dorisfenodes: 127.0.0.1:8030username: rootpassword: ""table.create.properties.light_schema_change: truetable.create.properties.replication_num: 1pipeline:name: Sync MySQL Database to Dorisparallelism: 2

其中:
source 中的 tables: app_db.\.* 通过正则匹配同步 app_db 下的所有表。
sink 添加 table.create.properties.replication_num 参数是由于 Docker 镜像中只有一个 Doris BE 节点。

最后,通过命令行提交任务到 Flink Standalone cluster

bash bin/flink-cdc.sh conf/mysql-to-doris.yaml

提交成功后,返回信息如:
在这里插入图片描述
在 Flink Web UI,可以看到一个名为 Sync MySQL Database to Doris 的任务正在运行。job id对应上面的cb049fe4a2112510a77ee46e197054a6
在这里插入图片描述
打开 Doris 的 Web UI,可以看到数据表已经被创建出来,数据能成功写入。
在这里插入图片描述

同步变更

接下来,修改 MySQL 数据库中表的数据,Doris 中显示的订单数据也将实时更新:

INSERT INTO app_db.orders (id, price) VALUES (3, 100.00);
ALTER TABLE app_db.orders ADD amount varchar(100) NULL;
UPDATE app_db.orders SET price=100.00, amount=100.00 WHERE id=1;
DELETE FROM app_db.orders WHERE id=2;
-- 区别于官方再新增一条数据
INSERT INTO app_db.orders VALUES (4, 200, 200.00);

也可以拆开每执行一步,刷新一次 Doris Web UI,可以看到 Doris 中显示的 orders 数据将实时更新,如下所示:
在这里插入图片描述
同样的,去修改 shipments, products 表,也能在 Doris 中实时看到同步变更的结果。

路由变更

Flink CDC 提供了将源表的表结构/数据路由到其他表名的配置,借助这种能力,我们能够实现表名库名替换,整库同步等功能。
下面提供一个配置文件conf/mysql-to-doris-route.yaml说明:

################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db1.\.*server-id: 5400-5404server-time-zone: UTCsink:type: dorisfenodes: 127.0.0.1:8030benodes: 127.0.0.1:8040username: rootpassword: ""table.create.properties.light_schema_change: truetable.create.properties.replication_num: 1route:- source-table: app_db1.orders\.*sink-table: app_db1.ods_orderspipeline:name: Sync MySQL Database to Dorisparallelism: 2

通过上面的 route 配置,使用正则表达式,可以将诸如 app_db1.order_01、app_db1.order_02 的表汇总到 app_db1.ods_orders 中。从而实现分库分表同步的功能。注意,目前还不支持多表中存在相同主键数据的场景,将在后续版本支持。
另外官方文档里的写法存在一个问题。
在这里插入图片描述
正则表达式前面加上’\‘转义,app_db1.orders\.*,否则会抛出异常:java.util.regex.PatternSyntaxException: Dangling meta character ‘*’ near index 0 *
在这里插入图片描述
我们在mysql和doris分别创建数据库app_db1, 然后初始化mysql

-- 创建表orders_01
CREATE TABLE `orders_01` (`id` int NOT NULL,`price` decimal(10,2) NOT NULL,`amount` varchar(100) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;-- 创建表orders_02
CREATE TABLE `orders_02` (`id` int NOT NULL,`price` decimal(10,2) NOT NULL,`amount` varchar(100) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

启动新的job。
在这里插入图片描述
然后在orders_01,orders_02分别插入数据


INSERT INTO `orders_01` (`id`, `price`) VALUES (11, 4.00);
INSERT INTO `orders_02` (`id`, `price`) VALUES (12, 100.00);

在doris里验证,数据都写入了app_db1.ods_orders
在这里插入图片描述

路由表结构不一致无法同步

看Schema Evolution 设计原理,Flink CDC 3.0 在作业拓扑中引入了 SchemaRegistry,结合 SchemaOperator 协调并控制作业拓扑中的 schema 变更事件处理。当上游数据源发生 schema 变更时,SchemaRegistry 会控制 SchemaOperator 以暂停数据流,并将流水线中的数据从 sink 全部刷出以保证 schema 一致性。当 schema 变更事件在外部系统处理成功后,SchemaOperator 恢复数据流,完成本次 schema 变更的处理。
在这里插入图片描述
所以考虑只修改orders_01,再插入数据看doris同步的变化。

-- 添加sku字段
ALTER TABLE app_db1.orders_01 ADD sku varchar(32) NULL;
-- 向orders_01插入id=13
INSERT INTO `orders_01` VALUES (13, 4.00, 8.00, 'apple01');
-- 向orders_02插入id=14
INSERT INTO `orders_02` VALUES (14, 1.00, 1.00);

可以看到doris中的app_db1.orders表结构发生了变化,但是orders_02的id=14这条数据没有正常写入。flink异常提示:java.lang.IllegalStateException: Column size does not match the data size
在这里插入图片描述
而当修改orders_02的表结构,也会有异常:Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: status of AddColumnEvent is already existed。并且之后写入的数据无法正常同步。

结尾

flink cdc的功能越来越强,也再尝试解决用户的使用痛点。不过放到生产环境使用还需要建立在更多的实践测试之上。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/236073.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

论文笔记:Accurate Localization using LTE Signaling Data

1 intro 论文提出LTELoc,仅使用信令数据实现精准定位 信令数据已经包含在已在LTE系统中,因此这种方法几乎不需要数据获取成本仅使用TA(时序提前)和RSRP【这里单位是瓦】(参考信号接收功率) TA值对应于信号…

vue没有使用fetch报错 Uncaught (in promise) TypeError: Failed to fetch

出现下面的错误,主要也没有用谷歌浏览器什么和发起fetch请求,找了很久没有什么发现 POST https://www.google-analytics.com/mp/collect?measurement_idG-04CMS1PYS6&api_secretpRgvhB8VTii5eSmcTzVaOg net::ERR_BLOCKED_BY_CLIENT Uncaught (in pr…

获投1050万欧元!德国量子公司Kipu Quantum成功研发特定压缩算法

​(图片来源:网络) 近日,德国量子软件公司Kipu Quantum宣布成功完成种子轮融资,融资总额达1050万欧元(约合8000万人民币)。该初创公司目前已开发出运行高性能量子计算机所需的压缩算法。该算法…

windows安装、基本使用vim

标题:windows安装、基本使用vim 1.下载并安装GVIM 百度网盘链接 提取码:2apr 进入安装界面,如下,勾选 其它都是默认即可 参考; 2.在powershell中使用vim 参考blog:window10安装vim编辑器 安装好后&…

HTML+CSS做一个冰立方体时钟

文章目录 💕效果展示💕代码展示HTMLJS💕效果展示 💕代码展示 HTML <!DOCTYPE html> <html lang

双三次 Bezier 曲面

双三次 Bezier 曲面的定义 Bezier 曲面是由 Bezier 曲线扩展得到&#xff0c;它是两组正交的 Bezier 曲线控制点构造空间网格生成的曲面 p ( u , v ) ∑ i 0 3 ∑ j 0 3 P i , j B i , 3 ( u ) B j , 3 ( v ) , ( u , v ) ∈ [ 0 , 1 ] [ 0 , 1 ] \begin{equation} \bm{p}…

法线贴图实现衣服上皱褶特效

在线工具推荐&#xff1a; 3D数字孪生场景编辑器 - GLTF/GLB材质纹理编辑器 - 3D模型在线转换 - Three.js AI自动纹理开发包 - YOLO 虚幻合成数据生成器 - 三维模型预览图生成器 - 3D模型语义搜索引擎 法线贴图在3D建模中扮演着重要的角色&#xff0c;它通过模拟表面的微…

c++内存池项目

文章目录 一、内存池介绍二、ThreadCache实现三、CentralCache实现四、PageCache实现五、回收内存六、大于256KB的内存申请与释放七、将new和delete换为定长内存池八、多线程环境下对比malloc进行基准测试九、使用基数树进行性能优化 一、内存池介绍 二、ThreadCache实现 下面…

springboot集成springdoc-openapi(模拟前端请求)

目录 描述---痛点 Springfox对比springdoc-openapi 1. 成熟度和维护性&#xff1a; 2. 依赖和配置&#xff1a; 3. 注解和使用方式&#xff1a; 4. 特性和扩展性&#xff1a; 应用目录结构 pom文件 新增测试controller StaffController YUserController 启动测试看下…

PHP HTTPoxy CGI 应用程序漏洞 CVE-2016-5385

HTTPoxy CGI 应用程序漏洞 CVE-2016-5385 已亲自复现 漏洞名称漏洞描述影响版本 漏洞复现环境搭建漏洞利用 修复建议 漏洞名称 漏洞描述 在Oracle Communications BRM 10.x/12.x&#xff08;云软件&#xff09;中发现漏洞。它已经被宣布为关键。此漏洞影响组件用户数据库的未…

Linux笔记---用户和权限管理基本命令介绍

&#x1f34e;个人博客&#xff1a;个人主页 &#x1f3c6;个人专栏&#xff1a;Linux学习 ⛳️ 功不唐捐&#xff0c;玉汝于成 目录 ​编辑 前言&#xff1a; 命令&#xff1a; whoami&#xff1a; passwd&#xff1a; useradd&#xff1a; userdel&#xff1a; chm…

华为交换配置OSPF与BFD联动

实验拓扑 组网需求 如图所示&#xff0c;SW1、SW2和SW3之间运行OSPF&#xff0c;SW1和SW2之间的交换机仅作透传功能。现在需要SW1和SW2能够快速感应它们之间的链路状态&#xff0c;当链路SW1-SW2发生故障时&#xff0c;业务能快速切换到备份链路SW1-SW3-SW2上 配置思路 采用…

文件的基本管理

目录 一、Linux系统目录结构和相对/绝对路径 &#xff08;一&#xff09;系统目录结构 &#xff08;二&#xff09;相对路径和绝对路径 1.绝对路径 2.相对路径 &#xff08;三&#xff09;通配符的作用 二、创建、复制、删除文件&#xff0c;rm -rf /意外事故 &#xf…

说说对React Hooks的理解?解决了什么问题?

面试官&#xff1a;说说对React Hooks的理解&#xff1f;解决了什么问题&#xff1f; 一、是什么 Hook 是 React 16.8 的新增特性。它可以让你在不编写 class 的情况下使用 state 以及其他的 React 特性 至于为什么引入hook&#xff0c;官方给出的动机是解决长时间使用和维护…

山景DU561—32位高性能音频处理器(DSP)芯片

音频处理可以更好地捕捉和处理声音和音乐&#xff1b;而DSP音频处理芯片是一种利用数字信号处理技术进行音频处理的专用芯片&#xff1b;可用于多种应用&#xff0c;从音乐拾音到复杂的音频信号处理&#xff0c;和声音增强。 由工采网代理的山景DU561是一款集成多种音效算法高…

08、基于LunarLander登陆器的DDQN强化学习(含PYTHON工程)

08、基于LunarLander登陆器的DDQN强化学习&#xff08;含PYTHON工程&#xff09; LunarLander复现&#xff1a; 07、基于LunarLander登陆器的DQN强化学习案例&#xff08;含PYTHON工程&#xff09; 08、基于LunarLander登陆器的DDQN强化学习&#xff08;含PYTHON工程&#xf…

DTC营销新模式,创新商业引领裂变营销新潮流的玩法!

DTC营销新模式&#xff0c;创新商业引领裂变营销新潮流的玩法&#xff01; 随着市场竞争的加剧&#xff0c;企业寻求创新的营销模式以突破困境&#xff0c;脱颖而出。其中&#xff0c;DTC&#xff08;Direct-to-Consumer&#xff0c;直接面向消费者&#xff09;营销新模式应运…

CentOS 7 制作openssh 9.6 rpm包更新修复安全漏洞 —— 筑梦之路

2023年12月18日 openssh 发布新版9.6p1&#xff0c;详细内容阅读OpenSSH: Release Notes 背景说明 之前也写过多篇制作openssh rpm包的文章&#xff0c;为何要重新来写一篇制作openssh 9.6版本的&#xff1f; openssh 9.6 rpm包制作和之前存在区别&#xff0c;对于CentOS 7来…

spring之面向切面:AOP(2)

学习的最大理由是想摆脱平庸&#xff0c;早一天就多一份人生的精彩&#xff1b;迟一天就多一天平庸的困扰。各位小伙伴&#xff0c;如果您&#xff1a; 想系统/深入学习某技术知识点… 一个人摸索学习很难坚持&#xff0c;想组团高效学习… 想写博客但无从下手&#xff0c;急需…

【Python】函数

一、函数介绍 二、函数的定义 三、函数的参数 四、函数的返回值 五、函数说明文档 六、函数的嵌套调用 七、变量的作用域 一、函数介绍 函数的使用 函数的作用 函数 函数&#xff1a;是组织好的&#xff0c;可重复使用的&#xff0c;用来实现特定功能的代码段。 input()、p…