【calcite】calcite实现SQL列级数据血缘 data lineage 查询

一、背景

大数据数据血缘,内部实现十分复杂一般需要依赖框架。calcite作为apache顶级项目,且为java体系成员,被多个项目所使用,如flink,spark,kafka等。calcite 对mysql,oracle,postgres和其他大数据平台支持较好,对sqlserver支持较差,没有看到sqlserver相关的代码。
另,python系推荐使用sqlglot,datahub采用。

calcite官方文档

二、 实现方式

gradle添加依赖:

dependencies {testImplementation('org.apache.calcite:calcite-core:1.32.0')
}

以下均有scala语言实现,并使用Mysql5.7测试完成:

drop table if exists test.st01;
CREATE TABLE test.st01(
s_id BIGINT comment '主键',
s_name VARCHAR(20)  comment '姓名',
s_age INT comment '年龄',
s_sex VARCHAR(10) comment '性别',
s_part  VARCHAR(10) comment '分区字段',
ts TIMESTAMP comment '创建时间'
);
insert into test.hive_st01 values(1,'zhangsan',10,'male','student','2020-01-01 18:01:01.666');
insert into test.hive_st01 values(2,'lisi',66,'female','teacher','2020-01-01 10:01:01.666');
insert into test.hive_st01 values(3,'sunlirong',50,'male','student','2020-01-01 10:01:01.666');
insert into test.hive_st01 values(4,'laoliu',38,'female','teacher','2020-01-01 10:01:01.666');create table test.st02 like test.st01;
insert into test.hive_st02 values(2,'wangwu',66,'male','teacher','2020-01-01 10:01:01.666');
insert into test.hive_st02 values(3,'zhaoliu',66,'female','student','2020-01-01 10:01:01.666');create table test.st03 like test.st01;

先是设置好两个sql语句:

  /*** 简单测试*/val MYSQL_SQL1 ="""|select * from `st01` where 1=1|""".stripMargin/*** 测试内容:1、insert into 2、mysql非标准sql函数CONCAT 3、join 4、where*/val MYSQL_SQL2 ="""|insert into `test`.`st03`|select s_id,combined_name s_name,s_age,s_sex,s_part,ts from (|select|a.s_id as s_id|,CONCAT(a.s_name,'-',b.s_name) as combined_name|,a.s_age+b.s_age as s_age|,a.s_sex as s_sex|,'none' as s_part|,current_timestamp as ts|from `test`.`st01` a inner join `test`.`st02` b on a.s_id=b.s_id|where a.s_sex='male'|) t0 order by ts limit 2|""".stripMargin.trim

初始化数据库连接参数:

  val MYSQL_DATABASE = "test"val MYSQL_USERNAME = "root"val MYSQL_PASSWORD = "你的密码"val MYSQL_JDBC_URL = s"jdbc:mysql://192.168.100.100:3306/${MYSQL_DATABASE}?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai&useSSL=false&rewriteBatchedStatements=true"

依赖:

import org.apache.calcite.config.{Lex}
import org.apache.calcite.sql.{SqlBasicCall, SqlIdentifier, SqlNode, SqlNumericLiteral,  SqlSelect}
import org.apache.calcite.sql.parser.SqlParser
import org.apache.calcite.adapter.jdbc.JdbcSchema
import org.apache.calcite.jdbc.{CalciteConnection}
import org.apache.calcite.plan.RelOptTable
import org.apache.calcite.rel.{ RelRoot}
import org.apache.calcite.rel.metadata.{RelColumnOrigin, RelMetadataQuery}
import org.apache.calcite.schema.SchemaPlus
import org.apache.calcite.server.CalciteServerStatement
import org.apache.calcite.sql.fun.{SqlLibrary, SqlLibraryOperatorTableFactory}
import org.apache.calcite.sql.validate.{SqlConformanceEnum, SqlValidator}
import org.apache.calcite.tools.{FrameworkConfig, Frameworks, Planner}
import org.apache.commons.dbcp2.BasicDataSourceimport java.sql.{Connection, DriverManager, ResultSet}
import org.scalatest.funsuite.AnyFunSuiteimport java.util
import scala.collection.JavaConversions._
import scala.collection.mutable
// 此处就是根据表结构设置,写死def printStandardRs(rs: ResultSet): Unit = {while (rs.next()) {val s_id = rs.getLong(1)val s_name = rs.getString(2)val s_age = rs.getInt(3)val s_sex = rs.getString(4)val s_part = rs.getString(5)val ts = rs.getTimestamp(6)println(s"Get result s_id:${s_id},s_name:${s_name},s_age:${s_age},s_sex:${s_sex},s_part:${s_part},ts:${ts}")}}
// 打印RelRoot内部含有的列的血缘信息def printRelRoot(relRoot: RelRoot): Unit = {val fields = relRoot.fieldsval mq: RelMetadataQuery = relRoot.rel.getCluster.getMetadataQueryfor (index <- 0 until fields.size()) {val destFieldName = fields(index).getValueval origins: util.Set[RelColumnOrigin] = mq.getColumnOrigins(relRoot.rel, index)if (origins != null && origins.nonEmpty) {val oriStr = origins.map(ori => {val depTbl: RelOptTable = ori.getOriginTable// val depTblSchema = depTbl.getRelOptSchemaval fieldNames = depTbl.getRowType.getFieldNames.toSeqval depColOrd: Int = ori.getOriginColumnOrdinalval depFldName: String = fieldNames(depColOrd)val qualifiers: mutable.ListBuffer[String] = mutable.ListBuffer.emptyqualifiers.addAll(depTbl.getQualifiedName)qualifiers.add(depFldName)qualifiers.mkString(".")}).mkString(",")println(s"${destFieldName} <- ${oriStr}")}}}

本文都是使用calcite 的RelMetadataQuery 类,提供的血缘信息查询。
需要注意的是
(1)都需要提供数据库名db名.tbl名否则,无法找到表,因其是从SchemaPlus提供的元数据信息找表。
(2)create table as无法被planner识别,如果需要分析create table as的血缘可能需要自己写正则拆分sql,把select部分单独提取出来,再进行识别。

2.1 通用版本

    var sql = MYSQL_SQL2// 具体连接参数参考:org.apache.calcite.config.CalciteConnectionProperty// 具体sql functions参考:org.apache.calcite.sql.fun.SqlLibraryOperatorsval conn = DriverManager.getConnection("jdbc:calcite:fun=mysql;lex=MYSQL;model=inline:" + getDefaultMysqlConnConfig)val stmt = conn.createStatement()try {val rs = stmt.executeQuery(sql)printStandardRs(rs)rs.close()} catch {case ex: Exception => println(ex.getMessage)}val ccStmt = conn.createStatement().unwrap(classOf[CalciteServerStatement])val cxt = ccStmt.createPrepareContext()val mysqlValidateConfig: SqlValidator.Config = SqlValidator.Config.DEFAULT.withConformance(SqlConformanceEnum.MYSQL_5)// 获取parse config,用于planner.parseval mysqlParseConfig = SqlParser.config().withLex(Lex.MYSQL).withConformance(SqlConformanceEnum.MYSQL_5)// 获取OperatorTable,operator操作符集合,用于planner.validate// 方法一、不含 sql funcs,不可用// val calciteCatalogReader=new CalciteCatalogReader(cxt.getRootSchema,List(MYSQL_DATABASE),cxt.getTypeFactory,CalciteConnectionConfig.DEFAULT)// 方法二、所有内置的 sql funcs,不推荐使用// val sqlFuncs: Seq[SqlFunction] = classOf[SqlLibraryOperators].getFields.toSeq.map(f => f.get(null)).filter(v => v.isInstanceOf[SqlFunction]).map(f=>f.asInstanceOf[SqlFunction])// val sqlOperatorTable=SqlOperatorTables.of(sqlFuncs)// 方法三、推荐使用,使用扫描注解的方式加载,类SqlLibraryOperators中各个方法都有注解// 必须有SqlLibrary.STANDARD,否则 “=”都无法识别。val mysqlOperatorTable = SqlLibraryOperatorTableFactory.INSTANCE.getOperatorTable(util.EnumSet.of(SqlLibrary.STANDARD, SqlLibrary.MYSQL))val frameworkConfig = Frameworks.newConfigBuilder().parserConfig(mysqlParseConfig).defaultSchema(cxt.getRootSchema.plus()).sqlValidatorConfig(mysqlValidateConfig).operatorTable(mysqlOperatorTable).build()val planner = Frameworks.getPlanner(frameworkConfig)val parsedNode = planner.parse(sql)val validatedNode = planner.validate(parsedNode)val relRoot: RelRoot = planner.rel(validatedNode)// println(s"get RelNode:${relRoot}")printRelRoot(relRoot)stmt.close()conn.close()

