前言
在进行多线程编程时,我们离不开两个重要的任务接口:Runnable、Callable。一个线程想要运行,首先它得知道它的任务是什么(它要做什么),而这两个接口恰好是用于表示一个线程需要执行的任务。
Runnable和Callable两个接口都是任务接口,它们之间有何不同呢?Runnable中的run方法是没有返回值的,而Callable中的call方法有返回值V(泛型);同时call方法支持抛出异常,一般情况下我们都会使用Runnable接口,当需要线程的执行结果时就使用Callable接口。
public interface Runnable {public abstract void run();
}public interface Callable<V> {V call() throws Exception;
}
那么我们如何获取一个线程的执行结果呢?此时就需要用到Future接口及其实现类FutureTask了。Future接口中有一个get()方法,用于同步获取线程执行结果,同步表示当线程还没有执行完任务时,调用get()方法获取执行结果的线程会阻塞,直至线程返回执行结果。
FutureTask源码解读
1、FutureTask不直接实现Future接口,而是实现了一个RunnableFuture<V>接口,RunnableFuture<V>接口又继承自Runnable、Future接口,属于这两个接口的子接口,所以FutureTask不仅可以用来同步获取线程执行结果,还可以作为任务提交给线程执行。
public class FutureTask<V> implements RunnableFuture<V> public interface RunnableFuture<V> extends Runnable, Future<V> {/*** Sets this Future to the result of its computation* unless it has been cancelled.*/void run();
}
2、FutureTask中的字段:
//任务的执行状态(get方法的关键)private volatile int state;//任务刚被创建private static final int NEW = 0;//任务正在处理private static final int COMPLETING = 1;//任务正常完成private static final int NORMAL = 2;//任务执行过程中出现异常private static final int EXCEPTIONAL = 3;//任务被取消private static final int CANCELLED = 4;//任务执行过程中被打断private static final int INTERRUPTING = 5;private static final int INTERRUPTED = 6;//任务private Callable<V> callable;//任务执行结果private Object outcome; //执行任务的线程private volatile Thread runner;//链表结构:保存等待线程private volatile WaitNode waiters;
2.1、WaitNode具体实现:
static final class WaitNode {//结点存储的线程volatile Thread thread;//结点的next指针volatile WaitNode next;WaitNode() { thread = Thread.currentThread(); }}
3、FutureTask的构造方法:
//需要传递一个Callable接口的实现类
//在FutureTask作为任务提交给线程时,执行的是实现类实现的call方法
public FutureTask(Callable<V> callable) {//如果入参的Callable为null,抛出异常if (callable == null)throw new NullPointerException();//不然赋值给成员变量//并将FutureTask状态设置为NEWthis.callable = callable;this.state = NEW; // ensure visibility of callable}/**需要传递两个参数:Runnable、Result前者是FutureTask作为任务提交给线程时,线程的执行逻辑后者是线程在任务完成时,需要get方法返回的结果result入参可以为null,表示不需要给定的结果
*/
public FutureTask(Runnable runnable, V result) {//callable方法会将runnable、result封装成一个callablethis.callable = Executors.callable(runnable, result);//将FutureTask状态设置为NEWthis.state = NEW; // ensure visibility of callable}//将Runnable封装成Callable返回
public static <T> Callable<T> callable(Runnable task, T result) {if (task == null)throw new NullPointerException();return new RunnableAdapter<T>(task, result);}//RunnableAdapter实现了Callable,所以也可以作为任务交给线程执行。
static final class RunnableAdapter<T> implements Callable<T> {final Runnable task;final T result;RunnableAdapter(Runnable task, T result) {this.task = task;this.result = result;}public T call() {task.run();return result;}}
4、获取执行结果的get()方法:
public V get() throws InterruptedException, ExecutionException {int s = state;if (s <= COMPLETING)//如果任务是刚被创建(NEW)、或者是正在处理(COMPLETING)状态//说明任务还未被处理完毕,阻塞获取任务执行结果的线程s = awaitDone(false, 0L);//如果任务是其它状态,说明任务已经处理完毕//report方法会根据任务状态返回结果给调用get方法的线程return report(s);}
4.1、report()方法:
private V report(int s) throws ExecutionException {Object x = outcome;if (s == NORMAL)//任务状态为NORMAL,正常返回结果return (V)x;if (s >= CANCELLED)//任务状态为CANCELLED、INTERRUPTING、INTERRUPTED其中之一//抛出CancellationException异常throw new CancellationException();//任务状态为EXCEPTIONAL,抛出ExecutionException异常throw new ExecutionException((Throwable)x);}
4.2、awaitDone()方法:
private int awaitDone(boolean timed, long nanos)throws InterruptedException {//deadline变量赋值分两种情况//调用了get()方法:deadline = 0//调用了get(long timeout,TimeUnit unit)方法:deadline = 当前时间 + timeoutfinal long deadline = timed ? System.nanoTime() + nanos : 0L;WaitNode q = null;boolean queued = false;for (;;) {//如果当前线程被打断了//就将其从等待链表中移除,并抛出InterruptedException异常if (Thread.interrupted()) {removeWaiter(q);throw new InterruptedException();}//获取当前任务的状态int s = state;//任务已完成(正常完成、取消、打断都算完成)if (s > COMPLETING) {//如果有线程在等待,就将线程设置为null,防止内存溢出if (q != null)q.thread = null;//返回任务状态return s;}//任务正在处理//当前等待执行结果的线程调用yield方法让os将CPU时间片分给其它线程else if (s == COMPLETING) // cannot time out yetThread.yield();//任务刚被创建,处于NEW状态//若等待节点q为null,则创建一个等待节点else if (q == null)q = new WaitNode();//如果当前等待节点还未加入等待队列,则通过CAS操作将其加入等待队列else if (!queued)queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);//如果设置了超时时间,则计算等待时间//1.等待时间 >= 超时时间,那么将等待节点移除,并返回任务状态//2.等待时间 < 超时时间,那么就阻塞:超时时间 - 等待时间else if (timed) {nanos = deadline - System.nanoTime();if (nanos <= 0L) {removeWaiter(q);return state;}LockSupport.parkNanos(this, nanos);}//如果没有设置超时时间,通过LockSupport无时间限制的阻塞当前线程elseLockSupport.park(this);}}
4.3、get方法流程图:
get方法测试:
public class FutureTaskTest {public static void main(String[] args) throws ExecutionException, InterruptedException {//通过匿名内部类的形式实现call方法Callable<String> callable = new Callable<String>() {@Overridepublic String call() throws Exception {//模拟线程执行任务耗时Thread.sleep(3000);return "task is completed!";}};FutureTask<String> task = new FutureTask<>(callable);long begin = System.currentTimeMillis();//将任务task分配给worker线程Thread worker = new Thread(task);worker.start();//主线程获取worker线程的执行结果String result = task.get();long end = System.currentTimeMillis();System.out.println("等待" + (end - begin) + "ms后获取到执行结果:" + result);}
}
执行结果:
补充:
在分析FutureTask构造方法时,第二个构造方法很有意思。RunnableAdapter只负责组合Runnable,并实现Callable接口,call()方法的逻辑由作为成员变量的Runnable实现,这里有点适配器模式的味道。如下图所示: