解决SeaTunnel 2.3.4版本写入S3文件报错问题

在使用Apache SeaTunnel时,我遇到了一个写入S3文件的报错问题。通过深入调试和分析,找到了问题所在,并提出了相应的解决方案。 file

本文将详细介绍报错情况、参考资料、解决思路以及后续研究方向,希望对大家有帮助!

一、详细报错

2024-04-12 20:44:18,647 ERROR [.c.FileSinkAggregatedCommitter] [hz.main.generic-operation.thread-43] - commit aggregatedCommitInfo error, aggregatedCommitInfo = FileAggregatedCommitInfo(transactionMap={/xugurtp/seatunnel/tmp/seatunnel/831147703474847745/476b6a6fc7/T_831147703474847745_476b6a6fc7_0_1={/xugurtp/seatunnel/tmp/seatunnel/831147703474847745/476b6a6fc7/T_831147703474847745_476b6a6fc7_0_1/NON_PARTITION/output_params_0.json=/xugurtp/seatunnel/tmp/6af80b38f3434aceb573cc65b9cd12216a/39111/output_params_0.json}}, partitionDirAndValuesMap={}) java.lang.IllegalStateException: Connection pool shut down

二、参考资料

  • HADOOP-16027:https://issues.apache.org/jira/browse/HADOOP-16027
  • CSDN Blog:https://blog.csdn.net/a18262285324/article/details/112470363
  • AWS SDK Java Issue #2337:https://github.com/aws/aws-sdk-java/issues/2337
  • Amazon SQS Java Messaging Lib Issue #96:https://github.com/awslabs/amazon-sqs-java-messaging-lib/issues/96
  • 博客园:https://www.cnblogs.com/xhy-shine/p/10772736.html

三、解决思路

1. 远程调试

在本地IDEA中进行debug未发现报错,但在服务器上执行时却报错,因此决定进行远程debug。执行以下命令添加JVM参数:

-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005