2.2 代码版本

区别就是此处使用代码构建dataSource和SchemaPlus

  case class CalciteConn(conn: Connection, schema: SchemaPlus)def getCalciteMysqlConn(): CalciteConn = {Class.forName("com.mysql.cj.jdbc.Driver")val dataSource = new BasicDataSourcedataSource.setUrl(MYSQL_JDBC_URL)dataSource.setUsername(MYSQL_USERNAME)dataSource.setPassword(MYSQL_PASSWORD)Class.forName("org.apache.calcite.jdbc.Driver")val connection = DriverManager.getConnection("jdbc:calcite:fun=mysql;lex=MYSQL")val calciteConnection = connection.unwrap(classOf[CalciteConnection])val rootSchema: SchemaPlus = calciteConnection.getRootSchemaval schema: JdbcSchema = JdbcSchema.create(rootSchema, MYSQL_DATABASE, dataSource, null, MYSQL_DATABASE)rootSchema.add(MYSQL_DATABASE, schema)CalciteConn(calciteConnection, rootSchema)}
    var targetSql = MYSQL_SQL2println(s"get sql:\n${targetSql}")var parserConfig: SqlParser.Config = SqlParser.config()parserConfig = parserConfig.withLex(Lex.MYSQL).withConformance(SqlConformanceEnum.MYSQL_5)val calciteConn = getCalciteMysqlConn()val rootSchema = calciteConn.schemaval schemaList = new java.util.ArrayList[String]schemaList.add(MYSQL_DATABASE)val mysqlOperatorTable = SqlLibraryOperatorTableFactory.INSTANCE.getOperatorTable(SqlLibrary.STANDARD, SqlLibrary.MYSQL)val mysqlConfig: SqlValidator.Config = SqlValidator.Config.DEFAULT.withConformance(SqlConformanceEnum.MYSQL_5).withLenientOperatorLookup(true)val frameworkConfig: FrameworkConfig = Frameworks.newConfigBuilder.defaultSchema(rootSchema).operatorTable(mysqlOperatorTable).parserConfig(parserConfig).sqlValidatorConfig(mysqlConfig).build()val planner = Frameworks.getPlanner(frameworkConfig)val sqlNode: SqlNode = planner.parse(targetSql)val validatedNode: SqlNode = planner.validate(sqlNode)val relRoot: RelRoot = planner.rel(validatedNode)printRelRoot(relRoot)

最终打印结果:

s_id <- test.hive_st01.s_id
s_name <- test.hive_st02.s_name,test.hive_st01.s_name
s_age <- test.hive_st02.s_age,test.hive_st01.s_age
s_sex <- test.hive_st01.s_sex

参考文章:

基于Calcite解析Flink SQL列级数据血缘

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/12094.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SHELL-双重循环习题练习

1.99乘法表 #!/bin/bash #99乘法表for ((second1; second<9; second)) dofor ((first1; first<second; first))do echo -n -e "${first}*${second}$[first*second]\t" done echo done ######### 首先定义了一个外循环变量second&#xff0c;初始值为1&am…

