RunnableFuture
源码学习:
成员变量
任务的运行状态的转化
package java.util.concurrent;
import java.util.concurrent.locks.LockSupport;/**可取消的异步计算。该类提供了Future的基本实现,包括启动和取消计算的方法,查询计算是否完成以及获取计算结果的方法。只有在计算完成后才能获取结果;如果计算尚未完成,get方法将会阻塞。一旦计算完成,就无法重新启动或取消计算(除非使用runAndReset方法调用计算)。FutureTask可以用来包装一个Callable或Runnable对象。由于FutureTask实现了Runnable接口,因此可以将FutureTask提交给Executor执行。 除了作为独立的类使用外,该类还提供了一些受保护的功能,这些功能在创建自定义任务类时可能会有用。*/
public class FutureTask<V> implements RunnableFuture<V> {/**此任务的运行状态,初始为NEW。运行状态仅在set、setException和cancel方法中转换为终态。在完成过程中,状态可能会暂时变为COMPLETING(在设置结果时)或INTERRUPTING(仅在中断执行者以满足cancel(true)时)。从这些中间状态到最终状态的转换使用更便宜的有序/懒惰写入,因为这些值是唯一的且不会进一步修改。* Possible state transitions:* NEW -> COMPLETING -> NORMAL 正常结束* NEW -> COMPLETING -> EXCEPTIONAL 异常结束* NEW -> CANCELLED* NEW -> INTERRUPTING -> INTERRUPTED*/// 状态值: 表示任务运行的状态private volatile int state;// 新建 或者 正在运行private static final int NEW = 0;// 中间状态(任务执行完了,但是结果集(正结果/ 异常) 没有设置到 outcome)private static final int COMPLETING = 1;// 正常执行完成(结果集设置到outcome之后,正常结束)private static final int NORMAL = 2;// 异常执行完成(结果集设置到outcome之后,异常结束)private static final int EXCEPTIONAL = 3;// 取消private static final int CANCELLED = 4;// 中断(中间值)[但是还没有中断]private static final int INTERRUPTING = 5;// 中断完成,最终状态private static final int INTERRUPTED = 6;/** The underlying callable; nulled out after running */// 执行目标private Callable<V> callable;/** The result to return or exception to throw from get() */// 结果集private Object outcome; // non-volatile, protected by state reads/writes/** The thread running the callable; CASed during run() */// 执行任务的线程private volatile Thread runner;/** Treiber stack of waiting threads */// get 阻塞的时候,使用 WaitNode{物理结构:链表;逻辑结构:栈}去存储阻塞的线程private volatile WaitNode waiters;public FutureTask(Callable<V> callable) {if (callable == null) throw new NullPointerException();this.callable = callable;this.state = NEW; // ensure visibility of callable}public FutureTask(Runnable runnable, V result) {this.callable = Executors.callable(runnable, result); // 适配器的方式this.state = NEW; // ensure visibility of callable}public boolean isCancelled() {return state >= CANCELLED;}public boolean isDone() {return state != NEW;}/*** 简单的链表节点,用于记录Treiber堆栈中的等待线程。*/static final class WaitNode {volatile Thread thread;volatile WaitNode next;WaitNode() { thread = Thread.currentThread(); }}// Unsafe mechanicsprivate static final sun.misc.Unsafe UNSAFE;private static final long stateOffset;private static final long runnerOffset;private static final long waitersOffset;static {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> k = FutureTask.class;stateOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("state"));runnerOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("runner"));waitersOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiters"));} catch (Exception e) {throw new Error(e);}}}
run()
任务只能执行一次
public void run() {// 状态 是New 并且 cas 成功的把当前线程设置到 runner 才能执行后续的方法,否则就直接返回if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {Callable<V> c = callable;// 当前 要执行的任务存在,并且 状态 是New 才会调用目标逻辑 c.call()if (c != null && state == NEW) {V result;boolean ran;try {result = c.call();// 目标逻辑执行成功ran = true;} catch (Throwable ex) {// 目标逻辑执行 失败 ,结果 置为 nullresult = null;ran = false;// 设置异常结果集setException(ex);}if (ran)// 设置正常结果集set(result);}} finally {// 在任务状态被确定之前,runner必须非空,以防止对run()方法的并发调用。runner = null;// 在将runner设置为null之后,必须重新读取任务的状态,以防止泄漏的中断。int s = state;if (s >= INTERRUPTING) // 处于中断状态,执行中断后续逻辑handlePossibleCancellationInterrupt(s);}
}@FunctionalInterface
public interface Callable<V> {V call() throws Exception;
}/*将此Future的结果设置为给定的值,除非此Future已经被设置或已取消。在计算成功完成时,此方法由run方法在内部调用。
*/protected void set(V v) {// cas 的方式把状态变为 COMPLETING ,设置成功if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {// 设置结果集为 结果 result outcome = v;// cas 的方式把状态变为最终状态: NORMALUNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state// 执行后续操作finishCompletion();}}/*将此Future报告为ExecutionException,将给定的throwable作为其原因,除非此Future已经被设置或已取消。在计算失败时,此方法由run方法在内部调用。
*/protected void setException(Throwable t) {// cas 的方式把状态变为 COMPLETING ,设置成功if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {// 设置结果集为 异常outcome = t;// cas 的方式把状态变为最终状态: EXCEPTIONALUNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state// 执行后续操作finishCompletion();}}/*
确保任何来自可能的cancel(true)取消操作的中断仅在run或runAndReset方法中传递给任务的目的。
*/private void handlePossibleCancellationInterrupt(int s) {// 解释了在等待中断信号时使用自旋等待的目的: 通过中断来取消任务的执行。然而,可能存在一种情况,即中断线程在 有机会中断当前线程 之前被阻塞。为了等待中断信号的到来,代码使用了自旋等待的策略。if (s == INTERRUPTING)while (state == INTERRUPTING) // 处于中间状态的时候就 让出 cpuThread.yield(); // wait out pending interrupt// assert state == INTERRUPTED;// 解释了在state等于INTERRUPTED时的处理逻辑// 它使用断言(assert)来确保任务的状态为INTERRUPTED。断言通常用于在代码中插入一些检查,以确保某些条件为真。如果断言的条件为假,将会抛出一个AssertionError异常。// Thread.interrupted();// 解释了清除可能来自cancel(true)取消操作的中断的目的。但是,中断也可以作为一个独立的机制,用于任务与其调用者之间的通信,并且没有办法只清除取消中断。因此,为了清除中断,代码调用了Thread.interrupted()方法。// Thread.interrupted()方法用于清除当前线程的中断状态,并返回之前的中断状态。这样做的目的是确保任务的中断状态被清除,以便后续的代码或操作不会受到中断的影响。}
finishCompletion()
/*** 移除并唤醒所有等待的线程,调用done()方法,并将callable置为null。*/private void finishCompletion() {// assert state > COMPLETING;// 循环获取等待队列中的等待节点 waiters, 等待节点里面保存了等待任务完成的线程for (WaitNode q; (q = waiters) != null;) {// 要是cas 的方式成功的将等待队列 waitersOffset 设置为 null if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {// 循环处理每一个等待的节点for (;;) {// 取出当前等待节点 持有的线程Thread t = q.thread;// 线程存在if (t != null) {// 将等待节点的线程引用设为null,并调用LockSupport.unpark(t)方法来唤醒该线程q.thread = null;LockSupport.unpark(t); //LockSupport.unpark(t)方法用于唤醒一个被阻塞的线程。}WaitNode next = q.next; // 取节点下一个元素if (next == null)// 要是没有后继节点,此时表示已经处理完所有等待节点,退出 死循环break;// 将后继节点置为 nullq.next = null; // unlink to help gc // 节点后移q = next;}break; // (q = waiters) == null 退出循环}}done(); // 调用done()方法来完成任务的执行 —————— 钩子方法/*这段代码是一个保护(protected)方法,当任务转换为已完成状态(isDone)时被调用,无论是正常完成还是通过取消完成。默认实现不执行任何操作。子类可以重写这个方法来调用完成回调或进行记录。注意,在该方法的实现中,您可以查询状态来确定任务是否已被取消。protected void done() { }*/callable = null; // to reduce footprint 将callable引用设为null,以减少内存占用。}
runAndReset()
任务可以执行多次
- 和 run() 的区别就是 没有正常的结果设置结果集
/**执行计算,但不设置其结果,然后将该Future重置为初始状态。如果计算遇到异常或被取消,则无法执行重置操作。这个方法设计用于那些本质上需要执行多次的任务。
*/// 执行并重置任务
protected boolean runAndReset() {if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return false;boolean ran = false;int s = state;try {Callable<V> c = callable;if (c != null && s == NEW) {try {c.call(); // don't set resultran = true;} catch (Throwable ex) {setException(ex);}/*和 run() 的区别就是 没有正常的结果设置结果集*/}} finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()runner = null;// state must be re-read after nulling runner to prevent// leaked interruptss = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}return ran && s == NEW;
}
get()
/*** 如果计算被取消 会抛出异常*/public V get() throws InterruptedException, ExecutionException {int s = state;// 此时的状态只有 New (新建 或者 正在执行) 或者 COMPLETING(任务执行结束,结果集该没有设置成功)if (s <= COMPLETING)// 阻塞等待s = awaitDone(false, 0L);// 否则的话 返回结果集(正常 或者 异常)return report(s);}/**等待方法,根据传入的参数决定是否使用定时等待。如果使用定时等待,则会在指定的时间内 等待完成 或者 在中断或超时时中止。*/private int awaitDone(boolean timed, long nanos)throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L;WaitNode q = null;boolean queued = false;//[注:] 带有阻塞的都要有 死循环,防止虚假唤醒for (;;) { // 死循环if (Thread.interrupted()) { // 判断是不是中断removeWaiter(q); // 移除节点 throw new InterruptedException(); // 抛错} int s = state;if (s > COMPLETING) {// 此时结果设置成功// 当前节点存在,将其持有的线程 置空if (q != null)q.thread = null;// 返回结果,结束阻塞return s;}// 还是在设置结果的状态,让出 cpu else if (s == COMPLETING) // cannot time out yetThread.yield();// 处于 new 状态else if (q == null)// 创建节点q = new WaitNode();// 插入链表else if (!queued)// cas 的方式 头插法queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);// 超时等待,else if (timed) {nanos = deadline - System.nanoTime();// 时间已过,移除结果,返回状态if (nanos <= 0L) {removeWaiter(q);return state;}LockSupport.parkNanos(this, nanos);}elseLockSupport.park(this);}}/***完成任务之后 返回结果或者抛出异常*/@SuppressWarnings("unchecked")private V report(int s) throws ExecutionException {Object x = outcome;if (s == NORMAL)return (V)x;if (s >= CANCELLED)throw new CancellationException();throw new ExecutionException((Throwable)x);}
removeWaiter(WaitNode node)
/*** 尝试取消链接超时或中断的等待节点,以避免累积垃圾。内部节点只是简单地取消拼接,而无需使用CAS,因为如果它们被释放器遍历,这样做是无害的。为了避免从已经删除的节点中取消拼接的影响,在明显存在竞争的情况下,列表将被重新遍历。当节点很多时,这会很慢,但我们不希望列表足够长以抵消更高开销的方案。*/private void removeWaiter(WaitNode node) {if (node != null) {// 当前节点存在// 将传入节点的线程引用置为null,表示该节点不再持有线程node.thread = null;retry:// 死循环for (;;) { // restart on removeWaiter race// 遍历链表 中的所有的 等待节点for (WaitNode pred = null, q = waiters, s; q != null; q = s) {// 取出下一个节点s = q.next;// 要是此等待节点持有的线程不是 nullif (q.thread != null)pred = q; // 将前置节点设置为 当前节点q else if (pred != null) {// 等待节点持有的线程是 null ,但是前置节点 不是 null// 前驱节点的next指向当前节点pred.next = s; // 前置节点持有的线程不存在了,表示存在竞争情况,需要重新开始循环。执行下次死循环if (pred.thread == null) // check for racecontinue retry;}
//等待节点持有的线程是 null ,但是前置节点 是 null, cas的方式成功的将节点下移,当前节点从等待队列中移除,执行下次死循环// cas的方式将waitersOffset处的值从q替换为selse if (!UNSAFE.compareAndSwapObject(this, waitersOffset,q, s))continue retry;}// 内层遍历结束,等待集合中没有无效节点break;}}}
get(long timeout, TimeUnit unit)
public V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {if (unit == null)throw new NullPointerException();int s = state; // 获取当前对象的状态// 调用awaitDone方法来等待操作完成,如果返回的状态值小于等于COMPLETING,则表示操作未完成,继续等待,如果等待的时间超过了超时时间,则抛出TimeoutException异常。if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)throw new TimeoutException();//将最终的状态值作为参数传递给report方法,并返回report方法的返回值。return report(s);
}
cancel()
public boolean cancel(boolean mayInterruptIfRunning) {if (!(state == NEW &&UNSAFE.compareAndSwapInt(this, stateOffset, NEW,mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))return false;/*mayInterruptIfRunning:true: 可以中断正在执行的任务 INTERRUPTINGfasle: 不可以中断正在执行的任务 CANCELLED*/
// 状态是 new ;cas 的方式把 任务的状态从"NEW"修改为"INTERRUPTING"或"CANCELLED"。如果修改成功,表示取消成功,返回true。try { // in case call to interrupt throws exceptionif (mayInterruptIfRunning) {try {Thread t = runner; // 尝试中断正在执行任务的线程if (t != null) // 如果任务的runner不为null,则调用interrupt()方法中断线程。t.interrupt();} finally { // final state// 设置任务最终的状态 cas 的方式将任务的状态修改为"INTERRUPTED"。UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);}}} finally {// 调用finishCompletion()方法完成任务的处理finishCompletion();}return true; //返回true表示取消成功。}
手撕FutureTask:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.LockSupport;public class FutureTask_<T> implements Runnable {private Future_<T> future;public FutureTask_(Future_<T> future) {this.future = future;}public FutureTask_(Runnable runnable) {this.future = new FutureAdaptive(runnable);}@Overridepublic void run() {try{res = this.future.code();state = 1;}catch (Exception e){res = e;state = 2;}for (Thread thread : threadList){LockSupport.unpark(thread); // 唤醒}}private Object res;private volatile int state;private List<Thread> threadList = new ArrayList<>();public T get(){for (;;){if(state == 0){threadList.add(Thread.currentThread());LockSupport.park(); // 阻塞}else if(state == 1){return (T)res;}else if(state == 2){throw new RuntimeException(res.toString());}}}private class FutureAdaptive implements Future_<T> {public Runnable runnable;public FutureAdaptive(Runnable runnable) {this.runnable = runnable;}@Overridepublic T code() throws Exception {this.runnable.run();return null;}}
}class MM {public static void main(String[] args){Future_<String> future = new Future_<String>() {@Overridepublic String code() throws Exception {return "future";}};Runnable runnable = new Runnable(){@Overridepublic void run() {System.out.println("runnable");}};FutureTask_<String> future_ = new FutureTask_<String>(future);FutureTask_<String> runnable_ = new FutureTask_<String>(runnable);new Thread(future_).start();new Thread(runnable_).start();System.out.println(future_.get());LockSupport.parkNanos(2*1000*1000*1000);}
}interface Future_<T>{T code() throws Exception;
}