1.线程池介绍
顾名思义,线程池就是管理一系列线程的资源池,其提供了一种限制和管理线程资源的方式。每个线程池还维护一些基本统计信息,例如已完成任务的数量。
总结一下使用线程池的好处:
- 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
- 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
线程池一般用于执行多个不相关联的耗时任务,没有多线程的情况下,任务顺序执行,使用了线程池的话可让多个不相关联的任务同时执行。
2.Executor框架介绍
Executor
框架是 Java5 之后引进的,在 Java 5 之后,通过 Executor
来启动线程比使用 Thread
的 start
方法更好,除了更易管理,效率更好(用线程池实现,节约开销)外,还有关键的一点:有助于避免 this 逃逸问题。
this逃逸指的是在对象的构造过程中,当对象还没有完全初始化完成时(构造函数执行完毕),就将该对象的引用传递给了外部环境。这种情况可能会导致对该对象的访问出现不一致性或者线程安全问题。
具体来说,当一个线程在调用对象的构造函数创建对象实例时,如果对象引用在构造函数执行完成之前就被传递给了另一个线程,那么另一个线程可能会访问到尚未完成初始化的对象,这就是this逃逸。
this逃逸可能导致的问题包括:对象状态不一致、线程安全问题、空指针异常等。因此,在编写Java代码时,我们应该避免在构造函数中将this引用传递给外部环境,以确保对象在完全初始化后才能被其他线程访问。
Executor
框架不仅包括了线程池的管理,还提供了线程工厂、队列以及拒绝策略等,Executor
框架让并发编程变得更加简单。
Executor
框架结构主要由三大部分组成:
1、任务(Runnable
/Callable
)
执行任务需要实现的 Runnable
接口 或 Callable
接口。Runnable
接口或 Callable
接口 实现类都可以被 ThreadPoolExecutor
或 ScheduledThreadPoolExecutor
执行。
2、任务的执行(Executor
)
如下图所示,包括任务执行机制的核心接口 Executor
,以及继承自 Executor
接口的 ExecutorService
接口。ThreadPoolExecutor
和 ScheduledThreadPoolExecutor
这两个关键类实现了 ExecutorService
接口。
这里提了很多底层的类关系,但是,实际上我们需要更多关注的是 ThreadPoolExecutor
这个类,这个类在我们实际使用线程池的过程中,使用频率还是非常高的。
注意: 通过查看 ScheduledThreadPoolExecutor
源代码我们发现 ScheduledThreadPoolExecutor
实际上是继承了 ThreadPoolExecutor
并实现了 ScheduledExecutorService
,而 ScheduledExecutorService
又实现了 ExecutorService
,正如我们上面给出的类关系图显示的一样。
ThreadPoolExecutor
类描述:
//AbstractExecutorService实现了ExecutorService接口
public class ThreadPoolExecutor extends AbstractExecutorService
ScheduledThreadPoolExecutor
类描述:
//ScheduledExecutorService继承ExecutorService接口
public class ScheduledThreadPoolExecutorextends ThreadPoolExecutorimplements ScheduledExecutorService
3、异步计算的结果(Future
)
Future
接口以及 Future
接口的实现类 FutureTask
类都可以代表异步计算的结果。
当我们把 Runnable
接口 或 Callable
接口 的实现类提交给 ThreadPoolExecutor
或 ScheduledThreadPoolExecutor
执行。(调用 submit()
方法时会返回一个 FutureTask
对象)
Executor
框架的使用示意图:
- 1.主线程首先要创建实现
Runnable
或者Callable
接口的任务对象。 - 2.把创建完成的实现
Runnable
/Callable
接口的对象直接交给ExecutorService
执行:ExecutorService.execute(Runnable command)
)或者也可以把Runnable
对象或Callable
对象提交给ExecutorService
执行(ExecutorService.submit(Runnable task)
或ExecutorService.submit(Callable <T> task)
)。 - 3.如果执行
ExecutorService.submit(…)
,ExecutorService
将返回一个实现Future
接口的对象(我们刚刚也提到过了执行execute()
方法和submit()
方法的区别,submit()
会返回一个FutureTask 对象)。
由于FutureTask
实现了Runnable
,我们也可以创建FutureTask
,然后直接交给ExecutorService
执行。 - 4.最后,主线程可以执行
FutureTask.get()
方法来等待任务执行完成。主线程也可以执行FutureTask.cancel(boolean mayInterruptIfRunning)
来取消此任务的执行。
3.ThreadPoolExecutor类介绍(重要)
线程池实现类 ThreadPoolExecutor
是 Executor
框架最核心的类。
1.线程池参数分析
ThreadPoolExecutor
类中提供的四个构造方法。我们来看最长的那个,其余三个都是在这个构造方法的基础上产生(其他几个构造方法说白点都是给定某些默认参数的构造方法比如默认制定拒绝策略是什么)。
/*** 用给定的初始参数创建一个新的ThreadPoolExecutor。*/public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量int maximumPoolSize,//线程池的最大线程数long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间TimeUnit unit,//时间单位BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}
下面这些参数非常重要,在后面使用线程池的过程中你一定会用到!所以,务必记清楚。
ThreadPoolExecutor
3 个最重要的参数:
corePoolSize
: 任务队列未达到队列容量时,最大可以同时运行的线程数量。maximumPoolSize
: 任务队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。workQueue
: 新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。
ThreadPoolExecutor
其他常见参数 :
keepAliveTime
:线程池中的线程数量大于corePoolSize
的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime
才会被回收销毁。unit
:keepAliveTime
参数的时间单位。threadFactory
:executor 创建新线程的时候会用到。handler
:拒绝策略(后面会单独详细介绍一下)。
下面这张图可以加深你对线程池中各个参数的相互关系的理解
线程池各个参数的关系
ThreadPoolExecutor
拒绝策略定义:
如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任务时,ThreadPoolExecutor
定义一些策略:
ThreadPoolExecutor.AbortPolicy
:抛出RejectedExecutionException
来拒绝新任务的处理。ThreadPoolExecutor.CallerRunsPolicy
:调用执行自己的线程运行任务,也就是直接在调用execute
方法的线程中运行(run
)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果您的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略。ThreadPoolExecutor.DiscardPolicy
:不处理新任务,直接丢弃掉。ThreadPoolExecutor.DiscardOldestPolicy
:此策略将丢弃最早的未处理的任务请求。
举个例子:Spring 通过 ThreadPoolTaskExecutor
或者我们直接通过 ThreadPoolExecutor
的构造函数创建线程池的时候,当我们不指定 RejectedExecutionHandler
拒绝策略来配置线程池的时候,默认使用的是 AbortPolicy
。在这种拒绝策略下,如果队列满了,ThreadPoolExecutor
将抛出 RejectedExecutionException
异常来拒绝新来的任务 ,这代表你将丢失对这个任务的处理。如果不想丢弃任务的话,可以使用CallerRunsPolicy
。CallerRunsPolicy
和其他的几个策略不同,它既不会抛弃任务,也不会抛出异常,而是将任务回退给调用者,使用调用者的线程来执行任务。
public static class CallerRunsPolicy implements RejectedExecutionHandler {public CallerRunsPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {// 直接主线程执行,而不是线程池中的线程执行r.run();}}}
2.创建线程池的两种方式
方式一:通过ThreadPoolExecutor
构造函数来创建(推荐)。
使用样例:
public static void main(String[] args) {// 创建一个具有指定核心线程数、最大线程数、空闲线程存活时间、时间单位、工作队列和拒绝策略的线程池int corePoolSize = 5;int maximumPoolSize = 10;long keepAliveTime = 10;TimeUnit unit = TimeUnit.SECONDS;BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(100);RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);// 提交任务到线程池for (int i = 1; i <= 10; i++) {int finalI = i;CompletableFuture.runAsync(() -> {try {System.out.println("任务" + finalI + "正在执行...");TimeUnit.MILLISECONDS.sleep(200);} catch (InterruptedException e) {e.printStackTrace();}}, executor);}// 关闭线程池executor.shutdown();}
方式二:通过 Executor
框架的工具类 Executors
来创建(不推荐)。
Executors
工具类提供的创建线程池的方法如下图所示:
可以看出,通过Executors
工具类可以创建多种类型的线程池,包括:
FixedThreadPool
:固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。SingleThreadExecutor
: 只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。CachedThreadPool
: 可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。ScheduledThreadPool
:给定的延迟后运行任务或者定期执行任务的线程池。
《阿里巴巴 Java 开发手册》强制线程池不允许使用 Executors
去创建,而是通过 ThreadPoolExecutor
构造函数的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险
Executors
返回线程池对象的弊端如下(后文会详细介绍到):
FixedThreadPool
和SingleThreadExecutor
:使用的是无界的LinkedBlockingQueue
,任务队列最大长度为Integer.MAX_VALUE
,可能堆积大量的请求,从而导致 OOM。CachedThreadPool
:使用的是同步队列SynchronousQueue
, 允许创建的线程数量为Integer.MAX_VALUE
,如果任务数量过多且执行速度较慢,可能会创建大量的线程,从而导致 OOM。ScheduledThreadPool
和SingleThreadScheduledExecutor
:使用的无界的延迟阻塞队列DelayedWorkQueue
,任务队列最大长度为Integer.MAX_VALUE
,可能堆积大量的请求,从而导致 OOM。
// 无界队列 LinkedBlockingQueue
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}// 无界队列 LinkedBlockingQueue
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}// 同步队列 SynchronousQueue,没有容量,最大线程数是 Integer.MAX_VALUE`
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}// DelayedWorkQueue(延迟阻塞队列)
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());
}
3. Executors创建线程的4种方法
1. newSingleThreadExecutor创建“单线程化线程池”
package threadpool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;public class CreateThreadPollDemo {public static final int SLEEP_GAP=1000;static class TargetTask implements Runnable{static AtomicInteger taskNo=new AtomicInteger(1);private String taskName;public TargetTask(){taskName="task-"+taskNo;taskNo.incrementAndGet();}public void run(){System.out.println("task:"+taskName+" is doing...");try {Thread.sleep(SLEEP_GAP);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("task:"+taskName+" end...");}}public static void main(String[] args) {ExecutorService pool=Executors.newSingleThreadExecutor();for(int i=0;i<3;i++){pool.execute(new TargetTask());pool.submit(new TargetTask());}pool.shutdown();}}
运行结果:
特点:
- 单线程化的线程池中的任务是按照提交的次序顺序执行的
- 只有一个线程的线程池
- 池中的唯一线程的存活时间是无限的
- 当池中的唯一线程正繁忙时,新提交的任务实例会进入内部的阻塞队列中,并且其阻塞队列是无界的
- 适用场景:任务按照提交次序,一个任务一个任务地逐个执行的场景
2. newFixedThreadPool创建“固定数量的线程池
package threadpool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;public class CreateThreadPollDemo {public static final int SLEEP_GAP=1000;static class TargetTask implements Runnable{static AtomicInteger taskNo=new AtomicInteger(1);private String taskName;public TargetTask(){taskName="task-"+taskNo;taskNo.incrementAndGet();}public void run(){System.out.println(taskName+" is doing...");try {Thread.sleep(SLEEP_GAP);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(taskName+" end...");}}public static void main(String[] args) {ExecutorService pool=Executors.newFixedThreadPool(3);//创建含有3个线程的线程池for(int i=0;i<5;i++){pool.execute(new TargetTask());pool.submit(new TargetTask());}pool.shutdown();}}
特点:
- 如果线程数没有达到“固定数量”,每次提交一个任务线程池内就创建一个新线程,直到线程达到线程池固定的数量
- 线程池的大小一旦达到“固定数量”就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程
- 在接收异步任务的执行目标实例时,如果池中的所有线程均在繁忙状态,新任务会进入阻塞队列中(无界的阻塞队列)
适用场景:
- 需要任务长期执行的场景
- CPU密集型任务
缺点:
- 内部使用无界队列来存放排队任务,当大量任务超过线程池最大容量需要处理时,队列无限增大,使服务器资源迅速耗尽
3. newCachedThreadPool创建“可缓存线程池”
package threadpool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;public class CreateThreadPollDemo {public static final int SLEEP_GAP=1000;static class TargetTask implements Runnable{static AtomicInteger taskNo=new AtomicInteger(1);private String taskName;public TargetTask(){taskName="task-"+taskNo;taskNo.incrementAndGet();}public void run(){System.out.println(taskName+" is doing...");try {Thread.sleep(SLEEP_GAP);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(taskName+" end...");}}public static void main(String[] args) {ExecutorService pool=Executors.newCachedThreadPool();for(int i=0;i<5;i++){pool.execute(new TargetTask());pool.submit(new TargetTask());}pool.shutdown();}}
特点:
- 在接收新的异步任务target执行目标实例时,如果池内所有线程繁忙,此线程池就会添加新线程来处理任务
- 线程池不会对线程池大小进行限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小
- 如果部分线程空闲,也就是存量线程的数量超过了处理任务数量,就会回收空闲(60秒不执行任务)线程
适用场景:
- 需要快速处理突发性强、耗时较短的任务场景,如Netty的NIO处理场景、REST API接口的瞬时削峰场景
缺点:
- 线程池没有最大线程数量限制,如果大量的异步任务执行目标实例同时提交,可能会因创建线程过多而导致资源耗尽
4. newScheduledThreadPool创建“可调度线程池”
package threadpool;
import java.security.Policy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;public class CreateThreadPollDemo {public static final int SLEEP_GAP=1000;static class TargetTask implements Runnable{static AtomicInteger taskNo=new AtomicInteger(1);private String taskName;public TargetTask(){taskName="task-"+taskNo;taskNo.incrementAndGet();}public void run(){System.out.println(taskName+" is doing...");try {Thread.sleep(SLEEP_GAP);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(taskName+" end...");}}public static void main(String[] args) throws InterruptedException {ScheduledExecutorService pool=Executors.newScheduledThreadPool(2);for(int i=0;i<2;i++){pool.scheduleAtFixedRate(new TargetTask(), 0, 500, TimeUnit.MILLISECONDS);//参数1: task任务 //参数2: 首次执行任务的延迟时间//参数3: 周期性执行的时间//参数4: 时间单位}Thread.sleep(3000);//主线程睡眠时间越长 周期次数越多pool.shutdown();}}
- 延时性
- 周期性
pool.scheduleAtFixedRate(new TargetTask(), 0, 500, TimeUnit.MILLISECONDS);//参数1: task任务//参数2: 首次执行任务的延迟时间//参数3: 周期性执行的时间//参数4: 时间单位
总结:Executors创建线程池的4种方法十分方便,但是构造器创建普通线程池、可调度线程池比较复杂,这些构造器会涉及大量的复杂参数,已经较少使用。
4.Executors创建线程池存在的问题
1.创建固定数量线程池的问题
阻塞队列无界,队列很大,很有可能导致JVM出现OOM(Out Of Memory)异常,即内存资源耗尽
2.创建单线程线程池的问题
问题和固定数量线程池一样,阻塞队列无界
3.创建缓存线程池的问题
问题存在于其最大线程数量不设限上。由于其maximumPoolSize的值为Integer.MAX_VALUE(非常大),可以认为可以无限创建线程,如果任务提交较多,就会造成大量的线程被启动,很有可能造成OOM异常,甚至导致CPU线程资源耗尽
4.创建可调度线程存在的问题
主要问题在于线程数不设上限
总结:
- newFixedThreadPool和newSingleThreadExecutor: 阻塞队列无界,会堆积大量任务导致OOM(内存耗尽)
- newCachedThreadPool和newScheduledThreadPool: 线程数量无上界,会导致创建大量的线程,从而导致OOM
- 建议直接使用线程池ThreadPoolExecutor的构造器
5.线程池常用的阻塞队列
Java 线程池使用阻塞队列实现线程之间的同步,控制任务的提交和执行。线程池中的任务被提交到阻塞队列中,等待被线程池中的线程执行。当线程池中的线程空闲时,它们会从阻塞队列中取出任务进行执行。
常用的阻塞队列有以下几种:
- ArrayBlockingQueue:基于数组实现的有界阻塞队列,插入操作和删除操作都可能会被阻塞。
- LinkedBlockingQueue:基于链表实现的阻塞队列,可以指定容量,如果未指定容量,则容量默认为 Integer.MAX_VALUE。插入操作和删除操作都可能会被阻塞。
- SynchronousQueue:一个没有容量的阻塞队列,插入操作和删除操作必须同时进行,否则会被阻塞。// 阻塞队列,不存储元素
阻塞队列的实现可以保证线程安全,多个线程可以同时操作队列。当阻塞队列为空时,从队列中取出任务的操作会被阻塞,直到队列中有新的任务被添加进来。当阻塞队列已满时,添加任务的操作会被阻塞,直到队列中有任务被取出。
在线程池中使用阻塞队列可以帮助控制任务的提交速度,防止任务提交过多导致系统资源的浪费。在使用阻塞队列时需要根据具体情况选择合适的实现类,以实现更高效的任务调度和执行。
除定时执行的线程池外,其他三种线程池创建使用的阻塞队列如下图所示:
ArrayBlockingQueue和LinkedBlockingQueue的区别:
ArrayBlockingQueue 底层采用数组来实现队列,因此它在创建时需要指定容量大小,并且容量不可变。由于是基于数组实现,因此 ArrayBlockingQueue 可以高效地随机访问队列中的元素,但是插入和删除操作需要移动元素,因此效率相对较低。// 适合随机访问
LinkedBlockingQueue 底层采用链表来实现队列,因此它在创建时可以不指定容量大小,也可以指定容量大小,但是如果没有指定容量大小,则默认容量为 Integer.MAX_VALUE。由于是基于链表实现,因此 LinkedBlockingQueue 插入和删除元素时只需要修改指针,因此效率相对较高,但是不能高效地随机访问队列中的元素。// 适合频繁修改
4.线程池常见面试题
1. 线程池是什么?为什么要使用线程池?
计算机中,线程的创建和销毁开销较大,频繁的创建和销毁线程会影响程序性能。利用基于池化思想的线程池来统一管理和分配线程,复用已创建的线程,避免频繁创建和销毁线程带来的资源消耗,提高系统资源的利用率。
线程池具有以下 3 点优势:
- 降低资源消耗,重复利用已经创建的线程,避免线程创建与销毁带来的资源消耗;
- 提高响应速度,接收任务时,可以通过线程池直接获取线程,避免了创建线程带来的时间消耗;
- 便于管理线程,统一管理和分配线程,避免无限制创建线程,另外可以引入线程监控机制。
2.四大快捷创建线程池的方法和详细说明
- newFixedThreadPool(int nThreads):创建一个固定大小的线程池,线程池中的线程数量固定不变。当线程池中所有线程都被占用时,新的任务会被放入等待队列。
- newCachedThreadPool():创建一个可缓存的线程池,线程池中的线程数量不固定,会根据任务的多少自动调整线程池中的线程数量。空闲线程会被保留60秒,并且在需要时重新利用这些线程。
- newSingleThreadExecutor():创建一个只有一个线程的线程池,所有任务都将在该线程中执行。如果该线程因为异常退出,那么会创建一个新的线程来替代它。
- newScheduledThreadPool(int corePoolSize):创建一个固定大小的线程池,可以执行定时任务和周期性任务。
3.为什么不建议使用Executor直接创建线程池?
虽然Executor提供了便捷的方法来创建线程池,但是直接使用Executor创建线程池也有一些缺点。
阿里巴巴Java开发手册,明确指出不允许使用Executors静态工厂构建线程池,原因如下:
线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险
说明:Executors返回的线程池对象的弊端如下:
1:FixedThreadPool 和 SingleThreadPool:
允许的请求队列(底层实现是LinkedBlockingQueue)长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM
2:CachedThreadPool 和 ScheduledThreadPool
允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。
4.使用ThreadPoolExecutor创建线程池的参数详细说明
ThreadPoolExecutor是Java提供的一个灵活的线程池实现类,可以通过它来自定义线程池的各种参数。下面是ThreadPoolExecutor的构造函数的参数说明:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
- corePoolSize:线程池中的核心线程数,当线程池中的线程数量小于这个值时,新的任务会创建新的线程。
- maximumPoolSize:线程池中的最大线程数,当线程池中的线程数量达到这个值时,新的任务会被放入等待队列。
- keepAliveTime:线程空闲时间,当线程的空闲时间超过这个值时,多余的线程会被销毁。
- unit:时间单位,用来指定keepAliveTime的时间单位,如秒、毫秒等。
- workQueue:工作队列,用来保存等待执行的任务,ThreadPoolExecutor提供了多种工作队列实现,如LinkedBlockingQueue、ArrayBlockingQueue等。
- threadFactory:线程工厂,用来创建新的线程。
- handler:拒绝策略,用来处理无法处理的任务,ThreadPoolExecutor提供了多种拒绝策略实现,如AbortPolicy、CallerRunsPolicy等。
5. 线程池的关闭
线程池的关闭是一个比较重要的问题,如果不正确地关闭线程池,会导致一些任务没有被执行完或者线程池无法释放已经占用的资源。ThreadPoolExecutor提供了两种关闭线程池的方法:
- shutdown():该方法会平缓地关闭线程池,在调用该方法之后,线程池不再接受新的任务,但是会执行完所有已经提交的任务,然后关闭线程池。调用该方法后,线程池会一直等待所有任务执行完毕才会关闭。
- shutdownNow():该方法会立即关闭线程池,它会尝试停止所有正在执行的任务,并且不再处理等待队列中的任务。调用该方法后,线程池会立即释放所有已经占用的资源。
6.线程池常用的阻塞队列说明
线程池的阻塞队列是管理等待执行的任务的一个重要组成部分,常用的阻塞队列有以下几种:
- LinkedBlockingQueue:一个基于链表的阻塞队列,可以无限地添加元素,当队列满时会阻塞等待队列空闲。该队列比较适合用于固定大小的线程池中。
- ArrayBlockingQueue:一个基于数组的阻塞队列,可以指定队列的容量,当队列满时会阻塞等待队列空闲。该队列比较适合用于固定大小的线程池中。
- SynchronousQueue:一个不存储元素的阻塞队列,每个插入操作必须等待另一个线程的移除操作,反之亦然。该队列比较适合用于处理大量短暂的任务。
- PriorityBlockingQueue:一个基于优先级的无限阻塞队列,可以按照元素的优先级顺序执行任务。该队列比较适合用于需要按照优先级顺序执行任务的场景。
7.线程池的拒绝策略
当请求任务不断的过来,而系统此时又处理不过来的时候,我们需要采取的策略是拒绝服务。RejectedExecutionHandler接口提供了拒绝任务处理的自定义方法的机会。在ThreadPoolExecutor中已经包含四种处理策略。
- AbortPolicy策略:该策略会直接抛出异常,阻止系统正常工作。
- CallerRunsPolicy 策略:只要线程池未关闭,该策略直接在调用者线程中,运行当前的被丢弃的任务。
- DiscardOleddestPolicy策略: 该策略将丢弃最老的一个请求,也就是即将被执行的任务,并尝试再次提交当前任务。
- DiscardPolicy策略:该策略默默的丢弃无法处理的任务,不予任何处理。
除了JDK默认提供的四种拒绝策略,我们可以根据自己的业务需求去自定义拒绝策略,自定义的方式很简单,直接实现RejectedExecutionHandler接口即可。
8.线程池的执行流程
1. 初始化线程池
在初始化线程池时,我们需要指定线程池的大小、任务队列的大小等参数。同时,我们还需要创建一些线程,以便在任务到来时能够立即执行。
2. 提交任务
当我们有任务需要执行时,我们可以将任务提交到线程池中。这些任务会被加入到任务队列中,等待线程池中的线程来执行。
3. 线程执行任务
线程池中的线程会不断地从任务队列中获取任务,并执行这些任务。当任务队列中没有任务时,线程可能会等待,直到有新的任务到来。
4. 任务执行完毕
当一个任务执行完毕时,线程会将执行结果返回给调用方,并继续执行下一个任务。如果线程池中的线程数量是有限制的,那么当所有的线程都在执行任务时,新的任务会被加入到任务队列中等待。
5. 关闭线程池
当我们不再需要线程池时,我们需要将线程池关闭。在关闭线程池时,我们需要等待所有的任务执行完毕,并停止所有的线程。当线程池被关闭之后,我们就不能再向其中提交任务了。
以上就是线程池的执行流程。线程池可以提高程序的效率,减少线程创建和销毁的开销,同时还可以控制线程的数量,避免线程数量过多造成的问题。
9.线程池优化
1)用ThreadPoolExecutor自定义线程池,看线程是的用途,如果任务量不大,可以用无界队列,如果任务量非常大,要用有界队列,防止OOM
2)如果任务量很大,还要求每个任务都处理成功,要对提交的任务进行阻塞提交,重写拒绝机制,改为阻塞提交。保证不抛弃一个任务
3)最大线程数一般设为2N+1最好,N是CPU核数
4)核心线程数,看应用,如果是任务,一天跑一次,设置为0,合适,因为跑完就停掉了,如果是常用线程池,看任务量,是保留一个核心还是几个核心线程数
5)如果要获取任务执行结果,用CompletionService,但是注意,获取任务的结果的要重新开一个线程获取,如果在主线程获取,就要等任务都提交后才获取,就会阻塞大量任务结果,队列过大OOM,所以最好异步开个线程获取结果。
10.为什么不建议通过Executors构建线程池?
阿里巴巴Java开发手册中也明确指出,不允许使用Executors创建线程池
主要原因
阿里巴巴Java开发手册建议避免直接使用Executors类来创建线程池,主要出于以下几个考虑:
1.默认的队列大小和最大线程数可能导致资源耗尽:
- FixedThreadPool和SingleThreadExecutor使用无界队列LinkedBlockingQueue,默认最大容量为Integer.MAX_VALUE。在高负载情况下,可能导致内存溢出。
- CachedThreadPool和ScheduledThreadPool允许创建的线程数量最大为Integer.MAX_VALUE,可能引发大量线程创建和内存溢出风险。
2.缺乏灵活性:
- Executors工厂方法创建的线程池参数预定义,不够灵活。直接使用ThreadPoolExecutor构造函数可以明确设置核心线程数、最大线程数、工作队列、拒绝策略等参数,更有利于根据业务需求进行优化。
3.隐藏了复杂性:
- Executors的便捷性隐藏了线程池内部的复杂性和细节,可能导致开发者对线程池运行机制理解不足,产生意外异常。
4.性能问题:
- 使用无界队列可能增加内存消耗,降低系统稳定性。任务过多可能导致线程上下文切换频繁,影响性能。
因此,建议直接使用ThreadPoolExecutor构造函数创建线程池,能更好地理解线程池工作原理,根据需求合理配置,避免潜在风险,提高系统稳定性和性能。虽然增加了编码复杂度,但获得更好的控制力和安全性是值得的。