Airflow简介

1、什么是Airflow

Airflow 是一个 Airbnb 的 Workflow 开源项目,使用Python编写实现的任务管理、调度、监控工作流平台。Airflow 是基于DAG(有向无环图)的任务管理系统,可以简单理解为是高级版的crontab,但是它解决了crontab无法解决的任务依赖问题。与crontab相比Airflow可以方便查看任务的执行状况(执行是否成功、执行时间、执行依 赖等),可追踪任务历史执行情况,任务执行失败时可以收到邮件通知,查看错误日志。

2、Airflow与同类产品的对比

系统名称

介绍

Apache Oozie

使用XML配置, Oozie任务的资源文件都必须存放在HDFS上. 配置不方便同时也只能用于Hadoop.

Linkedin Azkaban

web界面尤其很赞, 使用java properties文件维护任务依赖关系, 任务资源文件需要打包成zip, 部署不是很方便.

Airflow

具有自己的web任务管理界面,dag任务创建通过python代码,可以保证其灵活性和适应性

3、Airflow基础概念

(1)DAG:有向无环图(Directed Acyclic Graph),描述数据流的计算过程。

(2)Operators:DAG中一个Task要执行的任务,如:①BashOperator为执行一条bash命令;②EmailOperator用于发送邮件;③HTTPOperator用于发送HTTP请求;④PythonOperator用于调用任意的Python函数。

(3)Task:是DAG中的一个节点,是Operator的一个实例。

(4)Task Instance:记录Task的一次运行,Task Instance有自己的状态,包括:running、success、failed、 skipped、up for retry等。

(5)Trigger Rules:task的触发条件。

4 、Airflow安装

依赖:yum -y install python-devel libevent-devel mysql-devel mysqlclient

(1)安装airflow:pip install apache-airflow

(2)修改airflow对应的环境变量:export AIRFLOW_HOME=/usr/local/airflow

(3)执行airflow version,在/usr/local/airflow目录下生成配置文件

(4)修改默认数据库:修改/usr/local/airflow/airflow.cfg

[core]

executor = LocalExecutor

sql_alchemy_conn = mysql://airflow:123456@192.168.48.102:3306/airflow

(5)创建airflow用户,创建airflow数据库并给出所有权限给次用户:

create database airflow;
create user 'tairflow'@'%' identified by '123123';
GRANT all privileges on airflow.* TO 'testairflow'@'%'  IDENTIFIED BY '123456';
FLUSH PRIVILEGES;

(6)初始化数据库:airflow initdb

(7)启动web服务器:airflow webserver –p 8080

在安装过程中如遇到如下错误:

在my.cnf中加explicit_defaults_for_timestamp=1,然后重启数据库

 

5、Airflow主要功能模块

下面通过Airflow调度任务管理的主界面了解一下各个模块功能,这个界面可以查看当前的DAG任务列表,有多少任务运行成功,失败以及正在当前运行中等:

在Graph View中查看DAG的状态。DAG是一个有向无环图,它是一个单向流动的ETL流程图。只有前置task执行成功后,后续task才会被Trigger;如果后续task有并行分支,会被同时Trigger执行。

对于已经执行完的task,鼠标停留在task上面,会自动浮现出一个黑色的提醒框,显示该task的基本情况。①Airflow当前UTC时间;②默认显示一个与①一样的时间,自动跟随①的时间变动而变动;③DAG当前批次触发的时间,也就是Dag Run时间,没有什么实际意义④数字4:该task开始执行的时间⑤该task开始执行和结束执行的UTC时间⑥该task开始执行和结束执行的CST时间,也就是香港本地时间。

Airflow中每一个task可能有8种状态,使用8种不同的颜色标注,分别是successrunningfailedskippedup_for_rescheduleup_for_retryqueuedno_status。每一个task被调度执行前都是no_status状态;当被调度器传入作业队列之后,状态被更新为queued;被调度器调度执行后,状态被更新为running;如果该task执行失败,如果没有设置retry参数,状态立马被更新为failed;如果有设置retry参数,第一次执行失败后,会被更新为up_for_retry状态,等待重新被调度执行,执行完retry次数仍然失败则状态会被更新为failedskipped状态是指该task被跳过不执行;up_for_reschedule状态是指等待重新调度;

每点击一个button,可以跳转到对应页面,查看这个task对应的Task Instance Details、Rendered、Task Instance、Log

"Run"可以单次执行该task,右边3个button是执行task时可以选择的条件,鼠标停留在每一个条件上会显示该条件表示的含义。选择"Ignore All Deps"表示忽略该task的前后依赖条件及之前批次的执行状态,直接执行该task。

