经过前两篇文章的简单介绍之后,我们安装了自己的AirFlow以及简单了解了DAG的定义文件.现在我们要实现自己的一个DAG.
1. 启动Web服务器
使用如下命令启用:
airflow webserver
现在可以通过将浏览器导航到启动Airflow的主机上的8080端口来访问Airflow UI,例如:http://localhost:8080/admin/
备注
Airflow附带了许多示例DAG。 请注意,在你自己的`dags_folder`中至少有一个DAG定义文件之前,这些示例可能无法正常工作。你可以通过更改`airflow.cfg`中的`load_examples`设置来隐藏示例DAG。
2. 第一个AirFlow DAG
现在一切都准备好了,我们开始写一些代码,来实现我们的第一个DAG。 我们将首先创建一个Hello World工作流程,其中除了向日志发送"Hello world!"之外什么都不做。
创建你的dags_folder
,那就是你的DAG定义文件存储目录---$AIRFLOW_HOME/dags
。在该目录中创建一个名为hello_world.py的文件。
AIRFLOW_HOME
├── airflow.cfg
├── airflow.db
├── airflow-webserver.pid
├── dags
│ ├── hello_world.py
│ └── hello_world.pyc
└── unittests.cfg
将以下代码添加到dags/hello_world.py
中:
# -*- coding: utf-8 -*-import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import timedelta#-------------------------------------------------------------------------------
# these args will get passed on to each operator
# you can override them on a per-task basis during operator initializationdefault_args = {'owner': 'jifeng.si','depends_on_past': False,'start_date': airflow.utils.dates.days_ago(2),'email': ['1203745031@qq.com'],'email_on_failure': False,'email_on_retry': False,'retries': 1,'retry_delay': timedelta(minutes=5)
}#-------------------------------------------------------------------------------
# dagdag = DAG('example_hello_world_dag',default_args=default_args,description='my first DAG',schedule_interval=timedelta(days=1))#-------------------------------------------------------------------------------
# first operatordate_operator = BashOperator(task_id='date_task',bash_command='date',dag=dag)#-------------------------------------------------------------------------------
# second operatorsleep_operator = BashOperator(task_id='sleep_task',depends_on_past=False,bash_command='sleep 5',dag=dag)#-------------------------------------------------------------------------------
# third operatordef print_hello():return 'Hello world!'hello_operator = PythonOperator(task_id='hello_task',python_callable=print_hello,dag=dag)#-------------------------------------------------------------------------------
# dependenciessleep_operator.set_upstream(date_operator)
hello_operator.set_upstream(date_operator)
该文件创建一个简单的DAG,只有三个运算符,两个BaseOperator(一个打印日期一个休眠5秒),另一个为PythonOperator在执行任务时调用print_hello函数。
3. 测试代码
使用如下命令测试一下我们写的代码的正确性
python ~/opt/airflow/dags/hello_world.py
如果你的脚本没有抛出异常,这意味着你代码中没有错误,并且你的Airflow环境是健全的。
下面测试一下我们的DAG中的Task.使用如下命令查看我们example_hello_world_dag
DAG下有什么Task:
xiaosi@yoona:~$ airflow list_tasks example_hello_world_dag
可以看到我们有三个Task:
date_task
hello_task
sleep_task
下面分别测试一下这几个Task:
(1) 测试date_task
xiaosi@yoona:~$ airflow test example_hello_world_dag date_task 20170803
(2) 测试hello_task
xiaosi@yoona:~$ airflow test example_hello_world_dag hello_task 20170803
如果没有问题,我们就可以运行我们的DAG了.
4. 运行DAG
为了运行你的DAG,打开另一个终端,并通过如下命令来启动Airflow调度程序:
airflow scheduler
备注
调度程序将发送任务进行执行。默认Airflow设置依赖于一个名为`SequentialExecutor`的执行器,它由调度程序自动启动。在生产中,你可以使用更强大的执行器,如`CeleryExecutor`。
当你在浏览器中重新加载Airflow UI时,应该会在Airflow UI中看到你的hello_world
DAG。
为了启动DAG Run,首先打开工作流(off键),然后单击Trigger Dag
按钮(Links 第一个按钮),最后单击Graph View
按钮(Links 第三个按钮)以查看运行进度:
你可以重新加载图形视图,直到两个任务达到状态成功。完成后,你可以单击hello_task,然后单击View Log
查看日志。如果一切都按预期工作,日志应该显示一些行,其中之一是这样的:
[2017-08-03 09:46:43,236] {base_task_runner.py:95} INFO - Subtask: [2017-08-03 09:46:43,235] {python_operator.py:81} INFO - Done. Returned value was: Hello world![2017-08-03 09:46:47,378] {jobs.py:2083} INFO - Task exited with return code 0
更多多资讯或疑问内容请关注 微信公众号 “让梦飞起来” 或添加小编微信, 后台回复 “Python” ,领取更多资料哦