CompletableFuture原理与实践

CompletableFuture原理与实践

前言

本文源自微博客(www.microblog.store),且以获得授权

1、CompletableFuture是什么

CompletableFuture是Java 8引入的一个用于处理异步编程的类。它提供了一种方便的方式来执行异步操作并处理异步任务的结果。CompletableFuture可以用于执行异步计算、处理任务的结果、组合多个异步任务等。

2、CompletableFuture的特性

  1. 异步执行任务: 你可以使用CompletableFuture来执行异步任务,这样你的代码可以继续执行其他操作,而不需要等待任务完成。
  2. 任务链式处理: CompletableFuture支持链式调用,你可以通过一系列的方法调用来处理异步任务的结果,从而实现更加复杂的任务流程。
  3. 组合多个任务: 你可以使用thenCombinethenCompose等方法将多个CompletableFuture的结果进行组合,从而实现多个异步任务之间的协作。
  4. 异常处理: 你可以使用exceptionallyhandle等方法来处理异步任务中的异常情况。
  5. 等待任务完成: CompletableFuture提供了get方法来等待任务的完成并获取结果,但也可以配合其他方法来进行更加灵活的等待操作。
  6. 并行处理: CompletableFuture可以结合Executor来实现并行处理任务,从而充分利用多核处理器的性能。
  7. 异步计算: 除了处理IO密集型任务,CompletableFuture还可以用于执行异步计算任务,利用多线程进行计算操作。

3、CompletableFuture和Future

CompletableFuture是由Java 8引入的,在Java8之前我们一般通过Future实现异步。

  • Future用于表示异步计算的结果,只能通过阻塞或者轮询的方式获取结果,而且不支持设置回调方法。
  • CompletableFuture对Future进行了扩展,可以通过设置回调的方式处理计算结果,同时也支持组合操作,支持进一步的编排。

4、从FutureTask开始

package com.xx.thread;import java.util.concurrent.*;/*** @Author: wxz* @Date: 2023/8/20 21:43*/
public class FutureTest {public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {FutureTask<String> futureTask = new FutureTask<>(() -> {System.out.println("-----come in FutureTask");try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}return ThreadLocalRandom.current().nextInt(100) + "";});Thread t1 = new Thread(futureTask, "t1");t1.start();//3秒钟后才出来结果,还没有计算你提前来拿(只要一调用get方法,会导致阻塞)
//        System.out.println(Thread.currentThread().getName()+"\t"+futureTask.get());//3秒钟后才出来结果,我只想等待1秒钟,超过1秒钟就不等了
//        System.out.println(Thread.currentThread().getName() + "\t" + futureTask.get(1L, TimeUnit.SECONDS));}
}

get()阻塞 一旦调用get()方法,不管是否计算完成都会导致阻塞

package com.xx.thread;import java.util.concurrent.*;/*** @Author: wxz* @Date: 2023/8/20 21:43*/
public class FutureTest {public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {FutureTask<String> futureTask = new FutureTask<>(() -> {System.out.println("-----come in FutureTask");try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}return "" + ThreadLocalRandom.current().nextInt(100);});new Thread(futureTask, "t1").start();System.out.println(Thread.currentThread().getName() + "\t" + "线程完成任务");// 用于阻塞式获取结果,如果想要异步获取结果,通常都会以轮询的方式去获取结果while (true) {if (futureTask.isDone()) {System.out.println(futureTask.get());break;}}}
}

轮询的方式会耗费无谓的CPU资源,而且也不见得能及时地得到计算结果

如果想要异步获取结果,通常都会以轮询的方式去获取结果 尽量不要阻塞

  • 任务性质: 考虑任务的性质,如果任务需要监控中间状态,轮询可能更适合;如果任务完成后需要获取最终结果,阻塞等待可能更合适。
  • 响应性要求: 考虑是否需要保持主线程的响应性。轮询方式可以在不阻塞主线程的情况下持续监控,而阻塞等待会阻塞当前线程。
  • 超时和定时: 如果你需要实现超时处理或定时任务,轮询方式更具优势。阻塞等待可能需要额外的超时机制。

5、对Future的改进

1、类CompletableFuture

  • 在Java8中,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法。
  • 它可能代表一个明确完成的Future,也有可能代表一个完成阶段(CompletionStage ),它支持在计算完成以后触发一些函数或执行某些动作。
  • 它实现了Future和CompletionStage接口

