Flink-CDC 全面解析

Flink-CDC 全面解析

一、CDC 概述

(一)什么是 CDC

CDC 即 Change Data Capture(变更数据获取),其核心要义在于严密监测并精准捕获数据库内发生的各种变动情况,像数据的插入、更新以及删除操作,还有数据表相关的变动等,都会被它一一察觉。并且它会严格按照这些变动实际发生的先后顺序,毫无遗漏地完整记录下来,随后将这些变更信息写入到消息中间件里,方便其他服务按需进行订阅与消费。形象地说,它就像是数据库的“贴身管家”,时刻留意着数据库的任何“风吹草动”,一旦有变化,立马就能获取到相应信息。

(二)CDC 的种类

CDC 主要划分为基于查询和基于 Binlog 这两种方式,下面来看看它们之间的差别:

  • 搭建 mysql 集群:在搭建时可以选择使用阿里巴巴的 mycat 来实现分库分表功能,以此更好地管理和扩展数据库架构。
  • Canal:当我们想要知晓数据库中某个表的变动情况时,Canal 就能派上用场了,它的原理是依托 mysql 的 binlog,通过解析 binlog 来获取表的变更信息。例如,要是没有 CDC 的话,若想在大屏幕上实时展示订单的统计数据,那就得利用 Canal 去读取 mysql 里的实时订单数据,然后传递给 kafka,再由 kafka 把相关信息发送给 Flink;而要是有了 CDC,Flink 就能直接检测到 mysql 数据的变化,进而得出各项指标了。值得一提的是,CDC 底层其实内置了一个软件叫 debezium。

(三)Flink-CDC

Flink 社区精心打造了 flink-cdc-connectors 组件,这可是个功能强大的 source 组件,它具备直接从 MySQL、PostgreSQL 等数据库读取全量数据以及增量变更数据的能力,极大地拓展了 Flink 与数据库交互的便捷性。而且 Flink 还有诸如 mysql、kafka、hbase、cdc 等多种连接器,其触发器方面,默认大多是基于时间的,像 eventTime、procssingTime 等,当然也支持自定义触发器,比如在智慧交通项目中就有过相关应用。目前这个组件已经开源,开源地址为 https://github.com/ververica/flink-cdc-connectors ,方便广大开发者使用和贡献代码。

二、Flink CDC 案例实操

(一)DataStream 方式的应用

  1. 导入依赖:详细的依赖导入可参考 https://blog.csdn.net/mynameisgt/article/details/125826905 这个链接内容。要是启动时报错了,也不用慌,解决方案可查看 https://blog.csdn.net/qq_27721169/article/details/132151345 ,一般来说,就是修改 mysql 的驱动包的版本就行。不过要测试相关代码,还得先开启 mysql 的 binlog 日志,具体操作就是开启 MySQL Binlog 并重启 MySQL。
  2. 编写代码:代码编写完成后,接着要进行建库、建表操作,随后开启 mysql 的 binlog,具体可参照相应文档来执行。
  3. 案例测试
    • 打包并上传至 Linux:将写好的代码打包好,然后上传到 Linux 系统中,为后续部署做准备。
    • 开启 MySQL Binlog 并重启 MySQL:再次强调这个操作的重要性,同时要查看 binlog 数据最新的大小,观察其前后变化情况,方便后续验证数据变更捕获是否准确。
    • 创建一个表:创建一个带有随便几个字段的表,用于测试 Flink-CDC 对数据变更的捕获功能。
    • 启动 Flink 集群:让 Flink 集群运行起来,为处理数据变更提供运行环境。
    • 启动 HDFS 集群:启动 HDFS 集群,保障数据存储等相关功能的正常运行。
    • 启动程序:正式启动编写好的 Flink-CDC 程序,开始检测数据变更情况。
    • 在 MySQL 的 cdc_test.z_user_info 表中添加、修改或者删除数据:人为地在指定表中制造数据变更,以此来检验 Flink-CDC 是否能准确捕获到这些变化。
    • 给当前的 Flink 程序创建 Savepoint:执行 bin/flink savepoint JobId hdfs://bigdata01:9820/flink/save 命令,创建 Savepoint,方便后续程序重启等操作时恢复状态。
    • 关闭程序以后从 Savepoint 重启程序:通过 bin/flink run -s hdfs://bigdata01:9820/flink/save/... -c com.bigdata.FlinkCDC flink-1.0-SNAPSHOT-jar-with-dependencies.jar 命令,实现从 Savepoint 重启程序,验证程序的稳定性和数据处理的连贯性。

