xxl-job调度任务原理解析

xxljob可以对定时任务进行调度,现在看下定时任务调度的过程。XxlJobAdminConfig实现了InitializingBean接口,spring会调用afterPropertiesSet()进行初始化。大致有以下几个过程:

admin服务端初始化

JobTriggerPoolHelper.java#toStart()方法中会初始化两个调用任务的线程池,快线程池最大线程数为200,慢线程池最大线程数为100。然后启动线程定时轮询需要调度的定时任务。首先计算每秒能处理的定时任务数量,公式为(快线程池的最大线程数+满线程池的最大线程数)*20(1000ms/每个任务处理的时长50ms),最多为6000。从数据库中加锁查出任务触发时间<当前时间+预读时间(5s)的任务,然后分情况处理。

  • 当前时间大于任务触发时间+预读时间,即任务触发时间已经过期超过5s,此时不做任何处理,只刷新任务下次触发时间
  • 当前时间大于任务触发时间但不超过5s,即任务虽然过期但是过期时间不到5s,此时触发任务,将任务数据保存到ringDataprivate volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();,ringData的key是秒数,value是jobid,然后刷新任务的下次触发时间
  • 当前时间小于任务触发时间,即还没到任务的触发时间,此时也会将任务写道ringData中,等到期就会进行处理,因为在内存中查询任务比到数据库查询要快很多。
int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;while (!scheduleThreadToStop) {boolean preReadSuc = true;try {preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );preparedStatement.execute();// 1、pre readlong nowTime = System.currentTimeMillis();List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);if (scheduleList!=null && scheduleList.size()>0) {for (XxlJobInfo jobInfo: scheduleList) {if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {refreshNextValidTime(jobInfo, new Date());} else if (nowTime > jobInfo.getTriggerNextTime()) {JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null);refreshNextValidTime(jobInfo, new Date());if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);pushTimeRing(ringSecond, jobInfo.getId());refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));}} else {int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);pushTimeRing(ringSecond, jobInfo.getId());refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));}}

最后判断任务调度状态,TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);,有任务需要调度则下一秒继续扫描,如果没有发现任务则睡眠5s(PRE_READ_MS)。
刚才说到待执行的任务会加入ringData,现在往下看怎么处理ringData的。这里会回退一秒,因为可能出现任务超时的情况,导致任务处理时遗漏。处理的逻辑很简单,到了某秒时,根据秒数取出对应的jobid集合,然后依次处理触发每个任务即可。触发任务的逻辑我们稍微再说。

                        List<Integer> ringItemData = new ArrayList<>();int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // 避免处理耗时太长,跨过刻度,向前校验一个刻度;for (int i = 0; i < 2; i++) {List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );if (tmpData != null) {ringItemData.addAll(tmpData);}}if (ringItemData.size() > 0) {for (int jobId: ringItemData) {JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null);}ringItemData.clear();}

客户端初始化

客户端创建定时任务只要在bean中添加@XxlJob注解即可,调度任务是通过XxlJobSpringExecutor实现的。过程是到spring容器中获取所有bean,找出对方法使用了@XxlJob的bean,然后使用MethodJobHandler进行封装,注册到jobHandlerRepositoryprivate static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();

        String[] beanDefinitionNames = applicationContext.getBeanDefinitionNames();for (String beanDefinitionName : beanDefinitionNames) {Object bean = applicationContext.getBean(beanDefinitionName);Method[] methods = bean.getClass().getDeclaredMethods();for (Method method: methods) {XxlJob xxlJob = AnnotationUtils.findAnnotation(method, XxlJob.class);if (xxlJob != null) {String name = xxlJob.value();method.setAccessible(true);if(xxlJob.init().trim().length() > 0) {initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());initMethod.setAccessible(true);}if(xxlJob.destroy().trim().length() > 0) {destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());destroyMethod.setAccessible(true);}registJobHandler(name, new MethodJobHandler(bean, method, initMethod, destroyMethod));}}}

客户端会启动一个netty服务器,xxl-job底层的核心就是netty,监听${xxl.job.executor.port}配置的端口,等待来自服务端的调度。

                    ServerBootstrap bootstrap = new ServerBootstrap();((ServerBootstrap)bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)).childHandler(new ChannelInitializer<SocketChannel>() {public void initChannel(SocketChannel channel) throws Exception {channel.pipeline().addLast(new ChannelHandler[]{new IdleStateHandler(0L, 0L, 90L, TimeUnit.SECONDS)}).addLast(new ChannelHandler[]{new HttpServerCodec()}).addLast(new ChannelHandler[]{new HttpObjectAggregator(5242880)}).addLast(new ChannelHandler[]{new NettyHttpServerHandler(xxlRpcProviderFactory, serverHandlerPool)});}}).childOption(ChannelOption.SO_KEEPALIVE, true);ChannelFuture future = bootstrap.bind(xxlRpcProviderFactory.getPort()).sync();NettyHttpServer.logger.info(">>>>>>>>>>> xxl-rpc remoting server start success, nettype = {}, port = {}", NettyHttpServer.class.getName(), xxlRpcProviderFactory.getPort());NettyHttpServer.this.onStarted();future.channel().closeFuture().sync();

服务端触发任务

触发任务是从JobTriggerPoolHelper.java#addTrigger()中开始的。默认是快线程池触发,如果1min内执行时间超过500ms的次数大于10,则改为满线程池。

    public void addTrigger(final int jobId, final TriggerTypeEnum triggerType, final int failRetryCount, final String executorShardingParam, final String executorParam) {ThreadPoolExecutor triggerPool_ = fastTriggerPool;AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 mintriggerPool_ = slowTriggerPool;}triggerPool_.execute(new Runnable() {@Overridepublic void run() {long start = System.currentTimeMillis();try {// do triggerXxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam);} catch (Exception e) {logger.error(e.getMessage(), e);} finally {long minTim_now = System.currentTimeMillis()/60000;if (minTim != minTim_now) {minTim = minTim_now;jobTimeoutCountMap.clear();}long cost = System.currentTimeMillis()-start;if (cost > 500) {       // ob-timeout threshold 500msAtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));if (timeoutCount != null) {timeoutCount.incrementAndGet();}

真正执行是在processTrigger()方法中,先根据调度策略获取处理任务的客户端地址,默认是轮询策略。先获取任务id,然后找到任务对应的客户端索引,通过nextInt()方法找到下个索引,再到客户端地址列表中根据索引获取地址。

    private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){XxlJobLog jobLog = new XxlJobLog();XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);TriggerParam triggerParam = new TriggerParam();String address = null;routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {address = routeAddressResult.getContent();}triggerResult = runExecutor(triggerParam, address);
//轮询策略调度任务private static int count(int jobId) {// cache clearif (System.currentTimeMillis() > CACHE_VALID_TIME) {routeCountEachJob.clear();CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;}// count++Integer count = routeCountEachJob.get(jobId);count = (count==null || count>1000000)?(new Random().nextInt(100)):++count;  // 初始化时主动Random一次,缓解首次压力routeCountEachJob.put(jobId, count);return count;}@Overridepublic ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {String address = addressList.get(count(triggerParam.getJobId())%addressList.size());return new ReturnT<String>(address);} 

处理任务时通过proxy进行动态代理,在XxlRpcReferenceBean.class#getObject为调度的定时任务生成了动态代理对象,在InvocationHandler的invoke()方法中实现了逻辑增强,最终到NettyHttpClient#asyncSend()将消息发送到客户端netty服务器。

客户端执行定时任务

客户端是在NettyHttpServerHandler#channelRead0()中处理定时任务的,先对服务器的字节流进行反序列化,在XxlRpcProviderFactory.class#invokeService()以反射方式远程调用ExecutorBizImpl.java#run()方法。

                Class<?> serviceClass = serviceBean.getClass();String methodName = xxlRpcRequest.getMethodName();Class<?>[] parameterTypes = xxlRpcRequest.getParameterTypes();Object[] parameters = xxlRpcRequest.getParameters();Method method = serviceClass.getMethod(methodName, parameterTypes);method.setAccessible(true);Object result = method.invoke(serviceBean, parameters);xxlRpcResponse.setResult(result);

在run方法中启动处理任务的JobThread进行处理,JobThread中就是根据定时任务名获取对应的MethodJobHandler,取出要执行的Method,再反射执行即可。

 IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());if (jobThread == null) {jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);}ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);

