Apache Airflow 快速入门教程

Apache Airflow已经成为Python生态系统中管道编排的事实上的库。与类似的解决方案相反,由于它的简单性和可扩展性,它已经获得了普及。在本文中,我将尝试概述它的主要概念,并让您清楚地了解何时以及如何使用它。

Airflow应用场景

想象一下,你想要构建一个机器学习管道,它由以下几个步骤组成:

  • 从基于云的存储中读取图像数据集
  • 处理图像
  • 使用下载的图像训练深度学习模型
  • 将训练好的模型上传到云端
  • 部署模型

你将如何安排和自动化这个工作流程?Cron作业是一个简单的解决方案,但它也带来了许多问题。最重要的是,它们不允许你有效地扩展。Airflow提供了轻松调度和扩展复杂数据流程编排的能力,另一方面,它还能够在故障后自动重新运行它们,管理它们的依赖关系,并使用日志和仪表板监视它们。

在构建上述数据流之前,让我们先了解Apache Airflow 的基本概念。

Airflow 简介

Apache Airflow 是一个开源的平台,用于编排、调度和监控工作流,工作流是由一系列任务(Tasks)组成的,这些任务可以是数据处理、数据分析、机器学习模型训练、文件传输等各种操作。因此,它是ETL和MLOps用例的理想解决方案。示例用例包括:

  • 从多个数据源提取数据,对其进行聚合、转换,并将其存储在数据仓库中。
  • 从数据中提取见解并将其显示在分析仪表板中
  • 训练、验证和部署机器学习模型

核心组件

在默认版本中安装Apache Airflow 时,你将看到四个不同的组件。

  • Webserver: Webserver是Airflow的用户界面(UI),它允许您在不需要CLI或API的情况下与之交互。从那里可以执行和监视管道,创建与外部系统的连接,检查它们的数据集等等。
  • 执行器:执行器是管道运行的机制。有许多不同类型的管道在本地运行,在单个机器中运行,或者以分布式方式运行。一些例子是LocalExecutor, SequentialExecutor, CeleryExecutor和KubernetesExecutor
  • 调度器:调度器负责在正确的时间执行不同的任务,重新运行管道,回填数据,确保任务完成等。
  • PostgreSQL:存储所有管道元数据的数据库。这通常是Postgres,但也支持其他SQL数据库。

安装Airflow最简单的方法是使用docker compose。你可以从这里下载官方的docker撰写文件:

$ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.1/docker-compose.yaml'

在这里插入图片描述

基本概念

要学习Apache Airflow,必须熟悉它的主要概念,这些概念可能有点难理解,让我们试着揭开它们的神秘面纱。

DAGs

所有管道都定义为有向无环图(dag)。每次执行DAG时,都会创建一个单独的运行。每个DAG运行都是独立的,并且包含一个关于DAG执行阶段的状态。这意味着相同的dag可以并行执行多次。

要实例化DAG,可以使用DAG函数或与上下文管理器一起使用,如下所示:

from airflow import DAG
with DAG("mlops",default_args={"retries": 1,},schedule=timedelta(days=1),start_date=datetime(2023, 1, 1)
) as dag:# dag code goes here

上下文管理器接受一些关于DAG的全局变量和一些默认参数。默认参数被传递到所有任务中,并且可以在每个任务的基础上重写。完整的参数列表可以在官方文档中找到。

在本例中,我们定义DAG将从2023年1月1日开始,并且每天执行一次。retries参数确保在可能出现故障后重新运行一次。

task(任务)

DAG的每个节点表示一个Task,即一段单独的代码。每个任务可能有一些上游和下游依赖项。这些依赖关系表示任务如何相互关联以及它们应该以何种顺序执行。每当初始化一个新的DAG运行时,所有任务都初始化为Task实例。这意味着每个Task实例都是给定任务的特定运行。

在这里插入图片描述

operator(任务模板)

操作符可以被视为预定义任务的模板,因为它们封装了样板代码并抽象了它们的大部分逻辑。常见的操作符有BashOperator、PythonOperator、MySqlOperator、S3FileTransformOperator。我们看到,操作符可以定义遵循特定模式的任务。例如,MySqlOperator创建任务来执行SQL查询,而BashOperator执行bash脚本。

操作符在DAG上下文管理器中定义,如下所示。下面的代码创建了两个任务,一个执行bash命令,另一个执行MySQL查询。

with DAG("tutorial"
) as dag:task1 = BashOperator(task_id="print_date",bash_command="date",)task2 = MySqlOperator(task_id="load_table",sql="/scripts/load_table.sql")

