Flink CDC-MySQL CDC配置及DataStream API实现代码...可实现监控采集多个数据库的多个表

MySQL CDC配置

第一步: 启用binlog

1. 检查MySQL的binlog是否已启用

show variables like '%log_bin%';

2. 若未启用binlog

  1. 打开MySQL配置文件my.cnf(MySQL安装目录的etc文件夹下)
  2. 找到[mysqld]部分,添加如下配置
    log-bin=mysql-bin    # 指定二进制日志文件的名称前缀
    server-id=1          # 唯一标识MySQL服务器的数字
    expire_logs_days=30  # binlog日志过期时间(按实际情况配置)
    
  3. 保存并关闭配置文件, 并重启MySQL服务使配置生效
    sudo systemctl restart mysqld
    

第二步: 设置binlog格式为row

因为要监控表记录变更前后的具体数据, 需要将binlog格式设置为row.

1. 确保MySQL的binlog格式设置为ROW

show variables like '%binlog_format%';

2. 若未设置为row

  1. 打开MySQL配置文件my.cnf(MySQL安装目录的etc文件夹下)
  2. 找到[mysqld]部分,添加如下配置
    binlog_format=ROW
    
  3. 保存并关闭配置文件, 并重启MySQL服务使配置生效
    sudo systemctl restart mysqld
    

第三步: 创建CDC用户

创建一个具备合适权限的MySQL用户, 使得Debezium MySQL connector可以监控数据库的变化.

  • 创建MySQL用户, 用于Flink CDC连接到MySQL数据库

    CREATE USER 'flinkcdc'@'%' IDENTIFIED BY 'FlinkCDC_123456';
    
  • 授予该用户适当的权限以访问要采集的数据库和表。

    GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flinkcdc' IDENTIFIED BY 'FlinkCDC_123456';
    
  • 使权限生效

    FLUSH PRIVILEGES;
    

Flink CDC DataStream API实现

所使用软件的版本

  • java 1.8
  • Scala 2.11
  • Flink 1.14.2
  • Flink CDC 2.3.0
  • Source MySQL 5.7, Sink MySQL 5.7
  • jackson 2.10.2

Flink CDC DataStream API可实现一个job监控采集多个数据库、多个表.

1. 定义MySqlSource

//源数据库连接配置文件
Properties dbProps = DbConfigUtil.loadConfig("mysql.properties");//Debezium配置
Properties debeziumProps = new Properties();
//decimal.handling.mode指定connector如何处理DECIMAL和NUMERIC列的值,有3种模式:precise、double和string
//precise(默认值):以二进制形式在变更事件中精确表示它们,使用java.math.BigDecimal值来表示(此种模式采集会将DECIMAL和NUMERIC列转成二进制格式,不易读,不便于数据处理)
//以double值来表示它们,这可能会到值精度丢失
//string:将值编码为格式化的字符串,易于下游消费,但会丢失有关实际类型的语义信息。(建议使用此种模式,便于下游进行数据处理)
debeziumProps.setProperty("decimal.handling.mode", "string");
//Time、date和timestamps可以以不同的精度表示,包括:
//adaptive_time_microseconds(默认值):精确地捕获date、datetime和timestamp的值,使用毫秒、微秒或纳秒精度值,具体取决于数据库列的类型,但 TIME 类型字段除外,它们始终以微秒表示。
//adaptive(不建议使用):以数据库列类型为基础,精确地捕获时间和时间戳值,使用毫秒、微秒或纳秒精度值。
//connect:总是使用 Kafka Connect 内置的 Time、Date 和 Timestamp 表示法表示时间和时间戳值,无论数据库列的精度如何,都使用毫秒精度。
debeziumProps.setProperty("time.precision.mode", "connect");//MySQL CDC数据源
MySqlSource<String> sourceFunction = MySqlSource.<String>builder().hostname(dbProps.getProperty("host")).port(Integer.parseInt(dbProps.getProperty("port"))).databaseList(dbProps.getProperty("database_list").split(",")).tableList(dbProps.getProperty("table_list").split(",")).username(dbProps.getProperty("username")).password(dbProps.getProperty("password")).connectionPoolSize(2).serverTimeZone("Asia/Shanghai").debeziumProperties(debeziumProps).deserializer(new JsonDebeziumDeserializationSchema()).serverId("6001").startupOptions(StartupOptions.initial()).build();

