Airflow任务流调度

前言

Airflow是Airbnb内部发起的一个工作流管理平台。使用Python编写实现的任务管理、调度、监控工作流平台。Airflow的调度依赖于crontab命令,与crontab相比,Airflow可以方便地查看任务的执行状况(执行是否成功、执行时间、执行依赖等),可追踪任务历史执行情况,任务执行失败时可以收到邮件通知、查看错误日志。对于管理调度任务有很大的帮助。

crontab命令管理调度的方式总结来看存在以下几个方面的弊端:

1)在多任务调度执行的情况下,难以厘清任务间的依赖关系;

2)不便于查看当前执行到哪一个任务;

3)不便于查看调度流下每个任务执行的起止消耗时间,而这对于优化task作业是非常重要的;

4)不便于记录历史调度任务的执行情况,而这对于优化作业和排查错误是非常重要的;

5)执行任务失败时不便于查看执行日志,不方便定位报错的任务和接收错误告警邮件。

Airflow的官方文档地址是👇

http://airflow.apache.org/index.html,想使用Airflow管理调度任务的读者可反复研读官网文章,深入了解Airflow。

下面介绍在工程开发中如何去应用Airflow。

1 基础概念

在介绍Airflow这个调度工具前先介绍几个相关的基础概念。

DAG(Directed Acyclic Graph,有向无环图):

用于描述数据流的计算过程。

Operators:

描述了DAG中一个具体的task要执行的任务,如BashOperator为执行一条bash命令,EmailOperator用于发送邮件,HTTPOperator用于发送HTTP请求,PythonOperator用于调用任意的Python函数。

Task:

是Operator的一个实例,也就是DAG中的一个节点。

Task Instance:

记录task的一次运行。Task Instance有自己的状态,包括“running”“success”“failed”“skipped”“up for retry”等。

Triggher Rules:

指task的触发条件。

每一个节点可视为一个task,每个task用于执行一条任务,比如执行某个表的ETL加工。这些task调度任务按执行顺序的先后连接起来形成一个有向无环图

2 Airflow服务构成

一个正常运行的Airflow系统一般由以下几个服务构成。

1.WebServer

Airflow提供了一个可视化的Web界面,启动WebServer后,可以在Web界面上查看定义好的DAG并监控及改变其运行状况。也可以在Web界面中对一些变量进行配置。

2.Worker(Celery模式)

一般地,我们使用Celery Worker来执行具体作业。Worker可以部署在多台机器上,并可以分别设置接收的队列。当接收的队列中有作业任务时,Worker就会接收这个作业任务并开始执行。Airflow会自动在每个部署Worker的机器上同时部署一个Server Logs服务,这样就可以在Web界面上方便地查看分布在不同机器上的日志了。

3.Scheduler

整个Airflow的调度由Scheduler负责发起,每隔一段时间Scheduler就会检查所有定义完成的DAG和定义在其中的作业,如果有符合运行条件的作业,Scheduler就会发起相应的作业任务以供Worker接收。

4.Flower(Celery模式)

Flower提供了一个可视化界面用于监控所有Celery Worker的运行状况。

主要模块功能

通过Airflow的管理界面,可以了解其主要覆盖的功能模块。下面介绍Airflow主要覆盖的功能模块,这些模块在Airflow官网上有详细介绍。Airflow的工作流设计是有向无环图(DAG),在编写工作流时,需要考虑如何将任务划分为多个可独立执行的任务,然后将这些任务合并为一个逻辑整体,从而实现任务调度的结果。

1.DAG任务列表

首页中的DAG模块可以查看当前DAG的任务列表,包括当前有哪些DAG调度任务、哪些任务运行成功、哪些任务运行失败、哪些任务正在运行中。

图片

2.DAG调度状态图

在Tree View模块可以查看当前DAG每个task任务的调度状态,是执行成功、正在执行、执行失败还是等待执行等,便于快速定位到执行失败的任务,重新调启执行。

3.DAG有向无环图

在Graph View模块可以看到当前DAG中各task任务之间的依赖关系,以及各任务的执行状态。

图片

4.DAG执行脚本

