文章目录
- Future介绍
- FutureTask使用
- FutureTask 分析
- CompletableFuture
- CompletableFuture的应用
- CompletableFuture 示例
- 总结
Future介绍
Java创建线程的方式,一般常用的是Thread,Runnable。如果需要当前处理的任务有返回结果的话,需要使用Callable。Callable运行需要配合Future。
Future是一个接口,一般会使用FutureTask实现类去接收Callable任务的返回结果。
FutureTask使用
下面示例使用FutureTask来执行一个可以返回结果的异步任务。Callable是要执行的任务,FutureTask是存放任务返回结果的位置。
public static void main(String[] args) throws ExecutionException, InterruptedException {FutureTask<Integer> futureTask = new FutureTask<>(() -> {System.out.println("任务执行");Thread.sleep(2000);return 123+764;});Thread t = new Thread(futureTask);t.start();System.out.println("main线程启动了t线程处理任务");Integer result = futureTask.get();System.out.println(result);
}
FutureTask 分析
首先看一下FutureTask的核心属性
/*** NEW -> COMPLETING -> NORMAL 任务正常执行,返回结果是正常的结果* NEW -> COMPLETING -> EXCEPTIONAL 任务正常执行,但是返回结果是异常* NEW -> CANCELLED 任务直接被取消的流程* NEW -> INTERRUPTING -> INTERRUPTED*/
// 代表当前任务的状态
private volatile int state;
private static final int NEW = 0; // 任务的初始化状态
private static final int COMPLETING = 1; // Callable的结果(正常结果,异常结果)正在封装给当前的FutureTask
private static final int NORMAL = 2; // NORMAL任务正常结束
private static final int EXCEPTIONAL = 3; // 执行任务时,发生了异常
private static final int CANCELLED = 4; // 任务被取消了。
private static final int INTERRUPTING = 5; // 线程的中断状态,被设置为了true(现在还在运行)
private static final int INTERRUPTED = 6; // 线程被中断了。// 当前要执行的任务
private Callable<V> callable;
// 存放任务返回结果的属性,也就是futureTask.get需要获取的结果
private Object outcome;
// 执行任务的线程。
private volatile Thread runner;
// 单向链表,存放通过get方法挂起等待的线程
private volatile WaitNode waiters;
t.start后,是通过run方法执行的Callable的call方法,该方法是同步的,然后将返回结果赋值给了outcome。
// run方法的执行流程,最终会执行Callable的call方法
public void run() {// 保证任务的状态是NEW才可以运行// 基于CAS的方式,将当前线程设置为runner。if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;// 准备执行任务try {// 要执行任务 cCallable<V> c = callable;// 任务不为null,并且任务的状态还处于NEWif (c != null && state == NEW) {// 放返回结果V result;// 任务执行是否为正常结束boolean ran;try {// 运行call方法,拿到返回结果封装到result中result = c.call();// 正常返回,ran设置为trueran = true;} catch (Throwable ex) {// 结果为nullresult = null;// 异常返回,ran设置为falseran = false;// 设置异常信息setException(ex);}if (ran)// 正常执行结束,设置返回结果set(result);}} finally {// 将执行任务的runner设置空runner = null;// 拿到状态int s = state;// 中断要做一些后续处理if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}
}// 设置返回结果
protected void set(V v) {// 首先要将任务状态从NEW设置为COMPLETINGif (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {// 将返回结果设置给outcome。outcome = v;// 将状态修改为NORMAL,代表正常技术UNSAFE.putOrderedInt(this, stateOffset, NORMAL);finishCompletion();}
}
get方法获取返回结果时会查看当前线程状态,如果状态还未达成,也就是说call方法还未执行完未执行set方法,该线程就会被挂起阻塞LockSupport.park(this);。
public V get() throws InterruptedException, ExecutionException {// 拿状态int s = state;// 满足找个状态就代表现在可能还没有返回结果if (s <= COMPLETING)// 尝试挂起线程,等待拿结果s = awaitDone(false, 0L);return report(s);
}// 线程要等待任务执行结束,等待任务执行的状态变为大于COMPLETING状态
private int awaitDone(boolean timed, long nanos) throws InterruptedException {// 计算deadline,如果是get(),就是0, 如果是get(time,unit)那就追加当前系统时间final long deadline = timed ? System.nanoTime() + nanos : 0L;// 构建WaitNodeWaitNode q = null;// queued = falseboolean queued = false;// 死循环for (;;) {// 找个get的线程是否中断了。if (Thread.interrupted()) {// 将当前节点从waiters中移除。removeWaiter(q);// 并且抛出中断异常throw new InterruptedException();}// 拿到现在任务的状态int s = state;// 判断任务是否已经执行结束了if (s > COMPLETING) {// 如果设置过WaitNode,直接移除WaitNode的线程if (q != null)q.thread = null;// 返回当前任务的状态return s;}// 如果任务的状态处于 COMPLETING ,else if (s == COMPLETING)// COMPLETING的持续时间非常短,只需要做一手现成的让步即可。Thread.yield();// 现在线程的状态是NEW,(call方法可能还没执行完呢,准备挂起线程)else if (q == null)// 封装WaitNode存放当前线程q = new WaitNode();else if (!queued)// 如果WaitNode还没有排在waiters中,现在就排进来(头插法的效果)queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);else if (timed) {// get(time,unit)挂起线程的方式// 计算挂起时间nanos = deadline - System.nanoTime();// 挂起的时间,是否小于等于0if (nanos <= 0L) {// 移除waiters中的当前NoderemoveWaiter(q);// 返回任务状态return state;}// 正常指定挂起时间即可。(线程挂起)LockSupport.parkNanos(this, nanos);}else {// get()挂起线程的方式LockSupport.park(this);}}
}
当任务执行完毕(set方法执行完成),由finishCompletion唤醒线程,LockSupport.unpark(t);
// 任务状态已经变为了NORMAL,做一些后续处理
private void finishCompletion() {for (WaitNode q; (q = waiters) != null;) {// 拿到第一个节点后,直接用CAS的方式,将其设置为nullif (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {for (;;) {// 基于q拿到线程信息Thread t = q.thread;// 线程不为nullif (t != null) {// 将WaitNode的thread设置为nullq.thread = null;// 唤醒这个线程LockSupport.unpark(t);}// 往后遍历,接着唤醒WaitNode next = q.next;if (next == null)break;q.next = null;// 指向next的WaitNodeq = next;}break;}}// 扩展方法,没任何实现,你可以自己实现done();// 任务处理完了,可以拜拜了!callable = null;
}
拿到返回结果的处理
// 任务结束。
private V report(int s) throws ExecutionException {// 拿到结果Object x = outcome;// 判断是正常返回结束if (s == NORMAL)// 返回结果return (V)x;// 任务状态是大于取消if (s >= CANCELLED)// 甩异常。throw new CancellationException();// 扔异常。throw new ExecutionException((Throwable)x);
}// 正常返回 report
// 异常返回 report
// 取消任务 report
// 中断任务 awaitDone
CompletableFuture
FutureTask存在的问题:
问题1:FutureTask获取线程执行的结果前,主线程需要通过get方法一直阻塞等待子线程执行完call方法,才可以拿到返回结果。
问题2:如果不通过get去挂起线程,通过while循环,不停的判断任务的执行状态是否结束,结束后,再拿结果。如果任务长时间没执行完毕,CPU会一直调度查看任务状态的方法,会浪费CPU资源。
FutureTask是一个同步非阻塞处理任务的方式。需要一个异步非阻塞处理任务的方式。CompletableFuture在一定程度上就提供了各种异步非阻塞的处理方案。
CompletableFuture也是实现了Future接口实现的功能,可以不使用FutureTask,提供非常丰富的函数去执行各种异步操作,直接使用CompletableFuture即可。
CompletableFuture的应用
CompletableFuture最重要的就是解决了异步回调的问题
CompletableFuture就是执行一个异步任务,异步任务可以有返回结果,也可以没有返回结果,使用了函数式编程中三个最核心的接口
Supplier - 生产者,没有入参,但是有返回结果
Consumer - 消费者,有入参,但是没有返回结果
Function - 函数,有入参,并且有返回结果
提供了两个最基本运行的基本方法
supplyAsync(Supplier<U> supplier) 异步执行任务,有返回结果
runAsync(Runnable runnable) 异步执行任务,没有返回结果
在不指定线程池的前提下,这两个异步任务都是交给ForkJoinPool去执行的。
但是只是用这两个方法,无法实现异步回调的。如果需要在当前任务执行完毕后,拿着返回结果继续去执行后续任务操作的话,需要基于其他方法去实现。
thenApply(Function<prevResult,currResult>); 等待前一个任务处理结束后,拿着前置任务的返回结果,再做处理,并且返回当前结果
thenApplyAsync(Function<prevResult,currResult>,线程池) 采用全新的线程执行
thenAccept(Consumer<preResult>);等待前一个任务处理结束后,拿着前置任务的返回结果再做处理,没有返回结果
thenAcceptAsync(Consumer<preResult>,线程池);采用全新的线程执行
thenRun(Runnable) 等待前一个任务处理结束后,再做处理。不接收前置任务结果,也不返回结果
thenRunAsync(Runnable[,线程池]) 采用全新的线程执行
其次还有可以执行相对复杂的处理,在前一个任务执行的同时,执行后续任务。等待前置任务和后置任务都搞定之后,再执行最终任务
thenCombine(CompletionStage,Function<prevResult,nextResult,afterResult>) 让prevResult和nextResult一起执行,等待执行完成后,获取前两个任务的结果执行最终处理,最终处理也可以返回结果
thenCombineAsync(CompletionStage,Function<prevResult,nextResult,afterResult>[,线程池]) 采用全新的线程执行
thenAcceptBoth(CompletionStage,Consumer<prevResult,nextResult>);让前置任务和后续任务同时执行,都执行完毕后,拿到两个任务的结果,再做后续处理,但是没有返回结果
thenAcceptBothAsync(CompletionStage,Consumer<prevResult,nextResult>[,线程池])采用全新的线程执行
runAfterBoth(CompletionStage,Runnble) 让前置任务和后续任务同时执行,都执行完毕后再做后续处理
runAfterBothAsync(CompletionStage,Runnble[,线程池]) 采用全新的线程执行
还提供了可以让两个任务一起执行,但是有一个任务结束,有返回结果后,就做最终处理
applyToEither(CompletionStage,Function<firstResult,afterResult>) 前面两个任务同时执行,有一个任务执行完,获取返回结果,做最终处理,再返回结果
acceptEither(CompletionStage,Consumer<firstResult>) 前面两个任务同时执行,有一个任务执行完,获取返回结果,做最终处理,没有返回结果
runAfterEither(CompletionStage,Runnable) 前面两个任务同时执行,有一个任务执行完,做最终处理
还提供了等到前置任务处理完,再做后续处理,后续处理返回的结果为CompletionStage
thenCompose(Function<prevResult,CompletionStage>)
最后还有处理异常的各种姿势
exceptionally(Function<Throwable,currResult>)
whenComplete(Consumer<prevResult,Throwable>)
hanle(Function<prevResult,Throwable,currResult>)
CompletableFuture 示例
public static void main(String[] args) throws InterruptedException {sout("我回家吃饭");CompletableFuture<String> task = CompletableFuture.supplyAsync(() -> {sout("阿姨做饭!");return "锅包肉!";});sout("我看电视!");sout("我吃饭:" + task.join());
}
public static void main(String[] args) throws InterruptedException {sout("我回家吃饭");CompletableFuture<String> task = CompletableFuture.supplyAsync(() -> {sout("阿姨炒菜!");return "锅包肉!";},executor).thenCombineAsync(CompletableFuture.supplyAsync(() -> {sout("小王焖饭");return "大米饭!";},executor),(food,rice) -> {sout("大王端" + food + "," + rice);return "饭菜好了!";},executor);sout("我看电视!");sout("我吃饭:" + task.join());
}
总结
Future和CompletableFuture都是用于处理异步任务的接口和类,它们的主要区别在于功能复杂度和使用场景。Future比较简单,主要用于简单的异步任务处理,而CompletableFuture则更加灵活和强大,适用于复杂的异步任务处理场景。