azkaban-tools 项目介绍

本文背景

应一个用户的好心和好奇心,在最近水深火热的百忙之中抽时间写完了一个简短的项目介绍,其实就是几个azkaban的批量操作脚本,但在大数据集群的“运维生涯”中,还是帮了自己不少忙,也算是为了它做一个简单的回顾吧

项目背景

azkaban 是一个大数据领域通用的任务管理服务,它的运行模式和其他任务管理服务类似,都是将任务下发到执行器,定期执行,它的优势主要在于可定义任务流,同个项目下不同任务可引用同个模板,大数据领域的任务正好比较具有复用性,因此在 azkaban 诞生的时代(第一个release在2014年),它还是成为了当时比较流行的开源任务调度服务

azkaban 的操作方式比较容易上手,通过界面即可完成所有的操作,包括上传项目、执行项目中定义的job、查看job日志、给任务配置调度时间等,操作并不复杂。但如果需要批量做一些操作,在界面一个个点就不太方便了

之前没有做这个项目的时候,隔三差五用户就要来找我“能不能帮忙…”(具体对话参考下面),终于有一天没忍住,本项目就此诞生…

项目主要是参考了 azkaban api 文档,通过若干脚本实现了一些常见的批量操作,项目地址: azkaban-tools

目前实现的批量操作场景如下:

批量操作① 启动任务

每年都会有个一两次的真实对话

在这里插入图片描述

批量操作② 启动任务

azkaban 默认不允许同时执行同一个任务,因此如果任务在上个周期执行一直没结束,到下个周期也不会被触发

于是偶尔也会有以下的对话

在这里插入图片描述

批量操作③ 设置调度

对离线抽数任务来说,一般都会在每天一个固定的时间调度一次,抽数完后执行上层的任务,一般就是这种场景

在这里插入图片描述

批量操作④ 设置调度

背景同上,调度周期如果要修改,比如从早上8点改成9点,需要先把原来的调度删除后再创建新的调度

如何使用

本地部署 azkaban

azkaban 提供两种部署模式: solo (单节点)和 webserver+executor集群模式,生产环境肯定是采用后者,本地测试可以通过 solo 模式快速部署

# 第三方镜像
docker run -d -p 8081:8081 --name azkaban-srv -e TZ='Asia/Shanghai' haxqer/azkaban# 下载代码
git clone https://github.com/azkaban/azkaban
# 注意: 官方仓库久未维护,编译会有报错,也可以拉取笔者 fork 后修复的仓库
git clone https://github.com/smiecj/azkaban -b b_3_90_extend# 编译
./gradlew build installDist# 本地solo模式启动
cd azkaban-solo-server/build/install/azkaban-solo-server
./bin/start-solo.sh

访问刚启动的 solo 服务器: http://localhost:8081 ,默认用户名密码都是 azkaban

首次登录,首页还没有项目

上传项目

这里我们创建一个示例项目 examples, 并以 azkaban_examples 作为项目代码,压缩成zip包后上传

git clone https://github.com/joeharris76/azkaban_examples
zip -r azkaban_examples.zip azkaban_examples

上传任务压缩包后azkaban会解析出任务结构
azkaban 项目下的任务结构一般是 flow->job->template,flow 是父任务,job 为子任务。flow 可以通过串行或并行定义一组job的关系,job 可以直接定义任务行为,也可以引用template,在template中定义具体行为

以笔者实际维护的离线抽数作为例子,就是一个比较标准的azkaban任务格式:

在这里插入图片描述

所有抽数任务都在每天早上7点执行,每个抽数任务都对应一张mysql表,具体指令都是通过sqoop指令将数据从mysql导入hive表,区别只是执行sqoop的参数(即库表名)。这种任务我们就可以在 template 中编写sqoop的执行逻辑,每个表的同步任务作为一个job,都引用 template ,在job配置中定义表明即可

具体的任务定义示例:

# template/sqoop.job
command=sqoop import -Dmapreduce.job.user.classpath.first=true --connect jdbc:mysql://mysql_host:mysql_port/${mysql_db} --username mysql_user --password mysql_pwd --table ${mysql_table} -m 1 --target-dir /import/stg/${mysql_table} --as-avrodatafile# stg/table1.job
mysql_db=element
mysql_table=orderscommand=echo "import mysql table"type=flow
flow.name=sqoop

examples 项目结构也比较简单,basic_flow 和 workflow 是最上层的 flow,basic_flow 引用的8个job直接定义了执行echo的行为;workflow引用的4个job则都属于同一个template,同样都是echo操作,只是最后打印的内容不同

basic_flow

workflow

手动执行两个flow,可以看到它们的执行速度都非常快,因为到最后的子任务都只是执行 echo

在这里插入图片描述

为了方便接下来的测试,我们可以给basic_flow下的job添加sleep以增加执行时间

# 每个job增加睡眠时间,序号越大睡眠时间越长
for i in {1..8}
doecho "command.1=sleep ${i}" >> basic_flow/basic_step_${i}.cmd.job
done

登录配置

在正式使用 azkaban-tools 之前,我们需要先修改 env.sh 中和服务相关的配置

# search_env_name: 查询操作的环境
search_env_name=produce# exec_env_name: 执行、修改等操作的环境
# 注: 一开始开发这个项目是为了将一个老集群部署的azkaban上的所有任务调度都迁移到新集群服务上。为防止误操作生产环境,区分了进行查询和执行操作时使用不同的环境
exec_env_name=test## produce env 连接信息
produce_azkaban_address=localhost:8081
produce_azkaban_user=azkaban
produce_azkaban_password=azkaban## test env 连接信息
## 这里故意将测试环境地址配置成和生产环境不同(虽然都是本地地址),是为了接下来测试批量停止任务时,不会因为脚本中判断“执行环境地址和生产环境地址完全一致”而停止执行
test_azkaban_address=127.0.0.1:8081
test_azkaban_user=azkaban
test_azkaban_password=azkaban

jq 工具

项目通过 jq 工具来解析 azkaban 接口返回的json数据

jq 可以通过 yum/apt 安装,或者直接下载可执行文件

wget https://github.com/stedolan/jq/releases/download/jq-1.6/jq-linux64
mv jq-linux64 /usr/local/bin/jq
chmod +x jq

jq 作为命令行工具,解析json还是非常好用的,常用指令可以参考后面的章节

批量启动任务

如果需要把examples的两个flow都调度起来,可以这么配置:

# 需要执行的项目名
execute_project_name="examples"
# 禁止执行的任务名
execute_block_flow_names="(^_template$)"
# 允许执行的任务名
execute_allow_flow_names=""

配置完成后直接执行 make run,即可同时拉起两个任务

在这里插入图片描述

批量停止任务

配置和启动任务一样,执行 make kill,即可把项目下所有执行中的任务停止

批量创建调度

# 操作项目名
target_project_name="examples"# 需要设置调度的白名单,不配置默认会设置项目下的所有任务
flow_name_allowlist="(^workflow$|^basic_flow$)"# 设置的调度周期,java cron 格式
fix_schedule_cron="0 0 18 * * ?"

注: 已经配有调度的任务不会覆盖

在这里插入图片描述

批量删除调度

配置和创建调度相同,执行 make clean_cron即可删除任务调度

部分脚本逻辑

登录

# 登录方法需要返回azkaban地址和登录成功得到的session id 两个信息
login() {local env_name=$1eval $(get_azkaban_env_info_by_envname $env_name)local login_ret=`curl -X POST --data "action=$command_login&username=$tmp_azkaban_user&password=$tmp_azkaban_password" http://$tmp_azkaban_address 2>/dev/null`local tmp_session_id=`echo "$login_ret" | jq '.["session.id"] // empty' | tr -d '"'`echo "tmp_azkaban_address=$tmp_azkaban_address"echo "tmp_session_id=$tmp_session_id"
}# eval $(login $exec_env_name)
# session_id=$tmp_session_id
# azkaban_address=$tmp_azkaban_address

shell脚本中返回多个值的方法参考,eval: 将传入的字符串当成指令执行

获取任务下的所有 flow

get_project_flow() {local project_name=$1local session_id=$2local azkaban_address=$3local get_project_ret=`curl "http://$azkaban_address/manager?ajax=$command_get_project_flows&project=$project_name&session.id=$session_id" 2>/dev/null`local tmp_project_id=`echo "$get_project_ret" | jq '.projectId // empty'`local tmp_flow_count=`echo "$get_project_ret" | jq '.flows | length'`local tmp_flow_name=`echo "$get_project_ret" | sed "s/ //g" | jq -r '[.flows[].flowId] | join(",")'`echo "project_id=$tmp_project_id"echo "flow_name_join_str=$tmp_flow_name"echo "flow_count=$tmp_flow_count"
}

jq 基本用法

参考了这篇文档和官方文档

