Apache DolphinScheduler 与 AWS 的 EMR/Redshift 集成实践分享

引言

这篇文章将给大家讲解关于DolphinScheduler与AWS的EMR和Redshift的集成实践,通过本文希望大家能更深入地了解AWS智能湖仓架构,以及DolphinScheduler在实际应用中的重要性。

AWS智能湖仓架构

首先,我们来看一下AWS经典的智能湖仓架构图。

这张图展示了以S3为核心的数据湖,围绕数据湖的是各种组件,包括数据库、Hadoop的EMR、大数据处理、数据仓库、机器学习、日志查询和全文检索等。

这些组件形成一个完整的生态系统,确保数据能够在企业内部自由流动,无论是从外围到核心,还是从核心到外围。

智能湖仓架构的核心目标是实现数据在各个组件之间的自由移动,提升企业数据处理的灵活性和效率。

数据源与数据采集工具

为了让大家更直观地理解这张图,我们可以从左到右进行解读。左侧是各种数据源,包括数据库、应用程序以及数据采集和摄入工具。

这些工具包括Kinesis、MSK(托管的Kafka)和OpenSearch,都是用于高效数据摄入的优秀工具。

核心组件介绍

今天的主角是图中圈起来的几个关键组件:

  • Redshift:用于数据仓库的解决方案。
  • EMR:Hadoop生态圈的大数据处理组件。
  • DolphinScheduler:任务调度工具。

在大数据处理的下游,还包括BI(商业智能)、传统机器学习和最新的生成式AI,再往下是企业中的人、应用和设备。这张图展示了整个数据处理和分析的流程,使得数据处理过程更加直观和流畅。

今天的分享主要围绕以下两个核心点展开:

  1. EMR与DolphinScheduler的实践
  2. Redshift与DolphinScheduler的实践

在此之前,我们先对EMR做一个简要介绍。

Amazon EMR 简介

Amazon EMR(Elastic MapReduce)是亚马逊云技术提供的一款云端服务,用于轻松运行Hadoop生态圈的各类组件,包括Spark、Hive、Flink、HBase等。

其主要特点包括:

  • 及时更新:紧跟开源社区的最新版本,30天内提供最新的开源版本更新。
  • 自动弹性扩容和缩容:根据工作负载自动调整集群规模。
  • 多种计价模式:灵活组合使用不同的计价模式,实现极致性价比。

EMR与自建Hadoop集群的比较

相比于传统IDC机房自建Hadoop集群,EMR具有以下优势:

  • 充分发挥原生特性
  • 多种计价模式组合使用
  • 自动弹性扩容和缩容

成本分析

在使用Hadoop进行数据分析和大数据处理时,企业越来越关注成本控制,而不仅仅是性能。

下图展示了企业在IDC机房自建Hadoop集群的成本构成,包括服务器成本、网络成本、人工维护成本以及其他额外费用。这些成本往往非常高。

迁移到EMR的优势

许多企业由于本地扩容困难、长周期采购流程以及升级困难等原因,逐渐将Hadoop工作负载迁移到云上的EMR。

通过支付订阅费用和额外支持费用,企业可以享受到以下优势:

  • 灵活的计价模式组合
  • 自动弹性伸缩
  • 参数调优

进一步优化

在完成初步优化后,部分企业可能仍觉得不满足其性价比的要求,进而转向使用EMR Serverless和EMR on EKS。这两者本质上都是基于容器化技术,旨在实现更高的性价比。

EMR Serverless

EMR Serverless让企业摆脱了对硬件和基础设施的维护,更加关注上层应用和业务开发,用户只需设置应用程序代码和相关参数即可。

EMR on EKS

EMR on EKS通过在Kubernetes上构建Hadoop集群,包括Spark和Flink等组件,进一步优化性价比。

不仅能提高数据处理的效率和灵活性,也使得企业能够更好地控制成本,实现业务目标。

实践案例分享:从IDC迁移到AWS EMR

