Seatunnel本地模式快速测验

前言

SeaTunnel(先前称为WaterDrop)是一个分布式、高性能、易于扩展的数据集成平台,旨在实现海量数据的同步和转换。它支持多种数据处理引擎,包括Apache Spark和Apache Flink,并在某个版本中引入了自主研发的Zeta引擎。SeaTunnel不仅适用于离线数据同步,还能支持CDC(Change Data Capture)实时数据同步,这使得它在处理多样化数据集成场景时表现出色。

本节内容作为官方的一个补充测验,快速开始体验吧。


一、Apache Seatunnel是什么?

从官网的介绍看:
Next-generation high-performance, distributed, massive data integration tool.
通过这几个关键词你能看到它的定位:下一代,高性能,分布式,大规模数据集成工具。

那到底好不好用呢?

二、安装

  1. 下载
https://seatunnel.apache.org/download

推荐:v2.3.5

  1. 安装
    环境:Java8
    安装插件:修改 config/plugin_config 以下是一些常用的,不要的暂时不装。
--connectors-v2--
connector-cdc-mysql
connector-clickhouse
connector-file-local
connector-hive
connector-jdbc
connector-kafka
connector-redis
connector-doris
connector-fake
connector-console
connector-elasticsearch
--end--

然后执行:
➜ seatunnel bin/install-plugin.sh 2.3.5
等待执行完毕,就安装完了,很简单。

三、测试

1. 测试 local模式下的用例

修改下模板的测试用例,然后执行如下命令:

bin/seatunnel.sh --config ./config/v2.batch.config -e local任务的配置很简单:
这里使用了FakeSource来模拟输出两列,通过设置并行度=2 来打印 16 条输出数据。
2024-07-01 21:56:06,617 INFO  [o.a.s.c.s.u.ConfigBuilder     ] [main] - Parsed config file:
{"env" : {"parallelism" : 2,"job.mode" : "BATCH","checkpoint.interval" : 10000},"source" : [{"schema" : {"fields" : {"name" : "string","age" : "int"}},"row.num" : 16,"parallelism" : 2,"result_table_name" : "fake","plugin_name" : "FakeSource"}],"sink" : [{"plugin_name" : "Console"}]
}任务的输出信息,这里的输出组件是 Console所以打印到了控制台
2024-07-01 21:56:07,559 INFO  [o.a.s.c.s.f.s.FakeSourceReader] [BlockingWorker-TaskGroupLocation{jobId=860156818549112833, pipelineId=1, taskGroupId=30000}] - Closed the bounded fake source
2024-07-01 21:56:07,561 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=1:  SeaTunnelRow#tableId=fake SeaTunnelRow#kind=INSERT : hECbG, 520364021
2024-07-01 21:56:07,561 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=1  rowIndex=1:  SeaTunnelRow#tableId=fake SeaTunnelRow#kind=INSERT : LnGDW, 105727523
2024-07-01 21:56:07,561 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=2:  SeaTunnelRow#tableId=fake SeaTunnelRow#kind=INSERT : UYXBT, 1212484110
2024-07-01 21:56:07,561 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=1  rowIndex=2:  SeaTunnelRow#tableId=fake SeaTunnelRow#kind=INSERT : NYiCn, 1208734703
2024-07-01 21:56:07,561 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=3:  SeaTunnelRow#tableId=fake SeaTunnelRow#kind=INSERT : cSZan, 151817804
任务的统计信息:
***********************************************Job Statistic Information
***********************************************
Start Time                : 2024-07-01 21:56:06
End Time                  : 2024-07-01 21:56:08
Total Time(s)             :                   2
Total Read Count          :                  32
Total Write Count         :                  32
Total Failed Count        :                   0
***********************************************

2. 使用 Flink引擎

