工作流管理框架airflow-安装部署教程

1 概述

Airflow是一个以编程方式编写,用于管理和调度工作流的平台。可以帮助你定义复杂的工作流程,然后在集群上执行和监控这些工作流。

Airflow计划程序在遵循指定的依赖项,同时在一组工作线程上执行任务。丰富的命令实用程序使在DAG上执行复杂的调度变的轻而易举。Airflow的可扩展Python框架可以让你构建连接几乎任何技术的工作流程。丰富的用户界面可以随时查看生产中正在运行的管道,帮助你管理工作流程的状态,监视进度以及需要时对问题进行故障排除。

Airflow的主要组件有:

DAG(有向无环图):使用Airflow将工作流编写任务的有向无环图(DAG)。一个DAG定义了一个工作流,它包含所有任务、任务的依赖关系和时间表。

任务(Task):一个任务定义了一个单独的单元工作,有一个确定的开始和结束。一个任务可以依赖于其他任务。

运算符(Operator):一个运算符封装了一个任务,并定义了它的执行逻辑。Airflow内置了许多运算符,如BashOperator、PythonOperator、EmailOperator等。你也可以自定义运算符。

时间轴(Timeline):时间轴让你以图形方式查看 DAG 的运行情况和状态。

调度器(Scheduler):调度器监视时间轴并触发需要运行的任务。

执行器(Executor):executor负责实际运行任务。Airflow支持多种executor,如LocalExecutor, CeleryExecutor, KubernetesExecutor 等。

2 名词

(1)Dynamic:Airflow管道是用Python代码配置的,允许动态生成管道。Airflow配置需要使用Python,这允许编写可动态实例化管道的代码。

(2)Extensible:Airflow框架包含许多运算符来连接各种技术。Airflow的所有组件都是可扩展的。轻松定义自己的运算符,执行程序并扩展库,使其适合于您的环境。

(3)Elegant:Airlfow是精简灵活的,使用功能强大的Jinja模板引擎,将脚本参数化内置于Airflow的核心中。

(4)Scalable:Airflow具有模板块架构,并使用消息队列来安排任意数量的工作任务。

3 airflow优缺点

优点:

Python脚本实现DAG,非常容易扩展;

可实现复杂的依赖规则;

外部依赖较少,搭建容易,仅依赖DB和rabbitmq;

工作流依赖可视化。有一套完整的UI,可视化展现所有任务的状态及历史信息;(本人刚开始主要看重这点)

完全支持crontab定时任务格式,可以通过crontab格式指定任务何时进行;

业务代码和调度系统解耦,每个业务的流程代码以独立的Python脚本描述,里面定义了流程化的节点来执行业务逻辑,支持任务的热加载.

缺点:

Airflow是为有限的批处理工作流构建的。虽然CLI和REST API确实允许触发工作流,但Airflow不是为无限运行的基于事件的工作流构建的。Airflow不是流解决方案。然而,像Apache Kafka这样的流系统通常与Apache Airflow一起使用。Kafka可以用于实时接收和处理事件数据,事件数据被写入存储位置,Airflow定期启动处理一批数据的工作流。

如果你更喜欢点击而不是编码,Airflow可能不是正确的解决方案。Web界面旨在最大限度地简化工作流的管理,Airflow框架不断改进以最大限度地简化开发人员体验。然而,Airflow的理念是将工作流定义为代码,所以代码始终是必需的。

4 Airflow安装

airflow官网地址:https://airflow.apache.org。

1)先安装并配置好python环境(可以参考Anaconda安装即可,如果项目不需要依赖太多工具包,可选择更简洁的MiniConda)并激活。

2)安装airflow

pip install apache-airflow

3)初始化airflow

airflow db init

4)查看版本

airflow version

5)启动airflow web服务,启动后浏览器访问http://ip_address:12025(如果不知道ip地址的就用ifconfig命令去linux下获取)

airflow webserver -p 12025 -D

6)启动airflow调度

airflow scheduler -D

7)创建账号(斜杠别忘记了)

airflow users create \

  --username admin \

  --firstname trisyp \

  --lastname trisyp \

  --role Admin \

  --email trisyp@email.com

回车之后会让你输入两次password,我们就用123456

8)启动停止脚本

vim af.sh

#!/bin/bash

case $1 in

