深入剖析线程池基本原理以及常见面试题详解

文章目录

  • 面试官:能给我讲讲线程池的实现原理?
  • 线程池类继承关系
  • ThreadPoolExecutor
    • 核心数据结构
    • 面试官:给我讲讲线程池的有哪些参数?
    • 面试官:如何优雅的关闭线程?
      • 线程的生命周期
    • 面试官:线程池哪五种状态?
    • 面试官:线程池哪4种拒绝策略?并分别说一下作用和实现原理?
      • DiscardOldestPolicy
      • AbortPolicy
      • DiscardPolicy
      • CallerRunsPolicy
    • 面试官:线程池常用的阻塞队列有哪些?能说下各自的区别?
      • SynchronousQueue应用
      • PriorityBlockedQueue应用
      • DelayQueue应用
    • 面试官:如何结合业务合理的配置线程池参数?CPU密集型和IO密集型如何配置?线程设置过多会造成什么影响?
      • CPU 密集型任务
      • IO密集型任务
    • 面试官:给我讲讲什么是线程复用?
    • 面试官:为什么《阿里巴巴开发手册》不推荐使用Executor创建线程?
  • ScheduledThreadPoolExecutor
    • 延时执行
    • 周期执行
    • 面试题:你知道延迟执行、周期性执行任务实现原理?
    • 面试题:为什么不使用Timer而使用ScheduledThreadPoolExecutor?
  • CompletableFuture异步编程工具
    • 基本使用
    • 四种任务原型
    • 面试题:你知道CompletableFuture内部原理?
      • CompletableFuture的构造:ForkJoinPool
      • 任务类型的适配
      • 任务的链式执行过程分析
  • 什么是 Java8 的 ForkJoinPool?
    • 应用
    • 核心数据结构

面试官:能给我讲讲线程池的实现原理?

声:回答该问题需要了解线程池有哪些方法并讲解每个方法的作用,以及各个类的继承关系,线程池的运行原理,线程池的状态转换、生命周期,线程池的构造参数,线程池Runnable->Worker->Thread执行任务->线程复用机制等

在这里插入图片描述

线程池类继承关系

在这里插入图片描述

ThreadPoolExecutor

核心数据结构

public class ThreadPoolExecutor extends AbstractExecutorService {//存储线程池的状态和线程数量private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));// 存放任务的阻塞队列private final BlockingQueue<Runnable> workQueue;// 对线程池内部各种变量进行互斥访问控制private final ReentrantLock mainLock = new ReentrantLock();// 线程集合private final HashSet<Worker> workers = new HashSet<Worker>();

每一个线程是一个Worker对象,Worker是ThreadPoolExecutor内部类,核心数据结构如下:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {final Thread thread; // Worker封装的线程Runnable firstTask; // Worker接收到的第1个任务volatile long completedTasks; // Worker执行完毕的任务个数
}

由定义会发现,Worker继承于AQS,也就是说Worker本身就是一把锁。这把锁有什么用处呢?用于线程池的关闭、线程执行任务的过程中。

面试官:给我讲讲线程池的有哪些参数?

ThreadPoolExecutor在其构造方法中提供了几个核心配置参数,来配置不同策略的线程池。

    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) 
  1. corePoolSize:核心线程数-线程池中始终维护的线程
  2. MaxPoolSize:最大线程数-达到核心线程数并且阻塞队列慢的时候会扩充到最大线程数
  3. KeepAliveTime、TimeUnit:空闲超过该时间后线程会被销毁
  4. WorkQueue:任务阻塞队列-当核心线程满的时候会放入阻塞队列中
  5. ThreadFactory:线程工厂-可以根据业务自定义创建线程,修改线程名称
  6. Handler:拒绝策略-最大线程满并且阻塞队列慢了之后新的任务进来会触发拒绝策略

面试官:如何优雅的关闭线程?

线程池的关闭比线程的关闭更加复杂,因为线程池的关闭涉及到很多场景,如果有线程正在执行任务?如果任务队列不为空?还有当前线程进来如何处理,因此,关闭过程不可能是瞬时的,而是需要一个平滑的过渡,这就涉及线程池的完整生命周期管理。

线程的生命周期

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

在JDK 7中,把线程数量(workerCount)和线程池状态(runState)这两个变量打包存储在一个字
段里面,即ctl变量。如下图所示,最高的3位存储线程池状态,其余29位存储线程个数。而在JDK 6中,
这两个变量是分开存储的。


在这里插入图片描述
关于内部封装的获取生命周期状态、获取线程池线程数量的计算方法如以下代码所示

private static int runStateOf(int c)     { return c & ~CAPACITY; } //计算当前运行状态
private static int workerCountOf(int c)  { return c & CAPACITY; }  //计算当前线程数量
private static int ctlOf(int rs, int wc) { return rs | wc; }   //通过状态和线程数生成ctl

面试官:ctl为什么这样设计?这样做的好处?

