原理篇-- 定时任务xxl-job-服务端(admin)项目启动过程--JobRegistryHelper 初始化 (4)

文章目录

  • 前言
  • 一、JobRegistryHelper 作用:
  • 二、JobRegistryHelper 源码介绍:
    • 2.1 初始化start() 方法:
      • 2.1.1 registryOrRemoveThreadPool 执行器注册和移除:
      • 2.1.2 registryMonitorThread 执行器注册监控线程:
    • 2.2 toStop() 释放资源:
    • 2.3 执行器的注册和移除:
      • 2.3 registry 执行器的注册:
      • 2.3 registryRemove执行器的移除:
  • 三、扩展:
    • 3.1 守护线程 Thread:
  • 总结


前言

本文对JobRegistryHelper 工作内容进行介绍。


一、JobRegistryHelper 作用:

JobRegistryHelper是xxl-job-admin中的一个工具类,用于注册并管理各个任务执行器的信息。具体作用包括:

  1. 注册任务执行器:JobRegistryHelper可以将任务执行器的信息注册到注册中心,例如Zookeeper等,以便管理和监控任务执行器的运行状态。
  2. 监控任务执行器:JobRegistryHelper可以监控任务执行器的健康状态,包括可用性、负载情况等,确保任务执行器能够正常执行任务。
  3. 发现任务执行器:JobRegistryHelper可以帮助xxl-job-admin发现注册的任务执行器,以便将任务分配给合适的执行器执行。
  4. 注销任务执行器:JobRegistryHelper还可以在任务执行器下线或停止运行时,及时将其从注册中心注销,以确保系统能够正确管理和调度任务。

总之,JobRegistryHelper在xxl-job-admin中扮演着任务执行器注册、监控和发现的重要角色,保证了任务的正常执行和系统的稳定运行。

二、JobRegistryHelper 源码介绍:

2.1 初始化start() 方法:

由于start() 方法内容较多,故拆为 2.1.1 和 2.1.2 进行介绍

2.1.1 registryOrRemoveThreadPool 执行器注册和移除:

private ThreadPoolExecutor registryOrRemoveThreadPool = null;
private Thread registryMonitorThread;
private volatile boolean toStop = false;public void start(){// for registry or remove 执行器注册和移除 线程池
registryOrRemoveThreadPool = new ThreadPoolExecutor(2,10,30L,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(2000),new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode());}},new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {r.run();logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now).");}});}

2.1.2 registryMonitorThread 执行器注册监控线程:

// for monitor
registryMonitorThread = new Thread(new Runnable() {@Overridepublic void run() {while (!toStop) {// 死循环 停止标识 当容器停止时 toStop 会被置为 true 这样就跳出循环try {// auto registry group 获取自动注册的执行器  (addressType执行器地址类型:0=自动注册、1=手动录入)// 通过页面收到进行添加的执行器 不进行处理/*** 	SELECT <include refid="Base_Column_List" />FROM xxl_job_group AS tWHERE t.address_type = #{addressType}ORDER BY t.app_name, t.title, t.id ASC**/List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);if (groupList!=null && !groupList.isEmpty()) {// remove dead address (admin/executor)//  获取更新时间超过 90s 的执行器 进行删除(超过90s 任务执行器已经下线)// RegistryConfig.DEAD_TIMEOUT 30*3=90/*** 	SELECT t.idFROM xxl_job_registry AS tWHERE t.update_time <![CDATA[ < ]]> DATE_ADD(#{nowTime},INTERVAL -#{timeout} SECOND)**/List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());if (ids!=null && ids.size()>0) {// 移除移除的执行器/*** DELETE FROM xxl_job_registryWHERE id in<foreach collection="ids" item="item" open="(" close=")" separator="," >#{item}</foreach>**/XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);}// fresh online address (admin/executor)// 获取更新时间在 90 s 全部执行器/*** SELECT <include refid="Base_Column_List" />FROM xxl_job_registry AS tWHERE t.update_time <![CDATA[ > ]]> DATE_ADD(#{nowTime},INTERVAL -#{timeout} SECOND)**/HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());if (list != null) {for (XxlJobRegistry item: list) {// 判断 执行是否是自动注册的if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {//  RegistType{ EXECUTOR, ADMIN } EXECUTOR 自动注册,ADMIN 手动注册//  获取执行器的名字 执行器项目中设置的 xxl.job.executor.appname value 值String appname = item.getRegistryKey();// 按照执行器名称 进行数据分组List<String> registryList = appAddressMap.get(appname);if (registryList == null) {registryList = new ArrayList<String>();}if (!registryList.contains(item.getRegistryValue())) {registryList.add(item.getRegistryValue());}// 放入执行器的的地址 key : 执行器名称 ,value: 执行器地址appAddressMap.put(appname, registryList);}}}// fresh group address 刷新执行器地址for (XxlJobGroup group: groupList) {// 根据执行器项目名称 获取对应的执行器地址List<String> registryList = appAddressMap.get(group.getAppname());String addressListStr = null;if (registryList!=null && !registryList.isEmpty()) {Collections.sort(registryList);StringBuilder addressListSB = new StringBuilder();// 多个执行器则使用, 进行分隔for (String item:registryList) {addressListSB.append(item).append(",");}addressListStr = addressListSB.toString();addressListStr = addressListStr.substring(0, addressListStr.length()-1);}// 更新 执行器地址group.setAddressList(addressListStr);group.setUpdateTime(new Date());XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);}}} catch (Exception e) {if (!toStop) {logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);}}try {// 每隔 30s 执行一次  BEAT_TIMEOUT = 30TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);} catch (InterruptedException e) {if (!toStop) {logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);}}}logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");}
});
// 设置registryMonitorThread 守护线程,线程的名字,以及启动线程
registryMonitorThread.setDaemon(true);
registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread");
registryMonitorThread.start();

执行器集群情况下的注册地址:
在这里插入图片描述

2.2 toStop() 释放资源:

public void toStop(){// registryMonitorThread while 循环标识符设置为true 结束死循环toStop = true;// stop registryOrRemoveThreadPool 停掉线程registryOrRemoveThreadPool.shutdownNow();// stop monitir (interrupt and wait)registryMonitorThread.interrupt();try {// 等待registryMonitorThread 的任务执行完成registryMonitorThread.join();} catch (InterruptedException e) {logger.error(e.getMessage(), e);}
}

2.3 执行器的注册和移除:

2.3 registry 执行器的注册:

public ReturnT<String> registry(RegistryParam registryParam) {// valid
if (!StringUtils.hasText(registryParam.getRegistryGroup())|| !StringUtils.hasText(registryParam.getRegistryKey())|| !StringUtils.hasText(registryParam.getRegistryValue())) {return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
}// async execute 交由 registryOrRemoveThreadPool 线程池 执行注册任务
registryOrRemoveThreadPool.execute(new Runnable() {@Overridepublic void run() {// 更新执行器信息/***  UPDATE xxl_job_registrySET `update_time` = #{updateTime}WHERE `registry_group` = #{registryGroup}AND `registry_key` = #{registryKey}AND `registry_value` = #{registryValue}**/int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());// 如果更新的条数小于1 证明执行器不在数据库 xxl_job_registry 中存在if (ret < 1) {// 插入一条新的呃执行器数据XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());// fresh 刷新 xxl_job_group 的执行器地址(目前此方法并没有实现具体的业务)freshGroupRegistryInfo(registryParam);}}
});
// 返回给执行器客户端项目 注册成功标识
return ReturnT.SUCCESS;
}private void freshGroupRegistryInfo(RegistryParam registryParam){// Under consideration, prevent affecting core tables
}

2.3 registryRemove执行器的移除:

public ReturnT<String> registryRemove(RegistryParam registryParam) {// validif (!StringUtils.hasText(registryParam.getRegistryGroup())|| !StringUtils.hasText(registryParam.getRegistryKey())|| !StringUtils.hasText(registryParam.getRegistryValue())) {return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");}// async execute 交由 registryOrRemoveThreadPool 线程池 执行任务registryOrRemoveThreadPool.execute(new Runnable() {@Overridepublic void run() {// 删除失效的执行器/*** 	DELETE FROM xxl_job_registryWHERE registry_group = #{registryGroup}AND registry_key = #{registryKey}AND registry_value = #{registryValue}**/int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryDelete(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue());if (ret > 0) {// fresh 删除后更新 xxl_job_group 的执行器地址(目前此方法并没有实现具体的业务)freshGroupRegistryInfo(registryParam);}}});return ReturnT.SUCCESS;
}

三、扩展:

3.1 守护线程 Thread:

将一个线程设置为守护线程(daemon thread)的作用是告诉JVM,如果所有的非守护线程都执行完毕了,那么守护线程也应该随着退出。换句话说,当所有的非守护线程都执行完毕时,守护线程会自动结束,不会影响JVM的正常关闭。

设置一个线程为守护线程通常用于执行一些后台任务,这些任务不需要操纵UI或与用户进行交互,而且这些任务不是应用程序的关键部分。通过将这些线程设置为守护线程,可以避免这些线程持续运行导致程序无法正常结束的情况。

需要注意的是,守护线程所执行的任务可能会在任何时刻被JVM终止,因此对于一些需要确保执行完整性的任务,不应该将其设置为守护线程。常见的例子是Timer和TimerTask,如果Timer是守护线程,那么可能在主线程结束后,Timer任务就被终止了。


总结

本文对 JobRegistryHelper 的工作内容进行介绍。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/718683.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

折线图实现柱状阴影背景的demo

这个是一个由官网的基础折线图实现的流程&#xff0c;将涉及到的知识点附上个人浅薄的见解&#xff0c;源码在最后&#xff0c;需要的可自取。 折线图 成果展示代码注解参数backgroundColordataZoomlegendtitlexAxisyAxisgridseries 源码 成果展示 官网的基础折线图&#xff…

猫耳语音下载(mediadown)

猫耳语音下载(mediadown) 一、介绍 猫耳语音下载,能够帮助你下载猫耳音频节目。如果你是会员,它还能帮你下载会员节目。 二、下载地址 下载:猫耳语音下载(mediadown) 百度网盘下载:猫耳语音下载(mediadown) 三、安装教程 将下载的文件解压到D:\xibinhui,D:\Pr…

Unity RectTransform·屏幕坐标转换

RectTransform转屏幕坐标 分两种情况 Canvas渲染模式为Overlay时&#xff0c;使用此方式 public Rect GetScreenCoordinatesOfCorners(RectTransform rt) {var worldCorners new Vector3[4];rt.GetWorldCorners(worldCorners);var result new Rect(worldCorners[0].x,world…

Manomotion 实现AR手势互动-解决手势无效的问题

之前就玩过 Manomotion &#xff0c;现在有新需求&#xff0c;重新接入发现不能用了&#xff0c;不管什么办法&#xff0c;都识别不了手势&#xff0c;我记得当初是直接调用就可以的。 经过研究发现&#xff0c;新版本SDK改了写法。下边就写一下新版本的调用&#xff0c;并且实…

好书推荐 《Excel函数与公式应用大全for Excel 365 Excel 2021》

一.基本介绍 1.什么是 Excel? Excel 是微软公司开发的一款电子表格软件&#xff0c;是 Microsoft Office 套件的一部分。它被广泛用于数据处理、分析、可视化和管理等方面。Excel 提供了丰富的功能&#xff0c;使用户能够创建、编辑、存储和分享各种类型的数据表格。 2.Exc…

Golang Channel 详细原理和使用技巧

1.简介 Channel(一般简写为 chan) 管道提供了一种机制:它在两个并发执行的协程之间进行同步&#xff0c;并通过传递与该管道元素类型相符的值来进行通信,它是Golang在语言层面提供的goroutine间的通信方式.通过Channel在不同的 goroutine中交换数据&#xff0c;在goroutine之间…

代码随想录Day66 | 图的DFS与BFS

代码随想录Day66 | 图的DFS与BFS DFS797.所有可能的路径无向图和有向图的处理 BFS200.岛屿数量 DFS 文档讲解&#xff1a;代码随想录 视频讲解&#xff1a; 状态 本质上就是回溯算法。 void dfs(参数) {if (终止条件) {存放结果;return;}for (选择&#xff1a;本节点所连接的…

Qt textBrowser的Html相关

Qt textBrowser的Html相关 Qt textBrowser的Html相关 Qt textBrowser的Html相关 一开始就想要一个简单的功能&#xff0c;点一下按钮&#xff0c;添加的文字居中显示&#xff0c;再点一下按钮&#xff0c;添加的文字变更颜色居右显示。 但是&#xff1a; ui->textEdit-&g…

WordPress免费的远程图片本地化下载插件nicen-localize-image

nicen-localize-image&#xff08;可在wordpress插件市场搜索下载&#xff09;&#xff0c;是一款用于本地化文章外部图片的插件&#xff0c;支持如下功能&#xff1a; 文章发布前通过编辑器插件本地化 文章手动发布时自动本地化 文章定时发布时自动本地化 针对已发布的文章…

基于机器学习的密码强度检测

项目简介 利用机器学习对提供的数据集预测用户输入的密码是否为弱密码。 原始数据集只包含关于弱密码的信息&#xff0c;并没有包含强密码的数据或分类器&#xff0c;这意味着模型无法学习到强密码的规律!!! 我之所以这样设计这个示例&#xff0c;其目的是为了向你展示模型的…

AI蠕虫病毒威胁升级,揭示AI安全新危机

一组研究人员成功研发出首个能够通过电子邮件客户端窃取数据、传播恶意软件以及向他人发送垃圾邮件的AI蠕虫&#xff0c;并在使用流行的大规模语言模型&#xff08;LLMs&#xff09;的测试环境中展示了其按设计功能运作的能力。基于他们的研究成果&#xff0c;研究人员向生成式…

用pyinstaller打包python代码为exe可执行文件并在其他电脑运行的方法

本文介绍基于Python语言中的pyinstaller模块&#xff0c;将写好的.py格式的Python代码及其所用到的所有第三方库打包&#xff0c;生成.exe格式的可执行文件&#xff0c;从而方便地在其他环境、其他电脑中直接执行这一可执行文件的方法。 有时&#xff0c;我们希望将自己电脑上的…

基于springboot的助农管理系统的设计与实现

** &#x1f345;点赞收藏关注 → 私信领取本源代码、数据库&#x1f345; 本人在Java毕业设计领域有多年的经验&#xff0c;陆续会更新更多优质的Java实战项目希望你能有所收获&#xff0c;少走一些弯路。&#x1f345;关注我不迷路&#x1f345;** 一 、设计说明 1.1研究背…

【网站项目】219一中体育馆管理系统

&#x1f64a;作者简介&#xff1a;拥有多年开发工作经验&#xff0c;分享技术代码帮助学生学习&#xff0c;独立完成自己的项目或者毕业设计。 代码可以私聊博主获取。&#x1f339;赠送计算机毕业设计600个选题excel文件&#xff0c;帮助大学选题。赠送开题报告模板&#xff…

pywin32,一个超强的 Python 库!

更多Python学习内容&#xff1a;ipengtao.com 大家好&#xff0c;今天为大家分享一个超强的 Python 库 - pywin32。 Github地址&#xff1a;https://github.com/mhammond/pywin32 在Python的世界里&#xff0c;有许多优秀的第三方库可以帮助开发者更轻松地处理各种任务。其中&a…

【性能】后台与黑屏

目录 现象观察 调整应用的电池策略 现象观察 切换到后台&#xff0c; 一个心跳期间&#xff0c;就会发close socket 直接黑屏&#xff0c;没有收到任何消息&#xff0c;直接到onclose,然后有离线触发 也到时间。 调整应用的电池策略 修改成“无限制”后&#xff0c;就不会断…

Linxu自动化构建工具make/Makefile究竟时什么?

Linxu自动化构建工具make/Makefile究竟时什么&#xff1f; 一、简介二、makefile文件制作&#xff08;简洁版&#xff09;2.1 源文件2.2 makefile如何制作2.2.1 依赖关系、依赖方法2.2.3 伪目标&#xff08;清理文件资源&#xff09; 三、make/Makefile自动化原理3.1 伪目标为什…

SpringBoot+Vue实现el-table表头筛选排序(附源码)

&#x1f468;‍&#x1f4bb;作者简介&#xff1a;在笑大学牲 &#x1f39f;️个人主页&#xff1a;无所谓^_^ ps&#xff1a;点赞是免费的&#xff0c;却可以让写博客的作者开心好几天&#x1f60e; 前言 后台系统对table组件的需求是最常见的&#xff0c;不过element-ui的el…

Grpc项目集成到java方式调用实践

背景&#xff1a;由于项目要对接到grcp 的框架&#xff0c;然后需要对接老外的东西&#xff0c;还有签名和证书刚开始没有接触其实有点懵逼。 gRPC 是由 Google 开发的高性能、开源的远程过程调用&#xff08;RPC&#xff09;框架。它建立在 HTTP/2 协议之上&#xff0c;使用 …

D7805 正电压稳压电路应用——体积小,成本低,性能好

D7805 构成的 5V 稳压电源为输出电压5V&#xff0c;输出电流 1000mA 的稳压电源它由滤波电容 C1,C3,防止自激电容 C2、C3 和一只固定三端稳压器&#xff08;7805&#xff09;后级加 LC 滤波极为简洁方便地搭成&#xff0c;输入直流电压范围为 7~35V&#xff0c;此直流电压经过D…