实现自定义的未来

上一次我们学习了java.util.concurrent.Future<T>背后的原理 。 我们还发现, Future<T>通常由库或框架返回。 但是没有什么可以阻止我们在有意义的情况下自行实现所有功能。 它不是特别复杂,可以显着改善您的设计。 我尽力为我们的示例选择有趣的用例。

JMS(Java消息服务)是用于发送异步消息的标准Java API。 当我们想到JMS时​​,我们立即看到客户端以一发不可收拾的方式向服务器(代理)发送消息。 但是在JMS之上实现请求-答复消息传递模式同样普遍。 实现非常简单:您将请求消息(当然是异步地)发送到另一侧的MDB。
MDB处理该请求,然后将答复发送回硬编码的答复队列或客户机选择的任意队列,并与JMSReplyTo属性中的消息一起发送。 第二种情况更有趣。 客户端可以创建一个临时队列,并在发送请求时将其用作回复队列。 这样,每个请求/答复对使用不同的答复队列,因此不需要关联ID,选择器等。

但是有一个问题。 向JMS代理发送消息是简单且异步的。 但是,收到答复要麻烦得多。 您可以实现MessageListener以使用一条消息,也可以使用阻塞MessageConsumer.receive() 。 第一种方法非常笨重,很难在实践中使用。 第二个失败了异步消息传递的目的。 您还可以按一定的时间间隔轮询答复队列,这听起来更糟。

到现在为止,了解Future抽象您应该有一些设计想法。 如果我们可以发送请求消息并取回Future<T> (表示尚未收到的答复消息)怎么办? Future抽象应该处理所有逻辑,我们可以放心地将其用作将来结果的句柄。 这是用于创建临时队列和发送请求的管道代码:

private <T extends Serializable> Future<T> asynchRequest(ConnectionFactory connectionFactory, Serializable request, String queue) throws JMSException {Connection connection = connectionFactory.createConnection();connection.start();final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);final Queue tempReplyQueue = session.createTemporaryQueue();final ObjectMessage requestMsg = session.createObjectMessage(request);requestMsg.setJMSReplyTo(tempReplyQueue);sendRequest(session.createQueue(queue), session, requestMsg);return new JmsReplyFuture<T>(connection, session, tempReplyQueue);
}

asynchRequest()方法仅将ConnectionFactory带到JMS代理和任意数据。 该对象将使用ObjectMessage发送到queue 。 最后一行至关重要–我们返回自定义的JmsReplyFuture<T> ,它将表示尚未收到的回复。 注意,我们如何将临时JMS队列传递给JMSReplyTo属性和Future 。 MDB方面的实现并不那么重要。 不用说是将回复发送回指定队列:

final ObjectMessage reply = session.createObjectMessage(...);
session.createProducer(request.getJMSReplyTo()).send(reply);

因此,让我们深入研究JmsReplyFuture<T> 。 我假设请求和答复都是ObjectMessage 。 使用不同类型的消息不是很困难。 首先让我们看看如何设置从回复通道接收消息:

public class JmsReplyFuture<T extends Serializable> implements Future<T>, MessageListener {//...public JmsReplyFuture(Connection connection, Session session, Queue replyQueue) throws JMSException {this.connection = connection;this.session = session;replyConsumer = session.createConsumer(replyQueue);replyConsumer.setMessageListener(this);}@Overridepublic void onMessage(Message message) {//...}}

如您所见, JmsReplyFuture实现了Future<T> (其中T是包装在ObjectMessage的对象的预期类型)和JMS MessageListener 。 在构造函数中,我们只是开始侦听replyQueue 。 根据我们的设计假设,我们知道那里最多会有一条消息,因为回复队列是临时丢弃队列。 在上一篇文章中,我们了解到Future.get()应该在等待结果时阻塞。 另一方面, onMessage()是从某些内部JMS客户端线程/库调用的回调方法。 显然,我们需要一些共享变量/锁,以使等待中的get()知道答复已到达。 最好我们的解决方案应该是轻量级的,并且不引入任何延迟,因此忙于等待volatile变量是一个坏主意。 最初,我虽然使用了Semaphore ,但我将使用它来取消阻塞onMessage() get() onMessage() 。 但是我仍然需要一些共享变量来保存实际的回复对象。 因此,我想到了使用ArrayBlockingQueue的想法。 当我们知道不会再有一个项目时,使用队列听起来可能很奇怪。 但是它利用旧的生产者-消费者模式很好地工作: Future.get()是一个消费者,它阻塞了空队列的poll()方法。 另一方面, onMessage()是生产者,将回复消息放置在该队列中并立即取消阻塞消费者。 外观如下:

public class JmsReplyFuture<T extends Serializable> implements Future<T>, MessageListener {private final BlockingQueue<T> reply = new ArrayBlockingQueue<>(1);//...@Overridepublic T get() throws InterruptedException, ExecutionException {return this.reply.take();}@Overridepublic T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {final T replyOrNull = reply.poll(timeout, unit);if (replyOrNull == null) {throw new TimeoutException();}return replyOrNull;}@Overridepublic void onMessage(Message message) {final ObjectMessage objectMessage = (ObjectMessage) message;final Serializable object = objectMessage.getObject();reply.put((T) object);//...}}

实施仍未完成,但涵盖了最重要的概念。 请注意, BlockingQueue.poll(long, TimeUnit)方法非常适合Future.get(long, TimeUnit) 。 不幸的是,即使它们来自相同的程序包并且在相同的时间内或多或少地被开发,一种方法在超时时返回null ,而另一种方法应引发异常。 易于修复。

还要注意onMessage()的实现变得多么容易。 我们只是将新收到的消息放在BlockingQueue reply ,而集合将为我们完成所有同步。 我们仍然缺少一些不太重要但仍然重要的细节–取消和清理。 无需赘述,下面是一个完整的实现:

public class JmsReplyFuture<T extends Serializable> implements Future<T>, MessageListener {private static enum State {WAITING, DONE, CANCELLED}private final Connection connection;private final Session session;private final MessageConsumer replyConsumer;private final BlockingQueue<T> reply = new ArrayBlockingQueue<>(1);private volatile State state = State.WAITING;public JmsReplyFuture(Connection connection, Session session, Queue replyQueue) throws JMSException {this.connection = connection;this.session = session;replyConsumer = session.createConsumer(replyQueue);replyConsumer.setMessageListener(this);}@Overridepublic boolean cancel(boolean mayInterruptIfRunning) {try {state = State.CANCELLED;cleanUp();return true;} catch (JMSException e) {throw Throwables.propagate(e);}}@Overridepublic boolean isCancelled() {return state == State.CANCELLED;}@Overridepublic boolean isDone() {return state == State.DONE;}@Overridepublic T get() throws InterruptedException, ExecutionException {return this.reply.take();}@Overridepublic T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {final T replyOrNull = reply.poll(timeout, unit);if (replyOrNull == null) {throw new TimeoutException();}return replyOrNull;}@Overridepublic void onMessage(Message message) {try {final ObjectMessage objectMessage = (ObjectMessage) message;final Serializable object = objectMessage.getObject();reply.put((T) object);state = State.DONE;cleanUp();} catch (Exception e) {throw Throwables.propagate(e);}}private void cleanUp() throws JMSException {replyConsumer.close();session.close();connection.close();}
}

我使用特殊的State枚举来保存有关状态的信息。 与基于多个标志, null检查等的复杂条件相比,我发现它更具可读性。要记住的第二件事是取消。 幸运的是,它非常简单。 我们基本上关闭了基础会话/连接。 在整个请求/答复消息交换的整个过程中,它必须保持打开状态,否则临时JMS答复队列将消失。 请注意,我们不能轻易通知经纪人/ MDB我们对答复不再感兴趣。 我们只是停止监听它,但是MDB仍将处理请求并尝试将答复发送到不再存在的临时队列。

那么这一切在实践中看起来如何? 假设我们有一个MDB接收一个数字并返回一个平方。 假设计算需要一点时间,所以我们提前开始计算,同时做一些工作,然后再取回结果。 这样的设计如下所示:

final Future<Double> replyFuture = asynchRequest(connectionFactory, 7, "square");
//do some more work
final double resp = replyFuture.get();      //49

其中"square"是请求队列的名称。 如果我们重构它并使用依赖注入,我们可以将其进一步简化为:

final Future<Double> replyFuture = calculator.square(7);
//do some more work
final double resp = replyFuture.get();      //49

您知道该设计的最佳选择吗? 即使我们正在利用相当先进的JMS功能,此处也没有JMS代码。 此外,我们稍后可以使用SOAP或GPU将calculator替换为其他实现。 就客户端代码而言,我们仍然使用Future<Double>抽象。 尚未提供计算结果。 根本的机制是无关紧要的。 那就是抽象的美。

显然,此实现尚未准备好生产(到目前为止)。 但更糟糕的是,它缺少一些基本功能。 我们仍然在某个时候调用阻塞Future.get() 。 而且,无法组成/链接期货(例如, 当响应到达时,发送另一条消息 )或等待最快的期货完成。 耐心一点!

参考:在NoBlogDefFound博客上,从我们的JCG合作伙伴 Tomasz Nurkiewicz 实现自定义Future 。

翻译自: https://www.javacodegeeks.com/2013/02/implementing-custom-future.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/369414.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

c语言中的两个百分号什么意思,百分号的用法,特别是在两个量词之间的用法,例如50%—70%和50—70%...-百分号-语文-彭都宰同学...

概述&#xff1a;本道作业题是彭都宰同学的课后练习&#xff0c;分享的知识点是百分号&#xff0c;指导老师为屠老师&#xff0c;涉及到的知识点涵盖&#xff1a;百分号的用法&#xff0c;特别是在两个量词之间的用法&#xff0c;例如50%—70%和50—70%...-百分号-语文&#xf…

Markdown 语法和 MWeb 写作使用说明

# Markdown 语法和 MWeb 写作使用说明 ## Markdown 的设计哲学 > Markdown 的目標是實現「易讀易寫」。> 不過最需要強調的便是它的可讀性。一份使用 Markdown 格式撰寫的文件應該可以直接以純文字發佈&#xff0c;並且看起來不會像是由許多標籤或是格式指令所構成。>…

微信小程序 引入公共页面的几种情况

1、不带参数 首先在pages文件夹中新建一个template文件夹&#xff0c;文件夹中新建一个template.wxml文件&#xff0c;代码如下 <!--template.wxml--> <template name"msgItem"><view><text>This is template.wxml文件&#xff0c;我是一个…

Python学习笔记----基础篇10----模块2

8&#xff09;json& pickle 用于序列化的两个模块 json&#xff0c;用于处理字符串和python数据类型间进行转换 pickle&#xff0c;用于python特有的类型和python的数据类型间进行站换 Json模块提供了四个功能&#xff1a;dumps、dump、loads、load pickle模块提供了四个功…

易语言自定义数据类型转c,一步一步跟我学易语言之自定义数据类型

自定义数据类型什么是“自定义数据类型”&#xff1f;顾名思义&#xff0c;就是用户可以随时在程序中自行定义新的数据类型。自定义数据类型时需要设置数据类型的名称及其成员。数据类型成员各属性的设置方法等同于变量设置时相应属性的设置方法。双击“程序”中的“自定义数据…

(第2部分,共3部分):有关性能调优,Java中的JVM,GC,Mechanical Sympathy等的文章和视频的摘要...

这是以前的文章&#xff08;第3部分&#xff0c;共1部分&#xff09;的继续&#xff1a;有关性能调优&#xff0c;Java中的JVM&#xff0c;GC&#xff0c;Mechanical Sympathy等的文章和视频的提要 。 事不宜迟&#xff0c;让我们开始使用我们的下一组博客和视频&#xff0c;印…

Redis初步整理

1&#xff0c;Redis 简介 Redis 是完全开源免费的&#xff0c;遵守BSD协议&#xff0c;是一个高性能的key-value数据库。 Redis 与其他 key - value 缓存产品有以下三个特点&#xff1a; Redis支持数据的持久化&#xff0c;可以将内存中的数据保持在磁盘中&#xff0c;重启的时…

阶段十-物业项目

可能遇到的错误&#xff1a; 解决jdk17javax.xml.bind.DatatypeConverter错误 <!--解决jdk17javax.xml.bind.DatatypeConverter错误--><dependency><groupId>javax.xml.bind</groupId><artifactId>jaxb-api</artifactId><version>…

echarts中triggeron与trigger不能同时出现吗_好物|痛风、血糖高、虚不受补能吃它吗?你想知道的阿胶十问十答一锅出!...

最近百草君在整理粉丝留言的时候&#xff0c;发现关于阿胶四物膏的留言不少&#xff0c;并且有重复问题。百草君特意整理出来几个粉丝们特别关注的问题&#xff0c;给大家统一解答&#xff0c;顺序不分前后&#xff1a;Q1阿胶四物膏什么口感&#xff0c;甜不甜&#xff1f;阿胶…

【移动端 Web】怎么循序渐进地开发一个移动端页面

1. 移动页面开发基础 1.1 像素——什么是像素 像素是 Web 页面布局的基础&#xff0c;那么到底什么才是一个像素呢&#xff1f; 像素&#xff1a;一个像素就是计算机屏幕所能显示一种特定颜色的最小区域。这是像素的概念&#xff0c;实际上&#xff0c;Web 前端开发领域&…

带有Spring的JavaFX 2

我将从一个大胆的声明开始&#xff1a;我一直很喜欢Java Swing或applet。 在那里&#xff0c;我说了。 如果我进行一些自我分析&#xff0c;那么这种钦佩可能是在我入门Java时开始的。 Swing&#xff08;实际上&#xff09;是我使用Java所做的第一件事&#xff0c;它给出了一些…

c语言怎么样文件存储数据,急求如何将下列C语言程序数据存储到文件中?

该楼层疑似违规已被系统折叠 隐藏此楼查看此楼求如何改动才能将下列程序的存储输入或输出数据(或两者一起)到指定的文件(或运行时直接创立一个文件)如Arrangement中。#include int n0;int rest[7][7]; //全局声明,以供全局调用int main(){void perm(int list[],int ,int );int …

PHP面试题之优化

* PHP性能问题 * 1.PHP语法使用的不恰当 * 2.使用PHP语言做了它不擅长的事 * 3.使用PHP语言链接的服务不给力 * 4.PHP自身做不了的事情 * * PHP的性能问题的解决方向 * 1.PHP语言级的性能优化(代码优化) * 2.PHP周边问题的性能优化(linux,mysql,磁盘等) * 3.PHP语言性能的优化…

3.4 内置函数(1)

3.4转载于:https://www.cnblogs.com/Jermy/articles/10940658.html

c++ 分页展示_分合相宜 Excel透视报表生成分页和汇总报表

我们经常要利用Excel生成指定类型的报表&#xff0c;但是很多报表原始数据是混杂在一起&#xff0c;或者是分布在各个子表中。现在利用Excel透视报表的功能&#xff0c;我们可以快速将混杂的数据分离为分页报表&#xff0c;或者将独立子表整合为汇总报表。简单分页&#xff0c;…

二分法在数组内查找数c语言,C++二分法在数组中查找关键字的方法

本文实例讲述了C二分法在数组中查找关键字的方法。分享给大家供大家参考。具体如下&#xff1a;/*此程序演示了二分法查找算法(针对按从小到大排列的数组)的实现。*/#include using namespace std;/*功能&#xff1a; 实现数组的二分法查找(只算法只适合按从小到大排列的数组)返…

(第1部分,共3部分):有关性能调优,Java中的JVM,GC,Mechanical Sympathy等的文章和视频的摘要...

我已经花了几个月的时间考虑审查有关性能调优&#xff0c;JVM&#xff0c;Java中的GC&#xff0c;Mechanical Sympathy等主题的文章和视频的缓存&#xff0c;并最终花了点时间–也许这就是重点我什么时候需要做我的智力进步&#xff01; 感谢Attila-Mihaly给我提供了为其年度时…

【springBoot】之定制Banner

springboot启动时控制台打印图案如下: 1、假如我们不想看到这个图案 public static void main(String[] args) {SpringApplication applicationnew SpringApplication(Application.class);/*** OFF G关闭* CLOSED 后台控制台输出&#xff0c;默认就是这种* LOG 日志输出*/appli…

Web前端体系的脉络结构

Web前端技术由 html、css 和 javascript 三大部分构成&#xff0c;是一个庞大而复杂的技术体系&#xff0c;其复杂程度不低于任何一门后端语言。而我们在学习它的时候往往是先从某一个点切入&#xff0c;然后不断地接触和学习新的知识点&#xff0c;因此对于初学者很难理清楚整…

无法访问netflix服务_Choerodon 的微服务之路(三):服务注册与发现

本文是 Choerodon 的微服务之路系列推文第三篇。在上一篇《Choerodon的微服务之路&#xff08;二&#xff09;&#xff1a;微服务网关》中&#xff0c;介绍了Choerodon 在搭建微服务网关时考虑的一些问题以及两种常见的微服务网关模式&#xff0c;并且通过代码介绍了Choerodon …