AI 情感聊天机器人工作之旅 —— 与复读机问题的相遇与别离

前言&#xff1a;先前在杭州的一家大模型公司从事海外闲聊机器人产品&#xff0c;目前已经离职&#xff0c;文章主要讨论在闲聊场景下遇到的“复读机”问题以及一些我个人的思考和解决方案。文章内部已经对相关公司和人员信息做了去敏&#xff0c;如仍涉及到机密等情况&#xf…

linux学习:多媒体开发库SDL+视频、音频、事件子系统+处理yuv视频源

目录 编译和移植 视频子系统 视频子系统产生图像的步骤 api 初始化 SDL 的相关子系统 使用指定的宽、高和色深来创建一个视窗 surface 使用 fmt 指定的格式创建一个像素点​编辑 将 dst 上的矩形 dstrect 填充为单色 color​编辑 将 src 快速叠加到 dst 上​编辑 更新…

连锁收银系统源代码有哪些功能,进销存+收银+会员+门店补货+线上商城

在现代零售行业&#xff0c;高效的管理系统是保持连锁店运营顺畅的关键。而开源连锁收银系统作为一款功能丰富的管理软件&#xff0c;为零售企业提供了全面的解决方案&#xff0c;涵盖了进销存管理、收银、会员、门店补货以及线上商城等多个方面&#xff0c;帮助企业实现精细化…

C语言判断字符旋转

前言 今天我们使用c语言来写代码来实现字符串选择的判断&#xff0c;我们来看题目 题目描述 写一个函数&#xff0c;判断一个字符串是否为另外一个字符串旋转之后的字符串。 例如&#xff1a;给定s1 AABCD和s2 BCDAA&#xff0c;返回1 给定s1abcd和s2ACBD&#xff0c;返回0. A…

想白嫖?音视频的文本提取和总结?NoteGPT满足你

NoteGPT实现了音频、录音以及视频的AI总结 NoteGPT最近做了一个功能&#xff1a;Audio Summary&#xff08;Audio Summary with AI - NoteGPT&#xff09; 1&#xff09;完全免费&#xff1b; 2&#xff09;支持mp3、mp4&#xff1b; 3&#xff09;支持URL和本地上传&…

数据特征降维 | 线性判别分析(LDA)附Python代码

线性判别分析(Linear Discriminant Analysis,LDA)是一种经典的监督学习方法,主要用于降维和模式识别任务。与主成分分析(PCA)不同,LDA考虑了类别信息,旨在找到一个投影,使得同类样本尽可能接近,不同类样本尽可能分开。 以下是LDA的基本步骤: 数据准备:收集带有类…

【UE Niagara】在UI上生成粒子

效果 步骤 1. 在虚幻商城中将“Niagara UI Render”插件安装到引擎 2. 打开虚幻编辑器&#xff0c;勾选插件“Niagara UI Renderer”&#xff0c;然后重启编辑器 3. 先创建一个控件蓝图&#xff0c;该控件蓝图只包含一个按钮 这里设置尺寸框尺寸为200*50 4. 显示该控件 5. 新…

Excel——项目管理,设置时间到期自动提醒及颜色高亮

效果图 第一步、自动获取合同到期日期 1、首先合同【签约日期】和【到期日期】下面的数据必须是日期格式&#xff0c;不能是其它的格式否则无法计算&#xff0c;如果是其它格式需要转换成标准的日期格式&#xff0c;如下图所示。 2、在“到期日期”下面的第一个单元格中输入公…

MySQL深入理解事务(详解)

事务概述 事务是数据库区别于文件系统的重要特性之一&#xff0c;当我们有了事务就会让数据库始终保持一致性&#xff0c;同时我们还能通过事务机制恢复到某个时间点&#xff0c;这样可以保证已提交到数据库的修改不会因为系统崩溃而丢失。 1、基本概念 事务&#xff1a;一组…

如何让机器理解人类语言?Embedding技术详解