(二)自定义反序列化器

  1. 代码实现:在 Flink-CDC 中,有像 Canal、Maxwell 等相关总结内容。而且代码版本和 sql 版本存在一定区别:
    • 版本支持:代码版本的 Flink 在 1.12 和 1.13 版本都支持相关操作,然而 sql 版本的 Flink 只有到 1.13 版本才支持。
    • 监听范围:代码版本支持一次监听多个数据库以及多个表,功能更为强大;而 sql 版本则只支持单库单表的监听。
    • 反序列化器:sql 版本中无需进行自定义反序列化器,相对简洁;但代码版本就需要自定义反序列化器了,当然,也可以选择不定义,根据具体业务场景和需求来决定。

(三)FlinkSQL 方式的应用

  1. 代码实现:在代码实现过程中,有一些需要特别留意的“坑”:
    • jar 包版本问题:在 maven 中,各个 jar 包之间的版本有可能出现不兼容等问题,比如可能会出现 java.lang.NoSuchMethodError:scala.Predef$.refArrayOps 这样的错误,需要仔细排查和调整 jar 包版本。
    • 主键问题:在 FlinkSQL 里,如果创建的表没有主键,尤其是在 Flink 1.13 版本之后,会遇到 The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled', default: true (fallback keys: [])' to 'true' 这样的限制,所以创建表的时候一定要记得加上主键。详细的常见 SQL 错误内容可参考 https://help.aliyun.com/zh/flink/support/common-sql-errors?spm=a2c4g.11186623.0.i32#section-9oq-z7x-sq0 。

以下是一段示例代码:

package com.bigdata.cdc;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;/*** @基本功能:* @program:FlinkProject* @author: 闫哥* @create:2024-06-13 11:01:11**/
public class CdcSQLTest {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);// 获取tableEnv对象// 通过env 获取一个table 环境StreamTableEnvironment tenv = StreamTableEnvironment.create(env);//2. 创建表对象//3. 编写sql语句//4. 将Table变为stream流tenv.executeSql("CREATE TABLE user_info2 (\n" +" id INT NOT NULL primary key,\n" +" name STRING,\n" +" age int\n" +") WITH (\n" +" 'connector' = 'mysql-cdc',\n" +" 'hostname' = 'bigdata01',\n" +" 'port' = '3306',\n" +" 'username' = 'root',\n" +" 'password' = '123456',\n" +" 'scan.startup.mode' = 'latest-offset', " +" 'database-name' = 'cdc_test',\n" +" 'table-name' = 'user_info'\n" +")");tenv.executeSql("select * from user_info2").print();Table table = tenv.sqlQuery("select * from user_info2");DataStream<Tuple2<Boolean, Row>> retractStream = tenv.toRetractStream(table, Row.class);retractStream.print();//5. execute-执行env.execute();}
}

在这段代码中,首先是准备 Flink 的运行环境,设置好运行模式以及并行度等基础参数,接着获取 StreamTableEnvironment 对象,用于后续的 SQL 操作。然后创建了名为 user_info2 的表,定义了表结构以及相关的连接配置信息,如连接的数据库、用户名、密码等,通过执行 SQL 查询语句并将结果转换为流的形式进行输出,最后执行整个程序,实现基于 FlinkSQL 方式对 Flink-CDC 的应用实践。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/66677.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PHP语言的字符串处理

