【大数据】Flink CDC 的概览和使用

Flink CDC 的概览和使用

  • 1.什么是 CDC
  • 2.什么是 Flink CDC
  • 3.Flink CDC 前生今世
    • 3.1 Flink CDC 1.x
    • 3.2 Flink CDC 2.x
    • 3.3 Flink CDC 3.x
  • 4.Flink CDC 使用
  • 5.Debezium 标准 CDC Event 格式详解

1.什么是 CDC

CDCChange Data Capture数据变更抓取)是一种用于跟踪数据库中数据更改的技术。它用于监视数据库中的变化,并捕获这些变化,以便实时或定期将变化的数据同步到其他系统、数据仓库或分析平台。CDC 技术通常用于数据复制、数据仓库更新、实时报告和数据同步等场景。

CDC 可以捕获数据库中的以下类型的数据变化:

  • ✅ 插入(Insert):当新数据被插入到数据库表中时。
  • ✅ 更新(Update):当数据库表中的现有数据被修改时。
  • ✅ 删除(Delete):当数据从数据库表中被删除时。

2.什么是 Flink CDC

Flink CDC 是一个开源的数据库变更日志捕获和处理框架,它可以实时地从各种数据库(如 MySQL、PostgreSQL、Oracle、MongoDB 等)中捕获数据变更并将其转换为流式数据。Flink CDC 可以帮助实时应用程序实时地处理和分析这些流数据,从而实现 数据同步数据管道实时分析实时应用 等功能。

本质上是一系列的 Flink Source Connector 集合,用于来获取数据库的实时变更,底层基于 Debezium 实现。

🚀 https://github.com/ververica/flink-cdc-connectors

3.Flink CDC 前生今世

3.1 Flink CDC 1.x

Flink CDC 1.x 开启了 Flink 在 CDC 上的实践之路,Flink CDC 1.x 第一次引入了 Debezium 框架,利用 Debezium 已有的能力将数据库实时变更接入到 Flink 流计算框架中,利用 Flink 丰富的生态对数据进行加工处理,满足不同的业务需求,在功能层面上而言,Flink CDC 1.x 只能说是可以用,但不能生产上用,为什么:

  • 1.x 版本全增量切换时会对表加锁,在同步过程中有段时间业务会处于暂停状态。
  • 各方面功能还不够完善,比如自动加表、DDL 事件传递等。

在这里插入图片描述

总体而言 Flink CDC 1.x 只能说是一个比较有趣的小玩具,还不具备大规模商业盈利的价值。

在这里插入图片描述

3.2 Flink CDC 2.x

2.x 版本中,Flink CDC 引入了 Netfix DBLog 中的无锁算法,彻底解决了全增量切换上业务停滞的问题,同时得益于 FLIP-27 对 Flink Source API 的重构,Flink CDC 也基于 FLIP-27 升级到了新的框架设计,至此,Flink CDC 被大规模公司使用并投入到生产中。

在这里插入图片描述

3.3 Flink CDC 3.x

近期,Flink CDC 发布了全新的 3.0 版本,并宣布捐赠回 Flink 主项目,在新的 3.0 版本中,Flink CDC 对于接口和架构上做了很大的升级和调整,对于整体项目的定位也从之前的 Flink Source Connector 转变为了 Data Integration Engine,未来将与 SeaTunnelDataXChunjun 等一系列老牌数据集成项目同台竞技,让我们拭目以待。

在这里插入图片描述

4.Flink CDC 使用

在本地启动一个 MySQL 的 Docker 环境。

docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw -e TZ=Asia/Shanghai quay.io/debezium/example-mysql:2.4

创建表:

create database cdc_test;
use cdc_test;create table cdc_table (id int primary key auto_increment,name varchar(1000),age int
);

在 IDEA 中新建一个Java 项目。

导入依赖:

<flink-cdc.version>2.4.2</flink-cdc.version>
<flink.version>1.16.3</flink.version>
<logback.version>1.2.7</logback.version><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>${flink-cdc.version}</version>
</dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>${flink.version}</version>
</dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime</artifactId><version>${flink.version}</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web</artifactId><version>${flink.version}</version>
</dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>${logback.version}</version>
</dependency>

编写代码:

public class FlinkCDCApplication {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.enableCheckpointing(60000L);MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("localhost").port(3306).databaseList("cdc_test") // set captured database, If you need to synchronize the whole database, Please set tableList to ".*"..tableList("cdc_test.cdc_table") // set captured table.username("root").password("debezium").includeSchemaChanges(true).startupOptions(StartupOptions.latest()).deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String.build();env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL-CDC").print();env.execute();}
}

添加日志配置:

<!--~ Licensed to the Apache Software Foundation (ASF) under one or more~ contributor license agreements.  See the NOTICE file distributed with~ this work for additional information regarding copyright ownership.~ The ASF licenses this file to You under the Apache License, Version 2.0~ (the "License"); you may not use this file except in compliance with~ the License.  You may obtain a copy of the License at~~    http://www.apache.org/licenses/LICENSE-2.0~~ Unless required by applicable law or agreed to in writing, software~ distributed under the License is distributed on an "AS IS" BASIS,~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.~ See the License for the specific language governing permissions and~ limitations under the License.--><configuration><appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"><encoder><pattern>%d{yyyy-MM-dd HH:mm:ss} %p %c - %msg %n</pattern></encoder></appender><root level="INFO"><appender-ref ref="STDOUT" /></root>
</configuration>

5.Debezium 标准 CDC Event 格式详解

{"before": null,"after": {"id": 1,"name": "xing.yu","age": 26,"new_column": "dewu"},"source": {"version": "1.9.7.Final","connector": "mysql","name": "mysql_binlog_source","ts_ms": 1702723640000,"snapshot": "false","db": "cdc_test","sequence": null,"table": "cdc_table","server_id": 223344,"gtid": null,"file": "mysql-bin.000003","pos": 2394,"row": 0,"thread": 39,"query": null},"op": "c","ts_ms": 1702723640483,"transaction": null
}
{// 表数据更新前的值,update/delete"before": {},// 表数据更新后的值,create/update"after": {},// 元数据信息"source": {},// 操作类型 c/d/u"op": "",// 记录解析时间"ts_ms": "","transaction": ""
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/603556.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

专业级的渗透测试服务,助力航空业数字化安全启航

​某知名航空公司是中国首批民营航空公司之一&#xff0c;运营国内外航线200多条&#xff0c;也是国内民航最高客座率的航空公司之一。在数字化发展中&#xff0c;该航空公司以数据驱动决策&#xff0c;通过精细化管理、数字创新和模式优化等方式&#xff0c;实现了精准营销和个…

k8s之flink的几种创建方式

在此之前需要部署一下私人docker仓库&#xff0c;教程搭建 Docker 镜像仓库 注意&#xff1a;每台节点的daemon.json都需要配置"insecure-registries": ["http://主机IP:8080"] 并重启 一、session 模式 Session 模式是指在 Kubernetes 上启动一个共享的…

智慧旅游景区解决方案:PPT全文49页,附下载

关键词&#xff1a;智慧景区建设&#xff0c;智慧旅游平台&#xff0c;智慧旅游运营检测系统项目&#xff0c;智慧文旅&#xff0c;智慧景区开发与管理&#xff0c;智慧景区建设核心&#xff0c;智慧景区开发与管理 一、智慧景区建设现状 1、基础设施建设&#xff1a;智慧景区…

推荐收藏!万字长文带入快速使用 keras

这些年&#xff0c;有很多感悟&#xff1a;一个人精力是有限的&#xff0c;一个人视野也有有限的&#xff0c;你总会不经意间发现优秀人的就在身边。 看我文章的小伙伴应该经常听我说过的一句话&#xff1a;技术要学会交流、分享&#xff0c;不建议闭门造车。一个人可以走的很…

Keil5,ARM编译器 软件优化注意事项

优化C代码中的环路终止 循环是大多数程序中的常见结构。由于大量的执行时间通常花费在循环中&#xff0c;因此值得关注时间关键循环。 如果不谨慎地编写&#xff0c;环路终止条件可能会导致大量开销。在可能的情况下&#xff1a; 使用简单的终止条件。 写入倒计时到零循环。…

MySQL三种常见存储引擎【理论】【需动手操作】

先放一个大佬的博客 等以后有时间按大佬写的 动手操作一下 链接 MySOL 的存储引擎是指 MySOL 数据库管理系统中用于处理数据存诸和检索的组件。 MySOL 常用的存储引擎有以下几个: InnoDB: InnoDB 是 MySQL(5.5)的默认存储引擎&#xff0c;支持事务处理、行级锁定和物理外键约…

2024年超详细的Python3学习路径规划

前言 基于Python3.5 1.第一阶段基础&#xff08;必须&#xff09; Python3 环境搭建Python3 基础语法Python3 基本数据类型Python3 数据类型转换Python3 解释器Python3 注释Python3 运算符Python3 数字(Number)Python3 字符串Python3 列表Python3 元组Python3 字典Python3 集…

dnSpy调试工具二次开发1-新增菜单

测试环境&#xff1a; window 10 visual studio 2019 版本号&#xff1a;16.11.15 .net framework 4.8 开发者工具包 下载 .NET Framework 4.8 | 免费官方下载 .net 5开发者工具包 下载 .NET 5.0 (Linux、macOS 和 Windows) 利用git拉取代码(源码地址&#xff1a;Gi…

启动IDEA报错,web servcer failed to start.port 8080 was already in use.

启动IDEA报错&#xff0c;web servcer failed to start.port 8080 was already in use. 问题现状 启动IDEA失败&#xff0c;端口被占用。 解决办法&#xff1a; 使用netstat -ano指令&#xff0c;查看端口占用情况 因为我是win11的系统&#xff0c;使用指令时出现如下提示。…

【IC设计】移位寄存器

目录 理论讲解背景介绍什么是移位寄存器按工作模式分类verilog语法注意事项 设计实例循环移位寄存器算术双向移位寄存器5位线性反馈移位寄存器伪随机码发生器3位线性反馈移位寄存器32位线性反馈移位寄存器串行移位寄存器&#xff08;打4拍&#xff09;双向移位寄存器&#xff1…

c语言题目之统计二级制数中1的个数

文章目录 题目一、方法1二、方法2三&#xff0c;方法3总结 题目 统计二进制数中1的个数 输入一行&#xff0c;输出一行 输入&#xff1a; 输入一个整数 输出&#xff1a; 输出存储在内存中二进制的1的个数 一、方法1 之前的文章中&#xff0c;小编写了有关于内存在二进制中的存…

Fiddler工具 — 8.会话列表(Session List)

1、会话列表说明 Fiddler抓取到的每条HTTP请求&#xff08;每一条称为一个session&#xff09;。 主要包含了请求的ID编号、状态码、协议、主机名、URL、内容类型、body大小、进程信息、自定义备注等信息。如下图&#xff1a; 说明&#xff1a; 名称含义#抓取HTTP Request的顺…

Ribbon相关问题及答案(2024)

1、Ribbon是什么&#xff0c;它在微服务架构中扮演什么角色&#xff1f; Ribbon是一个客户端负载均衡器&#xff0c;它在微服务架构中扮演着关键性的角色。Ribbon的设计理念是在客户端进行服务发现和负载均衡&#xff0c;这种方式不同于传统的通过中心化的负载均衡器&#xff…

YHZ018 Python 运算符优先级

资源编号&#xff1a;YHZ018 配套视频&#xff1a;https://www.bilibili.com/video/BV1zy4y1Z7nk?p19 YHZ018&#xff1a;运算符优先级 &#x1fabf; 运算符优先级 Python支持多种运算符&#xff0c;下表按照优先级从高到低的顺序列出了所有运算符。运算符的优先级决定了在表…

面试算法90:环形房屋偷盗

题目 一条环形街道上有若干房屋。输入一个数组表示该条街道上的房屋内财产的数量。如果这条街道上相邻的两幢房屋被盗就会自动触发报警系统。请计算小偷在这条街道上最多能偷取的财产的数量。例如&#xff0c;街道上5家的财产用数组[2&#xff0c;3&#xff0c;4&#xff0c;5…

js实现全选按钮,反选

点击全选按钮&#xff0c;下面的按钮全部选中&#xff1b;再次点击&#xff0c;全部取消选择。 点击下面的按钮时&#xff0c;检查下面的按钮是不是全部都选中&#xff0c;如果全部选中了&#xff0c;需要修改全选按钮的选中状态为ture。 全选反选 <!DOCTYPE html> <…

Linux系统IO—探索输入输出操作的奥秘

&#x1f3ac;慕斯主页&#xff1a;修仙—别有洞天 ♈️今日夜电波&#xff1a;HEART BEAT—YOASOBI 2:20━━━━━━️&#x1f49f;──────── 5:35 &#x1f504; ◀️ ⏸ ▶️ ☰ …

c++之迭代器

目录 一、迭代器 二、几种常见的迭代器类型 三、使用迭代器时注意事项 一、迭代器 在C中&#xff0c;迭代器是一种用于遍历容器元素的对象。迭代器提供了一种通用的方式来访问各种不同类型的容器&#xff0c;如数组、向量、列表、集合和映射等。 使用迭代器可以避免直接操作…

三、Qt核心与Qt类库

一、Qt核心&#xff1a;元对象系统 1、Qt核心特点 Qt对标准C进行了扩展&#xff0c;引入了一些新的概念和功能元对象编译器&#xff08;MOC&#xff09;是一个预处理器&#xff0c;先将Qt的特性程序转为标准C程序&#xff0c;再由标准C编译器进行编译Qt为C语言增加的特性在Qt…

提升开发效率:npm包管理器的使用技巧

文章目录 一、npm简介二、npm的基本操作1. 安装Node.js和npm2. 创建和管理项目3. 安装依赖4. 卸载依赖5. 更新依赖 三、npm的高级特性1. 使用不同版本的依赖项2. 查看已安装的依赖项和它们的版本信息3. 运行脚本命令 《Node.js从入门到精通&#xff08;软件开发视频大讲堂&…