Java自定义IO密集型和CPU密集型线程池

文章目录

  • 前言
  • 线程池各类场景描述
  • 常见场景案例设计思路
    • 公共类
      • 自定义工厂类-MyThreadFactory
      • 自定义拒绝策略-RejectedExecutionHandlerFactory
      • 自定义阻塞队列-TaskQueue(实现 核心线程->最大线程数->队列)
    • 场景1:CPU密集型场景
      • 思路&计算公式
      • 实现代码
    • 场景2:IO密集型场景
      • 思路&计算公式
      • 实现代码
  • 其他部分组成
    • 拒绝策略兜底方案
      • 思路设计及思考
      • 设计1:数据库持久化方案
      • 设计2:Netty两种拒绝策略实现(根据场景来进行是否重试入队 + 失败抛异常)
      • 设计3:ActiveMQ(有效时间内尝试入队+入队失败抛出异常)
      • 设计4:dubbo设计思路(dump文件+抛出异常)
      • 设计5: 自定义设计-阻塞入队
  • 参考文章

稿定智能设计202502031721

前言

本章节配套源码:

  • gitee:https://gitee.com/changluJava/demo-exer/tree/master/JUC/src/main/java/demo10

线程池各类场景描述

**类型场景:**不同的场景设置参数也各不相同

  • 第一种:CPU密集型:最大线程数应该等于CPU核数+1,这样最大限度提高效率。
// 通过该代码获取当前运行环境的cpu核数
Runtime.getRuntime().availableProcessors();
  • **第二种:**IO密集型:主要是进行IO操作,执行IO操作的时间较长,这时cpu出于空闲状态,导致cpu的利用率不高。线程数为2倍CPU核数。当其中的线程在IO操作的时候,其他线程可以继续用cpu,提高了cpu的利用率。

  • 第三种:混合型:如果CPU密集型和IO密集型执行时间相差不大那么可以拆分;如果两种执行时间相差很大,就没必要拆分了。

  • **第四种(了解):**在IO优化中,线程等待时间所占比越高,需要线程数越多;线程cpu时间占比越高,需要越少线程数。

线程池初始化所有参数:

corePoolSize : 核心线程数,当线程池中的线程数量为 corePoolSize 时,即使这些线程处于空闲状态,也不会销毁(除非设置 allowCoreThreadTimeOut)。
maximumPoolSize : 最大线程数,线程池中允许的线程数量的最大值。
keepAliveTime : 线程空闲时间,当线程池中的线程数大于 corePoolSize 时,多余的空闲线程将在销毁之前等待新任务的最长时间。
workQueue : 任务队列
unit : 线程空闲时间的单位。
threadFactory : 线程工厂,线程池创建线程时使用的工厂。
handler : 拒绝策略,因达到线程边界和任务队列满时,针对新任务的处理方法。CallerRunsPolicy:由提交任务的线程直接执行任务,避免任务丢失。适合任务量波动较大的场景。AbortPolicy:直接抛出 RejectedExecutionException 异常。适合任务量可控的场景。DiscardPolicy:静默丢弃任务,不抛出异常。适合对任务丢失不敏感的场景。DiscardOldestPolicy:丢弃队列中最旧的任务,然后重新尝试提交当前任务。适合对任务时效性要求较高的场景。

核心线程池execute逻辑代码:

public void execute(Runnable command) {//任务判空if (command == null)throw new NullPointerException();//查看当前运行的线程数量int c = ctl.get();//若小于核心线程则直接添加一个工作线程并执行任务if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}//如果线程数等于核心线程数则尝试将任务入队if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}//入队失败,调用addWorker参数为false,尝试创建应急线程处理突发任务else if (!addWorker(command, false))//如果创建应急线程失败,说明当前线程数已经大于最大线程数,这个任务只能拒绝了reject(command);}

image-20250201004416120


常见场景案例设计思路

公共类

image-20250202175504266

自定义工厂类-MyThreadFactory

MyThreadFactory.java:自定义了线程池工厂类,可以自行进行命名

package demo10;import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;/*** 自定义线程池工厂类*/
public class MyThreadFactory implements ThreadFactory {private static final AtomicInteger poolNumber = new AtomicInteger(1);private final ThreadGroup group;private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;public MyThreadFactory(String factoryName) {SecurityManager s = System.getSecurityManager();group = (s != null) ? s.getThreadGroup() :Thread.currentThread().getThreadGroup();namePrefix = factoryName + "-pool-" +poolNumber.getAndIncrement() +"-thread-";}public Thread newThread(Runnable r) {Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(),0);if (t.isDaemon())t.setDaemon(false);if (t.getPriority() != Thread.NORM_PRIORITY)t.setPriority(Thread.NORM_PRIORITY);return t;}
}

