XXL-JOB中断信号感知

目录

背景

思路

实现逻辑

总结


背景

  在使用xxl-job框架时,由于系统是由线程池去做异步逻辑,然后主线程等待,在控制台手动停止时,会出现异步线程不感知信号中断的场景,如下场景

而此时如果人工在控制台停止xxl-job执行,异步任务并不会感知到调度线程被interrupt了,上面3个异步任务仍旧执行,而主线程却退出了,如果此时再次调度该任务,而代码逻辑没做幂等,可能出现预期外的异常

思路

  先看看xxl-job trigger的时序图

原图plantuml

    @startuml
'https://plantuml.com/sequence-diagram!theme plain
autonumber
hide footbox
skinparam ParticipantPadding 50
skinparam BoxPadding 150
title xxljob接口调用actor "User" as userbox "xxl-job-admin" #LightGrayparticipant "controller"  as controllerparticipant "trigger" as triggerparticipant "executor-proxy" as proxyparticipant "adminBiz" as admin
end boxbox "xxl-job-client" #LightGray
participant "executor"  as executor
participant "jobThread" as job
participant "callBackThread" as callback
participant "retryCallThread" as retryCallBack
end boxautonumber 1
user -> controller++:手动调度 /jobinfo/trigger
controller->trigger++: jobId/触发类型/参数
trigger->trigger:提交trigger任务group 异步流程trigger->trigger:根据jobId获取jobInfotrigger->trigger:获取执行器信息note left已注册的机器地址列表end notealt 分片广播looptrigger->trigger:遍历触发end loopelse 其他trigger->trigger:单个触发endend groupreturn 返回提交结果== 异步rpc触发==autonumber 1group 触发流程trigger->trigger:获取路由策略&阻塞策略trigger->trigger:根据路由策略获取需调度的机器地址trigger -> proxy++:获取执行器代理对象&缓存note leftjdk代理+nettyxxljob的log是客户端记录在本地文件admin调用时也通过代理调用远端接口end noteproxy->executor:远程调用(传递触发信息)executor->executor:根据jobId获取执行线程executor->executor:获取job执行器alt 执行线程不为空executor->executor:根据阻塞策略处理endalt 执行线程为空executor->executor:新建job线程endexecutor->job++:把任务参数加入阻塞队列job->job:jobId去重return:返回结果return:返回结果end group== 异步jobThread ==autonumber 1job->job:执行handler init 方法loop toStop=falsejob->job:从阻塞队列中获取任务参数job->job:准备工作note left状态设置为运行中空闲次数=0去除jobId设置logFile&分片信息end notealt 超时时间>0job->job:新建线程处理handler信息elsejob->job:本线程处理handler信息endjob->job:把执行结果or终止结果加入callback阻塞队列end loopjob->job:清除阻塞队列里的待任务note left此时已经该线程已经被停止了end note== 异步callBackThread ==autonumber 1loop toStop=falsecallback->callback:从callback阻塞队列中获取callback参数alt 获取成功callback->callback:清空当前阻塞队列中的参数,并将其放到一个新的listloop 遍历admin列表callback->controller++:调用callback接口controller->admin:调用callback逻辑alt 任务处理成功admin->admin:获取job信息admin->admin:获取子任务信息loop 遍历子任务admin->trigger:提交trigger任务end loopadmin->admin:更新job信息endreturn:返回回调结果callback->callback:记录日志到本地文件alt 回调失败callback->callback:记录序列化后的失败参数,用于重试endend loopendend loop
== 重试retryCallBack ==
autonumber 1loop toStop=falseretryCallBack->retryCallBack:获取本地重试文件信息retryCallBack->retryCallBack:反序列化内容,重试callback请求end loop
@enduml

主要关注异步JobThread部分,可以看出是有个toStop的flag去感知这个中断信号的,那怎么去获取toStop的信息呢?这里可以通过起另一个线程去检查这个信号,如果为stop,则透传到异步task中,设计流程如下

原图plantuml

@startuml
'https://plantuml.com/sequence-diagram!theme plain
autonumber
hide footbox
skinparam ParticipantPadding 50
skinparam BoxPadding 150
title xxljob接口调用actor "xxl-job-admin" as userbox "xxl-job-client" #LightGrayparticipant "xxl-client"  as clientparticipant "xxl-main-thread"  as mainThreadparticipant "check-interrupt-thread" as checkThreadparticipant "async-task..." as asyncThread
end boxautonumber 1
user -> client++:手动调度 /jobinfo/trigger
client->client:加入任务队列
return
client-->mainThread:获取队列任务执行
mainThread->mainThread++:init
mainThread->checkThread++:定期检查mainThread的 stopFlag属性
loop
checkThread->checkThread:定期检查停止属性属性end loop
mainThread->mainThread:初始化完毕
mainThread->asyncThread:分发任务
asyncThread-->asyncThread++:任务执行
mainThread->mainThread:等待子任务执行完成
user->client:手动中断任务
client->client:捞取jobId对应的线程
client->mainThread:调用暂停方法,interrupt,设置 stopFlag
mainThread-->client:返回暂停结果
mainThread->mainThread:等待执行中的子任务完成
checkThread->asyncThread:设置给子任务 stopFlag
asyncThread->asyncThread:业务逻辑判断 stopFlag
return:stop
mainThread->mainThread:等待检查线程完成
return:check-thread end
mainThread->mainThread:后置处理
return:stop
@enduml

即对于异步的任务,可以做一个封装,用于接受中断信号,而信息的传递则通过threadLocal复制的方式给到异步任务,主要是解决中断信号如何传递到异步任务的问题,异步任务可以通过某个方法来获取主线程是否中断

要点如下

  1. 感知xxl-job主线程的中断信号
  2. 传递中断信号到异步任务,异步任务执行的方法可以手动调用某个方法判断是否中断,进而更快地停止任务

实现逻辑

定义异步任务封装类,用于接受信息

public class TaskWrapper<T> implements Runnable {private Runnable runnable;private volatile boolean isInterrupt;private Supplier<T> supplier;private T result;private final String taskId;private Map<String, String> copyMdc = null;//有需要传递的变量可以通过context传递private Map<String, Object> executeContext = null;Throwable errorCause;TaskWrapper(Runnable runnable, String taskId) {this.runnable = runnable;this.isInterrupt = false;this.taskId = taskId;copyMdc = MDC.getCopyOfContextMap();executeContext = XxlShardingTask.getCopyOfContext();}TaskWrapper(Supplier<T> supplier, String taskId) {this.supplier = supplier;this.isInterrupt = false;this.taskId = taskId;copyMdc = MDC.getCopyOfContextMap();executeContext = XxlShardingTask.getCopyOfContext();}@Overridepublic void run() {if (!CollectionUtils.isEmpty(copyMdc)) {MDC.setContextMap(copyMdc);}if (!CollectionUtils.isEmpty(executeContext)) {XxlShardingTask.setExecuteContext(executeContext);}XxlShardingTask.setWrapper(this);try {if (isInterrupt) {return;}if (runnable != null) {runnable.run();}if (supplier != null) {result = supplier.get();}} finally {MDC.clear();XxlShardingTask.removeContext();}}static boolean isInterrupt() {return Optional.ofNullable(XxlShardingTask.getFromContext(XxlShardingTask.EXECUTE_KEY)).map(e -> ((TaskWrapper<?>) e).interrupted()).orElse(Boolean.FALSE);}public T getResult() {return result;}public String getTaskId() {return taskId;}public Throwable getErrorCause() {return errorCause;}/*** 是否成功** @return*/public boolean isSuccess() {return !isInterrupt && errorCause == null;}public boolean interrupted() {return isInterrupt;}synchronized void setInterrupt() {this.isInterrupt = true;}
}

