Source repository: https://github.com/debezium/debezium/blob/main/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java
Code version: Debezium 2.2
Module: debezium-core
Code location: io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(ChangeEventSourceContext, O, SnapshotContext<P, O>, SnapshottingTask)
    @Override
    public SnapshotResult<O> doExecute(ChangeEventSourceContext context, O previousOffset,
                                       SnapshotContext<P, O> snapshotContext, SnapshottingTask snapshottingTask)
            throws Exception {
        final RelationalSnapshotContext<P, O> ctx = (RelationalSnapshotContext<P, O>) snapshotContext;

        Connection connection = null;
        try {
            LOGGER.info("Snapshot step 1 - Preparing");

            if (previousOffset != null && previousOffset.isSnapshotRunning()) {
                LOGGER.info("Previous snapshot was cancelled before completion; a new snapshot will be taken.");
            }

            connection = createSnapshotConnection();
            connectionCreated(ctx);

            LOGGER.info("Snapshot step 2 - Determining captured tables");

            // Note that there's a minor race condition here: a new table matching the filters could be created between
            // this call and the determination of the initial snapshot position below; this seems acceptable, though
            determineCapturedTables(ctx);
            snapshotProgressListener.monitoredDataCollectionsDetermined(snapshotContext.partition, ctx.capturedTables);

            LOGGER.info("Snapshot step 3 - Locking captured tables {}", ctx.capturedTables);

            if (snapshottingTask.snapshotSchema()) {
                lockTablesForSchemaSnapshot(context, ctx);
            }

            LOGGER.info("Snapshot step 4 - Determining snapshot offset");
            determineSnapshotOffset(ctx, previousOffset);

            LOGGER.info("Snapshot step 5 - Reading structure of captured tables");
            readTableStructure(context, ctx, previousOffset);

            if (snapshottingTask.snapshotSchema()) {
                LOGGER.info("Snapshot step 6 - Persisting schema history");

                createSchemaChangeEventsForTables(context, ctx, snapshottingTask);

                // if we've been interrupted before, the TX rollback will cause any locks to be released
                releaseSchemaSnapshotLocks(ctx);
            }
            else {
                LOGGER.info("Snapshot step 6 - Skipping persisting of schema history");
            }

            if (snapshottingTask.snapshotData()) {
                LOGGER.info("Snapshot step 7 - Snapshotting data");
                createDataEvents(context, ctx);
            }
            else {
                LOGGER.info("Snapshot step 7 - Skipping snapshotting of data");
                releaseDataSnapshotLocks(ctx);
                ctx.offset.preSnapshotCompletion();
                ctx.offset.postSnapshotCompletion();
            }

            postSnapshot();
            dispatcher.alwaysDispatchHeartbeatEvent(ctx.partition, ctx.offset);
            return SnapshotResult.completed(ctx.offset);
        }
        finally {
            rollbackTransaction(connection);
        }
    }
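The control flow of this method is easier to see once the logging and the Debezium types are stripped away. The following is a minimal sketch, not Debezium code: the class `SnapshotFlowSketch`, its `run` method and the action labels are all hypothetical names introduced for illustration. It only models which steps run for each combination of the two `SnapshottingTask` flags, and that the rollback in `finally` always runs last.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the doExecute control flow; none of these names are Debezium APIs. */
final class SnapshotFlowSketch {

    /** Returns the ordered list of actions taken for the given task flags. */
    static List<String> run(boolean snapshotSchema, boolean snapshotData) {
        List<String> actions = new ArrayList<>();
        try {
            actions.add("1:create-connection");
            actions.add("2:determine-captured-tables");
            if (snapshotSchema) {
                // Step 3 only takes locks when the schema is being snapshotted.
                actions.add("3:lock-tables");
            }
            actions.add("4:determine-offset");
            actions.add("5:read-table-structure");
            if (snapshotSchema) {
                actions.add("6:persist-schema-history");
                actions.add("6:release-schema-locks");
            }
            if (snapshotData) {
                actions.add("7:snapshot-data");
            }
            else {
                // Without a data snapshot the offset is marked complete right away.
                actions.add("7:release-data-locks");
                actions.add("7:mark-snapshot-complete");
            }
            actions.add("post-snapshot");
            actions.add("heartbeat");
        }
        finally {
            // Mirrors rollbackTransaction(connection): runs on success and on failure.
            actions.add("rollback");
        }
        return actions;
    }

    public static void main(String[] args) {
        run(true, true).forEach(System.out::println);
    }
}
```

Calling `SnapshotFlowSketch.run(false, false)` yields a list with no lock, schema-history or data actions, which corresponds to the two "Skipping" log messages in the original method.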
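Brief walkthrough:
Steps 1-3: prepare for the snapshot
Step 1: create the JDBC connection used for the snapshot
Step 2: determine and validate the captured tables by comparing the tables read from the database over the JDBC connection with the tables configured for capture
Step 3: lock the captured tables (only when the schema is being snapshotted). Locking serves two purposes: it keeps the table structure read in step 5 consistent with the snapshot data, and it fixes the log position determined in step 4, from which streaming of real-time changes continues once the snapshot is done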
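The purpose of the step 3 lock can be illustrated with a toy model. This is only an analogy under stated assumptions: Debezium takes its locks in the database, not with Java locks, and the class `TableLockSketch`, its fields and its methods are hypothetical. The sketch shows that while the lock is held, a schema change attempted between reading the log position and reading the table structure is refused, so the two values stay consistent with each other.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Toy model only: a Java lock stands in for the database table lock. */
final class TableLockSketch {
    private final ReentrantReadWriteLock tableLock = new ReentrantReadWriteLock();
    private long logPosition = 100;  // hypothetical position in the transaction log
    private int schemaVersion = 1;   // hypothetical version of the table structure

    /** A DDL change needs exclusive access; it is refused while a snapshot holds the lock. */
    boolean tryAlterTable() {
        if (!tableLock.writeLock().tryLock()) {
            return false;
        }
        try {
            schemaVersion++;
            logPosition++;
            return true;
        }
        finally {
            tableLock.writeLock().unlock();
        }
    }

    /** Lock, record the offset, let another thread interfere, read the structure, unlock. */
    long[] snapshotOffsetAndSchema(Runnable concurrentActivity) {
        tableLock.readLock().lock();
        try {
            long offset = logPosition;                  // like step 4
            Thread other = new Thread(concurrentActivity);
            other.start();
            other.join();                               // wait for the interfering attempt
            int schema = schemaVersion;                 // like step 5
            return new long[] { offset, schema };
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        finally {
            tableLock.readLock().unlock();              // like releasing the snapshot locks
        }
    }

    public static void main(String[] args) {
        TableLockSketch table = new TableLockSketch();
        boolean[] altered = new boolean[1];
        long[] result = table.snapshotOffsetAndSchema(() -> altered[0] = table.tryAlterTable());
        System.out.println("DDL applied during snapshot: " + altered[0]);
        System.out.println("offset=" + result[0] + ", schemaVersion=" + result[1]);
        System.out.println("DDL applied after snapshot: " + table.tryAlterTable());
    }
}
```

A read lock is used here so that the snapshot itself does not need exclusive access; which kind of lock a real connector takes depends on the database and on the connector's configuration.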
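Steps 4-7: collect the snapshot information
Step 4: determine the snapshot offset (log position)
Step 5: read the structure of the captured tables
Step 6: persist, or skip, the schema history for the captured tables
Step 7: snapshot, or skip, the table data; each record read during the snapshot is emitted with op set to r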
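To make the op value from step 7 concrete, here is a minimal sketch of how a snapshot record differs from a streamed insert. The class `SnapshotOpSketch`, the enum `Op` and the helper methods are hypothetical stand-ins written for this example. The one-letter codes match those used in Debezium change events (r for read, c for create, u for update, d for delete), but the envelope is heavily simplified and omits fields such as the source block and timestamps.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical, simplified change-event envelope; not the Debezium Envelope class. */
final class SnapshotOpSketch {

    /** Local stand-in for the operation codes carried in the "op" field. */
    enum Op {
        READ("r"), CREATE("c"), UPDATE("u"), DELETE("d");

        private final String code;

        Op(String code) {
            this.code = code;
        }

        String code() {
            return code;
        }
    }

    /** A row read during the snapshot: no previous state, op = r. */
    static Map<String, Object> snapshotRead(Map<String, Object> row) {
        return envelope(Op.READ, row);
    }

    /** A row inserted after the snapshot and picked up from the log: op = c. */
    static Map<String, Object> streamedInsert(Map<String, Object> row) {
        return envelope(Op.CREATE, row);
    }

    private static Map<String, Object> envelope(Op op, Map<String, Object> row) {
        // LinkedHashMap keeps field order and, unlike Map.of, accepts a null value.
        Map<String, Object> event = new LinkedHashMap<>();
        event.put("before", null);
        event.put("after", row);
        event.put("op", op.code());
        return event;
    }

    public static void main(String[] args) {
        Map<String, Object> row = Map.of("id", 1);
        System.out.println(snapshotRead(row));
        System.out.println(streamedInsert(row));
    }
}
```

A consumer can use the op field to tell rows that existed at snapshot time apart from rows created afterwards, even though both carry only an after state.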