在Code模块中可以查看当前DAG任务的执行脚本,包括任务的起始调度时间、调度失败后重试机制、各task任务之间的依赖关系等。当某个task执行出现问题时可通过查看该调度脚本定位原因。

图片

5.Gantt图

在Gantt模块中可以查看DAG调度的甘特图,通过甘特图可以查看每个task调度任务的起止时间、持续时长。方便查找到调度时间长的task任务,以便后续进行优化。

图片

3 脚本实例

在Airflow中,简单地说,task脚本是需要被一个个调起执行的脚本,DAG脚本是管理task脚本执行顺序、执行触发条件的。在Airflow调度开发中主要需要维护的是DAG脚本。下面通过一个具体的例子来了解:

在该脚本中,首先定义了需要引入的依赖包,定义了默认的参数配置及DAG参数和调度时间。其中default_args的默认配置中主要定义了如下参数。

Python
from airflow.operators.bash_operator import BashOperator
import airflow
from airflow.models import DAG
from airflow import operators
from airflow.contrib.hooks import SSHHook
from airflow.models import BaseOperator
from airflow.contrib.operators import SSHExecuteOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator
import os
import sys
from datetime import timedelta,date,datetime
import pendulum
from airflow.utils.trigger_rule import TriggerRuledefault_args = {'owner': 'userprofile','depends_on_past': False,'start_date': datetime(2023, 12, 01),'email': ['administer@testemail.com'],'email_on_failure': True ,'email_on_retry': True,'retries': 1,'retry_delay': timedelta(minutes=1),
}
os.environ['SPARK_HOME'] = '/usr/local/spark-2.1.1-bin-hadoop2.6'
sys.path.Append(os.path.join(os.environ['SPARK_HOME'], 'bin'))dag = DAG('user_dag',default_args=default_args,description='A user test',schedule_interval='00 07  * * *'
)
 

depends_on_past:是否依赖上游任务,即上一个调度任务执行失败时,是否执行该任务。可选项包括True和False,False表示当前执行脚本不依赖上游执行任务是否成功;

start_date:表示首次任务的执行日期;

email:设定当任务执行失败时,用于接收失败报警邮件的邮箱地址;

email_on_failure:当任务执行失败时,是否发送邮件。可选项包括True和False,True表示失败时将发送邮件;

retries:表示执行失败时是否重新调起任务执行,1表示会重新调起;

retry_delay:表示重新调起执行任务的时间间隔。

在DAG的定义中,除了引入上述的默认配置(default_args=default_args)外,还定义了该DAG脚本的dag_id为user_dag,定时调度时间为每天早上7点。中间两行参数为配置脚本运行的环境变量。

4 常用命令行

Airflow通过可视化界面的方式实现了调度管理的界面操作,但在测试脚本或界面操作失败的时候,可通过命令行的方式调起任务。下面介绍几个常用的命令。

airflow dags list:列出所有DAG

airflow list_tasks user:该命令用于查看当前DAG任务下的所有task列表,其中user是DAG名称。

airflow test user age_task 20230701:该命令用于测试DAG下面某个task是否能正常执行,其中user是DAG名称,age_task是其中一个task的名称。

5

