目录
编辑
先上完整代码:
解析:
任务队列:
线程池类:
拒绝策略:
先上完整代码:
public class MyThreadPool {public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(2, 1000,TimeUnit.MICROSECONDS, 10, (queue, task) -> {// 1.死等queue.put(task);// 2.带超时时间等待加入等待队列// queue.offer(task, 500, TimeUnit.MICROSECONDS);// 3.放弃任务// 队列满了,没做人任何事情// 4.抛出异常// throw new RuntimeException("任务执行失败" + task);// 5.让调用者自己执行// task.run();});for (int i = 0; i < 15; i++) {int j = i;threadPool.execute(() -> {try {Thread.sleep(1000L);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(j);});}}
}// 拒接策略
@FunctionalInterface
interface RejectPolicy<T> {void reject(BlockQueue queue, T task) ;
}
class ThreadPool {// 任务队列private BlockQueue<Runnable> taskQueue;// 线程集合private HashSet<Worker> workers = new HashSet();// 线程数private int coreSize;private long timeout;private TimeUnit timeUnit;private RejectPolicy<Runnable> rejectPolicy;// 构造方法public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueSize, RejectPolicy<Runnable> rejectPolicy) {this.coreSize = coreSize;this.timeout = timeout;this.timeUnit = timeUnit;this.taskQueue = new BlockQueue<>(queueSize);this.rejectPolicy = rejectPolicy;}public void execute(Runnable task) {// 当任务数没有超过核心数时,直接交给woker对象执行// 如果超过,放入任务队列中存起来synchronized (workers) { // workers不安全,把他锁起来if (workers.size() < coreSize) {Worker worker = new Worker(task);System.out.println("新增worker");workers.add(worker); // 加入线程集合worker.start();} else {// taskQueue.put(task); // 任务添加进入// 1.死等// 2.带超时时间等待// 3.放弃任务// 4.抛出异常// 5.让调用者自己执行taskQueue.tryPut(rejectPolicy, task);}}}class Worker extends Thread{private Runnable task;public Worker(Runnable task) {this.task = task;}@Overridepublic void run() {// 当task任务不为空,执行// 当任务为空,去任务队列中去取// while (task != null || (task = taskQueue.take()) != null) 一直等待获取while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {try {System.out.println("正在执行" + task);task.run();} catch (Exception e) {} finally {task = null;}}synchronized (workers) {System.out.println("worker被移除" + this);workers.remove(this); // 移除当前集合对象}}}
}// 阻塞队列
class BlockQueue<T> {// 任务队列private Deque<T> queue = new ArrayDeque<>();// 锁private ReentrantLock lock = new ReentrantLock();// 满了等待,生产者private Condition fullWaitSet = lock.newCondition();// 空的等待,消费者private Condition emptyWaitSet = lock.newCondition();// 容量private int capacity;public BlockQueue(int capacity) {this.capacity = capacity;}// 阻塞队列中获取任务public T take() {lock.lock();try {while (queue.isEmpty()) {try {emptyWaitSet.await(); // 进入等待} catch (InterruptedException e) {e.printStackTrace();}}T t = queue.removeFirst();fullWaitSet.signal(); // 唤醒return t;} finally {lock.unlock();}}// 阻塞队列中添加任务public void put(T t) {lock.lock();try {while (queue.size() == capacity) { // 如果满了,进入等待try {System.out.println("等待加入任务队列" + t);fullWaitSet.await();} catch (InterruptedException e) {e.printStackTrace();}}System.out.println("加入任务队列" + t);queue.addLast(t);emptyWaitSet.signal(); // 唤醒}finally {lock.unlock();}}public int size() {lock.lock();try {return queue.size();}finally {lock.unlock(); // 就算return也会执行}}// 带超时时间的获取,无需永久的等待了public T poll (long timeout, TimeUnit unit) {lock.lock();try {long nanos = unit.toNanos(timeout); // 时间转换为nswhile (queue.isEmpty()) {try {if (nanos <= 0) return null; // 超时了,直接返回吧nanos = emptyWaitSet.awaitNanos(nanos);// 进入等待,超时不再等待,返回结果为剩余等待时间} catch (InterruptedException e) {e.printStackTrace();}}T t = queue.removeFirst();fullWaitSet.signal(); // 唤醒return t;} finally {lock.unlock();}}// 带超时时间的添加, return 添加成功 or 失败public boolean offer(T task, long timeout, TimeUnit timeUnit) {lock.lock();try {long nanos = timeUnit.toNanos(timeout);while (queue.size() == capacity) { // 如果满了,进入等待try {System.out.println("等待加入任务队列" + task);if (nanos <= 0) return false;nanos = fullWaitSet.awaitNanos(nanos);} catch (InterruptedException e) {e.printStackTrace();}}System.out.println("加入任务队列" + task);queue.addLast(task);emptyWaitSet.signal(); // 唤醒return true;}finally {lock.unlock();}}public void tryPut(RejectPolicy<T> rejectPolicy, T task) {lock.lock();try {// 判断队列是否已满if (queue.size() == capacity) { // 有空闲rejectPolicy.reject(this, task); // 拒绝策略} else { // 有空闲queue.addLast(task);emptyWaitSet.signal();}}finally {lock.unlock();}}
}
解析:
任务队列:
// 阻塞队列
class BlockQueue<T> {// 任务队列private Deque<T> queue = new ArrayDeque<>();// 锁private ReentrantLock lock = new ReentrantLock();// 满了等待,生产者private Condition fullWaitSet = lock.newCondition();// 空的等待,消费者private Condition emptyWaitSet = lock.newCondition();// 容量private int capacity;public BlockQueue(int capacity) {this.capacity = capacity;}// 阻塞队列中获取任务public T take() {lock.lock();try {while (queue.isEmpty()) {try {emptyWaitSet.await(); // 进入等待} catch (InterruptedException e) {e.printStackTrace();}}T t = queue.removeFirst();fullWaitSet.signal(); // 唤醒return t;} finally {lock.unlock();}}// 阻塞队列中添加任务public void put(T t) {lock.lock();try {while (queue.size() == capacity) { // 如果满了,进入等待try {System.out.println("等待加入任务队列" + t);fullWaitSet.await();} catch (InterruptedException e) {e.printStackTrace();}}System.out.println("加入任务队列" + t);queue.addLast(t);emptyWaitSet.signal(); // 唤醒}finally {lock.unlock();}}public int size() {lock.lock();try {return queue.size();}finally {lock.unlock(); // 就算return也会执行}}// 带超时时间的获取,无需永久的等待了public T poll (long timeout, TimeUnit unit) {lock.lock();try {long nanos = unit.toNanos(timeout); // 时间转换为nswhile (queue.isEmpty()) {try {if (nanos <= 0) return null; // 超时了,直接返回吧nanos = emptyWaitSet.awaitNanos(nanos);// 进入等待,超时不再等待,返回结果为剩余等待时间} catch (InterruptedException e) {e.printStackTrace();}}T t = queue.removeFirst();fullWaitSet.signal(); // 唤醒return t;} finally {lock.unlock();}}// 带超时时间的添加, return 添加成功 or 失败public boolean offer(T task, long timeout, TimeUnit timeUnit) {lock.lock();try {long nanos = timeUnit.toNanos(timeout);while (queue.size() == capacity) { // 如果满了,进入等待try {System.out.println("等待加入任务队列" + task);if (nanos <= 0) return false;nanos = fullWaitSet.awaitNanos(nanos);} catch (InterruptedException e) {e.printStackTrace();}}System.out.println("加入任务队列" + task);queue.addLast(task);emptyWaitSet.signal(); // 唤醒return true;}finally {lock.unlock();}}public void tryPut(RejectPolicy<T> rejectPolicy, T task) {lock.lock();try {// 判断队列是否已满if (queue.size() == capacity) { // 有空闲rejectPolicy.reject(this, task); // 拒绝策略} else { // 有空闲queue.addLast(task);emptyWaitSet.signal();}}finally {lock.unlock();}}
}
-
ArrayDeque
作为底层数据结构存储队列元素。 -
ReentrantLock
实现了线程安全。 Condition
来实现阻塞等待机制,当队列为空时,消费者线程等待;当队列满时,生产者线程等待。- 常规的入队
put()
、出队take()
操作。 - 带有超时的入队
offer()
和出队poll()
操作。 tryPut()
方法,该方法接受一个RejectPolicy
接口,用于指定当队列已满时的拒绝策略
方法:
take()
: 当队列为空时,消费者线程调用该方法将进入等待状态,直到队列中有元素可取。put(T t)
: 当队列已满时,生产者线程调用该方法将进入等待状态,直到队列有空位可添加元素。poll(long timeout, TimeUnit unit)
: 带有超时的出队操作,当队列为空时,会等待一段时间,如果在指定时间内仍未有元素可取,则返回 null。offer(T task, long timeout, TimeUnit timeUnit)
: 带有超时的入队操作,当队列已满时,会等待一段时间,如果在指定时间内仍未有空位可添加元素,则返回 false。tryPut(RejectPolicy<T> rejectPolicy, T task)
: 尝试添加元素,当队列已满时,根据拒绝策略RejectPolicy
进行处理。
单看其实就是一个生产者消费者模式而已。
线程池类:
class ThreadPool {// 任务队列private BlockQueue<Runnable> taskQueue;// 线程集合private HashSet<Worker> workers = new HashSet();// 线程数private int coreSize;private long timeout;private TimeUnit timeUnit;private RejectPolicy<Runnable> rejectPolicy;// 构造方法public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueSize, RejectPolicy<Runnable> rejectPolicy) {this.coreSize = coreSize;this.timeout = timeout;this.timeUnit = timeUnit;this.taskQueue = new BlockQueue<>(queueSize);this.rejectPolicy = rejectPolicy;}public void execute(Runnable task) {// 当任务数没有超过核心数时,直接交给woker对象执行// 如果超过,放入任务队列中存起来synchronized (workers) { // workers不安全,把他锁起来if (workers.size() < coreSize) {Worker worker = new Worker(task);System.out.println("新增worker");workers.add(worker); // 加入线程集合worker.start();} else {// taskQueue.put(task); // 任务添加进入// 1.死等// 2.带超时时间等待// 3.放弃任务// 4.抛出异常// 5.让调用者自己执行taskQueue.tryPut(rejectPolicy, task);}}}class Worker extends Thread{private Runnable task;public Worker(Runnable task) {this.task = task;}@Overridepublic void run() {// 当task任务不为空,执行// 当任务为空,去任务队列中去取// while (task != null || (task = taskQueue.take()) != null) 一直等待获取while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {try {System.out.println("正在执行" + task);task.run();} catch (Exception e) {} finally {task = null;}}synchronized (workers) {System.out.println("worker被移除" + this);workers.remove(this); // 移除当前集合对象}}}
}
BlockQueue<Runnable>
来存储待执行的任务。HashSet<Worker>
来存储线程集合。- 提供构造方法来初始化线程池的核心线程数、超时时间、任务队列大小和拒绝策略。
execute(Runnable task)
方法来提交任务到线程池中执行。- 内部定义了
Worker
内部类,用于执行任务的线程。
方法:
execute(Runnable task)
: 提交任务到线程池中执行。如果当前线程数小于核心线程数,则直接创建新的Worker
线程执行任务;如果当前线程数已达到核心线程数,则尝试将任务放入任务队列中,根据拒绝策略rejectPolicy
进行处理。Worker
: 内部类实现了线程执行任务的逻辑。在run()
方法中,线程会不断从任务队列中取出任务执行,如果队列为空则会等待一段时间,超时时间由timeout
和timeUnit
决定。
拒绝策略:
函数式接口,由使用者提供实现。
// 拒接策略
@FunctionalInterface
interface RejectPolicy<T> {void reject(BlockQueue queue, T task) ;
}
```java
public class MyThreadPool {public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(2, 1000,TimeUnit.MICROSECONDS, 10, (queue, task) -> {// 1.死等queue.put(task);// 2.带超时时间等待加入等待队列// queue.offer(task, 500, TimeUnit.MICROSECONDS);// 3.放弃任务// 队列满了,没做人任何事情// 4.抛出异常// throw new RuntimeException("任务执行失败" + task);// 5.让调用者自己执行// task.run();});for (int i = 0; i < 15; i++) {int j = i;threadPool.execute(() -> {try {Thread.sleep(1000L);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(j);});}}
}
几种拒绝策略实现:
-
死等(Blocking): 当任务队列已满时,线程池会一直等待直到有空位。这里使用了
queue.put(task)
,该方法会阻塞当前线程直到队列有空位可用。 -
带超时时间等待(Timeout Blocking): 当任务队列已满时,线程池会等待一段时间,如果在指定时间内仍未有空位可用,则放弃当前任务。这里使用了
queue.offer(task, 500, TimeUnit.MICROSECONDS)
,该方法会在指定时间内等待,如果超时则返回 false。 -
放弃任务(Discard): 当任务队列已满时,线程池会放弃当前任务,不做任何处理。
-
抛出异常(Throw Exception): 当任务队列已满时,线程池会抛出异常,通知调用者任务执行失败。
-
让调用者自己执行(Caller Runs): 当任务队列已满时,不在线程池内执行任务,而是由调用者自己执行任务。