2、接口CompletionStage

  • CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段
  • 一个阶段的计算执行可以是一个Function,Consumer或者Runnable。比如: stage.thenApply(x -> square(x).thenAccept(x -> System.out.print(x)).thenRun(0) ->Svstem.out.printIn0))
  • 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发

代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段,有些类似Linux系统的管道分隔符传参数。

6、核心的四个静态方法

1、runAsync 无 返回值

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)  

2、supplyAsync 有 返回值

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)

没有指定Executor的方法,直接使用默认的ForkJoinPool.commonPool() 作为它的线程池执行异步代码。

如果指定线程池,则使用我们自定义的或者特别指定的线程池执行异步代码

7、无返回值Code

package com.xx.cf;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;/*** @Author: wxz* @Date: 2023/8/28 14:56*/
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t" + "-----come in");//暂停几秒钟线程try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("-----task is over");});System.out.println(future.get());}
}
ForkJoinPool.commonPool-worker-1	-----come in
-----task is over
null

8、有返回值Code

package com.xx.cf;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;/*** @Author: wxz* @Date: 2023/8/28 14:56*/
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t" + "-----come in");//暂停几秒钟线程try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return ThreadLocalRandom.current().nextInt(100);});System.out.println(completableFuture.get());}
}

9、回调方法

从Java8开始引入了CompletableFuture,它是Future的功能增强版,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法

package com.xx.cf;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;/*** @Author: wxz* @Date: 2023/8/28 14:56*/
public class CompletableFutureDemo {public static void main(String[] args) throws Exception {CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t" + "-----come in");int result = ThreadLocalRandom.current().nextInt(10);//暂停几秒钟线程try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("-----计算结束耗时1秒钟,result: " + result);if (result > 6) {int age = 10 / 0;}return result;}).whenComplete((v, e) -> {if (e == null) {System.out.println("-----result: " + v);}}).exceptionally(e -> {System.out.println("-----exception: " + e.getCause() + "\t" + e.getMessage());return -44;});System.out.println(completableFuture.get());}
}

异步任务结束时,会自动回调某个对象的方法;

异步任务出错时,会自动回调某个对象的方法;

10、join和get对比

get会抛出异常,join不需要

package com.xx.cf;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;/*** @Author: wxz* @Date: 2023/8/28 14:56*/
public class CompletableFutureDemo {public static void main(String[] args) {CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t" + "-----come in");int result = ThreadLocalRandom.current().nextInt(10);//暂停几秒钟线程try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("-----计算结束耗时1秒钟,result: " + result);if (result > 6) {int age = 10 / 0;}return result;}).whenComplete((v, e) -> {if (e == null) {System.out.println("-----result: " + v);}}).exceptionally(e -> {System.out.println("-----exception: " + e.getCause() + "\t" + e.getMessage());return -44;});System.out.println(completableFuture.join());}
}

11、一个简单的例子-电商网站的比价

切记,功能→性能

经常出现在等待某条 SQL 执行完成后,再继续执行下一条 SQL ,而这两条 SQL 本身是并无关系的,可以同时进行执行的。我们希望能够两条 SQL 同时进行处理,而不是等待其中的某一条 SQL 完成后,再继续下一条。同理, 对于分布式微服务的调用,按照实际业务,如果是无关联step by step的业务,可以尝试是否可以多箭齐发,同时调用。我们去比同一个商品在各个平台上的价格,要求获得一个清单列表, 1 step by step,查完京东查淘宝,查完淘宝查天猫…

package com.xx.cf;import lombok.Getter;
import org.springframework.util.StopWatch;import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;/*** @Author: wxz* @Date: 2023/8/28 15:07*/
public class T1 {static List<NetMall> list = Arrays.asList(new NetMall("jd"),new NetMall("tmall"),new NetMall("pdd"),new NetMall("mi"));public static List<String> findPriceSync(List<NetMall> list, String productName) {return list.stream().map(mall -> String.format(productName + " %s price is %.2f", mall.getNetMallName(), mall.getPriceByName(productName))).collect(Collectors.toList());}public static List<String> findPriceASync(List<NetMall> list, String productName) {return list.stream().map(mall -> CompletableFuture.supplyAsync(() -> String.format(productName + " %s price is %.2f", mall.getNetMallName(), mall.getPriceByName(productName)))).collect(Collectors.toList()).stream().map(CompletableFuture::join).collect(Collectors.toList());}public static void main(String[] args) {StopWatch stopWatch1 = new StopWatch();stopWatch1.start();List<String> list1 = findPriceSync(list, "thinking in java");for (String element : list1) {System.out.println(element);}stopWatch1.stop();System.out.println("----costTime: " + stopWatch1.getTotalTimeMillis() + " 毫秒");StopWatch stopWatch2 = new StopWatch();stopWatch2.start();List<String> list2 = findPriceASync(list, "thinking in java");for (String element : list2) {System.out.println(element);}stopWatch2.stop();System.out.println("----costTime: " + stopWatch2.getTotalTimeMillis() + " 毫秒");}
}class NetMall {@Getterprivate String netMallName;public NetMall(String netMallName) {this.netMallName = netMallName;}public double getPriceByName(String productName) {return calcPrice(productName);}private double calcPrice(String productName) {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return ThreadLocalRandom.current().nextDouble() + productName.charAt(0);}
}

12、CompletableFuture常用方法

1、获得结果和触发计算

获取结果

// 不见不散
public T get()// 过时不候
public T get(long timeout, TimeUnit unit)// 没有计算完成的情况下,给我一个替代结果 
// 立即获取结果不阻塞 计算完,返回计算完成后的结果  没算完,返回设定的valueIfAbsent值
public T getNow(T valueIfAbsent)public T join()
package com.xx.cf;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;/*** @Author: wxz* @Date: 2023/8/28 14:56*/
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return 903;});// 去掉注释上面计算没有完成,返回914// 开启注释上满计算完成,返回计算结果
//        try {
//            TimeUnit.SECONDS.sleep(2);
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }System.out.println(completableFuture.getNow(914));}
}
package com.xx.cf;import java.util.concurrent.CompletableFuture;/*** @Author: wxz* @Date: 2023/8/28 14:56*/
public class CompletableFutureDemo {public static void main(String[] args) {System.out.println(CompletableFuture.supplyAsync(() -> "abc").thenApply(r -> r + "123").join());}
}

