导入JDBC元数据到Apache Atlas

前言

前期实现了导入MySQL元数据到Apache Atlas, 由于是初步版本,且功能参照Atlas Hive Hook,实现的不够完美

本期对功能进行改进,实现了导入多种关系型数据库元数据到Apache Atlas

数据库schema与catalog

按照SQL标准的解释,在SQL环境下CatalogSchema都属于抽象概念,可以把它们理解为一个容器或者数据库对象命名空间中的一个层次,主要用来解决命名冲突问题。从概念上说,一个数据库系统包含多个Catalog,每个Catalog又包含多个Schema,而每个Schema又包含多个数据库对象(表、视图、字段等),反过来讲一个数据库对象必然属于一个Schema,而该Schema又必然属于一个Catalog,这样我们就可以得到该数据库对象的完全限定名称,从而解决命名冲突的问题了;例如数据库对象表的完全限定名称就可以表示为:Catalog名称.Schema名称.表名称。这里还有一点需要注意的是,SQL标准并不要求每个数据库对象的完全限定名称是唯一的。

从实现的角度来看,各种数据库系统对CatalogSchema的支持和实现方式千差万别,针对具体问题需要参考具体的产品说明书,比较简单而常用的实现方式是使用数据库名作为Catalog名,使用用户名作为Schema名,具体可参见下表:

表1 常用数据库

供应商Catalog支持Schema支持
Oracle不支持Oracle User ID
MySQL不支持数据库名
MS SQL Server数据库名对象属主名,2005版开始有变
DB2指定数据库对象时,Catalog部分省略Catalog属主名
Sybase数据库名数据库属主名
Informix不支持不需要
PointBase不支持数据库名

原文:https://www.cnblogs.com/ECNB/p/4611309.html

元数据模型层级抽象

不同的关系型数据库,其数据库模式有所区别,对应与下面的层级关系

在这里插入图片描述

  • Datasource -> Catalog -> Schema -> Table -> Column
  • Datasource -> Catalog -> Table -> Column
  • Datasource -> Schema -> Table -> Column

元数据转换设计

在这里插入图片描述

提供元数据

借鉴Apache DolphinScheduler中获取Connection的方式,不多赘述。

public Connection getConnection(DbType dbType, ConnectionParam connectionParam) throws ExecutionException {BaseConnectionParam baseConnectionParam = (BaseConnectionParam) connectionParam;String datasourceUniqueId = DataSourceUtils.getDatasourceUniqueId(baseConnectionParam, dbType);logger.info("Get connection from datasource {}", datasourceUniqueId);DataSourceClient dataSourceClient = uniqueId2dataSourceClientCache.get(datasourceUniqueId, () -> {Map<String, DataSourceChannel> dataSourceChannelMap = dataSourcePluginManager.getDataSourceChannelMap();DataSourceChannel dataSourceChannel = dataSourceChannelMap.get(dbType.getDescp());if (null == dataSourceChannel) {throw new RuntimeException(String.format("datasource plugin '%s' is not found", dbType.getDescp()));}return dataSourceChannel.createDataSourceClient(baseConnectionParam, dbType);});return dataSourceClient.getConnection();}

转换元数据

  1. 元数据模型

创建数据库的元数据模型

private AtlasEntityDef createJdbcDatabaseDef() {AtlasEntityDef typeDef = createClassTypeDef(DatabaseProperties.JDBC_TYPE_DATABASE,Collections.singleton(DatabaseProperties.ENTITY_TYPE_DATASET),createOptionalAttrDef(DatabaseProperties.ATTR_URL, "string"),createOptionalAttrDef(DatabaseProperties.ATTR_DRIVER_NAME, "string"),createOptionalAttrDef(DatabaseProperties.ATTR_PRODUCT_NAME, "string"),createOptionalAttrDef(DatabaseProperties.ATTR_PRODUCT_VERSION, "string"));typeDef.setServiceType(DatabaseProperties.ENTITY_SERVICE_TYPE);return typeDef;
}

创建数据库模式的元数据模型

private AtlasEntityDef createJdbcSchemaDef() {AtlasEntityDef typeDef = AtlasTypeUtil.createClassTypeDef(SchemaProperties.JDBC_TYPE_SCHEMA,Collections.singleton(SchemaProperties.ENTITY_TYPE_DATASET));typeDef.setServiceType(SchemaProperties.ENTITY_SERVICE_TYPE);typeDef.setOptions(new HashMap<>() {{put("schemaElementsAttribute", "tables");}});return typeDef;
}