# 获取某个key-value
echo '{"hello": "world"}' | jq '.hello' # world# 获取数组
echo '{"arr": [1,2,3]}' | jq '.arr' # [1,2,3]# 数组所有元素用逗号组合
# 注意: 如果数组元素类型是数字,还要先通过tostring转成字符串
# -r: 直接打印,不再组装成json
echo '{"arr": [1,2,3]}' | jq -r '[.arr[] | tostring] | join(",")' # 1,2,3# 判断
echo '{"hello": "world"}' | jq 'has("world")' # false# 选择器
echo '{"arr": [1,2,3,4,5]}' | jq -r '[.arr[] | select(.>2) | tostring] | join(",")' # 3,4,5# 选择器和映射
echo '{"arr": [1,2,3,4,5]}' | jq -r '.arr | map(select(.>2))' # [3,4,5]# 排序
echo '{"arr": [5,4,3,2,1]}' | jq -r '.arr | sort' # [1,2,3,4,5]

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/836519.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java | Leetcode Java题解之第85题最大矩形

题目&#xff1a; 题解&#xff1a; class Solution {public int maximalRectangle(char[][] matrix) {int m matrix.length;if (m 0) {return 0;}int n matrix[0].length;int[][] left new int[m][n];for (int i 0; i < m; i) {for (int j 0; j < n; j) {if (mat…

Python3 + Appium + 安卓模拟器实现APP自动化测试并生成测试报告

这篇文章主要介绍了Python3 Appium 安卓模拟器实现APP自动化测试并生成测试报告,本文给大家介绍的非常详细&#xff0c;对大家的学习或工作具有一定的参考借鉴价值&#xff0c;需要的朋友可以参考下 本文主要分为以下几个部分 安装Python3 安装Python3的Appium库 安装Andr…

Mp3tag for Mac:音乐标签,轻松管理

还在为杂乱无章的音乐文件而烦恼吗&#xff1f;Mp3tag for Mac&#xff0c;让您的音乐库焕然一新&#xff01;它支持多种音频格式&#xff0c;批量编辑标签&#xff0c;让音乐管理变得简单高效。同时&#xff0c;自动获取在线数据库的音乐元数据&#xff0c;确保您的音乐库始终…

kafka安装配置及集成springboot

1. 安装 单机安装kafka Kafka对于zookeeper是强依赖&#xff0c;保存kafka相关的节点数据&#xff0c;所以安装Kafka之前必须先安装zookeeper dockerhub网址: https://hub.docker.com Docker安装zookeeper 下载镜像&#xff1a; docker pull zookeeper:3.4.14创建容器 doc…

docker(五):DockerFile

文章目录 DockerFile1、Dockerfile构建过程解析2、DockerFile常用保留字命令FROMMAINTAINERRUNEXPOSEWORKDIRUSERENVADDCOPYVOLUMECMDENTRYPOINT总结 3、案例 DockerFile 1、Dockerfile构建过程解析 官网文档&#xff1a;https://docs.docker.com/reference/dockerfile/ Dock…

【论文阅读笔记】HermesSim(Code is not Natural Language) (Security 24)

个人博客地址 HermesSim [Security 24] 论文&#xff1a;《Code is not Natural Language: Unlock the Power of Semantics-Oriented Graph Representation for Binary Code Similarity Detection》 仓库&#xff1a;https://github.com/NSSL-SJTU/HermesSim 提出的问题 二…

JVM调优:JVM中的垃圾收集器详解

JVM&#xff08;Java Virtual Machine&#xff09;垃圾收集器是Java虚拟机中的一个重要组件&#xff0c;负责自动管理Java堆内存中的对象。垃圾收集器的主要任务是找出那些不再被程序使用的对象&#xff0c;并释放它们占用的内存&#xff0c;以便为新的对象分配空间。这个过程被…

C#泛型委托

在C#中&#xff0c;delegate 关键字用于声明委托&#xff08;delegates&#xff09;&#xff0c;委托是一种类型安全的函数指针&#xff0c;允许你传递方法作为参数或从方法返回方法。有时我们需要将一个函数作为另一个函数的参数&#xff0c;这时就要用到委托&#xff08;Dele…

算法题② —— 链表专栏

1. 链表数据结构 struct ListNode {int val;ListNode *next;ListNode() : val(0), next(nullptr) {}ListNode(int x) : val(x), next(nullptr) {}ListNode(int x, ListNode *next) : val(x), next(next) {}};2. 链表的删除 2.1 移除链表元素 力扣&#xff1a;https://leetco…

引擎:主程渲染

