MySQL-CDC 新增同步表确无法捕获增量问题处理

Flink-CDC版本:2.3.0

问题描述

之前通过Flink-CDC捕获Mysql数据库的数据变更情况,代码大致如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(flinkEnvConf);MySqlSource<String> mysql = MySqlSource.<String>builder().hostname(host).port(port).serverId(serverId).username(username).password(password).databaseList(database).tableList(tableList).startupOptions(startupOptions).debeziumProperties(debeziumProp).jdbcProperties(jdbcProp).deserializer(new JsonDebeziumDeserializationSchema()).build();DataStreamSource<String> mySQLSource = env.fromSource(mysql, WatermarkStrategy.noWatermarks(), "MySQL Source");mySQLSource.print();debezium.database.history=com.ververica.cdc.connectors.mysql.debezium.EmbeddedFlinkDatabaseHistory

并且我是开启的checkpoint,并且重启程序后是从checkpoint进行恢复的

一开始同步一张表table_a的增量数据,发现没问题,后续新增表table_b,在捕获table_b的数据时,发现异常:

Encountered change event 'Event{header=EventHeaderV4{timestamp=170917       7391000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=117, nextPosition=769436194, flags=0}, data=TableMapEventData{tableId=5303, database='test', table='table_b', columnTypes=8, 15, 18, 18, 18, 18, 18, 18, 18, 18, 18, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 1, 15, columnMetadata=0, 192,        0, 0, 0, 0, 0, 0, 0, 0, 0, 192, 96, 96, 96, 96, 96, 384, 96, 96, 384, 30, 30, 30, 30, 0, 96, columnNullability={5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 1       9, 20, 21, 22, 23, 24, 25, 26}, eventMetadata=TableMapEventMetadata{signedness={1}, defaultCharset=33, charsetCollations=null, columnCharsets=null, columnNames=null,        setStrValues=null, enumStrValues=null, geometryTypes=null, simplePrimaryKeys=null, primaryKeysWithPrefix=null, enumAndSetDefaultCharset=null, enumAndSetColumnCharse       ts=null,visibility=null}}}' at offset {transaction_id=null, ts_sec=1709177391, file=binlog.000476, pos=769435520, server_id=1, event=3} for table test.table_b whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case.
101065 Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position=769436058 --stop-position=769436194 --verbose binlog.000476

问题解决

结合debezium的源码,并且在网上找了一下相关方案如下:
链接:https://help.aliyun.com/zh/flink/support/faq-about-cdc#section-nbg-sb4-ebe

主要是两个点
1、不建议使用配置'debezium.snapshot.mode'='never'
2、通过'debezium.inconsistent.schema.handling.mode' = 'warn'参数避免报错

针对1:不使用'debezium.snapshot.mode'='never'意味着每次重启CDC进程的时候,就要重新消费一遍同步表的所有数据,无法满足业务需求
针对2:修改配置'debezium.inconsistent.schema.handling.mode' = 'warn',其实这种办法是治标不治本,修改配置只是让程序打印warn日志,代码可以继续运行,还是无法解决无法捕获增量的问题;

没办法,只能debug源码来发现问题了。先从报错位置开始看起

MySqlStreamingChangeEventSource

private void informAboutUnknownTableIfRequired(MySqlOffsetContext offsetContext, Event event, TableId tableId, String typeToLog) {if (tableId != null&& connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(tableId)) {metrics.onErroneousEvent("source = " + tableId + ", event " + event);EventHeaderV4 eventHeader = event.getHeader();if (inconsistentSchemaHandlingMode == EventProcessingFailureHandlingMode.FAIL) {LOGGER.error("Encountered change event '{}' at offset {} for table {} whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case.{}"+ "Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}",event,offsetContext.getOffset(),tableId,System.lineSeparator(),eventHeader.getPosition(),eventHeader.getNextPosition(),offsetContext.getSource().binlogFilename());throw new DebeziumException("Encountered change event for table "+ tableId+ " whose schema isn't known to this connector");} else if (inconsistentSchemaHandlingMode == EventProcessingFailureHandlingMode.WARN) {LOGGER.warn("Encountered change event '{}' at offset {} for table {} whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case.{}"+ "The event will be ignored.{}"+ "Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}",event,offsetContext.getOffset(),tableId,System.lineSeparator(),System.lineSeparator(),eventHeader.getPosition(),eventHeader.getNextPosition(),offsetContext.getSource().binlogFilename());} else {LOGGER.debug("Encountered change event '{}' at offset {} for table {} whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case.{}"+ "The event will be ignored.{}"+ "Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}",event,offsetContext.getOffset(),tableId,System.lineSeparator(),System.lineSeparator(),eventHeader.getPosition(),eventHeader.getNextPosition(),offsetContext.getSource().binlogFilename());}} else {LOGGER.debug("Filtering {} event: {} for non-monitored table {}", typeToLog, event, tableId);metrics.onFilteredEvent("source = " + tableId);}
}protected void handleUpdateTableMetadata(MySqlOffsetContext offsetContext, Event event) {TableMapEventData metadata = unwrapData(event);long tableNumber = metadata.getTableId();String databaseName = metadata.getDatabase();String tableName = metadata.getTable();TableId tableId = new TableId(databaseName, null, tableName);// 获取了日志变更信息,根据tableId(表名)在判断缓存中是否存在// 如果是新增表,在taskContext.getSchema() 对象中是不存在的if (taskContext.getSchema().assignTableNumber(tableNumber, tableId)) {LOGGER.debug("Received update table metadata event: {}", event);} else {informAboutUnknownTableIfRequired(offsetContext, event, tableId, "update table metadata");}
}