创建数据库表的元数据模型

private AtlasEntityDef createJdbcTableDef() {AtlasEntityDef typeDef = createClassTypeDef(TableProperties.JDBC_TYPE_TABLE,Collections.singleton(TableProperties.ENTITY_TYPE_DATASET),createOptionalAttrDef(TableProperties.ATTR_TABLE_TYPE, "string"));typeDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);typeDef.setOptions(new HashMap<>() {{put("schemaElementsAttribute", "columns");}});return typeDef;
}

创建数据库列的元数据模型

private AtlasEntityDef createJdbcColumnDef() {AtlasEntityDef typeDef = createClassTypeDef(ColumnProperties.JDBC_TYPE_COLUMN,Collections.singleton(ColumnProperties.ENTITY_TYPE_DATASET),createOptionalAttrDef(ColumnProperties.ATTR_COLUMN_TYPE, "string"),createOptionalAttrDef(ColumnProperties.ATTR_IS_PRIMARY_KEY, "string"),createOptionalAttrDef(ColumnProperties.ATTR_COLUMN_IS_NULLABLE, "string"),createOptionalAttrDef(ColumnProperties.ATTR_COLUMN_DEFAULT_VALUE, "string"),createOptionalAttrDef(ColumnProperties.ATTR_COLUMN_AUTO_INCREMENT, "string"));typeDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);HashMap<String, String> options = new HashMap<>() {{put("schemaAttributes", "[\"name\", \"isPrimaryKey\", \"columnType\", \"isNullable\" , \"isAutoIncrement\", \"description\"]");}};typeDef.setOptions(options);return typeDef;
}

创建实体之间的关系模型

private List<AtlasRelationshipDef> createAtlasRelationshipDef() {String version = "1.0";// 数据库和模式的关系AtlasRelationshipDef databaseSchemasDef = createRelationshipTypeDef(BaseProperties.RELATIONSHIP_DATABASE_SCHEMAS,BaseProperties.RELATIONSHIP_DATABASE_SCHEMAS,version, COMPOSITION, AtlasRelationshipDef.PropagateTags.NONE,createRelationshipEndDef(BaseProperties.JDBC_TYPE_DATABASE, "schemas", SET, true),createRelationshipEndDef(BaseProperties.JDBC_TYPE_SCHEMA, "database", SINGLE, false));databaseSchemasDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);AtlasRelationshipDef databaseTablesDef = createRelationshipTypeDef(BaseProperties.RELATIONSHIP_DATABASE_TABLES,BaseProperties.RELATIONSHIP_DATABASE_TABLES,version, AGGREGATION, AtlasRelationshipDef.PropagateTags.NONE,createRelationshipEndDef(BaseProperties.JDBC_TYPE_DATABASE, "tables", SET, true),createRelationshipEndDef(BaseProperties.JDBC_TYPE_TABLE, "database", SINGLE, false));databaseTablesDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);// 模式和数据表的关系// 注意 schema 已经被使用, 需要更换否则会冲突, 例如改为 Jschema(jdbc_schema)AtlasRelationshipDef schemaTablesDef = createRelationshipTypeDef(BaseProperties.RELATIONSHIP_SCHEMA_TABLES,BaseProperties.RELATIONSHIP_SCHEMA_TABLES,version, AGGREGATION, AtlasRelationshipDef.PropagateTags.NONE,createRelationshipEndDef(BaseProperties.JDBC_TYPE_SCHEMA, "tables", SET, true),createRelationshipEndDef(BaseProperties.JDBC_TYPE_TABLE, "Jschema", SINGLE, false));schemaTablesDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);// 表和数据列的关系AtlasRelationshipDef tableColumnsDef = createRelationshipTypeDef(BaseProperties.RELATIONSHIP_TABLE_COLUMNS,BaseProperties.RELATIONSHIP_TABLE_COLUMNS,version, COMPOSITION, AtlasRelationshipDef.PropagateTags.NONE,createRelationshipEndDef(BaseProperties.JDBC_TYPE_TABLE, "columns", SET, true),createRelationshipEndDef(BaseProperties.JDBC_TYPE_COLUMN, "table", SINGLE, false));tableColumnsDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);return Arrays.asList(databaseSchemasDef, databaseTablesDef, schemaTablesDef, tableColumnsDef);
}
  1. 提取元数据

    不再赘述

  2. 转换元数据

使用工厂模式,提供不同类型的元数据转换方式

