MySQL-CDC 新增同步表确无法捕获增量问题处理

Flink-CDC版本:2.3.0

问题描述

之前通过Flink-CDC捕获Mysql数据库的数据变更情况,代码大致如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(flinkEnvConf);MySqlSource<String> mysql = MySqlSource.<String>builder().hostname(host).port(port).serverId(serverId).username(username).password(password).databaseList(database).tableList(tableList).startupOptions(startupOptions).debeziumProperties(debeziumProp).jdbcProperties(jdbcProp).deserializer(new JsonDebeziumDeserializationSchema()).build();DataStreamSource<String> mySQLSource = env.fromSource(mysql, WatermarkStrategy.noWatermarks(), "MySQL Source");mySQLSource.print();debezium.database.history=com.ververica.cdc.connectors.mysql.debezium.EmbeddedFlinkDatabaseHistory

并且我是开启的checkpoint,并且重启程序后是从checkpoint进行恢复的

一开始同步一张表table_a的增量数据,发现没问题,后续新增表table_b,在捕获table_b的数据时,发现异常:

Encountered change event 'Event{header=EventHeaderV4{timestamp=170917       7391000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=117, nextPosition=769436194, flags=0}, data=TableMapEventData{tableId=5303, database='test', table='table_b', columnTypes=8, 15, 18, 18, 18, 18, 18, 18, 18, 18, 18, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 1, 15, columnMetadata=0, 192,        0, 0, 0, 0, 0, 0, 0, 0, 0, 192, 96, 96, 96, 96, 96, 384, 96, 96, 384, 30, 30, 30, 30, 0, 96, columnNullability={5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 1       9, 20, 21, 22, 23, 24, 25, 26}, eventMetadata=TableMapEventMetadata{signedness={1}, defaultCharset=33, charsetCollations=null, columnCharsets=null, columnNames=null,        setStrValues=null, enumStrValues=null, geometryTypes=null, simplePrimaryKeys=null, primaryKeysWithPrefix=null, enumAndSetDefaultCharset=null, enumAndSetColumnCharse       ts=null,visibility=null}}}' at offset {transaction_id=null, ts_sec=1709177391, file=binlog.000476, pos=769435520, server_id=1, event=3} for table test.table_b whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case.
101065 Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position=769436058 --stop-position=769436194 --verbose binlog.000476

问题解决

结合debezium的源码,并且在网上找了一下相关方案如下:
链接:https://help.aliyun.com/zh/flink/support/faq-about-cdc#section-nbg-sb4-ebe

主要是两个点
1、不建议使用配置'debezium.snapshot.mode'='never'
2、通过'debezium.inconsistent.schema.handling.mode' = 'warn'参数避免报错

针对1:不使用'debezium.snapshot.mode'='never'意味着每次重启CDC进程的时候,就要重新消费一遍同步表的所有数据,无法满足业务需求
针对2:修改配置'debezium.inconsistent.schema.handling.mode' = 'warn',其实这种办法是治标不治本,修改配置只是让程序打印warn日志,代码可以继续运行,还是无法解决无法捕获增量的问题;

没办法,只能debug源码来发现问题了。先从报错位置开始看起

MySqlStreamingChangeEventSource

private void informAboutUnknownTableIfRequired(MySqlOffsetContext offsetContext, Event event, TableId tableId, String typeToLog) {if (tableId != null&& connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(tableId)) {metrics.onErroneousEvent("source = " + tableId + ", event " + event);EventHeaderV4 eventHeader = event.getHeader();if (inconsistentSchemaHandlingMode == EventProcessingFailureHandlingMode.FAIL) {LOGGER.error("Encountered change event '{}' at offset {} for table {} whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case.{}"+ "Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}",event,offsetContext.getOffset(),tableId,System.lineSeparator(),eventHeader.getPosition(),eventHeader.getNextPosition(),offsetContext.getSource().binlogFilename());throw new DebeziumException("Encountered change event for table "+ tableId+ " whose schema isn't known to this connector");} else if (inconsistentSchemaHandlingMode == EventProcessingFailureHandlingMode.WARN) {LOGGER.warn("Encountered change event '{}' at offset {} for table {} whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case.{}"+ "The event will be ignored.{}"+ "Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}",event,offsetContext.getOffset(),tableId,System.lineSeparator(),System.lineSeparator(),eventHeader.getPosition(),eventHeader.getNextPosition(),offsetContext.getSource().binlogFilename());} else {LOGGER.debug("Encountered change event '{}' at offset {} for table {} whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case.{}"+ "The event will be ignored.{}"+ "Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}",event,offsetContext.getOffset(),tableId,System.lineSeparator(),System.lineSeparator(),eventHeader.getPosition(),eventHeader.getNextPosition(),offsetContext.getSource().binlogFilename());}} else {LOGGER.debug("Filtering {} event: {} for non-monitored table {}", typeToLog, event, tableId);metrics.onFilteredEvent("source = " + tableId);}
}protected void handleUpdateTableMetadata(MySqlOffsetContext offsetContext, Event event) {TableMapEventData metadata = unwrapData(event);long tableNumber = metadata.getTableId();String databaseName = metadata.getDatabase();String tableName = metadata.getTable();TableId tableId = new TableId(databaseName, null, tableName);// 获取了日志变更信息,根据tableId(表名)在判断缓存中是否存在// 如果是新增表,在taskContext.getSchema() 对象中是不存在的if (taskContext.getSchema().assignTableNumber(tableNumber, tableId)) {LOGGER.debug("Received update table metadata event: {}", event);} else {informAboutUnknownTableIfRequired(offsetContext, event, tableId, "update table metadata");}
}

