一、调度中心管理注册信息
1.JobApiController
执行器调用调度中心的url来实现注册、下线、回调等操作;其主要的实现类是JobApiController,调用/api/registry接口注册执行器信息,调用/api/registryRemove接口下线执行器信息,调用/api/callback接口执行回调操作。
@Controller
@RequestMapping("/api")
public class JobApiController {@Resourceprivate AdminBiz adminBiz;/*** api** @param uri* @param data* @return*/@RequestMapping("/{uri}")@ResponseBody@PermissionLimit(limit=false)public ReturnT<String> api(HttpServletRequest request, @PathVariable("uri") String uri, @RequestBody(required = false) String data) {// validif (!"POST".equalsIgnoreCase(request.getMethod())) {return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");}if (uri==null || uri.trim().length()==0) {return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");}if (XxlJobAdminConfig.getAdminConfig().getAccessToken()!=null&& XxlJobAdminConfig.getAdminConfig().getAccessToken().trim().length()>0&& !XxlJobAdminConfig.getAdminConfig().getAccessToken().equals(request.getHeader(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN))) {return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");}// services mappingif ("callback".equals(uri)) {List<HandleCallbackParam> callbackParamList = GsonTool.fromJson(data, List.class, HandleCallbackParam.class);return adminBiz.callback(callbackParamList);} else if ("registry".equals(uri)) {RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);return adminBiz.registry(registryParam);} else if ("registryRemove".equals(uri)) {RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);return adminBiz.registryRemove(registryParam);} else {return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");}}}
2.AdminBizImpl
adminBiz.registry(registryParam) 实际调用的是实现类 AdminBizImpl;AdminBizImpl 本身不包含注册逻辑,而是委托给 JobRegistryHelper.getInstance().registry(registryParam) 来完成。
@Service
public class AdminBizImpl implements AdminBiz {@Overridepublic ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {return JobCompleteHelper.getInstance().callback(callbackParamList);}@Overridepublic ReturnT<String> registry(RegistryParam registryParam) {return JobRegistryHelper.getInstance().registry(registryParam);}@Overridepublic ReturnT<String> registryRemove(RegistryParam registryParam) {return JobRegistryHelper.getInstance().registryRemove(registryParam);}}
3.JobRegistryHelper
JobRegistryHelper中的实现也不难理解:在初始化方法start()中创建线程池registryOrRemoveThreadPool,注册请求到来时向该线程池提交异步任务,由任务把注册信息写入数据表xxl_job_registry中。
// JobRegistryHelper.java (excerpt: start() is shown only up to the thread-pool creation)

/*
 * Creates the thread pool used for asynchronous registry / registry-remove work:
 * 2 core threads, 10 max threads, 30 s keep-alive, bounded queue of 2000 tasks.
 */
public void start(){
    // for registry or remove
    registryOrRemoveThreadPool = new ThreadPoolExecutor(
            2,
            10,
            30L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(2000),
            new ThreadFactory() {
                // Names each worker thread with a fixed prefix plus the runnable's hash code.
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode());
                }
            },
            new RejectedExecutionHandler() {
                // On rejection the task is run directly by this handler, then a warning is logged.
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    r.run();
                    logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now).");
                }
            });

    // ... (rest of start(), including its closing brace, is omitted in this excerpt)

/**
 * Validates a registration request and schedules its persistence on the thread pool.
 *
 * @param registryParam registration details; group, key and value must all contain text
 * @return a FAIL_CODE result with "Illegal Argument." when validation fails, otherwise
 *         ReturnT.SUCCESS (returned right after the task is submitted, without waiting for it)
 */
