Fork / Join框架是使用并发分治法解决问题的框架。 引入它们是为了补充现有的并发API。 在介绍它们之前,现有的ExecutorService实现是运行异步任务的流行选择,但是当任务同质且独立时,它们会发挥最佳作用。 运行依赖的任务并使用这些实现来组合其结果并不容易。 随着Fork / Join框架的引入,人们试图解决这一缺陷。 在本文中,我们将简要介绍API,并解决几个简单的问题以了解其工作原理。
解决非阻塞任务
让我们直接跳入代码。 让我们创建一个任务,该任务将返回List的所有元素的总和。 以下步骤以伪代码表示我们的算法:
01.查找列表的中间索引
02.在中间划分列表
03.递归创建一个新任务,该任务将计算剩余部分的总和
04.递归创建一个新任务,该任务将计算正确部分的总和
05.将左总和,中间元素和右总和的结果相加
这是代码–
@Slf4j
public class ListSummer extends RecursiveTask<Integer> {private final List<Integer> listToSum;ListSummer(List<Integer> listToSum) {this.listToSum = listToSum;}@Overrideprotected Integer compute() {if (listToSum.isEmpty()) {log.info("Found empty list, sum is 0");return 0;}int middleIndex = listToSum.size() / 2;log.info("List {}, middle Index: {}", listToSum, middleIndex);List<Integer> leftSublist = listToSum.subList(0, middleIndex);List<Integer> rightSublist = listToSum.subList(middleIndex + 1, listToSum.size());ListSummer leftSummer = new ListSummer(leftSublist);ListSummer rightSummer = new ListSummer(rightSublist);leftSummer.fork();rightSummer.fork();Integer leftSum = leftSummer.join();Integer rightSum = rightSummer.join();int total = leftSum + listToSum.get(middleIndex) + rightSum;log.info("Left sum is {}, right sum is {}, total is {}", leftSum, rightSum, total);return total;}
}
首先,我们扩展了ForkJoinTask的RecursiveTask子类型。 这是我们期望并发任务返回结果时的扩展类型。 当任务不返回结果而仅执行效果时,我们扩展RecursiveAction子类型。 对于我们解决的大多数实际任务,这两个子类型就足够了。
其次,RecursiveTask和RecursiveAction都定义了一种抽象计算方法。 这是我们进行计算的地方。
第三,在我们的计算方法内部,我们检查通过构造函数传递的列表的大小。 如果为空,则我们已经知道总和的结果为零,然后我们立即返回。 否则,我们将列表分为两个子列表,并创建ListSummer类型的两个实例。 然后,我们在这两个实例上调用fork()方法(在ForkJoinTask中定义)–
leftSummer.fork();
rightSummer.fork();
导致将这些任务安排为异步执行的原因,稍后将在本文中解释用于此目的的确切机制。
之后,我们调用join()方法(也在ForkJoinTask中定义)以等待这两部分的结果
Integer leftSum = leftSummer.join();
Integer rightSum = rightSummer.join();
然后将其与列表的中间元素相加以获得最终结果。
添加了许多日志消息,以使示例更易于理解。 但是,当我们处理包含数千个条目的列表时,拥有详细的日志记录(尤其是记录整个列表)可能不是一个好主意。
就是这样。 现在为测试运行创建一个测试类–
public class ListSummerTest {@Testpublic void shouldSumEmptyList() {ListSummer summer = new ListSummer(List.of());ForkJoinPool forkJoinPool = new ForkJoinPool();forkJoinPool.submit(summer);int result = summer.join();assertThat(result).isZero();}@Testpublic void shouldSumListWithOneElement() {ListSummer summer = new ListSummer(List.of(5));ForkJoinPool forkJoinPool = new ForkJoinPool();forkJoinPool.submit(summer);int result = summer.join();assertThat(result).isEqualTo(5);}@Testpublic void shouldSumListWithMultipleElements() {ListSummer summer = new ListSummer(List.of(1, 2, 3, 4, 5, 6, 7, 8, 9));ForkJoinPool forkJoinPool = new ForkJoinPool();forkJoinPool.submit(summer);int result = summer.join();assertThat(result).isEqualTo(45);}
}
在测试中,我们创建一个ForkJoinPool的实例。 ForkJoinPool是用于运行ForkJoinTasks的唯一ExecutorService实现。 它采用一种称为工作窃取算法的特殊算法。 与其他ExecutorService实现相反,在该实现中,只有一个队列包含要执行的所有任务,在工作窃取实现中,每个工作线程都获得其工作队列。 每个线程都从其队列开始执行任务。
当我们检测到ForkJoinTask可以分解为多个较小的子任务时,便将它们分解为较小的任务,然后在这些任务上调用fork()方法。 该调用导致子任务被推入执行线程的队列中。 在执行期间,当一个线程用尽队列/没有要执行的任务时,它可以从其他线程的队列中“窃取”任务(因此称为“工作窃取”)。 与使用任何其他ExecutorService实现相比,这种窃取行为可以带来更高的吞吐量。
之前,当我们在leftSummer和rightSummer任务实例上调用fork()时,它们被推入执行线程的工作队列中,之后它们被池中的其他活动线程“偷”(依此类推),因为它们确实那时没有其他事情要做。
很酷吧?
解决阻止任务
我们刚才解决的问题本质上是非阻塞的。 如果我们想解决一个阻塞操作的问题,那么为了获得更好的吞吐量,我们将需要改变策略。
让我们用另一个例子来研究一下。 假设我们要创建一个非常简单的网络搜寻器。 该搜寻器将接收HTTP链接列表,执行GET请求以获取响应主体,然后计算响应长度。 这是代码–
@Slf4j
public class ResponseLengthCalculator extends RecursiveTask<Map<String, Integer>> {private final List<String> links;ResponseLengthCalculator(List<String> links) {this.links = links;}@Overrideprotected Map<String, Integer> compute() {if (links.isEmpty()) {log.info("No more links to fetch");return Collections.emptyMap();}int middle = links.size() / 2;log.info("Middle index: {}", links, middle);ResponseLengthCalculator leftPartition = new ResponseLengthCalculator(links.subList(0, middle));ResponseLengthCalculator rightPartition = new ResponseLengthCalculator(links.subList(middle + 1, links.size()));log.info("Forking left partition");leftPartition.fork();log.info("Left partition forked, now forking right partition");rightPartition.fork();log.info("Right partition forked");String middleLink = links.get(middle);HttpRequester httpRequester = new HttpRequester(middleLink);String response;try {log.info("Calling managedBlock for {}", middleLink);ForkJoinPool.managedBlock(httpRequester);response = httpRequester.response;} catch (InterruptedException ex) {log.error("Error occurred while trying to implement blocking link fetcher", ex);response = "";}Map<String, Integer> responseMap = new HashMap<>(links.size());Map<String, Integer> leftLinks = leftPartition.join();responseMap.putAll(leftLinks);responseMap.put(middleLink, response.length());Map<String, Integer> rightLinks = rightPartition.join();responseMap.putAll(rightLinks);log.info("Left map {}, middle length {}, right map {}", leftLinks, response.length(), rightLinks);return responseMap;}private static class HttpRequester implements ForkJoinPool.ManagedBlocker {private final String link;private String response;private HttpRequester(String link) {this.link = link;}@Overridepublic boolean block() {HttpGet headRequest = new HttpGet(link);CloseableHttpClient client = HttpClientBuilder.create().disableRedirectHandling().build();try {log.info("Executing blocking request for {}", link);CloseableHttpResponse response = client.execute(headRequest);log.info("HTTP request for link {} has been executed", link);this.response = EntityUtils.toString(response.getEntity());} catch (IOException e) {log.error("Error while trying to fetch response from link {}: {}", link, e.getMessage());this.response = "";}return true;}@Overridepublic boolean isReleasable() {return false;}}
}
我们创建ForkJoinPool.ManagedBlocker的实现,在其中放置阻塞的HTTP调用。 该接口定义了两个方法– block()和isReleasable() 。 block()方法是我们进行阻塞调用的地方。 在完成阻塞操作之后,我们返回true,指示不再需要进一步的阻塞。 我们从isReleasable()实现中返回false,以向fork-join工作线程指示block()方法实现本质上可能在阻塞。 isReleasable()实现将在调用block()方法之前先由fork-join工作线程调用。 最后,我们通过调用ForkJoinPool.managedBlock()静态方法将HttpRequester实例提交到池中。 之后,我们的阻止任务将开始执行。 当它阻塞HTTP请求时,如果有必要,ForkJoinPool.managedBlock()方法还将安排激活备用线程,以确保足够的并行性。
那么,让我们将此实现用于测试驱动! 这是代码–
public class ResponseLengthCalculatorTest {@Testpublic void shouldReturnEmptyMapForEmptyList() {ResponseLengthCalculator responseLengthCalculator = new ResponseLengthCalculator(Collections.emptyList());ForkJoinPool pool = new ForkJoinPool();pool.submit(responseLengthCalculator);Map<String, Integer> result = responseLengthCalculator.join();assertThat(result).isEmpty();}@Testpublic void shouldHandle200Ok() {ResponseLengthCalculator responseLengthCalculator = new ResponseLengthCalculator(List.of("http://httpstat.us/200"));ForkJoinPool pool = new ForkJoinPool();pool.submit(responseLengthCalculator);Map<String, Integer> result = responseLengthCalculator.join();assertThat(result).hasSize(1).containsKeys("http://httpstat.us/200").containsValue(0);}@Testpublic void shouldFetchResponseForDifferentResponseStatus() {ResponseLengthCalculator responseLengthCalculator = new ResponseLengthCalculator(List.of("http://httpstat.us/200","http://httpstat.us/302","http://httpstat.us/404","http://httpstat.us/502"));ForkJoinPool pool = new ForkJoinPool();pool.submit(responseLengthCalculator);Map<String, Integer> result = responseLengthCalculator.join();assertThat(result).hasSize(4);}
}
今天就这样,伙计们! 与往常一样,任何反馈/改进建议/评论都将受到高度赞赏!
此处讨论的所有示例都可以在Github上找到( 特定提交 )。
大呼大叫的http://httpstat.us服务,对于开发简单的测试非常有帮助。
翻译自: https://www.javacodegeeks.com/2019/01/brief-overview-fork-join-framework-java.html