更多优秀文章,请扫码关注个人微信公众号或搜索“程序猿小杨”添加。
一、错误展现:
Caused by: io.debezium.connector.oracle.logminer.parser.DmlParserException: Failed to parse insert DML: 'insert into "HIS_DATA". at io.debezium.connector.oracle.logminer.parser.LogMinerDmlParser.parseInsert(LogMinerDmlParser.java:109)at io.debezium.connector.oracle.logminer.parser.LogMinerDmlParser.parse(LogMinerDmlParser.java:73)at io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.parseDmlStatement(AbstractLogMinerEventProcessor.java:1078)... 16 common frames omitted
Caused by: java.lang.ArrayIndexOutOfBoundsException: 74at io.debezium.connector.oracle.logminer.parser.LogMinerDmlParser.parseColumnListClause(LogMinerDmlParser.java:239)at io.debezium.connector.oracle.logminer.parser.LogMinerDmlParser.parseInsert(LogMinerDmlParser.java:100)... 18 common frames omitted
二、问题原因:
通过分析源代码,发现debezium-connector-oracle-1.9.7.Final中的方法存在问题,造成数组越界,一般方式场景在:oracle某张表表结构发生新增字段,然后flink cdc的任务加载的还是旧的表字段信息,当被监控表业务场景写入数据的时候,由于sql里面已经带了新增字段,但是table里面的字段还是旧的,造成解析错误,源代码如下:
private int parseColumnListClause(String sql, int start, String[] columnNames) {int index = start;boolean inQuote = false;for(int var6 = 0; index < sql.length(); ++index) {char c = sql.charAt(index);if (c == '(' && !inQuote) {start = index + 1;} else {if (c == ')' && !inQuote) {++index;break;}if (c == '"') {if (inQuote) {inQuote = false;columnNames[var6++] = sql.substring(start + 1, index);start = index + 2;} else {inQuote = true;}}}}return index;}
三、解决方案:
解决思路:重写io.debezium.connector.oracle.logminer.parser下的LogMinerDmlParser类中的方法即可,方案有两种:
方案1:如果新增字段的内容不需要,比如:只需获取主键信息及对应的值,那就忽略掉新增的字段,新增字段内容不包含在解析后的数据中。那只需要调整一下代码即可:
在代码中新建一个包路径为:io.debezium.connector.oracle.logminer.parser,
复制源码路径下的LogMinerDmlParser类中内容:
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//package io.debezium.connector.oracle.logminer.parser;import io.debezium.DebeziumException;
import io.debezium.connector.oracle.logminer.LogMinerHelper;
import io.debezium.relational.Column;
import io.debezium.relational.Table;public class LogMinerDmlParser implements DmlParser {private static final String NULL_SENTINEL = "${DBZ_NULL}";private static final String NULL = "NULL";private static final String INSERT_INTO = "insert into ";private static final String UPDATE = "update ";private static final String DELETE_FROM = "delete from ";private static final String AND = "and ";private static final String OR = "or ";private static final String SET = " set ";private static final String WHERE = " where ";private static final String VALUES = " values ";private static final String IS_NULL = "IS NULL";private static final String UNSUPPORTED = "Unsupported";private static final String UNSUPPORTED_TYPE = "Unsupported Type";private static final int INSERT_INTO_LENGTH = "insert into ".length();private static final int UPDATE_LENGTH = "update ".length();private static final int DELETE_FROM_LENGTH = "delete from ".length();private static final int VALUES_LENGTH = " values ".length();private static final int SET_LENGTH = " set ".length();private static final int WHERE_LENGTH = " where ".length();public LogMinerDmlParser() {}public LogMinerDmlEntry parse(String sql, Table table) {if (table == null) {throw new DmlParserException("DML parser requires a non-null table");} else {if (sql != null && sql.length() > 0) {switch(sql.charAt(0)) {case 'd':return this.parseDelete(sql, table);case 'i':return this.pa