Airflow常用Operator介绍

  • Python
    """
    ### Tutorial Documentation
    Documentation that goes along with the Airflow tutorial located
    [here](http://pythonhosted.org/airflow/tutorial.html)
    """
    import airflow
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from datetime import timedelta# these args will get passed on to each operator
    # you can override them on a per-task basis during operator initialization
    default_args = {'owner': 'airflow','depends_on_past': False,'start_date': airflow.utils.dates.days_ago(2),'email': ['airflow@example.com'],'email_on_failure': False,'email_on_retry': False,'retries': 1,'retry_delay': timedelta(minutes=5)
    }dag = DAG('tutorial',default_args=default_args,description='A simple tutorial DAG',schedule_interval=timedelta(days=1))# t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(task_id='print_date',   #这里也可以是一个 bash 脚本文件bash_command='date',dag=dag)t1.doc_md = """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
    """dag.doc_md = __doc__t2 = BashOperator(task_id='sleep',depends_on_past=False,bash_command='sleep 5',dag=dag)templated_command = """
    {% for i in range(5) %}echo "{{ ds }}"echo "{{ macros.ds_add(ds, 7)}}"echo "{{ params.my_param }}"
    {% endfor %}
    """t3 = BashOperator(task_id='templated',depends_on_past=False,bash_command=templated_command,params={'my_param': 'Parameter I passed in'},dag=dag)t2.set_upstream(t1)
    t3.set_upstream(t1)

这里 t1 和 t2 都很容易理解,直接调用的是 bash 命令,其实也可以传入带路径的 bash 脚本, t3 使用了 Jinja 模板,"{% %}" 内部是 for 标签,用于循环操作。"{{ }}" 内部是变量,其中 ds 是执行日期,是 airflow 的宏变量,params.my_param 是自定义变量。根据官方提供的模板,稍加修改即可满足我们的日常工作所需。

PythonOperator

Python
from __future__ import print_function
from builtins import range
import airflow
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAGimport time
from pprint import pprintargs = {'owner': 'airflow','start_date': airflow.utils.dates.days_ago(2)
}dag = DAG(dag_id='example_python_operator', default_args=args,schedule_interval=None)def my_sleeping_function(random_base):"""This is a function that will run within the DAG execution"""time.sleep(random_base)def print_context(ds, **kwargs):pprint(kwargs)print(ds)return 'Whatever you return gets printed in the logs'run_this = PythonOperator(task_id='print_the_context',provide_context=True,python_callable=print_context,dag=dag)# Generate 10 sleeping tasks, sleeping from 0 to 9 seconds respectively
for i in range(10):task = PythonOperator(task_id='sleep_for_' + str(i),python_callable=my_sleeping_function,op_kwargs={'random_base': float(i) / 10},dag=dag)task.set_upstream(run_this)

通过以上代码我们可以看到,任务 task 及依赖关系都是可以动态生成的,这在实际使用中会减少代码编写数量,逻辑也非常清晰,非常方便使用。PythonOperator 与 BashOperator 基本类似,不同的是 python_callable 传入的是 Python 函数,而后者传入的是 bash 指令或脚本。通过 op_kwargs 可以传入任意多个参数

SqoopOperator

SqoopOperator允许用户在 Airflow 工作流中集成 Apache Sqoop 作业,以便于在 Hadoop 分布式文件系统(HDFS)、关系型数据库管理系统(RDBMS)如 MySQL、PostgreSQL 或 Oracle 之间导入和导出数据。使用 SqoopOperator可以自动化数据迁移任务,提高数据处理流程的可维护性和灵活性

Python
sqoop_import = SqoopOperator(task_id='sqoop_import_data',conn_id='my_postgres_conn',  # Airflow中预先配置的数据库连接IDcmd_type='import',  # Sqoop操作类型,这里是导入table='example_table',  # 要导入的表名target_dir='/user/hadoop/sqoop_imports/example_table',  # HDFS目标目录num_mappers=2,  # 使用的Mapper数量splits_by='id',  # 分割数据的列名dag=dag,
)  

参数说明

conn_id: 引用Airflow中预先配置的数据库连接信息。

cmd_type: 指定Sqoop命令类型,如import或export。

table: 要操作的数据库表名。

target_dir: 导入数据的目标HDFS目录。

num_mappers: 并行执行任务的Mapper数量。

splits_by: 用于数据切分的列名,有助于提升导入效率。

其他可选参数如 where 用于指定导入数据的筛选条件,--direct 用于直接模式导入等,可根据需要添加。

BranchPythonOperator

BranchPythonOperator允许用户通过函数返回下一步要执行的task的id,从而根据条件选择执行的分支。它用于在工作流中根据特定条件动态选择下一个执行的任务。这个操作符通过执行一个Python函数来决定接下来执行哪一个任务,从而实现工作流的动态分支逻辑。

DummyOperator

作为一个虚拟的任务节点,使得DAG有一个起点,但实际不执行任务;或者是在上游几个分支任务的合并节点,为了清楚的现实数据逻辑。

HiveOperator

可以通过HiveOperato执行Hive SQL语句或脚本。它允许用户在Airflow工作流中方便地集成Hive作业,例如创建表、加载数据、执行查询等。

ExternalTaskSensor

Airflow中可以通过ExternalTaskSensor来完成跨DAG依赖。

跨DAG依赖管理:ExternalTaskSensor用于处理不同DAG之间的依赖关系。如果你的业务流程包含多个相互依赖的DAG,可以使用 ExternalTaskSensor 来确保上游DAG或其特定任务完成后,下游DAG的任务才开始执行。

SqlSensor

SqlSensor是 Apache Airflow 中的一个传感器(Sensor)操作符,它用于在工作流中等待直到特定的SQL查询返回预期的结果,之后才允许工作流继续执行。这种操作符非常适合于基于数据库状态的依赖控制,比如在执行下一步骤之前确保数据已就绪或满足特定条件。

MySqlOperator

MySqlOperator 是 Apache Airflow 中的一个操作符,它允许用户在 Airflow 工作流中执行 MySQL 数据库的相关操作,比如执行 SQL 查询、插入数据、更新表结构等。通过使用 MySqlOperator,你可以将数据库操作集成到自动化的工作流程中,实现数据处理、ETL 任务的编排与执行。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/28924.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于OCC+OSG的读取IGS模型显示其装配以及模型颜色

一般来说,读取STP模型会解析其装配结构,而读取IGS模型时候一般不这么做,因为IGS的每个部件大多是面片,而非一个实体模型,所以比如一些开源软件,比如Freecad等都是直接将模型作为一个整体并且在模型树上只显…

Elixir学习笔记——try, catch, and rescue

Elixir 有三种错误机制:errors, throws, and exits。在本章中,我们将探索每种机制,并说明何时应使用它们。 Errors 当代码中发生异常时,就会使用错误(或异常)。可以通过尝试将数字添加到原子来检索示例错…

生活好物:日常更精彩

我们的日用杂货店,是生活美学的聚集地。这里汇聚了各式各样的生活用品,每一件都蕴含着对生活的热爱与追求。 走进我们的日用杂货店,仿佛打开了一个充满生活气息的宝藏盒。从厨房的锅碗瓢盆,到浴室的洗漱用品,再到客厅的…

6.17 作业

使用qt实现优化自己的登录界面 要求: 1. qss实现 2. 需要有图层的叠加 (QFrame) 3. 设置纯净窗口后,有关闭等窗口功能。 4. 如果账号密码正确,则实现登录界面关闭,另一个应用界面显示。 第一个源文件 …

9.无代码爬虫软件做网页数据抓取流程——弹出窗口的移除

首先,多数情况下免费版本的功能,已经可以满足绝大多数采集需求,想了解八爪鱼采集器版本区别的详情,请访问这篇帖子: https://blog.csdn.net/cctv1123/article/details/139581468 八爪鱼采集器免费版和个人版、团队版下…

反射复习(java)

文章目录 反射机制的作用反射机制的原理加载机制详细解释 获取 Class 对象反射获取构造方法:获取 Class 对象里面 Constructor 对象反射获取成员变量:获取Class 对象里面的 Field 对象反射获取成员方法:获取 Class 对象里的 Method 对象其他常…

15.编写自动化测试(下)

标题 三、控制测试流程3.1 添加测试参数3.2 并行或连续运行测试3.3 显示函数输出3.4 指定/过滤测试用例名称3.5 忽略某些测试用例3.6 只运行被忽略的测试 四、测试的组织结构4.1 概念引入4.2 测试私有函数4.2 单元测试4.3 集成测试4.4 集成测试中的子模块4.5 二进制crate的集成…

Python脚本中使用 if 语句导致的错误代码

在 Python 脚本中使用 if 语句是一种常见的控制流程结构,用于根据条件决定程序的执行路径。当使用 Python 中的 if 语句时,可能会导致一些常见的错误。下面就是我经常遇到的错误代码示例及其可能的原因和解决方法,希望对大家有些帮助&#xf…

死锁预防之银行家算法

死锁预防之银行家算法 第一章 概述 Dijkstra提出了一种能够避免死锁的调度算法,称为银行家算法。 它的模型基于一个小城镇的银行家,他向一群客户分别承诺了一定的贷款额度,每个客户都有一个贷款额度,银行家知道不可能所有客户同时都需要最大贷款额,所以他只保留一定单位…

韩国职场新趋势:员工拒绝晋升,追求工作与生活的平衡

在当前职场环境中,晋升通常被视为职业生涯发展的重要里程碑。然而,据韩国《今日财经》报道,现代重工工会在今年的劳资谈判中提出了一个令人关注的要求——“拒绝晋升权”。这一要求反映了韩国职场的新趋势,即越来越多的员工对高薪…

长期保存红酒的挑战与应对策略

云仓酒庄雷盛红酒,以其卓着的品质和口感,赢得了无数葡萄酒爱好者的喜爱。然而,对于那些希望长期保存这些珍贵佳酿的人来说,如何确保红酒的品质和风味不受时间的影响,却是一项充满挑战的任务。 长期保存红酒的大挑战来自…

【2024最新华为OD-C/D卷试题汇总】[支持在线评测] 火星字符串(100分) - 三语言AC题解(Python/Java/Cpp)

🍭 大家好这里是清隆学长 ,一枚热爱算法的程序员 ✨ 本系列打算持续跟新华为OD-C/D卷的三语言AC题解 💻 ACM银牌🥈| 多次AK大厂笔试 | 编程一对一辅导 👏 感谢大家的订阅➕ 和 喜欢💗 &#x1f…

电影《加菲猫家族》观后感

上周看了电影《加菲猫家族》,本片其中有很多明亮的画面,相关艳丽的色彩,充满温馨的场景,很符合加菲猫的一贯画风,即使反派出场时,带有阴暗的感觉,看起也不是特别吓人,比较欢乐气氛&a…

定时器介绍之8253芯片

目录 定时器简介 8253功能介绍 组成 工作原理 相关引脚 启动方法 计数方式 实现 读取计数值 定时器简介 8253功能介绍 内部结构 相关引脚 计数器组成 工作原理 启动方法 计数方式 初始化:写入控制字——>写入计数初值 实现 计数长度选择&#xff1a…

虚拟机Ping不通主机

1.问题描述 虚拟机IP: 192.168.3.133 主机ip:192.168.3.137 虚拟机Ping不通主机 主机可以ping通虚拟机 2.解决方案 设置桥接模式 控制面板找到网络和Internet设置 3.问题解决

geoserver 如何设置数据目录

在GeoServer中,数据目录是存储配置文件、数据存储、图层、样式等的重要目录。默认情况下,GeoServer的数据目录位于GeoServer安装目录下的data_dir文件夹。但在很多情况下,用户可能希望将数据目录设置在一个自定义位置,以便更好地管…

手持气象仪:科技与自然交汇的奇妙工具

TH-SQ5在广袤无垠的大自然中,天气总是瞬息万变,让人难以捉摸。然而,随着科技的进步,人类已经能够借助各种先进的仪器来预测和监测天气变化,其中,手持气象仪便是其中的佼佼者。 手持气象仪,顾名…

Java获取本机IP地址的方法(内网、公网)

起因是公司一个springboot项目启动类打印了本机IP地址加端口号,方便访问项目页面,但是发现打印出来的不是“无线局域网”的ip而是“以太网适配器”ip,如下图所示 这样就导致后续本地起项目连接xxl-job注册节点的时候因为不在同个局域网下ping…

打假-代码都让AI写,CS还有前途吗?加州大学伯克利分校:CDSS申请人数飙升48%!

一、背景 现在 CSDN 上发现了一篇文章 点进去文档的内容在说CDSS专业 还有一篇文章 文章基本上都是同一个意思,CDSS专业申请人数飙升 48%,但却有人刻意的把计算机专业突出出来。我不确定有人刻意把 CDSS 专业 和 CS 专业混淆的目的是什么?…

【数据结构】红黑树实现详解

在本篇博客中,作者将会带领你使用C来实现一棵红黑树,此红黑树的实现是基于二叉搜索树和AVLTree一块来讲的,所以在看本篇博客之前,你可以先看看下面这两篇博客 【C】二叉搜索树-CSDN博客 【数据结构】AVLTree实现详解-CSDN博客 在这…