"start"){

    echo " --------启动 airflow-------"

    ssh ip_address "conda activate airflow;airflow webserver -p 12025 -D;airflow scheduler -D; conda deactivate"

};;

"stop"){

    echo " --------关闭 airflow-------"

    ps -ef|egrep 'scheduler|airflow-webserver'|grep -v grep|awk '{print $2}'|xargs kill -15

};;

esac

添加权限即可使用。

trisyp@ip_address bin]$ chmod +x af.sh

5 修改数据库为MySQL

1)先在MySQL中建库

mysql> CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

2)如果报错Linux error:1425F102:SSL routines:ssl_choose_client_version:unsupported protocol,可以关闭MySQLSSL证书

查看SSL是否开启  YES为开启

mysql> SHOW VARIABLES LIKE '%ssl%';

+---------------+-----------------+

| Variable_name | Value           |

+---------------+-----------------+

| have_openssl  | YES             |

| have_ssl      | YES             |

| ssl_ca        | ca.pem          |

| ssl_capath    |                 |

| ssl_cert      | server-cert.pem |

| ssl_cipher    |                 |

| ssl_crl       |                 |

| ssl_crlpath   |                 |

| ssl_key       | server-key.pem  |

+---------------+-----------------+

3)修改配置文件my.cnf(注意:直接数据库修改值不起作用),加入以下内容:

# disable_ssl

skip_ssl

4)添加python连接的依赖,官网介绍的方法有两种:

这里我们选择mysql+mysqlconnector。

pip install mysql-connector-python

5)修改airflow的配置文件(vim ~/airflow/airflow.cfg):

[database]

# The SqlAlchemy connection string to the metadata database.

# SqlAlchemy supports many different database engines.

# More information here:

# http://airflow.apache.org/docs/apache-airflow/stable/howto/set-up-database.html#database-uri

#sql_alchemy_conn = sqlite:home/trisyp/airflow/airflow.db

sql_alchemy_conn = mysql+mysqlconnector://root:123456@ip_address:3306/airflow_db

6)关闭airflow,初始化后重启:

af.sh stop

airflow db init

af.sh start

7)若初始化报错1067 - Invalid default value for ‘update_at’:

原因:字段 'update_at' timestamp类型,取值范围是:1970-01-01 00:00:00 2037-12-31 23:59:59UTC +8 北京时间从1970-01-01 08:00:00 开始),而这里默认给了空值,所以导致失败。

推荐修改mysql存储时间戳格式:

mysql> set GLOBAL sql_mode ='STRICT_TRANS_TABLES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION'

重启MySQL会造成参数失效(注意:这样就需要重新创建账号),推荐将参数写入到配置文件my.cnf中。

sql_mode = STRICT_TRANS_TABLES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION

6 修改执行器

官网不推荐在开发中使用顺序执行器,会造成任务调度阻塞。

1)修改airflow的配置文件(vim ~/airflow/airflow.cfg)

[core]

# The executor class that airflow should use. Choices include

# ``SequentialExecutor``, ``LocalExecutor``, ``CeleryExecutor``, ``DaskExecutor``,

# ``KubernetesExecutor``, ``CeleryKubernetesExecutor`` or the

# full import path to the class when using a custom executor.

executor = LocalExecutor

可以使用官方推荐的几种执行器,也可以自定义。这里我们选择本地执行器即可。

7 部署使用

1)测试环境启动

本次测试使用的是spark的官方案例,所有需要启动hadoop和spark的历史服务器。

myhadoop.sh start

cd /opt/module/spark-yarn/sbin/start-history-server.sh

2)查看Airflow配置文件

vim ~/airflow/airflow.cfg

3)编写.py脚本,创建work-py目录用于存放python调度脚本

mkdir ~/airflow/dags

cd dags/

然后把脚本文件放到dags文件夹,代码如下:

from airflow import DAG

from airflow.operators.bash import BashOperator

from datetime import datetime, timedelta

default_args = {  # 设置默认参数。

    # 用户

    'owner': 'test_owner',

    # 是否开启任务依赖

    'depends_on_past': True,

    # 邮箱

    'email': ['trisyp@email.com'],

    # 启动时间

    'start_date':datetime(2022,11,28),

    # 出错是否发邮件报警

    'email_on_failure': False,

    # 重试是否发邮件报警

    'email_on_retry': False,

    # 重试次数

    'retries': 3,

    # 重试时间间隔

    'retry_delay': timedelta(minutes=5),

}