用一个变量去存储两个值,可避免在做相关决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源。通过阅读线程池源代码也可以发现,经常出现要同时判断线程池运行状态和线程数量的情况。线程池也提供了若干方法去供用户获得线程池当前的运行状态、线程个数。这里都使用的是位运算的方式,相比于基本运算,速度也会快很多。

线程状态转换过程:
在这里插入图片描述
状态解释:
在这里插入图片描述

切记:线程状态-1、0、1、2、3转化只能从小到大,而不能逆向转换。
除 terminated()之外,线程池还提供了其他几个钩子方法,这些方法的实现都是空的。如果想实现
自己的线程池,可以重写这几个方法:

protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }

面试官:线程池哪五种状态?

    // runState is stored in the high-order bitsprivate static final int RUNNING    = -1 << COUNT_BITS;private static final int SHUTDOWN   =  0 << COUNT_BITS;private static final int STOP       =  1 << COUNT_BITS;private static final int TIDYING    =  2 << COUNT_BITS;private static final int TERMINATED =  3 << COUNT_BITS;

面试官:线程池哪4种拒绝策略?并分别说一下作用和实现原理?

接口类:

public interface RejectedExecutionHandler {void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

实现类:
在这里插入图片描述

DiscardOldestPolicy

    public static class DiscardOldestPolicy implements RejectedExecutionHandler {public DiscardOldestPolicy() { }/*** 从任务队列中调用poll()方法删除最先入队列的(最老的)任务* 拓展:队列是先进先出,由此调用poll()方法是取出的是先入队列的数据*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {e.getQueue().poll();e.execute(r);}}}

AbortPolicy

    public static class AbortPolicy implements RejectedExecutionHandler {/*** Creates an {@code AbortPolicy}.*/public AbortPolicy() { }/*** Always throws RejectedExecutionException.* 丢弃准备添加的任务并抛出异常* @param r the runnable task requested to be executed* @param e the executor attempting to execute this task* @throws RejectedExecutionException always*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +e.toString());}}

DiscardPolicy

    public static class DiscardPolicy implements RejectedExecutionHandler {/*** Creates a {@code DiscardPolicy}.*/public DiscardPolicy() { }/*** Does nothing, which has the effect of discarding task r.* 不做任何处理,丢弃准备添加的任务* @param r the runnable task requested to be executed* @param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}}

CallerRunsPolicy

    public static class CallerRunsPolicy implements RejectedExecutionHandler {/*** Creates a {@code CallerRunsPolicy}.*/public CallerRunsPolicy() { }/*** Executes task r in the caller's thread, unless the executor* has been shut down, in which case the task is discarded.* 准备添加的任务,直接调用run()方法交给提交任务的线程执行* @param r the runnable task requested to be executed* @param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {r.run();}}}

面试官:线程池常用的阻塞队列有哪些?能说下各自的区别?

队列说明
ArrayBlockedQueue数组实现有界队列,FIFO先入先出,支持公平锁、非公平锁
LinkedBlockedQueue单链表实现的有界队列,如果不指定容量默认为Integer.MAX_VALUE
SynchronousQueue不存储元素的队列,每个put()操作时必须有线程正在调用take(),该元素才存在,Executors.newCacheThreadPool()就使用该队列,每来一个任务如果没有空闲线程(线程复用)则创建新线程执行任务
PriorityBlockedQueue无界的优先队列,默认按自然排序,自定义实现compareTo()定制自己优先级,不同保证同优先级顺序
DelayQueue无界延迟队列,利用PriorityBlockedQueue实现,在创建元素时可以指定多久能够获取到该元素,只有满足延迟时间才能获取到数据,ScheduledThreadPoolExecutor定时任务就是利用自己实现的延时队列(思想一致)

SynchronousQueue应用

    @Testpublic void SynchronousQueue() throws InterruptedException {SynchronousQueue<Integer> queue = new SynchronousQueue<>();Random random = new Random();AtomicInteger ait = new AtomicInteger(0);new Thread(() -> {try {for (int i = 0; i < 3; i++) {Integer integer = queue.take();if (integer != null){int count = ait.incrementAndGet();System.out.println(count + "-" + integer);}}} catch (InterruptedException e) {e.printStackTrace();}}).start();TimeUnit.SECONDS.sleep(3);new Thread(() -> {for (int i = 0; i < 3; i++) {queue.offer(random.nextInt());}}).start();TimeUnit.SECONDS.sleep(5);}

PriorityBlockedQueue应用

和PriorityQueue使用一样,无非就是加了锁阻塞生产、消费者线程

    @Testpublic void priorityQueue(){PriorityQueue<Integer> queue = new PriorityQueue<>(new Comparator<Integer>() {@Overridepublic int compare(Integer o1, Integer o2) {return Integer.compare(o1, o2);}});queue.add(2);queue.add(1);queue.add(3);while (!queue.isEmpty()){System.out.println(queue.poll());}PriorityQueue<CustomRank> queue2 = new PriorityQueue<>();queue2.add(new CustomRank(2));queue2.add(new CustomRank(1));queue2.add(new CustomRank(3));while (!queue2.isEmpty()){System.out.println(queue2.poll().v);}}public class CustomRank implements Comparable<CustomRank>{Integer v;public CustomRank(Integer v) {this.v = v;}@Overridepublic int compareTo(CustomRank o) {return Integer.compare(this.v, o.v);}}

DelayQueue应用

    @Testpublic void delayQueue() throws InterruptedException {DelayQueue<CustomTimeTask> queue = new DelayQueue<>();queue.add(new CustomTimeTask("我是第一个任务", 4, TimeUnit.SECONDS));queue.add(new CustomTimeTask("我是第二个任务", 8, TimeUnit.SECONDS));queue.add(new CustomTimeTask("我是第三个任务", 16, TimeUnit.SECONDS));while (!queue.isEmpty()){CustomTimeTask task = queue.take();System.out.format("name: {%s}, time: {%s} \n", task.name, new Date());}}class CustomTimeTask implements Delayed{//触发时间long time;//任务名称String name;public CustomTimeTask(String name,long time, TimeUnit timeUnit) {this.time = System.currentTimeMillis() + timeUnit.toMillis(time);this.name = name;}@Overridepublic long getDelay(TimeUnit unit) {return time - System.currentTimeMillis();}/*** 利用优先队列将任务按照触发时间从小到大排序* @param o* @return*/@Overridepublic int compareTo(Delayed o) {CustomTimeTask other = (CustomTimeTask) o;return Long.compare(this.time, other.time);}@Overridepublic String toString() {return "CustomTimeTask{" +"time=" + time +", name='" + name + '\'' +'}';}}

面试官:如何结合业务合理的配置线程池参数?CPU密集型和IO密集型如何配置?线程设置过多会造成什么影响?

答案:其实没有完整的公式去计算,我在使用的时候一般是根据业务场景,动态的去改变线程池参数选择最优配置方案

CPU 密集型任务

IO密集型任务

在这里插入图片描述

面试官:给我讲讲什么是线程复用?

什么是线程复用?
通过同一个线程去执行不同的任务,这就是线程复用。

java.util.concurrent.ThreadPoolExecutor#execute

 public void execute(Runnable command) {// 如果传入的Runnable的空,就抛出异常if (command == null)throw new NullPointerException();int c = ctl.get();// 线程池中的线程比核心线程数少 if (workerCountOf(c) < corePoolSize) {// 新建一个核心线程执行任务if (addWorker(command, true))return;c = ctl.get();}// 核心线程已满,但是任务队列未满,添加到队列中if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();// 任务成功添加到队列以后,再次检查是否需要添加新的线程,因为已存在的线程可能被销毁了if (! isRunning(recheck) && remove(command))// 如果线程池处于非运行状态,并且把当前的任务从任务队列中移除成功,则拒绝该任务reject(command);else if (workerCountOf(recheck) == 0)// 如果之前的线程已经被销毁完,新建一个非核心线程addWorker(null, false);}// 核心线程池已满,队列已满,尝试创建一个非核心新的线程else if (!addWorker(command, false))// 如果创建新线程失败,说明线程池关闭或者线程池满了,拒绝任务reject(command);}

线程复用源码分析:java.util.concurrent.ThreadPoolExecutor#runWorker

    final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // 释放锁 设置work的state=0 允许中断boolean completedAbruptly = true;try {//一直执行 如果task不为空 或者 从队列中获取的task不为空while (task != null || (task = getTask()) != null) {task.run();//执行task中的run方法}}completedAbruptly = false;} finally {//1.将 worker 从数组 workers 里删除掉//2.根据布尔值 allowCoreThreadTimeOut 来决定是否补充新的 Worker 进数组 workersprocessWorkerExit(w, completedAbruptly);}}

面试官:为什么《阿里巴巴开发手册》不推荐使用Executor创建线程?

在这里插入图片描述

ScheduledThreadPoolExecutor

延时执行

    ScheduledThreadPoolExecutor threadPool = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "schedule-thread");}});/*** 延迟执行* @throws InterruptedException*/@Testvoid testSchedule() throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(1);System.out.println(new Date());threadPool.schedule(new TimeTask(), 3, TimeUnit.SECONDS);countDownLatch.await();}class TimeTask implements Runnable{@Overridepublic void run() {try {TimeUnit.SECONDS.sleep(4);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + new Date() + " 任务执行完成");}}

周期执行

1.scheduleAtFixedRate方法

按固定频率执行,与任务本身执行时间无关。但有个前提条件,任务执行时间必须小于间隔时间,例如间隔时间是5s,每5s执行一次任务,任务的执行时间必须小于5s。

    @Testvoid testScheduleAtFixedRate() throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(1);threadPool.scheduleAtFixedRate(new TimeTask(), 2, 3, TimeUnit.SECONDS);countDownLatch.await();}

2.scheduleWithFixedDelay方法

