spark-sql字段血缘实现

spark-sql字段血缘实现

背景

Apache Spark是一个开源的大数据处理框架,它提供了一种高效、易于使用的方式来处理大规模数据集。在Spark中,数据是通过DataFrame和Dataset的形式进行操作的,这些数据结构包含了一系列的字段(也称为列)。字段血缘是Spark中的一个关键概念,它帮助我们理解数据的来源和流向,从而更好地理解和控制数据处理过程。

字段血缘是指在数据处理过程中,一个字段的值是如何从源数据产生并传递给目标数据的。在Spark中,字段血缘是通过依赖关系进行管理的。每个字段都有一个或多个依赖关系,这些依赖关系定义了字段的值如何从其他字段或数据源产生。

前提

spark版本:2.4.3
使用语言:java+scala

技术实现

1. spark-sql的执行计划,了解如何实现字段血缘解析

在这里插入图片描述
一个sql会经历一些列的处理,最终生成spark-core的代码,提交到集群运行。
首先看一下一个简单的sql生成的逻辑执行计划长什么样子

insert into default.jy_test
select * from default.jy_test

未解析的逻辑执行计划:

'InsertIntoTable 'UnresolvedRelation `default`.`jy_test`, false, false
+- 'Project [*]+- 'UnresolvedRelation `default`.`jy_test`

解析后(analyzer)的逻辑执行计划:

InsertIntoHiveTable `default`.`jy_test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, false, false, [id, name]
+- Project [id#0, name#1]+- SubqueryAlias `default`.`jy_test`+- HiveTableRelation `default`.`jy_test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#0, name#1]

优化后(optimizer)的逻辑执行计划:

InsertIntoHiveTable `default`.`jy_test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, false, false, [id, name]
+- HiveTableRelation `default`.`jy_test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#0, name#1]

重点来了

  1. 所谓的逻辑执行计划就是一个树形结构
  2. 树形结构中的叶子结点,就是hive的表信息:库名、表名、字段信息,并且spark给每一个字段生成了一个唯一的id
  3. 树形结构中的非叶子只包含了字段信息,不包含库表信息

所以想要实现字段血缘,我们需要做的就是通过那个生成的唯一id去一层层的关联,当关联到叶子结点的时候,就找到了库名表名

2. 构建一颗与解析后的逻辑执行计划一模一样的树形结构

  1. 首先定义node对象,用来存放节点信息
public abstract class Node {private String name;private List<Column> columnList = new ArrayList<>();private List<Node> children = new ArrayList<>();private Node parentNode;private String graphId;
}
  1. 其次定义column对象,用来存放字段信息
public class Column {private String name;private Long exprId;private String ColumnType;private ArrayList<Column> child = new ArrayList<Column>();private String tableName;private String process;
}
  1. 根据spark-sql生成的逻辑执行计划,我们为每一个逻辑节点创建对应的结点,由于结点很多,我这里直接给个截图,源码会在文章最后提供出来
    在这里插入图片描述

  2. 解析spark-sql生成的解析后的逻辑执行计划
    首先获取逻辑执行计划,这里提供两种方式:
    1.通过spark-session获取,该方法可以用来做测试,非常的方便

     LogicalPlan logicalPlan = spark.sessionState().sqlParser().parsePlan(sql);LogicalPlan analyzer = spark.sessionState().analyzer().execute(logicalPlan);
    

    2.通过QueryExecution获得,这里贴个图,详情看源码
    在这里插入图片描述

  3. 解析spark生成的analyzer,构建我们自己的树形结构
    这里贴一下主要的逻辑,使用scala去递归解析抽象语法树会方便很多

