看图学源码之FutureTask

RunnableFuture

源码学习:

成员变量

任务的运行状态的转化

image-20231224220654494

package java.util.concurrent;
import java.util.concurrent.locks.LockSupport;/**可取消的异步计算。该类提供了Future的基本实现,包括启动和取消计算的方法,查询计算是否完成以及获取计算结果的方法。只有在计算完成后才能获取结果;如果计算尚未完成,get方法将会阻塞。一旦计算完成,就无法重新启动或取消计算(除非使用runAndReset方法调用计算)。FutureTask可以用来包装一个Callable或Runnable对象。由于FutureTask实现了Runnable接口,因此可以将FutureTask提交给Executor执行。 除了作为独立的类使用外,该类还提供了一些受保护的功能,这些功能在创建自定义任务类时可能会有用。*/
public class FutureTask<V> implements RunnableFuture<V> {/**此任务的运行状态,初始为NEW。运行状态仅在set、setException和cancel方法中转换为终态。在完成过程中,状态可能会暂时变为COMPLETING(在设置结果时)或INTERRUPTING(仅在中断执行者以满足cancel(true)时)。从这些中间状态到最终状态的转换使用更便宜的有序/懒惰写入,因为这些值是唯一的且不会进一步修改。* Possible state transitions:* NEW -> COMPLETING -> NORMAL  正常结束* NEW -> COMPLETING -> EXCEPTIONAL   异常结束* NEW -> CANCELLED* NEW -> INTERRUPTING -> INTERRUPTED*/// 状态值: 表示任务运行的状态private volatile int state;// 新建 或者 正在运行private static final int NEW          = 0;// 中间状态(任务执行完了,但是结果集(正结果/ 异常) 没有设置到 outcome)private static final int COMPLETING   = 1;// 正常执行完成(结果集设置到outcome之后,正常结束)private static final int NORMAL       = 2;// 异常执行完成(结果集设置到outcome之后,异常结束)private static final int EXCEPTIONAL  = 3;// 取消private static final int CANCELLED    = 4;// 中断(中间值)[但是还没有中断]private static final int INTERRUPTING = 5;// 中断完成,最终状态private static final int INTERRUPTED  = 6;/** The underlying callable; nulled out after running */// 执行目标private Callable<V> callable;/** The result to return or exception to throw from get() */// 结果集private Object outcome; // non-volatile, protected by state reads/writes/** The thread running the callable; CASed during run() */// 执行任务的线程private volatile Thread runner;/** Treiber stack of waiting threads */// get 阻塞的时候,使用 WaitNode{物理结构:链表;逻辑结构:栈}去存储阻塞的线程private volatile WaitNode waiters;public FutureTask(Callable<V> callable) {if (callable == null)  throw new NullPointerException();this.callable = callable;this.state = NEW;       // ensure visibility of callable}public FutureTask(Runnable runnable, V result) {this.callable = Executors.callable(runnable, result);  // 适配器的方式this.state = NEW;       // ensure visibility of callable}public boolean isCancelled() {return state >= CANCELLED;}public boolean isDone() {return state != NEW;}/*** 简单的链表节点,用于记录Treiber堆栈中的等待线程。*/static final class WaitNode {volatile Thread thread;volatile WaitNode next;WaitNode() { thread = Thread.currentThread(); }}// Unsafe mechanicsprivate static final sun.misc.Unsafe UNSAFE;private static final long stateOffset;private static final long runnerOffset;private static final long waitersOffset;static {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> k = FutureTask.class;stateOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("state"));runnerOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("runner"));waitersOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiters"));} catch (Exception e) {throw new Error(e);}}}

run()

任务只能执行一次

image-20231224221445999

public void run() {//  状态 是New   并且 cas 成功的把当前线程设置到  runner 才能执行后续的方法,否则就直接返回if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {Callable<V> c = callable;// 当前 要执行的任务存在,并且  状态 是New 才会调用目标逻辑  c.call()if (c != null && state == NEW) {V result;boolean ran;try {result = c.call();// 目标逻辑执行成功ran = true;} catch (Throwable ex) {// 目标逻辑执行 失败 ,结果 置为 nullresult = null;ran = false;// 设置异常结果集setException(ex);}if (ran)// 设置正常结果集set(result);}} finally {// 在任务状态被确定之前,runner必须非空,以防止对run()方法的并发调用。runner = null;// 在将runner设置为null之后,必须重新读取任务的状态,以防止泄漏的中断。int s = state;if (s >= INTERRUPTING)  // 处于中断状态,执行中断后续逻辑handlePossibleCancellationInterrupt(s);}
}@FunctionalInterface
public interface Callable<V> {V call() throws Exception;
}/*将此Future的结果设置为给定的值,除非此Future已经被设置或已取消。在计算成功完成时,此方法由run方法在内部调用。
*/protected void set(V v) {// cas 的方式把状态变为  COMPLETING ,设置成功if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {// 设置结果集为 结果 result outcome = v;// cas 的方式把状态变为最终状态: NORMALUNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state// 执行后续操作finishCompletion();}}/*将此Future报告为ExecutionException,将给定的throwable作为其原因,除非此Future已经被设置或已取消。在计算失败时,此方法由run方法在内部调用。
*/protected void setException(Throwable t) {// cas 的方式把状态变为  COMPLETING ,设置成功if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {// 设置结果集为 异常outcome = t;// cas 的方式把状态变为最终状态: EXCEPTIONALUNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state// 执行后续操作finishCompletion();}}/*
确保任何来自可能的cancel(true)取消操作的中断仅在run或runAndReset方法中传递给任务的目的。
*/private void handlePossibleCancellationInterrupt(int s) {//  解释了在等待中断信号时使用自旋等待的目的: 通过中断来取消任务的执行。然而,可能存在一种情况,即中断线程在  有机会中断当前线程  之前被阻塞。为了等待中断信号的到来,代码使用了自旋等待的策略。if (s == INTERRUPTING)while (state == INTERRUPTING) // 处于中间状态的时候就 让出 cpuThread.yield(); // wait out pending interrupt// assert state == INTERRUPTED;// 解释了在state等于INTERRUPTED时的处理逻辑// 它使用断言(assert)来确保任务的状态为INTERRUPTED。断言通常用于在代码中插入一些检查,以确保某些条件为真。如果断言的条件为假,将会抛出一个AssertionError异常。// Thread.interrupted();// 解释了清除可能来自cancel(true)取消操作的中断的目的。但是,中断也可以作为一个独立的机制,用于任务与其调用者之间的通信,并且没有办法只清除取消中断。因此,为了清除中断,代码调用了Thread.interrupted()方法。// Thread.interrupted()方法用于清除当前线程的中断状态,并返回之前的中断状态。这样做的目的是确保任务的中断状态被清除,以便后续的代码或操作不会受到中断的影响。}
finishCompletion()

image-20231224221706281

    /*** 移除并唤醒所有等待的线程,调用done()方法,并将callable置为null。*/private void finishCompletion() {// assert state > COMPLETING;// 循环获取等待队列中的等待节点 waiters, 等待节点里面保存了等待任务完成的线程for (WaitNode q; (q = waiters) != null;) {// 要是cas 的方式成功的将等待队列 waitersOffset 设置为 null if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {// 循环处理每一个等待的节点for (;;) {// 取出当前等待节点 持有的线程Thread t = q.thread;// 线程存在if (t != null) {// 将等待节点的线程引用设为null,并调用LockSupport.unpark(t)方法来唤醒该线程q.thread = null;LockSupport.unpark(t); //LockSupport.unpark(t)方法用于唤醒一个被阻塞的线程。}WaitNode next = q.next; // 取节点下一个元素if (next == null)// 要是没有后继节点,此时表示已经处理完所有等待节点,退出 死循环break;// 将后继节点置为 nullq.next = null; // unlink to help gc // 节点后移q = next;}break; // (q = waiters) == null 退出循环}}done(); // 调用done()方法来完成任务的执行 —————— 钩子方法/*这段代码是一个保护(protected)方法,当任务转换为已完成状态(isDone)时被调用,无论是正常完成还是通过取消完成。默认实现不执行任何操作。子类可以重写这个方法来调用完成回调或进行记录。注意,在该方法的实现中,您可以查询状态来确定任务是否已被取消。protected void done() { }*/callable = null;        // to reduce footprint  将callable引用设为null,以减少内存占用。}

runAndReset()

任务可以执行多次

  • 和 run() 的区别就是 没有正常的结果设置结果集

image-20231224221750745

/**执行计算,但不设置其结果,然后将该Future重置为初始状态。如果计算遇到异常或被取消,则无法执行重置操作。这个方法设计用于那些本质上需要执行多次的任务。
*/// 执行并重置任务
protected boolean runAndReset() {if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return false;boolean ran = false;int s = state;try {Callable<V> c = callable;if (c != null && s == NEW) {try {c.call(); // don't set resultran = true;} catch (Throwable ex) {setException(ex);}/*和 run()  的区别就是  没有正常的结果设置结果集*/}} finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()runner = null;// state must be re-read after nulling runner to prevent// leaked interruptss = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}return ran && s == NEW;
}

get()

image-20231224222000579

    /*** 如果计算被取消  会抛出异常*/public V get() throws InterruptedException, ExecutionException {int s = state;// 此时的状态只有  New  (新建 或者 正在执行) 或者  COMPLETING(任务执行结束,结果集该没有设置成功)if (s <= COMPLETING)// 阻塞等待s = awaitDone(false, 0L);// 否则的话 返回结果集(正常 或者 异常)return report(s);}/**等待方法,根据传入的参数决定是否使用定时等待。如果使用定时等待,则会在指定的时间内  等待完成   或者  在中断或超时时中止。*/private int awaitDone(boolean timed, long nanos)throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L;WaitNode q = null;boolean queued = false;//[注:] 带有阻塞的都要有 死循环,防止虚假唤醒for (;;) {  // 死循环if (Thread.interrupted()) { // 判断是不是中断removeWaiter(q);  // 移除节点 throw new InterruptedException(); // 抛错} int s = state;if (s > COMPLETING) {// 此时结果设置成功// 当前节点存在,将其持有的线程 置空if (q != null)q.thread = null;// 返回结果,结束阻塞return s;}// 还是在设置结果的状态,让出 cpu else if (s == COMPLETING) // cannot time out yetThread.yield();// 处于 new  状态else if (q == null)// 创建节点q = new WaitNode();// 插入链表else if (!queued)// cas 的方式 头插法queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);// 超时等待,else if (timed) {nanos = deadline - System.nanoTime();// 时间已过,移除结果,返回状态if (nanos <= 0L) {removeWaiter(q);return state;}LockSupport.parkNanos(this, nanos);}elseLockSupport.park(this);}}/***完成任务之后 返回结果或者抛出异常*/@SuppressWarnings("unchecked")private V report(int s) throws ExecutionException {Object x = outcome;if (s == NORMAL)return (V)x;if (s >= CANCELLED)throw new CancellationException();throw new ExecutionException((Throwable)x);}
removeWaiter(WaitNode node)

image-20231224222121607

    /*** 尝试取消链接超时或中断的等待节点,以避免累积垃圾。内部节点只是简单地取消拼接,而无需使用CAS,因为如果它们被释放器遍历,这样做是无害的。为了避免从已经删除的节点中取消拼接的影响,在明显存在竞争的情况下,列表将被重新遍历。当节点很多时,这会很慢,但我们不希望列表足够长以抵消更高开销的方案。*/private void removeWaiter(WaitNode node) {if (node != null) {// 当前节点存在// 将传入节点的线程引用置为null,表示该节点不再持有线程node.thread = null;retry:// 死循环for (;;) {          // restart on removeWaiter race// 遍历链表 中的所有的 等待节点for (WaitNode pred = null, q = waiters, s; q != null; q = s) {// 取出下一个节点s = q.next;// 要是此等待节点持有的线程不是 nullif (q.thread != null)pred = q; // 将前置节点设置为 当前节点q else if (pred != null) {// 等待节点持有的线程是 null ,但是前置节点 不是 null// 前驱节点的next指向当前节点pred.next = s; // 前置节点持有的线程不存在了,表示存在竞争情况,需要重新开始循环。执行下次死循环if (pred.thread == null) // check for racecontinue retry;}
//等待节点持有的线程是 null ,但是前置节点 是 null, cas的方式成功的将节点下移,当前节点从等待队列中移除,执行下次死循环// cas的方式将waitersOffset处的值从q替换为selse if (!UNSAFE.compareAndSwapObject(this, waitersOffset,q, s))continue retry;}// 内层遍历结束,等待集合中没有无效节点break;}}}

get(long timeout, TimeUnit unit)

image-20231224222252206

public V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {if (unit == null)throw new NullPointerException();int s = state; // 获取当前对象的状态//  调用awaitDone方法来等待操作完成,如果返回的状态值小于等于COMPLETING,则表示操作未完成,继续等待,如果等待的时间超过了超时时间,则抛出TimeoutException异常。if (s <= COMPLETING &&  (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)throw new TimeoutException();//将最终的状态值作为参数传递给report方法,并返回report方法的返回值。return report(s);
}

cancel()

image-20231224222312072

    public boolean cancel(boolean mayInterruptIfRunning) {if (!(state == NEW &&UNSAFE.compareAndSwapInt(this, stateOffset, NEW,mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))return false;/*mayInterruptIfRunning:true:  可以中断正在执行的任务 INTERRUPTINGfasle: 不可以中断正在执行的任务 CANCELLED*/
// 状态是 new ;cas 的方式把 任务的状态从"NEW"修改为"INTERRUPTING"或"CANCELLED"。如果修改成功,表示取消成功,返回true。try {    // in case call to interrupt throws exceptionif (mayInterruptIfRunning) {try {Thread t = runner;  // 尝试中断正在执行任务的线程if (t != null)  // 如果任务的runner不为null,则调用interrupt()方法中断线程。t.interrupt();} finally { // final state// 设置任务最终的状态  cas 的方式将任务的状态修改为"INTERRUPTED"。UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);}}} finally {// 调用finishCompletion()方法完成任务的处理finishCompletion();}return true; //返回true表示取消成功。}

手撕FutureTask:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.LockSupport;public class FutureTask_<T> implements Runnable {private Future_<T> future;public FutureTask_(Future_<T> future) {this.future = future;}public FutureTask_(Runnable runnable) {this.future = new FutureAdaptive(runnable);}@Overridepublic void run() {try{res = this.future.code();state = 1;}catch (Exception e){res = e;state = 2;}for (Thread thread : threadList){LockSupport.unpark(thread); // 唤醒}}private Object res;private  volatile  int state;private List<Thread>  threadList = new ArrayList<>();public T get(){for (;;){if(state == 0){threadList.add(Thread.currentThread());LockSupport.park();  // 阻塞}else if(state == 1){return (T)res;}else if(state == 2){throw new RuntimeException(res.toString());}}}private class FutureAdaptive implements Future_<T> {public  Runnable runnable;public FutureAdaptive(Runnable runnable) {this.runnable = runnable;}@Overridepublic T code() throws Exception {this.runnable.run();return null;}}
}class MM {public static void main(String[] args){Future_<String> future = new Future_<String>() {@Overridepublic String code() throws Exception {return "future";}};Runnable runnable = new Runnable(){@Overridepublic void run() {System.out.println("runnable");}};FutureTask_<String> future_ = new FutureTask_<String>(future);FutureTask_<String> runnable_ = new FutureTask_<String>(runnable);new Thread(future_).start();new Thread(runnable_).start();System.out.println(future_.get());LockSupport.parkNanos(2*1000*1000*1000);}
}interface Future_<T>{T code() throws Exception;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/466860.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

单片机的引脚,你都清楚吗?

第1课&#xff1a;单片机简叙1.单片机可以做什么&#xff1f;目前单片机渗透到我们生活的各个领域&#xff0c;几乎很难找到哪个领域没有单片机的踪迹。小到电话&#xff0c;玩具&#xff0c;手机&#xff0c;各类刷卡机&#xff0c;电脑键盘&#xff0c;彩电&#xff0c;冰箱&…

Graphviz的安装及纠错

在Anaconda Prompt里边输入conda install graphviz 安装成功之后输入pip install graphviz 它会提示成功安装。 启动 Jupyter Notebook &#xff0c;在文件里边输入 import graphviz 测试&#xff0c;如果没有报错证明&#xff0c;模块安装成功&#xff0c;但是在运行程序…

sklearn——决策树

总结sklearn决策树的使用&#xff0c;方便以后查阅。1.分类决策树 &#xff08;基于CART树&#xff09; 原型&#xff1a;参数&#xff1a;2、回归分类树 原型&#xff1a;参数&#xff1a;3、export_graphviz 当训练完毕一颗决策树时&#xff0c;可以通过sklearn.tree.expor…

Linux下SVN服务器的搭建

Linux下SVN服务器的搭建 宗旨&#xff1a;技术的学习是有限的&#xff0c;分享的精神是无限的。 1、下载工具&#xff08;下载地址&#xff1a;&#xff09; subversion-1.6.1.tar.gz subversion-deps-1.6.1.tar.gz 2、解压两个包&#xff1a; a) tar -xzvf subvers…

记一次解决问题的掉坑过程

这两天在调试一个音频ADC 芯片&#xff0c;也是之前的项目&#xff0c;但是一直调不出来&#xff0c;我发现我总是在这样的问题上纠结很久&#xff0c;以前踩过的坑后面照样会踩&#xff0c;只不过踩完会迅速把脚拉出来继续前进&#xff0c;我经常听到有人说「做嵌入式真的太容…

sklearn——AdaBoost应用

选自《python大战机器学习》

面试常见的C语言字符串操作

#字符串倒序输出实现逻辑&#xff0c;通过strlen获取字符串长度&#xff0c;然后通过 len/2 进行交叉赋值&#xff0c;这里需要注意&#xff0c;不需要考虑len是奇数还是偶数的问题。如果len是奇数&#xff0c;最后一个字符就不需要倒序&#xff0c;如果是偶数&#xff0c;最后…

HttpHandler:给指定路径下的图片添加水印显示

圣诞节&#xff0c;25日&#xff0c;要交ACCP5.0认证的项目&#xff0c;其中有这样一个要求&#xff1a;书店的所有图书的封面放在了\images\convers\下面&#xff0c;要求所有引用这一路径下的图片都添加书店的店名水印图片。就是说拦截Http请求了&#xff0c;自然想到HttpHan…

Linux 下的复制命令,这几个比较靠谱

平时我们使用Linux复制命令的时候&#xff0c;一般使用 cp命令&#xff0c;但是cp 命令性能比较令人担忧使用tar 命令来拷贝大量文件通过对比下面的几个命令&#xff0c;在拷贝比较多而且比较大的文件的话&#xff0c;用git clone 比较靠谱&#xff0c;特别是复制代码库&#x…

Mendeley文献管理软件使用介绍

<!DOCTYPE html>New DocumentMendeley 是一款免费的跨平台文献管理软件&#xff0c;同时也是一个在线的学术社交网络平台。Mendeley 对 PDF、Bibtex 的支持非常好&#xff0c;可以直接导出 Bibtex 格式&#xff0c;还可以直接导入zotero数据库&#xff0c;决定了其兼容性…

过拟合问题——正则化方法

看了很多资料&#xff0c;本身想放一个正则化的概念的&#xff0c;实在不敢放&#xff0c;怕吓跑一堆人&#xff0c;所以&#xff0c;将就吧。首先&#xff0c;我们知道正则化&#xff08;Regularization&#xff09;是解决过拟合问题的&#xff0c;简单来说&#xff0c;过拟合…

CentOS 8明年正式停止维护,以后再也不会有免费的RHEL了!

CentOS 8 明年正式停止维护&#xff0c;以后再也不会有免费的 RHEL 了!CentOS 是 Community Enterprise Operating System&#xff08;社区企业操作系统&#xff09;的首字母缩写&#xff0c;是 100&#xff05; 重建的 RHEL&#xff08;红帽企业 Linux&#xff09;。尽管 RHEL…

Python sqlalchemy orm 多外键关联

多外键关联 注&#xff1a;在两个表之间进行多外键链接 如图&#xff1a; 案例&#xff1a; # 创建两张表并添加外键主键 # 调用Column创建字段 加类型 from sqlalchemy import Integer, ForeignKey, String, Column# 调用基类Base from sqlalchemy.ext.declarative import dec…

将DataFrame格式的数据存入到mysql数据库中

因为最近频繁操作数据库&#xff0c;特别是写入数据比较麻烦。在DataFrame格式或者是Series格式的数据处理之后&#xff0c;总是会面临写入数据&#xff0c;迫不得已只能进行格式转换&#xff0c;搜索过程中发现了to_sql&#xff08;&#xff09;函数&#xff0c;就百度了用法&…

从小米智能家居入手,揭秘物联网关键技术

物联网已不知不觉融入我们的生活中给我们带来便捷&#xff0c;比如&#xff0c;智能门锁、ETC 电子自动收费系统等&#xff0c;一开始感觉还挺很新奇的&#xff0c;现在也习以为常了。那到底什么是物联网&#xff1f;可能很多人还挺蒙圈的。所谓物联网&#xff0c;最终目的就是…

Orange——The Data

The Data 这个部分描述的是怎样在Orange上加载数据。我们也将展示如何探索数据&#xff0c;发现一些基本统计特性&#xff0c;怎么数据取样。 Data Input Orange可以读取本机以制表符分隔的格式的文件&#xff0c;也可以从任何主要的标准电子表格文件类型加载数据&#xff0…

[转载]VirtualBox网络配置详解

标题: [原创]VirtualBox网络配置详解来自 http://www.linuxsir.org/bbs/showthread.php?p1800679#post1800679 欢迎转载, 转载请注明作者, 谢谢下面简单介绍一下Test-bed Environment:Host : 偶的本本OS : Arch Linux (Kernel Version 2.6.20) 已安装uml_utilities(包含tunctl…

工作4年工资8K,还有什么理由不努力?

—— 提问发哥&#xff0c;我现在还是从事裸机开发&#xff0c;51单片机&#xff0c;会点arm&#xff0c;但没有用arm真正做完整开发的经验&#xff0c;细想感觉自己会的不是很多&#xff0c;薪资提不上去&#xff0c;想做更高级点的工作&#xff0c;虽然自学点&#xff0c;到没…

嵌入式的坑在哪方面?

在知乎看到的&#xff0c;觉得见解不错&#xff0c;看完我觉得会对嵌入式有些见解&#xff0c;分享给大家。今年毕业刚工作半年&#xff0c;稍微写一下自己的见解。嵌入式就个万金油&#xff01;干得活可以涉及硬件和软件&#xff01;永远学不完的知识。嵌入式工程师简直就是硬…

Orange-Classification,Regression

1.Classification Orange和sklearn一样&#xff0c;提供了Classification和Regression等机器学习的算法&#xff0c;具体使用如下&#xff1a; import Orangedata Orange.data.Table("voting") lr Orange.classification.LogisticRegressionLearner() rf Orange…