在xxljob的主线程初次调用时,会调用init方法,定一个handler继承xxljob的IJobHandler,并实现

他的init方法,新建检查线程用于check中断信号,执行过程中,会把当前在跑的任务丢到一个map中存储,而检查线程会调用异步任务,把对应的标志未置为停止

public abstract class XxlAsyncTaskHandler<T> extends IJobHandler {
...public void init() throws InvocationTargetException, IllegalAccessException {super.init();JobThread thread = (JobThread) Thread.currentThread();Field toStop = ReflectionUtils.findField(JobThread.class, "toStop");if (toStop == null) {throw new IllegalStateException("current thread don't have field [toStop],please check the xxl-job version");}mainThreadInterrupt.set(false);ReflectionUtils.makeAccessible(toStop);checkInterruptThread = new Thread(() -> {try {while (!mainThreadInterrupt.get()) {TimeUnit.MILLISECONDS.sleep(getCheckInterruptMills());if ((boolean) toStop.get(thread)) {if (mainThreadInterrupt.compareAndSet(false, true)) {currentRunTask.forEach((s, tTaskWrapper) -> {tTaskWrapper.setInterrupt();});}}}} catch (InterruptedException e) {//ignore} catch (Exception ex) {LOGGER.error("check interrupt error", ex);}});checkInterruptThread.start();}}

主流程(即xxl-job调度线程所执行的execute方法)通过获取待执行的任务,对其进行封装,并加入到当前在运行的任务map中,核心的代码如下,逻辑流程

  1. 从任务生成器中获取待执行的封装好的任务
  2. 并加入到异步线程池执行
  3. 主线程等待
 while (currentTaskGenerator.hasNextTask()) {List<TaskWrapper<T>> wrappers = new ArrayList<>();for (int i = 0; i < parallelCount; i++) {if (currentTaskGenerator.hasNextTask()) {TaskWrapper<T> nextTask = currentTaskGenerator.getNextTask();String taskId = nextTask.getTaskId();
//加入到当前执行中的任务currentRunTask.put(taskId, nextTask);CompletableFuture.runAsync(nextTask, executor).whenComplete((unused, throwable) -> {if (throwable != null) {currentRunTask.get(taskId).errorCause = throwable;} else {if (nextTask.isSuccess()) {successCount.incrementAndGet();}}
//任务处理完,countDown一下count.countDown();currentRunTask.remove(taskId);});//代表任务分配完毕} else {count.countDown();}}
//主线程等待count.await();

对于异步任务的逻辑

由于开始时设置当前执行的封装任务到本地线程,可以通过static方法进行获取标识,比如循环或者一些较重的耗时操作,可以在执行前进行判断,如果中断了就返回结果

  protected static boolean isWorkerInterrupt() {return TaskWrapper.isInterrupt();}

比如继承该类,子类可以在业务逻辑进行判断

            while (!isWorkerInterrupt()) {
...业务逻辑
}

由于整块优化的异步调度任务的代码比较多,而且涉及了公司信息,不在此展示,重点在于

  1. xxl-job异步线程如何感知主线程中断信息——了解xxljob trigger原理,封装runnable,管理当前封装的runnable任务,把中断信息透传异步任务
  2. 线程间的信息如何传递——这里通过封装runnable类作为一个信息载体,threadLocal用于接受信息,实现不同线程的信息传递

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/867564.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

笔记13:switch多分支选择语句

引例&#xff1a; 输入1-5中的任意一共数字&#xff0c;对应的打印字符A,B,C,D,E int num 0; printf("Input a number[1,5]:"); scanf("%d"&#xff0c;&num); if( num 1)printf("A\n"); else if(num2)printf("B\n"); else i…

Alibaba Cloud Toolkit前端使用proxy代理配置

1、vscode 先安装插件 Alibaba Cloud Toolkit 2、前端代码&#xff1a; /personnel: {// target: http://xxx.xx.xxx.xx:9100, // 测试环境// target: http://xxx.xx.xxx.xx:9200, // 线上环境target: http://127.0.0.1:18002, // toolkit 代理changeOrigin: true,},3、打开插…

Android LayoutInflater 深度解析

在 Android 开发中&#xff0c;LayoutInflater 是一个非常重要的工具。它允许我们从 XML 布局文件中动态地创建 View 对象&#xff0c;从而使得 UI 的创建和管理更加灵活。本文将深入解析 android.view.LayoutInflater&#xff0c;包括它的基本用法、常见问题以及高级用法。 什…

MySQL架构和工作流程

引言&#xff1a;MySQL执行一条sql语句期间发生了什么&#xff1f; 想要搞清楚这个问题&#xff0c;我们必须了解MySQL的体系结构和工作流程 一、MySQL体系结构 MySQL由以下几个部分组成 一、server层 1.MySQL Connnectors连接器&#xff0c;MySQL的连接池组件&#xff0c;…

数据结构+算法-实现一个计算器

在学习栈的数据结构的时候讲到可以用栈来实现一个计算器的功能&#xff0c;那么这个功能是如何实现的呢&#xff1f; 采用栈模拟得方式来实现一个计算器 要实现如下的功能: 字符串如何转为整数 2.处理加减法 如何处理加减法呢&#xff1f; 5-128 给第一个数字前面放一个号…

UEC++ 虚幻5第三人称射击游戏(二)

UEC++ 虚幻5第三人称射击游戏(二) 派生榴弹类武器 新建一个继承自Weapon的子类作为派生榴弹类武器 将Weapon类中的Fire函数添加virtual关键字变为虚函数让榴弹类继承重写 在ProjectileWeapon中重写Fire函数,新建生成投射物的模版变量 Fire函数重写逻辑 代码//生成的投射物U…

从文本到安全图像:自动提示优化防止不当内容生成

T2I生成技术已经得到了广泛关注&#xff0c;并见证了如GLIDE、Imagen、DALL-E 2、Stable Diffusion等大型生成模型的发展。尽管这些模型能够根据文本描述生成高质量的图像&#xff0c;促进了书籍插图、品牌标识设计、游戏场景创作等多种实际应用&#xff0c;但它们也被恶意用户…

使用京东云主机搭建幻兽帕鲁游戏联机服务器全流程,0基础教程

使用京东云服务器搭建幻兽帕鲁Palworld游戏联机服务器教程&#xff0c;非常简单&#xff0c;京东云推出幻兽帕鲁镜像系统&#xff0c;镜像直接选择幻兽帕鲁镜像即可一键自动部署&#xff0c;不需要手动操作&#xff0c;真正的新手0基础部署幻兽帕鲁&#xff0c;阿腾云整理基于京…

Python学习笔记30:进阶篇(十九)pygame的使用之显示与窗口管理

前言 基础模块的知识通过这么长时间的学习已经有所了解&#xff0c;更加深入的话需要通过完成各种项目&#xff0c;在这个过程中逐渐学习&#xff0c;成长。 我们的下一步目标是完成python crash course中的外星人入侵项目&#xff0c;这是一个2D游戏项目。在这之前&#xff…

YOLOv8改进 | 注意力机制 | 结合静态和动态上下文信息的注意力机制

秋招面试专栏推荐 &#xff1a;深度学习算法工程师面试问题总结【百面算法工程师】——点击即可跳转 &#x1f4a1;&#x1f4a1;&#x1f4a1;本专栏所有程序均经过测试&#xff0c;可成功执行&#x1f4a1;&#x1f4a1;&#x1f4a1; 专栏目录 &#xff1a;《YOLOv8改进有效…

力扣双指针算法题目:双数之和,三数之和,四数之和

目录 一&#xff1a;双数之和 1.题目&#xff1a; 2.思路解析 3.代码 二&#xff1a;三数之和 1.题目 2.思路解析 3&#xff0c;代码 三&#xff1a;四数字之和 1.题目 2.思路解析 3.代码 一&#xff1a;双数之和 1.题目&#xff1a; 输入一个递增排序的数组和一…

贵州建筑三类人员安全员2024年考试最新题库练习题

一、单选题 1.建设工程安全管理的方针是&#xff08;&#xff09;。 A.安全第一&#xff0c;预防为主&#xff0c;综合治理 B.质量第一&#xff0c;兼顾安全 C.安全至上 D.安全责任重于泰山 答案&#xff1a;A 2.安全生产管理的根本目的是&#xff08;&#xff09;。 A.…

Lunaproxy与711Proxy的对比与优劣分析

今天我们来深入对比两款在市场上备受关注的代理IP服务&#xff1a;Lunaproxy和711Proxy。接下来&#xff0c;我们将从多个角度对这两款服务进行详细分析&#xff0c;帮助大家做出明智的选择。 优势分析 711Proxy的优势 1. 性价比高&#xff1a;711Proxy提供多种灵活的套餐选…

伪元素content追加文字使用小技巧

E::before和E::after本身的作用是追加字&#xff0c;直接在文字后面追加链接 <!DOCTYPE html> <html lang"zh-cn"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-sca…

AI文本转语音,再也不用担心视频配音了.

文章目录 简介代码实现调用开通百度付费包 简介 背景 我想要将文本,转为语音,然后配上图片,这样就可以很快生成一个视频. 可以说是配音吧,我还是比较喜欢通过代码来自动化.所以今天就来实现一下,同时做一下分享和记录.目标 通过python代码,自动将文本转为配音.平台 我选择了百…

万界星空科技MES系统中的排版排产功能

在当今高度竞争的市场环境中&#xff0c;企业对于生产管理的效率和质量要求日益提高。作为智能制造的重要组成部分&#xff0c;制造执行系统&#xff08;MES&#xff09;以其强大的功能&#xff0c;在提升企业生产能力方面发挥着不可替代的作用。万界星空科技作为行业领先的智能…

MongoDB集群搭建-最简单

目录 前言 一、分片概念 二、搭建集群的步骤 总结 前言 MongoDB分片&#xff08;Sharding&#xff09;是一种水平扩展数据库的方法&#xff0c;它允许将数据分散存储在多个服务器上&#xff0c;从而提高数据库的存储容量和处理能力。分片是MongoDB为了应对大数据量和高吞吐量需…

Science期刊政策反转:允许生成式AI用于论文写作,意味着什么?

我是娜姐 迪娜学姐 &#xff0c;一个SCI医学期刊编辑&#xff0c;探索用AI工具提效论文写作和发表。 关于各大top期刊和出版社对于生成式AI用于论文写作中的规定&#xff0c;娜姐之前写过一篇文章&#xff1a; 如何合理使用AI写论文&#xff1f;来看Top 100学术期刊和出版社的…

深度解析 Raft 分布式一致性协议

本文参考转载至&#xff1a;浅谈 Raft 分布式一致性协议&#xff5c;图解 Raft - 白泽来了 - 博客园 (cnblogs.com) 深度解析 Raft 分布式一致性协议 - 掘金 (juejin.cn) raft-zh_cn/raft-zh_cn.md at master maemual/raft-zh_cn (github.com) 本篇文章将模拟一个KV数据读写服…

【漏洞复现】禅道——未授权登入(QVD-2024-15263)

声明&#xff1a;本文档或演示材料仅供教育和教学目的使用&#xff0c;任何个人或组织使用本文档中的信息进行非法活动&#xff0c;均与本文档的作者或发布者无关。 文章目录 漏洞描述漏洞复现测试工具 漏洞描述 禅道&#xff08;Zentao&#xff09;是一款开源的项目管理和协作…