测试例子:
SELECT e.NAME, d.DEPT_NAME,d.DEPT_ID,EMP_ID,100+EMP_ID+100 FROM EMP e JOIN DEPT d ON e.DEPT_ID = d.DEPT_ID WHERE e.EMP_ID IN (SELECT EMP_ID FROM EMP WHERE DEPT_ID = 10)
代码示例:
package com.test;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.metadata.RelColumnOrigin;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.schema.impl.AbstractTable;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.Planner;
import org.apache.calcite.tools.RelConversionException;
import org.apache.calcite.tools.ValidationException;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.metadata.RelMetadataQuery;import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;
import java.util.Set;public class SqlLineageExample {public static void main(String[] args) throws SQLException, ValidationException, RelConversionException, SqlParseException {// 创建 Calcite 连接Properties info = new Properties();info.setProperty("lex", "JAVA");CalciteConnection connection = (CalciteConnection) DriverManager.getConnection("jdbc:calcite:", info);SchemaPlus rootSchema = connection.getRootSchema();// 定义并添加自定义表到 schemarootSchema.add("EMP", new AbstractTable() {@Overridepublic RelDataType getRowType(RelDataTypeFactory typeFactory) {return typeFactory.builder().add("EMP_ID", SqlTypeName.INTEGER).add("NAME", SqlTypeName.VARCHAR, 20).add("DEPT_ID", SqlTypeName.INTEGER).build();}});rootSchema.add("DEPT", new AbstractTable() {@Overridepublic RelDataType getRowType(RelDataTypeFactory typeFactory) {return typeFactory.builder().add("DEPT_ID", SqlTypeName.INTEGER).add("DEPT_NAME", SqlTypeName.VARCHAR, 20).build();}});// 创建 FrameworkConfigFrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema(rootSchema).build();// 创建 PlannerPlanner planner = Frameworks.getPlanner(config);// 复杂 SQL 查询String sql = "SELECT e.NAME, d.DEPT_NAME,d.DEPT_ID,EMP_ID,100+EMP_ID+100 FROM EMP e JOIN DEPT d ON e.DEPT_ID = d.DEPT_ID " +"WHERE e.EMP_ID IN (SELECT EMP_ID FROM EMP WHERE DEPT_ID = 10)";// 解析 SQLSqlNode parsedNode = planner.parse(sql);// 校验 SQLSqlNode validatedNode = planner.validate(parsedNode);// 转换为关系代数树RelRoot relRoot = planner.rel(validatedNode);RelNode relNode = relRoot.project();// 打印字段来源printFieldLineage(relNode);}private static void printFieldLineage(RelNode relNode) {RelMetadataQuery mq = relNode.getCluster().getMetadataQuery();for (RelDataTypeField field : relNode.getRowType().getFieldList()) {Set<RelColumnOrigin> origins = mq.getColumnOrigins(relNode, field.getIndex());if (origins != null) {for (RelColumnOrigin origin : origins) {String fieldName = field.getName();String tableName = origin.getOriginTable().getQualifiedName().get(0);System.out.println("Field: 
" + fieldName + " comes from Table: " + tableName);}} else {System.out.println("Field: " + field.getName() + " origin unknown");}}}
}
结果输出:
打印结果如下:
Field: NAME comes from Table: EMP
Field: DEPT_NAME comes from Table: DEPT
Field: DEPT_ID comes from Table: DEPT
Field: EMP_ID comes from Table: EMP
Field: EXPR$4 comes from Table: EMP
依赖 JAR 包:
<dependencies>
    <dependency>
        <groupId>org.apache.calcite</groupId>
        <artifactId>calcite-core</artifactId>
        <version>1.34.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.36</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.36</version>
    </dependency>
</dependencies>