Apache Airflow (十四) :Airflow分布式集群搭建及测试

🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客

 🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。

 🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频


目录

1. 节点规划

2. airflow集群搭建步骤

3. 初始化Airflow

4. 创建管理员用户信息

​​​​​​​5. 配置Scheduler HA

​​​​​​​6. 启动Airflow集群

​​​​​​​7. 访问Airflow 集群WebUI

8. 测试Airflow HA


1. 节点规划

节点IP

节点名称

节点角色

运行服务

192.168.179.4

node1

Master1

webserver,scheduler

192.168.179.5

node2

Master2

websever,scheduler

192.168.179.6

node3

Worker1

worker

192.168.179.7

node4

Worker2

worker

2. airflow集群搭建步骤

1) 在所有节点安装python3.7

参照单节点安装Airflow中安装anconda及python3.7。

2) 在所有节点上安装airflow

  • 每台节点安装airflow需要的系统依赖
yum -y install mysql-devel gcc gcc-devel python-devel gcc-c++ cyrus-sasl cyrus-sasl-devel cyrus-sasl-lib 
  • 每台节点配置airflow环境变量
vim /etc/profileexport AIRFLOW_HOME=/root/airflow#使配置的环境变量生效source /etc/profile
  • 每台节点切换airflow环境,安装airflow,指定版本为2.1.3
(python37)   conda activate python37(python37) pip install apache-airflow==2.1.3 -i https://pypi.tuna.tsinghua.edu.cn/simple

默认Airflow安装在$ANCONDA_HOME/envs/python37/lib/python3.7/site-packages/airflow目录下。配置了AIRFLOW_HOME,Airflow安装后文件存储目录在AIRFLOW_HOME目录下。可以每台节点查看安装Airflow版本信息:

(python37)  airflow version2.1.3
  • 在Mysql中创建对应的库并设置参数

aiflow使用的Metadata database我们这里使用mysql,在node2节点的mysql中创建airflow使用的库及表信息。

CREATE DATABASE airflow CHARACTER SET utf8;create user 'airflow'@'%' identified by '123456';grant all privileges on airflow.* to 'airflow'@'%';flush privileges;

在mysql安装节点node2上修改”/etc/my.cnf”,在[mysqld]下添加如下内容:

[mysqld]explicit_defaults_for_timestamp=1

以上修改完成“my.cnf”值后,重启Mysql即可,重启之后,可以查询对应的参数是否生效:

#重启mysql[root@node2 ~]# service mysqld restart#重新登录mysql查询mysql> show variables like 'explicit_defaults_for_timestamp';

  • 每台节点配置Airflow airflow.cfg文件

修改AIRFLOW_HOME/airflow.cfg文件,确保所有机器使用同一份配置文件,在node1节点上配置airflow.cfg,配置如下:

[core]dags_folder = /root/airflow/dags#修改时区default_timezone = Asia/Shanghai#配置Executor类型,集群建议配置CeleryExecutorexecutor = CeleryExecutor# 配置数据库sql_alchemy_conn=mysql+mysqldb://airflow:123456@node2:3306/airflow?use_unicode=true&charset=utf8[webserver]#设置时区default_ui_timezone = Asia/Shanghai[celery]#配置Celery broker使用的消息队列broker_url = redis://node4:6379/0#配置Celery broker任务完成后状态更新使用库result_backend = db+mysql://root:123456@node2:3306/airflow

将node1节点配置好的airflow.cfg发送到node2、node3、node4节点上

3. 初始化Airflow

1) 每台节点安装需要的python依赖包

初始化Airflow数据库时需要使用到连接mysql的包,执行如下命令来安装mysql对应的python包。

​
(python37) #  pip install mysqlclient -i Simple Index​

2) 在node1上初始化Airflow 数据库

(python37) [root@node1 airflow]# airflow db init

初始化之后在MySQL airflow库下会生成对应的表。

4. 创建管理员用户信息

在node1节点上执行如下命令,创建操作Airflow的用户信息:

airflow users create \--username airflow \--firstname airflow \--lastname airflow \--role Admin \--email xx@qq.com

执行完成之后,设置密码为“123456”并确认,完成Airflow管理员信息创建。

​​​​​​​5. 配置Scheduler HA

1) 下载failover组件

登录https://github.com/teamclairvoyant/airflow-scheduler-failover-controller下载 airflow-scheduler-failover-controller 第三方组件,将下载好的zip包上传到node1 “/software”目录下。

在node1节点安装unzip,并解压failover组件:

(python37) [root@node1 software]# yum -y install unzip(python37) [root@node1 software]# unzip ./airflow-scheduler-failover-controller-master.zip

2) 使用pip进行安装failover需要的依赖包

