Java并发编程笔记之FutureTask源码分析

FutureTask可用于异步获取执行结果或取消执行任务的场景。通过传入Runnable或者Callable的任务给FutureTask,直接调用其run方法或者放入线程池执行,之后可以在外部通过FutureTask的get方法异步获取执行结果,因此,FutureTask非常适合用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。另外,FutureTask还可以确保即使调用了多次run方法,它都只会执行一次Runnable或者Callable任务,或者通过cancel取消FutureTask的执行等。

类图结构如下所示:

线程池使用 FutureTask 时候需要注意的一点事,FutureTask 使用不当可能会造成调用线程一直阻塞,如何避免?

线程池使用 FutureTask 的时候如果拒绝策略设置为了 DiscardPolicyDiscardOldestPolicy并且在被拒绝的任务的 Future 对象上调用无参 get 方法那么调用线程会一直被阻塞。

下面先通过一个简单的例子来复现问题,代码如下:

 

public class FutureTest {//(1)线程池单个线程,线程池队列元素个数为1private final static ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1, 1L, TimeUnit.MINUTES,new ArrayBlockingQueue<Runnable>(1),new ThreadPoolExecutor.DiscardPolicy());public static void main(String[] args) throws Exception {//(2)添加任务oneFuture futureOne = executorService.submit(new Runnable() {@Overridepublic void run() {System.out.println("start runable one");try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}}});//(3)添加任务twoFuture futureTwo = executorService.submit(new Runnable() {@Overridepublic void run() {System.out.println("start runable two");}});//(4)添加任务threeFuture futureThree=null;try {futureThree = executorService.submit(new Runnable() {@Overridepublic void run() {System.out.println("start runable three");}});} catch (Exception e) {System.out.println(e.getLocalizedMessage());}System.out.println("task one " + futureOne.get());//(5)等待任务one执行完毕System.out.println("task two " + futureTwo.get());//(6)等待任务two执行完毕System.out.println("task three " + (futureThree==null?null:futureThree.get()));// (7)等待任务three执行完毕executorService.shutdown();//(8)关闭线程池,阻塞直到所有任务执行完毕}


 

 

运行结果如下:

代码 (1) 创建了一个单线程并且队列元素个数为 1 的线程池,并且拒绝策略设置为了DiscardPolicy

代码(2)向线程池提交了一个任务 one,那么这个任务会使用唯一的一个线程进行执行,任务在打印 start runable one后会阻塞该线程 5s.

代码(3)向线程池提交了一个任务 two,这时候会把任务 two 放入到阻塞队列

代码(4)向线程池提交任务 three,由于队列已经满了则会触发拒绝策略丢弃任务 three, 从执行结果看在任务 one 阻塞的 5s 内,主线程执行到了代码 (5) 等待任务 one 执行完毕,当任务 one 执行完毕后代码(5)返回,主线程打印出 task one null。任务 one 执行完成后线程池的唯一线程会去队列里面取出任务 two 并执行所以输出 start runable two 然后代码(6)会返回,这时候主线程输出 task two null,然后执行代码(7)等待任务 three 执行完毕,从执行结果看代码(7)会一直阻塞不会返回,至此问题产生,如果把拒绝策略修改为 DiscardOldestPolicy 也会存在有一个任务的 get 方法一直阻塞只是现在是任务 two 被阻塞。但是如果拒绝策略设置为默认的 AbortPolicy 则会正常返回,并且会输出如下结果:

 

要分析这个问题需要看下线程池的 submit 方法里面做了什么,submit 方法源码如下:

 

public Future<?> submit(Runnable task) {...//(1)装饰Runnable为Future对象RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);//(6)返回future对象return ftask;
}protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {return new FutureTask<T>(runnable, value);}public void execute(Runnable command) {...//(2) 如果线程个数消息核心线程数则新增处理线程处理int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}//(3)如果当前线程个数已经达到核心线程数则任务放入队列if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}//(4)尝试新增处理线程进行处理else if (!addWorker(command, false))reject(command);//(5)新增失败则调用拒绝策略
}


 

 

代码(1)装饰 Runnable 为 FutureTask 对象,然后调用线程池的 execute 方法。

代码 (2) 如果线程个数消息核心线程数则新增处理线程处理

代码(3)如果当前线程个数已经达到核心线程数则任务放入队列

代码(4)尝试新增处理线程进行处理,失败则进行代码(5),否者直接使用新线程处理

代码(5)执行具体拒绝策略,从这里也可以看出拒绝策略执行是使用的业务线程。

所以要分析上面例子中问题所在只需要看步骤(5)对被拒绝任务的影响,这里先看下拒绝策略 DiscardPolicy 的源码,如下:

 

public static class DiscardPolicy implements RejectedExecutionHandler {public DiscardPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}}


 

 