# 声明任务图,schedule_interval:调度频率。

dag = DAG('test', default_args=default_args, schedule_interval=timedelta(days=1))

 

# 创建单个任务

t1 = BashOperator(  # BashOperator:具体执行任务,如果为true前置任务必须成功完成才会走下一个依赖任务,如果为false则忽略是否成功完成。

    # 任务id:任务唯一标识(必填)。

    task_id='dwd',

    # 具体任务执行命令。

    bash_command='ssh ip_address "/opt/module/spark-yarn/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark-yarn/examples/jars/spark-examples_2.12-3.1.3.jar 10 "',

    # 重试次数

    retries=3,

    # 把任务添加进图中

    dag=dag)

t2 = BashOperator(

    task_id='dws',

    bash_command='ssh ip_address "/opt/module/spark-yarn/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark-yarn/examples/jars/spark-examples_2.12-3.1.3.jar 10 "',

    retries=3,

    dag=dag)

t3 = BashOperator(

    task_id='ads',

    bash_command='ssh ip_address "/opt/module/spark-yarn/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark-yarn/examples/jars/spark-examples_2.12-3.1.3.jar 10 "',

    retries=3,

    dag=dag)

# 设置任务依赖:ads任务依赖dws任务依赖dwd任务。

t2.set_upstream(t1)

t3.set_upstream(t2)

4)等待一段时间,刷新任务列表

airflow dags list

5)已出现myairflow_execute_bash任务(刷新页面)

6)点击运行

7)查看dag图、甘特图,点击成功任务,查看日志

8)查看脚本代码

9)Dag任务操作

9.1 删除Dag任务

主要删除DAG任务不会删除底层文件,过一会还会自动加载回来。

9.2 查看当前所有dag任务

# 查看所有任务

airflow dags list

# 查看单个任务

airflow tasks list test --tree

8 配置邮件服务器

1)保证邮箱已开SMTP服务

2)修改airflow配置文件,用stmps服务对应587端口

vim ~/airflow/airflow.cfg 

smtp_host = smtp.qq.com

smtp_starttls = True

smtp_ssl = False

smtp_user = trisyp@email.com

# smtp_user =

smtp_password = qluxdbuhgrhgbigi

# smtp_password =

smtp_port = 587

smtp_mail_from = trisyp@email.com

3)重启airflow

af.sh stop

af.sh start

4)编辑test.py脚本,加入emailOperator

from airflow.operators.email_operator import EmailOperator

email=EmailOperator(

    task_id="email",

    to="yaohm163@163.com ",

    subject="test-subject",

    html_content="<h1>test-content</h1>",

    cc="trisyp@email.com ",

    dag=dag)

t2.set_upstream(t1)

t3.set_upstream(t2)

email.set_upstream(t3)

5)查看页面是否生效

6)运行测试

9 避坑指南

1)Exception rendering Jinja template for task

2)Intel MKL FATAL ERROR: Cannot load ../numexpr/../../../libmkl_rt.so.1.

强制更新airflow到最新版

3)error: subprocess-exited-with-error

解决方案:

错误有明确的提示,缺少pkg-config,所以就先安装这个包,然后在安装mysqlclient。

sudo apt-get install pkg-config

4)Can't connect to local MySQL server through socket '/tmp/mysql.sock' (2)

解决方案:

先用命令“find / -name ‘mysql.sock”来查看下这个文件所在目录,如果有就建立软连接(不要想着拷贝复制,无效的),命令是“ln -s /tmp/mysql.sock”。如果没有就找my.cnf文件,一般文件地址为/etc/mysql/my.cnf,然后通过vim加上socket路径信息,一定要加mysqld这个分组,不然会报找不到分组这个错;Found option without preceding group

5)Segmentation fault (core dumped)

解决方案:

在配置mysql存储的时候要加上mysqlconnector就解决了。这个坑非常恶心,你参照某些教程直接只配mysql,忽视了connector,碰到了还找不到解决方案,因为核心存储转移你不知道怎么搞。

cd /etc

vim profile

加入:

export AIRFLOW_HOME=/root/airflow

sudo mysql

create database airflow_db;

create user 'airflow'@'%' identified by '123456';