主动触发计算

// 是否打断get方法立即返回括号值
public boolean complete(T value)
package com.xx.cf;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;/*** @Author: wxz* @Date: 2023/8/28 14:56*/
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return 903;});// 注释掉暂停线程,get还没有算完只能返回complete方法设置的 914;暂停2秒钟线程,异步线程能够计算完成返回get
//        try {
//            TimeUnit.SECONDS.sleep(2);
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }//当调用CompletableFuture.get()被阻塞的时候,complete方法就是结束阻塞并get()获取设置的complete里面的值.System.out.println(completableFuture.complete(914) + "\t" + completableFuture.get());}
}

2、对计算结果进行处理

thenApply(Function fn)

  • 输入参数:上一阶段的任务结果类型为 T。
  • 返回值:新阶段的任务结果类型为 U。
  • 功能:对上一阶段的任务结果进行转换操作,并返回一个新的 CompletableFuture 对象。

计算结果存在依赖关系,这两个线程串行化
由于存在依赖关系(当前步错,不走下一步),当前步骤有异常的话就叫停。

package com.xx.cf;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;/*** @Author: wxz* @Date: 2023/8/28 14:56*/
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {// 当一个线程依赖另一个线程时用 thenApply 方法来把这两个线程串行化,CompletableFuture.supplyAsync(() -> {//暂停几秒钟线程try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("111");return 1024;}).thenApply(f -> {System.out.println("222");return f + 1;}).thenApply(f -> {//int age = 10/0; // 异常情况:那步出错就停在那步。System.out.println("333");return f + 1;}).whenCompleteAsync((v, e) -> {System.out.println("*****v: " + v);}).exceptionally(e -> {e.printStackTrace();return null;});System.out.println("-----主线程结束,END");// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}}
}
handle
// 有异常也可以往下一步走,根据带的异常参数可以进一步处理
package com.xx.cf;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;/*** @Author: wxz* @Date: 2023/8/28 14:56*/
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {//当一个线程依赖另一个线程时用 handle 方法来把这两个线程串行化,// 异常情况:有异常也可以往下一步走,根据带的异常参数可以进一步处理CompletableFuture.supplyAsync(() -> {//暂停几秒钟线程try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("111");return 1024;}).handle((f, e) -> {int age = 10 / 0;System.out.println("222" + f);return f + 1;}).handle((f, e) -> {System.out.println("333" + f);return f + 1;}).whenCompleteAsync((v, e) -> {System.out.println("*****v: " + v);}).exceptionally(e -> {e.printStackTrace();return null;});System.out.println("-----主线程结束,END");// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}}
}

