Flink CDC系列之:调研应用Flink CDC将 ELT 从 MySQL 流式传输到 StarRocks方案

Flink CDC系列之:调研应用Flink CDC将 ELT 从 MySQL 流式传输到 StarRocks方案

  • 准备
    • 准备 Flink Standalone 集群
    • 准备 docker compose
    • 为 MySQL 准备记录
    • 使用 Flink CDC CLI 提交作业
  • 同步架构和数据更改
  • 路由变更
  • 清理

本教程将展示如何使用 Flink CDC 快速构建从 MySQL 到 StarRocks 的 Streaming ELT 作业,包括同步一个数据库的所有表、模式变更演变和将分片表同步到一张表的功能。
本教程中的所有练习都在 Flink CDC CLI 中执行,整个过程使用标准 SQL 语法,无需一行 Java/Scala 代码或 IDE 安装。

准备

准备一台安装了 Docker 的 Linux 或 MacOS 电脑。

准备 Flink Standalone 集群

下载 Flink 1.18.0 ,解压得到 flink-1.18.0 目录。

使用以下命令进入 Flink 目录,并将 FLINK_HOME 设置为 flink-1.18.0 所在的目录。

cd flink-1.18.0

通过将以下参数附加到 conf/flink-conf.yaml 配置文件来启用检查点,每 3 秒执行一次检查点。

execution.checkpointing.interval: 3000

使用以下命令启动 Flink 集群。

./bin/start-cluster.sh

如果启动成功,你就可以通过http://localhost:8081/访问Flink Web UI,如下所示。
在这里插入图片描述
多次执行start-cluster.sh可以启动多个TaskManager。

准备 docker compose

以下教程将使用 docker-compose 准备所需的组件。使用下面提供的内容创建 docker-compose.yml 文件:

version: '2.1'
services:StarRocks:image: starrocks/allin1-ubuntu:3.2.6ports:- "8080:8080"- "9030:9030"MySQL:image: debezium/example-mysql:1.1ports:- "3306:3306"environment:- MYSQL_ROOT_PASSWORD=123456- MYSQL_USER=mysqluser- MYSQL_PASSWORD=mysqlpw

Docker Compose 应包含以下服务(容器):

  • MySQL:包含一个名为 app_db 的数据库
  • StarRocks:存储来自 MySQL 的表
docker-compose up -d

该命令会自动以分离模式启动 Docker Compose 配置中定义的所有容器。运行 docker ps 检查这些容器是否正常运行。您也可以访问 http://localhost:8030/ 检查 StarRocks 是否正在运行。

为 MySQL 准备记录

进入 MySQL 容器

docker-compose exec mysql mysql -uroot -p123456

创建 app_db 数据库和订单、产品、发货表,然后插入记录

-- create database
CREATE DATABASE app_db;USE app_db;-- create orders table
CREATE TABLE `orders` (
`id` INT NOT NULL,
`price` DECIMAL(10,2) NOT NULL,
PRIMARY KEY (`id`)
);-- insert records
INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00);
INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);-- create shipments table
CREATE TABLE `shipments` (
`id` INT NOT NULL,
`city` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);-- insert records
INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing');
INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian');-- create products table
CREATE TABLE `products` (
`id` INT NOT NULL,
`product` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);-- insert records
INSERT INTO `products` (`id`, `product`) VALUES (1, 'Beer');
INSERT INTO `products` (`id`, `product`) VALUES (2, 'Cap');
INSERT INTO `products` (`id`, `product`) VALUES (3, 'Peanut');

使用 Flink CDC CLI 提交作业

  • 下载下面列出的二进制压缩包并解压到目录 flink cdc-3.1.0’:
    flink-cdc-3.1.0-bin.tar.gz flink-cdc-3.1.0 目录下会包含四个目录:bin、lib、log、conf。
  • 下载下面列出的连接器包并移动到 lib 目录
    下载链接只针对稳定版本,SNAPSHOT 依赖需要自行基于 master 或 release 分支构建。请注意,需要将 jar 移动到 Flink CDC Home 的 lib 目录,而不是 Flink Home 的 lib 目录。
    • MySQL 管道连接器 3.1.0
    • StarRocks 管道连接器 3.1.0

