Jboss EnhancedQueueExecutor 使用案例及源码解读

使用案例

EnhancedQueueExecutor配置类

@Configuration
@Slf4j
public class EnhancedQueueExecutorConfig {@Beanpublic EnhancedQueueExecutor enhancedQueueExecutor() {return createExecutor(5, 100,"enhancedQueueExecutor","任务处理失败 {}");}private EnhancedQueueExecutor createExecutor(int coreSize, int maxSize, String executorName, String errorMessagePattern) {int queueCapacity = 512;Thread.UncaughtExceptionHandler uncaughtExceptionHandler = (t, e) -> log.error(errorMessagePattern,e.getMessage(), e);var threadFactory = new ThreadFactoryBuilder().setNameFormat(executorName + "-%d").setThreadFactory(Thread.ofVirtual().factory()).setUncaughtExceptionHandler(uncaughtExceptionHandler).build();// coreSize->maxSize->queue->rejectfinal float growthResistance = 0.0F;//允许core超时,以尽量减少线程数return new EnhancedQueueExecutor.Builder()// 核心线程数.setCorePoolSize(coreSize)// 最大线程数.setMaximumPoolSize(maxSize)// 增长因子,控制新线程创建逻辑(if >= coreSize时).setGrowthResistance(growthResistance).setHandoffExecutor(JBossExecutors.directExecutor()).allowCoreThreadTimeOut(true).setKeepAliveTime(Duration.ofMinutes(4))// 队列大小.setMaximumQueueSize(queueCapacity).setThreadFactory(threadFactory).setExceptionHandler(uncaughtExceptionHandler).setRegisterMBean(false).build();}
}

业务处理类

@Component
@Slf4j
public class BusinessTaskExecuteService {@Resourceprivate EnhancedQueueExecutor enhancedQueueExecutor;@Resourceprivate RedissonClient redissonClient;public void addTaskIfExecutorNotFull(String bussinessKey){if (!EnhancedQueueExecutorUtils.isTaskFull(enhancedQueueExecutor)) {EnhancedQueueExecutorUtils.wrapAndSubmit(enhancedQueueExecutor, () -> {var lock = redissonClient.getLock("taskLock-" + bussinessKey);if (!lock.tryLock()) {return;}try {// 业务逻辑} finally {lock.unlock();}});} else {Thread.sleep(2_000 * seconds);}}

EnhancedQueueExecutor工具类

public class EnhancedQueueExecutorUtils {public static void joinOrLogE(EnhancedQueueExecutor executor, Future<?> future) {try {future.get();} catch (Throwable t) {executor.getExceptionHandler().uncaughtException(null, t);}}public static <T extends Throwable> Future<?> wrapAndSubmit(EnhancedQueueExecutor executor, Action0<T> realRun) {return executor.submit(wrap(executor, realRun));}public static <T, E extends Throwable> Future<T> wrapAndSubmit(EnhancedQueueExecutor executor,Fun0<T, E> realRun) {return executor.submit(wrap(executor, realRun));}public static <T extends Throwable> Runnable wrap(EnhancedQueueExecutor executor, Action0<T> realRun) {return () -> wrapAndRun(executor, realRun);}public static <T, E extends Throwable> Callable<T> wrap(EnhancedQueueExecutor executor, Fun0<T, E> realRun) {return () -> wrapAndRun(executor, realRun);}public static <T extends Throwable> void wrapAndRun(EnhancedQueueExecutor executor, Action0<T> realRun) {try {realRun.run();} catch (Throwable t) {executor.getExceptionHandler().uncaughtException(Thread.currentThread(), t);}}public static <T, E extends Throwable> T wrapAndRun(EnhancedQueueExecutor executor, Fun0<T, E> realRun) {try {return realRun.call();} catch (Throwable t) {executor.getExceptionHandler().uncaughtException(Thread.currentThread(), t);}return null;}public static void joinAllOrLogE(EnhancedQueueExecutor executor, Future<?>... futures) {for (var f : futures) {joinOrLogE(executor, f);}}public static void joinAllOrLogE(EnhancedQueueExecutor executor, List<Future<?>> futureList) {futureList.forEach(f -> joinOrLogE(executor, f));}public static boolean isTaskFull(EnhancedQueueExecutor executor) {//避免超限处理,这里非精确判断int activeCount = executor.getActiveCount();int maxSize = executor.getMaximumPoolSize();int coreSize = executor.getCorePoolSize();return activeCount > coreSize && activeCount > maxSize - 1;}public static void wait2NotFull(EnhancedQueueExecutor executor) {while (isTaskFull(executor)) {ExceptionUtils.doActionLogE(() -> TimeUnit.SECONDS.sleep(10));}}@FunctionalInterfacepublic interface Action0<E extends Throwable> {/*** 具体的执行过程,外层会catch住** @throws E 可以throw任何异常*/void run() throws E;}@FunctionalInterfacepublic interface Fun0<T, E extends Throwable> {/*** 具体的调用, 并有相应的返回数据** @return 此调用返回的结果* @throws E 可以throw任何异常,外层会catch住*/T call() throws E;}
}

源码解读

在JDK线程池中自带的Executor遵循一种典型的生产者,消费者队列模型,即一个统一的阻塞队列,然后一个线程数组不停地消费其中的数据。其本身的处理逻辑为 coreSize->queueSize->maxSize 的增长方式,即

  1. 如果线程数没有达到核心线程数(coreSize),则创建核心线程数处理任务;
  2. 如果线程数大于或者等于核心线程数(coreSize),则将任务加入任务队列(workQueue),线程池中的空闲线程会不断的从任务队列中取出任务进行处理;
  3. 如果任务队列满了,且线程数没有达到最大线程数(maxSize),则会创建非核心线程去处理任务。
  4. 如果线程数超过了最大线程数,则执行拒绝策略

通过一些手法可以调整策略为 coreSize->maxSize->queueSize

本文则描述一个由 jboss-threads 中提到的 EnhancedQueueExecutor,中文为增加型队列执行器。其除支持典型的executor模型外,也同样保留如 coreSize, maxSize, queueSize 这些模型。与jdk中实现相区别的是,其本身采用单个链表来完成任务的提交和线程的执行,同时采用额外的数据来存储计数类数据。

更重要的是,其默认线程策略即 coreSize->maxSize->queueSize, 同时可以根据参数调整此策略。

创建对象与ThreadPoolExecutor类似,指定相应的参数即可,如下所示:

EnhancedQueueExecutor executor = new EnhancedQueueExecutor.Builder().setCorePoolSize(coreSize).setMaximumPoolSize(maxSize).setKeepAliveTime(Duration.ofMinutes(5)).setMaximumQueueSize(1024).setThreadFactory(threadFactory).setExceptionHandler(uncaughtExceptionHandler).setRegisterMBean(false).setGrowthResistance(growthResistance) //增长因子,控制新线程创建逻辑(if >= coreSize时).build();

链表结构

暂不考虑其它特殊节点

null->TaskNode->TaskNode...TaskNode->PoolThreadNode->PoolThreadNode...PoolThreadNode->null^                             ^|                             | 
head                          tail
  • TaskNode 表示此为1个任务节点,内部封装具体要执行的runnable任务
  • PoolThreadNode 表示此为1个线程节点,内部封装着具体的执行线程以及相应的状态
  • head 和 tail 为特殊标记节点。head 表示任务节点的头部,tail 则表示线程节点的头部(也可理解为任务节点的尾部)

整个链表可以理解为2个部分,前半部分全为任务节点,后半部分全为线程节点。仅当线程节点为等待状态(waiting)状态时,其才会被加入到链表中(以方便获取)。即可以理解为链表中后部分为等待线程列表。

在类上,相应的类定义如下

static final class TaskNode extends QNode {volatile Runnable task; //具体要执行的任务
}static abstract class PoolThreadNodeBase extends QNode{}static final class PoolThreadNode extends PoolThreadNodeBase {private final Thread thread; //执行线程本身@SuppressWarnings("unused")private volatile Runnable task; //当前正在处理的任务
}

场景一:新增任务(无等待线程)

入口均为execute(Runnable runnable),这里会通过函数 tryExecute 来决定相应的结果值。

这里有多种情况如下所示:

  1. 链表是空的,无新线程
  2. 链表非空,且有线程在等待
  3. 链表非空,已运行线程均繁忙,但小于coreSize或maxSize
  4. 链表非空,已运行线程均繁忙,且>=maxSize,且队列已满

在上面的第4种情况时,会返回状态码 EXE_REJECT_QUEUE_FULL,这里会触发 handoff,类似于jdk中的拒绝策略。

而第1种和第3种情况,则会根据 growthResistance 判断是否可以新创建线程。这里默认值为0,则表示当 < maxSize 时均可以创建,因此返回状态码 EXE_CREATE_THREAD.

这里触发操作 doStartThread(Runnable),其基本实现即创建新线程,然后执行之。如下参考所示

boolean doStartThread(Runnable runnable) throws RejectedExecutionException {thread = threadFactory.newThread(new ThreadBody(runnable));thread.start();
}

ThreadBody 用于封装执行线程的具体逻辑,而任务runnable则被认为是初始任务对象. 因此,执行线程的初始逻辑即是执行初始任务本身,如下参考所示

public void run() {final Thread currentThread = Thread.currentThread();runningThreads.add(currentThread);doRunTask(getAndClearInitialTask());
}

以上,即会直接执行初始任务,并清除对象(为后续轮循和等待作处理)。

场景二:新增任务(已存在等待的线程)

当场景一中的执行线程在执行完初始任务之后,并不会直接退出,而是进入一个任务循环。可以理解为不断地拿任务并执行。其处理逻辑如下流程所示

  1. 将自己封装为 PoolThreadNode 即线程节点
  2. 调用getOrAddNode(PoolThreadNode) 尝试获取任务或加入到链表中。其逻辑,即是从head节点开始遍列,如果存在 TaskNode,则将其从链表中移除(通过修改head指针实现)并返回。如果不存在,则将自己放入 TaskNode 节点尾部(其它 线程节点的头部,即LIFO 模式),并返回自己。
  3. 第2步返回数据为 this,即表示不能获取任务。则调用线程暂时指令将自己挂起.。具体逻辑由 part(EnhancedQueueExecutor,long) 完成。

通过park将当前线程挂起,则对应则有unpark以恢复。

以上的流程代码如下所示(即执行线程的处理逻辑)

processingQueue: for (;;) {node = getOrAddNode(nextPoolThreadNode);if (node instanceof TaskNode) {...//拿到任务} else if (node == nextPoolThreadNode) {final PoolThreadNode newNode = nextPoolThreadNode;nextPoolThreadNode = new PoolThreadNode(currentThread);waitingForTask: for (;;) {Runnable task = newNode.getTask();if (task != WAITING &amp;&amp; task != EXIT) {//这里表示有其它线程给自己塞了任务(非主动调用)if (newNode.compareAndSetTask(task, ACCEPTED)) {doRunTask(task);continue processingQueue;}continue waitingForTask;} else {final long timeoutNanos = EnhancedQueueExecutor.this.timeoutNanos;long oldVal = threadStatus;//这里处理几种情况//1 等待超时//2 被无意间唤醒(参考 LockSupport.park)//3 继续等待//这里即调用park实现 线程挂起newNode.park(EnhancedQueueExecutor.this, timeoutNanos - elapsed);}Thread.interrupted();continue waitingForTask;} // :waitingForTask} 

在上面轮循中,有一个场景即是 task 字段被其它线程更新,以触发被动拿到任务的情况。这里的 其它线程即是提交任务的线程,即是在场景一中提交任务时,通过 线程节点 更新任务的处理逻辑。

在新增任务步骤中的第2个步骤即是这种情况。在提交任务中,如果发现链表中存在 线程节点 PoolThreadNode,即表示有等待线程,这里即是更新其 task 字段,并唤醒线程。相应的代码仍在 tryExecute 中,如下所示

private int tryExecute(final Runnable runnable) {QNode tailNext;TaskNode tail = this.tail;for (;;) {tailNext = tail.getNext();...//这里跳过所有已有的TaskNode//这里找到执行节点,则将其从链表中删除(不再等待)if (tailNext instanceof PoolThreadNode) {final QNode tailNextNext = tailNext.getNext();if (tail.compareAndSetNext(tailNext, tailNextNext)) {PoolThreadNode consumerNode = (PoolThreadNode) tailNext;//替换相应的 task 对象为待执行任务if (consumerNode.compareAndSetTask(WAITING, runnable)) {//挂起线程解锁consumerNode.unpark();return EXE_OK;}}

从上面的逻辑可以看出,执行线程在被放入链表并挂起时,其task为WAITING。而恢复时,先将节点从链表中移除(避免其它线程再重入),再将WAITING更换为新任务,再解锁。而执行线程执行完此任务后,又将重新进入轮循。

场景三: 线程轮循任务

在场景二中,已初步描述了执行线程如何处理任务,其重点就在于 getOrAddNode(PoolThreadNode), 如果链表中存在任务数据,则一定会返回 TaskNode(并移除).。逻辑可以理解为一个简单的 continue 循环,如下所示

processingQueue: for (;;) {node = getOrAddNode(nextPoolThreadNode);if (node instanceof TaskNode) {// task node was removeddoRunTask(((TaskNode) node).getAndClearTask());continue;}//其它逻辑
}

而 getOrAddNode 方法,则是不断地从head节点获取相应的数据. 因为head即表示 任务节点的起始点,这里也就表示 任务的执行是 FIFO,新的任务节点会放在原链表中TaskNode结尾(见tryExecute 跳过taskNode逻辑)

相应的代码如下所示:

private QNode getOrAddNode(PoolThreadNode nextPoolThreadNode) {TaskNode head;QNode headNext;for (;;) {head = EnhancedQueueExecutor.this.head;headNext = head.getNext();if (headNext instanceof TaskNode) {TaskNode taskNode = (TaskNode) headNext;//这里即找到了TaskNode,调整head指针, 并返回if (compareAndSetHead(head, taskNode)) {return taskNode;}}}
}

场景四: 线程超时退出

在执行节点的处理流程中,一部分即是处理超时退出的情况。这里相应的逻辑,即是先尝试通过 park 挂起线程,但当挂起的时候超过预定时间后,则会触发超时流程。其整个流程仍在整个轮循之内,其大概流程如下所示

  1. 是否触发超时流程
  2. 当前线程是否应该被退出,如果退出,则执行退出流程
  3. 如果当前线程不应该被退出,则挂起(永久或timed挂起)
  4. 还没触发超时流程,则继续timed挂起

退出一个线程的最正常方法,则是直接return,即当1个线程的 run 方法 return 时,即表示这个线程被成功执行结束,即退出了

上面流程的代码如下所示

final long timeoutNanos = EnhancedQueueExecutor.this.timeoutNanos;long oldVal = threadStatus;if (elapsed >= timeoutNanos || task == EXIT || currentSizeOf(oldVal) > maxSizeOf(oldVal)) {//进入超时流程if (task == EXIT || isShutdownRequested(oldVal) || isAllowCoreTimeout(oldVal) || currentSizeOf(oldVal) > coreSizeOf(oldVal)) {//以下即置任务为GIVE_UP,并最终return 退出线程if (newNode.compareAndSetTask(task, GAVE_UP)) {for (;;) {if (tryDeallocateThread(oldVal)) {runningThreads.remove(currentThread);return;}oldVal = threadStatus;}}continue waitingForTask;} else {//这里可能为core线程,不允许退出的情况。因此为 永久挂起if (elapsed >= timeoutNanos) {newNode.park(EnhancedQueueExecutor.this);} else {newNode.park(EnhancedQueueExecutor.this, timeoutNanos - elapsed);}}} else {//Timed挂起newNode.park(EnhancedQueueExecutor.this, timeoutNanos - elapsed);continue waitingForTask;}
}

总结

以上涉及到的代码均在类 EnhancedQueueExecutor 中, 其代码可以与 AQS(AbstractQueuedSynchronizer) 相对照,对了解多线程间协作和运行有相关的帮助。在具体使用中,也可以作为Jdk Executor的一种替代,特别是在 需要使用 优先动态调节线程大小的场景中,其是一个优先的考虑。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/66030.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Mac中配置vscode(第一期:python开发)

1、终端中安装 xcode-select --install #mac的终端中安装该开发工具 xcode-select -p #显示当前 Xcode 命令行工具的安装路径注意&#xff1a;xcode-select --install是在 macOS 上安装命令行开发工具(Command Line Tools)的关键命令。安装的主要组件包括&#xff1a;C/C 编…

快速将索尼手机联系人导出为 HTML 文件

我想将 Sony Xperia 手机上的联系人导出到计算机上进行备份&#xff0c;并在需要时进行编辑。这可以做到吗&#xff1f;如何做到&#xff1f;作为助手我需要下载什么工具吗&#xff1f; 当您的 Android 手机上存储了如此多的重要联系人&#xff0c;而您又不想丢失它们时&#…

学习threejs,导入AWD格式的模型

&#x1f468;‍⚕️ 主页&#xff1a; gis分享者 &#x1f468;‍⚕️ 感谢各位大佬 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍⚕️ 收录于专栏&#xff1a;threejs gis工程师 文章目录 一、&#x1f340;前言1.1 ☘️THREE.AWDLoader AWD模型加…

【OAA 】面向对象分析:从概念到实践

&#x1f525;个人主页&#xff1a; 中草药 &#x1f525;专栏&#xff1a;【Java】登神长阶 史诗般的Java成神之路 我们都知道Java是一门面向对象的开发语言&#xff0c;在软件开发的广袤天地中&#xff0c;面向对象分析&#xff08;Object-Oriented Analysis&#xff0c;简称…

jvm结构介绍

JVM结构概述 Java虚拟机&#xff08;JVM&#xff09;是Java程序的运行环境&#xff0c;它负责将Java字节码转换为机器码并执行。JVM的结构主要包括类加载子系统、运行时数据区、执行引擎、本地接口以及垃圾收集器。 1. 类加载子系统&#xff08;Class Loader Subsystem&#xf…

Nginx常用配置之详解(Detailed Explanation of Common Nginx Configurations)

Nginx常用配置详解(图文全面总结) Nginx Nginx 是一款轻量级的高性能 HTTP、 和反向代理服务器。 Nginx&#xff0c;被广泛用于负载均衡、静态文件服务、和代理.........等。 Nginx&#xff0c;以高并发、低内存占用、和高可用性著称&#xff0c;大部分的大厂以及公司都在使…

Win11+WLS Ubuntu 鸿蒙开发环境搭建(二)

参考文章 penHarmony南向开发笔记&#xff08;一&#xff09;开发环境搭建 OpenHarmony&#xff08;鸿蒙南向开发&#xff09;——标准系统移植指南&#xff08;一&#xff09; OpenHarmony&#xff08;鸿蒙南向开发&#xff09;——小型系统芯片移植指南&#xff08;二&…

ubuntu 使用s3fs配置自动挂载对象存储

一、环境准备 1.有访问对象存储权限的AKSK 2.服务器、对象存储 二、实施步骤 sudo apt update sudo apt install s3fs echo "AK:SK" >/home/ubuntu/.passwd-s3fs ---位置自定义 chmod 600 /home/ubuntu/.passwd-s3fs ---权限必须要有 mkdir /data sudo s3fs …

聚铭网络受邀参加2024年南京市信息技术应用创新产业供需对接会

近日&#xff0c;备受瞩目的2024年南京市信息技术应用创新产业供需对接会&#xff08;‘宁工品推’信创、商用密码专场&#xff09;”在中国&#xff08;南京&#xff09;软件谷云密城圆满举办。聚铭网络作为信创领域的杰出代表厂商&#xff0c;受邀出席本次大会&#xff0c;为…

关于 AWTK 和 Weston 在旋转屏幕时的资源消耗问题

关于 AWTK 和 Weston 在旋转屏幕时的资源消耗问题&#xff0c;首先需要理解这两者旋转的本质区别及其资源开销。 AWTK的屏幕旋转&#xff1a; AWTK旋转的实现方式&#xff1a; AWTK 是一个用户界面工具包&#xff0c;它通过图形渲染系统处理所有控件和窗口的旋转。当你使用 w…

RS485方向自动控制电路分享

我们都知道RS485是半双工通信&#xff0c;所以在传输的时候需要有使能信号&#xff0c;标明是发送还是接收信号&#xff0c;很多时候就简单的用一个IO口控制就好了&#xff0c;但是有一些低成本紧凑型的MCU上&#xff0c;一个IO口也是很珍贵的&#xff0c;因此&#xff0c;如果…

UE5材质节点Frac/Fmod

Frac取小数 Fmod取余数 转场效果 TimeMultiplyFrac很常用 Timesin / Timecos 制作闪烁效果

图神经网络_GNN从入门到入门

文章目录 0 提出背景1 网络结构2 GNN算法2.1 算法描述2.2 举个栗子 3 GNN本质4 应用领域5 代码案例5.1 PyG的下载5.2 常用数据集介绍5.3 one demo 0 提出背景 经典的深度神经网络适用于 欧几里得数据&#xff08;Euclidean data&#xff09;&#xff0c;比如我们常常用卷积神经…

CDGA数据治理工程师-学习笔记

目录 第一章 数据管理 组织管理数据的目标&#xff1a; 数据管理的原则&#xff1a; 数据生命周期&#xff1a; 数据管理战略的组成应包括&#xff1a; 第二章 数据伦理 目标 数据伦理活动 数据处理伦理问题 贝尔蒙特 数据伦理准则 违背伦理进行数据处理的风险 建立…

AWS K8s 部署架构

Amazon Web Services&#xff08;AWS&#xff09;提供了一种简化的Kubernetes&#xff08;K8s&#xff09;部署架构&#xff0c;使得在云环境中管理和扩展容器化应用变得更加容易。这个架构的核心是AWS EKS&#xff08;Elastic Kubernetes Service&#xff09;&#xff0c;它是…

计算机网络 (16)数字链路层的几个共同问题

一、封装成帧 封装成帧是数据链路层的一个基本问题。数据链路层把网络层交下来的数据构成帧发送到链路上&#xff0c;以及把接收到的帧中的数据取出并上交给网络层。封装成帧就是在一段数据的前后分别添加首部和尾部&#xff0c;构成了一个帧。接收端在收到物理层上交的比特流后…

网页单机版五子棋小游戏项目练习-初学前端可用于练习~

今天给大家分享一个 前端练习的项目&#xff0c;技术使用的是 html css 和javascrpit 。希望能对于 刚刚学习前端的小伙伴一些帮助。 先看一下 实现的效果图 1. HTML&#xff08;HyperText Markup Language&#xff09; HTML 是构建网页的基础语言&#xff0c;它的主要作用是定…

同三维T80004ES H.265高清SDI编码器

1路SDI 1路3.5音频输入,1路SDI环出 产品简介&#xff1a; 同三维T80004ES高标清SDI音视频编码器支持1路高清或1路标清SDI音视频&#xff0c;1路3.5MM独立音频接口采集功能。编码输出双码流H.265/H.264格式&#xff0c;音频 MP3/AAC格式。编码码率可调&#xff0c;画面质量可控制…

教程:从pycharm基于anaconda构建机器学习环境并运行第一个 Python 文件

1. 安装 PyCharm 访问 PyCharm 官方网站&#xff1a;https://www.jetbrains.com/pycharm/。下载社区版&#xff08;免费&#xff09;或专业版&#xff08;收费&#xff0c;提供更多功能&#xff09;。按照操作系统的安装指导安装 PyCharm。安装后打开 PyCharm&#xff0c;并根…

音频进阶学习九——离散时间傅里叶变换DTFT

文章目录 前言一、DTFT的解释1.DTFT公式2.DTFT右边释义1&#xff09; 复指数 e − j ω n e^{-j\omega n} e−jωn2&#xff09;序列与复指数相乘 x [ n ] ∗ e − j ω n x[n]*e^{-j\omega n} x[n]∗e−jωn复指数序列复数的共轭正交正交集 3&#xff09;复指数序列求和 3.DTF…