SeaTunnel扩展Transform插件,自定义转换插件

代码结构

在seatunnel-transforms-v2中新建数据包名,新建XXXTransform,XXXTransformConfig,XXXTransformFactory三个类

自定义转换插件功能说明

这是个适配KafkaSource的转换插件,接收到的原文格式为:

{"path":"xxx.log.gz","code":"011","cont":"{\"ID\":\"1\",\"NAME\":\"zhangsan\",\"TABLE\":\"USER\",\"create_time\":\"20230904\"}","timestamp":"20230823160246"}

需要转换为只保留cont里面的数据

{"create_time":"20230904","NAME":"zhangsan","TABLE":"USER","ID":"999"}

任务配置文件

env {# You can set engine configuration here STREAMING BATCHexecution.parallelism = 1job.mode = "STREAMING"#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"}source {# This is a example source plugin **only for test and demonstrate the feature source plugin**Kafka {bootstrap.servers = "xxxxx:9092"topic = "test_in2"consumer.group = "167321237613format="text"result_table_name="kafka"}}transform {ExtractFromCJ {source_table_name="kafka"result_table_name="kafka1"schema = {fields {NAME = "string"TABLE = "string"create_time = "string"ID="string"}}}}sink {kafka {source_table_name="kafka1"topic = "test_out2"bootstrap.servers = "xxxx:9092"kafka.request.timeout.ms = 60000semantics = EXACTLY_ONCE}}

代码说明

XXXConfig代码,这个类主要用来保存transform的配置项

package org.apache.seatunnel.transform.extract;import lombok.Getter;import lombok.Setter;import org.apache.seatunnel.api.configuration.Option;import org.apache.seatunnel.api.configuration.Options;import org.apache.seatunnel.api.configuration.ReadonlyConfig;import java.io.Serializable;import java.util.Map;@Getter@Setterpublic class ExtractFromCJTransformConfig implements Serializable {public static final Option<Map<String, String>> SCHEMA =Options.key("schema.fields").mapType().noDefaultValue().withDescription("Specify the field mapping relationship between input and output");private Map<String, String> fieldColumns;public static ExtractFromCJTransformConfig of(ReadonlyConfig config) {ExtractFromCJTransformConfig extractFromCJTransformConfig = new ExtractFromCJTransformConfig();Map<String, String> fieldColumns = config.get(SCHEMA);extractFromCJTransformConfig.setFieldColumns(fieldColumns);return extractFromCJTransformConfig;}}

XXXTransformFactory说明,工厂类,主要用来初始化具体的转换类

package org.apache.seatunnel.transform.extract;import com.google.auto.service.AutoService;import org.apache.seatunnel.api.configuration.ReadonlyConfig;import org.apache.seatunnel.api.configuration.util.OptionRule;import org.apache.seatunnel.api.table.catalog.CatalogTable;import org.apache.seatunnel.api.table.connector.TableTransform;import org.apache.seatunnel.api.table.factory.Factory;import org.apache.seatunnel.api.table.factory.TableFactoryContext;import org.apache.seatunnel.api.table.factory.TableTransformFactory;@AutoService(Factory.class)public class ExtractFromCJTransformFactory implements TableTransformFactory {@Overridepublic String factoryIdentifier() {return  "ExtractFromCJ";}@Overridepublic OptionRule optionRule() {return OptionRule.builder().optional(ExtractFromCJTransformConfig.SCHEMA).build();}@Overridepublic TableTransform createTransform(TableFactoryContext context) {CatalogTable catalogTable = context.getCatalogTable();ReadonlyConfig options = context.getOptions();ExtractFromCJTransformConfig extractFromCJTransformConfig =ExtractFromCJTransformConfig.of(options);return () -> new ExtractFromCJTransform(extractFromCJTransformConfig, catalogTable);}}

XXXXTransform,具体的转换类,主要用于对source数据的处理,还有数据结构类型的保存

package org.apache.seatunnel.transform.extract;import cn.hutool.core.collection.CollUtil;import cn.hutool.json.JSONObject;import cn.hutool.json.JSONUtil;import com.google.auto.service.AutoService;import lombok.NoArgsConstructor;import lombok.NonNull;import lombok.extern.slf4j.Slf4j;import org.apache.seatunnel.api.configuration.ReadonlyConfig;import org.apache.seatunnel.api.configuration.util.ConfigValidator;import org.apache.seatunnel.api.table.catalog.CatalogTable;import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;import org.apache.seatunnel.api.table.catalog.Column;import org.apache.seatunnel.api.table.catalog.ConstraintKey;import org.apache.seatunnel.api.table.catalog.PhysicalColumn;import org.apache.seatunnel.api.table.catalog.PrimaryKey;import org.apache.seatunnel.api.table.catalog.TableIdentifier;import org.apache.seatunnel.api.table.catalog.TableSchema;import org.apache.seatunnel.api.table.type.SeaTunnelDataType;import org.apache.seatunnel.api.table.type.SeaTunnelRow;import org.apache.seatunnel.api.table.type.SeaTunnelRowType;import org.apache.seatunnel.api.transform.SeaTunnelTransform;import org.apache.seatunnel.shade.com.typesafe.config.Config;import org.apache.seatunnel.transform.common.AbstractCatalogSupportTransform;import java.util.ArrayList;import java.util.List;import java.util.stream.Collectors;@AutoService(SeaTunnelTransform.class)@NoArgsConstructor@Slf4jpublic class ExtractFromCJTransform extends AbstractCatalogSupportTransform {private ExtractFromCJTransformConfig config;protected SeaTunnelRowType inputRowType;@Overridepublic String getPluginName() {return "ExtractFromCJ";}public ExtractFromCJTransform(@NonNull ExtractFromCJTransformConfig config, @NonNull CatalogTable catalogTable) {super(catalogTable);this.config = config;}@Overrideprotected void setConfig(Config pluginConfig) {ConfigValidator.of(ReadonlyConfig.fromConfig(pluginConfig)).validate(new ExtractFromCJTransformFactory().optionRule());this.config = ExtractFromCJTransformConfig.of(ReadonlyConfig.fromConfig(pluginConfig));}@Overrideprotected SeaTunnelRowType transformRowType(SeaTunnelRowType inputRowType) {return inputRowType;}@Overrideprotected SeaTunnelRow transformRow(SeaTunnelRow inputRow) {Object content = inputRow.getFields()[0];String data = content.toString();Object[] outputDataArray = new Object[0];if (JSONUtil.isJson(data)) {JSONObject cont = JSONUtil.parseObj(data).getJSONObject("cont");if (!cont.isEmpty()) {if (!CollUtil.isEmpty(this.config.getFieldColumns())) {outputDataArray = new Object[this.config.getFieldColumns().size()];int t = 0;for (String key : this.config.getFieldColumns().keySet()) {String value = cont.getStr(key);outputDataArray[t] = value;t++;}} else {outputDataArray = new Object[1];outputDataArray[0] = JSONUtil.toJsonStr(cont);}}}SeaTunnelRow outputRow = new SeaTunnelRow(outputDataArray);outputRow.setRowKind(inputRow.getRowKind());outputRow.setTableId(inputRow.getTableId());return outputRow;}@Overrideprotected TableSchema transformTableSchema() {List<Column> inputColumns = inputCatalogTable.getTableSchema().getColumns();List<ConstraintKey> outputConstraintKeys =inputCatalogTable.getTableSchema().getConstraintKeys().stream().map(ConstraintKey::copy).collect(Collectors.toList());PrimaryKey copiedPrimaryKey =inputCatalogTable.getTableSchema().getPrimaryKey() == null? null: inputCatalogTable.getTableSchema().getPrimaryKey().copy();if (CollUtil.isEmpty(this.config.getFieldColumns())) {return TableSchema.builder().primaryKey(copiedPrimaryKey).columns(inputColumns).constraintKey(outputConstraintKeys).build();} else {List<Column> transformColumns = new ArrayList<>();for (String key : this.config.getFieldColumns().keySet()) {SeaTunnelDataType<?> dataType = CatalogTableUtil.parseDataType(this.config.getFieldColumns().get(key));transformColumns.add(PhysicalColumn.of(key, dataType, 0, true, null, null));}return TableSchema.builder().primaryKey(copiedPrimaryKey).columns(transformColumns).constraintKey(outputConstraintKeys).build();}}@Overrideprotected TableIdentifier transformTableIdentifier() {return inputCatalogTable.getTableId().copy();}}

文中的转换实现的是AbstractCatalogSupportTransform类,Seatunel还提供SingleFieldOutputTransform和MultipleFieldOutputTransform,分别对应单字段和多字段的数据处理,具体扩展可根据需求来实现对应的类

执行结果

来源消息

结果消息

以上就是对转换插件的扩展分享,有需求的小伙伴可以参考,也欢迎大家一起评论沟通~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/67632.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

华为OD机试 - 找出经过特定点的路径长度 - 深度优先搜索(Java 2022 Q4 100分)

目录 专栏导读一、题目描述二、输入描述三、输出描述四、解题思路五、Java算法源码六、效果展示1、输入2、输出3、说明 华为OD机试 2023B卷题库疯狂收录中&#xff0c;刷题点这里 专栏导读 本专栏收录于《华为OD机试&#xff08;JAVA&#xff09;真题&#xff08;A卷B卷&#…

【记录】USSOCOM Urban3D 数据集读取与处理

Urban3D数据集内容简介 Urban3D数据集图像为正摄RGB影像&#xff0c;分辨率为50cm。 从SpaceNet上使用aws下载数据&#xff0c;文件夹结构为&#xff1a; |- 01-Provisional_Train|- GT|- GT中包含GTC&#xff0c;GTI&#xff0c;GTL.tif文件&#xff0c;GTL为ground truth b…

合并两个有序链表(每日一题)

“路虽远&#xff0c;行则将至” ❤️主页&#xff1a;小赛毛 ☕今日份刷题&#xff1a;合并两个有序链表 题目描述&#xff1a; 将两个升序链表合并为一个新的 升序 链表并返回。新链表是通过拼接给定的两个链表的所有节点组成的。 示例1&#xff1a; 输入&#xff1a;l1 …

web pdf 拖拽签章

web pdf 拖拽签章 主要通过火狐的pdfjs 来实现 1. 下载js 并编译 地址 https://mozilla.github.io/pdf.js/ 按照官网当下下载并编译就得到了js 2.其实也没有什么好讲的&#xff0c;都是用的js中的方法&#xff0c;官网中都有 按照步骤就能生成一个document元素&#xff0c;然…

Linux系统gdb调试常用命令

GDB&#xff08;GNU调试器&#xff09;是一款常用的调试工具&#xff0c;用于调试C、C等编程语言的程序。以下是一些常用的GDB命令&#xff1a; 1. 启动程序&#xff1a; - gdb <executable>&#xff1a;启动GDB调试器&#xff0c;并加载可执行文件。 2. 设置断点&a…

【真题解析】系统集成项目管理工程师 2023 年上半年真题卷(案例分析)

本文为系统集成项目管理工程师考试(软考) 2023 年上半年真题(全国卷),包含答案与详细解析。考试共分为两科,成绩均 ≥45 即可通过考试: 综合知识(选择题 75 道,75分)案例分析(问答题 4 道,75分)案例分析(问答题*4)试题一试题二试题三试题四案例分析(问答题*4) …

FANUC机器人电气控制柜内部硬件电路和模块详细介绍

FANUC机器人电气控制柜内部硬件电路和模块详细介绍 PSU电源单元 通过背板传输了如下电源 +5 +2.0V +3.3 +24v +24E +15V -15V 主板--接口描述: 主板内部结构: 面板电路板: 引申一下 KM21 与 KM22 的作用它们分别接至操作面板上上的急停按

基于RabbitMQ的模拟消息队列之五——虚拟主机设计

文章目录 一、创建VirtualHost类二、初始化三、API1.创建交换机2.删除交换机3.创建队列4.删除队列5.创建绑定6.删除绑定7.发送消息转发规则 8.订阅消息1.消费者管理2.推送消息给消费者 3.添加一个消费者管理ConsumerManager9.确认消息 创建VirtualHost类。 1.串起内存和硬盘的数…

node基础之三:http 模块

// 1. 导入模块 const http require("http"); // 2. 创建服务 const server http.createServer((request, response) > {// 获取请求方法request.method;// 获取请求 url&#xff08;只包含url中的路径和查询字符串&#xff09;request.url;// 获取 HTTP 协议版…

使用gradio创建一个提取pdf、excel中表格数据的demo

使用Gradio创建一个提取pdf、excel中表格数据的demo 最近需要对pdf、excel文件中的表格进行提取&#xff0c;用于一些分析&#xff0c;所以使用python完成了一个小工具&#xff0c;可以处理上传的pdf、excel文件&#xff0c;将其中所有表格提取出后存入数组输出&#xff1a; …

uni-app 中 swiper 轮播图高度自适应

方法一 1、首先 swiper 标签的宽度是 width: 100% 2、swiper 标签存在默认高度是 height: 150px &#xff1b;高度无法实现由内容撑开&#xff0c;在默认情况下&#xff0c;图片的高度显示总是 150px swiper 宽度 / swiper 高度 原图宽度 / 原图高度 swiper 高度 swiper …

Ansible之变量

一&#xff09;Ansible变量介绍 我们在PlayBook⼀节中&#xff0c;将PlayBook类⽐成了Linux中的shell。 那么它作为⼀⻔Ansible特殊的语⾔&#xff0c;肯定要涉及到变量定义、控 制结构的使⽤等特性。 在这⼀节中主要讨论变量的定义和使⽤ 二&#xff09;变量命名规则 变量的…

机器学习-波士顿房价预测

目录 一.数据处理 读入数据 数据形状变换 数据集划分 数据归一化处理 将上面封装成load data函数 二. 模型设计 完整封装运行代码&#xff1a; 根据loss值进行梯度计算 控制部分变量的变化图像&#xff1a; 一.数据处理 读入数据 # 导入需要用到的package import numpy as np…

51单片机热水器温度控制系统仿真设计( proteus仿真+程序+原理图+报告+讲解视频)

51单片机热水器温度控制系统仿真设计 1.主要功能&#xff1a;2.仿真3. 程序代码4. 原理图5. 设计报告6. 设计资料内容清单 &&下载链接 51单片机热水器温度控制系统仿真设计( proteus仿真程序原理图报告讲解视频&#xff09; 仿真图proteus7.8及以上 程序编译器&#x…

如何快速生成一个H5滑动的卡片(单页和分页都有)

单页 <ul class"combo"><li v-for"(item, index) in arr" :key"index"><div class"combo-name">{{ item.A }}</div><div class"combo-price">{{ item.B }}</div><div class"co…

常见的几种排序算法

目录 一、插入排序 1、直接插入排序 1.1、排序方法 1.2、图解分析 1.3、代码实现 2、希尔排序 2.1、排序方法 2.2、图解分析 2.3、代码实现 二、选择排序 1、直接选择排序 1.1、排序方法 1.2、图解分析 1.3、代码实现 2、堆排序 2.1、排序方法 2.2、图解分析 …

按键精灵调节界面不显示插件

就像我这样的---这是正常的现象 但是假如你不小心把这个给岔了&#xff0c;那么 点击了启动它就是这样的 这个东西的唯一解决措施就是电脑重启&#xff0c;没得办法&#xff0c;天地万物都有bug这个没得办法

AUTOSAR规范与ECU软件开发(实践篇)9.1 AUTOSAR与功能安全

目录 1、前言 2、AUTOSAR对ISO 26262中支持部分的要求 (1)概述 (2)ISO 26262对架构设计的要求 1、前言 作为当前汽车领域最流行的话题之一, A

Vue——vue3中的ref和reactive数据理解以及父子组件之间props传递的数据

ref()函数 这是一个用来接受一个内部值&#xff0c;返回一个响应式的、可更改的 ref 对象&#xff0c;此对象只有一个指向其内部值的属性 .value。 作用&#xff1a;创建一个响应式变量&#xff0c;使得某个变量在发生改变时可以同步发生在页面上。 模板语句中使用这个变量时…

详细介绍c++中的类

C 中的类是面向对象编程的基本概念&#xff0c;它指的是一种能够封装数据和方法的用户定义数据类型。类是程序中一个重要的概念&#xff0c;它允许程序员通过定义类来实现代码复用、模块化和继承等特性。 C 中的类由以下部分组成&#xff1a; Data members&#xff1a;成员变量…