当一个异常发生时,handle 方法会执行异常处理的逻辑,然后它会将返回值设置为 null,表示处理后的结果。这就是为什么在异常发生后,f 的值会变成 null

具体来说,在你的代码中,第一个 handle 方法中有一个除以零的操作 int age = 10 / 0;,这会引发一个异常。因此,异常发生时,handle 方法的异常处理逻辑会被执行,异常捕获后,f 被设置为 null,然后异常继续传递到下一个 handle 方法。在第二个 handle 方法中,f 的值仍然是 null,所以你会看到输出 “333null”。

whenCompleteAsync 方法中,vhandle 方法的返回值(即 f 的值),在异常发生时为 null,因此你会看到 “v: null”。

whenComplete 和 whenCompleteAsync的区别:

whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务

whenCompleteAsync: 是执行把whenCompleteAsync 这个任务继续提交给线程池来进行执行。

3、对计算结果进行消费

接收任务的处理结果,并消费处理,无返回结果

package com.xx.cf;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;/*** @Author: wxz* @Date: 2023/8/28 14:56*/
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture.supplyAsync(() -> {return 1;}).thenApply(f -> {return f + 2;}).thenApply(f -> {return f + 3;}).thenApply(f -> {return f + 4;}).thenAccept(r -> System.out.println(r));}
}

任务之间的顺序执行

thenRun
thenRun(Runnable runnable)
// 任务 A 执行完执行 B,并且 B 不需要 A 的结果thenAccept
thenAccept(Consumer action)
// 任务 A 执行完执行 B,B 需要 A 的结果,但是任务 B 无返回值thenApply
thenApply(Function fn)
// 任务 A 执行完执行 B,B 需要 A 的结果,同时任务 B 有返回值
package com.xx.cf;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;/*** @Author: wxz* @Date: 2023/8/28 14:56*/
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}).join());System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {}).join());System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB").join());}
}

4、对计算速度进行选用

applyToEither:谁快用谁

package com.xx.cf;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;/*** @Author: wxz* @Date: 2023/8/28 14:56*/
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");//暂停几秒钟线程try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}return 10;});CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return 20;});CompletableFuture<Integer> thenCombineResult = completableFuture1.applyToEither(completableFuture2, f -> {System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");return f + 1;});System.out.println(Thread.currentThread().getName() + "\t" + thenCombineResult.get());}
}

5、对计算结果进行合并

两个CompletionStage任务都完成后,最终能把两个任务的结果一起交给thenCombine 来处理

先完成的先等着,等待其它分支任务

thenCombine

package com.xx.cf;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;/*** @Author: wxz* @Date: 2023/8/28 14:56*/
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");return 10;});CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");return 20;});CompletableFuture<Integer> thenCombineResult = completableFuture1.thenCombine(completableFuture2, (x, y) -> {System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");return x + y;});System.out.println(thenCombineResult.get());}
}
package com.xx.cf;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;/*** @Author: wxz* @Date: 2023/8/28 14:56*/
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> thenCombineResult = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t" + "---come in 1");return 10;}).thenCombine(CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t" + "---come in 2");return 20;}), (x, y) -> {System.out.println(Thread.currentThread().getName() + "\t" + "---come in 3");return x + y;}).thenCombine(CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t" + "---come in 4");return 30;}), (a, b) -> {System.out.println(Thread.currentThread().getName() + "\t" + "---come in 5");return a + b;});System.out.println("-----主线程结束,END");System.out.println(thenCombineResult.get());// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:try {TimeUnit.SECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}}
}

13、CompletableFuture结合线程池使用

1、查询多次再聚合

