目录
- 前言
- XXL-Job API接口
- 添加任务API
- 动态添加任务
- 动态启动任务
前言
看该文章之前,最好看一下之前的文章,比较方便我们理解
XXL-Job详解(一):组件架构
XXL-Job详解(二):安装部署
XXL-Job详解(三):任务开发
XXL-Job详解(四):任务注册原理
XXL-Job API接口
我们之前xxl-job添加任务,需要去管理后台进行添加,在一些任务量大或者需要实时添加任务的情况,通过手动添加任务是不现实的,那有什么方法呢?
我们需要知道,我们在页面添加任务,其实调用的就是调度平台的API接口,调度平台的API接口就在xxl-job-admin的com.xxl.job.admin.controller目录下
那我们怎么知道添加任务是哪个接口呢,很简单,我们在页面添加一个任务,看页面请求了哪个接口就行,我新增了一个任务,可以看到,调度平台是请求了add接口,很明显,add就是我们调度平台添加任务的接口
添加任务API
我们去调度平台项目查找add接口,add接口就在JobInfoController下,add接口如下
@RequestMapping("/add")@ResponseBodypublic ReturnT<String> add(XxlJobInfo jobInfo) {return xxlJobService.add(jobInfo);}
下面我们看一下service的add方法
@Overridepublic ReturnT<String> add(XxlJobInfo jobInfo) {// valid baseXxlJobGroup group = xxlJobGroupDao.load(jobInfo.getJobGroup());if (group == null) {return new ReturnT<String>(ReturnT.FAIL_CODE, (I18nUtil.getString("system_please_choose")+I18nUtil.getString("jobinfo_field_jobgroup")) );}if (jobInfo.getJobDesc()==null || jobInfo.getJobDesc().trim().length()==0) {return new ReturnT<String>(ReturnT.FAIL_CODE, (I18nUtil.getString("system_please_input")+I18nUtil.getString("jobinfo_field_jobdesc")) );}if (jobInfo.getAuthor()==null || jobInfo.getAuthor().trim().length()==0) {return new ReturnT<String>(ReturnT.FAIL_CODE, (I18nUtil.getString("system_please_input")+I18nUtil.getString("jobinfo_field_author")) );}// valid triggerScheduleTypeEnum scheduleTypeEnum = ScheduleTypeEnum.match(jobInfo.getScheduleType(), null);if (scheduleTypeEnum == null) {return new ReturnT<String>(ReturnT.FAIL_CODE, (I18nUtil.getString("schedule_type")+I18nUtil.getString("system_unvalid")) );}if (scheduleTypeEnum == ScheduleTypeEnum.CRON) {if (jobInfo.getScheduleConf()==null || !CronExpression.isValidExpression(jobInfo.getScheduleConf())) {return new ReturnT<String>(ReturnT.FAIL_CODE, "Cron"+I18nUtil.getString("system_unvalid"));}} else if (scheduleTypeEnum == ScheduleTypeEnum.FIX_RATE/* || scheduleTypeEnum == ScheduleTypeEnum.FIX_DELAY*/) {if (jobInfo.getScheduleConf() == null) {return new ReturnT<String>(ReturnT.FAIL_CODE, (I18nUtil.getString("schedule_type")) );}try {int fixSecond = Integer.valueOf(jobInfo.getScheduleConf());if (fixSecond < 1) {return new ReturnT<String>(ReturnT.FAIL_CODE, (I18nUtil.getString("schedule_type")+I18nUtil.getString("system_unvalid")) );}} catch (Exception e) {return new ReturnT<String>(ReturnT.FAIL_CODE, (I18nUtil.getString("schedule_type")+I18nUtil.getString("system_unvalid")) );}}// valid jobif (GlueTypeEnum.match(jobInfo.getGlueType()) == null) {return new ReturnT<String>(ReturnT.FAIL_CODE, (I18nUtil.getString("jobinfo_field_gluetype")+I18nUtil.getString("system_unvalid")) );}if (GlueTypeEnum.BEAN==GlueTypeEnum.match(jobInfo.getGlueType()) && (jobInfo.getExecutorHandler()==null || jobInfo.getExecutorHandler().trim().length()==0) ) {return new ReturnT<String>(ReturnT.FAIL_CODE, (I18nUtil.getString("system_please_input")+"JobHandler") );}// 》fix "\r" in shellif (GlueTypeEnum.GLUE_SHELL==GlueTypeEnum.match(jobInfo.getGlueType()) && jobInfo.getGlueSource()!=null) {jobInfo.setGlueSource(jobInfo.getGlueSource().replaceAll("\r", ""));}// valid advancedif (ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null) == null) {return new ReturnT<String>(ReturnT.FAIL_CODE, (I18nUtil.getString("jobinfo_field_executorRouteStrategy")+I18nUtil.getString("system_unvalid")) );}if (MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), null) == null) {return new ReturnT<String>(ReturnT.FAIL_CODE, (I18nUtil.getString("misfire_strategy")+I18nUtil.getString("system_unvalid")) );}if (ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), null) == null) {return new ReturnT<String>(ReturnT.FAIL_CODE, (I18nUtil.getString("jobinfo_field_executorBlockStrategy")+I18nUtil.getString("system_unvalid")) );}// 》ChildJobId validif (jobInfo.getChildJobId()!=null && jobInfo.getChildJobId().trim().length()>0) {String[] childJobIds = jobInfo.getChildJobId().split(",");for (String childJobIdItem: childJobIds) {if (childJobIdItem!=null && childJobIdItem.trim().length()>0 && isNumeric(childJobIdItem)) {XxlJobInfo childJobInfo = xxlJobInfoDao.loadById(Integer.parseInt(childJobIdItem));if (childJobInfo==null) {return new ReturnT<String>(ReturnT.FAIL_CODE,MessageFormat.format((I18nUtil.getString("jobinfo_field_childJobId")+"({0})"+I18nUtil.getString("system_not_found")), childJobIdItem));}} else {return new ReturnT<String>(ReturnT.FAIL_CODE,MessageFormat.format((I18nUtil.getString("jobinfo_field_childJobId")+"({0})"+I18nUtil.getString("system_unvalid")), childJobIdItem));}}// join , avoid "xxx,,"String temp = "";for (String item:childJobIds) {temp += item + ",";}temp = temp.substring(0, temp.length()-1);jobInfo.setChildJobId(temp);}// add in dbjobInfo.setAddTime(new Date());jobInfo.setUpdateTime(new Date());jobInfo.setGlueUpdatetime(new Date());xxlJobInfoDao.save(jobInfo);if (jobInfo.getId() < 1) {return new ReturnT<String>(ReturnT.FAIL_CODE, (I18nUtil.getString("jobinfo_field_add")+I18nUtil.getString("system_fail")) );}return new ReturnT<String>(String.valueOf(jobInfo.getId()));}
这个方法很简单,最重要就是两步
1、校验jobinfo参数
2、插入jobinfo到表
好了,到这里我们就基本了解了xxl-job添加任务的流程了,那么我们想动态添加任务就很简单了,主要流程是以下两步
1、构造jobinfo对象
2、以jobinfo对象为入参请求add接口,请求add接口我们可以使用RestTemplate或者openFeign等,大家按自己项目来决定,下面我使用的是RestTemplate
动态添加任务
1、首先把调度平台的jobinfo对象拷贝到我们的执行器项目,jobinfo对象在com.xxl.job.admin.core.model目录下,我们按需拷贝对象
public class XxlJobInfo {private int id; // 主键IDprivate int jobGroup; // 执行器主键IDprivate String jobDesc;private Date addTime;private Date updateTime;private String author; // 负责人private String alarmEmail; // 报警邮件private String scheduleType; // 调度类型private String scheduleConf; // 调度配置,值含义取决于调度类型private String misfireStrategy; // 调度过期策略private String executorRouteStrategy; // 执行器路由策略private String executorHandler; // 执行器,任务Handler名称private String executorParam; // 执行器,任务参数private String executorBlockStrategy; // 阻塞处理策略private int executorTimeout; // 任务执行超时时间,单位秒private int executorFailRetryCount; // 失败重试次数private String glueType; // GLUE类型 #com.xxl.job.core.glue.GlueTypeEnumprivate String glueSource; // GLUE源代码private String glueRemark; // GLUE备注private Date glueUpdatetime; // GLUE更新时间private String childJobId; // 子任务ID,多个逗号分隔private int triggerStatus; // 调度状态:0-停止,1-运行private long triggerLastTime; // 上次调度时间private long triggerNextTime; // 下次调度时间}
2、放开登录校验
因为调度平台的api接口都是在后台,要请求接口,都需要通过登录校验,要解决这个问题,有两个方法
1、模拟登录(请求登录接口),获取登录cookie
2、在我们需要请求的接口上添加@PermissionLimit(limit = false)注解 ,默认是true,也就是需要做登录验证
/*** 权限限制* @author xuxueli 2015-12-12 18:29:02*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface PermissionLimit {/*** 登录拦截 (默认拦截)*/boolean limit() default true;/*** 要求管理员权限** @return*/boolean adminuser() default false;}
我为了方便,就直接使用PermissionLimit注解放开校验,大家按自己实际情况选择
为了不影响原有的接口,新增一个addJob接口,在这个接口上进行修改
在add接口添加PermissionLimit注解,如下所示
@RequestMapping("/addJob")@ResponseBody@PermissionLimit(limit = false)public ReturnT<String> addJob(@RequestBody XxlJobInfo jobInfo) {return xxlJobService.add(jobInfo);}
因为我们在设置XxlJobInfo 信息的时候,还需要获取XxlJobGroup的信息,所以我们还需要在JobGroupController添加以下接口
@RequestMapping("/loadByAppName")@ResponseBody@PermissionLimit(limit = false)public ReturnT<XxlJobGroup> loadByAppName(@RequestBody XxlJobGroup xxlJobGroup){XxlJobGroup jobGroup = xxlJobGroupDao.loadByAppName(xxlJobGroup);return jobGroup!=null?new ReturnT<XxlJobGroup>(jobGroup):new ReturnT<XxlJobGroup>(ReturnT.FAIL_CODE, null);}
loadByAppName也是我新增的,如下所示,使用appname查找XxlJobGroup信息
<select id="loadByAppName" resultType="com.xxl.job.admin.core.model.XxlJobGroup">SELECT <include refid="Base_Column_List" />FROM xxl_job_group AS tWHERE t.app_name = #{appname}</select>
3、添加请求工具类
@Component
public class XxlJobUtil {/*** xxl-job地址*/@Value("${xxl.job.admin.addresses}")private String adminAddresses;/*** xxl-job标识*/@Value("${xxl.job.executor.appname}")private String appname;private RestTemplate restTemplate = new RestTemplate();/*** 调度平台的api*/private static final String ADD_URL = "/jobinfo/addJob";private static final String GET_GROUP_INFO = "/jobgroup/loadByAppName";/*** 添加任务* @param jobInfo* @return*/public String add(XxlJobInfo jobInfo){// 查询appname对应groupId:Map<String,Object> param = new HashMap<>();param.put("appname", appname);String json = JSON.toJSONString(param);String result = doPost(adminAddresses + GET_GROUP_INFO, json);JSONObject jsonObject = JSON.parseObject(result);String content = jsonObject.getString("content");if (content != null){XxlJobGroup jobGroup = JSONObject.parseObject(content,XxlJobGroup.class);int groupId = jobGroup.getId();jobInfo.setJobGroup(groupId);}//发送请求String json2 = JSON.toJSONString(jobInfo);return doPost(adminAddresses + ADD_URL, json2);}/*** 发送请求* @param url* @param json* @return*/public String doPost(String url, String json){HttpHeaders headers = new HttpHeaders();headers.setContentType(MediaType.APPLICATION_JSON);HttpEntity<String> entity = new HttpEntity<>(json ,headers);ResponseEntity<String> stringResponseEntity = restTemplate.postForEntity(url, entity, String.class);return stringResponseEntity.getBody().toString();}}
4、测试
我们在执行器项目,添加一个接口,来测试下,是否可以动态添加任务
@Controller
@RequestMapping("/job")
public class JobInfoController {@Autowiredprivate XxlJobUtil xxlJobUtil;@RequestMapping("/add")@ResponseBodypublic String add(XxlJobInfo jobInfo) {return xxlJobUtil.add(jobInfo);}}
使用postman请求,可以看到,已经成功返回,content就是我们的任务ID,去调度中心的任务管理也可以看到我们的任务
动态启动任务
有了上面动态添加任务的案例,动态启动任务就简单了,下面我示范下,其他像暂停、删除等操作也类似
1、在调度平台项目,新增一个启动任务的接口
@RequestMapping("/startJob")@ResponseBody@PermissionLimit(limit = false)public ReturnT<String> startJob(@RequestBody XxlJobInfo jobInfo) {return xxlJobService.start(jobInfo.getId());}
2、在执行器项目,添加动态启动项目的方法
/*** 启动任务* @param jobId* @return*/public String start(int jobId) {JSONObject json = new JSONObject();json.put("id",jobId);String jsonString = JSON.toJSONString(json);String result = doPost(adminAddresses + START_JOB_URL, jsonString);return result;}