按固定间隔执行,与任务本身执行时间有关。例如,任务本身执行时间是10s,间隔2s,则下一次开始执行的时间就是12s。

    @Testvoid testScheduleWithFixedDelay() throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(1);threadPool.scheduleWithFixedDelay(new TimeTask(), 2, 3, TimeUnit.SECONDS);countDownLatch.await();}

面试题:你知道延迟执行、周期性执行任务实现原理?

ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,这意味着其内部的数据结构和ThreadPoolExecutor是基本一样的。

延迟执行任务依靠的是DelayQueue。DelayQueue是 BlockingQueue的一种,其实现原理是二叉堆

而周期性执行任务是执行完一个任务之后,再把该任务扔回到任务队列中,如此就可以对一个任务反复执行。

不过这里并没有使用DelayQueue,而是在ScheduledThreadPoolExecutor内部又实现了一个特定的DelayQueue

    static class DelayedWorkQueue extends AbstractQueue<Runnable>implements BlockingQueue<Runnable> {...}

其原理和DelayQueue一样,但针对任务的取消进行了优化。下面主要讲延迟执行和周期性执行的实现过程。

延迟执行设计原理:
在这里插入图片描述
传进去的是一个Runnable,外加延迟时间delay。在内部通过decorateTask(…)方法把Runnable包装成一个ScheduleFutureTask对象,而DelayedWorkQueue中存放的正是这种类型的对象,这种类型的对象一定实现了Delayed接口。

在这里插入图片描述
在这里插入图片描述
从上面的代码中可以看出,schedule()方法本身很简单,就是把提交的Runnable任务加上delay时间,转换成ScheduledFutureTask对象,放入DelayedWorkerQueue中。任务的执行过程还是复用的ThreadPoolExecutor,延迟的控制是在DelayedWorkerQueue内部完成的。

周期性执行设计原理:
在这里插入图片描述
在这里插入图片描述
和schedule(…)方法的框架基本一样,也是包装一个ScheduledFutureTask对象,只是在延迟时间参数之外多了一个周期参数,然后放入DelayedWorkerQueue就结束了。

两个方法的区别在于一个传入的周期是一个负数,另一个传入的周期是一个正数,为什么要这样做呢?

