【Flink CDC(一)】实现mysql整表与增量读取

文章目录

  • 一. 运行前准备
    • 1. 依赖
      • 1.1. Maven dependency
      • 1.2. SQL Client JAR(推荐)
    • 2. 配置 MySQL 服务器(必须)
  • 二. 功能说明
    • 1. 启动模式
    • 2. 全量阶段支持 checkpoint
    • 3. 关于无主键表
    • Exactly-Once 处理
  • 三. 实战
    • 1. 实现mysql整表与增量表同步
  • FAQ

MySQL CDC 连接器允许从 MySQL 数据库读取快照数据(比如:flink任务消费时刻的整表数据)和增量数据。本文描述了如何设置 MySQL CDC 连接器来对 MySQL 数据库运行 SQL 查询。

本篇只关注mysql整表与增量读取的实现,对于并发读取等能力后续再探索。

 

一. 运行前准备

1. 依赖

1.1. Maven dependency

<dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><!-- 请使用已发布的版本依赖,snapshot版本的依赖需要本地自行编译。 --><version>2.4.0</version>
</dependency>

 

1.2. SQL Client JAR(推荐)

下载 flink-sql-connector-mysql-cdc-2.4.0.jar 到 <FLINK_HOME>/lib/ 目录下。

 

2. 配置 MySQL 服务器(必须)

你必须定义一个 MySQL 用户,该用户对 MySQL CDC 连接器监视的所有数据库都应该具有所需的权限。

# 创建用户
mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';# 赋权
mysql> GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';# 刷新权限
mysql> FLUSH PRIVILEGES;

注意:

scan.incremental.snapshot.enabled 参数已启用时(默认情况下已启用)时,不再需要授予 reload 权限。

 

二. 功能说明

1. 启动模式

配置选项scan.startup.mode指定 MySQL CDC 使用者的启动模式。有效枚举包括:

  • initial (默认):在第一次启动时对受监视的数据库表执行初始快照,并继续读取最新的 binlog

  • earliest-offset:跳过快照阶段,从可读取的最早 binlog 位点开始读取

  • latest-offset:首次启动时,从不对受监视的数据库表执行快照, 连接器仅从 binlog
    的结尾处开始读取,这意味着连接器只能读取在连接器启动之后的数据更改

  • specific-offset:跳过快照阶段,从指定的 binlog 位点开始读取。位点可通过 binlog 文件名和位置指定,或者在 GTID 在集群上启用时通过 GTID 集合指定。

  • timestamp:跳过快照阶段,从指定的时间戳开始读取 binlog 事件。

MySQLSource.builder().startupOptions(StartupOptions.earliest()) // 从最早位点启动.startupOptions(StartupOptions.latest()) // 从最晚位点启动.startupOptions(StartupOptions.specificOffset("mysql-bin.000003", 4L) // 从指定 binlog 文件名和位置启动.startupOptions(StartupOptions.specificOffset("24DA167-0C0C-11E8-8442-00059A3C7B00:1-19")) // 从 GTID 集合启动.startupOptions(StartupOptions.timestamp(1667232000000L) // 从时间戳启动....build()CREATE TABLE mysql_source (...) WITH ('connector' = 'mysql-cdc','scan.startup.mode' = 'earliest-offset', -- 从最早位点启动'scan.startup.mode' = 'latest-offset', -- 从最晚位点启动'scan.startup.mode' = 'specific-offset', -- 从特定位点启动'scan.startup.specific-offset.file' = 'mysql-bin.000003', -- 在特定位点启动模式下指定 binlog 文件名'scan.startup.specific-offset.pos' = '4', -- 在特定位点启动模式下指定 binlog 位置'scan.startup.specific-offset.gtid-set' = '24DA167-0C0C-11E8-8442-00059A3C7B00:1-19', -- 在特定位点启动模式下指定 GTID 集合'scan.startup.mode' = 'timestamp', -- 从特定位点启动'scan.startup.timestamp-millis' = '1667232000000' -- 在时间戳启动模式下指定启动时间戳...
)

 

2. 全量阶段支持 checkpoint

增量快照读取提供了在区块级别执行检查点的能力。它使用新的快照读取机制解决了以前版本中的检查点超时问题。

 

3. 关于无主键表

从2.4.0 版本开始支持无主键表,使用无主键表必须设置 scan.incremental.snapshot.chunk.key-column,且只能选择非空类型的一个字段。

在使用无主键表时,需要注意以下两种情况。

  1. 配置 scan.incremental.snapshot.chunk.key-column 时,如果表中存在索引,请尽量使用索引中的列来加快 select 速度。

  2. 无主键表的处理语义由 scan.incremental.snapshot.chunk.key-column 指定的列的行为决定:

  • 如果指定的列不存在更新操作,此时可以保证 Exactly once 语义。
  • 如果指定的列存在更新操作,此时只能保证 At least once 语义。但可以结合下游,通过指定下游主键,结合幂等性操作来保证数据的正确性。

 

