工作窃取(Work-Stealing)是什么?
工作窃取是一种并行任务调度算法,用于最大化 CPU 资源利用率,特别适合任务分解递归式的并发场景。其核心思想是:当某个线程完成了自己分配的任务后,如果其他线程仍然有未完成的任务,该线程会从其他线程的任务队列中“窃取”任务执行,避免线程处于空闲状态。
底层原理
工作窃取的关键机制依赖于双端队列(Deque)。每个线程都有一个任务队列,遵循以下原则:
-
任务分解与执行:
- 每个线程以栈模式(LIFO)从队列的尾部取任务执行。任务可以递归分解成更小的子任务。
- 当线程空闲时,它从其他线程的任务队列的头部窃取任务,以队列模式(FIFO)执行被窃取的任务。
-
双端队列的使用:
- 尾部(栈顶)执行:线程优先执行自己分解的任务,保证局部性。
- 头部(队列头)窃取:当线程空闲时,其他线程可以从双端队列的头部取任务,避免争用资源。
-
高效并行性:由于每个线程主要处理自己的任务队列,只有在窃取任务时才会涉及竞争,因此减少了线程间的锁竞争,提高了并发效率。
工作窃取的运行结果
- 任务均衡性:工作窃取确保所有线程都能忙碌工作,最大限度利用系统资源。
- 负载动态分配:由于任务是动态地被其他线程窃取,系统能够根据负载情况自适应调整任务分布。
- 高吞吐量:工作窃取的机制避免了因为某些线程空闲而导致的资源浪费,提升了整体系统吞吐量。
实现工作窃取的代码示例
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.ForkJoinPool;class WorkStealingTask extends RecursiveTask<Integer> {private int start;private int end;private static final int THRESHOLD = 10; // 任务分解的阈值public WorkStealingTask(int start, int end) {this.start = start;this.end = end;}@Overrideprotected Integer compute() {// 如果任务规模小于阈值,直接计算if ((end - start) <= THRESHOLD) {int sum = 0;for (int i = start; i <= end; i++) {sum += i;}System.out.println(Thread.currentThread().getName() + " calculated sum: " + sum);return sum;} else {// 分裂任务int middle = (start + end) / 2;WorkStealingTask leftTask = new WorkStealingTask(start, middle);WorkStealingTask rightTask = new WorkStealingTask(middle + 1, end);// 异步执行子任务leftTask.fork();int rightResult = rightTask.compute();int leftResult = leftTask.join(); // 等待leftTask完成return leftResult + rightResult; // 返回结果}}
}public class WorkStealingExample {public static void main(String[] args) {ForkJoinPool pool = new ForkJoinPool(); // 创建ForkJoinPoolWorkStealingTask task = new WorkStealingTask(1, 100); // 创建任务int result = pool.invoke(task); // 提交任务并获取结果System.out.println("Final Result: " + result);}
}
运行结果
ForkJoinPool.commonPool-worker-1 calculated sum: 55
ForkJoinPool.commonPool-worker-3 calculated sum: 100
ForkJoinPool.commonPool-worker-5 calculated sum: 55
ForkJoinPool.commonPool-worker-7 calculated sum: 45
Final Result: 5050
代码解释
-
任务类 WorkStealingTask:
- 继承了
RecursiveTask<Integer>
,适用于需要返回结果的任务。 - 任务被递归地拆分为更小的子任务,如果任务的大小小于某个阈值(THRESHOLD),则直接进行计算。
- 对于较大的任务,会将其拆分为左右两个子任务,分别执行,并合并最终的结果。
- 继承了
-
ForkJoinPool:
- 创建
ForkJoinPool
用于执行分解的任务。每个线程都有自己的任务队列,当线程空闲时,它会尝试从其他线程的队列中窃取任务执行。
- 创建
-
工作窃取机制展示:
- 当任务较大时,某些线程可能会执行较长时间的任务,而其他空闲线程会从队列中窃取任务执行,以加速任务完成。
-
输出结果:
- 多个线程同时执行各自的任务,最后汇总计算出的结果。
- 最终输出的结果是 1 到 100 的总和,即
5050
。
总结
工作窃取是一种高效的任务调度机制,通过让空闲线程主动去“窃取”其他繁忙线程的任务,能够大幅提升多线程系统中的并发性能。这种机制被广泛应用于 ForkJoinPool 中,实现了递归任务分解和合并,保证了线程的最大化利用。