有时有必要对线程池中的任务施加一定的顺序。 JavaSpecialists通讯的第206期提出了一种这样的情况:我们有多个连接,使用NIO可以从中读取。 我们需要确保给定连接中的事件按顺序执行,但是不同连接之间的事件可以自由混合。
我想提出一个类似但略有不同的情况:我们有N个客户。 我们希望按照提交顺序执行给定客户端的事件,但是来自不同客户端的事件可以自由混合。 另外,有时还会有涉及多个客户端的“汇总”任务。 此类任务应阻止所有相关客户端的任务(但不能阻止更多任务!)。 让我们看一下情况图:
如您所见,来自客户端A和客户端B的任务被并行地愉快地处理,直到出现“汇总”任务。 到那时,不能再处理类型A或B的任务,但是可以执行无关的任务C(前提是有足够的线程)。 我的存储库中提供了这种执行程序的框架。 核心是以下界面:
public interface OrderedTask extends Runnable {boolean isCompatible(OrderedTask that);
}
使用此接口, A.isCompatible(B) && B.isComaptible(A)
确定两个任务是否可以并行运行(如果A.isCompatible(B) && B.isComaptible(A)
则A和B可以并行运行)。 这些方法应以快速,非锁定和时不变的方式实现。
该线程池背后的算法如下:
- 如果要添加的任务与任何现有任务不冲突,请将其添加到元素最少的线程中。
- 如果它与来自一个线程的元素冲突,则安排它在该线程上执行(并隐式地在冲突元素之后执行,以确保提交顺序得以维持)
- 如果它与多个线程冲突,则在第一个线程上等待任务的第一个线程之外的所有任务上添加任务(下面用红色显示),然后在该任务上执行原始任务。
有关实现的更多信息:
- 该代码仅是概念验证,还需要更多代码才能使其具有生产质量(它需要代码来执行任务中的异常处理,正确关闭等)。
- 为了获得最佳性能,它使用可用的无锁*结构:每个工作线程都有一个关联的ConcurrentLinkedQueue。 为了达到睡眠直到工作可用的语义,使用了额外的信号量**
- 为了能够将新的OrderedTask与当前正在执行的OrderedTask进行比较,请保留其引用的副本。 每当新元素入队时,此副本列表都会更新(这可能会导致内存泄漏,并且如果任务不频繁,则应研究足够的替代方法,例如为弱引用提供额外的计时器)
- 与JavaSpecialists时事通讯中的解决方案相比,这更类似于固定线程池执行器,而时事通讯中的解决方案类似于缓存的线程池执行器。
- 如果(a)任务(大部分)短且(大多数)统一,并且(b)很少(一个或两个)线程提交新任务,则此实现是理想的,因为多个提交是互斥的(但是提交和执行不是“ t)
- 如果在提交“汇总”之后(并且可以在执行之前)立即提交相同类型的任务,则不必要地将它们强制在一个线程上。 如果这成为一个问题,我们可以在汇总任务完成后添加代码重排任务。
尽情享受源代码 ! (也许有一天我会花时间删除所有粗糙的边缘)。
*有点用词不当,因为仍然有锁,仅在较低级别(CPU而不是OS)级别上使用,但这是公认的术语
** –基准测试表明这是性能最高的解决方案。 这是从ThreadPoolExecutor的实现中得到启发的。
参考:在Java Advent Calendar博客上, 确保 JCG合作伙伴 Attila-Mihaly Balazs 执行任务的顺序 。
翻译自: https://www.javacodegeeks.com/2012/12/ensuring-the-order-of-execution-for-tasks.html