public interface JdbcTransferFactory {JdbcTransfer getTransfer(DatabaseMetaData metaData, AtlasClientV2 client);boolean supportType(String type);String getName();
}

List ignorePatterns 用来过滤不想导入的数据库元数据,例如mysqlinformation_schema

public interface JdbcTransfer {void transfer();JdbcTransfer setIgnorePatterns(List<Pattern> ignorePatterns);
}

举例:JdbcMysqlTransfer 和 MysqlTransferFactory

@AutoService(JdbcTransferFactory.class)
public class MysqlTransferFactory implements JdbcTransferFactory {public static final String MYSQL = "mysql";@Overridepublic JdbcTransfer getTransfer(DatabaseMetaData metaData, AtlasClientV2 client) {return new JdbcMysqlTransfer(metaData, client);}@Overridepublic boolean supportType(String type) {return MYSQL.equalsIgnoreCase(type);}@Overridepublic String getName() {return MYSQL;}
}
public class JdbcMysqlTransfer implements JdbcTransfer {private final Jdbc jdbc;private final AtlasService atlasService;private List<Pattern> ignorePatterns;public JdbcMysqlTransfer(DatabaseMetaData metaData, AtlasClientV2 client) {this.jdbc = new Jdbc(new JdbcMetadata(metaData));this.atlasService = new AtlasService(client);this.ignorePatterns = Collections.emptyList();}@Overridepublic JdbcTransfer setIgnorePatterns(List<Pattern> ignorePatterns) {this.ignorePatterns = ignorePatterns;return this;}private boolean tableIsNotIgnored(String tableName) {return ignorePatterns.stream().noneMatch(regex -> regex.matcher(tableName).matches());}@Overridepublic void transfer() {// 1.数据库实体转换DatabaseTransfer databaseTransfer = new DatabaseTransfer(atlasService);AtlasEntity databaseEntity = databaseTransfer.apply(jdbc);// 2.表实体转换String catalog = (String) databaseEntity.getAttribute(BaseProperties.ATTR_NAME);List<AtlasEntity> tableEntities = jdbc.getTables(catalog, catalog).parallelStream().filter(jdbcTable -> tableIsNotIgnored(jdbcTable.getTableName())).map(new TableTransfer(atlasService, databaseEntity)).toList();// 3.列转换for (AtlasEntity tableEntity : tableEntities) {String tableName = (String) tableEntity.getAttribute(BaseProperties.ATTR_NAME);List<JdbcPrimaryKey> primaryKeys = jdbc.getPrimaryKeys(catalog, tableName);jdbc.getColumns(catalog, catalog, tableName).parallelStream().forEach(new ColumnTransfer(atlasService, tableEntity, primaryKeys));}}}
  1. 元数据存入Atlas
public class DatabaseTransfer implements Function<Jdbc, AtlasEntity> {private final AtlasService atlasService;public DatabaseTransfer(AtlasService atlasService) {this.atlasService = atlasService;}@Overridepublic AtlasEntity apply(Jdbc jdbc) {String userName = jdbc.getUserName();String driverName = jdbc.getDriverName();String productName = jdbc.getDatabaseProductName();String productVersion = jdbc.getDatabaseProductVersion();String url = jdbc.getUrl();String urlWithNoParams = url.contains("?") ? url.substring(0, url.indexOf("?")) : url;String catalogName = urlWithNoParams.substring(urlWithNoParams.lastIndexOf("/") + 1);// 特殊处理 Oracleif (productName.equalsIgnoreCase("oracle")){catalogName = userName.toUpperCase();urlWithNoParams = urlWithNoParams + "/" + catalogName;}DatabaseProperties properties = new DatabaseProperties();properties.setQualifiedName(urlWithNoParams);properties.setDisplayName(catalogName);properties.setOwner(userName);properties.setUrl(url);properties.setDriverName(driverName);properties.setProductName(productName);properties.setProductVersion(productVersion);// 1.创建Atlas EntityAtlasEntity atlasEntity = new AtlasEntity(DatabaseProperties.JDBC_TYPE_DATABASE, properties.getAttributes());// 2.判断是否存在实体, 存在则填充GUIDMap<String, String> searchParam = Collections.singletonMap(DatabaseProperties.ATTR_QUALIFIED_NAME, urlWithNoParams);Optional<AtlasEntityHeader> entityHeader = atlasService.checkAtlasEntityExists(DatabaseProperties.JDBC_TYPE_DATABASE, searchParam);entityHeader.ifPresent(header -> atlasEntity.setGuid(header.getGuid()));// 3,存储或者更新到Atlas中if (entityHeader.isPresent()){atlasService.createAtlasEntity(new AtlasEntity.AtlasEntityWithExtInfo(atlasEntity));}else {AtlasEntityHeader header = atlasService.createAtlasEntity(new AtlasEntity.AtlasEntityWithExtInfo(atlasEntity));atlasEntity.setGuid(header.getGuid());}return atlasEntity;}
}

效果展示

