AirFlow官方入门DAG示例

经过前两篇文章的简单介绍之后,我们安装了自己的AirFlow以及简单了解了DAG的定义文件.现在我们要实现自己的一个DAG.

1. 启动Web服务器

使用如下命令启用:

airflow webserver

现在可以通过将浏览器导航到启动Airflow的主机上的8080端口来访问Airflow UI,例如:http://localhost:8080/admin/

img

备注

Airflow附带了许多示例DAG。 请注意,在你自己的`dags_folder`中至少有一个DAG定义文件之前,这些示例可能无法正常工作。你可以通过更改`airflow.cfg`中的`load_examples`设置来隐藏示例DAG。

2. 第一个AirFlow DAG

现在一切都准备好了,我们开始写一些代码,来实现我们的第一个DAG。 我们将首先创建一个Hello World工作流程,其中除了向日志发送"Hello world!"之外什么都不做。

创建你的dags_folder,那就是你的DAG定义文件存储目录---$AIRFLOW_HOME/dags。在该目录中创建一个名为hello_world.py的文件。

AIRFLOW_HOME
├── airflow.cfg
├── airflow.db
├── airflow-webserver.pid
├── dags
│   ├── hello_world.py
│   └── hello_world.pyc
└── unittests.cfg

将以下代码添加到dags/hello_world.py中:


# -*- coding: utf-8 -*-import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import timedelta#-------------------------------------------------------------------------------
# these args will get passed on to each operator
# you can override them on a per-task basis during operator initializationdefault_args = {'owner': 'jifeng.si','depends_on_past': False,'start_date': airflow.utils.dates.days_ago(2),'email': ['1203745031@qq.com'],'email_on_failure': False,'email_on_retry': False,'retries': 1,'retry_delay': timedelta(minutes=5)
}#-------------------------------------------------------------------------------
# dagdag = DAG('example_hello_world_dag',default_args=default_args,description='my first DAG',schedule_interval=timedelta(days=1))#-------------------------------------------------------------------------------
# first operatordate_operator = BashOperator(task_id='date_task',bash_command='date',dag=dag)#-------------------------------------------------------------------------------
# second operatorsleep_operator = BashOperator(task_id='sleep_task',depends_on_past=False,bash_command='sleep 5',dag=dag)#-------------------------------------------------------------------------------
# third operatordef print_hello():return 'Hello world!'hello_operator = PythonOperator(task_id='hello_task',python_callable=print_hello,dag=dag)#-------------------------------------------------------------------------------
# dependenciessleep_operator.set_upstream(date_operator)
hello_operator.set_upstream(date_operator)

该文件创建一个简单的DAG,只有三个运算符,两个BaseOperator(一个打印日期一个休眠5秒),另一个为PythonOperator在执行任务时调用print_hello函数。

3. 测试代码

使用如下命令测试一下我们写的代码的正确性

python ~/opt/airflow/dags/hello_world.py

如果你的脚本没有抛出异常,这意味着你代码中没有错误,并且你的Airflow环境是健全的。

下面测试一下我们的DAG中的Task.使用如下命令查看我们example_hello_world_dagDAG下有什么Task:

xiaosi@yoona:~$ airflow list_tasks example_hello_world_dag

可以看到我们有三个Task:

date_task
hello_task
sleep_task

下面分别测试一下这几个Task:

(1) 测试date_task

xiaosi@yoona:~$ airflow test example_hello_world_dag date_task 20170803

(2) 测试hello_task

xiaosi@yoona:~$ airflow test example_hello_world_dag hello_task 20170803

如果没有问题,我们就可以运行我们的DAG了.

4. 运行DAG

为了运行你的DAG,打开另一个终端,并通过如下命令来启动Airflow调度程序:

airflow scheduler

备注

调度程序将发送任务进行执行。默认Airflow设置依赖于一个名为`SequentialExecutor`的执行器,它由调度程序自动启动。在生产中,你可以使用更强大的执行器,如`CeleryExecutor`。

当你在浏览器中重新加载Airflow UI时,应该会在Airflow UI中看到你的hello_world DAG。

img

为了启动DAG Run,首先打开工作流(off键),然后单击Trigger Dag按钮(Links 第一个按钮),最后单击Graph View按钮(Links 第三个按钮)以查看运行进度:

img

