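As you may know, SQS in AWS stands for "Simple Queue Service". While playing with it recently, I found one of the reasons why it is called "simple". In two previous posts (here and here) I showed how to use SQS as a JMS queue provider in combination with the Spring Framework. With this basic setup I decided to take it a step further and started experimenting with the request-response pattern in combination with JMS (making use of the JMS property 'JMSReplyTo' and temporary queues). This rather classic article explains nicely how it works and why it works that way.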
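To make the idea behind 'JMSReplyTo' and temporary queues concrete before turning to a real broker, here is a minimal plain-JDK sketch. It does not use JMS at all: the Request class, the in-memory queues and all method names are assumptions made up for illustration. The client creates a private reply queue, attaches it to the request, and the server answers to whatever queue the request carries.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Plain-JDK simulation of request-response; no JMS classes are involved.
class RequestReplySketch {

    // Hypothetical stand-in for a JMS message that carries a JMSReplyTo header.
    static final class Request {
        final String text;
        final BlockingQueue<String> replyTo;

        Request(String text, BlockingQueue<String> replyTo) {
            this.text = text;
            this.replyTo = replyTo;
        }
    }

    // Shared, well-known request queue that the server consumes from.
    static final BlockingQueue<Request> REQUEST_QUEUE = new LinkedBlockingQueue<>();

    // Server side: take one request, compute the answer, send it to the request's replyTo queue.
    static void serveOne() throws InterruptedException {
        Request request = REQUEST_QUEUE.take();
        request.replyTo.put(String.valueOf(request.text.length()));
    }

    // Client side: create a private ("temporary") reply queue, send, then wait for the answer.
    static String call(String text, long timeoutMillis) throws InterruptedException {
        BlockingQueue<String> temporaryQueue = new LinkedBlockingQueue<>();
        REQUEST_QUEUE.put(new Request(text, temporaryQueue));
        // poll returns null when no reply arrives within the timeout
        return temporaryQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Starts a server thread for one request and performs one client call.
    static String roundTrip(String text) {
        Thread server = new Thread(() -> {
            try {
                serveOne();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        server.setDaemon(true);
        server.start();
        try {
            return call(text, 2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println("Reply: " + roundTrip("hello"));
    }
}
```

Because the reply queue belongs to a single call, the client never has to filter out answers meant for someone else. That is the convenience temporary queues give you in JMS.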
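To show how it should work, I first show the setup as it works with Apache ActiveMQ. Let me show the bean that picks up a message from a queue, performs an action on the content and sends a reply back to the JMSReplyTo given in the JMS header. Since I use Spring, this sounds harder than it really is. First the Java code: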
package net.pascalalma.aws.sqs.requestresponse;

import org.springframework.stereotype.Service;

@Service
public class MyMessageService implements ResponsiveTextMessageDelegate {

    public String onMessage(String txt) {
        return String.valueOf(txt.length());
    }
}
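A very simple class, I would say. It implements ResponsiveTextMessageDelegate (details of this interface are described here) and simply returns the length of the incoming message content. The Spring framework takes care of everything else that needs to be done. The Spring config for this service looks like this: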
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
                           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd">

    <context:component-scan base-package="net.pascalalma.aws.sqs.requestresponse"></context:component-scan>
    <context:annotation-config/>

    <!-- ActiveMQ config -->
    <bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL">
            <value>tcp://localhost:61616</value>
        </property>
    </bean>

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="jmsFactory"/>
    </bean>

    <bean id="requestQueueName" class="java.lang.String">
        <constructor-arg value="DefaultDemoQueue"/>
    </bean>

    <bean id="myMessageService" class="net.pascalalma.aws.sqs.requestresponse.MyMessageService" />

    <bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
        <property name="delegate" ref="myMessageService"/>
        <property name="defaultListenerMethod" value="onMessage"/>
        <property name="messageConverter" ref="messageConverter" />
    </bean>

    <bean id="messageConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter" />

    <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="jmsFactory"/>
        <property name="destinationName" ref="requestQueueName"/>
        <property name="messageListener" ref="messageListener"/>
    </bean>
</beans>
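This is largely the same config as described in my previous post. The only difference is that I now use a converter, the SimpleMessageConverter, which takes care of converting the returned String into a TextMessage. If we don't define this converter, we get the following error: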
java.lang.NoSuchMethodException: net.pascalalma.aws.sqs.requestresponse.MyMessageService.onMessage(org.apache.activemq.command.ActiveMQTextMessage
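The error becomes easier to read with a simplified model of reflective listener dispatch. The sketch below is plain JDK reflection, not Spring's actual implementation, and FakeTextMessage, LengthService and dispatch are hypothetical names of my own. The listener method is looked up by the type of the argument it is handed: once the payload has been extracted as a String, onMessage(String) is found, while the raw message object matches no method.

```java
import java.lang.reflect.Method;

// Simplified model of reflective listener dispatch; not Spring's implementation.
class ListenerDispatchSketch {

    // Hypothetical stand-in for a JMS TextMessage.
    static final class FakeTextMessage {
        final String text;

        FakeTextMessage(String text) {
            this.text = text;
        }
    }

    // Same shape as MyMessageService: only onMessage(String) exists.
    public static final class LengthService {
        public String onMessage(String txt) {
            return String.valueOf(txt.length());
        }
    }

    // Looks up onMessage by the runtime type of the argument and invokes it.
    static Object dispatch(Object delegate, Object argument) throws Exception {
        Method method = delegate.getClass().getMethod("onMessage", argument.getClass());
        return method.invoke(delegate, argument);
    }

    // Returns the result, or the name of the failure, so callers need no try/catch.
    static String describe(Object delegate, Object argument) {
        try {
            return String.valueOf(dispatch(delegate, argument));
        } catch (NoSuchMethodException e) {
            return "NoSuchMethodException";
        } catch (Exception e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        LengthService service = new LengthService();
        FakeTextMessage message = new FakeTextMessage("hello");

        // Payload extracted first: onMessage(String) is found.
        System.out.println(describe(service, message.text));   // prints "5"

        // Raw message passed through: there is no onMessage(FakeTextMessage).
        System.out.println(describe(service, message));        // prints "NoSuchMethodException"
    }
}
```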
Next we need a service client bean that can "talk" to our service. In Java that could look like this:
package net.pascalalma.aws.sqs.requestresponse;

import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.core.SessionCallback;
import org.springframework.jms.support.JmsUtils;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;

import javax.annotation.Resource;
import javax.jms.*;
import java.util.Random;

@Component
public class MyMessageServiceClient {

    final static Logger logger = Logger.getLogger(MyMessageServiceClient.class);

    @Resource
    private JmsTemplate jmsTemplate;

    @Autowired
    private String requestQueueName;

    public String process(final String txt) {
        //Setup a message producer to send message to the queue the server is consuming from
        Message response = jmsTemplate.sendAndReceive(requestQueueName,
                new MessageCreator() {
                    public Message createMessage(Session session) throws JMSException {
                        TextMessage message = session.createTextMessage();
                        message.setText(txt);
                        return message;
                    }
                });

        String result = null;
        try {
            result = ((TextMessage) response).getText();
        } catch (JMSException e) {
            logger.error(e);
        }
        return result;
    }
}
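What we see is that we use the jmsTemplate's sendAndReceive to send the message created in the MessageCreator callback and then wait for the response message. The corresponding Spring config for this class is: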
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
                           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd">

    <context:component-scan base-package="net.pascalalma.aws.sqs.requestresponse"></context:component-scan>
    <context:annotation-config/>

    <!-- ActiveMQ config -->
    <bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL">
            <value>tcp://localhost:61616</value>
        </property>
    </bean>
    <!-- End ActiveMQ specific -->

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="jmsFactory"/>
    </bean>

    <bean id="requestQueueName" class="java.lang.String">
        <constructor-arg value="DefaultDemoQueue"/>
    </bean>

    <bean id="myMessageServiceClient" class="net.pascalalma.aws.sqs.requestresponse.MyMessageServiceClient"/>
</beans>
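One thing to keep in mind about the client above: it casts the result of sendAndReceive straight to a TextMessage. As far as I know, JmsTemplate waits indefinitely by default, but once a receive timeout is configured, sendAndReceive can return null when no reply arrives in time, and the cast-and-getText would then end in a NullPointerException. The plain-JDK sketch below shows the kind of guard I have in mind. The in-memory queue and the awaitReply helper are assumptions for illustration, not part of Spring or JMS.

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Plain-JDK illustration of handling a reply that may never arrive.
class NullSafeReplySketch {

    // Waits for a reply and reports a timeout as Optional.empty() instead of null.
    static Optional<String> awaitReply(BlockingQueue<String> replies, long timeoutMillis) {
        try {
            return Optional.ofNullable(replies.poll(timeoutMillis, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> replies = new LinkedBlockingQueue<>();

        // Nothing has been sent yet, so the wait times out.
        System.out.println(awaitReply(replies, 100).orElse("no reply"));   // prints "no reply"

        // A reply is available, so it is returned immediately.
        replies.add("16");
        System.out.println(awaitReply(replies, 100).orElse("no reply"));   // prints "16"
    }
}
```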
What remains now is some "container" to see these beans in action. For the "server" part I created a main class:
package net.pascalalma.aws.sqs.requestresponse;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class MessageServiceMain {

    public static void main(String[] args) {
        //Build application context by reading spring-config.xml
        ApplicationContext ctx = new ClassPathXmlApplicationContext(new String[]{"requestresponse/application-context.xml"});
    }
}
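Running this class in your IDE or terminal just reads the Spring config and instantiates the service beans. The Main class for the client has a little more code: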
package net.pascalalma.aws.sqs.requestresponse;

import org.apache.log4j.Logger;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import java.util.Random;

public class MessageServiceClientMain {

    final static Logger logger = Logger.getLogger(MessageServiceClientMain.class);

    public static void main(String[] args) {
        // Build the application context from the client Spring config
        ApplicationContext ctx = new ClassPathXmlApplicationContext(new String[]{"requestresponse/application-context-client.xml"});

        // Get the MyMessageServiceClient bean
        MyMessageServiceClient messageServiceClient = (MyMessageServiceClient) ctx.getBean("myMessageServiceClient");

        // Send progressively shorter suffixes of a random hex string to the service.
        // The loop is bounded by the string length because Long.toHexString does not pad to 16 characters.
        String random = createRandomString();
        for (int i = 0; i < random.length(); i++) {
            String key = random.substring(i);
            logger.info("Sending to service: " + key);
            logger.info("Sending to service with length: " + key.length());
            String result = messageServiceClient.process(key);
            logger.info("Received from service: " + result);
            logger.info("======================================================");
        }
    }

    private static String createRandomString() {
        Random random = new Random(System.currentTimeMillis());
        long randomLong = random.nextLong();
        return Long.toHexString(randomLong);
    }
}
Running this class generates messages, sends them to the service and prints the result received from the service, like this:
2015-04-20 20:29:14 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: 42fdcd4355cc5314
2015-04-20 20:29:14 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 16
2015-04-20 20:29:15 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(30) - Received from service: 16
2015-04-20 20:29:15 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(31) - ======================================================
2015-04-20 20:29:15 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: 2fdcd4355cc5314
2015-04-20 20:29:15 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 15
2015-04-20 20:29:15 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(30) - Received from service: 15
2015-04-20 20:29:15 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(31) - ======================================================
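The keys in this log are simply the successive suffixes of one random hex string, and the service answers with the length of each. The small sketch below reproduces that sequence for the first key from the log. The suffixes helper is a name I made up for illustration. Bounding the loop by the key's length keeps substring in range even when Long.toHexString yields fewer than 16 characters, since it does not left-pad with zeros.

```java
import java.util.ArrayList;
import java.util.List;

// Reproduces the sequence of keys the client sends, without any messaging involved.
class SuffixKeysSketch {

    // Returns every non-empty suffix of the key, longest first (key.substring(i) for each i).
    static List<String> suffixes(String key) {
        List<String> result = new ArrayList<>();
        for (int i = 0; i < key.length(); i++) {
            result.add(key.substring(i));
        }
        return result;
    }

    public static void main(String[] args) {
        // First key taken from the log output above.
        for (String key : suffixes("42fdcd4355cc5314")) {
            // The value after the arrow is what the length service would reply.
            System.out.println(key + " -> " + key.length());
        }
    }
}
```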
So far so good. Now let's use AWS SQS instead of the local ActiveMQ instance. This is easily done by modifying the config of the JmsFactory used in both Spring configs:
...
<bean id="credentialsProviderBean" class="com.amazonaws.auth.DefaultAWSCredentialsProviderChain"/>

<bean id="connectionFactoryBuilder" class="com.amazon.sqs.javamessaging.SQSConnectionFactory$Builder">
    <property name="regionName" value="eu-west-1"/>
    <property name="numberOfMessagesToPrefetch" value="5"/>
    <property name="awsCredentialsProvider" ref="credentialsProviderBean"/>
</bean>

<bean id="jmsFactory" class="com.amazon.sqs.javamessaging.SQSConnectionFactory"
      factory-bean="connectionFactoryBuilder"
      factory-method="build"/>
...
Now if we start the "server" application and the "client" application, we get the following output:
2015-04-25 20:22:49 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: f1db848691a26c85
2015-04-25 20:22:49 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 16
Exception in thread "main" org.springframework.jms.UncategorizedJmsException: Uncategorized exception occured during JMS processing; nested exception is javax.jms.JMSException: Unsupported Method
    at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:316)
    at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:169)
    at org.springframework.jms.core.JmsTemplate.executeLocal(JmsTemplate.java:986)
    at org.springframework.jms.core.JmsTemplate.sendAndReceive(JmsTemplate.java:922)
    at net.pascalalma.aws.sqs.requestresponse.MyMessageServiceClient.process(MyMessageServiceClient.java:29)
    at net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain.main(MessageServiceClientMain.java:29)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: javax.jms.JMSException: Unsupported Method
    at com.amazon.sqs.javamessaging.SQSSession.createTemporaryQueue(SQSSession.java:744)
    at org.springframework.jms.core.JmsTemplate.doSendAndReceive(JmsTemplate.java:946)
    at org.springframework.jms.core.JmsTemplate$12.doInJms(JmsTemplate.java:926)
    at org.springframework.jms.core.JmsTemplate$12.doInJms(JmsTemplate.java:922)
    at org.springframework.jms.core.JmsTemplate.executeLocal(JmsTemplate.java:983)
    ... 8 more
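As you can see, we get a stack trace telling us that the JMS method 'createTemporaryQueue' is not supported by SQS! So much for JMS support. I guess that is why they call it Simple Queue Service: only a subset of the possible JMS methods is implemented ;-). I searched for more information about this, but without any luck. However, I did come across this framework: Nevado JMS. It claims to be a JMS driver for AWS SQS/SNS, so I decided to give it a try. First I added the following dependency to the project's pom: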
<dependency>
    <groupId>org.skyscreamer</groupId>
    <artifactId>nevado-jms</artifactId>
    <version>1.3.1</version>
</dependency>
Then I modified the JmsFactory in both Spring configs again, this time to:
...
<bean id="sqsConnectorFactory" class="org.skyscreamer.nevado.jms.connector.amazonaws.AmazonAwsSQSConnectorFactory" />

<bean id="jmsFactory" class="org.skyscreamer.nevado.jms.NevadoConnectionFactory">
    <property name="sqsConnectorFactory" ref="sqsConnectorFactory" />
    <property name="awsAccessKey" value="${aws.accessKey}" />
    <property name="awsSecretKey" value="${aws.secretKey}" />
</bean>
...
Now when I run the main classes, I get the expected result:
2015-04-25 20:33:27 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: dad74fbff8e0a2f2
2015-04-25 20:33:27 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 16
2015-04-25 20:33:53 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(30) - Received from service: 16
2015-04-25 20:33:53 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(31) - ======================================================
2015-04-25 20:33:53 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: ad74fbff8e0a2f2
2015-04-25 20:33:53 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 15
2015-04-25 20:34:04 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(30) - Received from service: 15
2015-04-25 20:34:04 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(31) - ======================================================
2015-04-25 20:34:04 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: d74fbff8e0a2f2
2015-04-25 20:34:04 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 14
2015-04-25 20:34:09 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(30) - Received from service: 14
2015-04-25 20:34:09 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(31) - ======================================================
2015-04-25 20:34:09 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: 74fbff8e0a2f2
2015-04-25 20:34:09 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 13
2015-04-25 20:34:17 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(30) - Received from service: 13
2015-04-25 20:34:17 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(31) - ======================================================
2015-04-25 20:34:17 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(27) - Sending to service: 4fbff8e0a2f2
2015-04-25 20:34:17 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(28) - Sending to service with length: 12
2015-04-25 20:34:21 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(30) - Received from service: 12
2015-04-25 20:34:21 INFO net.pascalalma.aws.sqs.requestresponse.MessageServiceClientMain(31) - ======================================================
So this shows that the so-called "simple" service is still capable of more advanced stuff, albeit with a little help from the community :-)
Translated from: https://www.javacodegeeks.com/2015/05/more-advanced-stuff-with-jms-and-aws-sqs.html