总结下,xxl-job首先在服务端启动线程轮询要执行的定时任务,计算定时任务的触发时间,然后后获取代理对象,将要执行的任务信息通过netty发送到客户端,客户端以反射方式执行定时任务。有不对的地方请大神指出,欢迎大家一起讨论交流,共同进步。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/813846.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

1.Hexo安装和环境搭建引导

Hexo是一个依赖于一个名为nodejs的程序 因此安装它的方式在Mac和Windows上实际上是一样的 为了在电脑上安装Hexo 需要做两件事 nodejs&#xff0c;基本上是hexo依赖运行的JavaScript框架 Node.js — Run JavaScript Everywheregit&#xff0c;是一个程序&#xff0c;用来管理电…

Traefik和HAProxy全方位对比

在面对各种现代应用部署需求时&#xff0c;选择合适的反向代理和负载均衡器至关重要。Traefik&#x1f6a6;和HAProxy&#x1f6e1;️都是领先的解决方案&#xff0c;但它们各有特点&#xff0c;适用于不同的场景。本文将从多个维度全面对比Traefik&#x1f6a6;和HAProxy&…

【C++】类和对象②(类的默认成员函数:构造函数 | 析构函数)

&#x1f525;个人主页&#xff1a;Forcible Bug Maker &#x1f525;专栏&#xff1a;C 目录 前言 类的6个默认成员函数 构造函数 概念 构造函数的特性及用法 析构函数 概念 析构函数的特性及用法 结语 前言 本篇主要内容&#xff1a;类的6个默认成员函数中的构造函…

设计模式——外观(门面)模式10

外观模式&#xff1a;能为系统框架或其他复杂业务流程封装提供一个简单的接口。 例如抽奖过程中 设计模式&#xff0c;一定要敲代码理解 调用1&#xff08;抽奖系统&#xff09; /*** author ggbond* date 2024年04月08日 10:34*/ public class Lottery {public String getId…

OpenHarmony南向开发案例:【智能门锁】

一. 简介 本demo是基于Openharmony 3.1 Beta本版开发&#xff0c;不仅可以接收数字管家应用下发的指令来控制门锁开启&#xff0c;而且还可以通过数字管家设置不同的开锁密码以及一次性密码&#xff0c;实现给临时用户一个临时密码&#xff0c;保证门户安全。当然除了开锁的功…

遍历列举俄罗斯方块的所有形状

以前玩俄罗斯方块的时候&#xff0c;就想过一个问题&#xff0c;为什么俄罗斯方块就这7种形状&#xff0c;还有没有别的形状&#xff1f;自己也在纸上画过&#xff0c;比划来比划去&#xff0c;确实就这几种形状。 继续思考一下&#xff0c;那假如是3个块组合的形状&#xff0…

网页input框自动填充问题

autocomplete 大部分查询解决办法是设置&#xff0c;autocompleteoff&#xff0c;关于autocomplete的含义&#xff0c;官网参考如下: HTML attribute: autocomplete - HTML: HyperText Markup Language | MDN 在 autocomplete 的文档中说明了 value 为 off 时&#xff0c;浏览…

双子座 Gemini1.5和谷歌的本质

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

游戏软件测试流程

备注:本文为博主原创文章,未经博主允许禁止转载。如有问题,欢迎指正。 个人笔记(整理不易,有帮助,收藏+点赞+评论,爱你们!!!你的支持是我写作的动力) 笔记目录:学习笔记目录_pytest和unittest、airtest_weixin_42717928的博客-CSDN博客 个人随笔:工作总结随笔_8、…

【STL】list

