并发编程 | CompletionService - 如何优雅地处理批量异步任务

引言

上一篇文章中,我们详细地介绍了 CompletableFuture,它是一种强大的并发工具,能帮助我们以声明式的方式处理异步任务。虽然 CompletableFuture 很强大,但它并不总是最适合所有场景的解决方案。
在这篇文章中,我们将介绍 Java 的 CompletionService,这是一种能处理批量异步任务并在完成时获取结果的并发工具。
CompletionService CompletableFuture 在很多方面都相似。它们都用于处理异步任务,并且都提供了获取任务完成结果的机制。然而,CompletionService 采用了更传统并发模型,它将生产者和消费者的角色更明确地分离开来。

回顾我们在上一篇文章:并发编程 | 从Future到CompletableFuture 中讨论的需求,我们需要查找并计算一系列旅行套餐的价格。我们使用 CompletableFuture 实现了这个需求,并且代码看起来很简洁明了。然而,事情都有两面性。有些人并不习惯这种写法,觉得CompletableFuture 的实现中存在大量的嵌套,会让代码难以阅读和理解。另外,我们的代码中有大量的函数式编程,这在一定程度上增加了对代码阅读的门槛,如果你不熟悉这种编程范式,代码可能会看起来很混乱。

有没有一种方法,既简洁的同时,又不回到Future的回调地狱陷阱中去?有,CompletionService 。来看下CompletionService 是怎么解决问题。


使用CompletionService 解决问题

如果我们用 CompletionService 来实现这个需求,会是什么样呢?我们来看下代码:

public List<TravelPackage> searchTravelPackages(SearchCondition searchCondition) throws InterruptedException, ExecutionException {ExecutorService executorService = Executors.newFixedThreadPool(10);CompletionService<List<TravelPackage>> completionService = new ExecutorCompletionService<>(executorService);List<Flight> flights = searchFlights(searchCondition);for (Flight flight : flights) {// 提交所有的任务completionService.submit(() -> {List<TravelPackage> travelPackagesForFlight = new ArrayList<>();List<Hotel> hotels = searchHotels(flight);for (Hotel hotel : hotels) {TravelPackage travelPackage = calculatePrice(flight, hotel);travelPackagesForFlight.add(travelPackage);}return travelPackagesForFlight;});}List<TravelPackage> allTravelPackages = new ArrayList<>();for (int i = 0; i < flights.size(); i++) {// 等待它们的完成Future<List<TravelPackage>> future = completionService.take();// 如果没完成,这里会阻塞List<TravelPackage> travelPackagesForFlight = future.get();allTravelPackages.addAll(travelPackagesForFlight);}executorService.shutdown();allTravelPackages.sort(Comparator.comparing(TravelPackage::getPrice));return allTravelPackages;
}

通过上面的代码,我们可以看到 CompletionService 提供了一个更传统的并发模型来处理异步任务。相比CompletableFuture 而言,我们的代码中没有复杂的嵌套,代码更加直观。

对初学者来说,这个模型会更容易理解,特别是对于那些不熟悉函数式编程的读者来说。
当然,作为老手的你(假如你弄懂了上篇文章,并实践完),如果你在使用CompletableFuture 过程中发现它嵌套太深太复杂,CompletionService 可能也是个不错的选择。


基于上述代码抽取CompletionService

我们把关键代码抽取出来并简化,就可以得到下面这段代码:

ExecutorService executor = Executors.newFixedThreadPool(4);
CompletionService<String> completionService = new ExecutorCompletionService<>(executor);long start = System.currentTimeMillis();
// 提交3个任务
completionService.submit(() -> {// 业务返回的实践可能不一样,模拟不一样的任务执行时间Thread.sleep(5000);return "任务1完成";
});
completionService.submit(() -> {// 业务返回的实践可能不一样,模拟不一样的任务执行时间Thread.sleep(3000);return "任务2完成";
});
completionService.submit(() -> {// 业务返回的实践可能不一样,模拟不一样的任务执行时间Thread.sleep(500);return "任务3完成";
});
completionService.submit(() -> {// 业务返回的实践可能不一样,模拟不一样的任务执行时间Thread.sleep(500);return "任务4完成";
});// 获取结果
for (int i = 0; i < 4; i++) {try {Future<String> future = completionService.take();// 如果没完成,这里会阻塞System.out.println(future.get());} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}
}
executor.shutdown();
long end = System.currentTimeMillis();
System.out.println("任务花费时间: " + (end - start) + " ms");

结合文中代码注释,我把它总结为一句口诀:批量提交,快速获取。

批量我知道啊,就是遍历呗,但是提交到那里去?快速获取是什么意思?别急,我们接着往下看。


使用ExecutorService 实现需求

在回答这个问题之前,我们先来看一下代码。我们先sumbit()一下…然后get()拿到数据…
嗯?这不是和之前ExecutorService 差不多吗?好像可以用它实现啊,你看代码:

public List<TravelPackage> searchTravelPackages(SearchCondition searchCondition) throws InterruptedException, ExecutionException {ExecutorService executorService = Executors.newFixedThreadPool(10);List<Flight> flights = searchFlights(searchCondition);List<Future<List<TravelPackage>>> futureList = new ArrayList<>();for (Flight flight : flights) {Future<List<TravelPackage>> future = executorService.submit(() -> {List<TravelPackage> travelPackagesForFlight = new ArrayList<>();List<Hotel> hotels = searchHotels(flight);for (Hotel hotel : hotels) {TravelPackage travelPackage = calculatePrice(flight, hotel);travelPackagesForFlight.add(travelPackage);}return travelPackagesForFlight;});futureList.add(future);}List<TravelPackage> allTravelPackages = new ArrayList<>();for (Future<List<TravelPackage>> future : futureList) {List<TravelPackage> travelPackagesForFlight = future.get();allTravelPackages.addAll(travelPackagesForFlight);}executorService.shutdown();allTravelPackages.sort(Comparator.comparing(TravelPackage::getPrice));return allTravelPackages;
}

看,是不是可以实现了。那CompletionService这玩意存在的意义是啥?我们继续往下看。


提交先后顺序 VS 任务完成快慢顺序

我们先把上面抽取出来的代码执行,结果如下:

任务3完成
任务4完成
任务2完成
任务1完成
任务花费时间: 5012 ms
Disconnected from the target VM, address: '127.0.0.1:10373', transport: 'socket'Process finished with exit code 0

然后,我们换成ExecutorService 执行,抽取的ExecutorService 代码如下:

ExecutorService executor = Executors.newFixedThreadPool(3);
ArrayList<Future<String>> futures = new ArrayList<>();
long start = System.currentTimeMillis();
CountDownLatch latch = new CountDownLatch(4);futures.add(executor.submit(() -> {// 业务返回的实践可能不一样,模拟不一样的任务执行时间Thread.sleep(5000);latch.countDown();return "任务1完成";
}));
futures.add(executor.submit(() -> {// 业务返回的实践可能不一样,模拟不一样的任务执行时间Thread.sleep(3000);latch.countDown();return "任务2完成";
}));
futures.add(executor.submit(() -> {// 业务返回的实践可能不一样,模拟不一样的任务执行时间Thread.sleep(500);latch.countDown();return "任务3完成";
}));
futures.add(executor.submit(() -> {// 业务返回的实践可能不一样,模拟不一样的任务执行时间Thread.sleep(500);latch.countDown();return "任务4完成";
}));for (Future<String> future : futures) {try {// 如果没完成,这里会阻塞System.out.println(future.get());} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}
}
latch.await();
executor.shutdown();
long end = System.currentTimeMillis();
System.out.println("任务花费时间: " + (end - start) + " ms");

执行结果如下:

任务1完成
任务2完成
任务3完成
任务4完成
任务花费时间: 5007 ms
Disconnected from the target VM, address: '127.0.0.1:14882', transport: 'socket'Process finished with exit code 0

细心的你肯定可以看到它们执行结果上的差异。CompletionService 是按照任务时间的顺序消费的。好,搞懂了这个,我们就可以回答上面其中一个问题:

快速获取是什么?

CompletionService是按照任务的快慢,谁先执行完谁就先返回。可以看到上面示例代码的结果,任务3只需要500ms,所以任务3先返回。


CompletionService 的适用场景

既然CompletionService 可以按照任务快慢顺序来返回,我们来看下它适合哪些场景:

执行一组任务并处理结果

上面就是很好的例子,我们可以在任何任务完成后立即获取并处理其结果,以实现快速响应。提高程序的吞吐量(先执行完任务,就有多的线程空闲,可以响应更多任务)。

生产者-消费者模式

我们在最早的开篇说过,CompletionService可以天然地实现生产者-消费者模式。这个模式中,生产者线程负责批量提交任务,消费者线程负责获取并处理任务的结果,而且它也可以安全地在多个线程之间共享


新的问题又出现了,为什么又可以在多个线程之间共享?提交到那里去?快速获取是怎么做到的?以问题为导向,我们来分析下源码。

CompletionService源码分析

提交到那里去?为什么可以在多线程之间共享?

我们先看下构造函数中做了什么:

public ExecutorCompletionService(Executor executor) {if (executor == null)throw new NullPointerException();this.executor = executor;this.aes = (executor instanceof AbstractExecutorService) ?(AbstractExecutorService) executor : null;this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}

ExecutorCompletionService使用了一个BlockingQueue来存储已完成的任务。因为,任务的提交ExecutorBlockingQueue都是线程安全的。所以多线程共享的数据竞争问题已经在内部解决了。

快速获取是怎么做到的?

我们可以看下submit()方法是怎么实现的。当你提交一个任务时,这个任务被封装在一个QueueingFuture对象中:

public Future<V> submit(Callable<V> task) {if (task == null) throw new NullPointerException();RunnableFuture<V> f = newTaskFor(task);executor.execute(new QueueingFuture(f));return f;
}

QueueingFuture重写了done()方法。当任务完成时,done()方法会被调用,QueueingFuture会将自己添加到completionQueue中:

private class QueueingFuture extends FutureTask<Void> {QueueingFuture(RunnableFuture<V> task) {super(task, null);this.task = task;}protected void done() { completionQueue.add(task); } //当任务完成时,将任务添加到队列中private final Future<V> task;
}

这样似乎就可以解释,快速获取的机制。完成的任务优先被放入BlockingQueue中按照完成顺序排队。
现在,我换一种表述,你看下是否正确:快的任务在消费的时候就会被排在队列前面先被消费,这样就形成一个任务完成快慢的顺序,第一个被消费到的任务一定是最快的。


第一个被消费到的任务一定是最快的吗?

从上面的代码测试示例结果来看, 确实如此。但是,我很遗憾的告诉你,这句话是错误的。
这句话的正确性是建立在任务数等于线程数的前提下。这就显得很鸡肋了,在在生产中很难达到这个效果,因为资源是稀缺的。当然,我们还是拿代码说话:

ExecutorService executor = Executors.newFixedThreadPool(3);CompletionService<String> completionService = new ExecutorCompletionService<>(executor);long start = System.currentTimeMillis();completionService.submit(() -> {// 业务返回的实践可能不一样,模拟不一样的任务执行时间Thread.sleep(5000);return "任务1完成";});completionService.submit(() -> {// 业务返回的实践可能不一样,模拟不一样的任务执行时间Thread.sleep(3000);return "任务2完成";});completionService.submit(() -> {// 业务返回的实践可能不一样,模拟不一样的任务执行时间Thread.sleep(6000);return "任务3完成";});completionService.submit(() -> {// 业务返回的实践可能不一样,模拟不一样的任务执行时间Thread.sleep(500);return "任务4完成";});for (int i = 0; i < 4; i++) {try {System.out.println(completionService.take().get());} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}}executor.shutdown();long end = System.currentTimeMillis();System.out.println("任务花费时间: " + (end - start) + " ms");

假如遵循执行快慢顺序,理想的状态应该是:4 -> 2 -> 1 -> 3;而结果却是:

Connected to the target VM, address: '127.0.0.1:5068', transport: 'socket'
任务2完成
任务4完成
任务1完成
任务3完成
任务花费时间: 6020 ms
Disconnected from the target VM, address: '127.0.0.1:5068', transport: 'socket'

这个结果也是意料之外,但在情理之中。因为线程总共只有3个,在1,2,3之间排序,任务顺序应该是2,1,3;然后当2执行完之后,1和3依然未执行完;这个时候4正好执行完。于是就插队到任务中。最终得到2,4,1,3的结果。
因此,我们可以说:在生产环境中,这个顺序是不可控的,除非你把线程设置为1;


CompletionService相关面试题

如何使用CompletionService处理一组任务并获取结果?

比较ExecutorService和CompletionService,它们有什么相同之处和不同之处?

在何种情况下,你会选择使用CompletionService而不是ExecutorService?

解释CompletionService是如何保证按任务完成顺序获取结果的

当一个任务被提交到CompletionService后,它的生命周期是怎样的?在任务执行过程中,CompletionService内部都发生了什么?
在使用CompletionService处理任务时,如果某个任务执行异常,应该如何处理?
如果我想取消CompletionService中的所有任务,应该如何做?
谈谈你对Java中的Executor,ExecutorService,CompletionService和Future之间关系的理解

看完上面的文章,你可以试着来回答了吗?


参考文献

  1. Java并发编程小册

总结

让我们一起回顾今天所学。首先,我引导你使用了CompletionService和ExecutorService来实现了先前复杂的需求。相较于CompletableFuture,它们可能显得更为传统,但也更易理解。然后,我们一起探索了CompletionService的存在意义。我们试图解答,既然ExecutorService已经足够应对需求,为什么还要有CompletionService这样的设计。为了揭示这个疑惑,我们深入到源码中,同时也纠正了一个错误观点,以帮助你对CompletionService有更深刻的理解。最后,我们通过面试题形式,来巩固和复习我们所学的知识。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/11984.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ECMAScript 6语法简介

ECMAScript 6语法简介 1. 块作用域构造let和const1.1 let声明1.2 const声明1.3 全局决定作用域绑定 2. 模板字面量&#xff08;Template literals&#xff09;2.1 多行字符串2.2 字符串占位符字符串占位符允许我们在模板字面量中嵌入变量&#xff0c;以便更灵活地拼接字符串。 …

数据结构:线索二叉树

线索二叉树 通过前面对二叉树的学习&#xff0c;了解到二叉树本身是一种非线性结构&#xff0c;采用任何一种遍历二叉树的方法&#xff0c;都可以得到树中所有结点的一个线性序列。在这个序列中&#xff0c;除第一个结点外&#xff0c;每个结点都有自己的直接前趋&#xff1b;…

记录Selenium自动化测试过程中接口的调用信息

上一篇博客&#xff0c;我写了python自动化框架的一些知识和粗浅的看法&#xff0c;在上一篇中我也给自己提出一个需求&#xff1a;如果记录在测试过程中接口的调用情况&#xff1f;提出这个需求&#xff0c;我觉得是有意义的。你在测试过程中肯定会遇到一些莫名其妙的问题&…

【数据结构与算法】排序算法(选择排序,冒泡排序,插入排序,希尔排序)

基本概念这了就不浪费时间解释了&#xff0c;这四种都是很简单的排序方式&#xff0c;本专栏后续文章会出归并排序&#xff0c;计数排序&#xff0c;快速排序&#xff0c;堆排序&#xff0c;桶排序等排序算法&#xff0c;今天这篇文章中给出选择排序&#xff0c;冒泡排序&#…

【JAVA】 String 类简述笔记

个人主页&#xff1a;【&#x1f60a;个人主页】 系列专栏&#xff1a;【❤️初识JAVA】 文章目录 前言String类创建一个String类 常用方法字符串长度 length() 方法连接字符串 concat() 方法创建格式化字符串 format()功能 前言 string是C、java、VB等编程语言中的字符串&…

C语言假期作业 DAY 08

选择题 1、如下程序的运行结果是&#xff08; &#xff09; char c[5]{a, b, \0, c, \0}; printf("%s", c); A: a b B: ab\0c\0 C: ab c D: ab 答案解析 正确答案&#xff1a; D 字符串的结束标志是 \0 &#xff0c;而 \0 的 ASCII 值是 0 &#xff0c;而 c[2] 被初…

行星碰撞(力扣)栈 JAVA

给定一个整数数组 asteroids&#xff0c;表示在同一行的行星。 对于数组中的每一个元素&#xff0c;其绝对值表示行星的大小&#xff0c;正负表示行星的移动方向&#xff08;正表示向右移动&#xff0c;负表示向左移动&#xff09;。每一颗行星以相同的速度移动。 找出碰撞后剩…

[SQL挖掘机] - 窗口函数介绍

介绍: 窗口函数也称为 OLAP 函数。OLAP 是 OnLine AnalyticalProcessing 的简称&#xff0c;意思是对数据库数据进行实时分析处理。窗口函数是一种用于执行聚合计算和排序操作的功能强大的sql函数。它们可以在查询结果集中创建一个窗口&#xff08;window&#xff09;&#xf…

unity进阶--xml的使用学习笔记

文章目录 xml实例解析方法一解析方法二 xml-path创建xml文档 xml实例 解析方法一 解析方法二 xml-path 创建xml文档

C++数据结构笔记(11)二叉树的#号创建法及计算叶子节点数

首先分享一段计算叶子节点数目的代码&#xff0c;如下图&#xff1a; 不难发现&#xff0c;上面的二叉树叶子节点数目为4。我们可以采用递归的方式&#xff0c;每当一个结点既没有左结点又没有右节点时&#xff0c;即可算为一个叶子结点。 int num0; //全局变量&#xff0c;代…

MyBatis-入门-快速入门程序

本次使用MyBatis框架是基于SpringBoot框架进行的&#xff0c;在IDEA中创建一个SpringBBot工程&#xff0c;根据自己的需求选择对应的依赖即可 快速入门 需求&#xff1a;使用MyBatis查询所有用户数据步骤&#xff1a; 准备工作&#xff08;创建Spring Boot工程、数据库user表…

【误差自适应跟踪方法AUV】自适应跟踪(EAT)方法研究(Matlab代码Simulin实现)

目录 &#x1f4a5;1 概述 &#x1f4da;2 运行结果 &#x1f389;3 参考文献 &#x1f308;4 Matlab代码、Simulink模型、文献 &#x1f4a5;1 概述 摘要&#xff1a;跟踪问题&#xff08;即如何遵循先前记忆的路径&#xff09;是移动机器人中最重要的问题之一。根据机器人状…

机器学习深度学习——线性回归的从零开始实现

虽然现在的深度学习框架几乎可以自动化实现下面的工作&#xff0c;但从零开始实现可以更了解工作原理&#xff0c;方便我们自定义模型、自定义层或自定义损失函数。 import random import torch from d2l import torch as d2l线性回归的从零开始实现 生成数据集读取数据集初始…

repvit 测试

目录 依赖项:timm库。 cuda版1060显卡运行时间 14ms左右 高通不支持gelu激活函数 需要的 SqueezeExcite代码,不是SqueezeExcite_o

windows默认编码格式修改

1.命令提示符界面输入 chcp 936 对应 GBK 65001 对应 UTF-8 2.临时更改编码格式 chcp 936(或65001) 3.永久更改编码格式 依次开控制面板->时钟和区域->区域->管理->更改系统区域设置&#xff0c;然后按下图所示&#xff0c;勾选使用UTF-8语言支持。然后重启电脑。此…

防止连点..

1.连点js文件 let timer; letflag /*** 节流原理&#xff1a;在一定时间内&#xff0c;只能触发一次** param {Function} func 要执行的回调函数* param {Number} wait 延时的时间* param {Boolean} immediate 是否立即执行* return null*/ function throttle(func, wait 500…

【数字IC基础】竞争与冒险

竞争-冒险 1. 基本概念2. 冒险的分类3. 静态冒险产生的判断4. 毛刺的消除使用同步电路使用格雷码增加滤波电容增加冗余项&#xff0c;消除逻辑冒险引入选通脉冲 1. 基本概念 示例一&#xff1a; 如上图所示的这个电路&#xff0c;使用了两个逻辑门&#xff0c;一个非门和一个与…

【javascript】refreshTime对象包括年月日时分秒;把一个时间戳转化为refreshTime对象并分别赋值

要将一个时间戳转换为包含年月日时分秒的 refreshTime 对象&#xff0c;您可以使用 JavaScript 的 Date 对象和其相应的方法。 以下是一个示例代码&#xff1a; function convertTimestampToRefreshTime(timestamp) {const date new Date(timestamp);const refreshTime {ye…

Windows 找不到文件‘chrome‘。请确定文件名是否正确后,再试一次

爱像时间&#xff0c;永恒不变而又短暂&#xff1b;爱像流水&#xff0c;浩瀚壮阔却又普普通通。 Windows 找不到文件chrome。请确定文件名是否正确后&#xff0c;再试一次 如果 Windows 提示找不到文件 "chrome"&#xff0c;可能是由于以下几种原因导致的&#xff1…

selenium交互代码

一&#xff1a;selenium交互 用selenium打开网页后&#xff0c;也可以做一系列真人的操作&#xff0c;也就是利用selenium和浏览器进行交互&#xff0c;可利用以下几个函数进行操作&#xff1a; input.send_keys() 传递输入内容给某输入框button.click() 点击某按钮browser.e…