Airflow安装与使用

# Airflow 1.10+安装

本次安装Airflow版本为1.10+,其需要依赖Python和DB,本次选择的DB为Mysql。
本次安装组件及版本如下:Airflow == 1.10.0
Python == 3.6.5
Mysql == 5.7

# 整体流程
1. 建表
2. 安装
3. 配置
4. 运行
5. 配置任务


```
启动schedule
airflow scheduler -D
启动webserver
airflow webserver -D

ps -ef|grep -Ei '(airflow-webserver)'| grep master | awk '{print $2}'|xargs -i kill {}
ps -ef | grep -Ei 'airflow' | grep -v 'grep' | awk '{print $2}' | xargs -i kill {}
## 建库、建用户

```
库名为airflow
'create database airflow;'
建用户

用户名为airflow,并且设置所有ip均可以访问。

create user 'airflow'@'%' identified by 'airflow';
create user 'airflow'@'localhost' identified by 'airflow';

用户授权
这里为新建的airflow用户授予airflow库的所有权限

grant all on airflow.* to 'airflow'@'%';
flush privileges
```

## Airflow安装
```
这里通过 virtualenv 进行安装。

----- 通过virtualenv安装

$ mkdir /usr/local/virtual_env && cd /usr/local/virtual_env # 创建目录
$ virtualenv --no-site-packages airflow --python=python # 创建虚拟环境
$ source /usr/local/virtual_env/airflow/bin/activate # 激活虚拟环境

----- 安装指定版本或者默认
$ pip install apache-airflow -i https://pypi.douban.com/simple
在安装完一堆的依赖后,就需要配置 AIRFLOW_HOME 环境变量,后续的 DAG 和 Plugin 都将以该目录作为根目录查找,如上,可以直接设置为 /tmp/project 。

报错
ERROR: flask 1.1.1 has requirement Jinja2>=2.10.1, but you'll have jinja2 2.10 which is incompatible.
ERROR: flask 1.1.1 has requirement Werkzeug>=0.15, but you'll have werkzeug 0.14.1 which is incompatible.

执行:pip3 install -U Flask==1.0.4
执行:pip3 install -U pika==0.13.1


重新执行 :pip install apache-airflow -i https://pypi.douban.com/simple


----- 设置环境变量
(airflow) $ export AIRFLOW_HOME=/tmp/airflow


----- 查看其版本信息
(airflow) $ airflow version
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/
v1.8.0
执行了上述的命令后,会生成 airflow.cfg 和 unittests.cfg 两个文件,其中前者是一个配置文件 。

## airflow 配置


----- 修改Airflow DB配置
### 1. 安装Mysql模块

pip install "apache-airflow[mysql]"
这里可以简单说下,airflow依赖的其他组件均可以此方式安装。在之后安装password组件同样是通过此方式。

修改Airflow DB配置
修改${AIRFLOW_HOME}/airflow.cfg

sql_alchemy_conn = mysql+mysqldb://airflow:airflow@localhost:3306/airflow
参数的格式为mysql://帐号:密码@ip:port/db

初始化db
新建airflow依赖的表。

airflow initdb

如报错 Can't connect to local MySQL server through socket '/var/lib/mysql/mysql.sock' (2)
需改sql_alchemy_conn = mysql+mysqldb://airflow:airflow@127.0.0.1:3306/airflow

```

### 2. 用户认证
```
本文采用的用户认证方式为password方式,其他方式如LDAP同样支持但是本文不会介绍。笔者在安装时实验过LDAP方式但是未成功过。

安装passsword组件
pip install "apache-airflow[password]"
2. 修改 airflow.cfg

[webserver]
authenticate = True
auth_backend = airflow.contrib.auth.backends.password_auth
3. 在python环境中执行如下代码以添加账户:

import airflow
from airflow import models, settings
from airflow.contrib.auth.backends.password_auth import PasswordUser
user = PasswordUser(models.User())
user.username = 'admin' # 用户名
user.email = 'emailExample@163.com' # 用户邮箱
user.password = 'password' # 用户密码
session = settings.Session()
session.add(user)
session.commit()
session.close()
exit()
```


### 3. 配置邮件服务
此配置设置的是dag的task失败或者重试时发送邮件的发送者。配置如下:
```
[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
smtp_host = smtp.163.com
smtp_starttls = True
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
smtp_user = mailExample@163.com
smtp_password = password
smtp_port = 25
smtp_mail_from = mailExample@163.com
接下来简单把dag的Python代码列出来,以供参考:

default_args = {
'owner': 'ownerExample',
'start_date': datetime(2018, 9, 18),
'email': ['mailReceiver@163.com'], # 出问题时,发送报警Email的地址,可以填多个,用逗号隔开。
'email_on_failure': ['mailReceiver@163.com'], # 任务失败且重试次数用完时发送Email。
'email_on_retry': True, # 任务重试时是否发送Email
'depends_on_past': False, # 是否依赖于过去。如果为True,那么必须要昨天的DAG执行成功了,今天的DAG才能执行。
'retries': 3,
'retry_delay': timedelta(minutes=3),
}
```

### 4、配置Executor
```
设置Executor
修改:airflow.cfg

executor = LocalExecutor
本文中由于只有单节点所以使用的是LocalExecutor模式。
```

### 5. 修改log地址
```
[core]
base_log_folder = /servers/logs/airflow
[scheduler]
child_process_log_directory = servers/logs/airflow/scheduler
```

### 6. 修改webserver地址
```
修改webserver地址
[webserver]
base_url = http://host:port
可以通过上面配置的地址访问webserver。
```

### 7. 可选配置

```
(可选)修改Scheduler线程数
如果调度任务不多的话可以把线程数调小,默认为32。参数为:parallelism


(可选)不加载example dag
如果不想加载示例dag可以把load_examples配置改为False,默认为True。这个配置只有在第一次启动airflow之前设置才有效。


如果此方法不生效,可以删除${PYTHON_HOME}/site-packages/airflow/example_dags目录,也是同样的效果。

(可选)修改检测新dag间隔
修改min_file_process_interval参数为10,每10s识别一次新的dag。默认为0,没有时间间隔。

```

## 运行airflow
```
启动schedule
airflow scheduler
启动webserver
airflow webserver
```


## 安装问题汇总
```
1. Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql


修改Mysql配置文件my.cnf,具体步骤如下:

查找my.cnf文件位置
mysql --help | grep my.cnf
下图红框处为my.cnf文件所在位置:


修改文件
explicit_defaults_for_timestamp=true
注意:必须写在【mysqld】下

重启Mysql
sudo systemctl restart mysqld.service
查看修改是否生效。执行如下SQL,如果值为1则为生效。


2. pip install "apache-airflow[mysql]"报错:

mysql_config not found

安装mysql-devel:

首先查看是否有mysql_config文件。
find / -name mysql_config

如果没有安装mysql-devel
yum install mysql-devel
安装之后再次查找,结果如图:

3. 其他问题找我
```

## 配置任务

在 AirFlow 中,每个节点都是一个任务,可以是一条命令行 (BashOperator),可以是一段 Python 脚本 (PythonOperator) 等等,然后这些节点根据依赖关系构成了一条流程,一个图,称为一个 DAG 。

默认会到 ${AIRFLOW_HOME}/dags 目录下查找,可以直接在该目录下创建相应的文件。

如下是一个简单的示例。

```
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import timedelta, datetime
import pytz

# -------------------------------------------------------------------------------
# these args will get passed on to each operator
# you can override them on a per-task basis during operator initialization

default_args = {
'owner': 'qxy',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}

tz = pytz.timezone('Asia/Shanghai')
# naive = datetime.strptime("2018-06-13 17:40:00", "%Y-%m-%d %H:%M:%S")
# local_dt = tz.localize(naive, is_dst=None)
# utc_dt = local_dt.astimezone(pytz.utc).replace(tzinfo=None)

dt = datetime(2019, 7, 16, 16, 30, tzinfo=tz)
utc_dt = dt.astimezone(pytz.utc).replace(tzinfo=None)


dag = DAG(
'airflow_interval_test',
default_args=default_args,
description='airflow_interval_test',
schedule_interval='35 17 * * *',
start_date=utc_dt

)

t1 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
dag=dag)