您还需要将 MySQL 连接器放入 Flink lib 文件夹或使用 --jar 参数传递它,因为它们不再与 CDC 连接器一起打包:

  • MySQL Connector Java

编写任务配置yaml文件。下面是同步整个数据库的示例文件mysql-to-starrocks.yaml:

################################################################################
# Description: Sync MySQL all tables to StarRocks
################################################################################
source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db.\.*server-id: 5400-5404server-time-zone: UTCsink:type: starrocksname: StarRocks Sinkjdbc-url: jdbc:mysql://127.0.0.1:9030load-url: 127.0.0.1:8080username: rootpassword: ""table.create.properties.replication_num: 1pipeline:name: Sync MySQL Database to StarRocksparallelism: 2

注意:

  • source 中的 tables: app_db..* 通过正则匹配同步 app_db 中的所有表。
  • sink 中的 table.create.properties.replication_num 是因为 Docker 镜像中只有一个 StarRocks BE 节点。

最后,使用Cli将作业提交到Flink Standalone集群。

bash bin/flink-cdc.sh mysql-to-starrocks.yaml

提交成功后返回信息如下:

Pipeline has been submitted to cluster.
Job ID: 02a31c92f0e7bc9a1f4c0051980088a0
Job Description: Sync MySQL Database to StarRocks

我们可以通过 Flink Web UI 找到一个名为“Sync MySQL Database to StarRocks“的作业正在运行。
在这里插入图片描述
通过Dbeaver等数据库连接工具使用mysql://127.0.0.1:9030连接jdbc,可以在StarRocks中查看写入三张表的数据。

在这里插入图片描述

同步架构和数据更改

进入MySQL容器

docker-compose exec mysql mysql -uroot -p123456

然后修改MySQL中的schema和记录,Doris的表也会实时改变:

在MySQL中的orders中插入一条记录:

INSERT INTO app_db.orders (id, price) VALUES (3, 100.00);

在 MySQL 的订单中添加一列:

ALTER TABLE app_db.orders ADD amount varchar(100) NULL;

从 MySQL 更新订单中的一条记录:

UPDATE app_db.orders SET price=100.00, amount=100.00 WHERE id=1;

从 MySQL 中删除订单中的一条记录:

DELETE FROM app_db.orders WHERE id=2;

每执行一步刷新一下Dbeaver,可以看到StarRocks中展示的订单表会实时更新,如下图:
在这里插入图片描述
同样的,通过修改shipping和products表,你也可以在StarRocks中实时看到同步修改的结果。

路由变更

Flink CDC 提供了将源表的表结构/数据路由到其他表名的配置。
利用此功能,我们可以实现表名、数据库名替换、全库同步等功能。以下是使用路由功能的示例文件:

################################################################################
# Description: Sync MySQL all tables to StarRocks
################################################################################
source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db.\.*server-id: 5400-5404server-time-zone: UTCsink:type: starrocksjdbc-url: jdbc:mysql://127.0.0.1:9030load-url: 127.0.0.1:8030username: rootpassword: ""table.create.properties.replication_num: 1route:- source-table: app_db.orderssink-table: ods_db.ods_orders- source-table: app_db.shipmentssink-table: ods_db.ods_shipments- source-table: app_db.productssink-table: ods_db.ods_productspipeline:name: Sync MySQL Database to StarRocksparallelism: 2

通过上面的路由配置,我们可以将app_db.orders的表结构和数据同步到ods_db.ods_orders中,从而实现数据库迁移的功能。具体来说,source-table支持正则匹配多表来同步分库分表,如下:

route:- source-table: app_db.order\.*sink-table: ods_db.ods_orders

这样我们就可以将app_db.order01、app_db.order02、app_db.order03等分片表同步到一张ods_db.ods_orders表中了。

注意,目前还不支持多张表存在相同主键数据的场景,后续版本会支持。

清理

完成教程后,运行以下命令停止docker-compose.yml目录中的所有容器:

docker-compose down

在Flink flink-1.18.0目录下,执行以下命令停止Flink集群:

./bin/stop-cluster.sh

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/56776.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[Ansible实践笔记]自动化运维工具Ansible(二):Ansible的playbook及角色

Ansible playbook(剧本) 详情请参考[Ansible实践笔记]自动化运维工具Ansible(一):初探ansible&ansible的点对点模式 文章目录 Ansible playbook(剧本)介绍核心字段环境配置案例&#xff1…

React--》掌握Valtio让状态管理变得轻松优雅

Valtio采用了代理模式,使状态管理变得更加直观和易于使用,同时能够与React等框架无缝集成,本文将深入探讨Valtio的核心概念、使用场景以及其在提升应用性能中的重要作用,帮助你掌握这一强大工具,从而提升开发效率和用户…

【Go语言】

type关键字的用法 定义结构体定义接口定义类型别名类型定义类型判断 别名实际上是为了更好地理解代码/ 这里要分点进行记录 使用传值的例子,当两个类型不一样需要进行类型转换 type Myint int // 自定义类型,基于已有的类型自定义一个类型type Myin…

用kali入侵 DarkHole_2测试

进入kali系统调出root交互式界面 netdiscover -r 000.000.000.000/24 -------局域网探测IP工具 nmap 设备端口扫描 发现两个攻击点一个是80端口的Http 一个是22端口的ssh 发现有许多GIT文件 可能会出现git源码泄露 使用githack URL 命令还原git源文件 打开面板控制命令行 输入…

2.插入排序(斗地主起牌)

一、思想 扑克牌起牌 代码: 二、时间复杂度: 最好情况(已经排序好的):T O(N) 最坏情况(完全逆序):T O(N^2) 三、优劣: 严格的大小比较之后才进行错位插入&#x…

exchange_proxy exchange 安全代理

1. 软件简介 exchange_proxy 是由小米公司开发并开源的,以 go 语言开发的 exchange 安全代理,可以将内网的 exchange 服务器的 https 服务安全地发布出去, 支持的功能如下: WEB 端增加 OTP 二次认证手机端增加设备激活绑定的功能屏蔽了 PC 端的 EWS 协议(意思就是不支持)…

gin入门教程(5):请求参数处理

在 Gin 中,处理请求参数非常简单。您可以从 URL 路由、查询字符串和请求体中提取参数。以下是几种常见的处理方式: 1. URL 路由参数 如果您想从 URL 中获取参数,可以使用路由定义中的冒号(:)符号: r.GET…

【PHP】在ThinkPHP6中Swoole与FPM的简单性能测试对比

一、前言 本文主要测试在ThinkPHP 6框架中,使用Swoole扩展库与使用PHP-FPM两者的HTTP并发性能差距,测试方法较简单,仅供参考。 二、测试环境 系统:Ubuntu 22.04 PHP版本:7.4.33 Swoole版本:4.8.13 ThinkPHP版本:6.1.5 ThinkPHP-Swoole扩展库版本:3.1.4 测试工具:A…

unity中GameObject介绍

在 Unity 中,Cube和Sphere等基本几何体是 Unity 引擎的内置预制体(Prefabs),它们属于 Unity 中的GameObject 系统,可以在 Unity 的 Hierarchy 视图或 Scene 视图中右键点击,然后在弹出的菜单中选择 3D Obje…

MySQLDBA修炼之道-开发篇(一)

