并发编程——Future CompletableFuture

文章目录

  • Future介绍
  • FutureTask使用
  • FutureTask 分析
  • CompletableFuture
    • CompletableFuture的应用
    • CompletableFuture 示例
  • 总结

Future介绍

Java创建线程的方式,一般常用的是Thread,Runnable。如果需要当前处理的任务有返回结果的话,需要使用Callable。Callable运行需要配合Future。

Future是一个接口,一般会使用FutureTask实现类去接收Callable任务的返回结果。

FutureTask使用

下面示例使用FutureTask来执行一个可以返回结果的异步任务。Callable是要执行的任务,FutureTask是存放任务返回结果的位置。

public static void main(String[] args) throws ExecutionException, InterruptedException {FutureTask<Integer> futureTask = new FutureTask<>(() -> {System.out.println("任务执行");Thread.sleep(2000);return 123+764;});Thread t = new Thread(futureTask);t.start();System.out.println("main线程启动了t线程处理任务");Integer result = futureTask.get();System.out.println(result);
}

FutureTask 分析

首先看一下FutureTask的核心属性

/*** NEW -> COMPLETING -> NORMAL          任务正常执行,返回结果是正常的结果* NEW -> COMPLETING -> EXCEPTIONAL     任务正常执行,但是返回结果是异常* NEW -> CANCELLED              任务直接被取消的流程* NEW -> INTERRUPTING -> INTERRUPTED*/
// 代表当前任务的状态
private volatile int state;
private static final int NEW          = 0;  // 任务的初始化状态
private static final int COMPLETING   = 1;  // Callable的结果(正常结果,异常结果)正在封装给当前的FutureTask
private static final int NORMAL       = 2;  // NORMAL任务正常结束
private static final int EXCEPTIONAL  = 3;  // 执行任务时,发生了异常
private static final int CANCELLED    = 4;  // 任务被取消了。
private static final int INTERRUPTING = 5;  // 线程的中断状态,被设置为了true(现在还在运行)
private static final int INTERRUPTED  = 6;  // 线程被中断了。// 当前要执行的任务
private Callable<V> callable;
// 存放任务返回结果的属性,也就是futureTask.get需要获取的结果
private Object outcome; 
// 执行任务的线程。
private volatile Thread runner;
// 单向链表,存放通过get方法挂起等待的线程
private volatile WaitNode waiters;

t.start后,是通过run方法执行的Callable的call方法,该方法是同步的,然后将返回结果赋值给了outcome。

// run方法的执行流程,最终会执行Callable的call方法
public void run() {// 保证任务的状态是NEW才可以运行// 基于CAS的方式,将当前线程设置为runner。if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;// 准备执行任务try {// 要执行任务 cCallable<V> c = callable;// 任务不为null,并且任务的状态还处于NEWif (c != null && state == NEW) {// 放返回结果V result;// 任务执行是否为正常结束boolean ran;try {// 运行call方法,拿到返回结果封装到result中result = c.call();// 正常返回,ran设置为trueran = true;} catch (Throwable ex) {// 结果为nullresult = null;// 异常返回,ran设置为falseran = false;// 设置异常信息setException(ex);}if (ran)// 正常执行结束,设置返回结果set(result);}} finally {// 将执行任务的runner设置空runner = null;// 拿到状态int s = state;// 中断要做一些后续处理if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}
}// 设置返回结果
protected void set(V v) {// 首先要将任务状态从NEW设置为COMPLETINGif (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {// 将返回结果设置给outcome。outcome = v;// 将状态修改为NORMAL,代表正常技术UNSAFE.putOrderedInt(this, stateOffset, NORMAL);finishCompletion();}
}

get方法获取返回结果时会查看当前线程状态,如果状态还未达成,也就是说call方法还未执行完未执行set方法,该线程就会被挂起阻塞LockSupport.park(this);。

