一、自定义线程池
1)背景:
- 在 QPS 量比较高的情况下,我们不可能说所有的访问都创建一个线程执行,这会导致内存占用过高,甚至有可能出现 out of memory
- 另外也要考虑 cpu 核数,如果请求超过了cpu核数,那么有一部分线程就会收到限制,然后等cpu时间片结束会进行一个上下文切换,频繁的上下文也会影响性能。
总和以上,我们可以引出线程池的概念,也就是结合前面的享元模式,创建一批线程,复用线程,既可以较少内存占用,又可以较少线程上下文切换。
2)任务数量放不满 Blocking Queue
import lombok.extern.slf4j.Slf4j;import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;@Slf4j(topic = "c.TestPool")
public class Test1AndPoolHandle1 {public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10);for (int i = 0; i < 5; i ++ ) {int j = i;threadPool.execute(() -> {log.debug("{}", j);});}}
}
@Slf4j(topic = "c.TestPool")
class ThreadPool {// 线程池大小private int capacity;// 队列private BlockQueue<Runnable> blockQueue;// 线程队列private HashSet<Worker> workers = new HashSet<>();private long timeout;private TimeUnit timeUnit;public ThreadPool(int capacity, long timeout, TimeUnit unit, int queueSize) {this.timeout = timeout;this.timeUnit = unit;this.capacity = capacity;blockQueue = new BlockQueue<>(queueSize);}// 执行任务public void execute(Runnable runnable) {synchronized (workers) {if (workers.size() < capacity) {Worker worker = new Worker(runnable);log.debug("开始创建线程...{}", worker);workers.add(worker);worker.start();} else {log.debug("线程已满...加入队列");blockQueue.put(runnable);}}}class Worker extends Thread{private Runnable runnable;public Worker(Runnable runnable) {this.runnable = runnable;}@Overridepublic void run() {//执行任务// 1) 当任务不为空是,执行任务// 2) 当任务执行完毕,接着从队列中获取任务执行//这里有赋值不用上锁,是因为poll跟take实现都加了锁// while ((runnable != null) || (runnable = blockQueue.take()) != null) {while ((runnable != null) || (runnable = blockQueue.poll(timeout, timeUnit)) != null) {try {log.debug("正在执行。。。{}", runnable);runnable.run();} catch (Exception e) {throw new RuntimeException(e);} finally {runnable = null;}}synchronized (workers) {log.debug("{} 被移除了", this);workers.remove(this);}}}
}@Slf4j(topic = "c.TestPool")
class BlockQueue<T> {// 队列大小private int capacity;//存放队列private Deque<T> deque = new ArrayDeque<>();// 一把锁private ReentrantLock lock = new ReentrantLock();// 等待条件private Condition fullCondition = lock.newCondition();private Condition emptyCondition = lock.newCondition();public BlockQueue(int capacity) {this.capacity = capacity;}public int size() {return deque.size();}// 获取任务 设置超时时间public T poll(long time, TimeUnit unit) {try {lock.lock();long nanos = unit.toNanos(time);while (deque.isEmpty()) {try {if (nanos <= 0) {log.debug("超时结束");return null;}//进行等待nanos = emptyCondition.awaitNanos(nanos);} catch (InterruptedException e) {throw new RuntimeException(e);}}return deque.removeFirst();} finally {fullCondition.signal();lock.unlock();}}// 获取任务public T take() {try {lock.lock();while (deque.isEmpty()) {try {emptyCondition.await();} catch (InterruptedException e) {throw new RuntimeException(e);}}// log.debug("删除队列头...");return deque.removeFirst();} finally {fullCondition.signal();lock.unlock();}}// 阻塞添加存放任务public void put(T runnable) {try {lock.lock();while (deque.size() == capacity) {log.debug("等待加入任务队列 {} ...", runnable);try {fullCondition.await();} catch (InterruptedException e) {throw new RuntimeException(e);}}log.debug("存放到队列尾");deque.addLast(runnable);emptyCondition.signal();} finally {lock.unlock();}}
}
2)任务数量放满 Blocking Queue (改进)
1. 带超时时间的 阻塞添加
// 带超时时间的存放任务
public boolean offer(T runnable, long time, TimeUnit unit) {try {lock.lock();long nanos = unit.toNanos(time);while (deque.size() == capacity) {log.debug("等待加入任务队列 {} ...", runnable);try {if (nanos <= 0) {log.debug("等待加入任务队列超时 {} ...", runnable);return false;}nanos = fullCondition.awaitNanos(nanos);} catch (InterruptedException e) {throw new RuntimeException(e);}}log.debug("线程已满...加入队列{}",runnable);deque.addLast(runnable);emptyCondition.signal();return true;} finally {lock.unlock();}
}
2. 设计模式 之 策略模式
如果存放的队列已经满了,之前的版本就进入阻塞死等了,那么我们可以改进的方式
- 带超时时间的等待
- 让调用者放弃执行
- 让调用者自己执行
- 让调用者抛出异常
- ... 等
有很多方式,如果写死的话,就会有很多else if,那么定死了,没有扩展性,所以使用策略模式,将权利下放,利用函数式接口,让调用者传进来拒绝策略。
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j(topic = "c.TestPool")
public class Test1AndPoolHandle1 {public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1, (queue, task) -> {// 1、 死等//queue.put(task);// 2、 带超时时间等待
// queue.offer(task, 1500, TimeUnit.MILLISECONDS);// 3、 让调用者放弃
// log.debug("放弃任务{}", task);// 4、 让调用者抛出异常
// throw new RuntimeException("跑出异常" + task);// 5、 让调用者自己执行task.run();});for (int i = 0; i < 3; i ++ ) {int j = i;threadPool.execute(() -> {try {Thread.sleep(1000L);log.debug("{}", j);} catch (InterruptedException e) {throw new RuntimeException(e);}});}}
}@FunctionalInterface
interface RejectFunction<T> {void reject(BlockQueue<T> tBlockQueue, T task);}
@Slf4j(topic = "c.TestPool")
class ThreadPool {// 线程池大小private int capacity;// 队列private BlockQueue<Runnable> blockQueue;// 线程队列private HashSet<Worker> workers = new HashSet<>();private long timeout;private TimeUnit timeUnit;private RejectFunction<Runnable> rejectFunction;public ThreadPool(int capacity, long timeout, TimeUnit unit, int queueSize, RejectFunction<Runnable> rejectFunction) {this.timeout = timeout;this.timeUnit = unit;this.capacity = capacity;blockQueue = new BlockQueue<>(queueSize);this.rejectFunction = rejectFunction;}// 执行任务public void execute(Runnable runnable) {synchronized (workers) {if (workers.size() < capacity) {Worker worker = new Worker(runnable);log.debug("开始创建线程...{}", worker);workers.add(worker);worker.start();} else {
// blockQueue.put(runnable);blockQueue.tryPut(rejectFunction, runnable);}}}class Worker extends Thread{private Runnable runnable;public Worker(Runnable runnable) {this.runnable = runnable;}@Overridepublic void run() {//执行任务// 1) 当任务不为空是,执行任务// 2) 当任务执行完毕,接着从队列中获取任务执行//这里有赋值不用上锁,是因为poll跟take实现都加了锁
// while ((runnable != null) || (runnable = blockQueue.take()) != null) {while ((runnable != null) || (runnable = blockQueue.poll(timeout, timeUnit)) != null) {try {log.debug("正在执行。。。{}", runnable);runnable.run();} catch (Exception e) {throw new RuntimeException(e);} finally {runnable = null;}}synchronized (workers) {log.debug("{} 被移除了", this);workers.remove(this);}}}
}@Slf4j(topic = "c.BlockQueue")
class BlockQueue<T> {// 队列大小private int capacity;//存放队列private Deque<T> deque = new ArrayDeque<>();// 一把锁private ReentrantLock lock = new ReentrantLock();// 等待条件private Condition fullCondition = lock.newCondition();private Condition emptyCondition = lock.newCondition();public BlockQueue(int capacity) {this.capacity = capacity;}public int size() {return deque.size();}// 获取任务 设置超时时间public T poll(long time, TimeUnit unit) {try {lock.lock();long nanos = unit.toNanos(time);while (deque.isEmpty()) {try {if (nanos <= 0) {log.debug("超时结束");return null;}//进行等待nanos = emptyCondition.awaitNanos(nanos);} catch (InterruptedException e) {throw new RuntimeException(e);}}return deque.removeFirst();} finally {fullCondition.signal();lock.unlock();}}// 获取任务public T take() {try {lock.lock();while (deque.isEmpty()) {try {emptyCondition.await();} catch (InterruptedException e) {throw new RuntimeException(e);}}
// log.debug("删除队列头...");return deque.removeFirst();} finally {fullCondition.signal();lock.unlock();}}// 存放任务public void put(T runnable) {try {lock.lock();while (deque.size() == capacity) {log.debug("等待加入任务队列 {} ...", runnable);try {fullCondition.await();} catch (InterruptedException e) {throw new RuntimeException(e);}}log.debug("线程已满...加入队列{}",runnable);deque.addLast(runnable);emptyCondition.signal();} finally {lock.unlock();}}// 带超时时间的存放任务public boolean offer(T runnable, long time, TimeUnit unit) {try {lock.lock();long nanos = unit.toNanos(time);while (deque.size() == capacity) {log.debug("等待加入任务队列 {} ...", runnable);try {if (nanos <= 0) {log.debug("等待加入任务队列超时 {} ...", runnable);return false;}nanos = fullCondition.awaitNanos(nanos);} catch (InterruptedException e) {throw new RuntimeException(e);}}log.debug("线程已满...加入队列{}",runnable);deque.addLast(runnable);emptyCondition.signal();return true;} finally {lock.unlock();}}public void tryPut(RejectFunction<T> rejectFunction , T runnable) {lock.lock();try {if (deque.size() == capacity) {rejectFunction.reject(this, runnable);} else {log.debug("线程已满...加入队列{}",runnable);deque.addLast(runnable);emptyCondition.signal();}} finally {lock.unlock();}}
}
二、ThreadPoolExecutor
1)线程池状态
ThreadPoolExecutor 使用 int 的高3位来表示线程池状态,低29位表示线程数量。
从数字上比较 RUNNING > SHUTDOWN > STOP > TIDYING > TERMINATED
为什么111 比 00小,因为是一个整数的高3位,所以第一位为1表示负数。
问题:
为什么使用一个整数表示线程池状态和线程数量,不用两个变量来表示呢?
答:
因为要保证在对 线程池状态 和 线程数量 进行赋值操作时的原子性
那么他存在一个变量中,就只需要一次cas操作就可以保证原子性,既可以保存状态信息,也可以保存数量信息。
如果存在两个变量中,就需要两次cas操作才可以保证原子性,才可以对两个变量进行赋值操作。
2)构造方法
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
假设 核心线程= 2 救急线程=1 阻塞队列=2
- 一开始任务阻塞队列空闲,线程也空闲
- 任务1 创建 核心线程1 执行
- 任务2 创建 核心线程2 执行
- 核心线程满了,任务3 进入 阻塞队列
- 核心线程满了,任务4 进入 阻塞队列
- 核心线程满了, 阻塞队列满了,任务5 创建 救急线程1 执行
- 核心线程满了, 阻塞队列满了,救急线程满了,任务6执行拒绝策略
救急线程空闲一定时间会自动销毁,等待下次用到在创建
核心线程一直运行。
触发救急线程创建的前提 是配合有界队列来使用
如果线程到达 最大线程数量 仍然有新的任务,这时会执行拒绝策略。
拒绝策略 jdk 提供了 4 种实现,其他著名框架也提供了实现。
3)newFixedThreadPool (固定大小线程池)
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
//可以给第二参数,传线程工厂,提供创建新线程的名字。
特点:
- 核心线程 == 最大线程数 (没有救急线程),因此无需超时时间
- 阻塞队列是无界的,没有指定大小,可以放任务数量任务
评价
适用于任务量已知,相对耗时的任务
4)newCachedThreadPool (带缓冲线程池)
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),threadFactory);
}
特点
- 核心线程为0,最大线程为Integer.MAX_VALUE,救急线程的空闲生存时间 60s
-
- 也就是全部都是救急线程(60s后可以回收)
- 救急线程可以无限创建
- 队列采用 SynchronousQueue ,特点是没有容量,没有线程来获取的时候放不进去任务(类似于 一手交钱、一手交货)
- 刚好当前线程池全部都是救急线程,每个任务都会创建一个新的线程来获取任务,合适
评价
整个线程池会不断创建新的线程,任务量有多少线程就会创建多少,然后空闲60s后释放线程
适用于任务量比较密集,但是任务执行时间比较短的情况
5)newSingleThreadExecutor (单线程线程池)
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}
使用场景:
希望多个任务串行执行。线程数固定为1,当任务数量多于1时,会存放到阻塞队列中(无界)。任务执行完毕,线程也不会被摧毁释放。
那么跟 固定大小线程池设置成1 或者 自己创建一个线程执行 有什么区别呢
- 自己创建一个线程执行,如果中间发生了异常,那么线程销毁了,阻塞队列中的任务就不执行了,没有什么补救措施,但是如果单线程线程池还会创建一个新的线程,保证正常工作。
- newFixedThreadPool(1) 固定大小线程池设置成1,返回的对象还是可以修改的
-
- 返回的对象还是ThreadPoolExecutor对象,可以强转后调用setCorePoolSize 等方法修改。
- 单线程线程池 newSingleThreadExecutor() 始终线程数量为1,不能修改
-
- 返回的对象 通过new FinalizableDelegatedExecutorService进行了包装,应用了一种装饰器模式,只对外暴露 ExecutorService 接口, 因此不能调用 ThreadPoolExecutor 中特有方法修改。
6)提交任务
1. ALL
2. submit 演示
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
@Slf4j(topic = "c.pool")
public class Test2SubmitShow {public static void main(String[] args) throws ExecutionException, InterruptedException {//创建固定大小线程池ExecutorService pool = Executors.newFixedThreadPool(2);//带有返回结果的submit执行,可以使用lambda表达式Future<String> future = pool.submit(new Callable<String>() {@Overridepublic String call() throws Exception {log.debug("执行");Thread.sleep(1000);return "ok";}});//获取值,会等待submit执行完,相当于保护性暂停模式log.debug("{}", future.get());}
}
3. invokeAll 演示
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
@Slf4j(topic = "c.pool")
public class Test2Show {public static void main(String[] args) throws ExecutionException, InterruptedException {//创建固定大小线程池ExecutorService pool = Executors.newFixedThreadPool(3);// 调用 invokeAll 执行一个链表中的所有任务,然后将所有返回值收集成一个list 类型是Future// 可以设置超时时间, 如果规定时间内没有执行完所有任务,直接终止List<Future<Object>> futures = pool.invokeAll(Arrays.asList(() -> {log.debug("begin");Thread.sleep(1000);return "1";},() -> {log.debug("begin");Thread.sleep(500);return "2";},() -> {log.debug("begin");Thread.sleep(2000);return "3";}));// 便利所有Future,等待所有执行完毕之后输出// 如果线程数量是2 任务3进入队列等 所以等待时间 500 + 2000// 如果线程数量是3 任务3直接与信工 等待时间 2000futures.forEach(f -> {try {log.debug("{}", f.get());} catch (ExecutionException | InterruptedException e) {throw new RuntimeException(e);}});}
}
4. invokeAny 演示
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
@Slf4j(topic = "c.pool")
public class Test2SubmitShow {public static void main(String[] args) throws ExecutionException, InterruptedException {//创建固定大小线程池ExecutorService pool = Executors.newFixedThreadPool(3);// 调用 invokeAny 执行一个链表中的所有任务,然后将执行最快的一个任务的返回值返回,然后其他任务不执行了// 如果只有一个线程, 那么只有第一个任务被执行,所以只会返回第一个任务的结果String result = pool.invokeAny(Arrays.asList(() -> {log.debug("begin");Thread.sleep(1000);log.debug("end");return "1";},() -> {log.debug("begin");Thread.sleep(500);log.debug("end");return "2";},() -> {log.debug("begin");Thread.sleep(2000);log.debug("end");return "3";}));//最后拿到 的是2 然后1 跟 3的end没输出log.debug("{}", result);}
}
7)关闭线程池
shutdown
showdownNow
其他方法
三、设计模式 之 异步模式 之 工作线程
1) 定义
- 让有限的工作线程(Worker Thread)可以轮流异步处理无限多的任务,也可以归类为分工模式。
- 他的典型实现是 线程池,体现了经典设计模式中的享元模式。
例如:餐厅的服务员(线程),轮流处理每位客人的点餐(任务),如果为每位客户分配一位服务员,那么成本就太高了
(对比另外一种多线程设计模式:Thread-Pre-Message 这个是一个任务创建一个新线程处理)
注意,不同任务类型应该使用不同的线程池,这样能够避免饥饿,并能提升效率。
例如:如果一个餐厅的工人既要招呼客人(任务类型A),又要后厨做菜(任务类型B) 显然效率很低,因为他全都要干,
这时候如果分成招呼客人一批人(线程池A),做菜一批人(线程池B)更为合理,更细致的分工效率跟更高。
2)饥饿
这里的饥饿跟 synchronized 那里的饥饿有点区别,这里不是因为锁导致的,而是因为线程不足导致的,现象类似于死锁,但是不是锁导致的。
一般是固定大小线程池有饥饿现象。
- 有两个工人在同一个线程池中表示两个线程。
- 两个工人是全能的,既能做饭,又能处理点餐,有以下两种阶段情景
-
- 1、有一个客人点餐,工人A处理点餐,等待上菜,工人B做菜,工人A上菜,一气呵成,分工合作很顺利
- 2、有两个客人同时点餐,工人A和工人B同时处理点餐,同时等待上菜,没有额外工人(线程)去做菜,这时候就死锁了。
情况1
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;@Slf4j(topic = "c.pool")
public class Test3Executor {static List<String> list = Arrays.asList("红烧牛肉1", "红烧牛肉2", "牛肉3", "红烧牛肉4");public static void main(String[] args) {ExecutorService pool = Executors.newFixedThreadPool(2);// 有人点餐,执行任务pool.execute(() -> {log.debug("点餐");Future<String> future = pool.submit(new Callable<String>() {@Overridepublic String call() throws Exception {log.debug("做菜");Thread.sleep(1000);return list.get(0);}});try {log.debug("做完{}", future.get());} catch (InterruptedException | ExecutionException e) {throw new RuntimeException(e);}});}
}
// 21:17:54.256 [pool-1-thread-1] DEBUG c.pool - 点餐
// 21:17:54.261 [pool-1-thread-2] DEBUG c.pool - 做菜
// 21:17:55.262 [pool-1-thread-1] DEBUG c.pool - 做完红烧牛肉1
情况2:
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;@Slf4j(topic = "c.pool")
public class Test3Executor {static List<String> list = Arrays.asList("红烧牛肉1", "红烧牛肉2", "牛肉3", "红烧牛肉4");public static void main(String[] args) {ExecutorService pool = Executors.newFixedThreadPool(2);// 有人点餐,执行任务pool.execute(() -> {log.debug("点餐");Future<String> future = pool.submit(new Callable<String>() {@Overridepublic String call() throws Exception {log.debug("做菜");Thread.sleep(1000);return list.get(0);}});try {log.debug("做完{}", future.get());} catch (InterruptedException | ExecutionException e) {throw new RuntimeException(e);}});// 有人点餐,执行任务pool.execute(() -> {log.debug("点餐");Future<String> future = pool.submit(new Callable<String>() {@Overridepublic String call() throws Exception {log.debug("做菜");Thread.sleep(1000);return list.get(0);}});try {log.debug("做完{}", future.get());} catch (InterruptedException | ExecutionException e) {throw new RuntimeException(e);}});}
}
// 21:19:36.930 [pool-1-thread-1] DEBUG c.pool - 点餐
// 21:19:36.930 [pool-1-thread-2] DEBUG c.pool - 点餐
3)解决饥饿
解决1(不全面):
- 增加核心线程来解决,但是如果同时比较多,还是没有办法解决。
解决2(全面):
- 不同功能的线程放到不同线程池
package com.itheima.test8;import lombok.extern.slf4j.Slf4j;import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;@Slf4j(topic = "c.pool")
public class Test3Executor {static List<String> list = Arrays.asList("红烧牛肉1", "红烧牛肉2", "牛肉3", "红烧牛肉4");public static void main(String[] args) {ExecutorService cookPool = Executors.newFixedThreadPool(1);ExecutorService waitPool = Executors.newFixedThreadPool(1);// 有人点餐,执行任务waitPool.execute(() -> {log.debug("点餐1");Future<String> future = cookPool.submit(new Callable<String>() {@Overridepublic String call() throws Exception {log.debug("做菜1");Thread.sleep(1000);return list.get(0);}});try {log.debug("做完1{}", future.get());} catch (InterruptedException | ExecutionException e) {throw new RuntimeException(e);}});// 有人点餐,执行任务waitPool.execute(() -> {log.debug("点餐2");Future<String> future = cookPool.submit(new Callable<String>() {@Overridepublic String call() throws Exception {log.debug("做菜2");Thread.sleep(1000);return list.get(2);}});try {log.debug("做完2{}", future.get());} catch (InterruptedException | ExecutionException e) {throw new RuntimeException(e);}});}
}
// 21:35:52.924 [pool-2-thread-1] DEBUG c.pool - 点餐1
// 21:35:52.924 [pool-2-thread-2] DEBUG c.pool - 点餐2
// 21:35:52.927 [pool-1-thread-1] DEBUG c.pool - 做菜1
// 21:35:53.928 [pool-1-thread-1] DEBUG c.pool - 做菜2
// 21:35:53.928 [pool-2-thread-1] DEBUG c.pool - 做完1红烧牛肉1
// 21:35:54.930 [pool-2-thread-2] DEBUG c.pool - 做完2牛肉3
4)创建多少线程池合适?
- 过小会导致不能充分利用cpu系统资源,容易导致饥饿
- 过大会导致频繁发生线程上下文切换,占用更多内存
1. CPU 密集型运算
通常采用 cpu核数 + 1 能够实现最优的CPU利用率, +1是保证当线程由于页缺失故障(操作系统)或其他原因导致暂停时,
额外的这个线程能够顶上去,保证CPU时钟周期不被浪费。常用于数据分析时。
2. I/O 密集型运算
CPU 不总是在繁忙状态,例如,当你执行业务计算时,这时候会使用CPU资源,但是当你执行I/O操作时、远程RPC调用、数据库操作时等,这时候CPU资源就空闲了
你可以利用多线程提高他的利用率。
经验公式如下:
- 线程数 = 核数 * 期望 CPU 利用率 * 总时间(CPU计算时间 +等待时间) / CPU 计算时间
例如
4 核CPU计算时间是50%,其他等待时间是50%,期望cpu被100%利用, 套用公式
4 * 100% * 100% / 50% = 8
四、任务调度线程池
有些时候我们需要延迟执行任务(延迟几秒后执行),或者需要循环执行任务(每隔几秒执行一次),这时候就需要用到任务调度的线程池了。
JDK5的时候加入任务调度线程池
JDK5之前 可以使用java.util.Timer 来实现定时功能
1. Timer
优点:简单易用,
缺点:所有任务都由一个独立线程执行,所有任务都是串行执行的,如果中间有一个任务发生异常,后面的任务也不会运行了。
@Slf4j(topic = "c.pool")
public class Test4 {public static void main(String[] args) {Timer timer = new Timer();TimerTask task1 = new TimerTask() {@Overridepublic void run() {log.debug("task 1");try {Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}}};TimerTask task2 = new TimerTask() {@Overridepublic void run() {log.debug("task 2");try {Thread.sleep(0);} catch (InterruptedException e) {throw new RuntimeException(e);}}};log.debug("start");// 一秒后执行 task1 task2timer.schedule(task1, 1000);timer.schedule(task2, 1000);}
}
// 16:17:24.730 [main] DEBUG c.pool - start
// 16:17:25.734 [Timer-0] DEBUG c.pool - task 1
// 16:17:27.735 [Timer-0] DEBUG c.pool - task 2
2. ScheduledExecutorService
解决了Timer的缺点,抛出异常不会终止其他任务,还可以设置多个线程参与调度。
如果ScheduledExecutorService(1) 设置成1,那么也是串行执行,但是不会终止其他任务
a. 延迟任务
package com.itheima.test8;import lombok.extern.slf4j.Slf4j;import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;@Slf4j(topic = "c.pool")
public class Test4 {public static void main(String[] args) {ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);pool.schedule(() -> {log.debug("task1");try {Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}}, 1, TimeUnit.SECONDS);pool.schedule(() -> {log.debug("task2");try {Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}}, 1, TimeUnit.SECONDS);}
}
// 16:23:45.724 [pool-1-thread-1] DEBUG c.pool - task1
// 16:23:45.724 [pool-1-thread-2] DEBUG c.pool - task2
b. 定时任务
ⅰ. scheduleAtFixedRate
@Slf4j(topic = "c.pool")
public class Test4 {public static void main(String[] args) {ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);log.debug("start");// 延迟1秒执行,然后每隔1秒执行一次// 如果任务的延迟比定时还长,那么所有任务会在上一个任务执行完之后 紧接着运行pool.scheduleAtFixedRate(() -> {log.debug("task1");try {Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}}, 1, 1, TimeUnit.SECONDS);}
}
// 16:33:06.263 [main] DEBUG c.pool - start
// 16:33:07.303 [pool-1-thread-1] DEBUG c.pool - task1
// 16:33:09.303 [pool-1-thread-1] DEBUG c.pool - task1
// 16:33:11.303 [pool-1-thread-1] DEBUG c.pool - task1
ⅱ. scheduleWithFixedDelay
@Slf4j(topic = "c.pool")
public class Test4 {public static void main(String[] args) {ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);log.debug("start");// 不管任务执行多长时间,都会等他工作完成之后再延迟一秒执行一次pool.scheduleWithFixedDelay(() -> {log.debug("task1");try {Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}}, 1, 1, TimeUnit.SECONDS);}
}
// 16:35:55.014 [main] DEBUG c.pool - start
// 16:35:56.048 [pool-1-thread-1] DEBUG c.pool - task1
// 16:35:59.051 [pool-1-thread-1] DEBUG c.pool - task1
// 16:36:02.051 [pool-1-thread-1] DEBUG c.pool - task1
正确处理线程池异常
- try catch 手动捕捉
- 普通线程池可以使用配合feture的返回值get()等待,如果有异常,返回值会是异常描述
3. 线程池应用
如何让每周日 18:00:00 定时执行任务?
import java.time.DayOfWeek;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;public class TestTaskRun {// 如何让每周日 18:00:00 定时执行任务?public static void main(String[] args) {// 获取当前时间LocalDateTime now = LocalDateTime.now();System.out.println("now = " + now);// 获取本周周日时间LocalDateTime time = now.withHour(18).withMinute(0).withSecond(0).withNano(0).with(DayOfWeek.THURSDAY);// 如果当前时间 > 本周周四,那么获取的time是不对的,需要是下周的if (now.isAfter(time)) {time = time.plusWeeks(1);}System.out.println("time = " + time);long period = 1000 * 60 * 60 * 24 * 7; // 一周时间long initialDelay = Duration.between(now, time).toMillis(); //获取两个时间之间的毫秒差值ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);pool.scheduleAtFixedRate(() -> {System.out.println("running...");}, initialDelay, period, TimeUnit.MILLISECONDS);}
}
五、Tomcat 线程池
浏览器发出一个请求
首先会经过LimitLatch(作用是限流,控制最大连接数,防止太多连接把服务器压垮),类似于J.U.C 中的 Semaphore 后面再讲
当没有超过最大连接数,到达第二个组件 Acceptor(本质上是一个线程,不断循环看有没有新的连接,主要就是负责接受socket连接,其他不管)
然后Poller (也是一个线程,死循环看连接上是否有这种可读的事件发生,但是也是负责监听是否有,如果有交给一个Executor来执行)来负责看是否有IO的读写操作
一旦Poller 发现可读,会封装一个任务对象(socketProcessor实现了runnable),提交个Executor线程池处理
Executor 线程池中的工作线程(核心救急线程模式)最终负责【处理请求】
Tomcat 线程池扩展了 ThreadPoolExecutor ,行为稍有不同
- 如果总线程数达到 最大线程数 并且队列也满了
-
- 这时不会立刻抛出 RejectedExecutionException 异常
- 而是再次尝试将任务放入队列,如果还失败,才抛出 RejectedExecutionException 异常
源码 tomcat- 7.0.42
tomcat 线程池配置
tomcat 线程池相对于 java原生线程池,tomcat进行了一些改进,主要体现在核心在于Tomcat自己定义的任务队列。
TaskQueue —— offer(Runnable o)方法
public class TaskQueue extends LinkedBlockingQueue<Runnable> {private transient volatile ThreadPoolExecutor parent = null;public TaskQueue() {super();}public void setParent(ThreadPoolExecutor tp) {parent = tp;}@Overridepublic boolean offer(Runnable o) {//we can't do any checksif (parent==null) {return super.offer(o);}//we are maxed out on threads, simply queue the object// 如果线程数量已经达到最大数量,则进入队列等待执行if (parent.getPoolSizeNoLock() == parent.getMaximumPoolSize()) {return super.offer(o);}//we have idle threads, just add it to the queue// 执行到这里 最大线程数 > 当前线程数 > 核心线程数。// 如果提交的任务数 <= 当前线程数则说明存在空闲线程,则提交到任务队列,等待执行if (parent.getSubmittedCount() <= parent.getPoolSizeNoLock()) {return super.offer(o);}//if we have less threads than maximum force creation of a new thread// 执行到这,说明提交的任务数已经大于当前线程数,需要创建新的线程if (parent.getPoolSizeNoLock() < parent.getMaximumPoolSize()) {return false;}//if we reached here, we need to add it to the queuereturn super.offer(o);}
}
- submittedCount:Tomcat自定义的ThreadPoolExecutor中使用该字段来标记当前已提交的任务数,提交加一,执行完减一。
- poolSizeNoLock:当前执行的线程数
- 判断当前是否需要创建空闲线程执行任务,而不是全放入队列,因为LinkedBlockingQueue默认是无界的,但也可以设计最大数量;
六、Fork / Join线程池
1)概念
- JDK 1.7 之后加入的新的线程池实现, 使用了分治思想,适用于能够进行任务拆分的 cpu 密集型运算。
- 所谓任务拆分,是一个大任务拆分算法上相同的小任务,直到不能再拆分然后求解,最后合并,类似于归并排序,裴波那契数列等思想
- 分治思想的基础上加入了多线程,每个任务的分解和合并交给不同线程执行,进一步提升运算效率。
- 默认会创建与 cpu 核数大小相同的线程池。
2)使用
1. 拆分不完善(导致串行)
package com.itheima.test8;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;@Slf4j(topic = "c.TestForkJoin")
public class TestForkJoin {public static void main(String[] args) {ForkJoinPool poll = new ForkJoinPool(4);// 运行调用很简单System.out.println(poll.invoke(new MyTask(5)));}
}@Slf4j(topic = "c.MyTask")
// 计算机1 - n之间的和
class MyTask extends RecursiveTask<Integer> {private int n;public MyTask(int n) {this.n = n;}@Overridepublic String toString() {return "MyTask{" +"n=" + n +'}';}@Overrideprotected Integer compute() {// 递归结束条件if (n == 1) {log.debug("join() {}", n);return 1;}//分解成子问题MyTask myTask = new MyTask(n - 1);myTask.fork(); //分配线程执行log.debug("fork() {} + {}", n, myTask);//获取线程返回结果Integer join = myTask.join();log.debug("join() {} + {} = {}", n, myTask, n + join);return n + join; //然后拼接子问题返回答案}
}
// 21:07:44.838 [ForkJoinPool-1-worker-1] DEBUG c.MyTask - fork() 5 + MyTask{n=4}
// 21:07:44.838 [ForkJoinPool-1-worker-2] DEBUG c.MyTask - fork() 4 + MyTask{n=3}
// 21:07:44.838 [ForkJoinPool-1-worker-0] DEBUG c.MyTask - fork() 2 + MyTask{n=1}
// 21:07:44.838 [ForkJoinPool-1-worker-3] DEBUG c.MyTask - fork() 3 + MyTask{n=2}
// 21:07:44.841 [ForkJoinPool-1-worker-0] DEBUG c.MyTask - join() 1
// 21:07:44.841 [ForkJoinPool-1-worker-0] DEBUG c.MyTask - join() 2 + MyTask{n=1} = 3
// 21:07:44.842 [ForkJoinPool-1-worker-3] DEBUG c.MyTask - join() 3 + MyTask{n=2} = 6
// 21:07:44.842 [ForkJoinPool-1-worker-2] DEBUG c.MyTask - join() 4 + MyTask{n=3} = 10
// 21:07:44.842 [ForkJoinPool-1-worker-1] DEBUG c.MyTask - join() 5 + MyTask{n=4} = 15
// 15
2. 分治思想(并行)
package com.itheima.test8;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;@Slf4j(topic = "c.TestForkJoin")
public class TestForkJoin {public static void main(String[] args) {ForkJoinPool poll = new ForkJoinPool(4);// 运行调用很简单System.out.println(poll.invoke(new AddTask(1, 5)));}
}
@Slf4j(topic = "c.AddTask")
// 计算机1 - n之间的和
class AddTask extends RecursiveTask<Integer> {private int start;private int end;public AddTask(int start, int end) {this.start = start;this.end = end;}@Overridepublic String toString() {return "AddTask{" +"start=" + start +", end=" + end +'}';}@Overrideprotected Integer compute() {// 递归结束条件if (start == end) {log.debug("join() {}", start);return start;}if (end == start + 1) {log.debug("join() {}", end + start);return end + start;}//分解成子问题int mid = (start + end) / 2;AddTask addTask1 = new AddTask(start, mid);addTask1.fork(); //分配线程执行log.debug("fork() {}", addTask1);AddTask addTask2 = new AddTask(mid + 1, end);addTask2.fork(); //分配线程执行log.debug("fork() {}", addTask2);//获取线程返回结果Integer join = addTask1.join() + addTask2.join();log.debug("join() {} , {} = {}", start, end, join);return join; //然后拼接子问题返回答案}
}
七、AQS原理
1)概述
2)自定义锁
自定义一个同步器组件,例如可重入锁那些都是这么做。在你自定义的同步器组件内部定义一个AQS类的内部类,所有操作交给这个内部类完成,我们的自定义同步组件只需要调用这个内部类的方法
这里要注意,acquire()是AQS实现的模板方法可以直接用,tryAquire()是自定义的加锁方法, 模板会调用此自定义实现方法
它的acquire、release方法是aqs的固定机制,比如怎么把线程阻塞,怎么加入队列等等,而tryxx是自己实现的,自己决定是否可重入,是否可共享等等
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;@Slf4j(topic = "c.MyLock")
public class Test6Lock {public static void main(String[] args) {MyLock lock = new MyLock();new Thread(() -> {lock.lock();log.debug("lock");lock.lock();log.debug("lock");try {Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);} finally {log.debug("unlock");lock.unlock();}}).start();
// new Thread(() -> {
// lock.lock();
// log.debug("lock");
// try {
// Thread.sleep(1);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// } finally {
// log.debug("unlock");
// lock.unlock();
// }
// }).start();}
}// 自定义锁 (不可重入锁)
class MyLock implements Lock {class MyAQS extends AbstractQueuedSynchronizer {// 独占锁 同步器类@Overrideprotected boolean tryAcquire(int arg) {// cas 操作是否可以获得锁if (compareAndSetState(0, 1)) {//然后锁的Owner 是当前线程setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}@Overrideprotected boolean tryRelease(int arg) {// 释放锁之后清空OwnersetExclusiveOwnerThread(null);// 因为是volatile 所以因为写屏障,所以前面不会重排指令到state后面,同步到主存setState(0);return true;}@Override //是否持有独占锁protected boolean isHeldExclusively() {return getState() == 1;}public Condition newCondition() {return new ConditionObject();}}MyAQS sync = new MyAQS();@Override //加锁 (加锁失败,进入队列)public void lock() {sync.acquire(1);}@Override // 加锁,但是可以打断public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}@Override // 尝试加锁(一次)public boolean tryLock() {return sync.tryAcquire(1);}@Override // 尝试加锁,带超时public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {return sync.tryAcquireNanos(1, unit.toNanos(time));}@Override //释放锁public void unlock() {sync.release(1); //释放锁,底层还会唤醒等待的线程}@Override //创建条件变量public Condition newCondition() {return sync.newCondition();}
}
八、ReentrantLock 原理
1)加锁成功流程
非公平锁实现原理:
加锁解锁流程:
构造器方法,默认是非公平锁实现
public ReentrantLock() {sync = new NonfairSync();
}
NonfairSync 继承自 AQS,原理类似于刚才的AQS实现自定义锁
2)加锁失败流程
第一次尝试:如果已经被别人加锁了,调用acquire
第二次尝试:如果这时候加锁的人已经释放,则不需要继续往下,否则就创建AddWaiter构造一个链表(即head、tail),添加一个Node链表节点,加入队列
- 首次创建会创建两个,第一个Node称为 Dummy(哑元)或哨兵,用来占位,并不关联线程。
- 黄色三角形表示该Node的 waitStatus 状态,默认 0 为正常状态
进入acquireQueue () 逻辑
第三个尝试:循环判断当前节点的前驱节点是不是头结点,说明他是第一个节点,有资格去再次尝试获取锁
进入 shouldParkAfterFailAcquire()逻辑
把前驱节点改成状态改成 -1,代表前驱节点有资格唤醒后继结点,因为如果进行park ,需要有人唤醒,第一次会方法会返回false
第四次尝试:再次尝试获取,获取失败,再次进入shouldParkAfterFailAcquire,因为前驱节点node已经是状态-1,所以这次返回true,进行一个park,阻塞住
3)解锁竞争成功流程
如果有多个线程经历上述过程竞争失败,变成这个样子
这时候 Thread - 0 释放锁,进入 tryRelease 流程,如果成功
- 设置 exclusiveOwnerThread 为 null
- state = 0
然后判断头结点是不是不为空,并且 waitStatus 状态不为0,那么就执行 unparkSuccessor 唤醒距离头结点最近的节点,然后他有资格继续尝试获取锁
获取到锁之后,设置state = 1, 并且 exclusiveOwnerThread = 当前线程,然后将头结点替换成当前节点 】
4)解锁竞争失败流程
因为是非公平锁,此时有另外一个线程Thread4,不是队列中的,而是第一次访问的线程,那么就会发生一个竞争
Thread-1 如果竞争失败,Thread-4 的线程就会设置成 exclusiveOwnerThread ,state = 1
Thread - 1 竞争失败再次进入 acquireQueue 流程,获取锁失败,重新进入 park 阻塞
公平锁的意思也就是 新线程来的时候会不会直接加入队列中。
5)锁重入原理
6)可打断原理
不可打断锁原理:
打断不会影响到获取锁停止,只有获得锁之后才会返回打断标记
可打断原理:
如果打断之后直接抛出异常,不会继续获取锁了
7)公平锁原理
公平锁原理:新线程进来获取锁的时候,判断是不是队列中有线程等待,来判断是否可以进行竞争锁。
8)条件变量 - await
每个条件变量其实对应着一个等待队列, 其实现类是 ConditionObject
开始Thread-0持有锁,调用await,进入 COnditionObject 的 addConditionWaiter 流程
创建 新的 Node 状态为 -2(Node.CONDITION) ,关联 Thread-0,加入等待队列尾部
然后进入 AQS 的fullyRelease 流程, 释放同步器上的锁,也就是当前线程的所有锁
为什么是fullyRelease 不是 因为可能发生了锁重入,所以需要释放所有锁
state = 0, exclusiveOwnerThread = null
然后unpark AQS 队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么Thread - 1 竞争成功。
state = 1, exclusiveOwnerThread = thread1
9)条件变量 - signal
假设 Thread- 1 调用 signal 要来唤醒 条件变量中等待的 Thread - 0
调用 signal 方法,然后唤醒 条件变量中的 第一个节点,然后条件变量中的当前点 置为null,
转移成功 然后加入到等待队列队尾等待
转移失败 因为有些时候有时限的等待超时,或者被打断,那么就没有必要加入到队列队尾
加入队列,获取前一个节点,然后设置state 为 -1,因为最后一个是0(前一个节点设置成-1,表示有资格唤醒unpark新加入的线程)
如果前一个节点的状态 > 0 ,表示被取消了,那么需要唤醒当前线程,去找能够当做前驱节点的点。