在上面的测试用例中可以看到如下的日志输出:

 Discovery plugin jar for: PluginIdentifier{engineType='seatunnel', pluginType='source', pluginName='FakeSource'

这表示默认情况下它使用的是 seatunnel engine 执行的,官方称之为 zeta 。 这一块内容我们先看下 Flink引擎这边是如何执行的。

  1. 下载安装 flink1.17
    https://nightlies.apache.org/flink/flink-docs-stable/docs/try-flink/local_installation/

    启动local cluster 模式

    ➜  flink bin/start-cluster.sh
    Starting cluster.
    Starting standalonesession daemon on host MacBook-Pro-2.local.
    Starting taskexecutor daemon on host MacBook-Pro-2.local.
    
  2. 配置环境变量

    ➜  config cat seatunnel-env.sh
    # Home directory of spark distribution.
    SPARK_HOME=${SPARK_HOME:-/Users/mac/apps/spark}
    # Home directory of flink distribution.
    FLINK_HOME=${FLINK_HOME:-/Users/mac/apps/flink}
    
  3. 修改slot插槽数量为大于等于 2
    为什么?因为默认的配置中配置了 2 个并行度,而 local启动的默认情况下只有个插槽可供使用,因此任务无法运行。
    在这里插入图片描述
    默认启动后资源插槽:
    在这里插入图片描述
    提交程序运行后,发现一直无法对 sourcez做任务切分:
    在这里插入图片描述

    这是因为 job 的并行度是 2,如下所示:
    在这里插入图片描述

    在这里插入图片描述

    因此需要修改插槽数量才可以运行,官方这点可没说清楚,需要注意下。

  4. 运行测试用例

    ➜  seatunnel bin/start-seatunnel-flink-15-connector-v2.sh --config ./config/v2.streaming.conf.template
    Execute SeaTunnel Flink Job: ${FLINK_HOME}/bin/flink run -c org.apache.seatunnel.core.starter.flink.SeaTunnelFlink /Users/mac/server/apache-seatunnel-2.3.5/starter/seatunnel-flink-15-starter.jar --config ./config/v2.streaming.conf.template --name SeaTunnel
    Job has been submitted with JobID 9a949409a6f218d50b66ca22cc49b9c4
    

    现在我们修改插槽数量为 2,测试如下:
    访问:http://localhost:8081/#/overview
    在这里插入图片描述
    TaskManager输出日志如下:
    在这里插入图片描述

3. 使用 Spark引擎

  1. 提交命令
➜  seatunnel bin/start-seatunnel-spark-3-connector-v2.sh \
--master 'local[4]' \
--deploy-mode client \
--config ./config/v2.streaming.conf.templateExecute SeaTunnel Spark Job: ${SPARK_HOME}/bin/spark-submit --class "org.apache.seatunnel.core.starter.spark.SeaTunnelSpark" --name "SeaTunnel" --master "local[4]" --deploy-mode "client" --jars "/Users/mac/server/seatunnel/lib/seatunnel-transforms-v2.jar,/Users/mac/server/seatunnel/lib/seatunnel-hadoop3-3.1.4-uber.jar,/Users/mac/server/seatunnel/connectors/connector-fake-2.3.5.jar,/Users/mac/server/seatunnel/connectors/connector-console-2.3.5.jar" --conf "job.mode=STREAMING" --conf "parallelism=2" --conf "checkpoint.interval=2000" /Users/mac/server/apache-seatunnel-2.3.5/starter/seatunnel-spark-3-starter.jar --config "./config/v2.streaming.conf.template" --master "local[4]" --deploy-mode "client" --name "SeaTunnel"

遇到报错:

2024-07-01 23:25:04,610 INFO v2.V2ScanRelationPushDown:
Pushing operators to SeaTunnelSourceTable
Pushed Filters:
Post-Scan Filters:
Output: name#0, age#1Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/sql/connector/write/Write

看样子是缺少包导致的导致的,可以参见 issue讨论https://github.com/apache/seatunnel/issues/4879 貌似需要 spark 版本 >=3.2 ,而我的是 3.1.1 因此当前这个问题暂时无解。

Since spark 3.2.0, buildForBatch and buildForStreaming have been deprecated in org.apache.spark.sql.connector.write.WriteBuilder. So you should keep spark version >= 3.2.0.

于是,我便下载了 3.2.4(spark -> spark-3.2.4-bin-without-hadoop) 测试后出现了新的问题。

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/log4j/spi/Filterat java.lang.Class.getDeclaredMethods0(Native Method)at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)at java.lang.Class.privateGetMethodRecursive(Class.java:3048)at java.lang.Class.getMethod0(Class.java:3018)at java.lang.Class.getMethod(Class.java:1784)at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:684)at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:666)
Caused by: java.lang.ClassNotFoundException: org.apache.log4j.spi.Filterat java.net.URLClassLoader.findClass(URLClassLoader.java:387)at java.lang.ClassLoader.loadClass(ClassLoader.java:418)at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:359)at java.lang.ClassLoader.loadClass(ClassLoader.java:351)... 7 more