可知拒绝策略 rejectedExecution 方法里面什么都没做,所以代码(4)调用 submit 后会返回一个 future 对象,这里有必要在重新说 future 是有状态的,FutureTask 内部有一个state用来展示任务的状态,并且是volatile修饰的,future 的状态枚举值如下:

 

/** Possible state transitions:* NEW -> COMPLETING -> NORMAL 正常的状态转移* NEW -> COMPLETING -> EXCEPTIONAL 异常* NEW -> CANCELLED 取消* NEW -> INTERRUPTING -> INTERRUPTED 中断*/private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;


 

 

在代码(1)的时候使用 newTaskFor 方法转换 Runnable 任务为 FutureTask,而 FutureTask 的构造函数里面设置的状态就是 New。FutureTask的构造函数源码如下:

 

public FutureTask(Runnable runnable, V result) {this.callable = Executors.callable(runnable, result);this.state = NEW;       // ensure visibility of callable
}

 

 

把FutureTask提交到线程池或者线程执行start时候会调用run方法,源码如下:

 

public void run() {//如果当前不是new状态,或者当前cas设置当前线程失败则返回,只有一个线程可以成功。if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {//当前状态为new 则调用任务的call方法执行任务Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;setException(ex);//完成NEW -> COMPLETING -> EXCEPTIONAL 状态转移}//执行任务成功则保存结果更新状态,unpark所有等待线程。if (ran)set(result);}} finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()runner = null;// state must be re-read after nulling runner to prevent// leaked interruptsint s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}
}protected void set(V v) {//状态从new->COMPLETINGif (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = v;//状态从COMPLETING-》NORMALUNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state//unpark所有等待线程。finishCompletion();}
}

 

 