public ReturnT<String> registry(RegistryParam registryParam) {

    // valid
    if (!StringUtils.hasText(registryParam.getRegistryGroup())
            || !StringUtils.hasText(registryParam.getRegistryKey())
            || !StringUtils.hasText(registryParam.getRegistryValue())) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
    }

    // async execute
    // note: runs asynchronously and persists the registration info to the database
    registryOrRemoveThreadPool.execute(new Runnable() {
        @Override
        public void run() {
            // note: write to the database; try an update first
            int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
            // ret < 1: presumably no existing row was updated (TODO confirm against the DAO),
            // so save a new record and refresh the group's registry info
            if (ret < 1) {
                XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());

                // fresh
                freshGroupRegistryInfo(registryParam);
            }
        }
    });

    return ReturnT.SUCCESS;
}
4.总结
- 执行器通过restful api形式注册到调度中心来,调度中心JobApiController对应有3个注册、下线和回调的方法实现;
- 通过AdminBizImpl的adminBiz.registry(registryParam)来实际执行注册方法,实际使用JobRegistryHelper类;
- JobRegistryHelper类在初始化(start()方法)时会创建一个线程池;每次注册执行器时,向该线程池提交一个异步任务,将注册信息持久化到数据库;
二、调度中心的配置和启动
1.添加权限控制
自定义权限注解@PermissionLimit,其校验逻辑在PermissionInterceptor中实现:首先根据注解判断接口是否需要登录,如果需要则根据cookie中拿到的用户信息查库判断用户是否已登录;未登录则以302重定向到登录页面(/toLogin),已登录但接口要求管理员权限而当前用户不是管理员时,则抛出异常提示没有权限。
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface PermissionLimit {

    /**
     * Login interception switch.
     *
     * @return {@code true} (the default) when the annotated method requires login
     */
    boolean limit() default true;

    /**
     * Administrator requirement switch.
     *
     * @return {@code true} when the annotated method requires administrator
     *         permission; defaults to {@code false}
     */
    boolean adminuser() default false;
}
@Component
public class PermissionInterceptor implements AsyncHandlerInterceptor {@Resourceprivate LoginService loginService;@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {//my-处理登录权限的逻辑if (!(handler instanceof HandlerMethod)) {return true; // proceed with the next interceptor}// if need loginboolean needLogin = true;boolean needAdminuser = false;HandlerMethod method = (HandlerMethod)handler;PermissionLimit permission = method.getMethodAnnotation(PermissionLimit.class);if (permission!=null) {needLogin = permission.limit();needAdminuser = permission.adminuser();}if (needLogin) {XxlJobUser loginUser = loginService.ifLogin(request, response);if (loginUser == null) {response.setStatus(302);response.setHeader("location", request.getContextPath()+"/toLogin");return false;}if (needAdminuser && loginUser.getRole()!=1) {throw new RuntimeException(I18nUtil.getString("system_permission_limit"));}request.setAttribute(LoginService.LOGIN_IDENTITY_KEY, loginUser);}return true; // proceed with the next interceptor}}
将PermissionInterceptor添加到web配置文件中。
@Configuration
public class WebMvcConfig implements WebMvcConfigurer {@Resourceprivate PermissionInterceptor permissionInterceptor;@Resourceprivate CookieInterceptor cookieInterceptor;@Overridepublic void addInterceptors(InterceptorRegistry registry) {registry.addInterceptor(permissionInterceptor).addPathPatterns("/**");registry.addInterceptor(cookieInterceptor).addPathPatterns("/**");}}
2.调度中心初始化
xxljob的初始化和销毁动作在XxlJobAdminConfig中配置完成。
@Component
public class XxlJobAdminConfig implements InitializingBean, DisposableBean {private static XxlJobAdminConfig adminConfig = null;public static XxlJobAdminConfig getAdminConfig() {return adminConfig;}// ---------------------- XxlJobScheduler ----------------------private XxlJobScheduler xxlJobScheduler;@Overridepublic void afterPropertiesSet() throws Exception {adminConfig = this;xxlJobScheduler = new XxlJobScheduler();xxlJobScheduler.init();}@Overridepublic void destroy() throws Exception {xxlJobScheduler.destroy();}
}
具体初始化的操作。
public class XxlJobScheduler {private static final Logger logger = LoggerFactory.getLogger(XxlJobScheduler.class);public void init() throws Exception {// init i18ninitI18n();// admin trigger pool startJobTriggerPoolHelper.toStart();// admin registry monitor runJobRegistryHelper.getInstance().start();// admin fail-monitor runJobFailMonitorHelper.getInstance().start();// admin lose-monitor run ( depend on JobTriggerPoolHelper )JobCompleteHelper.getInstance().start();// admin log report startJobLogReportHelper.getInstance().start();// start-schedule ( depend on JobTriggerPoolHelper )JobScheduleHelper.getInstance().start();logger.info(">>>>>>>>> init xxl-job admin success.");}public void destroy() throws Exception {// stop-scheduleJobScheduleHelper.getInstance().toStop();// admin log report stopJobLogReportHelper.getInstance().toStop();// admin lose-monitor stopJobCompleteHelper.getInstance().toStop();// admin fail-monitor stopJobFailMonitorHelper.getInstance().toStop();// admin registry stopJobRegistryHelper.getInstance().toStop();// admin trigger pool stopJobTriggerPoolHelper.toStop();}// ---------------------- I18n ----------------------private void initI18n(){for (ExecutorBlockStrategyEnum item:ExecutorBlockStrategyEnum.values()) {item.setTitle(I18nUtil.getString("jobconf_block_".concat(item.name())));}}