grant all on airflow_db .* to 'airflow'@'%';

sql_alchemy_conn = mysql://airflow:123456@10.0.0.22:3306/airflow_db

10 参考链接

https://yuchaoshui.com/1bd10cc/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/633528.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

国产开源模型标杆,能力比肩ChatGPT!书生·浦语2.0发布,支持免费商用

1月17日&#xff0c;新一代大语言模型书⽣浦语2.0&#xff08;InternLM2&#xff09;正式发布并开源。 2种参数规格、3种模型版本&#xff0c;共计6个模型&#xff0c;全部免费可商用。 它支持200K超长上下文&#xff0c;可轻松读200页财报。200K文本全文范围关键信息召回准确…

Springboot日志框架logback与log4j2

目录 Springboot日志使用 Logback日志 日志格式 自定义日志格式 日志文件输出 Springboot启用log4j2日志框架 Springboot日志使用 Springboot底层是使用slf4jlogback的方式进行日志记录 Logback日志 trace&#xff1a;级别最低 debug&#xff1a;调试级别的&#xff0c…

Windows平台反调试技术学习

前言 前俩天的学习记录Windows上面的反调试学习&#xff0c;主要是参考《恶意代码实战分析》和《加密与解密》里面的&#xff0c;给每个小技术都写了程序示例&#xff0c;自己编译反调试了一遍。对于加解密一书是还有很多不理解的地方的&#xff0c;目前只能记录到这了&#x…

建筑类中级工程师职称证明业绩材料有哪些?

三、建筑类中级工程师职称造价类工程业绩材料 1.合同&#xff1a;证明项目合作关系的凭证。 2.预&#xff08;结&#xff09;算报告等(重点是体现封面有你的名字和执业印章等) 3.单位证明或任命书(本人在项目中的职务聘书) 4.工程获奖证明&#xff1a;项目获得市优的证书、省优…

Ubuntu 22.04.1 LTS VirtualBox7.0 解决虚拟机窗口失去焦点一段时间后,虚拟机显示不刷新问题

故障描述&#xff1a; virtualbox安装在ubuntu系统上&#xff0c;虚拟机内安装了windows操作系统。使用中发现&#xff0c;当linux系统窗口被激活&#xff0c;如firefox浏览器&#xff0c;虚拟机的显示一段时间后会暂停刷新&#xff0c;鼠标划入虚拟机窗口后&#xff0c;才会立…

分布式概念

文章目录 一、CAP定理和BASE定理1.1 CAP定理1.2 CAP取舍1.3 BASE定理 二、分布式事务2.1 柔性事务2.2 两阶段提交协议2.3 三阶段提交协议 三、分布式ID3.1 数据库自增ID3.2 数据库多主模式3.3 号段模式3.4 雪花算法3.5 Leaf3.6 使用Redis生成ID 四、限流算法4.1 固定窗口计数器…

TypeScript实现一个贪吃蛇小游戏

游戏效果 文件目录 准备1&#xff1a;新建index.html&#xff0c;编写游戏静态页面 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-…

小程序开发实战案例五 | 小程序如何嵌入H5页面

在接入小程序过程中会遇到需要将 H5 页面集成到小程序中情况&#xff0c;今天我们就来聊一聊怎么把 H5 页面塞到小程序中。 本篇文章将会从下面这几个方面来介绍&#xff1a; 小程序承载页面的前期准备小程序如何承载 H5小程序和 H5 页面如何通讯小程序和 H5 页面的相互跳转 小…

安全加速SCDN是什么

安全加速SCDN&#xff08;Secure Content Delivery Network&#xff0c;SCDN&#xff09; 是集分布式DDoS防护、CC防护、WAF防护、BOT行为分析为一体的安全加速解决方案。已使用内容分发网络&#xff08;CDN&#xff09;或全站加速网络&#xff08;ECDN&#xff09;的用户&…

【JavaEE】_网络通信原理

目录 1. 网络发展史 2. 网络通信基础 1.1 IP地址 1.2 端口号 1.3 协议 1.3.1 概念 1.3.2 五元组 1.4 协议分层 1.4.1 协议分层的优点 1.4.2 协议分层的分类 1.4.3网络设备所在分层 1.4.4 两台主机通过TCP/IP协议通讯过程 1.5 封装与分用 1.5.1 封装 1.5.2 分用…