所以使用 DiscardPolicy 策略提交任务后返回了一个状态值为NEW的future对象。那么我们下面就要看下当future的无参get()方法的时候,future变为什么状态才会返回,这时候就要看一下FutureTask的get方法的源码,源码如下:

 

   public V get() throws InterruptedException, ExecutionException {int s = state;//当状态值<=COMPLETING时候需要等待,否者调用report返回if (s <= COMPLETING)s = awaitDone(false, 0L);return report(s);}private int awaitDone(boolean timed, long nanos)throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L;WaitNode q = null;boolean queued = false;for (;;) {//如果被中断,则抛异常if (Thread.interrupted()) {removeWaiter(q);throw new InterruptedException();}//组建单列表int s = state;if (s > COMPLETING) {if (q != null)q.thread = null;return s;}else if (s == COMPLETING) // cannot time out yetThread.yield();else if (q == null)q = new WaitNode();else if (!queued)queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);else if (timed) {nanos = deadline - System.nanoTime();//超时则返回if (nanos <= 0L) {removeWaiter(q);return state;}//否者设置park超时时间LockSupport.parkNanos(this, nanos);}else//直接挂起当前线程LockSupport.park(this);}}private V report(int s) throws ExecutionException {Object x = outcome;//状态值为NORMAL正常返回if (s == NORMAL)return (V)x;//状态值大于等于CANCELLED则抛异常if (s >= CANCELLED)throw new CancellationException();throw new ExecutionException((Throwable)x);}


 

 

也就是说当 future 的状态 > COMPLETING 时候调用 get 方法才会返回,而明显 DiscardPolicy 策略在拒绝元素的时候并没有设置该 future 的状态,后面也没有其他机会可以设置该 future 的状态,所以 future 的状态一直是 NEW,所以一直不会返回,同理 DiscardOldestPolicy 策略也是这样的问题,最老的任务被淘汰时候没有设置被淘汰任务对于 future 的状态。、

在submit任务后还可以调用futuretask的cancel来取消任务:

 

public boolean cancel(boolean mayInterruptIfRunning) {//只有任务是new的才能取消if (state != NEW)return false;//运行时允许中断if (mayInterruptIfRunning) {//完成new->INTERRUPTINGif (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))return false;Thread t = runner;if (t != null)t.interrupt();//完成INTERRUPTING->INTERRUPTEDUNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state}//不允许中断则直接new->CANCELLEDelse if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))return false;finishCompletion();return true;
} 

 

 

那么默认的 AbortPolicy 策略为啥没问题呢?

也就是说当 future 的状态 > COMPLETING 时候调用 get 方法才会返回,而明显 DiscardPolicy 策略在拒绝元素的时候并没有设置该 future 的状态,后面也没有其他机会可以设置该 future 的状态,所以 future 的状态一直是 NEW,所以一直不会返回,同理 DiscardOldestPolicy 策略也是这样的问题,最老的任务被淘汰时候没有设置被淘汰任务对于 future 的状态。

所以当使用 Future 的时候,尽量使用带超时时间的 get 方法,这样即使使用了 DiscardPolicy 拒绝策略也不至于一直等待,等待超时时间到了会自动返回的,如果非要使用不带参数的 get 方法则可以重写 DiscardPolicy 的拒绝策略在执行策略时候设置该 Future 的状态大于 COMPLETING 即可,但是查看 FutureTask 提供的方法发现只有 cancel 方法是 public 的并且可以设置 FutureTask 的状态大于 COMPLETING,重写拒绝策略具体代码可以如下:

 

/*** Created by cong on 2018/7/13.*/
public class MyRejectedExecutionHandler implements RejectedExecutionHandler {public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {if (!threadPoolExecutor.isShutdown()) {if(null != runnable && runnable instanceof FutureTask){((FutureTask) runnable).cancel(true);}}}
}

 

使用这个策略时候由于从 report 方法知道在 cancel 的任务上调用 get() 方法会抛出异常所以代码(7)需要使用 try-catch 捕获异常代码(7)修改为如下:

 

package com.hjc;import java.util.concurrent.*;/*** Created by cong on 2018/7/13.*/
public class FutureTest {//(1)线程池单个线程,线程池队列元素个数为1private final static ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1, 1L,TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(1), new MyRejectedExecutionHandler());public static void main(String[] args) throws Exception {//(2)添加任务oneFuture futureOne = executorService.submit(new Runnable() {public void run() {System.out.println("start runable one");try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}}});//(3)添加任务twoFuture futureTwo = executorService.submit(new Runnable() {public void run() {System.out.println("start runable two");}});//(4)添加任务threeFuture futureThree = null;try {futureThree = executorService.submit(new Runnable() {public void run() {System.out.println("start runable three");}});} catch (Exception e) {System.out.println(e.getLocalizedMessage());}System.out.println("task one " + futureOne.get());//(5)等待任务one执行完毕System.out.println("task two " + futureTwo.get());//(6)等待任务two执行完毕try{System.out.println("task three " + (futureThree==null?null:futureThree.get()));// (7)等待任务three}catch(Exception e){System.out.println(e.getLocalizedMessage());}executorService.shutdown();//(8)关闭线程池,阻塞直到所有任务执行完毕}
}

 

 

运行结果如下:

当然这相比正常情况下多了一个异常捕获,其实最好的情况是重写拒绝策略时候设置 FutureTask 的状态为 NORMAL,但是这需要重写 FutureTask 方法了,因为 FutureTask 并没有提供接口进行设置。

作者:狂小白

阅读原文

本文为云栖社区原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/521449.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

动动手指头, Feed 流系统亿级规模不用愁

戳蓝字“CSDN云计算”关注我们哦&#xff01;作者 | 少强责编 | 阿秃导读&#xff1a;互联网进入移动互联网时代&#xff0c;最具代表性的产品就是各种信息流&#xff0c;像是朋友圈、微博、头条等。这些移动化联网时代的新产品在过去几年间借着智能手机的风高速成长。这些产品…

(需求实战_进阶_02)SSM集成RabbitMQ 关键代码讲解、开发、测试

接上一篇&#xff1a;&#xff08;企业内部需求实战_进阶_01&#xff09;SSM集成RabbitMQ 关键代码讲解、开发、测试 https://gblfy.blog.csdn.net/article/details/104197309 文章目录一、RabbitMQ配置文件1. RabbitMQ生产者配置文件2. RabbitMQ消费者配置文件3. 连接配置文件…

vue 判断同一数组内的值是否一直_vue一些笔记

vuex action&#xff1a;尤雨溪在知乎回答了&#xff0c;区分 actions 和 mutations 并不是为了解决竞态问题&#xff0c;vuex 真正限制你的只有 mutation 必须是同步的这一点&#xff0c;只是为了devtools追踪状态变化&#xff0c;或者说出于单一职责原则。https://www.zhihu.…

机器学习从业人员到底做什么?

这篇文章是系列文章的第1部分,第2部分将阐述AutoML和神经架构搜索、第3部分将特别地介绍Google的AutoML。 关于机器学习人才的稀缺和公司声称他们的产品能够自动化机器学习而且能完全消除对ML专业知识需求的承诺经常登上媒体的新闻头条。在TensorFlow DevSummit的主题演讲中&a…

黑科技揭秘:面对海量的文本翻译任务,阿里翻译团队是如何解决的

摘要&#xff1a; 对国际化企业来说语言问题是亟待突破的重要关口。面对海量的文本翻译任务&#xff0c;昂贵低效的人工翻译显然不能满足需求&#xff0c;利用计算机自动进行文本翻译的机器翻译才是解决这个问题的关键。阿里翻译团队在机器翻译领域做了大量技术储备&#xff0c…

(需求实战_进阶_03)SSM集成RabbitMQ 路由模式关键代码讲解、开发、测试

接上一篇&#xff1a;&#xff08;企业内部需求实战_进阶_02&#xff09;SSM集成RabbitMQ 关键代码讲解、开发、测试 https://gblfy.blog.csdn.net/article/details/104214033 上一篇给大家介绍了在RabbitMQ 的管控台中&#xff0c;将队列绑定到指定的交换机上&#xff1b;这片…

在计算机中dos代表什么意思,Boot是什么意思

Boot是什么意思如果你去问一个学计算机的人&#xff0c;“启动”是计算机中的那个单词?回答一定是Boot。可是&#xff0c;Boot原来的意思是靴子&#xff0c;“启动”与靴子有什么关系呢?原来&#xff0c;这里的Boot是bootstrap(鞋带)的缩写&#xff0c;它来自一句谚语&#x…

你知道哪些情况下不该使用深度学习吗?

深度学习不适用于什么样的任务&#xff1f;依我之见&#xff0c;以下这些主要场景的深度学习弊大于利。01低成本或者低承诺问题深网是非常灵活的模型&#xff0c;有着许多架构和节点类型&#xff0c;优化器和正则化策略。根据应用&#xff0c;你的模型可能会有卷基层&#xff0…

秒后面的单位是什么_单位与国际单位制是如何由来的?

2013年国庆期的一则网络消息说&#xff0c;11万人看升旗留下了5吨垃圾。有人认为这是一则假消息&#xff0c;因为5吨&#xff1d;5000千克&#xff0c;110000500022千克/人&#xff0c;而每人携带22千克&#xff08;44斤&#xff09;的垃圾是不可能的。以前还看过一个说法&…

(需求实战_进阶_04)SSM集成RabbitMQ 通配符模式 关键代码讲解、开发、测试

背景&#xff1a; 为了减轻服务器的压力&#xff0c;现在原有项目的基础上集成消息队列来异步处理消息! 此项目是企业真实需求&#xff0c;项目的代码属于线上生产代码&#xff0c;直接用于生产即可&#xff01; 此项目采用MQ发送消息模式为:通配符模式&#xff0c;如果对Rabbi…

halo多人正在连接服务器,在线人数过低 《光晕2》PC版多人服务器下月关闭

这也许是一个让粉丝略伤感的消息&#xff0c;《光晕2(Halo2)》PC多人游戏服务器将在下个月永久关闭。343 Industries注意到服务器的峰值在线人数一直仅有20人&#xff0c;因此做出了关闭服务器的决定。343 Industries在Halo Waypoint中说道&#xff1a;“我们很遗憾地宣布&…

纯Python实现鸢尾属植物数据集神经网络模型

摘要&#xff1a; 本文以Python代码完成整个鸾尾花图像分类任务&#xff0c;没有调用任何的数据包&#xff0c;适合新手阅读理解&#xff0c;并动手实践体验下机器学习方法的大致流程。 尝试使用过各大公司推出的植物识别APP吗&#xff1f;比如微软识花、花伴侣等这些APP。当你…

【明人不说暗话】我就只讲进程与线程

戳蓝字“CSDN云计算”关注我们哦&#xff01;作者 | 阮一峰责编 | 阿秃进程&#xff08;process&#xff09;和线程&#xff08;thread&#xff09;是操作系统的基本概念&#xff0c;但是它们比较抽象&#xff0c;不容易掌握。最近&#xff0c;我读到一篇材料&#xff0c;发现有…

(需求实战_进阶_05)SSM集成RabbitMQ 通配符模式 关键代码讲解、开发、测试

接上一篇&#xff1a; 文章目录一、RabbitMQ 配置文件1. RabbitMQ 生产者配置文件更新二、启动项目2.1. 启动项目2.2. 清空控制台三、管控台总览3.1. 登录管控台3.2. 交换机中查看绑定队列总览四、验证测试4.4. 生产者①请求4.5. 生产者②请求五、启动RabbitMQ5.1. 进入sbin目录…

两台邮件服务器共用一个公网地址,两个不同域邮件服务器的互通

两个不同域的邮件服务的互通如图&#xff0c;有两个不同域的邮件服务器(postfix)通过一个DNS服务器实现互通。首先说明一下IP分配情况服务器1qq.cometh0(VMnet2)&#xff1a; ip:192.168.2.2 netmask:255.255.255.0 gw 192.168.2.1 hostname:mail.qq.com服务器2(qq.neteht0VMne…

希捷银河声音大_【推仔说新闻】那款硬盘它终于来了 希捷推出首款双磁臂硬盘...

经常关注科技新闻的朋友们应该都知道&#xff0c;现在机械硬盘领域可以说是被固态硬盘冲击的不清&#xff0c;而对于我们广大用户们来说&#xff0c;HDD这一个储存介质就被我们更多的用来充当仓库盘使用&#xff0c;毕竟现在的固态已经下探到白菜级别的价格了。但是对于那些HDD…

(需求实战_进阶_06)SSM集成RabbitMQ 订阅模式 关键代码讲解、开发、测试

背景&#xff1a; 为了减轻服务器的压力&#xff0c;现在原有项目的基础上集成消息队列来异步处理消息! 此项目是企业真实需求&#xff0c;项目的代码属于线上生产代码&#xff0c;直接用于生产即可&#xff01; 此项目采用MQ发送消息模式为:订阅模式&#xff0c;如果对RabbitM…

【目瞪口呆】通信机房内部长这样

戳蓝字“CSDN云计算”关注我们哦&#xff01;作者 | 小枣君责编 | 刘晶晶大家好&#xff0c;我是小枣君。一直以来&#xff0c;我都在努力给大家做通信知识科普&#xff0c;也写了很多有趣的文章。不过&#xff0c;文章再有趣也只是文字&#xff0c;不是实物。现实生活中&#…

NLP的ImageNet时代已经到来

摘要&#xff1a; NLP领域即将巨变&#xff0c;你准备好了吗&#xff1f; 自然语言处理&#xff08;NLP&#xff09;领域正在发生变化。 作为NLP的核心表现技术——词向量&#xff0c;其统治地位正在被诸多新技术挑战&#xff0c;如&#xff1a;ELMo&#xff0c;ULMFiT及Open…

mysql字段分隔符拆分_面试题Mysql数据库优化之垂直分表

在日常的开发工作中&#xff0c;除了JAVA相关的技术&#xff0c;打交道最多的就是Mysql数据库&#xff0c;当数据积累到一定程度&#xff0c;比如500W时就会难免出现一些慢sql&#xff0c;对数据库的优化方式有很多&#xff0c;比如通过增加合理的索引&#xff0c;今天我们来说…