2. 数据处理


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 启用Checkpoint
env.enableCheckpointing(60000);
// 默认即为EXACTLY_ONCE。设置Checkpoint模式为EXACTLY_ONCE,每条记录在恢复的时候都是精确一次地处理的
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 设置状态后端
env.setStateBackend(new HashMapStateBackend());
// 设置Checkpoint状态存储系统及目录
env.getCheckpointConfig().setCheckpointStorage("hdfs://ns/flink/checkpoint/mysql_cdc");
// 两次Checkpoint之间的最小暂停时间是500 ms
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// checkpoints必须在指定的时间内完成,否则被丢弃
env.getCheckpointConfig().setCheckpointTimeout(60000);
//只允许checkpoint连续失败两次
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
// 设置最大并行运行的Checkpoint数量
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 在作业取消时保留外部检查点
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 启用非对齐Checkpoint,可以极大减少背压情况的下Checkpoint次数
env.getCheckpointConfig().enableUnalignedCheckpoints();//获取数据源
SingleOutputStreamOperator<String> dataStreamSource = env.addSource(sourceFunction).uid("source-01").name("read-from-source");ObjectMapper mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
mapper.configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true);//JSON字符串转JsonNode
SingleOutputStreamOperator<JsonNode> dataStreamJsonNode = dataStreamSource.map(line -> mapper.readTree(line)).uid("map-01").name("source-to-JsonNode");// 从监控的多个表中过滤出'订单表', 并解析Json的after数据
SingleOutputStreamOperator<OrderInfo> orderOperator = dataStreamJsonNode.filter(line -> "order_info".equalsIgnoreCase(line.get("source").get("table").asText())).uid("order-info-filter-01").name("filter-order-info").map(line -> line.get("after").toString()).uid("order-info-map-01").name("parse-order-info-after").map(line -> mapper.readValue(line, OrderInfo.class)).uid("order-info-map-02").name("order-info-to-pojo");

3. sink到MySQL


// 定义JdbcSink
SinkFunction<OrderInfo> orderInfoSink = JdbcSink.sink(UPSERT_SQL,(JdbcStatementBuilder<OrderInfo>) (ps, order) -> new OrderInfoPreparedStatementSetter().setParams(ps, order),JdbcExecutionOptions.builder().withBatchSize(100).withBatchIntervalMs(2000).withMaxRetries(3).build(),JdbcSinkConnUtil.getConnOptions("sink-mysql.properties")
);orderOperator.addSink(orderInfoSink);

参考

  1. https://debezium.io/documentation/reference/1.6/connectors/mysql.html#mysql-property-decimal-handling-mode
  2. https://ververica.github.io/flink-cdc-connectors/release-2.3/content/connectors/mysql-cdc.html
  3. https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/jdbc/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/75953.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MySQL之事务

事务概念 事务就是一组DML语句组成&#xff0c;这些语句在逻辑上存在相关性&#xff0c;这一组DML语句要么全部成功&#xff0c;要么全部失败&#xff0c;是一 个整体。 一个完整的事务&#xff0c;绝对不是简单的 sql 集合&#xff0c;还需要满足如下四个属性&#xff08;ACI…

由Qt::BlockingQueuedConnection引起的关闭Qt主页面而后台仍有进程残留

BUG&#xff1a;由Qt::BlockingQueuedConnection引起的关闭Qt主页面而后台仍有进程残留 1、错误代码示例 首先我们看下下面的代码&#xff0c;可以思考一下代码的错误之处 /** BlockingQueueDeadLock.h **/ #pragma once#include <QtWidgets/QMainWindow> #include &q…

Android 13.0 Launcher3定制之双层改单层(去掉抽屉式四)

1.概述 在13.0的系统产品开发中,对于在Launcher3中的抽屉模式中,系统默认的就是抽屉单层模式,但是在很多产品中需要默认为单层模式,就是要求去掉双层抽屉模式,接下来看下如何继续实现去掉抽屉双层模式,来变成单层模式第四节 2.Launcher3定制之双层改单层(去掉抽屉式四)的…

初阶扫雷(超详解)

✨博客主页&#xff1a;小钱编程成长记 &#x1f388;博客专栏&#xff1a;C语言小游戏 &#x1f388;推荐相关博文&#xff1a;初阶三子棋&#xff08;超详解&#xff09; 初阶扫雷 1.游戏介绍2.基本思路3.实现前的准备4.实现步骤4.1 打印菜单4.2 初始化扫雷棋盘4.3 打印扫雷棋…

单片机之硬件记录

一、概念 VBAT 当使用电池或其他电源连接到VBAT脚上时&#xff0c;当VDD断电时&#xff0c;可以保存备份寄存器的内容和维持RTC的功能。如果应用中没有使用外部电池&#xff0c;VBAT引脚应接到VDD引脚上。 VCC&#xff1a;Ccircuit 表示电路的意思,即接入电路的电压&#x…

C语言入门Day_18 判断和循坏的小结

目录 前言&#xff1a; 1.判断 2.循环 3.课堂笔记 4.思维导图 前言&#xff1a; 判断语句和循环语句都可以大致分为三个部分&#xff0c;第一个部分是固定的语法格式&#xff1b;第二部分是代码的执行顺序&#xff0c;第三部分是判断和循环成立与否的判断条件。 1.判断 1…

Allegro166版本如何在颜色管理器中实时显示层面操作指导

Allegro166版本如何在颜色管理器中实时显示层面操作指导 在用Allegro166进行PCB设计的时候,需要在颜色管理器中频繁的开关层面。但是166不像172一样在颜色管理器中可以实时的开关层面,如下图 需要打开Board Geometry/Soldermask_top层,首先需要勾选这个层面,再点击Apply即…

ubuntu 20.04 docker 安装 mysql