实际命令是:

 java -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 -Dhazelcast.client.config=/opt/module/seatunnel-2.3.4/config/hazelcast-client.yaml -Dseatunnel.config=/opt/module/seatunnel-2.3.4/config/seatunnel.yaml -Dhazelcast.config=/opt/module/seatunnel-2.3.4/config/hazelcast.yaml -Dlog4j2.configurationFile=/opt/module/seatunnel-2.3.4/config/log4j2_client.properties -Dseatunnel.logs.path=/opt/module/seatunnel-2.3.4/logs -Dseatunnel.logs.file_name=seatunnel-starter-client -Xms1024m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/seatunnel/dump/zeta-client -XX:MaxMetaspaceSize=1g -XX:+UseG1GC -cp /opt/module/seatunnel-2.3.4/lib/*:/opt/module/seatunnel-2.3.4/starter/seatunnel-starter.jar org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient -e local --config job/s3_sink.conf -cn xxx

2. 定位问题

通过调试发现问题出在hadoop-aws使用的缓存连接池对象。关键在于if判断部分,如果上游传递了fs.s3a.impl.disable.cache=true,则不使用缓存。深入debug发现:有时hadoopConf.getSchema获取的不是s3a而是s3n

s3和s3n / s3a的区别

  • s3:基于块的文件系统
  • s3n:基于对象存储的文件系统,支持高达5GB的对象
  • s3a:基于对象存储的文件系统,支持高达5TB的对象,并具有更高的性能

在配置文件中设置的是s3a,但实际获取到的是s3n,这显然不合理。

3. 深入挖掘

我仔细看了一下报错的截图发现:

file

确实是commit期间报的错:那么也就是说commit初始化s3conf并没有走buildWithConfig方法,而是用的默认值,而且我根本没找到commit里面有new s3Conf的代码,再次debug看看谁去重新初始化了S3Conf

file

定位到这里就很头疼了,已经涉及到引擎层而非插件层面了,涉及到classloader的使用以及反序列化操作:

file

反序列化代码:

        logicalDag =CustomClassLoadedObject.deserializeWithCustomClassLoader(nodeEngine.getSerializationService(),classLoader,jobImmutableInformation.getLogicalDag());

很明显可以看出,S3Conf(静态类)被重新初始化了,导致SHEMA被重新赋值成s3n

file

因为s3conf它本身的属性都是静态的,而对classloader反序列化是时会重新加载静态属性的,所以导致shema被重新赋值为默认s3n

综上所述

除了sourcesink阶段,AggregatedCommit操作也会写入s3File。错误发生在commit期间,说明初始化S3Conf时并没有走buildWithConfig方法,而是使用了默认值。

由于S3Conf类的属性是静态的,反序列化时会重新加载静态属性,导致SCHEMA被重新赋值为默认的s3n

资料参考:https://wiki.apache.org/hadoop/AmazonS3

s3:基于Block块的文件系统

S3 Block FileSystem(URI scheme:s3)由S3支持的基于块的文件系统。 文件存储为块,就像HDFS一样。 这样可以有效地实现重命名。 此文件系统需要您为文件系统专用一个存储桶 - 您不应使用包含文件的现有存储桶,或将其他文件写入同一存储区。 此文件系统存储的文件大于5GB,但不能与其他S3工具进行互操作。

s3n:基于对象存储的文件系统

S3 Native FileSystem(URI scheme:s3n)用于在S3上读取和写入常规文件的本机文件系统。 这个文件系统的优点是您可以访问使用其他工具编写的S3上的文件。 相反,其他工具可以访问使用Hadoop编写的文件。 缺点是S3的文件大小限制为5GB。

s3a:基于对象存储的文件系统

S3A(URI方案:s3a)是S3 Native,s3n fs的继承者,S3a:系统使用Amazon的库与S3进行交互。 这允许S3A支持较大的文件(不超过5GB的限制),更高的性能操作等等。 文件系统旨在替代S3 Native:从s3n:// URL可访问的所有对象也应该通过替换URL模式从s3a访问。

public class S3Conf extends HadoopConf {private static final String HDFS_S3N_IMPL = "org.apache.hadoop.fs.s3native.NativeS3FileSystem";private static final String HDFS_S3A_IMPL = "org.apache.hadoop.fs.s3a.S3AFileSystem";private static final String S3A_SCHEMA = "s3a";private static final String DEFAULT_SCHEMA = "s3n";private static String SCHEMA = DEFAULT_SCHEMA;@Overridepublic String getFsHdfsImpl() {return switchHdfsImpl();}@Overridepublic String getSchema() {return SCHEMA;}private S3Conf(String hdfsNameKey) {super(hdfsNameKey);}public static HadoopConf buildWithConfig(Config config) {HadoopConf hadoopConf = new S3Conf(config.getString(S3ConfigOptions.S3_BUCKET.key()));String bucketName = config.getString(S3ConfigOptions.S3_BUCKET.key());if (bucketName.startsWith(S3A_SCHEMA)) {SCHEMA = S3A_SCHEMA;}HashMap<String, String> s3Options = new HashMap<>();putS3SK(s3Options, config);if (CheckConfigUtil.isValidParam(config, S3ConfigOptions.S3_PROPERTIES.key())) {config.getObject(S3ConfigOptions.S3_PROPERTIES.key()).forEach((key, value) -> s3Options.put(key, String.valueOf(value.unwrapped())));}s3Options.put(S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER.key(),config.getString(S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER.key()));s3Options.put(S3ConfigOptions.FS_S3A_ENDPOINT.key(),config.getString(S3ConfigOptions.FS_S3A_ENDPOINT.key()));hadoopConf.setExtraOptions(s3Options);return hadoopConf;}public static HadoopConf buildWithReadOnlyConfig(ReadonlyConfig readonlyConfig) {Config config = readonlyConfig.toConfig();HadoopConf hadoopConf = new S3Conf(readonlyConfig.get(S3ConfigOptions.S3_BUCKET));String bucketName = readonlyConfig.get(S3ConfigOptions.S3_BUCKET);if (bucketName.startsWith(S3A_SCHEMA)) {SCHEMA = S3A_SCHEMA;}HashMap<String, String> s3Options = new HashMap<>();putS3SK(s3Options, config);if (CheckConfigUtil.isValidParam(config, S3ConfigOptions.S3_PROPERTIES.key())) {config.getObject(S3ConfigOptions.S3_PROPERTIES.key()).forEach((key, value) -> s3Options.put(key, String.valueOf(value.unwrapped())));}s3Options.put(S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER.key(),readonlyConfig.get(S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER).getProvider());s3Options.put(S3ConfigOptions.FS_S3A_ENDPOINT.key(),readonlyConfig.get(S3ConfigOptions.FS_S3A_ENDPOINT));hadoopConf.setExtraOptions(s3Options);return hadoopConf;}private String switchHdfsImpl() {switch (SCHEMA) {case S3A_SCHEMA:return HDFS_S3A_IMPL;default:return HDFS_S3N_IMPL;}}private static void putS3SK(Map<String, String> s3Options, Config config) {if (!CheckConfigUtil.isValidParam(config, S3ConfigOptions.S3_ACCESS_KEY.key())&& !CheckConfigUtil.isValidParam(config, S3ConfigOptions.S3_SECRET_KEY.key())) {return;}String accessKey = config.getString(S3ConfigOptions.S3_ACCESS_KEY.key());String secretKey = config.getString(S3ConfigOptions.S3_SECRET_KEY.key());if (S3A_SCHEMA.equals(SCHEMA)) {s3Options.put("fs.s3a.access.key", accessKey);s3Options.put("fs.s3a.secret.key", secretKey);return;}// default s3ns3Options.put("fs.s3n.awsAccessKeyId", accessKey);s3Options.put("fs.s3n.awsSecretAccessKey", secretKey);}
}

参考了反序列的知识才了解到这个情况:

当对一个包含静态成员的类进行反序列化时,静态成员不会恢复为之前的状态,而是保持在其初始状态。任何静态变量的值都是与该类本身相关的,

4. 解决方案

  • 1.去掉stastic修饰,把有参构造换成无参构造和静态工厂方法:

  • 2.保留stastic静态方法,使用getSchema方法代替静态属性调用:

由此可见,代码中的细节问题,即使看似微不足道,也可能引发严重的后果。一个简单的静态修饰符的误用,不仅能导致程序行为异常,更可能导致系统稳定性和安全性的大问题。

相关的issues已提交,大家有兴趣可以查看:

  • [bigfix][S3 File]:Change the [SCHEMA] attribute of the [S3CONF class] to be non-static to avoid being reassigned after deserialization by LeonYoah · Pull Request #6717 · apache/seatunnel (github.com)

  • [Bug] [S3File] [zeta-local] Error writing to S3File in version 2.3.4:: Java lang. An IllegalStateException: Connection pool shut down · Issue #6678 · apache/seatunnel (github.com)

四、有待研究

1.为什么只有local模式会报错:

推测可能是cluster模式是分布式的,每个算子分布在不同的机器上,所以本地缓存不会被使用,类似于没有走缓存。

2.为什么本地IDEA执行local模式却没问题

可能是Windows和Linux的线程调度机制不同导致的。

结论

通过这次对Apache SeaTunnel S3 File写入报错问题的分析与解决,希望这些经验能帮助到遇到类似问题的开发者,同时也提醒大家在处理分布式系统时注意细节问题,以免引发不必要的故障。

本文由 白鲸开源科技 提供发布支持!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/40717.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

修改头文件版本需要修改的文件

以修改ui的头文件版本为例&#xff0c;还需要同时更新 PJ10PC20240120041_c928\components\master-t5\hikauto\module\app\include PJ10PC20240120041_c928\components\master-t5\hikauto\module\app\include\dsp PJ10PC20240120041_c928\components\master-t5\hikauto\incl…

【算法】(C语言):堆排序

堆&#xff08;二叉树的应用&#xff09;&#xff1a; 完全二叉树。最大堆&#xff1a;每个节点比子树所有节点的数值都大&#xff0c;根节点是最大值。父子索引号关系&#xff08;根节点为0&#xff09;&#xff1a;&#xff08;向上&#xff09;子节点x&#xff0c;父节点(x…

datawhale大模型应用开发夏令营学习笔记一

参考自 基于LangChainLLM的本地知识库问答&#xff1a;从企业单文档问答到批量文档问答datawhale的llm-universe 作者现在在datawhale夏令营的大模型应用开发这个班中&#xff0c;作为一个小白&#xff0c;为了能为团队做出一点贡献&#xff0c;现在就要开始学习怎么使用langch…

实战教程:如何用JavaScript构建一个功能强大的音乐播放器,兼容本地与在线资源

项目地址&#xff1a;Music Player App 作者&#xff1a;Reza Mehdikhanlou 视频地址&#xff1a;youtube 我将向您展示如何使用 javascript 编写音乐播放器。我们创建一个项目&#xff0c;您可以使用 javascript 从本地文件夹或任何 url 播放音频文件。 项目目录 assets 1…

顶级10大AI测试工具

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

JWT入门

JWT与TOKEN JWT&#xff08;JSON Web Token&#xff09;是一种基于 JSON 格式的轻量级安全令牌&#xff0c;通常用于在网络应用间安全地传递信息。而“token”一词则是一个更广泛的术语&#xff0c;用来指代任何形式的令牌&#xff0c;用于在计算机系统中进行身份验证或授权。J…

【️讲解下Laravel为什么会成为最优雅的PHP框架?】

&#x1f3a5;博主&#xff1a;程序员不想YY啊 &#x1f4ab;CSDN优质创作者&#xff0c;CSDN实力新星&#xff0c;CSDN博客专家 &#x1f917;点赞&#x1f388;收藏⭐再看&#x1f4ab;养成习惯 ✨希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出…

cloudreve 设置开机服务

创建一个Systemd服务文件&#xff1a; 打开终端并创建一个新的服务文件&#xff1a; sudo nano /etc/systemd/system/cloudreve.service 在服务文件中添加以下内容&#xff1a; 根据你的设置调整路径和参数&#xff0c;然后将以下配置粘贴到文件中&#xff1a; [Unit] Descri…

Django学习第四天

启动项目命令 python manage.py runserver 分页功能封装到类中去 封装的类的代码 """ 自定义的分页组件,以后如果想要使用这个分页组件&#xff0c;你需要做&#xff1a; def pretty_list(request):# 靓号列表data_dict {}search_data request.GET.get(q, &…

Excel为数据绘制拆线图,并将均值线叠加在图上,以及整个过程的区域录屏python脚本

Excel为数据绘制拆线图,并将均值线叠加在图上,以及整个过程的区域录屏python脚本 1.演示动画A.视频B.gif动画 2.跟踪鼠标区域的录屏脚本 Excel中有一组数据,希望画出曲线,并且能把均值线也绘制在图上,以下动画演示了整个过程,并且提供了区域录屏脚本,原理如下: 为节约空间,避免…

从华为和特斯拉之争,看智能驾驶的未来

“一旦特斯拉完全解决自动驾驶问题并量产Optimus&#xff0c;任何空头都将被消灭&#xff0c;即使是比尔-盖茨也不例外。”7月2日&#xff0c;马斯克再次在社交媒体X上画下了这样的“大饼”。 与此同时&#xff0c;特斯拉的股价在最近的三个交易日也迎来了24%的涨幅&#xff0c…

中俄汽车产业链合作前景广阔,东方经济论坛助力双边合作与创新

随着中国汽车零部件企业的竞争力和创新能力不断增强&#xff0c;中国汽车及零部件行业在俄罗斯的市场份额和品牌影响力显著提升&#xff0c;中俄两国在汽车产业链上的合作展现出巨大的潜力和广阔的前景。2024年5月&#xff0c;俄罗斯乘用车新车销量达到12.8万辆&#xff0c;同比…

7.基于SpringBoot的SSMP整合案例-表现层开发

目录 1.基于Restfu1进行表现层接口开发 1.1创建功能类 1.2基于Restful制作表现层接口 2.接收参数 2使用Apifox测试表现层接口功能 保存接口&#xff1a; 分页接口&#xff1a; 3.表现层一致性处理 3.1先创建一个工具类&#xff0c;用作后端返回格式统一类&#xff1a;…

SpringMVC 的工作流程和详细解释

Spring MVC&#xff08;Model-View-Controller&#xff09;框架是基于经典的 MVC 设计模式构建的&#xff0c;用于开发 Web 应用程序。下面是 Spring Boot MVC 的工作流程和详细解释&#xff1a; 1.客户端发起请求 1.客户端&#xff08;通常是浏览器&#xff09;发起 HTTP 请求…

Python学习篇:Python基础知识(三)

目录 1 Python保留字 2 注释 3 行与缩进 ​编辑4 多行语句 5 输入和输出 6 变量 7 数据类型 8 类型转换 9 表达式 10 运算符 1 Python保留字 Python保留字&#xff08;也称为关键字&#xff09;是Python编程语言中预定义的、具有特殊含义的标识符。这些保留字不能用作…

代码随想录算法训练营第70天图论9[1]

代码随想录算法训练营第70天:图论9 ‍ 拓扑排序精讲 卡码网&#xff1a;117. 软件构建(opens new window) 题目描述&#xff1a; 某个大型软件项目的构建系统拥有 N 个文件&#xff0c;文件编号从 0 到 N - 1&#xff0c;在这些文件中&#xff0c;某些文件依赖于其他文件的…

5款软件让电脑更方便,更快,更好看

​ 你有没有想过&#xff0c;有些软件能让你的电脑用起来更方便&#xff0c;更快&#xff0c;更好看&#xff1f; 1. 屏幕动画创作——Screen To Gif ​ Screen To Gif是一款功能强大的屏幕录制软件&#xff0c;专注于将屏幕上的动态内容转换为高质量的GIF动画。它不仅支持自…

《ClipCap》论文笔记(下)

原文出处 [2111.09734] ClipCap: CLIP Prefix for Image Captioning (arxiv.org) 原文翻译 接上篇 《ClipCap》论文笔记&#xff08;上&#xff09;-CSDN博客 4. Results Datasets.我们使用 COCO-captions [7,22]、nocaps [1] 和 Conceptual Captions [33] 数据集。我们根…

自动化设备上位机设计 一

目录 一 设计原型 二 后台代码 一 设计原型 二 后台代码 namespace 自动化上位机设计 {public partial class Form1 : Form{public Form1(){InitializeComponent();}private void Form1_Load(object sender, EventArgs e){}} }namespace 自动化上位机设计 {partial class Fo…

Pyqt5中如何让label里面的图片进行更换,避免出现黑图

在Pyqt5的界面开发过程中&#xff0c;发现一个label的图片怎么都添加不上&#xff0c;而且出现黑色&#xff0c;主要原因就是在进行显示的时候需要加一行清除的代码&#xff1a; label.clear()如果不加这行代码&#xff0c;当里面的图片发生变化时&#xff0c;显示出来的就是黑…