接下来通过一个真实案例,分享一个客户从IDC迁移到AWS EMR的优化过程。这将帮助大家了解在云上使用EMR和DolphinScheduler的具体实践,以及如何通过这些工具实现性能和成本的双重优化。

客户背景与挑战

该客户原本在IDC机房采用CDH(Cloudera Distribution Hadoop),自建了一个Hadoop集群,共有13个节点。某个任务在机房内需要13个小时才能完成。客户希望通过迁移到云上,降低成本并提升效率。

迁移与优化过程

初始迁移

客户将其工作负载迁移到AWS云上,采用EMR On EC2。在迁移后的初始阶段,使用Hive on EMR,仅用了6个节点,任务时间缩短至10小时。此时并未进行任何调优。

接着,客户将Hive切换为Spark,仍使用6个节点,任务时间进一步缩短至3.5小时。

通过开启EMR的弹性伸缩功能,并将数据格式转换为parquet,任务时间进一步缩减至2.4小时。

最终优化

客户实现了集群版EMR与EMR Serverless的混合使用,将部分工作负载迁移到EMR Serverless上,达到了极致的性价比,性能优化在很大程度上也代表了成本优化。

调度优化与实践

迁移后的调度方式

在迁移到云上的EMR On EC2后,客户的调度方式如下:

  • 任务分布在24小时内,小任务每个大约需要20-30分钟。
  • 较大的任务集中在一天的7小时内执行。
  • 超级大的任务执行时间为3-5天,甚至一周,一次执行一个月可能只需要运行两三次。

优化调度

通过Apache DolphinScheduler,客户将工作负载分别调度到EMR On EC2和EMR Serverless上。

具体做法如下:

  • 小任务:放到EMR Serverless上,因为这些任务不需要3-4台节点,10GB-20GB内存即可满足需求。
  • 大任务:继续保留在EMR On EC2上,因为这些任务在云上运行时间相对集中,则可以集群定时开关,降低成本。
  • 超级大任务:放到EMR Serverless上,通过监控每次运行的CPU和内存消耗,,将每次任务运行的成本可视化,便于针对性地、持续地进行成本优化。

统一元数据管理

使用AWS Glue作为统一的元数据管理工具,使得集群的创建、销毁、再创建过程无需恢复元数据或数据,同一份数据和元数据可以在EMR On EC2和EMR Serverless之间无缝使用。

挑战与解决方案

在此过程中,我们也遇到了一些挑战:

  1. 异步提交问题:EMR Serverless目前仅支持异步提交,而批处理任务需要同步执行。我们通过封装Python类库,实现了统一的API接口,解决了这个问题。
    1. 日志查看不一致:EMR和EMR Serverless的日志查看方式不同。通过DolphinScheduler,我们实现了统一的日志下载和查看,改善了客户体验。
  2. API接口差异:EMR和EMR Serverless的API接口不同。我们通过封装统一的API接口,减少了客户的维护成本。
  3. SQL提交限制:EMR Serverless暂时不支持直接提交SQL。我们通过Python脚本,间接实现了SQL提交。

解决方案实施

我们和客户一起封装了一个Python类库,通过这个类库,统一了EMR On EC2和EMR Serverless的任务提交、日志查询和状态查询接口。在DolphinScheduler中,客户可以通过统一的API无缝地的在EMR Serverless 和 EMR on EC2 之间切换工作负载。

例如,在DolphinScheduler上调度到EMR On EC2时,脚本如下:

from emr_common import Sessionsession_emr = Session(job_type=0)
session_emr.submit_sql("job_name","your_sql_query")
session_emr.submit_file("job_name","your_pyspark_script")

而调度到EMR Serverless时,脚本如下:

from emr_common import Sessionsession_emrserverless = Session(job_type=1)
session_emrserverless.submit_sql("your_sql_query")
session_emrserverless.submit_file("job_name","your_pyspark_script"

通过Apache DolphinScheduler的参数传递特性,整个代码可以在不同引擎之间自由切换,实现了无缝调度。除了基本的job type参数之外,还有许多其他参数可供配置。

为了简化用户操作,系统为大部分参数设置了默认值,因此用户通常不需要手动配置这些参数。

例如,用户可以指定任务执行的集群,如果不指定,系统将默认选择第一个活跃的应用或集群ID。

此外,用户还可以为每个Spark任务设置driver和executor的相关参数。如果不指定这些参数,系统也会使用默认值。

封装Session

为了实现简化操作,我们封装了一个session对象,这个session包含两个子类:EMRSession和EMRServerlessSession。

根据传入参数的不同,系统会创建相应的session对象,无论是提交SQL语句还是脚本文件,其接口从上到下都是一致的。

使用体验

下面通过一些DolphinScheduler的截图来展示其使用体验。

上图是DolphinScheduler上的一个Python Operator示例,包含了EMR On EC2和EMR Serverless的代码:

from emr_common import Session# EMR On EC2
session_emr = Session(job_type=0)
session_emr.submit_sql("job_name","your_sql_query")
session_emr.submit_file("job_name","your_pyspark_script")# EMR Serverless
session_emrserverless = Session(job_type=1)
session_emrserverless.submit_sql("your_sql_query")
session_emrserverless.submit_file("job_name","your_pyspark_script"

实际效果

通过上述优化,客户不仅大幅缩短了任务运行时间,还实现了成本的大幅节约。

例如,在未调优情况下,任务时间从13小时缩短到10小时,而经过多次优化后,最终任务时间缩短到2.4小时,同时实现了集群版本EMR和Serverless版本的混合使用。

通过这个案例,我们可以看到,通过使用AWS EMR和DolphinScheduler,企业可以在保证性能的同时,大幅降低成本,实现更高的性价比。希望这个案例能为大家在云上进行大数据处理和优化提供一些借鉴和参考。

Redshift 实践分享

Redshift是AWS推出的云数据仓库,已经存在十多年,是业界最成熟的云数据仓库之一。通过Redshift,用户可以实现数据仓库、数据湖和数据库的无缝集成。

Redshift简介

Redshift是一款分布式数据仓库产品,支持以下功能:

  • 联合查询与联邦查询:直接查询MySQL等关系数据库的数据,无需通过ETL导入Redshift。
  • 与S3数据湖的集成:通过Redshift Spectrum,直接查询S3上的parquet等格式的数据,而无需将数据导入Redshift。
  • 与机器学习的集成:在没有机器学习经验的情况下,通过写SQL就能快速且自动地完成特征工程与模型训练,然后进行趋势预测、销售预测、异常检测等应用。

Redshift不仅支持集群部署,还提供Serverless模式,在这种模式下,用户无需管理负载和资源扩展,只需关注SQL代码和数据开发应用,进一步地简化了数据开发的门槛,让大家更加专注业务层面的开发,而不要去关注过多的关注底层的运维。

自3.0版本起,DolphinScheduler支持Redshift数据源。通过DolphinScheduler的SQL Operator,用户可以直接编写Redshift的SQL,进行大数据开发和应用。

用户可以通过拖拽界面操作,轻松定制DAG并监控各个任务和流程的执行情况。

并发控制

Redshift是一个OLAP数据库,具有高吞吐量和快速计算的特点,但其并发扩展通常不超过50。这对于调度大量并发任务的客户来说是一个挑战。

为了解决这一问题,有两个选项:

  1. 开启Redshift并发扩展:扩展到预置资源的10倍容量,但会增加额外成本。
  2. 使用DolphinScheduler的并发控制功能:创建任务组,设定资源容量,控制调度到Redshift的任务并发,避免集群过载。

Shell Operator实践

在使用DolphinScheduler进行Redshift开发时,推荐使用SQL Operator,但也可以使用Shell Operator,通过sql - fxxx.sql 命令执行SQL文件。

这种做法的好处在于可以与CICD流程集成。开发人员可以在个人电脑上通过GitLab开发代码,提交后自动上传到S3桶,DolphinScheduler支持从S3桶读取代码文件,并提交到Redshift中执行。

比如说通过Jenkins实现了代码推送到GitLab后,自动上传到S3存储桶。在DolphinScheduler的资源中心创建文件后,自动向S3写文件,并更新DolphinScheduler的元数据,实现了CICD的无缝集成。

总结

今天我们分享了EMR与EMR Serverless和DolphinScheduler的整合经验和实践,以及Redshift与DolphinScheduler的集成实践。以下是我个人对DolphinScheduler社区的期望和展望:

  1. SQL语法树解析生成血缘关系:希望DolphinScheduler能提供基于SQL语法树解析生成的数据血缘关系,尤其是字段级别的血缘关系。
  2. 引入AI agent编排流程:希望未来DolphinScheduler能考虑引入AI agent的编排流程,或引入AI agent的Operator。

感谢大家的观看,如果想了解更多详情,欢迎加小助手进群交流。

本文由 白鲸开源科技 提供发布支持!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/40645.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【第20章】MyBatis-Plus逻辑删除支持

文章目录 前言一、逻辑删除的工作原理二、支持的数据类型三、使用方法1.配置全局逻辑删除属性2.在实体类中使用 TableLogic 注解 四、常见问题解答1. 如何处理插入操作?2. 删除接口自动填充功能失效怎么办? 五、实战1. 全局配置2. 添加TableLogic3. 自动…

高考选专业,兴趣与就业前景该如何平衡?

从高考结束的那一刻开始,有些家长和学生就已经变得焦虑了,因为他们不知道成绩出来的时候学生应该如何填报志愿,也不知道选择什么样的专业,毕竟大学里面的专业丰富多彩,如何选择确实是一门学问,而对于学生们…

Oracle的RECYCLEBIN回收站:轻松恢复误删对象

目录 Oracle的RECYCLEBIN回收站:轻松恢复误删对象一、概念二、工作原理三、使用方法1 查看回收站中的对象2 恢复回收站中的对象2.1 恢复表(TABLE)2.2 恢复索引(INDEX)2.3 恢复视图(VIEW)2.4 恢复…

乐清网站建设规划书

乐清是位于浙江省温州市的一个县级市,拥有悠久的历史和丰富的文化底蕴。随着互联网的快速发展,网站建设成为推动乐清经济和文化发展的重要手段。因此,我们认为有必要制定一个全面的乐清网站建设规划书,以促进乐清的经济繁荣和文化…

东芝 TB5128FTG 强大性能的步进电机驱动器

TB5128FTG它以高精度和高效能为设计理念,采用 PWM 斩波方法,并内置时钟解码器。通过先进的 BiCD 工艺制造,这款驱动器提供高达 50V 和 5.0A 的输出额定值,成为广泛应用场景中的强劲解决方案。 主要特性 TB5128FTG 拥有众多确保高…

SAP PS学习笔记01 - PS概述,创建Project和WBS

本章开始学习PS(Project System)。 1,PS的概述 PS(Project System)是SAP企业资源规划系统中的一个关键模块,主要用于项目管理。 它提供了一个全面的框架来规划、控制和执行项目,涵盖了从项目启…

【Express】自定义错误码和通用返回对象

自定义错误码: // 自定义错误 const {formatResponse} require("./tool");class ServiceError extends Error {/**** param message 自定义错误信息* param code 自定义错误码*/constructor(message, code) {super(message);this.code code;}/*** 将错…

ZeroMQ最全面试题解读(3万字长文)

目录 解释ZeroMQ是什么,它的主要用途是什么? ZeroMQ支持哪些通信模式? 描述一下ZeroMQ中的“消息”和“消息帧” 如何在C++中初始化一个ZeroMQ上下文? 在ZeroMQ中,如何创建一个套接字并将其绑定到特定端口? 解释什么是“管道模式”(Pipe Pattern) 说明如何使用Z…

Spring的三种注入方式的优缺点分析

在 Spring 中,提供了三种依赖注入(也被称之为 "对象注入","属性装配"等)的方式,这篇博客我们来分析一下这三种方式各有哪些优缺点。 一、属性注入 优点 简洁,使用方便。 缺点 ▪ 只…

竞赛选题 卷积神经网络手写字符识别 - 深度学习

文章目录 0 前言1 简介2 LeNet-5 模型的介绍2.1 结构解析2.2 C1层2.3 S2层S2层和C3层连接 2.4 F6与C5层 3 写数字识别算法模型的构建3.1 输入层设计3.2 激活函数的选取3.3 卷积层设计3.4 降采样层3.5 输出层设计 4 网络模型的总体结构5 部分实现代码6 在线手写识别7 最后 0 前言…

C++内存的一些知识点

一、内存分区 在C中,内存主要分为以下几个区域: 代码区:存放函数体的二进制代码。 全局/静态存储区:存放全局变量和静态变量,这些变量在程序的整个运行期间都存在。常量存储区:存放常量,这些值…

学懂C#编程:实用方法——string字符串指定连接符拼接之 string.Join 的详细用法

在C#中,string.Join 方法用于将一个字符串数组或集合中的元素连接成一个单一的字符串,并在每个元素之间插入指定的分隔符。这个方法非常有用,特别是在需要将多个字符串合并成一个字符串时。以下是 string.Join 方法的详细用法: 方…

WPF UI 界面布局 魔术棒 文字笔记识别 技能提升 布局功能扩展与自定义 继承Panel的对象,测量与排列 系列七

应用开发第一步 功能分类:页面上的功能区域划分。。。。需求分析 业务逻辑 数据流 功能模块 UI/UX 编码 测试 发布 功能开发与布局 不用显式的方式设定元素的尺寸 不使用屏幕坐标来指定位置 Grid 功能最强大,布局最灵活的容器…

卷积神经网络:目标检测的黄金钥匙

标题:卷积神经网络:目标检测的黄金钥匙 卷积神经网络(CNN)是深度学习中用于处理具有网格结构数据(如图像)的强大工具。在目标检测任务中,CNN不仅提升了检测的准确性,还极大地推动了…

开发个人Go-ChatGPT--5 模型管理 (一)

开发个人Go-ChatGP–5 模型管理 (一) 背景 开发一个chatGPT的网站,后端服务如何实现与大模型的对话?是整个项目中开发困难较大的点。 如何实现上图的聊天对话功能?在开发后端的时候,如何实现stream的响应呢?本文就…

Vue-Router4.0 报“Cannot read property ‘forEach‘ of undefined”

Vue-Router4.0在创建路由时 报“Cannot read property ‘forEach‘ of undefined” 解决办法 将路由规则名称更改为routes,否则报错 import { createWebHashHistory, createRouter } from vue-router; // 创建路由规定 const routes [{path: /login,name: login,co…

Linux Docker 防火墙设置 放通 MySQL(3306) Redis(6379) 端口

Linux Docker 防火墙设置 放通 MySQL(3306) Redis(6379) 端口,使用firewalld 防火墙或iptables ,因此尝试重新启动 firewalld 服务,添加防火墙规则,并检查防火墙状态。以下是详细步骤: 1. 启动 firewalld 服务 首先启…

qt opencv 应用举例

在Qt中使用OpenCV可以实现各种图像处理和计算机视觉任务。以下是一些Qt与OpenCV联合应用的具体举例: 1. 图像读取与显示 读取图像:使用OpenCV的imread函数可以方便地读取各种格式的图像文件,如.bmp、.jpg、.png等。这个函数返回一个Mat对象…

【Unity数据交互】Unity中使用二进制进行数据持久化

👨‍💻个人主页:元宇宙-秩沅 👨‍💻 hallo 欢迎 点赞👍 收藏⭐ 留言📝 加关注✅! 👨‍💻 本文由 秩沅 原创 👨‍💻 专栏交流🧧&…

SqlSugar分表笔记

1、使用SqlSugar的分表功能时,.net要使用.net core; 我开始使用的是.net freamwork4.72,程序报异常,没能解决,换到.net core下面就正常; 2、SqlSugar自带分表支持按季度、月、周、日进行分表&#x…