ACK One Argo工作流:实现动态 Fan-out/Fan-in 任务编排

作者:庄宇

什么是 Fan-out Fan-in

在工作流编排过程中,为了加快大任务处理的效率,可以使用 Fan-out Fan-in 任务编排,将大任务分解成小任务,然后并行运行小任务,最后聚合结果。

图片

由上图,可以使用 DAG(有向无环图)编排 Fan-out Fan-in 任务,子任务的拆分方式分为静态和动态,分别对应静态 DAG 和动态 DAG。动态 DAG Fan-out Fan-in 也可以理解为 MapReduce。每个子任务为 Map,最后聚合结果为 Reduce。

静态 DAG: 拆分的子任务分类是固定的,例如:在数据收集场景中,同时收集数据库 1 和数据库 2 中的数据,最后聚合结果。

动态 DAG: 拆分的子任务分类是动态的,取决于前一个任务的输出结果,例如:在数据处理场景中,任务 A 可以扫描待处理的数据集,为每个子数据集(例如:一个子目录)启动子任务 Bn 处理,当所有子任务 Bn 运行结束后,在子任务 C 中聚合结果,具体启动多少个子任务 B 取决由任务 A 的输出结果。根据实际的业务场景,可以在任务 A 中自定义子任务的拆分规则。

ACK One 分布式工作流 Argo 集群

在实际的业务场景中,为了加快大任务的执行,提升效率,往往需要将一个大任务分解成数千个子任务,为了保证数千个子任务的同时运行,需要调度数万核的 CPU 资源,叠加多任务需要竞争资源,一般 IDC 的离线任务集群难以满足需求。例如:自动驾驶仿真任务,修改算法后的回归测试,需要对所有驾驶场景仿真,每个小驾驶场景的仿真可以由一个子任务运行,开发团队为加快迭代速度,要求所有子场景测试并行执行。

如果您在数据处理,仿真计算和科学计算等场景中,需要使用动态 DAG 的方式编排任务,或者同时需要调度数万核的 CPU 资源加快任务运行,您可以使用阿里云 ACK One 分布式工作流 Argo 集群 [ 1]

ACK One 分布式工作流 Argo 集群,产品化托管 Argo Workflow [ 2] ,提供售后支持,支持动态 DAG Fan-out Fan-in 任务编排,支持按需调度云上算力,利用云上弹性,调度数万核 CPU 资源并行运行大规模子任务,减少运行时间,运行完成后及时回收资源节省成本。支持数据处理,机器学习,仿真计算,科学计算,CICD 等业务场景。

Argo Workflow 是开源 CNCF 毕业项目,聚焦云原生领域下的工作流编排,使用 Kubernetes CRD 编排离线任务和 DAG 工作流,并使用 Kubernetes Pod 在集群中调度运行。

本文介绍使用 Argo Workflow 编排动态 DAG Fan-out Fan-in 任务。

Argo Workflow 编排 Fan-out Fan-in 任务