def resolveLogicPlan(plan: LogicalPlan, root: Node): Unit = {plan match {case plan: InsertIntoHadoopFsRelationCommand =>val node = root.asInstanceOf[Root]node.setName(NodeType.INSERTINTOHIVETABLE.getName)val database: String = plan.catalogTable.get.identifier.database.getOrElse("default")val table: String = plan.catalogTable.get.identifier.tableval fullTableName = database + "." + tableplan.catalogTable.get.schema.foreach { field => {val column = new Column()column.setName(field.name)column.setTableName(fullTableName)node.getColumnList.add(column)}}resolveLogicPlan(plan.query, node)case plan: SaveIntoDataSourceCommand =>val table: String = plan.options.get("dbtable").getOrElse("")val url: String = plan.options.get("url").getOrElse("")val user: String = plan.options.get("user").getOrElse("")val password: String = plan.options.get("password").getOrElse("")// 定义匹配数据库名称的正则表达式模式val pattern: Regex = ".*://[^/]+/(\\w+)".r// 使用正则表达式进行匹配val dbNameOption: Option[String] = pattern.findFirstMatchIn(url).map(_.group(1))val fullTableName = dbNameOption.getOrElse("") + "." + tableval node = root.asInstanceOf[Root]node.setName(NodeType.SAVEINTODATASOURCECOMMAND.getName)// 连接mysql,根据库名表明获取字段列表val fieldsList = getFieldsListFromMysql(url, user, password, table)fieldsList.foreach { field => {val column = new Column()column.setName(field)column.setTableName(fullTableName)node.getColumnList.add(column)}}resolveLogicPlan(plan.query, node)case plan: InsertIntoHiveTable =>val node = root.asInstanceOf[Root]node.setName(NodeType.INSERTINTOHIVETABLE.getName)val database: String = plan.table.identifier.database.getOrElse("default")val table: String = plan.table.identifier.tableval fullTableName = database + "." + tablenode.setTableName(fullTableName)plan.table.schema.foreach { field => {val column = new Column()column.setName(field.name)column.setTableName(fullTableName)node.getColumnList.add(column)}}resolveLogicPlan(plan.query, node)case plan: Aggregate =>val node = new AggregateNode()insertNodeColumnsFromNamedExpression(node, plan.aggregateExpressions)node.setParentNode(root)root.getChildren.add(node)resolveLogicPlan(plan.child, node)case plan: Project =>val node = new ProjectNode()insertNodeColumnsFromNamedExpression(node, plan.projectList)node.setParentNode(root)root.getChildren.add(node)resolveLogicPlan(plan.child, node)case plan: LogicalRelation =>val node = new LogicalRelationNode()dfsLogicalRelation(plan, node)node.setParentNode(root)root.getChildren.add(node)case plan: HiveTableRelation =>val node = new LogicalRelationNode()dfsLogicalRelation(plan, node)node.setParentNode(root)root.getChildren.add(node)case plan: Filter =>val node = new FilterNode()node.setParentNode(root)node.setCondition(plan.condition.toString)root.getChildren.add(node)resolveLogicPlan(plan.child, node)case plan: Join =>val node = new JoinNode()node.setName(plan.joinType.toString + " " + node.getName)node.setParentNode(root)node.setCondition(plan.condition.toString)root.getChildren.add(node)resolveLogicPlan(plan.left, node)resolveLogicPlan(plan.right, node)case plan: Window =>val node = new WindowNode()insertNodeColumnsFromNamedExpression(node, plan.windowExpressions)node.setParentNode(root)root.getChildren.add(node)resolveLogicPlan(plan.child, node)case plan: Union =>val node = new UnionNode()node.setParentNode(root)root.getChildren.add(node)plan.children.foreach(resolveLogicPlan(_, node))case plan: SubqueryAlias =>val node = new SubqueryNode()node.setName(node.getName + " " + plan.name.toString())node.setParentNode(root)root.getChildren.add(node)resolveLogicPlan(plan.child, node)case plan: Generate =>val node = new GenerateNode()processGenerate(plan, node)node.setParentNode(root)root.getChildren.add(node)resolveLogicPlan(plan.child, node)case _ =>plan.children.foreach(resolveLogicPlan(_, root))}}
  1. 到这里,我们已经得到了自己的树形结构。接下来要通过唯一id进行关联,补充库表信息。
    不知道大家注意没有,我们在node对象中有一个方法,

    在这里插入图片描述
    这里用到了访问者设计模式,感兴趣的同学可以学习一下,在spark-sql源码中,同样用的是访问者设计模式。

这里主要说一下Visitor的定义及方法:processColumn方法主要是拿自己的ExprId和所有孩子结点的ExprId比较,如果相等的话,说明是同一个字段,那就表名复制过来。

public interface Visitor {void visit(Node node);default void processColumn(Node node) {for (Column column1 : node.getColumnList()) {for (Node nd : node.getChildren()) {for (Column column2 : nd.getColumnList()) {processColumn(column1, column2);}}}}default void processColumn(Column column1, Column column2) {List<Column> child = column1.getChild();child.forEach(ch -> processColumn(ch, column2));if (column1.getExprId().equals(column2.getExprId())) {if(column2.getTableName() != null) {column1.setTableName(column2.getTableName());} else {column1.getChild().addAll(column2.getChild());}}}
}

LineageVisitor是Visitor的实现类, 主要用来做模式匹配,不同的结点处理方式会有不同,感兴趣的同学看一下这块的代码。

public class LineageVisitor implements Visitor{@Overridepublic void visit(Node node) {switch (node.getClass().getSimpleName()) {case "FilterNode" :case "SubqueryNode" :case "JoinNode" : copyChildColumnToThis(node); break;case "WindowNode" :case "GenerateNode" : copyChildColumnToThisWithProcess(node); break;case "Root" : processColumn((Root)node); break;case "UnionNode" : processColumn((UnionNode)node); break;default : processColumn(node);}}@Overridepublic void processColumn(Node node) {node.getChildren().forEach( child -> child.accept(this));Visitor.super.processColumn(node);}public void copyChildColumnToThis(Node node) {node.getChildren().forEach( child -> child.accept(this));for (Node child : node.getChildren()) {node.getColumnList().addAll(child.getColumnList());}}public void copyChildColumnToThisWithProcess(Node node) {node.getChildren().forEach( child -> child.accept(this));Visitor.super.processColumn(node);for (Node child : node.getChildren()) {node.getColumnList().addAll(child.getColumnList());}}public void processColumn(Root node) {node.getChildren().forEach( child -> child.accept(this));if(node.getColumnList().size() > 0) {for (int i = 0; i < node.getChildren().get(0).getColumnList().size(); i++) {for (Node child : node.getChildren()) {node.getColumnList().get(i).getChild().add(child.getColumnList().get(i));}}}}public void processColumn(UnionNode node) {node.getChildren().forEach( child -> child.accept(this));int size = node.getChildren().get(0).getColumnList().size();for (Column column : node.getChildren().get(0).getColumnList()) {Column column1 = new Column();column1.setName(column.getName());column1.setExprId(column.getExprId());node.getColumnList().add(column1);}for (int i = 0; i < size; i++) {for (Node child : node.getChildren()) {node.getColumnList().get(i).getChild().add(child.getColumnList().get(i));}}}
}

成果展示

还是拿最开始的sql ,看一下最终生成的字段血缘

insert into default.jy_test select * from default.jy_test

在这里插入图片描述

最后

字段血缘实现起来还是比较困难的,需要了解spak-sql的底层原理和一些技巧。
这里方便大家使用、学习、交流,所以贡献自己的源码,仓库地址:https://gitee.com/chenxiaoliang0901/crock/tree/main

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/607100.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

flex弹性盒子常用的布局属性详解

想必大家在开发中经常会用到flex布局。而且还会经常用到 justify-content 属性实现分栏等等 接下来给大家分别讲一下 justify-content 的属性值。 以下是我敲的效果图大家可以清晰看出区别 space-between 属性值可以就是说两端对齐 space-evenly 属性值是每个盒子之间的…

Requests库的接口测试实现

Requests库是在接口测试中被广泛运用的库&#xff0c;包括模拟请求的下发&#xff0c;请求相关配置和响应结果的获取&#xff0c;核心主体都是通过request库完成。在接口测试中使用非常频繁。 一、Requests库环境搭建 接口测试的核心从模拟请求开始。在Python中&#xff0c;通…

15个等轴视图设计的电动车汽车无人机等PR剪辑素材视频制作元素

包含15个等轴视图、等距视角电动车、汽车、无人机、沙漏、飞机等PR剪辑素材视频制作元素mogrt动画模板。 特征&#xff1a; 等距设计&#xff1b; 可以更改颜色&#xff1b; 分辨率&#xff1a;全高清&#xff08;19201080&#xff09;&#xff1b; 持续时间&#xff1a;15秒&a…

IDEA+Git——项目分支管理

IDEAGit——项目分支管理 1. 前言2. 基础知识点2.1. 分支区分2.2. Git 代码提交规范2.3. 四个工作区域2.4. 文件的四种状态2.5. 常用命令2.6 注重点 3. IDEA分支管理 1. 前言 在Git中&#xff0c;分支是项目的不同版本&#xff0c;当开始开发一个新项目时&#xff0c;主分支通常…

使用命令行方式搭建uni-app + Vue3 + Typescript + Pinia + Vite + Tailwind CSS + uv-ui开发脚手架

使用命令行方式搭建uni-app Vue3 Typescript Pinia Vite Tailwind CSS uv-ui开发脚手架 项目代码以上传至码云&#xff0c;项目地址&#xff1a;https://gitee.com/breezefaith/uniapp-vue3-ts-scaffold 文章目录 使用命令行方式搭建uni-app Vue3 Typescript Pinia V…

【Python】不一样的Ansible(一)

不一样的Ansible——进阶学习 前言正文概念Ansible CorePlugins和Modules 插件插件类型编写自定义插件基本要求插件选项文档标准编写插件 添加一个本地插件注册为内置插件指定插件目录 其他一些技巧更改Strategy 结语 前言 Ansible 是一个极其简单的 IT 自动化引擎&#xff0c…

Windows下Redis5+可视化软件下载、安装和配置教程-2024年1月8日

Windows下Redis5下载、安装和配置教程-2024年1月8日 一、下载二、安装三、配置环境四、配置可视化客户端 一、下载 redis是现在是没有对win系统版进行维护的&#xff0c;这个是大神完成的&#xff0c;目前是到5版本&#xff0c;选择Redis-x64-5.0.14.1.zip点击下载 下载地址&…

pgAdmin和asdf postgres的安装

安装pgAdmin&#xff1a; curl https://www.pgadmin.org/static/packages_pgadmin_org.pub | sudo apt-key addsudo sh -c echo "deb https://ftp.postgresql.org/pub/pgadmin/pgadmin4/apt/$(lsb_release -cs) pgadmin4 main" > /etc/apt/sources.list.d/pgadmi…

.NET 6中如何使用Redis

1、安装redis Redis在windows平台上不受官方支持&#xff0c;所以想要在window安装Redis就必须去下载windows提供的安装包。安装地址&#xff1a;https://github.com/tporadowski/redis/releases 2、在NueGet安装包 3、在appsettings.json文件里面添加Redis相关配置信息 &quo…

MySQL之子查询、连接查询(内外)以及分页查询

一、案例&#xff08;接上一篇文章&#xff09; 09&#xff09;查询学过「张三」老师授课的同学的信息 -- 一共有两种方式 -- 第一种方式&#xff1a; SELECT s.*,c.cname,t.tname,sc.score FROMt_mysql_teacher t,t_mysql_course c,t_mysql_student s,t_mysql_score sc WHERE…

鸿蒙设备-开发板基础学习(BearPi-HM Micro)

theme: minimalism 每当学习一门新的编程语言或者上手一款新的开发板&#xff0c;在学习鸿蒙设备开发过程中&#xff0c;带大家写的第一个程序&#xff0c;通过这个程序&#xff0c;我们可以对鸿蒙设备开发的整个流程有一个初步的体验。BearPi-HM Micro开发板为例&#xff1a;…

「MCU」SD NAND芯片之国产新选择优秀

文章目录 前言 传统SD卡和可贴片SD卡 传统SD卡 可贴片SD卡 实际使用 总结 前言 随着目前时代的快速发展&#xff0c;即使是使用MCU的项目上也经常有大数据存储的需求。可以看到经常有小伙伴这样提问&#xff1a; 大家好&#xff0c;请问有没有SD卡芯片&#xff0c;可以…

MongoDB高级集群架构设计

两地三中心集群架构设计 容灾级别 RPO & RTO RPO&#xff08;Recovery Point Objective&#xff09;&#xff1a;即数据恢复点目标&#xff0c;主要指的是业务系统所能容忍的数据丢失量。RTO&#xff08;Recovery Time Objective&#xff09;&#xff1a;即恢复时间目标&…

C++学习笔记——string类和new函数

目录 string类 1.功能增强 1.1 子字符串提取 1.2 字符串拼接 1.3 大小写转换 1.4 字符串比较 2.性能优化 3.使用示例 下面是一个简单的使用示例&#xff0c;展示了如何使用改进后的String类&#xff1a; NEW函数 2.1NEW函数的基本用法 2.2NEW函数的注意事项 2.3避…

密码学:一文读懂非对称加密算法 DH、RSA

文章目录 前言非对称加密算法的由来非对称加密算法的家谱1.基于因子分解难题2.基于离散对数难题 密钥交换算法-DH密钥交换算法-DH的通信模型初始化DH算法密钥对甲方构建DH算法本地密钥乙方构建DH算法本地密钥DH算法加密消息传递 典型非对称加密算法-RSARSA的通信模型RSA特有的的…

建模软件Rhinoceros mac介绍说明

Rhinoceros mac是一款3D设计软件“犀牛”&#xff0c;在当今众多三维建模软件中&#xff0c;Rhinoceros 版因为其体积小、功能强大、对硬件要求低而广受欢迎&#xff0c;对于专业的3D设计人员来说它是一款不错的3D建模软件&#xff0c;Rhinoceros Mac中文版能轻易整合3DS MAX与…

Git命令+github仓库克隆

Git github Git常用命令 开始 git init #创建仓库 git status #查看仓库的状态 git status -s #简单的查看仓库的状态 git ls-files #查看暂存区的内容 git reflog #查看操作的历史记录 暂存区 git add git add <file&g…

网安入门11-文件上传(前后端绕过,变形马图片马)

Upload-Labs Upload-Labs是一个使用PHP语言编写、专注于文件上传漏洞的闯关式网络安全靶场。练习该靶场可以有效地了解并掌握文件上传漏洞的原理、利用方法和修复方案。 思考&#xff1a;他只让我传一个.jpg的图片&#xff0c;我想传一个.php的木马&#xff0c;两者什么区别 …

如何计算指标波动贡献率?(附Pandas实现)

大家好&#xff0c;我是阿粥 “为什么这个月销售额提升了30%&#xff1f;” “为什么转化率又降了&#xff0c;同比竟然降低了42%&#xff0c;什么原因导致的呢&#xff1f;” 这些都是数据分析师在工作中经常会遇到的问题&#xff0c;甚至有些基础岗的数据分析师要花80%以上的…

静态S5在项目管理中的应用与案例分享

静态S5作为一种强大的数据分析工具&#xff0c;不仅在数据处理和可视化方面表现出色&#xff0c;还在项目管理中发挥着重要作用。本篇将通过实际案例分享&#xff0c;探讨静态S5在项目管理中的应用与优势。 一、静态S5在项目管理中的应用 项目进度管理&#xff1a;静态S5通过…