MySqlDatabaseSchema

public boolean assignTableNumber(long tableNumber, TableId id) {// 通过schemaForfinal TableSchema tableSchema = schemaFor(id);if (tableSchema == null) {return false;}tableIdsByTableNumber.put(tableNumber, id);return true;
}

RelationalDatabaseSchema

@Override
public TableSchema schemaFor(TableId id) {// 最终是从schemasByTableId对象中取值// schemasByTableId 对象通过ConcurrentMap存储// 现在我们需要知道,ConcurrentMap 是什么时候将数据添加进去的return schemasByTableId.get(id);
}// 通过debug发现,调用下面这个方法,我们需要知道是谁在调用此方法
protected void buildAndRegisterSchema(Table table) {if (tableFilter.isIncluded(table.id())) {TableSchema schema = schemaBuilder.create(schemaPrefix, getEnvelopeSchemaName(table), table, columnFilter, columnMappers, customKeysMapper);schemasByTableId.put(table.id(), schema);}
}

HistorizedRelationalDatabaseSchema

// 在前面,我设置的配置是:debezium.database.history=com.ververica.cdc.connectors.mysql.debezium.EmbeddedFlinkDatabaseHistory
@Override
public void recover(OffsetContext offset) {if (!databaseHistory.exists()) {String msg = "The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot.";throw new DebeziumException(msg);}// 当我们断点在这里的时候,发现tables(), tableIds()是没有数据的databaseHistory.recover(offset.getPartition(), offset.getOffset(), tables(), getDdlParser());// 当我们断点在这里的时候,发现tables(), tableIds()是有数据的// recover() 这个方法时完成了赋值// tables(), tableIds() 里面的数据,就是我们要的schema信息recoveredTables = !tableIds().isEmpty();for (TableId tableId : tableIds()) {buildAndRegisterSchema(tableFor(tableId));}
}

EmbeddedFlinkDatabaseHistory

@Override
public void recover(Map<String, ?> source, Map<String, ?> position, Tables schema, DdlParser ddlParser) {listener.recoveryStarted();// schema 里面的值其实就是从tableSchemas里面遍历得到的for (TableChange tableChange : tableSchemas.values()) {schema.overwriteTable(tableChange.getTable());}listener.recoveryStopped();
}@Override
public void configure(Configuration config,HistoryRecordComparator comparator,DatabaseHistoryListener listener,boolean useCatalogBeforeSchema) {this.listener = listener;this.storeOnlyMonitoredTablesDdl = config.getBoolean(STORE_ONLY_MONITORED_TABLES_DDL);this.skipUnparseableDDL = config.getBoolean(SKIP_UNPARSEABLE_DDL_STATEMENTS);// recoverString instanceName = config.getString(DATABASE_HISTORY_INSTANCE_NAME);this.tableSchemas = new HashMap<>();// tableSchemas 里面的值是通过removeHistory(instanceName)获取的for (TableChange tableChange : removeHistory(instanceName)) {tableSchemas.put(tableChange.getId(), tableChange);}
}// 这个方法的返回值是TABLE_SCHEMAS 返回的,所以要搞清楚
// TABLE_SCHEMAS在何时赋值的
public static Collection<TableChange> removeHistory(String engineName) {if (engineName == null) {return Collections.emptyList();}// Collection<TableChange> tableChanges = TABLE_SCHEMAS.remove(engineName);return tableChanges != null ? tableChanges : Collections.emptyList();
}// 在此方法下,TABLE_SCHEMAS 完成赋值
// 是谁在调用此方法
public static void registerHistory(String engineName, Collection<TableChange> engineHistory) {TABLE_SCHEMAS.put(engineName, engineHistory);
}

