DeferredResult
是一个可能尚未完成的计算的容器,它将在将来提供。 Spring MVC使用它来表示异步计算,并利用Servlet 3.0 AsyncContext
异步请求处理。 简要介绍一下它是如何工作的:
@RequestMapping("/")
@ResponseBody
public DeferredResult<String> square() throws JMSException {final DeferredResult<String> deferredResult = new DeferredResult<>();runInOtherThread(deferredResult);return deferredResult;
}private void runInOtherThread(DeferredResult<String> deferredResult) {//seconds later in other thread...deferredResult.setResult("HTTP response is: 42");
}
通常,一旦离开控制器处理程序方法,请求处理即告完成。 但不能使用DeferredResult
。 Spring MVC(使用Servlet 3.0功能)将继续响应,并保持空闲HTTP连接。 HTTP工作线程不再使用,但HTTP连接仍处于打开状态。 稍后,其他线程将通过为其分配一些值来解析DeferredResult
。 Spring MVC将立即拾取此事件并将响应(在此示例中为“ HTTP响应:42” )发送到浏览器,从而完成请求处理。
您可能会在Future<V>
和DeferredResult
之间看到一些概念上的相似性–它们都代表计算,并且在将来的某个时间可用。 您可能想知道,为什么Spring MVC不允许我们简单地返回Future<V>
而是引入了新的专有抽象? 原因很简单,再次显示出Future<V>
缺陷。 异步处理的全部要点是避免阻塞线程。 标准的java.util.concurrent.Future
不允许在计算完成后注册回调-因此,您要么需要分配一个线程来阻塞直到将来完成,要么使用一个线程来定期轮询多个未来。 但是,后一种选择会消耗更多的CPU并引入延迟。 但是来自番石榴的 出色ListenableFuture<V>
似乎很合适? 的确如此,但是Spring没有依赖于Guava,幸好将这两个API桥接起来非常简单。
但是首先请看一下实现自定义java.util.concurrent.Future<V>
上一部分。 诚然,这并不像人们期望的那么简单。 清理,处理中断,锁定和同步,维护状态。 当我们需要的一切都像接收一条消息并从get()
返回它一样简单时,就会有很多样板。 让我们尝试改造以前的JmsReplyFuture
实现,以实现更强大的ListenableFuture
,以便稍后在Spring MVC中使用它。
ListenableFuture
只是扩展了标准 Future
从而增加了注册回调(侦听器)的可能性。 因此,一个急切的开发人员只需坐下来,然后将Runnable
侦听器列表添加到现有实现中:
public class JmsReplyFuture<T extends Serializable> implements ListenableFuture<T>, MessageListener {private final List<Runnable> listeners = new ArrayList<Runnable>();@Overridepublic void addListener(Runnable listener, Executor executor) {listeners.add(listener);}//...
但这被大大简化了。 当然,当将来完成或发生异常时,我们必须遍历所有侦听器。 如果添加侦听器时未来已经解决,则必须立即调用该侦听器。 此外,我们忽略了executor
-根据API,每个侦听器都可以使用提供给addListener()
的不同线程池,因此我们必须存储对: Runnable
+ Executor
。 最后但并非最不重要的一点addListener()
不是线程安全的。 渴望的开发人员将在一两个小时内解决所有问题。 再花两个小时来修复同时引入的错误。 几小时后的另一个小时,生产中又弹出了另一个“不可能的”错误。 我不急。 事实上,即使是上面最简单的实现,我也懒得写。 但是我很拼命,要在ListenableFuture
上Ctrl
+ H
(在IntelliJ IDEA中的子类型视图)并浏览可用的骨骼实现树。 AbstractFuture<V>
–宾果游戏!
public class JmsReplyListenableFuture<T extends Serializable> extends AbstractFuture<T> implements MessageListener {private final Connection connection;private final Session session;private final MessageConsumer replyConsumer;public JmsReplyListenableFuture(Connection connection, Session session, Queue replyQueue) throws JMSException {this.connection = connection;this.session = session;this.replyConsumer = session.createConsumer(replyQueue);this.replyConsumer.setMessageListener(this);}@Overridepublic void onMessage(Message message) {try {final ObjectMessage objectMessage = (ObjectMessage) message;final Serializable object = objectMessage.getObject();set((T) object);cleanUp();} catch (Exception e) {setException(e);}}@Overrideprotected void interruptTask() {cleanUp();}private void cleanUp() {try {replyConsumer.close();session.close();connection.close();} catch (Exception e) {Throwables.propagate(e);}}
}
就这样,一切都可以编译并运行。 与初始实现相比,代码减少了近2 ListenableFuture
并且我们获得了更强大的ListenableFuture
。 大部分代码已设置并清理。 AbstractFuture
已经为我们实现了addListener()
,锁定和状态处理。 我们要做的就是在解决未来时调用set()
方法(在我们的情况下,JMS答复到达)。 此外,我们最终会适当地支持异常。 以前我们只是简单地忽略/重新抛出它们,而现在它们在访问时已正确包装并从get()
抛出。 即使我们对ListenableFuture
功能不感兴趣, AbstractFuture
仍然对我们有很大帮助。 我们免费获得ListenableFuture
。
好的程序员喜欢编写代码。 更好的人喜欢删除它 。 更少维护,更少测试,更少破坏。 有时我会惊讶于番石榴能提供多大的帮助。 上一次我使用大量的迭代器代码。 数据是动态生成的,迭代器可以轻松生成数百万个项目,因此我别无选择。 有限的迭代器API和相当复杂的业务逻辑共同构成了无数管道代码。 然后我找到了Iterators
实用程序类 ,它拯救了我的生命。 我建议您打开Guava的JavaDoc并逐一检查所有软件包。 待会儿我会谢谢你的。
一旦有了自定义的ListenableFuture
(显然您可以使用任何实现),我们就可以尝试将其与Spring MVC集成。 这是我们要实现的目标:
- HTTP请求进来
- 我们向JMS队列发送请求
- HTTP工作线程不再使用,它可以处理其他请求
- JMS侦听器异步等待临时队列中的答复
- 回复到达后,我们立即将其作为HTTP响应推送并完成连接。
使用阻止Future
第一个天真的实现:
@Controller
public class JmsController {private final ConnectionFactory connectionFactory;public JmsController(ConnectionFactory connectionFactory) {this.connectionFactory = connectionFactory;}@RequestMapping("/square/{value}")@ResponseBodypublic String square(@PathVariable double value) throws JMSException, ExecutionException, InterruptedException {final ListenableFuture<Double> responseFuture = request(value);return responseFuture.get().toString();}//JMS API boilerplateprivate <T extends Serializable> ListenableFuture<T> request(Serializable request) throws JMSException {Connection connection = this.connectionFactory.createConnection();connection.start();final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);final Queue tempReplyQueue = session.createTemporaryQueue();final ObjectMessage requestMsg = session.createObjectMessage(request);requestMsg.setJMSReplyTo(tempReplyQueue);sendRequest(session.createQueue("square"), session, requestMsg);return new JmsReplyListenableFuture<T>(connection, session, tempReplyQueue);}private void sendRequest(Queue queue, Session session, ObjectMessage requestMsg) throws JMSException {final MessageProducer producer = session.createProducer(queue);producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);producer.send(requestMsg);producer.close();}}
这种实现不是很幸运。 事实上,我们根本不需要Future
,因为我们几乎没有阻塞get()
,而是同步等待响应。 让我们尝试DeferredResult
:
@RequestMapping("/square/{value}")
@ResponseBody
public DeferredResult<String> square(@PathVariable double value) throws JMSException {final DeferredResult<String> deferredResult = new DeferredResult<>();final ListenableFuture<Double> responseFuture = request(value);Futures.addCallback(responseFuture, new FutureCallback<Double>() {@Overridepublic void onSuccess(Double result) {deferredResult.setResult(result.toString());}@Overridepublic void onFailure(Throwable t) {deferredResult.setErrorResult(t);}});return deferredResult;
}
复杂得多,但可扩展性也更大。 该方法几乎不需要时间来执行,并且HTTP工作线程在准备处理另一个请求之后不久。 要做的最大观察是onSuccess()
和onFailure()
由另一个线程执行,几秒钟甚至几分钟之后。 但是,HTTP工作线程池并未耗尽,并且应用程序仍保持响应状态。
这是一个教科书的例子,但是我们可以做得更好吗? 首先尝试将通用适配器从ListenableFuture
写入DeferredResult
。 这两个抽象代表完全相同的事物,但是具有不同的API。 这很简单:
public class ListenableFutureAdapter<T> extends DeferredResult<String> {public ListenableFutureAdapter(final ListenableFuture<T> target) {Futures.addCallback(target, new FutureCallback<T>() {@Overridepublic void onSuccess(T result) {setResult(result.toString());}@Overridepublic void onFailure(Throwable t) {setErrorResult(t);}});}
}
我们只需扩展DeferredResult
并使用ListenableFuture
回调通知它。 用法很简单:
@RequestMapping("/square/{value}")
@ResponseBody
public DeferredResult<String> square(@PathVariable double value) throws JMSException {final ListenableFuture<Double> responseFuture = request(value);return new ListenableFutureAdapter<>(responseFuture);
}
但是我们可以做得更好! 如果ListenableFuture
和DeferredResult
非常相似,为什么ListenableFuture
从控制器处理程序方法中返回ListenableFuture
?
@RequestMapping("/square/{value}")
@ResponseBody
public ListenableFuture<Double> square2(@PathVariable double value) throws JMSException {final ListenableFuture<Double> responseFuture = request(value);return responseFuture;
}
好吧,这是行不通的,因为Spring无法理解ListenableFuture
并且只会ListenableFuture
。 幸运的是,Spring MVC非常灵活,它使我们能够轻松注册新的所谓的 HandlerMethodReturnValueHandler
。 有12个这样的内置处理程序,每当我们从控制器返回某个对象时,Spring MVC就会按预定义的顺序检查它们,然后选择第一个可以处理给定类型的对象。 这样的处理程序之一就是DeferredResultHandler
(名称说明了一切),我们将其用作参考:
public class ListenableFutureReturnValueHandler implements HandlerMethodReturnValueHandler {public boolean supportsReturnType(MethodParameter returnType) {Class<?> paramType = returnType.getParameterType();return ListenableFuture.class.isAssignableFrom(paramType);}public void handleReturnValue(Object returnValue,MethodParameter returnType, ModelAndViewContainer mavContainer,NativeWebRequest webRequest) throws Exception {if (returnValue == null) {mavContainer.setRequestHandled(true);return;}final DeferredResult<Object> deferredResult = new DeferredResult<>();Futures.addCallback((ListenableFuture<?>) returnValue, new FutureCallback<Object>() {@Overridepublic void onSuccess(Object result) {deferredResult.setResult(result.toString());}@Overridepublic void onFailure(Throwable t) {deferredResult.setErrorResult(t);}});WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(deferredResult, mavContainer);}}
用尽业力,安装此处理程序并不像我希望的那样简单。 从技术上讲,有WebMvcConfigurerAdapter.addReturnValueHandlers()
,如果对Spring MVC使用Java配置,我们可以轻松地覆盖它。 但是此方法在处理程序链的末尾添加了自定义返回值处理程序,并且出于超出本文讨论范围的原因,我们需要在其开头添加它(优先级更高)。 幸运的是,通过一点点黑客攻击,我们也可以实现:
@Configuration
@EnableWebMvc
public class SpringConfig extends WebMvcConfigurerAdapter {@Resourceprivate RequestMappingHandlerAdapter requestMappingHandlerAdapter;@PostConstructpublic void init() {final List<HandlerMethodReturnValueHandler> originalHandlers = new ArrayList<>(requestMappingHandlerAdapter.getReturnValueHandlers().getHandlers());originalHandlers.add(0, listenableFutureReturnValueHandler());requestMappingHandlerAdapter.setReturnValueHandlers(originalHandlers);}@Beanpublic HandlerMethodReturnValueHandler listenableFutureReturnValueHandler() {return new ListenableFutureReturnValueHandler();}}
摘要
在本文中,我们熟悉了称为DeferredResult
的将来/承诺抽象的另一种形式。 它用于推迟对HTTP请求的处理,直到完成一些异步任务。 因此, DeferredResult
对于基于事件驱动系统,消息代理等之上的Web GUI而言非常有用。尽管它不如原始Servlet 3.0 API强大。 例如,我们无法在长时间运行的HTTP连接中流式传输多个事件(例如,新的推文)时– Spring MVC的设计更多地是针对请求-响应模式。
我们还对Spring MVC进行了调整,以允许直接从控制器方法中从Guava中返回ListenableFuture
。 它使我们的代码更加简洁和富于表现力。
参考: DeferredResult –在我们的JCG合作伙伴 Tomasz Nurkiewicz的NoBlogDefFound博客中,Spring MVC中的异步处理 。
翻译自: https://www.javacodegeeks.com/2013/03/deferredresult-asynchronous-processing-in-spring-mvc.html