Exactly-Once 处理

MySQL CDC 连接器是一个 Flink Source 连接器,它将首先读取表快照块,然后继续读取 binlog, 无论是在快照阶段还是读取 binlog 阶段,MySQL CDC 连接器都会在处理时准确读取数据,即使任务出现了故障。

 

三. 实战

1. 实现mysql整表与增量表同步

-- 'scan.startup.mode'= 'initial' 
-- 
CREATE TABLE tjy_sql1  
(  `id` int,  `name` string,  `face` string  ,PRIMARY KEY(id) NOT ENFORCED  
) WITH (  'connector' = 'mysql-cdc',  'hostname' = 'xxx',  'port' = '3306',  'username' = 'middle_test',  'password' = '123456',  'database-name' = 'middle_test',  'table-name' = 'tjy_fortest1'  -- ,'scan.incremental.snapshot.enabled' = 'false'  --  initial: 默认值,全表同步,然后进行增量同步;--  'scan.startup.mode'= 'initial'  -- 'debezium.snapshot.mode' = 'initial'      );  CREATE TABLE tjy_sql1_sink  (  `id` int,  `name` string,  `face` string  ,PRIMARY KEY(id) NOT ENFORCED  ) WITH (  'connector' = 'mysql-x',  'url' = 'jdbc:mysql://xxx:3306/middle_test?useunicode=true&characterEncoding=utf8&useSSL=false&useCursorFetch=true',  'username' = 'middle_test',  'password' = '123456',  'table-name' = 'flink_type',  'table-name' = 'tjy_fortest2'  );  insert into tjy_sql1_sink select * from tjy_sql1;

 

FAQ

相关问题:https://github.com/ververica/flink-cdc-connectors/wiki/FAQ(ZH)

可能涉及到的问题

在这里插入图片描述

 

参考:
官网:https://ververica.github.io/flink-cdc-connectors/release-2.4/content/connectors/mysql-cdc%28ZH%29.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/705894.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何用生成式AI创建食谱,解决五岁孩童挑食问题?

如何处理孩子挑食问题&#xff0c;对父母来说可能是一个挑战。这需要耐心、创造力和策略的结合。在深入具体策略之前&#xff0c;了解五岁儿童的口味偏好仍在发展中&#xff0c;他们的饮食行为受多种因素影响&#xff0c;包括气质、接触不同类型食物的程度以及父母对饮食的态度…

【ArcGIS】利用DEM进行水文分析:流向/流量等

利用DEM进行水文分析 ArcGIS实例参考 水文分析通过建立地表水文模型&#xff0c;研究与地表水流相关的各种自然现象&#xff0c;在城市和区域规划、农业及森林、交通道路等许多领域具有广泛的应用。 ArcGIS实例 某流域30m分辨率DEM如下&#xff1a; &#xff08;1&#xff09…

微服务学习

一、服务注册发现 服务注册就是维护一个登记簿&#xff0c;它管理系统内所有的服务地址。当新的服务启动后&#xff0c;它会向登记簿交待自己的地址信息。服务的依赖方直接向登记簿要Service Provider地址就行了。当下用于服务注册的工具非常多ZooKeeper&#xff0c;Consul&am…

【深入理解设计模式】装饰者设计模式

装饰者设计模式 装饰者设计模式&#xff08;Decorator Design Pattern&#xff09;是一种结构型设计模式&#xff0c;它允许向现有对象添加新功能而不改变其结构。这种模式通常用于需要动态地为对象添加功能或行为的情况&#xff0c;而且这些功能可以独立于对象本身来进行扩展…

Selenium IDE插件录制网页,解放双手

1、 国内下载地址 https://www.crx4chrome.com/crx/77585/ &#xff0c;这个网络正常基本可以下载&#xff0c;目前最新版本是3.17.2。 点击Crx4Chrome下载。下载后的文件名称是&#xff1a;mooikfkahbdckldjjndioackbalphokd-3.17.2-Crx4Chrome.com.crx。 2、 安装 直接打开…

探索创造无限可能——Autodesk AutoCAD 2022(CAD 2022)系统要求

随着科技的不断进步和发展&#xff0c;计算机辅助设计&#xff08;CAD&#xff09;已经成为现代设计行业中不可或缺的一部分。在众多CAD软件中&#xff0c;Autodesk AutoCAD 2022&#xff08;CAD 2022&#xff09;无疑是最受欢迎和广泛应用的一款软件。作为一款全球领先的CAD软…

sql 行列互换

在SQL中进行行列互换可以使用PIVOT函数。下面是一个示例查询及其对应的结果&#xff1a; 创建测试表格 CREATE TABLE test_table (id INT PRIMARY KEY,name VARCHAR(50),category VARCHAR(50) );向测试表格插入数据 INSERT INTO test_table VALUES (1, A, Category A); INSE…

某电力铁塔安全监测预警系统案例分享

