Neo4j实现表字段级血缘关系

需求背景

需要在前端页面展示当前表字段的所有上下游血缘关系,以进一步做数据诊断治理。大致效果图如下:
在这里插入图片描述首先这里解释什么是表字段血缘关系,SQL 示例:

CREATE TABLE IF NOT EXISTS table_b
AS SELECT order_id, order_status FROM table_a;

如上 DDL 语句中,创建的 table_b 的 order_id 和 order_status 字段来源于 table_a,代表table_a 就是 table_b 的来源表,也叫上游表,table_b 就是 table_a 下游表,另外 table_a.order_id 就是 table_b.order_id 的上游字段,它们之间就存在血缘关系。

INSERT INTO table_c
SELECT a.order_id, b.order_status
FROM table_a a JOIN table_b b ON a.order_id = b.order_id;

如上 DML 语句中,table_c 的 order_id 字段来源于 table_a,而 order_status 来源于 table_b,表示 table_c 和 table_a、table_b 之间也存在血缘关系。

由上也可看出想要存储血缘关系,还需要先解析 sql,这块儿主要使用了开源项目 calcite 的解析器,这篇文章不再展开,本篇主要讲如何存储和如何展示

环境配置

参考另一篇:SpringBoot 配置内嵌式 Neo4j

Node 数据结构定义

因为要展示表的字段之间的血缘关系,所以直接将表字段作为图节点存储,表字段之间的血缘关系就用图节点之间的关系表示,具体 node 定义如下:

public class ColumnVertex {// 唯一键private String name;public ColumnVertex(String catalogName, String databaseName, String tableName, String columnName) {this.name = catalogName + "." + databaseName + "." + tableName + "." + columnName;}public String getCatalogName() {return Long.parseLong(name.split("\\.")[0]);}public String getDatabaseName() {return name.split("\\.")[1];}public String getTableName() {return name.split("\\.")[2];}public String getColumnName() {return name.split("\\.")[3];}
}

通用 Service 定义

public interface EmbeddedGraphService {// 添加图节点以及与上游节点之间的关系void addColumnVertex(ColumnVertex currentVertex, ColumnVertex upstreamVertex);// 寻找上游节点List<ColumnVertex> findUpstreamColumnVertex(ColumnVertex currentVertex);// 寻找下游节点List<ColumnVertex> findDownstreamColumnVertex(ColumnVertex currentVertex);
}

Service 实现

import javax.annotation.Resource;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Result;
import org.neo4j.graphdb.Transaction;
import org.springframework.stereotype.Service;@Service
public class EmbeddedGraphServiceImpl implements EmbeddedGraphService {@Resource private GraphDatabaseService graphDb;@Overridepublic void addColumnVertex(ColumnVertex currentVertex, ColumnVertex upstreamVertex) {try (Transaction tx = graphDb.beginTx()) {tx.execute("MERGE (c:ColumnVertex {name: $currentName}) MERGE (u:ColumnVertex {name: $upstreamName})"+ " MERGE (u)-[:UPSTREAM]->(c)",Map.of("currentName", currentVertex.getName(), "upstreamName", upstreamVertex.getName()));tx.commit();}}@Overridepublic List<ColumnVertex> findUpstreamColumnVertex(ColumnVertex currentVertex) {List<ColumnVertex> result = new ArrayList<>();try (Transaction tx = graphDb.beginTx()) {Result queryResult =tx.execute("MATCH (u:ColumnVertex)-[:UPSTREAM]->(c:ColumnVertex) WHERE c.name = $name RETURN"+ " u.name AS name",Map.of("name", currentVertex.getName()));while (queryResult.hasNext()) {Map<String, Object> row = queryResult.next();result.add(new ColumnVertex().setName((String) row.get("name")));}tx.commit();}return result;}@Overridepublic List<ColumnVertex> findDownstreamColumnVertex(ColumnVertex currentVertex) {List<ColumnVertex> result = new ArrayList<>();try (Transaction tx = graphDb.beginTx()) {Result queryResult =tx.execute("MATCH (c:ColumnVertex)-[:UPSTREAM]->(d:ColumnVertex) WHERE c.name = $name RETURN"+ " d.name AS name",Map.of("name", currentVertex.getName()));while (queryResult.hasNext()) {Map<String, Object> row = queryResult.next();result.add(new ColumnVertex().setName((String) row.get("name")));}tx.commit();}return result;}
}

遍历图节点

实现逻辑:

  1. restful 接口入参:当前表(catalogName, databaseName, tableName)
  2. 定义返回给前端的数据结构,采用 nodes 和 edges 方式返回,然后前端再根据节点与边关系渲染出完整的血缘关系图;
