如何利用开源插件?又快又好地搞好数据接口开发,连通不同应用系统

目录

前言介绍:

开源插件 Tapdata PDK

快速开始目标数据库接入

准备环境

下载源码并编译

创建目标数据库的Connector工程

开发完成之后通过 TDD 进行测试验证

如何提交到 PDK 开源项目

彩蛋


前言介绍:

毫不夸张地说,没有开发者还没踢过“应用数据不互通”这块铁板——平台不同、技术不同、存储和部署方式不同的情况下,又缺少必要的接口,应用系统之间难以互通。而随着业务需求的不断扩展,应用也在不断向多元化、个性化发展,未来业务与陈旧技术栈间矛盾也日益凸显,需要的接口数量也越来越多。

如何简单快速地搞定接口开发,也就成了一个需要我们考虑的问题。最近,我挖到了一个很香的开源插件,仔细研究了技术文档之后,决定安利给大家:

开源插件 Tapdata PDK

GitHub 链接:https://github.com/tapdata/idaas-pdk

这个项目的发布者是国内一个专攻实时数据服务平台的创业团队 Tapdata,据官方透露,这次开源的这个小组件,也是其核心产品开源的投路石,背靠的是这个团队在数据实时同步方面相当成熟的实力。

PDK 是其数据接口技术抽象化而来的一个开源插件开发框架,通过 Source Plugin 接口或者 Target Plugin 接口,可以快速实现新数据库作为 Tapdata 的源或目标的适配兼容,从而通过 Tapdata Cloud产品和即将开源的 Tapdata,免费获得各种异构数据源到目标数据库或平台的实时数据对接能力。

按照 PDK 连接器的开发规范进行数据源和目标端的开发,可以简化数据链路的开发流程,通过详细的开发规划和内置的 TDD 测试,可简单、快速地完成新数据源和目标端的开发工作。

支持类型包括:

  • 接入数据库: MySQL、Oracle、PostgreSQL 等
  • 接入 SaaS 产品: Salesforce、vika 维格表、金数据表单、Zoho CRM 等
  • 接入自定义数据源: 可对接私有协议数据源

快速开始目标数据库接入