MySqlDatabaseSchema

public boolean assignTableNumber(long tableNumber, TableId id) {// 通过schemaForfinal TableSchema tableSchema = schemaFor(id);if (tableSchema == null) {return false;}tableIdsByTableNumber.put(tableNumber, id);return true;
}

RelationalDatabaseSchema

@Override
public TableSchema schemaFor(TableId id) {// 最终是从schemasByTableId对象中取值// schemasByTableId 对象通过ConcurrentMap存储// 现在我们需要知道,ConcurrentMap 是什么时候将数据添加进去的return schemasByTableId.get(id);
}// 通过debug发现,调用下面这个方法,我们需要知道是谁在调用此方法
protected void buildAndRegisterSchema(Table table) {if (tableFilter.isIncluded(table.id())) {TableSchema schema = schemaBuilder.create(schemaPrefix, getEnvelopeSchemaName(table), table, columnFilter, columnMappers, customKeysMapper);schemasByTableId.put(table.id(), schema);}
}

HistorizedRelationalDatabaseSchema

// 在前面,我设置的配置是:debezium.database.history=com.ververica.cdc.connectors.mysql.debezium.EmbeddedFlinkDatabaseHistory
@Override
public void recover(OffsetContext offset) {if (!databaseHistory.exists()) {String msg = "The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot.";throw new DebeziumException(msg);}// 当我们断点在这里的时候,发现tables(), tableIds()是没有数据的databaseHistory.recover(offset.getPartition(), offset.getOffset(), tables(), getDdlParser());// 当我们断点在这里的时候,发现tables(), tableIds()是有数据的// recover() 这个方法时完成了赋值// tables(), tableIds() 里面的数据,就是我们要的schema信息recoveredTables = !tableIds().isEmpty();for (TableId tableId : tableIds()) {buildAndRegisterSchema(tableFor(tableId));}
}

EmbeddedFlinkDatabaseHistory

@Override
public void recover(Map<String, ?> source, Map<String, ?> position, Tables schema, DdlParser ddlParser) {listener.recoveryStarted();// schema 里面的值其实就是从tableSchemas里面遍历得到的for (TableChange tableChange : tableSchemas.values()) {schema.overwriteTable(tableChange.getTable());}listener.recoveryStopped();
}@Override
public void configure(Configuration config,HistoryRecordComparator comparator,DatabaseHistoryListener listener,boolean useCatalogBeforeSchema) {this.listener = listener;this.storeOnlyMonitoredTablesDdl = config.getBoolean(STORE_ONLY_MONITORED_TABLES_DDL);this.skipUnparseableDDL = config.getBoolean(SKIP_UNPARSEABLE_DDL_STATEMENTS);// recoverString instanceName = config.getString(DATABASE_HISTORY_INSTANCE_NAME);this.tableSchemas = new HashMap<>();// tableSchemas 里面的值是通过removeHistory(instanceName)获取的for (TableChange tableChange : removeHistory(instanceName)) {tableSchemas.put(tableChange.getId(), tableChange);}
}// 这个方法的返回值是TABLE_SCHEMAS 返回的,所以要搞清楚
// TABLE_SCHEMAS在何时赋值的
public static Collection<TableChange> removeHistory(String engineName) {if (engineName == null) {return Collections.emptyList();}// Collection<TableChange> tableChanges = TABLE_SCHEMAS.remove(engineName);return tableChanges != null ? tableChanges : Collections.emptyList();
}// 在此方法下,TABLE_SCHEMAS 完成赋值
// 是谁在调用此方法
public static void registerHistory(String engineName, Collection<TableChange> engineHistory) {TABLE_SCHEMAS.put(engineName, engineHistory);
}