用于生成任务序列号的sequencer,创建ScheduledFutureTask的时候使用:
在这里插入图片描述

    private class ScheduledFutureTask<V>extends FutureTask<V> implements RunnableScheduledFuture<V> {/** Sequence number to break ties FIFO */private final long sequenceNumber;/** 延时时间 */private long time;private final long period;/** The actual task to be re-enqueued by reExecutePeriodic */RunnableScheduledFuture<V> outerTask = this;/*** Index into delay queue, to support faster cancellation.*/int heapIndex;/*** Creates a one-shot action with given nanoTime-based trigger time.*/ScheduledFutureTask(Runnable r, V result, long ns) {super(r, result);this.time = ns;this.period = 0;this.sequenceNumber = sequencer.getAndIncrement();}/*** Creates a periodic action with given nano time and period.*/ScheduledFutureTask(Runnable r, V result, long ns, long period) {super(r, result);this.time = ns;this.period = period;this.sequenceNumber = sequencer.getAndIncrement();}public long getDelay(TimeUnit unit) {return unit.convert(time - now(), NANOSECONDS);}public int compareTo(Delayed other) {if (other == this) // compare zero if same objectreturn 0;if (other instanceof ScheduledFutureTask) {ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;long diff = time - x.time;if (diff < 0)return -1;else if (diff > 0)return 1;else if (sequenceNumber < x.sequenceNumber)return -1;elsereturn 1;}long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;}public boolean isPeriodic() {return period != 0;}/*** 设置下一个执行时间*/private void setNextRunTime() {long p = period;if (p > 0)time += p;elsetime = triggerTime(-p);}public boolean cancel(boolean mayInterruptIfRunning) {boolean cancelled = super.cancel(mayInterruptIfRunning);if (cancelled && removeOnCancel && heapIndex >= 0)remove(this);return cancelled;}/***实现Runnable*/public void run() {boolean periodic = isPeriodic();if (!canRunInCurrentRunState(periodic))cancel(false);// 如果不是周期执行,则执行一次else if (!periodic)ScheduledFutureTask.super.run();// 如果是周期执行,则重新设置下一次运行的时间,重新入队列else if (ScheduledFutureTask.super.runAndReset()) {setNextRunTime();reExecutePeriodic(outerTask);}}//下一次触发时间long triggerTime(long delay) {return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));}//放到队列中,等待下一次执行void reExecutePeriodic(RunnableScheduledFuture<?> task{if (canRunInCurrentRunState(true)) {super.getQueue().add(task);if (!canRunInCurrentRunState(true) && remove(task))task.cancel(false);elseensurePrestart();}}}

withFixedDelay和atFixedRate的区别就体现在setNextRunTime里面。

如果是atFixedRate,period>0,下一次开始执行时间等于上一次开始执行时间+period;

如果是withFixedDelay,period < 0,下一次开始执行时间等于triggerTime(-p),为now+(-period),now即上一次执行的结束时间。

面试题:为什么不使用Timer而使用ScheduledThreadPoolExecutor?

  1. Timer使用的是绝对时间,系统时间的改变会对Timer产生一定的影响;而ScheduledThreadPoolExecutor使用的是相对时间,所以不会有这个问题。
  2. Timer使用单线程来处理任务,长时间运行的任务会导致其他任务的延时处理,而ScheduledThreadPoolExecutor可以自定义线程数量。
  3. Timer没有对运行时异常进行处理,一旦某个任务触发运行时异常,会导致整个Timer崩溃,而ScheduledThreadPoolExecutor对运行时异常做了捕获(可以在afterExecute()回调方法中进行处理),所以更加安全。

CompletableFuture异步编程工具

基本使用

package net.dreamzuora.thread;import org.testng.annotations.Test;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;/*** 异步编程工具*/
public class CompletableFutureDemo {/*** CompletableFuture实现了Future接口,所以它也具有Future的特性:调用get()方法会阻塞在那,* 直到结果返回。* 另外1个线程调用complete方法完成该Future,则所有阻塞在get()方法的线程都将获得返回结果。* @throws ExecutionException* @throws InterruptedException*/@Testvoid complete() throws ExecutionException, InterruptedException {CompletableFuture<String> completeFuture = new CompletableFuture<>();new Thread(() -> {try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}completeFuture.complete("gome");}).start();System.out.println(completeFuture.get());}/*** 阻塞等待任务执行完成*/@Testvoid runAsyncTest() throws ExecutionException, InterruptedException {CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {try {TimeUnit.SECONDS.sleep(3);System.out.println("hello word!");} catch (InterruptedException e) {e.printStackTrace();}});//阻塞等待任务完成completableFuture.get();System.out.println("succ");}/*** 带返回值的任务执行* @throws ExecutionException* @throws InterruptedException*/@Testvoid supplyAsync() throws ExecutionException, InterruptedException {CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}return "hello";}});String result = stringCompletableFuture.get();System.out.println(result);}/*** thenRun():上个任务结束再执行(不带上一个返回值结果)下一个任务* thenAccept后面跟的是一个有参数、无返回值的方法,称为Consumer,返回值也是* CompletableFuture<Void>类型。顾名思义,只进不出,所以称为Consumer;前面的* Supplier,是无参数,有返回值,只出不进,和Consumer刚好相反。* @throws ExecutionException* @throws InterruptedException*/@Testvoid thenRun() throws ExecutionException, InterruptedException {CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {System.out.println("第一次执行");}).thenRun(new Runnable() {@Overridepublic void run() {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第二次执行");}});completableFuture.get();}/*** thenAccept():上个任务结束再执行(前面任务的结果作为下一个任务的入参)下一个任务* @throws ExecutionException* @throws InterruptedException*/@Testvoid thenAccept() throws ExecutionException, InterruptedException {CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {return "hello";}}).thenAccept(new Consumer<String>() {@Overridepublic void accept(String param) {System.out.println(param + " word!");}});completableFuture.get();}/*** thenApply 后面跟的是一个有参数、有返回值的方法,称为Function。返回值是* CompletableFuture<String>类型。* @throws ExecutionException* @throws InterruptedException*/@Testvoid thenApply() throws ExecutionException, InterruptedException {CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}return "第一个任务执行完成!";}}).thenApply(new Function<String, String>() {@Overridepublic String apply(String firstTaskResult) {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}return firstTaskResult + " 第二个任务执行完成!";}});String result = stringCompletableFuture.get();System.out.println(result);}/*** 第1个参数是一个CompletableFuture类型,第2个参数是一个方法,并且是一个BiFunction,也就* 是该方法有2个输入参数,1个返回值。* 从该接口的定义可以大致推测,它是要在2个 CompletableFuture 完成之后,把2个* CompletableFuture的返回值传进去,再额外做一些事情。* @throws ExecutionException* @throws InterruptedException*/@Testvoid thenCompose() throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync((Supplier<String>) () -> "第一个任务执行完成!").thenCompose(new Function<String, CompletionStage<String>>() {@Overridepublic CompletionStage<String> apply(String firstTask) {return CompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {return firstTask + " 第二个任务执行完成!";}});}});String s = future.get();System.out.println(s);}/*** 如果希望返回值是一个非嵌套的CompletableFuture,可以使用thenCompose:* @throws ExecutionException* @throws InterruptedException*/@Testvoid thenCombine() throws ExecutionException, InterruptedException {CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {return "第一个任务执行完成! ";}}).thenCombine(CompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {return "第二个任务执行完成! ";}}), new BiFunction<String, String, Integer>() {@Overridepublic Integer apply(String s1, String s2) {return s1.length() + s2.length();}});System.out.println(future.get());}/*** 等待所有的CompletableFuture执行完成,无返回值* @throws ExecutionException* @throws InterruptedException*/@Testvoid allOf() throws ExecutionException, InterruptedException {AtomicInteger atc = new AtomicInteger(0);CompletableFuture[] completableFutures = new CompletableFuture[10];for (int i = 0; i < 10; i++){CompletableFuture supplyAsync = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}return atc.incrementAndGet();}});completableFutures[i] = supplyAsync;}CompletableFuture<Void> completableFuture = CompletableFuture.allOf(completableFutures);completableFuture.get();System.out.println(atc);}/*** anyOf:只要有任意一个CompletableFuture结束,就可以做接下来的事情,而无须像* AllOf那样,等待所有的CompletableFuture结束。* 但由于每个CompletableFuture的返回值类型都可能不同,任意一个,意味着无法判断是什么类* 型,所以anyOf的返回值是CompletableFuture<Object>类型*/@Testvoid anyOf() throws ExecutionException, InterruptedException {AtomicInteger atc = new AtomicInteger(0);CompletableFuture[] completableFutures = new CompletableFuture[10];for (int i = 0; i < 10; i++){CompletableFuture supplyAsync = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}return atc.incrementAndGet();}});completableFutures[i] = supplyAsync;}Integer result = (Integer) CompletableFuture.anyOf(completableFutures).get();System.out.println(result);}
}

四种任务原型

通过上面的例子可以总结出,提交给CompletableFuture执行的任务有四种类型:Runnable、Consumer、Supplier、Function。下面是这四种任务原型的对比。
在这里插入图片描述
runAsync 与 supplierAsync 是 CompletableFuture 的静态方法;而 thenAccept、thenAsync、thenApply是CompletableFutre的成员方法。

因为初始的时候没有CompletableFuture对象,也没有参数可传,所以提交的只能是Runnable或者Supplier,只能是静态方法;

通过静态方法生成CompletableFuture对象之后,便可以链式地提交其他任务了,这个时候就可以提交Runnable、Consumer、Function,且都是成员方法。

面试题:你知道CompletableFuture内部原理?

CompletableFuture的构造:ForkJoinPool

    private static final Executor asyncPool = useCommonPool ?ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

任务执行

    public static CompletableFuture<Void> runAsync(Runnable runnable) {return asyncRunStage(asyncPool, runnable);}static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {if (f == null) throw new NullPointerException();CompletableFuture<Void> d = new CompletableFuture<Void>();e.execute(new AsyncRun(d, f));return d;}    

在这里插入图片描述
通过上面的代码可以看到,asyncPool是一个static类型,supplierAsync、asyncSupplyStage也都是static方法。

Static方法会返回一个CompletableFuture类型对象,之后就可以链式调用CompletionStage里面的各个方法。

任务类型的适配

我们向CompletableFuture提交的任务是Runnable/Supplier/Consumer/Function 。因此,肯定需要一个适配机制,把这四种类型的任务转换成ForkJoinTask,然后提交给ForkJoinPool,如下图所示:
在这里插入图片描述
supplyAsync()->Supplier->AsyncSupply

在 supplyAsync(…)方法内部,会把一个 Supplier 转换成一个 AsyncSupply,然后提交给ForkJoinPool执行;

    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {return asyncSupplyStage(asyncPool, supplier);}static <U> CompletableFuture<U> asyncSupplyStage(Executor e,Supplier<U> f) {if (f == null) throw new NullPointerException();CompletableFuture<U> d = new CompletableFuture<U>();e.execute(new AsyncSupply<U>(d, f));return d;}static final class AsyncSupply<T> extends ForkJoinTask<Void>implements Runnable, AsynchronousCompletionTask {CompletableFuture<T> dep; Supplier<T> fn;AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {this.dep = dep; this.fn = fn;}...}

runAsync()->Runnable->AsyncRun
在runAsync(…)方法内部,会把一个Runnable转换成一个AsyncRun,然后提交给ForkJoinPool执行;

    public static CompletableFuture<Void> runAsync(Runnable runnable) {return asyncRunStage(asyncPool, runnable);}static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {if (f == null) throw new NullPointerException();CompletableFuture<Void> d = new CompletableFuture<Void>();e.execute(new AsyncRun(d, f));return d;}static final class AsyncRun extends ForkJoinTask<Void>implements Runnable, AsynchronousCompletionTask {CompletableFuture<Void> dep; Runnable fn;AsyncRun(CompletableFuture<Void> dep, Runnable fn) {this.dep = dep; this.fn = fn;}...}

thenAccept()->Consumer->UniAccept
在 thenRun/thenAccept/thenApply 内部,会分别把Runnable/Consumer/Function 转换成UniRun/UniAccept/UniApply对象,然后提交给ForkJoinPool执行;

除此之外,还有两种 CompletableFuture 组合的情况,分为“与”和“或”,所以有对应的Bi和Or类型
的Completion类型

    public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {return uniAcceptStage(null, action);}private CompletableFuture<Void> uniAcceptStage(Executor e,Consumer<? super T> f) {if (f == null) throw new NullPointerException();CompletableFuture<Void> d = new CompletableFuture<Void>();if (e != null || !d.uniAccept(this, f, null)) {UniAccept<T> c = new UniAccept<T>(e, d, this, f);push(c);c.tryFire(SYNC);}return d;}

在这里插入图片描述

任务的链式执行过程分析

下面以CompletableFuture.supplyAsync(…).thenApply(…).thenRun(…)链式代码为例,分析整个执行过程。

    static final class AsyncSupply<T> extends ForkJoinTask<Void>implements Runnable, AsynchronousCompletionTask {...}

在这里插入图片描述

什么是 Java8 的 ForkJoinPool?

ForkJoinPool就是JDK7提供的一种“分治算法”的多线程并行计算框架。Fork意为分叉,Join意为合并,一分一合,相互配合,形成分治算法。此外,也可以将ForkJoinPool看作一个单机版的
Map/Reduce,多个线程并行计算。

相比于ThreadPoolExecutor,ForkJoinPool可以更好地实现计算的负载均衡,提高资源利用率。

假设有5个任务,在ThreadPoolExecutor中有5个线程并行执行,其中一个任务的计算量很大,其余4个任务的计算量很小,这会导致1个线程很忙,其他4个线程则处于空闲状态。

利用ForkJoinPool,可以把大的任务拆分成很多小任务,然后这些小任务被所有的线程执行,从而
实现任务计算的负载均衡。

在这里插入图片描述

应用

1.斐波那契数列

    @Testvoid testForkJoin() throws ExecutionException, InterruptedException {ForkJoinPool forkJoinPool = new ForkJoinPool();ForkJoinTask<Integer> task = forkJoinPool.submit(new FibonacciTask(5));System.out.println(task.get());}// 1 1 2 3 5 8 ...class FibonacciTask extends RecursiveTask<Integer> {int n;public FibonacciTask(int n) {this.n = n;}@Overrideprotected Integer compute() {if (n <= 1){return 1;}FibonacciTask task1 = new FibonacciTask(n - 1);task1.fork();FibonacciTask task2 = new FibonacciTask(n - 2);task2.fork();return task1.join() + task2.join();}}

核心数据结构

与ThreadPoolExector不同的是,除一个全局的任务队列之外,每个线程还有一个自己的局部队列。
在这里插入图片描述

本课程内容参考:
1.《并发编程78讲》-徐隆曦 滴滴出行高级工程师
2.美团技术博客-Java线程池实现原理及其在美团业务中的实践
3.《java并发编程实战》
4.CSDN博客-面试官:你知道什么是线程池的线程复用原理吗?

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/508795.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

设计模式七大设计原则

文章目录设计模式七大设计原则开闭原则里氏替换原则依赖倒置原则接口隔离原则迪米特法则-最少知道原则单一职责原则合成复用原则设计模式 面向对象的三个基本特征&#xff1a; 继承封装多态 设计模式体现了代码的耦合性、内聚性、可维护性、可扩展性、重用性、灵活性。 代码…

从框架源码中学习结构型设计模式

文章目录从框架源码学习结构型设计模式适配器模式应用实例案例一&#xff1a;dubbo框架日志适配器Logger接口日志实现类Logger适配器接口LoggerAdapter实现类Logger日志工厂桥接模式应用场景案例&#xff1a;dubbo源码-远程调用模块channelHandler设计ChannelHandler是一个SPI拓…

MDC日志logback整合使用

MDC日志logback整合使用 为什么使用MDC记录日志&#xff1f; 场景&#xff1a; 由于我的搜索服务并发量比较高&#xff0c;而处理一次搜索请求需要记录多个日志&#xff0c;因此日志特别多的情况下去查一次搜索整个日志打印情况会比较复杂。 解决方案&#xff1a; 可以使用用…

如何合理的配置线程数?

文章目录题记Java并发编程实战美团技术团队追求参数设置合理性线程池参数动态化题记 我想不管是在面试中、还是工作中&#xff0c;我们总会面临这种问题&#xff0c;那么到底有没有一种计算公式去告诉我们如何去配置呢&#xff1f; 答案是&#xff1a;没有 想要合理的配置线…

基于CompletableFuture并发任务编排实现

文章目录并发任务编排实现不带返回值/参数传递任务串行执行并行执行并行执行-自定义线程池阻塞等待&#xff1a;多并行任务执行完再执行任意一个任务并发执行完就执行下个任务串并行任务依赖场景带返回值/参数传递任务带返回值实现串行执行多线程任务串行执行对任务并行执行&am…

搜索研发工程师需要掌握的一些技能

文章目录基础语言数据结构与算法工程方面搜索相关搜索主要模块电商搜索流程分词相关搜索召回相似度算法相关词推荐排序相关国美搜索搜索算法工程师需要掌握的技能基础 语言 大部分公司用的是Solr、ElasticSearch&#xff0c;都是基于Java实现的&#xff0c;因此熟悉掌握Java语…

Flink入门看完这篇文章就够了

文章目录第一章&#xff1a;概述第一节&#xff1a;什么是Flink&#xff1f;第二节&#xff1a;Flink特点&#xff1f;第三节&#xff1a;Flink应用场景&#xff1f;第四节&#xff1a;Flink核心组成第五节&#xff1a;Flink处理模型&#xff1a;流处理和批处理第六节&#xff…

word小结

域代码/域结果显示设置 word选项---->>高级------>>显示域代码而非域值将样式传给其它文件使用 首先启动Word打开包含这些样式的一个文件&#xff0c;然后选择“工具”---->“模板和加载项”。在弹出的对话框中单击“管理器”按钮。在弹出的“管理器”对话框中&…

线程属性总结

今天面试那哥们问起线程属性&#xff0c;me竟然就说出了一个&#xff0c;囧 学习&#xff1a;http://blog.csdn.net/zsf8701/article/details/7842392 http://blog.csdn.net/jxhnuaa/article/details/3254299 http://blog.sina.com.cn/s/blog_9bd573450101hgdr.html int pthre…

百度2015校园招聘软件开发笔试题及答案

简单题&#xff08;本题共30分&#xff09; 请简述Tcp-ip的3次握手以及4次挥手过程&#xff1f;并解释为何关闭连接需要4次挥手(10分) 详细答案参见TCP/IP协议三次握手与四次握手流程解析 TCP三次握手、四次挥手过程如下: 通常情况下&#xff0c;一个正常的TCP连接&#xf…

linux ps 命令使用

Linux中的ps命令是Process Status的缩写。ps命令用来列出系统中当前运行的那些进程。ps命令列出的是当前那些进程的快照&#xff0c;就是执行ps命令的那个时刻的那些进程&#xff0c;如果想要动态的显示进程信息&#xff0c;就可以使用top命令。 linux上进程有5种状态 ps命令使…

UML序列图总结

序列图主要用于展示对象之间交互的顺序。 序列图将交互关系表示为一个二维图。纵向是时间轴&#xff0c;时间沿竖线向下延伸。横向轴代表了在协作中各独立对象的类元角色。类元角色用生命线表示。当对象存在时&#xff0c;角色用一条虚线表示&#xff0c;当对象的过程处于激活…

UML用例图总结

用例图主要用来描述 用户、需求、系统功能单元 之间的关系。它展示了一个外部用户能够观察到的系统功能模型图。 【用途】&#xff1a;帮助开发团队以一种可视化的方式理解系统的功能需求。 用例图所包含的元素如下&#xff1a; 1. 参与者(Actor) 表示与您的应用程序或…

Linux网络编程常见面试题

概述 TCP和UDP是网络体系结构TCP/IP模型中传输层一层中的两个不同的通信协议。 TCP&#xff1a;传输控制协议&#xff0c;一种面向连接的协议&#xff0c;给用户进程提供可靠的全双工的字节流&#xff0c;TCP套接口是字节流套接口(stream socket)的一种。UDP&#xff1a;用户…

linux动态库查找路径以及依赖关系梳理

编译时与运行时库的路径 linux下&#xff0c;编译时与运行时库的搜索路径是不同的 运行时动态库的路径搜索顺序 LD_PRELOAD环境变量&#xff0c;一般用于hack 编译目标代码时指定的动态库搜索路径(指的是用 -wl,rpath 或-R选项而不是-L)&#xff0c;readelf -d命令可以查看编…

eclipse--android开发环境搭建教程

引言 在windows安装Android的开发环境不简单也说不上算复杂&#xff0c;但由于国内无法正常访问google给android开发环境搭建带来不小的麻烦。现将本人搭建过程记录如下&#xff0c;希望会对投身android开发的小伙伴有所帮助。 android开发环境部署过程 安装JDK环境 下载安装…

eclipse--python开发环境搭建

pydev插件介绍 PyDev is a Python IDE for Eclipse pydev官方网站&#xff1a;http://www.pydev.org/ 在Eclipse中安装pydev插件 启动Eclipse, 点击Help->Install New Software… 在弹出的对话框中&#xff0c;点Add 按钮。 Name中填:Pydev, Location中填http://pydev.or…

Win7虚拟无线AP以及Android手机抓包

设备要求 Windows7操作系统装有无线网卡的笔记本或台式机无线网卡必须支持“承载网络” 查看无线网卡是否支持“承载” 方法一: 开始菜单→所有程序→附件→命令提示符→右键“以管理员权限运行”; 键入命令“netsh wlan show drivers”,查看“支持承载网络”这一项,如果是…

CMD命令之BAT脚本路径信息

CD命令解疑 cd是chdir的缩写&#xff0c;命令详解参见cd /? 可以看到/d参数的解释如下&#xff1a; 使用 /D命令行开关&#xff0c;除了改变驱动器的当前目录之外&#xff0c;还可改变当前驱动器。 通常我们在xp系统中打开cmd窗口时&#xff0c;会显示 C:\Documents and Se…