SeaTunnel扩展Transform插件,自定义转换插件

代码结构

在seatunnel-transforms-v2中新建数据包名,新建XXXTransform,XXXTransformConfig,XXXTransformFactory三个类

自定义转换插件功能说明

这是个适配KafkaSource的转换插件,接收到的原文格式为:

{"path":"xxx.log.gz","code":"011","cont":"{\"ID\":\"1\",\"NAME\":\"zhangsan\",\"TABLE\":\"USER\",\"create_time\":\"20230904\"}","timestamp":"20230823160246"}

需要转换为只保留cont里面的数据

{"create_time":"20230904","NAME":"zhangsan","TABLE":"USER","ID":"999"}

任务配置文件

env {# You can set engine configuration here STREAMING BATCHexecution.parallelism = 1job.mode = "STREAMING"#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"}source {# This is a example source plugin **only for test and demonstrate the feature source plugin**Kafka {bootstrap.servers = "xxxxx:9092"topic = "test_in2"consumer.group = "167321237613format="text"result_table_name="kafka"}}transform {ExtractFromCJ {source_table_name="kafka"result_table_name="kafka1"schema = {fields {NAME = "string"TABLE = "string"create_time = "string"ID="string"}}}}sink {kafka {source_table_name="kafka1"topic = "test_out2"bootstrap.servers = "xxxx:9092"kafka.request.timeout.ms = 60000semantics = EXACTLY_ONCE}}

代码说明

XXXConfig代码,这个类主要用来保存transform的配置项

package org.apache.seatunnel.transform.extract;import lombok.Getter;import lombok.Setter;import org.apache.seatunnel.api.configuration.Option;import org.apache.seatunnel.api.configuration.Options;import org.apache.seatunnel.api.configuration.ReadonlyConfig;import java.io.Serializable;import java.util.Map;@Getter@Setterpublic class ExtractFromCJTransformConfig implements Serializable {public static final Option<Map<String, String>> SCHEMA =Options.key("schema.fields").mapType().noDefaultValue().withDescription("Specify the field mapping relationship between input and output");private Map<String, String> fieldColumns;public static ExtractFromCJTransformConfig of(ReadonlyConfig config) {ExtractFromCJTransformConfig extractFromCJTransformConfig = new ExtractFromCJTransformConfig();Map<String, String> fieldColumns = config.get(SCHEMA);extractFromCJTransformConfig.setFieldColumns(fieldColumns);return extractFromCJTransformConfig;}}

XXXTransformFactory说明,工厂类,主要用来初始化具体的转换类

package org.apache.seatunnel.transform.extract;import com.google.auto.service.AutoService;import org.apache.seatunnel.api.configuration.ReadonlyConfig;import org.apache.seatunnel.api.configuration.util.OptionRule;import org.apache.seatunnel.api.table.catalog.CatalogTable;import org.apache.seatunnel.api.table.connector.TableTransform;import org.apache.seatunnel.api.table.factory.Factory;import org.apache.seatunnel.api.table.factory.TableFactoryContext;import org.apache.seatunnel.api.table.factory.TableTransformFactory;@AutoService(Factory.class)public class ExtractFromCJTransformFactory implements TableTransformFactory {@Overridepublic String factoryIdentifier() {return  "ExtractFromCJ";}@Overridepublic OptionRule optionRule() {return OptionRule.builder().optional(ExtractFromCJTransformConfig.SCHEMA).build();}@Overridepublic TableTransform createTransform(TableFactoryContext context) {CatalogTable catalogTable = context.getCatalogTable();ReadonlyConfig options = context.getOptions();ExtractFromCJTransformConfig extractFromCJTransformConfig =ExtractFromCJTransformConfig.of(options);return () -> new ExtractFromCJTransform(extractFromCJTransformConfig, catalogTable);}}

XXXXTransform,具体的转换类,主要用于对source数据的处理,还有数据结构类型的保存

package org.apache.seatunnel.transform.extract;import cn.hutool.core.collection.CollUtil;import cn.hutool.json.JSONObject;import cn.hutool.json.JSONUtil;import com.google.auto.service.AutoService;import lombok.NoArgsConstructor;import lombok.NonNull;import lombok.extern.slf4j.Slf4j;import org.apache.seatunnel.api.configuration.ReadonlyConfig;import org.apache.seatunnel.api.configuration.util.ConfigValidator;import org.apache.seatunnel.api.table.catalog.CatalogTable;import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;import org.apache.seatunnel.api.table.catalog.Column;import org.apache.seatunnel.api.table.catalog.ConstraintKey;import org.apache.seatunnel.api.table.catalog.PhysicalColumn;import org.apache.seatunnel.api.table.catalog.PrimaryKey;import org.apache.seatunnel.api.table.catalog.TableIdentifier;import org.apache.seatunnel.api.table.catalog.TableSchema;import org.apache.seatunnel.api.table.type.SeaTunnelDataType;import org.apache.seatunnel.api.table.type.SeaTunnelRow;import org.apache.seatunnel.api.table.type.SeaTunnelRowType;import org.apache.seatunnel.api.transform.SeaTunnelTransform;import org.apache.seatunnel.shade.com.typesafe.config.Config;import org.apache.seatunnel.transform.common.AbstractCatalogSupportTransform;import java.util.ArrayList;import java.util.List;import java.util.stream.Collectors;@AutoService(SeaTunnelTransform.class)@NoArgsConstructor@Slf4jpublic class ExtractFromCJTransform extends AbstractCatalogSupportTransform {private ExtractFromCJTransformConfig config;protected SeaTunnelRowType inputRowType;@Overridepublic String getPluginName() {return "ExtractFromCJ";}public ExtractFromCJTransform(@NonNull ExtractFromCJTransformConfig config, @NonNull CatalogTable catalogTable) {super(catalogTable);this.config = config;}@Overrideprotected void setConfig(Config pluginConfig) {ConfigValidator.of(ReadonlyConfig.fromConfig(pluginConfig)).validate(new ExtractFromCJTransformFactory().optionRule());this.config = ExtractFromCJTransformConfig.of(ReadonlyConfig.fromConfig(pluginConfig));}@Overrideprotected SeaTunnelRowType transformRowType(SeaTunnelRowType inputRowType) {return inputRowType;}@Overrideprotected SeaTunnelRow transformRow(SeaTunnelRow inputRow) {Object content = inputRow.getFields()[0];String data = content.toString();Object[] outputDataArray = new Object[0];if (JSONUtil.isJson(data)) {JSONObject cont = JSONUtil.parseObj(data).getJSONObject("cont");if (!cont.isEmpty()) {if (!CollUtil.isEmpty(this.config.getFieldColumns())) {outputDataArray = new Object[this.config.getFieldColumns().size()];int t = 0;for (String key : this.config.getFieldColumns().keySet()) {String value = cont.getStr(key);outputDataArray[t] = value;t++;}} else {outputDataArray = new Object[1];outputDataArray[0] = JSONUtil.toJsonStr(cont);}}}SeaTunnelRow outputRow = new SeaTunnelRow(outputDataArray);outputRow.setRowKind(inputRow.getRowKind());outputRow.setTableId(inputRow.getTableId());return outputRow;}@Overrideprotected TableSchema transformTableSchema() {List<Column> inputColumns = inputCatalogTable.getTableSchema().getColumns();List<ConstraintKey> outputConstraintKeys =inputCatalogTable.getTableSchema().getConstraintKeys().stream().map(ConstraintKey::copy).collect(Collectors.toList());PrimaryKey copiedPrimaryKey =inputCatalogTable.getTableSchema().getPrimaryKey() == null? null: inputCatalogTable.getTableSchema().getPrimaryKey().copy();if (CollUtil.isEmpty(this.config.getFieldColumns())) {return TableSchema.builder().primaryKey(copiedPrimaryKey).columns(inputColumns).constraintKey(outputConstraintKeys).build();} else {List<Column> transformColumns = new ArrayList<>();for (String key : this.config.getFieldColumns().keySet()) {SeaTunnelDataType<?> dataType = CatalogTableUtil.parseDataType(this.config.getFieldColumns().get(key));transformColumns.add(PhysicalColumn.of(key, dataType, 0, true, null, null));}return TableSchema.builder().primaryKey(copiedPrimaryKey).columns(transformColumns).constraintKey(outputConstraintKeys).build();}}@Overrideprotected TableIdentifier transformTableIdentifier() {return inputCatalogTable.getTableId().copy();}}

文中的转换实现的是AbstractCatalogSupportTransform类,Seatunel还提供SingleFieldOutputTransform和MultipleFieldOutputTransform,分别对应单字段和多字段的数据处理,具体扩展可根据需求来实现对应的类

执行结果

来源消息

结果消息

以上就是对转换插件的扩展分享,有需求的小伙伴可以参考,也欢迎大家一起评论沟通~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/67632.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

华为OD机试 - 找出经过特定点的路径长度 - 深度优先搜索(Java 2022 Q4 100分)

目录 专栏导读一、题目描述二、输入描述三、输出描述四、解题思路五、Java算法源码六、效果展示1、输入2、输出3、说明 华为OD机试 2023B卷题库疯狂收录中&#xff0c;刷题点这里 专栏导读 本专栏收录于《华为OD机试&#xff08;JAVA&#xff09;真题&#xff08;A卷B卷&#…

【记录】USSOCOM Urban3D 数据集读取与处理

Urban3D数据集内容简介 Urban3D数据集图像为正摄RGB影像&#xff0c;分辨率为50cm。 从SpaceNet上使用aws下载数据&#xff0c;文件夹结构为&#xff1a; |- 01-Provisional_Train|- GT|- GT中包含GTC&#xff0c;GTI&#xff0c;GTL.tif文件&#xff0c;GTL为ground truth b…

合并两个有序链表(每日一题)

“路虽远&#xff0c;行则将至” ❤️主页&#xff1a;小赛毛 ☕今日份刷题&#xff1a;合并两个有序链表 题目描述&#xff1a; 将两个升序链表合并为一个新的 升序 链表并返回。新链表是通过拼接给定的两个链表的所有节点组成的。 示例1&#xff1a; 输入&#xff1a;l1 …

web pdf 拖拽签章

web pdf 拖拽签章 主要通过火狐的pdfjs 来实现 1. 下载js 并编译 地址 https://mozilla.github.io/pdf.js/ 按照官网当下下载并编译就得到了js 2.其实也没有什么好讲的&#xff0c;都是用的js中的方法&#xff0c;官网中都有 按照步骤就能生成一个document元素&#xff0c;然…

【真题解析】系统集成项目管理工程师 2023 年上半年真题卷(案例分析)

本文为系统集成项目管理工程师考试(软考) 2023 年上半年真题(全国卷),包含答案与详细解析。考试共分为两科,成绩均 ≥45 即可通过考试: 综合知识(选择题 75 道,75分)案例分析(问答题 4 道,75分)案例分析(问答题*4)试题一试题二试题三试题四案例分析(问答题*4) …

FANUC机器人电气控制柜内部硬件电路和模块详细介绍

FANUC机器人电气控制柜内部硬件电路和模块详细介绍 PSU电源单元 通过背板传输了如下电源 +5 +2.0V +3.3 +24v +24E +15V -15V 主板--接口描述: 主板内部结构: 面板电路板: 引申一下 KM21 与 KM22 的作用它们分别接至操作面板上上的急停按

基于RabbitMQ的模拟消息队列之五——虚拟主机设计

文章目录 一、创建VirtualHost类二、初始化三、API1.创建交换机2.删除交换机3.创建队列4.删除队列5.创建绑定6.删除绑定7.发送消息转发规则 8.订阅消息1.消费者管理2.推送消息给消费者 3.添加一个消费者管理ConsumerManager9.确认消息 创建VirtualHost类。 1.串起内存和硬盘的数…

Ansible之变量

一&#xff09;Ansible变量介绍 我们在PlayBook⼀节中&#xff0c;将PlayBook类⽐成了Linux中的shell。 那么它作为⼀⻔Ansible特殊的语⾔&#xff0c;肯定要涉及到变量定义、控 制结构的使⽤等特性。 在这⼀节中主要讨论变量的定义和使⽤ 二&#xff09;变量命名规则 变量的…

机器学习-波士顿房价预测

目录 一.数据处理 读入数据 数据形状变换 数据集划分 数据归一化处理 将上面封装成load data函数 二. 模型设计 完整封装运行代码&#xff1a; 根据loss值进行梯度计算 控制部分变量的变化图像&#xff1a; 一.数据处理 读入数据 # 导入需要用到的package import numpy as np…

51单片机热水器温度控制系统仿真设计( proteus仿真+程序+原理图+报告+讲解视频)

51单片机热水器温度控制系统仿真设计 1.主要功能&#xff1a;2.仿真3. 程序代码4. 原理图5. 设计报告6. 设计资料内容清单 &&下载链接 51单片机热水器温度控制系统仿真设计( proteus仿真程序原理图报告讲解视频&#xff09; 仿真图proteus7.8及以上 程序编译器&#x…

常见的几种排序算法

目录 一、插入排序 1、直接插入排序 1.1、排序方法 1.2、图解分析 1.3、代码实现 2、希尔排序 2.1、排序方法 2.2、图解分析 2.3、代码实现 二、选择排序 1、直接选择排序 1.1、排序方法 1.2、图解分析 1.3、代码实现 2、堆排序 2.1、排序方法 2.2、图解分析 …

按键精灵调节界面不显示插件

就像我这样的---这是正常的现象 但是假如你不小心把这个给岔了&#xff0c;那么 点击了启动它就是这样的 这个东西的唯一解决措施就是电脑重启&#xff0c;没得办法&#xff0c;天地万物都有bug这个没得办法

Vue——vue3中的ref和reactive数据理解以及父子组件之间props传递的数据

ref()函数 这是一个用来接受一个内部值&#xff0c;返回一个响应式的、可更改的 ref 对象&#xff0c;此对象只有一个指向其内部值的属性 .value。 作用&#xff1a;创建一个响应式变量&#xff0c;使得某个变量在发生改变时可以同步发生在页面上。 模板语句中使用这个变量时…

使用Puppeteer进行游戏数据可视化

导语 Puppeteer是一个基于Node.js的库&#xff0c;可以用来控制Chrome或Chromium浏览器&#xff0c;实现网页操作、截图、测试、爬虫等功能。本文将介绍如何使用Puppeteer进行游戏数据的爬取和可视化&#xff0c;以《英雄联盟》为例。 概述 《英雄联盟》是一款由Riot Games开…

docker 安装rabbitmq

前提&#xff1a;安装好docker docker安装_Steven-Russell的博客-CSDN博客 centos7安装docker_centos7 docker 安装软件_Steven-Russell的博客-CSDN博客 1、启动docker systemctl start docker 2、下载镜像 // 可以先search查询一下可用镜像&#xff0c;此处直接下载最新版本…

JavaScript代码中字符串如何换行?

在工作中&#xff0c;代码提交之前可能会有一些语法检查的限制&#xff0c;限制我们的单行代码长度。 对于一些逻辑代码&#xff0c;有多种换行方式。这里主要记录一下对于字符串过长情况的处理方式。 对于字符串&#xff0c;除了使用 进行字符串拼接之外&#xff0c;也可以…

在公网上使用SSH远程连接安卓手机Termux:将Android手机变身为远程服务器

文章目录 前言1.安装ssh2.安装cpolar内网穿透3.远程ssh连接配置4.公网远程连接5.固定远程连接地址 前言 使用安卓机跑东西的时候&#xff0c;屏幕太小&#xff0c;有时候操作不习惯。不过我们可以开启ssh&#xff0c;使用电脑PC端SSH远程连接手机termux。 本次教程主要实现在…

猫头虎博主赠书二期:《Go黑帽子 渗透测试编程之道(安全技术经典译丛) 》

&#x1f337;&#x1f341; 博主猫头虎 带您 Go to New World.✨&#x1f341; &#x1f984; 博客首页——猫头虎的博客&#x1f390; &#x1f433;《面试题大全专栏》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33a; &a…

算法通关村第11关【白银】| 位运算高频算法题

一、移位的妙用 1.位1的个数 思路&#xff1a; 利用一个数和1与操作&#xff0c;结果就是最低位的特点&#xff0c;每次右移都能知道一位是不是1 public class Solution {// you need to treat n as an unsigned valuepublic int hammingWeight(int n) {int count 0;for(in…

Oracle数据库尚硅谷学习笔记

文章目录 Oracle数据库体系结构简介补充SQL初步导入sql文件别名连接符distinct去重的坑 过滤和排序数据日期格式比较运算其它比较运算符逻辑运算优先级排序 单行函数SQL中不同类型的函数单行函数字符数值日期转换通用 使用条件表达式嵌套查询 多表查询等值连接非等值连接左外连…