目录
一. 阻塞队列 BlockingQue
二. 拒绝策略 RejectPolicy
三. 线程池 ThreadPool
四. 模拟运行
在 Java基础(二) 多线程编程 中,我们简单介绍了线程池 ThreadPoolExecutor 的核心概念与基本使用。在本文中,我们将基于前面学习的各种锁与同步工具来实现自定义的线程池,同时来探究和分析 Java 线程池的基本原理。
一. 阻塞队列 BlockingQue
在线程池的生态中,阻塞队列是至关重要的一环,其用于实现任务与工作线程之间的平衡(类似于生产者/消费者模式)。 在此处,我们实现了一个自定义的阻塞队列 BlockingQue,其代码如下:
// 阻塞队列实现
public class BlockingQue<T> {// 1. 任务队列private Deque<T> queue;// 2. 锁private ReentrantLock lock;// 3. 生产者条件变量private Condition fullWaitSet;// 4. 消费者条件变量private Condition emptyWaitSet;// 5. 容量private int capacity;public BlockingQue(int capacity, boolean fair) {if (capacity <= 0)throw new IllegalArgumentException();this.capacity = capacity;// ArrayDeque: 基于 Object[] 实现,可以自动扩容this.queue = new ArrayDeque<>();this.lock = new ReentrantLock(fair);// 读写共用一把锁this.fullWaitSet = lock.newCondition();this.emptyWaitSet = lock.newCondition();}public BlockingQue(int capacity) {this(capacity, false);}// 阻塞添加public void put(T element) throws InterruptedException {lock.lock();try {while (queue.size() == capacity) {fullWaitSet.await();}queue.addLast(element);// 唤醒消费线程emptyWaitSet.signal();} finally {lock.unlock();}}// 非阻塞添加public boolean offer(T element) {lock.lock();try {if (queue.size() == capacity)return false;queue.addLast(element);// 唤醒消费线程emptyWaitSet.signal();return true;} finally {lock.unlock();}}// 超时阻塞添加public boolean offer(T element, long timeout, TimeUnit unit) throws InterruptedException {lock.lock();try {long nanos = unit.toNanos(timeout);while (queue.size() == capacity) {// 已经超时则返回 falseif (nanos <= 0)return false;nanos = fullWaitSet.awaitNanos(nanos); // awaitNanos 返回剩余等待时间(处理虚假唤醒)}queue.addLast(element);// 唤醒消费线程emptyWaitSet.signal();return true;}finally {lock.unlock();}}// 阻塞获取public T take() throws InterruptedException {lock.lock();try {while (queue.isEmpty()) {emptyWaitSet.await();}T element = queue.removeFirst();// 唤醒生产线程fullWaitSet.signal();return element;} finally {lock.unlock();}}// 超时阻塞获取public T poll(long timeout, TimeUnit unit) throws InterruptedException {lock.lock();try {// 将 timeout 统一转换为纳秒long nanos = unit.toNanos(timeout);while (queue.isEmpty()) {// 已经超时则返回 nullif(nanos <= 0){return null;}nanos = emptyWaitSet.awaitNanos(nanos); // awaitNanos 返回剩余等待时间(处理虚假唤醒)}T element = queue.removeFirst();// 唤醒生产线程fullWaitSet.signal();return element;}finally {lock.unlock();}}//获取大小public int size() {lock.lock();try {return queue.size();} finally {lock.unlock();}}
}
可以看出,上述代码使用了 Deque 作为元素存储容器,但若将 Deque 换成 Object[] 数组,则其基本就是 ArrayBlockingQueue 的实现源码。在实际工作中,若要实现自定义阻塞队列,我们只需要实现 BlockingQueue<E> 接口及其抽象方法即可。
package java.util.concurrent;
import java.util.Collection;
import java.util.Queue;public interface BlockingQueue<E> extends Queue<E> {boolean add(E e);boolean offer(E e);void put(E e) throws InterruptedException;boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;E take() throws InterruptedException;E poll(long timeout, TimeUnit unit) throws InterruptedException;int remainingCapacity();boolean remove(Object o);public boolean contains(Object o);int drainTo(Collection<? super E> c);int drainTo(Collection<? super E> c, int maxElements);
}
二. 拒绝策略 RejectPolicy
在线程数量已满且阻塞队列已满的情况下,主线程则会因为无法放置任务而一直阻塞等待,因此我们需要拒绝策略来处理这种溢出情况。拒绝策略一般定义为接口,并允许我们自定义策略,其代码如下:
// 拒绝策略
@FunctionalInterface
public interface RejectPolicy<T> {void reject(BlockingQue<T> queue, T task);
}
一般接口方法需要提供阻塞队列以及当前任务两个参数,并支持函数式编程;常见的拒绝策略包括:阻塞等待、放弃执行、抛出异常、由调用线程执行等(后续会实现)。在实际工作中,Java已经为我们提供了拒绝策略的顶层设计,若想自定义拒绝策略,我们只需实现 RejectedExecutionHandler 接口并实现其 rejectedExecution 抽象方法即可。
package java.util.concurrent;public interface RejectedExecutionHandler {void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
三. 线程池 ThreadPool
在本节,我们将实现一个简单的自定义线程池,其只包含核心线程数,并且规定线程池的运行规则如下:
1.若当前线程数 < corePoolSize,则新建线程处理任务;
2.若当前线程数 >= corePoolSize && 任务队列未满,则将任务放入任务队列等待;
3.若当前线程数 >= corePoolSize && 任务队列已满,则执行拒绝策略;
/*** 自定义线程池实现:* 1. 若当前线程数 < corePoolSize,则新建线程处理任务* 2. 若当前线程数 >= corePoolSize && 任务队列未满,则将任务放入任务队列等待* 3. 若当前线程数 >= corePoolSize && 任务队列已满,则执行拒绝策略*/
public class ThreadPool {// 任务队列private BlockingQue<Runnable> taskQueue;// 线程集合private HashSet<Worker> workers = new HashSet<>();// 锁private ReentrantLock mainLock = new ReentrantLock();// 核心线程数private int coreSize;// 获取任务的超时时间(allowThreadTimeOut=true时有效)private long timeOut;// 时间单位(allowThreadTimeOut=true时有效)private TimeUnit timeUnit;// 是否允许线程超时等待(默认允许)private boolean allowThreadTimeOut = true;// 拒绝策略private RejectPolicy<Runnable> rejectPolicy;public ThreadPool(int coreSize, long timeOut, TimeUnit timeUnit, int queueCapacity, RejectPolicy<Runnable> rejectPolicy) {this.coreSize = coreSize;this.timeOut = timeOut;this.timeUnit = timeUnit;this.taskQueue = new BlockingQue<>(queueCapacity);this.rejectPolicy = rejectPolicy;}// 设置 allowThreadTimeOut 参数public void setAllowThreadTimeOut(boolean allowThreadTimeOut) {this.allowThreadTimeOut = allowThreadTimeOut;}// 执行任务 taskpublic void execute(Runnable task){mainLock.lock();try{if(workers.size() < coreSize){// 添加核心线程Worker worker = new Worker(task);workers.add(worker);worker.start();}else if(!taskQueue.offer(task)){// 执行拒绝策略rejectPolicy.reject(taskQueue, task);}} finally {mainLock.unlock();}}// 工作线程类private class Worker extends Thread{// 执行任务private Runnable task;public Worker(Runnable task){this.task = task;}@Overridepublic void run() {// 获取任务while(task != null || (task = getTask()) != null){try{task.run();}catch (Exception e){e.printStackTrace();}finally {task = null;}}// worker 线程终止synchronized (workers){// 移除 workerworkers.remove(this);}}}// 从阻塞队列中获取等待任务(提供给Worker的钩子方法)private Runnable getTask(){for(;;){try {Runnable r = allowThreadTimeOut ? taskQueue.poll(timeOut, timeUnit) : taskQueue.take();return r;} catch (InterruptedException e) {// 若被中断则重新等待e.printStackTrace();}}}
}
Java ThreadPoolExecutor 的实现相比我们自定义的线程池更加复杂和安全(增加了线程池状态的维护、最大线程数的逻辑、线程池终止方法等),但在核心思想的实现上基本一致,因此这段自定义代码的实现可以帮助我们更加方便的理解 ThreadPoolExecutor 的源码。
四. 模拟运行
public class Main {public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(3,10000, TimeUnit.MILLISECONDS, 5,(queue, task) -> {// 1. 死等//try {// queue.put(task);//} catch (InterruptedException e) {// e.printStackTrace();//}// 2. 放弃任务执行// do nothing...System.out.println("do discard policy...");// 3. 抛出异常//throw new RuntimeException("task run fail" + task);// 4. 调用线程执行任务//task.run();});for (int i = 0; i < 10; i++) {int j = i;threadPool.execute(() -> {try {Thread.sleep(1000L);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(j + "is running...");});}}
}