基于 Flink CDC 构建 MySQL 的 Streaming ETL to MySQL

简介

CDC 的全称是 Change Data Capture ,在广义的概念上,只要是能捕获数据变更的技术,我们都可以称之为 CDC 。目前通常描述的 CDC 技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术。CDC 技术的应用场景非常广泛:
• 数据同步:用于备份,容灾;
• 数据分发:一个数据源分发给多个下游系统;
• 数据采集:面向数据仓库 / 数据湖的 ETL 数据集成,是非常重要的数据源。
CDC 的技术方案非常多,目前业界主流的实现机制可以分为两种:
• 基于查询的 CDC:
• 离线调度查询作业,批处理。把一张表同步到其他系统,每次通过查询去获取表中最新的数据;
• 无法保障数据一致性,查的过程中有可能数据已经发生了多次变更;
• 不保障实时性,基于离线调度存在天然的延迟。
• 基于日志的 CDC:
• 实时消费日志,流处理,例如 MySQL 的 binlog 日志完整记录了数据库中的变更,可以把 binlog 文件当作流的数据源;
• 保障数据一致性,因为 binlog 文件包含了所有历史变更明细;
• 保障实时性,因为类似 binlog 的日志文件是可以流式消费的,提供的是实时数据。
对比常见的开源 CDC 方案,我们可以发现:
• 对比增量同步能力,
• 基于日志的方式,可以很好的做到增量同步;
• 而基于查询的方式是很难做到增量同步的。
• 对比全量同步能力,基于查询或者日志的 CDC 方案基本都支持,除了 Canal。
• 而对比全量 + 增量同步的能力,只有 Flink CDC、Debezium、Oracle Goldengate 支持较好。
• 从架构角度去看,该表将架构分为单机和分布式,这里的分布式架构不单纯体现在数据读取能力的水平扩展上,更重要的是在大数据场景下分布式系统接入能力。例如 Flink CDC 的数据入湖或者入仓的时候,下游通常是分布式的系统,如 Hive、HDFS、Iceberg、Hudi 等,那么从对接入分布式系统能力上看,Flink CDC 的架构能够很好地接入此类系统。
• 在数据转换 / 数据清洗能力上,当数据进入到 CDC 工具的时候是否能较方便的对数据做一些过滤或者清洗,甚至聚合?
• 在 Flink CDC 上操作相当简单,可以通过 Flink SQL 去操作这些数据;
• 但是像 DataX、Debezium 等则需要通过脚本或者模板去做,所以用户的使用门槛会比较高。
• 另外,在生态方面,这里指的是下游的一些数据库或者数据源的支持。Flink CDC 下游有丰富的 Connector,例如写入到 TiDB、MySQL、Pg、HBase、Kafka、ClickHouse 等常见的一些系统,也支持各种自定义 connector。
在这里插入图片描述
在这里插入图片描述

1.安装单机版

下载

yum install -y java-1.8.0-openjdk.x86_64
yum install -y  java-1.8.0-openjdk-devel
wget --no-check-certificate https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.17.2/flink-1.17.2-bin-scala_2.12.tgz
mkdir -p /opt/flink
tar -zxvf flink-1.17.2-bin-scala_2.12.tgz -C /opt/flink 

下载jar复制到/opt/flink/flink-1.17.2/lib

<!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc -->
<dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-mysql-cdc</artifactId><version>2.4.2</version><scope>provided</scope>
</dependency>

配置

vim /opt/flink/flink-1.17.2/conf/flink-conf.yaml

rest.port: 8081
rest.bind-address: 0.0.0.0
jobmanager.execution.timezone: Asia/Shanghai

启动

/opt/flink/flink-1.17.2/bin/stop-cluster.sh
/opt/flink/flink-1.17.2/bin/start-cluster.sh 访问http://10.6.8.227:8081/

2.创建 两个mysql 数据库

docker run -p 13306:3306 \
-e MYSQL_ROOT_PASSWORD=mysql \
-d mysqldocker run -p 23306:3306 \
-e MYSQL_ROOT_PASSWORD=mysql \
-d mysql

初始化mysql 表结构

CREATE DATABASE mydb;
USE mydb;
CREATE TABLE products (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255) NOT NULL,description VARCHAR(512)
);

在源库中插入数据

INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),(default,"car battery","12V car battery"),(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),(default,"hammer","12oz carpenter's hammer"),(default,"hammer","14oz carpenter's hammer"),(default,"hammer","16oz carpenter's hammer"),(default,"rocks","box of assorted rocks"),(default,"jacket","water resistent black wind breaker"),(default,"spare tire","24 inch spare tire");

3.CDC 步骤

启动 /opt/flink/flink-1.17.2/bin/sql-client.sh
只能一条语句一条语句的执行

CREATE TABLE products (id INT,name STRING,description STRING,PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'mysql-cdc','hostname' = 'localhost','port' = '13306','username' = 'root','password' = 'mysql','database-name' = 'mydb','table-name' = 'products');CREATE TABLE sink_products (id INT,name STRING,description STRING,PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://localhost:23306/mydb?serverTimezone=Asia/Shanghai','username' = 'root','password' = 'mysql','table-name' = 'sink_products');insert into sink_products select * from products;

4.验证

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

参考文档

http://124.220.104.235/web/chatgpt

https://ververica.github.io/flink-cdc-connectors/master/content/%E5%BF%AB%E9%80%9F%E4%B8%8A%E6%89%8B/mysql-postgres-tutorial-zh.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/221368.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

抖捧自动直播是什么,系统功能讲解

目前有在做实体行业级商家服务的老板 你还在为不会直播&#xff0c;不敢直播而苦恼吗&#xff1f; 你还在为想做直播&#xff0c;但没空开直播而焦灼吗&#xff1f; 今天&#xff0c;你的问题都可以统统解决 实体行业直播必备黑科技&#xff1a;抖捧AI自动直播 只需要一部手…

使用阿里巴巴同步工具DataX实现Mysql与ElasticSearch数据同步

一、Linux环境要求 二、准备工作 2.1 Linux安装jdk 2.2 linux安装python 2.3 下载DataX&#xff1a; 三、DataX压缩包导入&#xff0c;解压缩 四、编写同步Job 五、执行Job 六、定时更新 6.1 创建定时任务 6.2 提交定时任务 6.3 查看定时任务 七、增量更新思路 一、Linux环境要…

C#教程(一):面向对象

1、介绍 C#是一种多范式编程语言&#xff0c;但其中一个主要的编程范式是面向对象编程&#xff08;OOP&#xff09;。面向对象编程有一些特点&#xff0c;而C#提供了丰富的功能来支持这些特点。 2、面向对象特点 封装&#xff08;Encapsulation&#xff09;&#xff1a; 封装…

Nginx访问FTP服务器文件的时效性/安全校验

背景 FTP文件服务器在我们日常开发中经常使用&#xff0c;在项目中我们经常把FTP文件下载到内存中&#xff0c;然后转为base64给前端进行展示。如果excel中也需要导出图片&#xff0c;数据量大的情况下会直接返回一个后端的开放接口地址&#xff0c;然后在项目中对接口的参数进…

使用shell脚本将一台虚拟机上面数据分发到其他虚拟机上面xsync

目录 1&#xff0c;功能2&#xff0c;注意点3&#xff0c;shell脚本介绍4&#xff0c;bash内容 1&#xff0c;功能 使用shell脚本将一台虚拟机上面数据分发到其他虚拟机上面。 2&#xff0c;注意点 需要修改的地方&#xff1a;hadoop250 hadoop251 hadoop252 hadoop253 hado…

魔搭社区上线Mistral AI 首个开源 MoE 模型 Mixtral8x7B

Mistral AI 近日发布了首个开源 MoE 模型 Mixtral8x7B&#xff0c;并宣布在魔搭社区上线。 Mixtral-8x7B 是一款混合专家模型&#xff08;Mixtrue of Experts&#xff09;&#xff0c;由8个拥有70亿参数的专家网络组成&#xff0c;在能力上&#xff0c;Mixtral-8x7B 支持32k t…

美颜技术讲解:视频美颜SDK的开发与集成

如今&#xff0c;美颜技术的应用愈发成为吸引用户的一项重要功能。本文将深入探讨视频美颜SDK的开发与集成&#xff0c;揭示其背后的技术原理和实现步骤。 一、美颜技术的背后 美颜技术并非仅仅是简单的滤镜效果&#xff0c;而是一项涉及复杂图像处理和算法的技术。在视频美颜…

wordpress 修改社交图标

要去掉标记的图标&#xff0c;死活找不到在那里配置。后来找到了&#xff0c;下图&#xff08;wordpress 小白&#xff0c;特此记录&#xff09;

安装python第三方库后,在pycharm中不能正常导入

python小白学习opencv&#xff0c;使用pip安装完opencv库后import cv2报错&#xff0c;按照如下设置解决&#xff1a; 需要正确设置python解释器路径

Python内置类属性`__cmp__`属性的使用教程

概要 在Python中&#xff0c;__cmp__属性是一个特殊的方法&#xff0c;用于自定义类的实例之间的比较方式。深入了解和熟练运用这一特性&#xff0c;可以使自定义类更加灵活和强大。本教程将详细介绍__cmp__的基本概念、高级用法以及一些注意事项&#xff0c;通过丰富的示例代…

跨境电商群发消息工具定制贵吗?

随着全球电子商务的快速发展&#xff0c;跨境电商已经成为了一种新兴的商业形式。 为了能够更好地与海外客户沟通&#xff0c;许多卖家开始寻找跨境电商群发消息工具&#xff0c;那么&#xff0c;这些工具的定制费用是否昂贵呢? 首先&#xff0c;我们需要明确一点&#xff1…

在线免费压缩pdf文件

在线免费压缩pdf文件&#xff0c;不用登陆哦&#xff0c; https://www.ilovepdf.com/ https://online2pdf.com/#

IP属地变化背后的原因

随着互联网的普及和技术的不断发展&#xff0c;IP属地变化的现象越来越受到人们的关注。近日&#xff0c;有网友发现自己的IP属地发生了变化&#xff0c;引发了广泛讨论。那么&#xff0c;IP属地为什么会发生变化呢&#xff1f; 首先&#xff0c;网络环境的变化是导致IP属地变化…

uniapp 之 图片 视频 文件上传

<view class"" style"padding: 24rpx 0"><text>相关资料 <text class"fs-26 color-666">&#xff08;图片、视频、文档不超过9个&#xff09;</text> </text><view class"flex align-center" style&…

亚马逊云科技re_Invent 2023产品体验:亚马逊云科技产品应用实践 王炸产品Amazon Q,你的AI助手

本篇文章授权活动官方亚马逊云科技文章转发、改写权&#xff0c;包括不限于在 亚马逊云科技开发者社区, 知乎&#xff0c;自媒体平台&#xff0c;第三方开发者媒体等亚马逊云科技官方渠道 意料之中 2023年9月25日&#xff0c;亚马逊宣布与 Anthropic 正式展开战略合作&#x…

Globalsign—— SSL证书中的LV

一&#xff1a;SSL证书 SSL 证书可以实现网站的 https 加密&#xff0c;保证从客户端到服务端传输的数据是加密的。越来越多的网络信息泄露事件也给我们敲响了警钟&#xff0c;信息安全不容小觑。网站建设者们也应该要把网络信息安全放在首位&#xff0c;给网站部署 SSL …

第一个“hello Android”程序

1、首先安装Android studio&#xff08;跳过&#xff09; Android Studio是由Google推出的官方集成开发环境&#xff08;IDE&#xff09;&#xff0c;专门用于Android应用程序的开发。它是基于JetBrains的IntelliJ IDEA IDE构建的&#xff0c;提供了丰富的功能和工具&#xff0…

8V-24V升降12V2A升降压芯片WT3205

8V-24V升降12V2A升降压芯片WT3205 WT3205是一款专为升压开关电源设计的DC-DC直流转换控制器。 WT3205的输入电压范围是5V至32v&#xff0c;电路元器件少&#xff0c;应用简单。WT3205采用固定频率的PWM控制方式&#xff0c;330KHz的振荡器&#xff0c;电流模式控制单元&#x…

_pickle.PicklingError: Can‘t pickle : import of module failed

有问题 没问题的 python - pickle cant import a module that exists? - Stack Overflow

Apifox 最新更新:迭代分支功能上线、在线文档支持多格式导出!

Apifox 新版本上线啦&#xff01; 看看本次版本更新主要涵盖的重点内容&#xff0c;有没有你所关注的功能特性&#xff1a; HTTP 项目新增「迭代分支」功能支持通过数据库表直接生成 API 文档的数据模型在线文档支持多方式导出用户反馈问题优化 数据库支持「测试连接」保持自…