Flink-CDC 全面解析

Flink-CDC 全面解析

一、CDC 概述

(一)什么是 CDC

CDC 即 Change Data Capture(变更数据获取),其核心要义在于严密监测并精准捕获数据库内发生的各种变动情况,像数据的插入、更新以及删除操作,还有数据表相关的变动等,都会被它一一察觉。并且它会严格按照这些变动实际发生的先后顺序,毫无遗漏地完整记录下来,随后将这些变更信息写入到消息中间件里,方便其他服务按需进行订阅与消费。形象地说,它就像是数据库的“贴身管家”,时刻留意着数据库的任何“风吹草动”,一旦有变化,立马就能获取到相应信息。

(二)CDC 的种类

CDC 主要划分为基于查询和基于 Binlog 这两种方式,下面来看看它们之间的差别:

  • 搭建 mysql 集群:在搭建时可以选择使用阿里巴巴的 mycat 来实现分库分表功能,以此更好地管理和扩展数据库架构。
  • Canal:当我们想要知晓数据库中某个表的变动情况时,Canal 就能派上用场了,它的原理是依托 mysql 的 binlog,通过解析 binlog 来获取表的变更信息。例如,要是没有 CDC 的话,若想在大屏幕上实时展示订单的统计数据,那就得利用 Canal 去读取 mysql 里的实时订单数据,然后传递给 kafka,再由 kafka 把相关信息发送给 Flink;而要是有了 CDC,Flink 就能直接检测到 mysql 数据的变化,进而得出各项指标了。值得一提的是,CDC 底层其实内置了一个软件叫 debezium。

(三)Flink-CDC

Flink 社区精心打造了 flink-cdc-connectors 组件,这可是个功能强大的 source 组件,它具备直接从 MySQL、PostgreSQL 等数据库读取全量数据以及增量变更数据的能力,极大地拓展了 Flink 与数据库交互的便捷性。而且 Flink 还有诸如 mysql、kafka、hbase、cdc 等多种连接器,其触发器方面,默认大多是基于时间的,像 eventTime、procssingTime 等,当然也支持自定义触发器,比如在智慧交通项目中就有过相关应用。目前这个组件已经开源,开源地址为 https://github.com/ververica/flink-cdc-connectors ,方便广大开发者使用和贡献代码。

二、Flink CDC 案例实操

(一)DataStream 方式的应用

  1. 导入依赖:详细的依赖导入可参考 https://blog.csdn.net/mynameisgt/article/details/125826905 这个链接内容。要是启动时报错了,也不用慌,解决方案可查看 https://blog.csdn.net/qq_27721169/article/details/132151345 ,一般来说,就是修改 mysql 的驱动包的版本就行。不过要测试相关代码,还得先开启 mysql 的 binlog 日志,具体操作就是开启 MySQL Binlog 并重启 MySQL。
  2. 编写代码:代码编写完成后,接着要进行建库、建表操作,随后开启 mysql 的 binlog,具体可参照相应文档来执行。
  3. 案例测试
    • 打包并上传至 Linux:将写好的代码打包好,然后上传到 Linux 系统中,为后续部署做准备。
    • 开启 MySQL Binlog 并重启 MySQL:再次强调这个操作的重要性,同时要查看 binlog 数据最新的大小,观察其前后变化情况,方便后续验证数据变更捕获是否准确。
    • 创建一个表:创建一个带有随便几个字段的表,用于测试 Flink-CDC 对数据变更的捕获功能。
    • 启动 Flink 集群:让 Flink 集群运行起来,为处理数据变更提供运行环境。
    • 启动 HDFS 集群:启动 HDFS 集群,保障数据存储等相关功能的正常运行。
    • 启动程序:正式启动编写好的 Flink-CDC 程序,开始检测数据变更情况。
    • 在 MySQL 的 cdc_test.z_user_info 表中添加、修改或者删除数据:人为地在指定表中制造数据变更,以此来检验 Flink-CDC 是否能准确捕获到这些变化。
    • 给当前的 Flink 程序创建 Savepoint:执行 bin/flink savepoint JobId hdfs://bigdata01:9820/flink/save 命令,创建 Savepoint,方便后续程序重启等操作时恢复状态。
    • 关闭程序以后从 Savepoint 重启程序:通过 bin/flink run -s hdfs://bigdata01:9820/flink/save/... -c com.bigdata.FlinkCDC flink-1.0-SNAPSHOT-jar-with-dependencies.jar 命令,实现从 Savepoint 重启程序,验证程序的稳定性和数据处理的连贯性。