"Clear"表示可以清除当前task的执行状态,清除执行状态后,该task会被自动重置为no_status,等待Airflow调度器自动调度执行;"Downstream"和"Recursive"是默认选中的,是当你点击"Clear"后,当前task及所有后置task的状态都会被清除,即当前task及所有后置task都会重新等待调度执行;如果同时选中"Upstream"和"Recursive",点击"Clear"后则表示从Dag第一个task到当前task,这条路径上的所有task会被重新调度执行;

点击"Clear"按钮后,会将当前task及所有后续task作业的task id打印出来。点击"OK"后,Airflow会将这些task的最近一次执行记录清除,然后将当前task及后续所有task生成新的task instance,将它们放入队列由调度器调度重新执行

以树状的形式查看各个Task任务的调度如下图

显示DAG调度持续的时间

甘特图显示每个任务的起止、持续时间

配置DAG运行的默认参数

查看DAG的调度脚本

6、DAG脚本示例

以官网的脚本为例进行说明

from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {'owner': 'airflow','depends_on_past': False,'start_date': days_ago(2),'email': ['airflow@example.com'],'email_on_failure': False,'email_on_retry': False,'retries': 1,'retry_delay': timedelta(minutes=5),# 'queue': 'bash_queue',# 'pool': 'backfill',# 'priority_weight': 10,# 'end_date': datetime(2016, 1, 1),# 'wait_for_downstream': False,# 'dag': dag,# 'sla': timedelta(hours=2),# 'execution_timeout': timedelta(seconds=300),# 'on_failure_callback': some_function,# 'on_success_callback': some_other_function,# 'on_retry_callback': another_function,# 'sla_miss_callback': yet_another_function,# 'trigger_rule': 'all_success'
}
dag = DAG('tutorial',default_args=default_args,description='A simple tutorial DAG',schedule_interval=timedelta(days=1),
)# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(task_id='print_date',bash_command='date',dag=dag,
)t2 = BashOperator(task_id='sleep',depends_on_past=False,bash_command='sleep 5',retries=3,dag=dag,
)
dag.doc_md = __doc__t1.doc_md = """\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
"""
templated_command = """
{% for i in range(5) %}echo "{{ ds }}"echo "{{ macros.ds_add(ds, 7)}}"echo "{{ params.my_param }}"
{% endfor %}
"""t3 = BashOperator(task_id='templated',depends_on_past=False,bash_command=templated_command,params={'my_param': 'Parameter I passed in'},dag=dag,
)t1 >> [t2, t3]

(1)需要引入的包

(2)DAG默认参数配置:

①depends_on_past:是否依赖上游任务,即上一个调度任务执行失 败时,该任务是否执行。可选项包括True和False,False表示当前执 行脚本不依赖上游执行任务是否成功;

②start_date:表示首次任务的执行日期;

③email:设定当任务出现失败时,用于接受失败报警邮件的邮箱地址;

④email_on_failure:当任务执行失败时,是否发送邮件。可选项包括 True和False,True表示失败时将发送邮件;

⑤retries:表示执行失败时是否重新调起任务执行,1表示会重新调起;

⑥retry_delay:表示重新调起执行任务的时间间隔;

(3)实例化DAG

设定该DAG脚本的id为tutorial;  设定每天的定时任务执行时间为一天调度一次。

调度时间还可以以“* * * * *”的形式表示,执行时间分别是“分,时,天,月,年”

注意:① Airflow使用的时间默认是UTC的,当然也可以改成服务器本地的时区。

②*/30 * * * * 指的是每个小时的30分的时候调度而不是半小时一次,比如说:1:30 , 2:30  ...

半小时调度一次的写法应该是:0/30 * * *

(4)Operator,即Task要执行的任务

段脚本中引入了需要执行的task_id,并对dag 进行了实例化。

里面的bash_command参数是对于具体执行这个task任务的脚本或命令。

还有Trigger_rule参数为该task任务执行的触发条件,官 方文档里面该触发条件有5种状态,一般常用的包括 “ ALL_DONE ” 和 ”ALL_SUCCESS” 两 种 。 其中 “ALL_DONE”为当上一个task执行完成,该task即 可执行,而”ALL_SUCCESS”为只当上一个task执行成功时,该task才能调起执行,执行失败时,本 task不执行任务。

(5)Task脚本的调度顺序

t1 >> [t2, t3]命令为task脚本的调度顺序,在该命令中先执行“t1” 任务后执行“t2, t3”任务。