  1. 元数据类型定义

在这里插入图片描述

在这里插入图片描述

  1. 测试导入元数据

由于mysql没有采用schema,因此jdbc_schema为空

在这里插入图片描述

如图所示,可以清晰的了解mysql数据库中demo数据库的数据表内容

在这里插入图片描述

数据表元数据,qualifiedName使用数据库连接url.表名
在这里插入图片描述

如同所示,数据表内各个列的元数据;可以清晰的了解该数据表的各个字段信息

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/206101.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

从 Kindle 文件中提取内容:GroupDocs.Parser V23.11

从 Kindle 文件中提取内容 2023 年 12 月 6 日 GroupDocs.Parser V23.11 增加了对 Kindle 文档格式的支持&#xff0c;支持从 Kindle 电子书和文档中提取文本和元数据。 GroupDocs.Parser 是一款全面的文档解析解决方案&#xff0c;使您能够从多种文档格式中提取文本、元数据和…

SLAM算法与工程实践——SLAM基本库的安装与使用(3):Pangolin库

SLAM算法与工程实践系列文章 下面是SLAM算法与工程实践系列文章的总链接&#xff0c;本人发表这个系列的文章链接均收录于此 SLAM算法与工程实践系列文章链接 下面是专栏地址&#xff1a; SLAM算法与工程实践系列专栏 文章目录 SLAM算法与工程实践系列文章SLAM算法与工程实践…

Linux--文件权限与shell外壳的理解

目录 一.Linux的用户与用户切换&#xff0c;提权 二.对文件权限的理解 1.文件权限角色的权限文件属性 2.Linux中的三种角色 3.为什么会存在所属组这个角色 4.文件属性的意义 4.1.第一个字母的意义 4.2 第2——第10个字母的意义 4.3修改文件权限的方法 三.目录权限 四…

文本润色工具有哪些,高质量的文本润色软件

在当今信息过载的时代&#xff0c;文本的重要性愈发凸显。即便是最精心构思的文章&#xff0c;若未经过仔细的润色&#xff0c;也难以达到最佳的表达效果。本文将专心分享文本润色工具的种类。 文本润色工具的种类 文本润色工具根据其功能和应用范围可以分为多个种类&#xff…

js/jQuery常见操作 之 jQuery操作复选框的常见问题

js/jQuery常见操作 之 jQuery操作复选框的常见问题 1. js/jQuery的其他一些常见基础操作2. 全选/全不选问题2.1 效果2.2 实现代码2.2.1 简单js实现2.2.2 jQuery实现2.2.2.1 注意语法&#xff08;区别jQuery版本&#xff09;2.2.2.2 完整代码实现 3. jQuery实现点击 行tr 实现ch…

【Java 基础】21 多线程同步与锁

文章目录 1.存在的问题2.使用同步解决问题1) synchronized2) volatile3) 锁 总结 用多线程过程中&#xff0c;有可能出现 多个线程同时处理&#xff08;获取或修改等&#xff09;同一个数据&#xff0c;这个时候就 会发生数据不同步的问题&#xff0c; 因此出现了同步和锁来…

APP备案(Android) - 获取签名证书公钥、MD5

因为近期刚针对各应用平台对APP备案时间节点要求进行了统一整理&#xff0c;然后隔天就被要求提供一下app相关的的公钥和MD5&#xff0c;虽然很快就解决了这个事情&#xff0c;但忍不住又稍微衍生了一下&#xff0c;但行小步&#xff0c;莫问远方吧 关联Blog APP备案(Android)…

java多线程(二)线程池

目录 java线程池 线程池应用场景&#xff1a; 如何创建线程池&#xff1a; 有什么区别&#xff1a; 不同线程池对应的应用场景 案例 输出结果 java线程池 Java线程池是一种预先创建一定数量的线程&#xff0c;并将任务提交给这些线程执行的机制。线程池可以避免频繁创建…

ExecutorService、Callable、Future实现有返回结果的多线程原理解析

原创/朱季谦 在并发多线程场景下&#xff0c;存在需要获取各线程的异步执行结果&#xff0c;这时&#xff0c;就可以通过ExecutorService线程池结合Callable、Future来实现。 我们先来写一个简单的例子—— public class ExecutorTest {public static void main(String[] ar…