(二)自定义反序列化器

  1. 代码实现:在 Flink-CDC 中,有像 Canal、Maxwell 等相关总结内容。而且代码版本和 sql 版本存在一定区别:
    • 版本支持:代码版本的 Flink 在 1.12 和 1.13 版本都支持相关操作,然而 sql 版本的 Flink 只有到 1.13 版本才支持。
    • 监听范围:代码版本支持一次监听多个数据库以及多个表,功能更为强大;而 sql 版本则只支持单库单表的监听。
    • 反序列化器:sql 版本中无需进行自定义反序列化器,相对简洁;但代码版本就需要自定义反序列化器了,当然,也可以选择不定义,根据具体业务场景和需求来决定。

(三)FlinkSQL 方式的应用

  1. 代码实现:在代码实现过程中,有一些需要特别留意的“坑”:
    • jar 包版本问题:在 maven 中,各个 jar 包之间的版本有可能出现不兼容等问题,比如可能会出现 java.lang.NoSuchMethodError:scala.Predef$.refArrayOps 这样的错误,需要仔细排查和调整 jar 包版本。
    • 主键问题:在 FlinkSQL 里,如果创建的表没有主键,尤其是在 Flink 1.13 版本之后,会遇到 The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled', default: true (fallback keys: [])' to 'true' 这样的限制,所以创建表的时候一定要记得加上主键。详细的常见 SQL 错误内容可参考 https://help.aliyun.com/zh/flink/support/common-sql-errors?spm=a2c4g.11186623.0.i32#section-9oq-z7x-sq0 。

以下是一段示例代码:

package com.bigdata.cdc;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;/*** @基本功能:* @program:FlinkProject* @author: 闫哥* @create:2024-06-13 11:01:11**/
public class CdcSQLTest {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);// 获取tableEnv对象// 通过env 获取一个table 环境StreamTableEnvironment tenv = StreamTableEnvironment.create(env);//2. 创建表对象//3. 编写sql语句//4. 将Table变为stream流tenv.executeSql("CREATE TABLE user_info2 (\n" +" id INT NOT NULL primary key,\n" +" name STRING,\n" +" age int\n" +") WITH (\n" +" 'connector' = 'mysql-cdc',\n" +" 'hostname' = 'bigdata01',\n" +" 'port' = '3306',\n" +" 'username' = 'root',\n" +" 'password' = '123456',\n" +" 'scan.startup.mode' = 'latest-offset', " +" 'database-name' = 'cdc_test',\n" +" 'table-name' = 'user_info'\n" +")");tenv.executeSql("select * from user_info2").print();Table table = tenv.sqlQuery("select * from user_info2");DataStream<Tuple2<Boolean, Row>> retractStream = tenv.toRetractStream(table, Row.class);retractStream.print();//5. execute-执行env.execute();}
}

在这段代码中,首先是准备 Flink 的运行环境,设置好运行模式以及并行度等基础参数,接着获取 StreamTableEnvironment 对象,用于后续的 SQL 操作。然后创建了名为 user_info2 的表,定义了表结构以及相关的连接配置信息,如连接的数据库、用户名、密码等,通过执行 SQL 查询语句并将结果转换为流的形式进行输出,最后执行整个程序,实现基于 FlinkSQL 方式对 Flink-CDC 的应用实践。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/66677.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

软件系统安全逆向分析-混淆对抗

