大数据:任务调度https://blog.csdn.net/qq_43713049/article/details/116985497
文章目录
- 任务调度
- 一、任务流调度的需求
- 二、任务流调度的工具
- 三、Oozie的简介
- 四、Oozie的2种使用方式
- 五、WorkFlow 与 Fork 和 Join
- 六、SubFlow:子工作流
- 七、定时调度的实现
- 八、自动化调度的实现
- 1.自动化调度需求
- 2.自动化调度脚本
- 3.自动化调度实现
任务调度
一、任务流调度的需求
整体需求
- 相同的业务线,有不同的需求会有多个程序来实现,这多个程序共同完成的需求,组合在一起就是工作流或者叫做任务流
- 基于工作流来实现任务流的自动化运行
基于时间的任务运行
- Job1和Job2是在每天固定的时间去采集昨天的数据
- 每天00:00
基于运行依赖关系的任务运行
- Job3必须等待Job1运行成功,才能运行
- Job5必须等待Job3和Job4都运行成功才能运行
调度类型
- 定时调度:基于某种时间的规律进行调度运行
- 依赖调度:基于某种依赖关系进行调度运行
二、任务流调度的工具
Linux Crontab
Linux中自带的一个工具
-
优点
- 简单,不用做额外的部署,能实现大多数的定时需求
- crontab -e
-
缺点
- 只能做定时任务的执行
-
语法
* * * * * command 分钟 小时 日 月 周几
- 1
- 2
Oozie
Cloudera公司研发的Hadoop生态圈的调度工具
- 官网:oozie.apache.org
- 优点
- 功能很强大,能满足几乎所有常规的任务流调度的需求
- 支持DAG流程调度
- 缺点
- 本身不是分布式的工具,依赖于MapReduce来实现分布式
- 原生的交互开发接口不友好
- 整体的监控不完善
- 学习成本比较高
Zeus
阿里巴巴最早基于Hadoop1研发的一个调度系统,目前市场上的Zeus一般都是携程版本的Zeus
- 优点
- 交互非常友好
- 使用非常简单
- 分布式的,功能相对也比较全面
- 缺点
- Bug非常多,阿里没有继续研发Zeus,不支持Hadoop2
Azkaban
LinkedIn公司研发的分布式调度工具
- 优点
- 重点着重于自身的调度功能的研发,其他的辅助性功能都通过插件来完成
- 自身也是分布式调度系统
- 界面交互性比较友好
- 开发交互性:properties或者JSON
- 缺点
- 3.x版本开始才支持完全分布式
三、Oozie的简介
功能
- Oozie是一个专门为管理Hadoop生态的程序调度而设计的工作流调度系统
- 基于DAG实现依赖调度:WorkFlow
- 基于定时器实现定时调度:Coordinator
特点
- 优点:功能全面
- 缺点:部署相对复杂、原生开发方式过于复杂
应用
- 基于Hadoop平台的分布式离线任务流调度
原理
- 底层依赖于MapReduce,将工作流变成MapReduce程序,提交个YARN
- 由YARN来将不同的工作流分配到不同的机器上运行,用于构建分布式调度系统
四、Oozie的2种使用方式
原生方式
-
这种方式,是通过自己写代码的方式来实现工作流的开发,效率低,容易出问题,不用
-
实现一个效果:4个程序
- 第一个程序:shell脚本,定时运行的
- 第二个程序:Spark程序,必须等第一个程序运行完才能运行
- 第三个程序:MapReduce程序,必须等第二个程序运行才能运行’
- 第四个程序:Hive,必须等第三个程序运行完才能运行
-
先要开发一个XML文件
- 控制节点:start、end、kill
- 控制程序运行的流程
- start:开始节点
- end:终止节点
- kill:强制退出节点
- fork:分支节点
- join:合并节点
- 程序节点:action
- 控制节点:start、end、kill
<start to="first">
<action name="first"><shell><path>xxx.sh</path><args></args></shell><ok to="second"> </ok><error to="kill"></error>
</action>
<action name ="second"><spark><jar></jar><class></class>……</spark><ok to="third"> </ok>
<error to="kill"></error>
<action><action name ="forth"><hive><scprit></scprit><path></path>……</hive><ok to="end"> </ok><error to="kill"></error>
<action>
<kill name="kill"> kill
</kill><end name="end"> end
</end>
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
集成Hue
-
由于Oozie原生的方式交互性非常差,导致用户上手非常困难
-
Cloudera基于可视化需求,在Hue中集成Oozie开发和监控
五、WorkFlow 与 Fork 和 Join
创建测试脚本
- 启动Oozie:在第一台机器
- 启动:start-oozie.sh
- 启动:start-oozie.sh
- 关闭:stop-oozie.sh
测试
- 创建四个脚本
mkdir /export/data/flow
- 1
- /export/data/flow/test01.sh
#!/bin/bash
echo "this is test01"
- 1
- 2
- /export/data/flow/test02.sh
#!/bin/bash
echo "this is test02"
- 1
- 2
- /export/data/flow/test03.sh
#!/bin/bash
echo "this is test03"
- 1
- 2
- /export/data/flow/test04.sh
#!/bin/bash
echo "this is test04"
- 1
- 2
- 上传到HDFS
hdfs dfs -put /export/data/flow /user/oozie/
- 1
单job工作流
需求1:构建一个工作流,执行test01
多job工作流
需求2:构建一个工作流,先执行test01,再执行test02,最后执行test03
分支工作流
-
需求3
-
test01先执行
-
test01执行完成,test02和test03并行执行
-
test02和test03都执行完成,执行test04
六、SubFlow:子工作流
需求:在调度运行一个工作流的实现,需要嵌套调用另外一个工作流
七、定时调度的实现
八、自动化调度的实现
1.自动化调度需求
- 目标:自动化实现增量任务流调度
- 实施
- 第一个job:增量采集
- 第二个job:统计昨天的订单总个数
- 第三个job:统计昨天的订单总金额
- 第四个job:合并二和三的结果,得到每天的订单总个数,和总金额,导出到MySQL
2.自动化调度脚本
-
目标:实现自动化脚本调度的开发
-
路径
- step1:增量采集脚本job1
- step2:增量统计个数脚本job2
- step3:增量统计金额脚本job3
- step4:增量合并导出脚本job4
-
实施
增量采集脚本job1
- 创建脚本
vim /export/data/shell/01.collect.sh
- 1
- 开发Shell脚本
#!/bin/bash
#step1:先获取要采集的数据时间,规则:如果没有给参数,就默认处理昨天的日期,如果给了参数,就参数对应的日期
if [ $# -ne 0 ]
then
#参数个数不为0
if [ $# -ne 1 ]
then
echo “参数至多只能有一个,为处理的日期,请重新运行!”
exit 100
else
#参数个数只有1个,就用第一个参数作为处理的日期
yesterday=KaTeX parse error: Expected 'EOF', got '#' at position 13: 1 fi else #̲参数个数为0,默认处理昨天的日…{yesterday}"
echo “step2:开始运行采集的程序”
#step2:运行增量采集
SQOOP_HOME=/export/server/sqoop-1.4.6-cdh5.14.0
SQOOPHOME/bin/sqoopimport−−connectjdbc:mysql://node3:3306/dborder−−usernameroot−−password−filehdfs://node1:8020/user/oozie/shell/sqoop.passwd−−query"select∗fromtborderwheresubstring(createtime,1,10)=′SQOOP_HOME/bin/sqoop import \ --connect jdbc:mysql://node3:3306/db_order \ --username root \ --password-file hdfs://node1:8020/user/oozie/shell/sqoop.passwd \ --query "select * from tb_order where substring(create_time,1,10) = 'SQOOPHOME/bin/sqoopimport −−connectjdbc:mysql://node3:3306/dborder −−usernameroot −−password−filehdfs://node1:8020/user/oozie/shell/sqoop.passwd −−query"select∗fromtborderwheresubstring(createtime,1,10)=′{yesterday}’ and $CONDITIONS "
–delete-target-dir
–target-dir /nginx/logs/tb_order/daystr=${yesterday}
–fields-terminated-by ‘\t’
-m 1
echo “step2:采集的程序运行结束”
echo “step3:开始运行ETL”
#模拟ETL的过程,将采集的新增的数据移动到表的目录下
HADOOP_HOME=/export/server/hadoop-2.6.0-cdh5.14.0
#先判断结果是否存在,如果已经存在,先删除再移动
HADOOPHOME/bin/hdfsdfs−test−e/user/hive/warehouse/tborder/daystr=HADOOP_HOME/bin/hdfs dfs -test -e /user/hive/warehouse/tb_order/daystr=HADOOPHOME/bin/hdfsdfs−test−e/user/hive/warehouse/tborder/daystr={yesterday}
if [ $? -eq 0 ]
then
#存在
HADOOPHOME/bin/hdfsdfs−rm−r/user/hive/warehouse/tborder/daystr=HADOOP_HOME/bin/hdfs dfs -rm -r /user/hive/warehouse/tb_order/daystr=HADOOPHOME/bin/hdfsdfs−rm−r/user/hive/warehouse/tborder/daystr={yesterday}
HADOOPHOME/bin/hdfsdfs−cp/nginx/logs/tborder/daystr=HADOOP_HOME/bin/hdfs dfs -cp /nginx/logs/tb_order/daystr=HADOOPHOME/bin/hdfsdfs−cp/nginx/logs/tborder/daystr={yesterday} /user/hive/warehouse/tb_order/
else
#不存在
HADOOPHOME/bin/hdfsdfs−cp/nginx/logs/tborder/daystr=HADOOP_HOME/bin/hdfs dfs -cp /nginx/logs/tb_order/daystr=HADOOPHOME/bin/hdfsdfs−cp/nginx/logs/tborder/daystr={yesterday} /user/hive/warehouse/tb_order/
fi
echo “step3:ETL结束”
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
增量统计个数脚本job2
- 创建脚本
vim /export/data/shell/02.analysis.sh
vim /export/data/shell/02.analysis.sql
- 1
- 2
- 开发Shell脚本
#!/bin/bash
#step1:先获取要采集的数据时间,规则:如果没有给参数,就默认处理昨天的日期,如果给了参数,就参数对应的日期
if [ $# -ne 0 ]
then#参数个数不为0if [ $# -ne 1 ]thenecho "参数至多只能有一个,为处理的日期,请重新运行!"exit 100else#参数个数只有1个,就用第一个参数作为处理的日期yesterday=$1fi
else#参数个数为0,默认处理昨天的日期yesterday=`date -d '-1 day' +%Y-%m-%d`
fi
echo "step1:要处理的日期是:${yesterday}"echo "step2:开始运行分析"
#step2:运行分析程序
HIVE_HOME=/export/server/hive-1.1.0-cdh5.14.0
$HIVE_HOME/bin/hive --hiveconf yest=${yesterday} -f hdfs://node1:8020/user/oozie/shell/02.analysis.sql
echo "step2:分析的程序运行结束"
- 开发SQL文件
create table if not exists default.tb_order(
id string ,
pid string,
userid string,
price double ,
create_time string
)
partitioned by (daystr string)
row format delimited fields terminated by '\t'
location '/user/hive/warehouse/tb_order';
alter table default.tb_order add if not exists partition (daystr='${hiveconf:yest}');create table if not exists default.tb_order_num_rs(
daystr string,
order_number int
)
row format delimited fields terminated by '\t';insert into table default.tb_order_num_rs
select
daystr,
count(id) as order_number
from default.tb_order
where daystr='${hiveconf:yest}'
group by daystr
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
增量统计金额脚本job3
- 创建脚本
vim /export/data/shell/03.analysis.sh
vim /export/data/shell/03.analysis.sql
- 1
- 2
- 开发Shell脚本
#!/bin/bash#step1:先获取要采集的数据时间,规则:如果没有给参数,就默认处理昨天的日期,如果给了参数,就参数对应的日期
if [ $# -ne 0 ]
then#参数个数不为0if [ $# -ne 1 ]thenecho "参数至多只能有一个,为处理的日期,请重新运行!"exit 100else#参数个数只有1个,就用第一个参数作为处理的日期yesterday=$1fi
else#参数个数为0,默认处理昨天的日期yesterday=`date -d '-1 day' +%Y-%m-%d`
fi
echo "step1:要处理的日期是:${yesterday}"echo "step2:开始运行分析"
#step2:运行分析程序
HIVE_HOME=/export/server/hive-1.1.0-cdh5.14.0
$HIVE_HOME/bin/hive --hiveconf yest=${yesterday} -f hdfs://node1:8020/user/oozie/shell/03.analysis.sqlecho "step2:分析的程序运行结束"
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 开发SQL文件
create table if not exists default.tb_order(
id string ,
pid string,
userid string,
price double ,
create_time string
)
partitioned by (daystr string)
row format delimited fields terminated by '\t'
location '/user/hive/warehouse/tb_order';alter table default.tb_order add if not exists partition (daystr='${hiveconf:yest}');create table if not exists default.tb_order_price_rs(
daystr string,
order_price double
)
row format delimited fields terminated by '\t';insert into table default.tb_order_price_rs
select
daystr,
sum(price) as order_price
from default.tb_order
where daystr='${hiveconf:yest}'
group by daystr;
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
增量合并导出脚本job4
- 创建脚本
vim /export/data/shell/04.export.sh
vim /export/data/shell/04.export.sql
- 1
- 2
- 开发Shell脚本
#!/bin/bash#step1:先获取要采集的数据时间,规则:如果没有给参数,就默认处理昨天的日期,如果给了参数,就参数对应的日期
if [ $# -ne 0 ]
then#参数个数不为0if [ $# -ne 1 ]thenecho "参数至多只能有一个,为处理的日期,请重新运行!"exit 100else#参数个数只有1个,就用第一个参数作为处理的日期yesterday=$1fi
else#参数个数为0,默认处理昨天的日期yesterday=`date -d '-1 day' +%Y-%m-%d`
fi
echo "step1:要处理的日期是:${yesterday}"echo "step2:开始运行分析"
#step2:运行分析程序
HIVE_HOME=/export/server/hive-1.1.0-cdh5.14.0
$HIVE_HOME/bin/hive --hiveconf yest=${yesterday} -f hdfs://node1:8020/user/oozie/shell/04.export.sqlecho "step2:分析的程序运行结束"echo "step3:开始运行导出的程序"
#step2:运行增量采集
SQOOP_HOME=/export/server/sqoop-1.4.6-cdh5.14.0
$SQOOP_HOME/bin/sqoop export \
--connect jdbc:mysql://node3:3306/db_order \
--username root \
--password-file hdfs://node1:8020/user/oozie/shell/sqoop.passwd \
--table tb_order_rs \
--hcatalog-database default \
--hcatalog-table tb_order_rs \
--input-fields-terminated-by '\t' \
--update-key daystr \
--update-mode allowinsert \
-m 1echo "step3:导出的程序运行结束"
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 开发SQL文件
create table if not exists default.tb_order_rs(
daystr string,
order_number int,
order_price double)
row format delimited fields terminated by '\t';insert into table default.tb_order_rs
select
a.daystr,
a.order_number,
b.order_price
from default.tb_order_num_rs a join default.tb_order_price_rs b on a.daystr = b.daystr
where a.daystr='${hiveconf:yest}';
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
3.自动化调度实现
上传
cp /export/data/sqoop.passwd /export/data/shell/
hdfs dfs -put /export/data/shell /user/oozie/
- 1
- 2
在MySQL中导入最新数据
use db_order;
insert into tb_order values('o00013','p00009','u00001',121,'2021-05-17 00:01:01');
insert into tb_order values('o00014','p00010','u00002',122,'2021-05-17 10:01:02');
insert into tb_order values('o00015','p00011','u00003',123,'2021-05-17 11:01:03');
insert into tb_order values('o00016','p00012','u00004',124,'2021-05-17 23:01:04');
- 1
- 2
- 3
- 4
- 5