一、引擎发展 二、引擎使用 1.游戏渲染流程 2.3D场景编辑器操作与快捷键 3.节点的脚本组件 脚本介绍 引擎执行流程 物体节点、声音组件\物理组件\UI组件、脚本组件 暴露变量到面板 4.节点的查找 基本查找 this.node&#xff1a;挂载当前脚本的节点A&#xff1b; this.nod…

一、精准化测试介绍

精准化测试介绍 一、精准化测试是什么&#xff1f;二、什么是代码插桩&#xff1f;三、两种插桩方式Offine模式&#xff1a;On-the-fly插桩: 四、jacoco覆盖率报告展示五、增量代码覆盖率监控原理六、精准测试系统架构图七、全量与增量覆盖率报告包维度对比八、全量与增量覆盖率…

牛客NC343 和大于等于K的最短子数组【困难 前缀和 Java/Go】

题目 题目链接&#xff1a; https://www.nowcoder.com/practice/3e1fd3d19fb0479d94652d49c7e1ead1 思路 本答案利用前缀和解答&#xff0c;Java&#xff0c;Go答案通过&#xff0c;但是同样的代码用PHP的话有一个测试用例超时 应该还有更优秀的答案&#xff0c;后面找到更优…

2022——蓝桥杯十三届2022国赛大学B组真题

问题分析 看到这个问题的同学很容易想到用十层循环暴力计算&#xff0c;反正是道填空题&#xff0c;一直算总能算得出来的&#xff0c;还有些同学可能觉得十层循环太恐怖了&#xff0c;写成回溯更简洁一点。像下面这样 #include <bits/stdc.h> using namespace std; in…

apk反编译修改教程系列-----反编译apk 去除软件强制更新的八种方式步骤解析【十七】

安卓有的apk 软件会不断更新。但有些用户需要旧版的有些功能或者新版功能增减原因等等。需要不更新继续使用。这类问题有的可以简单修改版本号来跳过更新。或者有的软件可以忽略。但对于某些无法跳过更新界面等等的apk。就需要深度反编译来去除软件的强制更新。 通过课程可以了…

wsl安装Xfce桌面并设置系统语言和输入法

一、安装xfce &#xff08;有相关的依赖都会安装&#xff09; sudo apt -y install xfce4 二、 安装远程连接组件 sudo apt install xrdp -y 并重新启动 Xrdp 服务&#xff1a; sudo systemctl restart xrdp 本地windows系统中请按 winR 键 呼出运行 在运行中输入 mstsc…

1067: 有向图的邻接表存储强连通判断

解法&#xff1a; 定理&#xff1a;有向图G是强连通图的充分必要条件是G中存在一条经过所有节点的回路 跟上道题一样 这是错误代码 #include<iostream> #include<vector> using namespace std; int arr[100][100]; void dfs(vector<bool>& a,int u) {a…

唤醒手腕 Go 语言 并发编程、Channel通道、Context 详细教程(更新中)

并发编程概述 ​ 一个进程可以包含多个线程&#xff0c;这些线程运行的一定是同一个程序&#xff08;进程程序&#xff09;&#xff0c;且都由当前进程中已经存在的线程通过系统调用的方式创建出来。进程是资源分配的基本单位&#xff0c;线程是调度运行的基本单位&#xff0c…

【JAVA进阶篇教学】第十二篇:Java中ReentrantReadWriteLock锁讲解

博主打算从0-1讲解下java进阶篇教学&#xff0c;今天教学第十二篇&#xff1a;Java中ReentrantReadWriteLock锁讲解。 在并发编程中&#xff0c;读写锁&#xff08;ReadWriteLock&#xff09;是一种用于管理对共享资源的访问的锁机制&#xff0c;它提供了比传统的互斥锁更高的…

栈的讲解

栈的概念及结构 栈&#xff1a;一种特殊的线性表&#xff0c;其只允许在固定的一端进行插入和删除元素操作。 进行数据插入和删除操作的一端称为栈顶&#xff0c;另一端称为栈底&#xff08;因为先进后出&#xff09;。栈中的数据元素遵守后进先出LIFO&#xff08;Last In Firs…

数据结构与算法===回溯法

文章目录 原理使用场景括号生成代码 小结 原理 回溯法是采用试错的思想&#xff0c;它尝试分步骤的去解决一个问题。在分步骤解决问题的过程中&#xff0c;当它通过尝试发现现有的分步答案不能得到有效的正确的解答的时候&#xff0c;它将取消上一步甚至是上几步的计算&#x…