需要在node1节点上安装failover需要的依赖包。

(python37) [root@node1 software]# cd /software/airflow-scheduler-failover-controller-master(python37) [root@node1 airflow-scheduler-failover-controller-master]# pip install -e .

3) node1节点初始化failover

(python37) [root@node1 ~]# scheduler_failover_controller initAdding Scheduler Failover configs to Airflow config file...Finished adding Scheduler Failover configs to Airflow config file.Finished Initializing Configurations to allow Scheduler Failover Controller to run. Please update the airflow.cfg with your desired configurations.

注意:初始化airflow时,会向airflow.cfg配置中追加配置,因此需要先安装 airflow 并初始化。

4) 修改airflow.cfg

首先修改node1节点的AIRFLOW_HOME/airflow.cfg

[scheduler_failover]# 配置airflow Master节点,这里配置为node1,node2,两节点需要免密scheduler_nodes_in_cluster = node1,node2#在1088行,特别注意,需要去掉一个分号,不然后期自动重启Scheduler不能正常启动airflow_scheduler_start_command = export AIRFLOW_HOME=/root/airflow;nohup airflow scheduler >> ~/airflow/logs/scheduler.logs &

配置完成后,可以通过以下命令进行验证Airflow Master节点:

(python37) [root@node1 airflow]# scheduler_failover_controller test_connectionTesting Connection for host 'node1'(True, ['Connection Succeeded', ''])Testing Connection for host 'node2'(True, ['Connection Succeeded\n'])

将node1节点配置好的airflow.cfg同步发送到node2、node3、node4节点上:

(python37) [root@node1 ~]# cd /root/airflow/(python37) [root@node1 airflow]# scp airflow.cfg node2:`pwd`(python37) [root@node1 airflow]# scp airflow.cfg node3:`pwd`(python37) [root@node1 airflow]# scp airflow.cfg node4:`pwd`

​​​​​​​6. 启动Airflow集群

1) 在所有节点安装启动Airflow依赖的python包

(python37) [root@node1 airflow]# pip install celery==4.4.7 flower==0.9.7 redis==3.5.3(python37) [root@node2 airflow]# pip install celery==4.4.7 flower==0.9.7 redis==3.5.3(python37) [root@node3 airflow]# pip install celery==4.4.7 flower==0.9.7 redis==3.5.3(python37) [root@node4 airflow]# pip install celery==4.4.7 flower==0.9.7 redis==3.5.3

2) 在Master1节点(node1)启动相应进程

#默认后台启动可以使用-D ,这里使用-D有时不能正常启动Airflow对应进程airflow webserverairflow scheduler

3) 在Master2节点(node2)启动相应进程

airflow webserver

4) 在Worker1(node3)、Worker2(node4)节点启动Worker

在node3、node4节点启动Worker:

(python37) [root@node3 ~]# airflow celery worker(python37) [root@node4 ~]# airflow celery worker

5) 在node1启动Scheduler HA

(python37) [root@node1 airflow]# nohup scheduler_failover_controller start > /root/airflow/logs/scheduler_failover/scheduler_failover_run.log &

​​​​​​​

至此,Airflow高可用集群搭建完成。

​​​​​​​7. 访问Airflow 集群WebUI

浏览器输入node1:8080,查看Airflow WebUI:

8. 测试Airflow HA

1) 准备shell脚本

Airflow集群所有节点{AIRFLOW_HOME}目录下创建dags目录,准备如下两个shell脚本,将以下两个脚本放在$AIRFLOW_HOME/dags目录下,BashOperator默认执行脚本时,默认从/tmp/airflow**临时目录查找对应脚本,由于临时目录名称不定,这里建议执行脚本时,在“bash_command”中写上绝对路径。如果要写相对路径,可以将脚本放在/tmp目录下,在“bash_command”中执行命令写上“sh ../xxx.sh”也可以。

first_shell.sh

#!/bin/bashdt=$1echo "==== execute first shell ===="echo "---- first : time is ${dt}"

second_shell.sh

#!/bin/bashdt=$1echo "==== execute second shell ===="echo "---- second : time is ${dt}"

2) 编写airflow python 配置

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperatordefault_args = {'owner':'zhangsan','start_date':datetime(2021, 9, 23),'retries': 1,  # 失败重试次数'retry_delay': timedelta(minutes=5) # 失败重试间隔
}dag = DAG(dag_id = 'execute_shell_sh',default_args=default_args,schedule_interval=timedelta(minutes=1)
)first=BashOperator(task_id='first',#脚本路径建议写绝对路径bash_command='sh /root/airflow/dags/first_shell.sh %s'%datetime.now().strftime("%Y-%m-%d"),dag = dag
)second=BashOperator(task_id='second',#脚本路径建议写绝对路径bash_command='sh /root/airflow/dags/second_shell.sh %s'%datetime.now().strftime("%Y-%m-%d"),dag=dag
)first >> second

