必备知识:
三种创建线程的方式
java线程池
CompletionService
是Java并发库中的一个接口,用于简化处理一组异步任务的执行和结果收集。它结合了Executor和BlockingQueue的功能,帮助管理任务的提交和完成。CompletionService的主要实现类是ExecutorCompletionService
。
ExecutorCompletionService
例子:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;public class CompletionServiceExample {public static void main(String[] args) throws InterruptedException, ExecutionException {ExecutorService executorService = Executors.newFixedThreadPool(3);CompletionService<Integer> completionService = new ExecutorCompletionService<>(executorService);// 提交一组任务List<Callable<Integer>> tasks = new ArrayList<>();for (int i = 0; i < 5; i++) {final int index = i;tasks.add(() -> {TimeUnit.SECONDS.sleep(index);return index;});}for (Callable<Integer> task : tasks) {completionService.submit(task);}// 获取任务结果for (int i = 0; i < tasks.size(); i++) {Future<Integer> future = completionService.take();Integer result = future.get();System.out.println("Task completed with result: " + result);}// 关闭线程池executorService.shutdown();}
}
public class ExecutorCompletionService<V> implements CompletionService<V> {private final Executor executor;private final BlockingQueue<Future<V>> completionQueue;public ExecutorCompletionService(Executor executor) {this.executor = executor;this.completionQueue = new LinkedBlockingQueue<>();}public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue) {this.executor = executor;this.completionQueue = completionQueue;}@Overridepublic Future<V> submit(Callable<V> task) {RunnableFuture<V> f = new FutureTask<>(task);executor.execute(new QueueingFuture(f));return f;}@Overridepublic Future<V> submit(Runnable task, V result) {RunnableFuture<V> f = new FutureTask<>(task, result);executor.execute(new QueueingFuture(f));return f;}@Overridepublic Future<V> take() throws InterruptedException {return completionQueue.take();}@Overridepublic Future<V> poll() {return completionQueue.poll();}@Overridepublic Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {return completionQueue.poll(timeout, unit);}private class QueueingFuture extends FutureTask<V> {QueueingFuture(RunnableFuture<V> task) {super(task);}@Overrideprotected void done() {completionQueue.add(this);}}
}