public class ColumnLineageVO {List<ColumnLineageNode> nodes;List<ColumnLineageEdge> edges;
}public class ColumnLineageNode {private String databaseName;private String tableName;private List<String> columnNames;
}public class ColumnLineageEdge {private ColumnLineageEdgePoint source;private ColumnLineageEdgePoint target;
}public class ColumnLineageEdgePoint {private String databaseName;private String tableName;private String columnName;
}
  1. 查询表字段;
  2. 采用递归的方式,利用当前表字段遍历与当前表字段关联的所有上下游图节点;
  3. 将所有节点封装成 List ColumnLineageVO 返回给前端 。
public ColumnLineageVO getColumnLineage(Table table) {ColumnLineageVO columnLineageVO = new ColumnLineageVO();List<ColumnLineageNode> nodes = new ArrayList<>();List<ColumnLineageEdge> edges = new ArrayList<>();// DeduplicationSet<String> visitedNodes = new HashSet<>();Set<String> visitedEdges = new HashSet<>();Map<String, List<ColumnVertex>> upstreamCache = new HashMap<>();Map<String, List<ColumnVertex>> downstreamCache = new HashMap<>();ColumnLineageNode currentNode =ColumnLineageNode.builder().databaseName(table.getDatabaseName()).tableName(table.getTableName()).type(TableType.EXTERNAL_TABLE.getDesc()).build();nodes.add(currentNode);visitedNodes.add(currentNode.getDatabaseName() + "." + currentNode.getTableName());for (String columnName : table.getColumnNames()) {ColumnVertex currentVertex =new ColumnVertex(table.getScriptId(), table.getDatabaseName(), table.getTableName(), columnName);traverseUpstreamColumnVertex(currentVertex, nodes, edges, visitedNodes, visitedEdges, upstreamCache);traverseDownstreamColumnVertex(currentVertex, nodes, edges, visitedNodes, visitedEdges, downstreamCache);}columnLineageVO.setNodes(nodes);columnLineageVO.setEdges(edges);return columnLineageVO;}private void traverseUpstreamColumnVertex(ColumnVertex currentVertex,List<ColumnLineageNode> nodes,List<ColumnLineageEdge> edges,Set<String> visitedNodes,Set<String> visitedEdges,Map<String, List<ColumnVertex>> cache) {List<ColumnVertex> upstreamVertices;if (cache.containsKey(currentVertex.getName())) {upstreamVertices = cache.get(currentVertex.getName());} else {upstreamVertices = embeddedGraphService.findUpstreamColumnVertex(currentVertex);cache.put(currentVertex.getName(), upstreamVertices);}for (ColumnVertex upstreamVertex : upstreamVertices) {String nodeKey = upstreamVertex.getDatabaseName() + "." + upstreamVertex.getTableName();if (!visitedNodes.contains(nodeKey)) {ColumnLineageNode upstreamNode =ColumnLineageNode.builder().databaseName(upstreamVertex.getDatabaseName()).tableName(upstreamVertex.getTableName()).type(TableType.EXTERNAL_TABLE.getDesc()).build();nodes.add(upstreamNode);visitedNodes.add(nodeKey);}String edgeKey =upstreamVertex.getDatabaseName()+ upstreamVertex.getTableName()+ upstreamVertex.getColumnName()+ currentVertex.getDatabaseName()+ currentVertex.getTableName()+ currentVertex.getColumnName();if (!visitedEdges.contains(edgeKey)) {ColumnLineageEdge edge = createEdge(upstreamVertex, currentVertex);edges.add(edge);visitedEdges.add(edgeKey);}traverseUpstreamColumnVertex(upstreamVertex, nodes, edges, visitedNodes, visitedEdges, cache);}}private void traverseDownstreamColumnVertex(ColumnVertex currentVertex,List<ColumnLineageNode> nodes,List<ColumnLineageEdge> edges,Set<String> visitedNodes,Set<String> visitedEdges,Map<String, List<ColumnVertex>> cache) {List<ColumnVertex> downstreamVertices;if (cache.containsKey(currentVertex.getName())) {downstreamVertices = cache.get(currentVertex.getName());} else {downstreamVertices = embeddedGraphService.findDownstreamColumnVertex(currentVertex);cache.put(currentVertex.getName(), downstreamVertices);}for (ColumnVertex downstreamVertex : downstreamVertices) {String nodeKey = downstreamVertex.getDatabaseName() + "." + downstreamVertex.getTableName();if (!visitedNodes.contains(nodeKey)) {ColumnLineageNode downstreamNode =ColumnLineageNode.builder().databaseName(downstreamVertex.getDatabaseName()).tableName(downstreamVertex.getTableName()).type(TableType.EXTERNAL_TABLE.getDesc()).build();nodes.add(downstreamNode);visitedNodes.add(nodeKey);}String edgeKey =currentVertex.getDatabaseName()+ currentVertex.getTableName()+ currentVertex.getColumnName()+ downstreamVertex.getDatabaseName()+ downstreamVertex.getTableName()+ downstreamVertex.getColumnName();if (!visitedEdges.contains(edgeKey)) {ColumnLineageEdge edge = createEdge(currentVertex, downstreamVertex);edges.add(edge);visitedEdges.add(edgeKey);}traverseDownstreamColumnVertex(downstreamVertex, nodes, edges, visitedNodes, visitedEdges, cache);}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/53245.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PB4引脚作GPIO上电高电平问题

问题说明 给旧项目debug&#xff0c;芯片是国民技术 N32G452VEL7 &#xff08;用起来跟32没多大差 包括PB4在内有多个引脚作为输出&#xff0c;默认低电平&#xff0c;在状态机内先输出高电平再回到低电平&#xff0c;来模拟按键的状态&#xff08;相当于按键按下松开后按键功…

计算机竞赛 基于大数据的时间序列股价预测分析与可视化 - lstm

文章目录 1 前言2 时间序列的由来2.1 四种模型的名称&#xff1a; 3 数据预览4 理论公式4.1 协方差4.2 相关系数4.3 scikit-learn计算相关性 5 金融数据的时序分析5.1 数据概况5.2 序列变化情况计算 最后 1 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &…

词向量及文本向量

文章目录 引言1. 文本向量化2. one-hot编码3. 词向量-word2vec3.1 词向量-基于语言模型 4 词向量 - word2vec基于窗口4.1 词向量-如何训练 5. Huffman树6. 负采样-negative sampling7. Glove基于共现矩阵7.1 Glove词向量7.2 Glove对比word2vec 8. 词向量训练总结9. 词向量应用9…

论文解读 | ScanNet:室内场景的丰富注释3D重建

原创 | 文 BFT机器人 大型的、有标记的数据集的可用性是为了利用做有监督的深度学习方法的一个关键要求。但是在RGB-D场景理解的背景下&#xff0c;可用的数据非常少,通常是当前的数据集覆盖了一小范围的场景视图&#xff0c;并且具有有限的语义注释。 为了解决这个问题&#…

9.阿里Sentinel哨兵

1.Sentinel Sentinel&#xff08;哨兵&#xff09;是由阿里开源的一款流量控制和熔断降级框架&#xff0c;用于保护分布式系统中的应用免受流量涌入、超载和故障的影响。它可以作为微服务架构中的一部分&#xff0c;用于保护服务不被异常流量冲垮&#xff0c;从而提高系统的稳定…

系统上线安全测评需要做哪些内容?

电力信息系统、航空航天、交通运输、银行金融、地图绘画、政府官网等系统再正式上线前需要做安全测试。避免造成数据泄露从而引起的各种严重问题。 那么系统上线前需要做哪些测试内容呢&#xff1f;下面由我给大家介绍 1、安全机制检测-应用安全 身份鉴别 登录控制模块 应提供…

Linux:权限

目录 一、shell运行原理 二、权限 1.权限的概念 2.文件访问权限的相关设置方法 三、常见的权限问题 1.目录权限 2.umsk(权限掩码) 3.粘滞位 一、shell运行原理 1.为什么我们不是直接访问操作系统&#xff1f; ”人“不善于直接使用操作系统如果让人直接访问操作系统&a…

数据通信——TCP(三次握手及基础特性)

引言 TCP&#xff08;传输控制协议&#xff09;&#xff0c;不像之前的UDP那样&#xff0c;因为这个协议要将很多复杂的东西&#xff0c;所以这次的特性是简单的特性&#xff0c;后续会讲一些复杂难懂的知识&#xff0c;这次先说一些TCP明显的特性 面向连接 TCP提供了对连接的管…

构建高性能云原生大数据处理平台:融合人工智能优化数据分析流程

文章目录 架构要点优势与应用案例研究&#xff1a;基于云原生大数据平台的智能营销分析未来展望&#xff1a;大数据与人工智能的融合结论 &#x1f388;个人主页&#xff1a;程序员 小侯 &#x1f390;CSDN新晋作者 &#x1f389;欢迎 &#x1f44d;点赞✍评论⭐收藏 ✨收录专栏…

【STM32RT-Thread零基础入门】 6. 线程创建应用(线程挂起与恢复)

硬件&#xff1a;STM32F103ZET6、ST-LINK、usb转串口工具、4个LED灯、1个蜂鸣器、4个1k电阻、2个按键、面包板、杜邦线 文章目录 前言一、RT-Thread相关接口函数1. 挂起线程2. 恢复线程 二、程序设计1. car_led.c2.car_led.h3. main.c 三、程序测试总结 前言 在上一个任务中&a…

Linux操作系统--常用指令(文件目录类指令)

(1).pwd指令 功能:显示当前工作目录的绝对路径。 如果你使用cd命令进行切换的时候不知道到了哪里,就可以使用该指令输出路径查看。 (2).cd命令 功能:用于切换路径 语法: cd + 路径(路径可以指绝对路径,也可以是相对路径)

URL中传递JSON字符串

今天遇见了一个需求&#xff0c;从post请求中在url里传递json字符串&#xff0c; 就是路径?参数11那种情况 最后怎么解决的呢&#xff1f; 需要使用前端方法&#xff0c;先用JSON.stringify格式化成字符串&#xff0c;再用encodeURIComponent把JSON里面的符号转转为url支持的…

ppt如何转pdf文档?用这个方法可将ppt转pdf

在现代社会中&#xff0c;PPT(幻灯片)已成为一种常见的演示工具&#xff0c;被广泛应用于学术、商务、培训等领域。然而&#xff0c;PPT文件的使用和分享存在一些问题&#xff0c;例如文件格式不兼容、内容修改易被篡改等。为了解决这些问题&#xff0c;将PPT转换为PDF格式已成…

AI夏令营第三期用户新增挑战赛学习笔记

1、数据可视化 1.数据探索和理解&#xff1a;数据可视化可以帮助我们更好地理解数据集的特征、分布和关系。通过可视化数据&#xff0c;我们可以发现数据中的模式、异常值、缺失值等信息&#xff0c;从而更好地了解数据的特点和结构。2.特征工程&#xff1a;数据可视化可以帮助…

TinyVue - 华为云 OpenTiny 出品的企业级前端 UI 组件库,免费开源,同时支持 Vue2 / Vue3,自带 TinyPro 中后台管理系统

华为最新发布的前端 UI 组件库&#xff0c;支持 PC 和移动端&#xff0c;自带了 admin 后台系统&#xff0c;完成度很高&#xff0c;web 项目开发又多一个选择。 关于 OpenTiny 和 TinyVue 在上个月结束的华为开发者大会2023上&#xff0c;官方正式进行发布了 OpenTiny&#…

成都睿趣科技:抖音开网店前期的流程是什么

随着互联网的快速发展&#xff0c;电子商务成为了商业领域中的一大利器&#xff0c;而在电商领域中&#xff0c;抖音作为一个强大的平台&#xff0c;也吸引了众多商家的目光。然而&#xff0c;要在抖音上开设一家成功的网店&#xff0c;并不是一件简单的事情&#xff0c;需要经…

研磨设计模式day12命令模式

目录 定义 几个参数 场景描述 代码示例 参数化设置 命令模式的优点 本质 何时选用 定义 几个参数 Command&#xff1a;定义命令的接口。 ConcreteCommand:命令接口的实现对象。但不是真正实现&#xff0c;是通过接收者的功能来完成命令要执行的操作 Receiver&#x…

无涯教程-进程 - 信号(Signals)

信号是对进程的通知&#xff0c;指示事件的发生。信号也称为软件中断&#xff0c;无法预知其发生&#xff0c;因此也称为异步事件。 可以用数字或名称指定信号&#xff0c;通常信号名称以SIG开头。可用信号kill –l(列出信号名称为l)检查可用信号&#xff0c;如下所示- 无论何…

骨传导耳机和普通耳机哪个危害大?一文读懂骨传导耳机!

作为一个5年重度运动爱好者&#xff0c;常年跑步、爬山、骑行&#xff0c;入手过的各类耳机超30款&#xff0c;用真实体验告诉大家&#xff0c;骨传导耳机和普通耳机哪个危害大&#xff01; 首先大家要知道的是&#xff0c;不管什么类型的耳机&#xff0c;如说说音量过大&…

SpringBoot生成和解析二维码完整工具类分享(提供Gitee源码)

前言&#xff1a;在日常的开发工作当中可能需要实现一个二维码小功能&#xff0c;我参考了网上很多关于SpringBoot生成二维码的教程&#xff0c;最终还是自己封装了一套完整生成二维码的工具类&#xff0c;可以支持基础的黑白二维码、带颜色的二维码、带Logo的二维码、带颜色和…