前言
最近业务这边有个指标需要用到大数据这边的列式数据库进行处理,由于kettle不支持clickhouse数据源驱动,这里查了一下网上的相关资料,发现了一些别人开发好的驱动包,下载下来后使用效果不尽人意。总结下来有以下几个问题:
-
不支持schema目录展示
-
生成的DDL语句无法执行,右键预览数据报错
-
查询数据出现错误
注意:低版本的kettle即使装ClickHouse驱动包后也不一定支持ClickHouse数据库连接(具体受clickhouse的驱动包编译版本限制,目前自己测试的最低支持到kettle 7.1),只有高版本的kettle在安装ClickHouse驱动包后才支持ClickHouse数据库连接,因此这里使用的时比较稳定的9.4.0版本。
源码分析
综上所述,我基于上述问题进行了驱动包的改造,首先是无非基于schema进行层级预览,导致很多表都混合在一起,不方便查看,这里我研究了一下kettle的源码:
//DatabaseMeta.javapublic String[] getSchemas() throws KettleDatabaseException {ArrayList<String> catalogList = new ArrayList<>();ResultSet catalogResultSet = null;try {catalogResultSet = databaseMeta.getSchemas( getDatabaseMetaData() );// Grab all the catalog names and put them in an array listwhile ( catalogResultSet != null && catalogResultSet.next() ) {catalogList.add( catalogResultSet.getString( 1 ) );}} catch ( SQLException e ) {throw new KettleDatabaseException( "Error getting schemas!", e );} finally {try {if ( catalogResultSet != null ) {catalogResultSet.close();}} catch ( SQLException e ) {throw new KettleDatabaseException( "Error closing resultset after getting schemas!", e );}}if ( log.isDetailed() ) {log.logDetailed( "read :" + catalogList.size() + " schemas from db meta-data." );}return catalogList.toArray( new String[ catalogList.size() ] );
}
//DatabaeInterface.java//databaseMeta会通过相应的数据源接口类DatabaseInterface的自己的查询sechema方法进行查询//如果此方法没有被重写,那么就使用DatabaseMetaData自身的getSchemas()
default ResultSet getSchemas( DatabaseMetaData databaseMetaData, DatabaseMeta dbMeta ) throws SQLException {return databaseMetaData.getSchemas();
}
可以看到这里调用了jdk自带的DatabaseMetaData类
接着查看clickhouse自身的驱动包里的实现类,ClickhouseDatabaseMetaData.java
//ClickHouseDatabaseMetaData.java@Override
public ResultSet getSchemas(String catalog, String schemaPattern) throws SQLException {//可以看到这里有有个判断逻辑,如果是未配置useSchema参数,则返回空的目录//这下恍然大迷糊,原来是jdbc中少了相关参数if (!connection.getJdbcConfig().useSchema()) {return empty("TABLE_SCHEM String, TABLE_CATALOG Nullable(String)");}Map<String, String> params = Collections.singletonMap("pattern",ClickHouseChecker.isNullOrEmpty(schemaPattern) ? "'%'": ClickHouseValues.convertToQuotedString(schemaPattern));ResultSet rs = query(ClickHouseParameterizedQuery.apply("select name as TABLE_SCHEM, null as TABLE_CATALOG "+ "from system.databases where name like :pattern order by name", params));if (!connection.getJdbcConfig().isExternalDatabaseSupported()) {return rs;}return new CombinedResultSet(rs,query(ClickHouseParameterizedQuery.apply("select concat('jdbc(''', name, ''')') as TABLE_SCHEM, null as TABLE_CATALOG "+ "from jdbc('', 'SHOW DATASOURCES') where TABLE_SCHEM like :pattern order by name",params), true));
}
源码改造方案
那么直接就在jdbc中追加该参数,验证一下:
@Override
public String getURL(String hostname, String port, String databaseName) throws KettleDatabaseException {if (getAccessType() == DatabaseMeta.TYPE_ACCESS_ODBC) {return "jdbc:odbc:" + databaseName;} else if (getAccessType() == DatabaseMeta.TYPE_ACCESS_NATIVE) {String _hostname = hostname;String _port = port;String _databaseName = databaseName;String _SocketTimeOut = "?socket_timeout=3600000&databaseTerm=schema";if (Utils.isEmpty(hostname)) {_hostname = "localhost";}if (Utils.isEmpty(port) || port.equals("-1")) {_port = "";}if (Utils.isEmpty(databaseName)) {throw new KettleDatabaseException("必须指定数据库名称");}if (!databaseName.startsWith("/")) {_databaseName = "/" + databaseName;}return "jdbc:clickhouse://" + _hostname + (Utils.isEmpty(_port) ? "" : ":" + _port) + _databaseName + _SocketTimeOut;} else {throw new KettleDatabaseException("不支持的数据库连接方式[" + getAccessType() + "]");}
}
打包测试
将项目打包:
部署插件包
将打包好的jar拷贝到kettle的目录下面:
拷贝到pdi-ce-9.4.0.0-343\data-integration\plugins路径下面,进行解压:
结构如上所示
验证功能
重启kettle,配置clickhouse数据源进行验证:
测试连接功能
测试查看数据目录功能
这个时候已经可以通过schema进行查看相关数据库信息了。
测试一下数据预览和表结构关系
测试一下DDL功能
测试查询性能
62万条数据读取,连续测试3次查询,性能维持在4w/s左右
测试插入性能
100w条数据,写入性能测试3次,平均速度在4000/s
插件包下载地址
链接: https://pan.baidu.com/s/1OvTznq14EYGVd2mEIYO3yA 提取码: 9xim 复制这段内容后打开百度网盘手机App,操作更方便哦
也可后台私信我获取源码,自行编译打包。