一旦Operator被实例化,它被称为“任务”。实例化为在调用抽象Operator时定义一些特定值,参数化任务使之成为DAG中的一个节点。

调度顺序的其他表示方式①t1 >> t2 等价于t1.set_downstream(t2)    表示t1任务先执行②t1 << t2 等价于t1.set_upstream(t2) 表示t2任务先执行

7 Airflow常用命令行

Airflow通过可视化界面的方式实现了调度管理的界面操作,但在测试脚本或界面操作失败的时候,可通过命令行的方式调起任务。下面介绍几个常用的命令:

命令描述
airflow list_tasks userprofile用于查看当前DAG任务下的所有task列表,其中userprofile是DAG名称
airflow test userprofile age_task 20200101 用于测试DAG下面某个task是否能正常执行,其中userprofile是DAG名称,age_task是其中一个task名称
airflow backfill -s 2020-01-01 -e 2020-01-02 userprofile用于调起整个DAG脚本执行任务,其中userprofile是DAG名称,2020-01-01是脚本执行的开始日期;

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/473623.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Flask简介与简单项目操作流程

Flask框架简介Flask诞生于2010年&#xff0c;是Armin ronacher&#xff08;人名&#xff09;用Python语言基于Werkzeug工具箱编写的轻量级Web开发框架。它主要面向需求简单的小应用。Flask本身相当于一个内核&#xff0c;其他几乎所有的功能都要用到扩展&#xff08;邮件扩展Fl…

LeetCode 1640. 能否连接形成数组(哈希)

文章目录1. 题目2. 解题1. 题目 给你一个整数数组 arr &#xff0c;数组中的每个整数 互不相同 。 另有一个由整数数组构成的数组 pieces&#xff0c;其中的整数也 互不相同 。请你以 任意顺序 连接 pieces 中的数组以形成 arr 。但是&#xff0c;不允许 对每个数组 pieces[i]…

python基本知识、数据库、网络、编程等总结

Python语言特性 1 Python的函数参数传递 看两个例子: a 1 def fun(a):a 2 fun(a) print a # 1 a [] def fun(a):a.append(1) fun(a) print a # [1] 所有的变量都可以理解是内存中一个对象的“引用”&#xff0c;或者&#xff0c;也可以看似c中void*的感觉。 通过id来看引…

LeetCode 1641. 统计字典序元音字符串的数目(DP)

文章目录1. 题目2. 解题1. 题目 给你一个整数 n&#xff0c;请返回长度为 n 、仅由元音 (a, e, i, o, u) 组成且按 字典序排列 的字符串数量。 字符串 s 按 字典序排列 需要满足&#xff1a;对于所有有效的 i&#xff0c;s[i] 在字母表中的位置总是与 s[i1] 相同或在 s[i1] 之…

LeetCode 1642. 可以到达的最远建筑(二分查找 / 优先队列贪心)

文章目录1. 题目2. 解题2.1 二分查找2.2 优先队列贪心1. 题目 给你一个整数数组 heights &#xff0c;表示建筑物的高度。另有一些砖块 bricks 和梯子 ladders 。 你从建筑物 0 开始旅程&#xff0c;不断向后面的建筑物移动&#xff0c;期间可能会用到砖块或梯子。 当从建筑…

ClickHouse高可用及副本测试

1 概述 ​ 对于默认的分布式表的配置&#xff0c;每个分片只有一份&#xff0c;这种多分片单副本集群&#xff0c;挂掉一个节点的话查询分布式表会报错。为了解决这个问题的话可以使用ClickHouse高可用集群&#xff0c;对于每个分片具有2个或2个以上的副本&#xff0c;当某个节…

阿里云Redis读写分离典型场景:如何轻松搭建电商秒杀系统

秒杀活动是绝大部分电商选择的低价促销&#xff0c;推广品牌的方式。不仅可以给平台带来用户量&#xff0c;还可以提高平台知名度。一个好的秒杀系统&#xff0c;可以提高平台系统的稳定性和公平性&#xff0c;获得更好的用户体验&#xff0c;提升平台的口碑&#xff0c;从而提…

LeetCode 756. 金字塔转换矩阵(回溯)

文章目录1. 题目2. 解题1. 题目 现在&#xff0c;我们用一些方块来堆砌一个金字塔。 每个方块用仅包含一个字母的字符串表示。 使用三元组表示金字塔的堆砌规则如下&#xff1a; 对于三元组(A, B, C) &#xff0c;“C”为顶层方块&#xff0c;方块“A”、“B”分别作为方块“…

Flask框架项目实例:**租房网站(一)

