这是一个示例章节,摘自Francesco Marchioni编辑的WildFly上的实用Java EE 7开发 。
本章讨论了新的Java EE并发API(JSR 236) ,它概述了使用一组托管资源在Java EE容器上并行执行任务的标准方法。 为了描述如何在您的应用程序中使用此API,我们将遵循以下路线图:
- 并发实用工具简介
- 如何使用ManagedExecutorService利用异步任务
- 如何使用ManagedScheduledExecutorService在特定时间安排任务
- 如何创建动态代理对象,以添加Java EE环境中可用的上下文信息
- 如何使用ManagedThreadFactory创建托管线程以供您的应用程序使用
并发实用工具概述
在Java EE 7之前,在Java EE容器中执行并发任务是众所周知的危险做法,有时甚至被容器禁止:
“企业bean不得尝试管理线程。 企业bean不得尝试启动,停止,挂起或恢复线程,也不能尝试更改线程的优先级或名称。 企业bean不得尝试管理线程组”
实际上,通过使用J2SE API在Java EE容器中创建自己的非托管线程,并不能保证将容器的上下文传播到执行任务的线程。
唯一可用的模式是使用异步EJB或消息驱动Bean ,以便以异步方式执行任务。 通常,这足以用于简单的触发和遗忘模式,但是对Threads的控制仍然位于Container的手中。
通过Java EE并发API(JSR 236),您可以将java.util.concurrent API的扩展用作托管资源 ,即由Container进行管理。 与标准J2SE编程的唯一区别是,您将从容器的JNDI树中检索托管资源。 但是,您仍将使用属于java.util.concurrent
包的一部分的Runnable接口或类,例如Future
或ScheduledFuture
。
在下一节中,我们将从最简单的示例开始,该示例使用ManagedExecutorService
执行异步任务。
使用ManagedExecutorService提交任务
为了创建我们的第一个异步执行,我们将展示如何使用ManagedExecutorService
,它扩展了Java SE ExecutorService以提供用于提交任务以在Java EE环境中执行的方法。 通过使用此托管服务,容器的上下文将传播到执行任务的线程:ManagedExecutorService包含在应用程序服务器的EE配置中:
<subsystem xmlns="urn:jboss:domain:ee:2.0">. . .<concurrent>. . . .<managed-executor-services><managed-executor-service name="default"jndi-name="java:jboss/ee/concurrency/executor/default"context-service="default" hung-task-threshold="60000"core-threads="5" max-threads="25" keepalive-time="5000"/></managed-executor-services>. . . .</concurrent></subsystem>
为了创建我们的第一个示例,我们从容器的JNDI上下文中检索ManagedExecutorService,如下所示:
@Resource(name = "DefaultManagedExecutorService")ManagedExecutorService executor;
通过使用ManagedExecutorService实例,您可以提交可以实现java.lang.Runnable
接口或java.util.concurrent.Callable
接口的任务。
Callable接口提供了一种call()
方法,该方法可以返回任何泛型类型,而不是使用run()
方法。
编写一个简单的异步任务
因此,让我们看一个简单的Servlet示例,该示例使用ManagedExecutorService触发异步任务:
@WebServlet("/ExecutorServlet")public class ExecutorServlet extends HttpServlet {@Resource(name = "DefaultManagedExecutorService")ManagedExecutorService executor;protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { PrintWriter writer = response.getWriter(); executor.execute(new SimpleTask()); writer.write("Task SimpleTask executed! check logs"); }}
在我们的示例中,类SimpleTask
通过提供并发执行来实现Runnable
接口。
public class SimpleTask implements Runnable {@Overridepublic void run() {System.out.println("Thread started.");}}
从异步任务中检索结果
上述任务是脚踏实地的好选择; 您可能已经注意到,无法拦截Task的返回值。 另外,在使用Runnable时,您必须使用不受限制的异常(如果run( )抛出了一个已检查的异常,谁会捕获它呢?)您无法将run()调用封装在处理程序中,因为您没有编写调用它的代码)。
如果你想克服这个限制,那么你可以实现一个java.util.concurrent.Callable
接口相反,它提交给ExecutorService的,并与等待结果FutureTask.isDone()
的返回ExecutorService.submit()
让我们看一下Servlet的新版本,它捕获了一个名为CallableTask
的Task的结果:
@WebServlet("/CallableExecutorServlet")public class CallableExecutorServlet extends HttpServlet {@Resource(name = "DefaultManagedExecutorService")ManagedExecutorService executor;protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {PrintWriter writer = response.getWriter();Future<Long> futureResult = executor.submit(new CallableTask(5)); while (!futureResult.isDone()) {// Waittry {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}}try {writer.write("Callable Task returned " +futureResult.get());} catch ( Exception e) {e.printStackTrace();} }}
从代码中可以看到,我们正在使用isDone( )方法轮询任务完成情况。 任务完成后,我们可以调用FutureTask的get( )方法并获取返回值。
现在让我们看一下我们的CallableTask
实现,在我们的示例中,该实现返回一个数字总和的值:
public class CallableTask implements Callable<Long> {private int id;public CallableTask(int id) {this.id = id;}public Long call() {long summation = 0;for (int i = 1; i <= id; i++) {summation += i;}return new Long(summation);}}
在我们的示例中,我们要做的就是实现call
方法,该方法返回Integer,该Integer最终将通过Future接口的get
方法来收集。
如果您的Callable任务引发了Exception,则FutureTask.get()
也将引发Exception,并且可以使用Exception.getCause()
来访问原始Exception。
监视未来任务的状态
在上面的示例中,我们正在使用FutureTask.isDone()
方法检查Future Task的状态。 如果需要对Future Task生命周期进行更准确的控制,则可以实现javax.enterprise.concurrent.ManagedTaskListener
实例,以接收生命周期事件通知。
这是我们增强的Task,它实现了taskSubmitting
, taskStarting
, taskDone
和taskAborted
方法:
public class CallableListenerTask implements Callable<Long>,ManagedTaskListener {private int id;public CallableListenerTask(int id) {this.id = id;}public Long call() {long summation = 0;for (int i = 1; i <= id; i++) {summation += i;}return new Long(summation);}public void taskSubmitted(Future<?> f, ManagedExecutorService es,Object obj) {System.out.println("Task Submitted! "+f);}public void taskDone(Future<?> f, ManagedExecutorService es, Object obj,Throwable exc) {System.out.println("Task DONE! "+f);}public void taskStarting(Future<?> f, ManagedExecutorService es,Object obj) {System.out.println("Task Starting! "+f);}public void taskAborted(Future<?> f, ManagedExecutorService es,Object obj, Throwable exc) {System.out.println("Task Aborted! "+f);}}
生命周期通知按以下顺序调用:
-
taskSubmitting
:关于将任务提交给执行者 -
taskStarting
:在实际启动任务之前 -
taskDone
:在任务完成时触发 -
taskAborted
:当用户调用futureResult.cancel()时触发
在异步任务中使用事务
在分布式Java EE环境中,要确保并发任务执行也能正确执行事务,这是一项艰巨的任务。 Java EE并发API依靠Java事务API(JTA)通过javax.transaction.UserTransaction
来支持其组件顶部的事务,该javax.transaction.UserTransaction
用于显式划分事务边界。
以下代码显示可调用任务如何从JNDI树检索UserTransaction,然后启动并提交与外部组件(EJB)的事务:
public class TxCallableTask implements Callable<Long> {long id;public TxCallableTask(long i) {this.id = i;}public Long call() {long value = 0;UserTransaction tx = lookupUserTransaction();SimpleEJB ejb = lookupEJB();try {tx.begin();value = ejb.calculate(id); // Do Transactions heretx.commit();} catch (Exception e) {e.printStackTrace();try { tx.rollback(); } catch (Exception e1) { e1.printStackTrace(); }}return value;}// Lookup EJB and UserTransaction here ..}
这种方法的主要局限性在于,尽管上下文对象可以开始,提交或回滚事务,但是这些对象无法加入父组件事务。
使用ManagedScheduledExecutorService安排任务
ManagedScheduledExecutorService
扩展了Java SE ScheduledExecutorService
以提供用于提交延迟或定期任务以在Java EE环境中执行的方法。 至于其他托管对象,您可以通过JNDI查找获得ExecutorService的实例:
@Resource(name ="DefaultManagedScheduledExecutorService")
ManagedScheduledExecutorService scheduledExecutor;
一旦有了对ExecutorService的引用,便可以在其上调用schedule
方法以提交延迟或定期的任务。 就像ManagedExecutors一样,ScheduledExecutors也可以绑定到Runnable接口或Callable
接口。 下一节将介绍这两种方法。
提交一个简单的ScheduledTask
以最简单的形式提交计划任务需要设置计划表达式并将其传递给ManagedSchedulerExecutor服务。 在此示例中,由于调用了schedule( )方法,我们将创建一个延迟的任务,该任务仅在10秒内运行一次:
@WebServlet("/ScheduledExecutor")
public class ScheduledExecutor extends HttpServlet {@Resource(name ="DefaultManagedScheduledExecutorService")ManagedScheduledExecutorService scheduledExecutor;protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {PrintWriter writer = response.getWriter(); ScheduledFuture<?> futureResult = scheduledExecutor.schedule(new SimpleTask(), 10,TimeUnit.SECONDS);writer.write("Waiting 10 seconds before firing the task");}}
如果需要重复计划任务,则可以使用scheduleAtFixedRate
方法,该方法将触发任务之前的时间,每次重复执行之前的时间和TimeUnit作为输入。 请参阅以下示例,该示例在初始延迟1秒后每10秒秒调度一次任务:
ScheduledFuture<?> futureResult = scheduledExecutor. scheduleAtFixedRate (new SimpleTask(),1, 10,TimeUnit.SECONDS);
捕获计划执行的结果
如果需要从计划执行的任务中获取返回值,则可以使用schedule方法返回的ScheduledFuture
接口。 这是一个示例,它捕获了我们先前编码的阶乘示例Task的结果:
ScheduledFuture<Long> futureResult =scheduledExecutor.schedule(new CallableTask(5), 5, TimeUnit.SECONDS); while (!futureResult.isDone()) { try {Thread.sleep(100); // Wait} catch (InterruptedException e) { e.printStackTrace();}} try {writer.write("Callable Task returned " +futureResult.get());} catch ( Exception e) {e.printStackTrace();}
使用ManagedThreadFactory创建托管线程
javax.enterprise.concurrent.ManagedThreadFactory
等效于J2SE ThreadFactory,可用于创建自己的线程。 为了使用ManagedThreadFactory,您需要照常从JNDI注入它:
@Resource(name ="DefaultManagedThreadFactory")ManagedThreadFactory factory;
从工厂创建自己的托管线程(与ManagedExecutorService创建的托管线程相比)的主要优点是,您可以设置一些典型的线程属性(例如名称或优先级),并且可以创建J2SE Executor服务的托管版本。 以下示例将向您展示如何。
从工厂创建托管线程
在此示例中,我们将使用DefaultManagedThreadFactory
创建并启动新的线程。 从代码中可以看到,一旦我们创建了Thread类的实例,就可以为其设置有意义的名称并将其与优先级相关联。 然后,我们将线程与我们的SimpleTask关联,后者在控制台上记录一些数据:
@WebServlet("/FactoryExecutorServlet")public class FactoryExecutorServlet extends HttpServlet {@Resource(name ="DefaultManagedThreadFactory")ManagedThreadFactory factory;protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {PrintWriter writer = response.getWriter();Thread thread = factory.newThread(new SimpleTask());thread.setName("My Managed Thread");thread.setPriority(Thread.MAX_PRIORITY);thread.start();writer.write("Thread started. Check logs");}}
现在检查服务器日志:毫无疑问,检测自己创建的线程的输出会更容易:
14:44:31,838 INFO [stdout] (My Managed Thread) Simple Task started
在分析线程转储时,收集有关线程名称的信息特别有用,并且线程名称是跟踪线程执行路径的唯一线索。
使用托管执行器服务
java.util.concurrent.ExecutorService
接口是一种标准的J2SE机制,已极大地取代了使用直接线程来执行异步执行的机制。 与标准Thread机制相比,ExecutorService的主要优点之一是您可以定义一个实例池来执行您的作业,并且可以使用一种更安全的方式来中断您的作业。
在企业应用程序中使用ExecutorService很简单:只需将Managed ThreadFactory
的实例传递给ExecutorService
的构造函数即可。 在以下示例中,我们使用SingletonEJB在其方法getThreadPoolExecutor
中将ExecutorService作为服务提供:
@Singletonpublic class PoolExecutorEJB {private ExecutorService threadPoolExecutor = null;int corePoolSize = 5;int maxPoolSize = 10;long keepAliveTime = 5000;@Resource(name = "DefaultManagedThreadFactory")ManagedThreadFactory factory;public ExecutorService getThreadPoolExecutor() {return threadPoolExecutor;}@PostConstructpublic void init() { threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(10), factory); }@PreDestroypublic void releaseResources() {threadPoolExecutor.shutdown(); }}
ThreadPoolExecutor在其构造函数中包含两个核心参数: corePoolSize
和maximumPoolSize
。 当在方法中提交新任务且运行的线程数少于corePoolSize时,即使其他工作线程处于空闲状态,也会创建一个新线程来处理请求。 如果运行的线程数大于corePoolSize但小于maximumPoolSize,则仅在队列已满时才创建新线程。
然后,如以下示例所示, ExecutorService
用于启动新的异步任务,其中在Servlet中提供了Runnable的匿名实现:
@WebServlet("/FactoryExecutorServiceServlet")
public class FactoryExecutorServiceServlet extends HttpServlet {@EJB PoolExecutorEJB ejb;protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {final PrintWriter writer = response.getWriter();writer.write("Invoking ExecutorService. Check Logs.");ExecutorService executorService = ejb.getThreadPoolExecutor();executorService.execute(new Runnable() {public void run() {System.out.println("Message from your Executor!");}});}}
一旦PoolExecutorEJB
终止,ExecutorService也将在Singleton Bean的@PreDestroy
方法中完成,该方法将调用ThreadPoolExecutor的shutdown()
方法。 ExecutorService不会立即关闭,但是将不再接受新任务,并且一旦所有线程都完成了当前任务,ExecutorService就会关闭。
使用动态上下文对象
动态代理是一种有用的Java调整,可用于使用java.lang.reflect.Proxy
API创建接口的动态实现。 您可以将动态代理用于各种不同的目的,例如数据库连接和事务管理,用于单元测试的动态模拟对象以及其他类似于AOP的方法拦截目的。
在Java EE环境中,可以使用一种称为动态上下文代理的特殊类型的动态代理 。
动态上下文对象最有趣的功能是将JNDI命名上下文,类加载器和安全性上下文传播 到代理对象 。 在将J2SE实施引入企业应用程序并希望在容器的上下文中运行它们的情况下,这很有用。
以下代码段显示了如何将上下文对象注入到容器中。 由于上下文对象还需要您可以向其提交任务的ExecutorService,因此还会注入ThreadFactory:
@Resource(name ="DefaultContextService")ContextService cs;@Resource(name ="DefaultManagedThreadFactory")ManagedThreadFactory factory;
在下一节中,我们将展示如何使用修订版的Singleton EJB创建动态上下文对象。
执行上下文任务
以下示例显示了如何为Callable
任务触发上下文代理。 为此,我们将同时需要ManagedThreadfactory和ContextService。 我们的ContextExecutor EJB最初将在其init
方法中创建ThreadPoolExecutor。 然后,在Submit方法内,创建可调用任务的新上下文代理,并将其提交给ThreadPool执行器。
这是我们的ContextExecutorEJB
的代码:
@Singletonpublic class ContextExecutorEJB {private ExecutorService threadPoolExecutor = null;@Resource(name = "DefaultManagedThreadFactory")ManagedThreadFactory factory;@Resource(name = "DefaultContextService")ContextService cs;public ExecutorService getThreadPoolExecutor() {return threadPoolExecutor;}@PostConstructpublic void init() {threadPoolExecutor = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS,new ArrayBlockingQueue>Runnable>(10), factory);}public Future>Long> submitJob(Callable>Long> task) {Callable>Long> proxy = cs.createContextualProxy(task, Callable.class);return getThreadPoolExecutor().submit(proxy);}}
CallableTask类比我们的第一个示例复杂一点,因为它将记录有关javax.security.auth.Subject
信息,该信息包含在调用者线程中:
public class CallableTask implements Callable<Long> {private int id;public CallableTask(int id) {this.id = id;}public Long call() {long summation = 0;// Do calculationSubject subject = Subject.getSubject(AccessController.getContext());logInfo(subject, summation); // Log Traces Subject identityreturn new Long(summation);}private void logInfo(Subject subject, long summation) { . . }}
以下是向我们的SingletonEJB提交新的上下文任务的简单方法:
@EJB ContextExecutorEJB ejb; protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { CallableTask task = new CallableTask(5);ejb.submitJob(task);}
建立你的例子
为了对Java EE API使用并发实用程序,您在应用程序中需要以下Maven依赖项:
<dependency><groupId>org.jboss.spec.javax.enterprise.concurrent</groupId><artifactId>jboss-concurrency-api_1.0_spec</artifactId><version>1.0.0.Final</version></dependency>
此摘录摘自《 WildFly上的实用Java EE 7开发 》一书,该手册是动手实践指南,其中介绍了最新WildFly应用程序服务器上Java EE 7开发的所有领域。 涵盖了从基础组件(EJB,Servlet,CDI,JPA)到Java Enterprise Edition 7中定义的新技术堆栈的所有内容,因此包括新的Batch API,JSON-P Api,并发API,Web套接字,JMS 2.0 API,核心Web服务堆栈(JAX-WS,JAX-RS)。 带有Arquillian框架和Security API的测试区域完成了本书中讨论的主题列表。
翻译自: https://www.javacodegeeks.com/2014/07/java-ee-concurrency-api-tutorial.html