如何让机器理解人类语言&#xff1f;Embedding技术详解 文章目录 如何让机器理解人类语言&#xff1f;Embedding技术详解介绍什么是词嵌入&#xff1f;什么是句子嵌入&#xff1f;句子嵌入模型实现句子嵌入的方法值得尝试的句子嵌入模型 句子嵌入库实践Step 1Step 2Step 3 Doc2…

GBJ3510-ASEMI室内空调机GBJ3510

编辑&#xff1a;ll GBJ3510-ASEMI室内空调机GBJ3510 型号&#xff1a;GBJ3510 品牌&#xff1a;ASEMI 封装&#xff1a;GBJ-4 最大重复峰值反向电压&#xff1a;1000V 最大正向平均整流电流(Vdss)&#xff1a;35A 功率(Pd)&#xff1a;中小功率 芯片个数&#xff1a;4…

股东那些事儿:解锁企业背后的权力玩家与盈利秘籍

Hello&#xff0c;大家好啊&#xff0c;今天咱们要聊的主角&#xff0c;是每个企业背后不可或缺的隐形巨擘——股东。他们是谁&#xff1f;他们怎样从公司的经营中分一杯羹&#xff1f;又如何在商业棋盘上运筹帷幄&#xff1f;搬好小板凳&#xff0c;咱们这就开启股东世界的探秘…

Node.js 学习笔记 express框架

express express 使用express下载express 初体验 express 路由什么是路由1路由的使用验证的方法 2获取请求报文参数3获取路由参数4响应设置响应报文 express 中间件5中间件全局中间件路由中间件 6静态资源中间件注意事项案例 7请求体数据8防盗链实现防盗链 9路由模块化router E…

【python】文件操作(持续更新)

1.替换文件名后缀 from pathlib import Pathpdf_path "example.pdf" docx_path Path(pdf_path).with_suffix(.docx) print(docx_path) # 输出: example.docx

亚信安慧AntDB:软硬兼施,创造更多可能

亚信安慧AntDB是一款备受推崇的数据库系统&#xff0c;其极具适配能力使其在软硬件融合方面具备出色表现。无论是与国产硬件还是软件系统配合&#xff0c;AntDB都能实现高效稳定的运行状态&#xff0c;为用户提供优质的数据库服务。从上游到下游的领域&#xff0c;AntDB都展现出…

流量分析利器arkime的学习之路(四)---注意点

用了几天之后会出现系统问题&#xff0c;有些服务器启动不了了&#xff0c;一看原来硬盘满了 [rootc79 ~]# df -h 文件系统 容量 已用 可用 已用% 挂载点 devtmpfs 1.9G 0 1.9G 0% /dev tmpfs 1.9G 0 1.9G…

Java——类和对象第二节——封装

1.什么是封装 封装是面向对象程序的三大特性之一&#xff0c;面向对象程序的三大特性分别是封装&#xff0c;继承&#xff0c;多态 而封装简单来说就是套壳隐藏细节 打个比方&#xff1a; 在一些电脑厂商生产电脑时&#xff0c;仅对用户提供开关机键&#xff0c;键盘输入&a…

Ubuntu20.04中的Pyqt4

如何在ubuntu20.04中安装Pyqt4 我已经尝试了所有的命令&#xff1a; sudo apt-get install python-qt4 sudo apt-get install libqt4-dev sudo apt-get install pyqt4-dev-tools sudo apt-get install pyqt4.qsci-dev sudo apt install python3-pyqt4 尽管pyqt5运行得很流畅…

瑞友科技质量改进服务事业部总经理张力受邀为第十三届中国PMO大会演讲嘉宾

全国PMO专业人士年度盛会 北京瑞友科技股份有限公司质量改进服务事业部总经理张力先生受邀为PMO评论主办的2024第十三届中国PMO大会演讲嘉宾&#xff0c;演讲议题为“PMO如何对接战略成为企业IT投资成功的有效保障”。大会将于6月29-30日在北京举办&#xff0c;敬请关注&#x…