t2 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)

t1 >> t2

```
该文件创建一个简单的 DAG,只有三个运算符,两个 BaseOperator ,也就是执行 Bash 命令分别打印日期以及休眠 5 秒;另一个为 PythonOperator 在执行任务时调用 print_hello() 函数。
文件创建好后,放置到 ${AIRFLOW_HOME}/dags,airflow 自动读取该DAG。

----- 测试是否正常,如果无报错那么就说明正常
$ python /tmp/project/dags/hello_world.py

转载于:https://www.cnblogs.com/duanhaoxin/p/11211815.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/363558.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Wordpress安装

Wordpress是一种用PHP语言和MySQL的数据库的开源的免费Blog引擎,用户可以在支持PHP和MySQL数据库的服务器上建立自己的Blog,它具有非常强大的功能和成千上万的插件和模板主题。安装步骤如下(本文以Wordpress2.6中文版为例): (1) 在MySQL中创建Wordpress库&#xff0…

python 内置标准库socketserver模块的思考

socketserver模块简化了编写网络服务器的任务, 在很大程度上封装了一些操作, 你可以看成是事件驱动型的设计, 这很不错。它定义了两个最基本的类--服务器类 BaseServer, 请求处理类 BaseRequestHandler. BaseServer 基本服务器类封装了基本的一些socket操作, socket原语中对so…

关于素数的一些定理

1.何谓素数? 指在一个大于1的整数中,如果一个数只能整除1与本身,则该数为素数(质数),否则为合数。 1既不是素数也不是合数 2.素数个数无限多 3.所有大于2的素数都可以唯一的表示为两个平方数之差 pa^2-b^2(…

教师节的感触

刚喝了点酒,因为今天是教师节,我觉得博客是一个很好的平台,可以把自己的一些感想就是的发表出来。把音乐放出来一 边写着博客是一种很自我的感觉,可以在微酣的时候把自己的点滴感受好好的梳理一下倒是一种不错的方式。这是自己第一…

Spring 4.1和Java 8:java.util.Optional

由于Spring 4.1的Java 8的的java.util.Optional ,容器对象可能会或可能不包含非空值,支持与RequestParam , RequestHeader和MatrixVariable 。 使用Java 8的java.util.Optional请确保参数永远不会为null 。 请求参数 在此示例中,…

计算机网络层实验路由表苏州科技,苏州科技大学计算机网络实验报告课案.docx...

苏州科技大学计算机网络实验报告课案苏州科技学院 电子信息实验中心实验报告课  程计算机网络原理学    名徐金玮班  级计算机1312专  业计算机科学与技术指导教师陶滔学年 / 学期2015~2016学年第一学期实验一 物理层实验实验项目性质:设计性  计划学…

Fibonacci again and again

Fibonacci again and again http://acm.hdu.edu.cn/showproblem.php?pid1848 Time Limit: 1000/1000 MS (Java/Others) Memory Limit: 32768/32768 K (Java/Others)Total Submission(s): 12494 Accepted Submission(s): 5439 Problem Description任何一个大学生对菲波那…

使用闭包的方式实现一个累加函数 addNum

使用闭包的方式实现一个累加函数 addNum,参数为 number 类型,每次返回的结果 上一次计算的值 传入的值,如: addNum(10); //10 addNum(12); //22 addNum(30); //52 写法一 1 function sum(numberOne) {2 var count…

妈妈的菜谱-豉油鸡

我妈周末来我家,给我做一道豉油鸡,我记录下来,分享给大家 1、下午四点半左右,我、老弟以及亲妈,到隔壁市场鸡场,挑选了一只类似的芦花阉鸡活鸡,价格是17块钱每斤,大概三斤多点&#…

【Vegas2008】9月19日-青椒炒南瓜

南瓜,在俺们家,宁夏也叫葫芦。这道菜泡米饭相当好吃。 主料:青椒、南瓜 配料:葱、姜 调料:盐、味精、胡椒粉 做法:1,南瓜切块,青椒切块;2,油烧至6成熟&#x…

计算机机房t4,机房等级-T2-T3-T4-如何划分

如何在众多的IDC服务商中的辨别出服务器和机房环境的好坏呢?小编告诉大家有这么个指标是我们在选购服务器的时候可以参考,且具有权威性的。国际正常时间协会(the Uptime Institute,简称UI)依据数据中心基础设施的可用性把机房划分为四个级别&#xff1a…

在N + 1场景中使用@NamedEntityGraph更有选择地加载JPA实体

N 1问题是使用ORM解决方案时的常见问题。 当您将某些OneToMany关系的fetchType设置为lazy时,会发生这种情况,以便仅在访问Set / List时才加载子实体。 假设我们有一个具有两个关系的Customer实体:每个客户的一组订单和一组地址。 OneToMany…

JetBrains系列IDE快捷键大全(转载)

编辑 快捷键组合说明Ctrl Space代码自动完成提示(选择)Alt Enter显示意图动作和快速修复Ctrl P参数信息 (在调用方法参数忘记的时候,提示)Ctrl Q快速查找文件,可以查找当前类定义的文件等Ctrl 鼠标滑过…

Vs Code 配置C/C++ 开发环境

第一步:下载 Vs Code 点击链接下载Vs Code 下载版本 并安装 https://code.visualstudio.com/ 点击 Download for Windwos 安装时 如图:请一定要勾选 添加到PATH (环境变量) 其他选项可根据个人需要选配 但建议全部勾选 第二步&#xf…

28. css样式中px转rem

Vue3:脚手架配置 https://blog.csdn.net/weixin_41424247/article/details/80867351 与原来的vue-cli 2.x版本不同的是:如果使用最新版本的vue/cli初始化vue项目时,通常看不到webpack的配制文件。而在原来的2.x版本,我们可以在utils.js中轻…

集合已修改;可能无法执行枚举操作。

在对某个List进行遍历的同时,需要对其中的Item进行删除操作。 会提示错误:集合已修改;可能无法执行枚举操作 Codeforeach (VirtualTDate vtDate in tempList){ if (vtDate.Date itemTime.Date) { tempList.Remove(vtDate); …

UI测试脸型软件,App脸型美化剖析|UI-影视-其他|观点|freshoil - 原创文章 - 站酷 (ZCOOL)...

本文基于市面上多款App的美颜效果,做了一个对比分析,整理出一个可以指导美颜调教的参考规范。研究的几个要点如下:1.通过对 某陌、某音、某Y、某他相机、某天P图的效果对比分析2.本次只针对默认效果做对比(某Y无默认则选择自然)3.统一使用前置…

使用Docker,Chef和Amazon OpsWorks进行集群范围的Java / Scala应用程序部署

Docker非常适合在单个节点上运行隔离的容器。 但是,大多数软件系统都在多个节点上运行,因此,除了Docker之外,我们还需要某种方法来指定哪些容器应在哪些节点上运行。 我要解决的特定问题如下:我有两个Scala守护程序&a…

根据输入成绩显示成绩等级(新手)

//导入包。 import java.util.Scanner; //定义一个类。 public class zy238{    //公共静态的主方法。 public static void main(String[] args){ //打印提示。     System.out.println("请你输入成绩"); //为其创建变量。     Scanner sc new Scanner(Sy…

Tmux: 打造精致与实用并存的终端

由于最近需要经常 ssh 到远程环境,遂趁此折腾了一番 tmux。毕竟 工欲善其事,必先利其器 以下是我的配置文件地址,并在不断摸索与更新中。特别喜欢 solarized 主题,于是参考它配了状态栏的主题。在后边我会列出一些平时使用的技巧&…