StatefulTaskContext

// configure()内部调用registerHistory完成schema的赋值
// 其实就是调用:mySqlSplit.getTableSchemas().values() 完成对schema的赋值
public void configure(MySqlSplit mySqlSplit) {// initial stateful objectsfinal boolean tableIdCaseInsensitive = connection.isTableIdCaseSensitive();this.topicSelector = MySqlTopicSelector.defaultSelector(connectorConfig);EmbeddedFlinkDatabaseHistory.registerHistory(sourceConfig.getDbzConfiguration().getString(EmbeddedFlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME),mySqlSplit.getTableSchemas().values());......
}

总结:

  1. 为什么新增实时表时,新增表的增量数据无法捕获?

因为RelationalDatabaseSchema对象内部,有一个对象Tables,Tables内部并没有保存新增表的schema信息,在解析到新增表的增量数据时会判断Tables内是否存在这个表,如果不存在会直接将这张表的增量数据过滤

  1. Tables对象内的schema信息是怎么获取到的?

通过上面源码从下到上的解析可以发现,Tables对象内的schema信息是通过MySqlSplit 这个对象传进来的,我们现在需要搞明白,MySqlSplit是怎么获取到的。

下面这段代码流程比较简单,直接写出来
image.png

1com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader#addSplits
2org.apache.flink.connector.base.source.reader.SourceReaderBase#addSplits
3org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager#addSplits
4org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager#startFetcher
5java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable)
这边是多线程异常提交:org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher
6org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher#run
7org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher#runOnce
8org.apache.flink.connector.base.source.reader.fetcher.FetchTask#run
9com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader#fetch
10com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader#checkSplitOrStartNext
11com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader#submitSplit
12com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext#configure最终调用:configure()

主要看下面:

// 此方法的入参splits,是flink通过savepoint恢复,从state中获取的
// 如果之前只捕获table_A表的增量,那么splits对象内部只有table_A的schema信息
// 如果此程序是第一次启动,那么splits中是没有任何一张表的shcema信息,那么flink-cdc代码是肯定有去获取表的schema信息的实现
// 下面看discoverTableSchemasForBinlogSplit()
@Override
public void addSplits(List<MySqlSplit> splits) {// restore for finishedUnackedSplitsList<MySqlSplit> unfinishedSplits = new ArrayList<>();for (MySqlSplit split : splits) {LOG.info("Add Split: " + split);if (split.isSnapshotSplit()) {MySqlSnapshotSplit snapshotSplit = split.asSnapshotSplit();snapshotSplit = discoverTableSchemasForSnapshotSplit(snapshotSplit);if (snapshotSplit.isSnapshotReadFinished()) {finishedUnackedSplits.put(snapshotSplit.splitId(), snapshotSplit);} else {unfinishedSplits.add(split);}} else {MySqlBinlogSplit binlogSplit = split.asBinlogSplit();// the binlog split is suspendedif (binlogSplit.isSuspended()) {suspendedBinlogSplit = binlogSplit;} else if (!binlogSplit.isCompletedSplit()) {uncompletedBinlogSplits.put(split.splitId(), split.asBinlogSplit());requestBinlogSplitMetaIfNeeded(split.asBinlogSplit());} else {uncompletedBinlogSplits.remove(split.splitId());MySqlBinlogSplit mySqlBinlogSplit =discoverTableSchemasForBinlogSplit(split.asBinlogSplit());unfinishedSplits.add(mySqlBinlogSplit);}}}// notify split enumerator again about the finished unacked snapshot splitsreportFinishedSnapshotSplitsIfNeed();// add all un-finished splits (including binlog split) to SourceReaderBaseif (!unfinishedSplits.isEmpty()) {super.addSplits(unfinishedSplits);}
}private MySqlBinlogSplit discoverTableSchemasForBinlogSplit(MySqlBinlogSplit split) {final String splitId = split.splitId();// 当split == null时,才会去获取所有cdc表的schema信息// 如果我是从state恢复,split肯定 != null// 真正需要改的地方就是这里,我比较暴力,直接改为if(true)if (split.getTableSchemas().values().isEmpty()) {try (MySqlConnection jdbc = DebeziumUtils.createMySqlConnection(sourceConfig)) {Map<TableId, TableChanges.TableChange> tableSchemas =TableDiscoveryUtils.discoverCapturedTableSchemas(sourceConfig, jdbc);LOG.info("The table schema discovery for binlog split {} success", splitId);return MySqlBinlogSplit.fillTableSchemas(split, tableSchemas);} catch (SQLException e) {LOG.error("Failed to obtains table schemas due to {}", e.getMessage());throw new FlinkRuntimeException(e);}} else {LOG.warn("The binlog split {} has table schemas yet, skip the table schema discovery",split);return split;}
}