项目概述 电力铁塔是承载电力供应的重要设施&#xff0c;它的安全性需要得到可靠的保障。但是铁塔一般安装在户外&#xff0c;分布广泛&#xff0c;且有很多安装在偏远地区&#xff0c;容易受到自然、人力的影响和破环。因此需要使用辅助的方法实时监控铁塔的安全状态&#xff…

计算机设计大赛 深度学习大数据物流平台 python

文章目录 0 前言1 课题背景2 物流大数据平台的架构与设计3 智能车货匹配推荐算法的实现**1\. 问题陈述****2\. 算法模型**3\. 模型构建总览 **4 司机标签体系的搭建及算法****1\. 冷启动**2\. LSTM多标签模型算法 5 货运价格预测6 总结7 部分核心代码8 最后 0 前言 &#x1f5…

office word保存pdf高质量设置

1 采用第三方pdf功能生成 分辨率越大质量越好

MySQL集群 双主架构(配置命令)

CSDN 成就一亿技术人&#xff01; 今天刚开学第一天给大家分享一期&#xff1a;MySQL集群双主的配置需求和命令 CSDN 成就一亿技术人&#xff01; 神秘泣男子主页&#xff1a;作者首页 <———— MySQL专栏 &#xff1a;MySQL数据库专栏<———— MySQL双主是一…

Oracle 基础表管理(Heap-Organized Table Management)

表是数据库中负责数据存储的对象&#xff0c;在RDBMS中&#xff0c;数据以行、列的形式存储在表中。Oracle中表有很多种类型&#xff0c;最基础且应用最常用的类型就是堆表&#xff08;Heap-Organized Table&#xff09;&#xff0c;本文列举了Oracle堆表的常用管理操作。 一、…

pytorch --反向传播和优化器

1. 反向传播 计算当前张量的梯度 Tensor.backward(gradientNone, retain_graphNone, create_graphFalse, inputsNone)计算当前张量相对于图中叶子节点的梯度。 使用反向传播&#xff0c;每个节点的梯度&#xff0c;根据梯度进行参数优化&#xff0c;最后使得损失最小化 代码…

React Hooks概述及常用的React Hooks介绍

Hook可以让你在不编写class的情况下使用state以及其他React特性 useState ● useState就是一个Hook ● 通过在函数组件里调用它来给组件添加一些内部state,React会在重复渲染时保留这个state 纯函数组件没有状态&#xff0c;useState()用于设置和使用组件的状态属性。语法如下…

Qt的QThread、QRunnable和QThreadPool的使用

1.相关描述 随机生产1000个数字&#xff0c;然后进行冒泡排序与快速排序。随机生成类继承QThread类、冒泡排序使用moveToThread方法添加到一个线程中、快速排序类继承QRunnable类&#xff0c;添加到线程池中进行排序。 2.相关界面 3.相关代码 widget.cpp #include "widget…

实验室储样瓶耐强酸强碱PFA材质试剂瓶适用新材料半导体

PFA&#xff0c;全名可溶性聚四氟乙烯&#xff0c;试剂瓶又叫取样瓶、样品瓶、广口瓶、储样瓶等。主要用于痕量分析、同位素分析等实验室&#xff0c;广泛应用于新兴的半导体、新材料、多晶硅、硅材、微电子等行业。 规格参考&#xff1a;30ml、60ml、100ml、125ml、250ml、30…

C++笔记之执行一个可执行文件时指定动态库所存放的文件夹lib的路径

C++笔记之执行一个可执行文件时指定动态库所存放的文件夹lib的路径 参考博文: 1.C++笔记之执行一个可执行文件时指定动态库所存放的文件夹lib的路径 2.Linux笔记之LD_LIBRARY_PATH详解 3.qt-C++笔记之使用QProcess去执行一个可执行文件时指定动态库所存放的文件夹lib的路径 c…

如何将本地项目上传到github上

将本地项目上传到github上有很多种方法&#xff0c;这里只讲述我认为最简单快捷的一种&#xff0c;先在github中创建一个仓库&#xff0c;接着在本地建文件夹&#xff0c;用命令行将项目推送到本地仓库&#xff0c;然后连接远程仓库&#xff0c;将本地项目推送到远程仓库上。要…

时间序列分析实战(四):Holt-Winters建模及预测

&#x1f349;CSDN小墨&晓末:https://blog.csdn.net/jd1813346972 个人介绍: 研一&#xff5c;统计学&#xff5c;干货分享          擅长Python、Matlab、R等主流编程软件          累计十余项国家级比赛奖项&#xff0c;参与研究经费10w、40w级横向 文…

Jessibuca 插件播放直播流视频

jessibuca官网&#xff1a;http://jessibuca.monibuca.com/player.html git地址&#xff1a;https://gitee.com/huangz2350_admin/jessibuca#https://gitee.com/link?targethttp%3A%2F%2Fjessibuca.monibuca.com%2F 项目需要的文件 1.播放组件 <template ><div i…