线程池的优点
1、线程是稀缺资源,使用线程池可以减少创建和销毁线程的次数,每个工作线程都可以重复使用。
2、可以根据系统的承受能力,调整线程池中工作线程的数量,防止因为消耗过多内存导致服务器崩溃。
线程池的创建
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler)
corePoolSize:线程池核心线程数量
maximumPoolSize:线程池最大线程数量
keepAliverTime:当活跃线程数大于核心线程数时,空闲的多余线程最大存活时间
unit:存活时间的单位
workQueue:存放任务的队列
handler:超出线程范围和队列容量的任务的处理程序
线程池的实现原理
提交一个任务到线程池中,线程池的处理流程如下:
1、判断线程池里的核心线程是否都在执行任务,如果不是(核心线程空闲或者还有核心线程没有被创建)则创建一个新的工作线程来执行任务。如果核心线程都在执行任务,则进入下个流程。
2、线程池判断工作队列是否已满,如果工作队列没有满,则将新提交的任务存储在这个工作队列里。如果工作队列满了,则进入下个流程。
3、判断线程池里的线程是否都处于工作状态,如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务。
线程池的源码解读
1、ThreadPoolExecutor的execute()方法
public void execute(Runnable command) {if (command == null)throw new NullPointerException(); //如果线程数大于等于基本线程数或者线程创建失败,将任务加入队列if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { //线程池处于运行状态并且加入队列成功if (runState == RUNNING && workQueue.offer(command)) {if (runState != RUNNING || poolSize == 0)ensureQueuedTaskHandled(command);} //线程池不处于运行状态或者加入队列失败,则创建线程(创建的是非核心线程)else if (!addIfUnderMaximumPoolSize(command)) //创建线程失败,则采取阻塞处理的方式reject(command); // is shutdown or saturated}}
2、创建线程的方法:addIfUnderCorePoolSize(command)
private boolean addIfUnderCorePoolSize(Runnable firstTask) {Thread t = null;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (poolSize < corePoolSize && runState == RUNNING)t = addThread(firstTask);} finally {mainLock.unlock();}if (t == null)return false;t.start();return true;}
主要关注点在 新增线程部分
1 private Thread addThread(Runnable firstTask) {2 Worker w = new Worker(firstTask);3 Thread t = threadFactory.newThread(w);4 if (t != null) {5 w.thread = t;6 workers.add(w);7 int nt = ++poolSize;8 if (nt > largestPoolSize)9 largestPoolSize = nt;
10 }
11 return t;
12 }
这里将线程封装成工作线程worker,并放入工作线程组里,worker类的方法run方法:
public void run() {try {Runnable task = firstTask;firstTask = null;while (task != null || (task = getTask()) != null) {runTask(task);task = null;}} finally {workerDone(this);}}
worker在执行完任务后,还会通过getTask方法循环获取工作队里里的任务来执行。
RejetedExecutionHandler:饱和策略
当队列和线程池都满了,说明线程池处于饱和状态,那么必须对新提交的任务采用一种特殊的策略来进行处理。这个策略默认配置是AbortPolicy,表示无法处理新的任务而抛出异常。JAVA提供了4中策略:
1、AbortPolicy:直接抛出异常
2、CallerRunsPolicy:只用调用所在的线程运行任务
3、DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
4、DiscardPolicy:不处理,丢弃掉。
Executor框架的两级调度模型
在HotSpot VM的模型中,JAVA线程被一对一映射为本地操作系统线程。JAVA线程启动时会创建一个本地操作系统线程,当JAVA线程终止时,对应的操作系统线程也被销毁回收,而操作系统会调度所有线程并将它们分配给可用的CPU。
在上层,JAVA程序会将应用分解为多个任务,然后使用应用级的调度器(Executor)将这些任务映射成固定数量的线程;在底层,操作系统内核将这些线程映射到硬件处理器上。
Executor框架类图
JAVA线程既是工作单元,也是执行机制。而在Executor框架中,我们将工作单元与执行机制分离开来。Runnable和Callable是工作单元(也就是俗称的任务),而执行机制由Executor来提供。这样一来Executor是基于生产者消费者模式的,提交任务的操作相当于生成者,执行任务的线程相当于消费者。
1、从类图上看,Executor接口是异步任务执行框架的基础,该框架能够支持多种不同类型的任务执行策略。
public interface Executor {void execute(Runnable command);
}
Executor接口就提供了一个执行方法,任务是Runnbale类型,不支持Callable类型。
2、ExecutorService接口实现了Executor接口,主要提供了关闭线程池和submit方法:
public interface ExecutorService extends Executor {List<Runnable> shutdownNow();boolean isTerminated();<T> Future<T> submit(Callable<T> task);}
另外该接口有两个重要的实现类:ThreadPoolExecutor与ScheduledThreadPoolExecutor。
其中ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务;而ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行任务,或者定期执行命令。
在上一篇文章中,我是使用ThreadPoolExecutor来通过给定不同的参数从而创建自己所需的线程池,但是在后面的工作中不建议这种方式,推荐使用Exectuors工厂方法来创建线程池
这里先来区别线程池和线程组(ThreadGroup与ThreadPoolExecutor)这两个概念:
a、线程组就表示一个线程的集合。
b、线程池是为线程的生命周期开销问题和资源不足问题提供解决方案,主要是用来管理线程。
Executors可以创建3种类型的ThreadPoolExecutor:SingleThreadExecutor、FixedThreadExecutor和CachedThreadPool
a、SingleThreadExecutor:单线程线程池
ExecutorService threadPool = Executors.newSingleThreadExecutor();
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}
我们从源码来看可以知道,单线程线程池的创建也是通过ThreadPoolExecutor,里面的核心线程数和线程数都是1,并且工作队列使用的是无界队列。由于是单线程工作,每次只能处理一个任务,所以后面所有的任务都被阻塞在工作队列中,只能一个个任务执行。
b、FixedThreadExecutor:固定大小线程池
ExecutorService threadPool = Executors.newFixedThreadPool(5);
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}
这个与单线程类似,只是创建了固定大小的线程数量。
c、CachedThreadPool:无界线程池
ExecutorService threadPool = Executors.newCachedThreadPool();
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}
无界线程池意味着没有工作队列,任务进来就执行,线程数量不够就创建,与前面两个的区别是:空闲的线程会被回收掉,空闲的时间是60s。这个适用于执行很多短期异步的小程序或者负载较轻的服务器。
Callable、Future、FutureTash详解
Callable与Future是在JAVA的后续版本中引入进来的,Callable类似于Runnable接口,实现Callable接口的类与实现Runnable的类都是可以被线程执行的任务。
三者之间的关系:
Callable是Runnable封装的异步运算任务。
Future用来保存Callable异步运算的结果
FutureTask封装Future的实体类
1、Callable与Runnbale的区别
a、Callable定义的方法是call,而Runnable定义的方法是run。
b、call方法有返回值,而run方法是没有返回值的。
c、call方法可以抛出异常,而run方法不能抛出异常。
2、Future
Future表示异步计算的结果,提供了以下方法,主要是判断任务是否完成、中断任务、获取任务执行结果
1 public interface Future<V> {2 3 boolean cancel(boolean mayInterruptIfRunning);4 5 boolean isCancelled();6 7 boolean isDone();8 9 V get() throws InterruptedException, ExecutionException;
10
11 V get(long timeout, TimeUnit unit)
12 throws InterruptedException, ExecutionException, TimeoutException;
13 }
3、FutureTask
可取消的异步计算,此类提供了对Future的基本实现,仅在计算完成时才能获取结果,如果计算尚未完成,则阻塞get方法。
public class FutureTask<V> implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable, Future<V>
FutureTask不仅实现了Future接口,还实现了Runnable接口,所以不仅可以将FutureTask当成一个任务交给Executor来执行,还可以通过Thread来创建一个线程。
Callable与FutureTask
定义一个callable的任务:
1 public class MyCallableTask implements Callable<Integer>2 {3 @Override4 public Integer call()5 throws Exception6 {7 System.out.println("callable do somothing");8 Thread.sleep(5000);9 return new Random().nextInt(100);
10 }
11 }
public class CallableTest2 {3 public static void main(String[] args) throws Exception4 {5 Callable<Integer> callable = new MyCallableTask();6 FutureTask<Integer> future = new FutureTask<Integer>(callable);7 Thread thread = new Thread(future);8 thread.start();9 Thread.sleep(100);
10 //尝试取消对此任务的执行
11 future.cancel(true);
12 //判断是否在任务正常完成前取消
13 System.out.println("future is cancel:" + future.isCancelled());
14 if(!future.isCancelled())
15 {
16 System.out.println("future is cancelled");
17 }
18 //判断任务是否已完成
19 System.out.println("future is done:" + future.isDone());
20 if(!future.isDone())
21 {
22 System.out.println("future get=" + future.get());
23 }
24 else
25 {
26 //任务已完成
27 System.out.println("task is done");
28 }
29 }
30 }