最佳实践:如何优雅地提交一个 Amazon EMR Serverless 作业?

请添加图片描述《大数据平台架构与原型实现:数据中台建设实战》一书由博主历时三年精心创作,现已通过知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧二维码进入京东手机购书页面。

自Amazon EMR推出Serverlesss形态以来,得益于开箱即用和零运维的优质特性,越来越多的EMR用户开始尝试EMR Serverless。在使用过程中,一个常被提及的问题是:我们应该如何在EMR Serverless上提交Spark/Hive作业?本文我们将分享一些这方面的最佳实践,帮助大家以一种更优雅的方式使用这项服务。

一份通俗易懂的讲解最好配一个形象生动的例子,本文选择《CDC一键入湖:当 Apache Hudi DeltaStreamer 遇见 Serverless Spark》一文介绍的DeltaStreamer作业作为讲解示例,因为这个作业既有一定的通用性又足够复杂,可以涵盖大多数EMR Serverless作业遇到的场景,更重要的是,该作业的提交方式遵循了本文要介绍的各项最佳实践(本文是其姊妹篇)。不了解Apache Hudi的读者不必担心,本文的关注点在于如何提交EMR Serverless作业本身,而非DeltaStreamer的技术细节,所以不会影响到您阅读此文。

参考范本

首先,我们整理一下提交DeltaStreamer CDC作业的几项关键操作,下文会以这些脚本为例,介绍蕴含其中的各项最佳实践。

1. 导出环境相关变量

export APP_NAME='apache-hudi-delta-streamer'
export APP_S3_HOME='s3://apache-hudi-delta-streamer'
export APP_LOCAL_HOME='/home/ec2-user/apache-hudi-delta-streamer'
export EMR_SERVERLESS_APP_ID='00fbfel40ee59k09'
export EMR_SERVERLESS_EXECUTION_ROLE_ARN='arn:aws:iam::123456789000:role/EMR_SERVERLESS_ADMIN'

2. 创建作业专属工作目录和S3存储桶

mkdir -p $APP_LOCAL_HOME
aws s3 mb $APP_S3_HOME

3. 准备作业描述文件

cat << EOF > $APP_LOCAL_HOME/start-job-run.json
{"name":"apache-hudi-delta-streamer","applicationId":"$EMR_SERVERLESS_APP_ID","executionRoleArn":"$EMR_SERVERLESS_EXECUTION_ROLE_ARN","jobDriver":{"sparkSubmit":{"entryPoint":"/usr/lib/hudi/hudi-utilities-bundle.jar","entryPointArguments":["--continuous","--enable-sync","--table-type", "COPY_ON_WRITE","--op", "UPSERT","--target-base-path", "$APP_S3_HOME/data/mysql-server-3/inventory/orders","--target-table", "orders","--min-sync-interval-seconds", "60","--source-class", "org.apache.hudi.utilities.sources.debezium.MysqlDebeziumSource","--source-ordering-field", "_event_origin_ts_ms","--payload-class", "org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload","--hoodie-conf", "bootstrap.servers=$KAFKA_BOOTSTRAP_SERVERS","--hoodie-conf", "schema.registry.url=$SCHEMA_REGISTRY_URL","--hoodie-conf", "hoodie.deltastreamer.schemaprovider.registry.url=${SCHEMA_REGISTRY_URL}/subjects/osci.mysql-server-3.inventory.orders-value/versions/latest","--hoodie-conf", "hoodie.deltastreamer.source.kafka.value.deserializer.class=io.confluent.kafka.serializers.KafkaAvroDeserializer","--hoodie-conf", "hoodie.deltastreamer.source.kafka.topic=osci.mysql-server-3.inventory.orders","--hoodie-conf", "auto.offset.reset=earliest","--hoodie-conf", "hoodie.datasource.write.recordkey.field=order_number","--hoodie-conf", "hoodie.datasource.write.partitionpath.field=order_date","--hoodie-conf", "hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor","--hoodie-conf", "hoodie.datasource.write.hive_style_partitioning=true","--hoodie-conf", "hoodie.datasource.hive_sync.database=inventory","--hoodie-conf", "hoodie.datasource.hive_sync.table=orders","--hoodie-conf", "hoodie.datasource.hive_sync.partition_fields=order_date"],"sparkSubmitParameters":"--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory --conf spark.jars=$(aws s3 ls $APP_S3_HOME/jars/ | grep -o '\S*\.jar$'| awk '{print "'"$APP_S3_HOME/jars/"'"$1","}' | tr -d '\n' | sed 's/,$//')"}},"configurationOverrides":{"monitoringConfiguration":{"s3MonitoringConfiguration":{"logUri":"$APP_S3_HOME/logs"}}}
}
EOF
jq . $APP_LOCAL_HOME/start-job-run.json