@GetMapping("/queryCustomerInfo")
@Operation(summary = "CompletableFuture+自定义线程池分批查询")
public List<Integer> queryCustomerInfo() {List<Integer> result = new ArrayList<>();CompletableFuture<List<Integer>> customerInfoFuture = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t" + "1");List<Integer> customerInfo1 = this.getCustomerInfo1();return customerInfo1;}, threadPoolTaskExecutor);CompletableFuture<List<Integer>> scoreFuture = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t" + "2");List<Integer> customerInfo2 = this.getCustomerInfo2();return customerInfo2;}, threadPoolTaskExecutor);CompletableFuture<List<Integer>> orderFuture = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t" + "3");List<Integer> customerInfo3 = this.getCustomerInfo3();return customerInfo3;}, threadPoolTaskExecutor);//等待所有任务完成CompletableFuture.allOf(customerInfoFuture, scoreFuture, orderFuture).join();Stream.of(customerInfoFuture, scoreFuture, orderFuture).map(CompletableFuture::join).collect(Collectors.toList()).forEach(result::addAll);return result;
}public List<Integer> getCustomerInfo1() {try {TimeUnit.MILLISECONDS.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}return Arrays.asList(1, 2, 3);
}public List<Integer> getCustomerInfo2() {try {TimeUnit.MILLISECONDS.sleep(200);} catch (InterruptedException e) {e.printStackTrace();}return Arrays.asList(4, 5);
}public List<Integer> getCustomerInfo3() {try {TimeUnit.MILLISECONDS.sleep(300);} catch (InterruptedException e) {e.printStackTrace();}return Arrays.asList(6, 7);
}

2、Stream流的方式对数据进行分批次处理

@GetMapping("/doLimitByStream")
@Operation(summary = "CompletableFuture+自定义线程池分批查询 Stream流的方式对数据进行分批次处理")
public void doLimitByStream() {// 1 数据总量集合List<Integer> list = queryCustomerInfo();// 2 限流分批,打算每次处理几条或者几页,开发自己设定,支持第3方分批限流int pageSize = 5;//可以配置// 3int pages = (int) Math.ceil(list.size() * 1.0 / pageSize);for (int i = 1; i <= pages; i++) {List<Integer> batchList = list.stream().skip(pageSize * (i - 1)).limit(pageSize).collect(Collectors.toList());System.out.println("batchList: " + i + " = " + batchList);}
}

3、Guava的方式对数据进行分批次处理

@GetMapping("/doLimitByGuava")
@Operation(summary = "CompletableFuture+自定义线程池分批查询 Guava的方式对数据进行分批次处理")
public void doLimitByGuava() {//1 数据总量集合List<Integer> list = queryCustomerInfo();int pageSize = 3; // 可以配置if (list != null) {List<List<Integer>> partition = Lists.partition(list, pageSize);for (List<Integer> integerList : partition) {System.out.println(integerList);}}
}

14、线程池循环引用会导致死锁