Flask是一款MVC框架&#xff0c;主要是从模型、视图、模板三个方面对Flask框架有一个全面的认识&#xff0c;通过完成作者-读书功能&#xff0c;先来熟悉Flask框架的完整使用步骤。 操作步骤为&#xff1a; 1.创建项目2.配置数据库3.定义模型类4.定义视图并配置URL 5.定义模板…

LeetCode 316. 去除重复字母 / 1081. 不同字符的最小子序列(单调栈)

文章目录1. 题目2. 解题1. 题目 LC 316&#xff1a; 给你一个字符串 s &#xff0c;请你去除字符串中重复的字母&#xff0c;使得每个字母只出现一次。需保证 返回结果的字典序最小&#xff08;要求不能打乱其他字符的相对位置&#xff09;。 示例 1&#xff1a; 输入&#…

LeetCode 809. 情感丰富的文字

文章目录1. 题目2. 解题1. 题目 有时候人们会用重复写一些字母来表示额外的感受&#xff0c;比如 "hello" -> "heeellooo", "hi" -> "hiii"。 我们将相邻字母都相同的一串字符定义为相同字母组&#xff0c;例如&#xff1a;&qu…

网站部署nginx--uwsgi

网站代码写完之后就是项目部署&#xff0c;主要包括两个方面&#xff1a; 1.nginx安装与配置&#xff1a; 1、Nginx 安装 系统平台&#xff1a;CentOS release 6.6 (Final) 64位。 一、安装编译工具及库文件 yum -y install make zlib zlib-devel gcc-c libtool openssl open…

天池 在线编程 滑动数独(滑动窗口)

文章目录1. 题目2. 解题1. 题目 描述 给定一个 3xn的矩阵 number&#xff0c;并且该矩阵只含有1到9的正整数。 考虑有一个大小为 3x3 滑动窗口&#xff0c;从左到右遍历该矩阵 number&#xff0c; 那么该滑动窗口在遍历整个矩阵的过程中会有n-2个。 现在你的任务是找出这些滑…

TIGK监控平台介绍

1 概述 众所周知监控平台对大数据平台是非常重要的&#xff0c;监控是故障诊断和分析的重要辅助利器&#xff0c;在发生事故之前就能预警&#xff0c;最大限度降低系统故障率。   监控系统我们可以分为业务层面&#xff0c;应用层面&#xff0c;系统层面 1.1 业务层面 业务系…

天池 在线编程 队列检查(排序)

文章目录1. 题目2. 解题1. 题目 描述 班上的学生根据他们的年级照片的身高升序排列&#xff0c;确定当前未站在正确位置的学生人数 数组长度 < 10^5示例 输入: heights [1,1,3,3,4,1]输出: 3解释: 经过排序后 heights变成了[1,1,1,3,3,4]&#xff0c;有三个学生不在应在…

celery异步执行任务在Django中的应用实例

1. 创建django项目celery_demo, 创建应用demo: django-admin startproject celery_demo python manage.py startapp demo2.在celery_demo模块中创建celery.py模块, 文件目录为: celery.py模块内容为: from celery import Celery from django.conf import settings import os#…

Spring自学教程-注解的使用(三)

一、java中的注解定义注解下面是一个定义注解的实例。Target(ElementType.TYPE)Retention(RetentionPolicy.RUNTIME)DocumentedInheritedpublic interface Description { String value();}其中的interface是一个关键字&#xff0c;在设计annotations的时候必须把一个类型定义为…

Django单元测试

一.前言/准备 测Django的东西仅限于在MTV模型。哪些可以测&#xff1f;哪些不可以。 1.html里的东西不能测。①Html里的HTML代码大部分都是写死的②嵌套在html中的Django模板语言也不能测&#xff0c;即使有部分逻辑。 但写测试用例时至少要调用一个类或者方法。模板语言没有出…

天池 在线编程 中位数

文章目录1. 题目2. 解题1. 题目 描述 给定一个长度为N的整数数组arr 返回一个长度为N的整数答案数组ans ans[i] 表示删除arr数组第i个数后&#xff0c;arr数组的中位数 N为偶数 2 < N < 10^5 示例 输入:[1,2,3,4,5,6] 输出:[4,4,4,3,3,3] 解释:删去1后 剩下的数组为[…

倒排索引原理和实现

关于倒排索引 搜索引擎通常检索的场景是&#xff1a;给定几个关键词&#xff0c;找出包含关键词的文档。怎么快速找到包含某个关键词的文档就成为搜索的关键。这里我们借助单词——文档矩阵模型&#xff0c;通过这个模型我们可以很方便知道某篇文档包含哪些关键词&#xff0c;某…