目前,PDK 团队已将技术文档公开,大家可以前往 GitHub(https://github.com/tapdata/idaas-pdk) 具体了解。

准备环境

  • Java 8
  • Maven
  • Git
  • IntelliJ IDEA

下载源码并编译

git clone https://github.com/tapdata/idaas-pdk.gitcd idaas-pdkmvn clean install

创建目标数据库的Connector工程

例如 group 为 io.tapdata, 数据库 name 为 XDB, 版本 version 为 0.0.1, 通过以下命令创建 Connector 工程

  • 目标数据库无需建表时

./bin/tap template --type target --group io.tapdata --name XDB --version 0.0.1 --output ./connectors

用 ItelliJ IDEA 打开 idaas-pdk, 在 idaas-pdk/connectors 下就能看见 xdb-connector 工程。

  • 在 spec.json 里填写 configOptions

configOptions 集成到 Tapdata 站点之后, 配置给用户在使用该Connector的时候的输入项, 例如数据库的连接地址, 用户名, 密码等等

{..."configOptions":{"connection":{"type":"object","properties":{"host":{"type": "string","title": "Host","x-decorator": "FormItem","x-component": "Input"}, "port":{"type": "number","title": "Port","x-decorator": "FormItem","x-component": "Input"}}}}}
  • 编写接入目标数据库的代码
@TapConnectorClass("spec.json")public class XDBConnector extends ConnectorBase implements TapConnector {@Overridepublic void discoverSchema(TapConnectionContext connectionContext, Consumer<List<TapTable>> consumer) {//TODO Load tables from database, connection information in connectionContext#getConnectionConfig//Sample code shows how to define tables.consumer.accept(list(//Define first tabletable("empty-table1"),//Define second tabletable("empty-table2"))));}@Overridepublic void connectionTest(TapConnectionContext connectionContext, Consumer<TestItem> consumer) {//Assume below tests are successfully, below tests are recommended, but not required.//Connection test//TODO execute connection test hereconsumer.accept(testItem(TestItem.ITEM_CONNECTION, TestItem.RESULT_SUCCESSFULLY));//Login test//TODO execute login test hereconsumer.accept(testItem(TestItem.ITEM_LOGIN, TestItem.RESULT_SUCCESSFULLY));//Read test//TODO execute read test by checking role permissionconsumer.accept(testItem(TestItem.ITEM_READ, TestItem.RESULT_SUCCESSFULLY));//Write test//TODO execute write test by checking role permissionconsumer.accept(testItem(TestItem.ITEM_WRITE, TestItem.RESULT_SUCCESSFULLY));}private void writeRecord(TapConnectorContext connectorContext, List<TapRecordEvent> tapRecordEvents, Consumer<WriteListResult<TapRecordEvent>> writeListResultConsumer) {//TODO write records into database//Below is sample code to print received events which suppose to write to database.AtomicLong inserted = new AtomicLong(0); //insert countAtomicLong updated = new AtomicLong(0); //update countAtomicLong deleted = new AtomicLong(0); //delete countfor(TapRecordEvent recordEvent : tapRecordEvents) {if(recordEvent instanceof TapInsertRecordEvent) {//TODO insert recordinserted.incrementAndGet();} else if(recordEvent instanceof TapUpdateRecordEvent) {//TODO update recordupdated.incrementAndGet();} else if(recordEvent instanceof TapDeleteRecordEvent) {//TODO delete recorddeleted.incrementAndGet();}}//Need to tell incremental engine the write resultwriteListResultConsumer.accept(writeListResult().insertedCount(inserted.get()).modifiedCount(updated.get()).removedCount(deleted.get()));}private void queryByFilter(TapConnectorContext connectorContext, List<TapFilter> filters, Consumer<List<FilterResult>> listConsumer){//Filter is exactly match.//If query by the filter, no value is in database, please still create a FitlerResult with null value in it. So that flow engine can understand the filter has no value.}}
  • 目标数据库须建表时

./bin/tap template --type targetNeedTable --group io.tapdata --name XDB --version 0.0.1 --output ./connectors

用 ItelliJ IDEA 打开 idaas-pdk, 在 idaas-pdk/connectors 下就能看见 xdb-connector 工程。

  • 在 spec.json 里填写 configOptions

configOptions 集成到 Tapdata 站点之后, 配置给用户在使用该 Connector 的时候的输入项, 例如数据库的连接地址、用户名、密码等

{..."configOptions":{"connection":{"type":"object","properties":{"host":{"type": "string","title": "Host","x-decorator": "FormItem","x-component": "Input"}, "port":{"type": "number","title": "Port","x-decorator": "FormItem","x-component": "Input"}     }}}
}
  • 在 spec.json 里填写 dataTypes(类型表达式)

dataTypes 用于描述该 Connector 接入数据库的所有字段的范围,以及转换到对应的 TapType。 源端数据库也会提供相同的 dataTypes 描述, 这样当源端数据流入到 Tapdata 里时, 会结合源端 dataTypes 的字段描述信息结合源端库表的字段信息, 通过 Tapdata 的中立数据结构进入到 Tapdata 的数据流中, 当数据要流入到目标数据库之前,Tapdata 会根据这些信息, 在目标库的 dataTypes 中找到最佳的存储字段, 通过 TapField 的 originType 告知给 PDK 开发者, 用以建表。

{..."dataTypes":{"boolean":{"bit":8, "unsigned":"", "to":"TapNumber"},"tinyint":{"bit":8, "to":"TapNumber"},"smallint":{"bit":16, "to":"TapNumber"},"int":{"bit":32, "to":"TapNumber"},"bigint":{"bit":64, "to":"TapNumber"},"largeint":{"bit":128, "to":"TapNumber"},"float":{"bit":32, "to":"TapNumber"},"double":{"bit":64, "to":"TapNumber"},"decimal[($precision,$scale)]":{"bit": 128, "precision": [1, 27], "defaultPrecision": 10, "scale": [0, 9], "defaultScale": 0, "to": "TapNumber"},"date":{"byte":3, "range":["0000-01-01", "9999-12-31"], "to":"TapDate"},"datetime":{"byte":8, "range":["0000-01-01 00:00:00","9999-12-31 23:59:59"],"to":"TapDateTime"},"char[($byte)]":{"byte":255, "to": "TapString", "defaultByte": 1},"varchar[($byte)]":{"byte":"65535", "to":"TapString"},"string":{"byte":"2147483643", "to":"TapString"},"HLL":{"byte":"16385", "to":"TapNumber", "queryOnly":true}}}
  • 编写接入目标数据库的代码
 @TapConnectorClass("spec.json")public class XDBConnector extends ConnectorBase implements TapConnector {@Overridepublic void discoverSchema(TapConnectionContext connectionContext, Consumer<List<TapTable>> consumer) {//TODO Load schema from database, connection information in connectionContext#getConnectionConfig//Sample code shows how to define tables with specified fieldsconsumer.accept(list(//Define first tabletable("empty-table1")//Define a field named "id", origin field type, whether is primary key and primary key position.add(field("id", "varchar").isPrimaryKey(true).partitionKeyPos(1)).add(field("description", "string")).add(field("name", "varchar")).add(field("age", "int")))));}@Overridepublic void connectionTest(TapConnectionContext connectionContext, Consumer<TestItem> consumer) {//Assume below tests are successfully, below tests are recommended, but not required.//Connection test//TODO execute connection test hereconsumer.accept(testItem(TestItem.ITEM_CONNECTION, TestItem.RESULT_SUCCESSFULLY));//Login test//TODO execute login test hereconsumer.accept(testItem(TestItem.ITEM_LOGIN, TestItem.RESULT_SUCCESSFULLY));//Read test//TODO execute read test by checking role permissionconsumer.accept(testItem(TestItem.ITEM_READ, TestItem.RESULT_SUCCESSFULLY));//Write test//TODO execute write test by checking role permissionconsumer.accept(testItem(TestItem.ITEM_WRITE, TestItem.RESULT_SUCCESSFULLY));}@Overridepublic void registerCapabilities(ConnectorFunctions connectorFunctions, TapCodecRegistry codecRegistry) {connectorFunctions.supportWriteRecord(this::writeRecord);connectorFunctions.supportQueryByFilter(this::queryByFilter);//If database need insert record before table created, then please implement the below two methods.connectorFunctions.supportCreateTable(this::createTable);connectorFunctions.supportDropTable(this::dropTable);//If database need insert record before table created, please implement the custom codec for the TapValue that data types in spec.json didn't cover.//TapTimeValue, TapMapValue, TapDateValue, TapArrayValue, TapYearValue, TapNumberValue, TapBooleanValue, TapDateTimeValue, TapBinaryValue, TapRawValue, TapStringValuecodecRegistry.registerFromTapValue(TapRawValue.class, "text", tapRawValue -> {if (tapRawValue != null && tapRawValue.getValue() != null)return toJson(tapRawValue.getValue());return "null";});}private void writeRecord(TapConnectorContext connectorContext, List<TapRecordEvent> tapRecordEvents, Consumer<WriteListResult<TapRecordEvent>> writeListResultConsumer) {//TODO write records into database//Below is sample code to print received events which suppose to write to database.AtomicLong inserted = new AtomicLong(0); //insert countAtomicLong updated = new AtomicLong(0); //update countAtomicLong deleted = new AtomicLong(0); //delete countfor(TapRecordEvent recordEvent : tapRecordEvents) {if(recordEvent instanceof TapInsertRecordEvent) {//TODO insert recordinserted.incrementAndGet();PDKLogger.info(TAG, "Record Write TapInsertRecordEvent {}", toJson(recordEvent));} else if(recordEvent instanceof TapUpdateRecordEvent) {//TODO update recordupdated.incrementAndGet();PDKLogger.info(TAG, "Record Write TapUpdateRecordEvent {}", toJson(recordEvent));} else if(recordEvent instanceof TapDeleteRecordEvent) {//TODO delete recorddeleted.incrementAndGet();PDKLogger.info(TAG, "Record Write TapDeleteRecordEvent {}", toJson(recordEvent));}}//Need to tell incremental engine the write resultwriteListResultConsumer.accept(writeListResult().insertedCount(inserted.get()).modifiedCount(updated.get()).removedCount(deleted.get()));}private void queryByFilter(TapConnectorContext connectorContext, List<TapFilter> filters, Consumer<List<FilterResult>> listConsumer){//Filter is exactly match.//If query by the filter, no value is in database, please still create a FitlerResult with null value in it. So that flow engine can understand the filter has no value.}private void dropTable(TapConnectorContext connectorContext, TapDropTableEvent dropTableEvent) {TapTable table = connectorContext.getTable();//TODO implement drop table}private void createTable(TapConnectorContext connectorContext, TapCreateTableEvent createTableEvent) {//TODO implement create table.TapTable table = connectorContext.getTable();LinkedHashMap<String, TapField> nameFieldMap = table.getNameFieldMap();for(Map.Entry<String, TapField> entry : nameFieldMap.entrySet()) {TapField field = entry.getValue();String originType = field.getOriginType();//originType is the data types defined in spec.json//TODO use the generated originType to create table.}}}

开发完成之后通过 TDD 进行测试验证

提供 configOptions 里需要用户填写的内容的 json 文件, 例如上述 configOptions 里要求用户填写的是数据库的 Host 和 Port, 那么 tdd 的 xdb_tdd.json 文件内容如下:

{"connection": {"host": "192.168.153.132","port": 9030,}
}

执行 TDD 测试命令:

./bin/tap tdd --testConfig xdb_tdd.json ./connectors/xdb-connector

当 TDD 测试没有通过, 请根据错误提示修改对应错误,直至通过 TDD测试;

当 TDD 测试通过后, PDK Connector 就处于可以提交 Pull Request 的状态。

如何提交到 PDK 开源项目

① fork idaas-pdk, 基于远程的 main 分支建立本地分支

② 根据要接入数据库名称, 在 idaas-pdk/connectors 目录下新建模块, 命名规范为 {数据库小写名称}-connector, 例如接入数据库的名称为 XDB, 模块名称为 xdb-connector

③ 开发者根据官方 API 文档完成接入数据库的开发实现

④ 通过 TDD 测试后, 提交 PR 到 idaas-pdk

⑤ 官方团队 Review 提交的 PR 之后合并代码

彩蛋

感兴趣的同学先别急着开发,据了解,官方推出的免费版本,已陆续实现30个常见数据源/目标间的实时数据对接,如果其中已经包含了你想要接入的数据库,完全可以直接使用 Tapdata Cloud(Tapdata Cloud | 免费的异构数据库实时同步云平台 - Tapdata)进行免费数据实时同步。当然,如果你的需求现阶段还不被支持,就可以通过Tapdata PDK 自助开发,快速接入。

Tapdata Cloud 现阶段支持的数据连接类型

目前,Tapdata 开放了面向开发者的插件生态共建群,开发过程中可以提供技术交流和支持,感兴趣的同学可以扫码关注,拉你入群:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/561747.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java程序员周末时间搞锭银行信息管理系统毕业设计(java+springboot+mybatis+mysql+vue+elementui)等实现

博主介绍&#xff1a;✌公司项目主程、全网粉丝10W,csdn特邀作者、博客专家、CSDN新星计划导师、java领域优质创作者,CSDN博客之星TOP100、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业设计✌ 公众号&#xff1a;java李杨勇 简历模板、学习资料、面试题库…

基于Java+SpringBoot+vue+elementui的校园文具商城系统详细设计和实现

博主介绍&#xff1a;✌公司项目主程、全网粉丝10W,csdn特邀作者、博客专家、CSDN新星计划导师、java领域优质创作者,CSDN博客之星TOP100、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业设计✌ 公众号&#xff1a;java李阳勇 简历模板、学习资料、面试题…

又一门国产数据库语言诞生了,比SQL还好用

一、数据库语言的目标 1.1 数据库是做什么的 数据库这个软件&#xff0c;名字中有个“库”字&#xff0c;会让人觉得它主要是为了存储的。其实不然&#xff0c;数据库实现的重要功能有两条&#xff1a;计算、事务&#xff01;也就是我们常说的OLAP和OLTP&#xff0c;数据库的…

VUE:安装npm install报错Module build failed: Error: ENOENT: no such file or directory, scandir

报错信息如下&#xff1a; Module build failed: Error: ENOENT: no such file or directory, scandir D:\renren-fast-vue\node_modules\node-sass\vendorat Object.fs.readdirSync (fs.js:904:18)at Object.getInstalledBinaries (D:\renren-fast-vue\node_modules\node-sass…

如何让JOIN跑得更快?

JOIN 一直是数据库性能优化的老大难问题&#xff0c;本来挺快的查询&#xff0c;一旦涉及了几个 JOIN&#xff0c;性能就会陡降。而且&#xff0c;参与 JOIN 的表越大越多&#xff0c;性能就越难提上来。 其实&#xff0c;让 JOIN 跑得快的关键是要对 JOIN 分类&#xff0c;分…

Web前端期末大作业--中国港珠澳大桥网页设计(HTML+CSS+JavaScript)实现

博主介绍&#xff1a;✌公司项目主程、全网粉丝20W,csdn特邀作者、博客专家、CSDN新星计划导师、java领域优质创作者,CSDN博客之星TOP100、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业设计✌ 目录 前言介绍 项目简介 设计布局 功能截图 首页 项目简…

Web前端期末大作业--响应式少儿舞蹈网页设计(HTML+CSS+JavaScript)实现

博主介绍&#xff1a;✌公司项目主程、全网粉丝20W,csdn特邀作者、博客专家、CSDN新星计划导师、java领域优质创作者,CSDN博客之星Top50、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业设计✌ 公众号&#xff1a;java李阳勇 简历模板、学习资料、面试题库…

享誉全球的 Java 经典著作《Java核心技术》Java 17

Java 诞生 27 年来&#xff0c;这本享誉全球的 Java 经典著作《Core Java》一路伴随着 Java 的成长&#xff0c;得到了百万 Java 开发者的青睐&#xff0c;几乎出现在每个“学Java要看什么书”类似的书单里&#xff0c;影响了几代技术人。 27年间&#xff0c;每当 Java 有新的…

【云原生】Spring Cloud微服务学习路线汇总

Spring Cloud是什么&#xff1f; 简单来说Spring Cloud是一系列框架的组成集合。主要利用的我们现在主流应用的Spring Boot框架开发便利性、巧妙地简化了分布式系统基础设施的开发&#xff0c;如服务发现注册、配置中心、消息总线、负载均衡、断路器、数据监控等&#xff0c;都…

Oracle12C配置监听IP地址

根据自己的安装路径修改2个地方listener.ora和tnsnames.ora 我的路径在&#xff1a;E:\app\Administrator\product\12.2.0\dbhome_1\network\admin listener.ora文件 # listener.ora Network Configuration File: E:\app\Administrator\product\12.2.0\dbhome_1\NETWORK\ADMI…

强大的SQL计算利器-SPL

现代应用开发中&#xff0c;通常只用SQL实现简单的数据存取动作&#xff0c;而主要的计算过程和业务逻辑直接在应用程序中实现&#xff0c;主要原因在于&#xff1a; 过于复杂的SQL很难调试、编写、阅读、修改。SQL有方言特征&#xff0c;大量使用SQL后&#xff0c;会导致程序…

【毕业季·进击的技术er】大学生计算机毕业设计应该这样写

活动地址&#xff1a;毕业季进击的技术erhttps://marketing.csdn.net/p/f4a818f6455f3a9a7a20c89f60ad35f7 目录 扉页 摘要 目录 一 绪论 二、相关技术环境介绍 三、系统需求分析 四、系统架构设计 五、系统实现 六、系统测试 致谢 参考文献 以一个过来学长的角度来看…

【云原生】SpringCloud是什么?

SpringCloud是一个提供一些服务框架的服务治理平台。它包括&#xff1a;服务注册和发现、配置中心、消息中心、负载平衡、数据监控等。封装了微服务基础架构框架Netflix的多个开源组件&#xff0c;并与云平台和Spring boot框架集成。 SpringCloud也为开发人员提供了一个快速构…

Http请求:Google调用本地摄像头权限开启

项目场景&#xff1a; 最近在做一个考试培训系统&#xff01;里面用到了监控摄像需要调用本地摄像头 解决方案&#xff1a; 打开谷歌浏览器&#xff0c;输入chrome://flags/回车&#xff1a; 在输入框输入unsafely-treat-insecure-origin-as-secure 选择enable 点击relaunch重…

快收藏!最适合计算机大学生的Java毕业设计项目--高校食堂点餐系统

博主介绍&#xff1a;✌全网粉丝20W,csdn特邀作者、博客专家、CSDN新星计划导师、java领域优质创作者,CSDN博客之星TOP100、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业设计✌ &#x1f345;文末获取联系&#x1f345; 精彩专栏推荐&#x1f447;&#…

MySql根据字段名查询重复记录并删除!只保留一条

最近在处理业务数据的时候&#xff01;在几W条记录里存在着些相同的记录,如何用SQL语句,删除掉重复的呢? 可以用以下方法进行处理 其实很简单&#xff01;就是查找表中多余的重复记录&#xff0c;重复记录是根据单个字段来查询、然后删除其他重复的记录即可 查询重复记录 SEL…

基于Java+Spring+mybatis+vue+element实现酒店管理系统

博主介绍&#xff1a;✌全网粉丝20W,csdn特邀作者、博客专家、CSDN新星计划导师、java领域优质创作者,CSDN博客之星TOP100、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业设计✌ &#x1f345;文末获取联系&#x1f345; 精彩专栏推荐&#x1f447;&#…

无语!Jenkins 也宣布弃用 Java 8

继Java 之父 James Gosling 先前称&#xff0c;开发者应尽快弃用 JDK 8&#xff0c;可以选择 JDK 17 LTS&#xff0c;因为后者在各个方面都带来了巨大的改进。 开源 Devops 工具 Jenkins 宣布&#xff1a;从 6 月 28 日发布的 Jenkins 2.357 和即将发布的 9 月 LTS 版本开始&am…

(2022最新)Java毕业设计参考题目-题目新颖(值得收藏)

前言介绍 博主介绍&#xff1a;✌全网粉丝20W,csdn特邀作者、博客专家、CSDN新星计划导师、java领域优质创作者,博客之星TOP100、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业设计项目实战✌ &#x1f345;文末获取联系&#x1f345; 大四的同学马上要开…

Java实现二维码的生成和解析

最近因个人需求需要对根据内容生成二维码和进行解析&#xff01;记录一下&#xff01;二维码其实就是一种编码技术&#xff0c;只是这种编码技术是用在图片上了&#xff0c;将给定的一些文字&#xff0c;数字转换为一张经过特定编码的图片。这里利用的是 google 公司的 zxing使…