这说的是 log4j的 jar包似乎不存在,由于我们使用的 spark 版本没有 hadoop的依赖,因此需要在 spark-env.sh里面配置相关的属性,如下:

export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home
export HADOOP_HOME=/Users/mac/apps/hadoop
export HADOOP_CONF_DIR=/Users/mac/apps/hadoop/etc/hadoop
export SPARK_DIST_CLASSPATH=$(/Users/mac/apps/hadoop/bin/hadoop classpath)
export SPARK_MASTER_HOST=localhost
export SPARK_MASTER_PORT=7077

再次提交测试后,结果如下:

24/07/02 13:40:19 INFO ConfigBuilder: Parsed config file:
{"env" : {"parallelism" : 2,"job.mode" : "STREAMING","checkpoint.interval" : 2000},"source" : [{"schema" : {"fields" : {"name" : "string","age" : "int"}},"row.num" : 16,"parallelism" : 2,"result_table_name" : "fake","plugin_name" : "FakeSource"}],"sink" : [{"plugin_name" : "Console"}]
}24/07/02 13:40:19 INFO SparkContext: Running Spark version 3.2.4
24/07/02 13:40:25 INFO FakeSourceReader: wait split!
24/07/02 13:40:25 INFO FakeSourceReader: wait split!
24/07/02 13:40:25 INFO FakeSourceSplitEnumerator: Calculated splits for table fake successfully, the size of splits is 2.
24/07/02 13:40:25 INFO FakeSourceSplitEnumerator: Calculated splits for table fake successfully, the size of splits is 2.
24/07/02 13:40:25 INFO FakeSourceSplitEnumerator: Assigned [FakeSourceSplit(tableId=fake, splitId=1, rowNum=16), FakeSourceSplit(tableId=fake, splitId=0, rowNum=16)] to 2 readers.
24/07/02 13:40:25 INFO FakeSourceSplitEnumerator: Calculated splits successfully, the size of splits is 2.
24/07/02 13:40:25 INFO FakeSourceSplitEnumerator: Assigned [FakeSourceSplit(tableId=fake, splitId=1, rowNum=16), FakeSourceSplit(tableId=fake, splitId=0, rowNum=16)] to 2 readers.
24/07/02 13:40:25 INFO FakeSourceSplitEnumerator: Assigning splits to readers 1 [FakeSourceSplit(tableId=fake, splitId=1, rowNum=16)]
24/07/02 13:40:25 INFO FakeSourceSplitEnumerator: Calculated splits successfully, the size of splits is 2.
24/07/02 13:40:25 INFO FakeSourceSplitEnumerator: Assigning splits to readers 0 [FakeSourceSplit(tableId=fake, splitId=0, rowNum=16)]
24/07/02 13:40:26 INFO FakeSourceReader: 16 rows of data have been generated in split(fake_1) for table fake. Generation time: 1719898826259
24/07/02 13:40:26 INFO FakeSourceReader: 16 rows of data have been generated in split(fake_0) for table fake. Generation time: 1719898826259
24/07/02 13:40:26 INFO ConsoleSinkWriter: subtaskIndex=1  rowIndex=1:  SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : eMaly, 2131476727
24/07/02 13:40:26 INFO ConsoleSinkWriter: subtaskIndex=0  rowIndex=1:  SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : Osfqi, 257240275
24/07/02 13:40:26 INFO ConsoleSinkWriter: subtaskIndex=1  rowIndex=2:  SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : BYVKb, 730735331

看结果符合预期,也就是使用 spark 提交 seatunnl引擎的流任务,通过FakeSource模拟两列输出了 16 条数据。看来的确是需要 spark3.2.x版本的才能成功了。