1. 概述 在一般的软件中&#xff0c;我们逆向分析时候通常都不能直接看到软件的明文源代码&#xff0c;或多或少存在着混淆对抗的操作。下面&#xff0c;我会实践操作一个例子从无从下手到攻破目标。 花指令对抗虚函数表RC4 2. 实战-donntyousee 题目载体为具有漏洞的小型软…

#渗透测试#网络安全# 一文了解什么是跨域CROS!!!

免责声明 本教程仅为合法的教学目的而准备&#xff0c;严禁用于任何形式的违法犯罪活动及其他商业行为&#xff0c;在使用本教程前&#xff0c;您应确保该行为符合当地的法律法规&#xff0c;继续阅读即表示您需自行承担所有操作的后果&#xff0c;如有异议&#xff0c;请立即停…

ClickHouse vs StarRocks 选型对比

一、面向列存的 DBMS 新的选择 Hadoop 从诞生已经十三年了&#xff0c;Hadoop 的供应商争先恐后的为 Hadoop 贡献各种开源插件&#xff0c;发明各种的解决方案技术栈&#xff0c;一方面确实帮助很多用户解决了问题&#xff0c;但另一方面因为繁杂的技术栈与高昂的维护成本&…

Win11家庭版转专业版

Win11家庭版转专业版&#xff08;亲测有效&#xff09; 第一步 【断网】输入这个密钥&#xff1a; R8NJ8-9X7PV-C7RCR-F3J9X-KQBP6 第二步 点击下一步会自动重启 第三步 【联网】输入这个密钥&#xff1a; F3NWX-VFMFC-MHYYF-BCJ3K-QV66Y 注意 两次输入密钥的地方一致 …

IP 地址与蜜罐技术

基于IP的地址的蜜罐技术是一种主动防御策略&#xff0c;它能够通过在网络上布置的一些看似正常没问题的IP地址来吸引恶意者的注意&#xff0c;将恶意者引导到预先布置好的伪装的目标之中。 如何实现蜜罐技术 当恶意攻击者在网络中四处扫描&#xff0c;寻找可入侵的目标时&…

【Word_笔记】Word的修订模式内容改为颜色标记

需求如下&#xff1a;请把修改后的部分直接在原文标出来&#xff0c;不要采用修订模式 步骤1&#xff1a;打开需要转换的word后&#xff0c;同时按住alt和F11 进入&#xff08;Microsoft Visual Basic for Appliations&#xff09; 步骤2&#xff1a;插入 ---- 模块 步骤3&…

[0405].第05节:搭建Redis主从架构

Redis学习大纲 一、3主3从的集群配置&#xff1a; 1.1.集群规划 1.分片集群需要的节点数量较多&#xff0c;这里我们搭建一个最小的分片集群&#xff0c;包含3个master节点&#xff0c;每个master包含一个slave节点&#xff0c;结构如下&#xff1a; 2.每组是一主一从&#x…

科研绘图系列:R语言绘制分组箱线图(boxplot)

禁止商业或二改转载,仅供自学使用,侵权必究,如需截取部分内容请后台联系作者! 文章目录 介绍加载R包数据下载导入数据数据预处理画图输出系统信息介绍 科研绘图系列:R语言绘制分组箱线图(boxplot) 加载R包 library(ggpubr) library(ggplot2) library(tidyverse) # dev…

Hadoop - MapReduce编程

文章目录 前言一、创建mapreduce-demo项目1. 在idea上创建maven项目2. 导入hadoop相关依赖 二、MapReduce编程1. 相关介绍1.1 驱动类&#xff08;Driver Class&#xff09;1.1.1 驱动类的定义1.1.2 驱动类的功能1.1.3 驱动类的作用 1.2 Mapper1.2.1 Mapper 的定义1.2.2 Mapper …

原码的乘法运算>>>只有0,1

MQ : 乘数 X : 被乘数 ACC : 乘积高位 [当前位是1,加上被乘数; 当前位是 0,加上0] 例如: MQ的最低位是1,所以要加上被乘数(01101) >>>> 得出 01101 >>>>> ACC MQ 需要整体逻辑右移 (原本01101 01011 >>> 001101 0101) 现在的次低位是…