要在Ubuntu 20.04上安装Docker并运行MySQL容器&#xff0c;您可以按照以下步骤操作&#xff1a; 1.更新系统包列表&#xff1a; sudo apt update2.安装Docker&#xff1a; sudo apt install docker.io3.启动Docker服务并设置其开机自启动&#xff1a; sudo systemctl start…

直播|DITA内容发布工具解析 - 问答总结

9月6日&#xff0c;我们进行了一场名为“DITA内容发布工具解析”的直播。通过直播&#xff0c;大家了解到&#xff1a; DITA-OT简介 默认输出效果 定制以后输出效果 发布过程与样式定制 在问答环节&#xff0c;大家进行了热烈沟通。我将几个大家关心的问题和答复总结如下&…

Vue2进阶篇学习笔记

文章目录 Vue2进阶学习笔记前言1、Vue脚手架学习1.1 Vue脚手架概述1.2 Vue脚手架安装1.3 常用属性1.4 插件 2、组件基本概述3、非单文件组件3.1 非单文件组件的基本使用3.2 组件的嵌套 4、单文件组件4.1 快速体验4.2 Todo案例 5、浏览器本地存储6、组件的自定义事件6.1 使用自定…

k8s集群calio网络问题

k8s calio节点报错 Readiness probe failed: calico/node is not ready: BIRD is not ready: BGP not established with 172.24.0.12023-09-07 05:42:37.176 [INFO][200] health.go 156: Number of node(s) with BGP peering established 0这个错误是由于不同主机网卡不一致造…

【Image captioning】Dual-Level Collaborative Transformer for Image Captioning在自定义数据集的调试与实现(过程完整详细)

Dual-Level Collaborative Transformer for Image Captioning在自定义数据集的调试与实现(过程完整详细) 作者:安静到无声 个人主页 目录 Dual-Level Collaborative Transformer for Image Captioning在自定义数据集的调试与实现(过程完整详细)环境配置生成 region featu…

MySQL 全局锁、表级锁、行锁详解

前言 MySQL 里面的锁大致可以分成全局锁、表级锁和行锁三类&#xff0c;全局锁和表级锁是在server层实现的。 全局锁 全局锁就是对整个数据库实例加锁。MySQL 提供了一个加全局读锁的方法&#xff0c;命令是 Flush tables with read lock (FTWRL)。当你需要让整个库处于只读状…

beetlsql3.x版本适配达梦数据库

BeetlSQL介绍 BeetlSQL的目标是提供开发高效&#xff0c;维护高效&#xff0c;运行高效的数据库访问框架&#xff0c;在一个系统多个库的情况下&#xff0c;提供一致的编写代码方式。支持如下数据平台 传统数据库&#xff1a;MySQL(国内兼容MySQL协议的各种大数据库),MariaDB…

软件测试/测试开发丨测试用例自动录入 学习笔记

点此获取更多相关资料 本文为霍格沃兹测试开发学社学员学习笔记分享 原文链接&#xff1a;https://ceshiren.com/t/topic/27139 测试用例自动录入 测试用例自动录入的价值 省略人工同步的步骤&#xff0c;节省时间 兼容代码版本的自动化测试用例 用例的执行与调度统一化管理…

新时代的监控系统--网站可观测性的基础功能

观测云RUM基础功能 更快地故障调试速度 理解用户体验&#xff0c;包括应用性能数据&#xff0c;比如网站核心指标等实时识别报错的设备、系统或国家&#xff0c;快速标注错误的部位解决客户端错误&#xff0c;包括特定用户&#xff0c;包括一键定位代码错误 监控100%前端错误和…

【数据结构】红黑树的插入与验证

文章目录 一、基本概念1.时代背景2. 基本概念3.基本性质 二、实现原理1. 插入1.1变色1.2旋转变色①左旋②右旋③右左双旋④左右双旋 2.验证 源码总结 一、基本概念 1.时代背景 1972年鲁道夫拜尔(Rudolf Bayer)发明了一种数据结构&#xff0c;这是一种特殊的B树4阶情况。这些树…

go初识iris框架(四) -框架设置操作

前言 iris(1) iris(2) iris(3) 框架设置操作 当我们的一个路径是xxx/user/info,xxx/user/login,xxx/user/register的时候,我们发现前面都有一个user,我们如果用/{data:string}这样的话这样导致我们的路径是灵活的&#xff0c;所以我们得用其他方法 这里我们的路径是以anime为…

蓝桥杯打卡Day6

文章目录 N的阶乘基本算术整数查询 一、N的阶乘OI链接 本题思路&#xff1a;本题是关于高精度的模板题。 #pragma GCC optimize(3) #include <bits/stdc.h>constexpr int N1010;std::vector<int> a; std::vector<int> f[N];std::vector<int> mul(in…

MATLAB入门-数据的导入和导出

MATLAB入门-数据的导入和导出 注&#xff1a;本篇文章是课程学习笔记&#xff0c;课程链接为&#xff1a;头歌 常见的几个导入数据的方法 load函数 load函数专门用于引入MATLAB的.mat格式数据&#xff0c;十分的简单方便。 例如&#xff1a;一个-ASCII编码形式存储的数据文件…