public V get() throws InterruptedException, ExecutionException {// 拿状态int s = state;// 满足找个状态就代表现在可能还没有返回结果if (s <= COMPLETING)// 尝试挂起线程,等待拿结果s = awaitDone(false, 0L);return report(s);
}// 线程要等待任务执行结束,等待任务执行的状态变为大于COMPLETING状态
private int awaitDone(boolean timed, long nanos) throws InterruptedException {// 计算deadline,如果是get(),就是0,  如果是get(time,unit)那就追加当前系统时间final long deadline = timed ? System.nanoTime() + nanos : 0L;// 构建WaitNodeWaitNode q = null;// queued = falseboolean queued = false;// 死循环for (;;) {// 找个get的线程是否中断了。if (Thread.interrupted()) {// 将当前节点从waiters中移除。removeWaiter(q);// 并且抛出中断异常throw new InterruptedException();}// 拿到现在任务的状态int s = state;// 判断任务是否已经执行结束了if (s > COMPLETING) {// 如果设置过WaitNode,直接移除WaitNode的线程if (q != null)q.thread = null;// 返回当前任务的状态return s;}// 如果任务的状态处于 COMPLETING ,else if (s == COMPLETING)// COMPLETING的持续时间非常短,只需要做一手现成的让步即可。Thread.yield();// 现在线程的状态是NEW,(call方法可能还没执行完呢,准备挂起线程)else if (q == null)// 封装WaitNode存放当前线程q = new WaitNode();else if (!queued)// 如果WaitNode还没有排在waiters中,现在就排进来(头插法的效果)queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);else if (timed) {// get(time,unit)挂起线程的方式// 计算挂起时间nanos = deadline - System.nanoTime();// 挂起的时间,是否小于等于0if (nanos <= 0L) {// 移除waiters中的当前NoderemoveWaiter(q);// 返回任务状态return state;}// 正常指定挂起时间即可。(线程挂起)LockSupport.parkNanos(this, nanos);}else {// get()挂起线程的方式LockSupport.park(this);}}
}

当任务执行完毕(set方法执行完成),由finishCompletion唤醒线程,LockSupport.unpark(t);

