How to write a custom hook for Apache Atlas

Atlas is an open-source platform for metadata and data-asset management. Its design rests on three components, all of them open source: a graph database, a NoSQL store, and a search engine (in a typical deployment, JanusGraph on HBase, indexed by Solr).

The open-source metadata-management tools on the market today include Atlas, DataHub, and OpenMetadata. Which is best for secondary development? For a Java team it is still Atlas: flexible and simple. The other two require Python on top of several other languages.

Atlas ships with real-time metadata-collection hooks for components such as HBase, Hive, Impala, and Sqoop; for anything else you can implement a custom hook. The flow of a hook is: the source system emits an event, the hook converts it into an Atlas notification, and Kafka delivers the notification to Atlas.

To understand hooks, first understand the data source. A hook needs the source system's cooperation: it is essentially a listening mechanism inside the source system, a monitor sitting between the client (where the SQL is written) and the execution engine that can observe the SQL-parsing process. If the source system offers no such mechanism, listener-based capture is not possible.

Even if you cannot write such a listener, Atlas is still easy to work with: the Kafka layer in the middle is itself a real-time listening channel. As long as your program submits messages in the format Atlas requires, you get metadata management. Kafka carries two topics, ATLAS_HOOK and ATLAS_ENTITIES; write messages that satisfy their formats and metadata is ingested in real time.

Atlas ingests metadata through two channels: the REST API and Kafka. Before getting to Kafka, let's first explain what a model is.

A model is similar to a JDBC connection or a Presto catalog: it is the registration information of a metadata source. Whatever database or program you connect to, its databases, tables, views, columns, and so on must be registered as types, because they differ from source to source; the attributes of Hive and HBase are certainly not the same. There are two ways to build a model. One is the Java API (a sketch follows the curl example below).

The other is to submit a model JSON.

The Atlas source tree contains many such JSON model files (under addons/models). For example, the following request registers a clickhouse_db type:

curl -i -X POST -H "Content-Type: application/json" -d '{"enumDefs": [],"structDefs": [],"classificationDefs": [],"entityDefs": [{"category": "ENTITY","version": 1,"name": "clickhouse_db","description": "clickhouse_db","typeVersion": "1.0","serviceType": "clickhouse","attributeDefs": [{"name": "location","typeName": "string","isOptional": true,"cardinality": "SINGLE","valuesMinCount": 0,"valuesMaxCount": 1,"isUnique": false,"isIndexable": false,"includeInNotification": false,"searchWeight": 5},{"name": "clusterName","typeName": "string","isOptional": true,"cardinality": "SINGLE","valuesMinCount": 0,"valuesMaxCount": 1,"isUnique": false,"isIndexable": false,"includeInNotification": false,"searchWeight": 8},{"name": "parameters","typeName": "map<string,string>","isOptional": true,"cardinality": "SINGLE","valuesMinCount": 0,"valuesMaxCount": 1,"isUnique": false,"isIndexable": false,"includeInNotification": false,"searchWeight": -1},{"name": "ownerType","typeName": "string","isOptional": true,"cardinality": "SINGLE","valuesMinCount": 0,"valuesMaxCount": 1,"isUnique": false,"isIndexable": false,"includeInNotification": false,"searchWeight": -1}],"superTypes": ["DataSet"],"subTypes": [],"relationshipAttributeDefs": [{"name": "inputToProcesses","typeName": "array<Process>","isOptional": true,"cardinality": "SET","valuesMinCount": -1,"valuesMaxCount": -1,"isUnique": false,"isIndexable": false,"includeInNotification": false,"searchWeight": -1,"relationshipTypeName": "dataset_process_inputs","isLegacyAttribute": false},{"name": "schema","typeName": "array<avro_schema>","isOptional": true,"cardinality": "SET","valuesMinCount": -1,"valuesMaxCount": -1,"isUnique": false,"isIndexable": false,"includeInNotification": false,"searchWeight": -1,"relationshipTypeName": "avro_schema_associatedEntities","isLegacyAttribute": false},{"name": "tables","typeName": "array<clickhouse_table>","isOptional": true,"cardinality": "SET","valuesMinCount": -1,"valuesMaxCount": -1,"isUnique": false,"isIndexable": false,"includeInNotification": false,"searchWeight": -1,"relationshipTypeName": "clickhouse_table_db","isLegacyAttribute": false},{"name": "meanings","typeName": "array<AtlasGlossaryTerm>","isOptional": true,"cardinality": "SET","valuesMinCount": -1,"valuesMaxCount": -1,"isUnique": false,"isIndexable": false,"includeInNotification": false,"searchWeight": -1,"relationshipTypeName": "AtlasGlossarySemanticAssignment","isLegacyAttribute": false},{"name": "outputFromProcesses","typeName": "array<Process>","isOptional": true,"cardinality": "SET","valuesMinCount": -1,"valuesMaxCount": -1,"isUnique": false,"isIndexable": false,"includeInNotification": false,"searchWeight": -1,"relationshipTypeName": "process_dataset_outputs","isLegacyAttribute": false}],"businessAttributeDefs": {}}],"relationshipDefs": []
}' --user admin:admin "http://localhost:21000/api/atlas/v2/types/typedefs"

Either way, this step registers the type definitions: the database type, the table type, the column type, and so on.
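
For comparison, here is what the Java-API route looks like. This is a minimal sketch, not code shipped with Atlas: it assumes the atlas-client-v2 artifact is on the classpath, the server runs at http://localhost:21000 with the default admin/admin credentials, and it repeats only two of the attributes from the curl payload (the class name is mine).

import java.util.Collections;

import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;

public class RegisterClickhouseDbType {
    public static void main(String[] args) throws Exception {
        // Same endpoint and credentials as the curl call above
        AtlasClientV2 client = new AtlasClientV2(
                new String[]{"http://localhost:21000"},
                new String[]{"admin", "admin"});

        // clickhouse_db extends the built-in DataSet type, as in the JSON
        AtlasEntityDef dbDef = new AtlasEntityDef("clickhouse_db", "clickhouse_db", "1.0");
        dbDef.setServiceType("clickhouse");
        dbDef.setSuperTypes(Collections.singleton("DataSet"));

        // Two of the attributes from the JSON payload; the rest follow the same pattern
        AtlasAttributeDef location = new AtlasAttributeDef("location", "string");
        location.setIsOptional(true);
        dbDef.addAttribute(location);

        AtlasAttributeDef clusterName = new AtlasAttributeDef("clusterName", "string");
        clusterName.setIsOptional(true);
        dbDef.addAttribute(clusterName);

        // Wrap the entity def and submit it to /api/atlas/v2/types/typedefs
        AtlasTypesDef typesDef = new AtlasTypesDef();
        typesDef.setEntityDefs(Collections.singletonList(dbDef));
        client.createAtlasTypeDefs(typesDef);
    }
}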

The next step is to map the relationships among database, table, and column:

# POST /api/atlas/v2/types/typedefs
{"entityDefs": [],"classificationDefs": [],"structDefs": [],"enumDefs": [],"relationshipDefs": [{"category": "RELATIONSHIP","version": 1,"name": "clickhouse_table_db","description": "clickhouse_table_db","typeVersion": "1.0","serviceType": "clickhouse","attributeDefs": [],"relationshipCategory": "AGGREGATION","propagateTags": "NONE","endDef1": {"type": "clickhouse_table","name": "db","isContainer": false,"cardinality": "SINGLE","isLegacyAttribute": false},"endDef2": {"type": "clickhouse_db","name": "tables","isContainer": true,"cardinality": "SET","isLegacyAttribute": false}},{"category": "RELATIONSHIP","version": 1,"name": "clickhouse_table_columns","description": "clickhouse_table_columns","typeVersion": "1.0","serviceType": "clickhouse","attributeDefs": [],"relationshipCategory": "COMPOSITION","propagateTags": "NONE","endDef1": {"type": "clickhouse_table","name": "columns","isContainer": true,"cardinality": "SET","isLegacyAttribute": false},"endDef2": {"type": "clickhouse_column","name": "table","isContainer": false,"cardinality": "SINGLE","isLegacyAttribute": false}},{"category": "RELATIONSHIP","version": 1,"name": "clickhouse_table_storagedesc","description": "clickhouse_table_storagedesc","typeVersion": "1.0","serviceType": "clickhouse","attributeDefs": [],"relationshipCategory": "ASSOCIATION","propagateTags": "NONE","endDef1": {"type": "clickhouse_table","name": "sd","isContainer": false,"cardinality": "SINGLE","isLegacyAttribute": false},"endDef2": {"type": "clickhouse_storagedesc","name": "table","isContainer": false,"cardinality": "SINGLE","isLegacyAttribute": false}}]
}

These relationships map database to table to column (and their attributes); the mapping is what lets the UI navigate from one to the other.
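
Relationship definitions can be registered through the Java API as well. Below is a hedged sketch of the clickhouse_table_db aggregation from the JSON above; the class name is mine, the client is constructed as in the previous example, and the setter-based style is an assumption about the model classes in atlas-client-v2.

import java.util.Collections;

import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.model.typedef.AtlasRelationshipDef;
import org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags;
import org.apache.atlas.model.typedef.AtlasRelationshipDef.RelationshipCategory;
import org.apache.atlas.model.typedef.AtlasRelationshipEndDef;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality;
import org.apache.atlas.model.typedef.AtlasTypesDef;

public class RegisterClickhouseRelations {
    public static void main(String[] args) throws Exception {
        AtlasClientV2 client = new AtlasClientV2(
                new String[]{"http://localhost:21000"},
                new String[]{"admin", "admin"});

        // clickhouse_table_db: a db aggregates a set of tables
        AtlasRelationshipDef relDef = new AtlasRelationshipDef();
        relDef.setName("clickhouse_table_db");
        relDef.setDescription("clickhouse_table_db");
        relDef.setTypeVersion("1.0");
        relDef.setServiceType("clickhouse");
        relDef.setRelationshipCategory(RelationshipCategory.AGGREGATION);
        relDef.setPropagateTags(PropagateTags.NONE);
        // End 1: each table points at its single owning db
        relDef.setEndDef1(new AtlasRelationshipEndDef("clickhouse_table", "db", Cardinality.SINGLE, false));
        // End 2: the db is the container and holds a SET of tables
        relDef.setEndDef2(new AtlasRelationshipEndDef("clickhouse_db", "tables", Cardinality.SET, true));

        AtlasTypesDef typesDef = new AtlasTypesDef();
        typesDef.setRelationshipDefs(Collections.singletonList(relDef));
        client.createAtlasTypeDefs(typesDef);
    }
}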

Step 2: writing data through Kafka

Entities can be written either by calling the API or by submitting messages to Kafka. An abridged ENTITY_CREATE_V2 notification is shown below (nested objects are elided as Object{...}; the complete message appears in the Flink example further down):

{"version": {"version": "1.0.0","versionParts": Array[1]},"msgCompressionKind": "NONE","msgSplitIdx": 1,"msgSplitCount": 1,"msgSourceIP": "10.45.1.116","msgCreatedBy": "bi","msgCreationTime": 1710575827820,"message": {"type": "ENTITY_CREATE_V2","user": "bi","entities": {"entities": [{"typeName": "clickhouse_table","attributes": {"owner": "bi","ownerType": "USER","sd": Object{...},"tableType": "MANAGED","createTime": 1710575827000,"qualifiedName": "test.wuxl_0316_ss@primary","columns": [Object{...},Object{...}],"name": "wuxl_0316_ss","comment": "测试表","parameters": {"transient_lastDdlTime": "1710575827"},"db": {"typeName": "clickhouse_db","attributes": {"owner": "bi","ownerType": "USER","qualifiedName": "test@primary","clusterName": "primary","name": "test","description": "","location": "hdfs://HDFS80727/bi/test.db","parameters": {}},"guid": "-861237351166886","version": 0,"proxy": false}},"guid": "-861237351166888","version": 0,"proxy": false},Object{...},Object{...},Object{...},Object{...}]}}
}
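
To publish such a message yourself, a plain Kafka producer is enough. Here is a minimal sketch, assuming the JSON above has been saved to a local file entity_create_v2.json and a broker runs at localhost:9092 (both the file name and the broker address are assumptions for illustration):

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AtlasHookProducer {
    public static void main(String[] args) throws Exception {
        // The ENTITY_CREATE_V2 notification JSON, e.g. the message shown above
        String message = new String(
                Files.readAllBytes(Paths.get("entity_create_v2.json")), StandardCharsets.UTF_8);

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // ATLAS_HOOK is the topic Atlas consumes entity notifications from
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("ATLAS_HOOK", message)).get();
        }
    }
}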

You can also submit the message with Flink:

-- Use Flink SQL to write a message into the ATLAS_HOOK topic that ships with Atlas
CREATE TABLE ads_zdm_offsite_platform_daren_rank_df_to_kafka (
  data STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'ATLAS_HOOK',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'raw'
);

INSERT INTO ads_zdm_offsite_platform_daren_rank_df_to_kafka
select '{"version":{"version":"1.0.0","versionParts":[1]},"msgCompressionKind":"NONE","msgSplitIdx":1,"msgSplitCount":1,"msgSourceIP":"10.45.1.116","msgCreatedBy":"bi","msgCreationTime":1710575827820,"message":{"type":"ENTITY_CREATE_V2","user":"bi","entities":{"entities":[{"typeName":"clickhouse_table","attributes":{"owner":"bi","ownerType":"USER","sd":{"typeName":"clickhouse_storagedesc","attributes":{"qualifiedName":"test.wuxl_0316_ss@primary_storage","name":"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe","location":"hdfs://HDFS80727/bi/test.db/wuxl_0316_ss","compressed":false,"inputFormat":"org.apache.hadoop.mapred.TextInputFormat","outputFormat":"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat","parameters":{"serialization.format":"1"}},"guid":"-861237351166887","version":0,"proxy":false},"tableType":"MANAGED","createTime":1710575827000,"qualifiedName":"test.wuxl_0316_ss@primary","columns":[{"typeName":"clickhouse_column","attributes":{"qualifiedName":"test.wuxl_0316_ss.column_tt_1@primary","name":"column_tt_1","comment":"测试字段1","type":"string","table":{"typeName":"clickhouse_table","attributes":{"qualifiedName":"test.wuxl_0316_ss@primary"},"guid":"-861237351166888","version":0,"proxy":false}},"guid":"-861237351166890","version":0,"proxy":false},{"typeName":"clickhouse_column","attributes":{"qualifiedName":"test.wuxl_0316_ss.column_tt_2@primary","name":"column_tt_2","comment":"测试字段2","type":"string","table":{"typeName":"clickhouse_table","attributes":{"qualifiedName":"test.wuxl_0316_ss@primary"},"guid":"-861237351166888","version":0,"proxy":false}},"guid":"-861237351166891","version":0,"proxy":false}],"name":"wuxl_0316_ss","comment":"测试表","parameters":{"transient_lastDdlTime":"1710575827"},"db":{"typeName":"clickhouse_db","attributes":{"owner":"bi","ownerType":"USER","qualifiedName":"test@primary","clusterName":"primary","name":"test","description":"","location":"hdfs://HDFS80727/bi/test.db","parameters":{}},"guid":"-861237351166886","version":0,"proxy":false}},"guid":"-861237351166888","version":0,"proxy":false},{"typeName":"clickhouse_db","attributes":{"owner":"bi","ownerType":"USER","qualifiedName":"test@primary","clusterName":"primary","name":"test","description":"","location":"hdfs://HDFS80727/bi/test.db","parameters":{}},"guid":"-861237351166886","version":0,"proxy":false},{"typeName":"clickhouse_storagedesc","attributes":{"qualifiedName":"test.wuxl_0316_ss@primary_storage","name":"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe","location":"hdfs://HDFS80727/bi/test.db/wuxl_0316_ss","compressed":false,"inputFormat":"org.apache.hadoop.mapred.TextInputFormat","outputFormat":"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat","parameters":{"serialization.format":"1"}},"guid":"-861237351166887","version":0,"proxy":false},{"typeName":"clickhouse_column","attributes":{"qualifiedName":"test.wuxl_0316_ss.column_tt_1@primary","name":"column_tt_1","comment":"测试字段1","type":"string","table":{"typeName":"clickhouse_table","attributes":{"qualifiedName":"test.wuxl_0316_ss@primary"},"guid":"-861237351166888","version":0,"proxy":false}},"guid":"-861237351166890","version":0,"proxy":false},{"typeName":"clickhouse_column","attributes":{"qualifiedName":"test.wuxl_0316_ss.column_tt_2@primary","name":"column_tt_2","comment":"测试字段2","type":"string","table":{"typeName":"clickhouse_table","attributes":{"qualifiedName":"test.wuxl_0316_ss@primary"},"guid":"-861237351166888","version":0,"proxy":false}},"guid":"-861237351166891","version":0,"proxy":false}]}}}' as data
;

Atlas exposes equally convenient interfaces for custom tables, applications, reports, and so on; change events can be submitted in real time through the API or Kafka, which makes real-time monitoring straightforward.
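
As a sketch of the API route (same assumptions as before: atlas-client-v2 on the classpath, default endpoint and credentials, class name mine), the clickhouse_db entity from the notification message can be created directly over REST instead of through Kafka:

import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;

public class CreateClickhouseDbEntity {
    public static void main(String[] args) throws Exception {
        AtlasClientV2 client = new AtlasClientV2(
                new String[]{"http://localhost:21000"},
                new String[]{"admin", "admin"});

        // One clickhouse_db entity; attribute values come from the message above,
        // and qualifiedName must be unique per entity
        AtlasEntity db = new AtlasEntity("clickhouse_db");
        db.setAttribute("qualifiedName", "test@primary");
        db.setAttribute("name", "test");
        db.setAttribute("clusterName", "primary");
        db.setAttribute("location", "hdfs://HDFS80727/bi/test.db");

        client.createEntity(new AtlasEntityWithExtInfo(db));
    }
}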
