JDK-CompletableFuture

归档

  • GitHub: JDK-CompletableFuture

使用示例

  • https://github.com/zengxf/small-frame-demo/blob/master/jdk-demo/simple-demo/src/main/java/test/new_features/jdk1_8/juc/TestCompletableFuture.java
  • 基础方法使用测试:testThenApply2()

JDK 版本

openjdk version "17" 2021-09-14
OpenJDK Runtime Environment (build 17+35-2724)
OpenJDK 64-Bit Server VM (build 17+35-2724, mixed mode, sharing)

原理

类结构

  • java.util.concurrent.CompletableFuture
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {volatile Object result;         // 结果或封装的异常volatile Completion stack;      // 依赖操作的栈顶 (组装单向链表)
}
  • java.util.concurrent.CompletableFuture.Completion
    static abstract class Completion extends ForkJoinTask<Void>implements Runnable, AsynchronousCompletionTask{volatile Completion next;   // 组装单向链表// ------ 方法定义 ------/** 触发:执行完成操作,返回可能需要传播的依赖项(如果存在)。 */abstract CompletableFuture<?> tryFire(int mode);abstract boolean isLive();  // 判断是否可触发public final void run()                { tryFire(ASYNC); }  // Runnablepublic final Void getRawResult()       { return null; }     // ForkJoinTaskpublic final void setRawResult(Void v) {}                   // ForkJoinTaskpublic final boolean exec()            { tryFire(ASYNC); return false; } }
  • java.util.concurrent.CompletableFuture.AsynchronousCompletionTask
    // 异步任务标识接口(无其他定义)public static interface AsynchronousCompletionTask {}
  • java.util.concurrent.CompletableFuture.AsyncSupply
    // sign_c_030 异步生成数据static final class AsyncSupply<T> extends ForkJoinTask<Void>implements Runnable, AsynchronousCompletionTask {CompletableFuture<T> dep; Supplier<? extends T> fn; // 数据提供者AsyncSupply(CompletableFuture<T> dep, Supplier<? extends T> fn) {this.dep = dep; this.fn = fn;}public final Void getRawResult() { return null; }   // ForkJoinTaskpublic final void setRawResult(Void v) {}           // ForkJoinTaskpublic final boolean exec() { run(); return false; }// ForkJoinTask}
  • java.util.concurrent.CompletableFuture.UniApply
    // sign_c_040static final class UniApply<T, V> extends UniCompletion<T, V> {Function<? super T, ? extends V> fn;UniApply( // sign_cm_050Executor executor, CompletableFuture<V> dep,CompletableFuture<T> src,Function<? super T, ? extends V> fn) {super(executor, dep, src); this.fn = fn;}}
  • java.util.concurrent.CompletableFuture.UniCompletion
    abstract static class UniCompletion<T,V> extends Completion {Executor executor;                 // 要使用的执行器(如果没有则为 null)CompletableFuture<V> dep;          // 要完成的依赖项CompletableFuture<T> src;          // 行动来源UniCompletion(Executor executor, CompletableFuture<V> dep,CompletableFuture<T> src) {this.executor = executor; this.dep = dep; this.src = src;}final boolean isLive() { return dep != null; }}
  • java.util.concurrent.CompletableFuture.UniAccept
    // sign_c_060static final class UniAccept<T> extends UniCompletion<T, Void> {Consumer<? super T> fn;... // 构造器类似:UniApply, ref: sign_cm_050}
  • java.util.concurrent.CompletableFuture.UniRun
    // sign_c_070static final class UniRun<T> extends UniCompletion<T, Void> {Runnable fn;... // 构造器类似:UniApply, ref: sign_cm_050}

初始链

supplyAsync()
  • java.util.concurrent.CompletableFuture
    // 调用入口,ref: sign_demo_010public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) {return asyncSupplyStage(screenExecutor(executor), supplier);}static <U> CompletableFuture<U> asyncSupplyStage(Executor e,Supplier<U> f) {...CompletableFuture<U> d = new CompletableFuture<U>();e.execute(new AsyncSupply<U>(d, f));    // 进行执行数据生成,ref: sign_c_030 | sign_m_110return d;}
  • java.util.concurrent.CompletableFuture.AsyncSupply
        // sign_m_110 数据生成public void run() {CompletableFuture<T> d; Supplier<? extends T> f;if ((d = dep) != null && (f = fn) != null) {dep = null; fn = null;if (d.result == null) {try {d.completeValue(f.get());   // 获取数据并填充结果} catch (Throwable ex) {d.completeThrowable(ex);    // 出错时,封装异常填充结果}}d.postComplete();   // 传递给后面依赖项,ref: sign_m_310}}
thenApply()
  • java.util.concurrent.CompletableFuture
    public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {return uniApplyStage(null, fn);}// sign_m_210private <V> CompletableFuture<V> uniApplyStage(Executor e, Function<? super T,? extends V> f) {...Object r;if ((r = result) != null)return uniApplyNow(r, e, f);CompletableFuture<V> d = newIncompleteFuture();unipush(new UniApply<T,V>(e, d, this, f));  // ref: sign_m_230 | sign_c_040return d;}public <U> CompletableFuture<U> newIncompleteFuture() {return new CompletableFuture<U>();}// sign_m_230final void unipush(Completion c) {if (c != null) {while (!tryPushStack(c)) {if (result != null) {NEXT.set(c, null);  // 相当于:c.next = null;break;}}if (result != null)c.tryFire(SYNC);        // 有结果就直接触发下级执行}}final boolean tryPushStack(Completion c) {Completion h = stack;NEXT.set(c, h);                         // 相当于:c.next = stack;return STACK.compareAndSet(this, h, c); // 相当于:stack = c;}
thenApplyAsync()
  • java.util.concurrent.CompletableFuture
    public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) {return uniApplyStage(screenExecutor(executor), fn); // ref: sign_m_210}
thenAccept()
  • java.util.concurrent.CompletableFuture
    public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {return uniAcceptStage(null, action);}private CompletableFuture<Void> uniAcceptStage(Executor e,Consumer<? super T> f) {...Object r;if ((r = result) != null)return uniAcceptNow(r, e, f);CompletableFuture<Void> d = newIncompleteFuture();unipush(new UniAccept<T>(e, d, this, f));   // ref: sign_m_230 | sign_c_060return d;}
thenRun()
  • java.util.concurrent.CompletableFuture
    public CompletableFuture<Void> thenRun(Runnable action) {return uniRunStage(null, action);}private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) {...Object r;if ((r = result) != null)return uniRunNow(r, e, f);CompletableFuture<Void> d = newIncompleteFuture();unipush(new UniRun<T>(e, d, this, f));  // ref: sign_m_230 | sign_c_070return d;}
链结构
// dep (new CF)// CompletableFuture (dep.stack)
AsyncSupply-1 -> UniApply-2 -> UniApply-3 -> UniAccept -> UniRun// UniCompletion(next & src)
UniRun -> UniAccept -> UniApply-3 -> UniApply-2 -> AsyncSupply-1

调用链

postComplete()
  • java.util.concurrent.CompletableFuture
    // sign_m_310 弹出并尝试触发所有可到达的依赖项final void postComplete() {CompletableFuture<?> f = this; Completion h;while ((h = f.stack) != null ||(f != this && (h = (f = this).stack) != null)) {CompletableFuture<?> d; Completion t;if (STACK.compareAndSet(f, h, t = h.next)) {...f = (d = h.tryFire(NESTED)) == null ? this : d; // 触发具体操作逻辑}}}
UniApply
  • java.util.concurrent.CompletableFuture.UniApply
        final CompletableFuture<V> tryFire(int mode) {CompletableFuture<V> d; CompletableFuture<T> a;Object r; Throwable x; Function<? super T,? extends V> f;if ((a = src) == null || (r = a.result) == null|| (d = dep) == null || (f = fn) == null)return null;tryComplete: if (d.result == null) {... // 源异常处理try {if (mode <= 0 && !claim())          // ref: sign_m_325return null;                    // 如果判断为异步执行,则进入此逻辑else {T t = (T) r;                    // 源的结果d.completeValue(f.apply(t));    // 调用 Function 转换并设置结果}} ... // catch}src = null; dep = null; fn = null;return d.postFire(a, mode); // 传给下一项}
  • java.util.concurrent.CompletableFuture.UniCompletion
        // sign_m_325// 如果操作可以运行,则返回 true (相当于没设置线程池,不用异步执行)final boolean claim() {Executor e = executor;if (compareAndSetForkJoinTaskTag(0, 1)) {   // 一般 CAS 成功,进入此逻辑if (e == null)return true;    // 没有设置线程池,表示同步执行executor = null;    // 置空,防止死循环e.execute(this);    // 异步执行}return false;}
UniAccept
  • java.util.concurrent.CompletableFuture.UniAccept
        final CompletableFuture<Void> tryFire(int mode) {CompletableFuture<Void> d; CompletableFuture<T> a;Object r; Throwable x; Consumer<? super T> f;if ((a = src) == null || (r = a.result) == null|| (d = dep) == null || (f = fn) == null)return null;tryComplete: if (d.result == null) {... // 源异常处理try {if (mode <= 0 && !claim())return null;else {T t = (T) r;f.accept(t);        // 调用 Consumer 消费上游结果d.completeNull();}} ... // catch}src = null; dep = null; fn = null;return d.postFire(a, mode); // 传给下一项}
UniRun
  • java.util.concurrent.CompletableFuture.UniRun
        final CompletableFuture<Void> tryFire(int mode) {... // 类似 UniAccept 处理f.run();        // 调用 Runnable 运行d.completeNull();...}

两者组合

acceptEither()
  • 两个只要有一个完成,就传递给下游

  • java.util.concurrent.CompletableFuture.OrAccept

    static final class OrAccept<T, U extends T> extends BiCompletion<T, U, Void> {final CompletableFuture<Void> tryFire(int mode) {CompletableFuture<Void> d; CompletableFuture<? extends T> a, b;Object r; Throwable x; Consumer<? super T> f;if ((a = src) == null || (b = snd) == null|| ((r = a.result) == null && (r = b.result) == null)   // 只要有一个不为 null,就算完成|| (d = dep) == null || (f = fn) == null)return null;... // 类似 UniAccept 处理f.accept(t);        // 调用 Consumer 消费上游结果d.completeNull();...}}
thenAcceptBoth()
  • 两个必须都完成,才传递给下游

  • java.util.concurrent.CompletableFuture.BiAccept

    static final class BiAccept<T, U> extends BiCompletion<T, U, Void> {final CompletableFuture<Void> tryFire(int mode) {CompletableFuture<Void> d;  CompletableFuture<T> a;  CompletableFuture<U> b;Object r, s; BiConsumer<? super T,? super U> f;if (   (a = src) == null || (r = a.result) == null|| (b = snd) == null || (s = b.result) == null|| (d = dep) == null || (f = fn) == null|| !d.biAccept(r, s, f, mode > 0 ? null : this)) // r, s 都不为空,才进入此,ref: sign_m_510return null;...}}
  • java.util.concurrent.CompletableFuture
    // sign_m_510final <R, S> boolean biAccept(Object r, Object s,BiConsumer<? super R, ? super S> f,BiAccept<R, S> c) {...if (result == null) {... // 源异常处理try {if (c != null && !c.claim())return false;R rr = (R) r;S ss = (S) s;f.accept(rr, ss);       // 调用 BiConsumer 消费上游结果completeNull();} ... // catch}return true;}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/44919.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C# 异步编程Invoke、beginInvoke、endInvoke的用法和作用

C# 异步编程Invoke、beginInvoke、endInvoke的用法和作用 一、Invoke Invoke的本质只是一个方法&#xff0c;方法一定是要通过对象来调用的。 一般来说&#xff0c;Invoke其实用法只有两种情况&#xff1a; Control的Invoke Delegate的Invoke 也就是说&#xff0c;Invoke前…

C++:创建线程

在C中创建线程&#xff0c;最直接的方式是使用C11标准引入的<thread>库。这个库提供了std::thread类&#xff0c;使得线程的创建和管理变得简单直接。 以下是一个简单的示例&#xff0c;展示了如何在C中使用std::thread来创建和启动线程&#xff1a; 示例1&#xff1a;…

Python基础教学之五:异常处理与文件操作——让程序更健壮

Python基础教学之五&#xff1a;异常处理与文件操作——让程序更健壮 一、异常处理概念 1. 理解异常 异常是程序运行中发生的错误或意外情况&#xff0c;比如除以零、访问不存在的列表元素等。如果不进行处理&#xff0c;异常会导致程序终止运行。在编程过程中&#xff0c;我…

【YOLOv8】 用YOLOv8实现数字式工业仪表智能读数(二)

上一篇圆形表盘指针式仪表的项目受到很多人的关注&#xff0c;咱们一鼓作气&#xff0c;把数字式工业仪表的智能读数也研究一下。本篇主要讲如何用YOLOV8实现数字式工业仪表的自动读数&#xff0c;并将读数结果进行输出&#xff0c;若需要完整数据集和源代码可以私信。 目录 &…

android 消除内部保存的数据

在Android中&#xff0c;有多种方式可以消除应用内部保存的数据。这些数据可能存储在SharedPreferences、SQLite数据库、文件&#xff08;包括缓存文件&#xff09;或Content Providers中。以下是几种常见的方法来消除这些数据&#xff1a; SharedPreferences&#xff1a; 要删…

Spring AOP的几种实现方式

1.通过注解实现 1.1导入依赖 <dependency><groupId>org.springframework</groupId><artifactId>spring-aop</artifactId><version>5.1.6.RELEASE</version></dependency> 1.2定义注解 import java.lang.annotation.*;Targ…

初识Laravel(Laravel的项目搭建)

初识Laravel&#xff08;Laravel的项目搭建&#xff09; 一、项目简单搭建&#xff08;laravel&#xff09;1.首先我们确保使用国内的 Composer 加速镜像&#xff08;[加速原理](https://learnku.com/php/wikis/30594)&#xff09;&#xff1a;2.新建一个名为 Laravel 的项目&a…

简过网:“三支一扶”这些政策你知道吗?

你好小编&#xff0c;我最近打算备考三支一扶&#xff0c;能介绍一些关于三支一扶的相关知识吗&#xff1f; 为了让大家更好的了解三支一扶&#xff0c;下面这篇文章&#xff0c;小编以问答的方式给大家介绍&#xff0c;希望能够帮助到你&#xff01; 1、什么是三支一扶&#…

电脑 DNS 缓存是什么?如何清除?

DNS&#xff08;Domain Name System&#xff0c;域名系统&#xff09;是互联网的重要组成部分&#xff0c;负责将人类易记的域名转换为机器可读的 IP 地址&#xff0c;从而实现网络通信。DNS 缓存是 DNS 系统中的一个关键机制&#xff0c;通过临时存储已解析的域名信息&#xf…

第一讲 基础算法

#快速排序 #include <iostream> using namespace std; const int N 1e610; int n; int q[N]; void quick_sort(int q[],int l,int r){ if(l>r)return ; int x q[l],i l-1,j r1; while(i<j){do i ;while(q[i]<x);do j--;while(q[j] > x);if(i<j)swap(…

消息队列必知必会-RabbitMQ

文章目录 RabbitMQ是什么&#xff1f;有什么特点&#xff1f;RabbitMQ架构RabbitMQ消息消费过程如何保证消息不丢失&#xff1f;可靠性传输&#xff1f;生产者丢失了数据RabbitMQ&#xff08;broker&#xff09;丢失了数据消费端丢失数据 顺序消息错乱场景解决方案 高可用普通集…

通过SSH和VNC远程连接Centos7(转载)解决linux远程SSH连接

学习笔记&#xff0c;请支持原作者 原作&#xff1a; 通过SSH和VNC远程连接Centos7-腾讯云开发者社区-腾讯云 如下附上我的远程工具putty 链接&#xff1a;https://pan.baidu.com/s/1bOhhn8uXLb9pNr-pUKcK-w?pwdnzxy 提取码&#xff1a;nzxy 学习笔记&#xff0c;请支持原…

Oracle透明数据加密:数据泵文件导出

不带加密的数据泵导出 先给hr用户赋予DATA_PUMP_DIR的读写权限&#xff1a; SQL> grant read, write on directory DATA_PUMP_DIR to hr;Grant succeeded.expdp导出配置文件&#xff1a; $ cat exp.par DIRECTORYDATA_PUMP_DIR DUMPFILEdataonly.dmp CONTENTDATA_ONLY导出…

【vueUse库Reactivity模块各函数简介及使用方法--下篇】

vueUse库是一个专门为Vue打造的工具库,提供了丰富的功能,包括监听页面元素的各种行为以及调用浏览器提供的各种能力等。其中的Browser模块包含了一些实用的函数,以下是这些函数的简介和使用方法: vueUse库Sensors模块各函数简介及使用方法 vueUseReactivity函数1. refWith…

手写函数柯里化示例

今天记录一个经典的手写面试题&#xff0c;手写函数的柯里化示例。 先说一下函数的柯里化是什么样的效果&#xff0c;举个例子&#xff0c;对于计算长方体体积的函数&#xff0c;需要传长、宽、高三个变量&#xff0c;那么可以通过&#xff08;fun为函数名&#xff09; fun(a,…

git提交大文件服务500

错误如图 需保证git服务端能接收大文件 修改项目下.git文件中的config文件&#xff0c;加入 [http] postBuffer 524288000

力扣 160相聚链表

注意 判断是否有交点 用while(A! B) 其中A A nullptr? headb:A->next;B同理 注意&#xff0c;while循环的退出条件是AB指针指向同一个&#xff0c;如果没有相交&#xff0c;仍然可以退出 当AB都为NULLPTR时退出

【信创】信创云规划设计建设方案(2024PPT原件)

信创&#xff0c;即“信息技术应用创新”。我国自主信息产业聚焦信息技术应用创新&#xff0c;旨在通过对IT硬件、软件等各个环节的重构&#xff0c;基于我国自有IT底层架构和标准&#xff0c;形成自有开放生态&#xff0c;从根本上解决本质安全问题&#xff0c;实现信息技术可…

【笔试常见编程题06】最近公共祖先、求最大连续bit数、二进制插入、查找组成一个偶数最接近的两个素数

1. 最近公共祖先 将一棵无穷大满二叉树的结点按根结点一层一层地从左往右编号&#xff0c;根结点编号为1。现给定a&#xff0c;b为两个结点。设计一个算法&#xff0c;返回a、b最近的公共祖先的编号。注意其祖先也可能是结点本身。 测试样例&#xff1a; 2&#xff0c;3 返回&a…

Airtest成功案例分享:KLab连续2年携Airtest私有云产品参加CEDEC大会!

一、KLab株式会社介绍 KLab株式会社是一家位于日本的移动游戏开发公司&#xff0c;成立于2000年。公司以开发和运营基于动漫和漫画IP的手机游戏而闻名&#xff0c;尤其是在音乐节奏游戏领域。KLab的一些知名作品包括《LoveLive!学园偶像祭》、《排球少年&#xff1a;新的征程》…