消息传递是用于构建不同级别的分布式软件系统的极其强大的工具。 通常,至少在Java生态系统中,客户端(前端)从不直接与消息代理(或交换)进行交互,而是通过调用服务器端(后端)服务来进行交互。 否则,客户甚至可能不知道已安装了消息传递解决方案。
随着Websockets越来越被采用,对诸如STOMP (用于与消息代理或交换进行通信)之类的面向文本的协议的广泛支持将产生影响。 今天的帖子将尝试解释公开两个非常流行的JMS实现(Apache ActiveMQ和JBoss HornetQ)是多么简单,这两个Web前端(JavaScript)可以使用基于Websockets的 STOMP来用于Web前端(JavaScript)。
在深入研究代码之前,可能有人认为这样做不是一个好主意。 那目的是什么? 答案确实取决于:
- 您正在开发原型/概念证明,并且需要简单的方法来集成发布/订阅或对等消息传递
- 您不需要/不需要构建复杂的体系结构,而最简单的解决方案就足够了
可扩展性,故障转移和许多其他非常重要的决定在这里没有考虑,但是如果您正在开发健壮和有弹性的体系结构,则绝对应该考虑。
因此,让我们开始吧。 与往常一样,最好从我们要解决的问题开始:我们想开发一个简单的发布/订阅解决方案,使用JavaScript编写的Web客户端能够发送消息并侦听特定主题。 只要收到任何消息,客户端就会显示简单的警报窗口。 请注意,我们需要使用支持Websocket的现代浏览器,例如Google Chrome或Mozilla Firefox 。
对于我们的两个示例,客户端的代码均保持不变,因此我们从此开始。 最佳起点是STOMP Over WebSocket文章,其中介绍了stomp.js模块,这是我们的index.html :
<script src="stomp.js"></script><script type="text/javascript">var client = Stomp.client( "ws://localhost:61614/stomp", "v11.stomp" );client.connect( "", "",function() {client.subscribe("jms.topic.test",function( message ) {alert( message );}, { priority: 9 } );client.send("jms.topic.test", { priority: 9 }, "Pub/Sub over STOMP!");});</script>
非常简单的代码,但很少有细节值得解释。 首先,我们正在ws:// localhost:61614 / stomp寻找Websockets端点。 这足以进行本地部署,但最好用真实IP地址或主机名替换localhost 。 其次,一旦连接,客户端就订阅该主题(仅对优先级为9的消息感兴趣),并在此之后立即将消息发布到该主题。 从客户的角度来看,我们已经完成了。
让我们继续进行消息代理,列表中的第一个是Apache ActiveMQ 。 为了简化示例,我们将不使用配置XML文件将Apache ActiveMQ代理嵌入到简单的Spring应用程序中。 由于源代码在GitHub上可用 ,因此我将跳过POM文件片段,仅显示代码:
package com.example.messaging;import java.util.Collections;import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.hooks.SpringContextHook;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class AppConfig {@Bean( initMethod = "start", destroyMethod = "stop" )public BrokerService broker() throws Exception {final BrokerService broker = new BrokerService(); broker.addConnector( "ws://localhost:61614" ); broker.setPersistent( false );broker.setShutdownHooks( Collections.< Runnable >singletonList( new SpringContextHook() ) );final ActiveMQTopic topic = new ActiveMQTopic( "jms.topic.test" );broker.setDestinations( new ActiveMQDestination[] { topic } );final ManagementContext managementContext = new ManagementContext();managementContext.setCreateConnector( true );broker.setManagementContext( managementContext );return broker;}
}
如我们所见, ActiveMQ代理配置有ws:// localhost:61614连接器,该连接器假定使用STOMP协议。 另外,我们正在创建名称为jms.topic.test的 JMS主题,并启用JMX管理工具。 并运行它,简单的Starter类:
package com.example.messaging;import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;public class Starter {public static void main( String[] args ) {ApplicationContext context = new AnnotationConfigApplicationContext( AppConfig.class );}
}
现在,启动并运行它,让我们在浏览器中打开index.html文件,我们应该看到类似以下内容:
简单! 对于好奇的读者, ActiveMQ使用Jetty 7.6.7.v20120910来支持Websockets,并且不能与最新的Jetty发行版一起使用。
接下来,就HornetQ而言 ,实现看起来有些不同,尽管也不是很复杂。 由于Starter类保持不变,因此唯一的变化是配置:
package com.example.hornetq;import java.util.Collections;
import java.util.HashMap;
import java.util.Map;import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.server.JournalType;
import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
import org.hornetq.jms.server.config.JMSConfiguration;
import org.hornetq.jms.server.config.TopicConfiguration;
import org.hornetq.jms.server.config.impl.ConnectionFactoryConfigurationImpl;
import org.hornetq.jms.server.config.impl.JMSConfigurationImpl;
import org.hornetq.jms.server.config.impl.TopicConfigurationImpl;
import org.hornetq.jms.server.embedded.EmbeddedJMS;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class AppConfig {@Bean( initMethod = "start", destroyMethod = "stop" )public EmbeddedJMS broker() throws Exception {final ConfigurationImpl configuration = new ConfigurationImpl();configuration.setPersistenceEnabled( false );configuration.setJournalType( JournalType.NIO );configuration.setJMXManagementEnabled( true );configuration.setSecurityEnabled( false );final Map< String, Object > params = new HashMap<>();params.put( TransportConstants.HOST_PROP_NAME, "localhost" );params.put( TransportConstants.PROTOCOL_PROP_NAME, "stomp_ws" );params.put( TransportConstants.PORT_PROP_NAME, "61614" );final TransportConfiguration stomp = new TransportConfiguration( NettyAcceptorFactory.class.getName(), params );configuration.getAcceptorConfigurations().add( stomp );configuration.getConnectorConfigurations().put( "stomp_ws", stomp );final ConnectionFactoryConfiguration cfConfig = new ConnectionFactoryConfigurationImpl( "cf", true, "/cf" );cfConfig.setConnectorNames( Collections.singletonList( "stomp_ws" ) );final JMSConfiguration jmsConfig = new JMSConfigurationImpl();jmsConfig.getConnectionFactoryConfigurations().add( cfConfig );final TopicConfiguration topicConfig = new TopicConfigurationImpl( "test", "/topic/test" );jmsConfig.getTopicConfigurations().add( topicConfig );final EmbeddedJMS jmsServer = new EmbeddedJMS();jmsServer.setConfiguration( configuration );jmsServer.setJmsConfiguration( jmsConfig );return jmsServer;}
}
完整的源代码在GitHub上 。 在运行Starter类并在浏览器中打开index.html之后,我们应该看到非常相似的结果:
HornetQ配置看起来更加冗长,但是除了出色的Netty框架之外,没有涉及其他依赖项。
出于好奇,我将ActiveMQ代理替换为Apollo实现。 尽管我成功实现了预期的功能,但我发现该API非常麻烦,至少在当前版本1.6中如此,因此本文中没有涉及它。
翻译自: https://www.javacodegeeks.com/2013/09/easy-messaging-with-stomp-over-websockets-using-activemq-and-hornetq.html