image.png

重新打包编译后测试,之前的问题已经解决。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/715437.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Redis--事务机制的详解及应用

Redis事务的概念&#xff1a; Redis事务就是将一系列命令包装成一个队列&#xff0c;在执行时候按照添加的顺序依次执行&#xff0c;中间不会被打断或者干扰&#xff0c;在执行事务中&#xff0c;其他客户端提交的命令不可以插入到执行事务的队列中&#xff0c;简单来说Redis事…

【Linux】进程优先级以及Linux内核进程调度队列的简要介绍

进程优先级 基本概念查看系统进程修改进程的优先级Linux2.6内核进程调度队列的简要介绍和进程优先级有关的概念进程切换 基本概念 为什么会存在进程优先级&#xff1f;   进程优先级用于确定在资源竞争的情况下&#xff0c;哪个进程将被操作系统调度为下一个运行的进程。进程…

SSH教程

ssh 是远程连接的利器, 可以说凡是涉及到 linux 服务器, ssh 就是一个绕不开的话题. 本文作为一个教程, 尽可能详细的帮助读者设置 ssh, 并给出一些常用的 ssh 配置方法 (主要用于 linux 系统的远程登录和文件传输). 1. 简介 ssh 分为两个部分, sshd 服务端和 ssh 客户端. ssh…

黑马鸿蒙学习笔记1:TEXT组件

业余时间学习下黑马鸿蒙课程&#xff0c;主要截取重要的PPT学习&#xff1a; 其实就是用$r&#xff08;&#xff09;的方法&#xff0c;去调用本地化资源文件&#xff0c;可以做多语言了。 比如每个语言目录下都有个string.json文件&#xff0c;然后用键值对name,value的方式搭…

JVM 补充——StringTable

具体哪些String是相等的&#xff0c;各种String的情况&#xff0c;看这个&#xff1a; https://javaguide.cn/java/basis/java-basic-questions-02.html#string-%E4%B8%BA%E4%BB%80%E4%B9%88%E6%98%AF%E4%B8%8D%E5%8F%AF%E5%8F%98%E7%9A%84 String的基本特性 String&#xf…

【C++】STL简介 | STL六大组件 | string类 | string类对象操作

目录 1. 什么是STL 2. STL的版本 3. STL的六大组件 4. STL的缺陷 5. 引出string类 6. 标准库中的string类 6.1 string类简介 6.2 string类对象的构造 6.3. string类对象的容量 6.4. string类对象的遍历 6.5. string类对象的修改 6.6. string类非成员函数 6.7. vs…

基于CNN-LSTM-Attention的时间序列回归预测matlab仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 4.1卷积神经网络&#xff08;CNN&#xff09;在时间序列中的应用 4.2 长短时记忆网络&#xff08;LSTM&#xff09;处理序列依赖关系 4.3 注意力机制&#xff08;Attention&#xff09; 5…

MySQL 学习记录 2

原文&#xff1a;https://blog.iyatt.com/?p13818 13 存储引擎 查看一下前面创建的一张表的创建语句&#xff0c;当时并没有显式指定引擎&#xff0c;MySQL 自动指定的 InnoDB&#xff0c;即默认引擎是这个。 创建表的时候要显式指定引擎可以参考这个语句 查看当前 MySQL …

JWT基于Cookie的会话保持,并解决CSRF问题的方案

使用JWT进行浏览器接口请求&#xff0c;在使用Cookie进行会话保持传递Token时&#xff0c;可能会存在 CSRF 漏洞问题&#xff0c;同时也要避免在产生XSS漏洞时泄漏Token问题&#xff0c;如下图在尽可能避免CSRF和保护Token方面设计了方案。 要点解释如下&#xff1a; 将JWT存入…

Snagit 2024:让你的屏幕活动瞬间变得生动有力 mac/win版