将以上内容写入execute_shell.py文件,上传到所有Airflow节点{AIRFLOW_HOME}/dags目录下。

3) 重启Airflow,进入Airflow WebUI查看对应的调度

重启Airflow之前首先在node1节点关闭webserver ,Scheduler进程,在node2节点关闭webserver ,Scheduler进程,在node3,node4节点上关闭worker进程。

如果各个进程是后台启动,查看后台进程方式:

(python37) [root@node1 dags]# ps aux |grep webserver(python37) [root@node1 dags]# ps aux |grep scheduler(python37) [root@node2 dags]# ps aux |grep webserver(python37) [root@node2 dags]# ps aux |grep scheduler(python37) [root@node3 ~]# ps aux|grep "celery worker"(python37) [root@node4 ~]# ps aux|grep "celery worker"找到对应的启动命令对应的进程号,进行kill。

重启后进入Airflow WebUI查看任务:

点击“success”任务后,可以看到脚本执行成功日志:

​​​​​​​4) 测试Airflow HA

当我们把node1节点的websever关闭后,可以直接通过node2节点访问airflow webui:

在node1节点上,查找“scheduler”进程并kill,测试scheduler HA 是否生效:

(python37) [root@node1 ~]# ps aux|grep schedulerroot      23744  0.9  3.3 326940 63028 pts/2    S    00:08   0:02 airflow scheduler -- DagFileProcessorManager#kill 掉scheduler进程(python37) [root@node1 ~]# kill -9 23744

访问webserver webui

在node1节点查看scheduler_failover_controller进程日志中有启动schudler动作,注意:这里是先从node1启动,启动不起来再从其他Master 节点启动Schduler。


 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/175895.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【C++】探索C++模板编程

文章目录 什么是C模板?模板的基本语法类型模板参数模板函数的示例类模板的示例总结 C模板是一种强大的编程工具,它可以实现泛型编程,使代码更加灵活和可重用。本篇博客将介绍C模板的基本语法、类型模板参数和模板函数的使用,并通过…

MaaS/PaaS/SaaS

生成式AI时代的AI Infra—从DevOps->MLOps->LLMOps - 知乎距离上次讲LLM相关的内容已经过去2个月了 LLM as Controller—无限拓展LLM的能力边界,本文想要从AI Infra的角度出发,从更宏观的角度看Generative AI对AI Infra生态产生的变化,…

Web前端开发技术:图像与多媒体文件

在现代的Web开发中,图像和多媒体文件在各种网站和应用程序中扮演着至关重要的角色。它们不仅能提供更丰富的内容,还能大大提高应用程序的吸引力和用户体验。本文将深入介绍一些关键的Web前端开发技术,这些技术将有助于开发者在处理图像和多媒…

前馈式神经网络与反馈式神经网络的区别,联系,各自的应用范围和场景!!!

文章目录 前言一、前馈式神经网络是什么?二、前馈式神经网络包括:三、反馈式神经网络是什么?四、反馈式神经网络包括:总结 前言 前馈式神经网络和反馈式神经网络是两种主要的神经网络架构,它们在网络结构和应用场景上…

Python---引用变量与可变、非可变类型