StatefulTaskContext

// configure()内部调用registerHistory完成schema的赋值
// 其实就是调用:mySqlSplit.getTableSchemas().values() 完成对schema的赋值
public void configure(MySqlSplit mySqlSplit) {// initial stateful objectsfinal boolean tableIdCaseInsensitive = connection.isTableIdCaseSensitive();this.topicSelector = MySqlTopicSelector.defaultSelector(connectorConfig);EmbeddedFlinkDatabaseHistory.registerHistory(sourceConfig.getDbzConfiguration().getString(EmbeddedFlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME),mySqlSplit.getTableSchemas().values());......
}

总结:

  1. 为什么新增实时表时,新增表的增量数据无法捕获?

因为RelationalDatabaseSchema对象内部,有一个对象Tables,Tables内部并没有保存新增表的schema信息,在解析到新增表的增量数据时会判断Tables内是否存在这个表,如果不存在会直接将这张表的增量数据过滤

  1. Tables对象内的schema信息是怎么获取到的?

通过上面源码从下到上的解析可以发现,Tables对象内的schema信息是通过MySqlSplit 这个对象传进来的,我们现在需要搞明白,MySqlSplit是怎么获取到的。

下面这段代码流程比较简单,直接写出来
image.png

1com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader#addSplits
2org.apache.flink.connector.base.source.reader.SourceReaderBase#addSplits
3org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager#addSplits
4org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager#startFetcher
5java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable)
这边是多线程异常提交:org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher
6org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher#run
7org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher#runOnce
8org.apache.flink.connector.base.source.reader.fetcher.FetchTask#run
9com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader#fetch
10com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader#checkSplitOrStartNext
11com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader#submitSplit
12com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext#configure最终调用:configure()

主要看下面:

// 此方法的入参splits,是flink通过savepoint恢复,从state中获取的
// 如果之前只捕获table_A表的增量,那么splits对象内部只有table_A的schema信息
// 如果此程序是第一次启动,那么splits中是没有任何一张表的shcema信息,那么flink-cdc代码是肯定有去获取表的schema信息的实现
// 下面看discoverTableSchemasForBinlogSplit()
@Override
public void addSplits(List<MySqlSplit> splits) {// restore for finishedUnackedSplitsList<MySqlSplit> unfinishedSplits = new ArrayList<>();for (MySqlSplit split : splits) {LOG.info("Add Split: " + split);if (split.isSnapshotSplit()) {MySqlSnapshotSplit snapshotSplit = split.asSnapshotSplit();snapshotSplit = discoverTableSchemasForSnapshotSplit(snapshotSplit);if (snapshotSplit.isSnapshotReadFinished()) {finishedUnackedSplits.put(snapshotSplit.splitId(), snapshotSplit);} else {unfinishedSplits.add(split);}} else {MySqlBinlogSplit binlogSplit = split.asBinlogSplit();// the binlog split is suspendedif (binlogSplit.isSuspended()) {suspendedBinlogSplit = binlogSplit;} else if (!binlogSplit.isCompletedSplit()) {uncompletedBinlogSplits.put(split.splitId(), split.asBinlogSplit());requestBinlogSplitMetaIfNeeded(split.asBinlogSplit());} else {uncompletedBinlogSplits.remove(split.splitId());MySqlBinlogSplit mySqlBinlogSplit =discoverTableSchemasForBinlogSplit(split.asBinlogSplit());unfinishedSplits.add(mySqlBinlogSplit);}}}// notify split enumerator again about the finished unacked snapshot splitsreportFinishedSnapshotSplitsIfNeed();// add all un-finished splits (including binlog split) to SourceReaderBaseif (!unfinishedSplits.isEmpty()) {super.addSplits(unfinishedSplits);}
}private MySqlBinlogSplit discoverTableSchemasForBinlogSplit(MySqlBinlogSplit split) {final String splitId = split.splitId();// 当split == null时,才会去获取所有cdc表的schema信息// 如果我是从state恢复,split肯定 != null// 真正需要改的地方就是这里,我比较暴力,直接改为if(true)if (split.getTableSchemas().values().isEmpty()) {try (MySqlConnection jdbc = DebeziumUtils.createMySqlConnection(sourceConfig)) {Map<TableId, TableChanges.TableChange> tableSchemas =TableDiscoveryUtils.discoverCapturedTableSchemas(sourceConfig, jdbc);LOG.info("The table schema discovery for binlog split {} success", splitId);return MySqlBinlogSplit.fillTableSchemas(split, tableSchemas);} catch (SQLException e) {LOG.error("Failed to obtains table schemas due to {}", e.getMessage());throw new FlinkRuntimeException(e);}} else {LOG.warn("The binlog split {} has table schemas yet, skip the table schema discovery",split);return split;}
}

