0 前言
Airflow是Airbnb内部发起的一个工作流管理平台。使用Python编写实现的任务管理、调度、监控工作流平台。Airflow的调度依赖于crontab命令,与crontab相比,Airflow可以方便地查看任务的执行状况(执行是否成功、执行时间、执行依赖等),可追踪任务历史执行情况,任务执行失败时可以收到邮件通知、查看错误日志。对于管理调度任务有很大的帮助。
crontab命令管理调度的方式总结来看存在以下几个方面的弊端:
1)在多任务调度执行的情况下,难以厘清任务间的依赖关系;
2)不便于查看当前执行到哪一个任务;
3)不便于查看调度流下每个任务执行的起止消耗时间,而这对于优化task作业是非常重要的;
4)不便于记录历史调度任务的执行情况,而这对于优化作业和排查错误是非常重要的;
5)执行任务失败时不便于查看执行日志,不方便定位报错的任务和接收错误告警邮件。
Airflow的官方文档地址是👇
http://airflow.apache.org/index.html,想使用Airflow管理调度任务的读者可反复研读官网文章,深入了解Airflow。
下面介绍在工程开发中如何去应用Airflow。
1 基础概念
在介绍Airflow这个调度工具前先介绍几个相关的基础概念。
DAG(Directed Acyclic Graph,有向无环图):
用于描述数据流的计算过程。
Operators:
描述了DAG中一个具体的task要执行的任务,如BashOperator为执行一条bash命令,EmailOperator用于发送邮件,HTTPOperator用于发送HTTP请求,PythonOperator用于调用任意的Python函数。
Task:
是Operator的一个实例,也就是DAG中的一个节点。
Task Instance:
记录task的一次运行。Task Instance有自己的状态,包括“running”“success”“failed”“skipped”“up for retry”等。
Triggher Rules:
指task的触发条件。
每一个节点可视为一个task,每个task用于执行一条任务,比如执行某个表的ETL加工。这些task调度任务按执行顺序的先后连接起来形成一个有向无环图
2 Airflow服务构成
一个正常运行的Airflow系统一般由以下几个服务构成。
1.WebServer
Airflow提供了一个可视化的Web界面,启动WebServer后,可以在Web界面上查看定义好的DAG并监控及改变其运行状况。也可以在Web界面中对一些变量进行配置。
2.Worker(Celery模式)
一般地,我们使用Celery Worker来执行具体作业。Worker可以部署在多台机器上,并可以分别设置接收的队列。当接收的队列中有作业任务时,Worker就会接收这个作业任务并开始执行。Airflow会自动在每个部署Worker的机器上同时部署一个Server Logs服务,这样就可以在Web界面上方便地查看分布在不同机器上的日志了。
3.Scheduler
整个Airflow的调度由Scheduler负责发起,每隔一段时间Scheduler就会检查所有定义完成的DAG和定义在其中的作业,如果有符合运行条件的作业,Scheduler就会发起相应的作业任务以供Worker接收。
4.Flower(Celery模式)
Flower提供了一个可视化界面用于监控所有Celery Worker的运行状况。
主要模块功能
通过Airflow的管理界面,可以了解其主要覆盖的功能模块。下面介绍Airflow主要覆盖的功能模块,这些模块在Airflow官网上有详细介绍。Airflow的工作流设计是有向无环图(DAG),在编写工作流时,需要考虑如何将任务划分为多个可独立执行的任务,然后将这些任务合并为一个逻辑整体,从而实现任务调度的结果。
1.DAG任务列表
首页中的DAG模块可以查看当前DAG的任务列表,包括当前有哪些DAG调度任务、哪些任务运行成功、哪些任务运行失败、哪些任务正在运行中。
2.DAG调度状态图
在Tree View模块可以查看当前DAG每个task任务的调度状态,是执行成功、正在执行、执行失败还是等待执行等,便于快速定位到执行失败的任务,重新调启执行。
3.DAG有向无环图
在Graph View模块可以看到当前DAG中各task任务之间的依赖关系,以及各任务的执行状态。
4.DAG执行脚本
在Code模块中可以查看当前DAG任务的执行脚本,包括任务的起始调度时间、调度失败后重试机制、各task任务之间的依赖关系等。当某个task执行出现问题时可通过查看该调度脚本定位原因。
5.Gantt图
在Gantt模块中可以查看DAG调度的甘特图,通过甘特图可以查看每个task调度任务的起止时间、持续时长。方便查找到调度时间长的task任务,以便后续进行优化。
3 脚本实例
在Airflow中,简单地说,task脚本是需要被一个个调起执行的脚本,DAG脚本是管理task脚本执行顺序、执行触发条件的。在Airflow调度开发中主要需要维护的是DAG脚本。下面通过一个具体的例子来了解:
在该脚本中,首先定义了需要引入的依赖包,定义了默认的参数配置及DAG参数和调度时间。其中default_args的默认配置中主要定义了如下参数。
Python
from airflow.operators.bash_operator import BashOperator
import airflow
from airflow.models import DAG
from airflow import operators
from airflow.contrib.hooks import SSHHook
from airflow.models import BaseOperator
from airflow.contrib.operators import SSHExecuteOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator
import os
import sys
from datetime import timedelta,date,datetime
import pendulum
from airflow.utils.trigger_rule import TriggerRuledefault_args = {'owner': 'userprofile','depends_on_past': False,'start_date': datetime(2023, 12, 01),'email': ['administer@testemail.com'],'email_on_failure': True ,'email_on_retry': True,'retries': 1,'retry_delay': timedelta(minutes=1),
}
os.environ['SPARK_HOME'] = '/usr/local/spark-2.1.1-bin-hadoop2.6'
sys.path.Append(os.path.join(os.environ['SPARK_HOME'], 'bin'))dag = DAG('user_dag',default_args=default_args,description='A user test',schedule_interval='00 07 * * *'
)
depends_on_past:是否依赖上游任务,即上一个调度任务执行失败时,是否执行该任务。可选项包括True和False,False表示当前执行脚本不依赖上游执行任务是否成功;
start_date:表示首次任务的执行日期;
email:设定当任务执行失败时,用于接收失败报警邮件的邮箱地址;
email_on_failure:当任务执行失败时,是否发送邮件。可选项包括True和False,True表示失败时将发送邮件;
retries:表示执行失败时是否重新调起任务执行,1表示会重新调起;
retry_delay:表示重新调起执行任务的时间间隔。
在DAG的定义中,除了引入上述的默认配置(default_args=default_args)外,还定义了该DAG脚本的dag_id为user_dag,定时调度时间为每天早上7点。中间两行参数为配置脚本运行的环境变量。
4 常用命令行
Airflow通过可视化界面的方式实现了调度管理的界面操作,但在测试脚本或界面操作失败的时候,可通过命令行的方式调起任务。下面介绍几个常用的命令。
airflow dags list:列出所有DAG
airflow list_tasks user:该命令用于查看当前DAG任务下的所有task列表,其中user是DAG名称。
airflow test user age_task 20230701:该命令用于测试DAG下面某个task是否能正常执行,其中user是DAG名称,age_task是其中一个task的名称。
5
Airflow常用Operator介绍
-
Python """ ### Tutorial Documentation Documentation that goes along with the Airflow tutorial located [here](http://pythonhosted.org/airflow/tutorial.html) """ import airflow from airflow import DAG from airflow.operators.bash_operator import BashOperator from datetime import timedelta# these args will get passed on to each operator # you can override them on a per-task basis during operator initialization default_args = {'owner': 'airflow','depends_on_past': False,'start_date': airflow.utils.dates.days_ago(2),'email': ['airflow@example.com'],'email_on_failure': False,'email_on_retry': False,'retries': 1,'retry_delay': timedelta(minutes=5) }dag = DAG('tutorial',default_args=default_args,description='A simple tutorial DAG',schedule_interval=timedelta(days=1))# t1, t2 and t3 are examples of tasks created by instantiating operators t1 = BashOperator(task_id='print_date', #这里也可以是一个 bash 脚本文件bash_command='date',dag=dag)t1.doc_md = """\ #### Task Documentation You can document your task using the attributes `doc_md` (markdown), `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets rendered in the UI's Task Instance Details page. ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png) """dag.doc_md = __doc__t2 = BashOperator(task_id='sleep',depends_on_past=False,bash_command='sleep 5',dag=dag)templated_command = """ {% for i in range(5) %}echo "{{ ds }}"echo "{{ macros.ds_add(ds, 7)}}"echo "{{ params.my_param }}" {% endfor %} """t3 = BashOperator(task_id='templated',depends_on_past=False,bash_command=templated_command,params={'my_param': 'Parameter I passed in'},dag=dag)t2.set_upstream(t1) t3.set_upstream(t1)
这里 t1 和 t2 都很容易理解,直接调用的是 bash 命令,其实也可以传入带路径的 bash 脚本, t3 使用了 Jinja 模板,"{% %}" 内部是 for 标签,用于循环操作。"{{ }}" 内部是变量,其中 ds 是执行日期,是 airflow 的宏变量,params.my_param 是自定义变量。根据官方提供的模板,稍加修改即可满足我们的日常工作所需。
PythonOperator
Python
from __future__ import print_function
from builtins import range
import airflow
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAGimport time
from pprint import pprintargs = {'owner': 'airflow','start_date': airflow.utils.dates.days_ago(2)
}dag = DAG(dag_id='example_python_operator', default_args=args,schedule_interval=None)def my_sleeping_function(random_base):"""This is a function that will run within the DAG execution"""time.sleep(random_base)def print_context(ds, **kwargs):pprint(kwargs)print(ds)return 'Whatever you return gets printed in the logs'run_this = PythonOperator(task_id='print_the_context',provide_context=True,python_callable=print_context,dag=dag)# Generate 10 sleeping tasks, sleeping from 0 to 9 seconds respectively
for i in range(10):task = PythonOperator(task_id='sleep_for_' + str(i),python_callable=my_sleeping_function,op_kwargs={'random_base': float(i) / 10},dag=dag)task.set_upstream(run_this)
通过以上代码我们可以看到,任务 task 及依赖关系都是可以动态生成的,这在实际使用中会减少代码编写数量,逻辑也非常清晰,非常方便使用。PythonOperator 与 BashOperator 基本类似,不同的是 python_callable 传入的是 Python 函数,而后者传入的是 bash 指令或脚本。通过 op_kwargs 可以传入任意多个参数
SqoopOperator
SqoopOperator允许用户在 Airflow 工作流中集成 Apache Sqoop 作业,以便于在 Hadoop 分布式文件系统(HDFS)、关系型数据库管理系统(RDBMS)如 MySQL、PostgreSQL 或 Oracle 之间导入和导出数据。使用 SqoopOperator可以自动化数据迁移任务,提高数据处理流程的可维护性和灵活性
Python
sqoop_import = SqoopOperator(task_id='sqoop_import_data',conn_id='my_postgres_conn', # Airflow中预先配置的数据库连接IDcmd_type='import', # Sqoop操作类型,这里是导入table='example_table', # 要导入的表名target_dir='/user/hadoop/sqoop_imports/example_table', # HDFS目标目录num_mappers=2, # 使用的Mapper数量splits_by='id', # 分割数据的列名dag=dag,
)
参数说明
conn_id: 引用Airflow中预先配置的数据库连接信息。
cmd_type: 指定Sqoop命令类型,如import或export。
table: 要操作的数据库表名。
target_dir: 导入数据的目标HDFS目录。
num_mappers: 并行执行任务的Mapper数量。
splits_by: 用于数据切分的列名,有助于提升导入效率。
其他可选参数如 where 用于指定导入数据的筛选条件,--direct 用于直接模式导入等,可根据需要添加。
BranchPythonOperator
BranchPythonOperator允许用户通过函数返回下一步要执行的task的id,从而根据条件选择执行的分支。它用于在工作流中根据特定条件动态选择下一个执行的任务。这个操作符通过执行一个Python函数来决定接下来执行哪一个任务,从而实现工作流的动态分支逻辑。
DummyOperator
作为一个虚拟的任务节点,使得DAG有一个起点,但实际不执行任务;或者是在上游几个分支任务的合并节点,为了清楚的现实数据逻辑。
HiveOperator
可以通过HiveOperato执行Hive SQL语句或脚本。它允许用户在Airflow工作流中方便地集成Hive作业,例如创建表、加载数据、执行查询等。
ExternalTaskSensor
Airflow中可以通过ExternalTaskSensor来完成跨DAG依赖。
跨DAG依赖管理:ExternalTaskSensor用于处理不同DAG之间的依赖关系。如果你的业务流程包含多个相互依赖的DAG,可以使用 ExternalTaskSensor 来确保上游DAG或其特定任务完成后,下游DAG的任务才开始执行。
SqlSensor
SqlSensor是 Apache Airflow 中的一个传感器(Sensor)操作符,它用于在工作流中等待直到特定的SQL查询返回预期的结果,之后才允许工作流继续执行。这种操作符非常适合于基于数据库状态的依赖控制,比如在执行下一步骤之前确保数据已就绪或满足特定条件。
MySqlOperator
MySqlOperator 是 Apache Airflow 中的一个操作符,它允许用户在 Airflow 工作流中执行 MySQL 数据库的相关操作,比如执行 SQL 查询、插入数据、更新表结构等。通过使用 MySqlOperator,你可以将数据库操作集成到自动化的工作流程中,实现数据处理、ETL 任务的编排与执行。