我们将构建一个动态 DAG Fan-out Fan-in 工作流,读取阿里云 OSS 对象存储中的一个大日志文件,并将其拆分为多个小文件(split),启动多个子任务分别计算每个小文件中的关键词数量(count),最后聚合结果(merge)。

  1. 创建分布式工作流 Argo 集群 [ 3]

  2. 挂载阿里云 OSS 存储卷,工作流可以像操作本地文件一样,操作阿里云 OSS 上的文件。参考:工作流使用存储卷 [ 4]

  3. 使用以下工作流 YAML 创建一个工作流,参考:创建工作流 [ 5] 。具体说明参见注释。

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:generateName: dynamic-dag-map-reduce-
spec:entrypoint: main# claim a OSS PVC, workflow can read/write file in OSS through PVC. volumes:- name: workdirpersistentVolumeClaim:claimName: pvc-oss# how many tasks to split, default is 5.arguments:parameters:- name: numPartsvalue: "5"templates:- name: main# DAG definition.dag:tasks:# split log files to several small files, based on numParts.- name: splittemplate: splitarguments:parameters:- name: numPartsvalue: "{{workflow.parameters.numParts}}"# multiple map task to count words in each small file.- name: maptemplate: maparguments:parameters:- name: partIdvalue: '{{item}}'depends: "split"# run as a loop, partId from split task json outputs.withParam: '{{tasks.split.outputs.result}}'- name: reducetemplate: reducearguments:parameters:- name: numPartsvalue: "{{workflow.parameters.numParts}}"depends: "map"# The `split` task split the big log file to several small files. Each file has a unique ID (partId).# Finally, it dumps a list of partId to stdout as output parameters- name: splitinputs:parameters:- name: numPartscontainer:image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/python-log-countcommand: [python]args: ["split.py"]env:- name: NUM_PARTSvalue: "{{inputs.parameters.numParts}}"volumeMounts:- name: workdirmountPath: /mnt/vol# One `map` per partID is started. Finds its own "part file" and processes it.- name: mapinputs:parameters:- name: partIdcontainer:image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/python-log-countcommand: [python]args: ["count.py"]env:- name: PART_IDvalue: "{{inputs.parameters.partId}}"volumeMounts:- name: workdirmountPath: /mnt/vol# The `reduce` task takes the "results directory" and returns a single result.- name: reduceinputs:parameters:- name: numPartscontainer:image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/python-log-countcommand: [python]args: ["merge.py"]env:- name: NUM_PARTSvalue: "{{inputs.parameters.numParts}}"volumeMounts:- name: workdirmountPath: /mnt/voloutputs:artifacts:- name: resultpath: /mnt/vol/result.json
  1. 动态 DAG 实现

1)split 任务在拆分大文件后,会在标准输出中输出一个 json 字符串,包含:子任务要处理的 partId,例如:

["0", "1", "2", "3", "4"]

2)map 任务使用 withParam 引用 split 任务的输出,并解析 json 字符串获得所有 {{item}},并使用每个 {{item}} 作为输入参数启动多个 map 任务。

          - name: maptemplate: maparguments:parameters:- name: partIdvalue: '{{item}}'depends: "split"withParam: '{{tasks.split.outputs.result}}'

更多定义方式,请参考开源 Argo Workflow 文档 [ 6]

  1. 工作流运行后,通过分布式工作流 Argo 集群控制台 [ 7] 查看任务 DAG 流程与运行结果。

图片

  1. 阿里云 OSS 文件列表,log-count-data.txt 为输入日志文件,split-output,cout-output 中间结果目录,result.json 为最终结果文件。

图片

  1. 示例中的源代码可以参考:AliyunContainerService GitHub argo-workflow-examples [ 8]

总结

Argo Workflow 是开源 CNCF 毕业项目,聚焦云原生领域下的工作流编排,使用 Kubernetes CRD 编排离线任务和 DAG 工作流,并使用 Kubernetes Pod 在集群中调度运行。

阿里云 ACK One 分布式工作流 Argo 集群,产品化托管 Argo Workflow,提供售后支持,加固控制面实现数万子任务(Pod)稳定高效调度运行,数据面支持无服务器方式调度云上大规模算力,无需运维集群或者节点,支持按需调度云上算力,利用云上弹性,调度数万核 CPU 资源并行运行大规模子任务,减少运行时间,支持数据处理,机器学习,仿真计算,科学计算,CICD 等业务场景。