package com.xx.cf;import java.util.concurrent.*;/*** @Author: wxz* @Date: 2023/9/6 20:24*/
public class CfCyclic {public static void main(String[] args) {ExecutorService threadPool1 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {System.out.println("1111");//do sthreturn CompletableFuture.supplyAsync(() -> {System.out.println("child");return "child";}, threadPool1).join();//子任务}, threadPool1);cf1.join();threadPool1.shutdown();}
}
  1. 在第一个 CompletableFuture 中,你使用了 CompletableFuture.supplyAsync 来执行一个任务,并在其中又创建了一个子任务。这个子任务也使用了相同的线程池 threadPool1 来执行。
  2. 在第一个 CompletableFuture 中,你调用了 join() 方法来等待任务的完成。这将阻塞当前线程,直到 CompletableFuture 完成。
  3. 在第一个 CompletableFuture 中,你的任务返回了另一个 CompletableFuture,然后立即调用了 join() 方法等待它的完成。
  4. 子任务请求线程时进入阻塞队列排队,但是父任务的完成又依赖于子任务,这时由于子任务得不到线程,父任务无法完成。主线程执行cf1.join()进入阻塞状态,并且永远无法恢复
package com.xx.cf;import java.util.concurrent.*;/*** @Author: wxz* @Date: 2023/9/6 20:24*/
public class CfCyclic {public static void main(String[] args) {ExecutorService threadPool1 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));ExecutorService threadPool2 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {System.out.println("1111");//do sthreturn CompletableFuture.supplyAsync(() -> {System.out.println("child");return "child";}, threadPool2).join();//子任务}, threadPool1);cf1.join();threadPool1.shutdown();threadPool2.shutdown();}
}
package com.xx.cf;import java.util.concurrent.*;/*** @Author: wxz* @Date: 2023/9/6 20:24*/
public class CfCyclic {public static void main(String[] args) {ExecutorService threadPool1 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));ExecutorService threadPool2 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {System.out.println("1111");// 执行主任务逻辑return "result";}, threadPool1).thenCompose(result -> CompletableFuture.supplyAsync(() -> {System.out.println("child");// 执行子任务逻辑,使用不同的线程池return "child";}, threadPool2));cf1.join();threadPool1.shutdown();threadPool2.shutdown();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/30351.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

无监督医学图像翻译与对抗扩散模型| 文献速递-深度学习结合医疗影像疾病诊断与病灶分割

Title 题目 Unsupervised Medical Image Translation With Adversarial Diffusion Models 无监督医学图像翻译与对抗扩散模型 01 文献速递介绍 多模态成像对于全面评估人体解剖结构和功能至关重要[1]。通过各自模态捕获的互补组织信息&#xff0c;有助于提高诊断准确性并改…

换位置(C++)

问题描述 体育课上&#xff0c;有一个班级的同学站成了一队&#xff0c;体育老师请最高的和最矮的两位同学调换一下位置&#xff0c;其余的同学不要动&#xff0c;请编程实现&#xff01;&#xff08;假设所有人的高矮都是不一样的&#xff09; 输入 第一行有一个整数 &…

NSSCTF-Web题目10

目录 [强网杯 2019]随便注 1、题目 2、知识点 3、思路 [GXYCTF 2019]BabyUpload 1、题目 2、知识点 3、思路 [强网杯 2019]随便注 1、题目 2、知识点 数据库注入&#xff0c;堆叠注入&#xff0c;更改表名 3、思路 正常提交查询&#xff0c;看看数据回显 加入单引号…

api-ms-win-crt-runtime-l1-1-0.dll文件丢失的情况要怎么处理?比较靠谱的多种修复方法分享

遇到api-ms-win-crt-runtime-l1-1-0.dll文件丢失的情况实际上是一个常见问题&#xff0c;解决此类问题存在多种方法。首先我们先来了解一下api-ms-win-crt-runtime-l1-1-0.dll文件吧&#xff0c;只有了解了我们才知道怎么去解决这个api-ms-win-crt-runtime-l1-1-0.dll文件丢失的…

flutter 打包 exe

采用官方的MSIX打包 原文链接 https://blog.csdn.net/weixin_44786530/article/details/135308360 第一步&#xff1a;安装依赖 在项目根目录&#xff0c;执行命令&#xff1a; flutter pub add --dev msix 等待安装完成&#xff0c;就好了 第二步&#xff1a;打包编译 当m…

LVGL开发教程-img图片

系列文章目录 知不足而奋进 望远山而前行 目录 系列文章目录 文章目录 前言 1. 显示静态图像 2. 显示动态图像gif 3. 文件系统使用 总结 前言 在嵌入式系统中&#xff0c;使用LVGL&#xff08;Light and Versatile Graphics Library&#xff09;显示静态和动态图像是一…

Python使用抽象工厂模式和策略模式的组合实现生成指定长度的随机数

设计模式选择理由&#xff1a; 抽象工厂模式&#xff1a; 抽象工厂模式适合于创建一组相关或依赖对象的场景。在这里&#xff0c;我们可以定义一个抽象工厂来创建不同类型&#xff08;数字、字母、特殊符号&#xff09;的随机数据生成器。 策略模式&#xff1a; 策略模式允许你…

索引与书架、新华字典的爱恨情仇

在MySQL的索引世界中&#xff0c;性能优化一直是开发者们关注的焦点。而索引&#xff0c;作为提升查询速度的关键技术之一&#xff0c;是非常重要的。索引根据存储类型可以分为聚簇索引(聚集)与非聚簇索引(非聚集)&#xff0c;它们决定了数据在磁盘上的存储方式和查询时的访问路…

HANA 自动生成年月维度,指定起始即可生成

官方指导文档&#xff1a;SERIES_GENERATE Function (Series Data) | SAP Help Portal select * from SERIES_GENERATE_DATE(INTERVAL 1 MONTH, 2024-01-01, 2024-12-01) 以下示例生成范围从1999-01-01到1999-01-02的一系列时间戳&#xff0c;该时间戳以30秒的间隔递增&#…

重生之 SpringBoot3 入门保姆级学习(24、场景整合 kafka 消息发送服务)

重生之 SpringBoot3 入门保姆级学习&#xff08;24、场景整合 kafka 消息发送服务&#xff09; 6.4 消息发送服务 6.4 消息发送服务 访问 kafka-ui &#xff08;注意这里需要换成你自己的服务器或者虚拟机的 IP 地址&#xff0c;虚拟机可以用局域网 192.168.xxx.xxx 的地址&…

引领未来建筑潮流:轻空间设计团队打造“淄博珍珠”

作为国内单体最大的气膜会展场馆&#xff0c;“淄博珍珠”自四年前启用以来&#xff0c;已成为淄博市的重要地标和经济引擎。该场馆首次亮相于第二十届中国&#xff08;淄博&#xff09;国际陶瓷博览会&#xff0c;凭借其独特的设计和先进的建筑理念&#xff0c;吸引了社会各界…

机器人、人工智能相关领域 news/events (专栏目录)

Some Insights 一些机器人、人工智能或相关领域的news、events等 专栏直达链接 1. 智能制造 - 你需要了解的 10 个基本要素 2. 现实世界中的人工智能&#xff1a;工业制造的 4 个成功案例研究 3. 企业使用人工智能情况调查 4. 未来工厂中的人工智能&#xff1a;人工智能加…

A comprehensive review of machine learning-based models for fake news detection

Abstract 互联网在假新闻传播中的作用使其成为一个严重的问题&#xff0c;需要复杂的技术来自动检测。为了应对 Facebook、Twitter、Instagram 和 WhatsApp 等社交媒体网站上误导性材料的快速传播&#xff0c;本研究探索了深度学习方法和各种分类策略领域。该研究特别调查了基…

Linux安装kvm虚拟机

kvm是基于内核的虚拟机&#xff0c;为什么要用kvm不用vmware、virtual box… 只有一个原因&#xff0c;它非常快&#xff01;本机使用linux开发也是因为它快&#xff01;linux在老电脑上都能流畅运行&#xff0c;更别说现代电脑&#xff0c;如果你觉得装Linux并没有比win快多少…

meson构建入门

Meson 是一个现代的开源构建系统&#xff0c;旨在提高编译速度和使得构建配置文件更易读&#xff0c;类似的构建系统有 Makefile、CMake、automake …。 Meson 是一个由 Python 实现的开源项目&#xff0c;其思想是&#xff0c;开发人员花费在构建调试上的每一秒都是浪费&#…

一文带你搞定Linux开发环境配置

Linux开发必备 万事先更新 sudo apt update && sudo apt upgrade安装gcc工具链 sudo apt install build-essential像mysql、nacos等等建议装docker里&#xff0c;数据挂载到宿主机&#xff0c;日志别挂&#xff0c;直接装本机哪天日志塞满了就要开始重装系统了 以下…

SpringCloud Alibaba Sentinel基础入门与安装

GitHub地址&#xff1a;https://github.com/alibaba/Sentinel 中文文档&#xff1a;https://sentinelguard.io/zh-cn/docs/introduction.html 下载地址&#xff1a;https://github.com/alibaba/Sentinel/releases Spring Cloud Alibaba 官方说明文档&#xff1a;Spring Clou…

基于SSM的美食推荐系统

文章目录 项目介绍主要功能截图:部分代码展示设计总结项目获取方式🍅 作者主页:超级无敌暴龙战士塔塔开 🍅 简介:Java领域优质创作者🏆、 简历模板、学习资料、面试题库【关注我,都给你】 🍅文末获取源码联系🍅 项目介绍 基于SSM的美食推荐系统,java项目。 ecli…

Bitwise 首席投资官:最大的 Alpha 将是华盛顿对加密货币态度的变化

原文标题&#xff1a;​ 《Washington Awakens: This Is What Alpha Looks Like》 撰文&#xff1a;Matt Hougan&#xff0c;Bitwise 首席投资官 编译&#xff1a;Chris&#xff0c;Techub News 更多相关内容点击查看香港Web3媒体TechubNews 在加密货币的泡沫之外&#xff…

商淘云:服装实体店引流会员营销方案

服装零售实体店面临着越来越大的挑战&#xff0c;尤其是在吸引和保持忠诚顾客方面。为了应对这一挑战&#xff0c;制定一套有效的引流会员营销方案显得尤为重要。商淘云将探讨如何通过创新的营销策略和增强的顾客体验&#xff0c;提升实体店的会员数量和销售业绩&#xff0c;从…