4. 提交作业

export EMR_SERVERLESS_JOB_RUN_ID=$(aws emr-serverless start-job-run \--no-paginate --no-cli-pager --output text \--name apache-hudi-delta-streamer \--application-id $EMR_SERVERLESS_APP_ID \--execution-role-arn $EMR_SERVERLESS_EXECUTION_ROLE_ARN \--execution-timeout-minutes 0 \--cli-input-json file://$APP_LOCAL_HOME/start-job-run.json \--query jobRunId)

5. 监控作业

now=$(date +%s)sec
while true; dojobStatus=$(aws emr-serverless get-job-run \--no-paginate --no-cli-pager --output text \--application-id $EMR_SERVERLESS_APP_ID \--job-run-id $EMR_SERVERLESS_JOB_RUN_ID \--query jobRun.state)if [ "$jobStatus" = "PENDING" ] || [ "$jobStatus" = "SCHEDULED" ] || [ "$jobStatus" = "RUNNING" ]; thenfor i in {0..5}; doecho -ne "\E[33;5m>>> The job [ $EMR_SERVERLESS_JOB_RUN_ID ] state is [ $jobStatus ], duration [ $(date -u --date now-$now +%H:%M:%S) ] ....\r\E[0m"sleep 1doneelseecho -ne "The job [ $EMR_SERVERLESS_JOB_RUN_ID ] is [ $jobStatus ]\n\n"breakfi
done

6. 检查错误

JOB_LOG_HOME=$APP_LOCAL_HOME/log/$EMR_SERVERLESS_JOB_RUN_ID
rm -rf $JOB_LOG_HOME && mkdir -p $JOB_LOG_HOME
aws s3 cp --recursive $APP_S3_HOME/logs/applications/$EMR_SERVERLESS_APP_ID/jobs/$EMR_SERVERLESS_JOB_RUN_ID/ $JOB_LOG_HOME >& /dev/null
gzip -d -r -f $JOB_LOG_HOME >& /dev/null
grep --color=always -r -i -E 'error|failed|exception' $JOB_LOG_HOME

最佳实践 (1):提取环境相关信息集中配置,提升脚本可移植性

※ 此项最佳实践参考《参考范本:1. 导出环境相关变量》

在EMR Serverless的作业脚本中经常会出现与AWS账号和本地环境有关的信息,例如资源的ARN,各种路径等,当我们要在不同环境(如开发、测试或生产)中提交作业时,就需要查找和替换这些环境相关的信息。为了让脚本具备良好的可移植性,推荐的做法是将这些信息抽离出来,以全局变量的形式集中配置,这样,当在一个新环境(新的AWS账号或服务器)中提交作业时,只需修改这些变量即可,而不是具体的脚本。

最佳实践 (2):为作业创建专用的工作目录和S3存储桶

※ 此项最佳实践参考《参考范本:2. 创建作业专属工作目录和S3存储桶》

为一个作业或应用程序创建专用的工作目录和S3存储桶是一个良好的规范和习惯。一方面,将本作业/应用的所有“资源”,包括:脚本、配置文件、依赖包、日志以及产生的数据统一存放在有利于集中管理和维护,如果要在Linux和S3上给作业赋予读写权限,操作起来了也会简单一些。

最佳实践 (3):使用作业描述文件规避字符转义问题

※ 此项最佳实践参考《参考范本:3. 准备作业描述文件》

我们通常见到的EMR Serverless作业提交示例是将作业描述以字符串参数形式传递给命令行的,就像下面这样:

aws emr-serverless start-job-run \--application-id $EMR_SERVERLESS_APP_ID \--execution-role-arn $EMR_SERVERLESS_EXECUTION_ROLE_ARN \--job-driver '{"sparkSubmit": {"entryPoint": "s3://us-east-1.elasticmapreduce/emr-containers/samples/wordcount/scripts/wordcount.py","entryPointArguments": ["s3://my-job-bucket/output"]}}'

这种方式只能应对简单的作业提交,当作业中包含大量参数和变量时,很容易出现单引号、双引号、美元符等特殊字符的转义问题,由于这里牵涉shell字符串和json字符串的双重嵌套和解析,所以会非常麻烦。此时在命令行中给出作业描述是很不明智的,更好的做法是:使用cat命令联合heredoc来创建作业描述文件,然后在命令行中以--cli-input-json file://xxx.json形式将作业描述传递给命令行:

# 生成作业描述文件
cat << EOF > xxx.json... ...... ...... ...   
EOF
# 使用作业描述文件提交作业
aws emr-serverless start-job-run ... --cli-input-json file://xxx.json ...

这是一个非常重要的技巧,使用这种形式提交作业有如下两个好处:

  • 在cat + heredoc中编辑的文本为原生字符串,不用考虑字符转义问题

  • 在cat + heredoc中可嵌入shell变量、函数调用和if…else等结构体,实现“动态”构建作业描述文件

最佳实践 (4):在作业描述文件中嵌入shell变量和脚本片段,实现“动态”构建

※ 此项最佳实践参考《参考范本:3. 准备作业描述文件》

如上所述,采用cat + heredoc编辑作业描述文件后,可以在编辑文件的过程中嵌入shell变量、函数调用和if...else...等复合结构体,使得我们可以动态构建作业描述文件,这是非常重要的一个能力。在《参考范本:3. 准备作业描述文件》中有一个很好的例证,就是“动态拼接依赖Jar包的路径”:

--conf spark.jars=$(aws s3 ls $APP_S3_HOME/jars/ | grep -o '\S*\.jar$'| awk '{print "'"$APP_S3_HOME/jars/"'"$1","}' | tr -d '\n' | sed 's/,$//')

这是在构建作业描述文件start-job-run.json的过程中通过$(....)嵌入的一段shell脚本,这段脚本遍历了指定目录下的jar文件并拼接成一个字符串输出出来,而输出的字符串会在嵌入脚本的地方变成文本的一部分,我们还可以在编辑文本时调用shell函数,嵌入if...else...whilecase等多重复合逻辑结构,让作业描述文件可以根据不同的参数和条件动态生成期望的内容,这种灵活性足以让开发者应对任何复杂的情况。

最佳实践 (5):使用jq校验并格式化作业描述文件

※ 此项最佳实践参考《参考范本:3. 准备作业描述文件》

jq是一个处理json文件的命令行工具,对于AWS CLI来说,jq可以说是一个“最佳伴侣”。原因是使用AWS CLI创建资源时,除了传入常规参数之外,还可以通过--cli-input-json参数传入一个json文件来描述所要创建的资源。当创建的资源配置过于复杂时,json文件的优势就会凸显出来,就像我们参考范本中的这个EMR Serverless Job一样。所以,使用AWS CLI时经常有编辑和操作json文件的需求,此时jq就成为了一个强有力的辅助工具。在参考范本中,我们仅仅使用jq打印了一下生成的作业描述文件:

jq . $APP_LOCAL_HOME/start-job-run.json

这一步操作有两个作用:一是利用jq校验了json文件,这能帮助排查文件中的json格式错误,二是jq输出的json经过了格式化和语法着色,更加易读。

其实jq在AWS CLI上还有更多高级应用,只是在我们的参考范本中并没有体现出来。在某些情况下,我们可以通过jq直接检索和编辑作业描述文件,将jq和使用cat + heredoc的json编辑方式结合起来,可以创建更加复杂和动态化的作业描述文件。

最佳实践 (6):可复用的依赖Jar包路径拼接脚本

※ 此项最佳实践参考《参考范本:3. 准备作业描述文件》

拼接依赖Jar包路径几乎是每个作业都要解决的问题,手动拼接虽然可行,但费力且容易出错。过去在本地环境中,我们可以使用:--jars $(echo /path/*.jar | tr ' ' ',')这种简洁而优雅的方式拼接Jar包路径。但是EMR Serverless作业的依赖Jar包是存放在S3上的,这此,我们针对性地编写了一段可复用的脚本来拼接位于S3指定目录下的Jar包路径,供大家参考(请注意替换脚本中出现的两处文件夹路径):

aws s3 ls $APP_S3_HOME/jars/ | grep -o '\S*\.jar$'| awk '{print "'"$APP_S3_HOME/jars/"'"$1","}' | tr -d '\n' | sed 's/,$//'

最佳实践 (7):可复用的作业监控脚本

※ 此项最佳实践参考《参考范本:5. 监控作业》

使用命令行提交EMR Serverless作业后,用户可以转到AWS控制台上查看作业的状态,但是对开发者来说,这种切换会分散注意力,最完美的方式莫过于提交作业后继续在命令行窗口监控作业状态,直到其失败或成功运行。为此,《参考范本:5. 监控作业》给出了一种实现,可复用于所有EMR Serverless作业,供大家参考。

最佳实践 (8):可复用的日志错误信息检索脚本

※ 此项最佳实践参考《参考范本:6. 检查错误》

在日常开发中,“提交作业报错 -> 查看日志中的报错信息 -> 修改代码重新提交”是一个反复迭代的过程,在EMR Serverless中,用户需要切换到AWS控制台查看错误日志,并且有时日志量会非常大,在控制台上查看效率很低。一种更高效的做法是:将存放于S3上的日志文件统一下载到本地并解压,然后使用grep命令快速检索日志中含有error, failed, exception等关键字的行,然后再打开具体文件仔细查看。将这些动作脚本化后,我们就能得到一段可复用的日志错误信息检索脚本,对于调试和排查错误有很大的帮助。为此,《参考范本:6. 检查错误》给出了一种实现,可复用于所有EMR Serverless作业,供大家参考。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/35772.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

章节7:XSS检测和利用

章节7&#xff1a;XSS检测和利用 测试payload <script>alert(XSS)</script> <script>alert(document.cookie)</script> ><script>alert(document.cookie)</script> ><script>alert(document.cookie)</script> &qu…

golang—面试题大全

目录标题 sliceslice和array的区别slice扩容机制slice是否线程安全slice分配到栈上还是堆上扩容过程中是否重新写入go深拷贝发生在什么情况下&#xff1f;切片的深拷贝是怎么做的copy和左值进行初始化区别slice和map的区别 mapmap介绍map的key的类型map对象如何比较map的底层原…

《Java极简设计模式》第03章:工厂方法模式(FactoryMethod)

作者&#xff1a;冰河 星球&#xff1a;http://m6z.cn/6aeFbs 博客&#xff1a;https://binghe.gitcode.host 文章汇总&#xff1a;https://binghe.gitcode.host/md/all/all.html 源码地址&#xff1a;https://github.com/binghe001/java-simple-design-patterns/tree/master/j…

腾讯云标准型CVM云服务器详细介绍

腾讯云CVM服务器标准型实例的各项性能参数平衡&#xff0c;标准型云服务器适用于大多数常规业务&#xff0c;例如&#xff1a;web网站及中间件等&#xff0c;常见的标准型云服务器有CVM标准型S5、S6、SA3、SR1、S5se等规格&#xff0c;腾讯云服务器网来详细说下云服务器CVM标准…

NAS搭建指南一——服务器的选择与搭建

一、服务器的选择 有自己的本地的公网 IP 的请跳过此篇文章按需求选择一个云服务器&#xff0c;目的就是为了进行 frp 的搭建&#xff0c;完成内网穿透我选择的是腾讯云服务器&#xff0c;我的配置如下&#xff0c;仅供参考&#xff1a; 4. 腾讯云服务器官网地址 二、服务器…

day9 10-牛客67道剑指offer-JZ66、19、20、75、23、76、8、28、77、78

文章目录 1. JZ66 构建乘积数组暴力解法双向遍历 2. JZ19 正则表达式匹配3. JZ20 表示数值的字符串有限状态机遍历 4. JZ75 字符流中第一个不重复的字符5. JZ23 链表中环的入口结点快慢指针哈希表 6. JZ76 删除链表中重复的结点快慢指针三指针如果只保留一个重复结点 7. JZ8 二…

gitblit-使用

1.登入GitBlit服务器 默认用户和密码: admin/admin 2.创建一个新的版本库 点击图中的“版本库”&#xff0c;然后点击图中“创建版本库” 填写名称和描述&#xff0c;注意名称最后一定要加 .git选择限制查看、克隆和推送勾选“加入README”和“加入.gitignore文件”在图中的1处…

使用IIS服务器部署Flask python Web项目

参考文章 ""D:\Program Files (x86)\Python310\python310.exe"|"D:\Program Files (x86)\Python310\lib\site-packages\wfastcgi.py"" can now be used as a FastCGI script processor参考文章 请求路径填写*&#xff0c;模块选择FastCgiModule&…

一键部署 Umami 统计个人网站访问数据

谈到网站统计&#xff0c;大家第一时间想到的肯定是 Google Analytics。然而&#xff0c;我们都知道 Google Analytics 会收集所有用户的信息&#xff0c;对数据没有任何控制和隐私保护。 Google Analytics 收集的指标实在是太多了&#xff0c;有很多都是不必要的&#xff0c;…

Multi-object navigation in real environments using hybrid policies 论文阅读

论文信息 题目&#xff1a;Multi-object navigation in real environments using hybrid policies 作者&#xff1a;Assem Sadek, Guillaume Bono 来源&#xff1a;CVPR 时间&#xff1a;2023 Abstract 机器人技术中的导航问题通常是通过 SLAM 和规划的结合来解决的。 最近…

优化堆排序(Java 实例代码)

目录 优化堆排序 Java 实例代码 src/runoob/heap/HeapSort.java 文件代码&#xff1a; 优化堆排序 上一节的堆排序&#xff0c;我们开辟了额外的空间进行构造堆和对堆进行排序。这一小节&#xff0c;我们进行优化&#xff0c;使用原地堆排序。 对于一个最大堆&#xff0c;首…

pytest 编写规范

一、pytest 编写规范 1、介绍 pytest是一个非常成熟的全功能的Python测试框架&#xff0c;主要特点有以下几点&#xff1a; 1、简单灵活&#xff0c;容易上手&#xff0c;文档丰富&#xff1b;2、支持参数化&#xff0c;可以细粒度地控制要测试的测试用例&#xff1b;3、能够…

差分升级在物联网水表上的实现与应用(学习)

摘要 当越来越多的物联网水表加入抄表系统后&#xff0c;实现了水表数据的信息化&#xff0c;并且当水表终端需要技术更新时&#xff0c;通过网络方式来升级产品可以高效修复设备面临的问题&#xff0c;减少用户损失&#xff0c;降低维护成本&#xff0c;但同时也对有限的网络…

遍历集合List的五种方法以及如何在遍历集合过程中安全移除元素

一、遍历集合List的五种方法 测试数据 List<String> list new ArrayList<>(); list.add("A");list.add("B");list.add("C");1. 普通for循环 普通for循环&#xff0c;通过索引遍历 for (int i 0; i < list.size(); i) {Syst…

form中表单切换,导致 relus 中的事件无法触发,原因:页面切换不要一直切换DOM,会导致问题,需要都显示出来

修改前&#xff0c;因为重复渲染DOM导致绑定rules失效 修改前代码使用 computed 计算出渲染的DOM&#xff0c;影响rules事件<el-formref"form"inline:model"billDetailCopy":rules"rules"size"small"label-position"right&quo…

NLP语言模型概览

语言模型结构分类 Encoder-Decoder&#xff08;Transformer&#xff09;: Encoder 部分是 Masked Multi-Head Self-Attention&#xff0c;Decoder 部分是 Casual Multi-Head Cross-Attention 和 Casual Multi-Head Self-Attention 兼具。比如T5&#xff0c;BART&#xff0c;MA…

腾讯云轻量服务器和云服务器的CPU处理器有差别吗?

腾讯云轻量应用服务器和CVM云服务器的CPU处理器性能有差别吗&#xff1f;创建轻量应用服务器时不支持指定底层物理服务器的CPU型号&#xff0c;腾讯云将随机分配满足套餐规格的物理CPU型号&#xff0c;通常优先选择较新代次的CPU型号。而云服务器CVM的CPU处理器型号、主频都是有…

JAVA设计模式----原型设计模式

文章目录 一、简介二、实现方式三、原型模式的注意事项浅拷贝与深拷贝浅拷贝深拷贝一、简介 定义:用原型实例指定创建对象的种类,并通过拷贝这些原型创建新的对象。 类型:创建类模式 类图: 原型模式主要用于对象的复制,它的核心是就是类图中的原型类Prototype。Protot…

下载程序到西门子PLC

更多关于西门子S7-200PLC内容请查看&#xff1a;西门子200系列PLC学习课程大纲 下载西门子200PLC程序分以下两步&#xff1a; 一.编译程序 1. 如下图1-1所示&#xff0c;使用PPI电缆将PLC和电脑连接上&#xff0c;注意笔记本使用USB转PPI电缆&#xff0c;连接保证给PLC单独供…

Linux(进程间通信详解)

进程间通信&#xff0c;顾名思义&#xff0c;就是进程与进程之间互通信交流&#xff0c;OS保证了各进程之间相互独立&#xff0c;但这不意味着进程与进程之间就互相隔离开&#xff0c;在不少的情况下&#xff0c;进程之间需要相互配合共同完成某项6任务&#xff0c;这就要求各进…