欢迎加入 ACK One 客户交流钉钉群与我们进行交流。(钉钉群号:35688562

相关链接:

[1] 阿里云 ACK One 分布式工作流 Argo 集群

https://help.aliyun.com/zh/ack/overview-12

[2] Argo Workflow

https://argo-workflows.readthedocs.io/en/latest/

[3] 创建分布式工作流 Argo 集群

https://help.aliyun.com/zh/ack/create-a-workflow-cluster

[4] 工作流使用存储卷

https://help.aliyun.com/zh/ack/use-volumes

[5] 创建工作流

https://help.aliyun.com/zh/ack/create-a-workflow

[6] 开源 Argo Workflow 文档

https://argo-workflows.readthedocs.io/en/latest/walk-through/loops/

[7] 分布式工作流 Argo 集群控制台

https://account.aliyun.com/login/login.htm?oauth_callback=https%3A%2F%2Fcs.console.aliyun.com%2Fone%3Fspm%3Da2c4g.11186623.0.0.7e2f1428OwzMip#/argowf/cluster/detail

[8] AliyunContainerService GitHub argo-workflow-examples

https://github.com/AliyunContainerService/argo-workflow-examples/tree/main/log-count

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/673179.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何评估头抖的病情和治疗效果?

评估头抖的病情和治疗效果是一个综合性的过程,需要考虑多个方面,包括症状的严重程度、频率、持续时间,以及生活质量和心理状态的变化等。下面将从不同方面详细介绍如何评估头抖的病情和治疗效果。 一、病情评估 症状观察:首先&am…

文件处理工具类

一、引入依赖 <dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.30</version> </dependency> <dependency><groupId>org.apache.commons</groupId><arti…

在 Docker 中启动 ROS2 里的 rivz2 和 rqt 出现错误的解决方法

1. 出现错误&#xff1a; 运行 ros2 run rivz2 rivz2 &#xff0c;报错如下 &#xff1a; No protocol specified qt.qpa.xcb: could not connect to display :1 qt.qpa.plugin: Could not load the Qt platform plugin "xcb" in "" even though it was f…

Java Swing实现简易版项目代码统计

尝试用AI生成日常方便使用的代码程序 用文心一言生成的可用代码 (1)提示1 假如你是一个java程序员&#xff0c;请用java swing创建一个JFrame&#xff0c;显示一个JTextField显示路径&#xff0c;Jtextfield右侧添加一个JButton&#xff0c;下面添加一个JTextArea&#xff0c…

文档协作技术——Operational Transformations简单了解

OT是支持协作软件系统的一种广泛使用的技术。 OT通常使用副本文档储存&#xff0c;每个客户端都拥有对文档的副本。客户端在本地副本以无锁非堵塞方式操作&#xff0c;并将改变传递到其他客户端。当客户端收到其他客户端传播的改变之后&#xff0c;通过转换应用更改&#xff0…

【前端web入门第四天】03 显示模式+综合案例热词与banner效果

文章目录: 1. 显示模式 1.1 块级元素,行内元素,行内块元素 1.2 转换显示模式 综合案例 综合案例一 热词综合案例二 banner效果 1. 显示模式 什么是显示模式 标签(元素)的显示方式 标签的作用是什么? 布局网页的时候&#xff0c;根据标签的显示模式选择合适的标签摆放内容。…

揭秘备忘录模式:打造灵活高效的状态管理解决方案

备忘录模式&#xff08;Memento Pattern&#xff09;是一种行为设计模式&#xff0c;它允许在不暴露对象内部状态的情况下捕获和恢复对象的内部状态。这种模式主要用于实现撤销操作。 在 Java 中&#xff0c;备忘录模式通常包括以下三个角色&#xff1a; 发起人&#xff08;O…

leetcode 1539.第k个缺失的正整数

这个题作者就当作是练习C的STL容器来做的&#xff0c;也就是暴力硬解吧。 思路&#xff1a;按照数据范围&#xff0c;我们再创造一个全集&#xff0c;利用差集的概念来求出来arr所没有的元素&#xff0c;放到结果数组里面&#xff0c;然后再对位置进行筛选。 注意&#xff1a…

vue3:25—其他API

目录 1、shallowRef和shallowReactive 2、readonly与shallowReadonly readonly shallowReadonly 3、toRaw和markRaw toRaw markRaw 4、customRef 1、shallowRef和shallowReactive shallowRef 1.作用:创建一个响应式数据&#xff0c;但只对顶层属性进行响应式处理。2…

代码随想录算法训练营第13天—二叉树02 | ● *层序遍历(对应10道题) ● *226.翻转二叉树 ● 101.对称二叉树

*层序遍历&#xff08;二叉树的广度优先搜索&#xff0c;对应10道题&#xff09; 102.二叉树的层序遍历(opens new window) 107.二叉树的层次遍历II(opens new window) 199.二叉树的右视图(opens new window) 637.二叉树的层平均值(opens new window)斜体样式 429.N叉树的层序…

Windows 安装 MySQL 最新最简教程

Windows 安装 MySQL 最新最简教程 官网地址 https://dev.mysql.com/downloads/mysql/下载 MySQL zip 文件 配置 MySQL1、解压文件 2、进入 bin 目录 搜索栏输入 cmd 回车进入命令行 C:\Users\zhong\Desktop\MySQL\mysql-8.3.0-winx64\mysql-8.3.0-winx64\bin 注意这里是你自己…

JUnit实践教程——Java的单元测试框架

前言 大家好&#xff0c;我是chowley&#xff0c;最近在学单元测试框架——JUnit&#xff0c;写个博客记录一下&#xff01; 在软件开发中&#xff0c;单元测试是确保代码质量和稳定性的重要手段之一。JUnit作为Java领域最流行的单元测试框架&#xff0c;为开发人员提供了简单…

std::vector<cv::Mat>和unsigned char** in_pixels 互相转换

将std::vectorcv::Mat转换为unsigned char** in_pixels&#xff0c; std::vector<cv::Mat> matVector; // 假设已经有一个包含cv::Mat的vector// 创建一个二维数组&#xff0c;用于存储像素数据 unsigned char** in_pixels new unsigned char*[matVector.size()]; for …

(2)(2.14) SPL Satellite Telemetry

文章目录 前言 1 本地 Wi-Fi&#xff08;费用&#xff1a;30 美元以上&#xff0c;范围&#xff1a;室内&#xff09; 2 蜂窝电话&#xff08;费用&#xff1a;100 美元以上&#xff0c;范围&#xff1a;蜂窝电话覆盖区域&#xff09; 3 手机卫星&#xff08;费用&#xff…

JS - 处理元素滚动

业务功能中时常有元素滚动的功能&#xff0c;现在就总结一下一些常用的事件。 一、定位滚动元素 做一切滚动操作之前都应该先定位到滚动元素&#xff0c;再做其他操作&#xff0c;如滚动顶部&#xff0c;获取滚动距离、禁止滚动等。 把以下代码复制粘贴到浏览器 Console 面板…

IT行业顶级证书:助力职业生涯的制胜法宝

IT行业顶级证书&#xff1a;助力职业生涯的制胜法宝 在IT行业&#xff0c;拥有一系列高含金量的证书是事关职业生涯发展的关键。这些证书不仅是技能的象征&#xff0c;更是在激烈的市场竞争中脱颖而出的法宝。让我们一起揭晓在中国IT行业中&#xff0c;哪些证书是最具含金量的…

仰暮计划|“​爷爷说这些话的时候眼睛都红着,他那变形的脊柱和瘸拐的双腿都证明他曾为这个家付出了血汗拼尽了全力”

赴一场拾光之旅&#xff0c;集往年回忆碎片 爷爷生于1952年&#xff0c;今年已有七十一了&#xff0c;是河南焦作沁阳北金村的一位地道农民&#xff0c;劳苦一生&#xff0c;如今终于得以颐养天年。许是早年经历过于难忘&#xff0c;爷爷如今与我讲起仍是记忆犹新&#xff0c;…

百卓Smart管理平台 uploadfile.php 文件上传漏洞【CVE-2024-0939】

百卓Smart管理平台 uploadfile.php 文件上传漏洞【CVE-2024-0939】 一、 产品简介二、 漏洞概述三、 影响范围四、 复现环境五、 漏洞复现手动复现小龙验证Goby验证 免责声明&#xff1a;请勿利用文章内的相关技术从事非法测试&#xff0c;由于传播、利用此文所提供的信息或者工…

uniapp 本地存储的方式

1. uniapp 本地存储的方式 在uniapp开发中&#xff0c;本地存储是一个常见的需求。本地存储可以帮助我们在客户端保存和管理数据&#xff0c;以便在应用程序中进行持久化存储。本文将介绍uniapp中本地存储的几种方式&#xff0c;以及相关的代码示例。 1.1. 介绍 在移动应用开发…

#Js篇:字符串的使用方法es5和es6

字符串 \ &#xff1a;单引号&#xff08;\u0027&#xff09;\" &#xff1a;双引号&#xff08;\u0022&#xff09; charAt 定义&#xff1a; 返回指定位置的字符&#xff0c;参数时从0开始编号的位置 参数&#xff1a; 位置下标 abc.charAt(1) // "b" …