任务依赖

为了形成DAG的结构,我们需要定义每个任务之间的依赖关系。一种方法是使用>>符号,如下所示:

task1 >> task2 >> task3
# 一个任务有多个依赖
task1 >> [task2, task3]
# 也可以使用set_downstream, set_upstream
t1.set_downstream([t2, t3])

xcom

xcom,或相互通信,负责任务之间的通信。xcom对象可以在任务之间推拉数据。更具体地说,它们将数据推入元数据数据库,其他任务可以从中提取数据。这就是为什么可以通过它们传递的数据量是有限的。但是,如果需要传输大数据,则可以使用合适的外部数据存储,例如对象存储或NoSQL数据库。

看看下面的代码。这两个任务使用ti参数(任务实例的缩写)通过xcom进行通信。train_model任务将model_path推入元数据数据库,元数据由deploy_model任务拉出。

dag = DAG('mlops_dag',
)def train_model(ti):model_path = train_and_save_model()ti.xcom_push(key='model_path', value=model_path)def deploy_model(ti):model_path = ti.xcom_pull(key='model_path', task_ids='train_model')deploy_trained_model(model_path)train_model_task = PythonOperator(task_id='train_model',python_callable=train_model,dag=dag
)deploy_model_task = PythonOperator(task_id='deploy_model',python_callable=deploy_model,dag=dag
)train_model_task >> deploy_model_task

Taskflow

Taskflow API是一种使用Python装饰器@task来定义任务的简单方法。如果所有任务的逻辑都可以用Python编写,那么一个简单的注释就可以定义一个新任务。Taskflow自动管理其他任务之间的依赖关系和通信。

使用Taskflow API,我们可以用@dag装饰器初始化DAG。下面是使用Tashflow示例:

@dag(start_date=datetime(2023, 1, 1),schedule_interval='@daily'
)
def mlops():@taskdef load_data():. . .return df@taskdef preprocessing(data):. . .return data@taskdef fit(data): return Nonedf = load_data()data = preprocessing(df)model = fit(data)dag = mlops()

注意,任务之间的依赖关系是通过每个函数参数隐含的。这里我们是简单的连接顺序,但实际可以变得复杂得多。Taskflow API还解决了任务之间的通信问题,因此使用xcom的需求有限。

调度

作业调度是Airflow的核心功能之一。这可以使用schedule_interval参数完成,该参数接收cron表达式,表示日期时间对象,或预定义变量,如@hour, @daily等。更灵活的方法是使用最近添加的时间表,它支持使用Python定义自定义时间表。

下面是如何使用schedule_interval参数的示例。以下DAG将每天执行。

@dag(start_date=datetime(2023,1,1),schedule_interval = '@daily',catchup =False
)
def my_dag():pass

关于调度,需要了解两个非常重要的概念:回填(backfill)和追赶(catchup)。

一旦我们定义了DAG,我们就设置了开始日期和计划间隔。如果catchup=True,则Airflow 将为从开始日期到当前日期的所有计划间隔创建DAG运行。如果catchup=False,气流将只从当前日期调度运行。

回填扩展了这个想法,使我们能够在CLI中创建过去的运行,而不管catchup参数的值:

$ airflow backfill  -s <START_DATE> -e <END_DATE> <DAG_NAME>

连接

Airflow 提供了一种简单的方法来配置与外部系统或服务的连接。可以使用UI、作为环境变量或通过配置文件创建连接。它们通常需要URL、身份验证信息和唯一id。钩子(Hooks )是一种API,它抽象了与这些外部系统的通信。例如,我们可以通过如下的UI定义一个PostgreSQL连接:

在这里插入图片描述

然后使用PostgresHook来建立连接并执行我们的查询:

pg_hook = PostgresHook(postgres_conn_id='postgres_custom')conn = pg_hook.get_conn()
cursor = conn.cursor()
cursor.execute('create table _mytable (ModelID int, ModelName varchar(255)')
cursor.close()
conn.close()

高级概念

为了使本教程尽可能完整,我需要提到一些更高级的概念。我不会详细介绍每一个,但我强烈建议你看看他们,如果你想深入掌握Airflow 。

  • 分支:分支允许你将任务划分为许多不同的任务,如:支持条件处理不同任务的工作流。最常见的方法是BranchPythonOperator。
  • 任务组:任务组可以在单个组中组织多个任务。它是简化图形视图和重复模式的好工具。
  • 动态包:包和任务也可以以动态的方式构造。从Airflow 2.3开始,可以在运行时创建包和任务,这对于并行和依赖输入的任务来说是理想的。气流也支持Jinja模板,并且是对动态包非常有用的补充。
  • 单元测试和日志记录:气流具有运行单元测试和记录信息的专用功能.