引用变量 在大多数编程语言中,值的传递通常可以分为两种形式“ 值 传递 与 引用 传递”,但是在Python中变量的传递基本上都是引用传递。 变量在内存底层的存储形式 a 10 第一步:首先在计算机内存中创建一个数值10(占用一块…

【Leetcode】907. 子数组的最小值之和

给定一个整数数组 arr,找到 min(b) 的总和,其中 b 的范围为 arr 的每个(连续)子数组。 由于答案可能很大,因此 返回答案模 10^9 7 。 示例 1: 输入:arr [3,1,2,4] 输出:17 解释&…

类 —— 封装、四类特殊成员函数、this指针、匿名对象、深浅拷贝问题

类 将同一类对象的所有属性都封装起来。 类中最基础的内容包括两部分,一个是属性、一个是行为。 ● 属性:表示一些特征项的数值,比如说:身高、体重、性别、肤色。这些属性都是名词。属性一般都以名词存在。属性的数值&#xff0c…

算法基础之食物链

食物链 核心思想&#xff1a;带权并查集 用距根节点和距离表示与根节点的关系 求距离 #include<iostream>using namespace std;const int N50010;int n,m;int p[N],d[N];//找到祖宗节点(路径压缩) 并求出对应距离int find(int x){if(p[x]!x){int up[x]; //保存旧父节点…

如何使用 Java 在Excel中创建下拉列表

下拉列表&#xff08;下拉框&#xff09;可以确保用户仅从预先给定的选项中进行选择&#xff0c;这样不仅能减少数据输入错误&#xff0c;还能节省时间提高效率。在MS Excel中&#xff0c;我们可以通过 “数据验证” 提供的选项来创建下拉列表&#xff0c;但如果要在Java程序中…

mysql账户密码获取

数据库安装目录 MySQL\data\mysql 里面的user.MYD文件&#xff0c;需要编译查看 数据库里的user表 库下面的user表拿到后&#xff0c;直接解密密码即可 网站配置文件 conn、config、data、sql、common 、inc这些文件 比如pikachu\inc目录下的config.inc.php文件的内容会显示…

速通CSAPP(一)计算机系统漫游入门

CSAPP学习 前言 一门经典的计组课程&#xff0c;我却到了大四才学。 anyway&#xff0c;何时都不会晚。 博主参考的教程&#xff1a;本电子书信息 - 深入理解计算机系统&#xff08;CSAPP&#xff09; (gitbook.io)&#xff0c;非常感谢作者的整理。 诚然去看英文版可以学…

【开源】基于Vue和SpringBoot的木马文件检测系统

项目编号&#xff1a; S 041 &#xff0c;文末获取源码。 \color{red}{项目编号&#xff1a;S041&#xff0c;文末获取源码。} 项目编号&#xff1a;S041&#xff0c;文末获取源码。 目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 数据中心模块2.2 木马分类模块2.3 木…

软著项目推荐 深度学习中文汉字识别

文章目录 0 前言1 数据集合2 网络构建3 模型训练4 模型性能评估5 文字预测6 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 深度学习中文汉字识别 该项目较为新颖&#xff0c;适合作为竞赛课题方向&#xff0c;学长非常推荐&#xf…

【Vue】Linux 运行 npm run serve 报错 vue-cli-service: Permission denied

问题描述 在Linux系统上运行npm run serve命令时&#xff0c;控制台报错&#xff1a; sudo npm run serve project50.1.0 serve vue-cli-service serve sh: 1: vue-cli-service: Permission denied错误截图如下&#xff1a; 原因分析 该错误是由于vue-cli-service文件权限不…

线性转换函数S_RTR(SCL和ST代码)

模拟量转换函数S_ITR详细公式和算法源代码请查看下面文章链接: PLC模拟量输入 模拟量转换FC S_ITR_博途模拟量转换程序_RXXW_Dor的博客-CSDN博客文章浏览阅读5.4k次,点赞4次,收藏7次。模拟量采集、工业现场应用特别广泛、大部分传感器的测量值和输出信号都是线型关系,所以…

Rocky Linux 9.3 为 PowerPC 64 位带回云和容器镜像

RHEL 克隆版 Rocky Linux 9.3 今天发布了&#xff0c;作为红帽企业 Linux 发行版 CentOS Stream 和 Red Hat Enterprise Linux 的免费替代版本&#xff0c;现在可供下载。 Rocky Linux 9.3 是在 Rocky Linux 9.2 发布 6 个月之后发布的&#xff0c;它带回了 PowerPC 64 位 Lit…

4D雷达目标检测跟踪算法设计

1.算法流程 4D雷达点云跟踪处理沿用3D毫米波雷达的处理流程&#xff0c;如下图&#xff1a; 从接收到点云开始&#xff0c;先对点云做标定、坐标转换、噪点剔除、动静分离&#xff0c;再分别对动态目标和静态目标做聚类&#xff0c;然后根据聚类结果做目标的特征分析和检测等&a…

leetcode42接雨水问题

接雨水 题目描述 题目分析 核心思想&#xff1a; 代码 java版本&#xff1a; package com.pxx.leetcode.trapRainWaterDoublePoniter;public class Solution1 {public int trap(int[] height) {if (height.length 0) {return 0;}int n height.length;int left 0;int righ…

LabVIEWL实现鸟巢等大型结构健康监测

LabVIEWL实现鸟巢等大型结构健康监测 管理国家地震防备和减灾的政府机构中国地震局(CEA)选择了七座新建的巨型结构作为结构健康监测(SHM)技术的测试台。这些标志性建筑包括北京2008年夏季奥运会场馆&#xff08;包括北京国家体育场和北京国家游泳中心&#xff09;、上海104层的…

Eureka简单使用做微服务模块之间动态请求

创建一个eureka模块,引入eureka 为启动项加上EnableEurekaServer注解 配置信息 orderService和userService的操作是一样的 这里以orderService为例: 引入eureka客户端 加上 LoadBalanced注解 配置 orderService和userService都配置好了之后 启动 这样我们在http://localhos…