Snagit 2024 屏幕录制与截图软件是一款功能强大的工具&#xff0c;专为现代用户设计&#xff0c;以满足他们在工作、学习和娱乐中对屏幕内容捕捉和分享的需求。这款软件结合了屏幕录制和截图功能&#xff0c;为用户提供了一种高效、便捷的方式来捕捉屏幕上的精彩瞬间。 Snagit…

xxl-job--01--简介

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 1.xxl-job1. 1 发展历史1.2 XXL-JOB的系统架构1.3 xxl-job与其他框架对比 2. XXL-JOB的使用2.1 准备工作- 配置调度中心XXL-JOB的数据表 2.2 配置执行器1 引入依赖包…

vue cesium加载点与定位到指定位置

vue cesium定位到指定位置 window.viewer.camera.flyTo({destination: Cesium.Cartesian3.fromDegrees(point.longDeg, point.latDeg, 6500000), orientation: {heading: 6.2079384332084935, roll: 0.00031509431759868534, pitch: -1.535}, duration: 3})vue cesium加载点 …

为什么HashMap的键值可以为null,而ConcurrentHashMap不行?

写在开头 今天在写《HashMap很美好&#xff0c;但线程不安全怎么办&#xff1f;ConcurrentHashMap告诉你答案&#xff01;》这篇文章的时候&#xff0c;漏了一个知识点&#xff0c;知道晚上吃饭的时候才凸显想到&#xff0c;关于ConcurrentHashMap在存储Key与Value的时候&…

【Java】面向对象之多态超级详解!!

文章目录 前言一、多态1.1 多态的概念1.2 多态的实现条件1.3 重写1.3.1方法重写的规则1.3.2重写和重载的区别 1.4 向上转型和向下转型1.4.1向上转型1.4.2向下转型 1.5 多态的优缺点1.5.1 使用多态的好处1.5.2 使用多态的缺陷 结语 前言 为了深入了解JAVA的面向对象的特性&…

基于yolov5的电瓶车和自行车检测系统,可进行图像目标检测,也可进行视屏和摄像检测(pytorch框架)【python源码+UI界面+功能源码详解】

功能演示&#xff1a; 基于yolov5的电瓶车和自行车检测系统_哔哩哔哩_bilibili &#xff08;一&#xff09;简介 基于yolov5的电瓶车和自行车检测系统是在pytorch框架下实现的&#xff0c;这是一个完整的项目&#xff0c;包括代码&#xff0c;数据集&#xff0c;训练好的模型…

Unity(第二十一部)动画的基础了解(感觉不了解其实也行)

1、动画组件老的是Animations 动画视频Play Automatically 是否自动播放Animate Physics 驱动方式&#xff0c;勾选后是物理驱动Culling Type 剔除方式 默认总是动画化就会一直执行下去&#xff0c;第二个是基于渲染播放&#xff08;离开镜头后不执行&#xff09;&#xff0c; …

MySQL中json类型的字段

有些很复杂的信息&#xff0c;我们一般会用扩展字段传一个json串&#xff0c;字段一般用text类型存在数据库。mysql5.7以后支持json类型的字段&#xff0c;还可以进行sql查询与修改json内的某个字段的能力。 1.json字段定义 ip_info json DEFAULT NULL COMMENT ip信息, 2.按…

Doris实战——拈花云科的数据中台实践

目录 前言 一、业务背景 二、数据中台1.0—Lambda 三、新架构的设计目标 四、数据中台2.0—Apache Doris 4.1 新架构数据流转 4.2 新架构收益 五、新架构的落地实践 5.1 模型选择 5.1.1 Unique模型 5.1.2 Aggregate模型 5.2 资源管理 5.3 批量建表 5.4 计算实现…

Stable Diffusion 模型分享:Realistic Stock Photo(真实的库存照片)

本文收录于《AI绘画从入门到精通》专栏&#xff0c;专栏总目录&#xff1a;点这里。 文章目录 模型介绍生成案例案例一案例二案例三案例四案例五案例六案例七案例八 下载地址 模型介绍 条目内容类型大模型基础模型SDXL 1.0来源CIVITAI作者PromptSharingSamaritan文件名称reali…

Vue3_2024_1天【Vue3创建和响应式,对比Vue2】

前言&#xff1a; Vue3对比Vue2版本&#xff0c;它在性能、功能、易用性和可维护性方面都有显著的提升和改进。 性能优化&#xff1a;模板编译器的优化、对Proxy的支持以及使用了更加高效的Virtual DOM算法等。这使得Vue3的打包大小减少了41%&#xff0c;初次渲染提速55%&#…