文章目录
- 前言
- 一、JobRegistryHelper 作用:
- 二、JobRegistryHelper 源码介绍:
- 2.1 初始化start() 方法:
- 2.1.1 registryOrRemoveThreadPool 执行器注册和移除:
- 2.1.2 registryMonitorThread 执行器注册监控线程:
- 2.2 toStop() 释放资源:
- 2.3 执行器的注册和移除:
- 2.3 registry 执行器的注册:
- 2.3 registryRemove执行器的移除:
- 三、扩展:
- 3.1 守护线程 Thread:
- 总结
前言
本文对JobRegistryHelper 工作内容进行介绍。
一、JobRegistryHelper 作用:
JobRegistryHelper是xxl-job-admin中的一个工具类,用于注册并管理各个任务执行器的信息。具体作用包括:
- 注册任务执行器:JobRegistryHelper可以将任务执行器的信息注册到注册中心,例如Zookeeper等,以便管理和监控任务执行器的运行状态。
- 监控任务执行器:JobRegistryHelper可以监控任务执行器的健康状态,包括可用性、负载情况等,确保任务执行器能够正常执行任务。
- 发现任务执行器:JobRegistryHelper可以帮助xxl-job-admin发现注册的任务执行器,以便将任务分配给合适的执行器执行。
- 注销任务执行器:JobRegistryHelper还可以在任务执行器下线或停止运行时,及时将其从注册中心注销,以确保系统能够正确管理和调度任务。
总之,JobRegistryHelper在xxl-job-admin中扮演着任务执行器注册、监控和发现的重要角色,保证了任务的正常执行和系统的稳定运行。
二、JobRegistryHelper 源码介绍:
2.1 初始化start() 方法:
由于start() 方法内容较多,故拆为 2.1.1 和 2.1.2 进行介绍
2.1.1 registryOrRemoveThreadPool 执行器注册和移除:
private ThreadPoolExecutor registryOrRemoveThreadPool = null;
private Thread registryMonitorThread;
private volatile boolean toStop = false;public void start(){// for registry or remove 执行器注册和移除 线程池
registryOrRemoveThreadPool = new ThreadPoolExecutor(2,10,30L,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(2000),new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode());}},new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {r.run();logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now).");}});}
2.1.2 registryMonitorThread 执行器注册监控线程:
// for monitor
registryMonitorThread = new Thread(new Runnable() {@Overridepublic void run() {while (!toStop) {// 死循环 停止标识 当容器停止时 toStop 会被置为 true 这样就跳出循环try {// auto registry group 获取自动注册的执行器 (addressType执行器地址类型:0=自动注册、1=手动录入)// 通过页面收到进行添加的执行器 不进行处理/*** SELECT <include refid="Base_Column_List" />FROM xxl_job_group AS tWHERE t.address_type = #{addressType}ORDER BY t.app_name, t.title, t.id ASC**/List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);if (groupList!=null && !groupList.isEmpty()) {// remove dead address (admin/executor)// 获取更新时间超过 90s 的执行器 进行删除(超过90s 任务执行器已经下线)// RegistryConfig.DEAD_TIMEOUT 30*3=90/*** SELECT t.idFROM xxl_job_registry AS tWHERE t.update_time <![CDATA[ < ]]> DATE_ADD(#{nowTime},INTERVAL -#{timeout} SECOND)**/List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());if (ids!=null && ids.size()>0) {// 移除移除的执行器/*** DELETE FROM xxl_job_registryWHERE id in<foreach collection="ids" item="item" open="(" close=")" separator="," >#{item}</foreach>**/XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);}// fresh online address (admin/executor)// 获取更新时间在 90 s 全部执行器/*** SELECT <include refid="Base_Column_List" />FROM xxl_job_registry AS tWHERE t.update_time <![CDATA[ > ]]> DATE_ADD(#{nowTime},INTERVAL -#{timeout} SECOND)**/HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());if (list != null) {for (XxlJobRegistry item: list) {// 判断 执行是否是自动注册的if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {// RegistType{ EXECUTOR, ADMIN } EXECUTOR 自动注册,ADMIN 手动注册// 获取执行器的名字 执行器项目中设置的 xxl.job.executor.appname value 值String appname = item.getRegistryKey();// 按照执行器名称 进行数据分组List<String> registryList = appAddressMap.get(appname);if (registryList == null) {registryList = new ArrayList<String>();}if (!registryList.contains(item.getRegistryValue())) {registryList.add(item.getRegistryValue());}// 放入执行器的的地址 key : 执行器名称 ,value: 执行器地址appAddressMap.put(appname, registryList);}}}// fresh group address 刷新执行器地址for (XxlJobGroup group: groupList) {// 根据执行器项目名称 获取对应的执行器地址List<String> registryList = appAddressMap.get(group.getAppname());String addressListStr = null;if (registryList!=null && !registryList.isEmpty()) {Collections.sort(registryList);StringBuilder addressListSB = new StringBuilder();// 多个执行器则使用, 进行分隔for (String item:registryList) {addressListSB.append(item).append(",");}addressListStr = addressListSB.toString();addressListStr = addressListStr.substring(0, addressListStr.length()-1);}// 更新 执行器地址group.setAddressList(addressListStr);group.setUpdateTime(new Date());XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);}}} catch (Exception e) {if (!toStop) {logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);}}try {// 每隔 30s 执行一次 BEAT_TIMEOUT = 30TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);} catch (InterruptedException e) {if (!toStop) {logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);}}}logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");}
});
// 设置registryMonitorThread 守护线程,线程的名字,以及启动线程
registryMonitorThread.setDaemon(true);
registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread");
registryMonitorThread.start();
执行器集群情况下的注册地址:
2.2 toStop() 释放资源:
public void toStop(){// registryMonitorThread while 循环标识符设置为true 结束死循环toStop = true;// stop registryOrRemoveThreadPool 停掉线程registryOrRemoveThreadPool.shutdownNow();// stop monitir (interrupt and wait)registryMonitorThread.interrupt();try {// 等待registryMonitorThread 的任务执行完成registryMonitorThread.join();} catch (InterruptedException e) {logger.error(e.getMessage(), e);}
}
2.3 执行器的注册和移除:
2.3 registry 执行器的注册:
public ReturnT<String> registry(RegistryParam registryParam) {// valid
if (!StringUtils.hasText(registryParam.getRegistryGroup())|| !StringUtils.hasText(registryParam.getRegistryKey())|| !StringUtils.hasText(registryParam.getRegistryValue())) {return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
}// async execute 交由 registryOrRemoveThreadPool 线程池 执行注册任务
registryOrRemoveThreadPool.execute(new Runnable() {@Overridepublic void run() {// 更新执行器信息/*** UPDATE xxl_job_registrySET `update_time` = #{updateTime}WHERE `registry_group` = #{registryGroup}AND `registry_key` = #{registryKey}AND `registry_value` = #{registryValue}**/int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());// 如果更新的条数小于1 证明执行器不在数据库 xxl_job_registry 中存在if (ret < 1) {// 插入一条新的呃执行器数据XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());// fresh 刷新 xxl_job_group 的执行器地址(目前此方法并没有实现具体的业务)freshGroupRegistryInfo(registryParam);}}
});
// 返回给执行器客户端项目 注册成功标识
return ReturnT.SUCCESS;
}private void freshGroupRegistryInfo(RegistryParam registryParam){// Under consideration, prevent affecting core tables
}
2.3 registryRemove执行器的移除:
public ReturnT<String> registryRemove(RegistryParam registryParam) {// validif (!StringUtils.hasText(registryParam.getRegistryGroup())|| !StringUtils.hasText(registryParam.getRegistryKey())|| !StringUtils.hasText(registryParam.getRegistryValue())) {return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");}// async execute 交由 registryOrRemoveThreadPool 线程池 执行任务registryOrRemoveThreadPool.execute(new Runnable() {@Overridepublic void run() {// 删除失效的执行器/*** DELETE FROM xxl_job_registryWHERE registry_group = #{registryGroup}AND registry_key = #{registryKey}AND registry_value = #{registryValue}**/int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryDelete(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue());if (ret > 0) {// fresh 删除后更新 xxl_job_group 的执行器地址(目前此方法并没有实现具体的业务)freshGroupRegistryInfo(registryParam);}}});return ReturnT.SUCCESS;
}
三、扩展:
3.1 守护线程 Thread:
将一个线程设置为守护线程(daemon thread)的作用是告诉JVM,如果所有的非守护线程都执行完毕了,那么守护线程也应该随着退出。换句话说,当所有的非守护线程都执行完毕时,守护线程会自动结束,不会影响JVM的正常关闭。
设置一个线程为守护线程通常用于执行一些后台任务,这些任务不需要操纵UI或与用户进行交互,而且这些任务不是应用程序的关键部分。通过将这些线程设置为守护线程,可以避免这些线程持续运行导致程序无法正常结束的情况。
需要注意的是,守护线程所执行的任务可能会在任何时刻被JVM终止,因此对于一些需要确保执行完整性的任务,不应该将其设置为守护线程。常见的例子是Timer和TimerTask,如果Timer是守护线程,那么可能在主线程结束后,Timer任务就被终止了。
总结
本文对 JobRegistryHelper 的工作内容进行介绍。