📝个人主页:五敷有你
🔥系列专栏:并发编程
⛺️稳重求进,晒太阳
示意图
步骤1:自定义任务队列
变量定义
- 用Deque双端队列来承接任务
- 用ReentrantLock 来做锁
- 并声明两个条件变量 Condition fullWaitSet emptyWaitSet
- 最后定义容量 capcity
方法:
- 添加任务
- 注意点:
- 任务容量慢了 用await
- 每个添加都进行一个emptyWaitSet.signalAll 唤醒沉睡的线程
- 考虑万一死等的情况,加入时间的判断
- 注意点:
- 取出任务
- 注意点:
- 任务空了 用await
- 每个任务取出来都进行一个fullWaitSet.signAll来唤醒沉睡的线程
- 考虑超时的情况,加入时间的判断
- 注意点:
public class MyBlockQueue<T> {//1.任务队列private Deque<T> deque=new ArrayDeque();//2.锁private ReentrantLock lock=new ReentrantLock();//3.生产者条件变量private Condition fullWaitSet=lock.newCondition();//4.消费者条件变量private Condition emptyWaitSet=lock.newCondition();//5.容量private int capcity;public MyBlockQueue(int capcity) {this.capcity = capcity;}//带超时的阻塞获取public T poll(long timeOut, TimeUnit unit){lock.lock();try {//将timeOUt转换成统一转换为nslong nanos = unit.toNanos(timeOut);while (deque.isEmpty()) {try {//返回值=等待时间-经过的时间if(nanos<=0){return null;}nanos= emptyWaitSet.awaitNanos(nanos);}catch (InterruptedException e) {throw new RuntimeException(e);}}T t = deque.removeFirst();fullWaitSet.signalAll();return t;}finally {lock.unlock();}}//6. 阻塞获取public T take() {lock.lock();try {while (deque.isEmpty()) {try {emptyWaitSet.await();}catch (InterruptedException e) {throw new RuntimeException(e);}}T t = deque.removeFirst();fullWaitSet.signalAll();return t;}finally {lock.unlock();}}//阻塞添加public void put(T element){lock.lock();try {while (deque.size()==capcity){try {fullWaitSet.await();}catch (Exception e){}}deque.addLast(element);emptyWaitSet.signalAll();} finally {lock.unlock();}}public int size(){lock.lock();try {return deque.size();}finally {lock.unlock();}}}
步骤2:自定义线程池
- 定义变量:
- 任务队列 taskQueue
- 队列的容量
- 线程的集合
- 核心线程数
- 获取任务的超时时间
- 时间单位
- 方法
- 构造方法 初始化一些核心的参数
- 执行方法 execute(task) 里面处理任务
- 每执行一个任务就放入一个worker中,并开启线程执行 同时放入workers集合中
- 当任务数量>核心数量时,就加入到阻塞队列中
- 自定义的类worker
- 继承Thread 重写Run方法
- 执行传递的任务,每次任务执行完毕,不回收,
- 去队列中拿任务 当队列也空了之后 workers集合中移除线程,线程停止。
- 继承Thread 重写Run方法
package com.aqiuo.juc;import java.util.HashSet;
import java.util.concurrent.TimeUnit;public class ThreadPool {//任务队列private MyBlockQueue<Runnable> taskQueue;//队列容量int queueCapcity;//线程集合private HashSet<Worker> workers=new HashSet();//线程池的核心线程private int coreSize;//获取任务的超时时间private long timeOut;//时间单位private TimeUnit timeUnit;public ThreadPool(int coreSize, long timeOut, TimeUnit timeUnit,int queueCapcity) {this.coreSize = coreSize;this.timeOut = timeOut;this.timeUnit = timeUnit;taskQueue=new MyBlockQueue<>(queueCapcity);}public void exectue(Runnable task){//当任务数没有超过coreSize时,直接交给work对象执行//如果任务超过coreSize时,加入任务队列synchronized (workers){if(workers.size()<coreSize){Worker worker=new Worker(task);System.out.println("新增worker");workers.add(worker);worker.start();//任务数超过了核心数}else{System.out.println(task+"加入任务队列");taskQueue.put(task);}}}class Worker extends Thread{private Runnable task;public Worker(Runnable task){this.task=task;}@Overridepublic void run() {//执行任务//1)当task不为空,执行任务//2)当task执行完毕,再接着从任务队列中获取任务while (task!=null||(task=taskQueue.take())!=null){try {System.out.println("正在执行worker"+this);sleep(10000);task.run();} catch (Exception e) {}finally {task=null;}}//执行完任务后销毁线程synchronized (workers){workers.remove(this);}}}}
测试
开启15个线程测试
public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10);for (int i=0;i<15;i++){int j=i;threadPool.exectue(()->{System.out.println(j);});}}
执行过程中,超过了队列容量之后,就会发生fullWaitSet阻塞。这个阻塞的线程就开始等待,当有队列不满之后,唤醒fullWaitSet阻塞的队列,
同理,当队列为空,emptyWaitSet小黑屋阻塞,当有任务被放入,EmptyWaitSet唤醒所有的线程。
这就有一个执行完毕之后,线程不会停止,他会一定等待拿去任务,线程阻塞了EmptyWaitSet
改进
获取任务的超时结束
获取任务take的增强 超时
//带超时的阻塞获取public T poll(long timeOut, TimeUnit unit){lock.lock();try {//将timeOUt转换成统一转换为nslong nanos = unit.toNanos(timeOut);while (deque.isEmpty()) {try {//返回值=等待时间-经过的时间if(nanos<=0){return null;}nanos= emptyWaitSet.awaitNanos(nanos);}catch (InterruptedException e) {throw new RuntimeException(e);}}T t = deque.removeFirst();fullWaitSet.signalAll();return t;}finally {lock.unlock();}}
修改worker的run函数
public void run() {//执行任务//1)当task不为空,执行任务//2)当task执行完毕,再接着从任务队列中获取任务
// while (task!=null||(task=taskQueue.take())!=null){//修改如下while (task!=null||(task=taskQueue.poll(timeOut,timeUnit))!=null){try {System.out.println("正在执行worker"+this);sleep(1000);task.run();} catch (Exception e) {}finally {task=null;}}
正常结束了
放入任务的超时结束offer()
那么有装入任务 的增强 ,就再提供一个超时装入入offer()吧 ,当放入一个满的队列时,超时后返回false不再放入
//带有超时的队列添加
public Boolean offer(T element,long timeOut, TimeUnit unit){lock.lock();long nanos = unit.toNanos(timeOut);try {while (deque.size()==capcity){try {long l = fullWaitSet.awaitNanos(nanos);if(l<=0){return false;}}catch (Exception e){}}deque.addLast(element);emptyWaitSet.signalAll();return true;} finally {lock.unlock();}
}
拒绝策略
函数式接口
@FunctionalInterface // 拒绝策略
interface RejectPolicy<T> {void reject(MyBlockQueue<T> queue, T task);
}
代码改进
如下部分代码是存入任务的部分
public void exectue(Runnable task){//当任务数没有超过coreSize时,直接交给work对象执行//如果任务超过coreSize时,加入任务队列synchronized (workers){if(workers.size()<coreSize){Worker worker=new Worker(task);System.out.println("新增worker");workers.add(worker);worker.start();//任务数超过了核心数}else{//存入任务//taskQueue.put(task);//当队列满了之后 执行的策略//1) 死等//2)带有超时的等待//3)当调用者放弃任务执行//4)让调用者抛出异常//5)让调用者自己执行任务...//为了增加灵活性,这里不写死,交给调用者//重新写了一个放入任务的方法taskQueue.tryPut(rejectPolicy,task);}}}
阻塞队列里的tryPut
public void tryPut(ThreadPool.RejectPolicy<T> rejectPolicy, T task) {lock.lock();try {//如果队列容量满了,就开始执行拒绝策略if(capcity>= deque.size()){rejectPolicy.reject(this,task);}else{//不满就正常加入到队列中System.out.println(task+"正常加入到队列");deque.addLast(task);}}finally {lock.unlock();}}
//1) 死等
//2)带有超时的等待
//3)当调用者放弃任务执行
//4)让调用者抛出异常
//5)让调用者自己执行任务...
谁调用方法,谁写拒绝策略
为了传入策略,就再构造函数里面加入一个方法的参数传入
//部分代码...
//拒绝策略
RejectPolicy<Runnable> rejectPolicy;public ThreadPool(int coreSize, long timeOut, TimeUnit timeUnit,int queueCapcity,RejectPolicy<Runnable> rejectPolicy) {this.coreSize = coreSize;this.timeOut = timeOut;this.timeUnit = timeUnit;taskQueue=new MyBlockQueue<>(queueCapcity);this.rejectPolicy=rejectPolicy;
}
主函数编写拒绝的策略,就lamda表达式会把...
public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1,(queue,task)->{//死等
// queue.put(task);//超时添加
// System.out.println(queue.offer(task, 100, TimeUnit.NANOSECONDS));//放弃执行
// System.out.print("我放弃");//调用者抛出异常
// throw new RuntimeException("任务执行失败");//调用者执行
// task.run();});for (int i=0;i<5;i++){int j=i;threadPool.exectue(()->{System.out.println(j);});}}
五种拒绝策略的结果(我不会用slog4j)
1.死等的结果
2.超时拒绝的结果(每个false都是时间到了,每加进去)
3.不作为,调用者放弃任务
4.抛出异常,停止
5.调用者线程执行了