并发编程
1、线程池中提交一个任务的流程是怎样的?
1、提交任务:首先,一个任务被提交到线程池。这个任务通常是一个实现了Runnable
或Callable
接口的对象;
2、检测线程池状态:线程池会首先检测其运行状态。如果线程池不是RUNNING
状态,任务会被直接拒绝
3、核心线程判断:如果当前工作线程数workerCount
小于核心线程数corePoolSize
,线程池会创建一个新的核心线程来执行提交的任务;
4、阻塞队列判断:如果工作线程数已经达到核心线程数,但线程池内的阻塞队列workQueue
还未满,任务会被添加到这个阻塞队列中,等待执行。随后,空闲的核心线程会依次从队列中取出任务来执行;
5、非核心线程判断:如果阻塞队列已满,则入队失败,那么会尝试增加线程,如果当前线程池中线程数小于最大线程数maximumPoolSize
, 线程池会创建一个新的非核心线程(也称为临时线程)来执行任务;
6、拒绝策略:如果阻塞队列满了,且工作线程数已达到最大线程数,线程池会根据预设的拒绝策略来处理这个任务。默认的处理方式是直接抛出一个RejectedExecutionException
异常,但还有其他策略如CallerRunsPolicy
(在调用者线程执行)、DiscardPolicy
(任务直接丢弃,不做任何处理)和DiscardOldestPolicy
(丢弃队列里最旧的那个任务,再尝试执行当前任务)等。但是这种策略有一个弊端就是任务执行的轨迹不会被记录下来。所以,我们往往需要实现自定义的拒绝策略, 通过实现RejectedExecutionHandler
接口的方式。
在整个过程中,线程池会优先使用核心线程来执行任务,其次是阻塞队列,最后是非核心线程。如果所有资源都已经用尽,任务会根据拒绝策略进行处理。
注意,线程池提供了两种主要的方法来执行任务:execute()
和submit()
。其中,execute()
方法用于提交不需要返回值的任务,而submit()
方法用于提交一个任务并带有返回值,这个方法将返回一个Future类型对象
,可以通过这个返回对象判断任务是否执行成功,并且可以通过future.get()
方法来获取返回值。
流程图:
代码:
public void test() {// 定义线程池的参数int corePoolSize = 5; // 核心线程数int maximumPoolSize = 10; // 最大线程数long keepAliveTime = 60L; // 非核心线程的空闲存活时间TimeUnit unit = TimeUnit.SECONDS; // 时间单位BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(100); // 阻塞队列ThreadFactory threadFactory = Executors.defaultThreadFactory(); // 线程工厂RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy(); // 拒绝策略// 创建线程池ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler);// 提交任务到线程池executor.execute(new Runnable() {@Overridepublic void run() {// 模拟耗时操作System.out.println("nihao");}});}
// 线程池execute()方法源码
public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();// 当前线程数小于核心线程数if (workerCountOf(c) < corePoolSize) {// 新创建一个线程if (addWorker(command, true))return;c = ctl.get();}// 如果当前线程数大于核心线程数,则添加到阻塞队列中if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 如果阻塞队列满了,则创建非核心线程else if (!addWorker(command, false))// 当前线程数是否>=最大线程数,执行拒绝策略reject(command);}
// 创建线程源码
private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);// 判断当前线程数是否>=最大线程数if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;}}boolean workerStarted = false;boolean workerAdded = false;ThreadPoolExecutor.Worker w = null;try {w = new ThreadPoolExecutor.Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}
2、线程池有几种状态?分别是如何变化的?
线程池有五种状态,分别是:
1、Running(运行状态):
线程池经过初始化后,就进入了运行状态。这个状态下,线程池中的线程开始执行任务,并且会接受新任务、会处理队列中的任务;
2、Shutdown(关闭状态):
当调用线程池的shutdown()
方法后,线程池的状态会变为Shutdown。这个状态下,线程池不会接收新的任务,会处理队列中的任务,任务处理完后会中断所有线程;
3、Stop(停止状态):
当调用线程池的shutdownNow()
方法后,线程池的状态会变为Stop。这个状态下,线程池不会接收新的任务,不会处理队列中的任务,并且会直接中断所有线程;
4、Tidying(整理状态):
所有的线程都停止了之后,线程池的状态会转变为TIDYING,一旦达到此状态,就会调用线程池的terminated()方法;
5、Terminated(终止状态):
线程池处于TIDYING状态后,会执行terminated()方法,执行完后就进入Terminated状态。
RUNNING: Accept new tasks and process queued tasks* SHUTDOWN: Don't accept new tasks, but process queued tasks* STOP: Don't accept new tasks, don't process queued tasks,* and interrupt in-progress tasks* TIDYING: All tasks have terminated, workerCount is zero,* the thread transitioning to state TIDYING* will run the terminated() hook method* TERMINATED: terminated() has completed
线程池的状态变化通常是通过调用shutdown()
或shutdownNow()
方法来实现的。当调用shutdown()
方法时,线程池的状态会从Running
到Shutdown
,再到Tidying
,最后到Terminated
销毁状态。当调用shutdownNow()
方法时,线程池的状态会从Running
到Stop
,再到Tidying
,最后到Terminated
销毁状态
代码:
// shutdown()方法源码
public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();// 修改线程池状态为SHUTDOWNadvanceRunState(SHUTDOWN);// 关闭线程interruptIdleWorkers();onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();}// 当线程都关闭后,执行tryTerminate()方法,将线程池的状态修改为TIDYINGtryTerminate();}
// shutdownNow()方法源码
public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();// 修改线程池状态为STOP,只修改状态。先修改状态,在关闭线程,防止有新的任务进来advanceRunState(STOP);// 关闭线程interruptWorkers();tasks = drainQueue();} finally {mainLock.unlock();}// 当线程都关闭后,执行tryTerminate()方法,将线程池的状态修改为TIDYINGtryTerminate();return tasks;}
final void tryTerminate() {for (;;) {int c = ctl.get();if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;if (workerCountOf(c) != 0) { // Eligible to terminateinterruptIdleWorkers(ONLY_ONE);return;}final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// ctlOf(TIDYING, 0) 修改线程池状态为TIDYINGif (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {// 空方法,当线程池都关闭后,如果想要做其他额外处理,可重写此方法进行扩展terminated();} finally {// 修改线程池状态为TERMINATED,最终状态,线程池到此真正关闭ctl.set(ctlOf(TERMINATED, 0));termination.signalAll();}return;}} finally {mainLock.unlock();}// else retry on failed CAS}}
3、如何停止一个线程?
Thread
类中有两个方法:
start()
:开启一个线程
stop()
:停止一个线程
但是stop()
方法不建议使用,并且是有可能在未来版本中删除掉的
因为stop()
方法太粗暴了,一旦调用了stop()
方法,就会直接停掉线程,这样就可能造成严重的问题,比如任务执行到哪一步了?锁释放了吗?等一系列问题。
注意:stop()
方法会释放线程占用的synchronized锁
,而不会自动释放ReentrantLock锁
public static void main(String[] args) {Object lock = new Object();Thread thread = new Thread(()->{synchronized (lock) {for (int i = 0; i < 100; i++) {System.out.println(i);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}});thread.start();try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}thread.stop();synchronized (lock){// 程序能正常打印这条语句,说明线程调用stop()方法会释放synchronized锁System.out.println("拿到锁了");}}
public static void main(String[] args) {ReentrantLock lock = new ReentrantLock();Thread thread = new Thread(() -> {// 加ReentrantLock锁lock.lock();for (int i = 0; i < 100; i++) {System.out.println(i);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}// 释放锁lock.unlock();});thread.start();try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}thread.stop();lock.lock();// 程序不能正常打印这条语句,说明线程调用stop()方法不会释放ReentrantLock锁System.out.println("拿到锁了");lock.unlock();}
正常情况下我们可以使用中断机制(Interrupt)来停止一个线程
Thread
类中有interrupt()
方法可以来停止线程,interrupt()
方法并不是直接停止线程,而是在线程中设置一个中断状态。线程可以使用isInterrupted()
方法检查中断状态,并据此决定是否继续执行;
如果线程处于阻塞状态(如sleep()
或wait()
等),调用interrupt()
会唤醒线程,并抛出InterruptedException异常,同时会清除isInterrupted()
方法的中断状态,重新置为false
;
另外,线程池中也是通过interrupt()
来停止线程的,比如shutdownNow()
方法中就会调用
public static void main(String[] args) {Thread thread = new Thread(() -> {for (int i = 0; i < 100; i++) {// 由当前线程决定是否中断线程if (Thread.currentThread().isInterrupted() && i > 50) {break;}// 打印到50线程被就中止掉System.out.println(i);}});thread.start();// 中断线程thread.interrupt();System.out.println("end");}
public static void main(String[] args) {Thread thread = new Thread(() -> {for (int i = 0; i < 100; i++) {// 由当前线程决定是否中断线程if (Thread.currentThread().isInterrupted() && i > 50) {break;}System.out.println(i);try {// 当线程存在阻塞状态,会清除中止状态,这里会打印到99Thread.sleep(1000);} catch (InterruptedException e) {
// e.printStackTrace();}}});thread.start();// 中断线程thread.interrupt();System.out.println("end");}