Airflow最佳实践

在我们看到实际操作的示例之前,让我们讨论一下大多数从业者使用的一些最佳实践。

  • 幂等性:dag和任务应该是幂等的。使用相同的输入重新执行相同的DAG运行应该始终具有与执行一次相同的效果。
  • 原子性:任务应该是原子性的。每个任务应该负责一个操作,并且独立于其他任务
  • 增量过滤:每个DAG运行应该只处理一批支持增量提取和加载的数据。这样,可能出现的故障就不会影响整个数据集。
  • 顶级代码:如果不是用于创建操作符或标记,则应避免使用顶级代码,因为它会影响性能和加载时间。所有代码都应该在任务内部,包括导入包、数据库访问和繁重的计算。
  • 复杂性:dag应尽可能保持简单,因为高复杂性可能会影响性能或调度。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/62589.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python+django自动化部署日志采用‌WebSocket前端实时展示

一、开发环境搭建和配置 # channels是一个用于在Django中实现WebSocket、HTTP/2和其他异步协议的库。 pip install channels#channels-redis是一个用于在Django Channels中使用Redis作为后台存储的库。它可以用于处理#WebSocket连接的持久化和消息传递。 pip install channels…

[MySQL]流程控制语句

流程控制语句需要借助存储过程才有效。关于存储过程&#xff0c;我会在后续的文章详述&#xff0c;本篇文章只是阐述流程控制语句。因此&#xff0c;大家只需要注意存储过程中相应的流程控制语句即可。 如果文中阐述不全或不对的&#xff0c;多多交流。 参考笔记三&#xff0c…

使用 pycharm 新建使用 conda 虚拟 python 环境的工程

1. conda 常见命令复习&#xff1a; conda env list // 查看 conda 环境列表 conda activate xxxenv // 进入指定 conda 环境2. 环境展示&#xff1a; 2.1. 我的物理环境的 Python 版本为 3.10.9&#xff1a; 2.2. 我的 conda 虚拟环境 env_yolov9_python_3_8 中的 pyth…

上传镜像docker hub登不上和docker desktop的etx4.vhdx占用空间很大等解决办法

平时使用docker一般都在Linux服务器上&#xff0c;但这次需要将镜像上传到docker hub上&#xff0c;但是服务器上一直无法登录本人的账号&#xff0c;&#xff08;这里的问题应该docker 网络配置中没有开代理的问题&#xff0c;因服务器上有其他用户使用&#xff0c;不可能直接…

时频转换 | Matlab基于S变换S-transform一维数据转二维图像方法

目录 基本介绍程序设计参考资料获取方式基本介绍 时频转换 | Matlab基于S变换S-transform一维数据转二维图像方法 程序设计 clear clc % close all load x.mat % 导入数据 x =

【娱乐项目】竖式算术器

Demo介绍 一个加减法随机数生成器&#xff0c;它能够生成随机的加减法题目&#xff0c;并且支持用户输入答案。系统会根据用户输入的答案判断是否正确&#xff0c;统计正确和错误的次数&#xff0c;并显示历史记录和错题记录。该工具适合用于数学练习&#xff0c;尤其适合练习基…

Java抛出自定义运行运行

1.重新生成异常的.java文件 Empty&#xff1a;空 Exception&#xff1a;异常 加起来就是 空指针异常的文件 2.打上extends 运行的异常&#xff08;异常的类型&#xff09; 3.点击ctrlo&#xff0c;选着这两个快捷重写 4.在需要抛出异常的地方写上&#xff1a;th…

使用Github Action将Docker镜像转存到阿里云私有仓库,供国内服务器使用,免费易用

文章目录 一、前言二、 工具准备&#xff1a;三、最终效果示例四、具体步骤第一大部分是配置阿里云1. 首先登录阿里云容器镜像服务 [服务地址](https://cr.console.aliyun.com/cn-hangzhou/instances)2. 选择个人版本3. 创建 命名空间4. 进入访问凭证来查看&#xff0c;用户名字…

YOLO系列论文综述(从YOLOv1到YOLOv11)【第13篇:YOLOv10——实时端到端物体检测】

