Flink CDC系列之:调研应用Flink CDC将 ELT 从 MySQL 流式传输到 StarRocks方案

Flink CDC系列之:调研应用Flink CDC将 ELT 从 MySQL 流式传输到 StarRocks方案

  • 准备
    • 准备 Flink Standalone 集群
    • 准备 docker compose
    • 为 MySQL 准备记录
    • 使用 Flink CDC CLI 提交作业
  • 同步架构和数据更改
  • 路由变更
  • 清理

本教程将展示如何使用 Flink CDC 快速构建从 MySQL 到 StarRocks 的 Streaming ELT 作业,包括同步一个数据库的所有表、模式变更演变和将分片表同步到一张表的功能。
本教程中的所有练习都在 Flink CDC CLI 中执行,整个过程使用标准 SQL 语法,无需一行 Java/Scala 代码或 IDE 安装。

准备

准备一台安装了 Docker 的 Linux 或 MacOS 电脑。

准备 Flink Standalone 集群

下载 Flink 1.18.0 ,解压得到 flink-1.18.0 目录。

使用以下命令进入 Flink 目录,并将 FLINK_HOME 设置为 flink-1.18.0 所在的目录。

cd flink-1.18.0

通过将以下参数附加到 conf/flink-conf.yaml 配置文件来启用检查点,每 3 秒执行一次检查点。

execution.checkpointing.interval: 3000

使用以下命令启动 Flink 集群。

./bin/start-cluster.sh

如果启动成功,你就可以通过http://localhost:8081/访问Flink Web UI,如下所示。
在这里插入图片描述
多次执行start-cluster.sh可以启动多个TaskManager。

准备 docker compose

以下教程将使用 docker-compose 准备所需的组件。使用下面提供的内容创建 docker-compose.yml 文件:

version: '2.1'
services:StarRocks:image: starrocks/allin1-ubuntu:3.2.6ports:- "8080:8080"- "9030:9030"MySQL:image: debezium/example-mysql:1.1ports:- "3306:3306"environment:- MYSQL_ROOT_PASSWORD=123456- MYSQL_USER=mysqluser- MYSQL_PASSWORD=mysqlpw

Docker Compose 应包含以下服务(容器):

  • MySQL:包含一个名为 app_db 的数据库
  • StarRocks:存储来自 MySQL 的表
docker-compose up -d

该命令会自动以分离模式启动 Docker Compose 配置中定义的所有容器。运行 docker ps 检查这些容器是否正常运行。您也可以访问 http://localhost:8030/ 检查 StarRocks 是否正在运行。

为 MySQL 准备记录

进入 MySQL 容器

docker-compose exec mysql mysql -uroot -p123456

创建 app_db 数据库和订单、产品、发货表,然后插入记录

-- create database
CREATE DATABASE app_db;USE app_db;-- create orders table
CREATE TABLE `orders` (
`id` INT NOT NULL,
`price` DECIMAL(10,2) NOT NULL,
PRIMARY KEY (`id`)
);-- insert records
INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00);
INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);-- create shipments table
CREATE TABLE `shipments` (
`id` INT NOT NULL,
`city` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);-- insert records
INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing');
INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian');-- create products table
CREATE TABLE `products` (
`id` INT NOT NULL,
`product` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);-- insert records
INSERT INTO `products` (`id`, `product`) VALUES (1, 'Beer');
INSERT INTO `products` (`id`, `product`) VALUES (2, 'Cap');
INSERT INTO `products` (`id`, `product`) VALUES (3, 'Peanut');

使用 Flink CDC CLI 提交作业

  • 下载下面列出的二进制压缩包并解压到目录 flink cdc-3.1.0’:
    flink-cdc-3.1.0-bin.tar.gz flink-cdc-3.1.0 目录下会包含四个目录:bin、lib、log、conf。
  • 下载下面列出的连接器包并移动到 lib 目录
    下载链接只针对稳定版本,SNAPSHOT 依赖需要自行基于 master 或 release 分支构建。请注意,需要将 jar 移动到 Flink CDC Home 的 lib 目录,而不是 Flink Home 的 lib 目录。
    • MySQL 管道连接器 3.1.0
    • StarRocks 管道连接器 3.1.0

您还需要将 MySQL 连接器放入 Flink lib 文件夹或使用 --jar 参数传递它,因为它们不再与 CDC 连接器一起打包:

  • MySQL Connector Java

编写任务配置yaml文件。下面是同步整个数据库的示例文件mysql-to-starrocks.yaml:

################################################################################
# Description: Sync MySQL all tables to StarRocks
################################################################################
source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db.\.*server-id: 5400-5404server-time-zone: UTCsink:type: starrocksname: StarRocks Sinkjdbc-url: jdbc:mysql://127.0.0.1:9030load-url: 127.0.0.1:8080username: rootpassword: ""table.create.properties.replication_num: 1pipeline:name: Sync MySQL Database to StarRocksparallelism: 2