image.png

重新打包编译后测试,之前的问题已经解决。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/715437.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Redis--事务机制的详解及应用

Redis事务的概念&#xff1a; Redis事务就是将一系列命令包装成一个队列&#xff0c;在执行时候按照添加的顺序依次执行&#xff0c;中间不会被打断或者干扰&#xff0c;在执行事务中&#xff0c;其他客户端提交的命令不可以插入到执行事务的队列中&#xff0c;简单来说Redis事…

【Linux】进程优先级以及Linux内核进程调度队列的简要介绍

进程优先级 基本概念查看系统进程修改进程的优先级Linux2.6内核进程调度队列的简要介绍和进程优先级有关的概念进程切换 基本概念 为什么会存在进程优先级&#xff1f;   进程优先级用于确定在资源竞争的情况下&#xff0c;哪个进程将被操作系统调度为下一个运行的进程。进程…

SSH教程

ssh 是远程连接的利器, 可以说凡是涉及到 linux 服务器, ssh 就是一个绕不开的话题. 本文作为一个教程, 尽可能详细的帮助读者设置 ssh, 并给出一些常用的 ssh 配置方法 (主要用于 linux 系统的远程登录和文件传输). 1. 简介 ssh 分为两个部分, sshd 服务端和 ssh 客户端. ssh…

黑马鸿蒙学习笔记1:TEXT组件

业余时间学习下黑马鸿蒙课程&#xff0c;主要截取重要的PPT学习&#xff1a; 其实就是用$r&#xff08;&#xff09;的方法&#xff0c;去调用本地化资源文件&#xff0c;可以做多语言了。 比如每个语言目录下都有个string.json文件&#xff0c;然后用键值对name,value的方式搭…

JVM 补充——StringTable

具体哪些String是相等的&#xff0c;各种String的情况&#xff0c;看这个&#xff1a; https://javaguide.cn/java/basis/java-basic-questions-02.html#string-%E4%B8%BA%E4%BB%80%E4%B9%88%E6%98%AF%E4%B8%8D%E5%8F%AF%E5%8F%98%E7%9A%84 String的基本特性 String&#xf…

[算法沉淀记录] 分治法应用 —— 二分搜索(Binary Search)

分治法应用 —— 二分搜索 算法基本思想 二分搜索&#xff08;Binary Search&#xff09;是一种在有序数组中查找特定元素的高效算法。它每次将搜索区间减半&#xff0c;从而快速地缩小搜索范围。二分搜索的基本思想是&#xff1a;首先将待查关键字与数组中间位置的关键字比较…

【C++】STL简介 | STL六大组件 | string类 | string类对象操作

目录 1. 什么是STL 2. STL的版本 3. STL的六大组件 4. STL的缺陷 5. 引出string类 6. 标准库中的string类 6.1 string类简介 6.2 string类对象的构造 6.3. string类对象的容量 6.4. string类对象的遍历 6.5. string类对象的修改 6.6. string类非成员函数 6.7. vs…

使用滤镜属性将网页从彩色变黑白

在某些情况下&#xff0c;例如为了表达哀悼或纪念&#xff0c; 许多网站会将页面颜色从彩色调整为黑白灰色。我到网上查找答案&#xff0c;发现有些是通过javascript或jQuery实现的&#xff0c;我试了一下居然无效。 后来找到一个方法&#xff0c;是仅用一行CSS代码就能搞定的&…

基于CNN-LSTM-Attention的时间序列回归预测matlab仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 4.1卷积神经网络&#xff08;CNN&#xff09;在时间序列中的应用 4.2 长短时记忆网络&#xff08;LSTM&#xff09;处理序列依赖关系 4.3 注意力机制&#xff08;Attention&#xff09; 5…

MySQL 学习记录 2