mapbox基础,style样式汇总,持续更新

&#x1f468;‍⚕️ 主页&#xff1a; gis分享者 &#x1f468;‍⚕️ 感谢各位大佬 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍⚕️ 收录于专栏&#xff1a;mapbox 从入门到精通 文章目录 一、&#x1f340;前言二、&#x1f340;根属性2.1 so…

人工智能知识分享第九天-机器学习_集成学习

集成学习 概念 集成学习是机器学习中的一种思想&#xff0c;它通过多个模型的组合形成一个精度更高的模型&#xff0c;参与组合的模型称为弱学习器&#xff08;基学习器&#xff09;。训练时&#xff0c;使用训练集依次训练出这些弱学习器&#xff0c;对未知的样本进行预测时…

页面滚动下拉时,元素变为fixed浮动,上拉到顶部时恢复原状,js代码以视频示例

页面滚动下拉时,元素变为fixed浮动js代码 以视频示例 <style>video{width:100%;height:auto}.div2,#float1{position:fixed;_position:absolute;top:45px;right:0; z-index:250;}button{float:right;display:block;margin:5px} </style><section id"abou…

排序算法——堆排序

什么是堆 堆就是一种特殊的二叉树&#xff0c;他有以下特点&#xff1a; 堆中某个节点的值总是不大于或不小于其父节点的值&#xff1b; 堆总是一棵完全二叉树。 堆又可以分为大根堆和小根堆 大根堆&#xff1a;根节点最大&#xff0c;每个节点都小于或等于父节点 小跟堆&am…

K-means算法在无监督学习中的应用

K-means算法在无监督学习中的应用 K-means算法是一种典型的无监督学习算法&#xff0c;广泛用于聚类分析。在无监督学习中&#xff0c;模型并不依赖于标签数据&#xff0c;而是根据输入数据的特征进行分组。K-means的目标是将数据集分成K个簇&#xff0c;使得同一簇内的数据点…

Linux 35.6 + JetPack v5.1.4之 pytorch升级

Linux 35.6 JetPack v5.1.4之 pytorch升级 1. 源由2. 升级步骤1&#xff1a;获取二进制版本步骤2&#xff1a;安装二进制版本步骤3&#xff1a;获取torchvision步骤4&#xff1a;安装torchvision步骤5&#xff1a;检查安装版本 3. 使用4. 补充4.1 torchvision版本问题4.2 支持…

Spring——自动装配

假设一个场景&#xff1a; 一个人&#xff08;Person&#xff09;有一条狗&#xff08;Dog&#xff09;和一只猫(Cat)&#xff0c;狗和猫都会叫&#xff0c;狗叫是“汪汪”&#xff0c;猫叫是“喵喵”&#xff0c;同时人还有一个自己的名字。 将上述场景 抽象出三个实体类&…

TCP与DNS的报文分析

场景拓扑&#xff1a; 核心路由配置&#xff1a; 上&#xff08;DNS&#xff09;&#xff1a;10.1.1.1/24 下(WEB)&#xff1a;20.1.1.1/24 左&#xff08;client&#xff09;&#xff1a;192.168.0.1/24 右(PC3)&#xff1a;192.168.1.1/24Clint2配置&a…

PWR-STM32电源控制

一、原理 睡眠模式不响应其他操作&#xff0c;比如烧写程序&#xff0c;烧写时按住复位键松手即可下载&#xff0c;在禁用JTAG也可如此烧写程序。 对于低功耗模式可以通过RTC唤醒、外部中断唤醒、中断唤醒。 1、电源框图&#xff1a; VDDA主要负责模拟部分的供电、Vref和Vref-…

WebSocket 测试入门篇

Websocket 是一种用于 H5 浏览器的实时通讯协议&#xff0c;可以做到数据的实时推送&#xff0c;可适用于广泛的工作环境&#xff0c;例如客服系统、物联网数据传输系统&#xff0c; 基础介绍 我们平常接触最多的是 http 协议的接口&#xff0c;http 协议是请求与响应的模式&…