一、问题描述
部署了1.15服务端 canal.deployer-1.1.5 用于监听mysql的binlog日志,同时在项目中集成了canal client,用于在监听到指定数据表变化时自定义写入es。
服务端配置:
客户端配置:
public void run() {// 创建链接CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalConfig.getDeployerIp(), canalConfig.getPort()),canalConfig.getDestination(), "test", "123456");try {// 打开连接connector.connect();log.info("=====connector.connect()连接成功======");// 订阅数据库表,来覆盖服务端初始化时的设置connector.subscribe("nlc.works");// 回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿connector.rollback();while (true) {// 获取指定数量的数据Message message = connector.getWithoutAck(BATCH_SIZE);// 获取批量IDlong batchId = message.getId();// 获取批量的数量int size = message.getEntries().size();// 如果没有数据if (batchId == -1 || size == 0) {try {// 线程休眠1秒Thread.sleep(1000);
// log.info("=============暂无数据同步===============");} catch (InterruptedException e) {e.printStackTrace();}} else {// 如果有数据,处理数据
// log.info("canal同步进行中...");parseEntry(message.getEntries());}// 进行 batch id 的确认connector.ack(batchId);}} catch (Exception e) {e.printStackTrace();} finally {connector.disconnect();}}
报错信息如下:
com.alibaba.otter.canal.protocol.exception.CanalClientException: something goes wrong when doing authentication: auth failed for user:testat com.alibaba.otter.canal.client.impl.SimpleCanalConnector.doConnect(SimpleCanalConnector.java:192)at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.connect(SimpleCanalConnector.java:115)at com.cxstar.business.es.canal.CanalClient.run(CanalClient.java:63)at sun.reflect.GeneratedMethodAccessor49.invoke(Unknown Source)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:498)at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:95)at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)at java.util.concurrent.FutureTask.run(FutureTask.java)at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748)
问题分析
这个错误表明在 Canal 客户端的身份验证过程中发生了问题,导致用户 “test” 的身份验证失败,从而无法连接到 Canal 服务器。但是服务端和客户端的账户密码是一致的,不应该会有此问题。
问题排查
下载源码进行debug,可以看到传入的参数是进行过加密的,并且把服务端的密码进行相同的加密后,两个密码字符串是一致的。说明报错问题就在于红框出的代码!
源码修改:注释掉原有的密码校验的方法,直接使用相同的加密方法验证密码是否一致
public boolean auth(String user, String passwd, byte[] seed) {// 如果user/passwd密码为空,则任何用户账户都能登录if ((StringUtils.isEmpty(this.user) || StringUtils.equals(this.user, user))) {if (StringUtils.isEmpty(this.passwd)) {return true;} else if (StringUtils.isEmpty(passwd)) {// 如果server密码有配置,客户端密码为空,则拒绝return false;}try {// 自己加的代码:对于服务端的密码进行与客户端相同的加密规则后的内容String serverPasswd = SecurityUtil.byte2HexStr(SecurityUtil.scramble411(this.passwd.getBytes(), seed));return org.apache.commons.lang3.StringUtils.equals(passwd, serverPasswd);/// 注释原因:源码中的密码比较方法存在问题//byte[] passForClient = SecurityUtil.hexStr2Bytes(passwd);// return SecurityUtil.scrambleServerAuth(passForClient, SecurityUtil.hexStr2Bytes(this.passwd), seed);} catch (NoSuchAlgorithmException e) {return false;}}return false;}
打包替换,注意是在server目录下打包!
替换重启后解决用户认证问题