1 Dubbo的线程池概述
这里将要讲述的线程池指Dubbo服务端使用某些线程模型(如 all 模型)时用到的业务线程池。ThreadPool 是一个扩展接口SPI。
@SPI(value = "fixed", scope = ExtensionScope.FRAMEWORK)
public interface ThreadPool {/*** Thread pool** @param url URL contains thread parameter* @return thread pool*/@Adaptive({THREADPOOL_KEY})Executor getExecutor(URL url);}
Dubbo提供了一些基于JDK的标准ThreadPoolExecutor的接口实现,具体如下。
- FixedThreadPool:创建一个线程数固定的线程池。
- LimitedThreadPool:创建一个线程池,线程池中的线程个数随着需要量动态增加,但是数量不超过配置的阈值。另外空闲线程不会被回收,会一直存在。
- EagerThreadPool:创建一个线程池,当所有核心线程都忙碌时,将创建新线程来执行新任务,而不是把任务放入线程池的阻塞队列中。
- CachedThreadPool:创建一个自适应线程池。当线程空闲1min时,线程会被回收;当有新请求到来时,会创建新线程。
默认情况下,Dubbo 会使用线程数固定的线程池(FixedThreadPool)。
2 源码分析
2.1 FixedThreadPool
创建一个线程数固定的线程池。源码如下所示。
public class FixedThreadPool implements ThreadPool {@Overridepublic Executor getExecutor(URL url) {// 线程名称的前缀,默认值为"Dubbo"String name = url.getParameter(THREAD_NAME_KEY, (String) url.getAttribute(THREAD_NAME_KEY, DEFAULT_THREAD_NAME));// 线程的数量,默认为200int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);// 线程池阻塞队列的大小,默认值为0int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);BlockingQueue<Runnable> blockingQueue;if (queues == 0) {blockingQueue = new SynchronousQueue<>();} else if (queues < 0) {blockingQueue = new MemorySafeLinkedBlockingQueue<>();} else {blockingQueue = new LinkedBlockingQueue<>(queues);}return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, blockingQueue,new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));}}
其中使用的拒绝策略为 AbortPolicyWithReport,其继承了JDK的ThreadPoolExecutor.AbortPolicy,其作用是当线程池中的线程处于忙碌状态且线程池队列已满时,新来的任务会被丢弃,并抛出RejectedExecutionException异常。具体如下所示。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {String msg = String.format("Thread pool is EXHAUSTED!" +" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d)," +" Task: %d (completed: %d)," +" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(),e.getLargestPoolSize(),e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),url.getProtocol(), url.getIp(), url.getPort());// 0-1 - Thread pool is EXHAUSTED!logger.warn(COMMON_THREAD_POOL_EXHAUSTED, "too much client requesting provider", "", msg);// 获取当前 JVM 进程的线程堆栈跟踪信息// 依赖 java.lang.management.ThreadMXBean 访问Java虚拟机线程信息if (Boolean.parseBoolean(url.getParameter(DUMP_ENABLE, "true"))) {dumpJStack();}dispatchThreadPoolExhaustedEvent(msg);throw new RejectedExecutionException(msg);
}