参考

https://www.modb.pro/db/605827

总结

本节主要总结了单机模式下使用 seatunel完成官方示例程序,初步体会使用,其实使用起来还是很简单的,模式同我之前介绍的 DataX如出一辙,可喜的是它有自己的 web页面可以配置,
因此后面我将分享下如何在页面中进行配置同步任务,最后时间允许的情况下,分析起优秀的源码设计思路,千里之行始于足下,要持续学习,持续成长,然后持续分享,再会~。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/38655.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Django学习第二天

启动项目命令 python manage.py runserver 动态获取当前时间 javascript实现数据动态更新代码 <script>setInterval(function() {var currentTimeElement document.getElementById(current-time);var currentTime new Date();currentTimeElement.textContent Curren…

经典的卷积神经网络模型 - ResNet

经典的卷积神经网络模型 - ResNet flyfish 2015年&#xff0c;何恺明&#xff08;Kaiming He&#xff09;等人在论文《Deep Residual Learning for Image Recognition》中提出了ResNet&#xff08;Residual Network&#xff0c;残差网络&#xff09;。在当时&#xff0c;随着…

【List】判断集合相等、集合拷贝

【List】判断集合相等、集合拷贝 【一】判断集合是否相等【1】☆使用list中的containAll【2】使用for循环遍历contains方法【3】将list先排序再转为String进行比较【4】使用list.retainAll()方法【5】使用MD5加密方式【6】转换为Java8中的新特性steam流再进行排序来进行比较 【…

AI数字人直播源码出售价格公布!

随着数字人行业的兴起&#xff0c;以数字人直播为代表的应用场景逐渐成为人们日常生活中不可分割的一部分&#xff0c;再加上艾媒研究数据显示&#xff0c;超五成以上的被调查群体的企业使用过虚拟人技术&#xff0c;超三成被调查群体的企业计划使用虚拟人技术等结论的公布&…

python-图像模糊处理(赛氪OJ)

[题目描述] 给定 n 行 m 列的图像各像素点的灰度值&#xff0c;要求用如下方法对其进行模糊化处理&#xff1a; 1. 四周最外侧的像素点灰度值不变。 2. 中间各像素点新灰度值为该像素点及其上下左右相邻四个像素点原灰度值的平均&#xff08;四舍五入&#xff09;输入&#xff…

【C语言】inline 关键字

在C语言中&#xff0c;inline关键字用于建议编译器对函数进行内联展开&#xff0c;而不是像普通函数一样调用。内联函数的目的是减少函数调用的开销&#xff0c;特别是对于简单的、频繁调用的函数。 内联函数的定义和使用 定义内联函数 要定义一个内联函数&#xff0c;需要在…

《代号鸢》国服,能否推动国乙市场重新洗牌?

灵犀互娱《如鸢》顺利拿到版号&#xff0c;再次搅浑了国乙市场这潭水。 六月份游戏版号审批公布后&#xff0c;灵犀互娱运营的《如鸢》引起了关注&#xff0c;这个与《代号鸢》原名《三国志如鸢》雷同的名字&#xff0c;竟然让《代号鸢》玩家大面积破防了。 其实目前关于《如…

for循环中list触发fast-fail或不触发的原理和方法

Iterable和Iterator Iterator接口位于的位置是java.util.Iterator&#xff0c;它主要有两个抽象方法供子类实现。hasNext()用来判断还有没有数据可供访问&#xff0c;next()用来访问下一个数据。 集合Collection不是直接去实现Iterator接口&#xff0c;而是去实现Iterable接口…

【Python】字典练习

python期考练习 目录 1. 首都名​编辑 2. 摩斯电码 3. 登录 4. 学生的姓名和年龄​编辑 5. 电商 6. 学生基本信息 7. 字母数 1. 首都名 初始字典 (可复制) : d{"China":"Beijing","America":"Washington","Norway":…

HCM智能人力资源系统存在命令执行漏洞Getshell

0x01 阅读须知 技术文章仅供参考&#xff0c;此文所提供的信息只为网络安全人员对自己所负责的网站、服务器等&#xff08;包括但不限于&#xff09;进行检测或维护参考&#xff0c;未经授权请勿利用文章中的技术资料对任何计算机系统进行入侵操作。利用此文所提供的信息而造成…

防爆对讲终端是什么?在哪些行业中应用广泛?

防爆对讲终端是一种特殊设计的通信设备&#xff0c;它具备防爆性能和可靠的通信功能&#xff0c;确保在存在爆炸性气体或粉尘的危险环境中使用时不会引发爆炸或火灾等危险情况。这种设备通过特殊的设计和防护措施&#xff0c;如采用防爆材料、防静电、绝缘、阻燃材料等&#xf…

ABAQUS软件天津正版代理商亿达四方:创新技术,驱动产业升级

在环渤海经济圈的核心地带——天津&#xff0c;随着智能制造与高新技术产业的蓬勃发展&#xff0c;对高端仿真软件的需求日益增长。亿达四方&#xff0c;作为ABAQUS在天津的官方正版代理商&#xff0c;凭借其深厚的行业经验和卓越的服务体系&#xff0c;正为这片热土上的科研机…

2024年度潍坊市职业技能大赛——网络搭建(网络与信息安全管理员)职业技能竞赛样题

2024年度潍坊市职业技能大赛 ——网络搭建&#xff08;网络与信息安全管理员&#xff09;职业技能竞赛样题 网络搭建职业技能竞赛组委会 2024年6月 一、项目简介 &#xff08;一&#xff09;竞赛须知 1.技能操作比赛时间150分钟&#xff0c;你需要合理分配时间。 2.如果没…

Hive常用的内置函数

文章目录 聚合类1.指定列值的数目2.指定列值求和3.最大值4.最小值5.平均值6.中位数函数7.分位数函数 数值类1.取整函数Round(a)2.指定精度取整ROUND(double a,int b)3.向上取整FLOOR()4.向下取整CEIL()5.随机数 rand()6.绝对值函数 日期类获取当前日期获取当前时间戳日期前后日…

基于Java的外卖点餐系统设计与实现

作者介绍&#xff1a;计算机专业研究生&#xff0c;现企业打工人&#xff0c;从事Java全栈开发 主要内容&#xff1a;技术学习笔记、Java实战项目、项目问题解决记录、AI、简历模板、简历指导、技术交流、论文交流&#xff08;SCI论文两篇&#xff09; 上点关注下点赞 生活越过…

java+mysql教师管理系统

完整源码地址 教师信息管理系统使用命令行交互的方式及数据库连接实现教师信息管理系统&#xff0c;该系统旨在实现教师信息的管理&#xff0c;并根据需要进行教师信息展示。该软件的功能有如下功能 (1)基本信息管理(教师号、姓名、性别、出生年月、职称、学历、学位、教师类型…

25西安电子科技大学研究生政策(最新)

25西安电子科技大学研究生政策&#xff08;最新&#xff09; 01全国研究生报名情况 全国研究生报名人数438万&#xff0c;首次下降超36万人。 02西电研究生全日制/非全日制报名情况 西电硕士研究生报考录取情况&#xff08;包含全日制、非全日制&#xff09;&#xff0c;2024年…

python-数据容器对比总结

基于各类数据容器的特点&#xff0c;它们的应用场景如下&#xff1a; 数据容器的通用操作 - 遍历 数据容器的通用统计功能 容器的通用转换功能 容器通用排序功能 容器通用功能总览

一文彻底搞懂Transformer - Input(输入)

一、输入嵌入&#xff08;Input Embedding&#xff09; 词嵌入&#xff08;Word Embedding&#xff09;&#xff1a;词嵌入是最基本的嵌入形式&#xff0c;它将词汇表中的每个单词映射到一个固定大小的向量上。这个向量通常是通过训练得到的&#xff0c;能够捕捉单词之间的语义…

HTTP入门

入门HTTP协议 1. 原理介绍 爬虫就是用程序模拟浏览器的行为&#xff0c;发送请求给服务器&#xff0c;获取网页的内容&#xff0c;解析网页数据。 要学会爬虫&#xff0c;先要了解浏览器是如何和服务器交流的。浏览器通过HTTP协议和服务器交流。 2. HTTP协议简介 2.1…