原文&#xff1a;https://blog.iyatt.com/?p13818 13 存储引擎 查看一下前面创建的一张表的创建语句&#xff0c;当时并没有显式指定引擎&#xff0c;MySQL 自动指定的 InnoDB&#xff0c;即默认引擎是这个。 创建表的时候要显式指定引擎可以参考这个语句 查看当前 MySQL …

【牛客】SQL130 试卷发布当天作答人数和平均分

描述 现有用户信息表user_info&#xff08;uid用户ID&#xff0c;nick_name昵称, achievement成就值, level等级, job职业方向, register_time注册时间&#xff09;&#xff0c;示例数据如下&#xff1a; iduidnick_nameachievementleveljobregister_time11001牛客1号31007算…

rke方式安装k8s集群

一、新机环境准备 1.1主机名设置 hostnamectl set-hostname XXX1.2 主机名与ip地址解析 vim /etc/hosts 192.168.0.140 rke 192.168.0.147 master1 192.168.0.152 node1 192.168.0.153 node21.3安装docker tar -xf docker-20.10.24.tgz cp ${SHELL_FOLDER}/docker/*…

【java】19:内部类(3)

成员内部类&#xff1a; 1.可以直接访问外部类的所有成员&#xff0c;包含私有的 class Outer01{//外部类 private int n1 10; public String name "张三"; class Innter01{ public void say0(){ System.out.println("Outer01 的n1 " n1 " outer…

JWT基于Cookie的会话保持,并解决CSRF问题的方案

使用JWT进行浏览器接口请求&#xff0c;在使用Cookie进行会话保持传递Token时&#xff0c;可能会存在 CSRF 漏洞问题&#xff0c;同时也要避免在产生XSS漏洞时泄漏Token问题&#xff0c;如下图在尽可能避免CSRF和保护Token方面设计了方案。 要点解释如下&#xff1a; 将JWT存入…

Snagit 2024:让你的屏幕活动瞬间变得生动有力 mac/win版

Snagit 2024 屏幕录制与截图软件是一款功能强大的工具&#xff0c;专为现代用户设计&#xff0c;以满足他们在工作、学习和娱乐中对屏幕内容捕捉和分享的需求。这款软件结合了屏幕录制和截图功能&#xff0c;为用户提供了一种高效、便捷的方式来捕捉屏幕上的精彩瞬间。 Snagit…

xxl-job--01--简介

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 1.xxl-job1. 1 发展历史1.2 XXL-JOB的系统架构1.3 xxl-job与其他框架对比 2. XXL-JOB的使用2.1 准备工作- 配置调度中心XXL-JOB的数据表 2.2 配置执行器1 引入依赖包…

vue cesium加载点与定位到指定位置

vue cesium定位到指定位置 window.viewer.camera.flyTo({destination: Cesium.Cartesian3.fromDegrees(point.longDeg, point.latDeg, 6500000), orientation: {heading: 6.2079384332084935, roll: 0.00031509431759868534, pitch: -1.535}, duration: 3})vue cesium加载点 …

【蛀牙】如何选择牙刷,牙膏和牙杯(含其他日常牙具:牙线,漱口水,冲牙器)

程序员生活指南之 【蛀牙】如何选择牙刷&#xff0c;牙膏和牙杯&#xff08;含其他日常牙具&#xff1a;牙线&#xff0c;漱口水&#xff0c;冲牙器&#xff09; 文章目录 一、如何选择牙刷&#xff0c;牙膏和牙杯1、如何选择牙刷2、如何选择牙膏3、如何选择牙杯 二、日常牙具&…

为什么HashMap的键值可以为null,而ConcurrentHashMap不行?

写在开头 今天在写《HashMap很美好&#xff0c;但线程不安全怎么办&#xff1f;ConcurrentHashMap告诉你答案&#xff01;》这篇文章的时候&#xff0c;漏了一个知识点&#xff0c;知道晚上吃饭的时候才凸显想到&#xff0c;关于ConcurrentHashMap在存储Key与Value的时候&…

【Java】面向对象之多态超级详解!!

文章目录 前言一、多态1.1 多态的概念1.2 多态的实现条件1.3 重写1.3.1方法重写的规则1.3.2重写和重载的区别 1.4 向上转型和向下转型1.4.1向上转型1.4.2向下转型 1.5 多态的优缺点1.5.1 使用多态的好处1.5.2 使用多态的缺陷 结语 前言 为了深入了解JAVA的面向对象的特性&…