Docker 容器连接

Docker 容器连接 前面我们实现了通过网络端口来访问运行在 docker 容器内的服务。 容器中可以运行一些网络应用&#xff0c;要让外部也可以访问这些应用&#xff0c;可以通过 -P 或 -p 参数来指定端口映射。 下面我们来实现通过端口连接到一个 docker 容器。 网络端口映射 …

算法练习-A+B/财务管理/实现四舍五入/牛牛的菱形字符(题目链接+题解打卡)

难度参考 难度&#xff1a;简单 分类&#xff1a;熟悉OJ与IDE的操作 难度与分类由我所参与的培训课程提供&#xff0c;但需要注意的是&#xff0c;难度与分类仅供参考。以下内容均为个人笔记&#xff0c;旨在督促自己认真学习。 题目 A B1. A B - AcWing题库财务管理1004:财…

VsCode + CMake构建项目 C/C++连接Mysql数据库 | 数据库增删改查C++封装 | 信息管理系统通用代码 ---- 课程笔记

这个是B站Up主&#xff1a;程序员程子青的视频 C封装Mysql增删改查操作_哔哩哔哩_bilibilihttps://www.bilibili.com/video/BV1m24y1a79o/?p6&spm_id_frompageDriver&vd_sourcea934d7fc6f47698a29dac90a922ba5a3安装mysql:mysql 下载和安装和修改MYSQL8.0 数据库存储…

【现代密码学】笔记9-10.3-- 公钥(非对称加密)、混合加密理论《introduction to modern cryphtography》

【现代密码学】笔记9-10.3-- 公钥&#xff08;非对称加密&#xff09;、混合加密理论《introduction to modern cryphtography》 写在最前面8.1 公钥加密理论随机预言机模型&#xff08;Random Oracle Model&#xff0c;ROM&#xff09; 写在最前面 主要在 哈工大密码学课程 张…

深入vue响应式原理

当你把一个普通的 JavaScript 对象传入 Vue 实例作为 data 选项&#xff0c;Vue 将遍历此对象所有的 property&#xff0c;并使用 Object.defineProperty 把这些 property 全部转为 getter/setter。 这些 getter/setter 对用户来说是不可见的&#xff0c;但是在内部它们让 Vue …

Docker 47 个常见故障的原因和解决方法

本文针对Docker容器部署、维护过程中&#xff0c;产生的问题和故障&#xff0c;做出有针对性的说明和解决方案&#xff0c;希望可以帮助到大家去快速定位和解决类似问题故障。 Docker是一种相对使用较简单的容器&#xff0c;我们可以通过以下几种方式获取信息&#xff1a; 1、…

简单理解自动驾驶-看这篇够了!

本文主要介绍自动驾驶技术的整体框架&#xff0c;旨在从宏观理解自动驾驶技术。 &#x1f3ac;个人简介&#xff1a;一个全栈工程师的升级之路&#xff01; &#x1f4cb;个人专栏&#xff1a;自动驾驶技术 &#x1f380;CSDN主页 发狂的小花 &#x1f304;人生秘诀&#xff1a…

第6章 现代通信技术

文章目录 6.1 图像与多媒体通信6.1.1 图像通信6.1.2 多媒体通信技术1、多媒体通信概念2、多媒体通信的组成3、多媒体通信的业务分类4、实用化的多媒体通信系统类型5、多媒体通信应用系统&#xff08;1&#xff09;多媒体会议电视系统&#xff08;2&#xff09;IPTV 6.2 移动通信…

【机器学习300问】12、为什么要进行特征归一化?

当线性回归模型的特征量变多之后&#xff0c;会出现不同的特征量&#xff0c;然而对于那些同是数值型的特征量为什么要做归一化处理呢&#xff1f; 一、为了消除数据特征之间的量纲影响 使得不同指标之间具有可比性。例如&#xff0c;分析一个人的身高和体重对健康的影响&…

每日一题——LeetCode1252.奇数值单元格的数目

进阶&#xff1a;你可以设计一个时间复杂度为 O(n m indices.length) 且仅用 O(n m) 额外空间的算法来解决此问题吗&#xff1f; 方法一 直接模拟&#xff1a; 创建一个n x m的矩阵&#xff0c;初始化所有元素为0&#xff0c;对于indices中的每一对[ri,ci]&#xff0c;将矩…