PHP语言的字符串处理 引言 字符串是编程中最基本的数据类型之一&#xff0c;通常用于存储和操作文本数据。在PHP语言中&#xff0c;对字符串的处理非常灵活且强大。无论是简单的字符操作&#xff0c;还是复杂的模式匹配&#xff0c;PHP都提供了丰富的函数和工具来满足不同的需…

PHP的扩展Imagick的安装

windows下的安装 下载&#xff1a;Imagick扩展 PECL :: Package :: imagick 3.7.0 for Windows​​​​​​​ 下载&#xff1a;ghostscript&#xff08;PDF提取图片时用到&#xff0c;不处理PDF可以不安装&#xff09; Ghostscript : Downloads 安装扩展 Imagick解压后&…

THREE.js的VideoTexture以及CanvasTexture在部分浏览器以及小程序webview中纯黑不起作用的解决办法

黑色是因为video没有自动播放导致的。 而且video必须设置muted&#xff08;静音&#xff09;属性&#xff0c;否则视频都无法播放&#xff1b; 如果不设置muted,也可以用设置x5-video-player-type"h5" 替代&#xff08;意为兼容qq浏览器&#xff0c;解决在小程序中黑…

【redis】ubuntu18安装redis7

在Ubuntu 18下安装Redis7可以通过以下两种方法实现&#xff1a;手动编译安装和使用APT进行安装。 Ubuntu 18系统的环境和版本&#xff1a; $ cat /proc/version Linux version 4.15.0-213-generic (builddlcy02-amd64-079) (gcc version 7.5.0 (Ubuntu 7.5.0-3ubuntu1~18.04)…

Java实现迭代器模式

一、简介 1、定义 迭代器模式(Iterator Pattern)是一种面向集合对象而生的行为设计模式。对于集合对象而言&#xff0c;会涉及对集合的添加和删除操作&#xff0c;也要支持遍历集合元素的操作。可以把遍历操作放在集合对象中&#xff0c;但这样做&#xff0c;集合对象就承担太…

uniapp中h5使用地图