目录 1. list的使用 1.1 list的构造 1.2 list iterator的使用 1.3 list capacity 1.4 list element access 1.5 list modifiers 1.6 list的迭代器失效 2. list的模拟实现 3. list与vector的对比 1. list的使用 1.1 list的构造 1.2 list iterator的使用 1. begin与end为…

JavaEE初阶Day 6:多线程(4)

目录 Day 6&#xff1a;多线程&#xff08;4&#xff09;1. 线程不安全的原因2. 锁3. synchronized Day 6&#xff1a;多线程&#xff08;4&#xff09; 前序&#xff1a;针对Day 5结尾的count 多线程的执行&#xff0c;是随机调度抢占式的执行模式&#xff0c;某个线程执行指…

KVM + GFS 分布式存储

目录 一、案例分析 1.1、案例概述 1.2、案例前置知识点 1&#xff09;Glusterfs 简介 2&#xff09;Glusterfs 特点 1.3、案例环境 1&#xff09;案例环境 2&#xff09;案例需求 3&#xff09;案例实现思路 二、案例实施 2.1、安装部署 KVM 虚拟化平台 1&…

Mac的终端配置

Mac的终端配置 参考教程包管理工具 - Homebrew出现的问题用虚拟环境解决方案&#xff1a;直接将解释器的路径放过去错误方法&#xff1a;用find查找到虚拟环境安装的路径&#xff0c;其链接的是brew安装的python路径 编辑器没有报错&#xff0c;但是运行过程中仍然找不到pandas…

JavaEE初阶Day 7:多线程(5)

目录 Day 7&#xff1a;多线程&#xff08;5&#xff09;1. 死锁2. 死锁场景3. 场景二&#xff1a;两个线程&#xff0c;两把锁4. 场景三&#xff1a;N个线程&#xff0c;M把锁5. 避免死锁问题6. 内存可见性问题 Day 7&#xff1a;多线程&#xff08;5&#xff09; 回顾synchr…

Windows下使用SDKMAN对JDK(Java)进行多版本管理

Windows下使用SDKMAN对JDK&#xff08;Java&#xff09;进行多版本管理 1.背景2.基于msys2工具2.1. msys2简介2.2. 安装与配置2.2. Windows环境变量配置参考2.3 结果确认 3. 基于WSL 1.背景 前端有nvm,python有miniconda,miniforge等&#xff0c;java呢&#xff1f;java在Linu…

对于常见的两种统计报表的写法和思路总结

常见的两种报表&#xff0c;一种是清单明细报表&#xff0c;一种是数据统计报表 简单讲 清单明细报表&#xff0c;就是订单主信息以及详情信息的展示&#xff0c;汇聚在一张大的报表上&#xff0c;用于导出和展示使用 数据统计报表&#xff0c;就是根据某些纬度&#xff0c;比如…

前端css笔记(pink老师)

css css书写顺序 自适应屏幕 html { width: 100%; height: 100%; display: table; } body { display: table-cell; } 用了这个方法以后&#xff0c;如果希望页面内的盒子也适应屏幕大小&#xff0c;则使用以下方法&#xff0c;会根据父亲的宽高计算出该盒子的宽高 width:xx%; …

java进阶---反射

获取class对象 Testvoid reflect() throws Exception {Class userClass User.class;System.out.println(userClass.getName()); // 全类名 com.zyw.proxydemo.pojo.UserSystem.out.println(userClass.getSimpleName()); // 简名 UserClass userClass1 Class.forName("co…

SpringBoot通过UUid实现文件上传接口及问题解决

在controller中&#xff0c;添加对应的方法体&#xff1a; PostMapping("/upload")ResponseBodypublic ApiRestResponse upload(HttpServletRequest httpServletRequest, RequestParam("file")MultipartFile file) throws IOException {String fileName f…

工业项目中你连PLM系统都没见过?

什么是 PLM 软件&#xff1f; PLM 软件是用于管理全球供应链中产品或服务全生命周期环节的解决方案。它包括从物料、零部件、产品、文档、规定、工程变更单到质量工作流的数据管理。 PLM 的发展历史 从最初的产品设计管理到如今的数字化转型和智能化生产&#xff0c;PLM 在不断…