你可以重新加载图形视图,直到两个任务达到状态成功。完成后,你可以单击hello_task,然后单击View Log查看日志。如果一切都按预期工作,日志应该显示一些行,其中之一是这样的:

[2017-08-03 09:46:43,236] {base_task_runner.py:95} INFO - Subtask: [2017-08-03 09:46:43,235] {python_operator.py:81} INFO - Done. Returned value was: Hello world![2017-08-03 09:46:47,378] {jobs.py:2083} INFO - Task exited with return code 0

更多多资讯或疑问内容请关注 微信公众号 “让梦飞起来” 或添加小编微信, 后台回复 “Python” ,领取更多资料哦

                                    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/547859.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

三层业务类(DAL)必用的通用方法之一

写代码有两年多的时间了&#xff0c;越来越觉得代码的通用性是衡量一个程序员的标准。 代码 #regionSqlDataReader > List///<summary>///author:Stone_W///date:2010.11.29///desc:SqlDataReader 转 List///</summary>///<param name"dr">SqlD…

使用 Packer、Ansible 和 Terraform 构建不可变的基础设施Devops工具链

在容器编排领域&#xff0c;Kubernetes 已成为事实上的标准&#xff0c;而容器镜像 (Docker Image) 作为容器技术栈中最关键的创新之一&#xff0c;极大的推动了企业内部 Devops 运动的进程。 容器镜像所具有的轻量性、便携性、分层机制和内核共享机制真正意义上实现了 “Buil…

用于检测敏感词的 PHP 扩展

2019独角兽企业重金招聘Python工程师标准>>> 敏感词过滤是我朝程序员必须具备的一种特殊技能&#xff0c;随着敏感词越来越多&#xff0c;是时候写个扩展来快速的进行敏感词检测了使用说明 1. 安装 libdatrie tar zxf libdatrie-0.2.4.tar.gz cd libdatrie-0.2.4 .…

缓存通用管理类 + 缓存 HttpContext.Current.Cache 和 HttpRuntime.Cache 的区别

以前写asp.net时用HttpContext.Current.Cache存缓存很好用&#xff0c;今天写了一个windows服务程序&#xff0c;HttpContext.Current.Cache存缓存的时候还好&#xff0c;取的时候一直报错“未将对象引用到实例”很郁闷&#xff0c;查询了一下资料才明白引用程序缓存要用HttpRu…

Ubuntu 加速安装Opencv 3.4.3

Ubuntu 18.04 完美安装Opencv 3.4.3 1.1 下载Opencv 3.4.3 在http://opencv.org/网址中找到下载连接&#xff0c;版本选择&#xff1a;https://github.com/opencv/opencv/releases 下载地址&#xff1a;https://github.com/Itseez/opencv/archive/3.4.3.zip (此处可以使用w…

Net和T-sql中的日期函数操作

net中的日期函数代码&#xff1a; 代码 1 DateTime now DateTime.Now; 2 // 当前月的第一天 3 DateTime d1 new DateTime(now.Year, now.Month, 1); 4 // 当前月的最后一天 5 DateTime d2 d1.AddMonths(1).AddDays(-1); 6 if (now.Day d2.Day) 7 { 8 // 当日是当月最后…

kiwiboard 购买记录小结

2019独角兽企业重金招聘Python工程师标准>>> 后悔啊&#xff0c;买前没有做过调研&#xff0c;脑子一热就买了个全套的&#xff0c;现在后悔来不及啦&#xff0c;肠子都青掉了。。。 说说体会吧&#xff1a; 1. 配件很少&#xff0c;没有uart线&#xff0c;也没有…

xml文件转换成图片_怎样能把PDF文件转换成图片?

我们的日常生活工作中时常碰到pdf与Excel、Word、ppt和jpg等文件格式的转换&#xff0c;有时候由于工作的需要&#xff0c;要把PDF文件转换成图片。并且现在网上的很多素材都是PDF文件格式的&#xff0c;如果我们想要里面的图片就变得很难办了。采取截图的方式得到的图片很模糊…

vmware安装渗透系统 Linux Kail最新版

https://mirror-1.truenetwork.ru/kali-images/kali-2020.3/kali-linux-2020.3-installer-amd64.iso Kali Linux安装的磁盘空间的最小值是8GB。为了便于使用&#xff0c;这里推荐至少25GB去保存附加程序和文件。内存最好为512MB以上。Kali Linux的下载地址http://www.kali.org/…

Thread线程的深刻理解和代理方法参数[有图有真相]

在这说的是Thread的基本用法&#xff0c;线程池ThreadPool在这就不说的&#xff0c;以前的blog有写&#xff0c;基本上两个用法都是相同的。基本用法和图&#xff0c;不需要的大鸟请绕行&#xff0c;谢谢&#xff01; 目录&#xff1a; 1.Thread基本用法与异步线程理解。 2.线…

linux ubuntu 编写c/c++ 获取命令行传入参数示例

linux ubuntu 编写c/c 获取命令行传入参数示例 g test.cpp -o test ./test -d video.jpg test.cpp 代码如下 #include <iostream> #include <unistd.h>using namespace std;int main(int argc, char* argv[]) {//参数变量初始化string db_path;int ch;opterr …

python执行系统命令后获取返回值的几种方式

import commands output commands.getstatusoutput(ps -aux) print output更多资讯或疑问内容请关注 微信公众号 “让梦飞起来” 或添加小编微信&#xff0c; 后台回复 “Python” &#xff0c;领取更多资料哦

一些好用的开源控件

工作两年&#xff0c;一直都在做些编码方面的表面功夫&#xff0c;实现了很多很炫的功能&#xff0c;在此写下一些体验。有些比较小的dll文件我会发上来&#xff0c;如果是开源组织的代码我会把地址附上&#xff0c;毕竟人家是会更新的。大家还有什么好用的开源控件欢迎补充。 …

python3爬取百度图片

python3爬取百度图片 最终目的&#xff1a;能通过输入关键字进行搜索&#xff0c;爬取相应的图片存储到本地或者数据库 首先打开百度图片的网站&#xff0c;搜索任意一个关键字&#xff0c;比如说&#xff1a;水果&#xff0c;得到如下的界面 分析&#xff1a; 1、百度图片搜…

Windows Phone 7 LongListSelector控件实现分类列表和字母索引

在wp7手机里面的联系人列表和程序里面里面我们可以看到一个根据字母索引来定位联系人或者应用程序的控件&#xff0c;那么这个控件就是LongListSelector控件了。 LongListSelector是一种比ListBox更加强大的列表控件&#xff0c;你可以根据你列表的信息来分类排列&#xff0c;根…

c# 获取电脑硬件信息通用查询类[测试通过]

C#获取电脑硬件信息通用类[Computer]代码展示和分析&#xff0c;简介如下&#xff1a; 1.项目中添加System.Management引用。 2.添加类Computer&#xff0c;把下面代码全选&#xff0c;复制&#xff0c;粘贴。 3.使用方法new Computer().GetComputerName()。 代码 usingSyst…

基于ssm北关村基本办公管理系统的设计与实现论文

摘 要 在如今社会上&#xff0c;关于信息上面的处理&#xff0c;没有任何一个企业或者个人会忽视&#xff0c;如何让信息急速传递&#xff0c;并且归档储存查询&#xff0c;采用之前的纸张记录模式已经不符合当前使用要求了。所以&#xff0c;对北关村基本办公信息管理的提升&…

C# 操作线程的通用类[测试通过]

进程管理就是对服务器性能的管理和协调&#xff0c;在程序的运行角度来看非常重要&#xff0c;也可以根据操作进程的手段&#xff0c;衍生很多实用和智能的功能&#xff0c;以下就是介绍一个自己写的进程通用操作类&#xff0c;功能如下&#xff1a; 1.把ProcessUtility类直接…

宽字符编码和解码通用类[CodeWidthChartUtility]

在做jsonp传递的时候遇到一个问题&#xff0c;当有特殊字符或中文的时候就会导致数据错误或者是乱码&#xff0c;刚开始有js的编码和解码和正则&#xff0c;都比较麻烦&#xff0c;现在找到了一种合适的解决方案&#xff0c;宽字符编码&#xff0c;js端会自动解析&#xff0c;能…

Ubuntu16.04下安装cuda和cudnn的三种方法(亲测全部有效)

目录 1.cuda的安装 1.1 最简单的方法——分开安装驱动和cuda 1.2 更万能的方法——同时安装驱动和cuda 1.3 终极杀手锏 2.cudnn的安装 安装之前首先要确认你需要安装的cuda和cudnn的版本&#xff0c;假如你后续还需要安装tensorflow的话&#xff0c;请看我的另外一篇博客&am…