export function loadTMap(key) {return new Promise(function(resolve, reject) {window.init function() {// resolve(qq) //注意这里resolve(TMap) //注意这里}var script document.createElement("script");script.type "text/javascript";// scrip…

获取地图文档中的图层列表

大多数情况下,获取地图文档中的图层列表是地理处理脚本中的首要工作之一.获取图层列表后,脚本可以循环遍历每个图层并执行某些类型的处理.制图模块中的ListLayers()函数提供获取图层列表的功能.本节将学习如何获得地图文档中的图层列表. 操作方法: 1.在arcgis中打开地图文件 …

软件系统安全逆向分析-混淆对抗

1. 概述 在一般的软件中&#xff0c;我们逆向分析时候通常都不能直接看到软件的明文源代码&#xff0c;或多或少存在着混淆对抗的操作。下面&#xff0c;我会实践操作一个例子从无从下手到攻破目标。 花指令对抗虚函数表RC4 2. 实战-donntyousee 题目载体为具有漏洞的小型软…

#渗透测试#网络安全# 一文了解什么是跨域CROS!!!

免责声明 本教程仅为合法的教学目的而准备&#xff0c;严禁用于任何形式的违法犯罪活动及其他商业行为&#xff0c;在使用本教程前&#xff0c;您应确保该行为符合当地的法律法规&#xff0c;继续阅读即表示您需自行承担所有操作的后果&#xff0c;如有异议&#xff0c;请立即停…

【权限管理】Apache Shiro学习教程

Apache Shiro 是一个功能强大且灵活的安全框架&#xff0c;主要用于身份认证&#xff08;Authentication&#xff09;、授权&#xff08;Authorization&#xff09;、会话管理&#xff08;Session Management&#xff09;和加密&#xff08;Cryptography&#xff09;。它旨在为…

Spring事件发布与监听

Spring事件机制详解&#xff1a;事件发布与监听 在Spring框架中&#xff0c;事件机制基于发布-订阅模式&#xff0c;允许组件之间进行解耦。发布者发布事件&#xff0c;监听者订阅并响应这些事件。Spring事件机制的核心在于ApplicationEvent和ApplicationListener&#xff0c;…

ClickHouse vs StarRocks 选型对比

一、面向列存的 DBMS 新的选择 Hadoop 从诞生已经十三年了&#xff0c;Hadoop 的供应商争先恐后的为 Hadoop 贡献各种开源插件&#xff0c;发明各种的解决方案技术栈&#xff0c;一方面确实帮助很多用户解决了问题&#xff0c;但另一方面因为繁杂的技术栈与高昂的维护成本&…

Win11家庭版转专业版

Win11家庭版转专业版&#xff08;亲测有效&#xff09; 第一步 【断网】输入这个密钥&#xff1a; R8NJ8-9X7PV-C7RCR-F3J9X-KQBP6 第二步 点击下一步会自动重启 第三步 【联网】输入这个密钥&#xff1a; F3NWX-VFMFC-MHYYF-BCJ3K-QV66Y 注意 两次输入密钥的地方一致 …

TypeScript语言的网络编程

TypeScript语言的网络编程 引言 在现代软件开发中&#xff0c;网络编程是一个不可或缺的部分。随着互联网的快速发展&#xff0c;网络应用程序越来越普遍&#xff0c;涉及到从简单的个人网站到复杂的企业级应用。TypeScript作为一种强类型的JavaScript超集&#xff0c;近年来…

在高德地图上加载3DTilesLayer图层模型/天地瓦片

1. 引入必要的库 Three.js&#xff1a;一个用于创建和显示3D图形的JavaScript库。vuemap/three-layer&#xff1a;一个Vue插件&#xff0c;它允许你在高德地图中添加Three.js图层。vuemap/layer-3dtiles&#xff1a;一个用于处理3D Tiles格式数据的Vue插件&#xff0c;可以用来…

Linux 高级路由 —— 筑梦之路

Linux 高级路由详解 本文将基于您提供的 Linux 高级路由极简教程 文章&#xff0c;深入探讨 Linux 高级路由的概念、配置方法以及应用场景。 一、什么是 Linux 高级路由&#xff1f; Linux 高级路由是指利用 Linux 内核提供的强大网络功能&#xff0c;实现超越传统路由表和默…

IP 地址与蜜罐技术

基于IP的地址的蜜罐技术是一种主动防御策略&#xff0c;它能够通过在网络上布置的一些看似正常没问题的IP地址来吸引恶意者的注意&#xff0c;将恶意者引导到预先布置好的伪装的目标之中。 如何实现蜜罐技术 当恶意攻击者在网络中四处扫描&#xff0c;寻找可入侵的目标时&…

【Word_笔记】Word的修订模式内容改为颜色标记

需求如下&#xff1a;请把修改后的部分直接在原文标出来&#xff0c;不要采用修订模式 步骤1&#xff1a;打开需要转换的word后&#xff0c;同时按住alt和F11 进入&#xff08;Microsoft Visual Basic for Appliations&#xff09; 步骤2&#xff1a;插入 ---- 模块 步骤3&…

[0405].第05节:搭建Redis主从架构

Redis学习大纲 一、3主3从的集群配置&#xff1a; 1.1.集群规划 1.分片集群需要的节点数量较多&#xff0c;这里我们搭建一个最小的分片集群&#xff0c;包含3个master节点&#xff0c;每个master包含一个slave节点&#xff0c;结构如下&#xff1a; 2.每组是一主一从&#x…

科研绘图系列:R语言绘制分组箱线图(boxplot)

禁止商业或二改转载,仅供自学使用,侵权必究,如需截取部分内容请后台联系作者! 文章目录 介绍加载R包数据下载导入数据数据预处理画图输出系统信息介绍 科研绘图系列:R语言绘制分组箱线图(boxplot) 加载R包 library(ggpubr) library(ggplot2) library(tidyverse) # dev…