aws sqs
Amazon WEB服务为我们提供了SQS消息传递服务。 sqs的java sdk与JMS兼容。
因此,可以将SQS与spring提供的JMS集成框架集成在一起,而不是将SQS用作简单的spring bean。
我将使用spring-boot和gradle。
gradle文件:
group 'com.gkatzioura.sqstesting'
version '1.0-SNAPSHOT'buildscript {repositories {mavenCentral()}dependencies {classpath("org.springframework.boot:spring-boot-gradle-plugin:1.2.7.RELEASE")}
}apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'spring-boot'sourceCompatibility = 1.8repositories {mavenCentral()
}dependencies {compile "org.springframework.boot:spring-boot-starter-thymeleaf"compile "com.amazonaws:aws-java-sdk:1.10.55"compile "org.springframework:spring-jms"compile "com.amazonaws:amazon-sqs-java-messaging-lib:1.0.0"compile 'org.slf4j:slf4j-api:1.6.6'compile 'ch.qos.logback:logback-classic:1.0.13'testCompile "junit:junit:4.11"
}
应用类别
package com.gkatzioura.sqstesting;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;/*** Created by gkatziourasemmanouil on 8/26/15.*/
@SpringBootApplication
public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);}}
并应用yml文件
- 队列:
- 端点: http:// localhost:9324
- 名称:样本队列
我指定了一个本地主机端点,因为我使用了ElasticMq 。
SQSConfig类是一个配置类,以使SQS客户端作为spring bean可用。
package com.gkatzioura.sqstesting.config;import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.sqs.AmazonSQSClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** Created by gkatziourasemmanouil on 25/02/16.*/
@Configuration
public class SQSConfig {@Value("${queue.endpoint}")private String endpoint;@Value("${queue.name}")private String queueName;@Beanpublic AmazonSQSClient createSQSClient() {AmazonSQSClient amazonSQSClient = new AmazonSQSClient(new BasicAWSCredentials("",""));amazonSQSClient.setEndpoint(endpoint);amazonSQSClient.createQueue(queueName);return amazonSQSClient;}}
SQSListener是实现JMS MessageListener接口的侦听器类。
package com.gkatzioura.sqstesting.listeners;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;/*** Created by gkatziourasemmanouil on 25/02/16.*/
@Component
public class SQSListener implements MessageListener {private static final Logger LOGGER = LoggerFactory.getLogger(SQSListener.class);public void onMessage(Message message) {TextMessage textMessage = (TextMessage) message;try {LOGGER.info("Received message "+ textMessage.getText());} catch (JMSException e) {LOGGER.error("Error processing message ",e);}}
}
JMSSQSConfig类包含JmsTemplate和DefaultMessageListenerContainer的配置。 通过JMSSQSConfig类,我们注册了JMS MessageListeners。
package com.gkatzioura.sqstesting.config;import com.amazon.sqs.javamessaging.SQSConnectionFactory;
import com.amazonaws.auth.*;
import com.gkatzioura.sqstesting.listeners.SQSListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;/*** Created by gkatziourasemmanouil on 25/02/16.*/
@Configuration
public class JMSSQSConfig {@Value("${queue.endpoint}")private String endpoint;@Value("${queue.name}")private String queueName;@Autowiredprivate SQSListener sqsListener;@Beanpublic DefaultMessageListenerContainer jmsListenerContainer() {SQSConnectionFactory sqsConnectionFactory = SQSConnectionFactory.builder().withAWSCredentialsProvider(new DefaultAWSCredentialsProviderChain()).withEndpoint(endpoint).withAWSCredentialsProvider(awsCredentialsProvider).withNumberOfMessagesToPrefetch(10).build();DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();dmlc.setConnectionFactory(sqsConnectionFactory);dmlc.setDestinationName(queueName);dmlc.setMessageListener(sqsListener);return dmlc;}@Beanpublic JmsTemplate createJMSTemplate() {SQSConnectionFactory sqsConnectionFactory = SQSConnectionFactory.builder().withAWSCredentialsProvider(awsCredentialsProvider).withEndpoint(endpoint).withNumberOfMessagesToPrefetch(10).build();JmsTemplate jmsTemplate = new JmsTemplate(sqsConnectionFactory);jmsTemplate.setDefaultDestinationName(queueName);jmsTemplate.setDeliveryPersistent(false);return jmsTemplate;}private final AWSCredentialsProvider awsCredentialsProvider = new AWSCredentialsProvider() {@Overridepublic AWSCredentials getCredentials() {return new BasicAWSCredentials("", "");}@Overridepublic void refresh() {}};}
MessageService是使用JMSTemplate以便将消息发送到队列的服务
package com.gkatzioura.sqstesting;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;/*** Created by gkatziourasemmanouil on 28/02/16.*/
@Service
public class MessageService {@Autowiredprivate JmsTemplate jmsTemplate;@Value("${queue.name}")private String queueName;private static final Logger LOGGER = LoggerFactory.getLogger(MessageService.class);public void sendMessage(final String message) {jmsTemplate.send(queueName, new MessageCreator() {@Overridepublic Message createMessage(Session session) throws JMSException {return session.createTextMessage(message);}});}}
最后但并非最不重要的一点是添加了控制器。 控制器将发布请求正文作为消息发送到队列。
package com.gkatzioura.sqstesting;import com.amazonaws.util.IOUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.InputStream;/*** Created by gkatziourasemmanouil on 24/02/16.*/
@Controller
@RequestMapping("/main")
public class MainController {@Autowiredprivate MessageService messageService;@RequestMapping(value = "/write",method = RequestMethod.POST)public void write(HttpServletRequest servletRequest,HttpServletResponse servletResponse) throws IOException {InputStream inputStream = servletRequest.getInputStream();String message = IOUtils.toString(inputStream);messageService.sendMessage(message);}}
- 您可以在此处下载源代码。
翻译自: https://www.javacodegeeks.com/2016/02/aws-sqs-spring-jms-integration.html
aws sqs