注意:

  • source 中的 tables: app_db..* 通过正则匹配同步 app_db 中的所有表。
  • sink 中的 table.create.properties.replication_num 是因为 Docker 镜像中只有一个 StarRocks BE 节点。

最后,使用Cli将作业提交到Flink Standalone集群。

bash bin/flink-cdc.sh mysql-to-starrocks.yaml

提交成功后返回信息如下:

Pipeline has been submitted to cluster.
Job ID: 02a31c92f0e7bc9a1f4c0051980088a0
Job Description: Sync MySQL Database to StarRocks

我们可以通过 Flink Web UI 找到一个名为“Sync MySQL Database to StarRocks“的作业正在运行。
在这里插入图片描述
通过Dbeaver等数据库连接工具使用mysql://127.0.0.1:9030连接jdbc,可以在StarRocks中查看写入三张表的数据。

在这里插入图片描述

同步架构和数据更改

进入MySQL容器

docker-compose exec mysql mysql -uroot -p123456

然后修改MySQL中的schema和记录,Doris的表也会实时改变:

在MySQL中的orders中插入一条记录:

INSERT INTO app_db.orders (id, price) VALUES (3, 100.00);

在 MySQL 的订单中添加一列:

ALTER TABLE app_db.orders ADD amount varchar(100) NULL;

从 MySQL 更新订单中的一条记录:

UPDATE app_db.orders SET price=100.00, amount=100.00 WHERE id=1;

从 MySQL 中删除订单中的一条记录:

DELETE FROM app_db.orders WHERE id=2;

每执行一步刷新一下Dbeaver,可以看到StarRocks中展示的订单表会实时更新,如下图:
在这里插入图片描述
同样的,通过修改shipping和products表,你也可以在StarRocks中实时看到同步修改的结果。

路由变更

Flink CDC 提供了将源表的表结构/数据路由到其他表名的配置。
利用此功能,我们可以实现表名、数据库名替换、全库同步等功能。以下是使用路由功能的示例文件:

################################################################################
# Description: Sync MySQL all tables to StarRocks
################################################################################
source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db.\.*server-id: 5400-5404server-time-zone: UTCsink:type: starrocksjdbc-url: jdbc:mysql://127.0.0.1:9030load-url: 127.0.0.1:8030username: rootpassword: ""table.create.properties.replication_num: 1route:- source-table: app_db.orderssink-table: ods_db.ods_orders- source-table: app_db.shipmentssink-table: ods_db.ods_shipments- source-table: app_db.productssink-table: ods_db.ods_productspipeline:name: Sync MySQL Database to StarRocksparallelism: 2

通过上面的路由配置,我们可以将app_db.orders的表结构和数据同步到ods_db.ods_orders中,从而实现数据库迁移的功能。具体来说,source-table支持正则匹配多表来同步分库分表,如下:

route:- source-table: app_db.order\.*sink-table: ods_db.ods_orders

这样我们就可以将app_db.order01、app_db.order02、app_db.order03等分片表同步到一张ods_db.ods_orders表中了。

注意,目前还不支持多张表存在相同主键数据的场景,后续版本会支持。

清理

完成教程后,运行以下命令停止docker-compose.yml目录中的所有容器:

docker-compose down

在Flink flink-1.18.0目录下,执行以下命令停止Flink集群:

./bin/stop-cluster.sh

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/56776.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[Ansible实践笔记]自动化运维工具Ansible(二):Ansible的playbook及角色

Ansible playbook(剧本) 详情请参考[Ansible实践笔记]自动化运维工具Ansible(一):初探ansible&ansible的点对点模式 文章目录 Ansible playbook(剧本)介绍核心字段环境配置案例&#xff1…

React--》掌握Valtio让状态管理变得轻松优雅

Valtio采用了代理模式,使状态管理变得更加直观和易于使用,同时能够与React等框架无缝集成,本文将深入探讨Valtio的核心概念、使用场景以及其在提升应用性能中的重要作用,帮助你掌握这一强大工具,从而提升开发效率和用户…

【Go语言】

type关键字的用法 定义结构体定义接口定义类型别名类型定义类型判断 别名实际上是为了更好地理解代码/ 这里要分点进行记录 使用传值的例子,当两个类型不一样需要进行类型转换 type Myint int // 自定义类型,基于已有的类型自定义一个类型type Myin…

用kali入侵 DarkHole_2测试

进入kali系统调出root交互式界面 netdiscover -r 000.000.000.000/24 -------局域网探测IP工具 nmap 设备端口扫描 发现两个攻击点一个是80端口的Http 一个是22端口的ssh 发现有许多GIT文件 可能会出现git源码泄露 使用githack URL 命令还原git源文件 打开面板控制命令行 输入…

2.插入排序(斗地主起牌)

一、思想 扑克牌起牌 代码: 二、时间复杂度: 最好情况(已经排序好的):T O(N) 最坏情况(完全逆序):T O(N^2) 三、优劣: 严格的大小比较之后才进行错位插入&#x…

