【用户投稿】使用 SeaTunnel 进行 HTTP 同步到 Doris 实战经验分享

需求背景

由于我司的项目中需要接入不同的数据源的数据到数仓中,在选择了众多的产品中最后选择了Apache SeaTunnel,对比参考

目前我这边使用的接口,暂时没有接口认证,如果需要接口认证的方式接入数据,再做讨论及测试

实际使用

Apache SeaTunnel版本: 2.3.4

话不多说,先贴最终的运行文件,由于我使用的jsonrest-api提交方式,所以结果如下图所示:

使用restconf的区别就在于job执行的环境不同,conf使用的是ClientJobExecutionEnvironment(经测试也支持json格式),而rest方式则使用的是RestJobExecutionEnvironment

接口返回的数据格式

{"code": "0000","msg": "成功","data": {"records": [{"id": "1798895733824393218","taskContent": "许可证02","taskType": "许可证"}]}
}
// 实际数据分页的很多,以上是示例

接入配置

{"env": {"job.mode": "BATCH","job.name": "SeaTunnel_Job"},"source": [{"result_table_name": "Table13367210156032","plugin_name": "Http","url": "http://*.*.*.*:*/day_plan_repair/page","method": "GET",  // Http请求方式 只支持GET和POST两种方式"format": "json", // 默认值是text 只支持json和text两种方式"json_field": {   // 可以看看作是从上述接口返回的数据中取数据的路径和key的映射关系,value则是取值的JsonPath"id": "$.data.records[*].id","taskContent": "$.data.records[*].taskContent","taskType": "$.data.records[*].taskType"},// "pageing": {//   "page_field": "current", // 当前页的key,就是分页接口的请求参数中的当前页的key,//   "batch_size": 10         // 每页取多少数据// },"schema": {"fields": {"id": "BIGINT", // 主键列问题,详见下面的问题"taskContent": "STRING","taskType": "STRING"}}}],"transform": [{"field_mapper": { // key是source中的schema.field中的值,value是sink中使用的值,例如下面的save_mode_create_template里的${rowtype_fields}使用的就是value,可以更改value作为sink的新命名列"id": "id", "taskContent": "task_content","taskType": "task_type"},"result_table_name": "Table13367210156033","source_table_name": "Table13367210156032","plugin_name": "FieldMapper"}],"sink": [{"source_table_name": "Table13367210156033","plugin_name": "Doris","fenodes ": "*.*.*.*:*","database": "test","password": "****","username": "****","table": "ods_day_plan","sink.label-prefix": "test-ods_day_plan", // Stream Load 导入使用的标签前缀。在 2pc 场景下,需要全局唯一性来保证 SeaTunnel 的 EOS 语义"sink.enable-2pc": false, // 是否开启两阶段提交(2pc),默认为true,保证Exactly-Once语义。Doris的二阶段提交详见https://doris.apache.org/zh-CN/docs/data-operate/import/stream-load-manual"data_save_mode": "APPEND_DATA", // 数据保存模式 DROP_DATA、APPEND_DATA、CUSTOM_PROCESSING、ERROR_WHEN_DATA_EXISTS官方提供了四种,我使用保留数据库结构,追加数据,可以详见源码中的DataSaveMode枚举"schema_save_mode": "CREATE_SCHEMA_WHEN_NOT_EXIST", // Scheme保存模式 RECREATE_SCHEMA、CREATE_SCHEMA_WHEN_NOT_EXIST、ERROR_WHEN_SCHEMA_NOT_EXIST 我使用的是当Schema不存在时创建;具体释义详见SchemaSaveMode枚举"save_mode_create_template": "CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (\n ${rowtype_fields}\n ) ENGINE=OLAP\n UNIQUE KEY (id)\n DISTRIBUTED BY HASH (id)\n PROPERTIES (\n \"replication_allocation\" = \"tag.location.default: 1\",\n \"in_memory\" = \"false\",\n  \"storage_format\" = \"V2\",\n \"disable_auto_compaction\" = \"false\"\n )","sink.enable-delete": true, //是否启用删除,此配置只有Doris的表模型是Unique模型,同时需要Doris表开启批量删除功能(默认开启 0.15+ 版本)"doris.config": {"format": "json","read_json_by_line": "true"}}]
}

实际使用中遇到的问题

Handle save mode failed

具体的报错日志中包含
Caused by: java.sql.SQLException: errCode = 2, detailMessage = Syntax error in line 21:UNIQUE KEY ()^
Encountered: )
Expected: IDENTIFIER
解决方案:详见链接[issue](https://github.com/apache/seatunnel/issues/6646)使用了上述配置文件中的`save_mode_create_template`字段解决,目标中值的可以自行根据业务配置。

NoSuchMethodError

java.lang.NoSuchMethodError: retrofit2.Retrofit$Builder.client(Lshaded/okhttp3/OkHttpClient;)Lretrofit2/Retrofit$Builder;at org.influxdb.impl.InfluxDBImpl.<init>(InfluxDBImpl.java:179) ~[connector-influxdb-2.3.4.jar:2.3.4]at org.influxdb.impl.InfluxDBImpl.<init>(InfluxDBImpl.java:120) ~[connector-influxdb-2.3.4.jar:2.3.4]at org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient.getInfluxDB(InfluxDBClient.java:72) ~[connector-influxdb-2.3.4.jar:2.3.4]
在使用influxdb的连接时,遇到了**jar包冲突**的问题,最终发现在创建http链接的时候,`retrofit2`的依赖与`datahub`连接器中的存在版本冲突,我这里没有使用到`datahub`,所以**删除datahub的连接器**即可解决问题

Apache Doris BIGINT类型精度丢失问题

详见帖子

配置主键

Doris配置save_mode_create_template包含主键时,主键类型必须是数字或日期类型。

上面的source配置的schema中的id,接口返回的实际类型是字符串类型,但是是雪花算法的全数字类型,所以使用BIGINT类型自动转换

原因是Sink配置中的save_mode_create_templateUNIQUE KEY使用的id作为主键,Doris要求主键列类型必须是数字或者日期类型!!

个人经验

  1. 当sink、source、transform只有一个时,可以省略result_table_name、source_table_name配置项
  2. 下载源码,修改源码,在源码中增加log日志,并打包替换SeaTunnel运行时的jar,以方便根据日志得到自己想知道的结果或者方便理解代码
  3. 根据1的运用,熟知代码后可以进行二次开发,例如需要token认证的接口该怎么处理,值得深思。
  4. 另外source配置中的json_field中的value的JsonPath值,不支持 列表中复杂类型取值的问题Array或Map<String, Object>。也可以考虑二开解决
    // 举例:
    {"code": "0000","msg": "成功","data": {"records": [{"id": "1798895733824393218","taskContent": "许可证02","taskType": "许可证","region_list": [ // 此格式中的region_list无法解析和同步 $.data.records[*].region_list[*].id 会报数据和总数不匹配的错误{"id":"1","name": "11"},{"id":"1","name": "11"}]}]}
    }

    附上 测试代码 (使用的是JDK17)

        private static final Option[] DEFAULT_OPTIONS = {Option.SUPPRESS_EXCEPTIONS, Option.ALWAYS_RETURN_LIST, Option.DEFAULT_PATH_LEAF_TO_NULL};private JsonPath[] jsonPaths;private final Configuration jsonConfiguration = Configuration.defaultConfiguration().addOptions(DEFAULT_OPTIONS);@Testpublic void test5() {String data = """{"code": "0000","msg": "成功","data": {"records": [{"id": "1798895733824393218","taskContent": "12312312313"}]}}""";Map<String, String> map = new HashMap<>();map.put("id", "$.data.records[*].id");map.put("taskContent", "$.data.records[*].taskContent");JsonField jsonField = JsonField.builder().fields(map).build();initJsonPath(jsonField);data = JsonUtils.toJsonNode(parseToMap(decodeJSON(data), jsonField)).toString();log.error(data);}// 以下代码都是HttpSourceReader中的代码private void initJsonPath(JsonField jsonField) {jsonPaths = new JsonPath[jsonField.getFields().size()];for (int index = 0; index < jsonField.getFields().keySet().size(); index++) {jsonPaths[index] =JsonPath.compile(jsonField.getFields().values().toArray(new String[] {})[index]);}}private List<Map<String, String>> parseToMap(List<List<String>> datas, JsonField jsonField) {List<Map<String, String>> decodeDatas = new ArrayList<>(datas.size());String[] keys = jsonField.getFields().keySet().toArray(new String[] {});for (List<String> data : datas) {Map<String, String> decodeData = new HashMap<>(jsonField.getFields().size());final int[] index = {0};data.forEach(field -> {decodeData.put(keys[index[0]], field);index[0]++;});decodeDatas.add(decodeData);}return decodeDatas;}private List<List<String>> decodeJSON(String data) {ReadContext jsonReadContext = JsonPath.using(jsonConfiguration).parse(data);List<List<String>> results = new ArrayList<>(jsonPaths.length);for (JsonPath path : jsonPaths) {List<String> result = jsonReadContext.read(path);results.add(result);}for (int i = 1; i < results.size(); i++) {List<?> result0 = results.get(0);List<?> result = results.get(i);if (result0.size() != result.size()) {throw new HttpConnectorException(HttpConnectorErrorCode.FIELD_DATA_IS_INCONSISTENT,String.format("[%s](%d) and [%s](%d) the number of parsing records is inconsistent.",jsonPaths[0].getPath(),result0.size(),jsonPaths[i].getPath(),result.size()));}}return dataFlip(results);}private List<List<String>> dataFlip(List<List<String>> results) {List<List<String>> datas = new ArrayList<>();for (int i = 0; i < results.size(); i++) {List<String> result = results.get(i);if (i == 0) {for (Object o : result) {String val = o == null ? null : o.toString();List<String> row = new ArrayList<>(jsonPaths.length);row.add(val);datas.add(row);}} else {for (int j = 0; j < result.size(); j++) {Object o = result.get(j);String val = o == null ? null : o.toString();List<String> row = datas.get(j);row.add(val);}}}return datas;}

    以上是我的一些经验分享,希望对大家有帮助!

    本文由 白鲸开源科技 提供发布支持!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/47601.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

谈谈大数据采集和常见问题

01 什么是数据采集 数据采集是大数据的基石&#xff0c;不论是现在的互联网公司&#xff0c;物联网公司或者传统的IT公司&#xff0c;每个业务流程环节都会产生大量的数据&#xff0c;同时用户操作的日志也会产生大量的数据&#xff0c;为了将这些结构化和非结构化的数据进行…

[C/C++入门][for]24、菲波那契数列

斐波那契数列是数学中的一个经典数列&#xff0c;以其独特的递归性质而闻名。 数列的前两项通常是0和1&#xff08;或者有时从1开始&#xff0c;当然这个不是强制要求&#xff09;&#xff0c;之后的每一项都是前两项的和。数列的前几项如下所示&#xff1a; 0, 1, 1, 2, 3, …

docker网络互联

最近学习docker的时候发现多了很多网卡&#xff0c;这些似乎都和docker有关&#xff0c;所以我便往下深入了解了一番&#xff1b; 一、docker网卡 docker 0是安装 docker 的时候生成的虚拟网桥&#xff0c;它在内核层连通了其他物理或者虚拟网卡&#xff0c;这就可以将所…

AI发展下的伦理挑战,应当如何应对?

AI发展下的伦理挑战&#xff0c;应当如何应对&#xff1f; 人工智能飞速发展的同时&#xff0c;也逐渐暴露出侵犯数据隐私、制造“信息茧房”等种种伦理风险。随着AI技术在社会各个领域的广泛应用&#xff0c;关于AI伦理和隐私保护问题日趋凸显。尽管国外已出台系列法规来规范…

常用优秀内网穿透工具(实测详细版)

文章目录 1、前言2、安装Nginx3、配置Nginx4、启动Nginx服务4.1、配置登录页面 5、内网穿透5.1、cpolar5.1.1、cpolar软件安装5.1.2、cpolar穿透 5.2、Ngrok5.2.1、Ngrok安装5.2.2、随机域名5.2.3、固定域名5.2.4、前后端服务端口 5.3、NatApp5.4、Frp5.4.1、下载Frp5.4.2、暴露…

自动发卡机器人来看:生成式AI的未来,是聊天还是代理?

引言 今天我们要聊聊一个有趣的话题&#xff1a;生成式AI的未来究竟是在聊天系统&#xff08;Chat&#xff09;中展现智慧&#xff0c;还是在自主代理&#xff08;Agent&#xff09;中体现能力&#xff1f; 一、生成式AI&#xff0c;你是谁&#xff1f; 首先&#xff0c;生成…

【数据结构】--- 栈和队列

前言 前面学习了数据结构的顺序表、单链表、双向循环链表这些结构&#xff1b;现在就来学习栈和队列&#xff0c;这里可以简单的说栈和队列是具有特殊化的线性表 一、栈 1.1、栈的概念和结构 栈是一种遵循先入后出逻辑的线性数据结构。 栈是一种特殊的线性表&#xff0c;它只允…

vivado FFT IP Core

文章目录 前言FFT IP 接口介绍接口简介tdata 格式说明 其他细节关于计算精度及缩放系数计算溢出架构选择数据顺序实时/非实时模式数据输入输出时序关于配置信息的应用时间节点 FFT IP 例化介绍控制代码实现 & 测试参考文献 前言 由于计算资源受限&#xff0c;准备将上位机 …

【漏洞复现】泛微E-Cology WorkflowServiceXml SQL注入漏洞

0x01 产品简介 泛微e-cology是一款由泛微网络科技开发的协同管理平台&#xff0c;支持人力资源、财务、行政等多功能管理和移动办公。 0x02 漏洞概述 泛微OAE-Cology 接口/services/WorkflowServiceXml 存在SQL注入漏洞&#xff0c;可获取数据库权限&#xff0c;导致数据泄露…

Qt日志库QsLog使用教程

前言 最近项目中需要用到日志库。上一次项目中用到了log4qt库&#xff0c;这个库有个麻烦的点是要配置config文件&#xff0c;所以这次切换到了QsLog。用了后这个库的感受是&#xff0c;比较轻量级&#xff0c;嘎嘎好用&#xff0c;推荐一波。 下载QsLog库 https://github.c…

【踩坑日记】【教程】嵌入式 Linux 通过 nfs 下载出现 T T T T [Retry count exceeded: starting again]

文章目录 1 本篇文章解决的问题2 问题解决原理3 问题环境4 开启 ubuntu-20.04 的 nfs24.1 确认 nfs2 是否已经开启4.2 开启 nfs2 5 卸载 iptables5.1 卸载 iptables5.2 禁用 ufw5.3 尝试重新下载 6 原理分析6.1 nfs2 开启部分6.2 卸载 iptables 部分 7 后记7.1 拓扑结构一7.2 拓…

生成Elasticsearch xpack安全认证证书

首次启动 Elasticsearch 时&#xff0c;会为用户生成密码&#xff0c;并自动为用户配置 TLS &#xff0c;可以随时调整 TLS 配置&#xff0c;更新节点证书 一、生成证书 1、在任意节点上进入 Elasticsearch 的安装目录&#xff0c;使用 elasticsearch-certutil 为集群生成 CA…

【博士每天一篇文献-算法】连续学习算法之HNet:Continual learning with hypernetworks

阅读时间&#xff1a;2023-12-26 1 介绍 年份&#xff1a;2019 作者&#xff1a;Johannes von Oswald&#xff0c;Google Research&#xff1b;Christian Henning&#xff0c;EthonAI AG&#xff1b;Benjamin F. Grewe&#xff0c;苏黎世联邦理工学院神经信息学研究所 期刊&a…

npm 设置镜像

设置淘宝源 npm config set registry https://registry.npmmirror.com 设置阿里云源 npm config set registry https://npm.aliyun.com 设置腾讯云源 npm config set registry https://mirrors.cloud.tencent.com/npm/ 设置华为云源 npm config set registry https://mi…

嘿!openlayer(三)

嘿&#xff01;openlayer&#xff08;三&#xff09; 第三章 面向对象的openlayer 文章目录 嘿&#xff01;openlayer&#xff08;三&#xff09;前言一、面向对象的openlayer核心类渲染类事件类openlayer 主要工作原理数据组织数据解析数据渲染 二、直击深处OpenLayers 内部生…

【Vue3 ts】echars图表展示统计的月份数据

图片展示 此处内容为展示24年各个月份产品的创建数量。在后端统计24年各个月份产品数量后&#xff0c;以数组的格式发送给前端&#xff0c;前端负责展示。 后端 entity层&#xff1a; Data Schema(description "月份统计")public class MonthCount {private Stri…

处理uniapp刷新后,点击返回按钮跳转到登录页的问题

在使用uniapp的原生返回的按钮时&#xff0c;如果没有刷新会正常返回到对应的页面&#xff0c;如果刷新后会在当前页反复横跳&#xff0c;或者跳转到登录页。那个时候我第一个想法时&#xff1a;使用浏览器的history.back()方法。因为浏览器刷新后还是可以通过右上角的返回按钮…

01认识Java(介绍安装调试)

单元概述 本章主要介绍Java语言的发展历史&#xff0c;了解Java的运行原理及Java编程语言的特点&#xff0c;通过搭建Eclipse集成开发环境来运行Java应用程序。 1.1 Java简介 1.1.1 什么是Java 计算机语言是人与计算机之间的通讯语言&#xff0c;分为机器语言、汇编语言、高…

短视频是如何一步步“蚕食”我们大脑的?

点击上方△腾阳 关注 转载请联系授权 你好&#xff0c;我是腾阳。 今天我们将深入探讨短视频是如何「蚕食」我们的大脑。 首先问下自己&#xff0c;你有多久没有看完一篇长文了&#xff1f; 你是否曾在清晨阳光中&#xff0c;被手机屏幕上短视频图标吸引&#xff0c;而忘记…

ArrayList.subList的踩坑

需求描述&#xff1a;跳过list中的第一个元素&#xff0c;获取list中的其他元素 原始代码如下&#xff1a; List<FddxxEnterpriseVerify> companyList fddxxEnterpriseVerifyMapper.selectList(companyQueryWrapper);log.info("获取多个法大大公司数据量为&#…