Vulnhub项目:EMPIRE: BREAKOUT

一、靶机地址 靶机地址&#xff1a;Empire: Breakout ~ VulnHub 靶机介绍&#xff1a; 该靶机被定义为简单&#xff0c;但是如果没有找到&#xff0c;那就难度成中等了&#xff01; 二、渗透过程 老三样&#xff0c;发现目标&#xff0c;这里用 arp-scan 确定靶机 ip&#…

Java基础50题:14. 使用方法求最大值(2种方法)

概述 使用方法求最大值。 创建方法求两个数的最大值max2&#xff0c;随后再写一个求3个数的最大值函数max3。 要求&#xff1a; 在max3这个方法中&#xff0c;调用max2函数&#xff0c;来实现3个数的最大值计算。 方法一 【代码】 public class P14 {public static int max…

算法___

文章目录 算法两数之和 算法 两数之和 题目如下图&#xff1a; 我的答案如下图&#xff1a; 我采用的是最笨的思路&#xff0c;直接暴力的两次循环&#xff0c;第一次外循环是取数组的第一个元素&#xff0c;然后内循环会遍历数组后面除第一个的所有元素&#xff0c;然后和…

DDD架构思想专栏二《领域层的决策设计思想详解》

如果不了解DDD基本概念的读者可以去看这篇文章&#xff0c;传送门&#xff1a;DDD架构思想专栏一《初识领域驱动设计DDD落地》-CSDN博客 前言介绍 在上一章节介绍了领域驱动设计的基本概念以及按照领域驱动设计的思想进行代码分层&#xff0c;但是仅仅只是从一个简单的分层结…

【Flink系列三】数据流图和任务链计算方式

上文介绍了如何计算并行度和slot的数量&#xff0c;本文介绍Flink代码提交后&#xff0c;如何生成计算的DAG数据流图。 程序和数据流图 所有的Flink程序都是由三部分组成的&#xff1a;Source、Transformation和Sink。Source负责读取数据源&#xff0c;Transformation利用各种…

Remix IDE 快速开始Starknet

文章目录 一、Remix 项目二、基于Web的开发环境Remix 在线 IDE三、Starknet Remix 插件如何使用使用 Remix【重要】通过 Starknet by Example 学习一、Remix 项目 Remix 项目网站 在以太坊合约开发领域,Remix 项目享有很高的声誉,为各个级别的开发人员提供功能丰富的工具集…

JS中深拷贝与浅拷贝

定义 深拷贝&#xff08;Deep Copy&#xff09;和浅拷贝&#xff08;Shallow Copy&#xff09;是在编程中常用的两种对象复制方式。 浅拷贝&#xff08;Shallow Copy&#xff09;&#xff1a; 浅拷贝是创建一个新的对象&#xff0c;将原始对象的属性值复制到新对象中。如果属…

Smart Link和Monitor Link

Smart Link和Monitor Link简介 Smart Link&#xff0c;又叫做备份链路。一个Smart Link由两个接口组成&#xff0c;其中一个接口作为另一个的备份。Smart Link常用于双上行组网&#xff0c;提供可靠高效的备份和快速的切换机制。 Monitor Link是一种接口联动方案&#xff0c;它…

nodejs流

什么是流 stream 流是用于在 Node.js 中处理流数据的抽象接口。 node:stream 模块提供了用于实现流接口的 API。 什么是流数据 流数据是指一组顺序、大量、快速、连续到达的数据序列&#xff0c;一般情况下数据流可被视为一个随时间延续而无限增长的动态数据集合。流数据应用…

【keil备忘录】2. stm32 keil仿真时的时间测量功能

配置仿真器Trace内核时钟为单片机实际的内核时钟&#xff0c;需要勾选Enable设置&#xff0c;设置完成后Enable取消勾选也可以&#xff0c;经测试时钟频率配置仍然生效&#xff0c;此处设置为48MHZ: 时间测量时必须打开register窗口&#xff0c;否则可能不会计数 右下角有计…

第十四章 : Spring Boot 整合spring-session,使用redis共享

第十四章 &#xff1a; Spring Boot 整合spring-session,使用redis共享 前沿 本文重点讲述&#xff1a;spring boot工程中使用spring-session机制进行安全认证&#xff0c;并且通过redis存储session&#xff0c;满足集群部署、分布式系统的session共享。 基于SPringBoot 2.3.2…