三、开发基础 1. 数据模型 1.1 关系数据模型介绍 关于NULL 如果某个字段的值是未知的或未定义的&#xff0c;数据库会提供一个特殊的值NULL来表示。NULL值很特殊&#xff0c;在关系数据库中应该小心处理。例如查询语句“select*from employee where 绩效得分<85 or>绩…

ElasticSearch的向量存储和搜索

ElasticSearch的向量存储和搜索 引入依赖示例代码 引入依赖 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schema…

JVM参数选项类型

我的后端学习大纲 JVM学习大纲 1、类型1&#xff1a;标准参数选项&#xff1a; 1.1.特点&#xff1a; 1.比较稳定&#xff0c;后续基本不会发生变化2.以“-”开头 1.2.各种选项&#xff1a; 运行java或者java -help可以看到所有的标准选项 1.3.补充内容&#xff1a; -se…

Halcon 多相机统一坐标系(标定)

多相机统一坐标系是指将多个不同位置的相机的图像采集到同一个坐标系下进行处理和分析的方法。 在计算机视觉和机器视觉领域中&#xff0c;多相机统一坐标系被广泛应用于三维重建、立体视觉、目标跟踪等任务中。 以gen_binocular_rectification_map&#xff08;生成描述图像映…

【NodeJS】NodeJS+mongoDB在线版开发简单RestfulAPI (七):MongoDB的设置

本项目旨在学习如何快速使用 nodejs 开发后端api&#xff0c;并为以后开展其他项目的开启提供简易的后端模版。&#xff08;非后端工程师&#xff09; 由于文档是代码写完之后&#xff0c;为了记录项目中需要注意的技术点&#xff0c;因此文档的叙述方式并非开发顺序&#xff0…

Android View的事件分发机制

前言 本文由于介绍本人关于View的事件分发机制的学习&#xff0c;如有不恰当的描述欢迎指出。 View基础 什么是View ​ View是Android中所有控件的基类&#xff0c;不管是Button、TextView、LinearLayout&#xff0c;它们的共同基类都是View。也就是说&#xff0c;View是界…

K8S配置storage-class

简介 Kubernetes支持NFS存储&#xff0c;需要安装nfs-subdir-external-provisioner&#xff0c;它是一个存储资源自动调配器&#xff0c;它可将现有的NFS服务器通过持久卷声明来支持Kubernetes持久卷的动态分配。该组件是对Kubernetes NFS-Client Provisioner的扩展&#xff0…

腾讯云跨AZ部署FortigateHA备忘录

随时保存配置 config system globalset admintimeout 480set alias "FortiGate-VM64-KVM"set gui-auto-upgrade-setup-warning disableset hostname "FG-Slave"set revision-backup-on-logout enableset revision-image-auto-backup enableset timezone &…

179.最大数

目录 题目解法sort可以自定义排序规则 题目 给定一组非负整数 nums&#xff0c;重新排列每个数的顺序&#xff08;每个数不可拆分&#xff09;使之组成一个最大的整数。 注意&#xff1a;输出结果可能非常大&#xff0c;所以你需要返回一个字符串而不是整数。 解法 class S…

Android开发兼容性问题3万字保姆级教程(Android版本、屏幕、多语言、硬件、第三方库、权限)

目录 第一章 Android版本兼容性 1.1 版本众多的挑战 1.2 设置版本参数 1.3 API版本检测 1.4 兼容性实例 使用minSdkVersion和targetSdkVersion 1.5 版本更新的应对策略 第二章 屏幕尺寸与分辨率兼容性 2.1 屏幕尺寸的多样性 2.2 响应式布局 2.3 drawable资源管理 使…

面向对象与设计模式第一节:深入理解OOP

第三章&#xff1a;面向对象与设计模式 第一节&#xff1a;深入理解OOP 面向对象编程&#xff08;OOP&#xff09;是一种编程范式&#xff0c;它将程序结构视为由对象组成&#xff0c;促进了代码的重用性和可维护性。在这一课中&#xff0c;我们将深入分析OOP的四个基本特性&…