YOLOv10 1 摘要2 网络结构3 YOLOv1-v10对比 YOLO系列博文&#xff1a; 【第1篇&#xff1a;概述物体检测算法发展史、YOLO应用领域、评价指标和NMS】【第2篇&#xff1a;YOLO系列论文、代码和主要优缺点汇总】【第3篇&#xff1a;YOLOv1——YOLO的开山之作】【第4篇&#xff1a…

Figma入门-自动布局

Figma入门-自动布局 前言 在之前的工作中&#xff0c;大家的原型图都是使用 Axure 制作的&#xff0c;印象中 Figma 一直是个专业设计软件。 最近&#xff0c;很多产品朋友告诉我&#xff0c;很多原型图都开始用Figma制作了&#xff0c;并且很多组件都是内置的&#xff0c;对…

零基础学安全--Burp Suite(4)proxy模块以及漏洞测试理论

目录 学习连接 一些思路 proxy模块 所在位置 功能简介 使用例子 抓包有一个很重要的点&#xff0c;就是我们可以看到一些在浏览器中看不到的传参点&#xff0c;传参点越多就意味着攻击面越广 学习连接 声明&#xff01; 学习视频来自B站up主 **泷羽sec** 有兴趣的师傅可…

CAD 文件 批量转为PDF或批量打印

CAD 文件 批量转为PDF或批量打印&#xff0c;还是比较稳定的 1.需要本地安装CAD软件 2.通过 Everything 搜索工具搜索&#xff0c;DWG To PDF.pc3 &#xff0c;获取到文件目录 &#xff0c;替换到代码中&#xff0c; originalValue ACADPref.PrinterConfigPath \ r"C:…

【Linux网络编程】TCP套接字

TCP与UDP的区别&#xff1a; udp是无连接的、面向数据报&#xff08;通信时以数据报为单位传输&#xff09;的传输层通信协议&#xff0c;其中每个数据报都是独立的&#xff0c;通信之前不需要建立连接&#xff0c;bind绑定套接字后直接可以进行通信。 tcp是面向连接的、基于字…

spring-boot-maven-plugin 标红

情况&#xff1a;创建好 Spring Boot 项目后&#xff0c;pom.xml 文件中 spring-boot-maven-plugin 标红。 解决方案&#xff1a;加上 Spring Boot 的版本即可解决。

xv6前置知识

fork函数 一个进程,包括代码、数据和分配给进程的资源。fork()函数通过系统调用创建一个与原来进程几乎完全相同的进程,也就是两个进程可以做完全相同的事,但如果初始参数或者传入的变量不同,两个进程也可以做不同的事。 一个进程调用fork()函数后,系统先给新的进程分…

(11)(2.2) BLHeli32 and BLHeli_S ESCs(二)

文章目录 前言 1 传递支持 前言 BLHeli 固件和配置应用程序的开发是为了允许配置 ESC 并提供额外功能。带有此固件的 ESC 允许配置定时、电机方向、LED、电机驱动频率等。在尝试使用 BLHeli 之前&#xff0c;请按照 DShot 设置说明进行操作(DShot setup instructions)。 1 传…

Flink的双流join理解

如何保证Flink双流Join准确性和及时性、除了窗口join还存在哪些实现方式、究竟如何回答才能完全打动面试官呢。。你将在文中找到答案。 1 引子 1.1 数据库SQL中的JOIN 我们先来看看数据库SQL中的JOIN操作。如下所示的订单查询SQL&#xff0c;通过将订单表的id和订单详情表ord…

1.1 数据结构的基本概念

1.1.1 基本概念和术语 一、数据、数据对象、数据元素和数据项的概念和关系 数据&#xff1a;是客观事物的符号表示&#xff0c;是所有能输入到计算机中并被计算机程序处理的符号的总称。 数据是计算机程序加工的原料。 数据对象&#xff1a;是具有相同性质的数据元素的集合&…

【程序人生】“阶段总结“-前路茫茫

岁月如白驹过隙&#xff0c;如指尖流沙&#xff0c;不知不觉已经离开了陪伴我度过四年岁月的学校&#xff0c;离开了那间堆满各种书籍的宿舍&#xff0c;离开了通宵开发的实验室&#xff0c;离开了教室里的最后一排课桌椅......&#xff08;虽然&#xff0c;我并不是很喜欢它&a…

Android 13 编译Android Studio版本的Launcher3

Android 13 Aosp源码 源码版本Android Studio版本Launcher3QuickStepLib (主要代码) Launcher3ResLib(主要资源)Launcher3IconLoaderLib(图