// 任务状态已经变为了NORMAL,做一些后续处理
private void finishCompletion() {for (WaitNode q; (q = waiters) != null;) {// 拿到第一个节点后,直接用CAS的方式,将其设置为nullif (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {for (;;) {// 基于q拿到线程信息Thread t = q.thread;// 线程不为nullif (t != null) {// 将WaitNode的thread设置为nullq.thread = null;// 唤醒这个线程LockSupport.unpark(t);}// 往后遍历,接着唤醒WaitNode next = q.next;if (next == null)break;q.next = null;// 指向next的WaitNodeq = next;}break;}}// 扩展方法,没任何实现,你可以自己实现done();// 任务处理完了,可以拜拜了!callable = null;   
}

拿到返回结果的处理

// 任务结束。
private V report(int s) throws ExecutionException {// 拿到结果Object x = outcome;// 判断是正常返回结束if (s == NORMAL)// 返回结果return (V)x;// 任务状态是大于取消if (s >= CANCELLED)// 甩异常。throw new CancellationException();// 扔异常。throw new ExecutionException((Throwable)x);
}// 正常返回 report
// 异常返回 report
// 取消任务 report
// 中断任务 awaitDone

CompletableFuture

FutureTask存在的问题:
问题1:FutureTask获取线程执行的结果前,主线程需要通过get方法一直阻塞等待子线程执行完call方法,才可以拿到返回结果。
问题2:如果不通过get去挂起线程,通过while循环,不停的判断任务的执行状态是否结束,结束后,再拿结果。如果任务长时间没执行完毕,CPU会一直调度查看任务状态的方法,会浪费CPU资源。

FutureTask是一个同步非阻塞处理任务的方式。需要一个异步非阻塞处理任务的方式。CompletableFuture在一定程度上就提供了各种异步非阻塞的处理方案。

CompletableFuture也是实现了Future接口实现的功能,可以不使用FutureTask,提供非常丰富的函数去执行各种异步操作,直接使用CompletableFuture即可。

CompletableFuture的应用

CompletableFuture最重要的就是解决了异步回调的问题

CompletableFuture就是执行一个异步任务,异步任务可以有返回结果,也可以没有返回结果,使用了函数式编程中三个最核心的接口

Supplier - 生产者,没有入参,但是有返回结果
Consumer - 消费者,有入参,但是没有返回结果
Function - 函数,有入参,并且有返回结果

提供了两个最基本运行的基本方法

supplyAsync(Supplier<U> supplier) 异步执行任务,有返回结果
runAsync(Runnable runnable) 异步执行任务,没有返回结果

在不指定线程池的前提下,这两个异步任务都是交给ForkJoinPool去执行的。

但是只是用这两个方法,无法实现异步回调的。如果需要在当前任务执行完毕后,拿着返回结果继续去执行后续任务操作的话,需要基于其他方法去实现。

thenApply(Function<prevResult,currResult>); 等待前一个任务处理结束后,拿着前置任务的返回结果,再做处理,并且返回当前结果
thenApplyAsync(Function<prevResult,currResult>,线程池) 采用全新的线程执行
thenAccept(Consumer<preResult>);等待前一个任务处理结束后,拿着前置任务的返回结果再做处理,没有返回结果
thenAcceptAsync(Consumer<preResult>,线程池);采用全新的线程执行
thenRun(Runnable) 等待前一个任务处理结束后,再做处理。不接收前置任务结果,也不返回结果
thenRunAsync(Runnable[,线程池]) 采用全新的线程执行

其次还有可以执行相对复杂的处理,在前一个任务执行的同时,执行后续任务。等待前置任务和后置任务都搞定之后,再执行最终任务

thenCombine(CompletionStage,Function<prevResult,nextResult,afterResult>) 让prevResult和nextResult一起执行,等待执行完成后,获取前两个任务的结果执行最终处理,最终处理也可以返回结果
thenCombineAsync(CompletionStage,Function<prevResult,nextResult,afterResult>[,线程池]) 采用全新的线程执行
thenAcceptBoth(CompletionStage,Consumer<prevResult,nextResult>);让前置任务和后续任务同时执行,都执行完毕后,拿到两个任务的结果,再做后续处理,但是没有返回结果
thenAcceptBothAsync(CompletionStage,Consumer<prevResult,nextResult>[,线程池])采用全新的线程执行
runAfterBoth(CompletionStage,Runnble) 让前置任务和后续任务同时执行,都执行完毕后再做后续处理
runAfterBothAsync(CompletionStage,Runnble[,线程池]) 采用全新的线程执行

还提供了可以让两个任务一起执行,但是有一个任务结束,有返回结果后,就做最终处理

applyToEither(CompletionStage,Function<firstResult,afterResult>) 前面两个任务同时执行,有一个任务执行完,获取返回结果,做最终处理,再返回结果
acceptEither(CompletionStage,Consumer<firstResult>) 前面两个任务同时执行,有一个任务执行完,获取返回结果,做最终处理,没有返回结果
runAfterEither(CompletionStage,Runnable) 前面两个任务同时执行,有一个任务执行完,做最终处理

还提供了等到前置任务处理完,再做后续处理,后续处理返回的结果为CompletionStage

thenCompose(Function<prevResult,CompletionStage>)

最后还有处理异常的各种姿势

exceptionally(Function<Throwable,currResult>)  
whenComplete(Consumer<prevResult,Throwable>) 
hanle(Function<prevResult,Throwable,currResult>)

CompletableFuture 示例

public static void main(String[] args) throws InterruptedException {sout("我回家吃饭");CompletableFuture<String> task = CompletableFuture.supplyAsync(() -> {sout("阿姨做饭!");return "锅包肉!";});sout("我看电视!");sout("我吃饭:" + task.join());
}
public static void main(String[] args) throws InterruptedException {sout("我回家吃饭");CompletableFuture<String> task = CompletableFuture.supplyAsync(() -> {sout("阿姨炒菜!");return "锅包肉!";},executor).thenCombineAsync(CompletableFuture.supplyAsync(() -> {sout("小王焖饭");return "大米饭!";},executor),(food,rice) -> {sout("大王端" + food + "," + rice);return "饭菜好了!";},executor);sout("我看电视!");sout("我吃饭:" + task.join());
}

总结

Future和CompletableFuture都是用于处理异步任务的接口和类,它们的主要区别在于功能复杂度和使用场景。Future比较简单,主要用于简单的异步任务处理,而CompletableFuture则更加灵活和强大,适用于复杂的异步任务处理场景。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/84196.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux——(第十一章)软件包管理

目录 一、RPM 1.概述 2.RPM查询指令 3.RPM卸载指令 4.RPM安装命令 二、YUM 1.概述 2.YUM常用命令 一、RPM 1.概述 RPM&#xff08;RedHat Package Manager&#xff09;&#xff0c;RedHat软件包管理工具&#xff0c;类似windows里面的setup.exe是Linux这系列操作系统里…

Hadoop初识及信息安全(大数据的分布式存储和计算平台)

目录 什么是Hadoop Hadoop的特点 Hadoop优点 Hadoop的缺点 Hadoop的重要组成 信息安全 什么是Hadoop Hadoop 是一个适合大数据的分布式存储和计算平台。 Hadoop的广义和狭义区分&#xff1a; 狭义的Hadoop:指的是一个框架&#xff0c;Hadoop是由三部分组成&#xff1a;H…

机器视觉-标定篇

3D结构光标定 结构光视觉的优点&#xff1a; 非接触、信息量大、测精度高、抗干扰能力强。 结构光视觉传感器参数的标定包括&#xff1a;摄像机参数标定、结构光平面参数标定。 结构光视觉测量原理图 我们不考虑镜头的畸变&#xff0c;将相机的成像模型简化为小孔成像模型…

ClickHouse面向列的数据库管理系统(原理简略理解)

目录 官网 什么是Clickhouse 什么是OLAP 面向列的数据库与面向行的数据库 特点 为什么面向列的数据库在OLAP场景中工作得更好 为什么ClickHouse这么快 真实的处理分析查询 OLAP场景的关键属性 引擎作用 ClickHouse引擎 输入/输出 CPU 官网 https://clickhouse.com…

Flink-CDC 抽取SQLServer问题总结

Flink-CDC 抽取SQLServer问题总结 背景 flink-cdc 抽取数据到kafka 中&#xff0c;使用flink-sql进行开发&#xff0c;相关问题总结flink-cdc 配置SQLServer cdc参数 1.创建CDC 使用的角色, 并授权给其查询待采集数据数据库 -- a.创建角色 create role flink_role;-- b.授权…

Direct3D融合技术

该技术能使我们将当前要进行光栅化的像素的颜色与先前已已光栅化并处于同一位置的像素的颜色进行合成&#xff0c;即将正在处理的图元颜色值与存储在后台缓存中的像素颜色值进行合成(混合)&#xff0c;利用该技术我们可得到各种各样的效果&#xff0c;尤其是透明效果。 在融合…

zookeeper未授权漏洞复现及处理

一、漏洞详情 Zookeeper是一个分布式的&#xff0c;开放源码的分布式应用程序协调服务&#xff0c;是Google的Chubby一个开源的实现&#xff0c;是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件&#xff0c;提供的功能包括&#xff1a;配置维护、域名服…

华为云云耀云服务器L实例评测|云耀云服务器L实例的购买及使用体验

华为云云耀云服务器L实例评测&#xff5c;云耀云服务器L实例的购买及使用体验 一、云耀云服务器L实例介绍1.1 云耀云服务器L实例简介1.2 云耀云服务器L实例特点1.3 云耀云服务器L实例使用场景 二、云耀云服务器L实例支持的镜像2.1 镜像类型2.2 系统镜像2.3 应用镜像 三、购买云…

前端悬浮菜单的实现方法及完整代码示例

前言 悬浮菜单作为网页设计中常见的交互元素&#xff0c;通常被用于展示常用功能或导航链接。 在前端开发领域中&#xff0c;我们可以利用纯CSS技术实现一个简单的悬浮菜单。 本文将详细介绍实现悬浮菜单的方法&#xff0c;并提供一个完整的代码示例。 实现方法 要实现一个悬浮…

nginx中的location指令用法

在Nginx中&#xff0c;location指令用于定义如何处理特定类型的请求。它通常用于定义不同的URL匹配规则和相应的处理方式。 以下是location指令的一般用法 location [|~|~*|^~] /uri/ {# 处理指令 }&#xff1a;表示精确匹配。只有当请求的URL与指定的URL完全匹配时&#xff…

docker学习:dockerfile和docker-compose

学习如何使用dockerfile 以下内容&#xff0c;部分来自gpt生成&#xff0c;里面的描述可能会出现问题&#xff0c;但代码部分&#xff0c;我都会进行测试。 1. 需求 对于一个docker&#xff0c;例如python&#xff0c;我们需要其在构建成容器时&#xff0c;就有np。有以下两种方…

Qt重写QTreeWidget实现拖拽

介绍 此文章记录QTreeWidget的重写进度&#xff0c;暂时停滞使用&#xff0c;重写了QTreeWidget的拖拽功能&#xff0c;和绘制功能&#xff0c;自定义了数据结构&#xff0c;增加复制&#xff0c;粘贴&#xff0c;删除&#xff0c;准备实现动态刷新数据支持千万数据动态刷新&a…

Prometheus-Rules 实战

文章目录 1 node rules2 nginx rule2.1 Nginx 4xx 错误率太多2.2 Nginx 5xx 错误率太多2.3 Nginx 延迟高 3 mysql rule3.1 MySQL 宕机3.2 实例连接数过多3.3 MySQL高线程运行3.4 MySQL 从服务器 IO 线程没有运行3.5 MySQL 从服务器 SQL 线程没有运行3.6 MySQL复制滞后3.7 慢查询…

天津专升本文化课考试计算机应用基础考试大纲(2023年9月修订)

天津市高等院校“高职升本科”招生统一考试计算机应用基础考试大纲&#xff08;2023年9月修订&#xff09; 一、考试性质 天津市高等院校“高职升本科”招生统一考试是由合格的高职高专毕业生参加的选拔性 考试。高等院校根据考生的成绩&#xff0c;按照已确定的招生计划&am…

transformer系列2---transformer架构详细解析

transformer详细解析 Encoder1 输入1.1 Embedding 词嵌入1.1.1 Embedding 定义1.1.2 几种编码方式对比1.1.3 实现代码 1.2 位置编码1.2.1 使用位置编码原因1.2.2 位置编码方式1.2.3 位置编码代码 2 注意力 Attention2.1 自注意力self-attention2.1.1 QKV含义2.1.2 自注意力公式…

Innodb底层原理与Mysql日志机制

MySQL内部组件结构 Server层 主要包括连接器、词法分析器、优化器、执行器等&#xff0c;涵盖 MySQL 的大多数核心服务功能&#xff0c;以及所有的内置函数&#xff08;如日期、时间、数学和加密函数等&#xff09;&#xff0c;所有跨存储引擎的功能都在这一层实现&#xff0c…

【Vue】路由与Node.js下载安装及环境配置教程

&#x1f389;&#x1f389;欢迎来到我的CSDN主页&#xff01;&#x1f389;&#x1f389; &#x1f3c5;我是Java方文山&#xff0c;一个在CSDN分享笔记的博主。&#x1f4da;&#x1f4da; &#x1f31f;推荐给大家我的专栏《Vue快速入门》。&#x1f3af;&#x1f3af; &…

无涯教程-JavaScript - COUNT函数

描述 COUNT函数计算包含数字的单元格的数量,并计算参数列表中的数字。使用COUNT函数获取在数字范围或数字数组中的数字字段中的条目数。 语法 COUNT (value1, [value2] ...)争论 Argument描述Required/Optionalvalue1The first item, cell reference, or range within whic…

React TypeScript | 快速了解 antd 的使用

1. 安装&#xff1a; 就像安装其他插件库一样&#xff0c;在项目文件夹下执行&#xff1a; npm install antd --save如果你安装了 yarn&#xff0c;也可以执行&#xff1a; yarn add antd2. 引用 import { Button, Tooltip } from "antd"; import "antd/dis…

第三、四、五场面试

第三场 共享屏幕做题&#xff08;三道简单题&#xff09; 替换空格成%20&#xff08;双指针&#xff09; 删除升序链表中的重复元素&#xff08;指针&#xff09;有效的括号&#xff08;栈&#xff09; 第四场、第五场 自我介绍 项目拷打 整个项目架构rpc模块的情况分析的数…