你不会忘记你的根源
执行程序是具有单个执行方法的根接口。 任何实现Runnable接口的东西都可以作为参数传递。 但是,傻傻的执行器不支持Callable。
好消息: ExecutorService接口扩展了Executor,增加了对Callable的支持。 它的实现类是ThreadPoolExecutor。
我要假装ScheduledExecutorService接口及其实现类ScheduledThreadPoolExecutor不存在,因为它们只是在ExecutorService和ThreadPoolExecutor之上添加了调度功能。 但是,当功能强大但无聊的java.util.Timer
不够用,而功能强大的外部调度程序实在太多了时,请记住此类。
如果您不熟悉并发性,或者忘记了Callable和Runnable之间的区别,则可能需要先阅读一点,然后再继续阅读。 虚拟指南在这里
ExecutorService.submit事实:
这三个提交变量:
将来提交(可调用任务)
将来提交(可运行任务)
将来提交(可运行任务,T结果)
- 所述
submit
的的ExecutorService的方法过载并且可以接受一个Callable
或Runnable
。 - 由于Runnable的
run
方法返回void,因此在任务完成时Future.get
总是返回null也就Future.get
了。Future<?> submit(Runnable task)
- 另一个接受
Runnable
和泛型的重载submit
方法将返回您作为第二个参数传入的结果。<T> Future<T> submit(Runnable task, T result)
事实上,开放代码( FutureTask
),你会发现, RunnableAdapter
顶级嵌套类的Executors
只需保存结果,并返回相同的结果run方法完成之后。
static final class RunnableAdapter<T> implements Callable<T> {final Runnable task;final T result;RunnableAdapter(Runnable task, T result) {this.task = task;this.result = result;}public T [More ...] call() {task.run();return result;}
}
RunnableAdapter源
在这两种情况下,如果您想要(应该!)终止程序,而不是执行程序线程阻止该程序并进入繁忙循环 ,则应按以下方式调用shutdown方法:
executorService.shutdown()
关闭事实
您可以想象shutdown
是购物中心的半关门。 不会有新客户进入,但现有客户一旦完成就可以离开购物中心。
重申一下,
-
shutdown
是一种礼貌的方法。 它实际上并不会立即关闭池中的任务。 它只是说不会接受任何新任务。 - 除非您使用
invokeAll
执行任务,否则需要等待所有正在进行的任务完成。 这可以通过调用awaitTermination
方法来实现。 (invokeAll并在帖子底部提交示例) - 当前所有任务完成后,执行程序服务将关闭。
如果您需要一种不礼貌的侵入方法,而该方法不关心当前线程是否已完成其任务,那么shutdownNow是您的理想选择。 但是,不能保证该方法将关闭点上的服务,但这是您必须立即关闭的最接近的方法。
在awaitTermination上,您可以指定超时时间,直到主线程等待池线程完成其任务为止。
ExecutorService executorService=Executors.newFixedThreadPool(10);…future = executorService.submit(getInstanceOfCallable(count,sum));…executorService.shutdown();if (executorService.awaitTermination(10, TimeUnit.SECONDS)){System.out.println('All threads done with their jobs');}
执行者–工厂的家伙
上面的课程都很棒。 但是,例如,您想创建一个单线程执行器,您将编写类似
new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
比较一下
Executors.newSingleThreadExecutor()
所以,你去。 Executors
是仅具有工厂方法的类,用于使用各种常用默认值创建各种形式的executor服务。 请注意,除了很棒的工厂方法外,它没有为表带来任何新功能。
建议您快速查看工厂方法的实现,并检查它是否适合您的需求。
invokeAll和提交
ExecutorService
的invokeAll
方法的All
部分毫不奇怪。 它只是说您需要传递Callable
的Collection。 再次,正如预期的那样,该方法直到所有线程完成其任务后才返回。 因此,对于仅在所有工作完成后才对结果感兴趣的情况, invokeAll
是您的最佳选择。
另一方面, submit
方法在可调用对象被提交给执行者服务之后立即返回。 除非您在Callable
call
方法中什么都不做,否则理想情况下,当submit
方法返回时,工作线程应该正在运行。
以下示例可能对您有用。 这些程序只是尝试找到所有自然数的和,直到100(当然是蛮力)
package me.rerun.incubator;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;public class ExecutorInvokeAll {public void runApp() throws InterruptedException, ExecutionException{//variable to store the sumAtomicInteger sum=new AtomicInteger();//Use our friendly neighbourhood factory method of the Executors.ExecutorService executorService=Executors.newFixedThreadPool(10);List<Callable<AtomicInteger>> callableList=new ArrayList<Callable<AtomicInteger>>();for (int count = 0; count <= 100;count++) {callableList.add(getInstanceOfCallable(count,sum));}//returns only after all tasks are completeList<Future<AtomicInteger>> resultFuture = executorService.invokeAll(callableList);//Prints 5050 all throughfor (Future<AtomicInteger> future : resultFuture) {//Didn't deliberately put a timeout here for the get method. Remember, the invoke All does not return until the task is done.System.out.println("Status of future : " + future.isDone() +". Result of future : "+future.get().get());}executorService.shutdown();// You might as well call a resultFuture.get(0).get().get() and that would give you the same //result since all your worker threads hold reference to the same atomicinteger sum.System.out.println("Final Sum : "+sum); }//Adds count to the sum and returns the reference of the sum as the resultprivate Callable<AtomicInteger> getInstanceOfCallable(final int count, final AtomicInteger sum) {Callable<AtomicInteger> clientPlanCall=new Callable<AtomicInteger>(){public AtomicInteger call() {sum.addAndGet(count);System.out.println("Intermediate sum :"+sum);return sum;}};return clientPlanCall;}public static void main(String[] args) throws ExecutionException {try {new ExecutorInvokeAll().runApp();} catch (InterruptedException e) {e.printStackTrace();}} }
package me.rerun.incubator;import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;public class ExecutorSubmit {public void runApp() throws InterruptedException, ExecutionException{//holder for the total sumAtomicInteger sum=new AtomicInteger();//Use the factory method of ExecutorsExecutorService executorService=Executors.newFixedThreadPool(10);Future<AtomicInteger> future = null;for (int count = 0; count <= 100; count++) {future = executorService.submit(getInstanceOfCallable(count,sum));//prints intermediate sumtry {System.out.println("Status of future : " + future.isDone() +". Result of future : "+future.get(1000, TimeUnit.MILLISECONDS).get());} catch (TimeoutException e) {System.out.println("<IGNORE> Timeout exception for count : "+count);//e.printStackTrace();}//System.out.println("Result of future : "+future.get().get() +".Status of future : " + future.isDone());}executorService.shutdown();if (executorService.awaitTermination(10, TimeUnit.SECONDS)){System.out.println("All threads done with their jobs");}//execSystem.out.println("Final Sum : "+sum);}//Adds count to the sum and returns the reference of the sum as the resultprivate Callable<AtomicInteger> getInstanceOfCallable(final int count, final AtomicInteger sum) {Callable<AtomicInteger> clientPlanCall=new Callable<AtomicInteger>(){public AtomicInteger call() {sum.addAndGet(count);//System.out.println("Intermediate sum :"+sum);return sum;}};return clientPlanCall;}public static void main(String[] args) throws ExecutionException {try {new ExecutorSubmit().runApp();} catch (InterruptedException e) {e.printStackTrace();}} }
进一步阅读:
亚历克斯·米勒的惊人博客
亚历克斯·米勒的并发陷阱
Vogella关于与原始API进行比较的文章
总体上很好地介绍了并发
强烈推荐有关Java并发性的书
祝您编程愉快,别忘了分享!
参考: Rerun.me博客上的JCG合作伙伴 Arun Manivannan的Java并发执行器懒惰开发人员简介 。
翻译自: https://www.javacodegeeks.com/2012/10/a-lazy-developers-introduction-to-java.html