基于 Flink CDC 构建 MySQL 的 Streaming ETL to MySQL

简介

CDC 的全称是 Change Data Capture ,在广义的概念上,只要是能捕获数据变更的技术,我们都可以称之为 CDC 。目前通常描述的 CDC 技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术。CDC 技术的应用场景非常广泛:
• 数据同步:用于备份,容灾;
• 数据分发:一个数据源分发给多个下游系统;
• 数据采集:面向数据仓库 / 数据湖的 ETL 数据集成,是非常重要的数据源。
CDC 的技术方案非常多,目前业界主流的实现机制可以分为两种:
• 基于查询的 CDC:
• 离线调度查询作业,批处理。把一张表同步到其他系统,每次通过查询去获取表中最新的数据;
• 无法保障数据一致性,查的过程中有可能数据已经发生了多次变更;
• 不保障实时性,基于离线调度存在天然的延迟。
• 基于日志的 CDC:
• 实时消费日志,流处理,例如 MySQL 的 binlog 日志完整记录了数据库中的变更,可以把 binlog 文件当作流的数据源;
• 保障数据一致性,因为 binlog 文件包含了所有历史变更明细;
• 保障实时性,因为类似 binlog 的日志文件是可以流式消费的,提供的是实时数据。
对比常见的开源 CDC 方案,我们可以发现:
• 对比增量同步能力,
• 基于日志的方式,可以很好的做到增量同步;
• 而基于查询的方式是很难做到增量同步的。
• 对比全量同步能力,基于查询或者日志的 CDC 方案基本都支持,除了 Canal。
• 而对比全量 + 增量同步的能力,只有 Flink CDC、Debezium、Oracle Goldengate 支持较好。
• 从架构角度去看,该表将架构分为单机和分布式,这里的分布式架构不单纯体现在数据读取能力的水平扩展上,更重要的是在大数据场景下分布式系统接入能力。例如 Flink CDC 的数据入湖或者入仓的时候,下游通常是分布式的系统,如 Hive、HDFS、Iceberg、Hudi 等,那么从对接入分布式系统能力上看,Flink CDC 的架构能够很好地接入此类系统。
• 在数据转换 / 数据清洗能力上,当数据进入到 CDC 工具的时候是否能较方便的对数据做一些过滤或者清洗,甚至聚合?
• 在 Flink CDC 上操作相当简单,可以通过 Flink SQL 去操作这些数据;
• 但是像 DataX、Debezium 等则需要通过脚本或者模板去做,所以用户的使用门槛会比较高。
• 另外,在生态方面,这里指的是下游的一些数据库或者数据源的支持。Flink CDC 下游有丰富的 Connector,例如写入到 TiDB、MySQL、Pg、HBase、Kafka、ClickHouse 等常见的一些系统,也支持各种自定义 connector。
在这里插入图片描述
在这里插入图片描述

1.安装单机版

下载

yum install -y java-1.8.0-openjdk.x86_64
yum install -y  java-1.8.0-openjdk-devel
wget --no-check-certificate https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.17.2/flink-1.17.2-bin-scala_2.12.tgz
mkdir -p /opt/flink
tar -zxvf flink-1.17.2-bin-scala_2.12.tgz -C /opt/flink 

下载jar复制到/opt/flink/flink-1.17.2/lib

<!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc -->
<dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-mysql-cdc</artifactId><version>2.4.2</version><scope>provided</scope>
</dependency>

配置

vim /opt/flink/flink-1.17.2/conf/flink-conf.yaml

rest.port: 8081
rest.bind-address: 0.0.0.0
jobmanager.execution.timezone: Asia/Shanghai

启动

/opt/flink/flink-1.17.2/bin/stop-cluster.sh
/opt/flink/flink-1.17.2/bin/start-cluster.sh 访问http://10.6.8.227:8081/

2.创建 两个mysql 数据库

docker run -p 13306:3306 \
-e MYSQL_ROOT_PASSWORD=mysql \
-d mysqldocker run -p 23306:3306 \
-e MYSQL_ROOT_PASSWORD=mysql \
-d mysql

初始化mysql 表结构

CREATE DATABASE mydb;
USE mydb;
CREATE TABLE products (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255) NOT NULL,description VARCHAR(512)
);

在源库中插入数据

INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),(default,"car battery","12V car battery"),(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),(default,"hammer","12oz carpenter's hammer"),(default,"hammer","14oz carpenter's hammer"),(default,"hammer","16oz carpenter's hammer"),(default,"rocks","box of assorted rocks"),(default,"jacket","water resistent black wind breaker"),(default,"spare tire","24 inch spare tire");

3.CDC 步骤

启动 /opt/flink/flink-1.17.2/bin/sql-client.sh
只能一条语句一条语句的执行

CREATE TABLE products (id INT,name STRING,description STRING,PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'mysql-cdc','hostname' = 'localhost','port' = '13306','username' = 'root','password' = 'mysql','database-name' = 'mydb','table-name' = 'products');CREATE TABLE sink_products (id INT,name STRING,description STRING,PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://localhost:23306/mydb?serverTimezone=Asia/Shanghai','username' = 'root','password' = 'mysql','table-name' = 'sink_products');insert into sink_products select * from products;

4.验证

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

参考文档

http://124.220.104.235/web/chatgpt

https://ververica.github.io/flink-cdc-connectors/master/content/%E5%BF%AB%E9%80%9F%E4%B8%8A%E6%89%8B/mysql-postgres-tutorial-zh.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/221368.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

抖捧自动直播是什么,系统功能讲解

目前有在做实体行业级商家服务的老板 你还在为不会直播&#xff0c;不敢直播而苦恼吗&#xff1f; 你还在为想做直播&#xff0c;但没空开直播而焦灼吗&#xff1f; 今天&#xff0c;你的问题都可以统统解决 实体行业直播必备黑科技&#xff1a;抖捧AI自动直播 只需要一部手…

【vue】element el-table怎么实现跨页勾选

【vue】element el-table怎么实现跨页勾选 需求 由后端提供分页接口&#xff0c;每一次翻页el-table的数据都会被刷新一次&#xff0c;这种情况下怎么实现跨页的勾选 代码 <el-tableref"multipleTable"v-loading"loading":data"operationList&…

使用阿里巴巴同步工具DataX实现Mysql与ElasticSearch数据同步

一、Linux环境要求 二、准备工作 2.1 Linux安装jdk 2.2 linux安装python 2.3 下载DataX&#xff1a; 三、DataX压缩包导入&#xff0c;解压缩 四、编写同步Job 五、执行Job 六、定时更新 6.1 创建定时任务 6.2 提交定时任务 6.3 查看定时任务 七、增量更新思路 一、Linux环境要…

C#教程(一):面向对象

1、介绍 C#是一种多范式编程语言&#xff0c;但其中一个主要的编程范式是面向对象编程&#xff08;OOP&#xff09;。面向对象编程有一些特点&#xff0c;而C#提供了丰富的功能来支持这些特点。 2、面向对象特点 封装&#xff08;Encapsulation&#xff09;&#xff1a; 封装…

亚马逊云科技产品测评榜单新鲜出炉,你上榜了吗?

随着云计算时代的到来&#xff0c;高新科技产品成为我们生活中不可或缺的东西&#xff0c;亚马逊云科技产品全面发力以创新技术助力每一个用户。感受和测评最热门的亚马逊云科技开发者工具与服务为技术发展提供了更多的可能性。 亚马逊云科技产品测评中&#xff0c;大家借助亚马…

微信小程序--判断目标元素是否在可视区域内(可视区域播放视频)

步骤&#xff1a; 1、创建对象实例 2、获取/指定界面上的节点信息 3、判断节点是否在当前屏幕可视区域 微信API提供了两种获取创建对象实例和获取节点的方法&#xff0c;按照以上步骤&#xff0c;我们一一来看&#xff1a; 1、wx.createSelectorQuery() wx.createSelectorQ…

Nginx访问FTP服务器文件的时效性/安全校验

背景 FTP文件服务器在我们日常开发中经常使用&#xff0c;在项目中我们经常把FTP文件下载到内存中&#xff0c;然后转为base64给前端进行展示。如果excel中也需要导出图片&#xff0c;数据量大的情况下会直接返回一个后端的开放接口地址&#xff0c;然后在项目中对接口的参数进…

使用shell脚本将一台虚拟机上面数据分发到其他虚拟机上面xsync

目录 1&#xff0c;功能2&#xff0c;注意点3&#xff0c;shell脚本介绍4&#xff0c;bash内容 1&#xff0c;功能 使用shell脚本将一台虚拟机上面数据分发到其他虚拟机上面。 2&#xff0c;注意点 需要修改的地方&#xff1a;hadoop250 hadoop251 hadoop252 hadoop253 hado…

魔搭社区上线Mistral AI 首个开源 MoE 模型 Mixtral8x7B

Mistral AI 近日发布了首个开源 MoE 模型 Mixtral8x7B&#xff0c;并宣布在魔搭社区上线。 Mixtral-8x7B 是一款混合专家模型&#xff08;Mixtrue of Experts&#xff09;&#xff0c;由8个拥有70亿参数的专家网络组成&#xff0c;在能力上&#xff0c;Mixtral-8x7B 支持32k t…

Java的引用类型有几种?区别是什么?

Java中的引用类型主要分为四种&#xff1a;强引用&#xff08;Strong Reference&#xff09;、软引用&#xff08;Soft Reference&#xff09;、弱引用&#xff08;Weak Reference&#xff09;和虚引用&#xff08;Phantom Reference&#xff09;。这些引用类型在Java中主要用于…

MyBatis-Flex 常见问题

文章目录 官网常见问题MyBatis-Flex 没有启动或者启动出错怎么办&#xff1f;示例中的 AccountMapper 和 "ACCOUNT" 在哪里&#xff0c;报错了。阿里镜像找不到依赖&#xff1f;SpringBoot 3.2 项目&#xff0c;启动报错 Invalid value type for attribute factoryBe…

【Qt5】QMouseEvent的globalPos

2023年12月14日&#xff0c;周四下午 QMouseEvent的globalPos()函数是用于获取鼠标事件发生时的全局坐标。它返回一个QPoint对象&#xff0c;表示鼠标事件的全局位置。 全局坐标是相对于整个屏幕的坐标系&#xff0c;而不是相对于应用程序窗口或控件的坐标系。它可以用来确定鼠…

美颜技术讲解:视频美颜SDK的开发与集成

如今&#xff0c;美颜技术的应用愈发成为吸引用户的一项重要功能。本文将深入探讨视频美颜SDK的开发与集成&#xff0c;揭示其背后的技术原理和实现步骤。 一、美颜技术的背后 美颜技术并非仅仅是简单的滤镜效果&#xff0c;而是一项涉及复杂图像处理和算法的技术。在视频美颜…

wordpress 修改社交图标

要去掉标记的图标&#xff0c;死活找不到在那里配置。后来找到了&#xff0c;下图&#xff08;wordpress 小白&#xff0c;特此记录&#xff09;

解析“七星创客”模式在零售行业的核心要素与成功关键

在互联网时代&#xff0c;商业模式不断创新&#xff0c;其中“七星创客”模式备受关注。零售行业面临着竞争激烈的市场环境&#xff0c;如何衔接“七星创客”模式以提高销售业绩和用户忠诚度成为重要议题。本文将探讨零售行业如何成功衔接“七星创客”模式&#xff0c;并提出具…

安装python第三方库后,在pycharm中不能正常导入

python小白学习opencv&#xff0c;使用pip安装完opencv库后import cv2报错&#xff0c;按照如下设置解决&#xff1a; 需要正确设置python解释器路径

Python内置类属性`__cmp__`属性的使用教程

概要 在Python中&#xff0c;__cmp__属性是一个特殊的方法&#xff0c;用于自定义类的实例之间的比较方式。深入了解和熟练运用这一特性&#xff0c;可以使自定义类更加灵活和强大。本教程将详细介绍__cmp__的基本概念、高级用法以及一些注意事项&#xff0c;通过丰富的示例代…

跨境电商群发消息工具定制贵吗?

随着全球电子商务的快速发展&#xff0c;跨境电商已经成为了一种新兴的商业形式。 为了能够更好地与海外客户沟通&#xff0c;许多卖家开始寻找跨境电商群发消息工具&#xff0c;那么&#xff0c;这些工具的定制费用是否昂贵呢? 首先&#xff0c;我们需要明确一点&#xff1…

线程上下文设计模式

线程上下文机制是参考应用或者系统上下文的机制&#xff0c;使每个线程拥有自己的上下文&#xff0c;不与其他线程共享。线程上下文机制有不同于其他上下文机制的地方&#xff0c;即线程的生命周期结束后&#xff0c;线程上下文也要回收掉&#xff0c;不然容易出现内存泄露。 T…

在线免费压缩pdf文件

在线免费压缩pdf文件&#xff0c;不用登陆哦&#xff0c; https://www.ilovepdf.com/ https://online2pdf.com/#