unity中GameObject介绍

在 Unity 中,Cube和Sphere等基本几何体是 Unity 引擎的内置预制体(Prefabs),它们属于 Unity 中的GameObject 系统,可以在 Unity 的 Hierarchy 视图或 Scene 视图中右键点击,然后在弹出的菜单中选择 3D Obje…

JVM参数选项类型

我的后端学习大纲 JVM学习大纲 1、类型1:标准参数选项: 1.1.特点: 1.比较稳定,后续基本不会发生变化2.以“-”开头 1.2.各种选项: 运行java或者java -help可以看到所有的标准选项 1.3.补充内容: -se…

Halcon 多相机统一坐标系(标定)

多相机统一坐标系是指将多个不同位置的相机的图像采集到同一个坐标系下进行处理和分析的方法。 在计算机视觉和机器视觉领域中,多相机统一坐标系被广泛应用于三维重建、立体视觉、目标跟踪等任务中。 以gen_binocular_rectification_map(生成描述图像映…

Android View的事件分发机制

前言 本文由于介绍本人关于View的事件分发机制的学习,如有不恰当的描述欢迎指出。 View基础 什么是View ​ View是Android中所有控件的基类,不管是Button、TextView、LinearLayout,它们的共同基类都是View。也就是说,View是界…

K8S配置storage-class

简介 Kubernetes支持NFS存储,需要安装nfs-subdir-external-provisioner,它是一个存储资源自动调配器,它可将现有的NFS服务器通过持久卷声明来支持Kubernetes持久卷的动态分配。该组件是对Kubernetes NFS-Client Provisioner的扩展&#xff0…

腾讯云跨AZ部署FortigateHA备忘录

随时保存配置 config system globalset admintimeout 480set alias "FortiGate-VM64-KVM"set gui-auto-upgrade-setup-warning disableset hostname "FG-Slave"set revision-backup-on-logout enableset revision-image-auto-backup enableset timezone &…

面向对象与设计模式第一节:深入理解OOP

第三章:面向对象与设计模式 第一节:深入理解OOP 面向对象编程(OOP)是一种编程范式,它将程序结构视为由对象组成,促进了代码的重用性和可维护性。在这一课中,我们将深入分析OOP的四个基本特性&…

[JAVAEE] 多线程的案例(三) - 线程池

目录 一. 什么是线程池 二. 线程池的作用 三. java提供的线程池类 四. ThreadPoolExecutor的构造方法及参数理解 1. int corePoolSize: 核心线程数. 2. int maximumPoolSize: 最大线程数 核心线程数 非核心线程数 3. int keepAliveTime:非核心线程允许空闲的最大时间. …

DataX简介及使用

目录 一、DataX离线同步工具DataX3.0介绍 1.1、 DataX 3.0概览 1.2、特征 1.3、DataX3.0框架设计 1.4、支持的数据元 1.5、DataX3.0核心架构 1.6、DataX 3.0六大核心优势 1.6.1、可靠的数据质量监控 1.6.2、丰富的数据转换功能 1.6.3、精准的速度控制 1.6.4、强劲的…

正则表达式和通配符

文章目录 正则表达式和通配符的区别正则表达式(Regex)通配符(Wildcards)总结 正则表达式的概念正则表达式的由来为什么要使用正则表达式 正则表达式的语法组成修饰符元字符\f\b\B 在Linux中的基础正则和扩展正则基础正则(BRE)^$.*…

面试时被问到“Scaling Law”,该怎么答?

在大模型的研发中,通常会有下面一些需求: 计划训练一个 10B 的模型,想知道至少需要多大的数据? 收集到了 1T 的数据,想知道能训练一个多大的模型? 老板准备 1 个月后开发布会,给的资源是 100 …

Linux安装Nginx教程(rpm安装方式)

本章教程,主要介绍如何在Linux Centos7系统上,使用rpm的方式进行安装Nginx。 一、安装wget插件 如果不存在wget下载插件,需要安装一下。 yum install -y wget二 、下载rpm安装包 官方提供的rpm下载地址:https://nginx.org/packages/centos/7/x86_64/RPMS/ <

【Nginx系列】499错误

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

Postman常见问题及解决方(全)

&#x1f345; 点击文末小卡片 &#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 1、网络连接问题 如果Postman无法发送请求或接收响应&#xff0c;可以尝试以下操作&#xff1a; 检查网络连接是否正常&#xff0c;包括检查网络设置、代理设置…

软考中级嵌入式系统设计师笔记分享(二)

1.TTL 电路是电流控制器件&#xff0c;而CMOS 电路是电压控制器件。 2.TTL 电路的速度快&#xff0c;传输延迟时间短(5-10ns)&#xff0c;但是功耗大。 常见的串行总线有 SPI、II2C、USB、RS232/RS422/RS485、CAN等;高速串行总线主要有 SATA、PCIE、IEEE 1394、Rapidl0、USB 3…