自定义拒绝策略-RejectedExecutionHandlerFactory

RejectedExecutionHandlerFactory.java:包含有多种拒绝策略,其中包含本次需要使用的阻塞入队拒绝策略

package demo10;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;/*** 拒绝策略工厂类**/
@Slf4j
public class RejectedExecutionHandlerFactory {private static final AtomicLong COUNTER = new AtomicLong();/*** 拒绝执行,抛出 RejectedExecutionException* @param source name for log* @return A handler for tasks that cannot be executed by ThreadPool*/public static RejectedExecutionHandler newAbort(String source) {return (r, e) -> {log.error("[{}] ThreadPool[{}] overload, the task[{}] will be Abort, Maybe you need to adjust the ThreadPool config!", source, e, r);throw new RejectedExecutionException("Task " + r.toString() +" rejected from " + source);};}/*** 直接丢弃该任务* @param source log name* @return A handler for tasks that cannot be executed by ThreadPool*/public static RejectedExecutionHandler newDiscard(String source) {return (r, p) -> {log.error("[{}] ThreadPool[{}] overload, the task[{}] will be Discard, Maybe you need to adjust the ThreadPool config!", source, p, r);};}/*** 调用线程运行* @param source log name* @return A handler for tasks that cannot be executed by ThreadPool*/public static RejectedExecutionHandler newCallerRun(String source) {System.out.println("thread =>" + Thread.currentThread().getName() + "触发阻塞中...");return (r, p) -> {log.error("[{}] ThreadPool[{}] overload, the task[{}] will run by caller thread, Maybe you need to adjust the ThreadPool config!", source, p, r);if (!p.isShutdown()) {r.run();}};}/*** 新线程运行* @param source log name* @return A handler for tasks that cannot be executed by ThreadPool*/public static RejectedExecutionHandler newThreadRun(String source) {return (r, p) -> {log.error("[{}] ThreadPool[{}] overload, the task[{}] will run by a new thread!, Maybe you need to adjust the ThreadPool config!", source, p, r);if (!p.isShutdown()) {String threadName = source + "-T-" + COUNTER.getAndIncrement();log.info("[{}] create new thread[{}] to run job", source, threadName);new Thread(r, threadName).start();}};}/*** 依据阻塞队列put 阻塞添加到队列中* @return 拒绝策略执行器*/public static RejectedExecutionHandler blockCallerPolicy(String source) {return new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor e) {log.error("[{}] ThreadPool[{}] overload, the task[{}] will run by caller thread, Maybe you need to adjust the ThreadPool config!", source, e, r);if (!e.isShutdown()) {try {// 阻塞入队操作,阻塞方为调用方执行submitjob的线程e.getQueue().put(r);} catch (InterruptedException ex) {log.error("reject put queue error", ex);}}}};}}

自定义阻塞队列-TaskQueue(实现 核心线程->最大线程数->队列)

TaskQueue.java:线程池中实现先使用核心线程数

package demo10;import java.util.concurrent.*;public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {private transient ThreadPoolExecutor parent;public TaskQueue(int capacity) {super(capacity);}public void setExecutor(ThreadPoolExecutor parent) {this.parent = parent;}/*** 核心线程 -> 最大核心线程数 -> 队列* @param runnable the element to add* @return*/@Overridepublic boolean offer(Runnable runnable) {// 如果没有线程池父类,则直接尝试入队if (parent == null) return super.offer(runnable);// 若是工作线程数 < 最大线程数,则优先创建线程跑任务if (parent.getPoolSize() < parent.getMaximumPoolSize()) return false;// 工作线程数 >= 最大线程数,入队return super.offer(runnable);}
}

场景1:CPU密集型场景

思路&计算公式

**场景:**具体是指那种包含大量运算、在持有的 CPU 分配的时间片上一直在执行任务、几乎不需要依赖或等待其他任何东西。处理起来其实没有多少优化空间,因为处理时几乎没有等待时间,所以一直占有 CPU 进行执行,才是最好的方式。

**可优化的点:**就是当单个线程累计较多任务时,其他线程能进行分担,类似fork/join框架的概念。

设置参数:设置线程数时,针对单台机器,最好就是有几个 CPU ,就创建几个线程,然后每个线程都在执行这种任务,永不停歇。

Nthreads=Ncpu+1 
w/c =0 
理解也是正确的,+1 主要是防止因为系统上下文切换,让系统资源跑满!

实现代码

image-20250202184351200

这里核心+最大线程数使用的是CPU核心数+1:

package demo10.cpu;import demo10.MyThreadFactory;
import demo10.RejectedExecutionHandlerFactory;
import demo10.TaskQueue;
import lombok.extern.slf4j.Slf4j;import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;/*** cpu密集型场景任务提交*  自定义队列:核心线程 -> 最大线程 -> 队列*  自定义拒绝策略:自定义采用执行阻塞队列的put操作来实现任务阻塞入队,而非直接使用调用者线程来直接跑任务*  非影响主线程执行流程:批次1000个任务统一在一个线程中去进行处理,与主流程main线程隔离**/
@Slf4j
public class CPUThreadPoolExample {public static void main(String[] args) {// 获取 CPU 核心数int cpuCores = Runtime.getRuntime().availableProcessors();// 自定义线程池参数int corePoolSize = cpuCores + 1; // 核心线程数 cpu核心数+1int maximumPoolSize = corePoolSize; // 最大线程数 cpu核心数+1long keepAliveTime = 60L; // 空闲线程存活时间TimeUnit unit = TimeUnit.SECONDS; // 时间单位// 自定义任务队列 核心线程 -> 最大核心线程数 -> 队列TaskQueue<Runnable> taskQueue = new TaskQueue<>(500); // 队列容量为核心线程数的 2 倍// 创建自定义线程池ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,unit,taskQueue,new MyThreadFactory("IOIntensiveThreadPool"), // 默认线程工厂 Executors.defaultThreadFactory() | 自定义工厂支持自定义线程池名字RejectedExecutionHandlerFactory.blockCallerPolicy("IOIntensiveThreadPool"));// 将线程池对象设置到任务队列中taskQueue.setExecutor(executor);// 统计任务的执行数量int jobNums = 1000000;final AtomicInteger count = new AtomicInteger(0);// 记录任务开始时间long startTime = System.currentTimeMillis();// 单独开一个线程(后续可改为线程池 核心、最大就1个场景)去完成整个任务提交处理// 如果submitjob阻塞,仅仅只会影响该thread线程new Thread(() -> {CountDownLatch latch = new CountDownLatch(jobNums);// 模拟1000个任务 (可改造为queue队列形式去在这个线程中去消费)for (int i = 0; i < jobNums; i++) {final int taskId = i;executor.submit(() -> {// CPU计算int sum = 0;for (int j = 0; j < 100000; j++) {sum += j;}System.out.println(Thread.currentThread().getName() + " 任务 " + taskId + " 完成!sum = " + sum);count.incrementAndGet(); // 原子操作,+1 并返回新值latch.countDown();});}System.out.println("所有任务提交完成!");// 关闭线程池,等待任务全部执行完毕try {latch.await();System.out.println("所有任务执行结束!");// 记录任务结束时间long endTime = System.currentTimeMillis();// 计算任务执行时间long duration = endTime - startTime;System.out.println("任务执行总耗时: " + duration + " 毫秒");} catch (InterruptedException e) {e.printStackTrace();throw new RuntimeException(e);}finally {executor.shutdown();}}).start();try {// 等待所有任务完成if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {executor.shutdownNow(); // 强制关闭}System.out.println("执行完任务数统计:" + count.get());} catch (InterruptedException e) {executor.shutdownNow();}}
}

效果:

image-20250202184444336


场景2:IO密集型场景

思路&计算公式

**场景:**其消耗的主要资源就是 IO 了,所接触到的 IO ,大致可以分成两种:磁盘 IO网络 IO。IO 操作的特点就是需要等待,我们请求一些数据,由对方将数据写入缓冲区,在这段时间中,需要读取数据的线程根本无事可做,因此可以把 CPU 时间片让出去,直到缓冲区写满。

  • 磁盘 IO ,大多都是一些针对磁盘的读写操作,最常见的就是文件的读写,假如你的数据库、 Redis 也是在本地的话,那么这个也属于磁盘 IO。
  • 网络 IO ,这个应该是大家更加熟悉的,我们会遇到各种网络请求,比如 http 请求、远程数据库读写、远程 Redis 读写等等。

设置参数:

# 如果存在IO,那么肯定w/c>1(阻塞耗时一般都是计算耗时的很多倍),但是需要考虑系统内存有限(每开启一个线程都需要内存空间),这里需要上服务器测试具体多少个线程数适合(CPU占比、线程数、总耗时、内存消耗)。如果不想去测试,保守点取1即,Nthreads=Ncpu*(1+1)=2Ncpu。这样设置一般都OK
# 通用就是2倍的CPU核心数(如果要效率最大化,需要测算当前系统环境每个线程任务的阻塞等待时间与实际计算时间)
Nthreads=Ncpu*(1+w/c)
公式中 W/C 为系统 阻塞率  w:等待时间 c:计算时间

实现代码

image-20250202183217126

IOIntensiveThreadPoolExample2.java:这里最终实现的Example2类来进行测试

package demo10.io;import demo10.MyThreadFactory;
import demo10.RejectedExecutionHandlerFactory;
import demo10.TaskQueue;
import lombok.extern.slf4j.Slf4j;import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;/*** io密集型场景任务提交* demo3:基于demo2自定义拒绝策略*  自定义队列:核心线程 -> 最大线程 -> 队列*  自定义拒绝策略:自定义采用执行阻塞队列的put操作来实现任务阻塞入队,而非直接使用调用者线程来直接跑任务*  非影响主线程执行流程:批次1000个任务统一在一个线程中去进行处理,与主流程main线程隔离**/
@Slf4j
public class IOIntensiveThreadPoolExample2 {public static void main(String[] args) {// 获取 CPU 核心数int cpuCores = Runtime.getRuntime().availableProcessors();// 自定义线程池参数int corePoolSize = cpuCores * 2; // 核心线程数(IO 密集型任务可以设置较大)int maximumPoolSize = cpuCores * 4; // 最大线程数long keepAliveTime = 60L; // 空闲线程存活时间TimeUnit unit = TimeUnit.SECONDS; // 时间单位// 自定义任务队列 核心线程 -> 最大核心线程数 -> 队列TaskQueue<Runnable> taskQueue = new TaskQueue<>(corePoolSize * 2); // 队列容量为核心线程数的 2 倍// 创建自定义线程池ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,unit,taskQueue,new MyThreadFactory("IOIntensiveThreadPool"), // 默认线程工厂 Executors.defaultThreadFactory() | 自定义工厂支持自定义线程池名字RejectedExecutionHandlerFactory.blockCallerPolicy("IOIntensiveThreadPool"));// 将线程池对象设置到任务队列中taskQueue.setExecutor(executor);// 统计任务的执行数量int jobNums = 1000;final AtomicInteger count = new AtomicInteger(0);// 记录任务开始时间long startTime = System.currentTimeMillis();// 单独开一个线程(后续可改为线程池 核心、最大就1个场景)去完成整个任务提交处理// 如果submitjob阻塞,仅仅只会影响该thread线程new Thread(() -> {CountDownLatch latch = new CountDownLatch(jobNums);// 模拟1000个任务 (可改造为queue队列形式去在这个线程中去消费)for (int i = 0; i < jobNums; i++) {final int taskId = i;executor.submit(() -> {System.out.println(Thread.currentThread().getName() + " 正在执行任务 " + taskId + "...");try {Thread.sleep(500); // 模拟 IO 操作(如网络请求或文件读写)10s// xxxio类耗时操作} catch (InterruptedException e) {e.printStackTrace();throw new RuntimeException(e);}finally {System.out.println(Thread.currentThread().getName() + " 任务 " + taskId + " 完成!");count.incrementAndGet(); // 原子操作,+1 并返回新值latch.countDown();}});}System.out.println("所有任务提交完成!");// 关闭线程池,等待任务全部执行完毕try {latch.await();System.out.println("所有任务执行结束!");// 记录任务结束时间long endTime = System.currentTimeMillis();// 计算任务执行时间long duration = endTime - startTime;System.out.println("任务执行总耗时: " + duration + " 毫秒");} catch (InterruptedException e) {e.printStackTrace();throw new RuntimeException(e);}finally {executor.shutdown();}}).start();try {// 等待所有任务完成if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {executor.shutdownNow(); // 强制关闭}System.out.println("执行完任务数统计:" + count.get());} catch (InterruptedException e) {executor.shutdownNow();}}
}

一个任务耗时0.5s,1000个任务执行如下:

image-20250202183457306

说明:经过测试验证,如果IO阻塞时间特别长,调大最大核心线程数效果更好。


其他部分组成

拒绝策略兜底方案

思路设计及思考

如果核心线程、最大线程、队列都满了的情况下该如何处理?如果本身就是单台机器资源打满,就需要在设计策略上改变线程池的调度方案,如果我的目的是任何一个任务都不丢弃,同时在服务器上有余力及时处理?

方案1:持久化数据库设计

  • 如:设计一张任务表间任务存储到 MySQL 数据库中;redis缓存;任务提交到中间件来缓冲。

设计思路可以如下:参考https://zhuanlan.zhihu.com/p/700719289

image-20250201084054264

方案2:Netty 为例,它的拒绝策略则是直接创建一个线程池以外的线程处理这些任务,为了保证任务的实时处理,这种做法可能需要良好的硬件设备且临时创建的线程无法做到准确的监控。

  • 后续通过翻阅源码发现一种在拒绝策略场景带退避的重试策略

方案3:ActiveMQ 则是尝试在指定的时效内尽可能的争取将任务入队,以保证最大交付

**方案4:**dubbo设计思路(dump文件+抛出异常)

方案5:线程阻塞队列

思路:队列采用阻塞队列,在拒绝策略方法中使用put方法实现阻塞效果。

可能情况:阻塞主线程任务执行。


设计1:数据库持久化方案

设计思路:自定义拒绝策略,在拒绝策略情况下进行数据库持久化;自定义实现队列,在poll的时候优先从db获取任务,接着再从队列中获取。

**详细具体实现可见:**某大厂线程池拒绝策略连环问 https://blog.csdn.net/shark_chili3007/article/details/137042400


设计2:Netty两种拒绝策略实现(根据场景来进行是否重试入队 + 失败抛异常)

实现思路1:创建新线程执行任务

说明:为了保证任务的实时处理,这种做法需要良好的硬件设备且临时创建的线程无法做到准确的监控

private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {NewThreadRunsPolicy() {super();}public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {try {//创建一个临时线程处理任务final Thread t = new Thread(r, "Temporary task executor");t.start();} catch (Throwable e) {throw new RejectedExecutionException("Failed to start a new thread", e);}}
}

弊端:如果任务数特别多无上限场景,就会出现oom情况,导致服务挂掉。

实现思路2:拒绝策略场景带退避的重试策略

源码地址:https://github.dev/netty/netty

  • 具体代码文件:RejectedExecutionHandlers
/*** Tries to backoff when the task can not be added due restrictions for an configured amount of time. This* is only done if the task was added from outside of the event loop which means* {@link EventExecutor#inEventLoop()} returns {@code false}.*/
public static RejectedExecutionHandler backoff(final int retries, long backoffAmount, TimeUnit unit) {// 检查 retries 参数是否为正数,如果不是则抛出异常ObjectUtil.checkPositive(retries, "retries");// 将退避时间转换为纳秒final long backOffNanos = unit.toNanos(backoffAmount);// 返回一个实现了 RejectedExecutionHandler 接口的匿名类return new RejectedExecutionHandler() {@Overridepublic void rejected(Runnable task, SingleThreadEventExecutor executor) {// 检查当前线程是否不是事件循环线程if (!executor.inEventLoop()) {// 进行最多 retries 次重试for (int i = 0; i < retries; i++) {// 尝试唤醒事件循环线程,以便它能够处理任务队列中的任务executor.wakeup(false);// 当前线程休眠指定的退避时间LockSupport.parkNanos(backOffNanos);// 尝试将任务重新加入任务队列if (executor.offerTask(task)) {// 如果任务成功加入队列,则直接返回return;}}}// 如果当前线程是事件循环线程,或者重试次数用尽后仍然无法加入任务队列,// 则抛出 RejectedExecutionException 异常throw new RejectedExecutionException();}};
}

设计3:ActiveMQ(有效时间内尝试入队+入队失败抛出异常)

说明:尝试在指定的时效内尽可能的争取将任务入队,以保证最大交付,超过时间内则返回false。

github地址:https://github.dev/apache/activemq

  • 对应代码:BrokerService#getExecutor
new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {try {// 在60s内进行尝试入队,如果入队失败,则抛出异常if (!executor.getQueue().offer(r, 60, TimeUnit.SECONDS)) {throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");}} catch (InterruptedException e) {throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");}}}

设计4:dubbo设计思路(dump文件+抛出异常)

github地址:

@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {String msg = String.format("Thread pool is EXHAUSTED!"+ " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d),"+ " Task: %d (completed: %d),"+ " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",threadName,e.getPoolSize(),e.getActiveCount(),e.getCorePoolSize(),e.getMaximumPoolSize(),e.getLargestPoolSize(),e.getTaskCount(),e.getCompletedTaskCount(),e.isShutdown(),e.isTerminated(),e.isTerminating(),url.getProtocol(),url.getIp(),url.getPort());// 0-1 - Thread pool is EXHAUSTED!logger.warn(COMMON_THREAD_POOL_EXHAUSTED, "too much client requesting provider", "", msg);if (Boolean.parseBoolean(url.getParameter(DUMP_ENABLE, Boolean.TRUE.toString()))) {// 进行dump文件dumpJStack();}// 指派发送消息给listener监听器dispatchThreadPoolExhaustedEvent(msg);throw new RejectedExecutionException(msg);
}

dubbo的工作线程触发了线程拒绝后,主要做了三个事情,原则就是尽量让使用者清楚触发线程拒绝策略的真实原因

1)输出了一条警告级别的日志,日志内容为线程池的详细设置参数,以及线程池当前的状态,还有当前拒绝任务的一些详细信息。可以说,这条日志,使用dubbo的有过生产运维经验的或多或少是见过的,这个日志简直就是日志打印的典范,其他的日志打印的典范还有spring。得益于这么详细的日志,可以很容易定位到问题所在

2)输出当前线程堆栈详情,这个太有用了,当你通过上面的日志信息还不能定位问题时,案发现场的dump线程上下文信息就是你发现问题的救命稻草。

3)继续抛出拒绝执行异常,使本次任务失败,这个继承了JDK默认拒绝策略的特性


设计5: 自定义设计-阻塞入队

在线程池初始化的时候自定义拒绝策略:阻塞入队操作,阻塞方为调用方执行submitjob的线程

new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor e) {log.error("[{}] ThreadPool[{}] overload, the task[{}] will run by caller thread, Maybe you need to adjust the ThreadPool config!", "IOIntensiveThreadPool", e, r);if (!e.isShutdown()) {try {// 阻塞入队操作,阻塞方为调用方执行submitjob的线程e.getQueue().put(r);} catch (InterruptedException ex) {log.error("reject put queue error", ex);}}}
}

如果要执行的任务数量过多,核心线程数、最大核心线程数占满、任务队列占满,此时让任务进行入队阻塞,等待队列中任务有空余位置。


参考文章

[1]. Java 线程池讲解——针对 IO 密集型任务:https://www.jianshu.com/p/66b6dfcf3173(提出dubbo 或者 tomcat 的线程池中自定义Queue的实现,核心线程数 -> 最大线程数 -> 队列中)

[2]. 某大厂线程池拒绝策略连环问 https://blog.csdn.net/shark_chili3007/article/details/137042400

[3]. 线程池拒绝策略:https://blog.csdn.net/qq_40428665/article/details/121680262

[4]. Java线程池如何合理配置核心线程数:https://www.cnblogs.com/Vincent-yuan/p/16022613.html

[5]. 线程池参数配置:https://blog.csdn.net/whp404/article/details/131960756(计算公式)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/894587.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【VM】VirtualBox安装ubuntu22.04虚拟机

阅读本文之前&#xff0c;请先根据 安装virtualbox 教程安装virtulbox虚拟机软件。 1.下载Ubuntu系统镜像 打开阿里云的镜像站点&#xff1a;https://developer.aliyun.com/mirror/ 找到如图所示位置&#xff0c;选择Ubuntu 22.04.3(destop-amd64)系统 Ubuntu 22.04.3(desto…

C#,shell32 + 调用控制面板项(.Cpl)实现“新建快捷方式对话框”(全网首发)

Made By 于子轩&#xff0c;2025.2.2 不管是使用System.IO命名空间下的File类来创建快捷方式文件&#xff0c;或是使用Windows Script Host对象创建快捷方式&#xff0c;亦或是使用Shell32对象创建快捷方式&#xff0c;都对用户很不友好&#xff0c;今天小编为大家带来一种全新…

国产编辑器EverEdit - 输出窗口

1 输出窗口 1.1 应用场景 输出窗口可以显示用户执行某些操作的结果&#xff0c;主要包括&#xff1a; 查找类&#xff1a;查找全部&#xff0c;筛选等待操作&#xff0c;可以把查找结果打印到输出窗口中&#xff1b; 程序类&#xff1a;在执行外部程序时(如&#xff1a;命令窗…

Vue-data数据

目录 一、Vue中的data数据是什么&#xff1f;二、data支持的数据类型有哪些&#xff1f; 一、Vue中的data数据是什么&#xff1f; Vue中用到的数据定义在data中。 二、data支持的数据类型有哪些&#xff1f; data中可以写复杂类型的数据&#xff0c;渲染复杂类型数据时只要遵…

02.03 递归运算

使用递归求出 1 1/3 -1/5 1/7 - 1/9 ... 1/n的值。 1>程序代码 #include <stdio.h> #include <string.h> #include <unistd.h> #include <stdlib.h> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #inc…

数据分析系列--⑥RapidMiner构建决策树(泰坦尼克号案例含数据)

一、资源下载 二、数据处理 1.导入数据 2.数据预处理 三、构建模型 1.构建决策树 2.划分训练集和测试集 3.应用模型 4.结果分析 一、资源下载 点击下载数据集 二、数据处理 1.导入数据 2.数据预处理 三、构建模型 1.构建决策树 虽然决策树已经构建,但对于大多数初学者或…

100 ,【8】 buuctf web [蓝帽杯 2021]One Pointer PHP(别看)

进入靶场 没提示&#xff0c;去看源代码。 user.php <?php // 定义一个名为 User 的类&#xff0c;该类可用于表示用户相关信息或执行与用户有关的操作 class User{// 声明一个公共属性 $count&#xff0c;可在类的内部和外部直接访问// 这个属性可能用于记录与用户相关…

巧妙利用数据结构优化部门查询

目录 一、出现的问题 部门树接口超时 二、问题分析 源代码分析 三、解决方案 具体实现思路 四、优化的效果 一、出现的问题 部门树接口超时 无论是在A项目还是在B项目中&#xff0c;都存在类似的页面&#xff0c;其实就是一个部门列表或者叫组织列表。 从页面的展示形式…

QT简单实现验证码(字符)

0&#xff09; 运行结果 1&#xff09; 生成随机字符串 Qt主要通过QRandomGenerator类来生成随机数。在此之前的版本中&#xff0c;qrand()函数也常被使用&#xff0c;但从Qt 5.10起&#xff0c;推荐使用更现代化的QRandomGenerator类。 在头文件添加void generateRandomNumb…

JavaFX - 3D 形状

在前面的章节中&#xff0c;我们已经了解了如何在 JavaFX 应用程序中的 XY 平面上绘制 2D 形状。除了这些 2D 形状之外&#xff0c;我们还可以使用 JavaFX 绘制其他几个 3D 形状。 通常&#xff0c;3D 形状是可以在 XYZ 平面上绘制的几何图形。它们由两个或多个维度定义&#…

深入理解开放寻址法中的三种探测序列

一、引言 开放寻址法是解决散列表中冲突的一种重要方法&#xff0c;当发生冲突&#xff08;即两个不同的键通过散列函数计算得到相同的散列值&#xff09;时&#xff0c;它会在散列表中寻找下一个可用的存储位置。而探测序列就是用于确定在发生冲突后&#xff0c;依次尝试哪些…

【双指针题目】

双指针 美丽区间&#xff08;滑动窗口&#xff09;合并数列&#xff08;双指针的应用&#xff09;等腰三角形全部所有的子序列 美丽区间&#xff08;滑动窗口&#xff09; 美丽区间 滑动窗口模板&#xff1a; int left 0, right 0;while (right < nums.size()) {// 增大…

嵌入式八股文面试题(一)C语言部分

1. 变量/函数的声明和定义的区别&#xff1f; &#xff08;1&#xff09;变量 定义不仅告知编译器变量的类型和名字&#xff0c;还会分配内存空间。 int x 10; // 定义并初始化x int x; //同样是定义 声明只是告诉编译器变量的名字和类型&#xff0c;但并不为它分配内存空间…

go-zero学习笔记(三)

利用goctl生成rpc服务 编写proto文件 // 声明 proto 使用的语法版本 syntax "proto3";// proto 包名 package demoRpc;// golang 包名(可选) option go_package "./demo";// 如需为 .proto 文件添加注释&#xff0c;请使用 C/C 样式的 // 和 /* ... */…

【25考研】南开软件考研复试复习重点!

一、复试内容 复试采取现场复试的方式。复试分为笔试、机试和面试三部分。三部分合计100分&#xff0c;其中笔试成绩占30%、机试成绩占30%、面试成绩占40%。 1.笔试&#xff1a;专业综合基础测试 考核方式&#xff1a;闭卷考试&#xff0c;时长为90分钟。 笔试考查内容范围…

【最长上升子序列Ⅱ——树状数组,二分+DP,纯DP】

题目 代码&#xff08;只给出树状数组的&#xff09; #include <bits/stdc.h> using namespace std; const int N 1e510; int n, m; int a[N], b[N], f[N], tr[N]; //f[i]表示以a[i]为尾的LIS的最大长度 void init() {sort(b1, bn1);m unique(b1, bn1) - b - 1;for(in…

012-51单片机CLD1602显示万年历+闹钟+农历+整点报时

1. 硬件设计 硬件是我自己设计的一个通用的51单片机开发平台&#xff0c;可以根据需要自行焊接模块&#xff0c;这是用立创EDA画的一个双层PCB板&#xff0c;所以模块都是插针式&#xff0c;不是表贴的。电路原理图在文末的链接里&#xff0c;PCB图暂时不选择开源。 B站上传的…

对象的实例化、内存布局与访问定位

一、创建对象的方式 二、创建对象的步骤: 一、判断对象对应的类是否加载、链接、初始化: 虚拟机遇到一条new指令&#xff0c;首先去检查这个指令的参数能否在Metaspace的常量池中定位到一个类的符号引用&#xff0c;并且检查这个符号引用代表的类是否已经被加载、解析和初始化…

传输层协议 UDP 与 TCP

&#x1f308; 个人主页&#xff1a;Zfox_ &#x1f525; 系列专栏&#xff1a;Linux 目录 一&#xff1a;&#x1f525; 前置复盘&#x1f98b; 传输层&#x1f98b; 再谈端口号&#x1f98b; 端口号范围划分&#x1f98b; 认识知名端口号 (Well-Know Port Number) 二&#xf…

实验十一 Servlet(二)

实验十一 Servlet(二) 【实验目的】 1&#xff0e;了解Servlet运行原理 2&#xff0e;掌握Servlet实现方式 【实验内容】 改造实验10&#xff0c;引入数据库&#xff0c;创建用户表&#xff0c;包括用户名和密码&#xff1a;客户端通过login.jsp发出登录请求&#xff0c;请求…