Extending SeaTunnel with a Source Plugin: A Custom connector-webservice

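Code structure

Create a new connector-webservice module under seatunnel-connectors-v2. The quickest way is to copy the connector-http-base module: calling a web service works much like calling an HTTP endpoint, and some of the classes below are copied directly from the HTTP connector.

The core classes are WebserviceConfig, WebserviceParameter, WebserviceSource, and WebserviceSourceReader.

Configuration file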

env {
  # You can set engine configuration here STREAMING BATCH
  execution.parallelism = 1
  job.mode = "BATCH"
  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}

source {
  # This is a example source plugin **only for test and demonstrate the feature source plugin**
  Webservice {
    url = "http://www.xxx.com.cn/xxx/WeatherWebService.asmx?wsdl"
    method = "getSupportCity"
    namespaceUri = "http://xxx.com.cn/"
    params = {
      "byProvinceName" = "xxx"
    }
    result_table_name = "table_3"
  }
}

transform {
  Sql {
    source_table_name = "table_3"
    result_table_name = "table_4"
    query = "select content as fname from table_3"
  }
}

sink {
  Jdbc {
    removeDatabase = "true"
    code = "target"
    _compsName = "sss"
    description = ""
    mapType = "1"
    source_table_name = "table_4"
    writeMode = "0"
    type = "5"
    database = "xxx"
    password = "xxx"
    driver = "com.mysql.cj.jdbc.Driver"
    url = "jdbc:mysql://192.168.xxx:3306/xxx_test"
    pluginName = "Jdbc"
    datasource = "197"
    emptyType = "2"
    user = "xxx"
    table = "xxx"
    generate_sink_sql = "true"
  }
}
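To make the roles of method, namespaceUri, and params in the source block more concrete, here is a minimal sketch of the kind of SOAP 1.1 request they describe. It is an illustration only: the class SoapEnvelopeSketch and its helpers are hypothetical, and the envelope that the connector actually sends is generated by Hutool's SoapClient and may differ in prefixes and layout.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration class; not part of the connector.
final class SoapEnvelopeSketch {

    /** Escapes the five XML special characters so a value cannot break the envelope. */
    static String escapeXml(String s) {
        return s.replace("&", "&amp;")
                .replace("<", "&lt;")
                .replace(">", "&gt;")
                .replace("\"", "&quot;")
                .replace("'", "&apos;");
    }

    /** Builds a SOAP 1.1 envelope: the method element carries the namespace, each param is a child. */
    static String buildEnvelope(String method, String namespaceUri, Map<String, String> params) {
        StringBuilder sb = new StringBuilder();
        sb.append("<soap:Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\">");
        sb.append("<soap:Body>");
        sb.append('<').append(method)
                .append(" xmlns=\"").append(escapeXml(namespaceUri)).append("\">");
        for (Map.Entry<String, String> e : params.entrySet()) {
            sb.append('<').append(e.getKey()).append('>')
                    .append(escapeXml(e.getValue()))
                    .append("</").append(e.getKey()).append('>');
        }
        sb.append("</").append(method).append('>');
        sb.append("</soap:Body></soap:Envelope>");
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("byProvinceName", "xxx");
        System.out.println(buildEnvelope("getSupportCity", "http://xxx.com.cn/", params));
    }
}
```

Each entry of params becomes one child element of the method element, which is why the configuration models it as a simple string-to-string map.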

Code walkthrough

WebserviceConfig
package org.apache.seatunnel.connectors.seatunnel.webservice.config;

import lombok.Data;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;

import java.io.Serializable;
import java.util.Map;

@Data
public class WebserviceConfig implements Serializable {

    public static final boolean DEFAULT_ENABLE_MULTI_LINES = false;

    public static final Option<String> FORMAT =
            Options.key("format")
                    .stringType()
                    .defaultValue("JSON")
                    .withDescription("Http response format");

    public static final Option<String> URL =
            Options.key("url")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("Webservice request url");

    public static final Option<String> METHOD =
            Options.key("method")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("Webservice request method");

    public static final Option<String> NAMESPACE_URI =
            Options.key("namespaceUri")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("Webservice request namespaceUri");

    public static final Option<Map<String, String>> PARAMS =
            Options.key("params")
                    .mapType()
                    .noDefaultValue()
                    .withDescription("Webservice request params");
}

WebserviceParameter

package org.apache.seatunnel.connectors.seatunnel.webservice.config;

import lombok.Data;

import java.io.Serializable;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

@Data
public class WebserviceParameter implements Serializable {

    protected String url;
    protected String method;
    protected String namespaceUri;
    protected Map<String, String> params;
    protected String body;

    public void buildWithConfig(Config pluginConfig) {
        this.setUrl(pluginConfig.getString(WebserviceConfig.URL.key()));
        this.setMethod(pluginConfig.getString(WebserviceConfig.METHOD.key()));
        this.setNamespaceUri(pluginConfig.getString(WebserviceConfig.NAMESPACE_URI.key()));
        // params is optional; every value is converted to its string form
        if (pluginConfig.hasPath(WebserviceConfig.PARAMS.key())) {
            this.setParams(
                    pluginConfig.getConfig(WebserviceConfig.PARAMS.key()).entrySet().stream()
                            .collect(
                                    Collectors.toMap(
                                            Map.Entry::getKey,
                                            entry -> String.valueOf(entry.getValue().unwrapped()),
                                            (v1, v2) -> v2)));
        }
    }
}
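The params handling in buildWithConfig can be tried without any SeaTunnel dependency. The sketch below is a simplified stand-in, not the connector code: a plain Map<String, Object> takes the place of the unwrapped config values, and ParamFlattenSketch is a hypothetical class name. It shows the same Collectors.toMap pattern, where every value is turned into a string and, on a duplicate key, the later value wins.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical illustration class; a plain map stands in for the parsed config section.
final class ParamFlattenSketch {

    /** Converts arbitrary config values to strings; the merge function keeps the later value. */
    static Map<String, String> toStringMap(Map<String, Object> raw) {
        return raw.entrySet().stream()
                .collect(
                        Collectors.toMap(
                                Map.Entry::getKey,
                                entry -> String.valueOf(entry.getValue()),
                                (v1, v2) -> v2,
                                LinkedHashMap::new));
    }

    public static void main(String[] args) {
        Map<String, Object> raw = new LinkedHashMap<>();
        raw.put("byProvinceName", "xxx");
        raw.put("pageSize", 10);
        raw.put("verbose", true);
        System.out.println(toStringMap(raw));
    }
}
```

Using String.valueOf rather than a cast means numbers and booleans written without quotes in the config file still end up as usable string parameters.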

DeserializationCollector

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.seatunnel.connectors.seatunnel.webservice.source;

import lombok.AllArgsConstructor;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;

import java.io.IOException;

@AllArgsConstructor
public class DeserializationCollector {

    private DeserializationSchema<SeaTunnelRow> deserializationSchema;

    public void collect(byte[] message, Collector<SeaTunnelRow> out) throws IOException {
        if (deserializationSchema instanceof JsonDeserializationSchema) {
            ((JsonDeserializationSchema) deserializationSchema).collect(message, out);
        } else {
            SeaTunnelRow deserialize = deserializationSchema.deserialize(message);
            out.collect(deserialize);
        }
    }
}

SimpleTextDeserializationSchema

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.seatunnel.connectors.seatunnel.webservice.source;

import lombok.AllArgsConstructor;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;

@AllArgsConstructor
public class SimpleTextDeserializationSchema implements DeserializationSchema<SeaTunnelRow> {

    private SeaTunnelRowType rowType;

    @Override
    public SeaTunnelRow deserialize(byte[] message) {
        return new SeaTunnelRow(new Object[] {new String(message)});
    }

    @Override
    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
        return rowType;
    }
}
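One detail worth noting in deserialize: new String(message) decodes with the JVM's default charset, which on some setups is not UTF-8. The following is a minimal sketch of decoding with an explicit charset instead. It assumes the service responds in UTF-8, which the original article does not state, and Utf8DecodeSketch is a hypothetical class name.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical illustration class; assumes the payload is UTF-8 encoded.
final class Utf8DecodeSketch {

    /** Decodes the raw payload with an explicit charset instead of the platform default. */
    static String decode(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // "\u5317\u4eac" is a two-character Chinese city name, written with escapes
        // so that this source file stays ASCII-only.
        byte[] payload = "<city>\u5317\u4eac</city>".getBytes(StandardCharsets.UTF_8);
        String right = decode(payload);
        // Decoding the same bytes with a single-byte charset produces different text.
        String wrong = new String(payload, StandardCharsets.ISO_8859_1);
        System.out.println(right.length() + " vs " + wrong.length());
    }
}
```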

WebserviceSource
package org.apache.seatunnel.connectors.seatunnel.webservice.source;

import com.google.auto.service.AutoService;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
import org.apache.seatunnel.connectors.seatunnel.webservice.config.WebserviceConfig;
import org.apache.seatunnel.connectors.seatunnel.webservice.config.WebserviceParameter;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
import org.apache.seatunnel.shade.com.typesafe.config.Config;

@AutoService(SeaTunnelSource.class)
public class WebserviceSource extends AbstractSingleSplitSource<SeaTunnelRow> {

    protected final WebserviceParameter webserviceParameter = new WebserviceParameter();
    protected SeaTunnelRowType rowType;
    protected JobContext jobContext;
    // Never assigned in this example, so the reader receives null for it.
    protected String contentField;
    protected DeserializationSchema<SeaTunnelRow> deserializationSchema;

    @Override
    public String getPluginName() {
        return "Webservice";
    }

    @Override
    public void prepare(Config pluginConfig) throws PrepareFailException {
        // url, method and namespaceUri are all read unconditionally in buildWithConfig,
        // so check all three up front to fail with a readable message.
        CheckResult result =
                CheckConfigUtil.checkAllExists(
                        pluginConfig,
                        WebserviceConfig.URL.key(),
                        WebserviceConfig.METHOD.key(),
                        WebserviceConfig.NAMESPACE_URI.key());
        if (!result.isSuccess()) {
            throw new RuntimeException(
                    String.format(
                            "PluginName: %s, PluginType: %s, Message: %s",
                            getPluginName(), PluginType.SOURCE, result.getMsg()));
        }
        this.webserviceParameter.buildWithConfig(pluginConfig);
        buildSchemaWithConfig(pluginConfig);
    }

    protected void buildSchemaWithConfig(Config pluginConfig) {
        if (pluginConfig.hasPath(CatalogTableUtil.SCHEMA.key())) {
            this.rowType = CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
            // default use json format
            String format = WebserviceConfig.FORMAT.defaultValue();
            if (pluginConfig.hasPath(WebserviceConfig.FORMAT.key())) {
                format = pluginConfig.getString(WebserviceConfig.FORMAT.key());
            }
            switch (format.toLowerCase()) {
                case "json":
                    this.deserializationSchema =
                            new JsonDeserializationSchema(false, false, rowType);
                    break;
                default:
                    // TODO: use format SPI
                    throw new RuntimeException(
                            String.format(
                                    "Unsupported data format [%s], webservice connector only support json format now",
                                    format));
            }
        } else {
            // No schema configured: emit the whole response as a single text column.
            this.rowType = CatalogTableUtil.buildSimpleTextSchema();
            this.deserializationSchema = new SimpleTextDeserializationSchema(this.rowType);
        }
    }

    @Override
    public void setJobContext(JobContext jobContext) {
        this.jobContext = jobContext;
    }

    @Override
    public Boundedness getBoundedness() {
        return Boundedness.BOUNDED;
    }

    @Override
    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
        return this.rowType;
    }

    @Override
    public AbstractSingleSplitReader<SeaTunnelRow> createReader(
            SingleSplitReaderContext readerContext) throws Exception {
        return new WebserviceSourceReader(
                webserviceParameter, readerContext, deserializationSchema, contentField);
    }
}

WebserviceSourceReader
package org.apache.seatunnel.connectors.seatunnel.webservice.source;

import cn.hutool.core.util.StrUtil;
import cn.hutool.http.webservice.SoapClient;
import lombok.extern.slf4j.Slf4j;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.Option;
import org.apache.seatunnel.connectors.seatunnel.webservice.config.WebserviceParameter;

import java.io.IOException;
import java.util.Map;

@Slf4j
public class WebserviceSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {

    protected final SingleSplitReaderContext context;
    private static final Option[] DEFAULT_OPTIONS = {
        Option.SUPPRESS_EXCEPTIONS, Option.ALWAYS_RETURN_LIST, Option.DEFAULT_PATH_LEAF_TO_NULL
    };
    private final String contentJson;
    private final Configuration jsonConfiguration =
            Configuration.defaultConfiguration().addOptions(DEFAULT_OPTIONS);
    protected final WebserviceParameter webserviceParameter;
    private final DeserializationCollector deserializationCollector;

    public WebserviceSourceReader(
            WebserviceParameter webserviceParameter,
            SingleSplitReaderContext context,
            DeserializationSchema<SeaTunnelRow> deserializationSchema,
            String contentJson) {
        this.webserviceParameter = webserviceParameter;
        this.context = context;
        this.contentJson = contentJson;
        this.deserializationCollector = new DeserializationCollector(deserializationSchema);
    }

    @Override
    public void open() throws Exception {
        log.info("WebserviceSourceReader open");
    }

    @Override
    public void close() throws IOException {
        log.info("WebserviceSourceReader close");
    }

    @Override
    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
        try {
            SoapClient client =
                    SoapClient.create(webserviceParameter.getUrl())
                            .setMethod(
                                    webserviceParameter.getMethod(),
                                    webserviceParameter.getNamespaceUri());
            // params is optional in the config, so guard against null
            Map<String, String> params = webserviceParameter.getParams();
            if (params != null) {
                for (Map.Entry<String, String> entry : params.entrySet()) {
                    client = client.setParam(entry.getKey(), entry.getValue());
                }
            }
            String result = client.send(false);
            // deserializationCollector.collect(result.getBytes(), output);
            // Emit the content of the SOAP body as a single-column row
            SeaTunnelRow seaTunnelRow = new SeaTunnelRow(new Object[] {getSoapBody(result)});
            output.collect(seaTunnelRow);
            log.info("WebserviceSourceReader pollNext");
        } catch (Exception e) {
            // Log through the logger instead of printStackTrace so the failure shows up in job logs
            log.error("WebserviceSourceReader pollNext failed", e);
        } finally {
            if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
                // signal to the source that we have reached the end of the data.
                log.info("Closed the bounded webservice source");
                context.signalNoMoreElement();
            }
        }
    }

    public String getSoapBody(String xml) {
        // SOAP 1.1 responses typically use the soap: prefix, SOAP 1.2 responses soap12:
        if (xml.contains("<soap:Body>")) {
            return StrUtil.subBetween(xml, "<soap:Body>", "</soap:Body>");
        } else {
            return StrUtil.subBetween(xml, "<soap12:Body>", "</soap12:Body>");
        }
    }
}
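The body extraction in getSoapBody can be checked in isolation. The sketch below re-implements the same idea with plain JDK string operations instead of Hutool's StrUtil.subBetween, so it runs without extra dependencies. SoapBodySketch and its methods are hypothetical names, and the sample response is invented for illustration.

```java
// Hypothetical illustration class; mirrors getSoapBody using only the JDK.
final class SoapBodySketch {

    /** Returns the text between the first open marker and the next close marker, or null. */
    static String subBetween(String s, String open, String close) {
        if (s == null) {
            return null;
        }
        int start = s.indexOf(open);
        if (start < 0) {
            return null;
        }
        int from = start + open.length();
        int end = s.indexOf(close, from);
        return end < 0 ? null : s.substring(from, end);
    }

    /** Tries the soap: prefix first, then falls back to the soap12: prefix. */
    static String soapBody(String xml) {
        String body = subBetween(xml, "<soap:Body>", "</soap:Body>");
        return body != null ? body : subBetween(xml, "<soap12:Body>", "</soap12:Body>");
    }

    public static void main(String[] args) {
        String response =
                "<soap:Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\">"
                        + "<soap:Body><getSupportCityResponse>ok</getSupportCityResponse></soap:Body>"
                        + "</soap:Envelope>";
        System.out.println(soapBody(response));
    }
}
```

Because the match is on literal prefixes, a service that answers with a different prefix (for example soapenv: or SOAP-ENV:) yields null here; parsing the response with an XML parser and matching on the namespace would be more robust.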

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements.  See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License.  You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.apache.seatunnel</groupId>
        <artifactId>seatunnel-connectors-v2</artifactId>
        <version>2.3.3-SNAPSHOT</version>
    </parent>

    <artifactId>connector-webservice</artifactId>
    <name>SeaTunnel : Connectors V2 : Webservice</name>

    <properties>
        <rabbitmq.version>5.9.0</rabbitmq.version>
        <json-path.version>2.7.0</json-path.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.seatunnel</groupId>
            <artifactId>connector-common</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.seatunnel</groupId>
            <artifactId>seatunnel-format-json</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.22</version>
        </dependency>
        <dependency>
            <groupId>com.jayway.jsonpath</groupId>
            <artifactId>json-path</artifactId>
            <version>${json-path.version}</version>
        </dependency>
    </dependencies>

    <scm>
        <tag>2.3.2</tag>
    </scm>
</project>

Execution result
