原理篇-- 定时任务xxl-job-服务端(admin)项目启动过程--JobTriggerPoolHelper 初始化 (3)

文章目录

  • 前言
  • 一、JobTriggerPoolHelper 作用:
  • 二、JobTriggerPoolHelper 源码介绍:
    • 2.1. start() 方法:
    • 2.2 任务触发:
    • 2.3 XxlJobTrigger.trigger 任务执行:
    • 2.4 processTrigger 任务的执行:
    • 2.5 runExecutor 任务的执行:
  • 总结


前言

本文对 JobTriggerPoolHelper 的工作进行介绍;


一、JobTriggerPoolHelper 作用:

JobTriggerPoolHelper 创建两个线程池ThreadPoolExecutor,当任务被触发需要执行时,ThreadPoolExecutor 负责进行执行任务;

二、JobTriggerPoolHelper 源码介绍:

2.1. start() 方法:

// 创建日志对象private static Logger logger = LoggerFactory.getLogger(JobTriggerPoolHelper.class);// ---------------------- trigger pool ----------------------// fast/slow thread pool 快慢线程池创建 区别是线程池最大线程的数量不一样
private ThreadPoolExecutor fastTriggerPool = null;
private ThreadPoolExecutor slowTriggerPool = null;public void start(){// 快速线程池 的最大线程数量  xxl.job.triggerpool.fast.max  默认值 200fastTriggerPool = new ThreadPoolExecutor(10,XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),60L,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(1000),new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());}});// 慢速线程池 的最大线程数量 xxl.job.triggerpool.slow.max  默认值 100slowTriggerPool = new ThreadPoolExecutor(10,XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),60L,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(2000),new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());}});
}
// 项目停止 关闭线程池释放资源
public void stop() {//triggerPool.shutdown();fastTriggerPool.shutdownNow();slowTriggerPool.shutdownNow();logger.info(">>>>>>>>> xxl-job trigger thread pool shutdown success.");
}

2.2 任务触发:

/**
* add trigger  执行任务
*/
public void addTrigger(final int jobId,final TriggerTypeEnum triggerType,final int failRetryCount,final String executorShardingParam,final String executorParam,final String addressList) {// choose thread pool// 快慢线程池 的选择 1分钟 内同一个任务(jobId 相同),超过10次 执行的时间超过 500ms 就放入到慢线程池中执行ThreadPoolExecutor triggerPool_ = fastTriggerPool;AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 mintriggerPool_ = slowTriggerPool;}// trigger 线程池 任务的执行triggerPool_.execute(new Runnable() {@Overridepublic void run() {// 记录开始时间,方便统计 本次job 执行的耗时 long start = System.currentTimeMillis();try {// do trigger  执行任务XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);} catch (Exception e) {logger.error(e.getMessage(), e);} finally {// check timeout-count-maplong minTim_now = System.currentTimeMillis()/60000;if (minTim != minTim_now) {// 如果不是同一分钟,则重新开始计数minTim = minTim_now;jobTimeoutCountMap.clear();}// incr timeout-count-maplong cost = System.currentTimeMillis()-start;if (cost > 500) {       // ob-timeout threshold 500ms// 大于 500ms 任务超时 进行计数AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));if (timeoutCount != null) {timeoutCount.incrementAndGet();}}}}});
}// ---------------------- helper ----------------------private static JobTriggerPoolHelper helper = new JobTriggerPoolHelper();public static void toStart() {helper.start();
}
public static void toStop() {helper.stop();
}/**
* @param jobId
* @param triggerType
* @param failRetryCount
* 			>=0: use this param
* 			<0: use param from job info config
* @param executorShardingParam
* @param executorParam
*          null: use job param
*          not null: cover job param
*  触发任务的执行 调用本类中的 addTrigger
*/
public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) {helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
}

2.3 XxlJobTrigger.trigger 任务执行:

/*** trigger job** @param jobId* @param triggerType* @param failRetryCount* 			>=0: use this param* 			<0: use param from job info config* @param executorShardingParam* @param executorParam*          null: use job param*          not null: cover job param* @param addressList*          null: use executor addressList*          not null: cover*/
public static void trigger(int jobId,TriggerTypeEnum triggerType,int failRetryCount,String executorShardingParam,String executorParam,String addressList) {// load data// 根据job id 从 xxl_job_info 获取job /*** SELECT <include refid="Base_Column_List" />FROM xxl_job_info AS tWHERE t.id = #{id}**/XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);if (jobInfo == null) {logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);return;}//  执行器,任务参数if (executorParam != null) {jobInfo.setExecutorParam(executorParam);}// 改任务设置的 任务执行失败次数,如果到时间正常触发 则failRetryCount 为-1 ;// 如果是失败的补充,则为改任务剩余补偿次数// 到时间第一次触发任务 则获取xxl_job_info  executor_fail_retry_count 设置的重试次数int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();// 获取改任务对应的执行器信息/***SELECT <include refid="Base_Column_List" />FROM xxl_job_group AS tWHERE t.id = #{id}**/XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());// cover addressList// 执行器地址覆盖,如果重新传入了执行器地址 则进行覆盖;否则取 XxlJobGroup  的执行器地址if (addressList!=null && addressList.trim().length()>0) {//  执行器地址类型:0=自动注册、1=手动录入group.setAddressType(1);group.setAddressList(addressList.trim());}// sharding param// 执行器任务分片参数 格式如 1/2int[] shardingParam = null;if (executorShardingParam!=null){String[] shardingArr = executorShardingParam.split("/");if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {shardingParam = new int[2];shardingParam[0] = Integer.valueOf(shardingArr[0]);shardingParam[1] = Integer.valueOf(shardingArr[1]);}}// 判断路由策略if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)&& group.getRegistryList()!=null && !group.getRegistryList().isEmpty()&& shardingParam==null) {// 如果是: 分片广播,则需要为注册的每个执行器都 发送任务for (int i = 0; i < group.getRegistryList().size(); i++) {processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());}} else {if (shardingParam == null) {shardingParam = new int[]{0, 1};}// 非分片广播 ,则直接执行任务processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);}}

2.4 processTrigger 任务的执行:

 /**
* @param group                     job group, registry list may be empty* @param jobInfo* @param finalFailRetryCount* @param triggerType* @param index                     sharding index* @param total                     sharding index*/
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){// param// 获取阻塞处理策略 ,如果从任务中的 阻塞处理策略 不在ExecutorBlockStrategyEnum 枚举中,则默认为 单机率行// "SERIAL EXECUTION":单机率行; "DISCARD LATER":云弃后续调度;"COVER EARLY":獲盖之前调度ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  // block strategy// 获取任务路由策略 ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategyString shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;// 1、save log-id 记录log 日志XxlJobLog jobLog = new XxlJobLog();jobLog.setJobGroup(jobInfo.getJobGroup());jobLog.setJobId(jobInfo.getId());jobLog.setTriggerTime(new Date());XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());// 2、init trigger-param  创建任务执行器的 triggerParam 对象TriggerParam triggerParam = new TriggerParam();triggerParam.setJobId(jobInfo.getId());triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());triggerParam.setExecutorParams(jobInfo.getExecutorParam());triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());triggerParam.setLogId(jobLog.getId());triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());triggerParam.setGlueType(jobInfo.getGlueType());triggerParam.setGlueSource(jobInfo.getGlueSource());triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());triggerParam.setBroadcastIndex(index);triggerParam.setBroadcastTotal(total);// 3、init address 根据任务的路由策略,从执行器地址中,选择出一个执行器地址String address = null;ReturnT<String> routeAddressResult = null;if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {if (index < group.getRegistryList().size()) {address = group.getRegistryList().get(index);} else {address = group.getRegistryList().get(0);}} else {routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {address = routeAddressResult.getContent();}}} else {routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));}// 4、trigger remote executor 获取到执行器地址,runExecutor 执行任务ReturnT<String> triggerResult = null;if (address != null) {triggerResult = runExecutor(triggerParam, address);} else {triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);}// 5、collection trigger infoStringBuffer triggerMsgSb = new StringBuffer();triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":").append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());if (shardingParam != null) {triggerMsgSb.append("("+shardingParam+")");}triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>").append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");// 6、save log trigger-info 更新日志信息jobLog.setExecutorAddress(address);jobLog.setExecutorHandler(jobInfo.getExecutorHandler());jobLog.setExecutorParam(jobInfo.getExecutorParam());jobLog.setExecutorShardingParam(shardingParam);jobLog.setExecutorFailRetryCount(finalFailRetryCount);//jobLog.setTriggerTime();jobLog.setTriggerCode(triggerResult.getCode());jobLog.setTriggerMsg(triggerMsgSb.toString());XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}

2.5 runExecutor 任务的执行:

/**
* run executor* @param triggerParam* @param address* @return*/
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){ReturnT<String> runResult = null;try {// 根据执行器的地址,获取包装执行器的接口对象ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);// 远程rpc 调用 执行器端,进行任务的执行runResult = executorBiz.run(triggerParam);} catch (Exception e) {logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));}// 返沪执行的信息StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");runResultSB.append("<br>address:").append(address);runResultSB.append("<br>code:").append(runResult.getCode());runResultSB.append("<br>msg:").append(runResult.getMsg());runResult.setMsg(runResultSB.toString());return runResult;
}

executorBiz.run 任务的执行:

@Override
public ReturnT<String> run(TriggerParam triggerParam) {return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
}

总结

本文对 JobTriggerPoolHelper 的工作内容进行介绍。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/718560.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【JAVA重要知识 | 第三篇】深入理解并暴打AQS原理、ReentrantLock锁

文章目录 3.深入理解AQS、ReentrantLock3.1AQS3.1.1AQS简介3.1.2核心结构&#xff08;1&#xff09;设计模型&#xff08;2&#xff09;组成部分&#xff08;3&#xff09;State关键字 3.1.3实现的两类队列&#xff08;1&#xff09;同步队列①CLH②Node③主要行为 img条件队列…

中霖教育:注册安全工程师考是科目有哪些?

注册安全工程师的类型是职业资格证书&#xff0c;需要满足报名条件才能参加考试&#xff0c;考试通过就能发放证书。报名时间一般在八月份&#xff0c;考试时间在十月底左右。 考试科目&#xff1a; 《安全生产法律法规》 《安全生产管理》 《安全生产技术基础》 《安全生…

golang实现openssl自签名双向认证

第一步&#xff1a;生成CA、服务端、客户端证书 1. 生成CA根证书 生成CA证书私钥 openssl genrsa -out ca.key 4096创建ca.conf 文件 [ req ] default_bits 4096 distinguished_name req_distinguished_name[ req_distinguished_name ] countryName …

Node.js基础---Express路由

1. 路由的概念 1. 什么是路由 广义上来讲&#xff0c;路由就是映射关系 2. Express 中的路由 在 Express 中&#xff0c;路由指的是客户端的请求与服务器处理函数之间的映射关系 Express 中的路由分三部分&#xff1a;请求的类型、请求的URL地址&#xff0c;处理函数。如下&am…

怎么使用curl2py自动构造爬虫代码并进行网络爬虫

目录 一、了解curl2py 二、安装curl2py 三、使用curl2py生成爬虫代码 四、实际案例&#xff1a;爬取网页数据 五、总结与建议 在当今数据驱动的时代&#xff0c;网络爬虫成为了获取数据的重要工具。对于初学者来说&#xff0c;手动编写爬虫代码可能是一项挑战。幸运的是&a…

PyTorch-神经网络

神经网络&#xff0c;这也是深度学习的基石&#xff0c;所谓的深度学习&#xff0c;也可以理解为很深层的神经网络。说起这里&#xff0c;有一个小段子&#xff0c;神经网络曾经被打入了冷宫&#xff0c;因为SVM派的崛起&#xff0c;SVM不了解的同学可以去google一下&#xff0…

JavaScript 基础学习笔记(五):函数、作用域、匿名函数

目录 一、函数 1.1 声明和调用 1.2 形参和实参 1.3 返回值 二、作用域 2.1 全局作用域 2.2 局部作用域 三、匿名函数 3.1 函数表达式 3.2 立即执行函数 一、函数 理解函数的封装特性&#xff0c;掌握函数的语法规则 1.1 声明和调用 函数可以把具有相同或相似逻辑的代…

NLP_文本张量表示方法(代码示例)

目标 了解什么是文本张量表示及其作用.文本张量表示的几种方法及其实现. 1 文本张量表示 将一段文本使用张量进行表示&#xff0c;其中一般将词汇为表示成向量&#xff0c;称作词向量&#xff0c;再由各个词向量按顺序组成矩阵形成文本表示. ["人生", "该&q…

无极低码:五分钟快速上手,开启编程新时代

无极低码平台凭借其革命性的设计理念和强大的功能特性&#xff0c;正在彻底改变软件开发的传统格局。该平台专为开发者、初创企业和各类研发团队量身打造&#xff0c;旨在提供一种快速而高效的解决方案&#xff0c;以应对日益增长的业务需求和技术挑战。 1.无极低码的核心价值在…

2024《》

vue-cli到哪做了那些事 vue-cli是vue.js的脚手架&#xff0c;用于自动生成vue.jswebpack的项目模板&#xff0c;快速搭建Vue.js项目。 vue cli内置了webpack的一些功能&#xff0c;这些是用webpack打包时需要我们自己配置的&#xff0c;例如&#xff1a; 1.ES6代码转换成ES5代…

Linux 实现打印彩色进度条

文章目录 预备知识一、理解回车换行二、认识行缓冲1、代码一、二&#xff08;回车换行理解&#xff09;2、代码三、四&#xff08;sleep函数和ffush函数理解&#xff09; 三、简单倒计时1. 倒计时代码2、效果展示 四、进度条1、效果展示2、进度条代码makefileProcessBar.hProce…

tomcat 反向代理 自建博客 修改状态页 等

一 自建博客 随后&#xff0c;拷贝到webapps下面 并且做软连接 随后重定向 并且下载 cat >/etc/yum.repos.d/mysql.repo <<EOF [mysql57-community] nameMySQL 5.7 Community Server baseurlhttp://repo.mysql.com/yum/mysql-5.7-community/el/7/x86_64/ enabled1 g…

团体程序设计天梯赛 L2-006 树的遍历

L2-006 树的遍历 分数 25 给定一棵二叉树的后序遍历和中序遍历&#xff0c;请你输出其层序遍历的序列。这里假设键值都是互不相等的正整数。 输入格式&#xff1a; 输入第一行给出一个正整数N&#xff08;≤30&#xff09;&#xff0c;是二叉树中结点的个数。第二行给出其后…

【Linux】Linux系统磁盘分区和挂载相关命令介绍

Linux系统磁盘分区和挂载相关命令介绍 文章目录 Linux系统磁盘分区和挂载相关命令介绍磁盘分区1、使用fdisk创建分区2、使用parted创建分区 格式化分区分区挂载自动挂载其他常见&#xff08;用&#xff09;的磁盘相关命令 在Linux系统中&#xff0c;磁盘分区和磁盘挂载是管理存…

第十四届蓝桥杯大赛B组 JAVA 蜗牛 (递归剪枝)

题目描述&#xff1a; 这天&#xff0c;一只蜗牛来到了二维坐标系的原点。 在 x 轴上长有 n 根竹竿。它们平行于 y 轴&#xff0c;底部纵坐标为 0&#xff0c;横坐标分别为 x1, x2, …, xn。竹竿的高度均为无限高&#xff0c;宽度可忽略。蜗牛想要从原点走到第 n 个竹竿的底部也…

全域电商数据集成管理与采集|API接口的采集与管理

如今&#xff0c;全渠道零售已是大势所趋。企业电商经营的一大现状就是数据分散各处&#xff0c;比如有来自电商平台私域数据、品牌一方数据、公开的第三方行业数据与电商平台C端页面数据等等。如何集成全域数据日益成为企业数字化基建的难题。 当前电商数据集成的主流方案为人…

【基于Matlab GUI的语音降噪系统设计】

客户不要了&#xff0c;挂网上吧&#xff0c;有需要自行下载~ 赚点辛苦费 ** 功能实现: ** 1、导入音频文件/录入音频&#xff0c;能实现播放功能。 2、对导入/录入的音频信号进行时域和频域分析&#xff0c;并制图。 3、可在导入/录入的音频信号上加入噪声&#xff0c;并能够播…

Apache JMeter 5.6.3 安装

源码下载 curl -O https://dlcdn.apache.org//jmeter/source/apache-jmeter-5.6.3_src.zipJMeter 下载 curl -O https://dlcdn.apache.org//jmeter/binaries/apache-jmeter-5.6.3.zipjmeter.properties 里 设置中文 windows系统上解压&#xff0c;双击jmeter.bat 启动 执行参…

【人工智能】DeepLearning学习路线及简要说明

目录 神经网络 1.1 前馈神经网络(FNN) 结构和工作原理 训练过程 应用

架构设计方法(4A架构)-应用架构

1、应用架构&#xff08;AA&#xff09;&#xff1a;业务价值与产品之间的桥梁&#xff0c;是企业架构的一个子集 2、应用架构包含“应用系统模块、应用服务、应用系统集成”3个关键要素 3、收集AS-IS应用架构&#xff0c;描绘现状&#xff0c;并识别改进机会点 4、描述对新系统…