检查任务
import java.sql.SQLException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;/*** 每一个有依赖其他flow的flow,都要先进行依赖的检查* 这是一个切面*/
public class FlowCheck {private static String flow_id = "order";public static void main(String[] args) {ScheduledExecutorService service = Executors.newScheduledThreadPool(1);final MysqlDao azkabanDao = new MysqlDao();if (System.getProperties().getProperty("os.name").contains("Windows")) {} else {flow_id = args[0];}/*** 坑,会吞掉异常*/service.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {MysqlDao.AzkabanExecInfo info = azkabanDao.getAzkabanJobExecInfo(flow_id);/*** {"ok":false,"reason":"submitDay is 2022-09-29 and today is 2022-09-30","status":"50"}*/String output = JsonUtil.getJsonStrFromObj(info);if (info.getOk()) {System.out.println(output);service.shutdown();} else {if ("30".equals(info.getStatus())) {//wait} else {System.out.println(output);service.shutdown();}}} catch (SQLException throwables) {throwables.printStackTrace();}}}, 0, 5, TimeUnit.SECONDS);}
}
jobA
#!/bin/bash
# jobA: runs the FlowCheck dependency check for the "common_process" flow and
# appends its output to the file named by JOB_OUTPUT_PROP_FILE.
# The flow definition's condition reads the "ok" key from that output.
echo "do JobA"

# Capture everything FlowCheck prints on stdout (a JSON object).
r=$(java -cp ./scheduler-1.0-SNAPSHOT-jar-with-dependencies.jar com.mz.scheduler.FlowCheck common_process)

# Quote the value and use printf so the JSON is written verbatim: an unquoted
# expansion is word-split and glob-expanded, and some sh implementations
# interpret backslashes in echo arguments.
printf 'result is:%s\n' "$r"
printf '%s\n' "$r" >> "${JOB_OUTPUT_PROP_FILE}"
jobB
新的flow开启的第一个job
#!/bin/bash
# jobB: prints a marker line showing that the job ran.
printf '%s\n' "do JobB"
定义flow
jobA和jobB任务依赖文件
# Flow definition: jobB depends on jobA and runs only when jobA's output
# property "ok" equals "true".
nodes:
  - name: jobA
    type: command
    config:
      command: sh jobA.sh

  - name: jobB
    type: command
    dependsOn:
      - jobA
    config:
      command: sh jobB.sh
    # Evaluated against the output properties written by jobA.
    condition: ${jobA:ok} == "true"
打包
这种情况下打包时,需要将 jar 包一同打入 zip,因为 jobA 调用了 Java 实现的检查逻辑。